diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index a8736ae..c1aef24 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -5,16 +5,36 @@ on: [push] jobs: build: runs-on: ubuntu-latest - name: Apache Flink ClickHouse Connector tests + strategy: + fail-fast: false + matrix: + clickhouse: [ "23.7", "24.3", "latest", "cloud" ] + name: Apache Flink ClickHouse Connector tests with ClickHouse ${{ matrix.clickhouse }} steps: + - name: Check for Cloud Credentials + id: check-cloud-credentials + run: | + if [[ "${{ matrix.clickhouse }}" == "cloud" && (-z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}" || -z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}") ]]; then + echo "SKIP_STEP=true" >> $GITHUB_ENV + else + echo "SKIP_STEP=false" >> $GITHUB_ENV + fi + shell: bash - uses: actions/checkout@v3 + if: env.SKIP_STEP != 'true' - name: Set up JDK 17 + if: env.SKIP_STEP != 'true' uses: actions/setup-java@v3 with: java-version: '21' distribution: 'adopt' architecture: x64 - name: Setup and execute Gradle 'test' task + if: env.SKIP_STEP != 'true' uses: gradle/gradle-build-action@v2 + env: + CLICKHOUSE_VERSION: ${{ matrix.clickhouse }} + CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }} + CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }} with: arguments: test \ No newline at end of file diff --git a/flink-connector-clickhouse-base/build.gradle.kts b/flink-connector-clickhouse-base/build.gradle.kts index 708c80c..f4044f5 100644 --- a/flink-connector-clickhouse-base/build.gradle.kts +++ b/flink-connector-clickhouse-base/build.gradle.kts @@ -17,6 +17,7 @@ extra.apply { set("clickHouseDriverVersion", "0.8.5") set("flinkVersion", "2.0.0") set("log4jVersion","2.17.2") + set("testContainersVersion", "1.21.0") } @@ -56,6 +57,9 @@ dependencies { testImplementation("org.apache.logging.log4j:log4j-core:${project.extra["log4jVersion"]}") // flink tests testImplementation("org.apache.flink:flink-test-utils:${project.extra["flinkVersion"]}") + // + testImplementation("org.testcontainers:testcontainers:${project.extra["testContainersVersion"]}") + testImplementation("org.testcontainers:clickhouse:${project.extra["testContainersVersion"]}") } // Apply a specific Java toolchain to ease working on different environments. diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java index 4ac99ae..e958437 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java @@ -1,6 +1,8 @@ package org.apache.flink.connector.test; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests; +import org.apache.flink.connector.test.embedded.flink.EmbeddedFlinkClusterForTests; import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -9,8 +11,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; -class DummyFlinkClusterTest extends EmbeddedFlinkClusterForTests { +class DummyFlinkClusterTest extends FlinkClusterTests { // A simple Collection Sink private static class CollectSink implements SinkFunction { @@ -39,11 +42,19 @@ void testDummyFlinkCluster() throws Exception { CollectSink.values.clear(); env.fromData(1L, 2L) + .setParallelism(1) .map(new IncrementMapFunction()) .addSink(new CollectSink()); env.execute("testDummyFlinkCluster"); Assertions.assertEquals(2, CollectSink.values.size()); - Assertions.assertEquals(2L, CollectSink.values.get(0)); - Assertions.assertEquals(3L, CollectSink.values.get(1)); + } + + @Test + void testClickHouse() throws ExecutionException, InterruptedException { + String tableName = "clickhouse_test"; + String createTableSQl = String.format("CREATE TABLE `%s`.`%s` (order_id UInt64) ENGINE = MergeTree ORDER BY tuple(order_id);", ClickHouseServerForTests.getDataBase(), tableName); + ClickHouseServerForTests.executeSql(createTableSQl); + int rows = ClickHouseServerForTests.countRows(tableName); + Assertions.assertEquals(0, rows); } } diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java new file mode 100644 index 0000000..a92b6f1 --- /dev/null +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java @@ -0,0 +1,21 @@ +package org.apache.flink.connector.test; + +import org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests; +import org.apache.flink.connector.test.embedded.flink.EmbeddedFlinkClusterForTests; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class FlinkClusterTests { + @BeforeAll + static void setUp() throws Exception { + EmbeddedFlinkClusterForTests.setUp(); + ClickHouseServerForTests.setUp(); + } + + @AfterAll + static void tearDown() { + EmbeddedFlinkClusterForTests.tearDown(); + ClickHouseServerForTests.tearDown(); + } + +} diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java new file mode 100644 index 0000000..532dfb1 --- /dev/null +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -0,0 +1,91 @@ +package org.apache.flink.connector.test.embedded.clickhouse; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.query.GenericRecord; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.testcontainers.clickhouse.ClickHouseContainer; + +public class ClickHouseServerForTests { + + private static final Logger LOG = LoggerFactory.getLogger(ClickHouseServerForTests.class); + + protected static boolean isCloud = ClickHouseTestHelpers.isCloud(); + protected static String database = null; + protected static ClickHouseContainer db = null; + + protected static String host = null; + protected static int port = 0; + protected static String username = null; + protected static String password = null; + protected static boolean isSSL = false; + + + public static void initConfiguration() { + if (isCloud) { + LOG.info("Init ClickHouse Cloud Configuration"); + host = System.getenv("CLICKHOUSE_CLOUD_HOST"); + port = Integer.parseInt(ClickHouseTestHelpers.HTTPS_PORT); + database = String.format("flink_connector_test_%s", System.currentTimeMillis()); + username = ClickHouseTestHelpers.USERNAME_DEFAULT; + password = System.getenv("CLICKHOUSE_CLOUD_PASSWORD"); + } else { + LOG.info("Init ClickHouse Docker Configuration"); + host = db.getHost(); + port = db.getFirstMappedPort(); + database = ClickHouseTestHelpers.DATABASE_DEFAULT; + username = db.getUsername(); + password = db.getPassword(); + } + isSSL = ClickHouseTestHelpers.isCloud(); + } + public static void setUp() throws InterruptedException, ExecutionException { + if (!isCloud) { + db = new ClickHouseContainer(ClickHouseTestHelpers.CLICKHOUSE_DOCKER_IMAGE).withPassword("test_password").withEnv("CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT", "1"); + db.start(); + } + initConfiguration(); + // wakeup cloud + // have a for loop + boolean isLive = false; + int counter = 0; + while (counter < 5) { + isLive = ClickHouseTestHelpers.ping(host, port, isSSL, username, password); + if (isLive) { + String createDatabase = String.format("CREATE DATABASE IF NOT EXISTS `%s`", database); + executeSql(createDatabase); + return; + } + Thread.sleep(2000); + counter++; + } + throw new RuntimeException("Failed to connect to ClickHouse"); + } + + public static void tearDown() { + if (db != null) { + db.stop(); + } + } + + public static String getDataBase() { return database; } + + public static void executeSql(String sql) throws ExecutionException, InterruptedException { + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); + client.execute(sql).get(); + } + + public static int countRows(String table) throws ExecutionException, InterruptedException { + String countSql = String.format("SELECT COUNT(*) FROM `%s`.`%s`", database, table); + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); + List countResult = client.queryAll(countSql); + return countResult.get(0).getInteger(1); + } +} diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java new file mode 100644 index 0000000..b079f77 --- /dev/null +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java @@ -0,0 +1,51 @@ +package org.apache.flink.connector.test.embedded.clickhouse; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.enums.Protocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class ClickHouseTestHelpers { + private static final Logger LOG = LoggerFactory.getLogger(ClickHouseTestHelpers.class); + + public static final String CLICKHOUSE_VERSION_DEFAULT = "24.3"; + public static final String CLICKHOUSE_PROXY_VERSION_DEFAULT = "23.8"; + public static final String CLICKHOUSE_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", getClickhouseVersion()); + public static final String CLICKHOUSE_FOR_PROXY_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", CLICKHOUSE_PROXY_VERSION_DEFAULT); + + public static final String HTTPS_PORT = "8443"; + public static final String DATABASE_DEFAULT = "default"; + public static final String USERNAME_DEFAULT = "default"; + + private static final int TIMEOUT_VALUE = 60; + private static final TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS; + + public static String getClickhouseVersion() { + String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION"); + if (clickHouseVersion == null) { + clickHouseVersion = CLICKHOUSE_VERSION_DEFAULT; + } + return clickHouseVersion; + } + + public static boolean isCloud() { + String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION"); + return clickHouseVersion != null && clickHouseVersion.equalsIgnoreCase("cloud"); + } + + public static Client getClient(String host, int port, boolean ssl, String username, String password) { + return new Client.Builder().addEndpoint(Protocol.HTTP, host, port, ssl) + .setUsername(username) + .setPassword(password) + .setConnectTimeout(TIMEOUT_VALUE, TIMEOUT_UNIT.toChronoUnit()) + .build(); + } + + public static boolean ping(String host, int port, boolean ssl, String username, String password) { + Client client = getClient(host, port, ssl, username, password); + return client.ping(); + } + +} diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/EmbeddedFlinkClusterForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java similarity index 89% rename from flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/EmbeddedFlinkClusterForTests.java rename to flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java index 31a9bb0..63a8447 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/EmbeddedFlinkClusterForTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java @@ -1,4 +1,4 @@ -package org.apache.flink.connector.test; +package org.apache.flink.connector.test.embedded.flink; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; @@ -21,8 +21,7 @@ static int getFromEnvOrDefault(String key, int defaultValue) { return Integer.parseInt(value); } - @BeforeAll - static void setUp() throws Exception { + public static void setUp() throws Exception { Configuration config = new Configuration(); config.set(RestOptions.PORT, REST_PORT); // web UI port (optional) config.set(TaskManagerOptions.NUM_TASK_SLOTS, NUM_TASK_SLOTS); @@ -34,14 +33,13 @@ static void setUp() throws Exception { .build()); flinkCluster.before(); } - @AfterAll - static void tearDown() { + public static void tearDown() { if (flinkCluster != null) { flinkCluster.after(); } } - protected static MiniClusterWithClientResource getMiniCluster() { + public static MiniClusterWithClientResource getMiniCluster() { if (flinkCluster == null) throw new RuntimeException("No MiniCluster available"); return flinkCluster; diff --git a/flink-connector-clickhouse-base/src/test/resources/log4j.properties b/flink-connector-clickhouse-base/src/test/resources/log4j.properties new file mode 100644 index 0000000..0b4ff80 --- /dev/null +++ b/flink-connector-clickhouse-base/src/test/resources/log4j.properties @@ -0,0 +1,8 @@ +# Define the root logger with appender X +log4j.rootLogger=INFO, console +#log4j.logger.org.testcontainers=WARN +log4j.logger.com.clickhouse.kafka=DEBUG + +log4j.appender.console= org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.conversionPattern=[%d] %p %C %m%n \ No newline at end of file