Skip to content
6 changes: 6 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ cd flink-connector-clickhouse
./gradlew build
```

### Publish locally
```bash
cd flink-connector-clickhouse
./gradlew publishToMavenLocal
```

## Testing

### Tooling
Expand Down
33 changes: 31 additions & 2 deletions flink-connector-clickhouse-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,24 @@
*/

plugins {
`maven-publish`
scala
java
id("com.github.johnrengelman.shadow") version "8.1.1"
}

val scalaVersion = "2.13.12"
val sinkVersion = "0.0.1"

repositories {
// Use Maven Central for resolving dependencies.
mavenLocal()
// mavenLocal()
maven("https://s01.oss.sonatype.org/content/groups/staging/") // Temporary until we have a Java Client release
mavenCentral()
}

extra.apply {
set("clickHouseDriverVersion", "0.8.6")
set("clickHouseDriverVersion", "0.9.0-SNAPSHOT") // Temporary until we have a Java Client release
set("flinkVersion", "2.0.0")
set("log4jVersion","2.17.2")
set("testContainersVersion", "1.21.0")
Expand Down Expand Up @@ -128,3 +132,28 @@ tasks.register<JavaExec>("runScalaTests") {
"-s", "org.apache.flink.connector.clickhouse.test.scala.ClickHouseSinkTests"
)
}

tasks.shadowJar {
archiveClassifier.set("all")

dependencies {
exclude(dependency("org.apache.flink:.*"))
}
mergeServiceFiles()
}

tasks.jar {
enabled = false
}

publishing {
publications {
create<MavenPublication>("maven") {
//from(components["java"])
artifact(tasks.shadowJar)
groupId = "org.apache.flink.connector"
artifactId = "clickhouse"
version = sinkVersion
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package com.clickhouse.utils;

import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.api.ConnectionInitiationException;
import com.clickhouse.client.api.ServerException;
import org.apache.flink.connector.clickhouse.exception.RetriableException;
import org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Collection;

public class Utils {

Expand All @@ -18,18 +17,18 @@ public class Utils {
private static final String CLICKHOUSE_CLIENT_ERROR_WRITE_TIMEOUT_MSG = "Write timed out after";

/**
* This will drill down to the first ClickHouseException in the exception chain
* This will drill down to the first ServerException in the exception chain
*
* @param e Exception to drill down
* @return ClickHouseException or null if none found
* @return ServerException or null if none found
*/
public static Exception getRootCause(Throwable e, Boolean prioritizeClickHouseException) {
public static Exception getRootCause(Throwable e, Boolean prioritizeServerException) {
if (e == null)
return null;

Throwable runningException = e;//We have to use Throwable because of the getCause() signature
while (runningException.getCause() != null &&
(!prioritizeClickHouseException || !(runningException instanceof ClickHouseException))) {
(!prioritizeServerException || !(runningException instanceof ServerException))) {
LOG.trace("Found exception: {}", runningException.getLocalizedMessage());
runningException = runningException.getCause();
}
Expand All @@ -45,42 +44,18 @@ public static Exception getRootCause(Throwable e, Boolean prioritizeClickHouseEx

public static void handleException(Throwable e) {
LOG.warn("Deciding how to handle exception: {}", e.getLocalizedMessage());

//Let's check if we have a ClickHouseException to reference the error code
//https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp
Exception rootCause = Utils.getRootCause(e, true);
if (rootCause instanceof ClickHouseException) {
ClickHouseException clickHouseException = (ClickHouseException) rootCause;
LOG.warn("ClickHouseException code: {}", clickHouseException.getErrorCode());
switch (clickHouseException.getErrorCode()) {
case 3: // UNEXPECTED_END_OF_FILE
case 107: // FILE_DOESNT_EXIST
case 159: // TIMEOUT_EXCEEDED
case 164: // READONLY
case 202: // TOO_MANY_SIMULTANEOUS_QUERIES
case 203: // NO_FREE_CONNECTION
case 209: // SOCKET_TIMEOUT
case 210: // NETWORK_ERROR
case 241: // MEMORY_LIMIT_EXCEEDED
case 242: // TABLE_IS_READ_ONLY
case 252: // TOO_MANY_PARTS
case 285: // TOO_FEW_LIVE_REPLICAS
case 319: // UNKNOWN_STATUS_OF_INSERT
case 425: // SYSTEM_ERROR
case 999: // KEEPER_EXCEPTION
throw new RetriableException(e);
default:
LOG.error("Error code [{}] wasn't in the acceptable list.", clickHouseException.getErrorCode());
break;
if (rootCause instanceof ServerException) {
ServerException serverException = (ServerException) rootCause;
LOG.warn("ClickHouse Server Exception Code: {} isRetryable: {}", serverException.getCode(), serverException.isRetryable());
if (serverException.isRetryable()) {
throw new RetriableException(e);
}
}

//Otherwise use Root-Cause Exception Checking
if (rootCause instanceof SocketTimeoutException) {
LOG.warn("SocketTimeoutException thrown, wrapping exception: {}", e.getLocalizedMessage());
} else if (rootCause instanceof ConnectionInitiationException) {
LOG.warn("ClickHouse Connection Initiation Exception: {}", rootCause.getLocalizedMessage());
throw new RetriableException(e);
} else if (rootCause instanceof UnknownHostException) {
LOG.warn("UnknownHostException thrown, wrapping exception: {}", e.getLocalizedMessage());
} else if (rootCause instanceof SocketTimeoutException) {
LOG.warn("SocketTimeoutException thrown, wrapping exception: {}", e.getLocalizedMessage());
throw new RetriableException(e);
} else if (rootCause instanceof IOException) {
final String msg = rootCause.getMessage();
Expand All @@ -90,5 +65,4 @@ public static void handleException(Throwable e) {
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ public class ClickHousePayload implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(ClickHousePayload.class);
private static final long serialVersionUID = 1L;

private int attemptCount = 1;
private final byte[] payload;
public ClickHousePayload(byte[] payload) {
this.payload = payload;
}
public byte[] getPayload() { return payload; }
public int getPayloadLength() { return payload.length; }
public int getAttemptCount() { return attemptCount; }
public void incrementAttempts() { attemptCount++; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@

public class ClickHouseAsyncWriter<InputT> extends AsyncSinkWriter<InputT, ClickHousePayload> {
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAsyncWriter.class);
private static final int DEFAULT_MAX_RETRIES = 3;

private final ClickHouseClientConfig clickHouseClientConfig;
private ClickHouseFormat clickHouseFormat = null;
private int numberOfRetries = DEFAULT_MAX_RETRIES;

private final Counter numBytesSendCounter;
private final Counter numRecordsSendCounter;
private final Counter numRequestSubmittedCounter;
private final Counter numOfDroppedBatchesCounter;
private final Counter numOfDroppedRecordsCounter;
private final Counter totalBatchRetriesCounter;

public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> elementConverter,
WriterInitContext context,
Expand All @@ -42,26 +47,58 @@ public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> element
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
int numberOfRetries,
ClickHouseClientConfig clickHouseClientConfig,
ClickHouseFormat clickHouseFormat,
Collection<BufferedRequestState<ClickHousePayload>> state) {
super(elementConverter,
context,
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(maxBatchSize)
.setMaxBatchSizeInBytes(maxBatchSizeInBytes)
.setMaxInFlightRequests(maxInFlightRequests)
.setMaxBufferedRequests(maxBufferedRequests)
.setMaxTimeInBufferMS(maxTimeInBufferMS)
.setMaxRecordSizeInBytes(maxRecordSizeInBytes)
.build(),
state);
context,
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(maxBatchSize)
.setMaxBatchSizeInBytes(maxBatchSizeInBytes)
.setMaxInFlightRequests(maxInFlightRequests)
.setMaxBufferedRequests(maxBufferedRequests)
.setMaxTimeInBufferMS(maxTimeInBufferMS)
.setMaxRecordSizeInBytes(maxRecordSizeInBytes)
.build(),
state);
this.clickHouseClientConfig = clickHouseClientConfig;
this.clickHouseFormat = clickHouseFormat;
this.numberOfRetries = numberOfRetries;
final SinkWriterMetricGroup metricGroup = context.metricGroup();
this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted");
this.numOfDroppedBatchesCounter = metricGroup.counter("numOfDroppedBatches");
this.numOfDroppedRecordsCounter = metricGroup.counter("numOfDroppedRecords");
this.totalBatchRetriesCounter = metricGroup.counter("totalBatchRetries");
}


public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> elementConverter,
WriterInitContext context,
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
ClickHouseClientConfig clickHouseClientConfig,
ClickHouseFormat clickHouseFormat,
Collection<BufferedRequestState<ClickHousePayload>> state) {
this(elementConverter,
context,
maxBatchSize,
maxInFlightRequests,
maxBufferedRequests,
maxBatchSizeInBytes,
maxTimeInBufferMS,
maxRecordSizeInBytes,
clickHouseClientConfig.getNumberOfRetries(),
clickHouseClientConfig,
clickHouseFormat,
state
);
}

@Override
Expand Down Expand Up @@ -138,11 +175,27 @@ private void handleFailedRequest(
Utils.handleException(error);
} catch (RetriableException e) {
LOG.info("Retriable exception occurred while processing request. ", e);
// TODO: send data again
resultHandler.retryForEntries(requestEntries);
// Let's try to retry
if (requestEntries != null && !requestEntries.isEmpty()) {
ClickHousePayload firstElement = requestEntries.get(0);
LOG.warn("Retry number [{}] out of [{}]", firstElement.getAttemptCount(), this.numberOfRetries);
firstElement.incrementAttempts();
if (firstElement.getAttemptCount() <= this.numberOfRetries) {
totalBatchRetriesCounter.inc();
LOG.warn("Retriable exception occurred while processing request. Left attempts {}.", this.numberOfRetries - (firstElement.getAttemptCount() - 1) );
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The calculation for remaining retry attempts is incorrect. Since getAttemptCount() is called after incrementing the counter, the correct calculation should be this.numberOfRetries - firstElement.getAttemptCount().

Suggested change
LOG.warn("Retriable exception occurred while processing request. Left attempts {}.", this.numberOfRetries - (firstElement.getAttemptCount() - 1) );
LOG.warn("Retriable exception occurred while processing request. Left attempts {}.", this.numberOfRetries - firstElement.getAttemptCount());

// We are not in retry threshold we can send data again
resultHandler.retryForEntries(requestEntries);
return;
} else {
LOG.warn("No attempts left going to drop batch");
}
}

}
LOG.info("completeExceptionally");
resultHandler.completeExceptionally((Exception)error);
LOG.info("Dropping {} request entries due to non-retryable failure: {}", requestEntries.size(), error.getLocalizedMessage());
numOfDroppedBatchesCounter.inc();
numOfDroppedRecordsCounter.inc(requestEntries.size());
resultHandler.completeExceptionally((Exception) error);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class ClickHouseClientConfig implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseClientConfig.class);
private static final long serialVersionUID = 1L;

private static final int DEFAULT_MAX_RETRIES = 3;

private final String url;
private final String username;
private final String password;
Expand All @@ -22,6 +24,8 @@ public class ClickHouseClientConfig implements Serializable {
private final String fullProductName;
private Boolean supportDefault = null;
private final Map<String, String> options;
private transient Client client = null;
private int numberOfRetries = DEFAULT_MAX_RETRIES;

public ClickHouseClientConfig(String url, String username, String password, String database, String tableName) {
this.url = url;
Expand All @@ -34,23 +38,30 @@ public ClickHouseClientConfig(String url, String username, String password, Stri
}

public Client createClient(String database) {
Client client = new Client.Builder()
.addEndpoint(url)
.setUsername(username)
.setPassword(password)
.setDefaultDatabase(database)
.setClientName(fullProductName)
.setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true")
.setOptions(options)
.build();
return client;
if (this.client == null) {
Client client = new Client.Builder()
.addEndpoint(url)
.setUsername(username)
.setPassword(password)
.setDefaultDatabase(database)
.setClientName(fullProductName)
.setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true")
.setOptions(options)
.build();
this.client = client;
return client;
} else {
return this.client;
}
}

public Client createClient() {
return createClient(this.database);
}

public String getTableName() { return tableName; }
public String getTableName() {
return tableName;
}

public void setSupportDefault(Boolean supportDefault) {
this.supportDefault = supportDefault;
Expand All @@ -65,4 +76,12 @@ public void setOptions(Map<String, String> options) {
this.options.putAll(options);
}
}

public void setNumberOfRetries(int numberOfRetries) {
this.numberOfRetries = numberOfRetries;
}

public int getNumberOfRetries() {
return numberOfRetries;
}
}
Loading
Loading