diff --git a/pom.xml b/pom.xml index 9a562ac39..96538ab75 100644 --- a/pom.xml +++ b/pom.xml @@ -48,6 +48,7 @@ spring-cloud-aws-testcontainers spring-cloud-aws-starters/spring-cloud-aws-starter spring-cloud-aws-starters/spring-cloud-aws-starter-dynamodb + spring-cloud-aws-starters/spring-cloud-aws-starter-integration-dynamodb spring-cloud-aws-starters/spring-cloud-aws-starter-metrics spring-cloud-aws-starters/spring-cloud-aws-starter-parameter-store spring-cloud-aws-starters/spring-cloud-aws-starter-s3 diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml index 6582cf592..54d88fdd4 100644 --- a/spring-cloud-aws-dependencies/pom.xml +++ b/spring-cloud-aws-dependencies/pom.xml @@ -172,6 +172,12 @@ ${project.version} + + io.awspring.cloud + spring-cloud-aws-starter-integration-dynamodb + ${project.version} + + io.awspring.cloud spring-cloud-aws-starter-imds diff --git a/spring-cloud-aws-dynamodb/pom.xml b/spring-cloud-aws-dynamodb/pom.xml index 16ac9972c..0b4461da7 100644 --- a/spring-cloud-aws-dynamodb/pom.xml +++ b/spring-cloud-aws-dynamodb/pom.xml @@ -18,7 +18,12 @@ org.springframework - spring-core + spring-beans + + + org.springframework.integration + spring-integration-core + true software.amazon.awssdk diff --git a/spring-cloud-aws-dynamodb/src/main/java/io/awspring/cloud/dynamodb/DynamoDbLockRegistry.java b/spring-cloud-aws-dynamodb/src/main/java/io/awspring/cloud/dynamodb/DynamoDbLockRegistry.java new file mode 100644 index 000000000..820c713ed --- /dev/null +++ b/spring-cloud-aws-dynamodb/src/main/java/io/awspring/cloud/dynamodb/DynamoDbLockRegistry.java @@ -0,0 +1,336 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.dynamodb; + +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.Date; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.springframework.dao.CannotAcquireLockException; +import org.springframework.dao.DataAccessResourceFailureException; +import org.springframework.integration.support.locks.DistributedLock; +import org.springframework.integration.support.locks.ExpirableLockRegistry; +import org.springframework.integration.support.locks.RenewableLockRegistry; +import org.springframework.util.Assert; +import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException; + +/** + * An {@link ExpirableLockRegistry} and {@link RenewableLockRegistry} implementation for the AWS DynamoDB. The algorithm + * is based on the {@link DynamoDbLockRepository}. + * + * @author Artem Bilan + * @author Karl Lessard + * @author Asiel Caballero + * + * @since 4.0 + */ +public class DynamoDbLockRegistry + implements ExpirableLockRegistry, RenewableLockRegistry { + + private final Map locks = new ConcurrentHashMap<>(); + + private final DynamoDbLockRepository dynamoDbLockRepository; + + private Duration idleBetweenTries = Duration.ofMillis(100); + + private Duration ttl = Duration.ofSeconds(60); + + public DynamoDbLockRegistry(DynamoDbLockRepository dynamoDbLockRepository) { + this.dynamoDbLockRepository = dynamoDbLockRepository; + } + + /** + * Specify a {@link Duration} to sleep between lock record insert/update attempts. Defaults to 100 milliseconds. + * @param idleBetweenTries the {@link Duration} to sleep between insert/update attempts. + */ + public void setIdleBetweenTries(Duration idleBetweenTries) { + this.idleBetweenTries = idleBetweenTries; + } + + /** + * Specify a {@link Duration} for a lock record lease expiration. Defaults to 60 seconds. This property is used as a + * default value {@link Lock} API without {@code ttl} argument. + * @param ttl the {@link Duration} for a lock record lease expiration. + */ + public void setTimeToLive(Duration ttl) { + this.ttl = ttl; + } + + @Override + public DistributedLock obtain(Object lockKey) { + Assert.isInstanceOf(String.class, lockKey, "'lockKey' must of String type"); + return this.locks.computeIfAbsent((String) lockKey, DynamoDbLock::new); + } + + @Override + public void expireUnusedOlderThan(long age) { + long now = System.currentTimeMillis(); + synchronized (this.locks) { + this.locks.entrySet().removeIf(entry -> { + DynamoDbLock lock = entry.getValue(); + return now - lock.lastUsed > age && !lock.isAcquiredInThisProcess(); + }); + } + } + + @Override + public void renewLock(Object lockKey) { + renewLock(lockKey, this.ttl); + } + + @Override + public void renewLock(Object lockKey, Duration ttl) { + String lockId = (String) lockKey; + DynamoDbLock dynamoDbLock = this.locks.get(lockId); + if (dynamoDbLock == null) { + throw new IllegalStateException("Could not find mutex at " + lockId); + } + if (!dynamoDbLock.renew(ttl)) { + throw new IllegalStateException("Could not renew mutex at " + lockId); + } + } + + @Override + public String toString() { + return "DynamoDbLockRegistry{" + "tableName='" + this.dynamoDbLockRepository.getTableName() + '\'' + ", owner='" + + this.dynamoDbLockRepository.getOwner() + '}'; + } + + private final class DynamoDbLock implements DistributedLock { + + private final ReentrantLock delegate = new ReentrantLock(); + + private final String key; + + private volatile long lastUsed = System.currentTimeMillis(); + + private DynamoDbLock(String key) { + this.key = key; + } + + private void rethrowAsLockException(Exception e) { + throw new CannotAcquireLockException("Failed to lock at " + this.key, e); + } + + @Override + public void lock() { + lock(DynamoDbLockRegistry.this.ttl); + } + + @Override + public void lock(Duration ttl) { + this.delegate.lock(); + while (true) { + try { + while (!doLock(ttl)) { + sleepBetweenRetries(); + } + break; + } + catch (TransactionConflictException ex) { + // try again + } + catch (InterruptedException ex) { + /* + * This method must be uninterruptible, so catch and ignore interrupts and only break out of the + * while loop when we get the lock. + */ + } + catch (Exception ex) { + this.delegate.unlock(); + rethrowAsLockException(ex); + } + } + } + + @Override + public void lockInterruptibly() throws InterruptedException { + this.delegate.lockInterruptibly(); + while (true) { + try { + while (!doLock(DynamoDbLockRegistry.this.ttl)) { + sleepBetweenRetries(); + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + } + break; + } + catch (TransactionConflictException ex) { + // try again + } + catch (InterruptedException ie) { + this.delegate.unlock(); + Thread.currentThread().interrupt(); + throw ie; + } + catch (Exception e) { + this.delegate.unlock(); + rethrowAsLockException(e); + } + } + } + + @Override + public boolean tryLock() { + try { + return tryLock(0, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return false; + } + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + return tryLock(Duration.of(time, unit.toChronoUnit()), DynamoDbLockRegistry.this.ttl); + } + + @Override + public boolean tryLock(Duration waitTime, Duration ttl) throws InterruptedException { + long now = System.currentTimeMillis(); + if (!this.delegate.tryLock(waitTime.toMillis(), TimeUnit.MILLISECONDS)) { + return false; + } + long expire = now + waitTime.toMillis(); + boolean acquired; + while (true) { + try { + while (!(acquired = doLock(ttl)) && System.currentTimeMillis() < expire) { + sleepBetweenRetries(); + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + } + if (!acquired) { + this.delegate.unlock(); + } + return acquired; + } + catch (TransactionConflictException ex) { + // try again + } + catch (Exception ex) { + this.delegate.unlock(); + rethrowAsLockException(ex); + } + } + } + + private boolean doLock(Duration ttl) throws InterruptedException { + boolean acquired = DynamoDbLockRegistry.this.dynamoDbLockRepository.acquire(this.key, ttl); + if (acquired) { + this.lastUsed = System.currentTimeMillis(); + } + return acquired; + } + + @Override + public void unlock() { + if (!this.delegate.isHeldByCurrentThread()) { + throw new IllegalMonitorStateException("The current thread doesn't own mutex at '" + this.key + "'"); + } + if (this.delegate.getHoldCount() > 1) { + this.delegate.unlock(); + return; + } + try { + while (true) { + try { + DynamoDbLockRegistry.this.dynamoDbLockRepository.delete(this.key); + return; + } + catch (TransactionConflictException ex) { + // try again + try { + sleepBetweenRetries(); + } + catch (InterruptedException intEx) { + /* + * This method must be uninterruptible, so catch and ignore interrupts and only break out of + * the while loop when we get a 'renewed' result. + */ + } + } + catch (Exception ex) { + throw new DataAccessResourceFailureException("Failed to release mutex at " + this.key, ex); + } + } + } + finally { + this.delegate.unlock(); + } + } + + public boolean renew(Duration ttl) { + if (!this.delegate.isHeldByCurrentThread()) { + throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.key); + } + while (true) { + try { + boolean renewed = DynamoDbLockRegistry.this.dynamoDbLockRepository.renew(this.key, ttl); + if (renewed) { + this.lastUsed = System.currentTimeMillis(); + } + return renewed; + } + catch (TransactionConflictException ex) { + // try again + try { + sleepBetweenRetries(); + } + catch (InterruptedException intEx) { + /* + * This method must be uninterruptible, so catch and ignore interrupts and only break out of the + * while loop when we get a 'renewed' result. + */ + } + } + catch (Exception ex) { + throw new DataAccessResourceFailureException("Failed to renew mutex at " + this.key, ex); + } + } + } + + public boolean isAcquiredInThisProcess() { + return DynamoDbLockRegistry.this.dynamoDbLockRepository.isAcquired(this.key); + } + + private void sleepBetweenRetries() throws InterruptedException { + Thread.sleep(DynamoDbLockRegistry.this.idleBetweenTries.toMillis()); + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException("DynamoDb locks don't support conditions."); + } + + @Override + public String toString() { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd@HH:mm:ss.SSS"); + return "DynamoDbLock [lockKey=" + this.key + ",lockedAt=" + dateFormat.format(new Date(this.lastUsed)) + + "]"; + } + + } + +} diff --git a/spring-cloud-aws-dynamodb/src/main/java/io/awspring/cloud/dynamodb/DynamoDbLockRepository.java b/spring-cloud-aws-dynamodb/src/main/java/io/awspring/cloud/dynamodb/DynamoDbLockRepository.java new file mode 100644 index 000000000..db42d9fc9 --- /dev/null +++ b/spring-cloud-aws-dynamodb/src/main/java/io/awspring/cloud/dynamodb/DynamoDbLockRepository.java @@ -0,0 +1,422 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.dynamodb; + +import java.io.Closeable; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.util.Assert; +import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.QueryRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import software.amazon.awssdk.services.dynamodb.model.Select; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; + +/** + * Encapsulation of the DynamoDB shunting that is needed for locks. + *

+ * The DynamoDb table must have these attributes: + *

    + *
  • {@link DynamoDbLockRepository#KEY_ATTR} {@link ScalarAttributeType#S} - partition key {@link KeyType#HASH} + *
  • {@link DynamoDbLockRepository#OWNER_ATTR} {@link ScalarAttributeType#S} + *
  • {@link DynamoDbLockRepository#CREATED_ATTR} {@link ScalarAttributeType#N} + *
  • {@link DynamoDbLockRepository#TTL_ATTR} {@link ScalarAttributeType#N} + *
+ * + * @author Artem Bilan + * + * @since 4.0 + */ +public class DynamoDbLockRepository implements InitializingBean, DisposableBean, Closeable { + + /** + * The {@value DEFAULT_TABLE_NAME} default name for the locks table in the DynamoDB. + */ + public static final String DEFAULT_TABLE_NAME = "SpringIntegrationLockRegistry"; + + /** + * The {@value KEY_ATTR} name for the partition key in the table. + */ + public static final String KEY_ATTR = "lockKey"; + + /** + * The {@value OWNER_ATTR} name for the owner of lock in the table. + */ + public static final String OWNER_ATTR = "lockOwner"; + + /** + * The {@value CREATED_ATTR} date for lock item. + */ + public static final String CREATED_ATTR = "createdAt"; + + /** + * The {@value TTL_ATTR} for how long the lock is valid. + */ + public static final String TTL_ATTR = "expireAt"; + + private static final String LOCK_EXISTS_EXPRESSION = String.format("attribute_exists(%s) AND %s = :owner", KEY_ATTR, + OWNER_ATTR); + + private static final String LOCK_NOT_EXISTS_EXPRESSION = String + .format("attribute_not_exists(%s) OR %s = :owner OR %s < :ttl", KEY_ATTR, OWNER_ATTR, TTL_ATTR); + + private static final Log LOGGER = LogFactory.getLog(DynamoDbLockRepository.class); + + private final CountDownLatch createTableLatch = new CountDownLatch(1); + + private final Set heldLocks = Collections.synchronizedSet(new HashSet<>()); + + private final DynamoDbAsyncClient dynamoDB; + + private final String tableName; + + private BillingMode billingMode = BillingMode.PAY_PER_REQUEST; + + private long readCapacity = 1L; + + private long writeCapacity = 1L; + + private String owner = UUID.randomUUID().toString(); + + private Map ownerAttribute; + + private volatile boolean initialized; + + public DynamoDbLockRepository(DynamoDbAsyncClient dynamoDB) { + this(dynamoDB, DEFAULT_TABLE_NAME); + } + + public DynamoDbLockRepository(DynamoDbAsyncClient dynamoDB, String tableName) { + this.dynamoDB = dynamoDB; + this.tableName = tableName; + } + + public void setBillingMode(BillingMode billingMode) { + Assert.notNull(billingMode, "'billingMode' must not be null"); + this.billingMode = billingMode; + } + + public void setReadCapacity(long readCapacity) { + this.readCapacity = readCapacity; + } + + public void setWriteCapacity(long writeCapacity) { + this.writeCapacity = writeCapacity; + } + + /** + * Specify a custom client id (owner) for locks in DB. Must be unique per cluster to avoid interlocking between + * different instances. + * @param owner the client id to be associated with locks handled by the repository. + */ + public void setOwner(String owner) { + this.owner = owner; + } + + public String getTableName() { + return this.tableName; + } + + public String getOwner() { + return this.owner; + } + + @Override + public void afterPropertiesSet() { + + this.dynamoDB.describeTable(request -> request.tableName(this.tableName)).thenRun(() -> { + }).exceptionallyCompose((ex) -> { + Throwable cause = ex.getCause(); + if (cause instanceof ResourceNotFoundException) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("No table '" + getTableName() + "'. Creating one..."); + } + return createTable(); + } + else { + return rethrowAsRuntimeException(cause); + } + }).exceptionally((ex) -> { + LOGGER.error("Cannot create DynamoDb table: " + this.tableName, ex.getCause()); + return null; + }).thenRun(this.createTableLatch::countDown); + + this.ownerAttribute = Map.of(":owner", AttributeValue.fromS(this.owner)); + this.initialized = true; + } + + /* + * Creates a DynamoDB table with the right schema for it to be used by this locking library. The table should be set + * up in advance because it takes a few minutes for DynamoDB to provision a new instance. If the table already + * exists, no exception. + */ + private CompletableFuture createTable() { + CreateTableRequest.Builder createTableRequest = CreateTableRequest.builder().tableName(this.tableName) + .keySchema(KeySchemaElement.builder().attributeName(KEY_ATTR).keyType(KeyType.HASH).build()) + .attributeDefinitions(AttributeDefinition.builder().attributeName(KEY_ATTR) + .attributeType(ScalarAttributeType.S).build()) + .billingMode(this.billingMode); + + if (BillingMode.PROVISIONED.equals(this.billingMode)) { + createTableRequest.provisionedThroughput(ProvisionedThroughput.builder() + .readCapacityUnits(this.readCapacity).writeCapacityUnits(this.writeCapacity).build()); + } + + return this.dynamoDB.createTable(createTableRequest.build()) + .thenCompose(result -> this.dynamoDB.waiter().waitUntilTableExists( + request -> request.tableName(this.tableName), + waiter -> waiter.maxAttempts(60) + .backoffStrategy(FixedDelayBackoffStrategy.create(Duration.ofSeconds(1))))) + .thenCompose((response) -> updateTimeToLive()).thenRun(() -> { + }); + } + + private CompletableFuture updateTimeToLive() { + return this.dynamoDB.updateTimeToLive(ttlRequest -> ttlRequest.tableName(this.tableName) + .timeToLiveSpecification(ttlSpec -> ttlSpec.enabled(true).attributeName(TTL_ATTR))); + } + + private void awaitForActive() { + Assert.state(this.initialized, + () -> "The component has not been initialized: " + this + ".\n Is it declared as a bean?"); + + try { + if (!this.createTableLatch.await(60, TimeUnit.SECONDS)) { + throw new IllegalStateException( + "The DynamoDb table " + getTableName() + " has not been created during " + 60 + " seconds"); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException( + "The DynamoDb table " + getTableName() + " has not been created and waiting thread is interrupted"); + } + } + + /** + * Check if a lock is held by this repository. + * @param lock the lock to check. + * @return acquired or not. + */ + public boolean isAcquired(String lock) { + awaitForActive(); + if (this.heldLocks.contains(lock)) { + Map values = ownerWithTtlValues(currentEpochSeconds()); + values.put(":lock", AttributeValue.fromS(lock)); + + QueryRequest.Builder queryRequest = QueryRequest.builder().tableName(this.tableName).select(Select.COUNT) + .limit(1).keyConditionExpression(KEY_ATTR + " = :lock") + .filterExpression(OWNER_ATTR + " = :owner AND " + TTL_ATTR + " >= :ttl") + .expressionAttributeValues(values); + + try { + return this.dynamoDB.query(queryRequest.build()).get().count() > 0; + } + catch (CompletionException | ExecutionException ex) { + rethrowAsRuntimeException(ex.getCause()); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return rethrowAsRuntimeException(ex); + } + } + return false; + } + + /** + * Remove a lock from this repository. + * @param lock the lock to remove. + */ + public void delete(String lock) { + awaitForActive(); + if (this.heldLocks.remove(lock)) { + deleteFromDb(lock); + } + } + + private void deleteFromDb(String lock) { + doDelete(DeleteItemRequest.builder().key(Map.of(KEY_ATTR, AttributeValue.fromS(lock))) + .conditionExpression(OWNER_ATTR + " = :owner").expressionAttributeValues(this.ownerAttribute)); + } + + private void doDelete(DeleteItemRequest.Builder deleteItemRequest) { + try { + this.dynamoDB.deleteItem(deleteItemRequest.tableName(this.tableName).build()).get(); + } + catch (CompletionException | ExecutionException ex) { + Throwable cause = ex.getCause(); + // Ignore - assuming no record in DB anymore. + if (!(cause instanceof ConditionalCheckFailedException)) { + rethrowAsRuntimeException(cause); + } + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + rethrowAsRuntimeException(ex); + } + } + + /** + * Remove all the expired locks. + */ + public void deleteExpired() { + awaitForActive(); + synchronized (this.heldLocks) { + this.heldLocks.forEach( + (lock) -> doDelete(DeleteItemRequest.builder().key(Map.of(KEY_ATTR, AttributeValue.fromS(lock))) + .conditionExpression(OWNER_ATTR + " = :owner AND " + TTL_ATTR + " < :ttl") + .expressionAttributeValues(ownerWithTtlValues(currentEpochSeconds())))); + this.heldLocks.clear(); + } + } + + private Map ownerWithTtlValues(long epochSeconds) { + Map valueMap = new HashMap<>(); + valueMap.put(":ttl", AttributeValue.fromN("" + epochSeconds)); + valueMap.putAll(this.ownerAttribute); + return valueMap; + } + + /** + * Acquire a lock for a key. + * + * @param lock the key for lock to acquire. + * @param ttl the lease duration for the lock record. + * @return acquired or not. + */ + public boolean acquire(String lock, Duration ttl) throws InterruptedException { + awaitForActive(); + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + long currentTime = currentEpochSeconds(); + + Map item = new HashMap<>(); + item.put(KEY_ATTR, AttributeValue.fromS(lock)); + item.put(OWNER_ATTR, AttributeValue.fromS(this.owner)); + item.put(CREATED_ATTR, AttributeValue.fromN("" + currentTime)); + item.put(TTL_ATTR, AttributeValue.fromN("" + ttlEpochSeconds(ttl))); + PutItemRequest.Builder putItemRequest = PutItemRequest.builder().tableName(this.tableName).item(item) + .conditionExpression(LOCK_NOT_EXISTS_EXPRESSION) + .expressionAttributeValues(ownerWithTtlValues(currentTime)); + try { + this.dynamoDB.putItem(putItemRequest.build()).thenRun(() -> this.heldLocks.add(lock)).get(); + return true; + } + catch (CompletionException | ExecutionException ex) { + Throwable cause = ex.getCause(); + if (!(cause instanceof ConditionalCheckFailedException)) { + rethrowAsRuntimeException(cause); + } + return false; + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw ex; + } + } + + /** + * Renew the lease for a lock. + * + * @param lock the lock to renew. + * @param ttl a new lease duration + * @return renewed or not. + */ + public boolean renew(String lock, Duration ttl) { + awaitForActive(); + if (this.heldLocks.contains(lock)) { + UpdateItemRequest.Builder updateItemRequest = UpdateItemRequest.builder().tableName(this.tableName) + .key(Map.of(KEY_ATTR, AttributeValue.fromS(lock))).updateExpression("SET " + TTL_ATTR + " = :ttl") + .conditionExpression(LOCK_EXISTS_EXPRESSION) + .expressionAttributeValues(ownerWithTtlValues(ttlEpochSeconds(ttl))); + try { + this.dynamoDB.updateItem(updateItemRequest.build()).get(); + return true; + } + catch (CompletionException | ExecutionException ex) { + Throwable cause = ex.getCause(); + if (!(cause instanceof ConditionalCheckFailedException)) { + rethrowAsRuntimeException(cause); + } + return false; + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return rethrowAsRuntimeException(ex.getCause()); + } + } + return false; + } + + @Override + public void destroy() { + close(); + } + + @Override + public void close() { + synchronized (this.heldLocks) { + this.heldLocks.forEach(this::deleteFromDb); + this.heldLocks.clear(); + } + } + + private long ttlEpochSeconds(Duration ttl) { + return Instant.now().plus(ttl).getEpochSecond(); + } + + private static long currentEpochSeconds() { + return Instant.now().getEpochSecond(); + } + + private static T rethrowAsRuntimeException(Throwable cause) { + if (cause instanceof RuntimeException runtimeException) { + throw runtimeException; + } + else { + throw new IllegalStateException(cause); + } + } + +} diff --git a/spring-cloud-aws-dynamodb/src/main/java/io/awspring/cloud/dynamodb/DynamoDbMetadataStore.java b/spring-cloud-aws-dynamodb/src/main/java/io/awspring/cloud/dynamodb/DynamoDbMetadataStore.java new file mode 100644 index 000000000..80770b4a6 --- /dev/null +++ b/spring-cloud-aws-dynamodb/src/main/java/io/awspring/cloud/dynamodb/DynamoDbMetadataStore.java @@ -0,0 +1,374 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.dynamodb; + +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.integration.metadata.ConcurrentMetadataStore; +import org.springframework.util.Assert; +import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.ReturnValue; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; +import software.amazon.awssdk.services.dynamodb.model.UpdateTimeToLiveRequest; + +/** + * The {@link ConcurrentMetadataStore} for the {@link DynamoDbAsyncClient}. + * + * @author Artem Bilan + * @author Asiel Caballero + * + * @since 4.0 + */ +public class DynamoDbMetadataStore implements ConcurrentMetadataStore, InitializingBean { + + private static final Log logger = LogFactory.getLog(DynamoDbMetadataStore.class); + + /** + * The {@value DEFAULT_TABLE_NAME} default name for the metadata table in the DynamoDB. + */ + public static final String DEFAULT_TABLE_NAME = "SpringIntegrationMetadataStore"; + + /** + * The {@value KEY} as a default name for a partition key in the table. + */ + public static final String KEY = "metadataKey"; + + /** + * The {@value VALUE} as a default name for value attribute. + */ + public static final String VALUE = "metadataValue"; + + /** + * The {@value TTL} as a default name for time-to-live attribute. + */ + public static final String TTL = "expireAt"; + + private static final String KEY_NOT_EXISTS_EXPRESSION = String.format("attribute_not_exists(%s)", KEY); + + private final DynamoDbAsyncClient dynamoDB; + + private final String tableName; + + private final CountDownLatch createTableLatch = new CountDownLatch(1); + + private int createTableRetries = 25; + + private int createTableDelay = 1; + + private BillingMode billingMode = BillingMode.PAY_PER_REQUEST; + + private long readCapacity = 1L; + + private long writeCapacity = 1L; + + private Integer timeToLive; + + private volatile boolean initialized; + + public DynamoDbMetadataStore(DynamoDbAsyncClient dynamoDB) { + this(dynamoDB, DEFAULT_TABLE_NAME); + } + + public DynamoDbMetadataStore(DynamoDbAsyncClient dynamoDB, String tableName) { + Assert.notNull(dynamoDB, "'dynamoDB' must not be null."); + Assert.hasText(tableName, "'tableName' must not be empty."); + this.dynamoDB = dynamoDB; + this.tableName = tableName; + + } + + public void setCreateTableRetries(int createTableRetries) { + this.createTableRetries = createTableRetries; + } + + public void setCreateTableDelay(int createTableDelay) { + this.createTableDelay = createTableDelay; + } + + public void setBillingMode(BillingMode billingMode) { + Assert.notNull(billingMode, "'billingMode' must not be null"); + this.billingMode = billingMode; + } + + public void setReadCapacity(long readCapacity) { + this.readCapacity = readCapacity; + } + + public void setWriteCapacity(long writeCapacity) { + this.writeCapacity = writeCapacity; + } + + /** + * Configure a period in seconds for item expiration. If it is configured to non-positive value ({@code <= 0}), the + * TTL is disabled on the table. + * @param timeToLive period in seconds for item expiration. + * @since 2.0 + * @see DynamoDB TTL + */ + public void setTimeToLive(int timeToLive) { + this.timeToLive = timeToLive; + } + + @Override + public void afterPropertiesSet() { + this.dynamoDB.describeTable(request -> request.tableName(this.tableName)).thenRun(() -> { + }).exceptionallyCompose((ex) -> { + Throwable cause = ex.getCause(); + if (cause instanceof ResourceNotFoundException) { + if (logger.isInfoEnabled()) { + logger.info("No table '" + this.tableName + "'. Creating one..."); + } + return createTable(); + } + else { + return rethrowAsRuntimeException(cause); + } + }).thenCompose(result -> updateTimeToLiveIfAny()).exceptionally((ex) -> { + logger.error("Cannot create DynamoDb table: " + this.tableName, ex.getCause()); + return null; + }).thenRun(this.createTableLatch::countDown); + + this.initialized = true; + } + + private CompletableFuture createTable() { + CreateTableRequest.Builder createTableRequest = CreateTableRequest.builder().tableName(this.tableName) + .keySchema(KeySchemaElement.builder().attributeName(KEY).keyType(KeyType.HASH).build()) + .attributeDefinitions( + AttributeDefinition.builder().attributeName(KEY).attributeType(ScalarAttributeType.S).build()) + .billingMode(this.billingMode); + + if (BillingMode.PROVISIONED.equals(this.billingMode)) { + createTableRequest.provisionedThroughput(ProvisionedThroughput.builder() + .readCapacityUnits(this.readCapacity).writeCapacityUnits(this.writeCapacity).build()); + } + + return this.dynamoDB.createTable(createTableRequest.build()) + .thenCompose(result -> this.dynamoDB.waiter().waitUntilTableExists( + request -> request.tableName(this.tableName), + waiter -> waiter.maxAttempts(this.createTableRetries).backoffStrategy( + FixedDelayBackoffStrategy.create(Duration.ofSeconds(this.createTableDelay))))) + .thenRun(() -> { + }); + } + + private CompletableFuture updateTimeToLiveIfAny() { + if (this.timeToLive != null) { + UpdateTimeToLiveRequest.Builder updateTimeToLiveRequest = UpdateTimeToLiveRequest.builder() + .tableName(this.tableName) + .timeToLiveSpecification(ttl -> ttl.attributeName(TTL).enabled(this.timeToLive > 0)); + + return this.dynamoDB.updateTimeToLive(updateTimeToLiveRequest.build()).exceptionally((ex) -> { + if (logger.isWarnEnabled()) { + logger.warn("The error during 'updateTimeToLive' request", ex); + } + return null; + }); + } + + return CompletableFuture.completedFuture(null); + } + + private void awaitForActive() { + Assert.state(this.initialized, + () -> "The component has not been initialized: " + this + ".\n Is it declared as a bean?"); + try { + this.createTableLatch.await(this.createTableRetries * this.createTableDelay, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("The DynamoDb table " + this.tableName + " has not been created during " + + this.createTableRetries * this.createTableDelay + " seconds"); + } + } + + @Override + public void put(String key, String value) { + Assert.hasText(key, "'key' must not be empty."); + Assert.hasText(value, "'value' must not be empty."); + + awaitForActive(); + + Map attributes = new HashMap<>(); + attributes.put(KEY, AttributeValue.fromS(key)); + attributes.put(VALUE, AttributeValue.fromS(value)); + + if (this.timeToLive != null && this.timeToLive > 0) { + attributes.put(TTL, AttributeValue.fromN("" + Instant.now().plusMillis(this.timeToLive).getEpochSecond())); + } + + PutItemRequest.Builder putItemRequest = PutItemRequest.builder().tableName(this.tableName).item(attributes); + + this.dynamoDB.putItem(putItemRequest.build()).join(); + } + + @Override + public String get(String key) { + Assert.hasText(key, "'key' must not be empty."); + + awaitForActive(); + + try { + return this.dynamoDB + .getItem(request -> request.tableName(this.tableName).key(Map.of(KEY, AttributeValue.fromS(key)))) + .thenApply(GetItemResponse::item).thenApply(DynamoDbMetadataStore::getValueIfAny).join(); + } + catch (CompletionException ex) { + return rethrowAsRuntimeException(ex.getCause()); + } + } + + @Override + public String putIfAbsent(String key, String value) { + Assert.hasText(key, "'key' must not be empty."); + Assert.hasText(value, "'value' must not be empty."); + + awaitForActive(); + + Map attributes = new HashMap<>(); + attributes.put(":value", AttributeValue.fromS(value)); + + String updateExpression = "SET " + VALUE + " = :value"; + + if (this.timeToLive != null && this.timeToLive > 0) { + updateExpression += ", " + TTL + " = :ttl"; + attributes.put(":ttl", + AttributeValue.fromN("" + Instant.now().plusMillis(this.timeToLive).getEpochSecond())); + } + + UpdateItemRequest.Builder updateItemRequest = UpdateItemRequest.builder().tableName(this.tableName) + .key(Map.of(KEY, AttributeValue.fromS(key))).conditionExpression(KEY_NOT_EXISTS_EXPRESSION) + .updateExpression(updateExpression).expressionAttributeValues(attributes); + + try { + this.dynamoDB.updateItem(updateItemRequest.build()).join(); + return null; + } + catch (CompletionException ex) { + Throwable cause = ex.getCause(); + if (cause instanceof ConditionalCheckFailedException) { + return get(key); + } + else { + return rethrowAsRuntimeException(cause); + } + } + } + + @Override + public boolean replace(String key, String oldValue, String newValue) { + Assert.hasText(key, "'key' must not be empty."); + Assert.hasText(oldValue, "'value' must not be empty."); + Assert.hasText(newValue, "'newValue' must not be empty."); + + awaitForActive(); + + Map attributes = new HashMap<>(); + attributes.put(":newValue", AttributeValue.fromS(newValue)); + attributes.put(":oldValue", AttributeValue.fromS(oldValue)); + + String updateExpression = "SET " + VALUE + " = :newValue"; + + if (this.timeToLive != null && this.timeToLive > 0) { + updateExpression += ", " + TTL + " = :ttl"; + attributes.put(":ttl", + AttributeValue.fromN("" + Instant.now().plusMillis(this.timeToLive).getEpochSecond())); + } + + UpdateItemRequest.Builder updateItemRequest = UpdateItemRequest.builder().tableName(this.tableName) + .key(Map.of(KEY, AttributeValue.fromS(key))).conditionExpression(VALUE + " = :oldValue") + .updateExpression(updateExpression).expressionAttributeValues(attributes) + .returnValues(ReturnValue.UPDATED_NEW); + + try { + return this.dynamoDB.updateItem(updateItemRequest.build()).join().hasAttributes(); + } + catch (CompletionException ex) { + if (ex.getCause() instanceof ConditionalCheckFailedException) { + return false; + } + else { + return rethrowAsRuntimeException(ex.getCause()); + } + } + } + + @Override + public String remove(String key) { + Assert.hasText(key, "'key' must not be empty."); + + awaitForActive(); + + try { + return this.dynamoDB + .deleteItem(request -> request.tableName(this.tableName).key(Map.of(KEY, AttributeValue.fromS(key))) + .returnValues(ReturnValue.ALL_OLD)) + .thenApply(DeleteItemResponse::attributes).thenApply(DynamoDbMetadataStore::getValueIfAny).join(); + } + catch (CompletionException ex) { + return rethrowAsRuntimeException(ex.getCause()); + } + } + + private static String getValueIfAny(Map item) { + if (item.containsKey(VALUE)) { + return item.get(VALUE).s(); + } + else { + return null; + } + } + + private static T rethrowAsRuntimeException(Throwable cause) { + if (cause instanceof RuntimeException runtimeException) { + throw runtimeException; + } + else { + throw new IllegalStateException(cause); + } + } + + @Override + public String toString() { + return "DynamoDbMetadataStore{" + "table=" + this.tableName + ", createTableRetries=" + this.createTableRetries + + ", createTableDelay=" + this.createTableDelay + ", billingMode=" + this.billingMode + + ", readCapacity=" + this.readCapacity + ", writeCapacity=" + this.writeCapacity + ", timeToLive=" + + this.timeToLive + '}'; + } +} diff --git a/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/DynamoDbLockRegistryTests.java b/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/DynamoDbLockRegistryTests.java new file mode 100644 index 000000000..90a200fb9 --- /dev/null +++ b/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/DynamoDbLockRegistryTests.java @@ -0,0 +1,375 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.dynamodb; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.test.util.ReflectionTestUtils; +import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +/** + * @author Artem Bilan + * + * @since 4.0 + */ +@SpringJUnitConfig +@DirtiesContext +class DynamoDbLockRegistryTests implements LocalstackContainerTest { + + private static DynamoDbAsyncClient DYNAMO_DB; + + private final AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); + + @Autowired + private DynamoDbLockRepository dynamoDbLockRepository; + + @Autowired + private DynamoDbLockRegistry dynamoDbLockRegistry; + + @BeforeAll + static void setup() { + DYNAMO_DB = LocalstackContainerTest.dynamoDbAsyncClient(); + try { + DYNAMO_DB.deleteTable(request -> request.tableName(DynamoDbLockRepository.DEFAULT_TABLE_NAME)) + .thenCompose( + result -> DYNAMO_DB.waiter().waitUntilTableNotExists( + request -> request.tableName(DynamoDbLockRepository.DEFAULT_TABLE_NAME), + waiter -> waiter.maxAttempts(25) + .backoffStrategy(FixedDelayBackoffStrategy.create(Duration.ofSeconds(1))))) + .get(); + } + catch (Exception e) { + // Ignore + } + } + + @BeforeEach + void clear() throws InterruptedException { + CountDownLatch createTableLatch = (CountDownLatch) ReflectionTestUtils.getField(this.dynamoDbLockRepository, + "createTableLatch"); + + createTableLatch.await(); + this.dynamoDbLockRepository.close(); + } + + @Test + void lock() { + for (int i = 0; i < 10; i++) { + Lock lock = this.dynamoDbLockRegistry.obtain("foo"); + lock.lock(); + try { + assertThat((Set) ReflectionTestUtils.getField(this.dynamoDbLockRepository, "heldLocks")).hasSize(1); + } + finally { + lock.unlock(); + } + } + } + + @Test + void lockInterruptibly() throws Exception { + for (int i = 0; i < 10; i++) { + Lock lock = this.dynamoDbLockRegistry.obtain("foo"); + lock.lockInterruptibly(); + try { + assertThat((Set) ReflectionTestUtils.getField(this.dynamoDbLockRepository, "heldLocks")).hasSize(1); + } + finally { + lock.unlock(); + } + } + } + + @Test + void reentrantLock() { + for (int i = 0; i < 10; i++) { + Lock lock1 = this.dynamoDbLockRegistry.obtain("foo"); + lock1.lock(); + try { + Lock lock2 = this.dynamoDbLockRegistry.obtain("foo"); + assertThat(lock1).isSameAs(lock2); + lock2.lock(); + lock2.unlock(); + } + finally { + lock1.unlock(); + } + } + } + + @Test + void reentrantLockInterruptibly() throws Exception { + for (int i = 0; i < 10; i++) { + Lock lock1 = this.dynamoDbLockRegistry.obtain("foo"); + lock1.lockInterruptibly(); + try { + Lock lock2 = this.dynamoDbLockRegistry.obtain("foo"); + assertThat(lock1).isSameAs(lock2); + lock2.lockInterruptibly(); + lock2.unlock(); + } + finally { + lock1.unlock(); + } + } + } + + @Test + void twoLocks() throws Exception { + for (int i = 0; i < 10; i++) { + Lock lock1 = this.dynamoDbLockRegistry.obtain("foo"); + lock1.lockInterruptibly(); + try { + Lock lock2 = this.dynamoDbLockRegistry.obtain("bar"); + assertThat(lock1).isNotSameAs(lock2); + lock2.lockInterruptibly(); + lock2.unlock(); + } + finally { + lock1.unlock(); + } + } + } + + @Test + void twoThreadsSecondFailsToGetLock() throws Exception { + final Lock lock1 = this.dynamoDbLockRegistry.obtain("foo"); + lock1.lockInterruptibly(); + final AtomicBoolean locked = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + Future result = this.taskExecutor.submit(() -> { + DynamoDbLockRepository dynamoDbLockRepository = new DynamoDbLockRepository(DYNAMO_DB); + dynamoDbLockRepository.afterPropertiesSet(); + DynamoDbLockRegistry registry2 = new DynamoDbLockRegistry(dynamoDbLockRepository); + registry2.setIdleBetweenTries(Duration.ofMillis(10)); + registry2.setTimeToLive(Duration.ofSeconds(10)); + Lock lock2 = registry2.obtain("foo"); + locked.set(lock2.tryLock()); + latch.countDown(); + try { + lock2.unlock(); + } + catch (Exception e) { + return e; + } + finally { + dynamoDbLockRepository.close(); + } + return null; + }); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(locked.get()).isFalse(); + lock1.unlock(); + Object ise = result.get(10, TimeUnit.SECONDS); + assertThat(ise).isInstanceOf(IllegalMonitorStateException.class); + assertThat(((Exception) ise).getMessage()).contains("The current thread doesn't own mutex at 'foo'"); + } + + @Test + void twoThreads() throws Exception { + final Lock lock1 = this.dynamoDbLockRegistry.obtain("foo"); + final AtomicBoolean locked = new AtomicBoolean(); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + final CountDownLatch latch3 = new CountDownLatch(1); + lock1.lockInterruptibly(); + this.taskExecutor.submit(() -> { + DynamoDbLockRepository dynamoDbLockRepository = new DynamoDbLockRepository(DYNAMO_DB); + dynamoDbLockRepository.afterPropertiesSet(); + DynamoDbLockRegistry registry2 = new DynamoDbLockRegistry(dynamoDbLockRepository); + registry2.setIdleBetweenTries(Duration.ofMillis(10)); + registry2.setTimeToLive(Duration.ofSeconds(10)); + Lock lock2 = registry2.obtain("foo"); + try { + latch1.countDown(); + lock2.lockInterruptibly(); + latch2.await(10, TimeUnit.SECONDS); + locked.set(true); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + lock2.unlock(); + latch3.countDown(); + dynamoDbLockRepository.close(); + } + return null; + }); + + assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(locked.get()).isFalse(); + + lock1.unlock(); + latch2.countDown(); + + assertThat(latch3.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(locked.get()).isTrue(); + } + + @Test + void twoThreadsDifferentRegistries() throws Exception { + DynamoDbLockRepository dynamoDbLockRepository = new DynamoDbLockRepository(DYNAMO_DB); + dynamoDbLockRepository.afterPropertiesSet(); + final DynamoDbLockRegistry registry1 = new DynamoDbLockRegistry(dynamoDbLockRepository); + registry1.setIdleBetweenTries(Duration.ofMillis(10)); + registry1.setTimeToLive(Duration.ofSeconds(10)); + + final DynamoDbLockRegistry registry2 = new DynamoDbLockRegistry(dynamoDbLockRepository); + registry2.setIdleBetweenTries(Duration.ofMillis(10)); + registry2.setTimeToLive(Duration.ofSeconds(10)); + + final Lock lock1 = registry1.obtain("foo"); + final AtomicBoolean locked = new AtomicBoolean(); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + final CountDownLatch latch3 = new CountDownLatch(1); + lock1.lockInterruptibly(); + this.taskExecutor.execute(() -> { + Lock lock2 = registry2.obtain("foo"); + try { + latch1.countDown(); + lock2.lockInterruptibly(); + latch2.await(10, TimeUnit.SECONDS); + locked.set(true); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + lock2.unlock(); + latch3.countDown(); + } + }); + assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(locked.get()).isFalse(); + + lock1.unlock(); + latch2.countDown(); + + assertThat(latch3.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(locked.get()).isTrue(); + + dynamoDbLockRepository.close(); + } + + @Test + void twoThreadsWrongOneUnlocks() throws Exception { + final Lock lock = this.dynamoDbLockRegistry.obtain("foo"); + lock.lockInterruptibly(); + final AtomicBoolean locked = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + Future result = this.taskExecutor.submit(() -> { + try { + lock.unlock(); + } + catch (Exception e) { + latch.countDown(); + return e; + } + return null; + }); + + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(locked.get()).isFalse(); + + lock.unlock(); + Object imse = result.get(10, TimeUnit.SECONDS); + assertThat(imse).isInstanceOf(IllegalMonitorStateException.class); + assertThat(((Exception) imse).getMessage()).contains("The current thread doesn't own mutex at 'foo'"); + } + + @Test + void abandonedLock() throws Exception { + DynamoDbLockRepository dynamoDbLockRepository = new DynamoDbLockRepository(DYNAMO_DB); + dynamoDbLockRepository.afterPropertiesSet(); + this.dynamoDbLockRepository.acquire("foo", Duration.ofSeconds(10)); + + Lock lock = this.dynamoDbLockRegistry.obtain("foo"); + int n = 0; + while (!lock.tryLock() && n++ < 100) { + Thread.sleep(100); + } + + assertThat(n).isLessThan(100); + lock.unlock(); + dynamoDbLockRepository.close(); + } + + @Test + void lockRenew() { + final Lock lock = this.dynamoDbLockRegistry.obtain("foo"); + + assertThat(lock.tryLock()).isTrue(); + try { + this.dynamoDbLockRegistry.setTimeToLive(Duration.ofSeconds(60)); + assertThatNoException().isThrownBy(() -> this.dynamoDbLockRegistry.renewLock("foo")); + String ttl = DYNAMO_DB + .getItem(request -> request.tableName(DynamoDbLockRepository.DEFAULT_TABLE_NAME) + .key(Map.of(DynamoDbLockRepository.KEY_ATTR, AttributeValue.fromS("foo")))) + .join().item().get(DynamoDbLockRepository.TTL_ATTR).n(); + assertThat(Long.parseLong(ttl)).isCloseTo(LocalDateTime.now().plusSeconds(60).toEpochSecond(ZoneOffset.UTC), + Percentage.withPercentage(10)); + } + finally { + lock.unlock(); + this.dynamoDbLockRegistry.setTimeToLive(Duration.ofSeconds(2)); + } + } + + @Configuration(proxyBeanMethods = false) + public static class ContextConfiguration { + + @Bean + public DynamoDbLockRepository dynamoDbLockRepository() { + return new DynamoDbLockRepository(DYNAMO_DB); + } + + @Bean + public DynamoDbLockRegistry dynamoDbLockRegistry(DynamoDbLockRepository dynamoDbLockRepository) { + DynamoDbLockRegistry dynamoDbLockRegistry = new DynamoDbLockRegistry(dynamoDbLockRepository); + dynamoDbLockRegistry.setIdleBetweenTries(Duration.ofMillis(100)); + dynamoDbLockRegistry.setTimeToLive(Duration.ofSeconds(2)); + return dynamoDbLockRegistry; + } + + } + +} diff --git a/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/DynamoDbMetadataStoreTests.java b/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/DynamoDbMetadataStoreTests.java new file mode 100644 index 000000000..74b4403e6 --- /dev/null +++ b/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/DynamoDbMetadataStoreTests.java @@ -0,0 +1,141 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.dynamodb; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; +import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +/** + * @author Artem Bilan + * + * @since 4.0 + */ +class DynamoDbMetadataStoreTests implements LocalstackContainerTest { + + private static final String TEST_TABLE = "testMetadataStore"; + + private static DynamoDbAsyncClient DYNAMO_DB; + + private static DynamoDbMetadataStore store; + + private final String file1 = "/remotepath/filesTodownload/file-1.txt"; + + private final String file1Id = "12345"; + + @BeforeAll + static void setup() { + DYNAMO_DB = LocalstackContainerTest.dynamoDbAsyncClient(); + try { + DYNAMO_DB.deleteTable(request -> request.tableName(TEST_TABLE)) + .thenCompose( + result -> DYNAMO_DB.waiter().waitUntilTableNotExists( + request -> request.tableName(DynamoDbLockRepository.DEFAULT_TABLE_NAME), + waiter -> waiter.maxAttempts(25) + .backoffStrategy(FixedDelayBackoffStrategy.create(Duration.ofSeconds(1))))) + .join(); + } + catch (Exception e) { + // Ignore if the table does not exist + } + + store = new DynamoDbMetadataStore(DYNAMO_DB, TEST_TABLE); + store.setTimeToLive(10); + store.afterPropertiesSet(); + } + + @BeforeEach + void clear() throws InterruptedException { + CountDownLatch createTableLatch = (CountDownLatch) ReflectionTestUtils.getField(store, "createTableLatch"); + + createTableLatch.await(); + + DYNAMO_DB.deleteItem(request -> request.tableName(TEST_TABLE) + .key(Map.of(DynamoDbMetadataStore.KEY, AttributeValue.fromS((this.file1))))).join(); + } + + @Test + void getFromStore() { + String fileID = store.get(this.file1); + assertThat(fileID).isNull(); + + store.put(this.file1, this.file1Id); + + fileID = store.get(this.file1); + assertThat(fileID).isNotNull(); + assertThat(fileID).isEqualTo(this.file1Id); + } + + @Test + void putIfAbsent() { + String fileID = store.get(this.file1); + assertThat(fileID).describedAs("Get First time, Value must not exist").isNull(); + + fileID = store.putIfAbsent(this.file1, this.file1Id); + assertThat(fileID).describedAs("Insert First time, Value must return null").isNull(); + + fileID = store.putIfAbsent(this.file1, "56789"); + assertThat(fileID).describedAs("Key Already Exists - Insertion Failed, ol value must be returned").isNotNull(); + assertThat(fileID).describedAs("The Old Value must be equal to returned").isEqualTo(this.file1Id); + + assertThat(store.get(this.file1)).describedAs("The Old Value must return").isEqualTo(this.file1Id); + } + + @Test + void remove() { + String fileID = store.remove(this.file1); + assertThat(fileID).isNull(); + + fileID = store.putIfAbsent(this.file1, this.file1Id); + assertThat(fileID).isNull(); + + fileID = store.remove(this.file1); + assertThat(fileID).isNotNull(); + assertThat(fileID).isEqualTo(this.file1Id); + + fileID = store.get(this.file1); + assertThat(fileID).isNull(); + } + + @Test + void replace() { + boolean removedValue = store.replace(this.file1, this.file1Id, "4567"); + assertThat(removedValue).isFalse(); + + String fileID = store.get(this.file1); + assertThat(fileID).isNull(); + + fileID = store.putIfAbsent(this.file1, this.file1Id); + assertThat(fileID).isNull(); + + removedValue = store.replace(this.file1, this.file1Id, "4567"); + assertThat(removedValue).isTrue(); + + fileID = store.get(this.file1); + assertThat(fileID).isNotNull(); + assertThat(fileID).isEqualTo("4567"); + } + +} diff --git a/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/DynamoDbTemplateIntegrationTest.java b/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/DynamoDbTemplateIntegrationTest.java index f95bb4bf2..20bcae848 100644 --- a/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/DynamoDbTemplateIntegrationTest.java +++ b/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/DynamoDbTemplateIntegrationTest.java @@ -26,18 +26,11 @@ import org.junit.jupiter.params.provider.MethodSource; import org.springframework.lang.Nullable; import org.springframework.util.StringUtils; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.enhanced.dynamodb.*; import software.amazon.awssdk.enhanced.dynamodb.model.PageIterable; import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional; import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest; import software.amazon.awssdk.enhanced.dynamodb.model.ScanEnhancedRequest; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.*; @@ -46,9 +39,9 @@ * * @author Matej Nedic * @author Arun Patra + * @author Artem Bilan */ -@Testcontainers -public class DynamoDbTemplateIntegrationTest { +public class DynamoDbTemplateIntegrationTest implements LocalstackContainerTest { private static DynamoDbTable dynamoDbTable; private static DynamoDbTable prefixedDynamoDbTable; @@ -57,17 +50,9 @@ public class DynamoDbTemplateIntegrationTest { private static final String indexName = "gsiPersonEntityTable"; private static final String nameOfGSPK = "gsPk"; - @Container - static LocalStackContainer localstack = new LocalStackContainer( - DockerImageName.parse("localstack/localstack:4.4.0")); - @BeforeAll public static void createTable() { - DynamoDbClient dynamoDbClient = DynamoDbClient.builder().endpointOverride(localstack.getEndpoint()) - .region(Region.of(localstack.getRegion())) - .credentialsProvider(StaticCredentialsProvider - .create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey()))) - .build(); + DynamoDbClient dynamoDbClient = LocalstackContainerTest.dynamoDbClient(); DynamoDbEnhancedClient enhancedClient = DynamoDbEnhancedClient.builder().dynamoDbClient(dynamoDbClient).build(); dynamoDbTemplate = new DynamoDbTemplate(enhancedClient); diff --git a/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/LocalstackContainerTest.java b/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/LocalstackContainerTest.java new file mode 100644 index 000000000..d014294c1 --- /dev/null +++ b/spring-cloud-aws-dynamodb/src/test/java/io/awspring/cloud/dynamodb/LocalstackContainerTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.dynamodb; + +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; + +/** + * The base contract for JUnit tests based on the container for Localstack. The Testcontainers 'reuse' option must be + * disabled, so, Ryuk container is started and will clean all the running containers from this test suite after JVM exit. + *

+ * Since the Localstack container instance is shared via static property, it is going to be started only once per JVM; + * therefore, the target Docker container is reused automatically. + * + * @author Artem Bilan + * + * @since 4.0 + */ +@Testcontainers(disabledWithoutDocker = true) +public interface LocalstackContainerTest { + + LocalStackContainer LOCAL_STACK_CONTAINER = + new LocalStackContainer(DockerImageName.parse("localstack/localstack:4.4.0")); + + @BeforeAll + static void startContainer() { + LOCAL_STACK_CONTAINER.start(); + } + + static DynamoDbClient dynamoDbClient() { + return applyAwsClientOptions(DynamoDbClient.builder()); + } + + static DynamoDbAsyncClient dynamoDbAsyncClient() { + return applyAwsClientOptions(DynamoDbAsyncClient.builder()); + } + + static AwsCredentialsProvider credentialsProvider() { + return StaticCredentialsProvider.create( + AwsBasicCredentials.create(LOCAL_STACK_CONTAINER.getAccessKey(), LOCAL_STACK_CONTAINER.getSecretKey())); + } + + private static , T> T applyAwsClientOptions(B clientBuilder) { + return clientBuilder.region(Region.of(LOCAL_STACK_CONTAINER.getRegion())) + .credentialsProvider(credentialsProvider()).endpointOverride(LOCAL_STACK_CONTAINER.getEndpoint()) + .build(); + } + +} diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-dynamodb/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-dynamodb/pom.xml new file mode 100644 index 000000000..e52ea6366 --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-dynamodb/pom.xml @@ -0,0 +1,31 @@ + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + Spring Cloud AWS Starter for Spring Integration with DynamoDb + spring-cloud-aws-starter-integration-dynamodb + + + + io.awspring.cloud + spring-cloud-aws-dynamodb + + + io.awspring.cloud + spring-cloud-aws-starter + + + org.springframework.integration + spring-integration-core + + + +