Skip to content

Commit

Permalink
[IOTDB-412]Paths are not correctly deduplicated (#727)
Browse files Browse the repository at this point in the history
* add new ut

* add comment
  • Loading branch information
liutaohua authored and jt2594838 committed Jan 10, 2020
1 parent e8e1b7b commit 279ff01
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public PhysicalPlan transformToPhysicalPlan(Operator operator)
case SQLConstant.TOK_STORAGE_GROUP:
return new ShowPlan(ShowContentType.STORAGE_GROUP);
case SQLConstant.TOK_DEVICES:
return new ShowDevicesPlan(ShowContentType.DEVICES, ((ShowDevicesOperator) operator).getPath());
return new ShowDevicesPlan(ShowContentType.DEVICES,
((ShowDevicesOperator) operator).getPath());
case SQLConstant.TOK_COUNT_NODE_TIMESERIES:
return new CountPlan(ShowContentType.COUNT_NODE_TIMESERIES,
((CountOperator) operator).getPath(), ((CountOperator) operator).getLevel());
Expand Down Expand Up @@ -225,7 +226,8 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator)
Path fullPath = Path.addPrefixPath(suffixPath, prefixPath);
Set<String> tmpDeviceSet = new HashSet<>();
try {
List<String> actualPaths = executor.getAllMatchedPaths(fullPath.getFullPath()); // remove stars to get actual paths
List<String> actualPaths = executor
.getAllMatchedPaths(fullPath.getFullPath()); // remove stars to get actual paths
for (String pathStr : actualPaths) {
Path path = new Path(pathStr);
String device = path.getDevice();
Expand Down Expand Up @@ -259,9 +261,9 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator)
if (dataTypeConsistencyChecker.containsKey(measurementChecked)) {
if (!dataType.equals(dataTypeConsistencyChecker.get(measurementChecked))) {
throw new QueryProcessException(
"The data types of the same measurement column should be the same across "
+ "devices in GROUP_BY_DEVICE sql. For more details please refer to the "
+ "SQL document.");
"The data types of the same measurement column should be the same across "
+ "devices in GROUP_BY_DEVICE sql. For more details please refer to the "
+ "SQL document.");
}
} else {
dataTypeConsistencyChecker.put(measurementChecked, dataType);
Expand Down Expand Up @@ -313,12 +315,11 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator)
queryPlan.setMeasurementsGroupByDevice(measurementsGroupByDevice);
queryPlan.setDataTypeConsistencyChecker(dataTypeConsistencyChecker);
queryPlan.setPaths(paths);
queryPlan.setDeduplicatedPaths(paths);

// get device to filter map
FilterOperator filterOperator = queryOperator.getFilterOperator();

