diff --git a/packages/idempotency/src/IdempotencyHandler.ts b/packages/idempotency/src/IdempotencyHandler.ts index 96505222b7..456b9ed0ca 100644 --- a/packages/idempotency/src/IdempotencyHandler.ts +++ b/packages/idempotency/src/IdempotencyHandler.ts @@ -177,7 +177,11 @@ export class IdempotencyHandler { * window, we might get an `IdempotencyInconsistentStateError`. In such * cases we can safely retry the handling a few times. */ - public async handle(): Promise> { + public async handle({ + isReplay, + }: { + isReplay?: boolean; + } = {}): Promise> { // early return if we should skip idempotency completely if (this.shouldSkipIdempotency()) { return await this.#functionToMakeIdempotent.apply( @@ -190,7 +194,7 @@ export class IdempotencyHandler { for (let retryNo = 0; retryNo <= MAX_RETRIES; retryNo++) { try { const { isIdempotent, result } = - await this.#saveInProgressOrReturnExistingResult(); + await this.#saveInProgressOrReturnExistingResult({ isReplay }); if (isIdempotent) return result as ReturnType; return await this.getFunctionResult(); @@ -356,7 +360,11 @@ export class IdempotencyHandler { * Before returning a result, we might neede to look up the idempotency record * and validate it to ensure that it is consistent with the payload to be hashed. */ - readonly #saveInProgressOrReturnExistingResult = async (): Promise<{ + readonly #saveInProgressOrReturnExistingResult = async ({ + isReplay, + }: { + isReplay?: boolean; + } = {}): Promise<{ isIdempotent: boolean; result: JSONValue; }> => { @@ -381,6 +389,10 @@ export class IdempotencyHandler { { cause: error } ); if (error.name === 'IdempotencyItemAlreadyExistsError') { + if (isReplay) { + return returnValue; + } + let idempotencyRecord = (error as IdempotencyItemAlreadyExistsError) .existingRecord; if (idempotencyRecord === undefined) { diff --git a/packages/idempotency/src/makeIdempotent.ts b/packages/idempotency/src/makeIdempotent.ts index 2e6e9b274f..09a319b8e5 100644 --- a/packages/idempotency/src/makeIdempotent.ts +++ b/packages/idempotency/src/makeIdempotent.ts @@ -17,6 +17,15 @@ const isContext = (arg: unknown): arg is Context => { ); }; +const isDurableContext = (arg: unknown): boolean => { + return ( + arg !== undefined && + arg !== null && + typeof arg === 'object' && + 'step' in arg + ); +} + const isFnHandler = ( fn: AnyFunction, args: Parameters @@ -26,7 +35,7 @@ const isFnHandler = ( fn !== undefined && fn !== null && typeof fn === 'function' && - isContext(args[1]) + (isContext(args[1])|| isDurableContext(args[1])) ); }; @@ -125,7 +134,9 @@ function makeIdempotent( let functionPayloadToBeHashed: JSONValue; if (isFnHandler(fn, args)) { - idempotencyConfig.registerLambdaContext(args[1]); + // If it's a durable context, retrieve the lambdaContext property + // Otherwise use the context + idempotencyConfig.registerLambdaContext(args[1]?.lambdaContext || args[1]); functionPayloadToBeHashed = args[0]; } else { if (isOptionsWithDataIndexArgument(options)) { @@ -135,6 +146,8 @@ function makeIdempotent( } } + const isReplay = args[1]?.durableExecutionMode === "REPLAY_MODE" + return new IdempotencyHandler({ functionToMakeIdempotent: fn, idempotencyConfig: idempotencyConfig, @@ -143,7 +156,7 @@ function makeIdempotent( functionArguments: args, functionPayloadToBeHashed, thisArg: this, - }).handle() as ReturnType; + }).handle({ isReplay }) as ReturnType }; } diff --git a/packages/idempotency/tests/unit/IdempotencyHandler.test.ts b/packages/idempotency/tests/unit/IdempotencyHandler.test.ts index 2481314dc1..abe315ec5c 100644 --- a/packages/idempotency/tests/unit/IdempotencyHandler.test.ts +++ b/packages/idempotency/tests/unit/IdempotencyHandler.test.ts @@ -239,6 +239,76 @@ describe('Class IdempotencyHandler', () => { ); expect(mockProcessIdempotency).toHaveBeenCalledTimes(MAX_RETRIES + 1); }); + + it("allows execution when isReplay is true and there is IN PROGRESS record", async ()=> { + // Prepare + // Mock saveInProgress to simulate an existing IN_PROGRESS record + vi.spyOn(persistenceStore, 'saveInProgress') + .mockRejectedValueOnce( + new IdempotencyItemAlreadyExistsError( + 'Record exists', + new IdempotencyRecord({ + idempotencyKey: 'test-key', + status: IdempotencyRecordStatus.INPROGRESS, + expiryTimestamp: Date.now() + 10000, + }) + ) + ); + + // Act + await idempotentHandler.handle({isReplay: true}) + + // Assess + expect(mockFunctionToMakeIdempotent).toBeCalled() + }) + + it("raises an IdempotencyAlreadyInProgressError error when isReplay is false and there is an IN PROGRESS record", async ()=> { + // Prepare + // Mock saveInProgress to simulate an existing IN_PROGRESS record + vi.spyOn(persistenceStore, 'saveInProgress') + .mockRejectedValueOnce( + new IdempotencyItemAlreadyExistsError( + 'Record exists', + new IdempotencyRecord({ + idempotencyKey: 'test-key', + status: IdempotencyRecordStatus.INPROGRESS, + expiryTimestamp: Date.now() + 10000, + }) + ) + ); + + // Act & Assess + await expect(idempotentHandler.handle({ isReplay: false })).rejects.toThrow(IdempotencyAlreadyInProgressError); + }) + + it("returns the result of the original durable execution when another durable execution with the same payload is invoked", async () => { + + // Prepare + vi.spyOn( + persistenceStore, + 'saveInProgress' + ).mockRejectedValue(new IdempotencyItemAlreadyExistsError()); + + const stubRecord = new IdempotencyRecord({ + idempotencyKey: 'idempotencyKey', + expiryTimestamp: Date.now() + 10000, + inProgressExpiryTimestamp: 0, + responseData: { response: false }, + payloadHash: 'payloadHash', + status: IdempotencyRecordStatus.COMPLETED, + }); + const getRecordSpy = vi + .spyOn(persistenceStore, 'getRecord') + .mockResolvedValue(stubRecord); + + // Act + const result = await idempotentHandler.handle({isReplay: false}) + + // Assess + expect(result).toStrictEqual({ response: false }); + expect(getRecordSpy).toHaveBeenCalledTimes(1); + expect(getRecordSpy).toHaveBeenCalledWith(mockFunctionPayloadToBeHashed); + }) }); describe('Method: getFunctionResult', () => { diff --git a/packages/idempotency/tests/unit/makeIdempotent.test.ts b/packages/idempotency/tests/unit/makeIdempotent.test.ts index bd4e39238e..38011a0770 100644 --- a/packages/idempotency/tests/unit/makeIdempotent.test.ts +++ b/packages/idempotency/tests/unit/makeIdempotent.test.ts @@ -597,4 +597,18 @@ describe('Function: makeIdempotent', () => { ); expect(getRecordSpy).toHaveBeenCalledTimes(0); }); + + it("registers the LambdaContext when provided a durable context", async ()=> { + // Prepare + const registerLambdaContextSpy = vi.spyOn(IdempotencyConfig.prototype, "registerLambdaContext") + const fn = async (_event: any, _context: any) => { } + const handler = makeIdempotent(fn, mockIdempotencyOptions) + const mockDurableContext = {step: vi.fn(), lambdaContext: context} + + // Act + await handler(event, mockDurableContext) + + // Assess + expect(registerLambdaContextSpy).toHaveBeenCalledOnce() + }) });