diff --git a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java index e75bd82521..d38d3a7006 100644 --- a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java +++ b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java @@ -44,7 +44,7 @@ public class SingleWalObjectWriteTask { private final List requests; private final ObjectManager objectManager; private final S3Operator s3Operator; - private final List streams; + private final List streamRanges; private ByteBuf objectBuf; private CommitWalObjectResponse response; private volatile boolean isDone = false; @@ -52,7 +52,7 @@ public class SingleWalObjectWriteTask { public SingleWalObjectWriteTask(List records, ObjectManager objectManager, S3Operator s3Operator) { Collections.sort(records); this.requests = records; - this.streams = new LinkedList<>(); + this.streamRanges = new LinkedList<>(); this.objectManager = objectManager; this.s3Operator = s3Operator; parse(); @@ -74,7 +74,7 @@ public CompletableFuture upload() { CommitWalObjectRequest request = new CommitWalObjectRequest(); request.setObjectId(context.objectId); request.setObjectSize(objectBuf.readableBytes()); - request.setStreamRanges(streams); + request.setStreamRanges(streamRanges); return objectManager.commitWalObject(request); }) .thenApply(resp -> { @@ -98,7 +98,7 @@ public void parse() { long currentStreamId = record.getStreamId(); if (streamId != currentStreamId) { if (streamId != -1) { - streams.add(new ObjectStreamRange(streamId, streamEpoch, streamStartOffset, streamEndOffset)); + streamRanges.add(new ObjectStreamRange(streamId, streamEpoch, streamStartOffset, streamEndOffset)); } streamId = currentStreamId; streamEpoch = record.getEpoch(); @@ -116,7 +116,7 @@ public void parse() { } // add last stream range if (streamId != -1) { - streams.add(new ObjectStreamRange(streamId, streamEpoch, streamStartOffset, streamEndOffset)); + streamRanges.add(new ObjectStreamRange(streamId, streamEpoch, streamStartOffset, streamEndOffset)); } objectBuf = Unpooled.wrappedBuffer(compressed.buffer().flip()); } diff --git a/core/src/test/java/kafka/log/s3/S3WalTest.java b/core/src/test/java/kafka/log/s3/S3WalTest.java index 0b983f57ab..220a25d25a 100644 --- a/core/src/test/java/kafka/log/s3/S3WalTest.java +++ b/core/src/test/java/kafka/log/s3/S3WalTest.java @@ -70,14 +70,14 @@ public void testAppend() throws Exception { verify(objectManager).commitWalObject(commitArg.capture()); CommitWalObjectRequest commitReq = commitArg.getValue(); assertEquals(16L, commitReq.getObjectId()); - List streams = commitReq.getStreamRanges(); - assertEquals(2, streams.size()); - assertEquals(233, streams.get(0).getStreamId()); - assertEquals(10, streams.get(0).getStartOffset()); - assertEquals(13, streams.get(0).getEndOffset()); - assertEquals(234, streams.get(1).getStreamId()); - assertEquals(100, streams.get(1).getStartOffset()); - assertEquals(101, streams.get(1).getEndOffset()); + List streamRanges = commitReq.getStreamRanges(); + assertEquals(2, streamRanges.size()); + assertEquals(233, streamRanges.get(0).getStreamId()); + assertEquals(10, streamRanges.get(0).getStartOffset()); + assertEquals(13, streamRanges.get(0).getEndOffset()); + assertEquals(234, streamRanges.get(1).getStreamId()); + assertEquals(100, streamRanges.get(1).getStartOffset()); + assertEquals(101, streamRanges.get(1).getEndOffset()); } @Test