From 6006036ead4210aee1dc6e806a9abcd661e0aff2 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 17 Dec 2025 13:36:49 +0800 Subject: [PATCH] Pipe: Improved the total performance by disable some useless logic (#16916) --- .github/workflows/pipe-it-2cluster.yml | 539 ------------------ .../it/autocreate/IoTDBPipeProtocolIT.java | 16 +- .../PipeConfigRegionSinkConstructor.java | 6 - .../PipeDataRegionProcessorConstructor.java | 30 - .../PipeDataRegionSinkConstructor.java | 12 - .../PipeSchemaRegionSinkConstructor.java | 6 - .../dataregion/IoTDBDataRegionSource.java | 45 +- .../PipeRealtimeDataRegionSource.java | 10 +- .../plugin/builtin/BuiltinPipePlugin.java | 40 -- .../commons/pipe/source/IoTDBSource.java | 10 +- 10 files changed, 30 insertions(+), 684 deletions(-) delete mode 100644 .github/workflows/pipe-it-2cluster.yml diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml deleted file mode 100644 index 6c5f11bc35c61..0000000000000 --- a/.github/workflows/pipe-it-2cluster.yml +++ /dev/null @@ -1,539 +0,0 @@ -name: Multi-Cluster IT - -on: - push: - branches: - - master - - 'rel/1.*' - - 'rc/1.*' - - 'dev/1.*' - paths-ignore: - - 'docs/**' - - 'site/**' - - 'iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**' #queryengine - pull_request: - branches: - - master - - 'rel/1.*' - - 'rc/1.*' - - 'dev/1.*' - paths-ignore: - - 'docs/**' - - 'site/**' - - 'iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/**' #queryengine - # allow manually run the action: - workflow_dispatch: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -env: - MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 - MAVEN_ARGS: --batch-mode --no-transfer-progress - DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - -jobs: - auto-create-schema: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [HighPerformanceMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2AutoCreateSchema \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-auto-create-schema-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - manual-create-schema: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [HighPerformanceMode] - cluster2: [HighPerformanceMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2ManualCreateSchema \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-manual-create-schema-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-arch-verification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionArchVerification \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-regression-consumer: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionRegressionConsumer \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-regression-consumer-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - subscription-regression-misc: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionRegressionMisc \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-regression-misc-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - triple: - strategy: - fail-fast: false - max-parallel: 1 - matrix: - java: [ 17 ] - cluster1: [ ScalableSingleNodeMode ] - cluster2: [ ScalableSingleNodeMode ] - cluster3: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Sleep for a random duration between 0 and 10000 milliseconds - run: | - sleep $(( $(( RANDOM % 10000 + 1 )) / 1000)) - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - retry() { - local -i max_attempts=3 - local -i attempt=1 - local -i retry_sleep=5 - local test_output - while [ $attempt -le $max_attempts ]; do - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }},${{ matrix.cluster3 }} \ - -pl integration-test \ - -am -PMultiClusterIT3 \ - -ntp >> ~/run-tests-$attempt.log && return 0 - test_output=$(cat ~/run-tests-$attempt.log) - echo "==================== BEGIN: ~/run-tests-$attempt.log ====================" - echo "$test_output" - echo "==================== END: ~/run-tests-$attempt.log ======================" - if ! mv ~/run-tests-$attempt.log integration-test/target/cluster-logs/ 2>/dev/null; then - echo "Failed to move log file ~/run-tests-$attempt.log to integration-test/target/cluster-logs/. Skipping..." - fi - if echo "$test_output" | grep -q "Could not transfer artifact"; then - if [ $attempt -lt $max_attempts ]; then - echo "Test failed with artifact transfer issue, attempt $attempt. Retrying in $retry_sleep seconds..." - sleep $retry_sleep - attempt=$((attempt + 1)) - else - echo "Test failed after $max_attempts attempts due to artifact transfer issue." - echo "Treating this as a success because the issue is likely transient." - return 0 - fi - elif [ $? -ne 0 ]; then - echo "Test failed with a different error." - return 1 - else - echo "Tests passed" - return 0 - fi - done - } - retry - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-triple-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}-${{ matrix.cluster3 }} - path: integration-test/target/cluster-logs - retention-days: 30 diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java index dfaa2b73b0e63..97d6db008774e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java @@ -332,11 +332,6 @@ public void testAsyncConnectorUseNodeUrls() throws Exception { doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); } - @Test - public void testAirGapConnectorUseNodeUrls() throws Exception { - doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()); - } - private void doTestUseNodeUrls(String connectorName) throws Exception { senderEnv .getConfig() @@ -371,16 +366,7 @@ private void doTestUseNodeUrls(String connectorName) throws Exception { final StringBuilder nodeUrlsBuilder = new StringBuilder(); for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) { - if (connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) { - // Use default port for convenience - nodeUrlsBuilder - .append(wrapper.getIp()) - .append(":") - .append(wrapper.getPipeAirGapReceiverPort()) - .append(","); - } else { - nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); - } + nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(","); } try (final SyncConfigNodeIServiceClient client = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java index c7222e9a1bc97..82c394c7fb545 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor; -import org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionAirGapSink; import org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionSink; import org.apache.iotdb.pipe.api.PipeConnector; @@ -41,9 +40,6 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(), IoTDBConfigRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBConfigRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new); @@ -55,8 +51,6 @@ protected void initConstructors() { BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBConfigRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index 44c6ef17800b5..9e0ba96d1976c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -21,17 +21,9 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; -import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor; -import org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor; -import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor; -import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor; import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; -import org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor; class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { @@ -43,28 +35,6 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(), DoNothingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(), - TumblingTimeSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(), - SwingingDoorTrendingSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(), - ChangingValueSamplingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(), - ThrowingExceptionProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.AGGREGATE_PROCESSOR.getPipePluginName(), AggregateProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.STANDARD_STATISTICS_PROCESSOR.getPipePluginName(), - StandardStatisticsOperatorProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(), - TumblingWindowingProcessor::new); - pluginConstructors.put( - BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), TwoStageCountProcessor::new); pluginConstructors.put( BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(), PipeConsensusProcessor::new); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java index 536cf71cdb8db..09773d0cad52e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java @@ -23,10 +23,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; -import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBDataRegionAirGapSink; import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink; -import org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaSink; -import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.PipeConsensusAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; @@ -59,13 +56,8 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(), IoTDBLegacyPipeSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBDataRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(), OpcDaSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new); pluginConstructors.put( @@ -82,12 +74,8 @@ protected void initConstructors() { IoTDBDataRegionAsyncSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(), IoTDBLegacyPipeSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBDataRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), WebSocketSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), OpcUaSink::new); - pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), OpcDaSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new); pluginConstructors.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java index 160ecf54c0158..a7752994ee264 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor; -import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBSchemaRegionAirGapSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBSchemaRegionSink; import org.apache.iotdb.pipe.api.PipeConnector; @@ -41,9 +40,6 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(), IoTDBSchemaRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), - IoTDBSchemaRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new); @@ -55,8 +51,6 @@ protected void initConstructors() { BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), IoTDBSchemaRegionSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBSchemaRegionSink::new); - pluginConstructors.put( - BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBSchemaRegionAirGapSink::new); pluginConstructors.put( BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index 77195ab58df08..a1ae842e922ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java @@ -21,6 +21,8 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePatternOperations; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.source.IoTDBSource; @@ -45,6 +47,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; @@ -100,6 +103,21 @@ public class IoTDBDataRegionSource extends IoTDBSource { public void validate(final PipeParameterValidator validator) throws Exception { super.validate(validator); + final boolean forwardingPipeRequests = + validator + .getParameters() + .getBooleanOrDefault( + Arrays.asList( + PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, + PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), + PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); + if (!forwardingPipeRequests) { + throw new PipeParameterNotValidException( + String.format( + "The parameter %s cannot be set to false.", + PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY)); + } + final Pair insertionDeletionListeningOptionPair = DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair( validator.getParameters()); @@ -163,26 +181,6 @@ public void validate(final PipeParameterValidator validator) throws Exception { Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)); - // Validate source.realtime.mode - if (validator - .getParameters() - .getBooleanOrDefault( - Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), - EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE) - || validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, SOURCE_END_TIME_KEY)) { - validator.validateAttributeValueRange( - validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY) - ? EXTRACTOR_REALTIME_MODE_KEY - : SOURCE_REALTIME_MODE_KEY, - true, - EXTRACTOR_REALTIME_MODE_FILE_VALUE, - EXTRACTOR_REALTIME_MODE_HYBRID_VALUE, - EXTRACTOR_REALTIME_MODE_LOG_VALUE, - EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE, - EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE, - EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE); - } - // Validate source.start-time and source.end-time if (validator .getParameters() @@ -260,6 +258,13 @@ private void constructRealtimeExtractor(final PipeParameters parameters) { return; } + if (!(pipeName != null + && (pipeName.startsWith(PipeStaticMeta.SUBSCRIPTION_PIPE_PREFIX) + || pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)))) { + realtimeExtractor = new PipeRealtimeDataRegionTsFileSource(); + return; + } + // Use hybrid mode by default if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { realtimeExtractor = new PipeRealtimeDataRegionHybridSource(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index b9bc57b977a65..9289899738cf5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; @@ -107,7 +106,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { private final AtomicReference> dataRegionTimePartitionIdBound = new AtomicReference<>(); - protected boolean isForwardingPipeRequests; + protected boolean isForwardingPipeRequests = true; private boolean shouldTransferModFile; // Whether to transfer mods @@ -234,12 +233,7 @@ public void customize( ? TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) : TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) - 1; - isForwardingPipeRequests = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, - PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); + isForwardingPipeRequests = true; shouldTransferModFile = parameters.getBooleanOrDefault( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index 155e5c7ea054a..2af0610b0e97b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -19,26 +19,15 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.AggregateProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.twostage.TwoStageCountProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.airgap.IoTDBAirGapSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.consensus.PipeConsensusAsyncSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBLegacyPipeSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftAsyncSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSslSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSyncSink; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcda.OpcDaSink; -import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcua.OpcUaSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.websocket.WebSocketSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.writeback.WriteBackSink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.donothing.DoNothingSource; @@ -60,18 +49,8 @@ public enum BuiltinPipePlugin { // processors DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class), - TUMBLING_TIME_SAMPLING_PROCESSOR( - "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class), - SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class), - CHANGING_VALUE_SAMPLING_PROCESSOR( - "changing-value-sampling-processor", ChangingValueSamplingProcessor.class), - THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class), - AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class), - COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class), // Hidden-processors, which are plugins of the processors - STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class), - TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class), PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", PipeConsensusProcessor.class), // connectors @@ -81,12 +60,9 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", IoTDBThriftSyncSink.class), IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncSink.class), IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeSink.class), - IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class), PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector", PipeConsensusAsyncSink.class), WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class), - OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class), - OPC_DA_CONNECTOR("opc-da-connector", OpcDaSink.class), WRITE_BACK_CONNECTOR("write-back-connector", WriteBackSink.class), DO_NOTHING_SINK("do-nothing-sink", DoNothingSink.class), @@ -95,10 +71,7 @@ public enum BuiltinPipePlugin { IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", IoTDBThriftSyncSink.class), IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink", IoTDBThriftAsyncSink.class), IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", IoTDBLegacyPipeSink.class), - IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapSink.class), WEBSOCKET_SINK("websocket-sink", WebSocketSink.class), - OPC_UA_SINK("opc-ua-sink", OpcUaSink.class), - OPC_DA_SINK("opc-da-sink", OpcDaSink.class), WRITE_BACK_SINK("write-back-sink", WriteBackSink.class), SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class), PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", PipeConsensusAsyncSink.class), @@ -136,14 +109,6 @@ public String getClassName() { // Sources DO_NOTHING_SOURCE.getPipePluginName().toUpperCase(), // Processors - TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(), - THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(), - AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(), - COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(), - STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(), - TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(), // Connectors DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(), @@ -152,10 +117,7 @@ public String getClassName() { IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(), - IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(), WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(), - OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(), - OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(), WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), // Sinks @@ -163,8 +125,6 @@ public String getClassName() { IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(), IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(), WEBSOCKET_SINK.getPipePluginName().toUpperCase(), - OPC_UA_SINK.getPipePluginName().toUpperCase(), - OPC_DA_SINK.getPipePluginName().toUpperCase(), SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase()))); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java index e346b24ff6154..43287b7666bf4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java @@ -20,7 +20,6 @@ package org.apache.iotdb.commons.pipe.source; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment; import org.apache.iotdb.pipe.api.PipeExtractor; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; @@ -48,7 +47,7 @@ public abstract class IoTDBSource implements PipeExtractor { protected int regionId; protected PipeTaskMeta pipeTaskMeta; - protected boolean isForwardingPipeRequests; + protected boolean isForwardingPipeRequests = true; // The value is always true after the first start even the extractor is closed protected final AtomicBoolean hasBeenStarted = new AtomicBoolean(false); @@ -99,12 +98,7 @@ public void customize( taskID = pipeName + "_" + regionId + "_" + creationTime; pipeTaskMeta = environment.getPipeTaskMeta(); - isForwardingPipeRequests = - parameters.getBooleanOrDefault( - Arrays.asList( - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, - PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), - PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); + isForwardingPipeRequests = true; } @Override