diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index ab85406d16..2f4b2f3a5d 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -52,6 +52,7 @@ + diff --git a/core/src/main/scala/kafka/log/s3/RecordBatchCodec.java b/core/src/main/scala/kafka/log/s3/RecordBatchCodec.java new file mode 100644 index 0000000000..fe044b761d --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/RecordBatchCodec.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import com.automq.elasticstream.client.FlatRecordBatchWithContext; +import com.automq.elasticstream.client.api.RecordBatch; +import com.automq.elasticstream.client.api.RecordBatchWithContext; +import com.automq.elasticstream.client.flatc.records.KeyValue; +import com.automq.elasticstream.client.flatc.records.RecordBatchMeta; +import com.automq.elasticstream.client.flatc.records.RecordBatchMetaT; +import com.google.flatbuffers.FlatBufferBuilder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class RecordBatchCodec { + private static final byte MAGIC_V0 = 0x22; + private static final ThreadLocal META_BUF = ThreadLocal.withInitial(() -> ByteBuffer.allocate(4096)); + private static final PooledByteBufAllocator ALLOCATOR = PooledByteBufAllocator.DEFAULT; + + /** + * Encode RecordBatch to storage format record. + * + * @param recordBatch {@link RecordBatch} + * @return storage format record bytes. + */ + public static ByteBuf encode(long streamId, long baseOffset, RecordBatch recordBatch) { + + int totalLength = 0; + + totalLength += 1; // magic + + FlatBufferBuilder metaBuilder = new FlatBufferBuilder(META_BUF.get()); + + Integer propsVector = null; + int propsSize = recordBatch.properties().size(); + if (propsSize > 0) { + int[] kvs = new int[propsSize]; + int index = 0; + for (Map.Entry kv : recordBatch.properties().entrySet()) { + int k = metaBuilder.createString(kv.getKey()); + int v = metaBuilder.createString(kv.getValue()); + int kvPtr = KeyValue.createKeyValue(metaBuilder, k, v); + kvs[index++] = kvPtr; + } + propsVector = RecordBatchMeta.createPropertiesVector(metaBuilder, kvs); + } + + // encode RecordBatchMeta + RecordBatchMeta.startRecordBatchMeta(metaBuilder); + RecordBatchMeta.addBaseOffset(metaBuilder, baseOffset); + RecordBatchMeta.addStreamId(metaBuilder, streamId); + RecordBatchMeta.addLastOffsetDelta(metaBuilder, recordBatch.count()); + RecordBatchMeta.addBaseTimestamp(metaBuilder, recordBatch.baseTimestamp()); + if (null != propsVector) { + RecordBatchMeta.addProperties(metaBuilder, propsVector); + } + int ptr = RecordBatchMeta.endRecordBatchMeta(metaBuilder); + metaBuilder.finish(ptr); + + // The data in this ByteBuffer does NOT start at 0, but at buf.position(). + // The number of bytes is buf.remaining(). + ByteBuffer metaBuf = metaBuilder.dataBuffer(); + + totalLength += 4; // meta length + totalLength += metaBuf.remaining(); // RecordBatchMeta + + totalLength += 4; // payload length + totalLength += recordBatch.rawPayload().remaining(); // payload + + ByteBuf buf = ALLOCATOR.heapBuffer(totalLength); + buf.writeByte(MAGIC_V0); // magic + buf.writeInt(metaBuf.remaining()); // meta length + buf.writeBytes(metaBuf); // RecordBatchMeta + buf.writeInt(recordBatch.rawPayload().remaining()); // payload length + buf.writeBytes(recordBatch.rawPayload()); // payload + + META_BUF.get().clear(); + return buf; + } + + /** + * Decode storage format record to RecordBatchWithContext list. + * + * @param storageFormatBytes storage format bytes. + * @return RecordBatchWithContext list. + */ + public static List decode(ByteBuffer storageFormatBytes) { + ByteBuf buf = Unpooled.wrappedBuffer(storageFormatBytes); + List recordBatchList = new LinkedList<>(); + while (buf.isReadable()) { + buf.readByte(); // magic + int metaLength = buf.readInt(); + ByteBuf metaBuf = buf.slice(buf.readerIndex(), metaLength); + RecordBatchMetaT recordBatchMetaT = RecordBatchMeta.getRootAsRecordBatchMeta(metaBuf.nioBuffer()).unpack(); + buf.skipBytes(metaLength); + int payloadLength = buf.readInt(); + ByteBuf payloadBuf = buf.slice(buf.readerIndex(), payloadLength); + buf.skipBytes(payloadLength); + recordBatchList.add(new FlatRecordBatchWithContext(recordBatchMetaT, payloadBuf.nioBuffer())); + } + return recordBatchList; + } +} diff --git a/core/src/main/scala/kafka/log/s3/S3Wal.java b/core/src/main/scala/kafka/log/s3/S3Wal.java new file mode 100644 index 0000000000..1a3b207720 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/S3Wal.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import kafka.log.s3.model.StreamRecordBatch; +import kafka.log.s3.objects.ObjectManager; +import kafka.log.s3.operator.S3Operator; +import org.apache.kafka.common.utils.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class S3Wal implements Wal { + private static final Logger LOGGER = LoggerFactory.getLogger(S3Wal.class); + private final int batchIntervalMs = 200; + private final BlockingQueue writeBuffer; + private final WalBatchWriteTask walBatchWriteTask; + private final ObjectManager objectManager; + private final S3Operator s3Operator; + + + public S3Wal(ObjectManager objectManager, S3Operator s3Operator) { + writeBuffer = new ArrayBlockingQueue<>(16384); + walBatchWriteTask = new WalBatchWriteTask(objectManager, s3Operator); + this.objectManager = objectManager; + this.s3Operator = s3Operator; + } + + @Override + public void close() { + walBatchWriteTask.close(); + } + + @Override + public CompletableFuture append(StreamRecordBatch streamRecord) { + CompletableFuture cf = new CompletableFuture<>(); + //TODO: copy to pooled bytebuffer to reduce gc + try { + writeBuffer.put(new WalWriteRequest(streamRecord, cf)); + } catch (InterruptedException e) { + cf.completeExceptionally(e); + } + return cf; + } + + class WalBatchWriteTask implements Runnable { + private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor( + ThreadUtils.createThreadFactory("wal-batch-write", true)); + private final Queue writeTasks = new ConcurrentLinkedQueue<>(); + private final ObjectManager objectManager; + private final S3Operator s3Operator; + + public WalBatchWriteTask(ObjectManager objectManager, S3Operator s3Operator) { + this.objectManager = objectManager; + this.s3Operator = s3Operator; + schedule.scheduleAtFixedRate(this, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS); + } + + public void close() { + schedule.shutdown(); + run(); + } + + @Override + public void run() { + try { + run0(); + } catch (Throwable e) { + LOGGER.error("Error in wal batch write task", e); + } + } + + void run0() { + List requests = new ArrayList<>(writeBuffer.size()); + writeBuffer.drainTo(requests); + if (requests.isEmpty()) { + return; + } + SingleWalObjectWriteTask singleWalObjectWriteTask = new SingleWalObjectWriteTask(requests, objectManager, s3Operator); + writeTasks.offer(singleWalObjectWriteTask); + runWriteTask(singleWalObjectWriteTask); + } + + void runWriteTask(SingleWalObjectWriteTask task) { + task.upload().thenAccept(nil -> schedule.execute(this::tryComplete)) + .exceptionally(ex -> { + LOGGER.warn("Write wal object fail, retry later", ex); + schedule.schedule(() -> runWriteTask(task), batchIntervalMs, TimeUnit.MILLISECONDS); + return null; + }); + } + + void tryComplete() { + while (true) { + SingleWalObjectWriteTask task = writeTasks.peek(); + if (task == null) { + return; + } + if (task.isDone()) { + writeTasks.poll(); + task.ack(); + } else { + return; + } + } + } + } +} diff --git a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java new file mode 100644 index 0000000000..39223acb48 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import kafka.log.s3.exception.StreamFencedException; +import kafka.log.s3.model.StreamRecordBatch; +import kafka.log.s3.objects.CommitWalObjectRequest; +import kafka.log.s3.objects.CommitWalObjectResponse; +import kafka.log.s3.objects.ObjectManager; +import kafka.log.s3.objects.ObjectStreamRange; +import kafka.log.s3.operator.S3Operator; +import kafka.log.s3.utils.ObjectUtils; +import org.apache.kafka.common.compress.ZstdFactory; +import org.apache.kafka.common.utils.ByteBufferOutputStream; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class SingleWalObjectWriteTask { + private final List requests; + private final ObjectManager objectManager; + private final S3Operator s3Operator; + private final List streams; + private ByteBuf objectBuf; + private CommitWalObjectResponse response; + private volatile boolean isDone = false; + + public SingleWalObjectWriteTask(List records, ObjectManager objectManager, S3Operator s3Operator) { + Collections.sort(records); + this.requests = records; + this.streams = new LinkedList<>(); + this.objectManager = objectManager; + this.s3Operator = s3Operator; + parse(); + } + + public CompletableFuture upload() { + if (isDone) { + return CompletableFuture.completedFuture(null); + } + UploadContext context = new UploadContext(); + CompletableFuture objectIdCf = objectManager.prepareObject(1, TimeUnit.SECONDS.toMillis(30)); + CompletableFuture writeCf = objectIdCf.thenCompose(objectId -> { + context.objectId = objectId; + // TODO: fill cluster name + return s3Operator.write(ObjectUtils.genKey(0, "todocluster", objectId), objectBuf.duplicate()); + }); + return writeCf + .thenCompose(nil -> { + CommitWalObjectRequest request = new CommitWalObjectRequest(); + request.setObjectId(context.objectId); + request.setObjectSize(objectBuf.readableBytes()); + request.setStreams(streams); + return objectManager.commitWalObject(request); + }) + .thenApply(resp -> { + isDone = true; + response = resp; + objectBuf.release(); + return null; + }); + } + + public void parse() { + int totalSize = requests.stream().mapToInt(r -> r.record.getRecordBatch().rawPayload().remaining()).sum(); + ByteBufferOutputStream compressed = new ByteBufferOutputStream(totalSize); + OutputStream out = ZstdFactory.wrapForOutput(compressed); + long streamId = -1; + long streamStartOffset = -1; + long streamEpoch = -1; + long streamEndOffset = -1; + for (WalWriteRequest request : requests) { + StreamRecordBatch record = request.record; + long currentStreamId = record.getStreamId(); + if (streamId != currentStreamId) { + if (streamId != -1) { + streams.add(new ObjectStreamRange(streamId, streamEpoch, streamStartOffset, streamEndOffset)); + } + streamId = currentStreamId; + streamEpoch = record.getEpoch(); + streamStartOffset = record.getBaseOffset(); + } + streamEndOffset = record.getBaseOffset() + record.getRecordBatch().count(); + ByteBuf recordBuf = RecordBatchCodec.encode(record.getStreamId(), record.getBaseOffset(), record.getRecordBatch()); + try { + out.write(recordBuf.array(), recordBuf.readerIndex(), recordBuf.readableBytes()); + } catch (IOException e) { + // won't happen + throw new RuntimeException(e); + } + recordBuf.release(); + } + // add last stream range + if (streamId != -1) { + streams.add(new ObjectStreamRange(streamId, streamEpoch, streamStartOffset, streamEndOffset)); + } + objectBuf = Unpooled.wrappedBuffer(compressed.buffer().flip()); + } + + public boolean isDone() { + return isDone; + } + + public void ack() { + Set failedStreamId = new HashSet<>(response.getFailedStreamIds()); + for (WalWriteRequest request : requests) { + long streamId = request.record.getStreamId(); + if (failedStreamId.contains(streamId)) { + request.cf.completeExceptionally(new StreamFencedException()); + } else { + request.cf.complete(null); + } + } + } + + static class UploadContext { + long objectId; + } + +} diff --git a/core/src/main/scala/kafka/log/s3/Wal.java b/core/src/main/scala/kafka/log/s3/Wal.java index 6050cd985e..1ae1526bbe 100644 --- a/core/src/main/scala/kafka/log/s3/Wal.java +++ b/core/src/main/scala/kafka/log/s3/Wal.java @@ -33,4 +33,5 @@ public interface Wal { */ CompletableFuture append(StreamRecordBatch streamRecord); + void close(); } diff --git a/core/src/main/scala/kafka/log/s3/WalWriteRequest.java b/core/src/main/scala/kafka/log/s3/WalWriteRequest.java new file mode 100644 index 0000000000..c6c8628249 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/WalWriteRequest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import kafka.log.s3.model.StreamRecordBatch; + +import java.util.concurrent.CompletableFuture; + +public class WalWriteRequest implements Comparable { + final StreamRecordBatch record; + final CompletableFuture cf; + + public WalWriteRequest(StreamRecordBatch record, CompletableFuture cf) { + this.record = record; + this.cf = cf; + } + + @Override + public int compareTo(WalWriteRequest o) { + return record.compareTo(o.record); + } +} diff --git a/core/src/main/scala/kafka/log/s3/exception/StreamFencedException.java b/core/src/main/scala/kafka/log/s3/exception/StreamFencedException.java new file mode 100644 index 0000000000..5a447924f2 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/exception/StreamFencedException.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.exception; + +public class StreamFencedException extends Exception { +} diff --git a/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java index 52acd4eee6..28cedb13da 100644 --- a/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java +++ b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java @@ -19,16 +19,16 @@ import com.automq.elasticstream.client.api.RecordBatch; -public class StreamRecordBatch { +public class StreamRecordBatch implements Comparable { private final long streamId; private final long epoch; - private final long startOffset; + private final long baseOffset; private final RecordBatch recordBatch; - public StreamRecordBatch(long streamId, long epoch, long startOffset, RecordBatch recordBatch) { + public StreamRecordBatch(long streamId, long epoch, long baseOffset, RecordBatch recordBatch) { this.streamId = streamId; this.epoch = epoch; - this.startOffset = startOffset; + this.baseOffset = baseOffset; this.recordBatch = recordBatch; } @@ -40,11 +40,20 @@ public long getEpoch() { return epoch; } - public long getStartOffset() { - return startOffset; + public long getBaseOffset() { + return baseOffset; } public RecordBatch getRecordBatch() { return recordBatch; } + + @Override + public int compareTo(StreamRecordBatch o) { + int rst = Long.compare(streamId, o.streamId); + if (rst != 0) { + return rst; + } + return Long.compare(baseOffset, o.baseOffset); + } } diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectResponse.java b/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectResponse.java index c8985b7bb0..27c2a5f07d 100644 --- a/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectResponse.java +++ b/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectResponse.java @@ -17,6 +17,7 @@ package kafka.log.s3.objects; +import java.util.Collections; import java.util.List; /** @@ -27,6 +28,9 @@ public class CommitWalObjectResponse { private List failedStreamIds; public List getFailedStreamIds() { + if (failedStreamIds == null) { + return Collections.emptyList(); + } return failedStreamIds; } diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java index e40d7808f8..38d807809d 100644 --- a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java +++ b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java @@ -18,39 +18,30 @@ package kafka.log.s3.objects; public class ObjectStreamRange { - private long streamId; - private long epoch; - private long startOffset; - private long endOffset; + private final long streamId; + private final long epoch; + private final long startOffset; + private final long endOffset; - public long getStreamId() { - return streamId; + public ObjectStreamRange(long streamId, long epoch, long startOffset, long endOffset) { + this.streamId = streamId; + this.epoch = epoch; + this.startOffset = startOffset; + this.endOffset = endOffset; } - public void setStreamId(long streamId) { - this.streamId = streamId; + public long getStreamId() { + return streamId; } public long getEpoch() { return epoch; } - - public void setEpoch(long epoch) { - this.epoch = epoch; - } public long getStartOffset() { return startOffset; } - public void setStartOffset(long startOffset) { - this.startOffset = startOffset; - } - public long getEndOffset() { return endOffset; } - - public void setEndOffset(long endOffset) { - this.endOffset = endOffset; - } } diff --git a/core/src/main/scala/kafka/log/s3/utils/ObjectUtils.java b/core/src/main/scala/kafka/log/s3/utils/ObjectUtils.java new file mode 100644 index 0000000000..4eaef394ce --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/utils/ObjectUtils.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.utils; + +public class ObjectUtils { + + public static String genKey(int version, String clusterName, long objectId) { + if (version == 0) { + String objectIdHex = String.format("%08x", objectId); + String hashPrefix = new StringBuilder(objectIdHex).reverse().toString(); + return hashPrefix + "/" + clusterName + "/" + objectId; + } else { + throw new UnsupportedOperationException("Unsupported version: " + version); + } + } + +} diff --git a/core/src/test/java/kafka/log/s3/S3WalTest.java b/core/src/test/java/kafka/log/s3/S3WalTest.java new file mode 100644 index 0000000000..50ff1e05b6 --- /dev/null +++ b/core/src/test/java/kafka/log/s3/S3WalTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import kafka.log.s3.model.StreamRecordBatch; +import kafka.log.s3.objects.CommitWalObjectRequest; +import kafka.log.s3.objects.CommitWalObjectResponse; +import kafka.log.s3.objects.ObjectManager; +import kafka.log.s3.objects.ObjectStreamRange; +import kafka.log.s3.operator.S3Operator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class S3WalTest { + ObjectManager objectManager; + S3Operator s3Operator; + S3Wal s3Wal; + + @BeforeEach + public void setup() { + objectManager = mock(ObjectManager.class); + s3Operator = mock(S3Operator.class); + s3Wal = new S3Wal(objectManager, s3Operator); + } + + @Test + public void testAppend() throws Exception { + when(objectManager.prepareObject(eq(1), anyLong())).thenReturn(CompletableFuture.completedFuture(16L)); + CommitWalObjectResponse resp = new CommitWalObjectResponse(); + when(objectManager.commitWalObject(any())).thenReturn(CompletableFuture.completedFuture(resp)); + when(s3Operator.write(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); + + CompletableFuture cf1 = s3Wal.append(new StreamRecordBatch(233, 1, 10, DefaultRecordBatch.of(1, 100))); + CompletableFuture cf2 = s3Wal.append(new StreamRecordBatch(233, 1, 11, DefaultRecordBatch.of(2, 100))); + CompletableFuture cf3 = s3Wal.append(new StreamRecordBatch(234, 3, 100, DefaultRecordBatch.of(1, 100))); + + cf1.get(3, TimeUnit.SECONDS); + cf2.get(3, TimeUnit.SECONDS); + cf3.get(3, TimeUnit.SECONDS); + + ArgumentCaptor commitArg = ArgumentCaptor.forClass(CommitWalObjectRequest.class); + verify(objectManager).commitWalObject(commitArg.capture()); + CommitWalObjectRequest commitReq = commitArg.getValue(); + assertEquals(16L, commitReq.getObjectId()); + List streams = commitReq.getStreams(); + assertEquals(2, streams.size()); + assertEquals(233, streams.get(0).getStreamId()); + assertEquals(10, streams.get(0).getStartOffset()); + assertEquals(13, streams.get(0).getEndOffset()); + assertEquals(234, streams.get(1).getStreamId()); + assertEquals(100, streams.get(1).getStartOffset()); + assertEquals(101, streams.get(1).getEndOffset()); + } + + @Test + public void testAppend_outOfOrder() { + // TODO: test out of order write task complete. + } +}