if(filterOperator != null){
if (filterOperator != null) {
queryPlan.setDeviceToFilterMap(concatFilterByDivice(prefixPaths, filterOperator));
}
} else {
Expand All @@ -345,8 +346,9 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator)
// e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
// [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
// root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)]
private Map<String, IExpression> concatFilterByDivice(List<Path> fromPaths, FilterOperator operator)
throws QueryProcessException {
private Map<String, IExpression> concatFilterByDivice(List<Path> fromPaths,
FilterOperator operator)
throws QueryProcessException {
Map<String, IExpression> deviceToFilterMap = new HashMap<>();
// remove stars in fromPaths and get deviceId with deduplication
List<String> noStarDevices = removeStarsInDeviceWithUnique(fromPaths);
Expand All @@ -361,7 +363,7 @@ private Map<String, IExpression> concatFilterByDivice(List<Path> fromPaths, Filt
}

private List<String> removeStarsInDeviceWithUnique(List<Path> paths)
throws LogicalOptimizeException {
throws LogicalOptimizeException {
List<String> retDevices;
Set<String> deviceSet = new LinkedHashSet<>();
try {
Expand All @@ -383,7 +385,7 @@ private List<String> removeStarsInDeviceWithUnique(List<Path> paths)
}

private FilterOperator concatFilterPath(String prefix, FilterOperator operator) {
if(!operator.isLeaf()){
if (!operator.isLeaf()) {
for (FilterOperator child : operator.getChildren()) {
concatFilterPath(prefix, child);
}
Expand Down Expand Up @@ -416,11 +418,13 @@ private void generateDataTypes(QueryPlan queryPlan) throws PathException {
}

private void deduplicate(QueryPlan queryPlan) {
//The deduplication of a GroupByDevice query is done in the dataset
if (queryPlan.isGroupByDevice()) {
return;
}
if (queryPlan instanceof AggregationPlan) {
if (!queryPlan.isGroupByDevice()) {
AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
deduplicateAggregation(aggregationPlan);
}
AggregationPlan aggregationPlan = (AggregationPlan) queryPlan;
deduplicateAggregation(aggregationPlan);
return;
}
List<Path> paths = queryPlan.getPaths();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import static org.junit.Assert.fail;

import java.io.File;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.QueryProcessor;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
Expand Down Expand Up @@ -54,6 +56,7 @@
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.StringContainer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -63,7 +66,7 @@ public class PhysicalPlanTest {
private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor());

@Before
public void before() throws QueryProcessException {
public void before() throws QueryProcessException, StartupException {
Path path1 = new Path(
new StringContainer(new String[]{"root", "vehicle", "d1", "s1"},
TsFileConstant.PATH_SEPARATOR));
Expand All @@ -84,6 +87,12 @@ public void before() throws QueryProcessException {
.insert(new InsertPlan(path3.getDevice(), 10, path3.getMeasurement(), "10"));
processor.getExecutor()
.insert(new InsertPlan(path4.getDevice(), 10, path4.getMeasurement(), "10"));
MManager.getInstance().init();
}

@After
public void clean() {
MManager.getInstance().clear();
}

@Test
Expand Down Expand Up @@ -178,10 +187,10 @@ public void testGroupBy2()

@Test
public void testGroupBy3()
throws QueryProcessException, MetadataException {
throws QueryProcessException, MetadataException {
String sqlStr =
"select count(s1) " + "from root.vehicle.d1 " + "where s1 < 20 and time <= now() "
+ "group by([2017-6-2T12:00:12+07:00,2017-6-12T12:00:12+07:00], 3h, 24h)";
"select count(s1) " + "from root.vehicle.d1 " + "where s1 < 20 and time <= now() "
+ "group by([2017-6-2T12:00:12+07:00,2017-6-12T12:00:12+07:00], 3h, 24h)";
PhysicalPlan plan = processor.parseSQLToPhysicalPlan(sqlStr);
if (!plan.isQuery()) {
fail();
Expand Down Expand Up @@ -595,7 +604,47 @@ public void testMoveFile() throws QueryProcessException, MetadataException {
QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor());
OperateFilePlan plan = (OperateFilePlan) processor.parseSQLToPhysicalPlan(metadata);
assertEquals(
String.format("OperateFilePlan{file=%s, targetDir=%s, autoCreateSchema=false, sgLevel=0, operatorType=MOVE_FILE}", filePath,
String.format(
"OperateFilePlan{file=%s, targetDir=%s, autoCreateSchema=false, sgLevel=0, operatorType=MOVE_FILE}",
filePath,
targetDir), plan.toString());
}

@Test
public void testDeduplicatedPath() throws Exception {
String sqlStr = "select * from root.sg.d1,root.sg.d1,root.sg.d1";
QueryPlan plan = (QueryPlan) processor.parseSQLToPhysicalPlan(sqlStr);
Assert.assertEquals(1, plan.getDeduplicatedPaths().size());
Assert.assertEquals(1, plan.getDeduplicatedDataTypes().size());
Assert.assertEquals(new Path("root.sg.d1.*"), plan.getDeduplicatedPaths().get(0));

sqlStr = "select count(*) from root.sg.d1,root.sg.d1,root.sg.d1";
plan = (QueryPlan) processor.parseSQLToPhysicalPlan(sqlStr);
Assert.assertEquals(1, plan.getDeduplicatedPaths().size());
Assert.assertEquals(1, plan.getDeduplicatedDataTypes().size());
Assert.assertEquals(new Path("root.sg.d1.*"), plan.getDeduplicatedPaths().get(0));

//'group by device' is deduplication in DeviceIterateDataSet
MManager manager = MManager.getInstance();
manager.setStorageGroupToMTree("root.vehicle");
manager.addPathToMTree("root.vehicle.d0.s1", "INT64", "PLAIN");
manager.addPathToMTree("root.vehicle.d0.s0", "INT64", "PLAIN");
manager.addPathToMTree("root.vehicle.d1.s0", "INT64", "PLAIN");
manager.addPathToMTree("root.vehicle.d1.s1", "INT64", "PLAIN");

sqlStr = "select s0,s0,s1 from root.vehicle.d0, root.vehicle.d1 group by device";
plan = (QueryPlan) processor.parseSQLToPhysicalPlan(sqlStr);
Assert.assertEquals(0, plan.getDeduplicatedPaths().size());
Assert.assertEquals(0, plan.getDeduplicatedDataTypes().size());
Assert.assertEquals(6, plan.getPaths().size());
Assert.assertEquals(6, plan.getDataTypes().size());

sqlStr = "select COUNT(s0),COUNT(s0),COUNT(s1) from root.vehicle.d0, root.vehicle.d1 group by device";
plan = (QueryPlan) processor.parseSQLToPhysicalPlan(sqlStr);
Assert.assertEquals(0, plan.getDeduplicatedPaths().size());
Assert.assertEquals(0, plan.getDeduplicatedPaths().size());
Assert.assertEquals(0, plan.getDeduplicatedDataTypes().size());
Assert.assertEquals(6, plan.getPaths().size());
Assert.assertEquals(6, plan.getDataTypes().size());
}
}

0 comments on commit 279ff01

Please sign in to comment.