diff --git a/package-lock.json b/package-lock.json index 7cd4d3b8..4fca15de 100644 --- a/package-lock.json +++ b/package-lock.json @@ -220,7 +220,6 @@ "resolved": "https://registry.npmjs.org/@aws-sdk/client-dynamodb/-/client-dynamodb-3.943.0.tgz", "integrity": "sha512-1VvbsDSBrrvQ2UwdDab+YNpygyoDRSlOQ452KlEPh3jo155bMkiiY1siXaaXj0EMVwhPsLyKvbM4eBSl0Bi0yA==", "license": "Apache-2.0", - "peer": true, "dependencies": { "@aws-crypto/sha256-browser": "5.2.0", "@aws-crypto/sha256-js": "5.2.0", diff --git a/packages/aws-durable-execution-sdk-js/src/context/execution-context/execution-context.test.ts b/packages/aws-durable-execution-sdk-js/src/context/execution-context/execution-context.test.ts index 8b20eff4..b45e96a3 100644 --- a/packages/aws-durable-execution-sdk-js/src/context/execution-context/execution-context.test.ts +++ b/packages/aws-durable-execution-sdk-js/src/context/execution-context/execution-context.test.ts @@ -138,7 +138,7 @@ describe("initializeExecutionContext", () => { durableExecutionClient: mockDurableExecutionClient, _stepData: {}, terminationManager: expect.any(Object), - activeOperationsTracker: expect.any(Object), + durableExecutionArn: mockDurableExecutionArn, pendingCompletions: expect.any(Set), getStepData: expect.any(Function), diff --git a/packages/aws-durable-execution-sdk-js/src/context/execution-context/execution-context.ts b/packages/aws-durable-execution-sdk-js/src/context/execution-context/execution-context.ts index 65d21406..e17ee3d0 100644 --- a/packages/aws-durable-execution-sdk-js/src/context/execution-context/execution-context.ts +++ b/packages/aws-durable-execution-sdk-js/src/context/execution-context/execution-context.ts @@ -8,7 +8,7 @@ import { import { log } from "../../utils/logger/logger"; import { getStepData as getStepDataUtil } from "../../utils/step-id-utils/step-id-utils"; import { createDefaultLogger } from "../../utils/logger/default-logger"; -import { ActiveOperationsTracker } from "../../utils/termination-helper/active-operations-tracker"; + import { Context } from "aws-lambda"; import { DurableExecutionApiClient } from "../../durable-execution-api-client/durable-execution-api-client"; import { DurableExecutionInvocationInputWithClient } from "../../utils/durable-execution-invocation-input/durable-execution-invocation-input"; @@ -84,7 +84,7 @@ export const initializeExecutionContext = async ( durableExecutionClient, _stepData: stepData, terminationManager: new TerminationManager(), - activeOperationsTracker: new ActiveOperationsTracker(), + durableExecutionArn, pendingCompletions: new Set(), getStepData(stepId: string): Operation | undefined { diff --git a/packages/aws-durable-execution-sdk-js/src/termination-manager/termination-manager-checkpoint.test.ts b/packages/aws-durable-execution-sdk-js/src/termination-manager/termination-manager-checkpoint.test.ts index 9b78506c..bfe97a5d 100644 --- a/packages/aws-durable-execution-sdk-js/src/termination-manager/termination-manager-checkpoint.test.ts +++ b/packages/aws-durable-execution-sdk-js/src/termination-manager/termination-manager-checkpoint.test.ts @@ -18,7 +18,6 @@ const createCheckpoint = ( {}, context.durableExecutionClient, context.terminationManager, - undefined, token, emitter, logger, diff --git a/packages/aws-durable-execution-sdk-js/src/testing/create-test-checkpoint-manager.ts b/packages/aws-durable-execution-sdk-js/src/testing/create-test-checkpoint-manager.ts index 4fc87602..658d1559 100644 --- a/packages/aws-durable-execution-sdk-js/src/testing/create-test-checkpoint-manager.ts +++ b/packages/aws-durable-execution-sdk-js/src/testing/create-test-checkpoint-manager.ts @@ -13,7 +13,6 @@ export const createTestCheckpointManager = ( context._stepData, context.durableExecutionClient, context.terminationManager, - context.activeOperationsTracker, checkpointToken, emitter, logger, diff --git a/packages/aws-durable-execution-sdk-js/src/testing/create-test-durable-context.ts b/packages/aws-durable-execution-sdk-js/src/testing/create-test-durable-context.ts index bb2cd482..d4a63585 100644 --- a/packages/aws-durable-execution-sdk-js/src/testing/create-test-durable-context.ts +++ b/packages/aws-durable-execution-sdk-js/src/testing/create-test-durable-context.ts @@ -82,7 +82,6 @@ export function createTestDurableContext(options?: { }, requestId: "mock-request-id", tenantId: undefined, - activeOperationsTracker: undefined, }; const mockLambdaContext: Context = { diff --git a/packages/aws-durable-execution-sdk-js/src/testing/mock-checkpoint-manager.ts b/packages/aws-durable-execution-sdk-js/src/testing/mock-checkpoint-manager.ts index 691d0d9d..9fd6321a 100644 --- a/packages/aws-durable-execution-sdk-js/src/testing/mock-checkpoint-manager.ts +++ b/packages/aws-durable-execution-sdk-js/src/testing/mock-checkpoint-manager.ts @@ -20,7 +20,6 @@ export class MockCheckpointManager extends CheckpointManager { {}, {} as DurableExecutionClient, {} as TerminationManager, - undefined, "mock-token", {} as EventEmitter, {} as DurableLogger, diff --git a/packages/aws-durable-execution-sdk-js/src/types/core.ts b/packages/aws-durable-execution-sdk-js/src/types/core.ts index be75eefc..c16de5b9 100644 --- a/packages/aws-durable-execution-sdk-js/src/types/core.ts +++ b/packages/aws-durable-execution-sdk-js/src/types/core.ts @@ -1,7 +1,6 @@ import { TerminationManager } from "../termination-manager/termination-manager"; import { DurableExecutionClient } from "./durable-execution"; import { ErrorObject, Operation } from "@aws-sdk/client-lambda"; -import { ActiveOperationsTracker } from "../utils/termination-helper/active-operations-tracker"; /** * @internal @@ -323,7 +322,7 @@ export interface ExecutionContext { _stepData: Record; // Private, use getStepData() instead terminationManager: TerminationManager; durableExecutionArn: string; - activeOperationsTracker?: ActiveOperationsTracker; + requestId: string; tenantId: string | undefined; pendingCompletions: Set; // Track stepIds with pending SUCCEED/FAIL diff --git a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-central-termination.test.ts b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-central-termination.test.ts index ccb38f7c..2a96242d 100644 --- a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-central-termination.test.ts +++ b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-central-termination.test.ts @@ -33,7 +33,6 @@ describe("CheckpointManager - Centralized Termination", () => { {}, mockClient, mockTerminationManager, - undefined, "test-token", mockStepDataEmitter, {} as any, diff --git a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-integration.test.ts b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-integration.test.ts index f1dea431..4a0ffcff 100644 --- a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-integration.test.ts +++ b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-integration.test.ts @@ -52,7 +52,6 @@ describe("Checkpoint Integration Tests", () => { {}, mockState, { terminate: jest.fn() } as any, - undefined, TEST_CONSTANTS.CHECKPOINT_TOKEN, mockEmitter, mockLogger, @@ -90,7 +89,6 @@ describe("Checkpoint Integration Tests", () => { {}, mockState, { terminate: jest.fn() } as any, - undefined, TEST_CONSTANTS.CHECKPOINT_TOKEN, mockEmitter, mockLogger, @@ -177,7 +175,6 @@ describe("Checkpoint Integration Tests", () => { {}, mockState, { terminate: jest.fn() } as any, - undefined, TEST_CONSTANTS.CHECKPOINT_TOKEN, mockEmitter, mockLogger, @@ -217,7 +214,6 @@ describe("Checkpoint Integration Tests", () => { {}, mockState, { terminate: jest.fn() } as any, - undefined, TEST_CONSTANTS.CHECKPOINT_TOKEN, mockEmitter, mockLogger, @@ -270,7 +266,6 @@ describe("Checkpoint Integration Tests", () => { {}, mockState, { terminate: jest.fn() } as any, - undefined, TEST_CONSTANTS.CHECKPOINT_TOKEN, mockEmitter, mockLogger, @@ -284,7 +279,6 @@ describe("Checkpoint Integration Tests", () => { {}, mockState2, { terminate: jest.fn() } as any, - undefined, TEST_CONSTANTS.CHECKPOINT_TOKEN, mockEmitter, mockLogger, diff --git a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts index dbf58ea1..2374420d 100644 --- a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts +++ b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts @@ -32,11 +32,6 @@ interface QueuedCheckpoint { reject: (error: Error) => void; } -interface ActiveOperationsTracker { - increment(): void; - decrement(): void; -} - export class CheckpointManager implements Checkpoint { private queue: QueuedCheckpoint[] = []; private isProcessing = false; @@ -64,7 +59,6 @@ export class CheckpointManager implements Checkpoint { private stepData: Record, private storage: DurableExecutionClient, private terminationManager: TerminationManager, - private activeOperationsTracker: ActiveOperationsTracker | undefined, initialTaskToken: string, private stepDataEmitter: EventEmitter, private logger: DurableLogger, @@ -154,10 +148,6 @@ export class CheckpointManager implements Checkpoint { return new Promise(() => {}); // Never resolves during termination } - if (this.activeOperationsTracker) { - this.activeOperationsTracker.increment(); - } - return new Promise((resolve, reject) => { if ( data.Action === OperationAction.SUCCEED || @@ -170,15 +160,9 @@ export class CheckpointManager implements Checkpoint { stepId, data, resolve: () => { - if (this.activeOperationsTracker) { - this.activeOperationsTracker.decrement(); - } resolve(); }, reject: (error: Error) => { - if (this.activeOperationsTracker) { - this.activeOperationsTracker.decrement(); - } reject(error); }, }; diff --git a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-queue-completion.test.ts b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-queue-completion.test.ts index fac67d44..9b19c1b3 100644 --- a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-queue-completion.test.ts +++ b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-queue-completion.test.ts @@ -26,7 +26,6 @@ describe("CheckpointManager Queue Completion", () => { mockContext._stepData, mockContext.state, mockTerminationManager, - undefined, "test-token", mockEmitter, mockLogger, diff --git a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint.test.ts b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint.test.ts index 946c2b68..2dd9aea1 100644 --- a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint.test.ts +++ b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint.test.ts @@ -30,7 +30,6 @@ const createCheckpoint = ( context._stepData, context.durableExecutionClient, context.terminationManager, - undefined, token, emitter, logger, diff --git a/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/active-operations-tracker.test.ts b/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/active-operations-tracker.test.ts deleted file mode 100644 index fd57a8a9..00000000 --- a/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/active-operations-tracker.test.ts +++ /dev/null @@ -1,108 +0,0 @@ -import { - ActiveOperationsTracker, - trackOperation, -} from "./active-operations-tracker"; - -describe("ActiveOperationsTracker", () => { - let tracker: ActiveOperationsTracker; - - beforeEach(() => { - tracker = new ActiveOperationsTracker(); - }); - - describe("basic operations", () => { - it("should start with zero active operations", () => { - expect(tracker.hasActive()).toBe(false); - expect(tracker.getCount()).toBe(0); - }); - - it("should increment and decrement correctly", () => { - tracker.increment(); - expect(tracker.hasActive()).toBe(true); - expect(tracker.getCount()).toBe(1); - - tracker.increment(); - expect(tracker.getCount()).toBe(2); - - tracker.decrement(); - expect(tracker.getCount()).toBe(1); - - tracker.decrement(); - expect(tracker.hasActive()).toBe(false); - expect(tracker.getCount()).toBe(0); - }); - - it("should not go below zero", () => { - tracker.decrement(); - tracker.decrement(); - expect(tracker.getCount()).toBe(0); - }); - - it("should reset to zero", () => { - tracker.increment(); - tracker.increment(); - tracker.reset(); - expect(tracker.getCount()).toBe(0); - expect(tracker.hasActive()).toBe(false); - }); - }); - - describe("trackOperation", () => { - it("should track successful async operation", async () => { - const operation = async (): Promise => { - await new Promise((resolve) => setTimeout(resolve, 10)); - return "success"; - }; - - expect(tracker.hasActive()).toBe(false); - - const promise = trackOperation(tracker, operation); - expect(tracker.hasActive()).toBe(true); - expect(tracker.getCount()).toBe(1); - - const result = await promise; - expect(result).toBe("success"); - expect(tracker.hasActive()).toBe(false); - expect(tracker.getCount()).toBe(0); - }); - - it("should track failed async operation", async () => { - const operation = async (): Promise => { - await new Promise((resolve) => setTimeout(resolve, 10)); - throw new Error("operation failed"); - }; - - expect(tracker.hasActive()).toBe(false); - - const promise = trackOperation(tracker, operation); - expect(tracker.hasActive()).toBe(true); - - await expect(promise).rejects.toThrow("operation failed"); - expect(tracker.hasActive()).toBe(false); - expect(tracker.getCount()).toBe(0); - }); - - it("should track multiple concurrent operations", async () => { - const operation1 = async (): Promise => { - await new Promise((resolve) => setTimeout(resolve, 20)); - return "op1"; - }; - - const operation2 = async (): Promise => { - await new Promise((resolve) => setTimeout(resolve, 10)); - return "op2"; - }; - - const promise1 = trackOperation(tracker, operation1); - const promise2 = trackOperation(tracker, operation2); - - expect(tracker.getCount()).toBe(2); - - await promise2; - expect(tracker.getCount()).toBe(1); - - await promise1; - expect(tracker.getCount()).toBe(0); - }); - }); -}); diff --git a/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/active-operations-tracker.ts b/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/active-operations-tracker.ts deleted file mode 100644 index 97eaeee7..00000000 --- a/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/active-operations-tracker.ts +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Tracks active async operations to prevent premature termination - */ -export class ActiveOperationsTracker { - private activeCount = 0; - - /** - * Increment the counter when starting an async operation - */ - increment(): void { - this.activeCount++; - } - - /** - * Decrement the counter when an async operation completes - */ - decrement(): void { - this.activeCount = Math.max(0, this.activeCount - 1); - } - - /** - * Check if there are any active operations - */ - hasActive(): boolean { - return this.activeCount > 0; - } - - /** - * Get the current count of active operations - */ - getCount(): number { - return this.activeCount; - } - - /** - * Reset the counter (useful for testing) - */ - reset(): void { - this.activeCount = 0; - } -} - -/** - * Wraps an async function to track its execution - */ -export async function trackOperation( - tracker: ActiveOperationsTracker, - operation: () => Promise, -): Promise { - tracker.increment(); - try { - return await operation(); - } finally { - tracker.decrement(); - } -} diff --git a/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-deferral.test.ts b/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-deferral.test.ts index 1e499679..724a1c5e 100644 --- a/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-deferral.test.ts +++ b/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-deferral.test.ts @@ -1,101 +1,57 @@ -import { terminate } from "./termination-helper"; +import { terminateForUnrecoverableError } from "./termination-helper"; import { ExecutionContext } from "../../types"; import { TerminationReason } from "../../termination-manager/types"; -import { ActiveOperationsTracker } from "./active-operations-tracker"; +import { UnrecoverableError } from "../../errors/unrecoverable-error/unrecoverable-error"; -describe("termination deferral with active operations", () => { +// Create concrete implementation for testing +class TestUnrecoverableError extends UnrecoverableError { + readonly terminationReason = TerminationReason.CUSTOM; + + constructor(message: string, originalError?: Error) { + super(message, originalError); + } +} + +describe("terminateForUnrecoverableError", () => { let mockContext: jest.Mocked; - let tracker: ActiveOperationsTracker; beforeEach(() => { - tracker = new ActiveOperationsTracker(); mockContext = { terminationManager: { terminate: jest.fn(), }, - activeOperationsTracker: tracker, } as any; }); - it("should terminate immediately when no active operations", () => { - terminate(mockContext, TerminationReason.WAIT_SCHEDULED, "Test message"); - - expect(mockContext.terminationManager.terminate).toHaveBeenCalledWith({ - reason: TerminationReason.WAIT_SCHEDULED, - message: "Test message", - }); - }); - - it("should defer termination when operations are active", async () => { - // Simulate an active operation - tracker.increment(); - - const _terminatePromise = terminate( - mockContext, - TerminationReason.CALLBACK_PENDING, - "Callback pending", - ); - - // Should not terminate immediately - expect(mockContext.terminationManager.terminate).not.toHaveBeenCalled(); - - // Complete the operation after a delay - setTimeout(() => { - tracker.decrement(); - }, 50); + it("should terminate immediately with unrecoverable error", () => { + const error = new TestUnrecoverableError("Test error"); - // Wait a bit longer for the check interval to detect completion - await new Promise((resolve) => setTimeout(resolve, 100)); + terminateForUnrecoverableError(mockContext, error, "test-step"); - // Now termination should have been called expect(mockContext.terminationManager.terminate).toHaveBeenCalledWith({ - reason: TerminationReason.CALLBACK_PENDING, - message: "Callback pending", + reason: TerminationReason.CUSTOM, + message: "Unrecoverable error in step test-step: Test error", }); }); - it("should handle parallel scenario with minSuccessful", async () => { - // Simulate parallel with 2 branches - // Branch 1: completes successfully (checkpoint in progress) - // Branch 2: tries to terminate (callback pending) + it("should return never-resolving promise", () => { + const error = new TestUnrecoverableError("Test error"); - // Branch 1 starts checkpoint - tracker.increment(); - - // Branch 2 tries to terminate - const _terminatePromise = terminate( + const promise = terminateForUnrecoverableError( mockContext, - TerminationReason.CALLBACK_PENDING, - "Branch 2 callback pending", + error, + "test-step", ); - // Termination should be deferred - expect(mockContext.terminationManager.terminate).not.toHaveBeenCalled(); + expect(promise).toBeInstanceOf(Promise); + // Promise should never resolve + let resolved = false; + promise.then(() => { + resolved = true; + }); - // Branch 1 completes checkpoint setTimeout(() => { - tracker.decrement(); - }, 30); - - // Wait for termination to proceed - await new Promise((resolve) => setTimeout(resolve, 80)); - - // Now termination should proceed - expect(mockContext.terminationManager.terminate).toHaveBeenCalled(); - }); - - it("should work without tracker (backward compatibility)", () => { - const contextWithoutTracker = { - terminationManager: { - terminate: jest.fn(), - }, - activeOperationsTracker: undefined, - } as any; - - terminate(contextWithoutTracker, TerminationReason.WAIT_SCHEDULED, "Test"); - - expect( - contextWithoutTracker.terminationManager.terminate, - ).toHaveBeenCalled(); + expect(resolved).toBe(false); + }, 100); }); }); diff --git a/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-helper.test.ts b/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-helper.test.ts index ae0752ba..cdabd6d6 100644 --- a/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-helper.test.ts +++ b/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-helper.test.ts @@ -1,11 +1,17 @@ -import { - terminate, - terminateForUnrecoverableError, -} from "./termination-helper"; +import { terminateForUnrecoverableError } from "./termination-helper"; import { ExecutionContext } from "../../types"; import { UnrecoverableError } from "../../errors/unrecoverable-error/unrecoverable-error"; import { TerminationReason } from "../../termination-manager/types"; +// Create concrete implementation for testing +class TestUnrecoverableError extends UnrecoverableError { + readonly terminationReason = TerminationReason.CUSTOM; + + constructor(message: string, originalError?: Error) { + super(message, originalError); + } +} + describe("termination helpers", () => { let mockContext: jest.Mocked; @@ -17,81 +23,22 @@ describe("termination helpers", () => { } as any; }); - describe("terminate", () => { - it("should terminate execution with correct parameters", () => { - const reason = TerminationReason.CUSTOM; - const message = "Test termination message"; - - terminate(mockContext, reason, message); - - expect(mockContext.terminationManager.terminate).toHaveBeenCalledWith({ - reason: TerminationReason.CUSTOM, - message: "Test termination message", - }); - }); - - it("should return a never-resolving promise", () => { - const reason = TerminationReason.RETRY_SCHEDULED; - const message = "Test message"; - - const promise = terminate(mockContext, reason, message); - - expect(promise).toBeInstanceOf(Promise); - - let resolved = false; - promise.then(() => { - resolved = true; - }); - - return new Promise((resolve) => { - setTimeout(() => { - expect(resolved).toBe(false); - resolve(); - }, 0); - }); - }); - - it("should work with different termination reasons", () => { - terminate(mockContext, TerminationReason.WAIT_SCHEDULED, "Wait message"); - expect(mockContext.terminationManager.terminate).toHaveBeenCalledWith({ - reason: TerminationReason.WAIT_SCHEDULED, - message: "Wait message", - }); - - terminate( - mockContext, - TerminationReason.CALLBACK_PENDING, - "Callback message", - ); - expect(mockContext.terminationManager.terminate).toHaveBeenCalledWith({ - reason: TerminationReason.CALLBACK_PENDING, - message: "Callback message", - }); - }); - }); - describe("terminateForUnrecoverableError", () => { - let mockError: UnrecoverableError; - - beforeEach(() => { - mockError = { - terminationReason: TerminationReason.CUSTOM, - message: "Test error message", - } as UnrecoverableError; - }); - it("should terminate execution with correct parameters for unrecoverable error", () => { + const mockError = new TestUnrecoverableError("Test unrecoverable error"); const stepIdentifier = "test-step"; terminateForUnrecoverableError(mockContext, mockError, stepIdentifier); expect(mockContext.terminationManager.terminate).toHaveBeenCalledWith({ reason: TerminationReason.CUSTOM, - message: "Unrecoverable error in step test-step: Test error message", + message: + "Unrecoverable error in step test-step: Test unrecoverable error", }); }); it("should return a never-resolving promise", () => { + const mockError = new TestUnrecoverableError("Test error"); const stepIdentifier = "test-step"; const promise = terminateForUnrecoverableError( @@ -101,18 +48,6 @@ describe("termination helpers", () => { ); expect(promise).toBeInstanceOf(Promise); - - let resolved = false; - promise.then(() => { - resolved = true; - }); - - return new Promise((resolve) => { - setTimeout(() => { - expect(resolved).toBe(false); - resolve(); - }, 0); - }); }); }); }); diff --git a/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-helper.ts b/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-helper.ts index f224c1d1..8aa5a542 100644 --- a/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-helper.ts +++ b/packages/aws-durable-execution-sdk-js/src/utils/termination-helper/termination-helper.ts @@ -1,214 +1,5 @@ import { ExecutionContext } from "../../types"; import { UnrecoverableError } from "../../errors/unrecoverable-error/unrecoverable-error"; -import { TerminationReason } from "../../termination-manager/types"; -import { log } from "../logger/logger"; -import { getActiveContext } from "../context-tracker/context-tracker"; -import { Operation, OperationStatus } from "@aws-sdk/client-lambda"; -import { hashId } from "../step-id-utils/step-id-utils"; - -/** - * Checks if any ancestor operation in the parent chain has finished (SUCCEEDED or FAILED) - * or has a pending completion checkpoint - */ -function hasFinishedAncestor( - context: ExecutionContext, - parentId?: string, -): boolean { - if (!parentId) { - log("🔍", "hasFinishedAncestor: No parentId provided"); - return false; - } - - // First check if any ancestor has a pending completion checkpoint - if (hasPendingAncestorCompletion(context, parentId)) { - log("🔍", "hasFinishedAncestor: Found ancestor with pending completion!", { - parentId, - }); - return true; - } - - let currentHashedId: string | undefined = hashId(parentId); - log("🔍", "hasFinishedAncestor: Starting check", { - parentId, - initialHashedId: currentHashedId, - }); - - while (currentHashedId) { - const parentOperation: Operation | undefined = - context._stepData[currentHashedId]; - - log("🔍", "hasFinishedAncestor: Checking operation", { - hashedId: currentHashedId, - hasOperation: !!parentOperation, - status: parentOperation?.Status, - type: parentOperation?.Type, - }); - - if ( - parentOperation?.Status === OperationStatus.SUCCEEDED || - parentOperation?.Status === OperationStatus.FAILED - ) { - log("🔍", "hasFinishedAncestor: Found finished ancestor!", { - hashedId: currentHashedId, - status: parentOperation.Status, - }); - return true; - } - - currentHashedId = parentOperation?.ParentId; - } - - log("🔍", "hasFinishedAncestor: No finished ancestor found"); - return false; -} - -/** - * Checks if any ancestor has a pending completion checkpoint - */ -function hasPendingAncestorCompletion( - context: ExecutionContext, - stepId: string, -): boolean { - let currentHashedId: string | undefined = hashId(stepId); - - while (currentHashedId) { - if (context.pendingCompletions.has(currentHashedId)) { - return true; - } - - const operation: Operation | undefined = context._stepData[currentHashedId]; - currentHashedId = operation?.ParentId; - } - - return false; -} - -/** - * Terminates execution and returns a never-resolving promise to prevent code progression - * @param context - The execution context containing the termination manager - * @param reason - The termination reason - * @param message - The termination message - * @returns A never-resolving promise - */ -export function terminate( - context: ExecutionContext, - reason: TerminationReason, - message: string, -): Promise { - const activeContext = getActiveContext(); - - // If we have a parent context, add delay to let checkpoints process - if (activeContext?.parentId) { - return new Promise(async (_resolve, _reject) => { - // Wait a tick to let any pending checkpoints start processing - await new Promise((resolve) => setImmediate(resolve)); - - log("🔍", "Terminate called - checking context:", { - hasActiveContext: !!activeContext, - contextId: activeContext?.contextId, - parentId: activeContext?.parentId, - reason, - message, - }); - - const ancestorFinished = hasFinishedAncestor( - context, - activeContext.parentId, - ); - log("🔍", "Ancestor check result:", { - parentId: activeContext.parentId, - ancestorFinished, - }); - - if (ancestorFinished) { - log("🛑", "Skipping termination - ancestor already finished:", { - contextId: activeContext.contextId, - parentId: activeContext.parentId, - reason, - message, - }); - // Return never-resolving promise without terminating - return; - } - - // Check if there are active operations before terminating - const tracker = context.activeOperationsTracker; - if (tracker && tracker.hasActive()) { - log("⏳", "Deferring termination - active operations in progress:", { - activeCount: tracker.getCount(), - reason, - message, - }); - - // Wait for operations to complete, then terminate - const checkInterval = setInterval(() => { - if (!tracker.hasActive()) { - clearInterval(checkInterval); - log( - "✅", - "Active operations completed, proceeding with termination:", - { - reason, - message, - }, - ); - - context.terminationManager.terminate({ - reason, - message, - }); - } - }, 10); - return; - } - - // No active operations, terminate immediately - context.terminationManager.terminate({ - reason, - message, - }); - }); - } - - // No parent context - check active operations and terminate - const tracker = context.activeOperationsTracker; - if (tracker && tracker.hasActive()) { - log("⏳", "Deferring termination - active operations in progress:", { - activeCount: tracker.getCount(), - reason, - message, - }); - - return new Promise((_resolve, _reject) => { - const checkInterval = setInterval(() => { - if (!tracker.hasActive()) { - clearInterval(checkInterval); - log( - "✅", - "Active operations completed, proceeding with termination:", - { - reason, - message, - }, - ); - - context.terminationManager.terminate({ - reason, - message, - }); - } - }, 10); - }); - } - - // No parent, no active operations - terminate immediately - context.terminationManager.terminate({ - reason, - message, - }); - - return new Promise(() => {}); -} /** * Terminates execution for unrecoverable errors and returns a never-resolving promise @@ -222,9 +13,10 @@ export function terminateForUnrecoverableError( error: UnrecoverableError, stepIdentifier: string, ): Promise { - return terminate( - context, - error.terminationReason, - `Unrecoverable error in step ${stepIdentifier}: ${error.message}`, - ); + context.terminationManager.terminate({ + reason: error.terminationReason, + message: `Unrecoverable error in step ${stepIdentifier}: ${error.message}`, + }); + + return new Promise(() => {}); // Never-resolving promise } diff --git a/packages/aws-durable-execution-sdk-js/src/with-durable-execution-queue-completion.test.ts b/packages/aws-durable-execution-sdk-js/src/with-durable-execution-queue-completion.test.ts index 65e9b05a..dee4b8c3 100644 --- a/packages/aws-durable-execution-sdk-js/src/with-durable-execution-queue-completion.test.ts +++ b/packages/aws-durable-execution-sdk-js/src/with-durable-execution-queue-completion.test.ts @@ -41,7 +41,7 @@ describe("withDurableExecution Queue Completion", () => { _stepData: {}, terminationManager: mockTerminationManager, durableExecutionArn: "test-arn", - activeOperationsTracker: undefined, + pendingCompletions: new Set(), }; diff --git a/packages/aws-durable-execution-sdk-js/src/with-durable-execution.ts b/packages/aws-durable-execution-sdk-js/src/with-durable-execution.ts index 93867fca..d80fe8f1 100644 --- a/packages/aws-durable-execution-sdk-js/src/with-durable-execution.ts +++ b/packages/aws-durable-execution-sdk-js/src/with-durable-execution.ts @@ -48,7 +48,6 @@ async function runHandler< executionContext._stepData, executionContext.durableExecutionClient, executionContext.terminationManager, - executionContext.activeOperationsTracker, checkpointToken, stepDataEmitter, createDefaultLogger(executionContext),