Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.errors.s3;

import org.apache.kafka.common.errors.ApiException;

/**
 * Indicates that the compacted S3 objects referenced by a request could not be found.
 * Surfaced to clients via the {@code COMPACTED_OBJECTS_NOT_FOUND} entry in {@code Errors}.
 */
public class CompactedObjectsNotFoundException extends ApiException {

    /**
     * @param message detail message describing which compacted objects were missing
     */
    public CompactedObjectsNotFoundException(String message) {
        super(message);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.s3.CompactedObjectsNotFoundException;
import org.apache.kafka.common.errors.s3.ObjectNotExistException;
import org.apache.kafka.common.errors.s3.RedundantOperationException;
import org.apache.kafka.common.errors.s3.StreamExistException;
Expand Down Expand Up @@ -389,7 +390,7 @@ public enum Errors {
STREAM_NOT_OPENED(505, "The stream is not opened.", StreamNotOpenedException::new),
STREAM_NOT_CLOSED(506, "The stream is not closed.", StreamNotClosedException::new),
REDUNDANT_OPERATION(507, "The operation is redundant.", RedundantOperationException::new),

COMPACTED_OBJECTS_NOT_FOUND(508, "The compacted objects are not found.", CompactedObjectsNotFoundException::new),


STREAM_INNER_ERROR(599, "The stream inner error.", StreamInnerErrorException::new);
Expand Down
89 changes: 49 additions & 40 deletions core/src/main/scala/kafka/log/s3/ControllerKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import kafka.log.es.api.KVClient;
import kafka.log.es.api.KeyValue;
import kafka.log.s3.network.ControllerRequestSender;
import kafka.log.s3.network.ControllerRequestSender.RequestTask;
import kafka.log.s3.network.ControllerRequestSender.ResponseHandleResult;
import org.apache.kafka.common.message.DeleteKVRequestData;
import org.apache.kafka.common.message.DeleteKVResponseData;
import org.apache.kafka.common.message.GetKVRequestData;
import org.apache.kafka.common.message.GetKVResponseData;
import org.apache.kafka.common.message.PutKVRequestData;
Expand Down Expand Up @@ -57,18 +60,20 @@ public CompletableFuture<Void> putKV(List<KeyValue> list) {
.setValue(kv.value().array())
).collect(Collectors.toList()))
);
return this.requestSender.send(requestBuilder, PutKVResponseData.class)
.thenApply(resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
LOGGER.trace("[ControllerKVClient]: Put KV: {}, result: {}", list, resp);
return null;
default:
LOGGER.error("[ControllerKVClient]: Failed to Put KV: {}, code: {}", list, code);
throw code.exception();
}
});
CompletableFuture<Void> future = new CompletableFuture<>();
RequestTask<PutKVResponseData, Void> task = new RequestTask<>(future, requestBuilder, PutKVResponseData.class, resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
LOGGER.trace("[ControllerKVClient]: Put KV: {}, result: {}", list, resp);
return ResponseHandleResult.withSuccess(null);
default:
LOGGER.error("[ControllerKVClient]: Failed to Put KV: {}, code: {}, retry later", list, code);
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
}

@Override
Expand All @@ -78,22 +83,24 @@ public CompletableFuture<List<KeyValue>> getKV(List<String> list) {
new GetKVRequestData()
.setKeys(list)
);
return this.requestSender.send(requestBuilder, GetKVResponseData.class)
.thenApply(resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
List<KeyValue> keyValues = resp.keyValues()
.stream()
.map(kv -> KeyValue.of(kv.key(), kv.value() != null ? ByteBuffer.wrap(kv.value()) : null))
.collect(Collectors.toList());
LOGGER.trace("[ControllerKVClient]: Get KV: {}, result: {}", list, keyValues);
return keyValues;
default:
LOGGER.error("[ControllerKVClient]: Failed to Get KV: {}, code: {}", String.join(",", list), code);
throw code.exception();
}
});
CompletableFuture<List<KeyValue>> future = new CompletableFuture<>();
RequestTask<GetKVResponseData, List<KeyValue>> task = new RequestTask<>(future, requestBuilder, GetKVResponseData.class, resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
List<KeyValue> keyValues = resp.keyValues()
.stream()
.map(kv -> KeyValue.of(kv.key(), kv.value() != null ? ByteBuffer.wrap(kv.value()) : null))
.collect(Collectors.toList());
LOGGER.trace("[ControllerKVClient]: Get KV: {}, result: {}", list, keyValues);
return ResponseHandleResult.withSuccess(keyValues);
default:
LOGGER.error("[ControllerKVClient]: Failed to Get KV: {}, code: {}, retry later", list, code);
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
}

@Override
Expand All @@ -103,17 +110,19 @@ public CompletableFuture<Void> delKV(List<String> list) {
new DeleteKVRequestData()
.setKeys(list)
);
return this.requestSender.send(requestBuilder, PutKVResponseData.class)
.thenApply(resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
LOGGER.trace("[ControllerKVClient]: Delete KV: {}, result: {}", list, resp);
return null;
default:
LOGGER.error("[ControllerKVClient]: Failed to Delete KV: {}, code: {}", String.join(",", list), code);
throw code.exception();
}
});
CompletableFuture<Void> future = new CompletableFuture<>();
RequestTask<DeleteKVResponseData, Void> task = new RequestTask<>(future, requestBuilder, DeleteKVResponseData.class, resp -> {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
LOGGER.trace("[ControllerKVClient]: Delete KV: {}, result: {}", list, resp);
return ResponseHandleResult.withSuccess(null);
default:
LOGGER.error("[ControllerKVClient]: Failed to Delete KV: {}, code: {}, retry later", String.join(",", list), code);
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
}
}
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/log/s3/DefaultS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.metadata.StreamMetadataManager;
import kafka.log.s3.network.ControllerRequestSender;
import kafka.log.s3.network.ControllerRequestSender.RetryPolicyContext;
import kafka.log.s3.objects.ControllerObjectManager;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.operator.S3Operator;
Expand Down Expand Up @@ -58,7 +59,9 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator
this.config = config;
this.metadataManager = new StreamMetadataManager(brokerServer, config);
this.operator = operator;
this.requestSender = new ControllerRequestSender(brokerServer);
RetryPolicyContext retryPolicyContext = new RetryPolicyContext(config.s3ControllerRequestRetryMaxCount(),
config.s3ControllerRequestRetryBaseDelayMs());
this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext);
this.streamManager = new ControllerStreamManager(this.requestSender, config);
this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config);
this.blockCache = new DefaultS3BlockCache(config.s3CacheSize(), objectManager, operator);
Expand Down
166 changes: 152 additions & 14 deletions core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,199 @@

package kafka.log.s3.network;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import kafka.server.BrokerServer;
import kafka.server.BrokerToControllerChannelManager;
import kafka.server.ControllerRequestCompletionHandler;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractRequest.Builder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

public class ControllerRequestSender {
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRequestSender.class);

private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRequestSender.class);
private final RetryPolicyContext retryPolicyContext;
private final BrokerServer brokerServer;
private BrokerToControllerChannelManager channelManager;
private final BrokerToControllerChannelManager channelManager;

private final ScheduledExecutorService retryService;

public ControllerRequestSender(BrokerServer brokerServer) {
    /**
     * Creates a sender that delivers requests to the controller over the broker's
     * controller channel manager.
     *
     * @param brokerServer       broker providing the broker-to-controller channel
     * @param retryPolicyContext retry budget (max count) and base delay applied to retried tasks
     */
    public ControllerRequestSender(BrokerServer brokerServer, RetryPolicyContext retryPolicyContext) {
        this.retryPolicyContext = retryPolicyContext;
        this.brokerServer = brokerServer;
        this.channelManager = brokerServer.clientToControllerChannelManager();
        // Single-threaded scheduler: it only re-queues delayed sends, never does the I/O itself.
        this.retryService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("controller-request-retry-sender"));
    }

public <T extends AbstractRequest, R extends ApiMessage> CompletableFuture<R> send(AbstractRequest.Builder<T> requestBuilder,
Class<R> responseDataType) {
CompletableFuture<R> cf = new CompletableFuture<>();
LOGGER.debug("Sending request {}", requestBuilder);
public void send(RequestTask task) {
Builder requestBuilder = task.requestBuilder;
Class responseDataType = task.responseDataType;
task.sendHit();
LOGGER.trace("Sending task: {}", task);
channelManager.sendRequest(requestBuilder, new ControllerRequestCompletionHandler() {
@Override
public void onTimeout() {
// TODO: add timeout retry policy
LOGGER.error("Timeout while creating stream");
cf.completeExceptionally(new TimeoutException("Timeout while creating stream"));
task.completeExceptionally(new TimeoutException("Timeout while creating stream"));
}

@Override
public void onComplete(ClientResponse response) {
if (response.authenticationException() != null) {
LOGGER.error("Authentication error while sending request: {}", requestBuilder, response.authenticationException());
cf.completeExceptionally(response.authenticationException());
task.completeExceptionally(response.authenticationException());
return;
}
if (response.versionMismatch() != null) {
LOGGER.error("Version mismatch while sending request: {}", requestBuilder, response.versionMismatch());
cf.completeExceptionally(response.versionMismatch());
task.completeExceptionally(response.versionMismatch());
return;
}
if (!responseDataType.isInstance(response.responseBody().data())) {
LOGGER.error("Unexpected response type: {} while sending request: {}",
response.responseBody().data().getClass().getSimpleName(), requestBuilder);
cf.completeExceptionally(new RuntimeException("Unexpected response type while sending request"));
response.responseBody().data().getClass().getSimpleName(), requestBuilder);
task.completeExceptionally(new RuntimeException("Unexpected response type while sending request"));
}
ApiMessage data = response.responseBody().data();
try {
ResponseHandleResult result = (ResponseHandleResult) task.responseHandler.apply(data);
if (result.retry()) {
retryTask(task);
return;
}
task.complete(result.getResponse());
} catch (Exception e) {
task.completeExceptionally(e);
}
cf.complete((R) response.responseBody().data());
}
});
return cf;
}

