Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
Expand Down Expand Up @@ -77,7 +78,11 @@ public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOExcep
}

/**
* Get a thrift client that will connect to "node" using the data port.
* IMPORTANT!!! After calling this function, the caller should make sure to call {@link
* org.apache.iotdb.cluster.utils.ClientUtils#putBackSyncClient(Client)} to put the client back
* into the client pool, otherwise there is a risk of client leakage.
*
* <p>Get a thrift client that will connect to "node" using the data port.
*
* @param node the node to be connected
* @param timeout timeout threshold of connection
Expand Down
125 changes: 85 additions & 40 deletions cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,27 @@ public MeasurementSchema getSeriesSchema(PartialPath device, String measurement)
return super.getSeriesSchema(device, measurement);
}

/**
* Check whether the path exists.
*
* @param path a full path or a prefix path
*/
@Override
public boolean isPathExist(PartialPath path) {
boolean localExist = super.isPathExist(path);
if (localExist) {
return true;
}

// search the cache
cacheLock.readLock().lock();
try {
return mRemoteMetaCache.containsKey(path);
} finally {
cacheLock.readLock().unlock();
}
}

private static class RemoteMetaCache extends LRUCache<PartialPath, MeasurementMNode> {

RemoteMetaCache(int cacheSize) {
Expand All @@ -397,6 +418,10 @@ public synchronized MeasurementMNode get(PartialPath key) {
return null;
}
}

public synchronized boolean containsKey(PartialPath key) {
return cache.containsKey(key);
}
Comment on lines +422 to +424
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to mix synchronized and lock?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all methods in RemoteMetaCache are synchronized, it is safe to modify the cache. However, in CMManager, functions in RemoteMetaCache may be called many times in a function, which may be due to the serialization of cache operations in CMManager, so the lock is added?

Therefore, I think in CMManager, whether the cache operation is locked or not is OK. Locking can ensure that all cache operations in CMManager are serial. However, since RemoteMetaCache itself is thread-safe, maybe it's also OK not to lock it.

}

