From 7bc74121b08dc78ebe67954539c316d6287e92f9 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Fri, 13 Jun 2025 19:05:43 +0530 Subject: [PATCH 1/5] Update ServiceRegistry test --- .../__test__/ServiceRegistryClient.test.ts | 128 +++++++++--------- 1 file changed, 61 insertions(+), 67 deletions(-) diff --git a/src/core/__test__/ServiceRegistryClient.test.ts b/src/core/__test__/ServiceRegistryClient.test.ts index 9cea0023..884ee979 100644 --- a/src/core/__test__/ServiceRegistryClient.test.ts +++ b/src/core/__test__/ServiceRegistryClient.test.ts @@ -1,4 +1,4 @@ -import {beforeAll, describe, expect, jest, test} from "@jest/globals"; +import {beforeAll, afterEach, describe, expect, jest, test} from "@jest/globals"; import {ServiceRegistryClient} from "../serviceRegistryClient"; import {orkesConductorClient} from "../../orkes"; import {ServiceType} from "../../common/open-api/models/ServiceRegistryModels"; @@ -8,12 +8,26 @@ import * as path from 'path'; describe("ServiceRegistryClient", () => { const clientPromise = orkesConductorClient({useEnvVars: true}); let serviceRegistryClient: ServiceRegistryClient; + const testServicesToCleanup: string[] = []; beforeAll(async () => { const client = await clientPromise; serviceRegistryClient = new ServiceRegistryClient(client); }); + afterEach(async () => { + // Clean up any services created during tests + for (const serviceName of testServicesToCleanup) { + try { + await serviceRegistryClient.removeService(serviceName); + } catch (e) { + // Ignore cleanup errors - service might already be deleted or not exist + console.debug(`Failed to cleanup service ${serviceName}:`, e); + } + } + testServicesToCleanup.length = 0; + }); + jest.setTimeout(15000); test("Should add and retrieve a service registry", async () => { @@ -37,6 +51,9 @@ describe("ServiceRegistryClient", () => { } }; + // Add service to cleanup list + testServicesToCleanup.push(testServiceRegistry.name); + // Register the service registry await expect( serviceRegistryClient.addOrUpdateService(testServiceRegistry) @@ -75,6 +92,9 @@ describe("ServiceRegistryClient", () => { serviceURI: "http://localhost:8081" }; + // Add service to cleanup list + testServicesToCleanup.push(testServiceRegistry.name); + // Register the service registry await expect( serviceRegistryClient.addOrUpdateService(testServiceRegistry) @@ -104,6 +124,9 @@ describe("ServiceRegistryClient", () => { serviceURI: "http://localhost:8082" }; + // Add service to cleanup list + testServicesToCleanup.push(testServiceRegistry.name); + // Register the service registry await expect( serviceRegistryClient.addOrUpdateService(testServiceRegistry) @@ -145,102 +168,73 @@ describe("ServiceRegistryClient", () => { expect(foundMethod?.methodType).toEqual(testServiceMethod.methodType); expect(foundMethod?.inputType).toEqual(testServiceMethod.inputType); expect(foundMethod?.outputType).toEqual(testServiceMethod.outputType); - - // Clean up - await serviceRegistryClient.removeService(testServiceRegistry.name); }); test("Should discover methods from a http service", async () => { // Create a test service registry for discovery - // Note: This should point to a real service that supports discovery - // For HTTP services, it should point to a service with a Swagger/OpenAPI doc - // For gRPC services, it should point to a running gRPC service with reflection const testServiceRegistry = { name: "test_service_registry_discovery", type: ServiceType.HTTP, serviceURI: "http://localhost:8081/api-docs" }; + // Add service to cleanup list + testServicesToCleanup.push(testServiceRegistry.name); + // Register the service registry - await expect( - serviceRegistryClient.addOrUpdateService(testServiceRegistry) - ).resolves.not.toThrowError(); + await serviceRegistryClient.addOrUpdateService(testServiceRegistry); - try { - // Attempt to discover methods without creating them - const discoveredMethods = await serviceRegistryClient.discover( - testServiceRegistry.name, - true - ); - - // Verify that we discovered at least one method - expect(discoveredMethods).toBeDefined(); - expect(Array.isArray(discoveredMethods)).toBe(true); - - // Check that we got at least one method - // If the service URI is valid, this should pass - expect(discoveredMethods.length).toBeGreaterThan(0); - - if (discoveredMethods.length > 0) { - // Check that the discovered methods have the expected properties - const firstMethod = discoveredMethods[0]; - expect(firstMethod.methodName).toBeDefined(); - expect(firstMethod.methodType).toBeDefined(); - } - } catch (error) { - // If the discovery endpoint fails (e.g., if the petstore API is down), - // we'll log the error but not fail the test - console.warn("Discovery test failed, possibly due to external service unavailability:", error); - } finally { - // Clean up - await serviceRegistryClient.removeService(testServiceRegistry.name); + // Attempt to discover methods - this will fail the test if discovery fails + const discoveredMethods = await serviceRegistryClient.discover( + testServiceRegistry.name, + true + ); + + // Verify that we discovered methods + expect(discoveredMethods).toBeDefined(); + expect(Array.isArray(discoveredMethods)).toBe(true); + expect(discoveredMethods.length).toBeGreaterThan(0); + + if (discoveredMethods.length > 0) { + // Check that the discovered methods have the expected properties + const firstMethod = discoveredMethods[0]; + expect(firstMethod.methodName).toBeDefined(); + expect(firstMethod.methodType).toBeDefined(); } }); test("Should discover methods from a gRPC service", async () => { // Create a test service registry for discovery - // Note: This should point to a real service that supports discovery - // For HTTP services, it should point to a service with a Swagger/OpenAPI doc - // For gRPC services, it should point to a running gRPC service with reflection const testServiceRegistry = { name: "test_gRPC_service_registry_discovery", type: ServiceType.gRPC, serviceURI: "localhost:50051" }; + // Add service to cleanup list + testServicesToCleanup.push(testServiceRegistry.name); + // Register the service registry - await expect( - serviceRegistryClient.addOrUpdateService(testServiceRegistry) - ).resolves.not.toThrowError(); + await serviceRegistryClient.addOrUpdateService(testServiceRegistry); const filePath = path.join(__dirname, 'metadata', 'compiled.bin'); const fileBuffer = fs.readFileSync(filePath); const blob = new Blob([fileBuffer], {type: 'application/octet-stream'}); - // Register the service registry - await expect( - serviceRegistryClient.setProtoData(testServiceRegistry.name, 'compiled.bin', blob) - ).resolves.not.toThrowError(); + // Set proto data + await serviceRegistryClient.setProtoData(testServiceRegistry.name, 'compiled.bin', blob); - try { - const serviceMethods = await serviceRegistryClient.getService(testServiceRegistry.name).then(); - const methods = serviceMethods.methods; - expect(serviceMethods).toBeDefined(); - expect(methods?.length).toBeGreaterThan(0); - expect(Array.isArray(methods)).toBe(true); - - if (methods) { - const firstMethod = methods[0]; - expect(firstMethod.methodName).toBeDefined(); - expect(firstMethod.methodType).toBeDefined(); - } - } catch (error) { - // If the discovery endpoint fails (e.g., if the petstore API is down), - // we'll log the error but not fail the test - console.warn("Discovery test failed, possibly due to external service unavailability:", error); - } finally { - // Clean up - await serviceRegistryClient.removeService(testServiceRegistry.name); + const serviceMethods = await serviceRegistryClient.getService(testServiceRegistry.name); + const methods = serviceMethods.methods; + + expect(serviceMethods).toBeDefined(); + expect(methods?.length).toBeGreaterThan(0); + expect(Array.isArray(methods)).toBe(true); + + if (methods) { + const firstMethod = methods[0]; + expect(firstMethod.methodName).toBeDefined(); + expect(firstMethod.methodType).toBeDefined(); } }); }); \ No newline at end of file From 3757782bbda4c95f3358f421901680da8a97208a Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Fri, 13 Jun 2025 19:08:37 +0530 Subject: [PATCH 2/5] Update ServiceRegistry test --- src/core/__test__/ServiceRegistryClient.test.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/core/__test__/ServiceRegistryClient.test.ts b/src/core/__test__/ServiceRegistryClient.test.ts index 884ee979..ebf8ebc6 100644 --- a/src/core/__test__/ServiceRegistryClient.test.ts +++ b/src/core/__test__/ServiceRegistryClient.test.ts @@ -35,7 +35,7 @@ describe("ServiceRegistryClient", () => { const testServiceRegistry = { name: "test_service_registry", type: ServiceType.HTTP, - serviceURI: "http://localhost:8081/api-docs", + serviceURI: "http://httpbin:8081/api-docs", config: { circuitBreakerConfig: { failureRateThreshold: 50.0, @@ -89,7 +89,7 @@ describe("ServiceRegistryClient", () => { const testServiceRegistry = { name: "test_service_registry_to_remove", type: ServiceType.HTTP, - serviceURI: "http://localhost:8081" + serviceURI: "http://httpbin:8081/api-docs" }; // Add service to cleanup list @@ -121,7 +121,7 @@ describe("ServiceRegistryClient", () => { const testServiceRegistry = { name: "test_service_registry_with_method", type: ServiceType.HTTP, - serviceURI: "http://localhost:8082" + serviceURI: "http://httpbin:8081/api-docs" }; // Add service to cleanup list @@ -175,7 +175,7 @@ describe("ServiceRegistryClient", () => { const testServiceRegistry = { name: "test_service_registry_discovery", type: ServiceType.HTTP, - serviceURI: "http://localhost:8081/api-docs" + serviceURI: "http://httpbin:8081/api-docs" }; // Add service to cleanup list @@ -208,7 +208,7 @@ describe("ServiceRegistryClient", () => { const testServiceRegistry = { name: "test_gRPC_service_registry_discovery", type: ServiceType.gRPC, - serviceURI: "localhost:50051" + serviceURI: "grpcbin:50051" }; // Add service to cleanup list From e627732dfc8e2c800006dc17040dddc45b57057e Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Fri, 13 Jun 2025 20:41:03 +0530 Subject: [PATCH 3/5] Made few changes in test --- src/core/__test__/executor.test.ts | 53 +++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/src/core/__test__/executor.test.ts b/src/core/__test__/executor.test.ts index 979c83cb..61e8123a 100644 --- a/src/core/__test__/executor.test.ts +++ b/src/core/__test__/executor.test.ts @@ -1,11 +1,10 @@ -import {expect, describe, test, jest, beforeAll} from "@jest/globals"; +import {expect, describe, test, jest, beforeAll, afterEach, afterAll} from "@jest/globals"; import {Consistency, ReturnStrategy, SetVariableTaskDef, TaskType, WorkflowDef} from "../../common"; import { orkesConductorClient } from "../../orkes"; import { WorkflowExecutor } from "../executor"; import { v4 as uuidv4 } from "uuid"; import {MetadataClient} from "../metadataClient"; import {TestUtil} from "./utils/test-util"; -import {after} from "node:test"; import {TaskResultStatusEnum} from "../../common/open-api/models/TaskResultStatusEnum"; import {SignalResponse} from "../../common/open-api/models/SignalResponse"; @@ -114,7 +113,6 @@ describe("Executor", () => { }); }); - describe("Execute with Return Strategy and Consistency", () => { // Constants specific to this test suite const WORKFLOW_NAMES = { @@ -130,6 +128,8 @@ describe("Execute with Return Strategy and Consistency", () => { let client: any; let executor: WorkflowExecutor; let metadataClient: MetadataClient; + const workflowsToCleanup: {name: string, version: number}[] = []; + const executionsToCleanup: string[] = []; beforeAll(async () => { client = await clientPromise; @@ -139,9 +139,21 @@ describe("Execute with Return Strategy and Consistency", () => { // Register all test workflows await registerAllWorkflows(); - }) + }); + + afterEach(async () => { + // Clean up executions first + for (const executionId of executionsToCleanup) { + try { + await executor.terminate(executionId, "Test cleanup"); + } catch (e) { + console.debug(`Failed to cleanup execution ${executionId}:`, e); + } + } + executionsToCleanup.length = 0; + }); - after(async () => { + afterAll(async () => { // Cleanup all workflows await cleanupAllWorkflows(); }); @@ -154,6 +166,12 @@ describe("Execute with Return Strategy and Consistency", () => { TestUtil.registerWorkflow(WORKFLOW_NAMES.SUB_WF_2), TestUtil.registerWorkflow(WORKFLOW_NAMES.WAIT_SIGNAL_TEST) ]); + + // Add to cleanup list + Object.values(WORKFLOW_NAMES).forEach(name => { + workflowsToCleanup.push({name, version: 1}); + }); + console.log('✓ All workflows registered successfully'); } catch (error) { throw new Error(`Failed to register workflows: ${error}`); @@ -161,17 +179,14 @@ describe("Execute with Return Strategy and Consistency", () => { } async function cleanupAllWorkflows(): Promise { - const cleanupPromises = [ - TestUtil.unregisterWorkflow(WORKFLOW_NAMES.COMPLEX_WF, 1), - TestUtil.unregisterWorkflow(WORKFLOW_NAMES.SUB_WF_1, 1), - TestUtil.unregisterWorkflow(WORKFLOW_NAMES.SUB_WF_2, 1), - TestUtil.unregisterWorkflow(WORKFLOW_NAMES.WAIT_SIGNAL_TEST, 1) - ]; + const cleanupPromises = workflowsToCleanup.map(({name, version}) => + TestUtil.unregisterWorkflow(name, version) + ); const results = await Promise.allSettled(cleanupPromises); results.forEach((result, index) => { if (result.status === 'rejected') { - console.warn(`Failed to cleanup workflow ${Object.values(WORKFLOW_NAMES)[index]}: ${result.reason}`); + console.warn(`Failed to cleanup workflow ${workflowsToCleanup[index].name}: ${result.reason}`); } }); console.log('✓ Cleanup completed'); @@ -241,7 +256,6 @@ describe("Execute with Return Strategy and Consistency", () => { } ]; - // Let's write one comprehensive test first, then replicate test("Should execute complex workflow with SYNC + TARGET_WORKFLOW and validate all aspects", async () => { const testCase = testCombinations[0]; // SYNC + TARGET_WORKFLOW @@ -262,6 +276,11 @@ describe("Execute with Return Strategy and Consistency", () => { // Convert to SignalResponse instance const result = Object.assign(new SignalResponse(), rawResult); + // Add to cleanup list + if (result.workflowId) { + executionsToCleanup.push(result.workflowId); + } + console.log(`Started workflow with ID: ${result.workflowId} for strategy: ${testCase.name}`); // ========== BASIC VALIDATIONS ========== @@ -285,7 +304,6 @@ describe("Execute with Return Strategy and Consistency", () => { expect(result.status).toBeDefined(); expect(result.createTime).toBeGreaterThan(0); expect(result.updateTime).toBeGreaterThan(0); - //expect(result.updateTime).toBeGreaterThanOrEqual(result.createTime); expect(result.tasks).toBeDefined(); expect(Array.isArray(result.tasks)).toBe(true); expect(result.tasks!.length).toBeGreaterThan(0); @@ -317,7 +335,6 @@ describe("Execute with Return Strategy and Consistency", () => { expect(workflowFromResp.status).toEqual(result.status); expect(workflowFromResp.createTime).toEqual(result.createTime); expect(workflowFromResp.updateTime).toEqual(result.updateTime); - //expect(workflowFromResp.tasks.length).toEqual(result.tasks!.length); // Test that task helper methods throw errors expect(() => result.getBlockingTask()).toThrow('does not contain task details'); @@ -405,6 +422,11 @@ describe("Execute with Return Strategy and Consistency", () => { // Convert to SignalResponse instance const result = Object.assign(new SignalResponse(), rawResult); + // Add to cleanup list + if (result.workflowId) { + executionsToCleanup.push(result.workflowId); + } + // Basic validations expect(result.responseType).toEqual(testCase.returnStrategy); expect(result.workflowId).toBeDefined(); @@ -451,5 +473,4 @@ describe("Execute with Return Strategy and Consistency", () => { }); }); }); - }); \ No newline at end of file From 4d4b08a62375102f9c9e60d3da95f74911c6a823 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Fri, 13 Jun 2025 21:25:24 +0530 Subject: [PATCH 4/5] Fix test --- src/core/__test__/executor.test.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/core/__test__/executor.test.ts b/src/core/__test__/executor.test.ts index 61e8123a..650cc10e 100644 --- a/src/core/__test__/executor.test.ts +++ b/src/core/__test__/executor.test.ts @@ -145,7 +145,14 @@ describe("Execute with Return Strategy and Consistency", () => { // Clean up executions first for (const executionId of executionsToCleanup) { try { - await executor.terminate(executionId, "Test cleanup"); + const workflowStatus = await executor.getWorkflowStatus(executionId, false, false); + + if (workflowStatus.status && !['COMPLETED', 'FAILED', 'TERMINATED', 'TIMED_OUT'].includes(workflowStatus.status)) { + await executor.terminate(executionId, "Test cleanup"); + console.debug(`Terminated running workflow: ${executionId}`); + } else { + console.debug(`Skipping cleanup for ${workflowStatus.status} workflow: ${executionId}`); + } } catch (e) { console.debug(`Failed to cleanup execution ${executionId}:`, e); } From 404120f799a189a694253026f10438101b55f327 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Fri, 13 Jun 2025 23:55:07 +0530 Subject: [PATCH 5/5] waitForWorflowCompletion --- src/core/__test__/utils/test-util.ts | 34 ++++++++++++++++++++++++++ src/task/__tests__/TaskManager.test.ts | 3 ++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/core/__test__/utils/test-util.ts b/src/core/__test__/utils/test-util.ts index a47f93d0..d0d903a4 100644 --- a/src/core/__test__/utils/test-util.ts +++ b/src/core/__test__/utils/test-util.ts @@ -2,6 +2,7 @@ import * as fs from 'fs'; import * as path from 'path'; import {WorkflowDef} from "../../../common"; import {MetadataClient} from "../../metadataClient"; +import {WorkflowExecutor} from "../../executor"; export class TestUtil { private static metadataClient: MetadataClient; @@ -35,6 +36,39 @@ export class TestUtil { } } + // Helper function to wait for workflow completion + public static async waitForWorkflowCompletion( + executor: WorkflowExecutor, + workflowId: string, + maxWaitMs: number = 300000, // 5 minutes default + pollIntervalMs: number = 100 // 100ms default + ) { + const startTime = Date.now(); + + while (Date.now() - startTime < maxWaitMs) { + try { + const workflowStatus = await executor.getWorkflow(workflowId, true); + + // Check if workflow is in a terminal state + if (['COMPLETED', 'FAILED', 'TERMINATED', 'TIMED_OUT'].includes(workflowStatus.status!)) { + console.debug(`Workflow ${workflowId} reached terminal state: ${workflowStatus.status}`); + return workflowStatus; + } + + console.debug(`Workflow ${workflowId} status: ${workflowStatus.status}, waiting...`); + + // Wait before next poll + await new Promise(resolve => setTimeout(resolve, pollIntervalMs)); + + } catch (error) { + console.warn(`Error checking workflow status for ${workflowId}:`, error); + await new Promise(resolve => setTimeout(resolve, pollIntervalMs)); + } + } + + throw new Error(`Workflow ${workflowId} did not complete within ${maxWaitMs}ms`); + } + /** * Unregister a workflow */ diff --git a/src/task/__tests__/TaskManager.test.ts b/src/task/__tests__/TaskManager.test.ts index bbd24839..51b18847 100644 --- a/src/task/__tests__/TaskManager.test.ts +++ b/src/task/__tests__/TaskManager.test.ts @@ -3,6 +3,7 @@ import { simpleTask, WorkflowExecutor } from "../../core"; import { orkesConductorClient } from "../../orkes"; import { TaskManager, ConductorWorker } from "../index"; import { mockLogger } from "./mockLogger"; +import {TestUtil} from "../../core/__test__/utils/test-util"; const BASE_TIME = 500; @@ -169,7 +170,7 @@ describe("TaskManager", () => { // decrease speed again manager.updatePollingOptions({ pollInterval: BASE_TIME, concurrency: 1 }); - const workflowStatus = await executor.getWorkflow(executionId!, true); + const workflowStatus = await TestUtil.waitForWorkflowCompletion(executor, executionId!, 30000); expect(workflowStatus.status).toEqual("COMPLETED"); await manager.stopPolling();