diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index d566a8a..0a57a25 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -14,6 +14,12 @@
 cd flink-connector-clickhouse
 ./gradlew build
 ```
+### Publish locally
+```bash
+cd flink-connector-clickhouse
+./gradlew publishToMavenLocal
+```
+
 ## Testing
 
 ### Tooling
diff --git a/flink-connector-clickhouse-base/build.gradle.kts b/flink-connector-clickhouse-base/build.gradle.kts
index 6a6d7d1..f41cee5 100644
--- a/flink-connector-clickhouse-base/build.gradle.kts
+++ b/flink-connector-clickhouse-base/build.gradle.kts
@@ -4,20 +4,24 @@
  */
 
 plugins {
+    `maven-publish`
     scala
     java
+    id("com.github.johnrengelman.shadow") version "8.1.1"
 }
 
 val scalaVersion = "2.13.12"
+val sinkVersion = "0.0.1"
 
 repositories {
     // Use Maven Central for resolving dependencies.
-    mavenLocal()
+//    mavenLocal()
+    maven("https://s01.oss.sonatype.org/content/groups/staging/") // Temporary until we have a Java Client release
    mavenCentral()
 }
 
 extra.apply {
-    set("clickHouseDriverVersion", "0.8.6")
+    set("clickHouseDriverVersion", "0.9.0-SNAPSHOT") // Temporary until we have a Java Client release
     set("flinkVersion", "2.0.0")
     set("log4jVersion","2.17.2")
     set("testContainersVersion", "1.21.0")
@@ -128,3 +132,28 @@ tasks.register("runScalaTests") {
         "-s", "org.apache.flink.connector.clickhouse.test.scala.ClickHouseSinkTests"
     )
 }
+
+tasks.shadowJar {
+    archiveClassifier.set("all")
+
+    dependencies {
+        exclude(dependency("org.apache.flink:.*"))
+    }
+    mergeServiceFiles()
+}
+
+tasks.jar {
+    enabled = false
+}
+
+publishing {
+    publications {
+        create<MavenPublication>("maven") {
+            //from(components["java"])
+            artifact(tasks.shadowJar)
+            groupId = "org.apache.flink.connector"
+            artifactId = "clickhouse"
+            version = sinkVersion
+        }
+    }
+}
diff --git a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java
index 7b5d5fd..ed17adc 100644
--- a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java
+++ b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java
@@ -1,15 +1,14 @@
 package com.clickhouse.utils;
 
-import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.api.ConnectionInitiationException;
+import com.clickhouse.client.api.ServerException;
 import org.apache.flink.connector.clickhouse.exception.RetriableException;
-import org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
-import java.util.Collection;
 
 public class Utils {
 
@@ -18,18 +17,18 @@ public class Utils {
     private static final String CLICKHOUSE_CLIENT_ERROR_WRITE_TIMEOUT_MSG = "Write timed out after";
 
     /**
-     * This will drill down to the first ClickHouseException in the exception chain
+     * This will drill down to the first ServerException in the exception chain.
      *
      * @param e Exception to drill down
-     * @return ClickHouseException or null if none found
+     * @return ServerException or null if none found
      */
-    public static Exception getRootCause(Throwable e, Boolean prioritizeClickHouseException) {
+    public static Exception getRootCause(Throwable e, Boolean prioritizeServerException) {
         if (e == null)
             return null;
 
         Throwable runningException = e;//We have to use Throwable because of the getCause() signature
         while (runningException.getCause() != null &&
-                (!prioritizeClickHouseException || !(runningException instanceof ClickHouseException))) {
+                (!prioritizeServerException || !(runningException instanceof ServerException))) {
             LOG.trace("Found exception: {}", runningException.getLocalizedMessage());
             runningException = runningException.getCause();
         }
@@ -45,42 +44,18 @@ public static Exception getRootCause(Throwable e, Boolean prioritizeClickHouseException) {
 
     public static void handleException(Throwable e) {
         LOG.warn("Deciding how to handle exception: {}", e.getLocalizedMessage());
-
-        //Let's check if we have a ClickHouseException to reference the error code
-        //https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp
         Exception rootCause = Utils.getRootCause(e, true);
-        if (rootCause instanceof ClickHouseException) {
-            ClickHouseException clickHouseException = (ClickHouseException) rootCause;
-            LOG.warn("ClickHouseException code: {}", clickHouseException.getErrorCode());
-            switch (clickHouseException.getErrorCode()) {
-                case 3: // UNEXPECTED_END_OF_FILE
-                case 107: // FILE_DOESNT_EXIST
-                case 159: // TIMEOUT_EXCEEDED
-                case 164: // READONLY
-                case 202: // TOO_MANY_SIMULTANEOUS_QUERIES
-                case 203: // NO_FREE_CONNECTION
-                case 209: // SOCKET_TIMEOUT
-                case 210: // NETWORK_ERROR
-                case 241: // MEMORY_LIMIT_EXCEEDED
-                case 242: // TABLE_IS_READ_ONLY
-                case 252: // TOO_MANY_PARTS
-                case 285: // TOO_FEW_LIVE_REPLICAS
-                case 319: // UNKNOWN_STATUS_OF_INSERT
-                case 425: // SYSTEM_ERROR
-                case 999: // KEEPER_EXCEPTION
-                    throw new RetriableException(e);
-                default:
-                    LOG.error("Error code [{}] wasn't in the acceptable list.", clickHouseException.getErrorCode());
-                    break;
+        if (rootCause instanceof ServerException) {
+            ServerException serverException = (ServerException) rootCause;
+            LOG.warn("ClickHouse Server Exception Code: {} isRetryable: {}", serverException.getCode(), serverException.isRetryable());
+            if (serverException.isRetryable()) {
+                throw new RetriableException(e);
             }
-        }
-
-        //Otherwise use Root-Cause Exception Checking
-        if (rootCause instanceof SocketTimeoutException) {
-            LOG.warn("SocketTimeoutException thrown, wrapping exception: {}", e.getLocalizedMessage());
+        } else if (rootCause instanceof ConnectionInitiationException) {
+            LOG.warn("ClickHouse Connection Initiation Exception: {}", rootCause.getLocalizedMessage());
             throw new RetriableException(e);
-        } else if (rootCause instanceof UnknownHostException) {
-            LOG.warn("UnknownHostException thrown, wrapping exception: {}", e.getLocalizedMessage());
+        } else if (rootCause instanceof SocketTimeoutException) {
+            LOG.warn("SocketTimeoutException thrown, wrapping exception: {}", e.getLocalizedMessage());
             throw new RetriableException(e);
         } else if (rootCause instanceof IOException) {
            final String msg = rootCause.getMessage();
@@ -90,5 +65,4 @@ public static void handleException(Throwable e) {
             }
         }
     }
-
 }
diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java
index 25fb0ae..24f6f08 100644
--- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java
+++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java
@@ -9,10 +9,13 @@ public class ClickHousePayload implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(ClickHousePayload.class);
     private static final long serialVersionUID = 1L;
+    private int attemptCount = 1;
     private final byte[] payload;
 
     public ClickHousePayload(byte[] payload) {
         this.payload = payload;
     }
 
     public byte[] getPayload() { return payload; }
     public int getPayloadLength() { return payload.length; }
+    public int getAttemptCount() { return attemptCount; }
+    public void incrementAttempts() { attemptCount++; }
 }
diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java
index 8222d1f..60eaa02 100644
--- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java
+++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java
@@ -26,13 +26,18 @@ public class ClickHouseAsyncWriter<InputT> extends AsyncSinkWriter<InputT, ClickHousePayload> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAsyncWriter.class);
+    private static final int DEFAULT_MAX_RETRIES = 3;
 
     private final ClickHouseClientConfig clickHouseClientConfig;
     private ClickHouseFormat clickHouseFormat = null;
+    private int numberOfRetries = DEFAULT_MAX_RETRIES;
     private final Counter numBytesSendCounter;
     private final Counter numRecordsSendCounter;
     private final Counter numRequestSubmittedCounter;
+    private final Counter numOfDroppedBatchesCounter;
+    private final Counter numOfDroppedRecordsCounter;
+    private final Counter totalBatchRetriesCounter;
 
     public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> elementConverter,
                                  WriterInitContext context,
@@ -42,26 +47,58 @@ public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> elementConverter,
                                  long maxBatchSizeInBytes,
                                  long maxTimeInBufferMS,
                                  long maxRecordSizeInBytes,
+                                 int numberOfRetries,
                                  ClickHouseClientConfig clickHouseClientConfig,
                                  ClickHouseFormat clickHouseFormat,
                                  Collection<BufferedRequestState<ClickHousePayload>> state) {
         super(elementConverter,
-              context,
-              AsyncSinkWriterConfiguration.builder()
-                      .setMaxBatchSize(maxBatchSize)
-                      .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
-                      .setMaxInFlightRequests(maxInFlightRequests)
-                      .setMaxBufferedRequests(maxBufferedRequests)
-                      .setMaxTimeInBufferMS(maxTimeInBufferMS)
-                      .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
-                      .build(),
-              state);
+                context,
+                AsyncSinkWriterConfiguration.builder()
+                        .setMaxBatchSize(maxBatchSize)
+                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+                        .setMaxInFlightRequests(maxInFlightRequests)
+                        .setMaxBufferedRequests(maxBufferedRequests)
+                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
+                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .build(),
+                state);
         this.clickHouseClientConfig = clickHouseClientConfig;
         this.clickHouseFormat = clickHouseFormat;
+        this.numberOfRetries = numberOfRetries;
         final SinkWriterMetricGroup metricGroup = context.metricGroup();
         this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
         this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
         this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted");
+        this.numOfDroppedBatchesCounter = metricGroup.counter("numOfDroppedBatches");
+        this.numOfDroppedRecordsCounter = metricGroup.counter("numOfDroppedRecords");
+        this.totalBatchRetriesCounter = metricGroup.counter("totalBatchRetries");
+    }
+
+
+    public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> elementConverter,
+                                 WriterInitContext context,
+                                 int maxBatchSize,
+                                 int maxInFlightRequests,
+                                 int maxBufferedRequests,
+                                 long maxBatchSizeInBytes,
+                                 long maxTimeInBufferMS,
+                                 long maxRecordSizeInBytes,
+                                 ClickHouseClientConfig clickHouseClientConfig,
+                                 ClickHouseFormat clickHouseFormat,
+                                 Collection<BufferedRequestState<ClickHousePayload>> state) {
+        this(elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes,
+                clickHouseClientConfig.getNumberOfRetries(),
+                clickHouseClientConfig,
+                clickHouseFormat,
+                state
+        );
+    }
 
     @Override
@@ -138,11 +175,27 @@ private void handleFailedRequest(
             Utils.handleException(error);
         } catch (RetriableException e) {
             LOG.info("Retriable exception occurred while processing request. ", e);
-            // TODO: send data again
-            resultHandler.retryForEntries(requestEntries);
+            // Let's try to retry
+            if (requestEntries != null && !requestEntries.isEmpty()) {
+                ClickHousePayload firstElement = requestEntries.get(0);
+                LOG.warn("Retry number [{}] out of [{}]", firstElement.getAttemptCount(), this.numberOfRetries);
+                firstElement.incrementAttempts();
+                if (firstElement.getAttemptCount() <= this.numberOfRetries) {
+                    totalBatchRetriesCounter.inc();
+                    LOG.warn("Retriable exception occurred while processing request. Attempts left: {}.", this.numberOfRetries - (firstElement.getAttemptCount() - 1));
+                    // We have not exhausted the retry budget yet, so send the data again.
+                    resultHandler.retryForEntries(requestEntries);
+                    return;
+                } else {
+                    LOG.warn("No attempts left; dropping the batch.");
+                }
+            }
         }
 
-        LOG.info("completeExceptionally");
-        resultHandler.completeExceptionally((Exception)error);
+        final int entriesToDrop = requestEntries == null ? 0 : requestEntries.size();
+        LOG.info("Dropping {} request entries due to a non-retriable failure or exhausted retries: {}", entriesToDrop, error.getLocalizedMessage());
+        numOfDroppedBatchesCounter.inc();
+        numOfDroppedRecordsCounter.inc(entriesToDrop);
+        resultHandler.completeExceptionally((Exception) error);
     }
 }
diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java
index 9fd3b69..079e6ae 100644
--- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java
+++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java
@@ -14,6 +14,8 @@ public class ClickHouseClientConfig implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(ClickHouseClientConfig.class);
     private static final long serialVersionUID = 1L;
+    private static final int DEFAULT_MAX_RETRIES = 3;
+
     private final String url;
     private final String username;
     private final String password;
@@ -22,6 +24,8 @@ public class ClickHouseClientConfig implements Serializable {
     private final String fullProductName;
     private Boolean supportDefault = null;
     private final Map<String, String> options;
+    private transient Client client = null;
+    private int numberOfRetries = DEFAULT_MAX_RETRIES;
 
     public ClickHouseClientConfig(String url, String username, String password, String database, String tableName) {
         this.url = url;
@@ -34,23 +38,30 @@ public ClickHouseClientConfig(String url, String username, String password, String database, String tableName) {
     }
 
     public Client createClient(String database) {
-        Client client = new Client.Builder()
-                .addEndpoint(url)
-                .setUsername(username)
-                .setPassword(password)
-                .setDefaultDatabase(database)
-                .setClientName(fullProductName)
-                .setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true")
-                .setOptions(options)
-                .build();
-        return client;
+        if (this.client == null) {
+            Client client = new Client.Builder()
+                    .addEndpoint(url)
+                    .setUsername(username)
+                    .setPassword(password)
+                    .setDefaultDatabase(database)
+                    .setClientName(fullProductName)
+                    .setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true")
+                    .setOptions(options)
+                    .build();
+            this.client = client;
+            return client;
+        } else {
+            return this.client;
+        }
     }
 
     public Client createClient() {
         return createClient(this.database);
     }
 
-    public String getTableName() { return tableName; }
+    public String getTableName() {
+        return tableName;
+    }
 
     public void setSupportDefault(Boolean supportDefault) {
         this.supportDefault = supportDefault;
@@ -65,4 +76,12 @@ public void setOptions(Map<String, String> options) {
             this.options.putAll(options);
         }
     }
+
+    public void setNumberOfRetries(int numberOfRetries) {
+        this.numberOfRetries = numberOfRetries;
+    }
+
+    public int getNumberOfRetries() {
+        return numberOfRetries;
+    }
 }
diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
index 8d023de..2af6350 100644
--- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
+++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
@@ -2,6 +2,7 @@
 import com.clickhouse.client.api.metadata.TableSchema;
 import com.clickhouse.data.ClickHouseFormat;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -28,21 +29,35 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests.countMerges;
+import static org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests.isCloud;
+
 public class ClickHouseSinkTests extends FlinkClusterTests {
 
     static final int EXPECTED_ROWS = 10000;
-
-    private int executeJob(StreamExecutionEnvironment env, String tableName) throws Exception {
+    static final int EXPECTED_ROWS_ON_FAILURE = 0;
+    static final int MAX_BATCH_SIZE = 5000;
+    static final int MIN_BATCH_SIZE = 1;
+    static final int MAX_IN_FLIGHT_REQUESTS = 2;
+    static final int MAX_BUFFERED_REQUESTS = 20000;
+    static final long MAX_BATCH_SIZE_IN_BYTES = 1024 * 1024;
+    static final long MAX_TIME_IN_BUFFER_MS = 5 * 1000;
+    static final long MAX_RECORD_SIZE_IN_BYTES = 1000;
+
+    static final int STREAM_PARALLELISM = 5;
+
+    private int executeAsyncJob(StreamExecutionEnvironment env, String tableName, int numIterations, int expectedRows) throws Exception {
         JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource");
         int rows = 0;
         int iterations = 0;
-        while (iterations < 10) {
+        while (iterations < numIterations) {
             Thread.sleep(1000);
             iterations++;
             rows = ClickHouseServerForTests.countRows(tableName);
-            System.out.println("Rows: " + rows);
-            if (rows == EXPECTED_ROWS)
+            System.out.println("Rows: " + rows + " EXPECTED_ROWS: " + expectedRows);
+            if (rows == expectedRows)
                 break;
+        }
 
         // cancel job
         jobClient.cancel();
@@ -72,19 +87,19 @@ void CSVDataTest() throws Exception {
         ClickHouseServerForTests.executeSql(tableSql);
 
         final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment();
-        env.setParallelism(1);
+        env.setParallelism(STREAM_PARALLELISM);
 
         ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
         ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
         // create sink
         ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
                 convertorString,
-                5000,
-                2,
-                20000,
-                1024 * 1024,
-                5 * 1000,
-                1000,
+                MAX_BATCH_SIZE,
+                MAX_IN_FLIGHT_REQUESTS,
+                MAX_BUFFERED_REQUESTS,
+                MAX_BATCH_SIZE_IN_BYTES,
+                MAX_TIME_IN_BUFFER_MS,
+                MAX_RECORD_SIZE_IN_BYTES,
                 clickHouseClientConfig
         );
         csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
@@ -101,7 +116,7 @@ void CSVDataTest() throws Exception {
             "GzipCsvSource"
         );
         lines.sinkTo(csvSink);
-        int rows = executeJob(env, tableName);
+        int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
         Assertions.assertEquals(EXPECTED_ROWS, rows);
     }
 
@@ -128,13 +143,12 @@ void CovidPOJODataTest() throws Exception {
                 "ORDER BY (location_key, date); ";
         ClickHouseServerForTests.executeSql(tableSql);
 
         TableSchema covidTableSchema = ClickHouseServerForTests.getTableSchema(tableName);
-// POJOConvertor covidPOJOConvertor = POJOSerializable.create().createConvertor(covidTableSchema, CovidPOJO.class);
         POJOConvertor<CovidPOJO> covidPOJOConvertor = new CovidPOJOConvertor();
 
         final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment();
-        env.setParallelism(5);
+        env.setParallelism(STREAM_PARALLELISM);
 
         ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
         clickHouseClientConfig.setSupportDefault(covidTableSchema.hasDefaults());
@@ -142,12 +155,12 @@
         ClickHouseAsyncSink<CovidPOJO> covidPOJOSink = new ClickHouseAsyncSink<>(
                 convertorCovid,
-                5000,
-                2,
-                20000,
-                1024 * 1024,
-                5 * 1000,
-                1000,
+                MAX_BATCH_SIZE,
+                MAX_IN_FLIGHT_REQUESTS,
+                MAX_BUFFERED_REQUESTS,
+                MAX_BATCH_SIZE_IN_BYTES,
+                MAX_TIME_IN_BUFFER_MS,
+                MAX_RECORD_SIZE_IN_BYTES,
                 clickHouseClientConfig
         );
@@ -172,7 +185,7 @@ public CovidPOJO map(String value) throws Exception {
             });
         // send to a sink
         covidPOJOs.sinkTo(covidPOJOSink);
-        int rows = executeJob(env, tableName);
+        int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
         Assertions.assertEquals(EXPECTED_ROWS, rows);
     }
 
@@ -204,11 +217,10 @@ void SimplePOJODataTest() throws Exception {
 
         TableSchema simpleTableSchema = ClickHouseServerForTests.getTableSchema(tableName);
-// POJOConvertor simplePOJOConvertor = POJOSerializable.create().createConvertor(simpleTableSchema, SimplePOJO.class);
         POJOConvertor<SimplePOJO> simplePOJOConvertor = new SimplePOJOConvertor();
 
         final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment();
-        env.setParallelism(5);
+        env.setParallelism(STREAM_PARALLELISM);
 
         ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
         clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults());
@@ -217,12 +229,12 @@
         ClickHouseAsyncSink<SimplePOJO> simplePOJOSink = new ClickHouseAsyncSink<>(
                 convertorCovid,
-                5000,
-                2,
-                20000,
-                1024 * 1024,
-                5 * 1000,
-                1000,
+                MAX_BATCH_SIZE,
+                MAX_IN_FLIGHT_REQUESTS,
+                MAX_BUFFERED_REQUESTS,
+                MAX_BATCH_SIZE_IN_BYTES,
+                MAX_TIME_IN_BUFFER_MS,
+                MAX_RECORD_SIZE_IN_BYTES,
                 clickHouseClientConfig
         );
 
@@ -234,7 +246,7 @@ void SimplePOJODataTest() throws Exception {
         DataStream<SimplePOJO> simplePOJOs = env.fromData(simplePOJOList.toArray(new SimplePOJO[0]));
         // send to a sink
         simplePOJOs.sinkTo(simplePOJOSink);
-        int rows = executeJob(env, tableName);
+        int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
         Assertions.assertEquals(EXPECTED_ROWS, rows);
     }
 
@@ -268,12 +280,12 @@ void ProductNameTest() throws Exception {
         // create sink
         ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
                 convertorString,
-                5000,
-                2,
-                20000,
-                1024 * 1024,
-                5 * 1000,
-                1000,
+                MAX_BATCH_SIZE,
+                MAX_IN_FLIGHT_REQUESTS,
+                MAX_BUFFERED_REQUESTS,
+                MAX_BATCH_SIZE_IN_BYTES,
+                MAX_TIME_IN_BUFFER_MS,
+                MAX_RECORD_SIZE_IN_BYTES,
                 clickHouseClientConfig
         );
         csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
@@ -290,7 +302,7 @@ void ProductNameTest() throws Exception {
             "GzipCsvSource"
         );
         lines.sinkTo(csvSink);
-        int rows = executeJob(env, tableName);
+        int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
         Assertions.assertEquals(EXPECTED_ROWS, rows);
         ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
         if (ClickHouseServerForTests.isCloud())
@@ -302,4 +314,200 @@ void ProductNameTest() throws Exception {
         boolean isContains = productName.contains(compareString);
         Assertions.assertTrue(isContains, "Expected user agent to contain: " + compareString + " but got: " + productName);
     }
+
+    /**
+     * Supposed to drop data on failure. We trigger this case by configuring the
+     * writer with the wrong format (TSV) for the CSV input.
+     * @throws Exception
+     */
+    @Test
+    void CSVDataOnFailureDropDataTest() throws Exception {
+        String tableName = "csv_failure_covid";
+        String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
+        ClickHouseServerForTests.executeSql(dropTable);
+        // create table
+        String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
+                "date Date," +
+                "location_key LowCardinality(String)," +
+                "new_confirmed Int32," +
+                "new_deceased Int32," +
+                "new_recovered Int32," +
+                "new_tested Int32," +
+                "cumulative_confirmed Int32," +
+                "cumulative_deceased Int32," +
+                "cumulative_recovered Int32," +
+                "cumulative_tested Int32" +
+                ") " +
+                "ENGINE = MergeTree " +
+                "ORDER BY (location_key, date); ";
+        ClickHouseServerForTests.executeSql(tableSql);
+
+        final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment();
+        env.setParallelism(STREAM_PARALLELISM);
+
+
+        ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
+        ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
+        // create sink
+        ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
+                convertorString,
+                MAX_BATCH_SIZE,
+                MAX_IN_FLIGHT_REQUESTS,
+                MAX_BUFFERED_REQUESTS,
+                MAX_BATCH_SIZE_IN_BYTES,
+                MAX_TIME_IN_BUFFER_MS,
+                MAX_RECORD_SIZE_IN_BYTES,
+                clickHouseClientConfig
+        );
+        csvSink.setClickHouseFormat(ClickHouseFormat.TSV);
+
+        Path filePath = new Path("./src/test/resources/epidemiology_top_10000.csv.gz");
+
+        FileSource<String> source = FileSource
+                .forRecordStreamFormat(new TextLineInputFormat(), filePath)
+                .build();
+        // read csv data from file
+        DataStreamSource<String> lines = env.fromSource(
+                source,
+                WatermarkStrategy.noWatermarks(),
+                "GzipCsvSource"
+        );
+        lines.sinkTo(csvSink);
+        // TODO: make the test smarter by checking the counter of numOfDroppedRecords equals EXPECTED_ROWS
+        int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
+        Assertions.assertEquals(EXPECTED_ROWS_ON_FAILURE, rows);
+    }
+
+    /**
+     * Supposed to retry and then drop data on failure.
+     * We trigger this case by pointing the sink at an incorrect ClickHouse server port.
+     * @throws Exception
+     */
+    @Test
+    void CSVDataOnRetryAndDropDataTest() throws Exception {
+        String tableName = "csv_retry_covid";
+        String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
+        ClickHouseServerForTests.executeSql(dropTable);
+        // create table
+        String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
+                "date Date," +
+                "location_key LowCardinality(String)," +
+                "new_confirmed Int32," +
+                "new_deceased Int32," +
+                "new_recovered Int32," +
+                "new_tested Int32," +
+                "cumulative_confirmed Int32," +
+                "cumulative_deceased Int32," +
+                "cumulative_recovered Int32," +
+                "cumulative_tested Int32" +
+                ") " +
+                "ENGINE = MergeTree " +
+                "ORDER BY (location_key, date); ";
+        ClickHouseServerForTests.executeSql(tableSql);
+
+        final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment();
+        env.setParallelism(STREAM_PARALLELISM);
+
+
+        ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getIncorrectServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
+        ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
+        // create sink
+        ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
+                convertorString,
+                MAX_BATCH_SIZE,
+                MAX_IN_FLIGHT_REQUESTS,
+                MAX_BUFFERED_REQUESTS,
+                MAX_BATCH_SIZE_IN_BYTES,
+                MAX_TIME_IN_BUFFER_MS,
+                MAX_RECORD_SIZE_IN_BYTES,
+                clickHouseClientConfig
+        );
+        csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
+
+        Path filePath = new Path("./src/test/resources/epidemiology_top_10000.csv.gz");
+
+        FileSource<String> source = FileSource
+                .forRecordStreamFormat(new TextLineInputFormat(), filePath)
+                .build();
+        // read csv data from file
+        DataStreamSource<String> lines = env.fromSource(
+                source,
+                WatermarkStrategy.noWatermarks(),
+                "GzipCsvSource"
+        );
+        lines.sinkTo(csvSink);
+        // TODO: make the test smarter by checking the counter of numOfDroppedRecords equals EXPECTED_ROWS
+        int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
+        Assertions.assertEquals(EXPECTED_ROWS_ON_FAILURE, rows);
+    }
+
+    /*
+    In this test, we lower the parts_to_throw_insert setting (https://clickhouse.com/docs/operations/settings/merge-tree-settings#parts_to_throw_insert) to trigger the "Too Many Parts" error more easily.
+    Once we exceed this threshold, ClickHouse will reject INSERT operations with a "Too Many Parts" error.
+    Our retry implementation will demonstrate how it handles these failures by retrying the inserts until all rows are successfully inserted. We insert small batches of two records each to observe this behavior.
+    */
+    @Test
+    void SimplePOJODataTooManyPartsTest() throws Exception {
+        // this test does not run on cloud
+        if (isCloud())
+            return;
+        String tableName = "simple_too_many_parts_pojo";
+
+        String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
+        ClickHouseServerForTests.executeSql(dropTable);
+        // create table
+        String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
+                "bytePrimitive Int8," +
+                "byteObject Int8," +
+                "shortPrimitive Int16," +
+                "shortObject Int16," +
+                "intPrimitive Int32," +
+                "integerObject Int32," +
+                "longPrimitive Int64," +
+                "longObject Int64," +
+                "floatPrimitive Float," +
+                "floatObject Float," +
+                "doublePrimitive Double," +
+                "doubleObject Double" +
+                ") " +
+                "ENGINE = MergeTree " +
+                "ORDER BY (longPrimitive) " +
+                "SETTINGS parts_to_throw_insert = 10;";
+        ClickHouseServerForTests.executeSql(tableSql);
+        //ClickHouseServerForTests.executeSql(String.format("SYSTEM STOP MERGES `%s.%s`", getDatabase(), tableName));
+
+        TableSchema simpleTableSchema = ClickHouseServerForTests.getTableSchema(tableName);
+        POJOConvertor<SimplePOJO> simplePOJOConvertor = new SimplePOJOConvertor();
+
+        final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment();
+        env.setParallelism(STREAM_PARALLELISM);
+
+        ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
+        clickHouseClientConfig.setNumberOfRetries(10);
+        clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults());
+
+        ElementConverter<SimplePOJO, ClickHousePayload> convertorSimplePOJO = new ClickHouseConvertor<>(SimplePOJO.class, simplePOJOConvertor);
+
+        ClickHouseAsyncSink<SimplePOJO> simplePOJOSink = new ClickHouseAsyncSink<>(
+                convertorSimplePOJO,
+                MIN_BATCH_SIZE * 2,
+                MAX_IN_FLIGHT_REQUESTS,
+                10,
+                MAX_BATCH_SIZE_IN_BYTES,
+                MAX_TIME_IN_BUFFER_MS,
+                MAX_RECORD_SIZE_IN_BYTES,
+                clickHouseClientConfig
+        );
+
+        List<SimplePOJO> simplePOJOList = new ArrayList<>();
+        for (int i = 0; i < EXPECTED_ROWS; i++) {
+            simplePOJOList.add(new SimplePOJO(i));
+        }
+        // create from list
+        DataStream<SimplePOJO> simplePOJOs = env.fromData(simplePOJOList.toArray(new SimplePOJO[0]));
+        // send to a sink
+        simplePOJOs.sinkTo(simplePOJOSink);
+        int rows = executeAsyncJob(env, tableName, 100, EXPECTED_ROWS);
+        Assertions.assertEquals(EXPECTED_ROWS, rows);
+        //ClickHouseServerForTests.executeSql(String.format("SYSTEM START MERGES `%s.%s`", getDatabase(), tableName));
+    }
+
 }
diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java
index 9ee6463..334ba1c 100644
--- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java
+++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java
@@ -23,6 +23,10 @@ public static String getServerURL() {
         return ClickHouseServerForTests.getURL();
     }
 
+    public static String getIncorrectServerURL() {
+        return ClickHouseServerForTests.getURL(ClickHouseServerForTests.getHost(), ClickHouseServerForTests.getPort() + 1);
+    }
+
     public static String getUsername() {
         return ClickHouseServerForTests.getUsername();
     }
diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
index b3983e6..2a72dff 100644
--- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
+++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
@@ -78,7 +78,12 @@ public static void tearDown() {
     public static int getPort() { return port; }
     public static String getUsername() { return username; }
     public static String getPassword() { return password; }
+
     public static String getURL() {
+        return ClickHouseServerForTests.getURL(host, port);
+    }
+
+    public static String getURL(String host, int port) {
         if (isCloud) {
             return "https://" + host + ":" + port + "/";
         } else {
@@ -97,6 +102,20 @@ public static void executeSql(String sql) throws ExecutionException, InterruptedException {
         }
     }
 
+    public static int countParts(String table) {
+        String countPartsSql = String.format("SELECT count(*) FROM system.parts WHERE table = '%s' and active = 1", table);
+        Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
+        List<GenericRecord> countResult = client.queryAll(countPartsSql);
+        return countResult.get(0).getInteger(1);
+    }
+
+    public static int countMerges(String table) {
+        String countMergesSql = String.format("SELECT count(*) FROM system.merges WHERE table = '%s'", table);
+        Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
+        List<GenericRecord> countResult = client.queryAll(countMergesSql);
+        return countResult.get(0).getInteger(1);
+    }
+
     public static int countRows(String table) throws ExecutionException, InterruptedException {
         String countSql = String.format("SELECT COUNT(*) FROM `%s`.`%s`", database, table);
         Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
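Reviewer note: to make the new retry knob concrete, here is a minimal usage sketch assembled only from APIs visible in this diff (the `ClickHouseClientConfig` constructor, `setNumberOfRetries`, the `ClickHouseAsyncSink` constructor used by the tests, and `setClickHouseFormat`). The endpoint, credentials, table name, and batch constants are placeholders, and the import path for `ClickHouseConvertor` is assumed to live in the sink package alongside `ClickHouseAsyncSink`.

```java
import com.clickhouse.data.ClickHouseFormat;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
import org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncSink;
import org.apache.flink.connector.clickhouse.sink.ClickHouseClientConfig;
import org.apache.flink.connector.clickhouse.sink.ClickHouseConvertor; // assumed location
import org.apache.flink.streaming.api.datastream.DataStream;

public final class RetryingSinkExample {

    public static ClickHouseAsyncSink<String> buildCsvSink() {
        // Placeholder connection settings; swap in real values.
        ClickHouseClientConfig config = new ClickHouseClientConfig(
                "http://localhost:8123", "default", "", "default", "events_csv");
        // New in this diff: cap how often a failed batch is re-queued before it is dropped.
        config.setNumberOfRetries(5);

        ElementConverter<String, ClickHousePayload> converter =
                new ClickHouseConvertor<>(String.class);
        ClickHouseAsyncSink<String> sink = new ClickHouseAsyncSink<>(
                converter,
                5000,        // maxBatchSize
                2,           // maxInFlightRequests
                20000,       // maxBufferedRequests
                1024 * 1024, // maxBatchSizeInBytes
                5 * 1000,    // maxTimeInBufferMS
                1000,        // maxRecordSizeInBytes
                config);
        sink.setClickHouseFormat(ClickHouseFormat.CSV);
        return sink;
    }

    // Attach the sink to any upstream of CSV lines, as the tests do with sinkTo().
    public static void attach(DataStream<String> lines) {
        lines.sinkTo(buildCsvSink());
    }
}
```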
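A second sketch pins down the `Utils.handleException` contract this diff relies on: a cause chain whose root is a retryable `ServerException`, a `ConnectionInitiationException`, or a `SocketTimeoutException` is rethrown as `RetriableException`, which is what routes the batch into the writer's retry path above. A `SocketTimeoutException` is used here only because it needs no server-side types; the sketch assumes `RetriableException` is unchecked, as its use in `handleFailedRequest` implies.

```java
import com.clickhouse.utils.Utils;
import org.apache.flink.connector.clickhouse.exception.RetriableException;

import java.net.SocketTimeoutException;

public final class HandleExceptionDemo {
    public static void main(String[] args) {
        // A timeout buried under wrapper exceptions, as the async writer would see it.
        Throwable failure = new RuntimeException("insert failed",
                new IllegalStateException(new SocketTimeoutException("read timed out")));
        try {
            Utils.handleException(failure);
        } catch (RetriableException retriable) {
            // getRootCause() drilled down to the SocketTimeoutException and wrapped the
            // original throwable, so the writer can re-queue the batch.
            System.out.println("will retry: " + retriable.getCause().getMessage());
        }
    }
}
```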
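One subtlety worth calling out in review: a `ClickHousePayload` attempt counter starts at 1 for the original send, so `numberOfRetries` effectively caps total attempts rather than re-sends. A tiny sketch of that arithmetic, mirroring the loop structure of `handleFailedRequest` (the payload bytes are arbitrary):

```java
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;

import java.nio.charset.StandardCharsets;

public final class AttemptCountDemo {
    public static void main(String[] args) {
        int numberOfRetries = 3; // same value as DEFAULT_MAX_RETRIES in this diff
        ClickHousePayload first =
                new ClickHousePayload("a,b,c".getBytes(StandardCharsets.UTF_8));

        int requeues = 0;
        // Simulate consecutive failures of the same batch:
        while (true) {
            first.incrementAttempts();
            if (first.getAttemptCount() <= numberOfRetries) {
                requeues++; // resultHandler.retryForEntries(...) in the writer
            } else {
                break;      // the batch is dropped and counted in numOfDroppedBatches
            }
        }
        // Prints 2: the counter starts at 1 for the original send, so a budget of 3
        // yields the original attempt plus two re-queues before the batch is dropped.
        System.out.println("re-queues before drop: " + requeues);
    }
}
```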