diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java new file mode 100644 index 0000000000000..e6dfd37a307dd --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.autocreate; + +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2AutoCreateSchema.class}) +public class IoTDBPipeConditionalOperationsIT extends AbstractPipeDualAutoIT { + + @Test + public void testBasicCreatePipeIfNotExists() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + // Create pipe + String sql = + String.format( + "create pipe If Not Exists a2b with source ('source'='iotdb-source', 'source.pattern'='root.test1', 'source.realtime.mode'='stream') with processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // show pipe + long creationTime; + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(1, showPipeResult.size()); + // Check status + Assert.assertEquals("RUNNING", showPipeResult.get(0).state); + // Check configurations + Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source")); + Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1")); + Assert.assertTrue( + showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream")); + Assert.assertTrue( + showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor")); + Assert.assertTrue( + showPipeResult + .get(0) + .pipeConnector + .contains(String.format("node-urls=%s", receiverDataNode.getIpAndPortString()))); + // Record last creation time + creationTime = showPipeResult.get(0).creationTime; + } + + // Create pipe If Not Exists + sql = + String.format( + "create pipe If Not Exists a2b with source ('source'='iotdb-source', 'source.path'='root.test2.**') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // show pipe + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(1, showPipeResult.size()); + // Check status + Assert.assertEquals("RUNNING", showPipeResult.get(0).state); + // Check configurations + Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source")); + Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1")); + Assert.assertTrue( + showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream")); + Assert.assertTrue( + showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor")); + Assert.assertTrue( + showPipeResult + .get(0) + .pipeConnector + .contains(String.format("node-urls=%s", receiverDataNode.getIpAndPortString()))); + Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test2.**")); + Assert.assertEquals(creationTime, showPipeResult.get(0).creationTime); + } + } + + @Test + public void testBasicDropPipeIfExists() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + // Create pipe + final String sql = + String.format( + "create pipe If Not Exists a2b with source ('source'='iotdb-source', 'source.path'='root.test1.**') with processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // Drop pipe If Exists + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute("Drop pipe If Exists a2b"); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // show pipe + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(0, showPipeResult.size()); + } + + // Drop pipe If Exists + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute("Drop pipe If Exists a2b"); + } catch (SQLException e) { + fail(e.getMessage()); + } + } + + public void testBasicAlterPipeIfExists() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + // Alter pipe If Exists + String sql = + String.format( + "Alter pipe If Exists a2b replace sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // show pipe + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(0, showPipeResult.size()); + } + + // Create pipe + sql = + String.format( + "create pipe If Not Exists a2b with source ('source'='iotdb-source', 'source.pattern'='root.test1', 'source.realtime.mode'='stream') with processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // Alter pipe If Exists + sql = + String.format( + "Alter pipe If Exists a2b replace source () replace processor () replace sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // show pipe + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(1, showPipeResult.size()); + // Check status + Assert.assertEquals("RUNNING", showPipeResult.get(0).state); + // Check configurations + Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source")); + Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1")); + Assert.assertFalse( + showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream")); + Assert.assertFalse( + showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor")); + Assert.assertTrue( + showPipeResult + .get(0) + .pipeConnector + .contains(String.format("node-urls=%s", receiverDataNode.getIpAndPortString()))); + } + } +} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 20fba208d4fbe..80e31e2469de8 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -250,6 +250,7 @@ public enum TSStatusCode { ALTER_TOPIC_ERROR(2002), SHOW_TOPIC_ERROR(2003), TOPIC_PUSH_META_ERROR(2004), + TOPIC_NOT_EXIST_ERROR(2005), // Consumer CREATE_CONSUMER_ERROR(2100), diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index e66c875c9a76d..8986fce18e145 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -535,7 +535,7 @@ verifyConnection // Pipe Task ========================================================================================= createPipe - : CREATE PIPE pipeName=identifier + : CREATE PIPE (IF NOT EXISTS)? pipeName=identifier extractorAttributesClause? processorAttributesClause? connectorAttributesClause @@ -575,7 +575,7 @@ connectorAttributeClause ; alterPipe - : ALTER PIPE pipeName=identifier + : ALTER PIPE (IF EXISTS)? pipeName=identifier alterExtractorAttributesClause? alterProcessorAttributesClause? alterConnectorAttributesClause? @@ -603,7 +603,7 @@ alterConnectorAttributesClause ; dropPipe - : DROP PIPE pipeName=identifier + : DROP PIPE (IF EXISTS)? pipeName=identifier ; startPipe @@ -620,11 +620,11 @@ showPipes // Pipe Plugin ========================================================================================= createPipePlugin - : CREATE PIPEPLUGIN pluginName=identifier AS className=STRING_LITERAL uriClause + : CREATE PIPEPLUGIN (IF NOT EXISTS)? pluginName=identifier AS className=STRING_LITERAL uriClause ; dropPipePlugin - : DROP PIPEPLUGIN pluginName=identifier + : DROP PIPEPLUGIN (IF EXISTS)? pluginName=identifier ; showPipePlugins @@ -633,7 +633,7 @@ showPipePlugins // Topic ========================================================================================= createTopic - : CREATE TOPIC topicName=identifier topicAttributesClause? + : CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause? ; topicAttributesClause @@ -645,7 +645,7 @@ topicAttributeClause ; dropTopic - : DROP TOPIC topicName=identifier + : DROP TOPIC (IF EXISTS)? topicName=identifier ; showTopics diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index d1865a1e20486..dc9c12c7c8c5e 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -286,6 +286,10 @@ EVERY : E V E R Y ; +EXISTS + : E X I S T S + ; + EXPLAIN : E X P L A I N ; @@ -942,10 +946,15 @@ ELSE : E L S E ; +IF + : I F + ; + INF : I N F ; + // Privileges Keywords PRIVILEGE_VALUE diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 0f8271d27ded6..59dacc5ad5ca9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -146,6 +146,9 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; @@ -1441,10 +1444,10 @@ public TSStatus createPipePlugin(TCreatePipePluginReq req) { } @Override - public TSStatus dropPipePlugin(String pipePluginName) { + public TSStatus dropPipePlugin(TDropPipePluginReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? pipeManager.getPipePluginCoordinator().dropPipePlugin(pipePluginName) + ? pipeManager.getPipePluginCoordinator().dropPipePlugin(req) : status; } @@ -2026,10 +2029,10 @@ public TSStatus stopPipe(String pipeName) { } @Override - public TSStatus dropPipe(String pipeName) { + public TSStatus dropPipe(TDropPipeReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? pipeManager.getPipeTaskCoordinator().dropPipe(pipeName) + ? pipeManager.getPipeTaskCoordinator().dropPipe(req) : status; } @@ -2058,10 +2061,10 @@ public TSStatus createTopic(TCreateTopicReq req) { } @Override - public TSStatus dropTopic(String topicName) { + public TSStatus dropTopic(TDropTopicReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? subscriptionManager.getSubscriptionCoordinator().dropTopic(topicName) + ? subscriptionManager.getSubscriptionCoordinator().dropTopic(req) : status; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 3df54d41cf78f..366be81c9a342 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -77,6 +77,9 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; @@ -464,7 +467,7 @@ TDataPartitionTableResp getOrCreateDataPartition( TSStatus createPipePlugin(TCreatePipePluginReq req); /** Drop pipe plugin. */ - TSStatus dropPipePlugin(String pluginName); + TSStatus dropPipePlugin(TDropPipePluginReq req); /** Show pipe plugins. */ TGetPipePluginTableResp getPipePluginTable(); @@ -631,11 +634,11 @@ TDataPartitionTableResp getOrCreateDataPartition( /** * Drop Pipe. * - * @param pipeName name of Pipe + * @param req Info about Pipe * @return {@link TSStatusCode#SUCCESS_STATUS} if dropped the pipe successfully, {@link * TSStatusCode#PIPE_ERROR} if encountered failure. */ - TSStatus dropPipe(String pipeName); + TSStatus dropPipe(TDropPipeReq req); /** * Get Pipe by name. If pipeName is empty, get all Pipe. @@ -670,7 +673,7 @@ TDataPartitionTableResp getOrCreateDataPartition( TSStatus createTopic(TCreateTopicReq topic); /** Drop Topic. */ - TSStatus dropTopic(String topicName); + TSStatus dropTopic(TDropTopicReq req); /** Show Topic. */ TShowTopicResp showTopic(TShowTopicReq req); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 2a14d9445adc1..c77af98375784 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -106,6 +106,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; @@ -799,9 +800,10 @@ public TSStatus createCQ(TCreateCQReq req, ScheduledExecutorService scheduledExe return statusList.get(0); } - public TSStatus createPipePlugin(PipePluginMeta pipePluginMeta, byte[] jarFile) { + public TSStatus createPipePlugin( + PipePluginMeta pipePluginMeta, byte[] jarFile, boolean isSetIfNotExistsCondition) { final CreatePipePluginProcedure createPipePluginProcedure = - new CreatePipePluginProcedure(pipePluginMeta, jarFile); + new CreatePipePluginProcedure(pipePluginMeta, jarFile, isSetIfNotExistsCondition); try { if (jarFile != null && new UpdateProcedurePlan(createPipePluginProcedure).getSerializedSize() @@ -829,8 +831,11 @@ && new UpdateProcedurePlan(createPipePluginProcedure).getSerializedSize() } } - public TSStatus dropPipePlugin(String pluginName) { - final long procedureId = executor.submitProcedure(new DropPipePluginProcedure(pluginName)); + public TSStatus dropPipePlugin(TDropPipePluginReq req) { + final long procedureId = + executor.submitProcedure( + new DropPipePluginProcedure( + req.getPluginName(), req.isSetIfExistsCondition() && req.isIfExistsCondition())); final List statusList = new ArrayList<>(); final boolean isSucceed = waitingProcedureFinished(Collections.singletonList(procedureId), statusList); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java index 5c0ad6492e67f..07259f842fbe8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java @@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp; @@ -72,11 +73,16 @@ public TSStatus createPipePlugin(TCreatePipePluginReq req) { final PipePluginMeta pipePluginMeta = new PipePluginMeta(pluginName, className, false, jarName, jarMD5); - return configManager.getProcedureManager().createPipePlugin(pipePluginMeta, req.getJarFile()); + return configManager + .getProcedureManager() + .createPipePlugin( + pipePluginMeta, + req.getJarFile(), + req.isSetIfNotExistsCondition() && req.isIfNotExistsCondition()); } - public TSStatus dropPipePlugin(String pluginName) { - return configManager.getProcedureManager().dropPipePlugin(pluginName); + public TSStatus dropPipePlugin(TDropPipePluginReq req) { + return configManager.getProcedureManager().dropPipePlugin(req); } public TGetPipePluginTableResp getPipePluginTable() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java index b07203e43ed71..100da334d35ec 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java @@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; @@ -160,13 +161,19 @@ public TSStatus stopPipe(String pipeName) { } /** Caller should ensure that the method is called in the lock {@link #lock()}. */ - public TSStatus dropPipe(String pipeName) { + public TSStatus dropPipe(TDropPipeReq req) { + final String pipeName = req.getPipeName(); final boolean isPipeExistedBeforeDrop = pipeTaskInfo.isPipeExisted(pipeName); final TSStatus status = configManager.getProcedureManager().dropPipe(pipeName); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn("Failed to drop pipe {}. Result status: {}.", pipeName, status); } - return isPipeExistedBeforeDrop + + final boolean isSetIfExistsCondition = + req.isSetIfExistsCondition() && req.isIfExistsCondition(); + // If the `IF EXISTS` condition is not set and the pipe does not exist before the delete + // operation, return an error status indicating that the pipe does not exist. + return isPipeExistedBeforeDrop || isSetIfExistsCondition ? status : RpcUtils.getStatus( TSStatusCode.PIPE_NOT_EXIST_ERROR, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java index c48f3e0f07813..20dfce44bf29c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java @@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllTopicInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; @@ -38,6 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp; import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -149,12 +151,24 @@ public TSStatus createTopic(TCreateTopicReq req) { return status; } - public TSStatus dropTopic(String topicName) { + public TSStatus dropTopic(TDropTopicReq req) { + final String topicName = req.getTopicName(); + final boolean isTopicExistedBeforeDrop = subscriptionInfo.isTopicExisted(topicName); final TSStatus status = configManager.getProcedureManager().dropTopic(topicName); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn("Failed to drop topic {}. Result status: {}.", topicName, status); } - return status; + + // If the `IF EXISTS` condition is not set and the topic does not exist before the drop + // operation, return an error status indicating that the topic does not exist. + final boolean isIfExistedConditionSet = + req.isSetIfExistsCondition() && req.isIfExistsCondition(); + return isTopicExistedBeforeDrop || isIfExistedConditionSet + ? status + : RpcUtils.getStatus( + TSStatusCode.TOPIC_NOT_EXIST_ERROR, + String.format( + "Failed to drop topic %s. Failures: %s does not exist.", topicName, topicName)); } public TShowTopicResp showTopic(TShowTopicReq req) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 1e4a81cfe443f..64d235e7f2312 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -93,19 +93,38 @@ public void releasePipePluginInfoLock() { /////////////////////////////// Validator /////////////////////////////// - public void validateBeforeCreatingPipePlugin( - final String pluginName, final String jarName, final String jarMD5) { + /** + * @return true if the pipe plugin is already created and the isSetIfNotExistsCondition is true, + * false otherwise + * @throws PipeException if the pipe plugin is already created and the isSetIfNotExistsCondition + * is false + */ + public boolean validateBeforeCreatingPipePlugin( + final String pluginName, final boolean isSetIfNotExistsCondition) { // both build-in and user defined pipe plugin should be unique if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) { + if (isSetIfNotExistsCondition) { + return true; + } throw new PipeException( String.format( "Failed to create PipePlugin [%s], the same name PipePlugin has been created", pluginName)); } + return false; } - public void validateBeforeDroppingPipePlugin(final String pluginName) { + /** + * @return true if the pipe plugin is not created and the isSetIfExistsCondition is true, false + * otherwise + * @throws PipeException if the pipe plugin is not created and the isSetIfExistsCondition is false + */ + public boolean validateBeforeDroppingPipePlugin( + final String pluginName, final boolean isSetIfExistsCondition) { if (!pipePluginMetaKeeper.containsPipePlugin(pluginName)) { + if (isSetIfExistsCondition) { + return true; + } throw new PipeException( String.format( "Failed to drop PipePlugin [%s], this PipePlugin has not been created", pluginName)); @@ -116,6 +135,7 @@ public void validateBeforeDroppingPipePlugin(final String pluginName) { "Failed to drop PipePlugin [%s], the PipePlugin is a built-in PipePlugin", pluginName)); } + return false; } public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(final String jarName) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index c5a264c047a2a..ddc6ae6ac60df 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -155,19 +155,25 @@ public boolean canSkipNextSync() { /////////////////////////////// Validator /////////////////////////////// - public void checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest) throws PipeException { + public boolean checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest) + throws PipeException { acquireReadLock(); try { - checkBeforeCreatePipeInternal(createPipeRequest); + return checkBeforeCreatePipeInternal(createPipeRequest); } finally { releaseReadLock(); } } - private void checkBeforeCreatePipeInternal(final TCreatePipeReq createPipeRequest) + private boolean checkBeforeCreatePipeInternal(final TCreatePipeReq createPipeRequest) throws PipeException { if (!isPipeExisted(createPipeRequest.getPipeName())) { - return; + return true; + } + + if (createPipeRequest.isSetIfNotExistsCondition() + && createPipeRequest.isIfNotExistsCondition()) { + return false; } final String exceptionMessage = @@ -178,19 +184,23 @@ private void checkBeforeCreatePipeInternal(final TCreatePipeReq createPipeReques throw new PipeException(exceptionMessage); } - public void checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq alterPipeRequest) + public boolean checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq alterPipeRequest) throws PipeException { acquireReadLock(); try { - checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest); + return checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest); } finally { releaseReadLock(); } } - private void checkAndUpdateRequestBeforeAlterPipeInternal(final TAlterPipeReq alterPipeRequest) + private boolean checkAndUpdateRequestBeforeAlterPipeInternal(final TAlterPipeReq alterPipeRequest) throws PipeException { if (!isPipeExisted(alterPipeRequest.getPipeName())) { + if (alterPipeRequest.isSetIfExistsCondition() && alterPipeRequest.isIfExistsCondition()) { + return false; + } + final String exceptionMessage = String.format( "Failed to alter pipe %s, the pipe does not exist", alterPipeRequest.getPipeName()); @@ -254,6 +264,8 @@ private void checkAndUpdateRequestBeforeAlterPipeInternal(final TAlterPipeReq al .getAttribute()); } } + + return true; } public void checkBeforeStartPipe(final String pipeName) throws PipeException { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java index b020ffc045d06..6b64331422be8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java @@ -142,20 +142,24 @@ public boolean canSkipNextSync() { /////////////////////////////// Topic /////////////////////////////// - public void validateBeforeCreatingTopic(TCreateTopicReq createTopicReq) + public boolean validateBeforeCreatingTopic(TCreateTopicReq createTopicReq) throws SubscriptionException { acquireReadLock(); try { - checkBeforeCreateTopicInternal(createTopicReq); + return checkBeforeCreateTopicInternal(createTopicReq); } finally { releaseReadLock(); } } - private void checkBeforeCreateTopicInternal(TCreateTopicReq createTopicReq) + private boolean checkBeforeCreateTopicInternal(TCreateTopicReq createTopicReq) throws SubscriptionException { if (!isTopicExisted(createTopicReq.getTopicName())) { - return; + return true; + } + + if (createTopicReq.isSetIfNotExistsCondition() && createTopicReq.isIfNotExistsCondition()) { + return false; } final String exceptionMessage = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java index 8decdd0b4da90..325832a3869fc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java @@ -185,7 +185,7 @@ protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { /** * Execute at state {@link OperatePipeTaskState#VALIDATE_TASK}. * - * @return true if this procedure can skip subsequent stages (start RUNNING pipe or stop STOPPED + * @return false if this procedure can skip subsequent stages (start RUNNING pipe or stop STOPPED * pipe without runtime exception) * @throws PipeException if validation for pipe parameters failed */ @@ -224,8 +224,8 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState try { switch (state) { case VALIDATE_TASK: - if (executeFromValidateTask(env)) { - LOGGER.warn("ProcedureId {}: {}", getProcId(), SKIP_PIPE_PROCEDURE_MESSAGE); + if (!executeFromValidateTask(env)) { + LOGGER.info("ProcedureId {}: {}", getProcId(), SKIP_PIPE_PROCEDURE_MESSAGE); // On client side, the message returned after the successful execution of the pipe // command corresponding to this procedure is "Msg: The statement is executed // successfully." diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java index 53fc302c6633b..fd48507989034 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java @@ -66,14 +66,21 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure subscriptionInfo; @@ -156,7 +160,7 @@ protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { protected abstract SubscriptionOperation getOperation(); - protected abstract void executeFromValidate(ConfigNodeProcedureEnv env) + protected abstract boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException; protected abstract void executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) @@ -179,7 +183,14 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, OperateSubscriptionS try { switch (state) { case VALIDATE: - executeFromValidate(env); + if (!executeFromValidate(env)) { + LOGGER.info("ProcedureId {}: {}", getProcId(), SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE); + // On client side, the message returned after the successful execution of the + // subscription command corresponding to this procedure is "Msg: The statement is + // executed successfully." + this.setResult(SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE.getBytes(StandardCharsets.UTF_8)); + return Flow.NO_MORE_STATE; + } setNextState(OperateSubscriptionState.OPERATE_ON_CONFIG_NODES); break; case OPERATE_ON_CONFIG_NODES: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java index 981e0513d0451..6e6031b8102ee 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java @@ -75,10 +75,11 @@ protected SubscriptionOperation getOperation() { } @Override - public void executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { + public boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("AlterConsumerGroupProcedure: executeFromValidate, try to validate"); validateAndGetOldAndNewMeta(env); + return true; } protected void validateAndGetOldAndNewMeta(ConfigNodeProcedureEnv env) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java index 6e3a5387479e1..bb010eaca4062 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java @@ -91,10 +91,11 @@ protected SubscriptionOperation getOperation() { } @Override - public void executeFromValidate(ConfigNodeProcedureEnv env) { + public boolean executeFromValidate(ConfigNodeProcedureEnv env) { LOGGER.info("ConsumerGroupMetaSyncProcedure: executeFromValidate"); LAST_EXECUTION_TIME.set(System.currentTimeMillis()); + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java index 0894feb62ad9e..166f7b3da5e09 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java @@ -86,7 +86,7 @@ protected SubscriptionOperation getOperation() { } @Override - protected void executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { + protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("CreateSubscriptionProcedure: executeFromValidate"); subscriptionInfo.get().validateBeforeSubscribe(subscribeReq); @@ -131,6 +131,7 @@ protected void executeFromValidate(ConfigNodeProcedureEnv env) throws Subscripti createPipeProcedure.executeFromValidateTask(env); createPipeProcedure.executeFromCalculateInfoForTask(env); } + return true; } // TODO: check periodically if the subscription is still valid but no working pipe? diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java index 89da284ac0d95..7d0c1e09cbdbe 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java @@ -87,7 +87,7 @@ protected SubscriptionOperation getOperation() { } @Override - protected void executeFromValidate(final ConfigNodeProcedureEnv env) + protected boolean executeFromValidate(final ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("DropSubscriptionProcedure: executeFromValidate"); @@ -133,6 +133,7 @@ protected void executeFromValidate(final ConfigNodeProcedureEnv env) // Validate AlterConsumerGroupProcedure alterConsumerGroupProcedure.executeFromValidate(env); + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java index 93380ed10becf..f8fbdab72e0c4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java @@ -84,12 +84,14 @@ protected SubscriptionOperation getOperation() { } @Override - public void executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { + public boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("AlterTopicProcedure: executeFromValidate"); subscriptionInfo.get().validateBeforeAlteringTopic(updatedTopicMeta); existedTopicMeta = subscriptionInfo.get().getTopicMeta(updatedTopicMeta.getTopicName()); + + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java index ed3d59bd3d480..b712861cb54e7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java @@ -66,11 +66,13 @@ protected SubscriptionOperation getOperation() { } @Override - protected void executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { + protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("CreateTopicProcedure: executeFromValidate"); // 1. check if the topic exists - subscriptionInfo.get().validateBeforeCreatingTopic(createTopicReq); + if (!subscriptionInfo.get().validateBeforeCreatingTopic(createTopicReq)) { + return false; + } // 2. create the topic meta topicMeta = @@ -78,6 +80,7 @@ protected void executeFromValidate(ConfigNodeProcedureEnv env) throws Subscripti createTopicReq.getTopicName(), System.currentTimeMillis(), createTopicReq.getTopicAttributes()); + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java index 54c6e3ed4696b..f1cfbb59d1055 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java @@ -61,10 +61,11 @@ protected SubscriptionOperation getOperation() { } @Override - protected void executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { + protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("DropTopicProcedure: executeFromValidate({})", topicName); subscriptionInfo.get().validateBeforeDroppingTopic(topicName); + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java index 3d49a766102d5..40920e439367a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java @@ -90,10 +90,11 @@ protected SubscriptionOperation getOperation() { } @Override - public void executeFromValidate(ConfigNodeProcedureEnv env) { + public boolean executeFromValidate(ConfigNodeProcedureEnv env) { LOGGER.info("TopicMetaSyncProcedure: executeFromValidate"); LAST_EXECUTION_TIME.set(System.currentTimeMillis()); + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 51b5f5c89f84b..f1a09ad088a06 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -115,6 +115,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; @@ -808,7 +810,7 @@ public TSStatus createPipePlugin(TCreatePipePluginReq req) { @Override public TSStatus dropPipePlugin(TDropPipePluginReq req) { - return configManager.dropPipePlugin(req.getPluginName()); + return configManager.dropPipePlugin(req); } @Override @@ -1038,7 +1040,13 @@ public TSStatus stopPipe(String pipeName) { @Override public TSStatus dropPipe(String pipeName) { - return configManager.dropPipe(pipeName); + return configManager.dropPipe( + new TDropPipeReq().setPipeName(pipeName).setIfExistsCondition(false)); + } + + @Override + public TSStatus dropPipeExtended(TDropPipeReq req) { + return configManager.dropPipe(req); } @Override @@ -1068,7 +1076,13 @@ public TSStatus createTopic(TCreateTopicReq req) { @Override public TSStatus dropTopic(String topicName) { - return configManager.dropTopic(topicName); + return configManager.dropTopic( + new TDropTopicReq().setTopicName(topicName).setIfExistsCondition(false)); + } + + @Override + public TSStatus dropTopicExtended(TDropTopicReq req) throws TException { + return configManager.dropTopic(req); } @Override diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java index a697adcdec83e..f3571bed57ea6 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java @@ -152,7 +152,7 @@ public void testManagement() { pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan); // Drop pipe plugin test plugin - pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName); + pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName, false); DropPipePluginPlan dropPipePluginPlan = new DropPipePluginPlan(pluginName); pipeInfo.getPipePluginInfo().dropPipePlugin(dropPipePluginPlan); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java index 8405f2d4675c9..93b95a3f33621 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java @@ -41,7 +41,7 @@ public void serializeDeserializeTest() { PipePluginMeta pipePluginMeta = new PipePluginMeta("test", "test.class", false, "test.jar", "testMD5test"); CreatePipePluginProcedure proc = - new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3}); + new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3}, false); try { proc.serialize(outputStream); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java index c29f5bd7fb5db..232a5fbee5922 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java @@ -36,7 +36,7 @@ public void serializeDeserializeTest() { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); - DropPipePluginProcedure proc = new DropPipePluginProcedure("test"); + DropPipePluginProcedure proc = new DropPipePluginProcedure("test", false); try { proc.serialize(outputStream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index ede990bf15c90..c897d3000c37f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -83,6 +83,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; @@ -973,6 +975,12 @@ public TSStatus dropPipe(String pipeName) throws TException { () -> client.dropPipe(pipeName), status -> !updateConfigNodeLeader(status)); } + @Override + public TSStatus dropPipeExtended(TDropPipeReq req) throws TException { + return executeRemoteCallWithRetry( + () -> client.dropPipeExtended(req), status -> !updateConfigNodeLeader(status)); + } + @Override public TShowPipeResp showPipe(TShowPipeReq req) throws TException { return executeRemoteCallWithRetry( @@ -997,6 +1005,12 @@ public TSStatus dropTopic(String topicName) throws TException { () -> client.dropTopic(topicName), status -> !updateConfigNodeLeader(status)); } + @Override + public TSStatus dropTopicExtended(TDropTopicReq req) throws TException { + return executeRemoteCallWithRetry( + () -> client.dropTopicExtended(req), status -> !updateConfigNodeLeader(status)); + } + @Override public TShowTopicResp showTopic(TShowTopicReq req) throws TException { return executeRemoteCallWithRetry( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 8587fc606dd73..8b6d53582ebec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -75,6 +75,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp; @@ -181,6 +183,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipePluginStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement; @@ -856,6 +859,7 @@ public SettableFuture createPipePlugin( client.createPipePlugin( new TCreatePipePluginReq() .setPluginName(pluginName) + .setIfNotExistsCondition(createPipePluginStatement.hasIfNotExistsCondition()) .setClassName(className) .setJarFile(jarFile) .setJarMD5(jarMd5) @@ -882,13 +886,21 @@ public SettableFuture createPipePlugin( } @Override - public SettableFuture dropPipePlugin(String pluginName) { + public SettableFuture dropPipePlugin( + DropPipePluginStatement dropPipePluginStatement) { final SettableFuture future = SettableFuture.create(); try (final ConfigNodeClient client = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TSStatus executionStatus = client.dropPipePlugin(new TDropPipePluginReq(pluginName)); + final TSStatus executionStatus = + client.dropPipePlugin( + new TDropPipePluginReq() + .setPluginName(dropPipePluginStatement.getPluginName()) + .setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition())); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { - LOGGER.warn("[{}] Failed to drop pipe plugin {}.", executionStatus, pluginName); + LOGGER.warn( + "[{}] Failed to drop pipe plugin {}.", + executionStatus, + dropPipePluginStatement.getPluginName()); future.setException(new IoTDBException(executionStatus.message, executionStatus.code)); } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); @@ -1752,6 +1764,7 @@ public SettableFuture createPipe(CreatePipeStatement createPip TCreatePipeReq req = new TCreatePipeReq() .setPipeName(createPipeStatement.getPipeName()) + .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition()) .setExtractorAttributes(createPipeStatement.getExtractorAttributes()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) .setConnectorAttributes(createPipeStatement.getConnectorAttributes()); @@ -1821,6 +1834,7 @@ public SettableFuture alterPipe(AlterPipeStatement alterPipeSt alterPipeStatement.isReplaceAllConnectorAttributes()); req.setExtractorAttributes(alterPipeStatement.getExtractorAttributes()); req.setIsReplaceAllExtractorAttributes(alterPipeStatement.isReplaceAllExtractorAttributes()); + req.setIfExistsCondition(alterPipeStatement.hasIfExistsCondition()); final TSStatus tsStatus = configNodeClient.alterPipe(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { LOGGER.warn("Failed to alter pipe {} in config node, status is {}.", pipeName, tsStatus); @@ -1884,7 +1898,11 @@ public SettableFuture dropPipe(DropPipeStatement dropPipeState try (ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TSStatus tsStatus = configNodeClient.dropPipe(dropPipeStatement.getPipeName()); + final TSStatus tsStatus = + configNodeClient.dropPipeExtended( + new TDropPipeReq() + .setPipeName(dropPipeStatement.getPipeName()) + .setIfExistsCondition(dropPipeStatement.hasIfExistsCondition())); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { LOGGER.warn( "Failed to drop pipe {}, status is {}.", dropPipeStatement.getPipeName(), tsStatus); @@ -2032,7 +2050,10 @@ public SettableFuture createTopic(CreateTopicStatement createT try (final ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final TCreateTopicReq req = - new TCreateTopicReq().setTopicName(topicName).setTopicAttributes(topicAttributes); + new TCreateTopicReq() + .setTopicName(topicName) + .setIfNotExistsCondition(createTopicStatement.hasIfNotExistsCondition()) + .setTopicAttributes(topicAttributes); final TSStatus tsStatus = configNodeClient.createTopic(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { LOGGER.warn("Failed to create topic {} in config node, status is {}.", topicName, tsStatus); @@ -2051,7 +2072,11 @@ public SettableFuture dropTopic(DropTopicStatement dropTopicSt final SettableFuture future = SettableFuture.create(); try (ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TSStatus tsStatus = configNodeClient.dropTopic(dropTopicStatement.getTopicName()); + final TSStatus tsStatus = + configNodeClient.dropTopicExtended( + new TDropTopicReq() + .setIfExistsCondition(dropTopicStatement.hasIfExistsCondition()) + .setTopicName(dropTopicStatement.getTopicName())); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { LOGGER.warn( "Failed to drop topic {}, status is {}.", dropTopicStatement.getTopicName(), tsStatus); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index 05ad88bfc5e9e..fad6ec99cba82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -49,6 +49,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipePluginStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement; @@ -105,7 +106,7 @@ public interface IConfigTaskExecutor { SettableFuture createPipePlugin(CreatePipePluginStatement createPipeStatement); - SettableFuture dropPipePlugin(String pluginName); + SettableFuture dropPipePlugin(DropPipePluginStatement dropPipePluginStatement); SettableFuture showPipePlugins(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java index 82f9a3257063d..eb57f958430ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java @@ -28,15 +28,15 @@ public class DropPipePluginTask implements IConfigTask { - private final String pluginName; + private final DropPipePluginStatement dropPipePluginStatement; public DropPipePluginTask(DropPipePluginStatement dropPipePluginStatement) { - this.pluginName = dropPipePluginStatement.getPluginName(); + this.dropPipePluginStatement = dropPipePluginStatement; } @Override public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) throws InterruptedException { - return configTaskExecutor.dropPipePlugin(pluginName); + return configTaskExecutor.dropPipePlugin(dropPipePluginStatement); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index dfc2ee7964b75..7235ffbc5185d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -958,6 +958,7 @@ public Statement visitShowTriggers(IoTDBSqlParser.ShowTriggersContext ctx) { public Statement visitCreatePipePlugin(IoTDBSqlParser.CreatePipePluginContext ctx) { return new CreatePipePluginStatement( parseIdentifier(ctx.pluginName.getText()), + ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null, parseStringLiteral(ctx.className.getText()), parseAndValidateURI(ctx.uriClause())); } @@ -965,7 +966,10 @@ public Statement visitCreatePipePlugin(IoTDBSqlParser.CreatePipePluginContext ct // Drop PipePlugin ===================================================================== @Override public Statement visitDropPipePlugin(IoTDBSqlParser.DropPipePluginContext ctx) { - return new DropPipePluginStatement(parseIdentifier(ctx.pluginName.getText())); + final DropPipePluginStatement dropPipePluginStatement = new DropPipePluginStatement(); + dropPipePluginStatement.setPluginName(parseIdentifier(ctx.pluginName.getText())); + dropPipePluginStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null); + return dropPipePluginStatement; } // Show PipePlugins ===================================================================== @@ -3701,6 +3705,10 @@ public Statement visitCreatePipe(IoTDBSqlParser.CreatePipeContext ctx) { throw new SemanticException( "Not support for this sql in CREATE PIPE, please enter pipe name."); } + + createPipeStatement.setIfNotExists( + ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null); + if (ctx.extractorAttributesClause() != null) { createPipeStatement.setExtractorAttributes( parseExtractorAttributesClause( @@ -3731,6 +3739,8 @@ public Statement visitAlterPipe(IoTDBSqlParser.AlterPipeContext ctx) { "Not support for this sql in ALTER PIPE, please enter pipe name."); } + alterPipeStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null); + if (ctx.alterExtractorAttributesClause() != null) { alterPipeStatement.setExtractorAttributes( parseExtractorAttributesClause( @@ -3763,6 +3773,7 @@ public Statement visitAlterPipe(IoTDBSqlParser.AlterPipeContext ctx) { alterPipeStatement.setConnectorAttributes(new HashMap<>()); alterPipeStatement.setReplaceAllConnectorAttributes(false); } + return alterPipeStatement; } @@ -3809,6 +3820,8 @@ public Statement visitDropPipe(IoTDBSqlParser.DropPipeContext ctx) { throw new SemanticException("Not support for this sql in DROP PIPE, please enter pipename."); } + dropPipeStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null); + return dropPipeStatement; } @@ -3861,6 +3874,9 @@ public Statement visitCreateTopic(IoTDBSqlParser.CreateTopicContext ctx) { "Not support for this sql in CREATE TOPIC, please enter topicName."); } + createTopicStatement.setIfNotExists( + ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null); + if (ctx.topicAttributesClause() != null) { createTopicStatement.setTopicAttributes( parseTopicAttributesClause(ctx.topicAttributesClause().topicAttributeClause())); @@ -3893,6 +3909,8 @@ public Statement visitDropTopic(IoTDBSqlParser.DropTopicContext ctx) { "Not support for this sql in DROP TOPIC, please enter topicName."); } + dropTopicStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null); + return dropTopicStatement; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java index 0c0bef6c2744c..de5dc3c1d5941 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java @@ -37,6 +37,7 @@ public class AlterPipeStatement extends Statement implements IConfigStatement { private String pipeName; + private boolean ifExistsCondition; private Map extractorAttributes; private Map processorAttributes; private Map connectorAttributes; @@ -52,6 +53,10 @@ public String getPipeName() { return pipeName; } + public boolean hasIfExistsCondition() { + return ifExistsCondition; + } + public Map getExtractorAttributes() { return extractorAttributes; } @@ -80,6 +85,10 @@ public void setPipeName(String pipeName) { this.pipeName = pipeName; } + public void setIfExists(boolean ifExistsCondition) { + this.ifExistsCondition = ifExistsCondition; + } + public void setExtractorAttributes(Map extractorAttributes) { this.extractorAttributes = extractorAttributes; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java index f954cc347dced..f0975b5601265 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java @@ -36,13 +36,16 @@ public class CreatePipePluginStatement extends Statement implements IConfigStatement { private final String pluginName; + private final boolean ifNotExistsCondition; private final String className; private final String uriString; - public CreatePipePluginStatement(String pluginName, String className, String uriString) { + public CreatePipePluginStatement( + String pluginName, boolean ifNotExistsCondition, String className, String uriString) { super(); statementType = StatementType.CREATE_PIPEPLUGIN; this.pluginName = pluginName; + this.ifNotExistsCondition = ifNotExistsCondition; this.className = className; this.uriString = uriString; } @@ -51,6 +54,10 @@ public String getPluginName() { return pluginName; } + public boolean hasIfNotExistsCondition() { + return ifNotExistsCondition; + } + public String getClassName() { return className; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java index 45a1a4664cbb4..a7b7471ffd057 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java @@ -37,6 +37,7 @@ public class CreatePipeStatement extends Statement implements IConfigStatement { private String pipeName; + private boolean ifNotExistsCondition; private Map extractorAttributes; private Map processorAttributes; private Map connectorAttributes; @@ -49,6 +50,10 @@ public String getPipeName() { return pipeName; } + public boolean hasIfNotExistsCondition() { + return ifNotExistsCondition; + } + public Map getExtractorAttributes() { return extractorAttributes; } @@ -65,6 +70,10 @@ public void setPipeName(String pipeName) { this.pipeName = pipeName; } + public void setIfNotExists(boolean ifNotExistsCondition) { + this.ifNotExistsCondition = ifNotExistsCondition; + } + public void setExtractorAttributes(Map extractorAttributes) { this.extractorAttributes = extractorAttributes; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java index 49807d21a2487..c7f4ebfd5dfb4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java @@ -35,18 +35,30 @@ public class DropPipePluginStatement extends Statement implements IConfigStatement { - private final String pluginName; + private String pluginName; + private boolean ifExistsCondition; - public DropPipePluginStatement(String pluginName) { + public DropPipePluginStatement() { super(); statementType = StatementType.DROP_PIPEPLUGIN; - this.pluginName = pluginName; } public String getPluginName() { return pluginName; } + public boolean hasIfExistsCondition() { + return ifExistsCondition; + } + + public void setPluginName(String pluginName) { + this.pluginName = pluginName; + } + + public void setIfExists(boolean ifExistsCondition) { + this.ifExistsCondition = ifExistsCondition; + } + @Override public QueryType getQueryType() { return QueryType.WRITE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java index a7d54a3086f9b..a3403e00e6891 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java @@ -36,11 +36,16 @@ public class DropPipeStatement extends Statement implements IConfigStatement { private String pipeName; + private boolean ifExistsCondition; public DropPipeStatement(StatementType dropPipeStatement) { this.statementType = dropPipeStatement; } + public boolean hasIfExistsCondition() { + return ifExistsCondition; + } + public String getPipeName() { return pipeName; } @@ -49,6 +54,10 @@ public void setPipeName(String pipeName) { this.pipeName = pipeName; } + public void setIfExists(boolean ifExistsCondition) { + this.ifExistsCondition = ifExistsCondition; + } + @Override public QueryType getQueryType() { return QueryType.WRITE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java index 6482975b0389a..a98a1d5d27392 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java @@ -37,7 +37,7 @@ public class CreateTopicStatement extends Statement implements IConfigStatement { private String topicName; - + private boolean ifNotExistsCondition; private Map topicAttributes; public CreateTopicStatement() { @@ -49,6 +49,10 @@ public String getTopicName() { return topicName; } + public boolean hasIfNotExistsCondition() { + return ifNotExistsCondition; + } + public Map getTopicAttributes() { return topicAttributes; } @@ -57,6 +61,10 @@ public void setTopicName(String topicName) { this.topicName = topicName; } + public void setIfNotExists(boolean ifNotExistsCondition) { + this.ifNotExistsCondition = ifNotExistsCondition; + } + public void setTopicAttributes(Map topicAttributes) { this.topicAttributes = topicAttributes; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java index e366b1c5eb38f..36525b1846ee4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java @@ -35,6 +35,7 @@ public class DropTopicStatement extends Statement implements IConfigStatement { private String topicName; + private boolean ifExistsCondition; public DropTopicStatement() { super(); @@ -45,10 +46,18 @@ public String getTopicName() { return topicName; } + public boolean hasIfExistsCondition() { + return ifExistsCondition; + } + public void setTopicName(String topicName) { this.topicName = topicName; } + public void setIfExists(boolean ifExistsCondition) { + this.ifExistsCondition = ifExistsCondition; + } + @Override public QueryType getQueryType() { return QueryType.WRITE; diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 4dcce1c49ffd0..7044bd0b22134 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -673,10 +673,12 @@ struct TCreatePipePluginReq { 3: required string jarName 4: required binary jarFile 5: required string jarMD5 + 6: optional bool ifNotExistsCondition } struct TDropPipePluginReq { 1: required string pluginName + 2: optional bool ifExistsCondition } // Get PipePlugin table from config node @@ -709,6 +711,7 @@ struct TCreatePipeReq { 2: optional map extractorAttributes 3: optional map processorAttributes 4: required map connectorAttributes + 5: optional bool ifNotExistsCondition } struct TAlterPipeReq { @@ -719,6 +722,12 @@ struct TAlterPipeReq { 5: required bool isReplaceAllConnectorAttributes 6: optional map extractorAttributes 7: optional bool isReplaceAllExtractorAttributes + 8: optional bool ifExistsCondition +} + +struct TDropPipeReq { + 1: required string pipeName + 2: optional bool ifExistsCondition } // Deprecated, restored for compatibility @@ -773,6 +782,12 @@ struct TAlterLogicalViewReq { struct TCreateTopicReq { 1: required string topicName 2: optional map topicAttributes + 3: optional bool ifNotExistsCondition +} + +struct TDropTopicReq { + 1: required string topicName + 2: optional bool ifExistsCondition } struct TShowTopicReq { @@ -1484,6 +1499,9 @@ service IConfigNodeRPCService { /** Drop Pipe */ common.TSStatus dropPipe(string pipeName) + /** Drop Pipe */ + common.TSStatus dropPipeExtended(TDropPipeReq req) + /** Show Pipe by name, if name is empty, show all Pipe */ TShowPipeResp showPipe(TShowPipeReq req) @@ -1505,6 +1523,9 @@ service IConfigNodeRPCService { /** Drop Topic */ common.TSStatus dropTopic(string topicName) + /** Drop Topic */ + common.TSStatus dropTopicExtended(TDropTopicReq req) + /** Show Topic by name, if name is empty, show all Topic */ TShowTopicResp showTopic(TShowTopicReq req)