Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 66 additions & 72 deletions src/core/__test__/ServiceRegistryClient.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {beforeAll, describe, expect, jest, test} from "@jest/globals";
import {beforeAll, afterEach, describe, expect, jest, test} from "@jest/globals";
import {ServiceRegistryClient} from "../serviceRegistryClient";
import {orkesConductorClient} from "../../orkes";
import {ServiceType} from "../../common/open-api/models/ServiceRegistryModels";
Expand All @@ -8,20 +8,34 @@ import * as path from 'path';
describe("ServiceRegistryClient", () => {
const clientPromise = orkesConductorClient({useEnvVars: true});
let serviceRegistryClient: ServiceRegistryClient;
const testServicesToCleanup: string[] = [];

beforeAll(async () => {
const client = await clientPromise;
serviceRegistryClient = new ServiceRegistryClient(client);
});

afterEach(async () => {
// Clean up any services created during tests
for (const serviceName of testServicesToCleanup) {
try {
await serviceRegistryClient.removeService(serviceName);
} catch (e) {
// Ignore cleanup errors - service might already be deleted or not exist
console.debug(`Failed to cleanup service ${serviceName}:`, e);
}
}
testServicesToCleanup.length = 0;
});

jest.setTimeout(15000);

test("Should add and retrieve a service registry", async () => {
// Create a test service registry
const testServiceRegistry = {
name: "test_service_registry",
type: ServiceType.HTTP,
serviceURI: "http://localhost:8081/api-docs",
serviceURI: "http://httpbin:8081/api-docs",
config: {
circuitBreakerConfig: {
failureRateThreshold: 50.0,
Expand All @@ -37,6 +51,9 @@ describe("ServiceRegistryClient", () => {
}
};

// Add service to cleanup list
testServicesToCleanup.push(testServiceRegistry.name);

// Register the service registry
await expect(
serviceRegistryClient.addOrUpdateService(testServiceRegistry)
Expand Down Expand Up @@ -72,9 +89,12 @@ describe("ServiceRegistryClient", () => {
const testServiceRegistry = {
name: "test_service_registry_to_remove",
type: ServiceType.HTTP,
serviceURI: "http://localhost:8081"
serviceURI: "http://httpbin:8081/api-docs"
};

// Add service to cleanup list
testServicesToCleanup.push(testServiceRegistry.name);

// Register the service registry
await expect(
serviceRegistryClient.addOrUpdateService(testServiceRegistry)
Expand All @@ -101,9 +121,12 @@ describe("ServiceRegistryClient", () => {
const testServiceRegistry = {
name: "test_service_registry_with_method",
type: ServiceType.HTTP,
serviceURI: "http://localhost:8082"
serviceURI: "http://httpbin:8081/api-docs"
};

// Add service to cleanup list
testServicesToCleanup.push(testServiceRegistry.name);

// Register the service registry
await expect(
serviceRegistryClient.addOrUpdateService(testServiceRegistry)
Expand Down Expand Up @@ -145,102 +168,73 @@ describe("ServiceRegistryClient", () => {
expect(foundMethod?.methodType).toEqual(testServiceMethod.methodType);
expect(foundMethod?.inputType).toEqual(testServiceMethod.inputType);
expect(foundMethod?.outputType).toEqual(testServiceMethod.outputType);

// Clean up
await serviceRegistryClient.removeService(testServiceRegistry.name);
});

test("Should discover methods from a http service", async () => {
// Create a test service registry for discovery
// Note: This should point to a real service that supports discovery
// For HTTP services, it should point to a service with a Swagger/OpenAPI doc
// For gRPC services, it should point to a running gRPC service with reflection
const testServiceRegistry = {
name: "test_service_registry_discovery",
type: ServiceType.HTTP,
serviceURI: "http://localhost:8081/api-docs"
serviceURI: "http://httpbin:8081/api-docs"
};

// Add service to cleanup list
testServicesToCleanup.push(testServiceRegistry.name);

// Register the service registry
await expect(
serviceRegistryClient.addOrUpdateService(testServiceRegistry)
).resolves.not.toThrowError();
await serviceRegistryClient.addOrUpdateService(testServiceRegistry);

try {
// Attempt to discover methods without creating them
const discoveredMethods = await serviceRegistryClient.discover(
testServiceRegistry.name,
true
);

// Verify that we discovered at least one method
expect(discoveredMethods).toBeDefined();
expect(Array.isArray(discoveredMethods)).toBe(true);

// Check that we got at least one method
// If the service URI is valid, this should pass
expect(discoveredMethods.length).toBeGreaterThan(0);

if (discoveredMethods.length > 0) {
// Check that the discovered methods have the expected properties
const firstMethod = discoveredMethods[0];
expect(firstMethod.methodName).toBeDefined();
expect(firstMethod.methodType).toBeDefined();
}
} catch (error) {
// If the discovery endpoint fails (e.g., if the petstore API is down),
// we'll log the error but not fail the test
console.warn("Discovery test failed, possibly due to external service unavailability:", error);
} finally {
// Clean up
await serviceRegistryClient.removeService(testServiceRegistry.name);
// Attempt to discover methods - this will fail the test if discovery fails
const discoveredMethods = await serviceRegistryClient.discover(
testServiceRegistry.name,
true
);

// Verify that we discovered methods
expect(discoveredMethods).toBeDefined();
expect(Array.isArray(discoveredMethods)).toBe(true);
expect(discoveredMethods.length).toBeGreaterThan(0);

if (discoveredMethods.length > 0) {
// Check that the discovered methods have the expected properties
const firstMethod = discoveredMethods[0];
expect(firstMethod.methodName).toBeDefined();
expect(firstMethod.methodType).toBeDefined();
}
});

test("Should discover methods from a gRPC service", async () => {
// Create a test service registry for discovery
// Note: This should point to a real service that supports discovery
// For HTTP services, it should point to a service with a Swagger/OpenAPI doc
// For gRPC services, it should point to a running gRPC service with reflection
const testServiceRegistry = {
name: "test_gRPC_service_registry_discovery",
type: ServiceType.gRPC,
serviceURI: "localhost:50051"
serviceURI: "grpcbin:50051"
};

// Add service to cleanup list
testServicesToCleanup.push(testServiceRegistry.name);

// Register the service registry
await expect(
serviceRegistryClient.addOrUpdateService(testServiceRegistry)
).resolves.not.toThrowError();
await serviceRegistryClient.addOrUpdateService(testServiceRegistry);

const filePath = path.join(__dirname, 'metadata', 'compiled.bin');
const fileBuffer = fs.readFileSync(filePath);
const blob = new Blob([fileBuffer], {type: 'application/octet-stream'});

// Register the service registry
await expect(
serviceRegistryClient.setProtoData(testServiceRegistry.name, 'compiled.bin', blob)
).resolves.not.toThrowError();
// Set proto data
await serviceRegistryClient.setProtoData(testServiceRegistry.name, 'compiled.bin', blob);

try {
const serviceMethods = await serviceRegistryClient.getService(testServiceRegistry.name).then();
const methods = serviceMethods.methods;
expect(serviceMethods).toBeDefined();
expect(methods?.length).toBeGreaterThan(0);
expect(Array.isArray(methods)).toBe(true);

if (methods) {
const firstMethod = methods[0];
expect(firstMethod.methodName).toBeDefined();
expect(firstMethod.methodType).toBeDefined();
}
} catch (error) {
// If the discovery endpoint fails (e.g., if the petstore API is down),
// we'll log the error but not fail the test
console.warn("Discovery test failed, possibly due to external service unavailability:", error);
} finally {
// Clean up
await serviceRegistryClient.removeService(testServiceRegistry.name);
const serviceMethods = await serviceRegistryClient.getService(testServiceRegistry.name);
const methods = serviceMethods.methods;

expect(serviceMethods).toBeDefined();
expect(methods?.length).toBeGreaterThan(0);
expect(Array.isArray(methods)).toBe(true);

if (methods) {
const firstMethod = methods[0];
expect(firstMethod.methodName).toBeDefined();
expect(firstMethod.methodType).toBeDefined();
}
});
});
60 changes: 44 additions & 16 deletions src/core/__test__/executor.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import {expect, describe, test, jest, beforeAll} from "@jest/globals";
import {expect, describe, test, jest, beforeAll, afterEach, afterAll} from "@jest/globals";
import {Consistency, ReturnStrategy, SetVariableTaskDef, TaskType, WorkflowDef} from "../../common";
import { orkesConductorClient } from "../../orkes";
import { WorkflowExecutor } from "../executor";
import { v4 as uuidv4 } from "uuid";
import {MetadataClient} from "../metadataClient";
import {TestUtil} from "./utils/test-util";
import {after} from "node:test";
import {TaskResultStatusEnum} from "../../common/open-api/models/TaskResultStatusEnum";
import {SignalResponse} from "../../common/open-api/models/SignalResponse";

Expand Down Expand Up @@ -114,7 +113,6 @@ describe("Executor", () => {
});
});


describe("Execute with Return Strategy and Consistency", () => {
// Constants specific to this test suite
const WORKFLOW_NAMES = {
Expand All @@ -130,6 +128,8 @@ describe("Execute with Return Strategy and Consistency", () => {
let client: any;
let executor: WorkflowExecutor;
let metadataClient: MetadataClient;
const workflowsToCleanup: {name: string, version: number}[] = [];
const executionsToCleanup: string[] = [];

beforeAll(async () => {
client = await clientPromise;
Expand All @@ -139,9 +139,28 @@ describe("Execute with Return Strategy and Consistency", () => {

// Register all test workflows
await registerAllWorkflows();
})
});

afterEach(async () => {
// Clean up executions first
for (const executionId of executionsToCleanup) {
try {
const workflowStatus = await executor.getWorkflowStatus(executionId, false, false);

if (workflowStatus.status && !['COMPLETED', 'FAILED', 'TERMINATED', 'TIMED_OUT'].includes(workflowStatus.status)) {
await executor.terminate(executionId, "Test cleanup");
console.debug(`Terminated running workflow: ${executionId}`);
} else {
console.debug(`Skipping cleanup for ${workflowStatus.status} workflow: ${executionId}`);
}
} catch (e) {
console.debug(`Failed to cleanup execution ${executionId}:`, e);
}
}
executionsToCleanup.length = 0;
});

after(async () => {
afterAll(async () => {
// Cleanup all workflows
await cleanupAllWorkflows();
});
Expand All @@ -154,24 +173,27 @@ describe("Execute with Return Strategy and Consistency", () => {
TestUtil.registerWorkflow(WORKFLOW_NAMES.SUB_WF_2),
TestUtil.registerWorkflow(WORKFLOW_NAMES.WAIT_SIGNAL_TEST)
]);

// Add to cleanup list
Object.values(WORKFLOW_NAMES).forEach(name => {
workflowsToCleanup.push({name, version: 1});
});

console.log('✓ All workflows registered successfully');
} catch (error) {
throw new Error(`Failed to register workflows: ${error}`);
}
}

async function cleanupAllWorkflows(): Promise<void> {
const cleanupPromises = [
TestUtil.unregisterWorkflow(WORKFLOW_NAMES.COMPLEX_WF, 1),
TestUtil.unregisterWorkflow(WORKFLOW_NAMES.SUB_WF_1, 1),
TestUtil.unregisterWorkflow(WORKFLOW_NAMES.SUB_WF_2, 1),
TestUtil.unregisterWorkflow(WORKFLOW_NAMES.WAIT_SIGNAL_TEST, 1)
];
const cleanupPromises = workflowsToCleanup.map(({name, version}) =>
TestUtil.unregisterWorkflow(name, version)
);

const results = await Promise.allSettled(cleanupPromises);
results.forEach((result, index) => {
if (result.status === 'rejected') {
console.warn(`Failed to cleanup workflow ${Object.values(WORKFLOW_NAMES)[index]}: ${result.reason}`);
console.warn(`Failed to cleanup workflow ${workflowsToCleanup[index].name}: ${result.reason}`);
}
});
console.log('✓ Cleanup completed');
Expand Down Expand Up @@ -241,7 +263,6 @@ describe("Execute with Return Strategy and Consistency", () => {
}
];

// Let's write one comprehensive test first, then replicate
test("Should execute complex workflow with SYNC + TARGET_WORKFLOW and validate all aspects", async () => {
const testCase = testCombinations[0]; // SYNC + TARGET_WORKFLOW

Expand All @@ -262,6 +283,11 @@ describe("Execute with Return Strategy and Consistency", () => {
// Convert to SignalResponse instance
const result = Object.assign(new SignalResponse(), rawResult);

// Add to cleanup list
if (result.workflowId) {
executionsToCleanup.push(result.workflowId);
}

console.log(`Started workflow with ID: ${result.workflowId} for strategy: ${testCase.name}`);

// ========== BASIC VALIDATIONS ==========
Expand All @@ -285,7 +311,6 @@ describe("Execute with Return Strategy and Consistency", () => {
expect(result.status).toBeDefined();
expect(result.createTime).toBeGreaterThan(0);
expect(result.updateTime).toBeGreaterThan(0);
//expect(result.updateTime).toBeGreaterThanOrEqual(result.createTime);
expect(result.tasks).toBeDefined();
expect(Array.isArray(result.tasks)).toBe(true);
expect(result.tasks!.length).toBeGreaterThan(0);
Expand Down Expand Up @@ -317,7 +342,6 @@ describe("Execute with Return Strategy and Consistency", () => {
expect(workflowFromResp.status).toEqual(result.status);
expect(workflowFromResp.createTime).toEqual(result.createTime);
expect(workflowFromResp.updateTime).toEqual(result.updateTime);
//expect(workflowFromResp.tasks.length).toEqual(result.tasks!.length);

// Test that task helper methods throw errors
expect(() => result.getBlockingTask()).toThrow('does not contain task details');
Expand Down Expand Up @@ -405,6 +429,11 @@ describe("Execute with Return Strategy and Consistency", () => {
// Convert to SignalResponse instance
const result = Object.assign(new SignalResponse(), rawResult);

// Add to cleanup list
if (result.workflowId) {
executionsToCleanup.push(result.workflowId);
}

// Basic validations
expect(result.responseType).toEqual(testCase.returnStrategy);
expect(result.workflowId).toBeDefined();
Expand Down Expand Up @@ -451,5 +480,4 @@ describe("Execute with Return Strategy and Consistency", () => {
});
});
});

});
Loading