Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,36 @@ on: [push]
jobs:
build:
runs-on: ubuntu-latest
name: Apache Flink ClickHouse Connector tests
strategy:
fail-fast: false
matrix:
clickhouse: [ "23.7", "24.3", "latest", "cloud" ]
name: Apache Flink ClickHouse Connector tests with ClickHouse ${{ matrix.clickhouse }}
steps:
- name: Check for Cloud Credentials
id: check-cloud-credentials
run: |
if [[ "${{ matrix.clickhouse }}" == "cloud" && (-z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}" || -z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}") ]]; then
echo "SKIP_STEP=true" >> $GITHUB_ENV
else
echo "SKIP_STEP=false" >> $GITHUB_ENV
fi
shell: bash
- uses: actions/checkout@v3
if: env.SKIP_STEP != 'true'
- name: Set up JDK 17
if: env.SKIP_STEP != 'true'
uses: actions/setup-java@v3
with:
java-version: '21'
distribution: 'adopt'
architecture: x64
- name: Setup and execute Gradle 'test' task
if: env.SKIP_STEP != 'true'
uses: gradle/gradle-build-action@v2
env:
CLICKHOUSE_VERSION: ${{ matrix.clickhouse }}
CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}
CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
with:
arguments: test
4 changes: 4 additions & 0 deletions flink-connector-clickhouse-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ extra.apply {
set("clickHouseDriverVersion", "0.8.5")
set("flinkVersion", "2.0.0")
set("log4jVersion","2.17.2")
set("testContainersVersion", "1.21.0")
}


Expand Down Expand Up @@ -56,6 +57,9 @@ dependencies {
testImplementation("org.apache.logging.log4j:log4j-core:${project.extra["log4jVersion"]}")
// flink tests
testImplementation("org.apache.flink:flink-test-utils:${project.extra["flinkVersion"]}")
//
testImplementation("org.testcontainers:testcontainers:${project.extra["testContainersVersion"]}")
testImplementation("org.testcontainers:clickhouse:${project.extra["testContainersVersion"]}")
}

// Apply a specific Java toolchain to ease working on different environments.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.apache.flink.connector.test;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests;
import org.apache.flink.connector.test.embedded.flink.EmbeddedFlinkClusterForTests;
Copy link

Copilot AI May 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import is no longer used after changing the base class to FlinkClusterTests. Consider removing it to clean up unused imports.

Suggested change
import org.apache.flink.connector.test.embedded.flink.EmbeddedFlinkClusterForTests;

Copilot uses AI. Check for mistakes.
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
Expand All @@ -9,8 +11,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

