Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,30 @@

package kafka.log.s3.streams;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import kafka.log.s3.model.StreamOffset;
import kafka.log.s3.network.ControllerRequestSender;
import kafka.log.s3.objects.OpenStreamMetadata;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.message.CloseStreamRequestData;
import org.apache.kafka.common.message.CloseStreamResponseData;
import org.apache.kafka.common.message.CreateStreamRequestData;
import org.apache.kafka.common.message.CreateStreamResponseData;
import org.apache.kafka.common.message.GetStreamsOffsetRequestData;
import org.apache.kafka.common.message.GetStreamsOffsetResponseData;
import org.apache.kafka.common.message.OpenStreamRequestData;
import org.apache.kafka.common.message.OpenStreamResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.s3.CloseStreamRequest;
import org.apache.kafka.common.requests.s3.CreateStreamRequest;
import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest;
import org.apache.kafka.common.requests.s3.OpenStreamRequest;
import org.apache.kafka.common.requests.s3.CreateStreamRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ControllerStreamManager implements StreamManager {

private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStreamManager.class);
Expand All @@ -51,7 +55,7 @@ public ControllerStreamManager(ControllerRequestSender requestSender, KafkaConfi
@Override
public CompletableFuture<Long> createStream() {
CreateStreamRequest.Builder request = new CreateStreamRequest.Builder(
new CreateStreamRequestData()
new CreateStreamRequestData()
);
return this.requestSender.send(request, CreateStreamResponseData.class).thenApply(resp -> {
switch (Errors.forCode(resp.errorCode())) {
Expand All @@ -67,10 +71,10 @@ public CompletableFuture<Long> createStream() {
@Override
public CompletableFuture<OpenStreamMetadata> openStream(long streamId, long epoch) {
OpenStreamRequest.Builder request = new OpenStreamRequest.Builder(
new OpenStreamRequestData()
.setStreamId(streamId)
.setStreamEpoch(epoch)
.setBrokerId(config.brokerId())
new OpenStreamRequestData()
.setStreamId(streamId)
.setStreamEpoch(epoch)
.setBrokerId(config.brokerId())
);
return this.requestSender.send(request, OpenStreamResponseData.class).thenApply(resp -> {
switch (Errors.forCode(resp.errorCode())) {
Expand All @@ -94,30 +98,53 @@ public CompletableFuture<OpenStreamMetadata> openStream(long streamId, long epoc

@Override
public CompletableFuture<Void> trimStream(long streamId, long epoch, long newStartOffset) {
    // Not implemented yet: complete immediately so callers are never blocked
    // until the controller-side trim RPC is wired up.
    // TODO: implement via a controller request, mirroring closeStream.
    return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> closeStream(long streamId, long epoch) {
    // Ask the controller to close the stream owned by this broker at the given epoch.
    CloseStreamRequest.Builder request = new CloseStreamRequest.Builder(
        new CloseStreamRequestData()
            .setStreamId(streamId)
            .setStreamEpoch(epoch)
            .setBrokerId(config.brokerId())
    );
    return this.requestSender.send(request, CloseStreamResponseData.class).thenApply(resp -> {
        LOGGER.info("close stream {} response: {}", streamId, resp);
        // Decode once instead of re-deriving the error for every branch.
        Errors code = Errors.forCode(resp.errorCode());
        switch (code) {
            case NONE:
                return null;
            case STREAM_NOT_EXIST:
            case STREAM_FENCED:
            case STREAM_INNER_ERROR:
                // Non-retriable: the stream is gone or this epoch lost ownership.
                LOGGER.error("Unexpected error while closing stream: {}, code: {}", request, code);
                throw code.exception();
            default:
                // TODO: retry recoverable errors instead of failing the future.
                LOGGER.warn("Error while closing stream: {}, code: {}, retry later", request, code);
                throw code.exception();
        }
    });
}

@Override
public CompletableFuture<Void> deleteStream(long streamId, long epoch) {
    // Not implemented yet: complete immediately so callers are never blocked
    // until the controller-side delete RPC is wired up.
    // TODO: implement via a controller request, mirroring closeStream.
    return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<List<StreamOffset>> getStreamsOffset(List<Long> streamIds) {
GetStreamsOffsetRequest.Builder request = new GetStreamsOffsetRequest.Builder(
new GetStreamsOffsetRequestData()
.setStreamIds(streamIds));
new GetStreamsOffsetRequestData()
.setStreamIds(streamIds));
return this.requestSender.send(request, GetStreamsOffsetResponseData.class).thenApply(resp -> {
switch (Errors.forCode(resp.errorCode())) {
case NONE:
return resp.streamsOffset().stream()
.map(streamOffset -> new StreamOffset(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset()))
.collect(Collectors.toList());
.map(streamOffset -> new StreamOffset(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset()))
.collect(Collectors.toList());
default:
LOGGER.error("Error while getting streams offset: {}, code: {}", request, Errors.forCode(resp.errorCode()));
throw Errors.forCode(resp.errorCode()).exception();
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -578,13 +578,14 @@ class BrokerServer(
if (alterPartitionManager != null)
CoreUtils.swallow(alterPartitionManager.shutdown(), this)

if (clientToControllerChannelManager != null)
CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)

if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this)

// elastic stream inject start
// log manager need clientToControllerChannelManager to send request to controller.
if (clientToControllerChannelManager != null)
CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)

// Note that logs are closed in logManager.shutdown().
// Make sure these thread pools are shutdown after the log manager's shutdown.
CoreUtils.swallow(replicaManager.shutdownAdditionalThreadPools(), this)
Expand Down