10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/log/es/FutureUtil.java
@@ -20,6 +20,7 @@
import org.slf4j.Logger;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class FutureUtil {
public static <T> CompletableFuture<T> failedFuture(Throwable ex) {
@@ -48,4 +49,13 @@ public static <T> void propagate(CompletableFuture<T> source, CompletableFuture<
}
});
}

public static <T> CompletableFuture<T> exec(Supplier<CompletableFuture<T>> run, Logger logger, String name) {
try {
return run.get();
} catch (Throwable ex) {
logger.error("{} run with unexpected exception", name, ex);
return failedFuture(ex);
}
}
}
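
The new exec helper converts a synchronous throw inside the supplier into a logged, already-failed future. A minimal caller sketch (a hypothetical class, not part of this change) illustrating that behaviour, assuming only the FutureUtil shown above:

import java.util.concurrent.CompletableFuture;
import kafka.log.es.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FutureUtilExecExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(FutureUtilExecExample.class);

    public static void main(String[] args) {
        // The supplier throws before any future is created; exec logs the error and returns failedFuture(ex).
        CompletableFuture<Long> cf = FutureUtil.exec(() -> {
            throw new IllegalStateException("boom");
        }, LOGGER, "example");
        System.out.println(cf.isCompletedExceptionally()); // prints true instead of throwing at the call site
    }
}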
15 changes: 9 additions & 6 deletions core/src/main/scala/kafka/log/s3/DefaultS3Client.java
@@ -23,11 +23,10 @@
import kafka.log.es.MemoryClient.KVClientImpl;
import kafka.log.s3.cache.DefaultS3BlockCache;
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.memory.MemoryMetadataManager;
import kafka.log.s3.network.ControllerRequestSender;
import kafka.log.s3.objects.ControllerObjectManager;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.operator.S3Operator;
import kafka.log.s3.streams.ControllerStreamManager;
import kafka.log.s3.streams.StreamManager;
import kafka.log.s3.wal.MemoryWriteAheadLog;
import kafka.server.BrokerServer;
@@ -37,7 +36,7 @@
public class DefaultS3Client implements Client {

private final KafkaConfig config;
private final StreamMetadataManager metadataManager;
// private final StreamMetadataManager metadataManager;

private final BrokerToControllerChannelManager channelManager;

Expand All @@ -60,11 +59,15 @@ public class DefaultS3Client implements Client {
public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator operator) {
this.config = config;
this.channelManager = brokerServer.clientToControllerChannelManager();
this.metadataManager = new StreamMetadataManager(brokerServer, config);
// this.metadataManager = new StreamMetadataManager(brokerServer, config);
this.operator = operator;
this.requestSender = new ControllerRequestSender(channelManager);
this.streamManager = new ControllerStreamManager(this.requestSender, config);
this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config);
// this.streamManager = new ControllerStreamManager(this.requestSender, config);
// this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config);
// Back both stream and object management with the in-memory metadata manager for now
// (the controller-backed managers above are left commented out).
MemoryMetadataManager memoryMetadataManager = new MemoryMetadataManager();
memoryMetadataManager.start();
this.streamManager = memoryMetadataManager;
this.objectManager = memoryMetadataManager;
this.blockCache = new DefaultS3BlockCache(objectManager, operator);
this.storage = new S3Storage(new MemoryWriteAheadLog(), objectManager, blockCache, operator);
this.streamClient = new S3StreamClient(this.streamManager, this.storage);
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -19,21 +19,21 @@

import io.netty.buffer.ByteBuf;
import kafka.log.s3.model.StreamRecordBatch;
import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.compress.ZstdFactory;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ObjectReader {
@@ -97,7 +97,7 @@ public static IndexBlock parse(ByteBuf objectTailBuf, long objectSize) throws In
throw new IndexBlockParseException(indexBlockPosition);
} else {
int indexRelativePosition = objectTailBuf.readableBytes() - (int) (objectSize - indexBlockPosition);
ByteBuf indexBlockBuf = objectTailBuf.slice(indexRelativePosition, indexBlockSize);
ByteBuf indexBlockBuf = objectTailBuf.slice(objectTailBuf.readerIndex() + indexRelativePosition, indexBlockSize);
int blockCount = indexBlockBuf.readInt();
ByteBuf blocks = indexBlockBuf.slice(indexBlockBuf.readerIndex(), blockCount * 16);
indexBlockBuf.skipBytes(blockCount * 16);
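
The slice fix above matters because ByteBuf.slice(index, length) takes an absolute index, while indexRelativePosition is derived from readableBytes(), which is measured from the current readerIndex. A standalone Netty sketch (illustrative only, not part of this change) showing the difference once some bytes have already been consumed:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class SliceOffsetExample {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);
        for (int i = 0; i < 16; i++) {
            buf.writeByte(i);
        }
        buf.skipBytes(4); // readerIndex is now 4, readableBytes() is 12

        int relativePosition = 6; // an offset computed against readableBytes()
        ByteBuf wrong = buf.slice(relativePosition, 4);                     // starts at absolute index 6
        ByteBuf fixed = buf.slice(buf.readerIndex() + relativePosition, 4); // starts at absolute index 10
        System.out.println(wrong.getByte(0)); // 6  -- four bytes too early
        System.out.println(fixed.getByte(0)); // 10 -- the intended position
    }
}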
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -77,7 +77,10 @@ public void write(FlatStreamRecordBatch record) {
public void closeCurrentBlock() {
if (dataBlock != null) {
dataBlock.close();
waitingUploadBlocks.add(dataBlock);
nextDataBlockPosition += dataBlock.size();
dataBlock = null;
tryUploadPart();
}
}

@@ -98,11 +101,11 @@ public CompletableFuture<Void> close() {
dataBlock.close();
nextDataBlockPosition += dataBlock.size();
waitingUploadBlocks.add(dataBlock);
completedBlocks.add(dataBlock);
dataBlock = null;
}
for (DataBlock block : waitingUploadBlocks) {
buf.addComponent(true, block.buffer());
completedBlocks.add(block);
}
waitingUploadBlocks.clear();
indexBlock = new IndexBlock();
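
With this change closeCurrentBlock() queues the sealed block for upload and advances nextDataBlockPosition, and close() records each waiting block in completedBlocks exactly once. A hedged caller-side sketch of the intended lifecycle (a hypothetical helper, not part of this change; constructor and method names are taken from the hunks in this PR, and the class is assumed to sit in the same kafka.log.s3 package as ObjectWriter):

import java.util.List;
import java.util.concurrent.CompletableFuture;
import kafka.log.s3.operator.S3Operator;

class ObjectWriterLifecycleSketch {
    static CompletableFuture<Long> writeRecords(long objectId, S3Operator operator,
                                                List<FlatStreamRecordBatch> records) {
        ObjectWriter writer = new ObjectWriter(objectId, operator);
        records.forEach(writer::write);
        // Seal the active block: it is now added to waitingUploadBlocks and counted in
        // nextDataBlockPosition instead of being dropped until close().
        writer.closeCurrentBlock();
        // close() flushes the remaining blocks and adds each of them to completedBlocks once.
        return writer.close().thenApply(nil -> writer.size());
    }
}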
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -45,7 +45,7 @@ public class S3Storage implements Storage {
private final WriteAheadLog log;
private final LogCache logCache;
private final AtomicLong logConfirmOffset = new AtomicLong();
private final AtomicLong processedLogConfirmOffset = new AtomicLong();
private final AtomicLong processedLogConfirmOffset = new AtomicLong(-1L);
private final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-storage-main", false));
private final ScheduledExecutorService backgroundExecutor = Executors.newSingleThreadScheduledExecutor(
24 changes: 24 additions & 0 deletions core/src/main/scala/kafka/log/s3/S3Stream.java
@@ -38,6 +38,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static kafka.log.es.FutureUtil.exec;

public class S3Stream implements Stream {
private static final Logger LOGGER = LoggerFactory.getLogger(S3Stream.class);
private final String logIdent;
@@ -80,6 +82,10 @@ public long nextOffset() {

@Override
public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
return exec(() -> append0(recordBatch), LOGGER, "append");
}

private CompletableFuture<AppendResult> append0(RecordBatch recordBatch) {
if (!status.isWritable()) {
return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is not writable"));
}
@@ -105,6 +111,10 @@

@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
return exec(() -> fetch0(startOffset, endOffset, maxBytes), LOGGER, "fetch");
}

private CompletableFuture<FetchResult> fetch0(long startOffset, long endOffset, int maxBytes) {
if (status.isClosed()) {
return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is already closed"));
}
@@ -122,8 +132,13 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, in
});
}


@Override
public CompletableFuture<Void> trim(long newStartOffset) {
return exec(() -> trim0(newStartOffset), LOGGER, "trim");
}

private CompletableFuture<Void> trim0(long newStartOffset) {
if (newStartOffset < this.startOffset) {
throw new IllegalArgumentException("newStartOffset[" + newStartOffset + "] cannot be less than current start offset["
+ this.startOffset + "]");
@@ -132,14 +147,23 @@ public CompletableFuture<Void> trim(long newStartOffset) {
return streamManager.trimStream(streamId, epoch, newStartOffset);
}


@Override
public CompletableFuture<Void> close() {
return exec(this::close0, LOGGER, "close");
}

private CompletableFuture<Void> close0() {
status.markClosed();
return storage.forceUpload(streamId).thenCompose(nil -> streamManager.closeStream(streamId, epoch));
}

@Override
public CompletableFuture<Void> destroy() {
return exec(this::destroy0, LOGGER, "destroy");
}

private CompletableFuture<Void> destroy0() {
status.markDestroy();
startOffset = this.confirmOffset.get();
return streamManager.deleteStream(streamId, epoch);
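
Each public S3Stream operation is now a thin wrapper that routes its private *0 counterpart through exec(), so a synchronous throw, such as the IllegalArgumentException from trim0() above, reaches the caller as a failed future rather than an exception on the calling thread. An illustrative caller-side sketch (hypothetical, not part of this change; "stream" stands for an already-opened stream):

import com.automq.elasticstream.client.api.Stream;
import java.util.concurrent.CompletableFuture;

class TrimExample {
    static void trimBelowStart(Stream stream) {
        CompletableFuture<Void> cf = stream.trim(-1L); // below any valid start offset
        cf.whenComplete((nil, ex) -> {
            if (ex != null) {
                // ex is (or wraps) the IllegalArgumentException thrown inside trim0(); exec() already logged it.
                System.out.println("trim rejected: " + ex.getMessage());
            }
        });
    }
}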
18 changes: 12 additions & 6 deletions core/src/main/scala/kafka/log/s3/S3StreamClient.java
@@ -22,10 +22,15 @@
import com.automq.elasticstream.client.api.Stream;
import com.automq.elasticstream.client.api.StreamClient;
import kafka.log.s3.streams.StreamManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

import static kafka.log.es.FutureUtil.exec;

public class S3StreamClient implements StreamClient {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class);

private final StreamManager streamManager;
private final Storage storage;
@@ -37,19 +42,20 @@ public S3StreamClient(StreamManager streamManager, Storage storage) {

@Override
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
return streamManager.createStream().thenCompose(streamId -> openStream0(streamId, options.epoch()));
return exec(() -> streamManager.createStream().thenCompose(streamId -> openStream0(streamId, options.epoch())),
LOGGER, "createAndOpenStream");
}

@Override
public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
return openStream0(streamId, openStreamOptions.epoch());
return exec(() -> openStream0(streamId, openStreamOptions.epoch()), LOGGER, "openStream");
}

private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> new S3Stream(
metadata.getStreamId(), metadata.getEpoch(),
metadata.getStartOffset(), metadata.getNextOffset(),
storage, streamManager));
thenApply(metadata -> new S3Stream(
metadata.getStreamId(), metadata.getEpoch(),
metadata.getStartOffset(), metadata.getNextOffset(),
storage, streamManager));
}
}
34 changes: 18 additions & 16 deletions core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
@@ -40,6 +40,7 @@ public class WALObjectUploadTask {
private final ObjectManager objectManager;
private final S3Operator s3Operator;
private final CompletableFuture<Long> prepareCf = new CompletableFuture<>();
private volatile CommitWALObjectRequest commitWALObjectRequest;
private final CompletableFuture<CommitWALObjectRequest> uploadCf = new CompletableFuture<>();

public WALObjectUploadTask(Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap, int streamSplitSizeThreshold, ObjectManager objectManager, S3Operator s3Operator) {
@@ -62,9 +63,9 @@ public CompletableFuture<CommitWALObjectRequest> upload() {
prepareCf.thenAccept(objectId -> {
List<Long> streamIds = new ArrayList<>(streamRecordsMap.keySet());
Collections.sort(streamIds);
CommitWALObjectRequest compactRequest = new CommitWALObjectRequest();
CommitWALObjectRequest request = new CommitWALObjectRequest();

ObjectWriter minorCompactObject = new ObjectWriter(objectId, s3Operator);
ObjectWriter walObject = new ObjectWriter(objectId, s3Operator);

List<CompletableFuture<StreamObject>> streamObjectCfList = new LinkedList<>();

@@ -75,29 +76,27 @@ public CompletableFuture<CommitWALObjectRequest> upload() {
streamObjectCfList.add(writeStreamObject(streamRecords));
} else {
for (FlatStreamRecordBatch record : streamRecords) {
minorCompactObject.write(record);
walObject.write(record);
}
long startOffset = streamRecords.get(0).baseOffset;
long endOffset = streamRecords.get(streamRecords.size() - 1).lastOffset();
compactRequest.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
// minor compact object block only contain single stream's data.
minorCompactObject.closeCurrentBlock();
request.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
// each data block of the WAL object only contains a single stream's data.
walObject.closeCurrentBlock();
}
}
compactRequest.setObjectId(objectId);
compactRequest.setOrderId(objectId);
CompletableFuture<Void> minorCompactObjectCf = minorCompactObject.close().thenAccept(nil -> {
compactRequest.setObjectSize(minorCompactObject.size());
});
compactRequest.setObjectSize(minorCompactObject.size());
request.setObjectId(objectId);
request.setOrderId(objectId);
CompletableFuture<Void> walObjectCf = walObject.close().thenAccept(nil -> request.setObjectSize(walObject.size()));
for (CompletableFuture<StreamObject> streamObjectCf : streamObjectCfList) {
streamObjectCf.thenAccept(compactRequest::addStreamObject);
streamObjectCf.thenAccept(request::addStreamObject);
}
List<CompletableFuture<?>> allCf = new LinkedList<>(streamObjectCfList);
allCf.add(minorCompactObjectCf);
allCf.add(walObjectCf);

CompletableFuture.allOf(allCf.toArray(new CompletableFuture[0])).thenAccept(nil -> {
uploadCf.complete(compactRequest);
commitWALObjectRequest = request;
uploadCf.complete(request);
}).exceptionally(ex -> {
uploadCf.completeExceptionally(ex);
return null;
@@ -107,7 +106,10 @@ public CompletableFuture<CommitWALObjectRequest> upload() {
}

public CompletableFuture<Void> commit() {
return uploadCf.thenCompose(request -> objectManager.commitWALObject(request).thenApply(resp -> null));
return uploadCf.thenCompose(request -> objectManager.commitWALObject(request).thenApply(resp -> {
LOGGER.debug("Commit WAL object {}", commitWALObjectRequest);
return null;
}));
}

private CompletableFuture<StreamObject> writeStreamObject(List<FlatStreamRecordBatch> streamRecords) {
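For context, a hedged sketch of how the task is driven from the caller's side (not part of this change): streamRecordsMap, objectManager, s3Operator and LOGGER are assumed to exist in the caller, the 16 MiB split threshold is purely illustrative, and the step that allocates the object id and completes prepareCf sits outside these hunks. upload() assembles the WAL object plus any split stream objects, and commit() then registers the request with the ObjectManager while logging the stored commitWALObjectRequest at debug level; that log relies on the CommitWALObjectRequest#toString added in the next hunk.

WALObjectUploadTask task = new WALObjectUploadTask(streamRecordsMap, 16 * 1024 * 1024, objectManager, s3Operator);
task.upload()                               // completes with the CommitWALObjectRequest once all parts are written
    .thenCompose(request -> task.commit())  // objectManager.commitWALObject(request), then the debug log
    .exceptionally(ex -> {
        LOGGER.error("upload/commit of WAL object failed", ex);
        return null;
    });
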
@@ -94,4 +94,15 @@ public long getOrderId() {
public void setOrderId(long orderId) {
this.orderId = orderId;
}

@Override
public String toString() {
return "CommitWALObjectRequest{" +
"objectId=" + objectId +
", objectSize=" + objectSize +
", streamRanges=" + streamRanges +
", streamObjects=" + streamObjects +
", orderId=" + orderId +
'}';
}
}
10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java
@@ -62,4 +62,14 @@ public void setStartOffset(long startOffset) {
public void setEndOffset(long endOffset) {
this.endOffset = endOffset;
}

@Override
public String toString() {
return "ObjectStreamRange{" +
"streamId=" + streamId +
", epoch=" + epoch +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
'}';
}
}
14 changes: 14 additions & 0 deletions core/src/main/scala/kafka/log/s3/objects/StreamObject.java
@@ -17,6 +17,8 @@

package kafka.log.s3.objects;

import java.util.Arrays;

public class StreamObject {
private long objectId;
private long objectSize;
@@ -76,4 +78,16 @@ public long[] getSourceObjectIds() {
public void setSourceObjectIds(long[] sourceObjectIds) {
this.sourceObjectIds = sourceObjectIds;
}

@Override
public String toString() {
return "StreamObject{" +
"objectId=" + objectId +
", objectSize=" + objectSize +
", streamId=" + streamId +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
", sourceObjectIds=" + Arrays.toString(sourceObjectIds) +
'}';
}
}
5 changes: 5 additions & 0 deletions core/src/test/java/kafka/log/s3/ObjectWriterTest.java
@@ -105,6 +105,11 @@ public void testWrite() throws ExecutionException, InterruptedException {
assertEquals(1, blockIndexes.get(0).blockId());
}

@Test
public void testWrite_closeBlock() {
    // Placeholder added in this commit; body not implemented yet. Presumably meant to cover
    // closeCurrentBlock(), which now queues the sealed block for upload (see the ObjectWriter changes above).
}

StreamRecordBatch newRecord(long streamId, long offset, int count, int payloadSize) {
RecordBatch recordBatch = DefaultRecordBatch.of(count, payloadSize);
return new StreamRecordBatch(streamId, 0, offset, recordBatch);