Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,10 @@
"timestamp": "2025-09-03T22:20:22.674Z",
"size": 213216,
"gitCommit": "c0034633240ceb29e00fd34c67f0d33cfe0a6714"
},
{
"timestamp": "2025-09-04T17:42:40.403Z",
"size": 214426,
"gitCommit": "9801dc79f2a4a980f971e2de1b85aa23b4c7a579"
}
]
]
Original file line number Diff line number Diff line change
Expand Up @@ -815,13 +815,13 @@ describe("Mock Integration", () => {
test("should use custom summaryGenerator for large payloads", async () => {
const largePayload = { data: "x".repeat(300000) };
const childFn = jest.fn().mockResolvedValue(largePayload);
const summaryGenerator = jest.fn().mockReturnValue("Custom summary of large data");
const summaryGenerator = jest
.fn()
.mockReturnValue("Custom summary of large data");

await runInChildContextHandler(
TEST_CONSTANTS.CHILD_CONTEXT_NAME,
childFn,
{ summaryGenerator },
);
await runInChildContextHandler(TEST_CONSTANTS.CHILD_CONTEXT_NAME, childFn, {
summaryGenerator,
});

expect(summaryGenerator).toHaveBeenCalledWith(largePayload);
expect(mockCheckpoint).toHaveBeenNthCalledWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,20 +181,20 @@ export const executeChildContext = async <T>(
// Check if payload is too large for adaptive mode
let payloadToCheckpoint = serializedResult;
let replayChildren = false;

if (
serializedResult &&
Buffer.byteLength(serializedResult, "utf8") > CHECKPOINT_SIZE_LIMIT
) {
replayChildren = true;

// Use summary generator if provided, otherwise use empty string
if (options?.summaryGenerator) {
payloadToCheckpoint = options.summaryGenerator(result);
} else {
payloadToCheckpoint = "";
}

log(
context.isVerbose,
"📦",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ describe("Step Handler", () => {

// Verify terminate was called
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
reason: TerminationReason.RETRY_INTERRUPTED_STEP,
reason: TerminationReason.RETRY_SCHEDULED,
message: expect.stringContaining("test-step"),
});
}, 10000);
Expand Down Expand Up @@ -553,7 +553,7 @@ describe("Step Handler", () => {

// Verify terminate was called
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
reason: TerminationReason.RETRY_INTERRUPTED_STEP,
reason: TerminationReason.RETRY_SCHEDULED,
message: expect.stringContaining("test-step"),
});
}, 10000);
Expand Down Expand Up @@ -652,11 +652,41 @@ describe("Step Handler", () => {

// Verify terminate was called with stepId in the message
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
reason: TerminationReason.RETRY_INTERRUPTED_STEP,
message: "Retry scheduled for interrupted step test-step-id",
reason: TerminationReason.RETRY_SCHEDULED,
message: "Retry scheduled for test-step-id",
});
});

test("should wait for timer when status is PENDING", async () => {
const stepId = "test-step-id";
const hashedStepId = hashId(stepId);
mockExecutionContext._stepData = {
[hashedStepId]: {
Id: hashedStepId,
Status: OperationStatus.PENDING,
},
};

const stepFunction = jest.fn().mockResolvedValue("result");

const promise = stepHandler(stepId, stepFunction);

// Should terminate with retry scheduled message
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
reason: TerminationReason.RETRY_SCHEDULED,
message: "Retry scheduled for test-step-id",
});

// Should return never-resolving promise
let resolved = false;
promise.then(() => {
resolved = true;
});

await new Promise((resolve) => setTimeout(resolve, 50));
expect(resolved).toBe(false);
});

