diff --git a/core/src/main/scala/kafka/log/stream/s3/network/ControllerRequestSender.java b/core/src/main/scala/kafka/log/stream/s3/network/ControllerRequestSender.java index bd99330197..cd2b035391 100644 --- a/core/src/main/scala/kafka/log/stream/s3/network/ControllerRequestSender.java +++ b/core/src/main/scala/kafka/log/stream/s3/network/ControllerRequestSender.java @@ -156,7 +156,7 @@ public class RequestAccumulator { public RequestAccumulator() { } - void send(RequestTask task) { + synchronized void send(RequestTask task) { if (task != null) { requestQueue.add(task); } @@ -166,9 +166,6 @@ void send(RequestTask task) { } void send0() { - if (requestQueue.isEmpty()) { - return; - } List inflight = new ArrayList<>(); requestQueue.drainTo(inflight); Builder builder = inflight.get(0).request.toRequestBuilder(); @@ -176,6 +173,14 @@ void send0() { RequestCtx requestCtx = new RequestCtx() { @Override void onSuccess(AbstractResponse response) { + try { + onSuccess0(response); + } catch (Exception e) { + LOGGER.error("[UNEXPECTED]", e); + } + } + + void onSuccess0(AbstractResponse response) { if (!(response instanceof AbstractBatchResponse)) { LOGGER.error("Unexpected response type: {} while sending request: {}", response.getClass().getSimpleName(), builder); @@ -209,6 +214,14 @@ void onSuccess(AbstractResponse response) { @Override void onError(Throwable e) { + try { + onError0(e); + } catch (Exception ex) { + LOGGER.error("[UNEXPECTED]", ex); + } + } + + void onError0(Throwable e) { if (e instanceof TimeoutException) { RequestAccumulator.this.inflight.compareAndSet(true, false); inflight.forEach(ControllerRequestSender.this::retryTask); diff --git a/core/src/test/java/kafka/log/stream/s3/ControllerRequestSenderTest.java b/core/src/test/java/kafka/log/stream/s3/ControllerRequestSenderTest.java index dd06eef503..f083297946 100644 --- a/core/src/test/java/kafka/log/stream/s3/ControllerRequestSenderTest.java +++ b/core/src/test/java/kafka/log/stream/s3/ControllerRequestSenderTest.java @@ -227,8 +227,7 @@ public void testBatchSend() throws Exception { ClientResponse clientResponse = new ClientResponse( null, null, null, -1, -1, false, null, null, response); // first time sleep 3s - Thread.sleep(3000); - handler.onComplete(clientResponse); + CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS).execute(() -> handler.onComplete(clientResponse)); return null; }).doAnswer(ink -> { CreateStreamsRequest.Builder requestBuilder = ink.getArgument(0);