From dcbad86e364b8de3796e52e16de9eb8f559e3567 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Fri, 24 Nov 2023 14:32:13 +0800 Subject: [PATCH 1/8] IT optimizing --- .../pipe/it/IoTDBPipeConnectorParallelIT.java | 30 +---- .../{extractor => }/IoTDBPipeExtractorIT.java | 2 +- ...ncIT.java => IoTDBPipeThriftSourceIT.java} | 107 +++++++++++------- 3 files changed, 70 insertions(+), 69 deletions(-) rename integration-test/src/test/java/org/apache/iotdb/pipe/it/{extractor => }/IoTDBPipeExtractorIT.java (99%) rename integration-test/src/test/java/org/apache/iotdb/pipe/it/{IoTDBPipeDataSyncIT.java => IoTDBPipeThriftSourceIT.java} (67%) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java index c8244cf01506a..70c8fd0bf8d31 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java @@ -44,10 +44,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; - -import static org.awaitility.Awaitility.await; -import static org.junit.Assert.fail; @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2.class}) @@ -120,30 +116,8 @@ public void testIoTConnectorParallel() throws Exception { expectedResSet.add("1,2.0,"); expectedResSet.add("2,3.0,"); expectedResSet.add("3,4.0,"); - assertDataOnReceiver(receiverEnv, expectedResSet); - assertDataOnReceiver(receiverEnv, expectedResSet); - } - } - - private void assertDataOnReceiver(BaseEnv receiverEnv, Set expectedResSet) { - try (Connection connection = receiverEnv.getConnection(); - Statement statement = connection.createStatement()) { - await() - .atMost(600, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try { - TestUtils.assertResultSetEqual( - statement.executeQuery("select * from root.**"), - "Time,root.sg1.d1.s1,", - expectedResSet); - } catch (Exception e) { - Assert.fail(); - } - }); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + TestUtils.assertDataOnEnv( + receiverEnv, "select * from root.**", "Time,root.sg1.d1.s1,", expectedResSet); } } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java similarity index 99% rename from integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java rename to integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java index e7aa84fd68142..81bad049e57a3 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.pipe.it.extractor; +package org.apache.iotdb.pipe.it; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java similarity index 67% rename from integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java rename to integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java index 3c23576497507..df1fba0a7a017 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java @@ -38,7 +38,6 @@ import org.junit.runner.RunWith; import java.sql.Connection; -import java.sql.SQLException; import java.sql.Statement; import java.util.Collections; import java.util.HashMap; @@ -50,7 +49,7 @@ @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2.class}) -public class IoTDBPipeDataSyncIT { +public class IoTDBPipeThriftSourceIT { private BaseEnv senderEnv; private BaseEnv receiverEnv; @@ -75,7 +74,7 @@ public void tearDown() { } @Test - public void testEnv() throws Exception { + public void testThriftConnector() throws Exception { DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); String receiverIp = receiverDataNode.getIp(); @@ -107,34 +106,65 @@ public void testEnv() throws Exception { // Do not fail if the failure has nothing to do with pipe // Because the failures will randomly generate due to resource limitation - try (Connection connection = senderEnv.getConnection(); - Statement statement = connection.createStatement()) { - statement.execute("insert into root.vehicle.d0(time, s1) values (0, 1)"); - } catch (SQLException e) { - e.printStackTrace(); + if (!TestUtils.tryExecuteNonQueryWithRetry( + senderEnv, "insert into root.vehicle.d0(time, s1) values (0, 1)")) { return; } - try (Connection connection = receiverEnv.getConnection(); - Statement statement = connection.createStatement()) { - await() - .atMost(600, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try { - TestUtils.assertResultSetEqual( - statement.executeQuery("select * from root.**"), - "Time,root.vehicle.d0.s1,", - Collections.singleton("0,1.0,")); - } catch (Exception e) { - // Handle the exception generated during "executeQuery" - Assert.fail(); - } - }); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + TestUtils.assertDataOnEnv( + receiverEnv, + "select * from root.**", + "Time,root.vehicle.d0.s1,", + Collections.singleton("0,1.0,")); + } + } + + @Test + public void testLegacyConnector() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("source.realtime.mode", "log"); + + connectorAttributes.put("sink", "iotdb-legacy-pipe-sink"); + connectorAttributes.put("sink.batch.enable", "false"); + connectorAttributes.put("sink.ip", receiverIp); + connectorAttributes.put("sink.port", Integer.toString(receiverPort)); + + // This version does not matter since it's no longer checked by the legacy receiver + connectorAttributes.put("sink.version", "1.3"); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + + // Do not fail if the failure has nothing to do with pipe + // Because the failures will randomly generate due to resource limitation + if (!TestUtils.tryExecuteNonQueryWithRetry( + senderEnv, "insert into root.vehicle.d0(time, s1) values (0, 1)")) { + return; } + + TestUtils.assertDataOnEnv( + receiverEnv, + "select * from root.**", + "Time,root.vehicle.d0.s1,", + Collections.singleton("0,1.0,")); } } @@ -166,21 +196,18 @@ public void testInsertNull() throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); - try (Connection connection = receiverEnv.getConnection(); - Statement statement = connection.createStatement()) { - statement.execute("create aligned timeseries root.sg.d1(s0 float, s1 float)"); - } catch (SQLException e) { - e.printStackTrace(); - fail(e.getMessage()); + if (!TestUtils.tryExecuteNonQueryWithRetry( + receiverEnv, "create aligned timeseries root.sg.d1(s0 float, s1 float)")) { + return; } - try (Connection connection = senderEnv.getConnection(); - Statement statement = connection.createStatement()) { - statement.execute("create aligned timeseries root.sg.d1(s0 float, s1 float)"); - statement.execute("insert into root.sg.d1(time, s0, s1) values (3, null, 25.34)"); - } catch (SQLException e) { - e.printStackTrace(); - fail(e.getMessage()); + if (!TestUtils.tryExecuteNonQueryWithRetry( + senderEnv, "create aligned timeseries root.sg.d1(s0 float, s1 float)")) { + return; + } + if (!TestUtils.tryExecuteNonQueryWithRetry( + senderEnv, "insert into root.sg.d1(time, s0, s1) values (3, null, 25.34)")) { + return; } try (Connection connection = receiverEnv.getConnection(); From 8fe3da8ee332c65613fbe403f746e115efbb397e Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Thu, 23 Nov 2023 11:33:53 +0800 Subject: [PATCH 2/8] Refactor --- .../iotdb/pipe/it/IoTDBPipeClusterIT.java | 120 ++++++++++++++---- .../iotdb/pipe/it/IoTDBPipeLifeCycleIT.java | 25 +++- .../pipe/it/IoTDBPipeThriftSourceIT.java | 9 +- .../plugin/builtin/BuiltinPipePlugin.java | 44 ++++--- 4 files changed, 149 insertions(+), 49 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java index 632e74e19eba7..ad01d57737a91 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java @@ -20,6 +20,7 @@ package org.apache.iotdb.pipe.it; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.RegionRoleType; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; @@ -46,6 +47,7 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -236,13 +238,23 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception { for (int i = 0; i < 3; ++i) { if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) { leaderIndex = i; - senderEnv.shutdownDataNode(i); + try { + senderEnv.shutdownDataNode(i); + } catch (Exception e) { + e.printStackTrace(); + return; + } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException ignored) { } - senderEnv.startDataNode(i); - ((AbstractEnv) senderEnv).testWorkingNoUnknown(); + try { + senderEnv.startDataNode(i); + ((AbstractEnv) senderEnv).testWorkingNoUnknown(); + } catch (Exception e) { + e.printStackTrace(); + return; + } } } if (leaderIndex == -1) { // ensure the leader is stopped @@ -267,8 +279,13 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception { Collections.singleton("2,")); } - TestUtils.restartCluster(senderEnv); - TestUtils.restartCluster(receiverEnv); + try { + TestUtils.restartCluster(senderEnv); + TestUtils.restartCluster(receiverEnv); + } catch (Exception e) { + e.printStackTrace(); + return; + } try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { @@ -348,7 +365,12 @@ public void testPipeAfterRegisterNewDataNode() throws Exception { return; } - senderEnv.registerNewDataNode(true); + try { + senderEnv.registerNewDataNode(true); + } catch (Exception e) { + e.printStackTrace(); + return; + } DataNodeWrapper newDataNode = senderEnv.getDataNodeWrapper(senderEnv.getDataNodeWrapperList().size() - 1); if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry( @@ -366,8 +388,13 @@ public void testPipeAfterRegisterNewDataNode() throws Exception { Collections.singleton("2,")); } - TestUtils.restartCluster(senderEnv); - TestUtils.restartCluster(receiverEnv); + try { + TestUtils.restartCluster(senderEnv); + TestUtils.restartCluster(receiverEnv); + } catch (Exception e) { + e.printStackTrace(); + return; + } try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { @@ -438,8 +465,9 @@ public void testCreatePipeWhenRegisteringNewDataNode() throws Exception { .setExtractorAttributes(extractorAttributes) .setProcessorAttributes(processorAttributes)); } catch (TException e) { + // Not sure if the "createPipe" has succeeded e.printStackTrace(); - fail(e.getMessage()); + return; } try { Thread.sleep(100); @@ -448,7 +476,12 @@ public void testCreatePipeWhenRegisteringNewDataNode() throws Exception { } }); t.start(); - senderEnv.registerNewDataNode(true); + try { + senderEnv.registerNewDataNode(true); + } catch (Exception e) { + e.printStackTrace(); + return; + } t.join(); } @@ -504,7 +537,12 @@ public void testRegisteringNewDataNodeWhenTransferringData() throws Exception { } }); t.start(); - senderEnv.registerNewDataNode(true); + try { + senderEnv.registerNewDataNode(true); + } catch (Exception e) { + e.printStackTrace(); + return; + } t.join(); TestUtils.assertDataOnEnv( @@ -513,8 +551,12 @@ public void testRegisteringNewDataNodeWhenTransferringData() throws Exception { "count(root.db.d1.s1),", Collections.singleton(succeedNum.get() + ",")); - senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1); - senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1); + try { + senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1); + senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1); + } catch (Exception e) { + e.printStackTrace(); + } } } @@ -554,7 +596,12 @@ public void testRegisteringNewDataNodeAfterTransferringData() throws Exception { } } - senderEnv.registerNewDataNode(true); + try { + senderEnv.registerNewDataNode(true); + } catch (Exception e) { + e.printStackTrace(); + return; + } TestUtils.assertDataOnEnv( receiverEnv, @@ -562,8 +609,12 @@ public void testRegisteringNewDataNodeAfterTransferringData() throws Exception { "count(root.db.d1.s1),", Collections.singleton(succeedNum + ",")); - senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1); - senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1); + try { + senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1); + senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1); + } catch (Exception e) { + e.printStackTrace(); + } } } @@ -609,11 +660,16 @@ public void testNewDataNodeFailureParallelToTransferringData() throws Exception } } - senderEnv.registerNewDataNode(false); - senderEnv.startDataNode(senderEnv.getDataNodeWrapperList().size() - 1); - senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1); - senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1); - ((AbstractEnv) senderEnv).testWorkingNoUnknown(); + try { + senderEnv.registerNewDataNode(false); + senderEnv.startDataNode(senderEnv.getDataNodeWrapperList().size() - 1); + senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1); + senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1); + ((AbstractEnv) senderEnv).testWorkingNoUnknown(); + } catch (Exception e) { + e.printStackTrace(); + return; + } TestUtils.assertDataOnEnv( receiverEnv, @@ -667,7 +723,12 @@ public void testSenderRestartWhenTransferring() throws Exception { return; } - TestUtils.restartCluster(senderEnv); + try { + TestUtils.restartCluster(senderEnv); + } catch (Exception e) { + e.printStackTrace(); + return; + } TestUtils.assertDataOnEnv( receiverEnv, @@ -708,7 +769,12 @@ public void testConcurrentlyCreatePipeOfSameName() throws Exception { if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { successCount.updateAndGet(v -> v + 1); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (TException | ClientManagerException | IOException e) { + e.printStackTrace(); } catch (Exception e) { + // Fail iff pipe exception occurs e.printStackTrace(); fail(e.getMessage()); } @@ -733,7 +799,12 @@ public void testConcurrentlyCreatePipeOfSameName() throws Exception { if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { successCount.updateAndGet(v -> v + 1); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (TException | ClientManagerException | IOException e) { + e.printStackTrace(); } catch (Exception e) { + // Fail iff pipe exception occurs e.printStackTrace(); fail(e.getMessage()); } @@ -798,7 +869,12 @@ private void testCreatePipesWithSameConnector(int pipeCount) throws Exception { .setProcessorAttributes(processorAttributes)); Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (TException | ClientManagerException | IOException e) { + e.printStackTrace(); } catch (Exception e) { + // Fail iff pipe exception occurs e.printStackTrace(); fail(e.getMessage()); } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java index 166e0a5fd2c6b..b314993d0a2c4 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java @@ -447,8 +447,13 @@ public void testLifeCycleWithClusterRestart() throws Exception { receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet); } - TestUtils.restartCluster(senderEnv); - TestUtils.restartCluster(receiverEnv); + try { + TestUtils.restartCluster(senderEnv); + TestUtils.restartCluster(receiverEnv); + } catch (Exception e) { + e.printStackTrace(); + return; + } try (SyncConfigNodeIServiceClient ignored = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { @@ -511,7 +516,12 @@ public void testReceiverRestartWhenTransferring() throws Exception { }); t.start(); - TestUtils.restartCluster(receiverEnv); + try { + TestUtils.restartCluster(receiverEnv); + } catch (Exception e) { + e.printStackTrace(); + return; + } t.join(); TestUtils.assertDataOnEnv( @@ -680,8 +690,13 @@ public void testDoubleLiving() throws Exception { TestUtils.assertDataOnEnv( receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet); - TestUtils.restartCluster(senderEnv); - TestUtils.restartCluster(receiverEnv); + try { + TestUtils.restartCluster(senderEnv); + TestUtils.restartCluster(receiverEnv); + } catch (Exception e) { + e.printStackTrace(); + return; + } for (int i = 400; i < 500; ++i) { if (!TestUtils.tryExecuteNonQueryWithRetry( diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java index df1fba0a7a017..906a608e997c4 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java @@ -215,11 +215,16 @@ public void testInsertNull() throws Exception { await() .atMost(600, TimeUnit.SECONDS) .untilAsserted( - () -> + () -> { + try { TestUtils.assertResultSetEqual( statement.executeQuery("select * from root.**"), "Time,root.sg.d1.s0,root.sg.d1.s1,", - Collections.singleton("3,null,25.34,"))); + Collections.singleton("3,null,25.34,")); + } catch (Exception e) { + Assert.fail(); + } + }); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index 96c6bbae38f0e..ee10bf0e760c3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -31,6 +31,9 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor; import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor; +import com.sun.tools.javac.util.List; + +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -88,24 +91,25 @@ public String getClassName() { return className; } - public static final Set SHOW_PIPE_PLUGINS_BLACKLIST = new HashSet<>(); - - static { - SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_EXTRACTOR.getPipePluginName().toUpperCase()); - - SHOW_PIPE_PLUGINS_BLACKLIST.add(DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase()); - SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase()); - SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase()); - SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase()); - SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase()); - SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase()); - SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase()); - SHOW_PIPE_PLUGINS_BLACKLIST.add(OPC_UA_CONNECTOR.getPipePluginName().toUpperCase()); - SHOW_PIPE_PLUGINS_BLACKLIST.add(WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase()); - - SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase()); - SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase()); - SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase()); - SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_SINK.getPipePluginName().toUpperCase()); - } + public static final Set SHOW_PIPE_PLUGINS_BLACKLIST = + Collections.unmodifiableSet( + new HashSet<>( + List.of( + // Extractors + IOTDB_EXTRACTOR.getPipePluginName().toUpperCase(), + // Connectors + DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(), + IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(), + IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(), + IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), + IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(), + IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(), + WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(), + OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(), + WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(), + // Sinks + IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(), + IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(), + IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(), + WEBSOCKET_SINK.getPipePluginName().toUpperCase()))); } From a6d5f47f41cd1bb8ae1c5ef3a81bbe27a8626d10 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Fri, 24 Nov 2023 15:42:47 +0800 Subject: [PATCH 3/8] Refactor --- .../apache/iotdb/db/it/utils/TestUtils.java | 49 +++ .../iotdb/pipe/it/IoTDBPipeClusterIT.java | 71 ++-- .../pipe/it/IoTDBPipeConnectorParallelIT.java | 19 +- .../iotdb/pipe/it/IoTDBPipeExtractorIT.java | 340 +++++------------- .../iotdb/pipe/it/IoTDBPipeLifeCycleIT.java | 8 +- .../iotdb/pipe/it/IoTDBPipeProtocolIT.java | 8 +- .../pipe/it/IoTDBPipeThriftSourceIT.java | 48 +-- .../plugin/builtin/BuiltinPipePlugin.java | 42 +-- 8 files changed, 209 insertions(+), 376 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java index 8c0965b951aad..f7aac47b65f56 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java @@ -411,6 +411,31 @@ public static boolean tryExecuteNonQueryWithRetry(BaseEnv env, String sql) { return false; } + // This method will not throw failure given that a failure is encountered. + // Instead, it return a flag to indicate the result of the execution. + public static boolean tryExecuteNonQueriesWithRetry(BaseEnv env, List sqlList) { + for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) { + try (Connection connection = env.getConnection(); + Statement statement = connection.createStatement()) { + for (String sql : sqlList) { + statement.execute(sql); + } + return true; + } catch (SQLException e) { + if (retryCountLeft > 0) { + try { + Thread.sleep(10000); + } catch (InterruptedException ignored) { + } + } else { + e.printStackTrace(); + return false; + } + } + } + return false; + } + public static void executeNonQueryOnSpecifiedDataNodeWithRetry( BaseEnv env, DataNodeWrapper wrapper, String sql) { for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) { @@ -456,6 +481,30 @@ public static boolean tryExecuteNonQueryOnSpecifiedDataNodeWithRetry( return false; } + public static boolean tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry( + BaseEnv env, DataNodeWrapper wrapper, List sqlList) { + for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) { + try (Connection connection = env.getConnectionWithSpecifiedDataNode(wrapper); + Statement statement = connection.createStatement()) { + for (String sql : sqlList) { + statement.execute(sql); + } + return true; + } catch (SQLException e) { + if (retryCountLeft > 0) { + try { + Thread.sleep(10000); + } catch (InterruptedException ignored) { + } + } else { + e.printStackTrace(); + return false; + } + } + } + return false; + } + public static void executeQuery(String sql) { executeQuery(sql, "root", "root"); } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java index ad01d57737a91..8ead12976c231 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java @@ -49,6 +49,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -120,18 +121,14 @@ public void testWithAllParameters(String realtimeMode) throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d1(time, s1) values (2010-01-01T10:00:00+08:00, 1)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d1(time, s1) values (2010-01-02T10:00:00+08:00, 2)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.db.d1(time, s1) values (2010-01-01T10:00:00+08:00, 1)", + "insert into root.db.d1(time, s1) values (2010-01-02T10:00:00+08:00, 2)", + "flush"))) { return; } - Map extractorAttributes = new HashMap<>(); Map processorAttributes = new HashMap<>(); Map connectorAttributes = new HashMap<>(); @@ -169,11 +166,9 @@ public void testWithAllParameters(String realtimeMode) throws Exception { "count(root.db.d1.s1),", Collections.singleton("1,")); - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d1(time, s1) values (now(), 3)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList("insert into root.db.d1(time, s1) values (now(), 3)", "flush"))) { return; } @@ -215,11 +210,8 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values (1, 1)", "flush"))) { return; } @@ -261,14 +253,10 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception { fail(); } - if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry( + if (!TestUtils.tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry( senderEnv, senderEnv.getDataNodeWrapper(leaderIndex), - "insert into root.db.d1(time, s1) values (2, 2)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry( - senderEnv, senderEnv.getDataNodeWrapper(leaderIndex), "flush")) { + Arrays.asList("insert into root.db.d1(time, s1) values (2, 2)", "flush"))) { return; } @@ -311,11 +299,8 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode()); - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d2(time, s1) values (1, 1)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, Arrays.asList("insert into root.db.d2(time, s1) values (1, 1)", "flush"))) { return; } @@ -357,11 +342,8 @@ public void testPipeAfterRegisterNewDataNode() throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values (1, 1)", "flush"))) { return; } @@ -373,12 +355,10 @@ public void testPipeAfterRegisterNewDataNode() throws Exception { } DataNodeWrapper newDataNode = senderEnv.getDataNodeWrapper(senderEnv.getDataNodeWrapperList().size() - 1); - if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry( - senderEnv, newDataNode, "insert into root.db.d1(time, s1) values (2, 2)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry( - senderEnv, newDataNode, "flush")) { + if (!TestUtils.tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry( + senderEnv, + newDataNode, + Arrays.asList("insert into root.db.d1(time, s1) values (2, 2)", "flush"))) { return; } TestUtils.assertDataOnEnv( @@ -420,11 +400,8 @@ public void testPipeAfterRegisterNewDataNode() throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode()); - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d2(time, s1) values (1, 1)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, Arrays.asList("insert into root.db.d2(time, s1) values (1, 1)", "flush"))) { return; } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java index 70c8fd0bf8d31..271957d61f90b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java @@ -37,9 +37,7 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -101,14 +99,13 @@ public void testIoTConnectorParallel() throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); - try (Connection connection = senderEnv.getConnection(); - Statement statement = connection.createStatement()) { - statement.execute("insert into root.sg1.d1(time, s1) values (0, 1)"); - statement.execute("insert into root.sg1.d1(time, s1) values (1, 2)"); - statement.execute("insert into root.sg1.d1(time, s1) values (2, 3)"); - statement.execute("insert into root.sg1.d1(time, s1) values (3, 4)"); - } catch (SQLException e) { - e.printStackTrace(); + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.sg1.d1(time, s1) values (0, 1)", + "insert into root.sg1.d1(time, s1) values (1, 2)", + "insert into root.sg1.d1(time, s1) values (2, 3)", + "insert into root.sg1.d1(time, s1) values (3, 4)"))) { return; } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java index 81bad049e57a3..0c3f3425edaf2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java @@ -48,10 +48,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import static org.awaitility.Awaitility.await; import static org.junit.Assert.fail; @RunWith(IoTDBTestRunner.class) @@ -211,53 +208,23 @@ public void testExtractorPatternMatch() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.nonAligned.1TS (time, s_float) values (now(), 0.5)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.nonAligned.100TS (time, s_float) values (now(), 0.5)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.nonAligned.1000TS (time, s_float) values (now(), 0.5)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.nonAligned.`1(TS)` (time, s_float) values (now(), 0.5)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( + if (!TestUtils.tryExecuteNonQueriesWithRetry( senderEnv, - "insert into root.nonAligned.6TS.`6` (" - + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) " - + "values (now(), 0.5, 1, 1.5, 2, \"text1\", true)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.aligned.1TS (time, s_float) aligned values (now(), 0.5)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, - "insert into root.aligned.100TS (time, s_float) aligned values (now(), 0.5)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, - "insert into root.aligned.1000TS (time, s_float) aligned values (now(), 0.5)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, - "insert into root.aligned.`1(TS)` (time, s_float) aligned values (now(), 0.5)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, - "insert into root.aligned.6TS.`6` (" - + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) " - + "aligned values (now(), 0.5, 1, 1.5, 2, \"text1\", true)")) { + Arrays.asList( + "insert into root.nonAligned.1TS (time, s_float) values (now(), 0.5)", + "insert into root.nonAligned.100TS (time, s_float) values (now(), 0.5)", + "insert into root.nonAligned.1000TS (time, s_float) values (now(), 0.5)", + "insert into root.nonAligned.`1(TS)` (time, s_float) values (now(), 0.5)", + "insert into root.nonAligned.6TS.`6` (" + + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) " + + "values (now(), 0.5, 1, 1.5, 2, \"text1\", true)", + "insert into root.aligned.1TS (time, s_float) aligned values (now(), 0.5)", + "insert into root.aligned.100TS (time, s_float) aligned values (now(), 0.5)", + "insert into root.aligned.1000TS (time, s_float) aligned values (now(), 0.5)", + "insert into root.aligned.`1(TS)` (time, s_float) aligned values (now(), 0.5)", + "insert into root.aligned.6TS.`6` (" + + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) " + + "aligned values (now(), 0.5, 1, 1.5, 2, \"text1\", true)"))) { return; } @@ -304,36 +271,22 @@ public void testExtractorPatternMatch() throws Exception { assertTimeseriesCountOnReceiver(receiverEnv, expectedTimeseriesCount.get(i)); } - try (Connection connection = receiverEnv.getConnection(); - Statement statement = connection.createStatement()) { - Set expectedDevices = new HashSet<>(); - expectedDevices.add("root.nonAligned.1TS,false,"); - expectedDevices.add("root.nonAligned.100TS,false,"); - expectedDevices.add("root.nonAligned.1000TS,false,"); - expectedDevices.add("root.nonAligned.`1(TS)`,false,"); - expectedDevices.add("root.nonAligned.6TS.`6`,false,"); - expectedDevices.add("root.aligned.1TS,true,"); - expectedDevices.add("root.aligned.100TS,true,"); - expectedDevices.add("root.aligned.1000TS,true,"); - expectedDevices.add("root.aligned.`1(TS)`,true,"); - expectedDevices.add("root.aligned.6TS.`6`,true,"); - await() - .atMost(600, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try { - TestUtils.assertResultSetEqual( - statement.executeQuery("show devices"), - "Device,IsAligned,", - expectedDevices); - } catch (Exception e) { - Assert.fail(); - } - }); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + TestUtils.assertDataOnEnv( + receiverEnv, + "show devices", + "Device,IsAligned,", + new HashSet<>( + Arrays.asList( + "root.nonAligned.1TS,false,", + "root.nonAligned.100TS,false,", + "root.nonAligned.1000TS,false,", + "root.nonAligned.`1(TS)`,false,", + "root.nonAligned.6TS.`6`,false,", + "root.aligned.1TS,true,", + "root.aligned.100TS,true,", + "root.aligned.1000TS,true,", + "root.aligned.`1(TS)`,true,", + "root.aligned.6TS.`6`,true,"))); } } @@ -367,15 +320,12 @@ public void testMatchingMultipleDatabases() throws Exception { TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); assertTimeseriesCountOnReceiver(receiverEnv, 0); - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db1.d1 (time, at1) values (1, 10)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db2.d1 (time, at1) values (1, 20)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.db1.d1 (time, at1) values (1, 10)", + "insert into root.db2.d1 (time, at1) values (1, 20)", + "flush"))) { return; } @@ -395,15 +345,12 @@ public void testMatchingMultipleDatabases() throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p2").getCode()); - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db1.d1 (time, at1) values (2, 11)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db2.d1 (time, at1) values (2, 21)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.db1.d1 (time, at1) values (2, 11)", + "insert into root.db2.d1 (time, at1) values (2, 21)", + "flush"))) { return; } @@ -417,25 +364,11 @@ public void testMatchingMultipleDatabases() throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p3").getCode()); - try (Connection connection = receiverEnv.getConnection(); - Statement statement = connection.createStatement()) { - await() - .atMost(600, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try { - TestUtils.assertResultSetEqual( - statement.executeQuery("select count(*) from root.**"), - "count(root.db1.d1.at1),count(root.db2.d1.at1),", - Collections.singleton("2,2,")); - } catch (Exception e) { - Assert.fail(); - } - }); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + TestUtils.assertDataOnEnv( + receiverEnv, + "select count(*) from root.**", + "count(root.db1.d1.at1),count(root.db2.d1.at1),", + Collections.singleton("2,2,")); } } @@ -448,23 +381,14 @@ public void testHistoryAndRealtime() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d1 (time, at1) values (1, 10)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d2 (time, at1) values (1, 20)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d3 (time, at1) values (1, 30)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d4 (time, at1) values (1, 40)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.db.d1 (time, at1) values (1, 10)", + "insert into root.db.d2 (time, at1) values (1, 20)", + "insert into root.db.d3 (time, at1) values (1, 30)", + "insert into root.db.d4 (time, at1) values (1, 40)", + "flush"))) { return; } @@ -524,55 +448,25 @@ public void testHistoryAndRealtime() throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p4").getCode()); - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d1 (time, at1) values (2, 11)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d2 (time, at1) values (2, 21)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d3 (time, at1) values (2, 31)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d4 (time, at1) values (2, 41)")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.db.d1 (time, at1) values (2, 11)", + "insert into root.db.d3 (time, at1) values (2, 31)", + "insert into root.db.d4 (time, at1) values (2, 41)"))) { return; } - try (Connection connection = receiverEnv.getConnection(); - Statement statement = connection.createStatement()) { - await() - .atMost(600, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try { - TestUtils.assertResultSetEqual( - statement.executeQuery("select count(*) from root.** where time <= 1"), - "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),", - Collections.singleton("1,0,1,")); - } catch (Exception e) { - Assert.fail(); - } - }); - await() - .atMost(600, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try { - TestUtils.assertResultSetEqual( - statement.executeQuery("select count(*) from root.** where time >= 2"), - "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),", - Collections.singleton("1,1,0,")); - } catch (Exception e) { - Assert.fail(); - } - }); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + TestUtils.assertDataOnEnv( + receiverEnv, + "select count(*) from root.** where time <= 1", + "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),", + Collections.singleton("1,0,1,")); + TestUtils.assertDataOnEnv( + receiverEnv, + "select count(*) from root.** where time >= 2", + "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),", + Collections.singleton("1,1,0,")); } } @@ -585,19 +479,14 @@ public void testStartTimeAndEndTimeWorkingWithOrWithoutPattern() throws Exceptio try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, - "insert into root.db.d1 (time, at1)" - + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( + if (!TestUtils.tryExecuteNonQueriesWithRetry( senderEnv, - "insert into root.db.d2 (time, at1)" - + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + Arrays.asList( + "insert into root.db.d1 (time, at1)" + + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)", + "insert into root.db.d2 (time, at1)" + + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)", + "flush"))) { return; } @@ -624,25 +513,11 @@ public void testStartTimeAndEndTimeWorkingWithOrWithoutPattern() throws Exceptio Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); - try (Connection connection = receiverEnv.getConnection(); - Statement statement = connection.createStatement()) { - await() - .atMost(600, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try { - TestUtils.assertResultSetEqual( - statement.executeQuery("select count(*) from root.**"), - "count(root.db.d1.at1),", - Collections.singleton("3,")); - } catch (Exception e) { - Assert.fail(); - } - }); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + TestUtils.assertDataOnEnv( + receiverEnv, + "select count(*) from root.**", + "count(root.db.d1.at1),", + Collections.singleton("3,")); extractorAttributes.remove("extractor.pattern"); status = @@ -654,47 +529,16 @@ public void testStartTimeAndEndTimeWorkingWithOrWithoutPattern() throws Exceptio Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode()); - try (Connection connection = receiverEnv.getConnection(); - Statement statement = connection.createStatement()) { - await() - .atMost(600, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try { - TestUtils.assertResultSetEqual( - statement.executeQuery("select count(*) from root.**"), - "count(root.db.d1.at1),count(root.db.d2.at1),", - Collections.singleton("3,3,")); - } catch (Exception e) { - Assert.fail(); - } - }); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + TestUtils.assertDataOnEnv( + receiverEnv, + "select count(*) from root.**", + "count(root.db.d1.at1),count(root.db.d2.at1),", + Collections.singleton("3,3,")); } } private void assertTimeseriesCountOnReceiver(BaseEnv receiverEnv, int count) { - try (Connection connection = receiverEnv.getConnection(); - Statement statement = connection.createStatement()) { - await() - .atMost(600, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try { - TestUtils.assertResultSetEqual( - statement.executeQuery("count timeseries"), - "count(timeseries),", - Collections.singleton(count + ",")); - } catch (Exception e) { - Assert.fail(); - } - }); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + TestUtils.assertDataOnEnv( + receiverEnv, "count timeseries", "count(timeseries),", Collections.singleton(count + ",")); } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java index b314993d0a2c4..2cd8acc60c2b1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java @@ -37,6 +37,7 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -149,11 +150,8 @@ public void testLifeCycleWithHistoryDisabled() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values (1, 1)", "flush"))) { return; } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java index 1290e46b94ac4..4c0eb1a5a5df0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java @@ -39,6 +39,7 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -413,11 +414,8 @@ private void doTestUseNodeUrls(String connectorName) throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values (2, 2)", "flush"))) { return; } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java index 906a608e997c4..dd85807ba191d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java @@ -37,15 +37,10 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.sql.Connection; -import java.sql.Statement; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.awaitility.Awaitility.await; -import static org.junit.Assert.fail; @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2.class}) @@ -196,39 +191,20 @@ public void testInsertNull() throws Exception { Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); - if (!TestUtils.tryExecuteNonQueryWithRetry( - receiverEnv, "create aligned timeseries root.sg.d1(s0 float, s1 float)")) { - return; - } - - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "create aligned timeseries root.sg.d1(s0 float, s1 float)")) { - return; - } - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.sg.d1(time, s0, s1) values (3, null, 25.34)")) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + receiverEnv, + Arrays.asList( + "create aligned timeseries root.sg.d1(s0 float, s1 float)", + "create aligned timeseries root.sg.d1(s0 float, s1 float)", + "insert into root.sg.d1(time, s0, s1) values (3, null, 25.34)"))) { return; } - try (Connection connection = receiverEnv.getConnection(); - Statement statement = connection.createStatement()) { - await() - .atMost(600, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try { - TestUtils.assertResultSetEqual( - statement.executeQuery("select * from root.**"), - "Time,root.sg.d1.s0,root.sg.d1.s1,", - Collections.singleton("3,null,25.34,")); - } catch (Exception e) { - Assert.fail(); - } - }); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + TestUtils.assertDataOnEnv( + receiverEnv, + "select * from root.**", + "Time,root.sg.d1.s0,root.sg.d1.s1,", + Collections.singleton("3,null,25.34,")); } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index ee10bf0e760c3..67ff6d140e5ac 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -31,9 +31,6 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor; import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor; -import com.sun.tools.javac.util.List; - -import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -91,25 +88,22 @@ public String getClassName() { return className; } - public static final Set SHOW_PIPE_PLUGINS_BLACKLIST = - Collections.unmodifiableSet( - new HashSet<>( - List.of( - // Extractors - IOTDB_EXTRACTOR.getPipePluginName().toUpperCase(), - // Connectors - DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(), - IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(), - IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(), - IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), - IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(), - IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(), - WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(), - OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(), - WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(), - // Sinks - IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(), - IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(), - IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(), - WEBSOCKET_SINK.getPipePluginName().toUpperCase()))); + public static final Set SHOW_PIPE_PLUGINS_BLACKLIST = new HashSet<>(); + + static { + SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_EXTRACTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(OPC_UA_CONNECTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_SINK.getPipePluginName().toUpperCase()); + } } From f40b705c85eec5e5ef3bedc171c1a36b7b21dfeb Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Fri, 24 Nov 2023 17:42:13 +0800 Subject: [PATCH 4/8] Update IoTDBPipeThriftSinkIT.java --- ...{IoTDBPipeThriftSourceIT.java => IoTDBPipeThriftSinkIT.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename integration-test/src/test/java/org/apache/iotdb/pipe/it/{IoTDBPipeThriftSourceIT.java => IoTDBPipeThriftSinkIT.java} (99%) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSinkIT.java similarity index 99% rename from integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java rename to integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSinkIT.java index dd85807ba191d..cd347c434d9b0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSinkIT.java @@ -44,7 +44,7 @@ @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2.class}) -public class IoTDBPipeThriftSourceIT { +public class IoTDBPipeThriftSinkIT { private BaseEnv senderEnv; private BaseEnv receiverEnv; From 5900ce2f653e834b0ff45e88b5a2f29a99a06afe Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 27 Nov 2023 10:29:36 +0800 Subject: [PATCH 5/8] Added legacy and forced-log mode test --- ...ftSinkIT.java => IoTDBPipeDataSinkIT.java} | 51 +------------------ .../iotdb/pipe/it/IoTDBPipeProtocolIT.java | 33 ++++++++++++ 2 files changed, 34 insertions(+), 50 deletions(-) rename integration-test/src/test/java/org/apache/iotdb/pipe/it/{IoTDBPipeThriftSinkIT.java => IoTDBPipeDataSinkIT.java} (74%) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java similarity index 74% rename from integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSinkIT.java rename to integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java index cd347c434d9b0..912d451442345 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeThriftSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java @@ -44,7 +44,7 @@ @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2.class}) -public class IoTDBPipeThriftSinkIT { +public class IoTDBPipeDataSinkIT { private BaseEnv senderEnv; private BaseEnv receiverEnv; @@ -114,55 +114,6 @@ public void testThriftConnector() throws Exception { } } - @Test - public void testLegacyConnector() throws Exception { - DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); - - String receiverIp = receiverDataNode.getIp(); - int receiverPort = receiverDataNode.getPort(); - - try (SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - Map extractorAttributes = new HashMap<>(); - Map processorAttributes = new HashMap<>(); - Map connectorAttributes = new HashMap<>(); - - extractorAttributes.put("source.realtime.mode", "log"); - - connectorAttributes.put("sink", "iotdb-legacy-pipe-sink"); - connectorAttributes.put("sink.batch.enable", "false"); - connectorAttributes.put("sink.ip", receiverIp); - connectorAttributes.put("sink.port", Integer.toString(receiverPort)); - - // This version does not matter since it's no longer checked by the legacy receiver - connectorAttributes.put("sink.version", "1.3"); - - TSStatus status = - client.createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) - .setExtractorAttributes(extractorAttributes) - .setProcessorAttributes(processorAttributes)); - - Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); - - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); - - // Do not fail if the failure has nothing to do with pipe - // Because the failures will randomly generate due to resource limitation - if (!TestUtils.tryExecuteNonQueryWithRetry( - senderEnv, "insert into root.vehicle.d0(time, s1) values (0, 1)")) { - return; - } - - TestUtils.assertDataOnEnv( - receiverEnv, - "select * from root.**", - "Time,root.vehicle.d0.s1,", - Collections.singleton("0,1.0,")); - } - } - @Test public void testInsertNull() throws Exception { DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java index 4c0eb1a5a5df0..d5531613b7a28 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java @@ -343,6 +343,11 @@ public void testAsyncConnectorUseNodeUrls() throws Exception { doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); } + @Test + public void testLegacyConnectorUseNodeUrls() throws Exception { + doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName()); + } + @Test public void testAirGapConnectorUseNodeUrls() throws Exception { doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()); @@ -403,6 +408,9 @@ private void doTestUseNodeUrls(String connectorName) throws Exception { connectorAttributes.put("connector.batch.enable", "false"); connectorAttributes.put("connector.node-urls", nodeUrlsBuilder.toString()); + // Test forced-log mode, in TimechoDB this might be "file" + extractorAttributes.put("source.realtime.mode", "forced-log"); + TSStatus status = client.createPipe( new TCreatePipeReq("p1", connectorAttributes) @@ -424,6 +432,31 @@ private void doTestUseNodeUrls(String connectorName) throws Exception { "select count(*) from root.**", "count(root.db.d1.s1),", Collections.singleton("2,")); + + // Test file mode + extractorAttributes.replace("source.realtime.mode", "file"); + + status = + client.createPipe( + new TCreatePipeReq("p2", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + System.out.println(status.getMessage()); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode()); + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values (3, 3)", "flush"))) { + return; + } + + TestUtils.assertDataOnEnv( + receiverEnv, + "select count(*) from root.**", + "count(root.db.d1.s1),", + Collections.singleton("3,")); } } } From 63bfe89e426183f1bb194200013bdb3b1c4907be Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 27 Nov 2023 12:27:02 +0800 Subject: [PATCH 6/8] Update BuiltinPipePlugin.java --- .../iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java index 67ff6d140e5ac..96c6bbae38f0e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java @@ -92,6 +92,7 @@ public String getClassName() { static { SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_EXTRACTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase()); SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase()); SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase()); @@ -101,6 +102,7 @@ public String getClassName() { SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase()); SHOW_PIPE_PLUGINS_BLACKLIST.add(OPC_UA_CONNECTOR.getPipePluginName().toUpperCase()); SHOW_PIPE_PLUGINS_BLACKLIST.add(WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase()); + SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase()); SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase()); SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase()); From c9d046fbf80c589cba7b9f8ec15fb48179fe30d4 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 27 Nov 2023 15:09:44 +0800 Subject: [PATCH 7/8] Bug fix --- .../iotdb/pipe/it/IoTDBPipeDataSinkIT.java | 49 +++++++++++++++++++ .../iotdb/pipe/it/IoTDBPipeExtractorIT.java | 1 + 2 files changed, 50 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java index 912d451442345..52499cd14d40f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java @@ -114,6 +114,55 @@ public void testThriftConnector() throws Exception { } } + @Test + public void testLegacyConnector() throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("source.realtime.mode", "log"); + + connectorAttributes.put("sink", "iotdb-legacy-pipe-sink"); + connectorAttributes.put("sink.batch.enable", "false"); + connectorAttributes.put("sink.ip", receiverIp); + connectorAttributes.put("sink.port", Integer.toString(receiverPort)); + + // This version does not matter since it's no longer checked by the legacy receiver + connectorAttributes.put("sink.version", "1.3"); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + + // Do not fail if the failure has nothing to do with pipe + // Because the failures will randomly generate due to resource limitation + if (!TestUtils.tryExecuteNonQueryWithRetry( + senderEnv, "insert into root.vehicle.d0(time, s1) values (0, 1)")) { + return; + } + + TestUtils.assertDataOnEnv( + receiverEnv, + "select * from root.**", + "Time,root.vehicle.d0.s1,", + Collections.singleton("0,1.0,")); + } + } + @Test public void testInsertNull() throws Exception { DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java index 0c3f3425edaf2..a8f0828bfa9cc 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java @@ -452,6 +452,7 @@ public void testHistoryAndRealtime() throws Exception { senderEnv, Arrays.asList( "insert into root.db.d1 (time, at1) values (2, 11)", + "insert into root.db.d2 (time, at1) values (2, 21)", "insert into root.db.d3 (time, at1) values (2, 31)", "insert into root.db.d4 (time, at1) values (2, 41)"))) { return; From a0f056ecabdc45556e3a23a58e5069e2c906bc89 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 27 Nov 2023 15:33:26 +0800 Subject: [PATCH 8/8] Delete legacy protocol --- .../java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java index d5531613b7a28..6bcaee39b56e2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java @@ -343,11 +343,6 @@ public void testAsyncConnectorUseNodeUrls() throws Exception { doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); } - @Test - public void testLegacyConnectorUseNodeUrls() throws Exception { - doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName()); - } - @Test public void testAirGapConnectorUseNodeUrls() throws Exception { doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());