Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void selectLimitTest() {
"1970-01-01T00:00:01.000Z,d0,22222,22222,55555,",
};
tableResultSetEqualTest(
"select time, device_id, s0,s0,s1 from vehicle order by device_id offset 1 limit 10",
"select time, device_id, s0,s0,s1 from vehicle order by device_id,time offset 1 limit 10",
expectedHeader,
retArray,
DATABASE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void selectWildcardNoFilterTest() {
"1970-01-01T00:00:00.002Z,d1,2.2,false,2,", "1970-01-01T00:00:00.001Z,d2,11.1,false,11,",
};
tableResultSetEqualTest(
"SELECT * FROM table1 order by device_id OFFSET 1 LIMIT 2",
"SELECT * FROM table1 order by device_id, time OFFSET 1 LIMIT 2",
expectedHeader,
retArray,
DATABASE_NAME);
Expand Down Expand Up @@ -201,7 +201,7 @@ public void selectMeasurementNoFilterTest() {
"1970-01-01T00:00:00.002Z,d1,2,2.2,", "1970-01-01T00:00:00.001Z,d2,11,11.1,",
};
tableResultSetEqualTest(
"SELECT time, device_id, s3,s1 FROM table1 order by device_id OFFSET 1 LIMIT 2",
"SELECT time, device_id, s3,s1 FROM table1 order by device_id,time OFFSET 1 LIMIT 2",
expectedHeader,
retArray,
DATABASE_NAME);
Expand Down Expand Up @@ -299,7 +299,7 @@ public void selectWildcardWithFilterOrderByDeviceTest() {
"1970-01-01T00:00:00.001Z,d2,11.1,false,11,",
};
tableResultSetEqualTest(
"SELECT * FROM table1 WHERE time < 5 ORDER BY device_id DESC LIMIT 4",
"SELECT * FROM table1 WHERE time < 5 ORDER BY device_id DESC, time LIMIT 4",
expectedHeader,
retArray,
DATABASE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void orderByCanNotPushLimitTest() {
// 3. time filter, can push down LIMIT
retArray = new String[] {"1970-01-01T00:00:00.002Z,d3,33,", "1970-01-01T00:00:00.002Z,d2,22,"};
tableResultSetEqualTest(
"SELECT * FROM table1 WHERE time>1 and time<3 ORDER BY device_id DESC LIMIT 2",
"SELECT * FROM table1 WHERE time>1 and time<3 ORDER BY device_id DESC,time LIMIT 2",
expectedHeader,
retArray,
DATABASE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.stream.Collectors;

import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT;
import static org.apache.iotdb.relational.it.query.old.aligned.TableUtils.USE_DB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
Expand All @@ -59,12 +60,12 @@ public class IoTDBOrderByTableIT {

// the data can be viewed in
// https://docs.google.com/spreadsheets/d/1OWA1bKraArCwWVnuTjuhJ5yLG0PFLdD78gD6FjquepI/edit#gid=0
private static final String[] sql =
private static final String[] sql1 =
new String[] {
"CREATE DATABASE sg",
"USE sg",
"CREATE DATABASE db",
"USE db",
"CREATE TABLE table0 (device string id, attr1 string attribute, num int32 measurement, bigNum int64 measurement, "
+ "floatNum double measurement, str TEXT measurement, bool BOOLEAN measurement);",
+ "floatNum double measurement, str TEXT measurement, bool BOOLEAN measurement)",
"insert into table0(device, time,num,bigNum,floatNum,str,bool) values('d1', 0,3,2947483648,231.2121,'coconut',FALSE)",
"insert into table0(device, time,num,bigNum,floatNum,str,bool) values('d1', 20,2,2147483648,434.12,'pineapple',TRUE)",
"insert into table0(device, time,num,bigNum,floatNum,str,bool) values('d1', 40,1,2247483648,12.123,'apricot',TRUE)",
Expand Down Expand Up @@ -114,11 +115,22 @@ public static void tearDown() throws Exception {
}

private static void insertData() {
insertData1();
insertData2();
}
try (Connection connection = EnvFactory.getEnv().getTableConnection();
Statement statement = connection.createStatement()) {

private static String USE_DB = "use db0";
for (String sql : sql1) {
statement.execute(sql);
}
for (String sql : sql2) {
statement.execute(sql);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
// sessionInsertData1();
// sessionInsertData2();
}

// ordinal data
public static final String[][] RES =
Expand Down Expand Up @@ -1452,7 +1464,7 @@ public void lastQueryOrderBy5() {
41536900000L,
51536000000L);

protected static void insertData1() {
protected static void sessionInsertData1() {
try (ISession session = EnvFactory.getEnv().getSessionConnection(TABLE_SQL_DIALECT)) {

session.open();
Expand Down Expand Up @@ -1509,7 +1521,7 @@ protected static void insertData1() {
}
}

protected static void insertData2() {
protected static void sessionInsertData2() {
try (ISession session = EnvFactory.getEnv().getSessionConnection(TABLE_SQL_DIALECT)) {

session.open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ private void initResultTsBlock() {
positionCount, Optional.of(new boolean[positionCount]), new int[positionCount]);
break;
case INT64:
case TIMESTAMP:
columns[i] =
new LongColumn(
positionCount, Optional.of(new boolean[positionCount]), new long[positionCount]);
Expand All @@ -302,6 +303,8 @@ private void initResultTsBlock() {
new double[positionCount]);
break;
case TEXT:
case STRING:
case BLOB:
columns[i] =
new BinaryColumn(
positionCount,
Expand Down Expand Up @@ -371,9 +374,12 @@ private long getMemoryUsageOfOneMergeSortKey() {
case INT64:
case DOUBLE:
case VECTOR:
case TIMESTAMP:
memory += 8;
break;
case TEXT:
case STRING:
case BLOB:
memory += 16;
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,18 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.execution.querystats.PlanOptimizersStatsCollector;
import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.AddExchangeNodes;
import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.DistributedPlanGenerator;
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer;
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushLimitOffsetIntoTableScan;
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SortElimination;
import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanGenerator;
import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.db.queryengine.common.header.DatasetHeader.EMPTY_HEADER;
import static org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector.NOOP;
import static org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceVisitor.getStatementMemorySource;
Expand Down Expand Up @@ -81,38 +74,11 @@ public StatementMemorySource visitExplain(
}

// generate table model distributed plan
DistributedPlanGenerator.PlanContext planContext = new DistributedPlanGenerator.PlanContext();
List<PlanNode> distributedPlanResult =
new DistributedPlanGenerator(context.getQueryContext(), context.getAnalysis())
.genResult(logicalPlan.getRootNode(), planContext);
checkArgument(distributedPlanResult.size() == 1, "Root node must return only one");

// Notice: when change the optimizers in TableDistributionPlanner, these code also need to be
// adapted
List<PlanOptimizer> optimizers =
Arrays.asList(new PushLimitOffsetIntoTableScan(), new SortElimination());
// distribute plan optimize rule
PlanNode distributedPlan = distributedPlanResult.get(0);
for (PlanOptimizer optimizer : optimizers) {
distributedPlan =
optimizer.optimize(
distributedPlan,
new PlanOptimizer.Context(
null,
context.getAnalysis(),
null,
context.getQueryContext(),
context.getQueryContext().getTypeProvider(),
new SymbolAllocator(),
context.getQueryContext().getQueryId(),
NOOP,
PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector()));
}

// add exchange node for distributed plan
TableDistributedPlanGenerator.PlanContext planContext =
new TableDistributedPlanGenerator.PlanContext();
PlanNode outputNodeWithExchange =
new AddExchangeNodes(context.getQueryContext())
.addExchangeNodes(distributedPlan, planContext);
new TableDistributedPlanner(context.getAnalysis(), logicalPlan, context.getQueryContext())
.generateDistributedPlanWithOptimize(planContext);

List<String> lines =
outputNodeWithExchange.accept(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ public Operator visitTopK(TopKNode node, LocalExecutionPlanContext context) {
dataTypes,
getComparatorForTable(
node.getOrderingScheme().getOrderingList(), sortItemIndexList, sortItemDataTypeList),
node.getCount(),
(int) node.getCount(),
node.isChildrenDataInOrder());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,12 +616,9 @@ public List<String> visitTableScan(TableScanNode node, GraphContext context) {
if (node.getPushDownPredicate() != null) {
boxValue.add(String.format("PushDownPredicate: %s", node.getPushDownPredicate()));
}
if (node.getPushDownOffset() > 0) {
boxValue.add(String.format("PushDownOffset: %s", node.getPushDownOffset()));
}
if (node.getPushDownLimit() > 0) {
boxValue.add(String.format("PushDownLimit: %s", node.getPushDownLimit()));
}
boxValue.add(String.format("PushDownOffset: %s", node.getPushDownOffset()));
boxValue.add(String.format("PushDownLimit: %s", node.getPushDownLimit()));
boxValue.add(String.format("PushDownLimitToEachDevice: %s", node.isPushLimitToEachDevice()));
boxValue.add(String.format("RegionId: %s", node.getRegionReplicaSet().getRegionId().getId()));
return render(node, boxValue, context);
}
Expand Down Expand Up @@ -687,6 +684,18 @@ public List<String> visitSort(
return render(node, boxValue, context);
}

@Override
public List<String> visitStreamSort(
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode node,
GraphContext context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("StreamSort-%s", node.getPlanNodeId().getId()));
boxValue.add(String.format("OrderingScheme: %s", node.getOrderingScheme()));
boxValue.add(String.format("StreamCompareKeyEndIndex: %s", node.getStreamCompareKeyEndIndex()));
boxValue.add(String.format("OrderByAllIdsAndTime: %s", node.isOrderByAllIdsAndTime()));
return render(node, boxValue, context);
}

@Override
public List<String> visitMergeSort(
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode node,
Expand All @@ -708,6 +717,17 @@ public List<String> visitCollect(
return render(node, boxValue, context);
}

@Override
public List<String> visitTopK(
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode node,
GraphContext context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("TopK-%s", node.getPlanNodeId().getId()));
boxValue.add(String.format("OrderingScheme: %s", node.getOrderingScheme()));
boxValue.add(String.format("Count: %s", node.getCount()));
return render(node, boxValue, context);
}

private String printRegion(TRegionReplicaSet regionReplicaSet) {
return String.format(
"Partition: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CreateTableDeviceNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;

import org.apache.tsfile.utils.ReadWriteIOUtils;

Expand Down Expand Up @@ -540,11 +538,14 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) {
return org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode
.deserialize(buffer);
case 1008:
return TopKNode.deserialize(buffer);
return org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode.deserialize(
buffer);
case 1009:
return CollectNode.deserialize(buffer);
return org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode.deserialize(
buffer);
case 1010:
return StreamSortNode.deserialize(buffer);
return org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode
.deserialize(buffer);
case 2000:
return RelationalInsertTabletNode.deserialize(buffer);
case 2001:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,11 @@ private PlanBuilder sort(PlanBuilder subPlan, Optional<OrderingScheme> orderingS

return subPlan.withNewRoot(
new SortNode(
queryIdAllocator.genPlanNodeId(), subPlan.getRoot(), orderingScheme.get(), false));
queryIdAllocator.genPlanNodeId(),
subPlan.getRoot(),
orderingScheme.get(),
false,
false));
}

private PlanBuilder offset(PlanBuilder subPlan, Optional<Offset> offset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analyzer;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzerFactory;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner;
import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
Expand Down Expand Up @@ -112,7 +112,7 @@ public LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext contex

@Override
public DistributedQueryPlan doDistributionPlan(IAnalysis analysis, LogicalQueryPlan logicalPlan) {
return new TableDistributionPlanner((Analysis) analysis, logicalPlan, logicalPlan.getContext())
return new TableDistributedPlanner((Analysis) analysis, logicalPlan, logicalPlan.getContext())
.plan();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,22 @@
import static org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_ALL_CHILDREN;
import static org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_SOME_CHILD;

public class AddExchangeNodes extends PlanVisitor<PlanNode, DistributedPlanGenerator.PlanContext> {
public class AddExchangeNodes
extends PlanVisitor<PlanNode, TableDistributedPlanGenerator.PlanContext> {

private final MPPQueryContext queryContext;

public AddExchangeNodes(MPPQueryContext queryContext) {
this.queryContext = queryContext;
}

public PlanNode addExchangeNodes(PlanNode node, DistributedPlanGenerator.PlanContext context) {
public PlanNode addExchangeNodes(
PlanNode node, TableDistributedPlanGenerator.PlanContext context) {
return node.accept(this, context);
}

@Override
public PlanNode visitPlan(PlanNode node, DistributedPlanGenerator.PlanContext context) {
public PlanNode visitPlan(PlanNode node, TableDistributedPlanGenerator.PlanContext context) {
if (node instanceof WritePlanNode) {
return node;
}
Expand Down Expand Up @@ -89,7 +91,8 @@ public PlanNode visitPlan(PlanNode node, DistributedPlanGenerator.PlanContext co
}

@Override
public PlanNode visitTableScan(TableScanNode node, DistributedPlanGenerator.PlanContext context) {
public PlanNode visitTableScan(
TableScanNode node, TableDistributedPlanGenerator.PlanContext context) {
context.nodeDistributionMap.put(
node.getPlanNodeId(),
new NodeDistribution(SAME_WITH_ALL_CHILDREN, node.getRegionReplicaSet()));
Expand All @@ -98,18 +101,18 @@ public PlanNode visitTableScan(TableScanNode node, DistributedPlanGenerator.Plan

@Override
public PlanNode visitTableDeviceFetch(
TableDeviceFetchNode node, DistributedPlanGenerator.PlanContext context) {
TableDeviceFetchNode node, TableDistributedPlanGenerator.PlanContext context) {
return processTableDeviceSourceNode(node, context);
}

@Override
public PlanNode visitTableDeviceQuery(
TableDeviceQueryNode node, DistributedPlanGenerator.PlanContext context) {
TableDeviceQueryNode node, TableDistributedPlanGenerator.PlanContext context) {
return processTableDeviceSourceNode(node, context);
}

private PlanNode processTableDeviceSourceNode(
TableDeviceSourceNode node, DistributedPlanGenerator.PlanContext context) {
TableDeviceSourceNode node, TableDistributedPlanGenerator.PlanContext context) {
context.nodeDistributionMap.put(
node.getPlanNodeId(),
new NodeDistribution(SAME_WITH_ALL_CHILDREN, node.getRegionReplicaSet()));
Expand Down
Loading