Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
Expand Down Expand Up @@ -90,6 +91,20 @@ public LogicalPlanner(
.getPlanOptimizers();
}

@TestOnly
public LogicalPlanner(
MPPQueryContext queryContext,
Metadata metadata,
SessionInfo sessionInfo,
WarningCollector warningCollector,
List<PlanOptimizer> planOptimizers) {
this.queryContext = queryContext;
this.metadata = metadata;
this.sessionInfo = requireNonNull(sessionInfo, "session is null");
this.warningCollector = requireNonNull(warningCollector, "warningCollector is null");
this.planOptimizers = planOptimizers;
}

public LogicalQueryPlan plan(Analysis analysis) {
PlanNode planNode = planStatement(analysis, analysis.getStatement());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ public boolean requiresPreSortedInputs() {
return false;
}

public boolean isWithTies() {
return tiesResolvingScheme.isPresent();
}

public Optional<OrderingScheme> getTiesResolvingScheme() {
return tiesResolvingScheme;
}

@Override
public PlanNode clone() {
return new LimitNode(id, null, count, tiesResolvingScheme);
Expand Down Expand Up @@ -99,10 +107,6 @@ public long getCount() {
return count;
}

public Optional<OrderingScheme> getTiesResolvingScheme() {
return tiesResolvingScheme;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.plan.relational.planner;

import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ArithmeticBinaryExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DoubleLiteral;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.junit.Test;

import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.any;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.anyTree;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.dataType;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.expression;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.offset;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.sort;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ArithmeticBinaryExpression.Operator.ADD;
import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.GREATER_THAN;
import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression.Operator.AND;
import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SortItem.NullOrdering.LAST;
import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SortItem.Ordering.ASCENDING;
import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SortItem.Ordering.DESCENDING;

public class ExampleTest {
@Test
public void exampleTest() {
PlanTester planTester = new PlanTester();

String sql =
"SELECT time, tag3, substring(tag1, 1), cast(s2 as double), s2+s3, attr1 FROM table1 "
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by time desc, s1+s2 asc, tag2 asc, tag1 desc offset 5";

LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);

// (("s1" + "s3") > 0) AND (CAST("s1" AS double) > 1E0)
Expression filterPredicate =
new LogicalExpression(
AND,
ImmutableList.of(
new ComparisonExpression(
GREATER_THAN,
new ArithmeticBinaryExpression(
ADD, new SymbolReference("s1"), new SymbolReference("s3")),
new LongLiteral("0")),
new ComparisonExpression(
GREATER_THAN,
new Cast(new SymbolReference("s1"), dataType("double")),
new DoubleLiteral("1.0"))));

PlanMatchPattern tableScan =
tableScan(
"testdb.table1",
ImmutableList.of("time", "tag1", "tag2", "tag3", "attr1", "s1", "s2", "s3"),
ImmutableSet.of("time", "tag1", "tag2", "tag3", "attr1", "s1", "s2", "s3"));

// Verify full LogicalPlan
// Output - Offset -Project - MergeSort - Sort - Project - Filter - TableScan
assertPlan(
logicalQueryPlan,
output(
offset(
5,
project(
sort(
ImmutableList.of(
sort("time", DESCENDING, LAST),
sort("expr_1", ASCENDING, LAST),
sort("tag2", ASCENDING, LAST),
sort("tag1", DESCENDING, LAST)),
project( // We need to indicate alias of expr_1 for parent
ImmutableMap.of(
"expr_1",
expression(
new ArithmeticBinaryExpression(
ADD,
new SymbolReference("s1"),
new SymbolReference("s2")))),
filter(filterPredicate, tableScan)))))));

// You can use anyTree() to match any partial(at least one Node) of Plan
assertPlan(logicalQueryPlan, output(anyTree(project(filter(filterPredicate, tableScan)))));

// Verify DistributionPlan

/*
* IdentitySinkNode-33
* └──OutputNode-8
* └──OffsetNode-6
* └──ProjectNode
* └──MergeSortNode-25
* ├──ExchangeNode-29: [SourceAddress:192.0.12.1/test_query.2.0/31]
* ├──SortNode-27
* │ └──ProjectNode-23
* │ └──FilterNode-17
* │ └──TableScanNode-14
* └──ExchangeNode-30: [SourceAddress:192.0.10.1/test_query.3.0/32]
*/
assertPlan(
planTester.getFragmentPlan(0),
anyTree(
project(
mergeSort(
exchange(), sort(project(filter(filterPredicate, tableScan))), exchange()))));

/*
* IdentitySinkNode-31
* └──SortNode-26
* └──ProjectNode-22
* └──FilterNode-16
* └──TableScanNode-13
*/
assertPlan(
planTester.getFragmentPlan(1),
any( // use any() to match any one node
sort(project(filter(filterPredicate, tableScan)))));

/* IdentitySinkNode-31
* └──SortNode-26
* └──ProjectNode-19
* └──FilterNode-16
* └──TableScanNode-13
*/
assertPlan(
planTester.getFragmentPlan(2), any(sort(project(filter(filterPredicate, tableScan)))));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.relational.planner;

import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analyzer;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzerFactory;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestMatadata;
import org.apache.iotdb.db.queryengine.plan.relational.execution.querystats.PlanOptimizersStatsCollector;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner;
import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;

import java.time.ZoneId;
import java.util.Collections;
import java.util.List;

import static org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector.NOOP;
import static org.apache.iotdb.db.queryengine.plan.relational.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector;
import static org.junit.Assert.fail;

public class PlanTester {
private final QueryId queryId = new QueryId("test_query");
private final SessionInfo sessionInfo =
new SessionInfo(
1L,
"iotdb-user",
ZoneId.systemDefault(),
IoTDBConstant.ClientVersion.V_1_0,
"db",
IClientSession.SqlDialect.TABLE);
private final Metadata metadata = new TestMatadata();

private DistributedQueryPlan distributedQueryPlan;

private Analysis analysis;

private LogicalQueryPlan plan;

public LogicalQueryPlan createPlan(String sql) {
return createPlan(sessionInfo, sql, NOOP, createPlanOptimizersStatsCollector());
}

public LogicalQueryPlan createPlan(SessionInfo sessionInfo, String sql) {
return createPlan(sessionInfo, sql, NOOP, createPlanOptimizersStatsCollector());
}

public LogicalQueryPlan createPlan(
SessionInfo sessionInfo,
String sql,
WarningCollector warningCollector,
PlanOptimizersStatsCollector planOptimizersStatsCollector) {
MPPQueryContext context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);

Analysis analysis = analyze(sql, metadata);
this.analysis = analysis;

LogicalPlanner logicalPlanner =
new LogicalPlanner(context, metadata, sessionInfo, WarningCollector.NOOP);

plan = logicalPlanner.plan(analysis);

return plan;
}

public LogicalQueryPlan createPlan(
SessionInfo sessionInfo,
String sql,
List<PlanOptimizer> optimizers,
WarningCollector warningCollector,
PlanOptimizersStatsCollector planOptimizersStatsCollector) {
MPPQueryContext context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);

Analysis analysis = analyze(sql, metadata);

LogicalPlanner logicalPlanner =
new LogicalPlanner(context, metadata, sessionInfo, WarningCollector.NOOP, optimizers);

return logicalPlanner.plan(analysis);
}

public static Analysis analyze(String sql, Metadata metadata) {
SqlParser sqlParser = new SqlParser();
Statement statement = sqlParser.createStatement(sql, ZoneId.systemDefault());
SessionInfo session =
new SessionInfo(
0, "test", ZoneId.systemDefault(), "testdb", IClientSession.SqlDialect.TABLE);
return analyzeStatement(statement, metadata, sqlParser, session);
}

public static Analysis analyzeStatement(
Statement statement, Metadata metadata, SqlParser sqlParser, SessionInfo session) {
try {
StatementAnalyzerFactory statementAnalyzerFactory =
new StatementAnalyzerFactory(metadata, sqlParser, new NopAccessControl());

Analyzer analyzer =
new Analyzer(
session,
statementAnalyzerFactory,
Collections.emptyList(),
Collections.emptyMap(),
NOOP);
return analyzer.analyze(statement);
} catch (Exception e) {
e.printStackTrace();
fail(statement + ", " + e.getMessage());
}
fail();
return null;
}

public PlanNode getFragmentPlan(int index) {
if (distributedQueryPlan == null) {
distributedQueryPlan = new TableDistributedPlanner(analysis, plan, plan.getContext()).plan();
}
return distributedQueryPlan.getFragments().get(index).getPlanNodeTree();
}

private static class NopAccessControl implements AccessControl {}
}
Loading