Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,44 @@ public void deleteTimeSeriesWithMultiPatternTest() throws Exception {
}
}

@Test
public void deleteTemplateTimeSeriesTest() throws Exception {
  try (Connection connection = EnvFactory.getEnv().getConnection();
      Statement statement = connection.createStatement()) {
    // Build a database whose only device is represented by a device template,
    // so there is no ordinary (non-template) timeseries to delete.
    statement.execute("CREATE DATABASE root.db");
    statement.execute("CREATE DEVICE TEMPLATE t1 (s1 INT64, s2 DOUBLE)");
    statement.execute("SET DEVICE TEMPLATE t1 to root.db");
    statement.execute("CREATE TIMESERIES USING DEVICE TEMPLATE ON root.db.d1");

    // Both wildcard patterns match only template-represented timeseries, so the
    // server must reject the deletion with PATH_NOT_EXIST and the explanatory text.
    for (String pattern : new String[] {"root.**", "root.db.**"}) {
      try {
        statement.execute("DELETE TIMESERIES " + pattern);
        Assert.fail();
      } catch (SQLException e) {
        Assert.assertTrue(
            e.getMessage()
                .contains(
                    TSStatusCode.PATH_NOT_EXIST.getStatusCode()
                        + ": Timeseries ["
                        + pattern
                        + "] does not exist or is represented by device template"));
      }
    }
  }
}

