Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions packages/idempotency/src/IdempotencyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ export class IdempotencyHandler<Func extends AnyFunction> {
* window, we might get an `IdempotencyInconsistentStateError`. In such
* cases we can safely retry the handling a few times.
*/
public async handle(): Promise<ReturnType<Func>> {
public async handle({
isReplay,
}: {
isReplay?: boolean;
} = {}): Promise<ReturnType<Func>> {
// early return if we should skip idempotency completely
if (this.shouldSkipIdempotency()) {
return await this.#functionToMakeIdempotent.apply(
Expand All @@ -190,7 +194,7 @@ export class IdempotencyHandler<Func extends AnyFunction> {
for (let retryNo = 0; retryNo <= MAX_RETRIES; retryNo++) {
try {
const { isIdempotent, result } =
await this.#saveInProgressOrReturnExistingResult();
await this.#saveInProgressOrReturnExistingResult({ isReplay });
if (isIdempotent) return result as ReturnType<Func>;

return await this.getFunctionResult();
Expand Down Expand Up @@ -356,7 +360,11 @@ export class IdempotencyHandler<Func extends AnyFunction> {
* Before returning a result, we might neede to look up the idempotency record
* and validate it to ensure that it is consistent with the payload to be hashed.
*/
readonly #saveInProgressOrReturnExistingResult = async (): Promise<{
readonly #saveInProgressOrReturnExistingResult = async ({
isReplay,
}: {
isReplay?: boolean;
} = {}): Promise<{
isIdempotent: boolean;
result: JSONValue;
}> => {
Expand All @@ -381,6 +389,10 @@ export class IdempotencyHandler<Func extends AnyFunction> {
{ cause: error }
);
if (error.name === 'IdempotencyItemAlreadyExistsError') {
if (isReplay) {
return returnValue;
}

let idempotencyRecord = (error as IdempotencyItemAlreadyExistsError)
.existingRecord;
if (idempotencyRecord === undefined) {
Expand Down
19 changes: 16 additions & 3 deletions packages/idempotency/src/makeIdempotent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ const isContext = (arg: unknown): arg is Context => {
);
};

const isDurableContext = (arg: unknown): boolean => {
return (
arg !== undefined &&
arg !== null &&
typeof arg === 'object' &&
'step' in arg
);
}

const isFnHandler = (
fn: AnyFunction,
args: Parameters<AnyFunction>
Expand All @@ -26,7 +35,7 @@ const isFnHandler = (
fn !== undefined &&
fn !== null &&
typeof fn === 'function' &&
isContext(args[1])
(isContext(args[1])|| isDurableContext(args[1]))
);
};

Expand Down Expand Up @@ -125,7 +134,9 @@ function makeIdempotent<Func extends AnyFunction>(
let functionPayloadToBeHashed: JSONValue;

if (isFnHandler(fn, args)) {
idempotencyConfig.registerLambdaContext(args[1]);
// If it's a durable context, retrieve the lambdaContext property
// Otherwise use the context
idempotencyConfig.registerLambdaContext(args[1]?.lambdaContext || args[1]);
functionPayloadToBeHashed = args[0];
} else {
if (isOptionsWithDataIndexArgument(options)) {
Expand All @@ -135,6 +146,8 @@ function makeIdempotent<Func extends AnyFunction>(
}
}

const isReplay = args[1]?.durableExecutionMode === "REPLAY_MODE"

return new IdempotencyHandler({
functionToMakeIdempotent: fn,
idempotencyConfig: idempotencyConfig,
Expand All @@ -143,7 +156,7 @@ function makeIdempotent<Func extends AnyFunction>(
functionArguments: args,
functionPayloadToBeHashed,
thisArg: this,
}).handle() as ReturnType<Func>;
}).handle({ isReplay }) as ReturnType<Func>
};
}

Expand Down
70 changes: 70 additions & 0 deletions packages/idempotency/tests/unit/IdempotencyHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,76 @@ describe('Class IdempotencyHandler', () => {
);
expect(mockProcessIdempotency).toHaveBeenCalledTimes(MAX_RETRIES + 1);
});

it("allows execution when isReplay is true and there is IN PROGRESS record", async ()=> {
// Prepare
// Mock saveInProgress to simulate an existing IN_PROGRESS record
vi.spyOn(persistenceStore, 'saveInProgress')
.mockRejectedValueOnce(
new IdempotencyItemAlreadyExistsError(
'Record exists',
new IdempotencyRecord({
idempotencyKey: 'test-key',
status: IdempotencyRecordStatus.INPROGRESS,
expiryTimestamp: Date.now() + 10000,
})
)
);

// Act
await idempotentHandler.handle({isReplay: true})

// Assess
expect(mockFunctionToMakeIdempotent).toBeCalled()
})

it("raises an IdempotencyAlreadyInProgressError error when isReplay is false and there is an IN PROGRESS record", async ()=> {
// Prepare
// Mock saveInProgress to simulate an existing IN_PROGRESS record
vi.spyOn(persistenceStore, 'saveInProgress')
.mockRejectedValueOnce(
new IdempotencyItemAlreadyExistsError(
'Record exists',
new IdempotencyRecord({
idempotencyKey: 'test-key',
status: IdempotencyRecordStatus.INPROGRESS,
expiryTimestamp: Date.now() + 10000,
})
)
);

// Act & Assess
await expect(idempotentHandler.handle({ isReplay: false })).rejects.toThrow(IdempotencyAlreadyInProgressError);
})

it("returns the result of the original durable execution when another durable execution with the same payload is invoked", async () => {

// Prepare
vi.spyOn(
persistenceStore,
'saveInProgress'
).mockRejectedValue(new IdempotencyItemAlreadyExistsError());

const stubRecord = new IdempotencyRecord({
idempotencyKey: 'idempotencyKey',
expiryTimestamp: Date.now() + 10000,
inProgressExpiryTimestamp: 0,
responseData: { response: false },
payloadHash: 'payloadHash',
status: IdempotencyRecordStatus.COMPLETED,
});
const getRecordSpy = vi
.spyOn(persistenceStore, 'getRecord')
.mockResolvedValue(stubRecord);

// Act
const result = await idempotentHandler.handle({isReplay: false})

// Assess
expect(result).toStrictEqual({ response: false });
expect(getRecordSpy).toHaveBeenCalledTimes(1);
expect(getRecordSpy).toHaveBeenCalledWith(mockFunctionPayloadToBeHashed);
})
});

describe('Method: getFunctionResult', () => {
Expand Down
14 changes: 14 additions & 0 deletions packages/idempotency/tests/unit/makeIdempotent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -597,4 +597,18 @@ describe('Function: makeIdempotent', () => {
);
expect(getRecordSpy).toHaveBeenCalledTimes(0);
});

it("registers the LambdaContext when provided a durable context", async ()=> {
// Prepare
const registerLambdaContextSpy = vi.spyOn(IdempotencyConfig.prototype, "registerLambdaContext")
const fn = async (_event: any, _context: any) => { }
const handler = makeIdempotent(fn, mockIdempotencyOptions)
const mockDurableContext = {step: vi.fn(), lambdaContext: context}

// Act
await handler(event, mockDurableContext)

// Assess
expect(registerLambdaContextSpy).toHaveBeenCalledOnce()
})
});