/**
Expand Down Expand Up @@ -646,14 +671,18 @@ private List<String> getUnregisteredSeriesListRemotely(
SyncClientAdaptor.getUnregisteredMeasurements(
client, partitionGroup.getHeader(), seriesList);
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
result = syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
ClientUtils.putBackSyncClient(syncDataClient);
SyncDataClient syncDataClient = null;
try {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
result =
syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}

if (result != null) {
return result;
}
Expand Down Expand Up @@ -826,16 +855,21 @@ private List<TimeseriesSchema> pullTimeSeriesSchemas(Node node, PullSchemaReques
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
schemas = SyncClientAdaptor.pullTimeseriesSchema(client, request);
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request);
ByteBuffer buffer = pullSchemaResp.schemaBytes;
int size = buffer.getInt();
schemas = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
schemas.add(TimeseriesSchema.deserializeFrom(buffer));
SyncDataClient syncDataClient = null;
try {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request);
ByteBuffer buffer = pullSchemaResp.schemaBytes;
int size = buffer.getInt();
schemas = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
schemas.add(TimeseriesSchema.deserializeFrom(buffer));
}
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}

Expand Down Expand Up @@ -1060,12 +1094,16 @@ private List<PartialPath> getMatchedPaths(
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
result = SyncClientAdaptor.getAllPaths(client, header, pathsToQuery, withAlias);
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
ClientUtils.putBackSyncClient(syncDataClient);
SyncDataClient syncDataClient = null;
try {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}

if (result != null) {
Expand Down Expand Up @@ -1180,12 +1218,16 @@ private Set<String> getMatchedDevices(Node node, Node header, List<String> paths
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
paths = SyncClientAdaptor.getAllDevices(client, header, pathsToQuery);
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
paths = syncDataClient.getAllDevices(header, pathsToQuery);
ClientUtils.putBackSyncClient(syncDataClient);
SyncDataClient syncDataClient = null;
try {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
paths = syncDataClient.getAllDevices(header, pathsToQuery);
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}
return paths;
}
Expand Down Expand Up @@ -1501,17 +1543,20 @@ private ByteBuffer showRemoteTimeseries(Node node, PartitionGroup group, ShowTim
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, group.getHeader(), plan);
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
plan.serialize(dataOutputStream);
resultBinary =
syncDataClient.getAllMeasurementSchema(
group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
ClientUtils.putBackSyncClient(syncDataClient);
SyncDataClient syncDataClient = null;
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
plan.serialize(dataOutputStream);
resultBinary =
syncDataClient.getAllMeasurementSchema(
group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}
return resultBinary;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.PartialPath;
Expand Down Expand Up @@ -220,16 +221,21 @@ private List<MeasurementSchema> pullMeasurementSchemas(Node node, PullSchemaRequ
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
schemas = SyncClientAdaptor.pullMeasurementSchema(client, request);
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request);
ByteBuffer buffer = pullSchemaResp.schemaBytes;
int size = buffer.getInt();
schemas = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
schemas.add(MeasurementSchema.deserializeFrom(buffer));
SyncDataClient syncDataClient = null;
try {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request);
ByteBuffer buffer = pullSchemaResp.schemaBytes;
int size = buffer.getInt();
schemas = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
schemas.add(MeasurementSchema.deserializeFrom(buffer));
}
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,18 @@ private int getRemotePathCount(
SyncClientAdaptor.getPathCount(
client, partitionGroup.getHeader(), pathsToQuery, level);
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
ClientUtils.putBackSyncClient(syncDataClient);
SyncDataClient syncDataClient = null;
try {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}

logger.debug(
"{}: get path count of {} from {}, result {}",
metaGroupMember.getName(),
Expand Down Expand Up @@ -357,14 +360,18 @@ private List<PartialPath> getRemoteNodesList(
SyncClientAdaptor.getNodeList(
client, group.getHeader(), schemaPattern.getFullPath(), level);
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
paths = syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
ClientUtils.putBackSyncClient(syncDataClient);
SyncDataClient syncDataClient = null;
try {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
paths =
syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}

if (paths != null) {
break;
}
Expand All @@ -377,7 +384,6 @@ private List<PartialPath> getRemoteNodesList(
Thread.currentThread().interrupt();
}
}

return PartialPath.fromStringList(paths);
}

Expand Down Expand Up @@ -467,15 +473,18 @@ private Set<String> getRemoteNextChildren(PartitionGroup group, PartialPath path
nextChildren =
SyncClientAdaptor.getNextChildren(client, group.getHeader(), path.getFullPath());
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
nextChildren =
syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
ClientUtils.putBackSyncClient(syncDataClient);
SyncDataClient syncDataClient = null;
try {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
nextChildren =
syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}

if (nextChildren != null) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,16 @@ private List<ByteBuffer> getRemoteAggregateResult(Node node, GetAggrResultReques
// each buffer is an AggregationResult
resultBuffers = SyncClientAdaptor.getAggrResult(client, request);
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
resultBuffers = syncDataClient.getAggrResult(request);
ClientUtils.putBackSyncClient(syncDataClient);
SyncDataClient syncDataClient = null;
try {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
resultBuffers = syncDataClient.getAggrResult(request);
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}
return resultBuffers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,18 @@ public List<AggregateResult> calcResult(long curStartTime, long curEndTime) thro
SyncClientAdaptor.getGroupByResult(
client, header, executorId, curStartTime, curEndTime);
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
aggrBuffers = syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
ClientUtils.putBackSyncClient(syncDataClient);
SyncDataClient syncDataClient = null;
try {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
aggrBuffers =
syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}

} catch (TException e) {
throw new IOException(e);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -129,15 +133,18 @@ public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndT
SyncClientAdaptor.peekNextNotNullValue(
client, header, executorId, nextStartTime, nextEndTime);
} else {
SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
aggrBuffer =
syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime);
ClientUtils.putBackSyncClient(syncDataClient);
SyncDataClient syncDataClient = null;
try {
syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
aggrBuffer =
syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime);
} finally {
ClientUtils.putBackSyncClient(syncDataClient);
}
}

} catch (TException e) {
throw new IOException(e);
} catch (InterruptedException e) {
Expand Down
Loading