From 84c0f75980cf7d08755d3d391d09dbd5268adcda Mon Sep 17 00:00:00 2001 From: Philipp Page Date: Mon, 3 Nov 2025 17:17:57 +0100 Subject: [PATCH 1/2] Validate payload for optimistic idempotent writes. --- .../persistence/BasePersistenceStore.java | 45 ++++++++++----- .../persistence/BasePersistenceStoreTest.java | 56 +++++++++++++++++-- 2 files changed, 82 insertions(+), 19 deletions(-) diff --git a/powertools-idempotency/powertools-idempotency-core/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStore.java b/powertools-idempotency/powertools-idempotency-core/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStore.java index 7e93dc00c..00ddc4336 100644 --- a/powertools-idempotency/powertools-idempotency-core/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStore.java +++ b/powertools-idempotency/powertools-idempotency-core/src/main/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStore.java @@ -16,10 +16,6 @@ import static software.amazon.lambda.powertools.common.internal.LambdaConstants.LAMBDA_FUNCTION_NAME_ENV; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectWriter; -import io.burt.jmespath.Expression; import java.math.BigInteger; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; @@ -33,8 +29,15 @@ import java.util.Spliterators; import java.util.stream.Stream; import java.util.stream.StreamSupport; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectWriter; + +import io.burt.jmespath.Expression; import software.amazon.lambda.powertools.idempotency.IdempotencyConfig; import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyItemAlreadyExistsException; import software.amazon.lambda.powertools.idempotency.exceptions.IdempotencyItemNotFoundException; @@ -125,8 +128,7 @@ public void saveSuccess(JsonNode data, Object result, Instant now) { DataRecord.Status.COMPLETED, getExpiryEpochSecond(now), responseJson, - getHashedPayload(data) - ); + getHashedPayload(data)); LOG.debug("Function successfully executed. Saving record to persistence store with idempotency key: {}", dataRecord.getIdempotencyKey()); updateRecord(dataRecord); @@ -157,8 +159,8 @@ public void saveInProgress(JsonNode data, Instant now, OptionalInt remainingTime OptionalLong inProgressExpirationMsTimestamp = OptionalLong.empty(); if (remainingTimeInMs.isPresent()) { - inProgressExpirationMsTimestamp = - OptionalLong.of(now.plus(remainingTimeInMs.getAsInt(), ChronoUnit.MILLIS).toEpochMilli()); + inProgressExpirationMsTimestamp = OptionalLong + .of(now.plus(remainingTimeInMs.getAsInt(), ChronoUnit.MILLIS).toEpochMilli()); } DataRecord dataRecord = new DataRecord( @@ -167,10 +169,23 @@ public void saveInProgress(JsonNode data, Instant now, OptionalInt remainingTime getExpiryEpochSecond(now), null, getHashedPayload(data), - inProgressExpirationMsTimestamp - ); + inProgressExpirationMsTimestamp); LOG.debug("saving in progress record for idempotency key: {}", dataRecord.getIdempotencyKey()); - putRecord(dataRecord, now); + + try { + putRecord(dataRecord, now); + } catch (IdempotencyItemAlreadyExistsException iaee) { + // Similar to getRecord, we need to call validatePayload before returning a data record. + // PR https://github.com/aws-powertools/powertools-lambda-java/pull/1821 introduced returning a data record + // through IdempotencyItemAlreadyExistsException to save DynamoDB calls when using DDB as store. + Optional dr = iaee.getDataRecord(); + if (dr.isPresent()) { + // throws IdempotencyValidationException if payload validation is enabled and failing + validatePayload(data, dr.get()); + } + + throw iaee; + } } /** @@ -188,7 +203,7 @@ public void deleteRecord(JsonNode data, Throwable throwable) { String idemPotencyKey = hashedIdempotencyKey.get(); LOG.debug("Function raised an exception {}. " + - "Clearing in progress record in persistence store for idempotency key: {}", + "Clearing in progress record in persistence store for idempotency key: {}", throwable.getClass(), idemPotencyKey); @@ -255,9 +270,9 @@ private Optional getHashedIdempotencyKey(JsonNode data) { private boolean isMissingIdemPotencyKey(JsonNode data) { if (data.isContainerNode()) { - Stream stream = - StreamSupport.stream(Spliterators.spliteratorUnknownSize(data.elements(), Spliterator.ORDERED), - false); + Stream stream = StreamSupport.stream( + Spliterators.spliteratorUnknownSize(data.elements(), Spliterator.ORDERED), + false); return stream.allMatch(JsonNode::isNull); } return data.isNull(); diff --git a/powertools-idempotency/powertools-idempotency-core/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStoreTest.java b/powertools-idempotency/powertools-idempotency-core/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStoreTest.java index d5d45c78f..8e46de1cc 100644 --- a/powertools-idempotency/powertools-idempotency-core/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStoreTest.java +++ b/powertools-idempotency/powertools-idempotency-core/src/test/java/software/amazon/lambda/powertools/idempotency/persistence/BasePersistenceStoreTest.java @@ -28,6 +28,7 @@ import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.tests.EventLoader; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.DoubleNode; import com.fasterxml.jackson.databind.node.TextNode; @@ -145,8 +146,8 @@ void saveInProgress_jmespath_NotFound_shouldThrowException() { assertThatThrownBy( () -> persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now, OptionalInt.empty())) - .isInstanceOf(IdempotencyKeyException.class) - .hasMessageContaining("No data found to create a hashed idempotency key"); + .isInstanceOf(IdempotencyKeyException.class) + .hasMessageContaining("No data found to create a hashed idempotency key"); assertThat(status).isEqualTo(-1); } @@ -181,7 +182,7 @@ void saveInProgress_withLocalCache_NotExpired_ShouldThrowException() { assertThatThrownBy( () -> persistenceStore.saveInProgress(JsonConfig.get().getObjectMapper().valueToTree(event), now, OptionalInt.empty())) - .isInstanceOf(IdempotencyItemAlreadyExistsException.class); + .isInstanceOf(IdempotencyItemAlreadyExistsException.class); assertThat(status).isEqualTo(-1); } @@ -243,7 +244,8 @@ void saveSuccess_withCacheEnabled_shouldSaveInCache() throws JsonProcessingExcep DataRecord cachedDr = cache.get("testFunction#8d6a8f173b46479eff55e0997864a514"); assertThat(cachedDr.getStatus()).isEqualTo(DataRecord.Status.COMPLETED); assertThat(cachedDr.getExpiryTimestamp()).isEqualTo(now.plus(3600, ChronoUnit.SECONDS).getEpochSecond()); - assertThat(cachedDr.getResponseData()).isEqualTo(JsonConfig.get().getObjectMapper().writeValueAsString(product)); + assertThat(cachedDr.getResponseData()) + .isEqualTo(JsonConfig.get().getObjectMapper().writeValueAsString(product)); assertThat(cachedDr.getIdempotencyKey()).isEqualTo("testFunction#8d6a8f173b46479eff55e0997864a514"); assertThat(cachedDr.getPayloadHash()).isEmpty(); } @@ -325,6 +327,52 @@ void getRecord_invalidPayload_shouldThrowValidationException() { assertThatThrownBy( () -> persistenceStore.getRecord(JsonConfig.get().getObjectMapper().valueToTree(event), Instant.now())) + .isInstanceOf(IdempotencyValidationException.class); + } + + @Test + void saveInProgress_invalidPayload_shouldThrowValidationException() { + APIGatewayProxyRequestEvent event = EventLoader.loadApiGatewayRestEvent("apigw_event.json"); + persistenceStore = new BasePersistenceStore() { + @Override + public DataRecord getRecord(String idempotencyKey) throws IdempotencyItemNotFoundException { + return new DataRecord(idempotencyKey, DataRecord.Status.INPROGRESS, + Instant.now().plus(3600, ChronoUnit.SECONDS).getEpochSecond(), "Response", "different hash"); + } + + @Override + public void putRecord(DataRecord dataRecord, Instant now) throws IdempotencyItemAlreadyExistsException { + DataRecord existingRecord = new DataRecord( + dataRecord.getIdempotencyKey(), + DataRecord.Status.INPROGRESS, + Instant.now().plus(3600, ChronoUnit.SECONDS).getEpochSecond(), + null, + "different hash"); + throw new IdempotencyItemAlreadyExistsException("Item already exists", new Exception(), existingRecord); + } + + @Override + public void updateRecord(DataRecord dataRecord) { + // Not needed for this test. + } + + @Override + public void deleteRecord(String idempotencyKey) { + // Not needed for this test. + + } + }; + + persistenceStore.configure(IdempotencyConfig.builder() + .withEventKeyJMESPath("powertools_json(body).id") + .withPayloadValidationJMESPath("powertools_json(body).message") + .build(), + "myfunc"); + + Instant now = Instant.now(); + OptionalInt remainingTime = OptionalInt.empty(); + JsonNode eventJson = JsonConfig.get().getObjectMapper().valueToTree(event); + assertThatThrownBy(() -> persistenceStore.saveInProgress(eventJson, now, remainingTime)) .isInstanceOf(IdempotencyValidationException.class); } From 5bf44d885617ecfb1dd471b3f9f72dd4c85e0e0d Mon Sep 17 00:00:00 2001 From: Philipp Page Date: Mon, 3 Nov 2025 17:36:36 +0100 Subject: [PATCH 2/2] Re-throw IdempotencyValidationException instead of wrapping in a persistence layer exception. --- .../powertools/idempotency/internal/IdempotencyHandler.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/powertools-idempotency/powertools-idempotency-core/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyHandler.java b/powertools-idempotency/powertools-idempotency-core/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyHandler.java index 77d3d49b0..d4e0d2222 100644 --- a/powertools-idempotency/powertools-idempotency-core/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyHandler.java +++ b/powertools-idempotency/powertools-idempotency-core/src/main/java/software/amazon/lambda/powertools/idempotency/internal/IdempotencyHandler.java @@ -109,6 +109,8 @@ private Object processIdempotency() throws Throwable { } } catch (IdempotencyKeyException ike) { throw ike; + } catch (IdempotencyValidationException ive) { + throw ive; } catch (Exception e) { throw new IdempotencyPersistenceLayerException( "Failed to save in progress record to idempotency store. If you believe this is a Powertools for AWS Lambda (Java) bug, please open an issue.",