diff --git a/core/src/main/scala/kafka/log/s3/RecordBatchCodec.java b/core/src/main/scala/kafka/log/s3/RecordBatchCodec.java index fe044b761d..fa83fafe66 100644 --- a/core/src/main/scala/kafka/log/s3/RecordBatchCodec.java +++ b/core/src/main/scala/kafka/log/s3/RecordBatchCodec.java @@ -68,8 +68,8 @@ public static ByteBuf encode(long streamId, long baseOffset, RecordBatch recordB // encode RecordBatchMeta RecordBatchMeta.startRecordBatchMeta(metaBuilder); - RecordBatchMeta.addBaseOffset(metaBuilder, baseOffset); RecordBatchMeta.addStreamId(metaBuilder, streamId); + RecordBatchMeta.addBaseOffset(metaBuilder, baseOffset); RecordBatchMeta.addLastOffsetDelta(metaBuilder, recordBatch.count()); RecordBatchMeta.addBaseTimestamp(metaBuilder, recordBatch.baseTimestamp()); if (null != propsVector) { diff --git a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java index 39223acb48..e75bd82521 100644 --- a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java +++ b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java @@ -74,7 +74,7 @@ public CompletableFuture upload() { CommitWalObjectRequest request = new CommitWalObjectRequest(); request.setObjectId(context.objectId); request.setObjectSize(objectBuf.readableBytes()); - request.setStreams(streams); + request.setStreamRanges(streams); return objectManager.commitWalObject(request); }) .thenApply(resp -> { diff --git a/core/src/test/java/kafka/log/s3/S3WalTest.java b/core/src/test/java/kafka/log/s3/S3WalTest.java index 50ff1e05b6..0b983f57ab 100644 --- a/core/src/test/java/kafka/log/s3/S3WalTest.java +++ b/core/src/test/java/kafka/log/s3/S3WalTest.java @@ -70,7 +70,7 @@ public void testAppend() throws Exception { verify(objectManager).commitWalObject(commitArg.capture()); CommitWalObjectRequest commitReq = commitArg.getValue(); assertEquals(16L, commitReq.getObjectId()); - List streams = commitReq.getStreams(); + List streams = commitReq.getStreamRanges(); assertEquals(2, streams.size()); assertEquals(233, streams.get(0).getStreamId()); assertEquals(10, streams.get(0).getStartOffset());