diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index b48126c021..2faa9144f5 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -119,7 +119,7 @@ public enum ApiKeys { PREPARE_S3_OBJECT(ApiMessageType.PREPARE_S3_OBJECT, false, true), COMMIT_WALOBJECT(ApiMessageType.COMMIT_WALOBJECT, false, true), COMMIT_STREAM_OBJECT(ApiMessageType.COMMIT_STREAM_OBJECT, false, true), - GET_STREAMS_OFFSET(ApiMessageType.GET_STREAMS_OFFSET, false, true), + GET_OPENING_STREAMS(ApiMessageType.GET_OPENING_STREAMS, false, true), GET_KV(ApiMessageType.GET_KV, false, true), PUT_KV(ApiMessageType.PUT_KV, false, true), DELETE_KV(ApiMessageType.DELETE_KV, false, true); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index ec67d03b21..65a1cb9b97 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -33,7 +33,7 @@ import org.apache.kafka.common.requests.s3.DeleteKVRequest; import org.apache.kafka.common.requests.s3.DeleteStreamRequest; import org.apache.kafka.common.requests.s3.GetKVRequest; -import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest; +import org.apache.kafka.common.requests.s3.GetOpeningStreamsRequest; import org.apache.kafka.common.requests.s3.OpenStreamRequest; import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest; import org.apache.kafka.common.requests.s3.PutKVRequest; @@ -331,8 +331,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return CommitWALObjectRequest.parse(buffer, apiVersion); case COMMIT_STREAM_OBJECT: return CommitStreamObjectRequest.parse(buffer, apiVersion); - case GET_STREAMS_OFFSET: - return GetStreamsOffsetRequest.parse(buffer, apiVersion); + case GET_OPENING_STREAMS: + return GetOpeningStreamsRequest.parse(buffer, apiVersion); case GET_KV: return GetKVRequest.parse(buffer, apiVersion); case PUT_KV: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 086df2aff9..abc5cc18af 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -36,7 +36,7 @@ import org.apache.kafka.common.requests.s3.DeleteKVResponse; import org.apache.kafka.common.requests.s3.DeleteStreamResponse; import org.apache.kafka.common.requests.s3.GetKVResponse; -import org.apache.kafka.common.requests.s3.GetStreamsOffsetResponse; +import org.apache.kafka.common.requests.s3.GetOpeningStreamsResponse; import org.apache.kafka.common.requests.s3.OpenStreamResponse; import org.apache.kafka.common.requests.s3.PrepareS3ObjectResponse; import org.apache.kafka.common.requests.s3.PutKVResponse; @@ -275,8 +275,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response return CommitStreamObjectResponse.parse(responseBuffer, version); case COMMIT_WALOBJECT: return CommitWALObjectResponse.parse(responseBuffer, version); - case GET_STREAMS_OFFSET: - return GetStreamsOffsetResponse.parse(responseBuffer, version); + case GET_OPENING_STREAMS: + return GetOpeningStreamsResponse.parse(responseBuffer, version); case GET_KV: return GetKVResponse.parse(responseBuffer, version); case PUT_KV: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetOpeningStreamsRequest.java similarity index 69% rename from clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetRequest.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/GetOpeningStreamsRequest.java index f6e4c9ab02..cdc4809737 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetOpeningStreamsRequest.java @@ -18,26 +18,26 @@ package org.apache.kafka.common.requests.s3; import java.nio.ByteBuffer; -import org.apache.kafka.common.message.GetStreamsOffsetRequestData; +import org.apache.kafka.common.message.GetOpeningStreamsRequestData; import org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiError; -public class GetStreamsOffsetRequest extends AbstractRequest { +public class GetOpeningStreamsRequest extends AbstractRequest { - public static class Builder extends AbstractRequest.Builder { + public static class Builder extends AbstractRequest.Builder { - private final GetStreamsOffsetRequestData data; - public Builder(GetStreamsOffsetRequestData data) { - super(ApiKeys.GET_STREAMS_OFFSET); + private final GetOpeningStreamsRequestData data; + public Builder(GetOpeningStreamsRequestData data) { + super(ApiKeys.GET_OPENING_STREAMS); this.data = data; } @Override - public GetStreamsOffsetRequest build(short version) { - return new GetStreamsOffsetRequest(data, version); + public GetOpeningStreamsRequest build(short version) { + return new GetOpeningStreamsRequest(data, version); } @Override @@ -46,10 +46,10 @@ public String toString() { } } - private final GetStreamsOffsetRequestData data; + private final GetOpeningStreamsRequestData data; - public GetStreamsOffsetRequest(GetStreamsOffsetRequestData data, short version) { - super(ApiKeys.GET_STREAMS_OFFSET, version); + public GetOpeningStreamsRequest(GetOpeningStreamsRequestData data, short version) { + super(ApiKeys.GET_OPENING_STREAMS, version); this.data = data; } @@ -63,12 +63,12 @@ public CreateStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) { } @Override - public GetStreamsOffsetRequestData data() { + public GetOpeningStreamsRequestData data() { return data; } - public static GetStreamsOffsetRequest parse(ByteBuffer buffer, short version) { - return new GetStreamsOffsetRequest(new GetStreamsOffsetRequestData( + public static GetOpeningStreamsRequest parse(ByteBuffer buffer, short version) { + return new GetOpeningStreamsRequest(new GetOpeningStreamsRequestData( new ByteBufferAccessor(buffer), version), version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetOpeningStreamsResponse.java similarity index 75% rename from clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/GetOpeningStreamsResponse.java index b971db66b2..62577c9f8c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetOpeningStreamsResponse.java @@ -19,22 +19,22 @@ import java.nio.ByteBuffer; import java.util.Map; -import org.apache.kafka.common.message.GetStreamsOffsetResponseData; +import org.apache.kafka.common.message.GetOpeningStreamsResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; -public class GetStreamsOffsetResponse extends AbstractResponse { - private final GetStreamsOffsetResponseData data; +public class GetOpeningStreamsResponse extends AbstractResponse { + private final GetOpeningStreamsResponseData data; - public GetStreamsOffsetResponse(GetStreamsOffsetResponseData data) { - super(ApiKeys.GET_STREAMS_OFFSET); + public GetOpeningStreamsResponse(GetOpeningStreamsResponseData data) { + super(ApiKeys.GET_OPENING_STREAMS); this.data = data; } @Override - public GetStreamsOffsetResponseData data() { + public GetOpeningStreamsResponseData data() { return data; } @@ -53,8 +53,8 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) { data.setThrottleTimeMs(throttleTimeMs); } - public static GetStreamsOffsetResponse parse(ByteBuffer buffer, short version) { - return new GetStreamsOffsetResponse(new GetStreamsOffsetResponseData( + public static GetOpeningStreamsResponse parse(ByteBuffer buffer, short version) { + return new GetOpeningStreamsResponse(new GetOpeningStreamsResponseData( new ByteBufferAccessor(buffer), version)); } } diff --git a/clients/src/main/resources/common/message/GetStreamsOffsetRequest.json b/clients/src/main/resources/common/message/GetOpeningStreamsRequest.json similarity index 80% rename from clients/src/main/resources/common/message/GetStreamsOffsetRequest.json rename to clients/src/main/resources/common/message/GetOpeningStreamsRequest.json index d976cf9e90..1f42938a90 100644 --- a/clients/src/main/resources/common/message/GetStreamsOffsetRequest.json +++ b/clients/src/main/resources/common/message/GetOpeningStreamsRequest.json @@ -20,16 +20,21 @@ "controller", "broker" ], - "name": "GetStreamsOffsetRequest", + "name": "GetOpeningStreamsRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { - "name": "StreamIds", - "type": "[]int64", + "name": "BrokerId", + "type": "int64", "versions": "0+", - "entityType": "streamId", - "about": "The ids of the requesting stream" + "about": "The broker id." + }, + { + "name": "BrokerEpoch", + "type": "int64", + "versions": "0+", + "about": "The broker epoch." } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/GetStreamsOffsetResponse.json b/clients/src/main/resources/common/message/GetOpeningStreamsResponse.json similarity index 98% rename from clients/src/main/resources/common/message/GetStreamsOffsetResponse.json rename to clients/src/main/resources/common/message/GetOpeningStreamsResponse.json index bd1ea25517..56e60256f9 100644 --- a/clients/src/main/resources/common/message/GetStreamsOffsetResponse.json +++ b/clients/src/main/resources/common/message/GetOpeningStreamsResponse.json @@ -16,7 +16,7 @@ { "apiKey": 508, "type": "response", - "name": "GetStreamsOffsetResponse", + "name": "GetOpeningStreamsResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java index d222e69368..302a46cae8 100644 --- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java @@ -273,6 +273,11 @@ public List getStreamObjects(long streamId, long startOf return Collections.emptyList(); } + @Override + public CompletableFuture> getOpeningStreams() { + return CompletableFuture.completedFuture(Collections.emptyList()); + } + @Override public CompletableFuture createStream() { return this.submitEvent(() -> { @@ -356,15 +361,6 @@ public CompletableFuture deleteStream(long streamId, long epoch) { return null; } - @Override - public CompletableFuture> getStreamsOffset(List streamIds) { - return this.submitEvent(() -> { - return streamIds.stream().filter(this.streamsMetadata::containsKey).map(id -> { - return new StreamOffsetRange(id, this.streamsMetadata.get(id).startOffset, this.streamsMetadata.get(id).endOffset); - }).collect(Collectors.toList()); - }); - } - private S3Object prepareObject(long objectId, long ttl) { long preparedTs = System.currentTimeMillis(); String objectKey = ObjectUtils.genKey(0, "todocluster", objectId); diff --git a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java index 1afd60bce9..a108e4ebcd 100644 --- a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java +++ b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java @@ -24,14 +24,14 @@ import org.apache.kafka.common.message.CloseStreamResponseData; import org.apache.kafka.common.message.CreateStreamRequestData; import org.apache.kafka.common.message.CreateStreamResponseData; -import org.apache.kafka.common.message.GetStreamsOffsetRequestData; -import org.apache.kafka.common.message.GetStreamsOffsetResponseData; +import org.apache.kafka.common.message.GetOpeningStreamsRequestData; +import org.apache.kafka.common.message.GetOpeningStreamsResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.s3.CloseStreamRequest; import org.apache.kafka.common.requests.s3.CreateStreamRequest; -import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest; +import org.apache.kafka.common.requests.s3.GetOpeningStreamsRequest; import org.apache.kafka.common.requests.s3.OpenStreamRequest; import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.slf4j.Logger; @@ -52,6 +52,23 @@ public ControllerStreamManager(ControllerRequestSender requestSender, KafkaConfi this.requestSender = requestSender; } + @Override + public CompletableFuture> getOpeningStreams() { + GetOpeningStreamsRequest.Builder request = new GetOpeningStreamsRequest.Builder( + new GetOpeningStreamsRequestData().setBrokerId(config.brokerId()).setBrokerEpoch(config.brokerEpoch())); + return this.requestSender.send(request, GetOpeningStreamsResponseData.class).thenApply(resp -> { + switch (Errors.forCode(resp.errorCode())) { + case NONE: + return resp.streamsOffset().stream() + .map(streamOffset -> new StreamOffsetRange(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset())) + .collect(Collectors.toList()); + default: + LOGGER.error("Error while getting streams offset: {}, code: {}", request, Errors.forCode(resp.errorCode())); + throw Errors.forCode(resp.errorCode()).exception(); + } + }); + } + @Override public CompletableFuture createStream() { CreateStreamRequest.Builder request = new CreateStreamRequest.Builder( @@ -133,22 +150,4 @@ public CompletableFuture deleteStream(long streamId, long epoch) { // TODO: implement return CompletableFuture.completedFuture(null); } - - @Override - public CompletableFuture> getStreamsOffset(List streamIds) { - GetStreamsOffsetRequest.Builder request = new GetStreamsOffsetRequest.Builder( - new GetStreamsOffsetRequestData() - .setStreamIds(streamIds)); - return this.requestSender.send(request, GetStreamsOffsetResponseData.class).thenApply(resp -> { - switch (Errors.forCode(resp.errorCode())) { - case NONE: - return resp.streamsOffset().stream() - .map(streamOffset -> new StreamOffsetRange(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset())) - .collect(Collectors.toList()); - default: - LOGGER.error("Error while getting streams offset: {}, code: {}", request, Errors.forCode(resp.errorCode())); - throw Errors.forCode(resp.errorCode()).exception(); - } - }); - } } diff --git a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java index 5e5b177d72..2544161c0e 100644 --- a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java +++ b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java @@ -24,6 +24,15 @@ public interface StreamManager { + + /** + * Get current server opening streams. + * When server is starting or recovering, wal in EBS need streams offset to determine the recover point. + * @return list of {@link StreamOffsetRange} + */ + CompletableFuture> getOpeningStreams(); + + /** * Create a new stream. * @@ -67,15 +76,5 @@ public interface StreamManager { * @param epoch stream epoch. */ CompletableFuture deleteStream(long streamId, long epoch); - - /** - * Get streams offset. - *

- * When server is starting or recovering, wal in EBS need streams offset to determine the recover point. - * - * @param streamIds stream ids. - * @return {@link StreamOffsetRange} - */ - CompletableFuture> getStreamsOffset(List streamIds); } diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index f21f9cc686..9050c0747f 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -42,7 +42,7 @@ import org.apache.kafka.common.message.{CreateTopicsRequestData, _} import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} import org.apache.kafka.common.requests._ -import org.apache.kafka.common.requests.s3.{CloseStreamRequest, CloseStreamResponse, CommitStreamObjectRequest, CommitStreamObjectResponse, CommitWALObjectRequest, CommitWALObjectResponse, CreateStreamRequest, CreateStreamResponse, DeleteKVRequest, DeleteKVResponse, DeleteStreamRequest, DeleteStreamResponse, GetKVRequest, GetKVResponse, GetStreamsOffsetRequest, GetStreamsOffsetResponse, OpenStreamRequest, OpenStreamResponse, PrepareS3ObjectRequest, PrepareS3ObjectResponse, PutKVRequest, PutKVResponse} +import org.apache.kafka.common.requests.s3.{CloseStreamRequest, CloseStreamResponse, CommitStreamObjectRequest, CommitStreamObjectResponse, CommitWALObjectRequest, CommitWALObjectResponse, CreateStreamRequest, CreateStreamResponse, DeleteKVRequest, DeleteKVResponse, DeleteStreamRequest, DeleteStreamResponse, GetKVRequest, GetKVResponse, GetOpeningStreamsRequest, GetOpeningStreamsResponse, OpenStreamRequest, OpenStreamResponse, PrepareS3ObjectRequest, PrepareS3ObjectResponse, PutKVRequest, PutKVResponse} import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC} import org.apache.kafka.common.utils.Time @@ -118,7 +118,7 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.PREPARE_S3_OBJECT => handlePrepareS3Object(request) case ApiKeys.COMMIT_WALOBJECT => handleCommitWALObject(request) case ApiKeys.COMMIT_STREAM_OBJECT => handleCommitStreamObject(request) - case ApiKeys.GET_STREAMS_OFFSET => handleGetStreamsOffset(request) + case ApiKeys.GET_OPENING_STREAMS => handleGetStreamsOffset(request) case ApiKeys.GET_KV => handleGetKV(request) case ApiKeys.PUT_KV => handlePutKV(request) case ApiKeys.DELETE_KV => handleDeleteKV(request) @@ -993,16 +993,16 @@ class ControllerApis(val requestChannel: RequestChannel, } def handleGetStreamsOffset(request: RequestChannel.Request): CompletableFuture[Unit] = { - val getStreamsOffsetRequest = request.body[GetStreamsOffsetRequest] + val getStreamsOffsetRequest = request.body[GetOpeningStreamsRequest] val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty()) - controller.getStreamsOffset(context, getStreamsOffsetRequest.data) + controller.getOpeningStreams(context, getStreamsOffsetRequest.data) .handle[Unit] { (result, exception) => if (exception != null) { requestHelper.handleError(request, exception) } else { requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { - new GetStreamsOffsetResponse(result.setThrottleTimeMs(requestThrottleMs)) + new GetOpeningStreamsResponse(result.setThrottleTimeMs(requestThrottleMs)) }) } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index e7387e3cc7..b559293d7d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -2065,6 +2065,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val s3StreamObjectCompactionTaskInterval = getInt(KafkaConfig.S3StreamObjectCompactionTaskIntervalProp) val s3StreamObjectCompactionMaxSize = getInt(KafkaConfig.S3StreamObjectCompactionMaxSizeProp) val s3StreamObjectCompactionLivingTimeThreshold = getInt(KafkaConfig.S3StreamObjectCompactionLivingTimeThresholdProp) + // TODO: ensure incremental epoch => Store epoch in disk, if timestamp flip back, we could use disk epoch to keep the incremental epoch. + val brokerEpoch = System.currentTimeMillis() // Kafka on S3 inject end def addReconfigurable(reconfigurable: Reconfigurable): Unit = { diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index 71ab0eaa91..091262da8e 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -53,8 +53,8 @@ import org.apache.kafka.common.message.ElectLeadersResponseData; import org.apache.kafka.common.message.GetKVRequestData; import org.apache.kafka.common.message.GetKVResponseData; -import org.apache.kafka.common.message.GetStreamsOffsetRequestData; -import org.apache.kafka.common.message.GetStreamsOffsetResponseData; +import org.apache.kafka.common.message.GetOpeningStreamsRequestData; +import org.apache.kafka.common.message.GetOpeningStreamsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; @@ -525,7 +525,7 @@ public CompletableFuture commitStreamObject(Cont } @Override - public CompletableFuture getStreamsOffset(ControllerRequestContext context, GetStreamsOffsetRequestData request) { + public CompletableFuture getOpeningStreams(ControllerRequestContext context, GetOpeningStreamsRequestData request) { throw new UnsupportedOperationException(); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index 922addb8b7..c69d0a607b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -48,8 +48,8 @@ import org.apache.kafka.common.message.ElectLeadersResponseData; import org.apache.kafka.common.message.GetKVRequestData; import org.apache.kafka.common.message.GetKVResponseData; -import org.apache.kafka.common.message.GetStreamsOffsetRequestData; -import org.apache.kafka.common.message.GetStreamsOffsetResponseData; +import org.apache.kafka.common.message.GetOpeningStreamsRequestData; +import org.apache.kafka.common.message.GetOpeningStreamsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; @@ -452,9 +452,9 @@ CompletableFuture commitStreamObject( /** * Broker trys to get the offset: [startOffset, endOffset) of the stream. */ - CompletableFuture getStreamsOffset( + CompletableFuture getOpeningStreams( ControllerRequestContext context, - GetStreamsOffsetRequestData request + GetOpeningStreamsRequestData request ); /** diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 10093dc424..201c1bd2fe 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -58,8 +58,8 @@ import org.apache.kafka.common.message.ElectLeadersResponseData; import org.apache.kafka.common.message.GetKVRequestData; import org.apache.kafka.common.message.GetKVResponseData; -import org.apache.kafka.common.message.GetStreamsOffsetRequestData; -import org.apache.kafka.common.message.GetStreamsOffsetResponseData; +import org.apache.kafka.common.message.GetOpeningStreamsRequestData; +import org.apache.kafka.common.message.GetOpeningStreamsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; @@ -2313,9 +2313,9 @@ public CompletableFuture commitStreamObject(Cont } @Override - public CompletableFuture getStreamsOffset(ControllerRequestContext context, GetStreamsOffsetRequestData request) { - return appendReadEvent("getStreamsOffset", context.deadlineNs(), - () -> streamControlManager.getStreamsOffset(request)); + public CompletableFuture getOpeningStreams(ControllerRequestContext context, GetOpeningStreamsRequestData request) { + return appendWriteEvent("getOpeningStreams", context.deadlineNs(), + () -> streamControlManager.getOpeningStreams(request)); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index a7f44ef9f8..55ce9ac0ba 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -35,9 +35,9 @@ import org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.message.DeleteStreamRequestData; import org.apache.kafka.common.message.DeleteStreamResponseData; -import org.apache.kafka.common.message.GetStreamsOffsetRequestData; -import org.apache.kafka.common.message.GetStreamsOffsetResponseData; -import org.apache.kafka.common.message.GetStreamsOffsetResponseData.StreamOffset; +import org.apache.kafka.common.message.GetOpeningStreamsRequestData; +import org.apache.kafka.common.message.GetOpeningStreamsResponseData; +import org.apache.kafka.common.message.GetOpeningStreamsResponseData.StreamOffset; import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.metadata.AssignedStreamIdRecord; @@ -474,23 +474,30 @@ public ControllerResult commitStreamObject(Commi return ControllerResult.atomicOf(records, resp); } - public GetStreamsOffsetResponseData getStreamsOffset(GetStreamsOffsetRequestData data) { - List streamIds = data.streamIds(); - GetStreamsOffsetResponseData resp = new GetStreamsOffsetResponseData(); - List streamOffsets = streamIds.stream() - .filter(this.streamsMetadata::containsKey) - .map(id -> { - S3StreamMetadata streamMetadata = this.streamsMetadata.get(id); - RangeMetadata range = streamMetadata.ranges().get(streamMetadata.currentRangeIndex()); - long startOffset = streamMetadata.startOffset(); - long endOffset = range == null ? startOffset : range.endOffset(); - return new StreamOffset() - .setStreamId(id) - .setStartOffset(startOffset) - .setEndOffset(endOffset); - }).collect(Collectors.toList()); + public ControllerResult getOpeningStreams(GetOpeningStreamsRequestData data) { + // TODO: check broker epoch, reject old epoch request. + int brokerId = (int) data.brokerId(); + // The getOpeningStreams operation rate is low, so we just iterate all streams to get the broker opening streams. + List streamOffsets = this.streamsMetadata.entrySet().stream().filter(entry -> { + S3StreamMetadata streamMetadata = entry.getValue(); + int rangeIndex = streamMetadata.currentRangeIndex.get(); + if (rangeIndex < 0) { + return false; + } + RangeMetadata rangeMetadata = streamMetadata.ranges.get(rangeIndex); + return rangeMetadata.brokerId() == brokerId; + }).map(e -> { + S3StreamMetadata streamMetadata = e.getValue(); + RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get()); + return new StreamOffset() + .setStreamId(e.getKey()) + .setStartOffset(streamMetadata.startOffset.get()) + .setEndOffset(rangeMetadata.endOffset()); + }).collect(Collectors.toList()); + GetOpeningStreamsResponseData resp = new GetOpeningStreamsResponseData(); resp.setStreamsOffset(streamOffsets); - return resp; + // TODO: generate a broker epoch update record to ensure consistent linear read. + return ControllerResult.of(Collections.emptyList(), resp); } public void replay(AssignedStreamIdRecord record) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index e8cafaeedf..a8430f41f9 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -37,8 +37,8 @@ import org.apache.kafka.common.message.CommitWALObjectResponseData; import org.apache.kafka.common.message.CreateStreamRequestData; import org.apache.kafka.common.message.CreateStreamResponseData; -import org.apache.kafka.common.message.GetStreamsOffsetRequestData; -import org.apache.kafka.common.message.GetStreamsOffsetResponseData; +import org.apache.kafka.common.message.GetOpeningStreamsRequestData; +import org.apache.kafka.common.message.GetOpeningStreamsResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.metadata.AssignedStreamIdRecord; @@ -355,13 +355,17 @@ public void testCommitWalBasic() { assertEquals(1, manager.brokersMetadata().get(BROKER1).walObjects().size()); // 6. get stream's offset - GetStreamsOffsetRequestData request = new GetStreamsOffsetRequestData() - .setStreamIds(List.of(STREAM0)); - GetStreamsOffsetResponseData streamsOffset = manager.getStreamsOffset(request); + GetOpeningStreamsRequestData request = new GetOpeningStreamsRequestData() + .setBrokerId(BROKER1).setBrokerEpoch(0L); + GetOpeningStreamsResponseData streamsOffset = manager.getOpeningStreams(request).response(); assertEquals(1, streamsOffset.streamsOffset().size()); assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId()); assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset()); assertEquals(300L, streamsOffset.streamsOffset().get(0).endOffset()); + + request = new GetOpeningStreamsRequestData() + .setBrokerId(BROKER0).setBrokerEpoch(0L); + assertEquals(0, manager.getOpeningStreams(request).response().streamsOffset().size()); } private void createAndOpenStream(int brokerId, long epoch) { @@ -406,9 +410,8 @@ public void testCommitWalCompacted() { replay(manager, result4.records()); // 3. fetch range end offset - GetStreamsOffsetRequestData request = new GetStreamsOffsetRequestData() - .setStreamIds(List.of(STREAM0, STREAM1)); - GetStreamsOffsetResponseData streamsOffset = manager.getStreamsOffset(request); + GetOpeningStreamsRequestData request = new GetOpeningStreamsRequestData().setBrokerId(BROKER0).setBrokerEpoch(0L); + GetOpeningStreamsResponseData streamsOffset = manager.getOpeningStreams(request).response(); assertEquals(2, streamsOffset.streamsOffset().size()); assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId()); assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset()); @@ -440,7 +443,7 @@ public void testCommitWalCompacted() { replay(manager, result5.records()); // 5. fetch range end offset - streamsOffset = manager.getStreamsOffset(request); + streamsOffset = manager.getOpeningStreams(request).response(); assertEquals(2, streamsOffset.streamsOffset().size()); assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId()); assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset()); @@ -487,7 +490,7 @@ public void testCommitWalCompacted() { replay(manager, result6.records()); // 8. fetch range end offset - streamsOffset = manager.getStreamsOffset(request); + streamsOffset = manager.getOpeningStreams(request).response(); assertEquals(2, streamsOffset.streamsOffset().size()); assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId()); assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset()); @@ -538,9 +541,8 @@ public void testCommitWalWithStreamObject() { replay(manager, result4.records()); // 3. fetch range end offset - GetStreamsOffsetRequestData request = new GetStreamsOffsetRequestData() - .setStreamIds(List.of(STREAM0, STREAM1)); - GetStreamsOffsetResponseData streamsOffset = manager.getStreamsOffset(request); + GetOpeningStreamsRequestData request = new GetOpeningStreamsRequestData().setBrokerId(BROKER0).setBrokerEpoch(0L); + GetOpeningStreamsResponseData streamsOffset = manager.getOpeningStreams(request).response(); assertEquals(2, streamsOffset.streamsOffset().size()); assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId()); assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset()); @@ -625,9 +627,8 @@ public void testCommitStreamObject() { replay(manager, result2.records()); // 5. fetch stream offset range - GetStreamsOffsetRequestData request = new GetStreamsOffsetRequestData() - .setStreamIds(List.of(STREAM0, STREAM1)); - GetStreamsOffsetResponseData streamsOffset = manager.getStreamsOffset(request); + GetOpeningStreamsRequestData request = new GetOpeningStreamsRequestData().setBrokerId(BROKER0).setBrokerEpoch(0L); + GetOpeningStreamsResponseData streamsOffset = manager.getOpeningStreams(request).response(); assertEquals(2, streamsOffset.streamsOffset().size()); assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId()); assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset());