From b8fd9cfddf0c9cae72745a4959bf9320dc1b818f Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 22 Oct 2025 17:13:43 -0400 Subject: [PATCH] Add Kinesis Producer Library with Spring Integration * Add `KplMessageHandler` - a `MessageHandler` implementation to produce into Kinesis via KPL * Add dedicated to Spring Integration with KPL starter - `spring-cloud-aws-starter-integration-kinesis-producer` * Document the `KplMessageHandler` --- docs/src/main/asciidoc/kinesis.adoc | 34 ++ pom.xml | 1 + spring-cloud-aws-dependencies/pom.xml | 5 + .../integration/KplBackpressureException.java | 50 +++ .../integration/KplMessageHandler.java | 424 ++++++++++++++++++ .../integration/KinesisIntegrationTests.java | 6 +- .../integration/KplMessageHandlerTests.java | 168 +++++++ .../pom.xml | 35 ++ 8 files changed, 720 insertions(+), 3 deletions(-) create mode 100644 spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplBackpressureException.java create mode 100644 spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplMessageHandler.java create mode 100644 spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KplMessageHandlerTests.java create mode 100644 spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer/pom.xml diff --git a/docs/src/main/asciidoc/kinesis.adoc b/docs/src/main/asciidoc/kinesis.adoc index c8c090d13..a5bf039e1 100644 --- a/docs/src/main/asciidoc/kinesis.adoc +++ b/docs/src/main/asciidoc/kinesis.adoc @@ -114,8 +114,42 @@ public PollableChannel errorChannel() { } ---- +The `KplMessageHandler` is an `AbstractMessageHandler` to perform put record to the Kinesis stream using https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html[Kinesis Producer Library (KPL)]. +The configuration and behavior are similar to the `KinesisMessageHandler` described above, with the difference that it supports only a single `UserRecord` production according to KPL API. +The request message payload could be as a `UserRecord`. +Otherwise, such an instance is built against request messages and respective `KplMessageHandler` options, include https://docs.aws.amazon.com/streams/latest/dev/kpl-with-schemaregistry.html[AWS Glue Schema] for the record serialization. + +Due to asynchronous behavior and buffer of the `KinesisProducer`, a `KplBackpressureException` could be thrown from the `KplMessageHandler` when `backPressureThreshold` as the number of outstanding records is provided. + +The configuration of the `KplMessageHandler` is straightforward: + +[source,java] +---- +@Bean +RequestHandlerRetryAdvice retryAdvice() { + RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice(); + requestHandlerRetryAdvice.setRetryTemplate(RetryTemplate.builder() + .retryOn(KplBackpressureException.class) + .exponentialBackoff(100, 2.0, 1000) + .maxAttempts(3) + .build()); + return requestHandlerRetryAdvice; +} + +@Bean +@ServiceActivator(inputChannel = "kinesisSendChannel", adviceChain = "retryAdvice") +MessageHandler kplMessageHandler(KinesisProducer kinesisProducer, Schema schema) { + KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer); + kplMessageHandler.setAsync(true); + kplMessageHandler.setStream("someStream"); + kplMessageHandler.setBackPressureThreshold(2); + kplMessageHandler.setGlueSchema(schema); + return kplMessageHandler; +} +---- === Spring Integration Starters The Spring Integration dependency in the `spring-cloud-aws-kinesis` module is `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used. For convenience, a dedicated `spring-cloud-aws-starter-integration-kinesis` is provided managing all the required dependencies for Spring Integration support with a classical Amazon Kinesis client. +The `spring-cloud-aws-starter-integration-kinesis-producer` artifact is dedicated for dependencies related to the Kinesis Producer Library. diff --git a/pom.xml b/pom.xml index bb7cb54c4..114fb9c3b 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,7 @@ spring-cloud-aws-dynamodb spring-cloud-aws-kinesis spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis + spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer spring-cloud-aws-s3 spring-cloud-aws-testcontainers spring-cloud-aws-starters/spring-cloud-aws-starter diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml index eea04bba6..7a17dd67c 100644 --- a/spring-cloud-aws-dependencies/pom.xml +++ b/spring-cloud-aws-dependencies/pom.xml @@ -123,6 +123,11 @@ spring-cloud-aws-starter-integration-kinesis ${project.version} + + io.awspring.cloud + spring-cloud-aws-starter-integration-kinesis-producer + ${project.version} + io.awspring.cloud diff --git a/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplBackpressureException.java b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplBackpressureException.java new file mode 100644 index 000000000..b21248733 --- /dev/null +++ b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplBackpressureException.java @@ -0,0 +1,50 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.kinesis.integration; + +import java.io.Serial; +import software.amazon.kinesis.producer.UserRecord; + +/** + * An exception triggered from the {@link KplMessageHandler} while sending records to Kinesis when maximum number of + * records in flight exceeds the backpressure threshold. + * + * @author Siddharth Jain + * @author Artem Bilan + * + * @since 4.0 + */ +public class KplBackpressureException extends RuntimeException { + + @Serial + private static final long serialVersionUID = 1L; + + private final transient UserRecord userRecord; + + public KplBackpressureException(String message, UserRecord userRecord) { + super(message); + this.userRecord = userRecord; + } + + /** + * Get the {@link UserRecord} when this exception has been thrown. + * @return the {@link UserRecord} when this exception has been thrown. + */ + public UserRecord getUserRecord() { + return this.userRecord; + } + +} diff --git a/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplMessageHandler.java b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplMessageHandler.java new file mode 100644 index 000000000..e4abc1fa4 --- /dev/null +++ b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplMessageHandler.java @@ -0,0 +1,424 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.kinesis.integration; + +import com.amazonaws.services.schemaregistry.common.Schema; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.springframework.context.Lifecycle; +import org.springframework.core.serializer.support.SerializingConverter; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.MessageTimeoutException; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.expression.ValueExpression; +import org.springframework.integration.handler.AbstractMessageHandler; +import org.springframework.integration.handler.AbstractMessageProducingHandler; +import org.springframework.integration.mapping.OutboundMessageMapper; +import org.springframework.integration.support.MutableMessage; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandlingException; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.MessageConversionException; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; +import software.amazon.kinesis.producer.KinesisProducer; +import software.amazon.kinesis.producer.UserRecord; +import software.amazon.kinesis.producer.UserRecordResult; + +/** + * The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer Library {@code putRecord(s)}. + *

