Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTableDeviceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
Expand Down Expand Up @@ -525,6 +526,18 @@ public TSStatus visitDeleteLogicalView(DeleteLogicalViewNode node, ISchemaRegion
}
}

@Override
public TSStatus visitCreateTableDevice(CreateTableDeviceNode node, ISchemaRegion schemaRegion) {
try {
schemaRegion.createTableDevice(
node.getDevicePathList(), node.getAttributeNameList(), node.getAttributeValueList());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}

@Override
public TSStatus visitPipeEnrichedWritePlanNode(
PipeEnrichedWritePlanNode node, ISchemaRegion schemaRegion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTableStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement;
Expand All @@ -104,6 +105,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.SetSchemaTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
import org.apache.iotdb.db.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.relational.sql.tree.Insert;
import org.apache.iotdb.db.schemaengine.template.TemplateQueryType;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
Expand Down Expand Up @@ -336,17 +338,29 @@ private TSExecuteStatementResp executeStatementInternal(

queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);

// TODO audit log, quota, StatementType
result =
COORDINATOR.executeForTableModel(
s,
relationSqlParser,
clientSession,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
statement,
metadata,
req.getTimeout());
if (s instanceof Insert) {
result =
COORDINATOR.executeForTreeModel(
new InsertTableStatement(clientSession, (Insert) s),
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
statement,
partitionFetcher,
schemaFetcher,
req.getTimeout());
} else {
// TODO audit log, quota, StatementType
result =
COORDINATOR.executeForTableModel(
s,
relationSqlParser,
clientSession,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
statement,
metadata,
req.getTimeout());
}
}

if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class MPPQueryContext {

QueryPlanStatistics queryPlanStatistics = null;

private boolean skipSchemaValidate = false;

public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
this.endPointBlackList = new LinkedList<>();
Expand Down Expand Up @@ -289,4 +291,12 @@ public void setLogicalOptimizationCost(long logicalOptimizeCost) {
}
queryPlanStatistics.setLogicalOptimizationCost(logicalOptimizeCost);
}

public boolean isSkipSchemaValidate() {
return skipSchemaValidate;
}

public void setSkipSchemaValidate(boolean skipSchemaValidate) {
this.skipSchemaValidate = skipSchemaValidate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByVariationParameter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.schema.TableModelSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
Expand All @@ -101,9 +102,11 @@
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTableStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.queryengine.plan.statement.internal.CreateTableDeviceStatement;
import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateTimeSeriesStatement;
Expand Down Expand Up @@ -2719,6 +2722,9 @@ public Analysis visitPipeEnrichedStatement(

private void validateSchema(
Analysis analysis, InsertBaseStatement insertStatement, MPPQueryContext context) {
if (context.isSkipSchemaValidate()) {
return;
}
final long startTime = System.nanoTime();
try {
SchemaValidator.validate(schemaFetcher, insertStatement, context);
Expand Down Expand Up @@ -3644,4 +3650,54 @@ public Analysis visitShowCurrentTimestamp(
analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowCurrentTimestampHeader());
return analysis;
}

@Override
public Analysis visitInsertTable(
InsertTableStatement insertTableStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
InsertRowStatement insertRowStatement = insertTableStatement.getInsertRowStatement();

final long startTime = System.nanoTime();
try {
TableModelSchemaFetcher.getInstance().validateDeviceSchema(insertTableStatement, context);
} catch (SemanticException e) {
analysis.setFinishQueryAfterAnalyze(true);
if (e.getCause() instanceof IoTDBException) {
IoTDBException exception = (IoTDBException) e.getCause();
analysis.setFailStatus(
RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage()));
} else {
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage()));
}
} finally {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime() - startTime);
}

if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
}
context.setSkipSchemaValidate(true);
return insertRowStatement.accept(this, context);
}

@Override
public Analysis visitCreateTableDevice(
CreateTableDeviceStatement createTableDeviceStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(createTableDeviceStatement);

PathPatternTree patternTree = new PathPatternTree();
for (PartialPath devicePath : createTableDeviceStatement.getPaths()) {
patternTree.appendFullPath(devicePath.concatNode(ONE_LEVEL_PATH_WILDCARD));
}
SchemaPartition partition =
partitionFetcher.getOrCreateSchemaPartition(
patternTree, context.getSession().getUserName());

analysis.setSchemaPartitionInfo(partition);

return analysis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.BatchActivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTableDeviceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.queryengine.plan.statement.internal.CreateTableDeviceStatement;
import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateTimeSeriesStatement;
Expand Down Expand Up @@ -1010,4 +1012,14 @@ public PlanNode visitShowLogicalView(
.planLimit(showLogicalViewStatement.getLimit())
.getRoot();
}

@Override
public PlanNode visitCreateTableDevice(
CreateTableDeviceStatement createTableDeviceStatement, MPPQueryContext context) {
return new CreateTableDeviceNode(
context.getQueryId().genPlanNodeId(),
createTableDeviceStatement.getPaths(),
createTableDeviceStatement.getAttributeNameList(),
createTableDeviceStatement.getAttributeValueList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTableDeviceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
Expand Down Expand Up @@ -209,6 +210,8 @@ public enum PlanNodeType {
EXPLAIN_ANALYZE((short) 90),

PIPE_OPERATE_SCHEMA_QUEUE_REFERENCE((short) 91),

CREATE_TABLE_DEVICE((short) 92),
;

public static final int BYTES = Short.BYTES;
Expand Down Expand Up @@ -440,6 +443,8 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) {
return ExplainAnalyzeNode.deserialize(buffer);
case 91:
return PipeOperateSchemaQueueNode.deserialize(buffer);
case 92:
return CreateTableDeviceNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTableDeviceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
Expand Down Expand Up @@ -553,4 +554,8 @@ public R visitTopK(
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode node, C context) {
return visitPlan(node, context);
}

public R visitCreateTableDevice(CreateTableDeviceNode node, C context) {
return visitPlan(node, context);
}
}
Loading