Skip to content
Permalink
Browse files
[IOTDB-1764] Support vector timeseries in raw data query in cluster (#…
  • Loading branch information
Alima777 committed Oct 21, 2021
1 parent 516bf65 commit 1dcc82aad34bfc0820ac28f6a2e70757fef7d219
Showing 7 changed files with 96 additions and 112 deletions.
@@ -117,6 +117,8 @@
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.LOG_FAIL_CONNECT;
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.THREAD_POOL_SIZE;
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.waitForThreadPool;
import static org.apache.iotdb.cluster.utils.ClusterQueryUtils.getAssembledPathFromRequest;
import static org.apache.iotdb.cluster.utils.ClusterQueryUtils.getPathStrListForRequest;
import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;

@SuppressWarnings("java:S1135") // ignore todos
@@ -229,18 +231,16 @@ public TSDataType getSeriesType(PartialPath fullPath) throws MetadataException {
MeasurementMNode.getMeasurementMNode(
null, measurementSchema.getMeasurementId(), measurementSchema, null);
if (measurementSchema instanceof VectorMeasurementSchema) {
for (int i = 0; i < measurementSchema.getSubMeasurementsList().size(); i++) {
for (String subMeasurement : measurementSchema.getSubMeasurementsList()) {
cacheMeta(
((VectorPartialPath) fullPath).getPathWithSubSensor(i), measurementMNode, false);
new VectorPartialPath(fullPath.getDevice(), subMeasurement),
measurementMNode,
false);
}
cacheMeta(
new PartialPath(fullPath.getDevice(), measurementSchema.getMeasurementId()),
measurementMNode,
true);
} else {
cacheMeta(fullPath, measurementMNode, true);
}
return measurementMNode.getDataType(fullPath.getMeasurement());
return measurementMNode.getDataType(measurement);
} else {
throw e;
}
@@ -974,7 +974,7 @@ private List<PartialPath> getMatchedPaths(Map<String, String> sgPathMap, boolean
throws MetadataException {
List<PartialPath> result = new ArrayList<>();
// split the paths by the data group they belong to
Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>();
Map<PartitionGroup, List<String>> remoteGroupPathMap = new HashMap<>();
for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) {
String storageGroupName = sgPathEntry.getKey();
PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue());
@@ -1000,14 +1000,15 @@ private List<PartialPath> getMatchedPaths(Map<String, String> sgPathMap, boolean
result.addAll(allTimeseriesName);
} else {
// batch the queries of the same group to reduce communication
groupPathMap
remoteGroupPathMap
.computeIfAbsent(partitionGroup, p -> new ArrayList<>())
.add(pathUnderSG.getFullPath());
}
}

// query each data group separately
for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : groupPathMap.entrySet()) {
for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry :
remoteGroupPathMap.entrySet()) {
PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
List<String> pathsToQuery = partitionGroupPathEntry.getValue();
result.addAll(getMatchedPaths(partitionGroup, pathsToQuery, withAlias));
@@ -1090,14 +1091,10 @@ private List<PartialPath> getMatchedPaths(
// need to query other nodes in the group
List<PartialPath> partialPaths = new ArrayList<>();
for (int i = 0; i < result.paths.size(); i++) {
try {
PartialPath partialPath = new PartialPath(result.paths.get(i));
if (withAlias) {
partialPath.setMeasurementAlias(result.aliasList.get(i));
}
partialPaths.add(partialPath);
} catch (IllegalPathException e) {
// ignore
PartialPath matchedPath = getAssembledPathFromRequest(result.paths.get(i));
partialPaths.add(matchedPath);
if (withAlias) {
matchedPath.setMeasurementAlias(result.aliasList.get(i));
}
}
return partialPaths;
@@ -1297,21 +1294,6 @@ public Pair<List<PartialPath>, List<PartialPath>> getMatchedPaths(
return new Pair<>(new ArrayList<>(fullPaths), new ArrayList<>(nonExistPaths));
}

/**
* Get the local paths that match any path in "paths". The result is not deduplicated.
*
* @param paths paths potentially contain wildcards
*/
public List<String> getAllPaths(List<String> paths) throws MetadataException {
List<String> ret = new ArrayList<>();
for (String path : paths) {
getFlatMeasurementPaths(new PartialPath(path)).stream()
.map(PartialPath::getFullPath)
.forEach(ret::add);
}
return ret;
}

/**
* Get the local devices that match any path in "paths". The result is deduplicated.
*
@@ -1712,23 +1694,18 @@ private ByteBuffer getRemoteDevices(Node node, PartitionGroup group, ShowDevices

public GetAllPathsResult getAllPaths(List<String> paths, boolean withAlias)
throws MetadataException {
List<String> retPaths = new ArrayList<>();
List<String> alias = null;
if (withAlias) {
alias = new ArrayList<>();
}

if (withAlias) {
for (String path : paths) {
List<PartialPath> allTimeseriesPathWithAlias =
super.getFlatMeasurementPathsWithAlias(new PartialPath(path), -1, -1).left;
for (PartialPath timeseriesPathWithAlias : allTimeseriesPathWithAlias) {
retPaths.add(timeseriesPathWithAlias.getFullPath());
List<List<String>> retPaths = new ArrayList<>();
List<String> alias = withAlias ? new ArrayList<>() : null;

for (String path : paths) {
List<PartialPath> allTimeseriesPathWithAlias =
super.getFlatMeasurementPathsWithAlias(new PartialPath(path), -1, -1).left;
for (PartialPath timeseriesPathWithAlias : allTimeseriesPathWithAlias) {
retPaths.add(getPathStrListForRequest(timeseriesPathWithAlias));
if (withAlias) {
alias.add(timeseriesPathWithAlias.getMeasurementAlias());
}
}
} else {
retPaths = getAllPaths(paths);
}

GetAllPathsResult getAllPathsResult = new GetAllPathsResult();
@@ -47,7 +47,6 @@
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
@@ -93,6 +92,8 @@
import java.util.Map;
import java.util.Set;

import static org.apache.iotdb.cluster.utils.ClusterQueryUtils.getAssembledPathFromRequest;

public class LocalQueryExecutor {

private static final Logger logger = LoggerFactory.getLogger(LocalQueryExecutor.class);
@@ -217,12 +218,7 @@ public long querySingleSeries(SingleSeriesQueryRequest request)
request.getQueryId());
dataGroupMember.syncLeaderWithConsistencyCheck(false);

PartialPath path = null;
try {
path = new PartialPath(request.getPath());
} catch (IllegalPathException e) {
// ignore
}
PartialPath path = getAssembledPathFromRequest(request.getPath());
TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
Filter timeFilter = null;
Filter valueFilter = null;
@@ -301,25 +297,7 @@ public long queryMultSeries(MultSeriesQueryRequest request)
dataGroupMember.syncLeaderWithConsistencyCheck(false);

List<PartialPath> paths = Lists.newArrayList();
request
.getPath()
.forEach(
fullPath -> {
try {
if (fullPath.contains("$#$")) {
String[] array = fullPath.split(":");
List<String> subSensorsList = new ArrayList<>();
for (int i = 1; i < array.length; i++) {
subSensorsList.add(array[i]);
}
paths.add(new VectorPartialPath(array[0], subSensorsList));
} else {
paths.add(new PartialPath(fullPath));
}
} catch (IllegalPathException e) {
logger.warn("Failed to create partial path, fullPath is {}.", fullPath, e);
}
});
request.getPath().forEach(path -> paths.add(getAssembledPathFromRequest(path)));

List<TSDataType> dataTypes = Lists.newArrayList();
request.getDataTypeOrdinal().forEach(dataType -> dataTypes.add(TSDataType.values()[dataType]));
@@ -549,8 +527,6 @@ private void collectTimeseriesSchema(
* Create an IReaderByTime of a path, register it in the query manager to get a reader id for it
* and send the id back to the requester. If the reader does not have any data, an id of -1 will
* be returned.
*
* @param request
*/
public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request)
throws CheckConsistencyException, QueryProcessException, StorageEngineException {
@@ -562,12 +538,7 @@ public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request)
request.getQueryId());
dataGroupMember.syncLeaderWithConsistencyCheck(false);

PartialPath path = null;
try {
path = new PartialPath(request.getPath());
} catch (IllegalPathException e) {
// ignore
}
PartialPath path = getAssembledPathFromRequest(request.getPath());
TSDataType dataType = TSDataType.values()[request.dataTypeOrdinal];
Set<String> deviceMeasurements = request.getDeviceMeasurements();

@@ -52,7 +52,6 @@
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -91,6 +90,8 @@
import java.util.Map.Entry;
import java.util.Set;

import static org.apache.iotdb.cluster.utils.ClusterQueryUtils.getPathStrListForRequest;

@SuppressWarnings("java:S107")
public class ClusterReaderFactory {

@@ -704,23 +705,8 @@ private MultSeriesQueryRequest constructMultQueryRequest(
request.setValueFilterBytes(SerializeUtils.serializeFilter(valueFilter));
}

List<String> fullPaths = Lists.newArrayList();
paths.forEach(
path -> {
if (path instanceof VectorPartialPath) {
StringBuilder builder = new StringBuilder(path.getFullPath());
List<String> subSensorsList = ((VectorPartialPath) path).getSubSensorsList();
for (String subSensor : subSensorsList) {
builder.append(":");
builder.append(path.getFullPath());
builder.append(".");
builder.append(subSensor);
}
fullPaths.add(builder.toString());
} else {
fullPaths.add(path.getFullPath());
}
});
List<List<String>> fullPaths = Lists.newArrayList();
paths.forEach(path -> fullPaths.add(getPathStrListForRequest(path)));

List<Integer> dataTypeOrdinals = Lists.newArrayList();
dataTypes.forEach(dataType -> dataTypeOrdinals.add(dataType.ordinal()));
@@ -752,7 +738,7 @@ private SingleSeriesQueryRequest constructSingleQueryRequest(
if (valueFilter != null) {
request.setValueFilterBytes(SerializeUtils.serializeFilter(valueFilter));
}
request.setPath(path.getFullPath());
request.setPath(ClusterQueryUtils.getPathStrListForRequest(path));
request.setHeader(partitionGroup.getHeader());
request.setQueryId(context.getQueryId());
request.setRequester(metaGroupMember.getThisNode());
@@ -24,13 +24,21 @@
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.read.common.Path;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ClusterQueryUtils {

private static final Logger logger = LoggerFactory.getLogger(ClusterQueryUtils.class);

private ClusterQueryUtils() {
// util class
}
@@ -64,4 +72,39 @@ public static void checkPathExistence(List<PartialPath> paths) throws QueryProce
checkPathExistence(path);
}
}

/**
* Generate path string list for RPC request.
*
* <p>If vector path, return its vectorId with all subSensors. Else just return path string.
*/
public static List<String> getPathStrListForRequest(Path path) {
if (path instanceof VectorPartialPath) {
List<String> pathWithSubSensors =
new ArrayList<>(((VectorPartialPath) path).getSubSensorsList().size() + 1);
pathWithSubSensors.add(path.getFullPath());
pathWithSubSensors.addAll(((VectorPartialPath) path).getSubSensorsList());
return pathWithSubSensors;
} else {
return Collections.singletonList(path.getFullPath());
}
}

/**
* Deserialize an assembled Path from path string list that's from RPC request.
*
* <p>This method is corresponding to getPathStringListForRequest().
*/
public static PartialPath getAssembledPathFromRequest(List<String> pathString) {
try {
if (pathString.size() == 1) {
return new PartialPath(pathString.get(0));
} else {
return new VectorPartialPath(pathString.get(0), pathString.subList(1, pathString.size()));
}
} catch (IllegalPathException e) {
logger.error("Failed to create partial path, fullPath is {}.", pathString, e);
return null;
}
}
}
@@ -245,7 +245,11 @@ public void getAllPaths(
List<String> path,
boolean withAlias,
AsyncMethodCallback<GetAllPathsResult> resultHandler) {
resultHandler.onComplete(new GetAllPathsResult(path));
List<List<String>> pathString = new ArrayList<>();
for (String s : path) {
pathString.add(Collections.singletonList(s));
}
resultHandler.onComplete(new GetAllPathsResult(pathString));
}

@Override
@@ -391,9 +395,11 @@ public void testDataClient()
paths.subList(0, paths.size() / 2),
SyncClientAdaptor.getUnregisteredMeasurements(
dataClient, TestUtils.getRaftNode(0, 0), paths));
assertEquals(
paths,
SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getRaftNode(0, 0), paths, false).paths);
List<String> result = new ArrayList<>();
SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getRaftNode(0, 0), paths, false)
.paths
.forEach(p -> result.add(p.get(0)));
assertEquals(paths, result);
assertEquals(
paths.size(),
(int) SyncClientAdaptor.getPathCount(dataClient, TestUtils.getRaftNode(0, 0), paths, 0));

0 comments on commit 1dcc82a

Please sign in to comment.