diff --git a/core/src/main/scala/kafka/log/streamaspect/LazyStream.java b/core/src/main/scala/kafka/log/streamaspect/LazyStream.java index 0d86e203f2..9c2d0a2750 100644 --- a/core/src/main/scala/kafka/log/streamaspect/LazyStream.java +++ b/core/src/main/scala/kafka/log/streamaspect/LazyStream.java @@ -65,18 +65,15 @@ public LazyStream(String name, long streamId, StreamClient client, int replicaCo throw new RuntimeException(e.getCause()); } } - LOGGER.info("opened existing stream: stream_id={}, epoch={}, name={}", streamId, epoch, name); + LOGGER.info("opened existing stream: streamId={}, epoch={}, name={}", streamId, epoch, name); } } public void warmUp() throws IOException { if (this.inner == NOOP_STREAM) { try { - CreateStreamOptions.Builder options = CreateStreamOptions.builder().replicaCount(replicaCount) - .epoch(epoch); - tags.forEach(options::tag); - this.inner = client.createAndOpenStream(options.build()).get(); - LOGGER.info("warmup, created and opened a new stream: stream_id={}, epoch={}, name={}", this.inner.streamId(), epoch, name); + this.inner = createStream(); + LOGGER.info("warmup, created and opened a new stream: streamId={}, epoch={}, name={}", this.inner.streamId(), epoch, name); notifyListener(ElasticStreamMetaEvent.STREAM_DO_CREATE); } catch (Throwable e) { throw new IOException(e); @@ -114,9 +111,8 @@ public long nextOffset() { public synchronized CompletableFuture append(AppendContext context, RecordBatch recordBatch) { if (this.inner == NOOP_STREAM) { try { - this.inner = client.createAndOpenStream(CreateStreamOptions.builder().replicaCount(replicaCount) - .epoch(epoch).build()).get(); - LOGGER.info("created and opened a new stream: stream_id={}, epoch={}, name={}", this.inner.streamId(), epoch, name); + this.inner = createStream(); + LOGGER.info("created and opened a new stream: streamId={}, epoch={}, name={}", this.inner.streamId(), epoch, name); notifyListener(ElasticStreamMetaEvent.STREAM_DO_CREATE); } catch (Throwable e) { return FutureUtil.failedFuture(new IOException(e)); @@ -162,6 +158,13 @@ public void notifyListener(ElasticStreamMetaEvent event) { } } + private Stream createStream() throws ExecutionException, InterruptedException { + CreateStreamOptions.Builder options = CreateStreamOptions.builder().replicaCount(replicaCount) + .epoch(epoch); + tags.forEach(options::tag); + return client.createAndOpenStream(options.build()).get(); + } + static class NoopStream implements Stream { @Override public long streamId() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 3094263bbc..0ff3d938e0 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -92,7 +92,7 @@ public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, St this.streamId = streamId; this.epoch = epoch; this.startOffset = startOffset; - this.logIdent = "[Stream id=" + streamId + " epoch=" + epoch + "]"; + this.logIdent = "[streamId=" + streamId + " epoch=" + epoch + "]"; this.nextOffset = new AtomicLong(nextOffset); this.confirmOffset = new AtomicLong(nextOffset); this.status = new Status();