From a8d51e65c2f379b63a13756e61e574849671a449 Mon Sep 17 00:00:00 2001 From: Pablo Cuadrado Date: Wed, 5 Jun 2024 19:15:05 -0300 Subject: [PATCH 1/4] add taskClient (WIP) --- src/core/index.ts | 1 + src/core/sdk/index.ts | 1 + src/core/sdk/taskDefinition.ts | 41 +++++++++++++++ src/core/taskClient.ts | 91 ++++++++++++++++++++++++++++++++++ 4 files changed, 134 insertions(+) create mode 100644 src/core/sdk/taskDefinition.ts create mode 100644 src/core/taskClient.ts diff --git a/src/core/index.ts b/src/core/index.ts index 2587cd28..f75b404a 100644 --- a/src/core/index.ts +++ b/src/core/index.ts @@ -4,3 +4,4 @@ export * from "./human"; export * from "./sdk"; export * from "./generators"; export * from "./schedulerClient"; +export * from "./taskClient"; diff --git a/src/core/sdk/index.ts b/src/core/sdk/index.ts index 61e54089..747338f1 100644 --- a/src/core/sdk/index.ts +++ b/src/core/sdk/index.ts @@ -11,6 +11,7 @@ export * from "./setVariable"; export * from "./simple"; export * from "./subWorkflow"; export * from "./switch"; +export * from "./taskDefinition"; export * from "./terminate"; export * from "./wait"; export * from "./workflow"; diff --git a/src/core/sdk/taskDefinition.ts b/src/core/sdk/taskDefinition.ts new file mode 100644 index 00000000..5569ff39 --- /dev/null +++ b/src/core/sdk/taskDefinition.ts @@ -0,0 +1,41 @@ +import type { TaskDef } from '../../common/open-api/models/TaskDef'; + +export const taskDefinition = ({ + name, + ownerApp = "", + description = "", + retryCount = 3, + timeoutSeconds = 3600, + inputKeys = [], + outputKeys = [], + timeoutPolicy = "TIME_OUT_WF", + retryLogic = "FIXED", + retryDelaySeconds = 60, + responseTimeoutSeconds = 600, + concurrentExecLimit = 0, + inputTemplate = {}, + rateLimitPerFrequency = 0, + rateLimitFrequencyInSeconds = 1, + ownerEmail = "", + pollTimeoutSeconds = 3600, + backoffScaleFactor = 1, +}: TaskDef) : TaskDef => ({ + name, + ownerApp, + description, + retryCount, + timeoutSeconds, + inputKeys, + outputKeys, + timeoutPolicy, + retryLogic, + retryDelaySeconds, + responseTimeoutSeconds, + concurrentExecLimit, + inputTemplate, + rateLimitPerFrequency, + rateLimitFrequencyInSeconds, + ownerEmail, + pollTimeoutSeconds, + backoffScaleFactor, +}) diff --git a/src/core/taskClient.ts b/src/core/taskClient.ts new file mode 100644 index 00000000..561e7cf3 --- /dev/null +++ b/src/core/taskClient.ts @@ -0,0 +1,91 @@ +import { + ConductorClient, + SaveScheduleRequest, + SearchResultWorkflowScheduleExecutionModel, +} from "../common"; +import type { TaskDef } from '../common/open-api/models/TaskDef'; +import { tryCatchReThrow } from "./helpers"; + +export class TaskClient { + public readonly _client: ConductorClient; + + constructor(client: ConductorClient) { + this._client = client; + } + + /** + * Searches for existing scheduler execution based on below parameters + * + * @param start + * @param size + * @param sort + * @param freeText + * @param query + * @returns SearchResultWorkflowScheduleExecutionModel + */ + public search( + start: number, + size: number, + sort: string = "", + freeText: string, + query: string + ): Promise { + return tryCatchReThrow(() => + this._client.taskResource.search( + start, + size, + sort, + freeText, + query + ) + ); + } + + /** + * Get an existing schedule by Id + * @param taskId + * @returns SaveScheduleRequest + */ + public getTask(taskId: string): Promise { + return tryCatchReThrow(() => + this._client.taskResource.getTask(taskId) + ); + } + + /** + * Unregisters an existing task definition by name + * + * @param name + * @returns + */ + public unregisterTask(name: string): Promise { + return tryCatchReThrow(() => + this._client.metadataResource.unregisterTaskDef(name) + ); + } + + /** + * Registers a new task definition + * + * @param taskDef + * @returns + */ + public registerTask(taskDef: TaskDef): Promise { + return tryCatchReThrow(() => + this._client.metadataResource.registerTaskDef([taskDef]) + ); + } + + // /** + // * Get all existing workflow schedules and optionally filter by workflow name + // * @param workflowName + // * @returns Array + // */ + // public getAllSchedules( + // workflowName?: string + // ): Promise> { + // return tryCatchReThrow(() => + // this._client.taskResource.getAllSchedules(workflowName) + // ); + // } +} From f9b306f30ce21cda9e975b81ca585acb92c9a9a4 Mon Sep 17 00:00:00 2001 From: Pablo Cuadrado Date: Thu, 6 Jun 2024 12:34:01 -0300 Subject: [PATCH 2/4] split metadata client - add template client --- src/core/index.ts | 2 ++ src/core/metadataClient.ts | 49 +++++++++++++++++++++++++++++++ src/core/taskClient.ts | 60 ++++++++++++++++---------------------- src/core/templateClient.ts | 29 ++++++++++++++++++ 4 files changed, 105 insertions(+), 35 deletions(-) create mode 100644 src/core/metadataClient.ts create mode 100644 src/core/templateClient.ts diff --git a/src/core/index.ts b/src/core/index.ts index f75b404a..1d92f0c7 100644 --- a/src/core/index.ts +++ b/src/core/index.ts @@ -5,3 +5,5 @@ export * from "./sdk"; export * from "./generators"; export * from "./schedulerClient"; export * from "./taskClient"; +export * from "./templateClient"; +export * from "./metadataClient"; diff --git a/src/core/metadataClient.ts b/src/core/metadataClient.ts new file mode 100644 index 00000000..459081a5 --- /dev/null +++ b/src/core/metadataClient.ts @@ -0,0 +1,49 @@ +import { + ConductorClient, + TaskDef, +} from "../common"; +import { tryCatchReThrow } from "./helpers"; + +export class MetadataClient { + public readonly _client: ConductorClient; + + constructor(client: ConductorClient) { + this._client = client; + } + + /** + * Unregisters an existing task definition by name + * + * @param name + * @returns + */ + public unregisterTask(name: string): Promise { + return tryCatchReThrow(() => + this._client.metadataResource.unregisterTaskDef(name) + ); + } + + /** + * Registers a new task definition + * + * @param taskDef + * @returns + */ + public registerTask(taskDef: TaskDef): Promise { + return tryCatchReThrow(() => + this._client.metadataResource.registerTaskDef([taskDef]) + ); + } + + /** + * Update an existing task definition + * + * @param taskDef + * @returns + */ + public updateTask(taskDef: TaskDef): Promise { + return tryCatchReThrow(() => + this._client.metadataResource.updateTaskDef(taskDef) + ); + } +} diff --git a/src/core/taskClient.ts b/src/core/taskClient.ts index 561e7cf3..d4a3a86d 100644 --- a/src/core/taskClient.ts +++ b/src/core/taskClient.ts @@ -1,9 +1,10 @@ +import { TaskResultStatus } from "../../dist"; import { ConductorClient, - SaveScheduleRequest, - SearchResultWorkflowScheduleExecutionModel, + SearchResultTask, + Task, + TaskResult, } from "../common"; -import type { TaskDef } from '../common/open-api/models/TaskDef'; import { tryCatchReThrow } from "./helpers"; export class TaskClient { @@ -29,7 +30,7 @@ export class TaskClient { sort: string = "", freeText: string, query: string - ): Promise { + ): Promise { return tryCatchReThrow(() => this._client.taskResource.search( start, @@ -44,48 +45,37 @@ export class TaskClient { /** * Get an existing schedule by Id * @param taskId - * @returns SaveScheduleRequest + * @returns Task */ - public getTask(taskId: string): Promise { + public getTask(taskId: string): Promise { return tryCatchReThrow(() => this._client.taskResource.getTask(taskId) ); } /** - * Unregisters an existing task definition by name + * Update task result status * - * @param name + * @param workflowId + * @param taskReferenceName + * @param status + * @param outputData + * @param workerId * @returns */ - public unregisterTask(name: string): Promise { + public updateTaskResult( + workflowId: string, + taskReferenceName: string, + status: TaskResultStatus, + outputData: Record, + ): Promise { return tryCatchReThrow(() => - this._client.metadataResource.unregisterTaskDef(name) - ); - } - - /** - * Registers a new task definition - * - * @param taskDef - * @returns - */ - public registerTask(taskDef: TaskDef): Promise { - return tryCatchReThrow(() => - this._client.metadataResource.registerTaskDef([taskDef]) + this._client.taskResource.updateTask( + workflowId, + taskReferenceName, + status, + outputData + ) ); } - - // /** - // * Get all existing workflow schedules and optionally filter by workflow name - // * @param workflowName - // * @returns Array - // */ - // public getAllSchedules( - // workflowName?: string - // ): Promise> { - // return tryCatchReThrow(() => - // this._client.taskResource.getAllSchedules(workflowName) - // ); - // } } diff --git a/src/core/templateClient.ts b/src/core/templateClient.ts new file mode 100644 index 00000000..4e9d29f0 --- /dev/null +++ b/src/core/templateClient.ts @@ -0,0 +1,29 @@ +import { + ConductorClient, + HumanTaskTemplate, +} from "../common"; +import { tryCatchReThrow } from "./helpers"; + + +export class TemplateClient { + public readonly _client: ConductorClient; + + constructor(client: ConductorClient) { + this._client = client; + } + + /** + * Register a new human task template + * + * @param template + * @returns + */ + public async registerTemplate( + template: HumanTaskTemplate, + asNewVersion: boolean = false + ): Promise { + return tryCatchReThrow(() => + this._client.humanTask.saveTemplate(template, asNewVersion) + ); + } +} From c920ed4c51af9d672feb207b08e6d6ed5a78bc70 Mon Sep 17 00:00:00 2001 From: Pablo Cuadrado Date: Tue, 11 Jun 2024 20:38:03 -0300 Subject: [PATCH 3/4] add .env.development.local to gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 36d2da8a..02c09bc9 100644 --- a/.gitignore +++ b/.gitignore @@ -81,3 +81,5 @@ crashlytics-build.properties fabric.properties .DS_Store .env.development +.env.development.local + From bc2004f5ee850930fe099c350cbdbf8f0dd93ffc Mon Sep 17 00:00:00 2001 From: Pablo Cuadrado Date: Tue, 11 Jun 2024 21:50:00 -0300 Subject: [PATCH 4/4] add metadataCilent test (WIP) --- src/core/__test__/MetadataClient.test.ts | 129 +++++++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 src/core/__test__/MetadataClient.test.ts diff --git a/src/core/__test__/MetadataClient.test.ts b/src/core/__test__/MetadataClient.test.ts new file mode 100644 index 00000000..323ece0b --- /dev/null +++ b/src/core/__test__/MetadataClient.test.ts @@ -0,0 +1,129 @@ +import { expect, describe, test, jest } from "@jest/globals"; +import { MetadataClient, taskDefinition } from ".."; +import { orkesConductorClient, OrkesApiConfig } from "../../orkes"; + +const playConfig: Partial = { + keyId: `${process.env.KEY_ID}`, + keySecret: `${process.env.KEY_SECRET}`, + serverUrl: `${process.env.SERVER_URL}`, + refreshTokenInterval: 0, +}; + +describe("MetadataClient", () => { + const clientPromise = orkesConductorClient(playConfig); + + jest.setTimeout(15000); + test("Should register a task definition", async () => { + const client = await clientPromise; + const metadataClient = new MetadataClient(client); + + const newTaskDefinition = taskDefinition({ + name: "test_task_definition", + description: "New Task Definition", + retryCount: 4, + timeoutSeconds: 7200, + inputKeys: ["inputKey1", "inputKey2"], + outputKeys: ["outputKey1", "outputKey2"], + timeoutPolicy: "TIME_OUT_WF", + retryLogic: "FIXED", + retryDelaySeconds: 61, + responseTimeoutSeconds: 601, + concurrentExecLimit: 1, + inputTemplate: { + "inputKey1": "${workflow.input.inputKey1}", + }, + rateLimitPerFrequency: 1, + rateLimitFrequencyInSeconds: 2, + pollTimeoutSeconds: 3601, + backoffScaleFactor: 1, + }); + + await expect( + metadataClient.registerTask(newTaskDefinition) + ).resolves.not.toThrowError(); + const taskDefinitionFromApi = await client.metadataResource.getTaskDef( + newTaskDefinition.name + ); + + expect(taskDefinitionFromApi.name).toEqual(newTaskDefinition.name); + expect(taskDefinitionFromApi.description).toEqual(newTaskDefinition.description); + expect(taskDefinitionFromApi.retryCount).toEqual(newTaskDefinition.retryCount); + expect(taskDefinitionFromApi.timeoutSeconds).toEqual(newTaskDefinition.timeoutSeconds); + expect(taskDefinitionFromApi.inputKeys).toEqual(newTaskDefinition.inputKeys); + expect(taskDefinitionFromApi.outputKeys).toEqual(newTaskDefinition.outputKeys); + expect(taskDefinitionFromApi.timeoutPolicy).toEqual(newTaskDefinition.timeoutPolicy); + expect(taskDefinitionFromApi.retryLogic).toEqual(newTaskDefinition.retryLogic); + expect(taskDefinitionFromApi.retryDelaySeconds).toEqual(newTaskDefinition.retryDelaySeconds); + expect(taskDefinitionFromApi.responseTimeoutSeconds).toEqual(newTaskDefinition.responseTimeoutSeconds); + expect(taskDefinitionFromApi.concurrentExecLimit).toEqual(newTaskDefinition.concurrentExecLimit); + expect(taskDefinitionFromApi.inputTemplate).toEqual(newTaskDefinition.inputTemplate); + expect(taskDefinitionFromApi.rateLimitPerFrequency).toEqual(newTaskDefinition.rateLimitPerFrequency); + expect(taskDefinitionFromApi.rateLimitFrequencyInSeconds).toEqual(newTaskDefinition.rateLimitFrequencyInSeconds); + expect(taskDefinitionFromApi.pollTimeoutSeconds).toEqual(newTaskDefinition.pollTimeoutSeconds); + expect(taskDefinitionFromApi.backoffScaleFactor).toEqual(newTaskDefinition.backoffScaleFactor); + }); + + test("Should update a task definition", async () => { + const client = await clientPromise; + const metadataClient = new MetadataClient(client); + + const newTaskDefinition = taskDefinition({ + name: "test_task_definition", + description: "New Task Definition Update", + retryCount: 5, + timeoutSeconds: 7201, + inputKeys: ["inputKey1_1", "inputKey2_1"], + outputKeys: ["outputKey1_1", "outputKey2_2"], + timeoutPolicy: "TIME_OUT_WF", + retryLogic: "FIXED", + retryDelaySeconds: 62, + responseTimeoutSeconds: 602, + concurrentExecLimit: 1, + inputTemplate: { + "inputKey2": "${workflow.input.inputKey2}", + }, + rateLimitPerFrequency: 1, + rateLimitFrequencyInSeconds: 3, + pollTimeoutSeconds: 3602, + backoffScaleFactor: 1, + }); + + await expect( + metadataClient.updateTask(newTaskDefinition) + ).resolves.not.toThrowError(); + const taskDefinitionFromApi = await client.metadataResource.getTaskDef( + newTaskDefinition.name + ); + + expect(taskDefinitionFromApi.description).toEqual(newTaskDefinition.description); + expect(taskDefinitionFromApi.retryCount).toEqual(newTaskDefinition.retryCount); + expect(taskDefinitionFromApi.timeoutSeconds).toEqual(newTaskDefinition.timeoutSeconds); + expect(taskDefinitionFromApi.inputKeys).toEqual(newTaskDefinition.inputKeys); + expect(taskDefinitionFromApi.outputKeys).toEqual(newTaskDefinition.outputKeys); + expect(taskDefinitionFromApi.timeoutPolicy).toEqual(newTaskDefinition.timeoutPolicy); + expect(taskDefinitionFromApi.retryLogic).toEqual(newTaskDefinition.retryLogic); + expect(taskDefinitionFromApi.retryDelaySeconds).toEqual(newTaskDefinition.retryDelaySeconds); + expect(taskDefinitionFromApi.responseTimeoutSeconds).toEqual(newTaskDefinition.responseTimeoutSeconds); + expect(taskDefinitionFromApi.concurrentExecLimit).toEqual(newTaskDefinition.concurrentExecLimit); + expect(taskDefinitionFromApi.inputTemplate).toEqual(newTaskDefinition.inputTemplate); + expect(taskDefinitionFromApi.rateLimitPerFrequency).toEqual(newTaskDefinition.rateLimitPerFrequency); + expect(taskDefinitionFromApi.rateLimitFrequencyInSeconds).toEqual(newTaskDefinition.rateLimitFrequencyInSeconds); + expect(taskDefinitionFromApi.pollTimeoutSeconds).toEqual(newTaskDefinition.pollTimeoutSeconds); + expect(taskDefinitionFromApi.backoffScaleFactor).toEqual(newTaskDefinition.backoffScaleFactor); + + }); + + test("Should unregister a task definition", async () => { + const client = await clientPromise; + const metadataClient = new MetadataClient(client); + const name ="test_task_definition"; + + await expect( + metadataClient.unregisterTask("test_task_definition") + ).resolves.not.toThrowError(); + + await expect(client.metadataResource.getTaskDef( + name + )).rejects.toThrowError(); + }) +});