38 changes: 19 additions & 19 deletions core/src/main/scala/kafka/log/s3/S3Stream.java
@@ -28,7 +28,6 @@
 import kafka.log.es.FutureUtil;
 import kafka.log.es.RecordBatchWithContextWrapper;
 import kafka.log.s3.cache.S3BlockCache;
-import kafka.log.s3.model.StreamMetadata;
 import kafka.log.s3.model.StreamRecordBatch;
 import kafka.log.s3.streams.StreamManager;
 import org.slf4j.Logger;
@@ -42,38 +41,39 @@

 public class S3Stream implements Stream {
     private static final Logger LOGGER = LoggerFactory.getLogger(S3Stream.class);
-    private final StreamMetadata metadata;
     private final String logIdent;
     private final long streamId;
     private final long epoch;
+    private long startOffset;
     final AtomicLong confirmOffset;
     private final AtomicLong nextOffset;
     private final Wal wal;
     private final S3BlockCache blockCache;
     private final StreamManager streamManager;
     private final Status status;

-    public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, StreamManager streamManager) {
-        this.metadata = metadata;
-        this.logIdent = "[Stream id=" + metadata.getStreamId() + " epoch" + metadata.getEpoch() + "]";
-        this.streamId = metadata.getStreamId();
-        this.epoch = metadata.getEpoch();
-        this.nextOffset = new AtomicLong(metadata.getRanges().get(metadata.getRanges().size() - 1).getStartOffset());
-        this.confirmOffset = new AtomicLong(nextOffset.get());
+    public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Wal wal, S3BlockCache blockCache, StreamManager streamManager) {
[Review comment, Collaborator]: StreamMetadata may be more extensible when we need to add more info. (A sketch of this alternative follows the constructor below.)

+        this.streamId = streamId;
+        this.epoch = epoch;
+        this.startOffset = startOffset;
+        this.logIdent = "[Stream id=" + streamId + " epoch" + epoch + "]";
+        this.nextOffset = new AtomicLong(nextOffset);
+        this.confirmOffset = new AtomicLong(nextOffset);
         this.status = new Status();
         this.wal = wal;
         this.blockCache = blockCache;
         this.streamManager = streamManager;
     }
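For context on the review comment above: a parameter object can gain new fields without changing every constructor call site, which is the extensibility the reviewer is pointing at. A minimal sketch of how the two shapes could coexist, assuming StreamMetadata exposes the getters this diff already uses; the delegating overload is illustrative only and is not part of this PR:

    // Illustrative only, not in this PR: a delegating overload that keeps the
    // extensible parameter object while reusing the new flattened constructor.
    public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, StreamManager streamManager) {
        this(metadata.getStreamId(), metadata.getEpoch(),
                metadata.getStartOffset(), metadata.getNextOffset(),
                wal, blockCache, streamManager);
    }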


     @Override
     public long streamId() {
-        return metadata.getStreamId();
+        return this.streamId;
     }

     @Override
     public long startOffset() {
-        return metadata.getStartOffset();
+        return this.startOffset;
     }

     @Override
@@ -112,11 +112,11 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
             return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is already closed"));
         }
         long confirmOffset = this.confirmOffset.get();
-        if (startOffset < metadata.getStartOffset() || endOffset > confirmOffset) {
+        if (startOffset < startOffset() || endOffset > confirmOffset) {
             return FutureUtil.failedFuture(
                     new ElasticStreamClientException(
                             ErrorCode.OFFSET_OUT_OF_RANGE_BOUNDS,
-                            String.format("fetch range[%s, %s) is out of stream bound [%s, %s)", startOffset, endOffset, metadata.getStartOffset(), confirmOffset)
+                            String.format("fetch range[%s, %s) is out of stream bound [%s, %s)", startOffset, endOffset, startOffset(), confirmOffset)
                     ));
         }
         return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> {
@@ -127,12 +127,12 @@

     @Override
     public CompletableFuture<Void> trim(long newStartOffset) {
-        if (newStartOffset < metadata.getStartOffset()) {
+        if (newStartOffset < this.startOffset) {
             throw new IllegalArgumentException("newStartOffset[" + newStartOffset + "] cannot be less than current start offset["
-                    + metadata.getStartOffset() + "]");
+                    + this.startOffset + "]");
         }
-        metadata.setStartOffset(newStartOffset);
-        return streamManager.trimStream(metadata.getStreamId(), metadata.getEpoch(), newStartOffset);
+        this.startOffset = newStartOffset;
+        return streamManager.trimStream(streamId, epoch, newStartOffset);
     }

     @Override
@@ -144,14 +144,14 @@ public CompletableFuture<Void> close() {
     @Override
     public CompletableFuture<Void> destroy() {
         status.markDestroy();
-        metadata.setStartOffset(this.confirmOffset.get());
+        startOffset = this.confirmOffset.get();
         return streamManager.deleteStream(streamId, epoch);
     }

     private void updateConfirmOffset(long newOffset) {
         for (; ; ) {
             long oldConfirmOffset = confirmOffset.get();
-            if (oldConfirmOffset <= newOffset) {
+            if (oldConfirmOffset >= newOffset) {
                 break;
             }
             if (confirmOffset.compareAndSet(oldConfirmOffset, newOffset)) {
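The hunk above is a real bug fix, not part of the refactor: with the old <= test, the loop broke out precisely when newOffset was ahead of the stored value, so the confirm offset could never advance past its initial value. With >=, the loop exits only when the stored offset is already at or past newOffset, and otherwise retries the compare-and-set. A self-contained sketch of the same monotonic-advance pattern; the class and method names here are illustrative, not from the PR:

    import java.util.concurrent.atomic.AtomicLong;

    public class MonotonicOffset {
        private final AtomicLong confirmOffset = new AtomicLong(0);

        // Advance the offset, but never move it backwards; retry on CAS contention.
        public void advanceTo(long newOffset) {
            for (; ; ) {
                long old = confirmOffset.get();
                if (old >= newOffset) {
                    break; // already confirmed at least this far
                }
                if (confirmOffset.compareAndSet(old, newOffset)) {
                    break;
                }
            }
        }

        public static void main(String[] args) {
            MonotonicOffset o = new MonotonicOffset();
            o.advanceTo(10);
            o.advanceTo(5); // no-op: would move the offset backwards
            System.out.println(o.confirmOffset.get()); // prints 10
        }
    }

On Java 8 and later the whole loop can also be written as confirmOffset.accumulateAndGet(newOffset, Math::max), which performs the same retry internally.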
7 changes: 6 additions & 1 deletion core/src/main/scala/kafka/log/s3/S3StreamClient.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.CompletableFuture;

 public class S3StreamClient implements StreamClient {
+
     private final StreamManager streamController;
     private final Wal wal;
     private final S3BlockCache blockCache;
@@ -51,6 +52,10 @@
     }

     private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
-        return streamController.openStream(streamId, epoch).thenApply(metadata -> new S3Stream(metadata, wal, blockCache, streamController));
+        return streamController.openStream(streamId, epoch).
+                thenApply(metadata -> new S3Stream(
+                        metadata.getStreamId(), metadata.getEpoch(),
+                        metadata.getStartOffset(), metadata.getNextOffset(),
+                        wal, blockCache, streamController));
     }
 }
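Taken together, the call sites in this PR pin down the minimal surface S3StreamClient now needs from StreamMetadata. A hypothetical reconstruction, inferred only from the getters invoked in this diff and not from the actual class:

    // Inferred sketch, not the real class: only the accessors this diff calls.
    public interface StreamMetadataView {
        long getStreamId();
        long getEpoch();
        long getStartOffset();
        long getNextOffset(); // replaces deriving the next offset from getRanges() in the old constructor
    }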