Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 56 additions & 42 deletions src/task/TaskManager.ts
Comment thread
gardusig marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,83 +1,97 @@
import os from "os"
import {TaskRunner} from "./TaskRunner"
import {ConductorLogger, DefaultLogger} from "../common"
import {ConductorWorker} from "./Worker"
import {ConductorClient} from "../common/open-api"
import os from "os";
import { TaskRunner, TaskErrorHandler, noopErrorHandler } from "./TaskRunner";
import { ConductorLogger, DefaultLogger } from "../common";
import { ConductorWorker } from "./Worker";
import { ConductorClient } from "../common/open-api";

export interface TaskManagerOptions {
workerID: string
domain: string | undefined
pollInterval?: number,
concurrency?: number
workerID: string;
domain: string | undefined;
pollInterval?: number;
concurrency?: number;
}

export interface TaskManagerConfig {
logger?: ConductorLogger
options?: Partial<TaskManagerOptions>
logger?: ConductorLogger;
options?: Partial<TaskManagerOptions>;
onError?: TaskErrorHandler;
}

const defaultManagerOptions: Required<TaskManagerOptions> = {
workerID: '',
workerID: "",
pollInterval: 1000,
domain: undefined,
concurrency: 1
}
concurrency: 1,
};

function workerId (options: Partial<TaskManagerOptions>) {
return options.workerID ?? os.hostname()
function workerId(options: Partial<TaskManagerOptions>) {
return options.workerID ?? os.hostname();
}

/**
* Responsible for initializing and managing the runners that poll and work different task queues.
*/
export class TaskManager {
private tasks: Record<string, Array<TaskRunner>> = {}
private readonly client: ConductorClient
private readonly logger: ConductorLogger
private workers: Array<ConductorWorker>
private readonly taskManageOptions: Required<TaskManagerOptions>
private tasks: Record<string, Array<TaskRunner>> = {};
private readonly client: ConductorClient;
private readonly logger: ConductorLogger;
private readonly errorHandler: TaskErrorHandler;
private workers: Array<ConductorWorker>;
private readonly taskManageOptions: Required<TaskManagerOptions>;

constructor(client: ConductorClient, workers: Array<ConductorWorker>, config: TaskManagerConfig = {}) {
if (!workers) { throw new Error("No workers supplied to TaskManager. Please pass an array of workers.") }
this.client = client
this.logger = config.logger ?? new DefaultLogger()
this.workers = workers
const providedOptions = config.options ?? {}
constructor(
client: ConductorClient,
workers: Array<ConductorWorker>,
config: TaskManagerConfig = {}
) {
if (!workers) {
throw new Error(
"No workers supplied to TaskManager. Please pass an array of workers."
);
}
this.client = client;
this.logger = config.logger ?? new DefaultLogger();
this.errorHandler = config.onError ?? noopErrorHandler;
this.workers = workers;
const providedOptions = config.options ?? {};
this.taskManageOptions = {
...defaultManagerOptions,
...providedOptions,
workerID: workerId(providedOptions),
}
};
}

startPolling = () => {
this.workers.forEach(worker => {
this.tasks[worker.taskDefName] = []
this.workers.forEach((worker) => {
this.tasks[worker.taskDefName] = [];
const options = {
...this.taskManageOptions,
concurrency: worker.concurrency ?? this.taskManageOptions.concurrency,
domain: worker.domain ?? this.taskManageOptions.domain
}
this.logger.debug(`Starting taskDefName=${worker.taskDefName} concurrency=${options.concurrency} domain=${options.domain}`)
domain: worker.domain ?? this.taskManageOptions.domain,
};
this.logger.debug(
`Starting taskDefName=${worker.taskDefName} concurrency=${options.concurrency} domain=${options.domain}`
);
for (let i = 0; i < options.concurrency; i++) {
const runner = new TaskRunner({
worker,
options,
taskResource: this.client.taskResource,
logger: this.logger
})
logger: this.logger,
onError: this.errorHandler,
});
// TODO(@ntomlin): right now we aren't handling these promises
// which will inevitably lead to chaos
runner.startPolling()
this.tasks[worker.taskDefName].push(runner)
runner.startPolling();
this.tasks[worker.taskDefName].push(runner);
}
})
}
});
};

stopPolling = () => {
for (const taskType in this.tasks) {
this.tasks[taskType].forEach(runner => runner.stopPolling())
this.tasks[taskType] = []
this.tasks[taskType].forEach((runner) => runner.stopPolling());
this.tasks[taskType] = [];
}
}
};
}
46 changes: 41 additions & 5 deletions src/task/TaskRunner.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import { ConductorLogger } from "../common";
import { ConductorWorker } from "./Worker";
import { Task, TaskResourceService } from "../common/open-api";
import { Task, TaskResourceService, TaskResult } from "../common/open-api";
import { TaskManagerOptions } from "./TaskManager";

const DEFAULT_ERROR_MESSAGE = "An unknown error occurred";
const MAX_RETRIES = 3;

export type TaskErrorHandler = (error: Error, task?: Task) => void;

export interface RunnerArgs {
worker: ConductorWorker;
taskResource: TaskResourceService;
options: Required<TaskManagerOptions>;
logger?: ConductorLogger;
onError?: TaskErrorHandler;
}