test("should handle missing attemptCount for interrupted step", async () => {
// Set up a step that was started but not completed and has no attempt
const stepId = "test-step-id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ import { OperationInterceptor } from "../../mocks/operation-interceptor";
import { createErrorObjectFromError } from "../../utils/error-object/error-object";
import { createStructuredLogger } from "../../utils/logger/structured-logger";

const waitForTimer = <T>(
context: ExecutionContext,
stepId: string,
name: string | undefined,
): Promise<T> => {
// TODO: Current implementation assumes sequential operations only
// Will be enhanced to handle concurrent operations in future milestone
context.terminationManager.terminate({
reason: TerminationReason.RETRY_SCHEDULED,
message: `Retry scheduled for ${name || stepId}`,
});
return new Promise<T>(() => { });
};

export const createStepHandler = (
context: ExecutionContext,
checkpoint: ReturnType<typeof createCheckpoint>,
Expand Down Expand Up @@ -72,6 +86,11 @@ export const createStepHandler = (
throw new Error(errorMessage || "Unknown error");
}

// If PENDING, wait for timer to complete
if (stepData?.Status === OperationStatus.PENDING) {
return waitForTimer(context, stepId, name);
}

// Check for interrupted step with AT_MOST_ONCE_PER_RETRY semantics
if (stepData?.Status === OperationStatus.STARTED) {
const semantics = options?.semantics || StepSemantics.AtLeastOncePerRetry;
Expand Down Expand Up @@ -130,17 +149,15 @@ export const createStepHandler = (
},
});

context.terminationManager.terminate({
reason: TerminationReason.RETRY_INTERRUPTED_STEP,
message: `Retry scheduled for interrupted step ${name || stepId}`,
});

// Return a never-resolving promise to ensure the execution doesn't continue
return new Promise<T>(() => {});
return waitForTimer(context, stepId, name);
}
}
}

// Execute step function for READY, STARTED (AtLeastOncePerRetry), or first time (undefined)
// READY: Timer completed, execute step function
// STARTED: Retry after error (AtLeastOncePerRetry semantics), execute step function
// undefined: First execution, execute step function
return executeStep(context, checkpoint, stepId, name, fn, options);
};
};
Expand Down Expand Up @@ -184,27 +201,30 @@ export const executeStep = async <T>(
const semantics = options?.semantics || StepSemantics.AtLeastOncePerRetry;
const serdes = options?.serdes || defaultSerdes;

// Checkpoint at start for both semantics
if (semantics === StepSemantics.AtMostOncePerRetry) {
// Wait for checkpoint to complete
await checkpoint(stepId, {
Id: stepId,
ParentId: context.parentId,
Action: "START",
SubType: OperationSubType.STEP,
Type: OperationType.STEP,
Name: name,
});
} else {
// Fire and forget for AtLeastOncePerRetry
checkpoint(stepId, {
Id: stepId,
ParentId: context.parentId,
Action: "START",
SubType: OperationSubType.STEP,
Type: OperationType.STEP,
Name: name,
});
// Checkpoint at start for both semantics (only if not already started)
const stepData = context.getStepData(stepId);
if (stepData?.Status !== OperationStatus.STARTED) {
if (semantics === StepSemantics.AtMostOncePerRetry) {
// Wait for checkpoint to complete
await checkpoint(stepId, {
Id: stepId,
ParentId: context.parentId,
Action: OperationAction.START,
SubType: OperationSubType.STEP,
Type: OperationType.STEP,
Name: name,
});
} else {
// Fire and forget for AtLeastOncePerRetry
checkpoint(stepId, {
Id: stepId,
ParentId: context.parentId,
Action: OperationAction.START,
SubType: OperationSubType.STEP,
Type: OperationType.STEP,
Name: name,
});
}
}

try {
Expand Down Expand Up @@ -241,7 +261,7 @@ export const executeStep = async <T>(
await checkpoint(stepId, {
Id: stepId,
ParentId: context.parentId,
Action: "SUCCEED",
Action: OperationAction.SUCCEED,
SubType: OperationSubType.STEP,
Type: OperationType.STEP,
Payload: serializedResult,
Expand Down Expand Up @@ -289,7 +309,7 @@ export const executeStep = async <T>(
});

// Return a never-resolving promise to ensure the execution doesn't continue
return new Promise<T>(() => {});
return new Promise<T>(() => { });
}

const stepData = context.getStepData(stepId);
Expand Down Expand Up @@ -349,14 +369,7 @@ export const executeStep = async <T>(
},
});

context.terminationManager.terminate({
reason: TerminationReason.RETRY_SCHEDULED,
message: `Retry scheduled for ${name || stepId}`,
});

// Return a never-resolving promise to ensure the execution doesn't continue
// This will be handled by Promise.race in withDurableFunctions.ts
return new Promise<T>(() => {});
return waitForTimer(context, stepId, name);
}
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ describe("WaitForCondition Handler", () => {

expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
reason: TerminationReason.RETRY_SCHEDULED,
message: "waitForCondition step-1 will retry in 30 seconds",
message: "Retry scheduled for step-1",
});
});
});
Expand Down Expand Up @@ -343,6 +343,45 @@ describe("WaitForCondition Handler", () => {
expect(mockExecutionRunner.execute).toHaveBeenCalled();
});

