Skip to content
Permalink
Browse files
[IOTDB-1846] Optimize device count for rpc (#4187)
[IOTDB-1846] Optimize device count for rpc (#4187)
  • Loading branch information
SpriCoder committed Oct 20, 2021
1 parent 2b70e2a commit b6eb283e8a61044d3f53ce7ec7dc6c26fd6e16ff
Showing 7 changed files with 61 additions and 3 deletions.
@@ -370,6 +370,16 @@ public static Integer getPathCount(
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}

public static Integer getDeviceCount(
AsyncDataClient client, RaftNode header, List<String> pathsToQuery)
throws InterruptedException, TException {
AtomicReference<Integer> remoteResult = new AtomicReference<>(null);
GenericHandler<Integer> handler = new GenericHandler<>(client.getNode(), remoteResult);

client.getDeviceCount(header, pathsToQuery, handler);
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}

public static Set<String> getAllDevices(
AsyncDataClient client, RaftNode header, List<String> pathsToQuery)
throws InterruptedException, TException {
@@ -223,16 +223,15 @@ private int getRemoteDeviceCount(PartitionGroup partitionGroup, List<String> pat
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
client.setTimeout(RaftServer.getReadOperationTimeoutMS());
count =
SyncClientAdaptor.getAllDevices(client, partitionGroup.getHeader(), pathsToCount)
.size();
SyncClientAdaptor.getDeviceCount(client, partitionGroup.getHeader(), pathsToCount);
} else {
try (SyncDataClient syncDataClient =
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
try {
syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
count = syncDataClient.getAllDevices(partitionGroup.getHeader(), pathsToCount).size();
count = syncDataClient.getDeviceCount(partitionGroup.getHeader(), pathsToCount);
} catch (TException e) {
// the connection may be broken, close it to avoid it being reused
syncDataClient.getInputProtocol().getTransport().close();
@@ -1014,6 +1014,17 @@ public int getPathCount(List<String> pathsToQuery, int level)
return count;
}

public int getDeviceCount(List<String> pathsToQuery)
throws CheckConsistencyException, MetadataException {
dataGroupMember.syncLeaderWithConsistencyCheck(false);

int count = 0;
for (String s : pathsToQuery) {
count += getCMManager().getDevicesNum(new PartialPath(s));
}
return count;
}

@SuppressWarnings("java:S1135") // ignore todos
public ByteBuffer last(LastQueryRequest request)
throws CheckConsistencyException, QueryProcessException, IOException, StorageEngineException,
@@ -866,6 +866,16 @@ public void getPathCount(
}
}

@Override
public void getDeviceCount(
RaftNode header, List<String> pathsToQuery, AsyncMethodCallback<Integer> resultHandler)
throws TException {
DataAsyncService service = getDataAsyncService(header, resultHandler, "count device");
if (service != null) {
service.getDeviceCount(header, pathsToQuery, resultHandler);
}
}

@Override
public void onSnapshotApplied(
RaftNode header, List<Integer> slots, AsyncMethodCallback<Boolean> resultHandler) {
@@ -1001,6 +1011,11 @@ public int getPathCount(RaftNode header, List<String> pathsToQuery, int level) t
return getDataSyncService(header).getPathCount(header, pathsToQuery, level);
}

@Override
public int getDeviceCount(RaftNode header, List<String> pathsToQuery) throws TException {
return getDataSyncService(header).getDeviceCount(header, pathsToQuery);
}

@Override
public boolean onSnapshotApplied(RaftNode header, List<Integer> slots) {
return getDataSyncService(header).onSnapshotApplied(header, slots);
@@ -457,6 +457,18 @@ public void getPathCount(
}
}

@Override
public void getDeviceCount(
RaftNode header, List<String> pathsToQuery, AsyncMethodCallback<Integer> resultHandler)
throws TException {
try {
resultHandler.onComplete(
dataGroupMember.getLocalQueryExecutor().getDeviceCount(pathsToQuery));
} catch (CheckConsistencyException | MetadataException e) {
resultHandler.onError(e);
}
}

@Override
public void onSnapshotApplied(
RaftNode header, List<Integer> slots, AsyncMethodCallback<Boolean> resultHandler) {
@@ -422,6 +422,15 @@ public int getPathCount(RaftNode header, List<String> pathsToQuery, int level) t
}
}

@Override
public int getDeviceCount(RaftNode header, List<String> pathsToQuery) throws TException {
try {
return dataGroupMember.getLocalQueryExecutor().getDeviceCount(pathsToQuery);
} catch (CheckConsistencyException | MetadataException e) {
throw new TException(e);
}
}

@Override
public boolean onSnapshotApplied(RaftNode header, List<Integer> slots) {
return dataGroupMember.onSnapshotInstalled(slots);
@@ -463,6 +463,8 @@ service TSDataService extends RaftService {

int getPathCount(1: RaftNode header, 2: list<string> pathsToQuery, 3: int level)

int getDeviceCount(1: RaftNode header, 2: list<string> pathsToQuery)

/**
* During slot transfer, when a member has pulled snapshot from a group, the member will use this
* method to inform the group that one replica of such slots has been pulled.

0 comments on commit b6eb283

Please sign in to comment.