diff --git a/.github/workflows/cluster-it-1c1d.yml b/.github/workflows/cluster-it-1c1d.yml deleted file mode 100644 index fe5fc2a6edb89..0000000000000 --- a/.github/workflows/cluster-it-1c1d.yml +++ /dev/null @@ -1,87 +0,0 @@ -name: Cluster IT - 1C1D - -on: - push: - branches: - - master - - 'rel/1.*' - - pipe-meta-sync - paths-ignore: - - 'docs/**' - - 'site/**' - pull_request: - branches: - - master - - 'rel/1.*' - - pipe-meta-sync - paths-ignore: - - 'docs/**' - - 'site/**' - # allow manually run the action: - workflow_dispatch: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -env: - MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 - DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - -jobs: - Simple: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - os: [ ubuntu-latest, windows-latest ] - runs-on: ${{ matrix.os }} - - steps: - - uses: actions/checkout@v4 - - name: Set up JDK - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: 17 - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Adjust network dynamic TCP ports range - if: ${{ runner.os == 'Windows' }} - shell: pwsh - run: | - netsh int ipv4 set dynamicport tcp start=32768 num=32768 - netsh int ipv4 set dynamicport udp start=32768 num=32768 - netsh int ipv6 set dynamicport tcp start=32768 num=32768 - netsh int ipv6 set dynamicport udp start=32768 num=32768 - - name: Adjust Linux kernel somaxconn - if: ${{ runner.os == 'Linux' }} - shell: bash - run: sudo sysctl -w net.core.somaxconn=65535 -# - name: Adjust Mac kernel somaxconn -# if: ${{ runner.os == 'macOS' }} -# shell: bash -# run: sudo sysctl -w kern.ipc.somaxconn=65535 - - name: IT/UT Test - shell: bash - # we do not compile client-cpp 
for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=2 \ - -pl integration-test \ - -am - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: standalone-log-java${{ matrix.java }}-${{ runner.os }} - path: integration-test/target/cluster-logs - retention-days: 1 - diff --git a/.github/workflows/cluster-it-1c3d.yml b/.github/workflows/cluster-it-1c3d.yml deleted file mode 100644 index 5e46c26152a13..0000000000000 --- a/.github/workflows/cluster-it-1c3d.yml +++ /dev/null @@ -1,65 +0,0 @@ -name: Cluster IT - 1C3D - -on: - push: - branches: - - master - - 'rel/1.*' - - pipe-meta-sync - paths-ignore: - - 'docs/**' - - 'site/**' - pull_request: - branches: - - master - - 'rel/1.*' - - pipe-meta-sync - paths-ignore: - - 'docs/**' - - 'site/**' - # allow manually run the action: - workflow_dispatch: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -env: - MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 - DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - -jobs: - Simple: - strategy: - fail-fast: false - max-parallel: 20 - matrix: - java: [ 8, 11, 17 ] - runs-on: [self-hosted, iotdb] -# group: self-hosted -# labels: iotdb - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: IT/UT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - 
-DintegrationTest.forkCount=6 -DConfigNodeMaxHeapSize=1024 -DDataNodeMaxHeapSize=1024 \ - -pl integration-test \ - -am -PClusterIT - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-java${{ matrix.java }}-${{ runner.os }} - path: integration-test/target/cluster-logs - retention-days: 1 diff --git a/.github/workflows/multi-language-client.yml b/.github/workflows/multi-language-client.yml deleted file mode 100644 index b66fa4ed9f561..0000000000000 --- a/.github/workflows/multi-language-client.yml +++ /dev/null @@ -1,132 +0,0 @@ -name: Multi-Language Client -on: - push: - branches: - - master - - "rel/*" - paths-ignore: - - 'docs/**' - - 'site/**' - pull_request: - branches: - - master - - "rel/*" - paths-ignore: - - 'docs/**' - - 'site/**' - # allow manually run the action: - workflow_dispatch: - - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -env: - MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 - -jobs: - cpp: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - os: [ubuntu-latest, macos-latest, windows-latest] - runs-on: ${{ matrix.os}} - - steps: - - uses: actions/checkout@v4 - - name: Install CPP Dependencies (Ubuntu) - if: matrix.os == 'ubuntu-latest' - shell: bash - run: sudo apt-get update && sudo apt-get install libboost-all-dev - - name: Install CPP Dependencies (Mac) - if: matrix.os == 'macos-latest' - shell: bash - run: | - brew install boost - brew install bison - echo 'export PATH="/opt/homebrew/opt/bison/bin:$PATH"' >> ~/.bash_profile - source ~/.bash_profile && export LDFLAGS="-L/opt/homebrew/opt/bison/lib" - - name: Install CPP Dependencies (Windows) - if: matrix.os == 'windows-latest' - run: | - choco install winflexbison3 - choco install boost-msvc-14.3 --version=1.84.0.1 - echo C:\\local\\boost_1_84_0 >> $env:GITHUB_PATH - - name: 
Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: client-${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Build IoTDB server and client - shell: bash - # Explicitly using mvnw here as the build requires maven 3.9 and the default installation is older - # Explicitly using "install" instead of package in order to be sure we're using libs built on this machine - # (was causing problems on windows, but could cause problem on linux, when updating the thrift module) - run: ./mvnw clean install -P with-cpp -pl distribution,example/client-cpp-example -am -DskipTests - - name: Test with Maven - shell: bash - # Explicitly using mvnw here as the build requires maven 3.9 and the default installation is older - run: ./mvnw clean verify -P with-cpp -pl iotdb-client/client-cpp -am - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cpp-IT-${{ runner.os }} - path: iotdb-client/client-cpp/target/build/test/Testing - retention-days: 1 - - go: - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v4 - with: - token: ${{secrets.GITHUB_TOKEN}} - submodules: recursive - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Compile IoTDB Server - run: mvn clean package -pl distribution -am -DskipTests - - name: Integration test - shell: bash - run: | - cd iotdb-client - git clone https://github.com/apache/iotdb-client-go.git - cd iotdb-client-go - make e2e_test_for_parent_git_repo e2e_test_clean_for_parent_git_repo - - python: - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v4 - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Build IoTDB server distribution zip and python client - run: mvn -B 
clean install -pl distribution,iotdb-client/client-py -am -DskipTests - - name: Build IoTDB server docker image - run: | - docker build . -f docker/src/main/Dockerfile-1c1d -t "iotdb:dev" - docker images - - name: Install IoTDB python client requirements - run: pip3 install -r iotdb-client/client-py/requirements_dev.txt - - name: Check code style - shell: bash - run: black iotdb-client/client-py/ --check --diff - - name: Integration test - shell: bash - run: | - cd iotdb-client/client-py/ && pytest . - - diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml index 789f8176da840..e2224480679fa 100644 --- a/.github/workflows/pipe-it-2cluster.yml +++ b/.github/workflows/pipe-it-2cluster.yml @@ -27,88 +27,15 @@ env: DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} jobs: - auto-create-schema: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. 
- cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2AutoCreateSchema - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-auto-create-schema-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 - manual-create-schema: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. 
- cluster1: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - cluster2: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2ManualCreateSchema - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-manual-create-schema-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 subscription: strategy: fail-fast: false - max-parallel: 15 + max-parallel: 60 matrix: java: [ 17 ] # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. 
- cluster1: [ LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode ] - cluster2: [ ScalableSingleNodeMode ] + cluster1: [ LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode ] + cluster2: [ LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} steps: diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml deleted file mode 100644 index bdf9a9dc47cd6..0000000000000 --- a/.github/workflows/unit-test.yml +++ /dev/null @@ -1,74 +0,0 @@ -# This workflow will build a Java project with Maven -# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven - -name: Unit-Test - -on: - push: - branches: - - master - - 'rel/*' - - pipe-meta-sync - paths-ignore: - - 'docs/**' - - 'site/**' - pull_request: - branches: - - master - - 'rel/*' - - pipe-meta-sync - paths-ignore: - - 'docs/**' - - 'site/**' - # allow manually run the action: - workflow_dispatch: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -env: - MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 - DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - -jobs: - unit-test: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 8, 17 ] - os: [ ubuntu-latest, windows-latest ] - it_task: [ 'others', 'datanode' ] - include: - - java: 17 - os: 
macos-latest - it_task: 'datanode' - - java: 17 - os: macos-latest - it_task: 'others' - runs-on: ${{ matrix.os }} - - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: corretto - java-version: ${{ matrix.java }} - - name: Cache Maven packages - uses: actions/cache@v4 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - name: Test Datanode Module with Maven - shell: bash - if: ${{ matrix.it_task == 'datanode'}} - run: mvn clean integration-test -Dtest.port.closed=true -pl iotdb-core/datanode -am -DskipTests -Diotdb.test.only=true - - name: Test Other Modules with Maven - shell: bash - if: ${{ matrix.it_task == 'others'}} - run: | - mvn clean install -DskipTests - mvn -P get-jar-with-dependencies,with-integration-tests clean test -Dtest.port.closed=true -Diotdb.test.skip=true diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index fa5447bc5b071..fde5439b8feda 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -45,6 +45,7 @@ import java.lang.management.MonitorInfo; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.lang.reflect.Field; import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -675,4 +676,24 @@ private String getKillPoints() { public abstract String getSystemPropertiesPath(); protected abstract MppJVMConfig initVMConfig(); + + public long getPid() { + return getPidOfProcess(this.instance); + } + + public static synchronized long getPidOfProcess(final Process p) { + long pid = -1; + try { + if 
(p.getClass().getName().equals("java.lang.UNIXProcess") + || p.getClass().getName().equals("java.lang.ProcessImpl")) { + final Field f = p.getClass().getDeclaredField("pid"); + f.setAccessible(true); + pid = f.getLong(p); + f.setAccessible(false); + } + } catch (final Exception e) { + pid = -1; + } + return pid; + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java index 650f861f498d0..608b28510ce36 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java @@ -50,6 +50,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.InputStreamReader; import java.sql.Connection; import java.sql.Statement; import java.util.ArrayList; @@ -1010,6 +1012,7 @@ private void pollMessagesAndCheck( } // Check data on receiver + final long[] currentTime = {System.currentTimeMillis()}; try { try (final Connection connection = receiverEnv.getConnection(); final Statement statement = connection.createStatement()) { @@ -1020,6 +1023,31 @@ private void pollMessagesAndCheck( LOGGER.info("detect receiver crashed, skipping this test..."); return; } + // potential stuck + if (System.currentTimeMillis() - currentTime[0] > 60_000L) { + currentTime[0] = System.currentTimeMillis(); + for (final DataNodeWrapper wrapper : senderEnv.getDataNodeWrapperList()) { + final long pid = wrapper.getPid(); + if (pid == -1) { + LOGGER.warn("Failed to get pid for {}", wrapper.getId()); + continue; + } + final String command = "jstack -l " + pid; + LOGGER.info("Executing command {} for {}", command, wrapper.getId()); + final Process process = Runtime.getRuntime().exec(command); + final StringBuilder output = 
new StringBuilder(); + try (final BufferedReader reader = + new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + output.append(line).append(System.lineSeparator()); + } + } + final int exitCode = process.waitFor(); + LOGGER.info("Command {} exited with code {}", command, exitCode); + LOGGER.info(output.toString()); + } + } TestUtils.assertSingleResultSetEqual( TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"), expectedHeaderWithResult); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java deleted file mode 100644 index 39a9f2225f6bb..0000000000000 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iotdb.subscription.it.dual; - -import org.apache.iotdb.db.it.utils.TestUtils; -import org.apache.iotdb.isession.ISession; -import org.apache.iotdb.it.framework.IoTDBTestRunner; -import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription; -import org.apache.iotdb.rpc.subscription.config.TopicConstant; -import org.apache.iotdb.session.subscription.SubscriptionSession; -import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; -import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; -import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; - -import org.apache.tsfile.write.record.Tablet; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.Statement; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.LockSupport; - -import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT; -import static org.junit.Assert.fail; - -@RunWith(IoTDBTestRunner.class) -@Category({MultiClusterIT2Subscription.class}) -public class IoTDBSubscriptionTimePrecisionIT extends AbstractSubscriptionDualIT { - - private static final Logger LOGGER = - LoggerFactory.getLogger(IoTDBSubscriptionTimePrecisionIT.class); - - @Override - protected void setUpConfig() { - super.setUpConfig(); - - // Set timestamp precision to nanosecond - senderEnv.getConfig().getCommonConfig().setTimestampPrecision("ns"); - receiverEnv.getConfig().getCommonConfig().setTimestampPrecision("ns"); - } - - @Test - public void testTopicTimePrecision() throws Exception { - final String host = senderEnv.getIP(); - final int port = Integer.parseInt(senderEnv.getPort()); - - // Insert some historical data on sender - final 
long currentTime1 = System.currentTimeMillis() * 1000_000L; // in nanosecond - try (final ISession session = senderEnv.getSessionConnection()) { - for (int i = 0; i < 100; ++i) { - session.executeNonQueryStatement( - String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); - session.executeNonQueryStatement( - String.format("insert into root.db.d1(time, s2) values (%s, 1)", currentTime1 - i)); - } - session.executeNonQueryStatement("flush"); - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - // Create topic on sender - final String topic1 = "topic1"; - final String topic2 = "topic2"; - try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - { - final Properties config = new Properties(); - config.put(TopicConstant.START_TIME_KEY, currentTime1 - 99); - config.put( - TopicConstant.END_TIME_KEY, - TopicConstant.NOW_TIME_VALUE); // now should be strictly larger than current time 1 - session.createTopic(topic1, config); - } - { - final Properties config = new Properties(); - config.put( - TopicConstant.START_TIME_KEY, - TopicConstant.NOW_TIME_VALUE); // now should be strictly smaller than current time 2 - session.createTopic(topic2, config); - } - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - // Insert some historical data on sender again - final long currentTime2 = System.currentTimeMillis() * 1000_000L; // in nanosecond - try (final ISession session = senderEnv.getSessionConnection()) { - for (int i = 0; i < 100; ++i) { - session.executeNonQueryStatement( - String.format("insert into root.db.d2(time, s1) values (%s, 1)", currentTime2 + i)); - } - session.executeNonQueryStatement("flush"); - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - // Subscribe on sender and insert on receiver - final AtomicBoolean isClosed = new AtomicBoolean(false); - final Thread thread = - new Thread( - () -> { - try (final 
SubscriptionPullConsumer consumer = - new SubscriptionPullConsumer.Builder() - .host(host) - .port(port) - .consumerId("c1") - .consumerGroupId("cg1") - .autoCommit(false) - .buildPullConsumer(); - final ISession session = receiverEnv.getSessionConnection()) { - consumer.open(); - consumer.subscribe(topic1, topic2); - while (!isClosed.get()) { - LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time - final List messages = - consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); - for (final SubscriptionMessage message : messages) { - for (final Iterator it = - message.getSessionDataSetsHandler().tabletIterator(); - it.hasNext(); ) { - final Tablet tablet = it.next(); - session.insertTablet(tablet); - } - } - consumer.commitSync(messages); - } - // Auto unsubscribe topics - } catch (final Exception e) { - e.printStackTrace(); - // Avoid failure - } finally { - LOGGER.info("consumer exiting..."); - } - }, - String.format("%s - consumer", testName.getMethodName())); - thread.start(); - - // Check data on receiver - try { - try (final Connection connection = receiverEnv.getConnection(); - final Statement statement = connection.createStatement()) { - // Keep retrying if there are execution failures - AWAIT.untilAsserted( - () -> - TestUtils.assertSingleResultSetEqual( - TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"), - new HashMap() { - { - put("count(root.db.d1.s2)", "100"); - put("count(root.db.d2.s1)", "100"); - } - })); - } - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { - isClosed.set(true); - thread.join(); - } - } -} diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java deleted file mode 100644 index ac7e83339844b..0000000000000 --- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java +++ /dev/null @@ -1,638 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.subscription.it.dual; - -import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; -import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo; -import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq; -import org.apache.iotdb.db.it.utils.TestUtils; -import org.apache.iotdb.isession.ISession; -import org.apache.iotdb.it.framework.IoTDBTestRunner; -import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription; -import org.apache.iotdb.rpc.subscription.config.TopicConstant; -import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException; -import org.apache.iotdb.session.subscription.SubscriptionSession; -import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; -import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; -import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; - -import org.apache.tsfile.write.record.Tablet; -import org.junit.Assert; -import org.junit.Test; -import 
org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.LockSupport; - -import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT; -import static org.junit.Assert.fail; - -@RunWith(IoTDBTestRunner.class) -@Category({MultiClusterIT2Subscription.class}) -public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT { - - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSubscriptionTopicIT.class); - - @Test - public void testTopicPathSubscription() throws Exception { - // Insert some historical data on sender - try (final ISession session = senderEnv.getSessionConnection()) { - for (int i = 0; i < 100; ++i) { - session.executeNonQueryStatement( - String.format("insert into root.db.d1(time, s) values (%s, 1)", i)); - session.executeNonQueryStatement( - String.format("insert into root.db.d2(time, s) values (%s, 1)", i)); - session.executeNonQueryStatement( - String.format("insert into root.db.d3(time, t) values (%s, 1)", i)); - session.executeNonQueryStatement( - String.format("insert into root.db.t1(time, s1) values (%s, 1)", i)); - } - session.executeNonQueryStatement("flush"); - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - // Create topic on sender - final String topicName = "topic1"; - final String host = senderEnv.getIP(); - final int port = Integer.parseInt(senderEnv.getPort()); - try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - final Properties config = new 
Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.*.s"); - session.createTopic(topicName, config); - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - assertTopicCount(1); - - // Subscribe on sender and insert on receiver - final AtomicBoolean isClosed = new AtomicBoolean(false); - final Thread thread = - new Thread( - () -> { - try (final SubscriptionPullConsumer consumer = - new SubscriptionPullConsumer.Builder() - .host(host) - .port(port) - .consumerId("c1") - .consumerGroupId("cg1") - .autoCommit(false) - .buildPullConsumer(); - final ISession session = receiverEnv.getSessionConnection()) { - consumer.open(); - consumer.subscribe(topicName); - while (!isClosed.get()) { - LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time - final List messages = - consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); - for (final SubscriptionMessage message : messages) { - for (final Iterator it = - message.getSessionDataSetsHandler().tabletIterator(); - it.hasNext(); ) { - final Tablet tablet = it.next(); - session.insertTablet(tablet); - } - } - consumer.commitSync(messages); - } - consumer.unsubscribe(topicName); - } catch (final Exception e) { - e.printStackTrace(); - // Avoid fail - } finally { - LOGGER.info("consumer exiting..."); - } - }, - String.format("%s - consumer", testName.getMethodName())); - thread.start(); - - // Check data on receiver - try { - try (final Connection connection = receiverEnv.getConnection(); - final Statement statement = connection.createStatement()) { - // Keep retrying if there are execution failures - AWAIT.untilAsserted( - () -> - TestUtils.assertSingleResultSetEqual( - TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"), - new HashMap() { - { - put("count(root.db.d1.s)", "100"); - put("count(root.db.d2.s)", "100"); - } - })); - } - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { - 
isClosed.set(true); - thread.join(); - } - } - - @Test - public void testTopicTimeSubscription() throws Exception { - // Insert some historical data on sender - final long currentTime = System.currentTimeMillis(); - try (final ISession session = senderEnv.getSessionConnection()) { - for (int i = 0; i < 100; ++i) { - session.executeNonQueryStatement( - String.format("insert into root.db.d1(time, s) values (%s, 1)", i)); - session.executeNonQueryStatement( - String.format("insert into root.db.d2(time, s) values (%s, 1)", currentTime + i)); - } - session.executeNonQueryStatement("flush"); - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - // Create topic on sender - final String topicName = "topic2"; - final String host = senderEnv.getIP(); - final int port = Integer.parseInt(senderEnv.getPort()); - try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.START_TIME_KEY, currentTime); - session.createTopic(topicName, config); - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - assertTopicCount(1); - - // Subscribe on sender and insert on receiver - final AtomicBoolean isClosed = new AtomicBoolean(false); - final Thread thread = - new Thread( - () -> { - try (final SubscriptionPullConsumer consumer = - new SubscriptionPullConsumer.Builder() - .host(host) - .port(port) - .consumerId("c1") - .consumerGroupId("cg1") - .autoCommit(false) - .buildPullConsumer(); - final ISession session = receiverEnv.getSessionConnection()) { - consumer.open(); - consumer.subscribe(topicName); - while (!isClosed.get()) { - LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time - final List messages = - consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); - for (final SubscriptionMessage message : messages) { - for (final Iterator it = - 
message.getSessionDataSetsHandler().tabletIterator(); - it.hasNext(); ) { - final Tablet tablet = it.next(); - session.insertTablet(tablet); - } - } - consumer.commitSync(messages); - } - consumer.unsubscribe(topicName); - } catch (final Exception e) { - e.printStackTrace(); - // Avoid failure - } finally { - LOGGER.info("consumer exiting..."); - } - }, - String.format("%s - consumer", testName.getMethodName())); - thread.start(); - - // Check data on receiver - try { - try (final Connection connection = receiverEnv.getConnection(); - final Statement statement = connection.createStatement()) { - // Keep retrying if there are execution failures - AWAIT.untilAsserted( - () -> - TestUtils.assertSingleResultSetEqual( - TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"), - new HashMap() { - { - put("count(root.db.d2.s)", "100"); - } - })); - } - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { - isClosed.set(true); - thread.join(); - } - } - - @Test - public void testTopicProcessorSubscription() throws Exception { - // Insert some history data on sender - try (final ISession session = senderEnv.getSessionConnection()) { - session.executeNonQueryStatement( - "insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2), (2000, 3), (2500, 4), (3000, 5)"); - session.executeNonQueryStatement("flush"); - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - // Create topic - final String topicName = "topic3"; - final String host = senderEnv.getIP(); - final int port = Integer.parseInt(senderEnv.getPort()); - try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - final Properties config = new Properties(); - config.put("processor", "tumbling-time-sampling-processor"); - config.put("processor.tumbling-time.interval-seconds", "1"); - config.put("processor.down-sampling.split-file", "true"); - session.createTopic(topicName, config); - } 
catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - assertTopicCount(1); - - // Subscribe on sender and insert on receiver - final AtomicBoolean isClosed = new AtomicBoolean(false); - final Thread thread = - new Thread( - () -> { - try (final SubscriptionPullConsumer consumer = - new SubscriptionPullConsumer.Builder() - .host(host) - .port(port) - .consumerId("c1") - .consumerGroupId("cg1") - .autoCommit(false) - .buildPullConsumer(); - final ISession session = receiverEnv.getSessionConnection()) { - consumer.open(); - consumer.subscribe(topicName); - while (!isClosed.get()) { - LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time - final List messages = - consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); - for (final SubscriptionMessage message : messages) { - for (final Iterator it = - message.getSessionDataSetsHandler().tabletIterator(); - it.hasNext(); ) { - final Tablet tablet = it.next(); - session.insertTablet(tablet); - } - } - consumer.commitSync(messages); - } - consumer.unsubscribe(topicName); - } catch (final Exception e) { - e.printStackTrace(); - // Avoid failure - } finally { - LOGGER.info("consumer exiting..."); - } - }, - String.format("%s - consumer", testName.getMethodName())); - thread.start(); - - // Check data on receiver - final Set expectedResSet = new HashSet<>(); - expectedResSet.add("1000,1.0,"); - expectedResSet.add("2000,3.0,"); - expectedResSet.add("3000,5.0,"); - try { - try (final Connection connection = receiverEnv.getConnection(); - final Statement statement = connection.createStatement()) { - // Keep retrying if there are execution failures - AWAIT.untilAsserted( - () -> - TestUtils.assertResultSetEqual( - TestUtils.executeQueryWithRetry(statement, "select * from root.**"), - "Time,root.db.d1.at1,", - expectedResSet)); - } - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { - isClosed.set(true); - thread.join(); - } - } - - 
@Test - public void testTopicNameWithBackQuote() throws Exception { - // Insert some historical data on sender - try (final ISession session = senderEnv.getSessionConnection()) { - for (int i = 0; i < 100; ++i) { - session.executeNonQueryStatement( - String.format("insert into root.db.d1(time, s) values (%s, 1)", i)); - } - for (int i = 100; i < 200; ++i) { - session.executeNonQueryStatement( - String.format("insert into root.db.d1(time, s) values (%s, 1)", i)); - } - for (int i = 200; i < 300; ++i) { - session.executeNonQueryStatement( - String.format("insert into root.db.d1(time, s) values (%s, 1)", i)); - } - session.executeNonQueryStatement("flush"); - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - // Create topic on sender - final String topic1 = "`topic4`"; - final String topic2 = "`'topic5'`"; - final String topic3 = "`\"topic6\"`"; - final String host = senderEnv.getIP(); - final int port = Integer.parseInt(senderEnv.getPort()); - try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - { - final Properties config = new Properties(); - config.put(TopicConstant.START_TIME_KEY, 0); - config.put(TopicConstant.END_TIME_KEY, 99); - session.createTopic(topic1, config); - } - { - final Properties config = new Properties(); - config.put(TopicConstant.START_TIME_KEY, 100); - config.put(TopicConstant.END_TIME_KEY, 199); - session.createTopic(topic2, config); - } - { - final Properties config = new Properties(); - config.put(TopicConstant.START_TIME_KEY, 200); - config.put(TopicConstant.END_TIME_KEY, 299); - session.createTopic(topic3, config); - } - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - assertTopicCount(3); - - // Subscribe on sender and insert on receiver - final Set topics = new HashSet<>(); - topics.add(topic1); - topics.add(topic2); - topics.add(topic3); - final AtomicBoolean isClosed = new AtomicBoolean(false); - final Thread thread = - 
new Thread( - () -> { - try (final SubscriptionPullConsumer consumer = - new SubscriptionPullConsumer.Builder() - .host(host) - .port(port) - .consumerId("c1") - .consumerGroupId("cg1") - .autoCommit(false) - .buildPullConsumer(); - final ISession session = receiverEnv.getSessionConnection()) { - consumer.open(); - consumer.subscribe(topics); - while (!isClosed.get()) { - LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time - final List messages = - consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); - for (final SubscriptionMessage message : messages) { - for (final Iterator it = - message.getSessionDataSetsHandler().tabletIterator(); - it.hasNext(); ) { - final Tablet tablet = it.next(); - session.insertTablet(tablet); - } - } - consumer.commitSync(messages); - } - consumer.unsubscribe(topics); - } catch (final Exception e) { - e.printStackTrace(); - // Avoid failure - } finally { - LOGGER.info("consumer exiting..."); - } - }, - String.format("%s - consumer", testName.getMethodName())); - thread.start(); - - // Check data on receiver - try { - try (final Connection connection = receiverEnv.getConnection(); - final Statement statement = connection.createStatement()) { - // Keep retrying if there are execution failures - AWAIT.untilAsserted( - () -> - TestUtils.assertSingleResultSetEqual( - TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"), - new HashMap() { - { - put("count(root.db.d1.s)", "300"); - } - })); - } - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { - isClosed.set(true); - thread.join(); - } - } - - @Test - public void testTopicInvalidTimeRangeConfig() throws Exception { - final String host = senderEnv.getIP(); - final int port = Integer.parseInt(senderEnv.getPort()); - - // Scenario 1: invalid time - try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - final Properties properties = new Properties(); - 
properties.put(TopicConstant.START_TIME_KEY, "2024-01-32"); - properties.put(TopicConstant.END_TIME_KEY, TopicConstant.NOW_TIME_VALUE); - session.createTopic("topic7", properties); - fail(); - } catch (final Exception ignored) { - } - assertTopicCount(0); - - // Scenario 2: test when 'start-time' is greater than 'end-time' - try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - final Properties properties = new Properties(); - properties.put(TopicConstant.START_TIME_KEY, "2001.01.01T08:00:00"); - properties.put(TopicConstant.END_TIME_KEY, "2000.01.01T08:00:00"); - session.createTopic("topic8", properties); - fail(); - } catch (final Exception ignored) { - } - assertTopicCount(0); - } - - @Test - public void testTopicInvalidPathConfig() throws Exception { - // Test invalid path when using tsfile format - final Properties config = new Properties(); - config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); - config.put(TopicConstant.PATH_KEY, "root.db.*.s"); - testTopicInvalidRuntimeConfigTemplate("topic9", config); - } - - @Test - public void testTopicInvalidProcessorConfig() throws Exception { - // Test invalid processor when using tsfile format - final Properties config = new Properties(); - config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); - config.put("processor", "tumbling-time-sampling-processor"); - config.put("processor.tumbling-time.interval-seconds", "1"); - config.put("processor.down-sampling.split-file", "true"); - testTopicInvalidRuntimeConfigTemplate("topic10", config); - } - - private void testTopicInvalidRuntimeConfigTemplate( - final String topicName, final Properties config) throws Exception { - // Create topic - final String host = senderEnv.getIP(); - final int port = Integer.parseInt(senderEnv.getPort()); - try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - session.createTopic(topicName, config); 
- } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - assertTopicCount(1); - - final AtomicBoolean dataPrepared = new AtomicBoolean(false); - final AtomicBoolean topicSubscribed = new AtomicBoolean(false); - final AtomicBoolean result = new AtomicBoolean(false); - final List threads = new ArrayList<>(); - - // Subscribe on sender - threads.add( - new Thread( - () -> { - final AtomicInteger retryCount = new AtomicInteger(); - final int maxRetryTimes = 32; - try (final SubscriptionPullConsumer consumer = - new SubscriptionPullConsumer.Builder() - .host(host) - .port(port) - .consumerId("c1") - .consumerGroupId("cg1") - .autoCommit(false) - .fileSaveDir(System.getProperty("java.io.tmpdir")) // hack for license check - .buildPullConsumer()) { - consumer.open(); - consumer.subscribe(topicName); - topicSubscribed.set(true); - while (retryCount.getAndIncrement() < maxRetryTimes) { - LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time - if (dataPrepared.get()) { - final List messages = - consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); - consumer.commitSync(messages); - } - } - consumer.unsubscribe(topicName); - } catch (final SubscriptionRuntimeCriticalException e) { - result.set(true); - } catch (final Exception e) { - e.printStackTrace(); - // Avoid failure - } finally { - LOGGER.info("consumer exiting..."); - } - }, - String.format("%s - consumer", testName.getMethodName()))); - - // Insert some realtime data on sender - threads.add( - new Thread( - () -> { - while (!topicSubscribed.get()) { - LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time - } - try (final ISession session = senderEnv.getSessionConnection()) { - for (int i = 0; i < 100; ++i) { - session.executeNonQueryStatement( - String.format("insert into root.db.d1(time, s) values (%s, 1)", i)); - } - for (int i = 0; i < 100; ++i) { - session.executeNonQueryStatement( - String.format("insert into root.db.d2(time, s) 
values (%s, 1)", i)); - } - for (int i = 0; i < 100; ++i) { - session.executeNonQueryStatement( - String.format("insert into root.db.d3(time, t) values (%s, 1)", i)); - } - session.executeNonQueryStatement("flush"); - } catch (final Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - dataPrepared.set(true); - }, - String.format("%s - data inserter", testName.getMethodName()))); - - for (final Thread thread : threads) { - thread.start(); - } - - // The expected SubscriptionRuntimeCriticalException was not thrown if result is false - AWAIT.untilTrue(result); - - for (final Thread thread : threads) { - thread.join(); - } - } - - private void assertTopicCount(final int count) throws Exception { - try (final SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - final List showTopicResult = - client.showTopic(new TShowTopicReq()).topicInfoList; - Assert.assertEquals(count, showTopicResult.size()); - } - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 37c04e26f5085..531172844f110 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -332,14 +332,20 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro pipeRemainingEventCountList.add(remainingEventAndTime.getLeft()); pipeRemainingTimeList.add(remainingEventAndTime.getRight()); + final Pair remainingEvents = + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .getRemainingEvents(staticMeta.getPipeName(), staticMeta.getCreationTime()); + logger.ifPresent( l -> l.info( - "Reporting pipe meta: {}, isCompleted: {}, remainingEventCount: {}, estimatedRemainingTime: {}", + "Reporting pipe 
meta: {}, isCompleted: {}, remainingEventCount: {}, estimatedRemainingTime: {}, remainingEventsInExtractors: {}, getRemainingEventsInConnectors: {}", pipeMeta.coreReportMessage(), isCompleted, remainingEventAndTime.getLeft(), - remainingEventAndTime.getRight())); + remainingEventAndTime.getRight(), + remainingEvents.getLeft(), + remainingEvents.getRight())); } LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); } catch (final IOException | IllegalPathException e) { @@ -411,14 +417,20 @@ protected void collectPipeMetaListInternal( pipeRemainingEventCountList.add(remainingEventAndTime.getLeft()); pipeRemainingTimeList.add(remainingEventAndTime.getRight()); + final Pair remainingEvents = + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .getRemainingEvents(staticMeta.getPipeName(), staticMeta.getCreationTime()); + logger.ifPresent( l -> l.info( - "Reporting pipe meta: {}, isCompleted: {}, remainingEventCount: {}, estimatedRemainingTime: {}", + "Reporting pipe meta: {}, isCompleted: {}, remainingEventCount: {}, estimatedRemainingTime: {}, remainingEventsInExtractors: {}, getRemainingEventsInConnectors: {}", pipeMeta.coreReportMessage(), isCompleted, remainingEventAndTime.getLeft(), - remainingEventAndTime.getRight())); + remainingEventAndTime.getRight(), + remainingEvents.getLeft(), + remainingEvents.getRight())); } LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); } catch (final IOException | IllegalPathException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index cdab8d422159b..55739bf8f3291 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -135,12 
+135,22 @@ public PipeTsFileInsertionEvent( */ public boolean waitForTsFileClose() throws InterruptedException { if (!isClosed.get()) { + isClosed.set(resource.isClosed()); + synchronized (isClosed) { while (!isClosed.get()) { - isClosed.wait(); + isClosed.wait(100); + + final boolean isClosedNow = resource.isClosed(); + if (isClosedNow) { + isClosed.set(true); + isClosed.notifyAll(); + break; + } } } } + // From illustrations above we know If the status is "closed", then the tsFile is flushed // And here we guarantee that the isEmpty() is set before flushing if tsFile is empty // Then we know: "isClosed" --> tsFile flushed --> (isEmpty() <--> tsFile is empty) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java index c1ecb8460b4d0..f58e4badcc974 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java @@ -208,6 +208,14 @@ public Pair getRemainingEventAndTime( return new Pair<>(operator.getRemainingEvents(), operator.getRemainingTime()); } + public Pair getRemainingEvents(final String pipeName, final long creationTime) { + final PipeDataNodeRemainingEventAndTimeOperator operator = + remainingEventAndTimeOperatorMap.computeIfAbsent( + pipeName + "_" + creationTime, k -> new PipeDataNodeRemainingEventAndTimeOperator()); + return new Pair<>( + operator.getRemainingEventsInExtractors(), operator.getRemainingEventsInConnectors()); + } + //////////////////////////// singleton //////////////////////////// private static class PipeDataNodeRemainingEventAndTimeMetricsHolder { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java index f7a08295f408d..56b83053b227d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java @@ -79,6 +79,20 @@ long getRemainingEvents() { .orElse(0L); } + long getRemainingEventsInExtractors() { + return dataRegionExtractors.keySet().stream() + .map(IoTDBDataRegionExtractor::getEventCount) + .reduce(Integer::sum) + .orElse(0); + } + + long getRemainingEventsInConnectors() { + return dataRegionConnectors.keySet().stream() + .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName)) + .reduce(Integer::sum) + .orElse(0); + } + /** * This will calculate the estimated remaining time of pipe. * diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java index 14d3315e1b79d..4f8550e4e931d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java @@ -170,7 +170,7 @@ private void prefetchOnce() { // - PipeHeartbeatEvent: ignored? (may affect pipe metrics) // - UserDefinedEnrichedEvent: ignored? 
// - Others: events related to meta sync, safe to ignore - LOGGER.warn( + LOGGER.info( "Subscription: SubscriptionPrefetchingTabletsQueue {} ignore EnrichedEvent {} when prefetching.", this, event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index 3f2dac42db6c0..90a41b1a51dbf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -112,7 +112,7 @@ public SubscriptionTsFileEvent poll(final String consumerId) { } if (!(event instanceof PipeTsFileInsertionEvent)) { - LOGGER.warn( + LOGGER.info( "Subscription: SubscriptionPrefetchingTsFileQueue {} only support poll PipeTsFileInsertionEvent. Ignore {}.", this, event); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 1de7f33df3a83..f3919211d71a9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -203,9 +203,9 @@ public class CommonConfig { private double pipeAllSinksRateLimitBytesPerSecond = -1; private boolean isSeperatedPipeHeartbeatEnabled = true; - private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100; - private long pipeMetaSyncerInitialSyncDelayMinutes = 3; - private long pipeMetaSyncerSyncIntervalMinutes = 3; + private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 30; + private long pipeMetaSyncerInitialSyncDelayMinutes = 1; + private long pipeMetaSyncerSyncIntervalMinutes = 1; private long 
pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1; private boolean pipeAutoRestartEnabled = true;