diff --git a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java
index a4e6ca99a8..d7838a824e 100644
--- a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java
+++ b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java
@@ -17,13 +17,12 @@
 
 package kafka.log.s3.streams;
 
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import kafka.log.s3.model.StreamOffset;
 import kafka.log.s3.network.ControllerRequestSender;
 import kafka.log.s3.objects.OpenStreamMetadata;
 import kafka.server.KafkaConfig;
+import org.apache.kafka.common.message.CloseStreamRequestData;
+import org.apache.kafka.common.message.CloseStreamResponseData;
 import org.apache.kafka.common.message.CreateStreamRequestData;
 import org.apache.kafka.common.message.CreateStreamResponseData;
 import org.apache.kafka.common.message.GetStreamsOffsetRequestData;
@@ -31,12 +30,17 @@
 import org.apache.kafka.common.message.OpenStreamRequestData;
 import org.apache.kafka.common.message.OpenStreamResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.s3.CloseStreamRequest;
+import org.apache.kafka.common.requests.s3.CreateStreamRequest;
 import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest;
 import org.apache.kafka.common.requests.s3.OpenStreamRequest;
-import org.apache.kafka.common.requests.s3.CreateStreamRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
 public class ControllerStreamManager implements StreamManager {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStreamManager.class);
@@ -51,7 +55,7 @@ public ControllerStreamManager(ControllerRequestSender requestSender, KafkaConfi
     @Override
     public CompletableFuture<Long> createStream() {
         CreateStreamRequest.Builder request = new CreateStreamRequest.Builder(
-            new CreateStreamRequestData()
+                new CreateStreamRequestData()
         );
         return this.requestSender.send(request, CreateStreamResponseData.class).thenApply(resp -> {
             switch (Errors.forCode(resp.errorCode())) {
@@ -67,10 +71,10 @@ public CompletableFuture<Long> createStream() {
     @Override
     public CompletableFuture<OpenStreamMetadata> openStream(long streamId, long epoch) {
         OpenStreamRequest.Builder request = new OpenStreamRequest.Builder(
-            new OpenStreamRequestData()
-                .setStreamId(streamId)
-                .setStreamEpoch(epoch)
-                .setBrokerId(config.brokerId())
+                new OpenStreamRequestData()
+                        .setStreamId(streamId)
+                        .setStreamEpoch(epoch)
+                        .setBrokerId(config.brokerId())
         );
         return this.requestSender.send(request, OpenStreamResponseData.class).thenApply(resp -> {
             switch (Errors.forCode(resp.errorCode())) {
@@ -94,30 +98,53 @@ public CompletableFuture<OpenStreamMetadata> openStream(long streamId, long epoc
 
     @Override
     public CompletableFuture<Void> trimStream(long streamId, long epoch, long newStartOffset) {
-        return null;
+        // TODO: implement
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public CompletableFuture<Void> closeStream(long streamId, long epoch) {
-        return null;
+        CloseStreamRequest.Builder request = new CloseStreamRequest.Builder(
+                new CloseStreamRequestData()
+                        .setStreamId(streamId)
+                        .setStreamEpoch(epoch)
+                        .setBrokerId(config.brokerId())
+        );
+        return this.requestSender.send(request, CloseStreamResponseData.class).thenApply(resp -> {
+            LOGGER.info("close stream {} response: {}", streamId, resp);
+            switch (Errors.forCode(resp.errorCode())) {
+                case NONE:
+                    return null;
+                case STREAM_NOT_EXIST:
+                case STREAM_FENCED:
+                case STREAM_INNER_ERROR:
+                    LOGGER.error("Unexpected error while closing stream: {}, code: {}", request, Errors.forCode(resp.errorCode()));
+                    throw Errors.forCode(resp.errorCode()).exception();
+                default:
+                    // TODO: retry recoverable error
+                    LOGGER.warn("Error while closing stream: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode()));
+                    throw Errors.forCode(resp.errorCode()).exception();
+            }
+        });
     }
 
     @Override
     public CompletableFuture<Void> deleteStream(long streamId, long epoch) {
-        return null;
+        // TODO: implement
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public CompletableFuture<List<StreamOffset>> getStreamsOffset(List<Long> streamIds) {
         GetStreamsOffsetRequest.Builder request = new GetStreamsOffsetRequest.Builder(
-            new GetStreamsOffsetRequestData()
-                .setStreamIds(streamIds));
+                new GetStreamsOffsetRequestData()
+                        .setStreamIds(streamIds));
         return this.requestSender.send(request, GetStreamsOffsetResponseData.class).thenApply(resp -> {
             switch (Errors.forCode(resp.errorCode())) {
                 case NONE:
                     return resp.streamsOffset().stream()
-                        .map(streamOffset -> new StreamOffset(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset()))
-                        .collect(Collectors.toList());
+                            .map(streamOffset -> new StreamOffset(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset()))
+                            .collect(Collectors.toList());
                 default:
                     LOGGER.error("Error while getting streams offset: {}, code: {}", request, Errors.forCode(resp.errorCode()));
                     throw Errors.forCode(resp.errorCode()).exception();
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 74e7bd29f9..a9c6663e3a 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -578,13 +578,14 @@ class BrokerServer(
       if (alterPartitionManager != null)
        CoreUtils.swallow(alterPartitionManager.shutdown(), this)
 
-      if (clientToControllerChannelManager != null)
-        CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
-
       if (logManager != null)
        CoreUtils.swallow(logManager.shutdown(), this)
 
       // elastic stream inject start
+      // The log manager needs clientToControllerChannelManager to send requests to the controller,
+      // so shut it down only after the log manager has been shut down.
+      if (clientToControllerChannelManager != null)
+        CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
+
       // Note that logs are closed in logManager.shutdown().
       // Make sure these thread pools are shutdown after the log manager's shutdown.
       CoreUtils.swallow(replicaManager.shutdownAdditionalThreadPools(), this)