diff --git a/packages/lambda-durable-functions-sdk-js/bundle-size-history.json b/packages/lambda-durable-functions-sdk-js/bundle-size-history.json index 05d5dbdb..bd99aeb6 100644 --- a/packages/lambda-durable-functions-sdk-js/bundle-size-history.json +++ b/packages/lambda-durable-functions-sdk-js/bundle-size-history.json @@ -188,5 +188,10 @@ "timestamp": "2025-09-03T22:20:22.674Z", "size": 213216, "gitCommit": "c0034633240ceb29e00fd34c67f0d33cfe0a6714" + }, + { + "timestamp": "2025-09-04T17:42:40.403Z", + "size": 214426, + "gitCommit": "9801dc79f2a4a980f971e2de1b85aa23b4c7a579" } -] +] \ No newline at end of file diff --git a/packages/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.test.ts b/packages/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.test.ts index 2b4feb7f..a131f721 100644 --- a/packages/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.test.ts +++ b/packages/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.test.ts @@ -815,13 +815,13 @@ describe("Mock Integration", () => { test("should use custom summaryGenerator for large payloads", async () => { const largePayload = { data: "x".repeat(300000) }; const childFn = jest.fn().mockResolvedValue(largePayload); - const summaryGenerator = jest.fn().mockReturnValue("Custom summary of large data"); + const summaryGenerator = jest + .fn() + .mockReturnValue("Custom summary of large data"); - await runInChildContextHandler( - TEST_CONSTANTS.CHILD_CONTEXT_NAME, - childFn, - { summaryGenerator }, - ); + await runInChildContextHandler(TEST_CONSTANTS.CHILD_CONTEXT_NAME, childFn, { + summaryGenerator, + }); expect(summaryGenerator).toHaveBeenCalledWith(largePayload); expect(mockCheckpoint).toHaveBeenNthCalledWith( diff --git a/packages/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts b/packages/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts index 6fe56879..c664b295 100644 --- a/packages/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts +++ b/packages/lambda-durable-functions-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts @@ -181,20 +181,20 @@ export const executeChildContext = async ( // Check if payload is too large for adaptive mode let payloadToCheckpoint = serializedResult; let replayChildren = false; - + if ( serializedResult && Buffer.byteLength(serializedResult, "utf8") > CHECKPOINT_SIZE_LIMIT ) { replayChildren = true; - + // Use summary generator if provided, otherwise use empty string if (options?.summaryGenerator) { payloadToCheckpoint = options.summaryGenerator(result); } else { payloadToCheckpoint = ""; } - + log( context.isVerbose, "📦", diff --git a/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.test.ts b/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.test.ts index 5522c180..21dfd3cf 100644 --- a/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.test.ts +++ b/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.test.ts @@ -206,7 +206,7 @@ describe("Step Handler", () => { // Verify terminate was called expect(mockTerminationManager.terminate).toHaveBeenCalledWith({ - reason: TerminationReason.RETRY_INTERRUPTED_STEP, + reason: TerminationReason.RETRY_SCHEDULED, message: expect.stringContaining("test-step"), }); }, 10000); @@ -553,7 +553,7 @@ describe("Step Handler", () => { // Verify terminate was called expect(mockTerminationManager.terminate).toHaveBeenCalledWith({ - reason: TerminationReason.RETRY_INTERRUPTED_STEP, + reason: TerminationReason.RETRY_SCHEDULED, message: expect.stringContaining("test-step"), }); }, 10000); @@ -652,11 +652,41 @@ describe("Step Handler", () => { // Verify terminate was called with stepId in the message expect(mockTerminationManager.terminate).toHaveBeenCalledWith({ - reason: TerminationReason.RETRY_INTERRUPTED_STEP, - message: "Retry scheduled for interrupted step test-step-id", + reason: TerminationReason.RETRY_SCHEDULED, + message: "Retry scheduled for test-step-id", }); }); + test("should wait for timer when status is PENDING", async () => { + const stepId = "test-step-id"; + const hashedStepId = hashId(stepId); + mockExecutionContext._stepData = { + [hashedStepId]: { + Id: hashedStepId, + Status: OperationStatus.PENDING, + }, + }; + + const stepFunction = jest.fn().mockResolvedValue("result"); + + const promise = stepHandler(stepId, stepFunction); + + // Should terminate with retry scheduled message + expect(mockTerminationManager.terminate).toHaveBeenCalledWith({ + reason: TerminationReason.RETRY_SCHEDULED, + message: "Retry scheduled for test-step-id", + }); + + // Should return never-resolving promise + let resolved = false; + promise.then(() => { + resolved = true; + }); + + await new Promise((resolve) => setTimeout(resolve, 50)); + expect(resolved).toBe(false); + }); + test("should handle missing attemptCount for interrupted step", async () => { // Set up a step that was started but not completed and has no attempt const stepId = "test-step-id"; diff --git a/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.ts b/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.ts index 3d1c9b71..40e5eacf 100644 --- a/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.ts +++ b/packages/lambda-durable-functions-sdk-js/src/handlers/step-handler/step-handler.ts @@ -28,6 +28,20 @@ import { OperationInterceptor } from "../../mocks/operation-interceptor"; import { createErrorObjectFromError } from "../../utils/error-object/error-object"; import { createStructuredLogger } from "../../utils/logger/structured-logger"; +const waitForTimer = ( + context: ExecutionContext, + stepId: string, + name: string | undefined, +): Promise => { + // TODO: Current implementation assumes sequential operations only + // Will be enhanced to handle concurrent operations in future milestone + context.terminationManager.terminate({ + reason: TerminationReason.RETRY_SCHEDULED, + message: `Retry scheduled for ${name || stepId}`, + }); + return new Promise(() => { }); +}; + export const createStepHandler = ( context: ExecutionContext, checkpoint: ReturnType, @@ -72,6 +86,11 @@ export const createStepHandler = ( throw new Error(errorMessage || "Unknown error"); } + // If PENDING, wait for timer to complete + if (stepData?.Status === OperationStatus.PENDING) { + return waitForTimer(context, stepId, name); + } + // Check for interrupted step with AT_MOST_ONCE_PER_RETRY semantics if (stepData?.Status === OperationStatus.STARTED) { const semantics = options?.semantics || StepSemantics.AtLeastOncePerRetry; @@ -130,17 +149,15 @@ export const createStepHandler = ( }, }); - context.terminationManager.terminate({ - reason: TerminationReason.RETRY_INTERRUPTED_STEP, - message: `Retry scheduled for interrupted step ${name || stepId}`, - }); - - // Return a never-resolving promise to ensure the execution doesn't continue - return new Promise(() => {}); + return waitForTimer(context, stepId, name); } } } + // Execute step function for READY, STARTED (AtLeastOncePerRetry), or first time (undefined) + // READY: Timer completed, execute step function + // STARTED: Retry after error (AtLeastOncePerRetry semantics), execute step function + // undefined: First execution, execute step function return executeStep(context, checkpoint, stepId, name, fn, options); }; }; @@ -184,27 +201,30 @@ export const executeStep = async ( const semantics = options?.semantics || StepSemantics.AtLeastOncePerRetry; const serdes = options?.serdes || defaultSerdes; - // Checkpoint at start for both semantics - if (semantics === StepSemantics.AtMostOncePerRetry) { - // Wait for checkpoint to complete - await checkpoint(stepId, { - Id: stepId, - ParentId: context.parentId, - Action: "START", - SubType: OperationSubType.STEP, - Type: OperationType.STEP, - Name: name, - }); - } else { - // Fire and forget for AtLeastOncePerRetry - checkpoint(stepId, { - Id: stepId, - ParentId: context.parentId, - Action: "START", - SubType: OperationSubType.STEP, - Type: OperationType.STEP, - Name: name, - }); + // Checkpoint at start for both semantics (only if not already started) + const stepData = context.getStepData(stepId); + if (stepData?.Status !== OperationStatus.STARTED) { + if (semantics === StepSemantics.AtMostOncePerRetry) { + // Wait for checkpoint to complete + await checkpoint(stepId, { + Id: stepId, + ParentId: context.parentId, + Action: OperationAction.START, + SubType: OperationSubType.STEP, + Type: OperationType.STEP, + Name: name, + }); + } else { + // Fire and forget for AtLeastOncePerRetry + checkpoint(stepId, { + Id: stepId, + ParentId: context.parentId, + Action: OperationAction.START, + SubType: OperationSubType.STEP, + Type: OperationType.STEP, + Name: name, + }); + } } try { @@ -241,7 +261,7 @@ export const executeStep = async ( await checkpoint(stepId, { Id: stepId, ParentId: context.parentId, - Action: "SUCCEED", + Action: OperationAction.SUCCEED, SubType: OperationSubType.STEP, Type: OperationType.STEP, Payload: serializedResult, @@ -289,7 +309,7 @@ export const executeStep = async ( }); // Return a never-resolving promise to ensure the execution doesn't continue - return new Promise(() => {}); + return new Promise(() => { }); } const stepData = context.getStepData(stepId); @@ -349,14 +369,7 @@ export const executeStep = async ( }, }); - context.terminationManager.terminate({ - reason: TerminationReason.RETRY_SCHEDULED, - message: `Retry scheduled for ${name || stepId}`, - }); - - // Return a never-resolving promise to ensure the execution doesn't continue - // This will be handled by Promise.race in withDurableFunctions.ts - return new Promise(() => {}); + return waitForTimer(context, stepId, name); } } }; diff --git a/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.test.ts b/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.test.ts index 264f4b87..145eb554 100644 --- a/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.test.ts +++ b/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.test.ts @@ -298,7 +298,7 @@ describe("WaitForCondition Handler", () => { expect(mockTerminationManager.terminate).toHaveBeenCalledWith({ reason: TerminationReason.RETRY_SCHEDULED, - message: "waitForCondition step-1 will retry in 30 seconds", + message: "Retry scheduled for step-1", }); }); }); @@ -343,6 +343,45 @@ describe("WaitForCondition Handler", () => { expect(mockExecutionRunner.execute).toHaveBeenCalled(); }); + it("should restore state from valid checkpoint data when status is READY", async () => { + const stepId = "step-1"; + const hashedStepId = hashId(stepId); + mockExecutionContext._stepData[hashedStepId] = { + Id: hashedStepId, + Status: OperationStatus.READY, + StepDetails: { + Result: '"previous-state"', // Just the serialized state, not wrapped + Attempt: 2, // System-provided attempt number + }, + } as any; + + const checkFunc: WaitForConditionCheckFunc = jest + .fn() + .mockResolvedValue("ready"); + const config: WaitForConditionConfig = { + waitStrategy: (state, attempt) => { + expect(state).toBe("ready"); + expect(attempt).toBe(2); // Should use attempt from system + return { shouldContinue: false }; + }, + initialState: "initial", + }; + + // Mock the execution to call the check function with the restored state + mockExecutionRunner.execute.mockImplementation( + async (name: any, fn: any) => { + const result = await fn(); + return result; + }, + ); + + const result = await waitForConditionHandler(checkFunc, config); + + expect(result).toBe("ready"); + // Verify the execution runner was called + expect(mockExecutionRunner.execute).toHaveBeenCalled(); + }); + it("should use initial state when checkpoint data is invalid JSON", async () => { const stepId = "step-1"; const hashedStepId = hashId(stepId); @@ -482,12 +521,46 @@ describe("WaitForCondition Handler", () => { expect(mockTerminationManager.terminate).toHaveBeenCalledWith({ reason: TerminationReason.RETRY_SCHEDULED, - message: "waitForCondition step-1 will retry in 30 seconds", + message: "Retry scheduled for step-1", }); // Verify that the promise is indeed never-resolving by checking its constructor expect(promise).toBeInstanceOf(Promise); }); + + it("should wait for timer when status is PENDING", async () => { + const stepId = "step-1"; + const hashedStepId = hashId(stepId); + mockExecutionContext._stepData[hashedStepId] = { + Id: hashedStepId, + Status: OperationStatus.PENDING, + } as any; + + const checkFunc: WaitForConditionCheckFunc = jest + .fn() + .mockResolvedValue("ready"); + const config: WaitForConditionConfig = { + waitStrategy: () => ({ shouldContinue: false }), + initialState: "initial", + }; + + const promise = waitForConditionHandler(checkFunc, config); + + // Should terminate with retry scheduled message + expect(mockTerminationManager.terminate).toHaveBeenCalledWith({ + reason: TerminationReason.RETRY_SCHEDULED, + message: "Retry scheduled for step-1", + }); + + // Should return never-resolving promise + let resolved = false; + promise.then(() => { + resolved = true; + }); + + await new Promise((resolve) => setTimeout(resolve, 50)); + expect(resolved).toBe(false); + }); }); describe("Error handling", () => { diff --git a/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.ts b/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.ts index ab7ec2fb..96ac1ac7 100644 --- a/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.ts +++ b/packages/lambda-durable-functions-sdk-js/src/handlers/wait-for-condition-handler/wait-for-condition-handler.ts @@ -24,6 +24,20 @@ import { import { OperationInterceptor } from "../../mocks/operation-interceptor"; import { createErrorObjectFromError } from "../../utils/error-object/error-object"; +const waitForTimer = ( + context: ExecutionContext, + stepId: string, + name: string | undefined, +): Promise => { + // TODO: Current implementation assumes sequential operations only + // Will be enhanced to handle concurrent operations in future milestone + context.terminationManager.terminate({ + reason: TerminationReason.RETRY_SCHEDULED, + message: `Retry scheduled for ${name || stepId}`, + }); + return new Promise(() => { }); +}; + export const createWaitForConditionHandler = ( context: ExecutionContext, checkpoint: ReturnType, @@ -79,6 +93,15 @@ export const createWaitForConditionHandler = ( throw new Error(errorMessage || "waitForCondition failed"); } + // If PENDING, wait for timer to complete + if (stepData?.Status === OperationStatus.PENDING) { + return waitForTimer(context, stepId, name); + } + + // Execute check function for READY, STARTED, or first time (undefined) + // READY: Timer completed, execute check function + // STARTED: Retry after error, execute check function + // undefined: First execution, execute check function return executeWaitForCondition( context, checkpoint, @@ -131,7 +154,10 @@ export const executeWaitForCondition = async ( let currentState: T; const existingOperation = context.getStepData(stepId); - if (existingOperation?.Status === OperationStatus.STARTED) { + if ( + existingOperation?.Status === OperationStatus.STARTED || + existingOperation?.Status === OperationStatus.READY + ) { // This is a retry - get state from previous checkpoint const checkpointData = existingOperation.StepDetails?.Result; if (checkpointData) { @@ -168,6 +194,19 @@ export const executeWaitForCondition = async ( const currentAttemptForWaitStrategy = existingOperation?.StepDetails?.Attempt || 1; + // Checkpoint START for observability (fire and forget) - only if not already started + const stepData = context.getStepData(stepId); + if (stepData?.Status !== OperationStatus.STARTED) { + checkpoint(stepId, { + Id: stepId, + ParentId: context.parentId, + Action: OperationAction.START, + SubType: OperationSubType.WAIT_FOR_CONDITION, + Type: OperationType.STEP, + Name: name, + }); + } + try { // Create Telemetry with logger for the check function const logger = createStructuredLogger({ @@ -223,7 +262,7 @@ export const executeWaitForCondition = async ( await checkpoint(stepId, { Id: stepId, ParentId: context.parentId, - Action: "SUCCEED", + Action: OperationAction.SUCCEED, SubType: OperationSubType.WAIT_FOR_CONDITION, Type: OperationType.STEP, Payload: serializedState, @@ -244,7 +283,7 @@ export const executeWaitForCondition = async ( await checkpoint(stepId, { Id: stepId, ParentId: context.parentId, - Action: "RETRY", + Action: OperationAction.RETRY, SubType: OperationSubType.WAIT_FOR_CONDITION, Type: OperationType.STEP, Payload: serializedState, // Just the state, not wrapped in an object @@ -254,13 +293,7 @@ export const executeWaitForCondition = async ( }, }); - context.terminationManager.terminate({ - reason: TerminationReason.RETRY_SCHEDULED, - message: `waitForCondition ${name || stepId} will retry in ${decision.delaySeconds} seconds`, - }); - - // Return a never-resolving promise to ensure the execution doesn't continue - return new Promise(() => {}); + return waitForTimer(context, stepId, name); } } catch (error) { log(context.isVerbose, "❌", "waitForCondition check function failed:", { diff --git a/packages/lambda-durable-functions-sdk-js/src/types/index.ts b/packages/lambda-durable-functions-sdk-js/src/types/index.ts index 3769b5ec..1a80e43d 100644 --- a/packages/lambda-durable-functions-sdk-js/src/types/index.ts +++ b/packages/lambda-durable-functions-sdk-js/src/types/index.ts @@ -187,9 +187,9 @@ export interface StepConfig { export interface ChildConfig { serdes?: Serdes; subType?: string; - // summaryGenerator Will be used internall to create a summary for + // summaryGenerator will be used internally to create a summary for // ctx.map and ctx.parallel when result is big - summaryGenerator?: (result: T) => string; + summaryGenerator?: (result: T) => string; } export interface CreateCallbackConfig {