1 change: 1 addition & 0 deletions core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala
@@ -74,6 +74,7 @@ class ElasticTimeIndex(_file: File, streamSegmentSupplier: StreamSliceSupplier,
cache.put(record.rawPayload())
}
})
rst.free()
} else {
return timestampOffset
}
core/src/main/scala/kafka/log/es/ElasticTransactionIndex.scala
@@ -93,7 +93,8 @@ class ElasticTransactionIndex(@volatile private var _file: File, streamSliceSupp
return queue.dequeue()
}
try {
val records = stream.fetch(position, stream.nextOffset(), AbortedTxn.TotalSize * 128).recordBatchList()
val rst = stream.fetch(position, stream.nextOffset(), AbortedTxn.TotalSize * 128)
val records = rst.recordBatchList()
records.forEach(recordBatch => {
val readBuf = Unpooled.wrappedBuffer(recordBatch.rawPayload())
val size = readBuf.readableBytes()
@@ -110,6 +111,7 @@
}
position += size
})
rst.free()
queue.dequeue()
} catch {
case e: IOException =>
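Both index changes follow the same discipline: keep a handle on the fetch result (`rst`) and call `free()` once its record batches have been consumed, so the pooled memory behind the fetch goes back to the allocator instead of leaking. A minimal consumer-side sketch of that contract, assuming the asynchronous three-argument `fetch` of the client `Stream` API used by `MetaStream` and `S3Stream` below (the Scala call sites above use a synchronous slice fetch and free without try/finally):

```java
import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.api.Stream;

import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

public class FetchAndFreeExample {
    // Fetch a range, hand each batch to a consumer, then return the pooled
    // buffers behind the fetch result to the allocator via free().
    static void consumeRange(Stream stream, long startOffset, long endOffset, int maxBytes,
                             Consumer<RecordBatchWithContext> consumer)
            throws ExecutionException, InterruptedException {
        FetchResult rst = stream.fetch(startOffset, endOffset, maxBytes).get();
        try {
            rst.recordBatchList().forEach(consumer);
        } finally {
            rst.free();
        }
    }
}
```

The consumer must not keep references to the raw payloads past `free()`; anything it needs later has to be copied out first.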
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/log/es/MetaStream.java
@@ -35,6 +35,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,7 +179,7 @@ public Map<String, Object> replay() throws IOException {
FetchResult fetchRst = fetch(pos, endOffset, 64 * 1024).get();
for (RecordBatchWithContext context : fetchRst.recordBatchList()) {
try {
MetaKeyValue kv = MetaKeyValue.decode(context.rawPayload());
MetaKeyValue kv = MetaKeyValue.decode(Unpooled.copiedBuffer(context.rawPayload()).nioBuffer());
metaCache.put(kv.getKey(), Pair.of(context.baseOffset(), kv.getValue()));
} catch (Exception e) {
LOGGER.error("{} streamId {}: decode meta failed, offset: {}, error: {}", logIdent, streamId(), context.baseOffset(), e.getMessage());
@@ -187,6 +189,7 @@ public Map<String, Object> replay() throws IOException {
if (pos >= endOffset) {
done = true;
}
fetchRst.free();
}
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
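`MetaStream.replay()` now detaches each payload with `Unpooled.copiedBuffer` before decoding and frees the fetch result after replaying its batches, so decoded key-values never point into memory that `free()` hands back to the pool. A hedged sketch of just that copy-then-free step (the helper name is made up; `FetchResult` and `RecordBatchWithContext` are the types the diff already uses):

```java
import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import io.netty.buffer.Unpooled;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class ReplayCopyExample {
    // Detach every payload in a fetch result before freeing it, mirroring the
    // copy-then-free order used in MetaStream.replay().
    static List<ByteBuffer> detachPayloads(FetchResult fetchRst) {
        List<ByteBuffer> payloads = new ArrayList<>();
        try {
            for (RecordBatchWithContext context : fetchRst.recordBatchList()) {
                // copiedBuffer allocates fresh unpooled memory, so the returned
                // ByteBuffer stays valid after fetchRst.free() recycles the
                // pooled fetch buffer behind rawPayload().
                payloads.add(Unpooled.copiedBuffer(context.rawPayload()).nioBuffer());
            }
        } finally {
            fetchRst.free();
        }
        return payloads;
    }
}
```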
26 changes: 26 additions & 0 deletions core/src/main/scala/kafka/log/s3/ByteBufAlloc.java
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import io.netty.buffer.PooledByteBufAllocator;

public class ByteBufAlloc {

public static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

}
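`ByteBufAlloc` just pins the process-wide pooled allocator behind one name so the s3 code paths share a single pool; callers allocate from `ByteBufAlloc.ALLOC` and are responsible for releasing what they take. A small usage sketch (the buffer contents are illustrative):

```java
import io.netty.buffer.ByteBuf;
import kafka.log.s3.ByteBufAlloc;

public class AllocExample {
    public static void main(String[] args) {
        // Heap buffer from the shared pool; it must be released to go back to the pool.
        ByteBuf buf = ByteBufAlloc.ALLOC.heapBuffer(64);
        try {
            buf.writeLong(42L);
            System.out.println(buf.readLong()); // 42
        } finally {
            buf.release();
        }
    }
}
```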
69 changes: 0 additions & 69 deletions core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java

This file was deleted.
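With `FlatStreamRecordBatch` removed, `StreamRecordBatch` itself carries the reference-counted payload. A hedged usage sketch built only from the five-argument constructor and the accessors the rest of this diff calls (the literal values are illustrative, not from the PR):

```java
import io.netty.buffer.Unpooled;
import kafka.log.s3.model.StreamRecordBatch;

public class StreamRecordBatchUsage {
    public static void main(String[] args) {
        // Build a record the way S3Stream.append0 now does: stream id, epoch,
        // base offset, record count, and a ByteBuf wrapping the raw payload.
        StreamRecordBatch record = new StreamRecordBatch(
                1L /* streamId */, 0L /* epoch */, 100L /* baseOffset */,
                2 /* count */, Unpooled.wrappedBuffer("payload".getBytes()));
        System.out.println(record.getStreamId() + " " + record.getLastOffset());
        // The payload is reference counted; whoever ends up owning it releases it.
        record.release();
    }
}
```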

22 changes: 11 additions & 11 deletions core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -20,12 +20,13 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.ObjectStreamRange;
import kafka.log.s3.operator.S3Operator;
import kafka.log.s3.operator.Writer;
import org.apache.kafka.metadata.stream.ObjectUtils;
import org.apache.kafka.common.compress.ZstdFactory;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.metadata.stream.ObjectUtils;

import java.io.IOException;
import java.io.OutputStream;
@@ -58,7 +59,7 @@ public ObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold
writer = s3Operator.writer(objectKey);
}

public void write(FlatStreamRecordBatch record) {
public void write(StreamRecordBatch record) {
if (dataBlock == null) {
dataBlock = new DataBlock(nextDataBlockPosition);
}
Expand Down Expand Up @@ -159,7 +160,7 @@ public DataBlock(long position) {
streamRanges = new LinkedList<>();
}

public boolean write(FlatStreamRecordBatch record) {
public boolean write(StreamRecordBatch record) {
try {
recordCount++;
return write0(record);
@@ -169,19 +170,18 @@ public boolean write(FlatStreamRecordBatch record) {
}
}

public boolean write0(FlatStreamRecordBatch record) throws IOException {
if (streamRange == null || streamRange.getStreamId() != record.streamId) {
public boolean write0(StreamRecordBatch record) throws IOException {
if (streamRange == null || streamRange.getStreamId() != record.getStreamId()) {
streamRange = new ObjectStreamRange();
streamRange.setStreamId(record.streamId);
streamRange.setEpoch(record.epoch);
streamRange.setStartOffset(record.baseOffset);
streamRange.setStreamId(record.getStreamId());
streamRange.setEpoch(record.getEpoch());
streamRange.setStartOffset(record.getBaseOffset());
streamRanges.add(streamRange);
}
streamRange.setEndOffset(record.lastOffset());
streamRange.setEndOffset(record.getLastOffset());

ByteBuf recordBuf = record.encodedBuf();
ByteBuf recordBuf = record.encoded();
out.write(recordBuf.array(), recordBuf.arrayOffset(), recordBuf.readableBytes());
recordBuf.release();
blockSize += recordBuf.readableBytes();
if (blockSize >= blockSizeThreshold) {
close();
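In the `DataBlock` path, each record's already-encoded `ByteBuf` is copied into the block's output stream and then released back to the pool. A standalone sketch of that write-and-release step, assuming a heap buffer with a backing array; capturing the size before `release()` is a defensive choice of this sketch, not something the PR prescribes:

```java
import io.netty.buffer.ByteBuf;
import kafka.log.s3.ByteBufAlloc;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class WriteAndReleaseExample {
    // Copy a heap ByteBuf into an OutputStream, then return it to the pool.
    static int writeAndRelease(ByteBuf recordBuf, OutputStream out) throws IOException {
        int size = recordBuf.readableBytes(); // capture before release
        out.write(recordBuf.array(), recordBuf.arrayOffset() + recordBuf.readerIndex(), size);
        recordBuf.release();
        return size;
    }

    public static void main(String[] args) throws IOException {
        ByteBuf buf = ByteBufAlloc.ALLOC.heapBuffer(16);
        buf.writeBytes("hello".getBytes());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.out.println(writeAndRelease(buf, out)); // 5
    }
}
```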
18 changes: 9 additions & 9 deletions core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -79,11 +79,9 @@ public void close() {

@Override
public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
//TODO: copy to pooled bytebuffer to reduce gc, convert to flat record
FlatStreamRecordBatch flatStreamRecordBatch = FlatStreamRecordBatch.from(streamRecord);
WriteAheadLog.AppendResult appendResult = log.append(flatStreamRecordBatch.encodedBuf());
WriteAheadLog.AppendResult appendResult = log.append(streamRecord.encoded());
CompletableFuture<Void> cf = new CompletableFuture<>();
WalWriteRequest writeRequest = new WalWriteRequest(flatStreamRecordBatch, appendResult.offset, cf);
WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, appendResult.offset, cf);
callbackSequencer.before(writeRequest);
appendResult.future.thenAccept(nil -> handleAppendCallback(writeRequest));
return cf;
@@ -97,9 +95,9 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
}

private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes) {
List<FlatStreamRecordBatch> records = logCache.get(streamId, startOffset, endOffset, maxBytes);
List<StreamRecordBatch> records = logCache.get(streamId, startOffset, endOffset, maxBytes);
if (!records.isEmpty()) {
return CompletableFuture.completedFuture(new ReadDataBlock(StreamRecordBatchCodec.decode(records)));
return CompletableFuture.completedFuture(new ReadDataBlock(records));
}
return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(readDataBlock -> {
long nextStartOffset = readDataBlock.endOffset().orElse(startOffset);
@@ -108,7 +106,7 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset,
return readDataBlock;
}
List<StreamRecordBatch> finalRecords = new LinkedList<>(readDataBlock.getRecords());
finalRecords.addAll(StreamRecordBatchCodec.decode(logCache.get(streamId, nextStartOffset, endOffset, maxBytes)));
finalRecords.addAll(logCache.get(streamId, nextStartOffset, endOffset, maxBytes));
return new ReadDataBlock(finalRecords);
});
}
@@ -192,6 +190,8 @@ private void commitWALObject(WALObjectUploadTaskContext context) {
// 1. poll out current task
walObjectCommitQueue.poll();
log.trim(context.cache.confirmOffset());
// transfer records ownership to block cache.
blockCache.put(context.cache.records());
freeCache(context.cache.blockId());
context.cf.complete(null);

@@ -219,7 +219,7 @@ static class WALCallbackSequencer {
public void before(WalWriteRequest request) {
try {
walRequests.put(request);
Queue<WalWriteRequest> streamRequests = stream2requests.computeIfAbsent(request.record.streamId, s -> new LinkedBlockingQueue<>());
Queue<WalWriteRequest> streamRequests = stream2requests.computeIfAbsent(request.record.getStreamId(), s -> new LinkedBlockingQueue<>());
streamRequests.add(request);
} catch (InterruptedException ex) {
request.cf.completeExceptionally(ex);
@@ -244,7 +244,7 @@ public List<WalWriteRequest> after(WalWriteRequest request) {
}

// pop sequence success stream request.
long streamId = request.record.streamId;
long streamId = request.record.getStreamId();
Queue<WalWriteRequest> streamRequests = stream2requests.get(streamId);
WalWriteRequest peek = streamRequests.peek();
if (peek == null || peek.offset != request.offset) {
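The read path in `S3Storage.read0` is now a two-tier lookup over `StreamRecordBatch` lists: the WAL log cache is consulted first, and only on a miss does the block cache supply the head of the range, with the tail topped up from the log cache at the returned end offset. A hedged, type-parameterized sketch of that control flow (the `LogCache`/`BlockCache` interfaces here are stand-ins, not the project's actual classes, and the stream id argument is omitted):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class TieredReadExample {
    // Hypothetical stand-ins for the two tiers S3Storage.read0 consults: the
    // in-memory WAL log cache and the block cache backed by S3 objects.
    interface LogCache<T> {
        List<T> get(long startOffset, long endOffset, int maxBytes);
    }

    interface BlockCache<T> {
        CompletableFuture<Block<T>> read(long startOffset, long endOffset, int maxBytes);
    }

    static final class Block<T> {
        final List<T> records;
        final Optional<Long> endOffset;

        Block(List<T> records, Optional<Long> endOffset) {
            this.records = records;
            this.endOffset = endOffset;
        }
    }

    // Serve the whole range from the log cache when it holds the head; otherwise
    // read the head from the block cache and top up the tail from the log cache,
    // mirroring the control flow of S3Storage.read0 in this diff.
    static <T> CompletableFuture<List<T>> read(LogCache<T> logCache, BlockCache<T> blockCache,
                                               long startOffset, long endOffset, int maxBytes) {
        List<T> cached = logCache.get(startOffset, endOffset, maxBytes);
        if (!cached.isEmpty()) {
            return CompletableFuture.completedFuture(cached);
        }
        return blockCache.read(startOffset, endOffset, maxBytes).thenApply(block -> {
            long nextStartOffset = block.endOffset.orElse(startOffset);
            List<T> merged = new ArrayList<>(block.records);
            merged.addAll(logCache.get(nextStartOffset, endOffset, maxBytes));
            return merged;
        });
    }
}
```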
19 changes: 14 additions & 5 deletions core/src/main/scala/kafka/log/s3/S3Stream.java
@@ -25,6 +25,7 @@
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.api.Stream;
import com.automq.elasticstream.client.flatc.header.ErrorCode;
import io.netty.buffer.Unpooled;
import kafka.log.es.FutureUtil;
import kafka.log.es.RecordBatchWithContextWrapper;
import kafka.log.s3.model.StreamRecordBatch;
@@ -90,7 +91,7 @@ private CompletableFuture<AppendResult> append0(RecordBatch recordBatch) {
return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is not writable"));
}
long offset = nextOffset.getAndAdd(recordBatch.count());
StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch);
StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch.count(), Unpooled.wrappedBuffer(recordBatch.rawPayload()));
CompletableFuture<AppendResult> cf = storage.append(streamRecordBatch).thenApply(nil -> {
updateConfirmOffset(offset + recordBatch.count());
return new DefaultAppendResult(offset);
@@ -128,7 +129,7 @@ private CompletableFuture<FetchResult> fetch0(long startOffset, long endOffset,
));
}
return storage.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> {
List<RecordBatchWithContext> records = dataBlock.getRecords().stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList());
List<StreamRecordBatch> records = dataBlock.getRecords();
LOGGER.trace("{} stream fetch, startOffset: {}, endOffset: {}, maxBytes: {}, records: {}", logIdent, startOffset, endOffset, maxBytes, records.size());
return new DefaultFetchResult(records);
});
@@ -184,16 +185,24 @@ private void updateConfirmOffset(long newOffset) {
}
}


static class DefaultFetchResult implements FetchResult {
private final List<RecordBatchWithContext> records;
private final List<StreamRecordBatch> records;

public DefaultFetchResult(List<RecordBatchWithContext> records) {
public DefaultFetchResult(List<StreamRecordBatch> records) {
this.records = records;
}

@Override
public List<RecordBatchWithContext> recordBatchList() {
return records;
return records.stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList());
}

@Override
public void free() {
for (StreamRecordBatch record : records) {
record.release();
}
}
}

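On the append side, `S3Stream.append0` now wraps the client `RecordBatch` payload in a Netty buffer instead of copying it into an intermediate flat record. A small sketch of what `Unpooled.wrappedBuffer` does with such a payload (the literal content is illustrative):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WrapPayloadExample {
    public static void main(String[] args) {
        ByteBuffer rawPayload = ByteBuffer.wrap("record-data".getBytes(StandardCharsets.UTF_8));
        // Unpooled.wrappedBuffer is zero-copy: the ByteBuf shares the ByteBuffer's
        // memory rather than copying it, which is what append0 relies on when it
        // turns the client payload into a StreamRecordBatch.
        ByteBuf wrapped = Unpooled.wrappedBuffer(rawPayload);
        System.out.println(wrapped.readableBytes());                  // 11
        System.out.println(wrapped.toString(StandardCharsets.UTF_8)); // record-data
    }
}
```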
27 changes: 8 additions & 19 deletions core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java
@@ -17,21 +17,15 @@

package kafka.log.s3;

import com.automq.elasticstream.client.DefaultRecordBatch;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import kafka.log.s3.model.StreamRecordBatch;

import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class StreamRecordBatchCodec {
private static final byte MAGIC_V0 = 0x22;
private static final PooledByteBufAllocator ALLOCATOR = PooledByteBufAllocator.DEFAULT;
public static final byte MAGIC_V0 = 0x22;

public static ByteBuf encode(StreamRecordBatch streamRecord) {
int totalLength = 1 // magic
@@ -42,7 +36,7 @@ public static ByteBuf encode(StreamRecordBatch streamRecord) {
+ 4 // payload length
+ streamRecord.getRecordBatch().rawPayload().remaining(); // payload

ByteBuf buf = ALLOCATOR.heapBuffer(totalLength);
ByteBuf buf = ByteBufAlloc.ALLOC.heapBuffer(totalLength);
buf.writeByte(MAGIC_V0);
buf.writeLong(streamRecord.getStreamId());
buf.writeLong(streamRecord.getEpoch());
@@ -65,10 +59,10 @@ public static StreamRecordBatch decode(DataInputStream in) {
long baseOffset = in.readLong();
int lastOffsetDelta = in.readInt();
int payloadLength = in.readInt();
ByteBuffer payload = ByteBuffer.allocate(payloadLength);
in.readFully(payload.array());
DefaultRecordBatch defaultRecordBatch = new DefaultRecordBatch(lastOffsetDelta, 0, Collections.emptyMap(), payload);
return new StreamRecordBatch(streamId, epoch, baseOffset, defaultRecordBatch);
ByteBuf payload = ByteBufAlloc.ALLOC.heapBuffer(payloadLength);
in.readFully(payload.array(), payload.arrayOffset(), payloadLength);
payload.writerIndex(payload.readerIndex() + payloadLength);
return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -81,13 +75,8 @@ public static StreamRecordBatch decode(ByteBuf buf) {
long baseOffset = buf.readLong();
int lastOffsetDelta = buf.readInt();
int payloadLength = buf.readInt();
ByteBuffer payload = buf.slice(buf.readerIndex(), payloadLength).nioBuffer();
ByteBuf payload = buf.slice(buf.readerIndex(), payloadLength);
buf.skipBytes(payloadLength);
DefaultRecordBatch defaultRecordBatch = new DefaultRecordBatch(lastOffsetDelta, 0, Collections.emptyMap(), payload);
return new StreamRecordBatch(streamId, epoch, baseOffset, defaultRecordBatch);
}

public static List<StreamRecordBatch> decode(List<FlatStreamRecordBatch> records) {
return records.stream().map(r -> decode(r.encodedBuf())).collect(Collectors.toList());
return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
}
}
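The codec now reads into pooled heap buffers and hands the payload slice straight to the five-argument `StreamRecordBatch` constructor. A hedged round-trip sketch using only the `encode`/`decode` entry points and accessors visible in this diff (the field values are illustrative):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import kafka.log.s3.StreamRecordBatchCodec;
import kafka.log.s3.model.StreamRecordBatch;

public class CodecRoundTripExample {
    public static void main(String[] args) {
        StreamRecordBatch original = new StreamRecordBatch(
                1L, 0L, 100L, 2, Unpooled.wrappedBuffer("payload".getBytes()));
        ByteBuf encoded = StreamRecordBatchCodec.encode(original);
        StreamRecordBatch decoded = StreamRecordBatchCodec.decode(encoded);
        // decode(ByteBuf) slices the payload out of the encoded buffer, so the
        // decoded record shares memory with it rather than copying.
        System.out.println(decoded.getStreamId());   // 1
        System.out.println(decoded.getBaseOffset()); // 100
    }
}
```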