+ * The {@link KplBackpressureException} is thrown when backpressure handling is enabled and buffer is at max capacity. + * This exception can be handled with + * {@link org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice}. + *

+ * + * @author Arnaud Lecollaire + * @author Artem Bilan + * @author Siddharth Jain + * + * @since 4.0 + */ +public class KplMessageHandler extends AbstractMessageProducingHandler implements Lifecycle { + + private static final long DEFAULT_SEND_TIMEOUT = 10000; + + private final KinesisProducer kinesisProducer; + + private MessageConverter messageConverter = new ConvertingFromMessageConverter(new SerializingConverter()); + + private Expression streamExpression; + + private Expression partitionKeyExpression; + + private Expression explicitHashKeyExpression; + + private Expression glueSchemaExpression; + + private OutboundMessageMapper embeddedHeadersMapper; + + private Duration flushDuration = Duration.ofMillis(0); + + private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT); + + private EvaluationContext evaluationContext; + + private volatile boolean running; + + private volatile ScheduledFuture flushFuture; + + private long backPressureThreshold = 0; + + public KplMessageHandler(KinesisProducer kinesisProducer) { + Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null."); + this.kinesisProducer = kinesisProducer; + } + + /** + * Configure maximum records in flight for handling backpressure. By default, backpressure handling is not enabled. + * When backpressure handling is enabled and number of records in flight exceeds the threshold, a + * {@link KplBackpressureException} would be thrown. + * @param backPressureThreshold a value greater than {@code 0} to enable backpressure handling. + */ + public void setBackPressureThreshold(long backPressureThreshold) { + Assert.isTrue(backPressureThreshold >= 0, "'backPressureThreshold must be greater than or equal to 0."); + this.backPressureThreshold = backPressureThreshold; + } + + /** + * Configure a {@link MessageConverter} for converting payload to {@code byte[]} for Kinesis record. + * @param messageConverter the {@link MessageConverter} to use. + */ + public void setMessageConverter(MessageConverter messageConverter) { + Assert.notNull(messageConverter, "'messageConverter' must not be null."); + this.messageConverter = messageConverter; + } + + public void setStream(String stream) { + setStreamExpression(new LiteralExpression(stream)); + } + + public void setStreamExpressionString(String streamExpression) { + setStreamExpression(EXPRESSION_PARSER.parseExpression(streamExpression)); + } + + public void setStreamExpression(Expression streamExpression) { + this.streamExpression = streamExpression; + } + + public void setPartitionKey(String partitionKey) { + setPartitionKeyExpression(new LiteralExpression(partitionKey)); + } + + public void setPartitionKeyExpressionString(String partitionKeyExpression) { + setPartitionKeyExpression(EXPRESSION_PARSER.parseExpression(partitionKeyExpression)); + } + + public void setPartitionKeyExpression(Expression partitionKeyExpression) { + this.partitionKeyExpression = partitionKeyExpression; + } + + public void setExplicitHashKey(String explicitHashKey) { + setExplicitHashKeyExpression(new LiteralExpression(explicitHashKey)); + } + + public void setExplicitHashKeyExpressionString(String explicitHashKeyExpression) { + setExplicitHashKeyExpression(EXPRESSION_PARSER.parseExpression(explicitHashKeyExpression)); + } + + public void setExplicitHashKeyExpression(Expression explicitHashKeyExpression) { + this.explicitHashKeyExpression = explicitHashKeyExpression; + } + + /** + * Specify a {@link OutboundMessageMapper} for embedding message headers into the record data together with payload. + * @param embeddedHeadersMapper the {@link OutboundMessageMapper} to embed headers into the record data. + */ + public void setEmbeddedHeadersMapper(OutboundMessageMapper embeddedHeadersMapper) { + this.embeddedHeadersMapper = embeddedHeadersMapper; + } + + /** + * Configure a {@link Duration} how often to call a {@link KinesisProducer#flush()}. + * @param flushDuration the {@link Duration} to periodic call of a {@link KinesisProducer#flush()}. + */ + public void setFlushDuration(Duration flushDuration) { + Assert.notNull(flushDuration, "'flushDuration' must not be null."); + this.flushDuration = flushDuration; + } + + /** + * Set a {@link Schema} to add into a {@link UserRecord} built from the request message. + * @param glueSchema the {@link Schema} to add into a {@link UserRecord}. + * @see UserRecord#setSchema(Schema) + */ + public void setGlueSchema(Schema glueSchema) { + setGlueSchemaExpression(new ValueExpression<>(glueSchema)); + } + + /** + * Set a SpEL expression for {@link Schema} to add into a {@link UserRecord} built from the request message. + * @param glueSchemaExpression the SpEL expression to evaluate a {@link Schema}. + * @see UserRecord#setSchema(Schema) + */ + public void setGlueSchemaExpressionString(String glueSchemaExpression) { + setGlueSchemaExpression(EXPRESSION_PARSER.parseExpression(glueSchemaExpression)); + } + + /** + * Set a SpEL expression for {@link Schema} to add into a {@link UserRecord} built from the request message. + * @param glueSchemaExpression the SpEL expression to evaluate a {@link Schema}. + * @see UserRecord#setSchema(Schema) + */ + public void setGlueSchemaExpression(Expression glueSchemaExpression) { + this.glueSchemaExpression = glueSchemaExpression; + } + + public void setSendTimeout(long sendTimeout) { + setSendTimeoutExpression(new ValueExpression<>(sendTimeout)); + } + + public void setSendTimeoutExpressionString(String sendTimeoutExpression) { + setSendTimeoutExpression(EXPRESSION_PARSER.parseExpression(sendTimeoutExpression)); + } + + public void setSendTimeoutExpression(Expression sendTimeoutExpression) { + Assert.notNull(sendTimeoutExpression, "'sendTimeoutExpression' must not be null"); + this.sendTimeoutExpression = sendTimeoutExpression; + } + + @Override + protected void onInit() { + super.onInit(); + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + } + + @Override + public synchronized void start() { + if (!this.running) { + if (this.flushDuration.toMillis() > 0) { + this.flushFuture = getTaskScheduler().scheduleAtFixedRate(this.kinesisProducer::flush, + this.flushDuration); + } + this.running = true; + } + } + + @Override + public synchronized void stop() { + if (this.running) { + this.running = false; + if (this.flushFuture != null) { + this.flushFuture.cancel(true); + } + } + } + + @Override + public boolean isRunning() { + return this.running; + } + + @Override + protected void handleMessageInternal(Message message) { + UserRecord request = messageToUserRecord(message); + + CompletableFuture resultFuture = handleMessageToAws(request) + .handle((response, ex) -> handleResponse(message, response, ex)); + + if (isAsync()) { + sendOutputs(resultFuture, message); + return; + } + + Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class); + + try { + if (sendTimeout == null || sendTimeout < 0) { + resultFuture.get(); + } + else { + resultFuture.get(sendTimeout, TimeUnit.MILLISECONDS); + } + } + catch (TimeoutException te) { + throw new MessageTimeoutException(message, "Timeout waiting for response from KinesisProducer", te); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(ex); + } + catch (ExecutionException ex) { + throw new IllegalStateException(ex); + } + } + + private UserRecord messageToUserRecord(Message message) { + Object payload = message.getPayload(); + if (payload instanceof UserRecord userRecord) { + return userRecord; + } + + return buildUserRecord(message); + } + + private UserRecord buildUserRecord(Message message) { + Object payload = message.getPayload(); + + MessageHeaders messageHeaders = message.getHeaders(); + String stream = messageHeaders.get(KinesisHeaders.STREAM, String.class); + if (!StringUtils.hasText(stream) && this.streamExpression != null) { + stream = this.streamExpression.getValue(this.evaluationContext, message, String.class); + } + Assert.state(stream != null, + "'stream' must not be null for sending a Kinesis record. " + + "Consider configuring this handler with a 'stream'( or 'streamExpression') or supply an " + + "'aws_stream' message header."); + + String partitionKey = messageHeaders.get(KinesisHeaders.PARTITION_KEY, String.class); + if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) { + partitionKey = this.partitionKeyExpression.getValue(this.evaluationContext, message, String.class); + } + Assert.state(partitionKey != null, + "'partitionKey' must not be null for sending a Kinesis record." + + "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') " + + "or supply an 'aws_partitionKey' message header."); + + String explicitHashKey = this.explicitHashKeyExpression != null + ? this.explicitHashKeyExpression.getValue(this.evaluationContext, message, String.class) + : null; + + Schema schema = this.glueSchemaExpression != null + ? this.glueSchemaExpression.getValue(this.evaluationContext, message, Schema.class) + : null; + + Message messageToEmbed = null; + ByteBuffer data = null; + + if (payload instanceof ByteBuffer byteBuffer) { + data = byteBuffer; + if (this.embeddedHeadersMapper != null) { + messageToEmbed = new MutableMessage<>(data.array(), messageHeaders); + } + } + else { + byte[] bytes = (byte[]) (payload instanceof byte[] ? payload + : this.messageConverter.fromMessage(message, byte[].class)); + Assert.notNull(bytes, "payload cannot be null"); + if (this.embeddedHeadersMapper != null) { + messageToEmbed = new MutableMessage<>(bytes, messageHeaders); + } + else { + data = ByteBuffer.wrap(bytes); + } + } + + if (messageToEmbed != null) { + try { + byte[] bytes = this.embeddedHeadersMapper.fromMessage(messageToEmbed); + Assert.notNull(bytes, "payload cannot be null"); + data = ByteBuffer.wrap(bytes); + } + catch (Exception ex) { + throw new MessageConversionException(message, "Cannot embedded headers to payload", ex); + } + } + + return new UserRecord().withStreamName(stream).withPartitionKey(partitionKey) + .withExplicitHashKey(explicitHashKey).withData(data).withSchema(schema); + } + + private CompletableFuture handleMessageToAws(UserRecord userRecord) { + try { + return handleUserRecord(userRecord); + } + finally { + if (this.flushDuration.toMillis() <= 0) { + this.kinesisProducer.flush(); + } + } + } + + private CompletableFuture handleUserRecord(UserRecord userRecord) { + if (this.backPressureThreshold > 0) { + var numberOfRecordsInFlight = this.kinesisProducer.getOutstandingRecordsCount(); + if (numberOfRecordsInFlight > this.backPressureThreshold) { + throw new KplBackpressureException("Cannot send record to Kinesis since buffer is at max capacity.", + userRecord); + } + } + + return listenableFutureToCompletableFuture(this.kinesisProducer.addUserRecord(userRecord)); + } + + private Message handleResponse(Message message, UserRecordResult response, Throwable cause) { + if (cause != null) { + throw new MessageHandlingException(message, cause); + } + return getMessageBuilderFactory().fromMessage(message).copyHeadersIfAbsent(additionalOnSuccessHeaders(response)) + .build(); + } + + private static Map additionalOnSuccessHeaders(UserRecordResult response) { + return Map.of(KinesisHeaders.SHARD, response.getShardId(), KinesisHeaders.SEQUENCE_NUMBER, + response.getSequenceNumber()); + } + + private static CompletableFuture listenableFutureToCompletableFuture(ListenableFuture listenableFuture) { + CompletableFuture completable = new CompletableFuture<>() { + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + // propagate cancel to the listenable future + boolean result = listenableFuture.cancel(mayInterruptIfRunning); + super.cancel(mayInterruptIfRunning); + return result; + } + + }; + + // add callback + Futures.addCallback(listenableFuture, new FutureCallback<>() { + + @Override + public void onSuccess(T result) { + completable.complete(result); + } + + @Override + public void onFailure(Throwable ex) { + completable.completeExceptionally(ex); + } + + }, MoreExecutors.directExecutor()); + + return completable; + } + +} diff --git a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java index 599862add..4fa957a7b 100644 --- a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java +++ b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java @@ -23,7 +23,6 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; - import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -112,13 +111,14 @@ void kinesisInboundOutbound() throws InterruptedException { .contains("Channel 'kinesisReceiveChannel' expected one of the following data types " + "[class java.util.Date], but received [class java.lang.String]"); - String errorSequenceNumber = errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class).sequenceNumber(); + String errorSequenceNumber = errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class) + .sequenceNumber(); // Second exception for the same record since we have requested via bubbling exception up to the consumer errorMessage = this.errorChannel.receive(30_000); assertThat(errorMessage).isNotNull(); assertThat(errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class).sequenceNumber()) - .isEqualTo(errorSequenceNumber); + .isEqualTo(errorSequenceNumber); for (int i = 0; i < 2; i++) { this.kinesisSendChannel diff --git a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KplMessageHandlerTests.java b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KplMessageHandlerTests.java new file mode 100644 index 000000000..21504438a --- /dev/null +++ b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KplMessageHandlerTests.java @@ -0,0 +1,168 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.kinesis.integration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import com.amazonaws.services.schemaregistry.common.Schema; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHandlingException; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import software.amazon.kinesis.producer.KinesisProducer; +import software.amazon.kinesis.producer.UserRecord; + +/** + * + * @author Siddharth Jain + * @author Artem Bilan + * + * @since 4.0 + */ +@SpringJUnitConfig +@DirtiesContext +public class KplMessageHandlerTests { + + @Autowired + protected Schema schema; + + @Autowired + protected KinesisProducer kinesisProducer; + + @Autowired + protected MessageChannel kinesisSendChannel; + + @Autowired + protected KplMessageHandler kplMessageHandler; + + @Test + void kplMessageHandlerWithRawPayloadBackpressureDisabledSuccess() { + given(this.kinesisProducer.addUserRecord(any(UserRecord.class))).willReturn(mock()); + final Message message = MessageBuilder.withPayload("someMessage") + .setHeader(KinesisHeaders.PARTITION_KEY, "somePartitionKey") + .setHeader(KinesisHeaders.SEQUENCE_NUMBER, "10").setHeader("someHeaderKey", "someHeaderValue").build(); + + ArgumentCaptor userRecordRequestArgumentCaptor = ArgumentCaptor.forClass(UserRecord.class); + this.kplMessageHandler.setBackPressureThreshold(0); + this.kinesisSendChannel.send(message); + verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); + verify(this.kinesisProducer, Mockito.never()).getOutstandingRecordsCount(); + UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); + assertThat(userRecord.getStreamName()).isEqualTo("someStream"); + assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey"); + assertThat(userRecord.getExplicitHashKey()).isNull(); + assertThat(userRecord.getSchema()).isSameAs(this.schema); + } + + @Test + void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityAvailable() { + given(this.kinesisProducer.addUserRecord(any(UserRecord.class))).willReturn(mock()); + this.kplMessageHandler.setBackPressureThreshold(2); + given(this.kinesisProducer.getOutstandingRecordsCount()).willReturn(1); + final Message message = MessageBuilder.withPayload("someMessage") + .setHeader(KinesisHeaders.PARTITION_KEY, "somePartitionKey") + .setHeader(KinesisHeaders.SEQUENCE_NUMBER, "10").setHeader("someHeaderKey", "someHeaderValue").build(); + + ArgumentCaptor userRecordRequestArgumentCaptor = ArgumentCaptor.forClass(UserRecord.class); + + this.kinesisSendChannel.send(message); + verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); + verify(this.kinesisProducer).getOutstandingRecordsCount(); + UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); + assertThat(userRecord.getStreamName()).isEqualTo("someStream"); + assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey"); + assertThat(userRecord.getExplicitHashKey()).isNull(); + assertThat(userRecord.getSchema()).isSameAs(this.schema); + } + + @Test + void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityInsufficient() { + given(this.kinesisProducer.addUserRecord(any(UserRecord.class))).willReturn(mock()); + this.kplMessageHandler.setBackPressureThreshold(2); + given(this.kinesisProducer.getOutstandingRecordsCount()).willReturn(5); + final Message message = MessageBuilder.withPayload("someMessage") + .setHeader(KinesisHeaders.PARTITION_KEY, "somePartitionKey") + .setHeader(KinesisHeaders.SEQUENCE_NUMBER, "10").setHeader("someHeaderKey", "someHeaderValue").build(); + + assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> this.kinesisSendChannel.send(message)) + .withCauseInstanceOf(MessageHandlingException.class) + .withRootCauseExactlyInstanceOf(KplBackpressureException.class) + .withStackTraceContaining("Cannot send record to Kinesis since buffer is at max capacity."); + + verify(this.kinesisProducer, Mockito.never()).addUserRecord(any(UserRecord.class)); + verify(this.kinesisProducer).getOutstandingRecordsCount(); + } + + @AfterEach + public void tearDown() { + clearInvocations(this.kinesisProducer); + } + + @Configuration(proxyBeanMethods = false) + @EnableIntegration + public static class ContextConfiguration { + + @Bean + public KinesisProducer kinesisProducer() { + return mock(); + } + + @Bean + public RequestHandlerRetryAdvice retryAdvice() { + RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice(); + requestHandlerRetryAdvice.setRetryTemplate(RetryTemplate.builder().retryOn(KplBackpressureException.class) + .exponentialBackoff(100, 2.0, 1000).maxAttempts(3).build()); + return requestHandlerRetryAdvice; + } + + @Bean + @ServiceActivator(inputChannel = "kinesisSendChannel", adviceChain = "retryAdvice") + public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer, Schema schema) { + KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer); + kplMessageHandler.setAsync(true); + kplMessageHandler.setStream("someStream"); + kplMessageHandler.setGlueSchema(schema); + return kplMessageHandler; + } + + @Bean + public Schema schema() { + return new Schema("syntax=\"proto2\";", "PROTOBUF", "testschema"); + } + + } + +} diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer/pom.xml new file mode 100644 index 000000000..1e2a4c536 --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer/pom.xml @@ -0,0 +1,35 @@ + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + Spring Cloud AWS Starter for Spring Integration with Kinesis Producer Library + spring-cloud-aws-starter-integration-kinesis-producer + + + + io.awspring.cloud + spring-cloud-aws-starter + + + io.awspring.cloud + spring-cloud-aws-kinesis + + + software.amazon.kinesis + amazon-kinesis-producer + + + org.springframework.integration + spring-integration-core + + + +