diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java index 15e26a6f10867..d21e4e0335bbb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java @@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocolFactory; @@ -77,7 +78,11 @@ public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOExcep } /** - * Get a thrift client that will connect to "node" using the data port. + * IMPORTANT!!! After calling this function, the caller should make sure to call {@link + * org.apache.iotdb.cluster.utils.ClientUtils#putBackSyncClient(Client)} to put the client back + * into the client pool, otherwise there is a risk of client leakage. + * + *

Get a thrift client that will connect to "node" using the data port. * * @param node the node to be connected * @param timeout timeout threshold of connection diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index 7ef9640fd2b2d..50c373241da5e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -372,6 +372,27 @@ public MeasurementSchema getSeriesSchema(PartialPath device, String measurement) return super.getSeriesSchema(device, measurement); } + /** + * Check whether the path exists. + * + * @param path a full path or a prefix path + */ + @Override + public boolean isPathExist(PartialPath path) { + boolean localExist = super.isPathExist(path); + if (localExist) { + return true; + } + + // search the cache + cacheLock.readLock().lock(); + try { + return mRemoteMetaCache.containsKey(path); + } finally { + cacheLock.readLock().unlock(); + } + } + private static class RemoteMetaCache extends LRUCache { RemoteMetaCache(int cacheSize) { @@ -397,6 +418,10 @@ public synchronized MeasurementMNode get(PartialPath key) { return null; } } + + public synchronized boolean containsKey(PartialPath key) { + return cache.containsKey(key); + } } /** @@ -646,14 +671,18 @@ private List getUnregisteredSeriesListRemotely( SyncClientAdaptor.getUnregisteredMeasurements( client, partitionGroup.getHeader(), seriesList); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - result = syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList); - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + result = + syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } - if (result != null) { return result; } @@ -826,16 +855,21 @@ private List pullTimeSeriesSchemas(Node node, PullSchemaReques .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); schemas = SyncClientAdaptor.pullTimeseriesSchema(client, request); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request); - ByteBuffer buffer = pullSchemaResp.schemaBytes; - int size = buffer.getInt(); - schemas = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - schemas.add(TimeseriesSchema.deserializeFrom(buffer)); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request); + ByteBuffer buffer = pullSchemaResp.schemaBytes; + int size = buffer.getInt(); + schemas = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + schemas.add(TimeseriesSchema.deserializeFrom(buffer)); + } + } finally { + ClientUtils.putBackSyncClient(syncDataClient); } } @@ -1060,12 +1094,16 @@ private List getMatchedPaths( .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); result = SyncClientAdaptor.getAllPaths(client, header, pathsToQuery, withAlias); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias); - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } if (result != null) { @@ -1180,12 +1218,16 @@ private Set getMatchedDevices(Node node, Node header, List paths .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); paths = SyncClientAdaptor.getAllDevices(client, header, pathsToQuery); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - paths = syncDataClient.getAllDevices(header, pathsToQuery); - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + paths = syncDataClient.getAllDevices(header, pathsToQuery); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } return paths; } @@ -1501,17 +1543,20 @@ private ByteBuffer showRemoteTimeseries(Node node, PartitionGroup group, ShowTim .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, group.getHeader(), plan); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); - plan.serialize(dataOutputStream); - resultBinary = - syncDataClient.getAllMeasurementSchema( - group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray())); - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = null; + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + plan.serialize(dataOutputStream); + resultBinary = + syncDataClient.getAllMeasurementSchema( + group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray())); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } return resultBinary; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java index 74855fc3d26ab..e98b67c38ff0e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java @@ -29,6 +29,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.member.MetaGroupMember; +import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.PartialPath; @@ -220,16 +221,21 @@ private List pullMeasurementSchemas(Node node, PullSchemaRequ .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); schemas = SyncClientAdaptor.pullMeasurementSchema(client, request); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request); - ByteBuffer buffer = pullSchemaResp.schemaBytes; - int size = buffer.getInt(); - schemas = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - schemas.add(MeasurementSchema.deserializeFrom(buffer)); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request); + ByteBuffer buffer = pullSchemaResp.schemaBytes; + int size = buffer.getInt(); + schemas = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + schemas.add(MeasurementSchema.deserializeFrom(buffer)); + } + } finally { + ClientUtils.putBackSyncClient(syncDataClient); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java index d5259635a1f1d..e76be6c880bf2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java @@ -254,15 +254,18 @@ private int getRemotePathCount( SyncClientAdaptor.getPathCount( client, partitionGroup.getHeader(), pathsToQuery, level); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS()); - count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level); - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS()); + count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } - logger.debug( "{}: get path count of {} from {}, result {}", metaGroupMember.getName(), @@ -357,14 +360,18 @@ private List getRemoteNodesList( SyncClientAdaptor.getNodeList( client, group.getHeader(), schemaPattern.getFullPath(), level); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - paths = syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level); - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + paths = + syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } - if (paths != null) { break; } @@ -377,7 +384,6 @@ private List getRemoteNodesList( Thread.currentThread().interrupt(); } } - return PartialPath.fromStringList(paths); } @@ -467,15 +473,18 @@ private Set getRemoteNextChildren(PartitionGroup group, PartialPath path nextChildren = SyncClientAdaptor.getNextChildren(client, group.getHeader(), path.getFullPath()); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - nextChildren = - syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath()); - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + nextChildren = + syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath()); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } - if (nextChildren != null) { break; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java index 3549e58c5fa01..bd540872405dd 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java @@ -271,12 +271,16 @@ private List getRemoteAggregateResult(Node node, GetAggrResultReques // each buffer is an AggregationResult resultBuffers = SyncClientAdaptor.getAggrResult(client, request); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - resultBuffers = syncDataClient.getAggrResult(request); - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + resultBuffers = syncDataClient.getAggrResult(request); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } return resultBuffers; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java index d7629f6b710ad..4289e23387a6a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java @@ -85,14 +85,18 @@ public List calcResult(long curStartTime, long curEndTime) thro SyncClientAdaptor.getGroupByResult( client, header, executorId, curStartTime, curEndTime); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS()); - aggrBuffers = syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime); - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS()); + aggrBuffers = + syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } - } catch (TException e) { throw new IOException(e); } catch (InterruptedException e) { @@ -129,15 +133,18 @@ public Pair peekNextNotNullValue(long nextStartTime, long nextEndT SyncClientAdaptor.peekNextNotNullValue( client, header, executorId, nextStartTime, nextEndTime); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS()); - aggrBuffer = - syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime); - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS()); + aggrBuffer = + syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } - } catch (TException e) { throw new IOException(e); } catch (InterruptedException e) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java index 066da1dd82783..d03a85d4c09c1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java @@ -258,21 +258,23 @@ private ByteBuffer lastAsync(Node node, QueryContext context) } private ByteBuffer lastSync(Node node, QueryContext context) throws TException { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - ByteBuffer result = - syncDataClient.last( - new LastQueryRequest( - PartialPath.toStringList(seriesPaths), - dataTypeOrdinals, - context.getQueryId(), - queryPlan.getDeviceToMeasurements(), - group.getHeader(), - syncDataClient.getNode())); - ClientUtils.putBackSyncClient(syncDataClient); - return result; + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + return syncDataClient.last( + new LastQueryRequest( + PartialPath.toStringList(seriesPaths), + dataTypeOrdinals, + context.getQueryId(), + queryPlan.getDeviceToMeasurements(), + group.getHeader(), + syncDataClient.getNode())); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java index 2e239b974e934..9b7817555e764 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java @@ -657,12 +657,16 @@ private Long getRemoteGroupByExecutorId(Node node, GroupByRequest request) .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); executorId = SyncClientAdaptor.getGroupByExecutor(client, request); } else { - SyncDataClient syncDataClient = - metaGroupMember - .getClientProvider() - .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); - executorId = syncDataClient.getGroupByExecutor(request); - ClientUtils.putBackSyncClient(syncDataClient); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + metaGroupMember + .getClientProvider() + .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS()); + executorId = syncDataClient.getGroupByExecutor(request); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } return executorId; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java index 1ed1fe67deef0..7cec0ea409aff 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java @@ -29,6 +29,7 @@ import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.member.MetaGroupMember; +import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.db.utils.SerializeUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.filter.TimeFilter; @@ -176,7 +177,7 @@ private Long applyForReaderIdSync(Node node, boolean byTimestamp, long timestamp } return newReaderId; } finally { - client.putBack(); + ClientUtils.putBackSyncClient(client); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java index 419f3cf8ad75a..2b68ef401ad00 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java @@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; +import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.db.utils.SerializeUtils; @@ -89,20 +90,21 @@ private ByteBuffer fetchResultAsync(long timestamp) throws IOException { } private ByteBuffer fetchResultSync(long timestamp) throws IOException { + SyncDataClient curSyncClient = null; try { - SyncDataClient curSyncClient = - sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS()); - ByteBuffer buffer = - curSyncClient.fetchSingleSeriesByTimestamp( - sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp); - curSyncClient.putBack(); - return buffer; + curSyncClient = sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS()); + return curSyncClient.fetchSingleSeriesByTimestamp( + sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp); } catch (TException e) { // try other node if (!sourceInfo.switchNode(true, timestamp)) { return null; } return fetchResultSync(timestamp); + } finally { + if (curSyncClient != null) { + ClientUtils.putBackSyncClient(curSyncClient); + } } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java index f6c5a4507b8c2..2dcc1b7f98408 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java @@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; +import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.db.utils.SerializeUtils; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; @@ -143,19 +144,20 @@ private ByteBuffer fetchResultAsync() throws IOException { } private ByteBuffer fetchResultSync() throws IOException { + SyncDataClient curSyncClient = null; try { - SyncDataClient curSyncClient = - sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS()); - ByteBuffer buffer = - curSyncClient.fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getReaderId()); - curSyncClient.putBack(); - return buffer; + curSyncClient = sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS()); + return curSyncClient.fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getReaderId()); } catch (TException e) { // try other node if (!sourceInfo.switchNode(false, lastTimestamp)) { return null; } return fetchResultSync(); + } finally { + if (curSyncClient != null) { + ClientUtils.putBackSyncClient(curSyncClient); + } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java index be05a4b5703e8..df5b30e234994 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java @@ -31,6 +31,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; import org.apache.iotdb.cluster.server.member.MetaGroupMember; +import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; @@ -312,10 +313,15 @@ protected void releaseQueryResource(long queryId) throws StorageEngineException queriedNode, RaftServer.getReadOperationTimeoutMS()); client.endQuery(header, coordinator.getThisNode(), queryId, handler); } else { - SyncDataClient syncDataClient = - coordinator.getSyncDataClient( - queriedNode, RaftServer.getReadOperationTimeoutMS()); - syncDataClient.endQuery(header, coordinator.getThisNode(), queryId); + SyncDataClient syncDataClient = null; + try { + syncDataClient = + coordinator.getSyncDataClient( + queriedNode, RaftServer.getReadOperationTimeoutMS()); + syncDataClient.endQuery(header, coordinator.getThisNode(), queryId); + } finally { + ClientUtils.putBackSyncClient(syncDataClient); + } } } catch (IOException | TException e) { logger.error("Cannot end query {} in {}", queryId, queriedNode); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java index 4d08a043d7314..58d929b5d9407 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java @@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.common.TestUtils; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol.Factory; @@ -73,6 +74,8 @@ public void testSync() throws IOException, InterruptedException { client = provider.getSyncDataClient(node, 100); } catch (TException e) { Assert.fail(e.getMessage()); + } finally { + ClientUtils.putBackSyncClient(client); } assertNotNull(client); ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(useAsyncServer); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java index 047b5237145cd..881db033fdaca 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java @@ -78,6 +78,10 @@ public void reset() { count = 0; } + /** + * The synchronized keyword in this function is intentionally removed. For details, see + * https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173085039 + */ @Override public int size() { return count;