From ff54a0137281a184e33a43c10c1fc39b061bf36c Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Sat, 18 May 2024 00:19:17 +0800 Subject: [PATCH] Change aws2 kinesis record body type to byte array --- .../camel/component/aws2/kinesis/Kinesis2Consumer.java | 2 +- .../camel/component/aws2/kinesis/Kinesis2Producer.java | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index 93209375dcdff..fd9b1ee098d6a 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -303,7 +303,7 @@ private Queue createExchanges(Shard shard, List records) { protected Exchange createExchange(Shard shard, Record dataRecord) { LOG.debug("Received Kinesis record with partition_key={}", dataRecord.partitionKey()); Exchange exchange = createExchange(true); - exchange.getIn().setBody(dataRecord.data().asInputStream()); + exchange.getIn().setBody(dataRecord.data().asByteArray()); exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME, dataRecord.approximateArrivalTimestamp()); exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY, dataRecord.partitionKey()); exchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER, dataRecord.sequenceNumber()); diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java index 1763dd3fe8355..ce8903aa4ac8d 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java @@ -16,8 +16,6 @@ */ package org.apache.camel.component.aws2.kinesis; -import java.nio.ByteBuffer; - import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.support.DefaultProducer; @@ -57,12 +55,12 @@ public void process(Exchange exchange) throws Exception { } private PutRecordRequest createRequest(Exchange exchange) { - ByteBuffer body = exchange.getIn().getBody(ByteBuffer.class); + byte[] body = exchange.getIn().getBody(byte[].class); Object partitionKey = exchange.getIn().getHeader(Kinesis2Constants.PARTITION_KEY); Object sequenceNumber = exchange.getIn().getHeader(Kinesis2Constants.SEQUENCE_NUMBER); PutRecordRequest.Builder putRecordRequest = PutRecordRequest.builder(); - putRecordRequest.data(SdkBytes.fromByteBuffer(body)); + putRecordRequest.data(SdkBytes.fromByteArray(body)); putRecordRequest.streamName(getEndpoint().getConfiguration().getStreamName()); putRecordRequest.partitionKey(partitionKey.toString()); if (sequenceNumber != null) {