From fb5e7c10a1cf5eb8e61b904e7fdad621d5fa9f08 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 27 Nov 2025 18:41:21 +0800 Subject: [PATCH] Pipe: Ignore logging when `returnSelf` is called in the event of an exception in `AsyncClient`. --- .../handler/PipeTransferTrackableHandler.java | 15 ++++++----- .../handler/PipeTransferTsFileHandler.java | 15 ++++++----- .../iotdb/commons/client/ClientManager.java | 25 +++++++++++++++++++ .../AsyncPipeDataTransferServiceClient.java | 14 ++++++++++- 4 files changed, 56 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index d0d6d0542990..8a552f0cbeeb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -97,12 +97,15 @@ protected boolean tryTransfer( clearEventsReferenceCount(); connector.eliminateHandler(this, true); client.setShouldReturnSelf(true); - try { - client.returnSelf(); - } catch (final IllegalStateException e) { - LOGGER.info( - "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); - } + client.returnSelf( + (e) -> { + if (e instanceof IllegalStateException) { + LOGGER.info( + "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); + return true; + } + return false; + }); this.client = null; return false; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 3cdb2abda156..3ef46034554a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -431,12 +431,15 @@ private void returnClientIfNecessary() { } client.setShouldReturnSelf(true); - try { - client.returnSelf(); - } catch (final IllegalStateException e) { - LOGGER.info( - "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); - } + client.returnSelf( + (e) -> { + if (e instanceof IllegalStateException) { + LOGGER.info( + "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); + return true; + } + return false; + }); client = null; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index 0915e69802e0..41d779c4398a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import java.util.Optional; +import java.util.function.Function; public class ClientManager implements IClientManager { @@ -79,6 +80,30 @@ public void returnClient(K node, V client) { } } + /** + * return a client V for node K to the {@link ClientManager}, and ignore some exception + * + *

Note: We do not define this interface in {@link IClientManager} to make you aware that the + * return of a client is automatic whenever a particular client is used. + */ + public void returnClient(K node, V client, Function ignoreError) { + if (node != null) { + try { + pool.returnObject(node, client); + } catch (Exception e) { + if (!Boolean.TRUE.equals(ignoreError.apply(e))) { + LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e); + } + } + } else if (client instanceof ThriftClient) { + ((ThriftClient) client).invalidateAll(); + LOGGER.warn( + "Return client {} to pool failed because the node is null. " + + "This may cause resource leak, please check your code.", + client); + } + } + @Override public void clear(K node) { Optional.ofNullable(node) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index f662b697ebfb..e5a892158170 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncClient implements ThriftClient { @@ -84,7 +85,8 @@ public void onComplete() { public void onError(final Exception e) { super.onError(e); ThriftClient.resolveException(e, this); - returnSelf(); + returnSelf( + (i) -> i instanceof IllegalStateException && "Client has an error!".equals(i.getMessage())); } @Override @@ -114,6 +116,16 @@ public void returnSelf() { } } + /** + * return self, the method doesn't need to be called by the user and will be triggered after the + * RPC is finished. + */ + public void returnSelf(Function ignoreError) { + if (shouldReturnSelf.get()) { + clientManager.returnClient(endpoint, this, ignoreError); + } + } + public void setShouldReturnSelf(final boolean shouldReturnSelf) { this.shouldReturnSelf.set(shouldReturnSelf); }