From 01248bdcd4bd741a7db5c12b825da12618fe32d0 Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Wed, 17 Jul 2024 15:55:48 +0800 Subject: [PATCH 1/3] Pipe: filter out databases whose name starts with but not equals to `root.__system` --- .../pipe/coordinator/runtime/PipeLeaderChangeHandler.java | 4 +++- .../procedure/impl/pipe/task/AlterPipeProcedureV2.java | 7 ++++--- .../procedure/impl/pipe/task/CreatePipeProcedureV2.java | 3 ++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java index a3cb82ebf52c8..f2d998674ccfc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Objects; public class PipeLeaderChangeHandler implements IClusterStatusSubscriber { @@ -85,7 +86,8 @@ public void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEven configManager.getPartitionManager().getRegionStorageGroup(regionGroupId); // Pipe only collect user's data, filter metric database here. // DatabaseName may be null for config region group - if (!SchemaConstant.SYSTEM_DATABASE.equals(databaseName)) { + if (Objects.nonNull(databaseName) + && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) { // null or -1 means empty origin leader final int oldLeaderNodeId = (pair.left == null ? -1 : pair.left.getLeaderId()); final int newLeaderNodeId = (pair.right == null ? -1 : pair.right.getLeaderId()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java index 6cccbbd7033a5..78b4469eb7e28 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java @@ -136,7 +136,7 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { final PipeTaskMeta currentPipeTaskMeta = currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId()); if (databaseName != null - && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE) + && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE) && currentPipeTaskMeta.getLeaderNodeId() == regionLeaderNodeId) { // Pipe only collect user's data, filter metric database here. updatedConsensusGroupIdToTaskMetaMap.put( @@ -151,15 +151,16 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { // config region updatedConsensusGroupIdToTaskMetaMap.put( // 0 is the consensus group id of the config region, but data region id and schema region - // id - // also start from 0, so we use Integer.MIN_VALUE to represent the config region + // id also start from 0, so we use Integer.MIN_VALUE to represent the config region Integer.MIN_VALUE, new PipeTaskMeta( configRegionTaskMeta.getProgressIndex(), // The leader of the config region is the config node itself ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId())); } + updatedPipeRuntimeMeta = new PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap); + // If the pipe's previous status was user stopped, then after the alter operation, the pipe's // status remains user stopped; otherwise, it becomes running. if (!pipeTaskInfo.get().isPipeStoppedByUser(alterPipeRequest.getPipeName())) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index bbc9d7b2298fb..fd07f57ad0abf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -172,7 +172,8 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { env.getConfigManager() .getPartitionManager() .getRegionStorageGroup(regionGroupId); - if (databaseName != null && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)) { + if (databaseName != null + && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) { // Pipe only collect user's data, filter out metric database here. consensusGroupIdToTaskMetaMap.put( regionGroupId.getId(), From e2ad27ef2b5d7f45d89801d5c949561b74dc6117 Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Wed, 17 Jul 2024 17:24:02 +0800 Subject: [PATCH 2/3] fix --- .../pipe/coordinator/runtime/PipeLeaderChangeHandler.java | 3 ++- .../procedure/impl/pipe/task/AlterPipeProcedureV2.java | 3 ++- .../procedure/impl/pipe/task/CreatePipeProcedureV2.java | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java index f2d998674ccfc..ecfab4942c51e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java @@ -87,7 +87,8 @@ public void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEven // Pipe only collect user's data, filter metric database here. // DatabaseName may be null for config region group if (Objects.nonNull(databaseName) - && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) { + && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE) + && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")) { // null or -1 means empty origin leader final int oldLeaderNodeId = (pair.left == null ? -1 : pair.left.getLeaderId()); final int newLeaderNodeId = (pair.right == null ? -1 : pair.right.getLeaderId()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java index 78b4469eb7e28..5c11d40f6f913 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java @@ -136,7 +136,8 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { final PipeTaskMeta currentPipeTaskMeta = currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId()); if (databaseName != null - && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE) + && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE) + && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".") && currentPipeTaskMeta.getLeaderNodeId() == regionLeaderNodeId) { // Pipe only collect user's data, filter metric database here. updatedConsensusGroupIdToTaskMetaMap.put( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index fd07f57ad0abf..68075b5911dc1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -173,7 +173,8 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { .getPartitionManager() .getRegionStorageGroup(regionGroupId); if (databaseName != null - && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) { + && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE) + && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")) { // Pipe only collect user's data, filter out metric database here. consensusGroupIdToTaskMetaMap.put( regionGroupId.getId(), From 7e234018c8e3c50f15dbf228d2a5a274307cd361 Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Wed, 17 Jul 2024 18:22:51 +0800 Subject: [PATCH 3/3] Update PipeLeaderChangeHandler.java --- .../pipe/coordinator/runtime/PipeLeaderChangeHandler.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java index ecfab4942c51e..284779fcb0395 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java @@ -86,9 +86,9 @@ public void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEven configManager.getPartitionManager().getRegionStorageGroup(regionGroupId); // Pipe only collect user's data, filter metric database here. // DatabaseName may be null for config region group - if (Objects.nonNull(databaseName) - && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE) - && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")) { + if (Objects.isNull(databaseName) + || (!databaseName.equals(SchemaConstant.SYSTEM_DATABASE) + && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + "."))) { // null or -1 means empty origin leader final int oldLeaderNodeId = (pair.left == null ? -1 : pair.left.getLeaderId()); final int newLeaderNodeId = (pair.right == null ? -1 : pair.right.getLeaderId());