Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated debezium version to 2.5.0.Beta1 #414

Merged
merged 20 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
fa9010b
Updated debezium version to 2.5.0.Beta1
subkanthi Dec 11, 2023
fed5d08
Updated debezium version to 2.5.0.Beta1
subkanthi Dec 11, 2023
07cecf4
Added new parser events
subkanthi Dec 11, 2023
244ca04
Removed old parser events
subkanthi Dec 11, 2023
c4e0e6c
Updated function definition with newer debezium version
subkanthi Dec 11, 2023
a40afc2
Removed excessive debug logging
subkanthi Dec 12, 2023
523bd0f
Debug logs added to trace buffer logic.
subkanthi Dec 12, 2023
3635303
Added logic to change DATETIME limits and escape table name with back…
subkanthi Dec 13, 2023
2a4bccf
Fixed integration tests
subkanthi Dec 14, 2023
37d7278
Disable employees test as it fails in github actions
subkanthi Dec 18, 2023
38ccb27
Added monitoring thread that will restart debezium event loop if the …
subkanthi Dec 19, 2023
4dd1369
Added configuration variable for restarting debezium event loop and t…
subkanthi Dec 19, 2023
87f84b6
Removed unused imports in tests
subkanthi Dec 19, 2023
4a861c4
Restart event loop logic based on last record timestamp
subkanthi Dec 19, 2023
8140bb1
Fixed typo
subkanthi Dec 19, 2023
c2a1f8a
Added logic to batch records based on the configuration variable
subkanthi Dec 20, 2023
f556749
Fixed REST API integration test
subkanthi Dec 20, 2023
0c11178
Added new configuration variables to document
subkanthi Dec 20, 2023
3da9693
Added new configuration variables to document
subkanthi Dec 20, 2023
f30b7ea
Merge branch 'develop' of github.com:Altinity/clickhouse-sink-connect…
subkanthi Dec 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 30 additions & 27 deletions doc/configuration.md

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion sink-connector-lightweight/docker/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ database.port: "3306"
database.user: "root"
database.password: "root"
database.server.name: "ER54"
database.include.list: sbtest
database.include.list: test
#table.include.list=sbtest1
clickhouse.server.url: "clickhouse"
clickhouse.server.user: "root"
Expand Down Expand Up @@ -48,3 +48,6 @@ auto.create.tables: "true"
#clickhouse.datetime.timezone: "UTC"
#skip_replica_start: "false"
#binary.handling.mode: "base64"
#restart.event.loop: "true"
#restart.event.loop.timeout.period.secs: "3000"
#buffer.max.records: "10000"
10 changes: 8 additions & 2 deletions sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<maven.compiler.target>17</maven.compiler.target>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.debezium>2.5.0.Alpha1</version.debezium>
<version.debezium>2.5.0.Beta1</version.debezium>
<version.junit>5.9.1</version.junit>
<version.testcontainers>1.19.1</version.testcontainers>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
Expand Down Expand Up @@ -111,7 +111,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-jdbc</artifactId>
<version>2.4.0.Final</version>
<version>${version.debezium}</version>
<!-- <scope>system</scope>
<systemPath>${pom.basedir}/jar/debezium-storage-jdbc-2.4.0-SNAPSHOT.jar</systemPath> -->
</dependency>
Expand Down Expand Up @@ -493,6 +493,12 @@
<!-- <forkCount>10</forkCount> -->
<parallel>all</parallel>
<threadCount>10</threadCount>
<properties>
<property>
<name>listener</name>
<value>com.altinity.clickhouse.debezium.embedded.FailFastListener</value>
</property>
</properties>
<useUnlimitedThreads>true</useUnlimitedThreads>
<perCoreThreadCount>true</perCoreThreadCount>
<useSystemClassLoader>true</useSystemClassLoader>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.google.inject.Guice;
import com.google.inject.Injector;
import io.debezium.engine.DebeziumEngine;
import io.javalin.Javalin;
import io.javalin.http.HttpStatus;
import org.apache.log4j.ConsoleAppender;
Expand All @@ -23,7 +23,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;