class DummyFlinkClusterTest extends EmbeddedFlinkClusterForTests {
class DummyFlinkClusterTest extends FlinkClusterTests {
// A simple Collection Sink
private static class CollectSink implements SinkFunction<Long> {

Expand Down Expand Up @@ -39,11 +42,19 @@ void testDummyFlinkCluster() throws Exception {
CollectSink.values.clear();

env.fromData(1L, 2L)
.setParallelism(1)
.map(new IncrementMapFunction())
.addSink(new CollectSink());
env.execute("testDummyFlinkCluster");
Assertions.assertEquals(2, CollectSink.values.size());
Assertions.assertEquals(2L, CollectSink.values.get(0));
Assertions.assertEquals(3L, CollectSink.values.get(1));
}

@Test
void testClickHouse() throws ExecutionException, InterruptedException {
String tableName = "clickhouse_test";
String createTableSQl = String.format("CREATE TABLE `%s`.`%s` (order_id UInt64) ENGINE = MergeTree ORDER BY tuple(order_id);", ClickHouseServerForTests.getDataBase(), tableName);
ClickHouseServerForTests.executeSql(createTableSQl);
int rows = ClickHouseServerForTests.countRows(tableName);
Assertions.assertEquals(0, rows);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.flink.connector.test;

import org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests;
import org.apache.flink.connector.test.embedded.flink.EmbeddedFlinkClusterForTests;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

public class FlinkClusterTests {
@BeforeAll
static void setUp() throws Exception {
EmbeddedFlinkClusterForTests.setUp();
ClickHouseServerForTests.setUp();
}

@AfterAll
static void tearDown() {
EmbeddedFlinkClusterForTests.tearDown();
ClickHouseServerForTests.tearDown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.apache.flink.connector.test.embedded.clickhouse;

import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.query.GenericRecord;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.testcontainers.clickhouse.ClickHouseContainer;

public class ClickHouseServerForTests {

private static final Logger LOG = LoggerFactory.getLogger(ClickHouseServerForTests.class);

protected static boolean isCloud = ClickHouseTestHelpers.isCloud();
protected static String database = null;
protected static ClickHouseContainer db = null;

protected static String host = null;
protected static int port = 0;
protected static String username = null;
protected static String password = null;
protected static boolean isSSL = false;


public static void initConfiguration() {
if (isCloud) {
LOG.info("Init ClickHouse Cloud Configuration");
host = System.getenv("CLICKHOUSE_CLOUD_HOST");
port = Integer.parseInt(ClickHouseTestHelpers.HTTPS_PORT);
database = String.format("flink_connector_test_%s", System.currentTimeMillis());
username = ClickHouseTestHelpers.USERNAME_DEFAULT;
password = System.getenv("CLICKHOUSE_CLOUD_PASSWORD");
} else {
LOG.info("Init ClickHouse Docker Configuration");
host = db.getHost();
port = db.getFirstMappedPort();
database = ClickHouseTestHelpers.DATABASE_DEFAULT;
username = db.getUsername();
password = db.getPassword();
}
isSSL = ClickHouseTestHelpers.isCloud();
}
public static void setUp() throws InterruptedException, ExecutionException {
if (!isCloud) {
db = new ClickHouseContainer(ClickHouseTestHelpers.CLICKHOUSE_DOCKER_IMAGE).withPassword("test_password").withEnv("CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT", "1");
db.start();
}
initConfiguration();
// wakeup cloud
// have a for loop
boolean isLive = false;
int counter = 0;
while (counter < 5) {
isLive = ClickHouseTestHelpers.ping(host, port, isSSL, username, password);
if (isLive) {
String createDatabase = String.format("CREATE DATABASE IF NOT EXISTS `%s`", database);
executeSql(createDatabase);
return;
}
Thread.sleep(2000);
counter++;
}
throw new RuntimeException("Failed to connect to ClickHouse");
}

public static void tearDown() {
if (db != null) {
db.stop();
}
}

public static String getDataBase() { return database; }

public static void executeSql(String sql) throws ExecutionException, InterruptedException {
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
client.execute(sql).get();
}

public static int countRows(String table) throws ExecutionException, InterruptedException {
String countSql = String.format("SELECT COUNT(*) FROM `%s`.`%s`", database, table);
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
List<GenericRecord> countResult = client.queryAll(countSql);
return countResult.get(0).getInteger(1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.apache.flink.connector.test.embedded.clickhouse;

import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.enums.Protocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class ClickHouseTestHelpers {
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseTestHelpers.class);

public static final String CLICKHOUSE_VERSION_DEFAULT = "24.3";
public static final String CLICKHOUSE_PROXY_VERSION_DEFAULT = "23.8";
public static final String CLICKHOUSE_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", getClickhouseVersion());
public static final String CLICKHOUSE_FOR_PROXY_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", CLICKHOUSE_PROXY_VERSION_DEFAULT);

public static final String HTTPS_PORT = "8443";
public static final String DATABASE_DEFAULT = "default";
public static final String USERNAME_DEFAULT = "default";

private static final int TIMEOUT_VALUE = 60;
private static final TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS;

public static String getClickhouseVersion() {
String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION");
if (clickHouseVersion == null) {
clickHouseVersion = CLICKHOUSE_VERSION_DEFAULT;
}
return clickHouseVersion;
}

public static boolean isCloud() {
String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION");
return clickHouseVersion != null && clickHouseVersion.equalsIgnoreCase("cloud");
}

public static Client getClient(String host, int port, boolean ssl, String username, String password) {
return new Client.Builder().addEndpoint(Protocol.HTTP, host, port, ssl)
.setUsername(username)
.setPassword(password)
.setConnectTimeout(TIMEOUT_VALUE, TIMEOUT_UNIT.toChronoUnit())
.build();
}

public static boolean ping(String host, int port, boolean ssl, String username, String password) {
Client client = getClient(host, port, ssl, username, password);
return client.ping();
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.flink.connector.test;
package org.apache.flink.connector.test.embedded.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
Expand All @@ -21,8 +21,7 @@ static int getFromEnvOrDefault(String key, int defaultValue) {
return Integer.parseInt(value);
}

@BeforeAll
static void setUp() throws Exception {
public static void setUp() throws Exception {
Configuration config = new Configuration();
config.set(RestOptions.PORT, REST_PORT); // web UI port (optional)
config.set(TaskManagerOptions.NUM_TASK_SLOTS, NUM_TASK_SLOTS);
Expand All @@ -34,14 +33,13 @@ static void setUp() throws Exception {
.build());
flinkCluster.before();
}
@AfterAll
static void tearDown() {
public static void tearDown() {
if (flinkCluster != null) {
flinkCluster.after();
}
}

protected static MiniClusterWithClientResource getMiniCluster() {
public static MiniClusterWithClientResource getMiniCluster() {
if (flinkCluster == null)
throw new RuntimeException("No MiniCluster available");
return flinkCluster;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Define the root logger with appender X
log4j.rootLogger=INFO, console
#log4j.logger.org.testcontainers=WARN
log4j.logger.com.clickhouse.kafka=DEBUG

log4j.appender.console= org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.conversionPattern=[%d] %p %C %m%n
Loading