From 2c2ff83a78c4041373377ac90252349904e6c46a Mon Sep 17 00:00:00 2001 From: wangbin Date: Fri, 15 Oct 2021 16:09:22 +0800 Subject: [PATCH] fix error when producer write failed --- .../java/io/hstream/impl/ProducerImpl.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/io/hstream/impl/ProducerImpl.java b/client/src/main/java/io/hstream/impl/ProducerImpl.java index 369ccd67..fc8f877b 100644 --- a/client/src/main/java/io/hstream/impl/ProducerImpl.java +++ b/client/src/main/java/io/hstream/impl/ProducerImpl.java @@ -86,11 +86,18 @@ private void flush() { logger.info("start flush recordBuffer, current buffer size is: {}", recordBufferCount); writeHStreamRecords(recordBuffer) - .thenAccept( - recordIds -> { - for (int i = 0; i < recordIds.size(); ++i) { - futures.get(i).complete(recordIds.get(i)); + .handle( + (recordIds, exception) -> { + if (exception == null) { + for (int i = 0; i < recordIds.size(); ++i) { + futures.get(i).complete(recordIds.get(i)); + } + } else { + for (int i = 0; i < futures.size(); ++i) { + futures.get(i).completeExceptionally(exception); + } } + return null; }) .join(); @@ -125,7 +132,8 @@ public void onNext(AppendResponse appendResponse) { @Override public void onError(Throwable t) { - throw new HStreamDBClientException(t); + logger.warn("write records error: ", t); + completableFuture.completeExceptionally(new HStreamDBClientException(t)); } @Override