From 0de6c4acf2e05463fc6e13cfed3626cf0c568393 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 1 Sep 2023 17:12:19 +0800 Subject: [PATCH] feat(s3): bootstrap Kafka on S3 1. bootstrap with controller-metadata-manager Signed-off-by: TheR1sing3un --- .../common/requests/AbstractRequest.java | 28 ++++ .../common/requests/AbstractResponse.java | 28 ++++ .../requests/s3/CloseStreamRequest.java | 7 + .../requests/s3/CloseStreamResponse.java | 7 + .../s3/CommitStreamObjectRequest.java | 7 + .../s3/CommitStreamObjectResponse.java | 7 + .../requests/s3/CommitWALObjectRequest.java | 7 + .../requests/s3/CommitWALObjectResponse.java | 7 + .../requests/s3/CreateStreamRequest.java | 7 + .../requests/s3/CreateStreamResponse.java | 7 + .../requests/s3/DeleteStreamRequest.java | 7 + .../requests/s3/DeleteStreamResponse.java | 7 + .../requests/s3/GetStreamsOffsetRequest.java | 7 + .../requests/s3/GetStreamsOffsetResponse.java | 7 + .../common/requests/s3/OpenStreamRequest.java | 9 +- .../requests/s3/OpenStreamResponse.java | 7 + .../requests/s3/PrepareS3ObjectRequest.java | 9 +- .../requests/s3/PrepareS3ObjectResponse.java | 7 + .../kafka/log/es/ElasticLogManager.scala | 3 +- .../kafka/log/es/client/s3/ClientFactory.java | 3 +- .../scala/kafka/log/s3/DefaultS3Client.java | 21 +-- .../src/main/scala/kafka/log/s3/S3Stream.java | 3 + .../kafka/log/s3/StreamMetadataManager.java | 140 +++++++++++++++--- .../s3/network/ControllerRequestSender.java | 11 +- .../s3/objects/ControllerObjectManager.java | 42 +++++- .../log/s3/objects/ObjectStreamRange.java | 9 ++ .../kafka/log/s3/objects/StreamObject.java | 21 ++- .../scala/kafka/server/BrokerServer.scala | 43 +++--- .../controller/ControllerMetricsManager.java | 17 +++ .../kafka/controller/QuorumController.java | 4 +- .../controller/stream/MockS3Operator.java | 7 +- .../controller/stream/S3StreamConstant.java | 4 + .../stream/StreamControlManager.java | 4 + .../kafka/image/BrokerS3WALMetadataImage.java | 11 ++ .../kafka/image/S3StreamMetadataImage.java | 15 ++ .../kafka/image/S3StreamsMetadataDelta.java | 11 ++ .../kafka/image/S3StreamsMetadataImage.java | 11 ++ .../metadata/stream/S3ObjectMetadata.java | 5 - 38 files changed, 477 insertions(+), 80 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 0d96d842d6..3da7f16225 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -26,6 +26,14 @@ import java.nio.ByteBuffer; import java.util.Map; +import org.apache.kafka.common.requests.s3.CloseStreamRequest; +import org.apache.kafka.common.requests.s3.CommitStreamObjectRequest; +import org.apache.kafka.common.requests.s3.CommitWALObjectRequest; +import org.apache.kafka.common.requests.s3.CreateStreamRequest; +import org.apache.kafka.common.requests.s3.DeleteStreamRequest; +import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest; +import org.apache.kafka.common.requests.s3.OpenStreamRequest; +import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest; public abstract class AbstractRequest implements AbstractRequestResponse { @@ -165,6 +173,7 @@ public static RequestAndSize parseRequest(ApiKeys apiKey, short apiVersion, Byte return new RequestAndSize(doParseRequest(apiKey, apiVersion, buffer), bufferSize); } + @SuppressWarnings("all") private static AbstractRequest doParseRequest(ApiKeys apiKey, short
apiVersion, ByteBuffer buffer) { switch (apiKey) { case PRODUCE: @@ -303,6 +312,25 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return ListTransactionsRequest.parse(buffer, apiVersion); case ALLOCATE_PRODUCER_IDS: return AllocateProducerIdsRequest.parse(buffer, apiVersion); + + // Kafka on S3 inject start + case CREATE_STREAM: + return CreateStreamRequest.parse(buffer, apiVersion); + case OPEN_STREAM: + return OpenStreamRequest.parse(buffer, apiVersion); + case CLOSE_STREAM: + return CloseStreamRequest.parse(buffer, apiVersion); + case DELETE_STREAM: + return DeleteStreamRequest.parse(buffer, apiVersion); + case PREPARE_S3_OBJECT: + return PrepareS3ObjectRequest.parse(buffer, apiVersion); + case COMMIT_WALOBJECT: + return CommitWALObjectRequest.parse(buffer, apiVersion); + case COMMIT_STREAM_OBJECT: + return CommitStreamObjectRequest.parse(buffer, apiVersion); + case GET_STREAMS_OFFSET: + return GetStreamsOffsetRequest.parse(buffer, apiVersion); + // Kafka on S3 inject end default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 7e4425d3e7..d8f1aa008f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -29,6 +29,14 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.kafka.common.requests.s3.CloseStreamResponse; +import org.apache.kafka.common.requests.s3.CommitStreamObjectResponse; +import org.apache.kafka.common.requests.s3.CommitWALObjectResponse; +import org.apache.kafka.common.requests.s3.CreateStreamResponse; +import org.apache.kafka.common.requests.s3.DeleteStreamResponse; +import org.apache.kafka.common.requests.s3.GetStreamsOffsetResponse; +import org.apache.kafka.common.requests.s3.OpenStreamResponse; +import org.apache.kafka.common.requests.s3.PrepareS3ObjectResponse; public abstract class AbstractResponse implements AbstractRequestResponse { public static final int DEFAULT_THROTTLE_TIME = 0; @@ -109,6 +117,7 @@ public static AbstractResponse parseResponse(ByteBuffer buffer, RequestHeader re return AbstractResponse.parseResponse(apiKey, buffer, apiVersion); } + @SuppressWarnings("all") public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer responseBuffer, short version) { switch (apiKey) { case PRODUCE: @@ -247,6 +256,25 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response return ListTransactionsResponse.parse(responseBuffer, version); case ALLOCATE_PRODUCER_IDS: return AllocateProducerIdsResponse.parse(responseBuffer, version); + + // Kafka on S3 inject start + case CREATE_STREAM: + return CreateStreamResponse.parse(responseBuffer, version); + case OPEN_STREAM: + return OpenStreamResponse.parse(responseBuffer, version); + case DELETE_STREAM: + return DeleteStreamResponse.parse(responseBuffer, version); + case CLOSE_STREAM: + return CloseStreamResponse.parse(responseBuffer, version); + case PREPARE_S3_OBJECT: + return PrepareS3ObjectResponse.parse(responseBuffer, version); + case COMMIT_STREAM_OBJECT: + return CommitStreamObjectResponse.parse(responseBuffer, version); + case COMMIT_WALOBJECT: + return 
CommitWALObjectResponse.parse(responseBuffer, version); + case GET_STREAMS_OFFSET: + return GetStreamsOffsetResponse.parse(responseBuffer, version); + // Kafka on S3 inject end default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamRequest.java index b1d4691089..86476bbb9a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamRequest.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import org.apache.kafka.common.message.CloseStreamRequestData; import org.apache.kafka.common.message.CloseStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiError; @@ -63,4 +65,9 @@ public CloseStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) { public CloseStreamRequestData data() { return data; } + + public static CloseStreamRequest parse(ByteBuffer buffer, short version) { + return new CloseStreamRequest(new CloseStreamRequestData( + new ByteBufferAccessor(buffer), version), version); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamResponse.java index 9a6ca9ad92..22e7a59d19 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamResponse.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.kafka.common.message.CloseStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; @@ -51,4 +53,9 @@ public int throttleTimeMs() { public void maybeSetThrottleTimeMs(int throttleTimeMs) { data.setThrottleTimeMs(throttleTimeMs); } + + public static CloseStreamResponse parse(ByteBuffer buffer, short version) { + return new CloseStreamResponse(new CloseStreamResponseData( + new ByteBufferAccessor(buffer), version)); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectRequest.java index a29720c355..f9b81a347e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectRequest.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiError; @@ -65,4 +67,9 @@ public 
CommitStreamObjectResponse getErrorResponse(int throttleTimeMs, Throwable public CommitStreamObjectRequestData data() { return data; } + + public static CommitStreamObjectRequest parse(ByteBuffer buffer, short version) { + return new CommitStreamObjectRequest(new CommitStreamObjectRequestData( + new ByteBufferAccessor(buffer), version), version); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectResponse.java index b26c1ec2dd..6111fa4812 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectResponse.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.kafka.common.message.CommitStreamObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; @@ -52,4 +54,9 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) { data.setThrottleTimeMs(throttleTimeMs); } + public static CommitStreamObjectResponse parse(ByteBuffer buffer, short version) { + return new CommitStreamObjectResponse(new CommitStreamObjectResponseData( + new ByteBufferAccessor(buffer), version)); + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectRequest.java index d789ae9fd8..f57036a750 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectRequest.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import org.apache.kafka.common.message.CommitWALObjectRequestData; import org.apache.kafka.common.message.CommitWALObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiError; @@ -64,4 +66,9 @@ public CommitWALObjectRequestData data() { return data; } + public static CommitWALObjectRequest parse(ByteBuffer buffer, short version) { + return new CommitWALObjectRequest(new CommitWALObjectRequestData( + new ByteBufferAccessor(buffer), version), version); + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectResponse.java index c1ef215e3a..f46c8b4442 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectResponse.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.kafka.common.message.CommitWALObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; @@ -52,5 +54,10 @@ public int throttleTimeMs() { public void maybeSetThrottleTimeMs(int 
throttleTimeMs) { data.setThrottleTimeMs(throttleTimeMs); } + + public static CommitWALObjectResponse parse(ByteBuffer buffer, short version) { + return new CommitWALObjectResponse(new CommitWALObjectResponseData( + new ByteBufferAccessor(buffer), version)); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamRequest.java index 29dd41c07c..030aa9800d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamRequest.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import org.apache.kafka.common.message.CreateStreamRequestData; import org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiError; @@ -64,4 +66,9 @@ public CreateStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) { public CreateStreamRequestData data() { return data; } + + public static CreateStreamRequest parse(ByteBuffer buffer, short version) { + return new CreateStreamRequest(new CreateStreamRequestData( + new ByteBufferAccessor(buffer), version), version); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamResponse.java index 0a42572af4..72ed0c4e1c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamResponse.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; @@ -51,4 +53,9 @@ public int throttleTimeMs() { public void maybeSetThrottleTimeMs(int throttleTimeMs) { data.setThrottleTimeMs(throttleTimeMs); } + + public static CreateStreamResponse parse(ByteBuffer buffer, short version) { + return new CreateStreamResponse(new CreateStreamResponseData( + new ByteBufferAccessor(buffer), version)); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamRequest.java index 1ff86e7790..51aca1e8c2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamRequest.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import org.apache.kafka.common.message.DeleteStreamRequestData; import org.apache.kafka.common.message.DeleteStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiError; @@ -62,4 +64,9 @@ public DeleteStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) { public DeleteStreamRequestData 
data() { return data; } + + public static DeleteStreamRequest parse(ByteBuffer buffer, short version) { + return new DeleteStreamRequest(new DeleteStreamRequestData( + new ByteBufferAccessor(buffer), version), version); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamResponse.java index 804ba4cbcd..63e7316fec 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamResponse.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.kafka.common.message.DeleteStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; @@ -50,4 +52,9 @@ public int throttleTimeMs() { public void maybeSetThrottleTimeMs(int throttleTimeMs) { data.setThrottleTimeMs(throttleTimeMs); } + + public static DeleteStreamResponse parse(ByteBuffer buffer, short version) { + return new DeleteStreamResponse(new DeleteStreamResponseData( + new ByteBufferAccessor(buffer), version)); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetRequest.java index 1d6ca499b2..f6e4c9ab02 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetRequest.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import org.apache.kafka.common.message.GetStreamsOffsetRequestData; import org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiError; @@ -64,5 +66,10 @@ public CreateStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) { public GetStreamsOffsetRequestData data() { return data; } + + public static GetStreamsOffsetRequest parse(ByteBuffer buffer, short version) { + return new GetStreamsOffsetRequest(new GetStreamsOffsetRequestData( + new ByteBufferAccessor(buffer), version), version); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetResponse.java index 6342e34b9c..b971db66b2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetResponse.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.kafka.common.message.GetStreamsOffsetResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; @@ -50,4 +52,9 @@ public int throttleTimeMs() { public void maybeSetThrottleTimeMs(int throttleTimeMs) { data.setThrottleTimeMs(throttleTimeMs); } 
+ + public static GetStreamsOffsetResponse parse(ByteBuffer buffer, short version) { + return new GetStreamsOffsetResponse(new GetStreamsOffsetResponseData( + new ByteBufferAccessor(buffer), version)); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamRequest.java index 9dfc825346..59f7b579f5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamRequest.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiError; @@ -29,7 +31,7 @@ public static class Builder extends AbstractRequest.Builder { private final OpenStreamRequestData data; public Builder(OpenStreamRequestData data) { - super(ApiKeys.CREATE_STREAM); + super(ApiKeys.OPEN_STREAM); this.data = data; } @@ -63,4 +65,9 @@ public OpenStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) { public OpenStreamRequestData data() { return data; } + + public static OpenStreamRequest parse(ByteBuffer buffer, short version) { + return new OpenStreamRequest(new OpenStreamRequestData( + new ByteBufferAccessor(buffer), version), version); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamResponse.java index 8cdc047af9..c600b35b4f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamResponse.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; @@ -50,4 +52,9 @@ public int throttleTimeMs() { public void maybeSetThrottleTimeMs(int throttleTimeMs) { data.setThrottleTimeMs(throttleTimeMs); } + + public static OpenStreamResponse parse(ByteBuffer buffer, short version) { + return new OpenStreamResponse(new OpenStreamResponseData( + new ByteBufferAccessor(buffer), version)); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/PrepareS3ObjectRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/PrepareS3ObjectRequest.java index 340d4dcc21..e5032eae58 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/PrepareS3ObjectRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/PrepareS3ObjectRequest.java @@ -17,9 +17,11 @@ package org.apache.kafka.common.requests.s3; +import java.nio.ByteBuffer; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.requests.AbstractRequest; import 
org.apache.kafka.common.requests.ApiError; @@ -29,7 +31,7 @@ public static class Builder extends AbstractRequest.Builder fetch0(long startOffset, long endOffset, if (status.isClosed()) { return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is already closed")); } + LOGGER.info("{} stream try fetch, startOffset: {}, endOffset: {}, maxBytes: {}", logIdent, startOffset, endOffset, maxBytes); long confirmOffset = this.confirmOffset.get(); if (startOffset < startOffset() || endOffset > confirmOffset) { return FutureUtil.failedFuture( @@ -128,6 +129,7 @@ private CompletableFuture fetch0(long startOffset, long endOffset, } return storage.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> { List records = dataBlock.getRecords().stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList()); + LOGGER.info("{} stream fetch, startOffset: {}, endOffset: {}, maxBytes: {}, records: {}", logIdent, startOffset, endOffset, maxBytes, records.size()); return new DefaultFetchResult(records); }); } @@ -176,6 +178,7 @@ private void updateConfirmOffset(long newOffset) { break; } if (confirmOffset.compareAndSet(oldConfirmOffset, newOffset)) { + LOGGER.info("{} stream update confirm offset from {} to {}", logIdent, oldConfirmOffset, newOffset); break; } } diff --git a/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java index 9d79787a4a..b5e8983235 100644 --- a/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java @@ -25,6 +25,7 @@ import kafka.server.BrokerServer; import kafka.server.KafkaConfig; import kafka.server.MetadataCache; +import org.apache.kafka.controller.stream.S3StreamConstant; import org.apache.kafka.image.BrokerS3WALMetadataImage; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -53,10 +54,13 @@ public StreamMetadataManager(BrokerServer broker, KafkaConfig config) { this.inflightWalObjects = new InflightWalObjects(); this.metadataCache = broker.metadataCache(); this.catchUpMetadataListener = new CatchUpMetadataListener(); - // register listener this.broker.metadataListener().registerStreamMetadataListener(this.catchUpMetadataListener); } + public synchronized void append(InflightWalObject object) { + this.inflightWalObjects.append(object); + } + public synchronized void catchupTo(long objectId) { // delete all wal objects which are <= objectId this.inflightWalObjects.trim(objectId); @@ -66,38 +70,58 @@ public synchronized void catchupTo(long objectId) { public synchronized List getObjects(long streamId, long startOffset, long endOffset, int limit) { List objects = new ArrayList<>(); if (startOffset >= endOffset) { + LOGGER.warn("[GetObjects]: invalid offset range, stream: {}, startOffset: {}, endOffset: {}", streamId, startOffset, endOffset); return objects; } OffsetRange walRange = this.inflightWalObjects.getWalRange(streamId); if (walRange == null || endOffset <= walRange.startOffset()) { // only search in cache InRangeObjects cachedInRangeObjects = this.metadataCache.getObjects(streamId, startOffset, endOffset, limit); - if (cachedInRangeObjects != null) { - objects.addAll(cachedInRangeObjects.objects()); + if (cachedInRangeObjects == null) { + LOGGER.warn( + "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty 
result", + streamId, startOffset, endOffset, limit); + return objects; } + objects.addAll(cachedInRangeObjects.objects()); + objects.forEach(obj -> { + S3Object metadata = metadataCache.getObjectMetadata(obj.getObjectId()); + if (metadata == null) { + LOGGER.error("object: {} metadata not exist", obj.getObjectId()); + throw new RuntimeException("object: " + obj.getObjectId() + " metadata not exist"); + } + obj.setObjectSize(metadata.getObjectSize()); + }); + LOGGER.info( + "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache: startOffset: {}, endOffset: {}, object count: {}", + streamId, startOffset, endOffset, limit, + cachedInRangeObjects.startOffset(), cachedInRangeObjects.endOffset(), objects.size()); return objects; } if (startOffset >= walRange.startOffset()) { // only search in inflight wal InRangeObjects inflightInRangeObjects = this.inflightWalObjects.getObjects(streamId, startOffset, endOffset, limit); - if (inflightInRangeObjects != null) { - objects.addAll(inflightInRangeObjects.objects()); + if (inflightInRangeObjects == null) { + LOGGER.warn( + "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in inflightWalObjects failed with empty result", + streamId, startOffset, endOffset, limit); + return objects; } + objects.addAll(inflightInRangeObjects.objects()); + LOGGER.info( + "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from inflight: startOffset: {}, endOffset: {}, object count: {}", + streamId, startOffset, endOffset, limit, + inflightInRangeObjects.startOffset(), inflightInRangeObjects.endOffset(), objects.size()); return objects; } long cachedEndOffset = walRange.startOffset(); InRangeObjects cachedInRangeObjects = this.metadataCache.getObjects(streamId, startOffset, cachedEndOffset, limit); if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) { + LOGGER.warn("[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", + streamId, startOffset, endOffset, limit); return objects; } objects.addAll(cachedInRangeObjects.objects()); - if (objects.size() >= limit) { - return objects; - } - InRangeObjects inflightinRangeObjects = this.inflightWalObjects.getObjects(streamId, cachedEndOffset, endOffset, limit - objects.size()); - if (inflightinRangeObjects != null) { - objects.addAll(inflightinRangeObjects.objects()); - } objects.forEach(obj -> { S3Object metadata = metadataCache.getObjectMetadata(obj.getObjectId()); if (metadata == null) { @@ -106,6 +130,25 @@ public synchronized List getObjects(long streamId, long startO } obj.setObjectSize(metadata.getObjectSize()); }); + if (objects.size() >= limit) { + LOGGER.info( + "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache: startOffset: {}, endOffset: {}, object count: {}", + streamId, startOffset, endOffset, limit, + cachedInRangeObjects.startOffset(), cachedInRangeObjects.endOffset(), objects.size()); + return objects; + } + InRangeObjects inflightinRangeObjects = this.inflightWalObjects.getObjects(streamId, cachedEndOffset, endOffset, limit - objects.size()); + if (inflightinRangeObjects == null || inflightinRangeObjects == InRangeObjects.INVALID) { + LOGGER.warn( + "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in inflightWalObjects failed with empty result", + streamId, startOffset, endOffset, limit); + return objects; + } + 
objects.addAll(inflightinRangeObjects.objects()); + LOGGER.info( + "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache and inflight: startOffset: {}, endOffset: {}, object count: {}", + streamId, startOffset, endOffset, limit, + cachedInRangeObjects.startOffset(), inflightinRangeObjects.endOffset(), objects.size()); return objects; } @@ -136,18 +179,65 @@ public void setStartOffset(long startOffset) { } } + public static class InflightWalObject extends S3WALObject { + + private final long objectSize; + + public InflightWalObject(long objectId, int brokerId, Map> streamsIndex, long orderId, long objectSize) { + super(objectId, brokerId, streamsIndex, orderId); + this.objectSize = objectSize; + } + + public long startOffset(long streamId) { + List indexes = streamsIndex().get(streamId); + if (indexes == null || indexes.isEmpty()) { + return S3StreamConstant.INVALID_OFFSET; + } + return indexes.get(0).getStartOffset(); + } + + public long endOffset(long streamId) { + List indexes = streamsIndex().get(streamId); + if (indexes == null || indexes.isEmpty()) { + return S3StreamConstant.INVALID_OFFSET; + } + return indexes.get(indexes.size() - 1).getEndOffset(); + } + + public long objectSize() { + return objectSize; + } + + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + } + static class InflightWalObjects { - private final List objects; + private final Logger log = LoggerFactory.getLogger(InflightWalObjects.class); + + private final List objects; private final Map streamOffsets; + private volatile long firstObjectId = S3StreamConstant.MAX_OBJECT_ID; public InflightWalObjects() { this.objects = new LinkedList<>(); this.streamOffsets = new HashMap<>(); } - public void append(S3WALObject object) { + public void append(InflightWalObject object) { objects.add(object); + if (objects.size() == 1) { + firstObjectId = object.objectId(); + } + log.info("[AppendInflight]: append wal object: {}", object.objectId()); object.streamsIndex().forEach((stream, indexes) -> { // wal object only contains one index for each stream streamOffsets.putIfAbsent(stream, new OffsetRange(indexes.get(0).getStartOffset(), indexes.get(indexes.size() - 1).getEndOffset())); @@ -156,18 +246,23 @@ public void append(S3WALObject object) { } public void trim(long objectId) { + log.info("[TrimInflight]: trim wal object <= {}", objectId); // TODO: speed up by binary search int clearEndIndex = objects.size(); for (int i = 0; i < objects.size(); i++) { S3WALObject wal = objects.get(i); if (wal.objectId() > objectId) { clearEndIndex = i; + firstObjectId = wal.objectId(); break; } wal.streamsIndex().forEach((stream, indexes) -> { streamOffsets.get(stream).setStartOffset(indexes.get(indexes.size() - 1).getEndOffset()); }); } + if (clearEndIndex == objects.size()) { + firstObjectId = S3StreamConstant.MAX_OBJECT_ID; + } objects.subList(0, clearEndIndex).clear(); } @@ -192,20 +287,18 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset } List inRangeObjects = new LinkedList<>(); long nextStartOffset = startOffset; - for (S3WALObject object : objects) { + for (InflightWalObject object : objects) { if (limit <= 0) { break; } if (nextStartOffset >= endOffset) { break; } - List indexes = object.streamsIndex().get(streamId); - if (indexes == null || indexes.size() != 1) { - LOGGER.error("invalid wal object: {}", object); + long objStartOffset = 
object.startOffset(streamId); + long objEndOffset = object.endOffset(streamId); + if (objStartOffset == S3StreamConstant.INVALID_OFFSET || objEndOffset == S3StreamConstant.INVALID_OFFSET) { continue; } - long objStartOffset = indexes.get(0).getStartOffset(); - long objEndOffset = indexes.get(0).getEndOffset(); if (objStartOffset > startOffset) { break; } @@ -213,7 +306,7 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset continue; } limit--; - inRangeObjects.add(new S3ObjectMetadata(object.objectId(), object.objectType())); + inRangeObjects.add(new S3ObjectMetadata(object.objectId(), object.objectSize(), object.objectType())); nextStartOffset = objEndOffset; } return new InRangeObjects(streamId, startOffset, nextStartOffset, inRangeObjects); @@ -231,10 +324,15 @@ class CatchUpMetadataListener implements StreamMetadataListener { public void onChange(MetadataDelta delta, MetadataImage newImage) { BrokerS3WALMetadataImage walMetadataImage = newImage.streamsMetadata().brokerWALMetadata().get(config.brokerId()); if (walMetadataImage == null) { + LOGGER.warn("[CatchUpMetadataListener]: wal metadata image not exist"); return; } S3WALObject wal = walMetadataImage.getWalObjects().get(walMetadataImage.getWalObjects().size() - 1); if (wal == null) { + LOGGER.warn("[CatchUpMetadataListener]: wal object not exist"); + return; + } + if (wal.objectId() < inflightWalObjects.firstObjectId) { return; } catchupTo(wal.objectId()); diff --git a/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java b/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java index a23d4603d0..6e6f45cecd 100644 --- a/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java +++ b/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java @@ -18,6 +18,7 @@ package kafka.log.s3.network; import java.util.concurrent.CompletableFuture; +import kafka.server.BrokerServer; import kafka.server.BrokerToControllerChannelManager; import kafka.server.ControllerRequestCompletionHandler; import org.apache.kafka.clients.ClientResponse; @@ -30,10 +31,12 @@ public class ControllerRequestSender { private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRequestSender.class); - private final BrokerToControllerChannelManager channelManager; + private final BrokerServer brokerServer; + private BrokerToControllerChannelManager channelManager; - public ControllerRequestSender(BrokerToControllerChannelManager channelManager) { - this.channelManager = channelManager; + public ControllerRequestSender(BrokerServer brokerServer) { + this.brokerServer = brokerServer; + this.channelManager = brokerServer.clientToControllerChannelManager(); } public CompletableFuture send(AbstractRequest.Builder requestBuilder, @@ -60,7 +63,7 @@ public void onComplete(ClientResponse response) { cf.completeExceptionally(response.versionMismatch()); return; } - if (responseDataType.isInstance(response.responseBody().data())) { + if (!responseDataType.isInstance(response.responseBody().data())) { LOGGER.error("Unexpected response type: {} while sending request: {}", response.responseBody().data().getClass().getSimpleName(), requestBuilder); cf.completeExceptionally(new RuntimeException("Unexpected response type while sending request")); diff --git a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java index 635d759c1c..006ff50c89 100644 --- 
a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java +++ b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java @@ -20,16 +20,22 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import kafka.log.s3.StreamMetadataManager; +import kafka.log.s3.StreamMetadataManager.InflightWalObject; import kafka.log.s3.network.ControllerRequestSender; import kafka.server.KafkaConfig; +import org.apache.kafka.common.message.CommitWALObjectRequestData; +import org.apache.kafka.common.message.CommitWALObjectResponseData; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest; import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest.Builder; import org.apache.kafka.metadata.stream.S3ObjectMetadata; +import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +76,38 @@ public CompletableFuture prepareObject(int count, long ttl) { @Override public CompletableFuture commitWALObject(CommitWALObjectRequest request) { - return null; + org.apache.kafka.common.requests.s3.CommitWALObjectRequest.Builder wrapRequestBuilder = new org.apache.kafka.common.requests.s3.CommitWALObjectRequest.Builder( + new CommitWALObjectRequestData() + .setBrokerId(config.brokerId()) + .setOrderId(request.getOrderId()) + .setObjectId(request.getObjectId()) + .setObjectSize(request.getObjectSize()) + .setObjectStreamRanges(request.getStreamRanges() + .stream() + .map(ObjectStreamRange::toObjectStreamRangeInRequest).collect(Collectors.toList())) + .setStreamObjects(request.getStreamObjects() + .stream() + .map(StreamObject::toStreamObjectInRequest).collect(Collectors.toList()))); + return requestSender.send(wrapRequestBuilder, CommitWALObjectResponseData.class).thenApply(resp -> { + Errors code = Errors.forCode(resp.errorCode()); + switch (code) { + case NONE: + return new CommitWALObjectResponse(); + default: + LOGGER.error("Error while committing WAL object: {}, code: {}", request, code); + throw code.exception(); + } + }).thenApply(resp -> { + long objectId = request.getObjectId(); + long orderId = request.getOrderId(); + int brokerId = config.brokerId(); + long objectSize = request.getObjectSize(); + Map> rangeList = request.getStreamRanges().stream() + .map(range -> new S3ObjectStreamIndex(range.getStreamId(), range.getStartOffset(), range.getEndOffset())) + .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId)); + this.metadataManager.append(new InflightWalObject(objectId, brokerId, rangeList, orderId, objectSize)); + return resp; + }); } @Override @@ -88,7 +125,8 @@ public List getObjects(long streamId, long startOffset, long e try { return this.metadataManager.getObjects(streamId, startOffset, endOffset, limit); } catch (Exception e) { - LOGGER.error("Error while get objects, streamId: {}, startOffset: {}, endOffset: {}, limit: {}", streamId, startOffset, endOffset, limit, e); + LOGGER.error("Error while get objects, streamId: {}, startOffset: {}, endOffset: {}, limit: {}", streamId, startOffset, endOffset, limit, + e); return Collections.emptyList(); } } diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java index 
0dd767d0aa..79f01813b1 100644 --- a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java +++ b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java @@ -17,6 +17,8 @@ package kafka.log.s3.objects; +import org.apache.kafka.common.message.CommitWALObjectRequestData; + public class ObjectStreamRange { private long streamId; private long epoch; @@ -63,6 +65,13 @@ public void setEndOffset(long endOffset) { this.endOffset = endOffset; } + public CommitWALObjectRequestData.ObjectStreamRange toObjectStreamRangeInRequest() { + return new CommitWALObjectRequestData.ObjectStreamRange() + .setStreamId(streamId) + .setStartOffset(startOffset) + .setEndOffset(endOffset); + } + @Override public String toString() { return "ObjectStreamRange{" + diff --git a/core/src/main/scala/kafka/log/s3/objects/StreamObject.java b/core/src/main/scala/kafka/log/s3/objects/StreamObject.java index ebf511ed29..c3c4a05036 100644 --- a/core/src/main/scala/kafka/log/s3/objects/StreamObject.java +++ b/core/src/main/scala/kafka/log/s3/objects/StreamObject.java @@ -17,7 +17,8 @@ package kafka.log.s3.objects; -import java.util.Arrays; +import java.util.List; +import org.apache.kafka.common.message.CommitWALObjectRequestData; public class StreamObject { private long objectId; @@ -29,7 +30,7 @@ public class StreamObject { /** * The source objects' id of the stream object. */ - private long[] sourceObjectIds; + private List sourceObjectIds; public long getObjectId() { return objectId; @@ -71,14 +72,24 @@ public void setEndOffset(long endOffset) { this.endOffset = endOffset; } - public long[] getSourceObjectIds() { + public List getSourceObjectIds() { return sourceObjectIds; } - public void setSourceObjectIds(long[] sourceObjectIds) { + public void setSourceObjectIds(List sourceObjectIds) { this.sourceObjectIds = sourceObjectIds; } + public CommitWALObjectRequestData.StreamObject toStreamObjectInRequest() { + return new CommitWALObjectRequestData.StreamObject() + .setStreamId(streamId) + .setObjectId(objectId) + .setObjectSize(objectSize) + .setStartOffset(startOffset) + .setEndOffset(endOffset) + .setSourceObjectIds(sourceObjectIds); + } + @Override public String toString() { return "StreamObject{" + @@ -87,7 +98,7 @@ public String toString() { ", streamId=" + streamId + ", startOffset=" + startOffset + ", endOffset=" + endOffset + - ", sourceObjectIds=" + Arrays.toString(sourceObjectIds) + + ", sourceObjectIds=" + sourceObjectIds + '}'; } } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index bad81db467..d40df46467 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -199,10 +199,19 @@ class BrokerServer( metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId) - val controllerNodes = RaftConfig.voterConnectionsToNodes(sharedServer.controllerQuorumVotersFuture.get()).asScala + // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery + // until we catch up on the metadata log and have up-to-date topic and broker configs. + logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time, + brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true) + // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update. + // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically. 
+ tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) + credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) + + val controllerNodes = RaftConfig.voterConnectionsToNodes(sharedServer.controllerQuorumVotersFuture.get()).asScala val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes) - // elastic stream inject start + clientToControllerChannelManager = BrokerToControllerChannelManager( controllerNodeProvider, time, @@ -213,25 +222,6 @@ class BrokerServer( retryTimeoutMs = 60000 ) - if (config.elasticStreamEnabled) { - if (!ElasticLogManager.init(this, config, clusterId)) { - throw new UnsupportedOperationException("Elastic stream client failed to be configured. Please check your configuration.") - } - } else { - warn("Elastic stream is disabled. This node will store data locally.") - } - // elastic stream inject end - - // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery - // until we catch up on the metadata log and have up-to-date topic and broker configs. - logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time, - brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true) - - // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update. - // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically. - tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) - credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) - clientToControllerChannelManager.start() forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager) @@ -332,6 +322,17 @@ class BrokerServer( sharedServer.brokerMetrics, sharedServer.metadataLoaderFaultHandler) + // elastic stream inject start + + if (config.elasticStreamEnabled) { + if (!ElasticLogManager.init(this, config, clusterId)) { + throw new UnsupportedOperationException("Elastic stream client failed to be configured. Please check your configuration.") + } + } else { + warn("Elastic stream is disabled. This node will store data locally.") + } + // elastic stream inject end + val networkListeners = new ListenerCollection() config.effectiveAdvertisedListeners.foreach { ep => networkListeners.add(new Listener(). 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java index 60c4831203..13725c0dfb 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java @@ -146,6 +146,23 @@ void replay(ApiMessage message) { case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: case NO_OP_RECORD: case ZK_MIGRATION_STATE_RECORD: + // Kafka on S3 inject start + case S3_STREAM_RECORD: + case REMOVE_S3_STREAM_RECORD: + case RANGE_RECORD: + case REMOVE_RANGE_RECORD: + case S3_STREAM_OBJECT_RECORD: + case REMOVE_S3_STREAM_OBJECT_RECORD: + case WALOBJECT_RECORD: + case REMOVE_WALOBJECT_RECORD: + case S3_OBJECT_RECORD: + case REMOVE_S3_OBJECT_RECORD: + case ASSIGNED_STREAM_ID_RECORD: + case ASSIGNED_S3_OBJECT_ID_RECORD: + case BROKER_WALMETADATA_RECORD: + case REMOVE_BROKER_WALMETADATA_RECORD: + case ADVANCE_RANGE_RECORD: + // Kafka on S3 inject end // These record types do not affect metrics break; default: diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index bf353f5ce6..6bad292a73 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -103,7 +103,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.controller.stream.DefaultS3Operator; +import org.apache.kafka.controller.stream.MockS3Operator; import org.apache.kafka.controller.stream.S3ObjectControlManager; import org.apache.kafka.controller.stream.StreamControlManager; import org.apache.kafka.metadata.BrokerHeartbeatReply; @@ -1864,7 +1864,7 @@ private QuorumController( // Kafka on S3 inject start this.s3Config = s3Config; this.s3ObjectControlManager = new S3ObjectControlManager( - this, snapshotRegistry, logContext, clusterId, s3Config, new DefaultS3Operator()); + this, snapshotRegistry, logContext, clusterId, s3Config, new MockS3Operator()); this.streamControlManager = new StreamControlManager(snapshotRegistry, logContext, this.s3ObjectControlManager); // Kafka on S3 inject end updateWriteOffset(-1); diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/MockS3Operator.java b/metadata/src/main/java/org/apache/kafka/controller/stream/MockS3Operator.java index e607cbe1df..00e7a5fb85 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/MockS3Operator.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/MockS3Operator.java @@ -28,11 +28,14 @@ public class MockS3Operator implements S3Operator { @Override public CompletableFuture delete(String objectKey) { - return delele(new String[]{objectKey}); + return delele(new String[] {objectKey}); } @Override public CompletableFuture delele(String[] objectKeys) { - return CompletableFuture.completedFuture(false); + for (String objectKey : objectKeys) { + objects.remove(objectKey); + } + return CompletableFuture.completedFuture(true); } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java index e7e3ce4e0a..10da5563ff 100644 --- 
a/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java @@ -31,4 +31,8 @@ public class S3StreamConstant { public static final long INVALID_OBJECT_ID = -1L; + public static final long INVALID_OFFSET = -1L; + + public static final long MAX_OBJECT_ID = Long.MAX_VALUE; + } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index d1b7096928..0175bf5e51 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -206,6 +206,7 @@ public ControllerResult createStream(CreateStreamReque .setStartOffset(S3StreamConstant.INIT_START_OFFSET) .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX), (short) 0); resp.setStreamId(streamId); + log.info("[CreateStream]: create stream {} success", streamId); return ControllerResult.atomicOf(Arrays.asList(record0, record), resp); } @@ -288,6 +289,7 @@ public ControllerResult openStream(OpenStreamRequestData .setRangeIndex(newRangeIndex), (short) 0)); resp.setStartOffset(streamMetadata.startOffset()); resp.setNextOffset(startOffset); + log.info("[OpenStream]: broker: {} open stream: {} with epoch: {} success", brokerId, streamId, epoch); return ControllerResult.atomicOf(records, resp); } @@ -343,6 +345,7 @@ public ControllerResult closeStream(CloseStreamRequestD .setRangeIndex(streamMetadata.currentRangeIndex()) .setStartOffset(streamMetadata.startOffset()) .setStreamState(StreamState.CLOSED.toByte()), (short) 0)); + log.info("[CloseStream]: broker: {} close stream: {} with epoch: {} success", brokerId, streamId, epoch); return ControllerResult.atomicOf(records, resp); } @@ -410,6 +413,7 @@ public ControllerResult commitWALObject(CommitWALOb long endOffset = obj.endOffset(); records.add(new S3StreamObject(obj.objectId(), obj.objectSize(), streamId, startOffset, endOffset).toRecord()); }); + log.info("[CommitWALObject]: broker: {} commit wal object {} success", brokerId, objectId); return ControllerResult.atomicOf(records, resp); } diff --git a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java index e5bf74e658..6308c754c8 100644 --- a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; import org.apache.kafka.common.metadata.BrokerWALMetadataRecord; import org.apache.kafka.metadata.stream.S3WALObject; import org.apache.kafka.image.writer.ImageWriter; @@ -67,4 +68,14 @@ public List getWalObjects() { public int getBrokerId() { return brokerId; } + + @Override + public String toString() { + return "BrokerS3WALMetadataImage{" + + "brokerId=" + brokerId + + ", s3WalObjects=" + s3WalObjects.stream() + .map(wal -> wal.toString()) + .collect(Collectors.joining(", ")) + + '}'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java index 10bc4b8cbf..67b7310bb3 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java +++ 
b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import org.apache.kafka.common.metadata.S3StreamRecord; import org.apache.kafka.controller.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.RangeMetadata; @@ -128,4 +129,18 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(streamId, epoch, state, rangeIndex, startOffset, ranges, streamObjects); } + + @Override + public String toString() { + return "S3StreamMetadataImage{" + + "streamId=" + streamId + + ", epoch=" + epoch + + ", rangeIndex=" + rangeIndex + + ", startOffset=" + startOffset + + ", state=" + state + + ", ranges=" + ranges + + ", streamObjects=" + streamObjects.entrySet().stream(). + map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + + '}'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java index ad4ad855b9..faae3f8c13 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java @@ -161,4 +161,15 @@ S3StreamsMetadataImage apply() { return new S3StreamsMetadataImage(currentAssignedStreamId, newStreams, newBrokerStreams); } + @Override + public String toString() { + return "S3StreamsMetadataDelta{" + + "image=" + image + + ", currentAssignedStreamId=" + currentAssignedStreamId + + ", changedStreams=" + changedStreams + + ", changedBrokers=" + changedBrokers + + ", deletedStreams=" + deletedStreams + + ", deletedBrokers=" + deletedBrokers + + '}'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index a8070dff19..430002fb92 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -318,4 +318,15 @@ public Map streamsMetadata() { public long nextAssignedStreamId() { return nextAssignedStreamId; } + + @Override + public String toString() { + return "S3StreamsMetadataImage{" + + "nextAssignedStreamId=" + nextAssignedStreamId + + ", streamsMetadata=" + streamsMetadata.entrySet().stream(). + map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + + ", brokerWALMetadata=" + brokerWALMetadata.entrySet().stream(). + map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + + '}'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java index 67dfdade6c..1a595b3c9a 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java @@ -23,11 +23,6 @@ public class S3ObjectMetadata { private long objectSize; private final S3ObjectType type; - public S3ObjectMetadata(long objectId, S3ObjectType type) { - this.objectId = objectId; - this.type = type; - } - public S3ObjectMetadata(long objectId, long objectSize, S3ObjectType type) { this.objectId = objectId; this.objectSize = objectSize;
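A quick way to sanity-check the new parse(...) helpers wired into AbstractRequest.doParseRequest and AbstractResponse.parseResponse is a serialize-then-parse round trip. The sketch below is illustrative only, not part of the patch: it assumes version 0 is a valid version for the new CREATE_STREAM RPC and uses MessageUtil.toByteBuffer from the clients module to serialize the generated message.

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.message.CreateStreamRequestData;
    import org.apache.kafka.common.protocol.MessageUtil;
    import org.apache.kafka.common.requests.s3.CreateStreamRequest;

    public class ParseRoundTripSketch {
        public static void main(String[] args) {
            short version = 0; // assumed lowest version of the new S3 RPCs
            CreateStreamRequestData data = new CreateStreamRequestData();
            // Serialize the generated message, then feed it back through the
            // static parse(...) helper this patch adds to each request class.
            ByteBuffer buffer = MessageUtil.toByteBuffer(data, version);
            CreateStreamRequest parsed = CreateStreamRequest.parse(buffer, version);
            System.out.println(parsed.data());
        }
    }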
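The reworked StreamMetadataManager.getObjects(...) resolves a stream's object list from two sources: the committed metadata cache and the not-yet-committed inflight WAL objects. This standalone sketch (hypothetical types, not the real MetadataCache or InflightWalObjects) captures the three-branch lookup order the patch implements:

    import java.util.ArrayList;
    import java.util.List;

    public class GetObjectsPlanSketch {
        enum Source { CACHE, INFLIGHT }

        // Decide which sources to query, in order, for [startOffset, endOffset).
        // walStartOffset is the first offset covered by inflight WAL objects,
        // or Long.MAX_VALUE when nothing is in flight.
        static List<Source> plan(long startOffset, long endOffset, long walStartOffset) {
            List<Source> sources = new ArrayList<>();
            if (startOffset >= endOffset) {
                return sources; // invalid range: warn and return empty, as in the patch
            }
            if (endOffset <= walStartOffset) {
                sources.add(Source.CACHE);    // range is fully committed metadata
            } else if (startOffset >= walStartOffset) {
                sources.add(Source.INFLIGHT); // range is fully inside the inflight WAL
            } else {
                sources.add(Source.CACHE);    // cache covers [startOffset, walStartOffset)...
                sources.add(Source.INFLIGHT); // ...inflight covers the remainder
            }
            return sources;
        }

        public static void main(String[] args) {
            System.out.println(plan(0, 40, 50));   // [CACHE]
            System.out.println(plan(60, 100, 50)); // [INFLIGHT]
            System.out.println(plan(0, 100, 50));  // [CACHE, INFLIGHT]
        }
    }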