Expand All @@ -38,6 +41,12 @@ public class ClickHouseDebeziumEmbeddedApplication {

private static Properties userProperties = new Properties();

private static Injector injector;

private static Properties props;

private static Timer monitoringTimer;

/**
* Main Entry for the application
* @param args arguments
Expand All @@ -57,9 +66,9 @@ public static void main(String[] args) throws Exception {
} else {
LogManager.getRootLogger().setLevel(Level.INFO);
}
Injector injector = Guice.createInjector(new AppInjector());
injector = Guice.createInjector(new AppInjector());

Properties props = new Properties();
props = new Properties();
if(args.length > 0) {
log.info(String.format("****** CONFIGURATION FILE: %s ********", args[0]));

Expand All @@ -85,6 +94,8 @@ public static void main(String[] args) throws Exception {
log.error("Error starting REST API server", e);
}

setupMonitoringThread(new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props)));

embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
}
Expand All @@ -100,7 +111,7 @@ public void startRestApi(Properties props, Injector injector) {
ctx.result("Hello World");
});
app.get("/stop", ctx -> {
debeziumChangeEventCapture.stop();
stop();
});
Properties finalProps1 = props;
app.get("/status", ctx -> {
Expand Down Expand Up @@ -201,26 +212,79 @@ public static CompletableFuture<String> startDebeziumEventLoop(Injector injector
}


public void start(DebeziumRecordParserService recordParserService,
DDLParserService ddlParserService, Properties props, boolean forceStart) throws Exception {
// Define the configuration for the Debezium Engine with MySQL connector...
// log.debug("Loading properties");
//final Properties props = new ConfigLoader().load();

public static void start(DebeziumRecordParserService recordParserService,
DDLParserService ddlParserService, Properties props, boolean forceStart) throws Exception {

debeziumChangeEventCapture = new DebeziumChangeEventCapture();
debeziumChangeEventCapture.setup(props, recordParserService, ddlParserService, forceStart);

setupMonitoringThread(new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props)));

}

public void start(DebeziumRecordParserService recordParserService,
Properties props,
DDLParserService ddlParserService) throws Exception {
// Define the configuration for the Debezium Engine with MySQL connector...
log.debug("Loading properties");
public static void stop() throws IOException {
debeziumChangeEventCapture.stop();

debeziumChangeEventCapture = new DebeziumChangeEventCapture();
debeziumChangeEventCapture.setup(props, recordParserService, ddlParserService, false);
if (monitoringTimer != null) {
monitoringTimer.cancel();
monitoringTimer.purge();
}
}

/**
* Function to setup monitoring thread.
* @param config
*/
private static void setupMonitoringThread(ClickHouseSinkConnectorConfig config) {

try {
// Stop the timer, if its already running.

boolean restartEventLoop = config.getBoolean(String.valueOf(ClickHouseSinkConnectorConfigVariables.RESTART_EVENT_LOOP));

if (!restartEventLoop) {
return;
}

long restartEventLoopTimeout = config.getLong(String.valueOf(ClickHouseSinkConnectorConfigVariables.RESTART_EVENT_LOOP_TIMEOUT_PERIOD));

TimerTask timerTask = new TimerTask() {
@Override
public void run() {
if (debeziumChangeEventCapture == null) {
return;
}
try {
long lastRecordTimestamp = debeziumChangeEventCapture.getLastRecordTimestamp();
if(lastRecordTimestamp == -1) {
return;
}
// calculate delta.
long deltaInSecs = (System.currentTimeMillis() - lastRecordTimestamp) / 1000;
log.info("Last Record Timestamp: " + lastRecordTimestamp + " Delta: " + deltaInSecs + " Restart Event Loop Timeout: " + restartEventLoopTimeout);
if (deltaInSecs < restartEventLoopTimeout) {
return;
}
log.info("******* Restarting Event Loop ********");
debeziumChangeEventCapture.stop();
Thread.sleep(3000);
start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, true);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}

}
};
//running timer task as daemon thread
monitoringTimer = new Timer(true);
monitoringTimer.scheduleAtFixedRate(timerTask, 0, restartEventLoopTimeout * 1000);

} catch (Exception e) {
log.error("Error setting up monitoring thread", e);
}

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.debezium.embedded.Connect;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -35,9 +36,7 @@
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -68,6 +67,8 @@ public class DebeziumChangeEventCapture {

private long replicationLag = 0;

private long lastRecordTimestamp = -1;

private boolean isReplicationRunning = false;

private String binLogFile = "";
Expand Down Expand Up @@ -191,17 +192,17 @@ private void processEveryChangeRecord(Properties props, ChangeEvent<SourceRecord
} else {
ClickHouseStruct chStruct = debeziumRecordParserService.parse(sr);
try {
this.replicationLag = chStruct.getReplicationLag();
this.binLogFile = chStruct.getFile();
this.binLogPosition = String.valueOf(chStruct.getPos());
this.gtid = String.valueOf(chStruct.getGtid());
if(chStruct != null) {
this.replicationLag = chStruct.getReplicationLag();
this.lastRecordTimestamp = chStruct.getTs_ms();
this.binLogFile = chStruct.getFile();
this.binLogPosition = String.valueOf(chStruct.getPos());
this.gtid = String.valueOf(chStruct.getGtid());
}
} catch(Exception e) {
log.error("Error retrieving status metrics");
}
ConcurrentLinkedQueue<ClickHouseStruct> queue = new ConcurrentLinkedQueue<ClickHouseStruct>();
if (chStruct != null) {
queue.add(chStruct);
log.error("Error retrieving status metrics: Exception" + e.toString());
}

synchronized (this.records) {
if (chStruct != null) {
addRecordsToSharedBuffer(chStruct.getTopic(), chStruct);
Expand Down Expand Up @@ -441,6 +442,15 @@ public void setupDebeziumEventCapture(Properties props, DebeziumRecordParserServ
processEveryChangeRecord(props, record, debeziumRecordParserService, config);

});
// changeEventBuilder.notifying(new DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>>() {
// @Override
// public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> list,
// DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) throws InterruptedException {
// for(ChangeEvent<SourceRecord, SourceRecord> record : list) {
// processEveryChangeRecord(props, record, debeziumRecordParserService, config);
// }
// }
// });
this.engine = changeEventBuilder
.using(new DebeziumConnectorCallback()).using(new DebeziumEngine.CompletionCallback() {
@Override
Expand Down Expand Up @@ -479,7 +489,9 @@ public void connectorStopped() {
log.debug("Connector stopped");
}
}
).build();
)
//.build();
.using(OffsetCommitPolicy.always()).build();
engine.run();

} catch (Exception e) {
Expand Down Expand Up @@ -530,6 +542,10 @@ public long getReplicationLagInSecs() {
return this.replicationLag / 1000;
}

public long getLastRecordTimestamp() {
return this.lastRecordTimestamp;
}

public boolean isReplicationRunning() {
return this.isReplicationRunning;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6758,12 +6758,22 @@ public void exitBitOperator(MySqlParser.BitOperatorContext bitOperatorContext) {
}

@Override
public void enterMathOperator(MySqlParser.MathOperatorContext mathOperatorContext) {
public void enterMultOperator(MySqlParser.MultOperatorContext multOperatorContext) {

}

@Override
public void exitMathOperator(MySqlParser.MathOperatorContext mathOperatorContext) {
public void exitMultOperator(MySqlParser.MultOperatorContext multOperatorContext) {

}

@Override
public void enterAddOperator(MySqlParser.AddOperatorContext addOperatorContext) {

}

@Override
public void exitAddOperator(MySqlParser.AddOperatorContext addOperatorContext) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ public static ClickHouseDataType convert(String columnName, MySqlParser.DataType
JdbcValueConverters.DecimalMode.PRECISE,
TemporalPrecisionMode.ADAPTIVE,
JdbcValueConverters.BigIntUnsignedMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES,
MySqlValueConverters::adjustTemporal,
MySqlValueConverters::defaultParsingErrorHandler);
CommonConnectorConfig.BinaryHandlingMode.BYTES
);


DataType dataType = initializeDataTypeResolver().resolveDataType(columnDefChild);
Expand All @@ -52,9 +51,8 @@ public static String convertToString(String columnName, int scale, int precision
JdbcValueConverters.DecimalMode.PRECISE,
TemporalPrecisionMode.ADAPTIVE,
JdbcValueConverters.BigIntUnsignedMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES,
MySqlValueConverters::adjustTemporal,
MySqlValueConverters::defaultParsingErrorHandler);
CommonConnectorConfig.BinaryHandlingMode.BYTES
);


String convertedDataType = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.altinity.clickhouse.debezium.embedded;

import org.junit.runner.Description;
import org.junit.runner.notification.Failure;
import org.junit.runner.notification.RunListener;

public class FailFastListener extends RunListener {

public void testFailure(Failure failure) throws Exception {
System.err.println("FAILURE: " + failure);
System.exit(-1);
}

@Override
public void testFinished(Description description) throws Exception {
System.exit(-1);
}
}
Loading
Loading