Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stale-horses-punch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@hyperdx/api": minor
---

Add metrics to task execution
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
AlertTaskType,
loadProvider,
} from '@/tasks/checkAlerts/providers';
import { CheckAlertsTaskArgs } from '@/tasks/types';
import { CheckAlertsTaskArgs, TaskName } from '@/tasks/types';

jest.mock('@/tasks/checkAlerts/providers', () => {
return {
Expand Down Expand Up @@ -65,7 +65,7 @@ describe('CheckAlertTask', () => {
});

it('should execute successfully with no alert tasks', async () => {
const args: CheckAlertsTaskArgs = { taskName: 'check-alerts' };
const args: CheckAlertsTaskArgs = { taskName: TaskName.CHECK_ALERTS };
const task = new CheckAlertTask(args);

mockAlertProvider.getAlertTasks.mockResolvedValue([]);
Expand All @@ -83,7 +83,7 @@ describe('CheckAlertTask', () => {

it('should execute successfully with custom provider', async () => {
const args: CheckAlertsTaskArgs = {
taskName: 'check-alerts',
taskName: TaskName.CHECK_ALERTS,
provider: 'custom-provider',
};
const task = new CheckAlertTask(args);
Expand All @@ -99,7 +99,7 @@ describe('CheckAlertTask', () => {
});

it('should process alert tasks', async () => {
const args: CheckAlertsTaskArgs = { taskName: 'check-alerts' };
const args: CheckAlertsTaskArgs = { taskName: TaskName.CHECK_ALERTS };
const task = new CheckAlertTask(args);

const mockAlert = {
Expand Down Expand Up @@ -171,7 +171,7 @@ describe('CheckAlertTask', () => {
});

it("should ensure that the correct team's webhooks are passed to processAlert", async () => {
const args: CheckAlertsTaskArgs = { taskName: 'check-alerts' };
const args: CheckAlertsTaskArgs = { taskName: TaskName.CHECK_ALERTS };
const task = new CheckAlertTask(args);

// Create two teams
Expand Down
40 changes: 26 additions & 14 deletions packages/api/src/tasks/index.ts
Original file line number Diff line number Diff line change
@@ -1,63 +1,75 @@
import { CronJob } from 'cron';
import minimist from 'minimist';
import { performance } from 'perf_hooks';
import { serializeError } from 'serialize-error';

import { RUN_SCHEDULED_TASKS_EXTERNALLY } from '@/config';
import CheckAlertTask from '@/tasks/checkAlerts';
import {
taskExecutionDurationGauge,
taskExecutionFailureCounter,
taskExecutionSuccessCounter,
timeExec,
} from '@/tasks/metrics';
import PingPongTask from '@/tasks/pingPongTask';
import { asTaskArgs, HdxTask, TaskArgs } from '@/tasks/types';
import { asTaskArgs, HdxTask, TaskArgs, TaskName } from '@/tasks/types';
import logger from '@/utils/logger';

import { tasksTracer } from './tracer';

/**
 * Instantiates the task implementation selected by `argv.taskName`.
 *
 * @param argv - Parsed task arguments; `taskName` picks the implementation
 *   and the whole object is handed to the task's constructor.
 * @returns A `CheckAlertTask` or a `PingPongTask` constructed with `argv`.
 * @throws Error when `taskName` matches none of the handled cases.
 */
function createTask(argv: TaskArgs): HdxTask<TaskArgs> {
const taskName = argv.taskName;
switch (taskName) {
// NOTE(review): every case label below appears twice, once as a string
// literal and once as a TaskName member. This looks like the before/after
// lines of a diff rendered without +/- markers — confirm which form the
// real file keeps.
case 'check-alerts':
case TaskName.CHECK_ALERTS:
return new CheckAlertTask(argv);
case 'ping-pong':
case TaskName.PING_PONG:
return new PingPongTask(argv);
default:
// Reached only when taskName is none of the values handled above.
throw new Error(`Unknown task name ${taskName}`);
}
}

/**
 * Runs one task inside an active tracing span named after `argv.taskName`
 * (falling back to 'task').
 *
 * The task is built with `createTask`, executed, and then disposed via
 * `asyncDispose()`; the span is ended in the same `finally`. An exception
 * thrown by `execute()` is logged and counted as a failure but not rethrown.
 *
 * NOTE(review): this span of text carries two declaration lines, two closing
 * lines and paired log statements. It appears to be the old and new versions
 * of a diff rendered without +/- markers — confirm against the real file
 * before relying on any individual line.
 *
 * @param argv - Parsed task arguments identifying the task to run.
 */
const main = async (argv: TaskArgs) => {
async function main(argv: TaskArgs): Promise<void> {
await tasksTracer.startActiveSpan(argv.taskName || 'task', async span => {
// NOTE(review): createTask runs before the try block, so if it throws, the
// catch/finally below (failure counter, span.end()) are not reached.
const task: HdxTask<TaskArgs> = createTask(argv);
try {
const t0 = performance.now();
logger.info(`Task [${task.name()}] started at ${new Date()}`);
logger.info(`${task.name()} started at ${new Date()}`);
await task.execute();
logger.info(
`Task [${task.name()}] finished in ${(performance.now() - t0).toFixed(2)} ms`,
);
// No-op when no counter is registered under this task name.
taskExecutionSuccessCounter.get(argv.taskName)?.add(1);
} catch (e: unknown) {
// NOTE(review): serializeError presumably returns a plain object, in which
// case interpolating it into the template string would render as
// "[object Object]" — confirm, and consider logging a string form instead.
logger.error(
{
cause: e,
task,
},
`Task [${task.name()}] failed: ${serializeError(e)}`,
`${task.name()} failed: ${serializeError(e)}`,
);
taskExecutionFailureCounter.get(argv.taskName)?.add(1);
} finally {
// Cleanup runs on both the success and the failure path.
await task.asyncDispose();
span.end();
}
});
};
}

// Entry point
const argv = asTaskArgs(minimist(process.argv.slice(2)));

// Wrap `main` so that each invocation reports how long it took: the duration
// is recorded on the task's duration gauge (when one is registered for this
// task name) and then written to the log.
const instrumentedMain = timeExec(main, elapsedMs => {
  taskExecutionDurationGauge
    .get(argv.taskName)
    ?.record(elapsedMs, { useCron: !RUN_SCHEDULED_TASKS_EXTERNALLY });
  logger.info(`${argv.taskName} finished in ${elapsedMs.toFixed(2)} ms`);
});

// WARNING: the cron job will be enabled only in development mode
if (!RUN_SCHEDULED_TASKS_EXTERNALLY) {
logger.info('In-app cron job is enabled');
// run cron job every 1 minute
const job = CronJob.from({
cronTime: '0 * * * * *',
waitForCompletion: true,
onTick: async () => main(argv),
onTick: async () => instrumentedMain(argv),
errorHandler: async err => {
console.error(err);
},
Expand All @@ -66,7 +78,7 @@ if (!RUN_SCHEDULED_TASKS_EXTERNALLY) {
});
} else {
logger.warn('In-app cron job is disabled');
main(argv)
instrumentedMain(argv)
.then(() => {
process.exit(0);
})
Expand Down
71 changes: 71 additions & 0 deletions packages/api/src/tasks/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import {
Attributes,
Counter,
Gauge,
metrics,
ValueType,
} from '@opentelemetry/api';
import { performance } from 'perf_hooks';

import { TaskName } from '@/tasks/types';

/** Meter from which every task instrument in this module is created. */
const meter = metrics.getMeter('hyperdx-tasks');

/** Success counters keyed by task name; filled in at module load. */
export const taskExecutionSuccessCounter = new Map<
  TaskName,
  Counter<Attributes>
>();

/** Failure counters keyed by task name; filled in at module load. */
export const taskExecutionFailureCounter = new Map<
  TaskName,
  Counter<Attributes>
>();

/** Execution-duration gauges keyed by task name; filled in at module load. */
export const taskExecutionDurationGauge = new Map<
  TaskName,
  Gauge<Attributes>
>();

// For every known task, create a success counter, a failure counter and a
// duration gauge (in that order) and store each under the task's name. The
// task name is embedded in the instrument name itself.
Object.values(TaskName).forEach(taskName => {
  const instrumentPrefix = `hyperdx.tasks.${taskName}`;

  const successCounter = meter.createCounter(`${instrumentPrefix}.success`, {
    description:
      'Count of the number of times the task finished without exceptions.',
  });
  taskExecutionSuccessCounter.set(taskName, successCounter);

  const failureCounter = meter.createCounter(`${instrumentPrefix}.failure`, {
    description:
      'Count of the number of times the task failed to finish because of an exception',
  });
  taskExecutionFailureCounter.set(taskName, failureCounter);

  const durationGauge = meter.createGauge(`${instrumentPrefix}.duration`, {
    description: `The wall time required for the ${taskName} task to complete execution.`,
    unit: 'ms',
    valueType: ValueType.DOUBLE,
  });
  taskExecutionDurationGauge.set(taskName, durationGauge);
});

/**
 * Wraps an async function so that the time each call takes is reported.
 *
 * The returned function forwards its arguments to `fn` and resolves or
 * rejects exactly as `fn` does. Once `fn` has settled — successfully or
 * not — the elapsed time, measured with `performance.now()`, is handed to
 * `recordFn` if one was supplied.
 *
 * @param fn - The async function to time.
 * @param recordFn - Optional sink that receives the elapsed milliseconds.
 * @returns An async function with the same parameters and result as `fn`.
 */
export function timeExec<T extends unknown[], R>(
  fn: (...args: T) => Promise<R>,
  recordFn?: (duration: number) => void,
) {
  return async (...args: T) => {
    const startedAt = performance.now();
    try {
      const outcome = await fn(...args);
      return outcome;
    } finally {
      // The optional call skips both the clock read and the callback when no
      // recorder was given.
      recordFn?.(performance.now() - startedAt);
    }
  };
}
9 changes: 7 additions & 2 deletions packages/api/src/tasks/types.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import { z } from 'zod';

/**
 * Names of the tasks the task runner knows about. The string values are the
 * literals that the task-argument schemas match `taskName` against.
 */
export enum TaskName {
/** Value: 'ping-pong'. */
PING_PONG = 'ping-pong',
/** Value: 'check-alerts'. */
CHECK_ALERTS = 'check-alerts',
}

/**
* Command line arguments structure for tasks.
* Contains task name and optional provider configuration.
*/
const pingTaskArgsSchema = z.object({
taskName: z.literal('ping-pong'),
taskName: z.literal(TaskName.PING_PONG),
});

const checkAlertsTaskArgsSchema = z.object({
taskName: z.literal('check-alerts'),
taskName: z.literal(TaskName.CHECK_ALERTS),
provider: z.string().optional(),
concurrency: z
.number()
Expand Down
Loading