//eslint-disable-next-line
export const noopErrorHandler: TaskErrorHandler = (__error: Error) => {};

const noopLogger: ConductorLogger = {
//eslint-disable-next-line
debug: (...args: any) => {},
Expand All @@ -35,17 +43,20 @@ export class TaskRunner {
worker: ConductorWorker;
logger: ConductorLogger;
options: Required<TaskManagerOptions>;
errorHandler: TaskErrorHandler;

constructor({
worker,
taskResource,
options,
logger = noopLogger,
onError: errorHandler = noopErrorHandler,
}: RunnerArgs) {
this.taskResource = taskResource;
this.logger = logger;
this.worker = worker;
this.options = options;
this.errorHandler = errorHandler;
}

startPolling = () => {
Expand Down Expand Up @@ -77,31 +88,56 @@ export class TaskRunner {
}
} catch (unknownError: unknown) {
this.handleUnknownError(unknownError);
this.errorHandler(unknownError as Error);
}

await new Promise((r) => setTimeout(() => r(true), this.options.pollInterval));
await new Promise((r) =>
setTimeout(() => r(true), this.options.pollInterval)
);
}
};

updateTaskWithRetry = async (task: Task, taskResult: TaskResult) => {
let retryCount = 0;
while (retryCount < MAX_RETRIES) {
try {
await this.taskResource.updateTask1(taskResult);
return;
} catch (error: unknown) {
this.errorHandler(error as Error, task);
this.logger.error(
`Error updating task ${taskResult.taskId} on retry ${retryCount}`,
error
);
retryCount++;
await new Promise((r) => setTimeout(() => r(true), retryCount * 10));
}
}
this.logger.error(
`Unable to update task ${taskResult.taskId} after ${retryCount} retries`
);
};

executeTask = async (task: Task) => {
try {
const result = await this.worker.execute(task);
await this.taskResource.updateTask1({
await this.updateTaskWithRetry(task, {
...result,
workflowInstanceId: task.workflowInstanceId!,
taskId: task.taskId!,
});
this.logger.debug(`Finished polling for task ${task.taskId}`);
} catch (error: unknown) {
this.logger.error(`Error executing ${task.taskId}`, error);
await this.taskResource.updateTask1({
await this.updateTaskWithRetry(task, {
workflowInstanceId: task.workflowInstanceId!,
taskId: task.taskId!,
reasonForIncompletion:
(error as Record<string, string>)?.message ?? DEFAULT_ERROR_MESSAGE,
status: "FAILED",
outputData: {},
});
this.errorHandler(error as Error, task);
this.logger.error(`Error executing ${task.taskId}`, error);
}
};

Expand Down
104 changes: 104 additions & 0 deletions src/task/__tests__/TaskManager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { expect, describe, test, jest } from "@jest/globals";
import { OrkesApiConfig, orkesConductorClient } from "../../common";
import { WorkflowExecutor } from "../../core";
import { TaskManager } from "../TaskManager";
import { ConductorWorker } from "../Worker";

const playConfig: Partial<OrkesApiConfig> = {
keyId: `${process.env.KEY_ID}`,
keySecret: `${process.env.KEY_SECRET}`,
serverUrl: "https://pg-staging.orkesconductor.com/api",
};

describe("TaskManager", () => {
const clientPromise = orkesConductorClient(playConfig);

jest.setTimeout(10000);
test("Should run workflow with worker", async () => {
const client = await clientPromise;
const executor = new WorkflowExecutor(client);

const worker: ConductorWorker = {
taskDefName: "taskmanager-test",
execute: async () => {
return {
outputData: {
hello: "From your worker",
},
status: "COMPLETED",
};
},
};

const manager = new TaskManager(client, [worker]);
manager.startPolling();

const executionId = await executor.startWorkflow({
name: "TaskManagerTest",
input: {},
version: 1,
});
await new Promise((r) => setTimeout(() => r(true), 2500));
const workflowStatus = await client.workflowResource.getExecutionStatus(
executionId,
true
);
expect(workflowStatus.status).toEqual("COMPLETED");
manager.stopPolling();
});

test("On error it should call the errorHandler provided", async () => {
const client = await clientPromise;
const executor = new WorkflowExecutor(client);

const worker: ConductorWorker = {
taskDefName: "taskmanager-test",
execute: async () => {
throw Error("This is a forced error");
},
};

const errorHandler = jest.fn();

const manager = new TaskManager(client, [worker], {
onError: errorHandler,
});

manager.startPolling();

await executor.startWorkflow({
name: "TaskManagerTest",
input: {},
version: 1,
});
await new Promise((r) => setTimeout(() => r(true), 3500));
expect(errorHandler).toBeCalledTimes(1);
manager.stopPolling();
});

test("If no error handler provided. it should just update the task", async () => {
const client = await clientPromise;
const executor = new WorkflowExecutor(client);

const worker: ConductorWorker = {
taskDefName: "taskmanager-test",
execute: async () => {
throw Error("This is a forced error");
},
};

const manager = new TaskManager(client, [worker]);

manager.startPolling();

await executor.startWorkflow({
name: "TaskManagerTest",
input: {},
version: 1,
});
await new Promise((r) => setTimeout(() => r(true), 3500));
manager.stopPolling();
});
});