io.awspring.cloud
diff --git a/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplBackpressureException.java b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplBackpressureException.java
new file mode 100644
index 000000000..b21248733
--- /dev/null
+++ b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplBackpressureException.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.kinesis.integration;
+
+import java.io.Serial;
+import software.amazon.kinesis.producer.UserRecord;
+
+/**
+ * An exception triggered from the {@link KplMessageHandler} while sending records to Kinesis when maximum number of
+ * records in flight exceeds the backpressure threshold.
+ *
+ * @author Siddharth Jain
+ * @author Artem Bilan
+ *
+ * @since 4.0
+ */
+public class KplBackpressureException extends RuntimeException {
+
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ private final transient UserRecord userRecord;
+
+ public KplBackpressureException(String message, UserRecord userRecord) {
+ super(message);
+ this.userRecord = userRecord;
+ }
+
+ /**
+ * Get the {@link UserRecord} when this exception has been thrown.
+ * @return the {@link UserRecord} when this exception has been thrown.
+ */
+ public UserRecord getUserRecord() {
+ return this.userRecord;
+ }
+
+}
diff --git a/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplMessageHandler.java b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplMessageHandler.java
new file mode 100644
index 000000000..e4abc1fa4
--- /dev/null
+++ b/spring-cloud-aws-kinesis/src/main/java/io/awspring/cloud/kinesis/integration/KplMessageHandler.java
@@ -0,0 +1,424 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.kinesis.integration;
+
+import com.amazonaws.services.schemaregistry.common.Schema;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.springframework.context.Lifecycle;
+import org.springframework.core.serializer.support.SerializingConverter;
+import org.springframework.expression.EvaluationContext;
+import org.springframework.expression.Expression;
+import org.springframework.expression.common.LiteralExpression;
+import org.springframework.integration.MessageTimeoutException;
+import org.springframework.integration.expression.ExpressionUtils;
+import org.springframework.integration.expression.ValueExpression;
+import org.springframework.integration.handler.AbstractMessageHandler;
+import org.springframework.integration.handler.AbstractMessageProducingHandler;
+import org.springframework.integration.mapping.OutboundMessageMapper;
+import org.springframework.integration.support.MutableMessage;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHandlingException;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.MessageConversionException;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+import software.amazon.kinesis.producer.KinesisProducer;
+import software.amazon.kinesis.producer.UserRecord;
+import software.amazon.kinesis.producer.UserRecordResult;
+
+/**
+ * The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer Library {@code putRecord(s)}.
+ *
+ * The {@link KplBackpressureException} is thrown when backpressure handling is enabled and buffer is at max capacity.
+ * This exception can be handled with
+ * {@link org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice}.
+ *
+ *
+ * @author Arnaud Lecollaire
+ * @author Artem Bilan
+ * @author Siddharth Jain
+ *
+ * @since 4.0
+ */
+public class KplMessageHandler extends AbstractMessageProducingHandler implements Lifecycle {
+
+ private static final long DEFAULT_SEND_TIMEOUT = 10000;
+
+ private final KinesisProducer kinesisProducer;
+
+ private MessageConverter messageConverter = new ConvertingFromMessageConverter(new SerializingConverter());
+
+ private Expression streamExpression;
+
+ private Expression partitionKeyExpression;
+
+ private Expression explicitHashKeyExpression;
+
+ private Expression glueSchemaExpression;
+
+ private OutboundMessageMapper embeddedHeadersMapper;
+
+ private Duration flushDuration = Duration.ofMillis(0);
+
+ private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);
+
+ private EvaluationContext evaluationContext;
+
+ private volatile boolean running;
+
+ private volatile ScheduledFuture> flushFuture;
+
+ private long backPressureThreshold = 0;
+
+ public KplMessageHandler(KinesisProducer kinesisProducer) {
+ Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null.");
+ this.kinesisProducer = kinesisProducer;
+ }
+
+ /**
+ * Configure maximum records in flight for handling backpressure. By default, backpressure handling is not enabled.
+ * When backpressure handling is enabled and number of records in flight exceeds the threshold, a
+ * {@link KplBackpressureException} would be thrown.
+ * @param backPressureThreshold a value greater than {@code 0} to enable backpressure handling.
+ */
+ public void setBackPressureThreshold(long backPressureThreshold) {
+ Assert.isTrue(backPressureThreshold >= 0, "'backPressureThreshold must be greater than or equal to 0.");
+ this.backPressureThreshold = backPressureThreshold;
+ }
+
+ /**
+ * Configure a {@link MessageConverter} for converting payload to {@code byte[]} for Kinesis record.
+ * @param messageConverter the {@link MessageConverter} to use.
+ */
+ public void setMessageConverter(MessageConverter messageConverter) {
+ Assert.notNull(messageConverter, "'messageConverter' must not be null.");
+ this.messageConverter = messageConverter;
+ }
+
+ public void setStream(String stream) {
+ setStreamExpression(new LiteralExpression(stream));
+ }
+
+ public void setStreamExpressionString(String streamExpression) {
+ setStreamExpression(EXPRESSION_PARSER.parseExpression(streamExpression));
+ }
+
+ public void setStreamExpression(Expression streamExpression) {
+ this.streamExpression = streamExpression;
+ }
+
+ public void setPartitionKey(String partitionKey) {
+ setPartitionKeyExpression(new LiteralExpression(partitionKey));
+ }
+
+ public void setPartitionKeyExpressionString(String partitionKeyExpression) {
+ setPartitionKeyExpression(EXPRESSION_PARSER.parseExpression(partitionKeyExpression));
+ }
+
+ public void setPartitionKeyExpression(Expression partitionKeyExpression) {
+ this.partitionKeyExpression = partitionKeyExpression;
+ }
+
+ public void setExplicitHashKey(String explicitHashKey) {
+ setExplicitHashKeyExpression(new LiteralExpression(explicitHashKey));
+ }
+
+ public void setExplicitHashKeyExpressionString(String explicitHashKeyExpression) {
+ setExplicitHashKeyExpression(EXPRESSION_PARSER.parseExpression(explicitHashKeyExpression));
+ }
+
+ public void setExplicitHashKeyExpression(Expression explicitHashKeyExpression) {
+ this.explicitHashKeyExpression = explicitHashKeyExpression;
+ }
+
+ /**
+ * Specify a {@link OutboundMessageMapper} for embedding message headers into the record data together with payload.
+ * @param embeddedHeadersMapper the {@link OutboundMessageMapper} to embed headers into the record data.
+ */
+ public void setEmbeddedHeadersMapper(OutboundMessageMapper embeddedHeadersMapper) {
+ this.embeddedHeadersMapper = embeddedHeadersMapper;
+ }
+
+ /**
+ * Configure a {@link Duration} how often to call a {@link KinesisProducer#flush()}.
+ * @param flushDuration the {@link Duration} to periodic call of a {@link KinesisProducer#flush()}.
+ */
+ public void setFlushDuration(Duration flushDuration) {
+ Assert.notNull(flushDuration, "'flushDuration' must not be null.");
+ this.flushDuration = flushDuration;
+ }
+
+ /**
+ * Set a {@link Schema} to add into a {@link UserRecord} built from the request message.
+ * @param glueSchema the {@link Schema} to add into a {@link UserRecord}.
+ * @see UserRecord#setSchema(Schema)
+ */
+ public void setGlueSchema(Schema glueSchema) {
+ setGlueSchemaExpression(new ValueExpression<>(glueSchema));
+ }
+
+ /**
+ * Set a SpEL expression for {@link Schema} to add into a {@link UserRecord} built from the request message.
+ * @param glueSchemaExpression the SpEL expression to evaluate a {@link Schema}.
+ * @see UserRecord#setSchema(Schema)
+ */
+ public void setGlueSchemaExpressionString(String glueSchemaExpression) {
+ setGlueSchemaExpression(EXPRESSION_PARSER.parseExpression(glueSchemaExpression));
+ }
+
+ /**
+ * Set a SpEL expression for {@link Schema} to add into a {@link UserRecord} built from the request message.
+ * @param glueSchemaExpression the SpEL expression to evaluate a {@link Schema}.
+ * @see UserRecord#setSchema(Schema)
+ */
+ public void setGlueSchemaExpression(Expression glueSchemaExpression) {
+ this.glueSchemaExpression = glueSchemaExpression;
+ }
+
+ public void setSendTimeout(long sendTimeout) {
+ setSendTimeoutExpression(new ValueExpression<>(sendTimeout));
+ }
+
+ public void setSendTimeoutExpressionString(String sendTimeoutExpression) {
+ setSendTimeoutExpression(EXPRESSION_PARSER.parseExpression(sendTimeoutExpression));
+ }
+
+ public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
+ Assert.notNull(sendTimeoutExpression, "'sendTimeoutExpression' must not be null");
+ this.sendTimeoutExpression = sendTimeoutExpression;
+ }
+
+ @Override
+ protected void onInit() {
+ super.onInit();
+ this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
+ }
+
+ @Override
+ public synchronized void start() {
+ if (!this.running) {
+ if (this.flushDuration.toMillis() > 0) {
+ this.flushFuture = getTaskScheduler().scheduleAtFixedRate(this.kinesisProducer::flush,
+ this.flushDuration);
+ }
+ this.running = true;
+ }
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (this.running) {
+ this.running = false;
+ if (this.flushFuture != null) {
+ this.flushFuture.cancel(true);
+ }
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.running;
+ }
+
+ @Override
+ protected void handleMessageInternal(Message> message) {
+ UserRecord request = messageToUserRecord(message);
+
+ CompletableFuture> resultFuture = handleMessageToAws(request)
+ .handle((response, ex) -> handleResponse(message, response, ex));
+
+ if (isAsync()) {
+ sendOutputs(resultFuture, message);
+ return;
+ }
+
+ Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
+
+ try {
+ if (sendTimeout == null || sendTimeout < 0) {
+ resultFuture.get();
+ }
+ else {
+ resultFuture.get(sendTimeout, TimeUnit.MILLISECONDS);
+ }
+ }
+ catch (TimeoutException te) {
+ throw new MessageTimeoutException(message, "Timeout waiting for response from KinesisProducer", te);
+ }
+ catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(ex);
+ }
+ catch (ExecutionException ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ private UserRecord messageToUserRecord(Message> message) {
+ Object payload = message.getPayload();
+ if (payload instanceof UserRecord userRecord) {
+ return userRecord;
+ }
+
+ return buildUserRecord(message);
+ }
+
+ private UserRecord buildUserRecord(Message> message) {
+ Object payload = message.getPayload();
+
+ MessageHeaders messageHeaders = message.getHeaders();
+ String stream = messageHeaders.get(KinesisHeaders.STREAM, String.class);
+ if (!StringUtils.hasText(stream) && this.streamExpression != null) {
+ stream = this.streamExpression.getValue(this.evaluationContext, message, String.class);
+ }
+ Assert.state(stream != null,
+ "'stream' must not be null for sending a Kinesis record. "
+ + "Consider configuring this handler with a 'stream'( or 'streamExpression') or supply an "
+ + "'aws_stream' message header.");
+
+ String partitionKey = messageHeaders.get(KinesisHeaders.PARTITION_KEY, String.class);
+ if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) {
+ partitionKey = this.partitionKeyExpression.getValue(this.evaluationContext, message, String.class);
+ }
+ Assert.state(partitionKey != null,
+ "'partitionKey' must not be null for sending a Kinesis record."
+ + "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') "
+ + "or supply an 'aws_partitionKey' message header.");
+
+ String explicitHashKey = this.explicitHashKeyExpression != null
+ ? this.explicitHashKeyExpression.getValue(this.evaluationContext, message, String.class)
+ : null;
+
+ Schema schema = this.glueSchemaExpression != null
+ ? this.glueSchemaExpression.getValue(this.evaluationContext, message, Schema.class)
+ : null;
+
+ Message> messageToEmbed = null;
+ ByteBuffer data = null;
+
+ if (payload instanceof ByteBuffer byteBuffer) {
+ data = byteBuffer;
+ if (this.embeddedHeadersMapper != null) {
+ messageToEmbed = new MutableMessage<>(data.array(), messageHeaders);
+ }
+ }
+ else {
+ byte[] bytes = (byte[]) (payload instanceof byte[] ? payload
+ : this.messageConverter.fromMessage(message, byte[].class));
+ Assert.notNull(bytes, "payload cannot be null");
+ if (this.embeddedHeadersMapper != null) {
+ messageToEmbed = new MutableMessage<>(bytes, messageHeaders);
+ }
+ else {
+ data = ByteBuffer.wrap(bytes);
+ }
+ }
+
+ if (messageToEmbed != null) {
+ try {
+ byte[] bytes = this.embeddedHeadersMapper.fromMessage(messageToEmbed);
+ Assert.notNull(bytes, "payload cannot be null");
+ data = ByteBuffer.wrap(bytes);
+ }
+ catch (Exception ex) {
+ throw new MessageConversionException(message, "Cannot embedded headers to payload", ex);
+ }
+ }
+
+ return new UserRecord().withStreamName(stream).withPartitionKey(partitionKey)
+ .withExplicitHashKey(explicitHashKey).withData(data).withSchema(schema);
+ }
+
+ private CompletableFuture handleMessageToAws(UserRecord userRecord) {
+ try {
+ return handleUserRecord(userRecord);
+ }
+ finally {
+ if (this.flushDuration.toMillis() <= 0) {
+ this.kinesisProducer.flush();
+ }
+ }
+ }
+
+ private CompletableFuture handleUserRecord(UserRecord userRecord) {
+ if (this.backPressureThreshold > 0) {
+ var numberOfRecordsInFlight = this.kinesisProducer.getOutstandingRecordsCount();
+ if (numberOfRecordsInFlight > this.backPressureThreshold) {
+ throw new KplBackpressureException("Cannot send record to Kinesis since buffer is at max capacity.",
+ userRecord);
+ }
+ }
+
+ return listenableFutureToCompletableFuture(this.kinesisProducer.addUserRecord(userRecord));
+ }
+
+ private Message> handleResponse(Message> message, UserRecordResult response, Throwable cause) {
+ if (cause != null) {
+ throw new MessageHandlingException(message, cause);
+ }
+ return getMessageBuilderFactory().fromMessage(message).copyHeadersIfAbsent(additionalOnSuccessHeaders(response))
+ .build();
+ }
+
+ private static Map additionalOnSuccessHeaders(UserRecordResult response) {
+ return Map.of(KinesisHeaders.SHARD, response.getShardId(), KinesisHeaders.SEQUENCE_NUMBER,
+ response.getSequenceNumber());
+ }
+
+ private static CompletableFuture listenableFutureToCompletableFuture(ListenableFuture listenableFuture) {
+ CompletableFuture completable = new CompletableFuture<>() {
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ // propagate cancel to the listenable future
+ boolean result = listenableFuture.cancel(mayInterruptIfRunning);
+ super.cancel(mayInterruptIfRunning);
+ return result;
+ }
+
+ };
+
+ // add callback
+ Futures.addCallback(listenableFuture, new FutureCallback<>() {
+
+ @Override
+ public void onSuccess(T result) {
+ completable.complete(result);
+ }
+
+ @Override
+ public void onFailure(Throwable ex) {
+ completable.completeExceptionally(ex);
+ }
+
+ }, MoreExecutors.directExecutor());
+
+ return completable;
+ }
+
+}
diff --git a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java
index 599862add..4fa957a7b 100644
--- a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java
+++ b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KinesisIntegrationTests.java
@@ -23,7 +23,6 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -112,13 +111,14 @@ void kinesisInboundOutbound() throws InterruptedException {
.contains("Channel 'kinesisReceiveChannel' expected one of the following data types "
+ "[class java.util.Date], but received [class java.lang.String]");
- String errorSequenceNumber = errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class).sequenceNumber();
+ String errorSequenceNumber = errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class)
+ .sequenceNumber();
// Second exception for the same record since we have requested via bubbling exception up to the consumer
errorMessage = this.errorChannel.receive(30_000);
assertThat(errorMessage).isNotNull();
assertThat(errorMessage.getHeaders().get(KinesisHeaders.RAW_RECORD, Record.class).sequenceNumber())
- .isEqualTo(errorSequenceNumber);
+ .isEqualTo(errorSequenceNumber);
for (int i = 0; i < 2; i++) {
this.kinesisSendChannel
diff --git a/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KplMessageHandlerTests.java b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KplMessageHandlerTests.java
new file mode 100644
index 000000000..21504438a
--- /dev/null
+++ b/spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/KplMessageHandlerTests.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.kinesis.integration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.amazonaws.services.schemaregistry.common.Schema;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessageHandlingException;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.retry.support.RetryTemplate;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import software.amazon.kinesis.producer.KinesisProducer;
+import software.amazon.kinesis.producer.UserRecord;
+
+/**
+ *
+ * @author Siddharth Jain
+ * @author Artem Bilan
+ *
+ * @since 4.0
+ */
+@SpringJUnitConfig
+@DirtiesContext
+public class KplMessageHandlerTests {
+
+ @Autowired
+ protected Schema schema;
+
+ @Autowired
+ protected KinesisProducer kinesisProducer;
+
+ @Autowired
+ protected MessageChannel kinesisSendChannel;
+
+ @Autowired
+ protected KplMessageHandler kplMessageHandler;
+
+ @Test
+ void kplMessageHandlerWithRawPayloadBackpressureDisabledSuccess() {
+ given(this.kinesisProducer.addUserRecord(any(UserRecord.class))).willReturn(mock());
+ final Message> message = MessageBuilder.withPayload("someMessage")
+ .setHeader(KinesisHeaders.PARTITION_KEY, "somePartitionKey")
+ .setHeader(KinesisHeaders.SEQUENCE_NUMBER, "10").setHeader("someHeaderKey", "someHeaderValue").build();
+
+ ArgumentCaptor userRecordRequestArgumentCaptor = ArgumentCaptor.forClass(UserRecord.class);
+ this.kplMessageHandler.setBackPressureThreshold(0);
+ this.kinesisSendChannel.send(message);
+ verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
+ verify(this.kinesisProducer, Mockito.never()).getOutstandingRecordsCount();
+ UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
+ assertThat(userRecord.getStreamName()).isEqualTo("someStream");
+ assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
+ assertThat(userRecord.getExplicitHashKey()).isNull();
+ assertThat(userRecord.getSchema()).isSameAs(this.schema);
+ }
+
+ @Test
+ void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityAvailable() {
+ given(this.kinesisProducer.addUserRecord(any(UserRecord.class))).willReturn(mock());
+ this.kplMessageHandler.setBackPressureThreshold(2);
+ given(this.kinesisProducer.getOutstandingRecordsCount()).willReturn(1);
+ final Message> message = MessageBuilder.withPayload("someMessage")
+ .setHeader(KinesisHeaders.PARTITION_KEY, "somePartitionKey")
+ .setHeader(KinesisHeaders.SEQUENCE_NUMBER, "10").setHeader("someHeaderKey", "someHeaderValue").build();
+
+ ArgumentCaptor userRecordRequestArgumentCaptor = ArgumentCaptor.forClass(UserRecord.class);
+
+ this.kinesisSendChannel.send(message);
+ verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
+ verify(this.kinesisProducer).getOutstandingRecordsCount();
+ UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
+ assertThat(userRecord.getStreamName()).isEqualTo("someStream");
+ assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
+ assertThat(userRecord.getExplicitHashKey()).isNull();
+ assertThat(userRecord.getSchema()).isSameAs(this.schema);
+ }
+
+ @Test
+ void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityInsufficient() {
+ given(this.kinesisProducer.addUserRecord(any(UserRecord.class))).willReturn(mock());
+ this.kplMessageHandler.setBackPressureThreshold(2);
+ given(this.kinesisProducer.getOutstandingRecordsCount()).willReturn(5);
+ final Message> message = MessageBuilder.withPayload("someMessage")
+ .setHeader(KinesisHeaders.PARTITION_KEY, "somePartitionKey")
+ .setHeader(KinesisHeaders.SEQUENCE_NUMBER, "10").setHeader("someHeaderKey", "someHeaderValue").build();
+
+ assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> this.kinesisSendChannel.send(message))
+ .withCauseInstanceOf(MessageHandlingException.class)
+ .withRootCauseExactlyInstanceOf(KplBackpressureException.class)
+ .withStackTraceContaining("Cannot send record to Kinesis since buffer is at max capacity.");
+
+ verify(this.kinesisProducer, Mockito.never()).addUserRecord(any(UserRecord.class));
+ verify(this.kinesisProducer).getOutstandingRecordsCount();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ clearInvocations(this.kinesisProducer);
+ }
+
+ @Configuration(proxyBeanMethods = false)
+ @EnableIntegration
+ public static class ContextConfiguration {
+
+ @Bean
+ public KinesisProducer kinesisProducer() {
+ return mock();
+ }
+
+ @Bean
+ public RequestHandlerRetryAdvice retryAdvice() {
+ RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
+ requestHandlerRetryAdvice.setRetryTemplate(RetryTemplate.builder().retryOn(KplBackpressureException.class)
+ .exponentialBackoff(100, 2.0, 1000).maxAttempts(3).build());
+ return requestHandlerRetryAdvice;
+ }
+
+ @Bean
+ @ServiceActivator(inputChannel = "kinesisSendChannel", adviceChain = "retryAdvice")
+ public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer, Schema schema) {
+ KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer);
+ kplMessageHandler.setAsync(true);
+ kplMessageHandler.setStream("someStream");
+ kplMessageHandler.setGlueSchema(schema);
+ return kplMessageHandler;
+ }
+
+ @Bean
+ public Schema schema() {
+ return new Schema("syntax=\"proto2\";", "PROTOBUF", "testschema");
+ }
+
+ }
+
+}
diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer/pom.xml
new file mode 100644
index 000000000..1e2a4c536
--- /dev/null
+++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer/pom.xml
@@ -0,0 +1,35 @@
+
+
+
+ spring-cloud-aws
+ io.awspring.cloud
+ 4.0.0-SNAPSHOT
+ ../../pom.xml
+
+ 4.0.0
+
+ Spring Cloud AWS Starter for Spring Integration with Kinesis Producer Library
+ spring-cloud-aws-starter-integration-kinesis-producer
+
+
+
+ io.awspring.cloud
+ spring-cloud-aws-starter
+
+
+ io.awspring.cloud
+ spring-cloud-aws-kinesis
+
+
+ software.amazon.kinesis
+ amazon-kinesis-producer
+
+
+ org.springframework.integration
+ spring-integration-core
+
+
+
+