@Test
public void deleteTimeSeriesAndReturnPathNotExistsTest() throws Exception {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
try {
statement.execute("delete timeseries root.**");
Assert.fail();
} catch (SQLException e) {
Assert.assertTrue(
e.getMessage()
Expand All @@ -428,6 +460,7 @@ public void deleteTimeSeriesAndReturnPathNotExistsTest() throws Exception {
}

int cnt = 0;

try (ResultSet resultSet = statement.executeQuery("select count(s1) from root.*.d1")) {
while (resultSet.next()) {
StringBuilder ans = new StringBuilder(resultSet.getString(TIMESTAMP_STR));
Expand All @@ -442,6 +475,7 @@ public void deleteTimeSeriesAndReturnPathNotExistsTest() throws Exception {

try {
statement.execute("delete timeseries root.*.d1.s3");
Assert.fail();
} catch (SQLException e) {
Assert.assertTrue(
e.getMessage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public enum DataNodeRequestType {
ROLLBACK_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
DEACTIVATE_TEMPLATE,
COUNT_PATHS_USING_TEMPLATE,
CHECK_SCHEMA_REGION_USING_TEMPLATE,
CHECK_TIMESERIES_EXISTENCE,

CONSTRUCT_VIEW_SCHEMA_BLACK_LIST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import org.apache.iotdb.confignode.client.async.handlers.rpc.PipePushMetaRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.TransferLeaderRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListWithTemplateReq;
Expand Down Expand Up @@ -438,6 +440,12 @@ private void sendAsyncRequestToDataNode(
(CountPathsUsingTemplateRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
case CHECK_SCHEMA_REGION_USING_TEMPLATE:
client.checkSchemaRegionUsingTemplate(
(TCheckSchemaRegionUsingTemplateReq) clientHandler.getRequest(requestId),
(CheckSchemaRegionUsingTemplateRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
case CHECK_TIMESERIES_EXISTENCE:
client.checkTimeSeriesExistence(
(TCheckTimeSeriesExistenceReq) clientHandler.getRequest(requestId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.apache.iotdb.confignode.client.async.handlers.rpc.PipePushMetaRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.TransferLeaderRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
Expand Down Expand Up @@ -200,6 +202,14 @@ public AbstractAsyncRPCHandler<?> createAsyncRPCHandler(
dataNodeLocationMap,
(Map<Integer, TCountPathsUsingTemplateResp>) responseMap,
countDownLatch);
case CHECK_SCHEMA_REGION_USING_TEMPLATE:
return new CheckSchemaRegionUsingTemplateRPCHandler(
requestType,
requestId,
targetDataNode,
dataNodeLocationMap,
(Map<Integer, TCheckSchemaRegionUsingTemplateResp>) responseMap,
countDownLatch);
case CHECK_TIMESERIES_EXISTENCE:
return new CheckTimeSeriesExistenceRPCHandler(
requestType,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.client.async.handlers.rpc.subscription;

import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.handlers.rpc.AbstractAsyncRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.CountDownLatch;

/**
 * Async RPC handler collecting {@link TCheckSchemaRegionUsingTemplateResp} results from DataNodes
 * when the ConfigNode asks whether a schema region still contains template-represented devices.
 *
 * <p>On every completion (success or failure) the response is recorded in {@code responseMap} and
 * the shared {@code countDownLatch} is decremented so the caller can await all DataNodes.
 */
public class CheckSchemaRegionUsingTemplateRPCHandler
    extends AbstractAsyncRPCHandler<TCheckSchemaRegionUsingTemplateResp> {

  private static final Logger LOGGER =
      LoggerFactory.getLogger(CheckSchemaRegionUsingTemplateRPCHandler.class);

  public CheckSchemaRegionUsingTemplateRPCHandler(
      DataNodeRequestType requestType,
      int requestId,
      TDataNodeLocation targetDataNode,
      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
      Map<Integer, TCheckSchemaRegionUsingTemplateResp> responseMap,
      CountDownLatch countDownLatch) {
    super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
  }

  @Override
  public void onComplete(TCheckSchemaRegionUsingTemplateResp response) {
    TSStatus tsStatus = response.getStatus();
    responseMap.put(requestId, response);
    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      // Terminal outcome: drop the entry so this request is not re-sent.
      dataNodeLocationMap.remove(requestId);
      LOGGER.info(
          "Successfully check schema region using template on DataNode: {}", targetDataNode);
    } else {
      // MULTIPLE_ERROR is also terminal (no retry); any other non-success code keeps the
      // entry in dataNodeLocationMap — presumably so the caller's retry loop re-sends it.
      // TODO(review): confirm retry semantics against AsyncDataNodeClientPool.
      if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
        dataNodeLocationMap.remove(requestId);
      }
      LOGGER.error(
          "Failed to check schema region using template on DataNode {}, {}",
          targetDataNode,
          tsStatus);
    }
    countDownLatch.countDown();
  }

  @Override
  public void onError(Exception e) {
    // Fixed copy-paste: the message previously said "Count paths using template error",
    // inherited from CountPathsUsingTemplateRPCHandler, which misidentified this operation.
    String errorMsg =
        "Check schema region using template error on DataNode: {id="
            + targetDataNode.getDataNodeId()
            + ", internalEndPoint="
            + targetDataNode.getInternalEndPoint()
            + "} "
            + e.getMessage();
    LOGGER.error(errorMsg);

    countDownLatch.countDown();
    TCheckSchemaRegionUsingTemplateResp resp = new TCheckSchemaRegionUsingTemplateResp();
    // RpcUtils.getStatus already returns a TSStatus — no need to wrap it in the
    // Thrift copy constructor as the original code did.
    resp.setStatus(
        RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg));
    responseMap.put(requestId, resp);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.path.PathPatternUtil;
Expand Down Expand Up @@ -109,6 +110,7 @@
import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
import org.apache.iotdb.confignode.procedure.impl.schema.SchemaUtils;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
Expand Down Expand Up @@ -1814,13 +1816,15 @@ public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
boolean canOptimize = false;
HashSet<TDatabaseSchema> deleteDatabaseSchemas = new HashSet<>();
List<PartialPath> deleteTimeSeriesPatternPaths = new ArrayList<>();
List<PartialPath> deleteDatabasePatternPaths = new ArrayList<>();
for (PartialPath path : rawPatternTree.getAllPathPatterns()) {
if (PathPatternUtil.isMultiLevelMatchWildcard(path.getMeasurement())
&& !path.getDevicePath().hasWildcard()) {
Map<String, TDatabaseSchema> databaseSchemaMap =
getClusterSchemaManager().getMatchedDatabaseSchemasByPrefix(path.getDevicePath());
if (!databaseSchemaMap.isEmpty()) {
deleteDatabaseSchemas.addAll(databaseSchemaMap.values());
deleteDatabasePatternPaths.add(path);
canOptimize = true;
continue;
}
Expand All @@ -1830,6 +1834,12 @@ public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
if (!canOptimize) {
return procedureManager.deleteTimeSeries(queryId, rawPatternTree, isGeneratedByPipe);
}
// check if the database is using template
try {
SchemaUtils.checkSchemaRegionUsingTemplate(this, deleteDatabasePatternPaths);
} catch (MetadataException e) {
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
if (!deleteTimeSeriesPatternPaths.isEmpty()) {
// 1. delete time series that can not be optimized
PathPatternTree deleteTimeSeriesPatternTree = new PathPatternTree();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;

import java.util.ArrayList;
Expand All @@ -40,7 +41,7 @@

public abstract class DataNodeRegionTaskExecutor<Q, R> {

protected final ConfigNodeProcedureEnv env;
protected final ConfigManager configManager;
protected final Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup;
protected final boolean executeOnAllReplicaset;

Expand All @@ -50,13 +51,26 @@ public abstract class DataNodeRegionTaskExecutor<Q, R> {

private boolean isInterrupted = false;

/**
 * Builds an executor directly from a {@link ConfigManager}, without requiring a
 * {@link ConfigNodeProcedureEnv} (the sibling constructor derives the manager from the env).
 *
 * @param configManager config manager used to resolve region leaders for request routing
 * @param targetSchemaRegionGroup consensus-group id → replica set the task targets
 * @param executeOnAllReplicaset true to send to every replica, false to send to leaders only
 * @param dataNodeRequestType request type tag used to build/dispatch the async RPC
 * @param dataNodeRequestGenerator builds the per-DataNode request Q from its location and the
 *     consensus groups assigned to it
 */
protected DataNodeRegionTaskExecutor(
ConfigManager configManager,
Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup,
boolean executeOnAllReplicaset,
DataNodeRequestType dataNodeRequestType,
BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q> dataNodeRequestGenerator) {
this.configManager = configManager;
this.targetSchemaRegionGroup = targetSchemaRegionGroup;
this.executeOnAllReplicaset = executeOnAllReplicaset;
this.dataNodeRequestType = dataNodeRequestType;
this.dataNodeRequestGenerator = dataNodeRequestGenerator;
}

protected DataNodeRegionTaskExecutor(
ConfigNodeProcedureEnv env,
Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup,
boolean executeOnAllReplicaset,
DataNodeRequestType dataNodeRequestType,
BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q> dataNodeRequestGenerator) {
this.env = env;
this.configManager = env.getConfigManager();
this.targetSchemaRegionGroup = targetSchemaRegionGroup;
this.executeOnAllReplicaset = executeOnAllReplicaset;
this.dataNodeRequestType = dataNodeRequestType;
Expand All @@ -70,8 +84,7 @@ void execute() {
executeOnAllReplicaset
? getAllReplicaDataNodeRegionGroupMap(targetSchemaRegionGroup)
: getLeaderDataNodeRegionGroupMap(
env.getConfigManager().getLoadManager().getRegionLeaderMap(),
targetSchemaRegionGroup);
configManager.getLoadManager().getRegionLeaderMap(), targetSchemaRegionGroup);
while (!dataNodeConsensusGroupIdMap.isEmpty()) {
AsyncClientHandler<Q, R> clientHandler = prepareRequestHandler(dataNodeConsensusGroupIdMap);
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
Expand Down Expand Up @@ -142,8 +155,7 @@ private Map<TDataNodeLocation, List<TConsensusGroupId>> getAvailableDataNodeLoca

Map<TDataNodeLocation, List<TConsensusGroupId>> availableDataNodeLocation = new HashMap<>();

Map<TConsensusGroupId, Integer> leaderMap =
env.getConfigManager().getLoadManager().getRegionLeaderMap();
Map<TConsensusGroupId, Integer> leaderMap = configManager.getLoadManager().getRegionLeaderMap();
for (List<TConsensusGroupId> consensusGroupIdList :
failedDataNodeConsensusGroupIdMap.values()) {
for (TConsensusGroupId consensusGroupId : consensusGroupIdList) {
Expand Down
Loading