From 0f0e889d3e2db5b245ebcc124b1451b94a16b4fd Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 19 May 2025 14:31:36 +0300 Subject: [PATCH 01/11] Added ClickHouse Server tests support. --- .github/workflows/tests.yaml | 22 ++++- .../build.gradle.kts | 4 + .../connector/test/DummyFlinkClusterTest.java | 17 +++- .../clickhouse/ClickHouseServerForTests.java | 89 +++++++++++++++++++ .../clickhouse/ClickHouseTestHelpers.java | 50 +++++++++++ .../flink}/EmbeddedFlinkClusterForTests.java | 10 +-- 6 files changed, 182 insertions(+), 10 deletions(-) create mode 100644 flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java create mode 100644 flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java rename flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/{ => embedded/flink}/EmbeddedFlinkClusterForTests.java (89%) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index a8736ae..c1aef24 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -5,16 +5,36 @@ on: [push] jobs: build: runs-on: ubuntu-latest - name: Apache Flink ClickHouse Connector tests + strategy: + fail-fast: false + matrix: + clickhouse: [ "23.7", "24.3", "latest", "cloud" ] + name: Apache Flink ClickHouse Connector tests with ClickHouse ${{ matrix.clickhouse }} steps: + - name: Check for Cloud Credentials + id: check-cloud-credentials + run: | + if [[ "${{ matrix.clickhouse }}" == "cloud" && (-z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}" || -z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}") ]]; then + echo "SKIP_STEP=true" >> $GITHUB_ENV + else + echo "SKIP_STEP=false" >> $GITHUB_ENV + fi + shell: bash - uses: actions/checkout@v3 + if: env.SKIP_STEP != 'true' - name: Set up JDK 17 + if: env.SKIP_STEP != 'true' uses: actions/setup-java@v3 with: java-version: '21' distribution: 'adopt' architecture: x64 - name: Setup and execute Gradle 'test' task + if: env.SKIP_STEP != 'true' uses: gradle/gradle-build-action@v2 + env: + CLICKHOUSE_VERSION: ${{ matrix.clickhouse }} + CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }} + CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }} with: arguments: test \ No newline at end of file diff --git a/flink-connector-clickhouse-base/build.gradle.kts b/flink-connector-clickhouse-base/build.gradle.kts index 708c80c..f4044f5 100644 --- a/flink-connector-clickhouse-base/build.gradle.kts +++ b/flink-connector-clickhouse-base/build.gradle.kts @@ -17,6 +17,7 @@ extra.apply { set("clickHouseDriverVersion", "0.8.5") set("flinkVersion", "2.0.0") set("log4jVersion","2.17.2") + set("testContainersVersion", "1.21.0") } @@ -56,6 +57,9 @@ dependencies { testImplementation("org.apache.logging.log4j:log4j-core:${project.extra["log4jVersion"]}") // flink tests testImplementation("org.apache.flink:flink-test-utils:${project.extra["flinkVersion"]}") + // + testImplementation("org.testcontainers:testcontainers:${project.extra["testContainersVersion"]}") + testImplementation("org.testcontainers:clickhouse:${project.extra["testContainersVersion"]}") } // Apply a specific Java toolchain to ease working on different environments. diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java index 4ac99ae..e958437 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java @@ -1,6 +1,8 @@ package org.apache.flink.connector.test; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests; +import org.apache.flink.connector.test.embedded.flink.EmbeddedFlinkClusterForTests; import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -9,8 +11,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; -class DummyFlinkClusterTest extends EmbeddedFlinkClusterForTests { +class DummyFlinkClusterTest extends FlinkClusterTests { // A simple Collection Sink private static class CollectSink implements SinkFunction { @@ -39,11 +42,19 @@ void testDummyFlinkCluster() throws Exception { CollectSink.values.clear(); env.fromData(1L, 2L) + .setParallelism(1) .map(new IncrementMapFunction()) .addSink(new CollectSink()); env.execute("testDummyFlinkCluster"); Assertions.assertEquals(2, CollectSink.values.size()); - Assertions.assertEquals(2L, CollectSink.values.get(0)); - Assertions.assertEquals(3L, CollectSink.values.get(1)); + } + + @Test + void testClickHouse() throws ExecutionException, InterruptedException { + String tableName = "clickhouse_test"; + String createTableSQl = String.format("CREATE TABLE `%s`.`%s` (order_id UInt64) ENGINE = MergeTree ORDER BY tuple(order_id);", ClickHouseServerForTests.getDataBase(), tableName); + ClickHouseServerForTests.executeSql(createTableSQl); + int rows = ClickHouseServerForTests.countRows(tableName); + Assertions.assertEquals(0, rows); } } diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java new file mode 100644 index 0000000..7090b16 --- /dev/null +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -0,0 +1,89 @@ +package org.apache.flink.connector.test.embedded.clickhouse; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.query.GenericRecord; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.testcontainers.clickhouse.ClickHouseContainer; + +public class ClickHouseServerForTests { + + private static final Logger LOG = LoggerFactory.getLogger(ClickHouseServerForTests.class); + + protected static boolean isCloud = ClickHouseTestHelpers.isCloud(); + protected static String database = null; + protected static ClickHouseContainer db = null; + + protected static String host = null; + protected static int port = 0; + protected static String username = null; + protected static String password = null; + protected static boolean isSSL = false; + + + public static void initConfiguration() { + if (isCloud) { + host = System.getenv("CLICKHOUSE_CLOUD_HOST"); + port = Integer.parseInt(System.getenv("CLICKHOUSE_CLOUD_PORT")); + database = System.getenv("CLICKHOUSE_DATABASE"); + username = System.getenv("CLICKHOUSE_USERNAME"); + password = System.getenv("CLICKHOUSE_PASSWORD"); + } else { + host = db.getHost(); + port = db.getFirstMappedPort(); + database = ClickHouseTestHelpers.DATABASE_DEFAULT; + username = db.getUsername(); + password = db.getPassword(); + } + isSSL = ClickHouseTestHelpers.isCloud(); + } + public static void setUp() { + if (database == null) { + database = String.format("flink_connector_test_%s", System.currentTimeMillis()); + } + if (!isCloud) { + db = new ClickHouseContainer(ClickHouseTestHelpers.CLICKHOUSE_DOCKER_IMAGE).withPassword("test_password").withEnv("CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT", "1"); + db.start(); + } + initConfiguration(); + if (isCloud) { + // wakeup cloud + // have a for loop + boolean isLive = false; + int counter = 0; + while (isLive || counter < 3) { + isLive = ClickHouseTestHelpers.ping(isCloud, host, port, isSSL, username, password); + counter++; + } + if (!isLive) + throw new RuntimeException("Failed to connect to ClickHouse"); + } + } + + public static void tearDown() { + if (db != null) { + db.stop(); + } + } + + public static String getDataBase() { return database; } + + public static void executeSql(String sql) throws ExecutionException, InterruptedException { + Client client = ClickHouseTestHelpers.getClient(isCloud, host, port, isSSL, username, password); + client.execute(sql).get(); + } + + public static int countRows(String table) throws ExecutionException, InterruptedException { + String countSql = String.format("SELECT COUNT(*) FROM `%s`.`%s`", database, table); + Client client = ClickHouseTestHelpers.getClient(isCloud, host, port, isSSL, username, password); + List countResult = client.queryAll(countSql); + return countResult.get(0).getInteger(1); + } +} diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java new file mode 100644 index 0000000..9364d81 --- /dev/null +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java @@ -0,0 +1,50 @@ +package org.apache.flink.connector.test.embedded.clickhouse; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.enums.Protocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class ClickHouseTestHelpers { + private static final Logger LOG = LoggerFactory.getLogger(ClickHouseTestHelpers.class); + + public static final String CLICKHOUSE_VERSION_DEFAULT = "24.3"; + public static final String CLICKHOUSE_PROXY_VERSION_DEFAULT = "23.8"; + public static final String CLICKHOUSE_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", getClickhouseVersion()); + public static final String CLICKHOUSE_FOR_PROXY_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", CLICKHOUSE_PROXY_VERSION_DEFAULT); + + public static final String HTTPS_PORT = "8443"; + public static final String DATABASE_DEFAULT = "default"; + public static final String USERNAME_DEFAULT = "default"; + + private static final int CLOUD_TIMEOUT_VALUE = 900; + private static final TimeUnit CLOUD_TIMEOUT_UNIT = TimeUnit.SECONDS; + + public static String getClickhouseVersion() { + String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION"); + if (clickHouseVersion == null) { + clickHouseVersion = CLICKHOUSE_VERSION_DEFAULT; + } + return clickHouseVersion; + } + + public static boolean isCloud() { + String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION"); + return clickHouseVersion != null && clickHouseVersion.equalsIgnoreCase("cloud"); + } + + public static Client getClient(boolean isCloud, String host, int port, boolean ssl, String username, String password) { + return new Client.Builder().addEndpoint(Protocol.HTTP, host, port, ssl) + .setUsername(username) + .setPassword(password) + .build(); + } + + public static boolean ping(boolean isCloud, String host, int port, boolean ssl, String username, String password) { + Client client = getClient(isCloud(), host, port, ssl, username, password); + return client.ping(); + } + +} diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/EmbeddedFlinkClusterForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java similarity index 89% rename from flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/EmbeddedFlinkClusterForTests.java rename to flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java index 31a9bb0..63a8447 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/EmbeddedFlinkClusterForTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java @@ -1,4 +1,4 @@ -package org.apache.flink.connector.test; +package org.apache.flink.connector.test.embedded.flink; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; @@ -21,8 +21,7 @@ static int getFromEnvOrDefault(String key, int defaultValue) { return Integer.parseInt(value); } - @BeforeAll - static void setUp() throws Exception { + public static void setUp() throws Exception { Configuration config = new Configuration(); config.set(RestOptions.PORT, REST_PORT); // web UI port (optional) config.set(TaskManagerOptions.NUM_TASK_SLOTS, NUM_TASK_SLOTS); @@ -34,14 +33,13 @@ static void setUp() throws Exception { .build()); flinkCluster.before(); } - @AfterAll - static void tearDown() { + public static void tearDown() { if (flinkCluster != null) { flinkCluster.after(); } } - protected static MiniClusterWithClientResource getMiniCluster() { + public static MiniClusterWithClientResource getMiniCluster() { if (flinkCluster == null) throw new RuntimeException("No MiniCluster available"); return flinkCluster; From a04956824b27a49a7315f77ab91f335dda76ad76 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 19 May 2025 14:31:56 +0300 Subject: [PATCH 02/11] Added ClickHouse Server tests support. --- .../connector/test/FlinkClusterTests.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java new file mode 100644 index 0000000..9de4f8a --- /dev/null +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java @@ -0,0 +1,21 @@ +package org.apache.flink.connector.test; + +import org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests; +import org.apache.flink.connector.test.embedded.flink.EmbeddedFlinkClusterForTests; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class FlinkClusterTests { + @BeforeAll + static void setup() throws Exception { + EmbeddedFlinkClusterForTests.setUp(); + ClickHouseServerForTests.setUp(); + } + + @AfterAll + static void tearDown() { + EmbeddedFlinkClusterForTests.tearDown(); + ClickHouseServerForTests.tearDown(); + } + +} From 74001a2f1bba942e1e120f9eac50a27ab62eb4e0 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 19 May 2025 14:40:08 +0300 Subject: [PATCH 03/11] Update env variables --- .../test/embedded/clickhouse/ClickHouseServerForTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java index 7090b16..a259162 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -30,8 +30,8 @@ public class ClickHouseServerForTests { public static void initConfiguration() { if (isCloud) { - host = System.getenv("CLICKHOUSE_CLOUD_HOST"); - port = Integer.parseInt(System.getenv("CLICKHOUSE_CLOUD_PORT")); + host = System.getenv("INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT"); + port = Integer.parseInt(System.getenv("INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT")); database = System.getenv("CLICKHOUSE_DATABASE"); username = System.getenv("CLICKHOUSE_USERNAME"); password = System.getenv("CLICKHOUSE_PASSWORD"); From 44649846a26774c79c4ccfbbe9989125b09a59ff Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 19 May 2025 14:46:14 +0300 Subject: [PATCH 04/11] Fix port retrival --- .../test/embedded/clickhouse/ClickHouseServerForTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java index a259162..46e5ccb 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -30,11 +30,11 @@ public class ClickHouseServerForTests { public static void initConfiguration() { if (isCloud) { - host = System.getenv("INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT"); - port = Integer.parseInt(System.getenv("INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT")); + host = System.getenv("CLICKHOUSE_CLOUD_HOST"); + port = Integer.parseInt(ClickHouseTestHelpers.HTTPS_PORT); database = System.getenv("CLICKHOUSE_DATABASE"); username = System.getenv("CLICKHOUSE_USERNAME"); - password = System.getenv("CLICKHOUSE_PASSWORD"); + password = System.getenv("CLICKHOUSE_CLOUD_HOST"); } else { host = db.getHost(); port = db.getFirstMappedPort(); From 8357c3d276ce1dd96e53e974f24e19f0f0a103a4 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 19 May 2025 14:52:14 +0300 Subject: [PATCH 05/11] Fix wakeup logic --- .../embedded/clickhouse/ClickHouseServerForTests.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java index 46e5ccb..2855884 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -44,7 +44,7 @@ public static void initConfiguration() { } isSSL = ClickHouseTestHelpers.isCloud(); } - public static void setUp() { + public static void setUp() throws InterruptedException { if (database == null) { database = String.format("flink_connector_test_%s", System.currentTimeMillis()); } @@ -58,12 +58,13 @@ public static void setUp() { // have a for loop boolean isLive = false; int counter = 0; - while (isLive || counter < 3) { + while (counter < 5) { isLive = ClickHouseTestHelpers.ping(isCloud, host, port, isSSL, username, password); + if (isLive) return; + Thread.sleep(2000); counter++; } - if (!isLive) - throw new RuntimeException("Failed to connect to ClickHouse"); + throw new RuntimeException("Failed to connect to ClickHouse"); } } From 63e9da78672bfe9e2bee8a4fbea9409d88d6c087 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 19 May 2025 15:15:04 +0300 Subject: [PATCH 06/11] Adjust cloud config --- .../clickhouse/ClickHouseServerForTests.java | 31 +++++++++---------- .../src/test/resources/log4j.properties | 8 +++++ 2 files changed, 22 insertions(+), 17 deletions(-) create mode 100644 flink-connector-clickhouse-base/src/test/resources/log4j.properties diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java index 2855884..ee8e76c 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -30,12 +30,14 @@ public class ClickHouseServerForTests { public static void initConfiguration() { if (isCloud) { + LOG.info("Init ClickHouse Cloud Configuration"); host = System.getenv("CLICKHOUSE_CLOUD_HOST"); port = Integer.parseInt(ClickHouseTestHelpers.HTTPS_PORT); - database = System.getenv("CLICKHOUSE_DATABASE"); + database = String.format("flink_connector_test_%s", System.currentTimeMillis()); username = System.getenv("CLICKHOUSE_USERNAME"); - password = System.getenv("CLICKHOUSE_CLOUD_HOST"); + password = System.getenv("CLICKHOUSE_CLOUD_PASSWORD"); } else { + LOG.info("Init ClickHouse Docker Configuration"); host = db.getHost(); port = db.getFirstMappedPort(); database = ClickHouseTestHelpers.DATABASE_DEFAULT; @@ -45,27 +47,22 @@ public static void initConfiguration() { isSSL = ClickHouseTestHelpers.isCloud(); } public static void setUp() throws InterruptedException { - if (database == null) { - database = String.format("flink_connector_test_%s", System.currentTimeMillis()); - } if (!isCloud) { db = new ClickHouseContainer(ClickHouseTestHelpers.CLICKHOUSE_DOCKER_IMAGE).withPassword("test_password").withEnv("CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT", "1"); db.start(); } initConfiguration(); - if (isCloud) { - // wakeup cloud - // have a for loop - boolean isLive = false; - int counter = 0; - while (counter < 5) { - isLive = ClickHouseTestHelpers.ping(isCloud, host, port, isSSL, username, password); - if (isLive) return; - Thread.sleep(2000); - counter++; - } - throw new RuntimeException("Failed to connect to ClickHouse"); + // wakeup cloud + // have a for loop + boolean isLive = false; + int counter = 0; + while (counter < 5) { + isLive = ClickHouseTestHelpers.ping(isCloud, host, port, isSSL, username, password); + if (isLive) return; + Thread.sleep(2000); + counter++; } + throw new RuntimeException("Failed to connect to ClickHouse"); } public static void tearDown() { diff --git a/flink-connector-clickhouse-base/src/test/resources/log4j.properties b/flink-connector-clickhouse-base/src/test/resources/log4j.properties new file mode 100644 index 0000000..0b4ff80 --- /dev/null +++ b/flink-connector-clickhouse-base/src/test/resources/log4j.properties @@ -0,0 +1,8 @@ +# Define the root logger with appender X +log4j.rootLogger=INFO, console +#log4j.logger.org.testcontainers=WARN +log4j.logger.com.clickhouse.kafka=DEBUG + +log4j.appender.console= org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.conversionPattern=[%d] %p %C %m%n \ No newline at end of file From 3226918f79f7342a1a8da46b649201350d1c3f05 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 19 May 2025 15:28:25 +0300 Subject: [PATCH 07/11] Change ping logic & create database on init --- .../apache/flink/connector/test/FlinkClusterTests.java | 2 +- .../embedded/clickhouse/ClickHouseServerForTests.java | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java index 9de4f8a..a92b6f1 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java @@ -7,7 +7,7 @@ public class FlinkClusterTests { @BeforeAll - static void setup() throws Exception { + static void setUp() throws Exception { EmbeddedFlinkClusterForTests.setUp(); ClickHouseServerForTests.setUp(); } diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java index ee8e76c..e9601f8 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -46,7 +46,7 @@ public static void initConfiguration() { } isSSL = ClickHouseTestHelpers.isCloud(); } - public static void setUp() throws InterruptedException { + public static void setUp() throws InterruptedException, ExecutionException { if (!isCloud) { db = new ClickHouseContainer(ClickHouseTestHelpers.CLICKHOUSE_DOCKER_IMAGE).withPassword("test_password").withEnv("CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT", "1"); db.start(); @@ -58,7 +58,11 @@ public static void setUp() throws InterruptedException { int counter = 0; while (counter < 5) { isLive = ClickHouseTestHelpers.ping(isCloud, host, port, isSSL, username, password); - if (isLive) return; + if (isLive) { + String createDatabase = String.format("CREATE DATABASE IF NOT EXISTS `%s`", database); + executeSql(createDatabase); + return; + } Thread.sleep(2000); counter++; } From 7354a7b2ad57d362692e4e437ccaaaba44e81250 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 19 May 2025 15:44:30 +0300 Subject: [PATCH 08/11] Upload error report to artifact --- .github/workflows/tests.yaml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index c1aef24..28fd11d 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -37,4 +37,10 @@ jobs: CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }} CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }} with: - arguments: test \ No newline at end of file + arguments: test + - name: Upload Test Report + if: failure() # Only upload if tests fail (optional) + uses: actions/upload-artifact@v4 + with: + name: test-report + path: file:///home/runner/work/flink-connector-clickhouse/flink-connector-clickhouse/flink-connector-clickhouse-base/build/reports/ \ No newline at end of file From 4b72f14651d18dd3754d38b14e21da74cd03f709 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 19 May 2025 15:48:30 +0300 Subject: [PATCH 09/11] Fix reports filename --- .github/workflows/tests.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 28fd11d..87abe00 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -43,4 +43,4 @@ jobs: uses: actions/upload-artifact@v4 with: name: test-report - path: file:///home/runner/work/flink-connector-clickhouse/flink-connector-clickhouse/flink-connector-clickhouse-base/build/reports/ \ No newline at end of file + path: file:///home/runner/work/flink-connector-clickhouse/flink-connector-clickhouse/flink-connector-clickhouse-base/build/reports/tests/test/index.html \ No newline at end of file From 55df666caaae1301fb0db02814dd33e5fd4fdcac Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 19 May 2025 16:20:43 +0300 Subject: [PATCH 10/11] Adding timeout --- .github/workflows/tests.yaml | 8 +------- .../embedded/clickhouse/ClickHouseServerForTests.java | 2 +- .../test/embedded/clickhouse/ClickHouseTestHelpers.java | 5 +++-- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 87abe00..c1aef24 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -37,10 +37,4 @@ jobs: CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }} CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }} with: - arguments: test - - name: Upload Test Report - if: failure() # Only upload if tests fail (optional) - uses: actions/upload-artifact@v4 - with: - name: test-report - path: file:///home/runner/work/flink-connector-clickhouse/flink-connector-clickhouse/flink-connector-clickhouse-base/build/reports/tests/test/index.html \ No newline at end of file + arguments: test \ No newline at end of file diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java index e9601f8..a515f87 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -34,7 +34,7 @@ public static void initConfiguration() { host = System.getenv("CLICKHOUSE_CLOUD_HOST"); port = Integer.parseInt(ClickHouseTestHelpers.HTTPS_PORT); database = String.format("flink_connector_test_%s", System.currentTimeMillis()); - username = System.getenv("CLICKHOUSE_USERNAME"); + username = ClickHouseTestHelpers.USERNAME_DEFAULT; password = System.getenv("CLICKHOUSE_CLOUD_PASSWORD"); } else { LOG.info("Init ClickHouse Docker Configuration"); diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java index 9364d81..c850d4e 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java @@ -19,8 +19,8 @@ public class ClickHouseTestHelpers { public static final String DATABASE_DEFAULT = "default"; public static final String USERNAME_DEFAULT = "default"; - private static final int CLOUD_TIMEOUT_VALUE = 900; - private static final TimeUnit CLOUD_TIMEOUT_UNIT = TimeUnit.SECONDS; + private static final int TIMEOUT_VALUE = 60; + private static final TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS; public static String getClickhouseVersion() { String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION"); @@ -39,6 +39,7 @@ public static Client getClient(boolean isCloud, String host, int port, boolean s return new Client.Builder().addEndpoint(Protocol.HTTP, host, port, ssl) .setUsername(username) .setPassword(password) + .setConnectTimeout(TIMEOUT_VALUE, TIMEOUT_UNIT.toChronoUnit()) .build(); } From c018cbf3b106002814ab410d57956997bb65cd1b Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 19 May 2025 16:33:40 +0300 Subject: [PATCH 11/11] Remove redundant code --- .../test/embedded/clickhouse/ClickHouseServerForTests.java | 6 +++--- .../test/embedded/clickhouse/ClickHouseTestHelpers.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java index a515f87..532dfb1 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -57,7 +57,7 @@ public static void setUp() throws InterruptedException, ExecutionException { boolean isLive = false; int counter = 0; while (counter < 5) { - isLive = ClickHouseTestHelpers.ping(isCloud, host, port, isSSL, username, password); + isLive = ClickHouseTestHelpers.ping(host, port, isSSL, username, password); if (isLive) { String createDatabase = String.format("CREATE DATABASE IF NOT EXISTS `%s`", database); executeSql(createDatabase); @@ -78,13 +78,13 @@ public static void tearDown() { public static String getDataBase() { return database; } public static void executeSql(String sql) throws ExecutionException, InterruptedException { - Client client = ClickHouseTestHelpers.getClient(isCloud, host, port, isSSL, username, password); + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); client.execute(sql).get(); } public static int countRows(String table) throws ExecutionException, InterruptedException { String countSql = String.format("SELECT COUNT(*) FROM `%s`.`%s`", database, table); - Client client = ClickHouseTestHelpers.getClient(isCloud, host, port, isSSL, username, password); + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); List countResult = client.queryAll(countSql); return countResult.get(0).getInteger(1); } diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java index c850d4e..b079f77 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java @@ -35,7 +35,7 @@ public static boolean isCloud() { return clickHouseVersion != null && clickHouseVersion.equalsIgnoreCase("cloud"); } - public static Client getClient(boolean isCloud, String host, int port, boolean ssl, String username, String password) { + public static Client getClient(String host, int port, boolean ssl, String username, String password) { return new Client.Builder().addEndpoint(Protocol.HTTP, host, port, ssl) .setUsername(username) .setPassword(password) @@ -43,8 +43,8 @@ public static Client getClient(boolean isCloud, String host, int port, boolean s .build(); } - public static boolean ping(boolean isCloud, String host, int port, boolean ssl, String username, String password) { - Client client = getClient(isCloud(), host, port, ssl, username, password); + public static boolean ping(String host, int port, boolean ssl, String username, String password) { + Client client = getClient(host, port, ssl, username, password); return client.ping(); }