Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public class RequestAccumulator {
public RequestAccumulator() {
}

void send(RequestTask task) {
synchronized void send(RequestTask task) {
if (task != null) {
requestQueue.add(task);
}
Expand All @@ -166,16 +166,21 @@ void send(RequestTask task) {
}

void send0() {
if (requestQueue.isEmpty()) {
return;
}
List<RequestTask> inflight = new ArrayList<>();
requestQueue.drainTo(inflight);
Builder builder = inflight.get(0).request.toRequestBuilder();
inflight.stream().map(task -> (BatchRequest) task.request).skip(1).forEach(req -> req.addSubRequest(builder));
RequestCtx requestCtx = new RequestCtx() {
@Override
void onSuccess(AbstractResponse response) {
try {
onSuccess0(response);
} catch (Exception e) {
LOGGER.error("[UNEXPECTED]", e);
}
}

void onSuccess0(AbstractResponse response) {
if (!(response instanceof AbstractBatchResponse)) {
LOGGER.error("Unexpected response type: {} while sending request: {}",
response.getClass().getSimpleName(), builder);
Expand Down Expand Up @@ -209,6 +214,14 @@ void onSuccess(AbstractResponse response) {

@Override
void onError(Throwable e) {
try {
onError0(e);
} catch (Exception ex) {
LOGGER.error("[UNEXPECTED]", ex);
}
}

void onError0(Throwable e) {
if (e instanceof TimeoutException) {
RequestAccumulator.this.inflight.compareAndSet(true, false);
inflight.forEach(ControllerRequestSender.this::retryTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ public void testBatchSend() throws Exception {
ClientResponse clientResponse = new ClientResponse(
null, null, null, -1, -1, false, null, null, response);
// first time sleep 3s
Thread.sleep(3000);
handler.onComplete(clientResponse);
CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS).execute(() -> handler.onComplete(clientResponse));
return null;
}).doAnswer(ink -> {
CreateStreamsRequest.Builder requestBuilder = ink.getArgument(0);
Expand Down