it("should restore state from valid checkpoint data when status is READY", async () => {
const stepId = "step-1";
const hashedStepId = hashId(stepId);
mockExecutionContext._stepData[hashedStepId] = {
Id: hashedStepId,
Status: OperationStatus.READY,
StepDetails: {
Result: '"previous-state"', // Just the serialized state, not wrapped
Attempt: 2, // System-provided attempt number
},
} as any;

const checkFunc: WaitForConditionCheckFunc<string> = jest
.fn()
.mockResolvedValue("ready");
const config: WaitForConditionConfig<string> = {
waitStrategy: (state, attempt) => {
expect(state).toBe("ready");
expect(attempt).toBe(2); // Should use attempt from system
return { shouldContinue: false };
},
initialState: "initial",
};

// Mock the execution to call the check function with the restored state
mockExecutionRunner.execute.mockImplementation(
async (name: any, fn: any) => {
const result = await fn();
return result;
},
);

const result = await waitForConditionHandler(checkFunc, config);

expect(result).toBe("ready");
// Verify the execution runner was called
expect(mockExecutionRunner.execute).toHaveBeenCalled();
});

it("should use initial state when checkpoint data is invalid JSON", async () => {
const stepId = "step-1";
const hashedStepId = hashId(stepId);
Expand Down Expand Up @@ -482,12 +521,46 @@ describe("WaitForCondition Handler", () => {

expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
reason: TerminationReason.RETRY_SCHEDULED,
message: "waitForCondition step-1 will retry in 30 seconds",
message: "Retry scheduled for step-1",
});

// Verify that the promise is indeed never-resolving by checking its constructor
expect(promise).toBeInstanceOf(Promise);
});

it("should wait for timer when status is PENDING", async () => {
const stepId = "step-1";
const hashedStepId = hashId(stepId);
mockExecutionContext._stepData[hashedStepId] = {
Id: hashedStepId,
Status: OperationStatus.PENDING,
} as any;

const checkFunc: WaitForConditionCheckFunc<string> = jest
.fn()
.mockResolvedValue("ready");
const config: WaitForConditionConfig<string> = {
waitStrategy: () => ({ shouldContinue: false }),
initialState: "initial",
};

const promise = waitForConditionHandler(checkFunc, config);

// Should terminate with retry scheduled message
expect(mockTerminationManager.terminate).toHaveBeenCalledWith({
reason: TerminationReason.RETRY_SCHEDULED,
message: "Retry scheduled for step-1",
});

// Should return never-resolving promise
let resolved = false;
promise.then(() => {
resolved = true;
});

await new Promise((resolve) => setTimeout(resolve, 50));
expect(resolved).toBe(false);
});
});

describe("Error handling", () => {
Expand Down
Loading