Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-3184][IOTDB-3206]Delete timeseries #6055

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public void deserialize(InputStream inputStream, TProtocol protocol)
}
}

public List<RegionReplicaSetInfo> getDataDistributionInfo() {
public List<RegionReplicaSetInfo> getDistributionInfo() {
Map<TRegionReplicaSet, RegionReplicaSetInfo> distributionMap = new HashMap<>();

dataPartitionMap.forEach(
Expand All @@ -327,7 +327,7 @@ public List<RegionReplicaSetInfo> getDataDistributionInfo() {
for (TRegionReplicaSet regionReplicaSet : ret) {
distributionMap
.computeIfAbsent(regionReplicaSet, RegionReplicaSetInfo::new)
.addStorageGroup(storageGroup);
.setStorageGroup(storageGroup);
}
});
return new ArrayList<>(distributionMap.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;

import java.util.List;

public abstract class Partition {
protected String seriesSlotExecutorName;
protected int seriesPartitionSlotNum;
Expand All @@ -39,5 +41,7 @@ protected TSeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
return executor.getSeriesPartitionSlot(deviceName);
}

public abstract List<RegionReplicaSetInfo> getDistributionInfo();

public abstract boolean isEmpty();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,23 @@

import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RegionReplicaSetInfo {
private TRegionReplicaSet regionReplicaSet;
private Set<String> ownedStorageGroups;
private String storageGroup;

public RegionReplicaSetInfo(TRegionReplicaSet regionReplicaSet) {
this.regionReplicaSet = regionReplicaSet;
this.ownedStorageGroups = new HashSet<>();
}

public void addStorageGroup(String storageGroup) {
ownedStorageGroups.add(storageGroup);
public void setStorageGroup(String storageGroup) {
this.storageGroup = storageGroup;
}

public TRegionReplicaSet getRegionReplicaSet() {
return regionReplicaSet;
}

public List<String> getOwnedStorageGroups() {
return new ArrayList<>(ownedStorageGroups);
public String getStorageGroup() {
return storageGroup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,15 @@ private Map<TSeriesPartitionSlot, TRegionReplicaSet> readMap(
return result;
}

public List<RegionReplicaSetInfo> getSchemaDistributionInfo() {
@Override
public List<RegionReplicaSetInfo> getDistributionInfo() {
Map<TRegionReplicaSet, RegionReplicaSetInfo> distributionMap = new HashMap<>();
schemaPartitionMap.forEach(
(storageGroup, partition) -> {
for (TRegionReplicaSet regionReplicaSet : partition.values()) {
distributionMap
.computeIfAbsent(regionReplicaSet, RegionReplicaSetInfo::new)
.addStorageGroup(storageGroup);
.setStorageGroup(storageGroup);
}
});
return new ArrayList<>(distributionMap.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.TriggerExecutionException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaBlacklist;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.DeleteRegionNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InvalidateSchemaCacheNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
Expand All @@ -39,6 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;

public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
Expand Down Expand Up @@ -113,4 +120,35 @@ public TSStatus visitDeleteRegion(DeleteRegionNode node, DataRegion dataRegion)
StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) node.getConsensusGroupId());
return StatusUtils.OK;
}

@Override
public TSStatus visitInvalidateSchemaCache(
InvalidateSchemaCacheNode node, DataRegion dataRegion) {
String storageGroup = dataRegion.getLogicalStorageGroupName();
PathPatternTree patternTree = new PathPatternTree();
for (PartialPath path : node.getPathList()) {
try {
patternTree.appendPaths(path.alterPrefixPath(new PartialPath(storageGroup)));
} catch (IllegalPathException e) {
// this definitely won't happen
throw new RuntimeException(e);
}
}
DataNodeSchemaBlacklist.getInstance().appendToBlacklist(patternTree);
return StatusUtils.OK;
}

@Override
public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
try {
for (PartialPath path : node.getPathList()) {
dataRegion.delete(
path, node.getDeleteStartTime(), node.getDeleteEndTime(), Long.MAX_VALUE, null);
}
return StatusUtils.OK;
} catch (IOException e) {
LOGGER.error("Error in executing plan node: {}", node, e);
return StatusUtils.EXECUTE_STATEMENT_ERROR;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.metadata.cache;

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DataNodeSchemaBlacklist {

private final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();

private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();

private final Set<MeasurementPath> blacklist = Collections.synchronizedSet(new HashSet<>());

private DataNodeSchemaBlacklist() {}

public static DataNodeSchemaBlacklist getInstance() {
return DataNodeSchemaBlacklist.SchemaBlacklistHolder.INSTANCE;
}

/** singleton pattern. */
private static class SchemaBlacklistHolder {
private static final DataNodeSchemaBlacklist INSTANCE = new DataNodeSchemaBlacklist();
}

public void appendToBlacklist(PathPatternTree patternTree) {
List<MeasurementPath> pathList = schemaFetcher.fetchSchema(patternTree).getAllMeasurement();
blacklist.addAll(pathList);
for (MeasurementPath path : pathList) {
// todo improve the implement of schemaCache
schemaCache.invalidate(new PartialPath(path.getNodes()));
}
}

public SchemaTree filterTimeseriesInBlacklist(SchemaTree schemaTree) {
if (!blacklist.isEmpty()) {
for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) {
if (blacklist.contains(measurementPath)) {
schemaTree.pruneSingleMeasurement(measurementPath);
}
}
}
return schemaTree;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
Expand Down Expand Up @@ -213,5 +214,19 @@ public TSStatus visitDeleteRegion(DeleteRegionNode node, ISchemaRegion schemaReg
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}

@Override
public TSStatus visitDeleteTimeseries(DeleteTimeSeriesNode node, ISchemaRegion schemaRegion) {

try {
for (PartialPath path : node.getPathList()) {
schemaRegion.deleteTimeseries(path, false);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
}
}

private static class TransformerContext {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iotdb.db.mpp.common.schematree;

import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.qp.constant.SQLConstant;
Expand Down Expand Up @@ -98,6 +97,9 @@ public void setRoot(PathPatternNode root) {

/** @return all device path patterns in the path pattern tree. */
public List<String> findAllDevicePaths() {
if (root.getChildren().isEmpty()) {
constructTree();
}
List<String> nodes = new ArrayList<>();
List<String> pathPatternList = new ArrayList<>();
findAllDevicePaths(root, nodes, pathPatternList);
Expand Down Expand Up @@ -153,13 +155,15 @@ public void appendPath(PartialPath newPath) {
}
}

public void appendPaths(List<PartialPath> paths) {
for (PartialPath path : paths) {
appendPath(path);
}
}

public void appendPaths(PartialPath device, List<String> measurementNameList) {
try {
for (String measurementName : measurementNameList) {
appendPath(new PartialPath(device.getFullPath(), measurementName));
}
} catch (IllegalPathException e) {
e.printStackTrace();
for (String measurementName : measurementNameList) {
appendPath(device.concatNode(measurementName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.RegionReplicaSetInfo;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
Expand All @@ -31,6 +32,7 @@
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -143,6 +145,9 @@ public class Analysis {
// extra mesaage from config node, used for node management
private Set<String> matchedNodes;

// extracted from partition, used for delete data, invalidate cache and delete timeseries
private List<Pair<RegionReplicaSetInfo, List<PartialPath>>> regionRequestList;

public Analysis() {
this.finishQueryAfterAnalyze = false;
}
Expand Down Expand Up @@ -403,4 +408,13 @@ public Set<String> getMatchedNodes() {
public void setMatchedNodes(Set<String> matchedNodes) {
this.matchedNodes = matchedNodes;
}

public List<Pair<RegionReplicaSetInfo, List<PartialPath>>> getRegionRequestList() {
return regionRequestList;
}

public void setRegionRequestList(
List<Pair<RegionReplicaSetInfo, List<PartialPath>>> regionRequestList) {
this.regionRequestList = regionRequestList;
}
}