Skip to content

Commit

Permalink
[IOTDB-3184] Set up Delete timeseries statement and PlanNodes (#5974)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcosZyk committed May 22, 2022
1 parent 9a1bc7c commit 642c1b6
Show file tree
Hide file tree
Showing 13 changed files with 656 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;

public class PathPatternTree {

private PathPatternNode root;
Expand Down Expand Up @@ -252,18 +250,18 @@ private PartialPath constructFullPath(PathPatternNode node, Deque<String> ancest
return new PartialPath(nodeList.toArray(new String[0]));
}

public PathPatternTree extractInvolvedPartByPrefix(PartialPath prefixPath) {
public PathPatternTree findOverlappedPattern(PartialPath pattern) {
if (pathList.isEmpty()) {
pathList = splitToPathList();
}
PartialPath pattern = prefixPath.concatNode(MULTI_LEVEL_PATH_WILDCARD);
List<PartialPath> involvedPath = new ArrayList<>();

List<PartialPath> results = new ArrayList<>();
for (PartialPath path : pathList) {
if (pattern.overlapWith(path)) {
involvedPath.add(path);
results.add(path);
}
}
return new PathPatternTree(involvedPath);
return new PathPatternTree(results);
}

@TestOnly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.analyze;

import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
Expand Down Expand Up @@ -90,6 +92,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;

/** Analyze the statement and generate Analysis. */
public class Analyzer {
private static final Logger logger = LoggerFactory.getLogger(Analyzer.class);
Expand Down Expand Up @@ -921,6 +925,46 @@ public Analysis visitAlterTimeseries(
return analysis;
}

@Override
public Analysis visitDeleteTimeseries(
DeleteTimeSeriesStatement deleteTimeSeriesStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(deleteTimeSeriesStatement);

// fetch partition information

PathPatternTree patternTree = new PathPatternTree(deleteTimeSeriesStatement.getPaths());

SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree);
analysis.setSchemaPartitionInfo(schemaPartitionInfo);

Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
for (String storageGroup : schemaPartitionInfo.getSchemaPartitionMap().keySet()) {
try {
for (String devicePath :
patternTree
.findOverlappedPattern(
new PartialPath(storageGroup).concatNode(MULTI_LEVEL_PATH_WILDCARD))
.findAllDevicePaths()) {
DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
queryParam.setDevicePath(devicePath);
sgNameToQueryParamsMap
.computeIfAbsent(storageGroup, key -> new ArrayList<>())
.add(queryParam);
}
} catch (IllegalPathException e) {
// definitely won't happen
throw new RuntimeException(e);
}
}

DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
analysis.setDataPartitionInfo(dataPartition);

return analysis;
}

@Override
public Analysis visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
Expand Down Expand Up @@ -354,6 +355,19 @@ public void parseAliasClause(
}
}

// Delete Timeseries ======================================================================

@Override
public Statement visitDeleteTimeseries(IoTDBSqlParser.DeleteTimeseriesContext ctx) {
DeleteTimeSeriesStatement deleteTimeSeriesStatement = new DeleteTimeSeriesStatement();
List<PartialPath> partialPaths = new ArrayList<>();
for (IoTDBSqlParser.PrefixPathContext prefixPathContext : ctx.prefixPath()) {
partialPaths.add(parsePrefixPath(prefixPathContext));
}
deleteTimeSeriesStatement.setPartialPaths(partialPaths);
return deleteTimeSeriesStatement;
}

// Show Timeseries ========================================================================

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;

public class LogicalPlanBuilder {

private PlanNode root;
Expand Down Expand Up @@ -542,8 +544,10 @@ public LogicalPlanBuilder planSchemaFetchSource(
new SchemaFetchScanNode(
context.getQueryId().genPlanNodeId(),
storageGroupPath,
patternTree.extractInvolvedPartByPrefix(storageGroupPath)));
patternTree.findOverlappedPattern(
storageGroupPath.concatNode(MULTI_LEVEL_PATH_WILDCARD))));
} catch (IllegalPathException e) {
// definitely won't happen
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InvalidateSchemaCacheNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
Expand All @@ -54,6 +56,7 @@
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
Expand Down Expand Up @@ -107,7 +110,10 @@ public enum PlanNodeType {
CREATE_MULTI_TIME_SERIES((short) 39),
CHILD_PATHS_SCAN((short) 40),
CHILD_NODES_SCAN((short) 41),
NODE_MANAGEMENT_MEMORY_MERGE((short) 42);
NODE_MANAGEMENT_MEMORY_MERGE((short) 42),
INVALIDATE_SCHEMA_CACHE((short) 43),
DELETE_DATA((short) 44),
DELETE_TIMESERIES((short) 45);

private final short nodeType;

Expand Down Expand Up @@ -217,6 +223,12 @@ public static PlanNode deserialize(ByteBuffer buffer) {
return ChildNodesSchemaScanNode.deserialize(buffer);
case 42:
return NodeManagementMemoryMergeNode.deserialize(buffer);
case 43:
return InvalidateSchemaCacheNode.deserialize(buffer);
case 44:
return DeleteDataNode.deserialize(buffer);
case 45:
return DeleteTimeSeriesNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;

import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class DeleteTimeSeriesNode extends WritePlanNode {

private final List<PartialPath> pathList;

public DeleteTimeSeriesNode(PlanNodeId id, List<PartialPath> pathList) {
super(id);
this.pathList = pathList;
}

public List<PartialPath> getPathList() {
return pathList;
}

@Override
public List<PlanNode> getChildren() {
return null;
}

@Override
public void addChild(PlanNode child) {}

@Override
public PlanNode clone() {
return new DeleteTimeSeriesNode(getPlanNodeId(), pathList);
}

@Override
public int allowedChildCount() {
return 0;
}

@Override
public List<String> getOutputColumnNames() {
return null;
}

@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.DELETE_TIMESERIES.serialize(byteBuffer);
ReadWriteIOUtils.write(pathList.size(), byteBuffer);
for (PartialPath path : pathList) {
path.serialize(byteBuffer);
}
}

public static DeleteTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
int size = ReadWriteIOUtils.readInt(byteBuffer);
List<PartialPath> pathList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
pathList.add((PartialPath) PathDeserializeUtil.deserialize(byteBuffer));
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new DeleteTimeSeriesNode(planNodeId, pathList);
}

@Override
public TRegionReplicaSet getRegionReplicaSet() {
return null;
}

@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
return null;
}
}

0 comments on commit 642c1b6

Please sign in to comment.