Skip to content

Commit

Permalink
Fetch job queue counts for metrics on scraping (#509)
Browse files Browse the repository at this point in the history
  • Loading branch information
prathamesh0 committed May 16, 2024
1 parent 20fa6ce commit 1ca7454
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 35 deletions.
2 changes: 1 addition & 1 deletion packages/cli/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ export class JobRunnerCmd {
await startJobRunner(jobRunner);
jobRunner.handleShutdown();

await startMetricsServer(config, indexer, this._currentEndpointIndex);
await startMetricsServer(config, jobQueue, indexer, this._currentEndpointIndex);
}

_getArgv (): any {
Expand Down
37 changes: 9 additions & 28 deletions packages/util/src/job-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

import assert from 'assert';
import debug from 'debug';
import PgBoss from 'pg-boss';
import PgBoss, { MonitorStates } from 'pg-boss';

import { jobCount, lastJobCompletedOn } from './metrics';
import { lastJobCompletedOn } from './metrics';
import { wait } from './misc';

interface Config {
Expand Down Expand Up @@ -48,35 +48,10 @@ export class JobQueue {

deleteAfterHours: 1, // 1 hour

newJobCheckInterval: 100,

// Time interval for firing monitor-states event.
monitorStateIntervalSeconds: 10
newJobCheckInterval: 100
});

this._boss.on('error', error => log(error));

this._boss.on('monitor-states', monitorStates => {
jobCount.set({ state: 'all' }, monitorStates.all);
jobCount.set({ state: 'created' }, monitorStates.created);
jobCount.set({ state: 'retry' }, monitorStates.retry);
jobCount.set({ state: 'active' }, monitorStates.active);
jobCount.set({ state: 'completed' }, monitorStates.completed);
jobCount.set({ state: 'expired' }, monitorStates.expired);
jobCount.set({ state: 'cancelled' }, monitorStates.cancelled);
jobCount.set({ state: 'failed' }, monitorStates.failed);

Object.entries(monitorStates.queues).forEach(([name, counts]) => {
jobCount.set({ state: 'all', name }, counts.all);
jobCount.set({ state: 'created', name }, counts.created);
jobCount.set({ state: 'retry', name }, counts.retry);
jobCount.set({ state: 'active', name }, counts.active);
jobCount.set({ state: 'completed', name }, counts.completed);
jobCount.set({ state: 'expired', name }, counts.expired);
jobCount.set({ state: 'cancelled', name }, counts.cancelled);
jobCount.set({ state: 'failed', name }, counts.failed);
});
});
}

get maxCompletionLag (): number {
Expand Down Expand Up @@ -178,4 +153,10 @@ export class JobQueue {
await wait(EMPTY_QUEUE_CHECK_INTERVAL);
}
}

async getJobCounts (): Promise<MonitorStates> {
// Use any as countStates() method is not present in the types
const monitorStates = await (this._boss as any).countStates();
return monitorStates;
}
}
42 changes: 36 additions & 6 deletions packages/util/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import JsonRpcProvider = ethers.providers.JsonRpcProvider;

import { Config } from './config';
import { IndexerInterface } from './types';
import { JobQueue } from './job-queue';

const DB_SIZE_QUERY = 'SELECT pg_database_size(current_database())';

Expand All @@ -27,11 +28,6 @@ export async function fetchLatestBlockNumber (provider: JsonRpcProvider): Promis
}

// Create custom metrics
export const jobCount = new client.Gauge({
name: 'pgboss_jobs_total',
help: 'Total entries in job table',
labelNames: ['state', 'name'] as const
});

export const lastJobCompletedOn = new client.Gauge({
name: 'pgboss_last_job_completed_timestamp_seconds',
Expand Down Expand Up @@ -116,7 +112,7 @@ const upstreamEndpointsMetric = new client.Gauge({
// Export metrics on a server
const app: Application = express();

export const startMetricsServer = async (config: Config, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<void> => {
export const startMetricsServer = async (config: Config, jobQueue: JobQueue, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<void> => {
if (!config.metrics) {
log('Metrics is disabled. To enable add metrics host and port.');
return;
Expand All @@ -142,6 +138,8 @@ export const startMetricsServer = async (config: Config, indexer: IndexerInterfa
}
});

await registerJobQueueMetrics(jobQueue);

await registerWatcherConfigMetrics(config);

setActiveUpstreamEndpointMetric(config, endpointIndexes.rpcProviderEndpoint);
Expand Down Expand Up @@ -246,3 +244,35 @@ const registerWatcherConfigMetrics = async ({ server, upstream, jobQueue }: Conf
watcherConfigMetric.set({ category: 'jobqueue', field: 'historical_logs_block_range' }, Number(jobQueue.historicalLogsBlockRange));
watcherConfigMetric.set({ category: 'jobqueue', field: 'historical_max_fetch_ahead' }, Number(jobQueue.historicalMaxFetchAhead));
};

const registerJobQueueMetrics = async (jobQueue: JobQueue): Promise<void> => {
// eslint-disable-next-line no-new
new client.Gauge({
name: 'pgboss_jobs_total',
help: 'Total entries in job table',
labelNames: ['state', 'name'] as const,
async collect () {
const jobCounts = await jobQueue.getJobCounts();

this.set({ state: 'all' }, jobCounts.all);
this.set({ state: 'created' }, jobCounts.created);
this.set({ state: 'retry' }, jobCounts.retry);
this.set({ state: 'active' }, jobCounts.active);
this.set({ state: 'completed' }, jobCounts.completed);
this.set({ state: 'expired' }, jobCounts.expired);
this.set({ state: 'cancelled' }, jobCounts.cancelled);
this.set({ state: 'failed' }, jobCounts.failed);

Object.entries(jobCounts.queues as Array<any>).forEach(([name, counts]) => {
this.set({ state: 'all', name }, counts.all);
this.set({ state: 'created', name }, counts.created);
this.set({ state: 'retry', name }, counts.retry);
this.set({ state: 'active', name }, counts.active);
this.set({ state: 'completed', name }, counts.completed);
this.set({ state: 'expired', name }, counts.expired);
this.set({ state: 'cancelled', name }, counts.cancelled);
this.set({ state: 'failed', name }, counts.failed);
});
}
});
};

0 comments on commit 1ca7454

Please sign in to comment.