Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions core/src/main/scala/kafka/log/streamaspect/LazyStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,15 @@ public LazyStream(String name, long streamId, StreamClient client, int replicaCo
throw new RuntimeException(e.getCause());
}
}
LOGGER.info("opened existing stream: stream_id={}, epoch={}, name={}", streamId, epoch, name);
LOGGER.info("opened existing stream: streamId={}, epoch={}, name={}", streamId, epoch, name);
}
}

public void warmUp() throws IOException {
if (this.inner == NOOP_STREAM) {
try {
CreateStreamOptions.Builder options = CreateStreamOptions.builder().replicaCount(replicaCount)
.epoch(epoch);
tags.forEach(options::tag);
this.inner = client.createAndOpenStream(options.build()).get();
LOGGER.info("warmup, created and opened a new stream: stream_id={}, epoch={}, name={}", this.inner.streamId(), epoch, name);
this.inner = createStream();
LOGGER.info("warmup, created and opened a new stream: streamId={}, epoch={}, name={}", this.inner.streamId(), epoch, name);
notifyListener(ElasticStreamMetaEvent.STREAM_DO_CREATE);
} catch (Throwable e) {
throw new IOException(e);
Expand Down Expand Up @@ -114,9 +111,8 @@ public long nextOffset() {
public synchronized CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
if (this.inner == NOOP_STREAM) {
try {
this.inner = client.createAndOpenStream(CreateStreamOptions.builder().replicaCount(replicaCount)
.epoch(epoch).build()).get();
LOGGER.info("created and opened a new stream: stream_id={}, epoch={}, name={}", this.inner.streamId(), epoch, name);
this.inner = createStream();
LOGGER.info("created and opened a new stream: streamId={}, epoch={}, name={}", this.inner.streamId(), epoch, name);
notifyListener(ElasticStreamMetaEvent.STREAM_DO_CREATE);
} catch (Throwable e) {
return FutureUtil.failedFuture(new IOException(e));
Expand Down Expand Up @@ -162,6 +158,13 @@ public void notifyListener(ElasticStreamMetaEvent event) {
}
}

private Stream createStream() throws ExecutionException, InterruptedException {
CreateStreamOptions.Builder options = CreateStreamOptions.builder().replicaCount(replicaCount)
.epoch(epoch);
tags.forEach(options::tag);
return client.createAndOpenStream(options.build()).get();
}

static class NoopStream implements Stream {
@Override
public long streamId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, St
this.streamId = streamId;
this.epoch = epoch;
this.startOffset = startOffset;
this.logIdent = "[Stream id=" + streamId + " epoch=" + epoch + "]";
this.logIdent = "[streamId=" + streamId + " epoch=" + epoch + "]";
this.nextOffset = new AtomicLong(nextOffset);
this.confirmOffset = new AtomicLong(nextOffset);
this.status = new Status();
Expand Down