From 51e09ed9a1da264525d1694a1c8eefdcf67400e4 Mon Sep 17 00:00:00 2001 From: James Stuart Milne Date: Mon, 17 Apr 2023 17:28:16 -0300 Subject: [PATCH 1/4] feat: add external error handler to TaskManager and TaskRunner --- src/task/TaskManager.ts | 98 +++++++++++++++----------- src/task/TaskRunner.ts | 17 ++++- src/task/__tests__/TaskManager.test.ts | 78 ++++++++++++++++++++ 3 files changed, 149 insertions(+), 44 deletions(-) create mode 100644 src/task/__tests__/TaskManager.test.ts diff --git a/src/task/TaskManager.ts b/src/task/TaskManager.ts index c7966a77..cf32e612 100644 --- a/src/task/TaskManager.ts +++ b/src/task/TaskManager.ts @@ -1,83 +1,97 @@ -import os from "os" -import {TaskRunner} from "./TaskRunner" -import {ConductorLogger, DefaultLogger} from "../common" -import {ConductorWorker} from "./Worker" -import {ConductorClient} from "../common/open-api" +import os from "os"; +import { TaskRunner, TaskErrorHandler, noopErrorHandler } from "./TaskRunner"; +import { ConductorLogger, DefaultLogger } from "../common"; +import { ConductorWorker } from "./Worker"; +import { ConductorClient } from "../common/open-api"; export interface TaskManagerOptions { - workerID: string - domain: string | undefined - pollInterval?: number, - concurrency?: number + workerID: string; + domain: string | undefined; + pollInterval?: number; + concurrency?: number; } export interface TaskManagerConfig { - logger?: ConductorLogger - options?: Partial + logger?: ConductorLogger; + options?: Partial; + onError?: TaskErrorHandler; } const defaultManagerOptions: Required = { - workerID: '', + workerID: "", pollInterval: 1000, domain: undefined, - concurrency: 1 -} + concurrency: 1, +}; -function workerId (options: Partial) { - return options.workerID ?? os.hostname() +function workerId(options: Partial) { + return options.workerID ?? os.hostname(); } /** * Responsible for initializing and managing the runners that poll and work different task queues. */ export class TaskManager { - private tasks: Record> = {} - private readonly client: ConductorClient - private readonly logger: ConductorLogger - private workers: Array - private readonly taskManageOptions: Required + private tasks: Record> = {}; + private readonly client: ConductorClient; + private readonly logger: ConductorLogger; + private readonly errorHandler: TaskErrorHandler; + private workers: Array; + private readonly taskManageOptions: Required; - constructor(client: ConductorClient, workers: Array, config: TaskManagerConfig = {}) { - if (!workers) { throw new Error("No workers supplied to TaskManager. Please pass an array of workers.") } - this.client = client - this.logger = config.logger ?? new DefaultLogger() - this.workers = workers - const providedOptions = config.options ?? {} + constructor( + client: ConductorClient, + workers: Array, + config: TaskManagerConfig = {} + ) { + if (!workers) { + throw new Error( + "No workers supplied to TaskManager. Please pass an array of workers." + ); + } + this.client = client; + this.logger = config.logger ?? new DefaultLogger(); + this.errorHandler = config.onError ?? noopErrorHandler; + this.workers = workers; + const providedOptions = config.options ?? {}; this.taskManageOptions = { ...defaultManagerOptions, ...providedOptions, workerID: workerId(providedOptions), - } + }; } startPolling = () => { - this.workers.forEach(worker => { - this.tasks[worker.taskDefName] = [] + this.workers.forEach((worker) => { + this.tasks[worker.taskDefName] = []; const options = { ...this.taskManageOptions, concurrency: worker.concurrency ?? this.taskManageOptions.concurrency, - domain: worker.domain ?? this.taskManageOptions.domain - } - this.logger.debug(`Starting taskDefName=${worker.taskDefName} concurrency=${options.concurrency} domain=${options.domain}`) + domain: worker.domain ?? this.taskManageOptions.domain, + }; + this.logger.debug( + `Starting taskDefName=${worker.taskDefName} concurrency=${options.concurrency} domain=${options.domain}` + ); for (let i = 0; i < options.concurrency; i++) { const runner = new TaskRunner({ worker, options, taskResource: this.client.taskResource, - logger: this.logger - }) + logger: this.logger, + onError: this.errorHandler, + }); // TODO(@ntomlin): right now we aren't handling these promises // which will inevitably lead to chaos - runner.startPolling() - this.tasks[worker.taskDefName].push(runner) + runner.startPolling(); + this.tasks[worker.taskDefName].push(runner); } - }) - } + }); + }; stopPolling = () => { for (const taskType in this.tasks) { - this.tasks[taskType].forEach(runner => runner.stopPolling()) - this.tasks[taskType] = [] + this.tasks[taskType].forEach((runner) => runner.stopPolling()); + this.tasks[taskType] = []; } - } + }; } diff --git a/src/task/TaskRunner.ts b/src/task/TaskRunner.ts index 50e35d09..1d253a56 100644 --- a/src/task/TaskRunner.ts +++ b/src/task/TaskRunner.ts @@ -4,13 +4,19 @@ import { Task, TaskResourceService } from "../common/open-api"; import { TaskManagerOptions } from "./TaskManager"; const DEFAULT_ERROR_MESSAGE = "An unknown error occurred"; +export type TaskErrorHandler = (error: Error, task?: Task) => void; export interface RunnerArgs { worker: ConductorWorker; taskResource: TaskResourceService; options: Required; logger?: ConductorLogger; + onError?: TaskErrorHandler; } + +//eslint-disable-next-line +export const noopErrorHandler: TaskErrorHandler = (__error: Error) => {}; + const noopLogger: ConductorLogger = { //eslint-disable-next-line debug: (...args: any) => {}, @@ -35,17 +41,20 @@ export class TaskRunner { worker: ConductorWorker; logger: ConductorLogger; options: Required; + errorHandler: TaskErrorHandler; constructor({ worker, taskResource, options, logger = noopLogger, + onError: errorHandler = noopErrorHandler, }: RunnerArgs) { this.taskResource = taskResource; this.logger = logger; this.worker = worker; this.options = options; + this.errorHandler = errorHandler; } startPolling = () => { @@ -77,9 +86,12 @@ export class TaskRunner { } } catch (unknownError: unknown) { this.handleUnknownError(unknownError); + this.errorHandler(unknownError as Error, {} as Task); } - await new Promise((r) => setTimeout(() => r(true), this.options.pollInterval)); + await new Promise((r) => + setTimeout(() => r(true), this.options.pollInterval) + ); } }; @@ -93,7 +105,6 @@ export class TaskRunner { }); this.logger.debug(`Finished polling for task ${task.taskId}`); } catch (error: unknown) { - this.logger.error(`Error executing ${task.taskId}`, error); await this.taskResource.updateTask1({ workflowInstanceId: task.workflowInstanceId!, taskId: task.taskId!, @@ -102,6 +113,8 @@ export class TaskRunner { status: "FAILED", outputData: {}, }); + this.errorHandler(error as Error, task); + this.logger.error(`Error executing ${task.taskId}`, error); } }; diff --git a/src/task/__tests__/TaskManager.test.ts b/src/task/__tests__/TaskManager.test.ts new file mode 100644 index 00000000..7f67b55e --- /dev/null +++ b/src/task/__tests__/TaskManager.test.ts @@ -0,0 +1,78 @@ +import { expect, describe, test, jest } from "@jest/globals"; +import { OrkesApiConfig, orkesConductorClient } from "../../common"; +import { WorkflowExecutor } from "../../core"; +import { TaskManager } from "../TaskManager"; +import { ConductorWorker } from "../Worker"; + +const playConfig: Partial = { + keyId: `${process.env.KEY_ID}`, + keySecret: `${process.env.KEY_SECRET}`, + serverUrl: "https://pg-staging.orkesconductor.com/api", +}; + +describe("TaskManager", () => { + const clientPromise = orkesConductorClient(playConfig); + + jest.setTimeout(10000); + test("Should run workflow with worker", async () => { + const client = await clientPromise; + const executor = new WorkflowExecutor(client); + + const worker: ConductorWorker = { + taskDefName: "taskmanager-test", + execute: async () => { + return { + outputData: { + hello: "From your worker", + }, + status: "COMPLETED", + }; + }, + }; + + const manager = new TaskManager(client, [worker]); + manager.startPolling(); + + const executionId = await executor.startWorkflow({ + name: "TaskManagerTest", + input: {}, + version: 1, + }); + await new Promise((r) => setTimeout(() => r(true), 2500)); + const workflowStatus = await client.workflowResource.getExecutionStatus( + executionId, + true + ); + expect(workflowStatus.status).toEqual("COMPLETED"); + manager.stopPolling(); + }); + + test("On error should set task as FAILED", async () => { + const client = await clientPromise; + const executor = new WorkflowExecutor(client); + + const worker: ConductorWorker = { + taskDefName: "taskmanager-test", + execute: async () => { + throw Error("This is a forced error"); + }, + }; + + const errorHandler = jest.fn(); + + const manager = new TaskManager(client, [worker], { + onError: errorHandler, + }); + + manager.startPolling(); + + await executor.startWorkflow({ + name: "TaskManagerTest", + input: {}, + version: 1, + }); + await new Promise((r) => setTimeout(() => r(true), 3500)); + expect(errorHandler).toBeCalledTimes(1); + manager.stopPolling(); + }); +}); From a916b33f0f972e5fe688e683abb9a81c54ccb278 Mon Sep 17 00:00:00 2001 From: James Stuart Milne Date: Mon, 17 Apr 2023 17:33:09 -0300 Subject: [PATCH 2/4] fix: task is undefined for unknown error --- src/task/TaskRunner.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/task/TaskRunner.ts b/src/task/TaskRunner.ts index 1d253a56..ce308ef1 100644 --- a/src/task/TaskRunner.ts +++ b/src/task/TaskRunner.ts @@ -86,7 +86,7 @@ export class TaskRunner { } } catch (unknownError: unknown) { this.handleUnknownError(unknownError); - this.errorHandler(unknownError as Error, {} as Task); + this.errorHandler(unknownError as Error); } await new Promise((r) => From 312a9e0e960d54909dc90b2d203f6c9e307b71a0 Mon Sep 17 00:00:00 2001 From: James Stuart Milne Date: Mon, 17 Apr 2023 19:28:01 -0300 Subject: [PATCH 3/4] feat: added max retries to TaskRunner --- src/task/TaskRunner.ts | 29 +++++++++++++++++++++++--- src/task/__tests__/TaskManager.test.ts | 28 ++++++++++++++++++++++++- 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/src/task/TaskRunner.ts b/src/task/TaskRunner.ts index ce308ef1..3eee8cca 100644 --- a/src/task/TaskRunner.ts +++ b/src/task/TaskRunner.ts @@ -1,9 +1,11 @@ import { ConductorLogger } from "../common"; import { ConductorWorker } from "./Worker"; -import { Task, TaskResourceService } from "../common/open-api"; +import { Task, TaskResourceService, TaskResult } from "../common/open-api"; import { TaskManagerOptions } from "./TaskManager"; const DEFAULT_ERROR_MESSAGE = "An unknown error occurred"; +const MAX_RETRIES = 3; + export type TaskErrorHandler = (error: Error, task?: Task) => void; export interface RunnerArgs { @@ -95,17 +97,38 @@ export class TaskRunner { } }; + updateTaskWithRetry = async (task: Task, taskResult: TaskResult) => { + let retryCount = 0; + while (retryCount < MAX_RETRIES) { + try { + await this.taskResource.updateTask1(taskResult); + return; + } catch (error: unknown) { + this.logger.error( + `Error updating task ${taskResult.taskId} on retry ${retryCount}`, + error + ); + this.errorHandler(error as Error, task); + retryCount++; + await new Promise((r) => setTimeout(() => r(true), retryCount * 10)); + } + } + this.logger.error( + `Unable to update task ${taskResult.taskId} after ${retryCount} retries` + ); + }; + executeTask = async (task: Task) => { try { const result = await this.worker.execute(task); - await this.taskResource.updateTask1({ + await this.updateTaskWithRetry(task, { ...result, workflowInstanceId: task.workflowInstanceId!, taskId: task.taskId!, }); this.logger.debug(`Finished polling for task ${task.taskId}`); } catch (error: unknown) { - await this.taskResource.updateTask1({ + await this.updateTaskWithRetry(task, { workflowInstanceId: task.workflowInstanceId!, taskId: task.taskId!, reasonForIncompletion: diff --git a/src/task/__tests__/TaskManager.test.ts b/src/task/__tests__/TaskManager.test.ts index 7f67b55e..5faba362 100644 --- a/src/task/__tests__/TaskManager.test.ts +++ b/src/task/__tests__/TaskManager.test.ts @@ -47,7 +47,7 @@ describe("TaskManager", () => { manager.stopPolling(); }); - test("On error should set task as FAILED", async () => { + test("On error it should call the errorHandler provided", async () => { const client = await clientPromise; const executor = new WorkflowExecutor(client); @@ -75,4 +75,30 @@ describe("TaskManager", () => { expect(errorHandler).toBeCalledTimes(1); manager.stopPolling(); }); + + test("If no error handler provided. it should just update the task", async () => { + const client = await clientPromise; + const executor = new WorkflowExecutor(client); + + const worker: ConductorWorker = { + taskDefName: "taskmanager-test", + execute: async () => { + throw Error("This is a forced error"); + }, + }; + + const manager = new TaskManager(client, [worker]); + + manager.startPolling(); + + await executor.startWorkflow({ + name: "TaskManagerTest", + input: {}, + version: 1, + }); + await new Promise((r) => setTimeout(() => r(true), 3500)); + manager.stopPolling(); + }); }); + + From 61e9342f6dbf09aae406ddd258ad958d310ec160 Mon Sep 17 00:00:00 2001 From: James Stuart Milne Date: Mon, 17 Apr 2023 19:30:04 -0300 Subject: [PATCH 4/4] refactor: move errorHandler before logger NFC --- src/task/TaskRunner.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/task/TaskRunner.ts b/src/task/TaskRunner.ts index 3eee8cca..0eb8697a 100644 --- a/src/task/TaskRunner.ts +++ b/src/task/TaskRunner.ts @@ -104,11 +104,11 @@ export class TaskRunner { await this.taskResource.updateTask1(taskResult); return; } catch (error: unknown) { + this.errorHandler(error as Error, task); this.logger.error( `Error updating task ${taskResult.taskId} on retry ${retryCount}`, error ); - this.errorHandler(error as Error, task); retryCount++; await new Promise((r) => setTimeout(() => r(true), retryCount * 10)); }