private void retryTask(RequestTask task) {
if (task.sendCount() > retryPolicyContext.maxRetryCount) {
LOGGER.error("Task: {}, retry count exceed max retry count: {}", task, retryPolicyContext.maxRetryCount);
task.completeExceptionally(new RuntimeException("Retry count exceed max retry count: "));
return;
}
long delay = retryPolicyContext.retryBaseDelayMs * (1 << (task.sendCount() - 1));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need delay upper bound to prevent next retry wait too long.

LOGGER.warn("Retry task: {}, delay : {} ms", task, delay);
retryService.schedule(() -> send(task), delay, TimeUnit.MILLISECONDS);
}

    /**
     * A single logical controller request together with the bookkeeping needed to retry it.
     * Fields are read directly by the enclosing {@code ControllerRequestSender}.
     *
     * @param <T> controller response data type the raw response body is expected to carry
     * @param <Z> result type exposed to the caller through {@link #cf()}
     */
    public static class RequestTask<T/*controller response data type*/, Z/*upper response data type*/> {

        // Completed exactly once — normally on success, exceptionally on fatal failure.
        private final CompletableFuture<Z> cf;

        // Builder used to (re)create the wire request on every send attempt.
        // NOTE(review): raw AbstractRequest.Builder — Builder<?> would avoid unchecked use; verify call sites first.
        private final AbstractRequest.Builder requestBuilder;

        // Expected concrete type of the response body's data; checked before handling.
        private final Class<T> responseDataType;

        /**
         * The response handler is used to determine whether the response is valid or retryable.
         */
        private final Function<T, ResponseHandleResult<Z>> responseHandler;

        // Number of send attempts so far; incremented via sendHit() before each send.
        private int sendCount;

        public RequestTask(CompletableFuture<Z> future, AbstractRequest.Builder requestBuilder, Class<T> responseDataType,
            Function<T, ResponseHandleResult<Z>> responseHandler) {
            this.cf = future;
            this.requestBuilder = requestBuilder;
            this.responseDataType = responseDataType;
            this.responseHandler = responseHandler;
        }

        /** @return the future that completes when this task finishes for good */
        public CompletableFuture<Z> cf() {
            return cf;
        }

        /** Records one send attempt; called by the sender before each send. */
        public void sendHit() {
            sendCount++;
        }

        /** @return how many times this task has been sent so far */
        public int sendCount() {
            return sendCount;
        }

        /** Completes the task successfully with {@code result}. */
        public void complete(Z result) {
            cf.complete(result);
        }

        /** Completes the task exceptionally; no further retries will run for it. */
        public void completeExceptionally(Throwable throwable) {
            cf.completeExceptionally(throwable);
        }

        @Override
        public String toString() {
            return "RequestTask{" +
                "requestBuilder=" + requestBuilder +
                ", responseDataType=" + responseDataType +
                ", sendCount=" + sendCount +
                '}';
        }
    }

public static class ResponseHandleResult<R> {

private final boolean retry;
private final R response;

private ResponseHandleResult(boolean retry, R response) {
this.retry = retry;
this.response = response;
}

public static ResponseHandleResult withRetry() {
return new ResponseHandleResult(true, null);
}

public static <R> ResponseHandleResult withSuccess(R response) {
return new ResponseHandleResult(false, response);
}

public boolean retry() {
return retry;
}

public R getResponse() {
return response;
}
}

    /**
     * Mutable holder for the sender's retry configuration. Fields are read directly by
     * the enclosing {@code ControllerRequestSender} when deciding whether and when to retry.
     */
    public static class RetryPolicyContext {
        // Maximum number of send attempts before a task is failed permanently.
        private int maxRetryCount;
        // Base delay (ms) for the exponential backoff between retries.
        private long retryBaseDelayMs;

        public RetryPolicyContext(int maxRetryCount, long retryBaseDelayMs) {
            this.maxRetryCount = maxRetryCount;
            this.retryBaseDelayMs = retryBaseDelayMs;
        }

        /** @return maximum number of send attempts allowed per task */
        public int maxRetryCount() {
            return maxRetryCount;
        }

        /** @return base backoff delay in milliseconds */
        public long retryBaseDelayMs() {
            return retryBaseDelayMs;
        }

        public void setMaxRetryCount(int maxRetryCount) {
            this.maxRetryCount = maxRetryCount;
        }

        public void setRetryBaseDelayMs(long retryBaseDelayMs) {
            this.retryBaseDelayMs = retryBaseDelayMs;
        }
    }
}
Loading