@@ -119,7 +119,7 @@ public enum ApiKeys {
 PREPARE_S3_OBJECT(ApiMessageType.PREPARE_S3_OBJECT, false, true),
 COMMIT_WALOBJECT(ApiMessageType.COMMIT_WALOBJECT, false, true),
 COMMIT_STREAM_OBJECT(ApiMessageType.COMMIT_STREAM_OBJECT, false, true),
-GET_STREAMS_OFFSET(ApiMessageType.GET_STREAMS_OFFSET, false, true),
+GET_OPENING_STREAMS(ApiMessageType.GET_OPENING_STREAMS, false, true),
 GET_KV(ApiMessageType.GET_KV, false, true),
 PUT_KV(ApiMessageType.PUT_KV, false, true),
 DELETE_KV(ApiMessageType.DELETE_KV, false, true);

@@ -33,7 +33,7 @@
 import org.apache.kafka.common.requests.s3.DeleteKVRequest;
 import org.apache.kafka.common.requests.s3.DeleteStreamRequest;
 import org.apache.kafka.common.requests.s3.GetKVRequest;
-import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest;
+import org.apache.kafka.common.requests.s3.GetOpeningStreamsRequest;
 import org.apache.kafka.common.requests.s3.OpenStreamRequest;
 import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest;
 import org.apache.kafka.common.requests.s3.PutKVRequest;
@@ -331,8 +331,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
 return CommitWALObjectRequest.parse(buffer, apiVersion);
 case COMMIT_STREAM_OBJECT:
 return CommitStreamObjectRequest.parse(buffer, apiVersion);
-case GET_STREAMS_OFFSET:
-return GetStreamsOffsetRequest.parse(buffer, apiVersion);
+case GET_OPENING_STREAMS:
+return GetOpeningStreamsRequest.parse(buffer, apiVersion);
 case GET_KV:
 return GetKVRequest.parse(buffer, apiVersion);
 case PUT_KV:

@@ -36,7 +36,7 @@
 import org.apache.kafka.common.requests.s3.DeleteKVResponse;
 import org.apache.kafka.common.requests.s3.DeleteStreamResponse;
 import org.apache.kafka.common.requests.s3.GetKVResponse;
-import org.apache.kafka.common.requests.s3.GetStreamsOffsetResponse;
+import org.apache.kafka.common.requests.s3.GetOpeningStreamsResponse;
 import org.apache.kafka.common.requests.s3.OpenStreamResponse;
 import org.apache.kafka.common.requests.s3.PrepareS3ObjectResponse;
 import org.apache.kafka.common.requests.s3.PutKVResponse;
@@ -275,8 +275,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
 return CommitStreamObjectResponse.parse(responseBuffer, version);
 case COMMIT_WALOBJECT:
 return CommitWALObjectResponse.parse(responseBuffer, version);
-case GET_STREAMS_OFFSET:
-return GetStreamsOffsetResponse.parse(responseBuffer, version);
+case GET_OPENING_STREAMS:
+return GetOpeningStreamsResponse.parse(responseBuffer, version);
 case GET_KV:
 return GetKVResponse.parse(responseBuffer, version);
 case PUT_KV:

@@ -18,26 +18,26 @@
 package org.apache.kafka.common.requests.s3;

 import java.nio.ByteBuffer;
-import org.apache.kafka.common.message.GetStreamsOffsetRequestData;
+import org.apache.kafka.common.message.GetOpeningStreamsRequestData;
 import org.apache.kafka.common.message.CreateStreamResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiError;

-public class GetStreamsOffsetRequest extends AbstractRequest {
+public class GetOpeningStreamsRequest extends AbstractRequest {

-public static class Builder extends AbstractRequest.Builder<GetStreamsOffsetRequest> {
+public static class Builder extends AbstractRequest.Builder<GetOpeningStreamsRequest> {

-private final GetStreamsOffsetRequestData data;
-public Builder(GetStreamsOffsetRequestData data) {
-super(ApiKeys.GET_STREAMS_OFFSET);
+private final GetOpeningStreamsRequestData data;
+public Builder(GetOpeningStreamsRequestData data) {
+super(ApiKeys.GET_OPENING_STREAMS);
 this.data = data;
 }

 @Override
-public GetStreamsOffsetRequest build(short version) {
-return new GetStreamsOffsetRequest(data, version);
+public GetOpeningStreamsRequest build(short version) {
+return new GetOpeningStreamsRequest(data, version);
 }

 @Override
@@ -46,10 +46,10 @@ public String toString() {
 }
 }

-private final GetStreamsOffsetRequestData data;
+private final GetOpeningStreamsRequestData data;

-public GetStreamsOffsetRequest(GetStreamsOffsetRequestData data, short version) {
-super(ApiKeys.GET_STREAMS_OFFSET, version);
+public GetOpeningStreamsRequest(GetOpeningStreamsRequestData data, short version) {
+super(ApiKeys.GET_OPENING_STREAMS, version);
 this.data = data;
 }

@@ -63,12 +63,12 @@ public CreateStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) {
 }

 @Override
-public GetStreamsOffsetRequestData data() {
+public GetOpeningStreamsRequestData data() {
 return data;
 }

-public static GetStreamsOffsetRequest parse(ByteBuffer buffer, short version) {
-return new GetStreamsOffsetRequest(new GetStreamsOffsetRequestData(
+public static GetOpeningStreamsRequest parse(ByteBuffer buffer, short version) {
+return new GetOpeningStreamsRequest(new GetOpeningStreamsRequestData(
 new ByteBufferAccessor(buffer), version), version);
 }


@@ -19,22 +19,22 @@

 import java.nio.ByteBuffer;
 import java.util.Map;
-import org.apache.kafka.common.message.GetStreamsOffsetResponseData;
+import org.apache.kafka.common.message.GetOpeningStreamsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;

-public class GetStreamsOffsetResponse extends AbstractResponse {
-private final GetStreamsOffsetResponseData data;
+public class GetOpeningStreamsResponse extends AbstractResponse {
+private final GetOpeningStreamsResponseData data;

-public GetStreamsOffsetResponse(GetStreamsOffsetResponseData data) {
-super(ApiKeys.GET_STREAMS_OFFSET);
+public GetOpeningStreamsResponse(GetOpeningStreamsResponseData data) {
+super(ApiKeys.GET_OPENING_STREAMS);
 this.data = data;
 }

 @Override
-public GetStreamsOffsetResponseData data() {
+public GetOpeningStreamsResponseData data() {
 return data;
 }

@@ -53,8 +53,8 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
 data.setThrottleTimeMs(throttleTimeMs);
 }

-public static GetStreamsOffsetResponse parse(ByteBuffer buffer, short version) {
-return new GetStreamsOffsetResponse(new GetStreamsOffsetResponseData(
+public static GetOpeningStreamsResponse parse(ByteBuffer buffer, short version) {
+return new GetOpeningStreamsResponse(new GetOpeningStreamsResponseData(
 new ByteBufferAccessor(buffer), version));
 }
 }

@@ -20,16 +20,21 @@
 "controller",
 "broker"
 ],
-"name": "GetStreamsOffsetRequest",
+"name": "GetOpeningStreamsRequest",
 "validVersions": "0",
 "flexibleVersions": "0+",
 "fields": [
 {
-"name": "StreamIds",
-"type": "[]int64",
+"name": "BrokerId",
+"type": "int32",
 "versions": "0+",
-"entityType": "streamId",
-"about": "The ids of the requesting stream"
+"about": "The broker id."
+},
+{
+"name": "BrokerEpoch",
+"type": "int64",
+"versions": "0+",
+"about": "The broker epoch."
 }
 ]
 }
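The request is now keyed by broker identity (id and epoch) instead of an explicit stream-id list. As a reference, here is a minimal sketch of populating the new schema via the generated message class; the builder and setters appear elsewhere in this diff, while the concrete id and epoch values are illustrative:

```java
// Build a GetOpeningStreamsRequest the same way ControllerStreamManager
// does below; broker id 1 and a wall-clock epoch are placeholder values.
GetOpeningStreamsRequestData data = new GetOpeningStreamsRequestData()
        .setBrokerId(1)
        .setBrokerEpoch(System.currentTimeMillis());
GetOpeningStreamsRequest request = new GetOpeningStreamsRequest.Builder(data).build((short) 0);
```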

@@ -16,7 +16,7 @@
 {
 "apiKey": 508,
 "type": "response",
-"name": "GetStreamsOffsetResponse",
+"name": "GetOpeningStreamsResponse",
 "validVersions": "0",
 "flexibleVersions": "0+",
 "fields": [

@@ -273,6 +273,11 @@ public List<S3StreamObjectMetadata> getStreamObjects(long streamId, long startOf
 return Collections.emptyList();
 }

+@Override
+public CompletableFuture<List<StreamOffsetRange>> getOpeningStreams() {
+return CompletableFuture.completedFuture(Collections.emptyList());
+}
+
 @Override
 public CompletableFuture<Long> createStream() {
 return this.submitEvent(() -> {
@@ -356,15 +361,6 @@ public CompletableFuture<Void> deleteStream(long streamId, long epoch) {
 return null;
 }

-@Override
-public CompletableFuture<List<StreamOffsetRange>> getStreamsOffset(List<Long> streamIds) {
-return this.submitEvent(() -> {
-return streamIds.stream().filter(this.streamsMetadata::containsKey).map(id -> {
-return new StreamOffsetRange(id, this.streamsMetadata.get(id).startOffset, this.streamsMetadata.get(id).endOffset);
-}).collect(Collectors.toList());
-});
-}
-
 private S3Object prepareObject(long objectId, long ttl) {
 long preparedTs = System.currentTimeMillis();
 String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);

@@ -24,14 +24,14 @@
 import org.apache.kafka.common.message.CloseStreamResponseData;
 import org.apache.kafka.common.message.CreateStreamRequestData;
 import org.apache.kafka.common.message.CreateStreamResponseData;
-import org.apache.kafka.common.message.GetStreamsOffsetRequestData;
-import org.apache.kafka.common.message.GetStreamsOffsetResponseData;
+import org.apache.kafka.common.message.GetOpeningStreamsRequestData;
+import org.apache.kafka.common.message.GetOpeningStreamsResponseData;
 import org.apache.kafka.common.message.OpenStreamRequestData;
 import org.apache.kafka.common.message.OpenStreamResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.s3.CloseStreamRequest;
 import org.apache.kafka.common.requests.s3.CreateStreamRequest;
-import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest;
+import org.apache.kafka.common.requests.s3.GetOpeningStreamsRequest;
 import org.apache.kafka.common.requests.s3.OpenStreamRequest;
 import org.apache.kafka.metadata.stream.StreamOffsetRange;
 import org.slf4j.Logger;
@@ -52,6 +52,23 @@ public ControllerStreamManager(ControllerRequestSender requestSender, KafkaConfi
 this.requestSender = requestSender;
 }

+@Override
+public CompletableFuture<List<StreamOffsetRange>> getOpeningStreams() {
+GetOpeningStreamsRequest.Builder request = new GetOpeningStreamsRequest.Builder(
+new GetOpeningStreamsRequestData().setBrokerId(config.brokerId()).setBrokerEpoch(config.brokerEpoch()));
+return this.requestSender.send(request, GetOpeningStreamsResponseData.class).thenApply(resp -> {
+switch (Errors.forCode(resp.errorCode())) {
+case NONE:
+return resp.streamsOffset().stream()
+.map(streamOffset -> new StreamOffsetRange(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset()))
+.collect(Collectors.toList());
+default:
+LOGGER.error("Error while getting opening streams: {}, code: {}", request, Errors.forCode(resp.errorCode()));
+throw Errors.forCode(resp.errorCode()).exception();
+}
+});
+}
+
 @Override
 public CompletableFuture<Long> createStream() {
 CreateStreamRequest.Builder request = new CreateStreamRequest.Builder(
@@ -133,22 +150,4 @@ public CompletableFuture<Void> deleteStream(long streamId, long epoch) {
 // TODO: implement
 return CompletableFuture.completedFuture(null);
 }
-
-@Override
-public CompletableFuture<List<StreamOffsetRange>> getStreamsOffset(List<Long> streamIds) {
-GetStreamsOffsetRequest.Builder request = new GetStreamsOffsetRequest.Builder(
-new GetStreamsOffsetRequestData()
-.setStreamIds(streamIds));
-return this.requestSender.send(request, GetStreamsOffsetResponseData.class).thenApply(resp -> {
-switch (Errors.forCode(resp.errorCode())) {
-case NONE:
-return resp.streamsOffset().stream()
-.map(streamOffset -> new StreamOffsetRange(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset()))
-.collect(Collectors.toList());
-default:
-LOGGER.error("Error while getting streams offset: {}, code: {}", request, Errors.forCode(resp.errorCode()));
-throw Errors.forCode(resp.errorCode()).exception();
-}
-});
-}
 }
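Design note: with this shape, the broker reports itself (id and epoch) and the controller computes the open-stream set, which presumably also lets the controller fence stale broker instances by epoch. A hypothetical controller-side check might look like the following sketch; none of these names come from this PR:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical controller-side fencing sketch (not from this PR): reject a
// request whose broker epoch is older than the last epoch seen for that broker.
final class BrokerEpochFencer {
    private final Map<Integer, Long> lastSeenEpoch = new HashMap<>();

    /** Returns true and records the epoch if (brokerId, brokerEpoch) is current. */
    synchronized boolean checkAndRecord(int brokerId, long brokerEpoch) {
        long last = lastSeenEpoch.getOrDefault(brokerId, Long.MIN_VALUE);
        if (brokerEpoch < last) {
            return false; // stale broker instance; caller would answer with STALE_BROKER_EPOCH
        }
        lastSeenEpoch.put(brokerId, brokerEpoch);
        return true;
    }
}
```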
19 changes: 9 additions & 10 deletions core/src/main/scala/kafka/log/s3/streams/StreamManager.java
@@ -24,6 +24,15 @@


 public interface StreamManager {
+
+/**
+ * Get the streams that are currently open on this server.
+ * When the server is starting or recovering, the WAL in EBS needs these stream offsets to determine the recovery point.
+ * @return list of {@link StreamOffsetRange}
+ */
+CompletableFuture<List<StreamOffsetRange>> getOpeningStreams();
+
+
 /**
  * Create a new stream.
  *
@@ -67,15 +76,5 @@ public interface StreamManager {
  * @param epoch stream epoch.
  */
 CompletableFuture<Void> deleteStream(long streamId, long epoch);
-
-/**
- * Get streams offset.
- * <p>
- * When server is starting or recovering, wal in EBS need streams offset to determine the recover point.
- *
- * @param streamIds stream ids.
- * @return {@link StreamOffsetRange}
- */
-CompletableFuture<List<StreamOffsetRange>> getStreamsOffset(List<Long> streamIds);
 }
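A minimal sketch of the startup flow the new javadoc describes, under stated assumptions: `WriteAheadLog` and `recoverFrom` are illustrative names rather than types from this PR, and `StreamOffsetRange` is assumed to expose Kafka-style `streamId()`/`endOffset()` accessors:

```java
// Recovery sketch: on startup, fetch this broker's open streams and hand each
// end offset to a hypothetical WAL as the replay point. Data before endOffset
// is already confirmed by the controller, so the WAL replays from endOffset.
CompletableFuture<Void> recoverOnStartup(StreamManager streamManager, WriteAheadLog wal) {
    return streamManager.getOpeningStreams().thenAccept(ranges -> {
        for (StreamOffsetRange range : ranges) {
            wal.recoverFrom(range.streamId(), range.endOffset());
        }
    });
}
```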

10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/server/ControllerApis.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.requests.s3.{CloseStreamRequest, CloseStreamResponse, CommitStreamObjectRequest, CommitStreamObjectResponse, CommitWALObjectRequest, CommitWALObjectResponse, CreateStreamRequest, CreateStreamResponse, DeleteKVRequest, DeleteKVResponse, DeleteStreamRequest, DeleteStreamResponse, GetKVRequest, GetKVResponse, GetStreamsOffsetRequest, GetStreamsOffsetResponse, OpenStreamRequest, OpenStreamResponse, PrepareS3ObjectRequest, PrepareS3ObjectResponse, PutKVRequest, PutKVResponse}
+import org.apache.kafka.common.requests.s3.{CloseStreamRequest, CloseStreamResponse, CommitStreamObjectRequest, CommitStreamObjectResponse, CommitWALObjectRequest, CommitWALObjectResponse, CreateStreamRequest, CreateStreamResponse, DeleteKVRequest, DeleteKVResponse, DeleteStreamRequest, DeleteStreamResponse, GetKVRequest, GetKVResponse, GetOpeningStreamsRequest, GetOpeningStreamsResponse, OpenStreamRequest, OpenStreamResponse, PrepareS3ObjectRequest, PrepareS3ObjectResponse, PutKVRequest, PutKVResponse}
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
 import org.apache.kafka.common.utils.Time
@@ -118,7 +118,7 @@ class ControllerApis(val requestChannel: RequestChannel,
 case ApiKeys.PREPARE_S3_OBJECT => handlePrepareS3Object(request)
 case ApiKeys.COMMIT_WALOBJECT => handleCommitWALObject(request)
 case ApiKeys.COMMIT_STREAM_OBJECT => handleCommitStreamObject(request)
-case ApiKeys.GET_STREAMS_OFFSET => handleGetStreamsOffset(request)
+case ApiKeys.GET_OPENING_STREAMS => handleGetStreamsOffset(request)
 case ApiKeys.GET_KV => handleGetKV(request)
 case ApiKeys.PUT_KV => handlePutKV(request)
 case ApiKeys.DELETE_KV => handleDeleteKV(request)
@@ -993,16 +993,16 @@
 }

 def handleGetStreamsOffset(request: RequestChannel.Request): CompletableFuture[Unit] = {
-val getStreamsOffsetRequest = request.body[GetStreamsOffsetRequest]
+val getStreamsOffsetRequest = request.body[GetOpeningStreamsRequest]
 val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
 OptionalLong.empty())
-controller.getStreamsOffset(context, getStreamsOffsetRequest.data)
+controller.getOpeningStreams(context, getStreamsOffsetRequest.data)
 .handle[Unit] { (result, exception) =>
 if (exception != null) {
 requestHelper.handleError(request, exception)
 } else {
 requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-new GetStreamsOffsetResponse(result.setThrottleTimeMs(requestThrottleMs))
+new GetOpeningStreamsResponse(result.setThrottleTimeMs(requestThrottleMs))
 })
 }
 }
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2065,6 +2065,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
 val s3StreamObjectCompactionTaskInterval = getInt(KafkaConfig.S3StreamObjectCompactionTaskIntervalProp)
 val s3StreamObjectCompactionMaxSize = getInt(KafkaConfig.S3StreamObjectCompactionMaxSizeProp)
 val s3StreamObjectCompactionLivingTimeThreshold = getInt(KafkaConfig.S3StreamObjectCompactionLivingTimeThresholdProp)
+// TODO: ensure a monotonically increasing epoch. Persist the epoch on disk; if the wall clock flips back, use the on-disk epoch to keep it increasing.
+val brokerEpoch = System.currentTimeMillis()
 // Kafka on S3 inject end

 def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
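The TODO above notes that a pure wall-clock epoch can repeat or decrease when the clock flips back. A rough sketch of the disk-backed scheme it suggests; the class name, file handling, and format are assumptions, not part of this change:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Monotonic-epoch sketch: persist the last issued epoch and take
// max(now, last + 1), so a backwards clock can never reissue an epoch.
final class BrokerEpochStore {
    static long nextEpoch(Path epochFile) throws IOException {
        long last = Files.exists(epochFile)
                ? Long.parseLong(Files.readString(epochFile).trim())
                : Long.MIN_VALUE;
        long epoch = Math.max(System.currentTimeMillis(), last + 1);
        // A production version would write a temp file and atomically rename it.
        Files.writeString(epochFile, Long.toString(epoch));
        return epoch;
    }
}
```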
6 changes: 3 additions & 3 deletions core/src/test/java/kafka/test/MockController.java
@@ -53,8 +53,8 @@
 import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.message.GetKVRequestData;
 import org.apache.kafka.common.message.GetKVResponseData;
-import org.apache.kafka.common.message.GetStreamsOffsetRequestData;
-import org.apache.kafka.common.message.GetStreamsOffsetResponseData;
+import org.apache.kafka.common.message.GetOpeningStreamsRequestData;
+import org.apache.kafka.common.message.GetOpeningStreamsResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.OpenStreamRequestData;
@@ -525,7 +525,7 @@ public CompletableFuture<CommitStreamObjectResponseData> commitStreamObject(Cont
 }

 @Override
-public CompletableFuture<GetStreamsOffsetResponseData> getStreamsOffset(ControllerRequestContext context, GetStreamsOffsetRequestData request) {
+public CompletableFuture<GetOpeningStreamsResponseData> getOpeningStreams(ControllerRequestContext context, GetOpeningStreamsRequestData request) {
 throw new UnsupportedOperationException();
 }
