1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
@@ -52,6 +52,7 @@
<allow pkg="redis.clients.jedis" />
<allow pkg="kafka.server" />
<allow pkg="kafka.log.s3" />
<allow pkg="com.google.flatbuffers" />
</subpackage>

<subpackage name="testkit">
124 changes: 124 additions & 0 deletions core/src/main/scala/kafka/log/s3/RecordBatchCodec.java
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import com.automq.elasticstream.client.FlatRecordBatchWithContext;
import com.automq.elasticstream.client.api.RecordBatch;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.flatc.records.KeyValue;
import com.automq.elasticstream.client.flatc.records.RecordBatchMeta;
import com.automq.elasticstream.client.flatc.records.RecordBatchMetaT;
import com.google.flatbuffers.FlatBufferBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;

import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class RecordBatchCodec {
private static final byte MAGIC_V0 = 0x22;
private static final ThreadLocal<ByteBuffer> META_BUF = ThreadLocal.withInitial(() -> ByteBuffer.allocate(4096));
private static final PooledByteBufAllocator ALLOCATOR = PooledByteBufAllocator.DEFAULT;

/**
* Encode a RecordBatch into the storage format:
* magic (1 byte) | meta length (4 bytes) | RecordBatchMeta | payload length (4 bytes) | payload.
*
* @param streamId    id of the stream the batch belongs to.
* @param baseOffset  base offset of the batch within the stream.
* @param recordBatch {@link RecordBatch} to encode.
* @return storage format record bytes.
*/
public static ByteBuf encode(long streamId, long baseOffset, RecordBatch recordBatch) {

int totalLength = 0;

totalLength += 1; // magic

FlatBufferBuilder metaBuilder = new FlatBufferBuilder(META_BUF.get());

Integer propsVector = null;
int propsSize = recordBatch.properties().size();
if (propsSize > 0) {
int[] kvs = new int[propsSize];
int index = 0;
for (Map.Entry<String, String> kv : recordBatch.properties().entrySet()) {
int k = metaBuilder.createString(kv.getKey());
int v = metaBuilder.createString(kv.getValue());
int kvPtr = KeyValue.createKeyValue(metaBuilder, k, v);
kvs[index++] = kvPtr;
}
propsVector = RecordBatchMeta.createPropertiesVector(metaBuilder, kvs);
}

// encode RecordBatchMeta
RecordBatchMeta.startRecordBatchMeta(metaBuilder);
RecordBatchMeta.addBaseOffset(metaBuilder, baseOffset);
RecordBatchMeta.addStreamId(metaBuilder, streamId);
RecordBatchMeta.addLastOffsetDelta(metaBuilder, recordBatch.count());
RecordBatchMeta.addBaseTimestamp(metaBuilder, recordBatch.baseTimestamp());
if (null != propsVector) {
RecordBatchMeta.addProperties(metaBuilder, propsVector);
}
int ptr = RecordBatchMeta.endRecordBatchMeta(metaBuilder);
metaBuilder.finish(ptr);

// The data in this ByteBuffer does NOT start at 0, but at buf.position().
// The number of bytes is buf.remaining().
ByteBuffer metaBuf = metaBuilder.dataBuffer();

totalLength += 4; // meta length
totalLength += metaBuf.remaining(); // RecordBatchMeta

totalLength += 4; // payload length
totalLength += recordBatch.rawPayload().remaining(); // payload

ByteBuf buf = ALLOCATOR.heapBuffer(totalLength);
buf.writeByte(MAGIC_V0); // magic
buf.writeInt(metaBuf.remaining()); // meta length
buf.writeBytes(metaBuf); // RecordBatchMeta
buf.writeInt(recordBatch.rawPayload().remaining()); // payload length
buf.writeBytes(recordBatch.rawPayload()); // payload

// reset the thread-local scratch buffer for reuse; if the builder outgrew the
// initial 4 KiB, the grown copy is dropped rather than cached back
META_BUF.get().clear();
return buf;
}

/**
* Decode storage format record to RecordBatchWithContext list.
*
* @param storageFormatBytes storage format bytes.
* @return RecordBatchWithContext list.
*/
public static List<RecordBatchWithContext> decode(ByteBuffer storageFormatBytes) {
ByteBuf buf = Unpooled.wrappedBuffer(storageFormatBytes);
List<RecordBatchWithContext> recordBatchList = new LinkedList<>();
while (buf.isReadable()) {
buf.readByte(); // magic (currently unchecked; MAGIC_V0 is the only version)
int metaLength = buf.readInt();
ByteBuf metaBuf = buf.slice(buf.readerIndex(), metaLength);
RecordBatchMetaT recordBatchMetaT = RecordBatchMeta.getRootAsRecordBatchMeta(metaBuf.nioBuffer()).unpack();
buf.skipBytes(metaLength);
int payloadLength = buf.readInt();
ByteBuf payloadBuf = buf.slice(buf.readerIndex(), payloadLength);
buf.skipBytes(payloadLength);
recordBatchList.add(new FlatRecordBatchWithContext(recordBatchMetaT, payloadBuf.nioBuffer()));
}
return recordBatchList;
}
}
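
A minimal round-trip sketch of the codec above, not part of this diff: the anonymous RecordBatch stubs only the four accessors that encode() actually reads (count(), baseTimestamp(), properties(), rawPayload()); it is an assumption that the interface declares nothing further abstract. Imports are the same as in RecordBatchCodec.java.

// Hypothetical usage, for illustration only.
static void roundTripExample() {
    RecordBatch batch = new RecordBatch() {
        @Override public int count() { return 10; }
        @Override public long baseTimestamp() { return System.currentTimeMillis(); }
        @Override public Map<String, String> properties() { return Map.of("traceId", "t-1"); }
        @Override public ByteBuffer rawPayload() { return ByteBuffer.wrap(new byte[]{1, 2, 3}); }
    };

    ByteBuf encoded = RecordBatchCodec.encode(233L /* streamId */, 0L /* baseOffset */, batch);
    List<RecordBatchWithContext> decoded = RecordBatchCodec.decode(encoded.nioBuffer());
    // one storage-format record decodes back into one batch with its meta re-attached
    assert decoded.size() == 1;
}

Note that decode() trusts the length prefixes, so the input buffer must contain only whole records.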
133 changes: 133 additions & 0 deletions core/src/main/scala/kafka/log/s3/S3Wal.java
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class S3Wal implements Wal {
private static final Logger LOGGER = LoggerFactory.getLogger(S3Wal.class);
private final int batchIntervalMs = 200;
private final BlockingQueue<WalWriteRequest> writeBuffer;
private final WalBatchWriteTask walBatchWriteTask;
private final ObjectManager objectManager;
private final S3Operator s3Operator;


public S3Wal(ObjectManager objectManager, S3Operator s3Operator) {
writeBuffer = new ArrayBlockingQueue<>(16384);
walBatchWriteTask = new WalBatchWriteTask(objectManager, s3Operator);
this.objectManager = objectManager;
this.s3Operator = s3Operator;
}

@Override
public void close() {
walBatchWriteTask.close();
}

@Override
public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
CompletableFuture<Void> cf = new CompletableFuture<>();
//TODO: copy to pooled bytebuffer to reduce gc
try {
writeBuffer.put(new WalWriteRequest(streamRecord, cf));
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag before failing the future
cf.completeExceptionally(e);
}
return cf;
}

class WalBatchWriteTask implements Runnable {
private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("wal-batch-write", true));
private final Queue<SingleWalObjectWriteTask> writeTasks = new ConcurrentLinkedQueue<>();
private final ObjectManager objectManager;
private final S3Operator s3Operator;

public WalBatchWriteTask(ObjectManager objectManager, S3Operator s3Operator) {
this.objectManager = objectManager;
this.s3Operator = s3Operator;
schedule.scheduleAtFixedRate(this, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
}

public void close() {
schedule.shutdown();
run(); // synchronously flush requests that are still buffered
}

@Override
public void run() {
try {
run0();
} catch (Throwable e) {
LOGGER.error("Error in wal batch write task", e);
}
}

void run0() {
List<WalWriteRequest> requests = new ArrayList<>(writeBuffer.size());
writeBuffer.drainTo(requests);
if (requests.isEmpty()) {
return;
}
SingleWalObjectWriteTask singleWalObjectWriteTask = new SingleWalObjectWriteTask(requests, objectManager, s3Operator);
writeTasks.offer(singleWalObjectWriteTask);
runWriteTask(singleWalObjectWriteTask);
}

void runWriteTask(SingleWalObjectWriteTask task) {
task.upload().thenAccept(nil -> schedule.execute(this::tryComplete))
.exceptionally(ex -> {
LOGGER.warn("Write wal object fail, retry later", ex);
schedule.schedule(() -> runWriteTask(task), batchIntervalMs, TimeUnit.MILLISECONDS);
return null;
});
}

void tryComplete() {
// ack tasks strictly in submission order, so completions preserve offset ordering
while (true) {
SingleWalObjectWriteTask task = writeTasks.peek();
if (task == null) {
return;
}
if (task.isDone()) {
writeTasks.poll();
task.ack();
} else {
return;
}
}
}
}
}
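
A similarly hedged sketch for the WAL, also outside this diff: objectManager, s3Operator and streamRecordBatch stand in for real instances and are assumptions here. An append's future completes only once the batch it was drained into (every batchIntervalMs, i.e. 200 ms) has been uploaded as a WAL object and acked in submission order.

// Hypothetical usage; the three dependencies are assumed to be built elsewhere.
Wal wal = new S3Wal(objectManager, s3Operator);
CompletableFuture<Void> ack = wal.append(streamRecordBatch);
ack.thenRun(() -> System.out.println("record is durable in a WAL object"));
// close() stops the scheduler and synchronously flushes anything still buffered
wal.close();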