Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,13 @@ export class DurableContextImpl implements DurableContext {
funcIdOrInput?: string | I,
inputOrConfig?: I | InvokeConfig<I, O>,
maybeConfig?: InvokeConfig<I, O>,
): Promise<O> {
): DurablePromise<O> {
validateContextUsage(
this._stepPrefix,
"invoke",
this.executionContext.terminationManager,
);
return this.withModeManagement(() => {
return this.withDurableModeManagement(() => {
const invokeHandler = createInvokeHandler(
this.executionContext,
this.checkpoint,
Expand All @@ -268,13 +268,13 @@ export class DurableContextImpl implements DurableContext {
nameOrFn: string | undefined | ChildFunc<T>,
fnOrOptions?: ChildFunc<T> | ChildConfig<T>,
maybeOptions?: ChildConfig<T>,
): Promise<T> {
): DurablePromise<T> {
validateContextUsage(
this._stepPrefix,
"runInChildContext",
this.executionContext.terminationManager,
);
return this.withModeManagement(() => {
return this.withDurableModeManagement(() => {
const blockHandler = createRunInChildContextHandler(
this.executionContext,
this.checkpoint,
Expand All @@ -284,23 +284,20 @@ export class DurableContextImpl implements DurableContext {
createDurableContext,
this._parentId,
);
const promise = blockHandler(nameOrFn, fnOrOptions, maybeOptions);
// Prevent unhandled promise rejections
promise?.catch(() => {});
return promise;
return blockHandler(nameOrFn, fnOrOptions, maybeOptions);
});
}

wait(
nameOrDuration: string | Duration,
maybeDuration?: Duration,
): Promise<void> {
): DurablePromise<void> {
validateContextUsage(
this._stepPrefix,
"wait",
this.executionContext.terminationManager,
);
return this.withModeManagement(() => {
return this.withDurableModeManagement(() => {
const waitHandler = createWaitHandler(
this.executionContext,
this.checkpoint,
Expand Down Expand Up @@ -367,27 +364,22 @@ export class DurableContextImpl implements DurableContext {
nameOrSubmitter?: string | undefined | WaitForCallbackSubmitterFunc,
submitterOrConfig?: WaitForCallbackSubmitterFunc | WaitForCallbackConfig<T>,
maybeConfig?: WaitForCallbackConfig<T>,
): Promise<T> {
): DurablePromise<T> {
validateContextUsage(
this._stepPrefix,
"waitForCallback",
this.executionContext.terminationManager,
);
return this.withModeManagement(() => {
return this.withDurableModeManagement(() => {
const waitForCallbackHandler = createWaitForCallbackHandler(
this.executionContext,
this.runInChildContext.bind(this),
);
const promise = waitForCallbackHandler(
return waitForCallbackHandler(
nameOrSubmitter!,
submitterOrConfig,
maybeConfig,
);
// Prevent unhandled promise rejections
promise?.catch(() => {});
return promise?.finally(() => {
this.checkAndUpdateReplayMode();
});
});
}

Expand All @@ -397,13 +389,13 @@ export class DurableContextImpl implements DurableContext {
| WaitForConditionCheckFunc<T>
| WaitForConditionConfig<T>,
maybeConfig?: WaitForConditionConfig<T>,
): Promise<T> {
): DurablePromise<T> {
validateContextUsage(
this._stepPrefix,
"waitForCondition",
this.executionContext.terminationManager,
);
return this.withModeManagement(() => {
return this.withDurableModeManagement(() => {
const waitForConditionHandler = createWaitForConditionHandler(
this.executionContext,
this.checkpoint,
Expand All @@ -416,20 +408,17 @@ export class DurableContextImpl implements DurableContext {
this._parentId,
);

const promise =
typeof nameOrCheckFunc === "string" || nameOrCheckFunc === undefined
? waitForConditionHandler(
nameOrCheckFunc,
checkFuncOrConfig as WaitForConditionCheckFunc<T>,
maybeConfig!,
)
: waitForConditionHandler(
nameOrCheckFunc,
checkFuncOrConfig as WaitForConditionConfig<T>,
);
// Prevent unhandled promise rejections
promise?.catch(() => {});
return promise;
return typeof nameOrCheckFunc === "string" ||
nameOrCheckFunc === undefined
? waitForConditionHandler(
nameOrCheckFunc,
checkFuncOrConfig as WaitForConditionCheckFunc<T>,
maybeConfig!,
)
: waitForConditionHandler(
nameOrCheckFunc,
checkFuncOrConfig as WaitForConditionConfig<T>,
);
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createRunInChildContextHandler } from "./run-in-child-context-handler";
import { ExecutionContext, DurableContext } from "../../types";
import { ExecutionContext } from "../../types";
import { DurablePromise } from "../../types/durable-promise";
import { Context } from "aws-lambda";

Expand Down Expand Up @@ -58,15 +58,18 @@ describe("Run In Child Context Handler Two-Phase Execution", () => {
// Phase 1: Create the promise - this executes the logic immediately
const childPromise = handler(childFn);

// Wait for phase 1 to complete
await new Promise((resolve) => setTimeout(resolve, 50));

// Should return a DurablePromise
expect(childPromise).toBeInstanceOf(DurablePromise);

// Phase 1 should have executed the child function
// Wait briefly for phase 1 to start executing
await new Promise((resolve) => setTimeout(resolve, 10));

// Phase 1 should have executed the child function (before we await the promise)
expect(childFn).toHaveBeenCalled();
expect(mockCheckpoint).toHaveBeenCalled();

// Now await the promise to verify it completes
await childPromise;
});

it("should return cached result in phase 2 when awaited", async () => {
Expand All @@ -84,10 +87,12 @@ describe("Run In Child Context Handler Two-Phase Execution", () => {
// Phase 1: Create the promise
const childPromise = handler(childFn);

// Wait for phase 1 to complete
await new Promise((resolve) => setTimeout(resolve, 50));
// Wait briefly for phase 1 to execute
await new Promise((resolve) => setTimeout(resolve, 10));

// Child function should have been called before we await the promise
const phase1Calls = childFn.mock.calls.length;
expect(phase1Calls).toBeGreaterThan(0);

// Phase 2: Await the promise - should return cached result
const result = await childPromise;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,18 @@ describe("Step Handler Two-Phase Execution", () => {
// Phase 1: Create the promise - this executes the logic immediately
const stepPromise = stepHandler(stepFn);

// Wait for phase 1 to complete
await new Promise((resolve) => setTimeout(resolve, 50));

// Should return a DurablePromise
expect(stepPromise).toBeInstanceOf(DurablePromise);

// Phase 1 should have executed the step function
// Wait briefly for phase 1 to start executing
await new Promise((resolve) => setTimeout(resolve, 10));

// Phase 1 should have executed the step function (before we await the promise)
expect(stepFn).toHaveBeenCalled();
expect(mockCheckpoint).toHaveBeenCalled();

// Now await the promise to verify it completes
await stepPromise;
});

it("should return cached result in phase 2 when awaited", async () => {
Expand All @@ -97,10 +100,12 @@ describe("Step Handler Two-Phase Execution", () => {
// Phase 1: Create the promise
const stepPromise = stepHandler(stepFn);

// Wait for phase 1 to complete
await new Promise((resolve) => setTimeout(resolve, 50));
// Wait briefly for phase 1 to execute
await new Promise((resolve) => setTimeout(resolve, 10));

// Step function should have been called before we await the promise
const phase1Calls = stepFn.mock.calls.length;
expect(phase1Calls).toBeGreaterThan(0);

// Phase 2: Await the promise - should return cached result
const result = await stepPromise;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import { createWaitForConditionHandler } from "./wait-for-condition-handler";
import { ExecutionContext, WaitForConditionCheckFunc } from "../../types";
import { EventEmitter } from "events";
import { DurablePromise } from "../../types/durable-promise";

describe("WaitForCondition Handler Two-Phase Execution", () => {
let mockContext: ExecutionContext;
let mockCheckpoint: any;
let createStepId: () => string;
let createContextLogger: (stepId: string, attempt?: number) => any;
let addRunningOperation: jest.Mock;
let removeRunningOperation: jest.Mock;
let hasRunningOperations: () => boolean;
let getOperationsEmitter: () => EventEmitter;
let stepIdCounter = 0;

beforeEach(() => {
stepIdCounter = 0;
mockContext = {
getStepData: jest.fn().mockReturnValue(null),
durableExecutionArn: "test-arn",
terminationManager: {
shouldTerminate: jest.fn().mockReturnValue(false),
terminate: jest.fn(),
},
} as any;

mockCheckpoint = jest.fn().mockResolvedValue(undefined);
mockCheckpoint.force = jest.fn().mockResolvedValue(undefined);
mockCheckpoint.setTerminating = jest.fn();
mockCheckpoint.hasPendingAncestorCompletion = jest
.fn()
.mockReturnValue(false);

createStepId = (): string => `step-${++stepIdCounter}`;
createContextLogger = jest.fn().mockReturnValue({
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
});
addRunningOperation = jest.fn();
removeRunningOperation = jest.fn();
hasRunningOperations = jest.fn().mockReturnValue(false) as () => boolean;
getOperationsEmitter = (): EventEmitter => new EventEmitter();
});

it("should execute check function in phase 1 immediately", async () => {
const waitForConditionHandler = createWaitForConditionHandler(
mockContext,
mockCheckpoint,
createStepId,
createContextLogger,
addRunningOperation,
removeRunningOperation,
hasRunningOperations,
getOperationsEmitter,
undefined,
);

const checkFn: WaitForConditionCheckFunc<number> = jest
.fn()
.mockResolvedValue(10);

// Phase 1: Create the promise - this executes the logic immediately
const promise = waitForConditionHandler(checkFn, {
initialState: 0,
waitStrategy: (_state) => ({ shouldContinue: false }),
});

// Should return a DurablePromise
expect(promise).toBeInstanceOf(DurablePromise);

// Wait briefly for phase 1 to start executing
await new Promise((resolve) => setTimeout(resolve, 10));

// Phase 1 should have executed the check function (before we await the promise)
expect(checkFn).toHaveBeenCalled();
expect(mockCheckpoint).toHaveBeenCalled();

// Now await the promise to verify it completes
await promise;
});

it("should return cached result in phase 2 when awaited", async () => {
const waitForConditionHandler = createWaitForConditionHandler(
mockContext,
mockCheckpoint,
createStepId,
createContextLogger,
addRunningOperation,
removeRunningOperation,
hasRunningOperations,
getOperationsEmitter,
undefined,
);

const checkFn: WaitForConditionCheckFunc<string> = jest
.fn()
.mockResolvedValue("completed");

// Phase 1: Create the promise
const promise = waitForConditionHandler(checkFn, {
initialState: "initial",
waitStrategy: (_state) => ({ shouldContinue: false }),
});

// Wait briefly for phase 1 to execute
await new Promise((resolve) => setTimeout(resolve, 10));

// Check function should have been called before we await the promise
expect(checkFn).toHaveBeenCalledTimes(1);

// Phase 2: Await the promise to get the result
const result = await promise;

expect(result).toBe("completed");
expect(checkFn).toHaveBeenCalledTimes(1);
});

it("should execute check function before await", async () => {
const waitForConditionHandler = createWaitForConditionHandler(
mockContext,
mockCheckpoint,
createStepId,
createContextLogger,
addRunningOperation,
removeRunningOperation,
hasRunningOperations,
getOperationsEmitter,
undefined,
);

let executionOrder: string[] = [];
const checkFn: WaitForConditionCheckFunc<number> = jest.fn(async () => {
executionOrder.push("check-executed");
return 42;
});

// Phase 1: Create the promise
executionOrder.push("promise-created");
const promise = waitForConditionHandler(checkFn, {
initialState: 0,
waitStrategy: (_state) => ({ shouldContinue: false }),
});
executionOrder.push("after-handler-call");

// Wait briefly for phase 1 to execute
await new Promise((resolve) => setTimeout(resolve, 10));

// Check should have executed before we await
expect(checkFn).toHaveBeenCalled();

executionOrder.push("before-await");
const result = await promise;
executionOrder.push("after-await");

// Verify execution order: check should execute before await
expect(executionOrder).toEqual([
"promise-created",
"check-executed",
"after-handler-call",
"before-await",
"after-await",
]);
expect(result).toBe(42);
});
});
Loading
Loading