diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java index 31308f1c0e1e..2ebad93348cf 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java @@ -23,6 +23,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.isession.SessionConfig; import org.apache.iotdb.it.env.MultiEnvFactory; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; @@ -39,6 +40,8 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -81,13 +84,12 @@ public void setUp() { public void testSingleEnv() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); - final String sql = - String.format( - "create pipe a2b with source ('source'='iotdb-source') with processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')", - receiverDataNode.getIpAndPortString()); try (final Connection connection = senderEnv.getConnection(); final Statement statement = connection.createStatement()) { - statement.execute(sql); + statement.execute( + String.format( + "create pipe a2b with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString())); } catch (final SQLException e) { fail(e.getMessage()); } @@ -104,5 +106,47 @@ public void testSingleEnv() throws Exception { || (Objects.equals(showPipeResult.get(1).id, "a2b_history") && Objects.equals(showPipeResult.get(0).id, "a2b_realtime"))); } + + // Do not split for pipes without insertion or non-full + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "drop pipe a2b_history", + "drop pipe a2b_realtime", + String.format( + "create pipe a2b1 with source ('inclusion'='schema') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()), + String.format( + "create pipe a2b2 with source ('realtime.enable'='false') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()), + String.format( + "create pipe a2b3 with source ('history.enable'='false') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()))); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List showPipeResult = + client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList; + showPipeResult.removeIf(i -> i.getId().startsWith("__consensus")); + Assert.assertEquals(3, showPipeResult.size()); + } + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "drop pipe a2b1", + "drop pipe a2b2", + "drop pipe a2b3", + "insert into root.test.device(time, field) values(0,1),(1,2)", + "delete from root.test.device.* where time == 0", + String.format( + "create pipe a2b with source ('inclusion'='all') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()))); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.test.device", + "Time,root.test.device.field,", + Collections.singleton("1,2.0,")); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 0f8b9446d60f..ea12513d647d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -584,7 +584,7 @@ public boolean hasPipeReleaseRegionRelatedResource(final int consensusGroupId) { } } - public boolean isFullSync(final PipeParameters parameters) { + public boolean isFullSync(final PipeParameters parameters) throws IllegalPathException { if (isSnapshotMode(parameters)) { return false; } @@ -598,7 +598,10 @@ public boolean isFullSync(final PipeParameters parameters) { Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE); - return isHistoryEnable && isRealtimeEnable; + return isHistoryEnable + && isRealtimeEnable + && DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters) + .getLeft(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index d28edddb2f9b..d0627351b273 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -181,6 +181,7 @@ import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; +import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; @@ -2205,10 +2206,10 @@ public SettableFuture createPipe( new PipeParameters(createPipeStatement.getSourceAttributes()); final PipeParameters sinkPipeParameters = new PipeParameters(createPipeStatement.getSinkAttributes()); - if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled() - && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) { - try (final ConfigNodeClient configNodeClient = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + try (final ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled() + && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) { // 1. Send request to create the real-time data synchronization pipeline final TCreatePipeReq realtimeReq = new TCreatePipeReq() @@ -2253,11 +2254,17 @@ public SettableFuture createPipe( Boolean.toString(false), PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY, Boolean.toString(true), - // We force the historical pipe to transfer data only + // We force the historical pipe to transfer data (and maybe + // deletion) only // Thus we can transfer schema only once // And may drop the historical pipe on successfully transferred PipeSourceConstant.SOURCE_INCLUSION_KEY, - PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE, + DataRegionListeningFilter + .parseInsertionDeletionListeningOptionPair( + sourcePipeParameters) + .getRight() + ? "data" + : PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE, PipeSourceConstant.SOURCE_EXCLUSION_KEY, PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE))) .getAttribute()) @@ -2280,27 +2287,20 @@ public SettableFuture createPipe( // 3. Set success status only if both pipelines are created successfully future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); - } catch (final Exception e) { - // Catch any other exceptions (e.g., network issues) - future.setException(e); - } - return future; - } - - try (final ConfigNodeClient configNodeClient = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TCreatePipeReq req = - new TCreatePipeReq() - .setPipeName(pipeName) - .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition()) - .setExtractorAttributes(createPipeStatement.getSourceAttributes()) - .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) - .setConnectorAttributes(createPipeStatement.getSinkAttributes()); - final TSStatus tsStatus = configNodeClient.createPipe(req); - if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { - future.setException(new IoTDBException(tsStatus)); } else { - future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + final TCreatePipeReq req = + new TCreatePipeReq() + .setPipeName(pipeName) + .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition()) + .setExtractorAttributes(createPipeStatement.getSourceAttributes()) + .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) + .setConnectorAttributes(createPipeStatement.getSinkAttributes()); + final TSStatus tsStatus = configNodeClient.createPipe(req); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + future.setException(new IoTDBException(tsStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } } } catch (final Exception e) { future.setException(e);