Skip to content

Commit

Permalink
Merge branch 'Altinity:develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
vpol committed May 21, 2024
2 parents cfebd9f + 625150a commit db2414d
Show file tree
Hide file tree
Showing 119 changed files with 3,483 additions and 539 deletions.
24 changes: 15 additions & 9 deletions doc/Monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,30 @@ record_insert_seq:

```select event_time, database, table, rows, duration_ms,size_in_bytes from system.part_log where table='table' and event_type='NewPart' and event_time > now () - interval 30 minute and database='db' ;```


## Sink Connector (Kafka) monitoring

Sink Connector Config
OpenJDK 11.0.14.1

-Xms256M, -Xmx2G,

## Grafana Dashboard
JMX metrics of sink connector are exposed through the port

The JMX_exporter docker image scrapes the JMX metrics from the sink connector
The metrics can be read through the following URL
http://localhost:9072/metrics

A Grafana dashboard is included to view JMX metrics.
The docker-compose launches Grafana application which can be accessed in **http://localhost:3000**
The default username/password is `admin/admin`
![](img/Grafana_dashboard.png)
![](img/Grafana_dashboard_2.png)


**Memory**

Sink Connector Config
OpenJDK 11.0.14.1

-Xms256M, -Xmx2G,



Throughput
**Throughput**
Increase the `fetch.min.bytes` property to increase the size of message
consumed \
[1] https://strimzi.io/blog/2021/01/07/consumer-tuning/
Expand Down
9 changes: 8 additions & 1 deletion doc/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ Requirements
- Maven (mvn) (https://maven.apache.org/download.cgi)
- Docker and Docker-compose

Install JDK(For Mac)
```
brew install openjdk@17
export JAVA_HOME=/opt/homebrew/Cellar/openjdk@17/17.0.11/libexec/openjdk.jdk/Contents/Home/
mvn -v
# verify it's actual openjdk 17 used and continue with steps
```

1. Clone the ClickHouse Sink connector repository:
```bash
Expand All @@ -26,4 +33,4 @@ cd ../sink-connector-lightweight
mvn install -DskipTests=true
```

The JAR file will be created in the `target` directory.
The JAR file will be created in the `target` directory.
53 changes: 53 additions & 0 deletions doc/img/mysql_ddl_execution
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<mxfile host="app.diagrams.net" modified="2024-03-05T11:52:46.132Z" agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" etag="6-FdwkM_er-bKnZmaXG9" version="23.0.2" type="device">
<diagram name="Page-1" id="4kWa008UeLW88EDx8_1b">
<mxGraphModel dx="1360" dy="759" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="850" pageHeight="1100" math="0" shadow="0">
<root>
<mxCell id="0" />
<mxCell id="1" parent="0" />
<mxCell id="Kt27N8K72PIJ8C6K6JpF-4" value="" style="whiteSpace=wrap;html=1;aspect=fixed;" vertex="1" parent="1">
<mxGeometry x="160" y="40" width="470" height="470" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-7" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" edge="1" parent="1" source="Kt27N8K72PIJ8C6K6JpF-2" target="Kt27N8K72PIJ8C6K6JpF-6">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-2" value="" style="outlineConnect=0;dashed=0;verticalLabelPosition=bottom;verticalAlign=top;align=center;html=1;shape=mxgraph.aws3.mysql_db_instance_2;fillColor=#2E73B8;gradientColor=none;" vertex="1" parent="1">
<mxGeometry x="30" y="170" width="60" height="80" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-5" value="ClickHouse" style="shape=cylinder3;whiteSpace=wrap;html=1;boundedLbl=1;backgroundOutline=1;size=15;fillColor=#fff2cc;strokeColor=#d6b656;" vertex="1" parent="1">
<mxGeometry x="730" y="240" width="60" height="80" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-14" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;" edge="1" parent="1" source="Kt27N8K72PIJ8C6K6JpF-6" target="Kt27N8K72PIJ8C6K6JpF-10">
<mxGeometry relative="1" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-6" value="Main&lt;br&gt;Thread&lt;br&gt;(DDL)" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="200" y="200" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-10" value="&lt;b&gt;Thread&amp;nbsp;&lt;br&gt;Pool&lt;/b&gt;" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="430" y="80" width="180" height="370" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-8" value="Thread-1" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="460" y="100" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-11" value="Thread-2" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="460" y="180" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-12" value="Thread-3" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="460" y="300" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-13" value="Thread-4" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
<mxGeometry x="460" y="380" width="120" height="60" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-15" value="&lt;b&gt;Sink Connector Lighweight&lt;/b&gt;" style="text;html=1;strokeColor=none;fillColor=none;align=center;verticalAlign=middle;whiteSpace=wrap;rounded=0;" vertex="1" parent="1">
<mxGeometry x="330" y="520" width="150" height="30" as="geometry" />
</mxCell>
<mxCell id="Kt27N8K72PIJ8C6K6JpF-16" value="" style="endArrow=classic;html=1;rounded=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;entryPerimeter=0;exitX=0.996;exitY=0.516;exitDx=0;exitDy=0;exitPerimeter=0;" edge="1" parent="1" source="Kt27N8K72PIJ8C6K6JpF-4" target="Kt27N8K72PIJ8C6K6JpF-5">
<mxGeometry width="50" height="50" relative="1" as="geometry">
<mxPoint x="400" y="420" as="sourcePoint" />
<mxPoint x="450" y="370" as="targetPoint" />
<Array as="points" />
</mxGeometry>
</mxCell>
</root>
</mxGraphModel>
</diagram>
</mxfile>
42 changes: 42 additions & 0 deletions release-notes/2.1.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
## What's Changed

## Breaking Changes.
The configuration `clickhouse.server.database` is now deprecated with the multiple database support.
By default the source MySQL/postgres database name will be used as the ClickHouse database name.

## Changes
* Release 2.0.2 by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/510
* Added release notes for 2.0.2 by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/527
* add 1 second delay after query execution by @Selfeer in https://github.com/Altinity/clickhouse-sink-connector/pull/537
* Update README.md by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/539
* Update Monitoring.md by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/541
* Change index_granularity to 8192 instead of 8198. by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/534
* Refactor TestFlows tests related to Lightweight by @Selfeer in https://github.com/Altinity/clickhouse-sink-connector/pull/543
* Update config.yml to include database.server.id by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/544
* Update Troubleshooting.md by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/545
* Update Monitoring.md to include insert duration query and part log query by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/546
* Removed references to deduplication.policy in kafka configuration by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/547
* Use sequence number + timestamp in non-gtid mode for version column. by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/550
* Added logic to support multiple databases by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/535
* 523 handle scenario when records could be inserted with the same timestampnon gtid mode by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/552
* [528] Added logic to create view for replica_source_info table by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/549
* Replaced slf4j calls with log4j2 api calls by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/553
* Can't load table from Postgres to Clickhouse containing nullable numeric column by @ZlobnyiSerg in https://github.com/Altinity/clickhouse-sink-connector/pull/529
* Kafka fixes for multiple database. by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/555
* Added integration test to perform updates on PK to verify incrementin… by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/554
* Enable postgres tests by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/556
* Grafana - Fix prometheus targets by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/558
* Fixed logic of creating sequence number based on debezium timestamp, … by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/557
* Removed excessive logging statements by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/565
* Changed CREATE VIEW to CREATE VIEW IF NOT EXISTS by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/567
* Fix alter drop column by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/560
* Force RMT to old version for Integration tests by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/571
* Changed from ts_ms to debezium ts_ms for adding sequence numbers by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/569
* Fixed alter table change column not null DDL query by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/573
* Changes to make sure the threads are exited when the CLI stop command… by @subkanthi in https://github.com/Altinity/clickhouse-sink-connector/pull/525
* remove old broken tests by @Selfeer in https://github.com/Altinity/clickhouse-sink-connector/pull/585

## New Contributors
* @ZlobnyiSerg made their first contribution in https://github.com/Altinity/clickhouse-sink-connector/pull/529

**Full Changelog**: https://github.com/Altinity/clickhouse-sink-connector/compare/2.0.2...2.1.0
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ offset.storage.offset.storage.jdbc.offset.table.ddl: "CREATE TABLE %s
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
SETTINGS index_granularity = 8198"
SETTINGS index_granularity = 8192"
offset.storage.offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
schema.history.internal: io.debezium.storage.jdbc.history.JdbcSchemaHistory
schema.history.internal.jdbc.url: "jdbc:clickhouse://localhost:8123"
Expand Down
3 changes: 2 additions & 1 deletion sink-connector-lightweight/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,9 @@
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<version.testcontainers>1.19.1</version.testcontainers>
<surefire-plugin.version>3.0.0-M7</surefire-plugin.version>
<apache.httpclient.version>5.2.1</apache.httpclient.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<sink-connector-library-version>0.0.7</sink-connector-library-version>
<sink-connector-library-version>0.0.8</sink-connector-library-version>
<version.junit>5.9.1</version.junit>
<maven.compiler.source>17</maven.compiler.source>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
Expand Down
4 changes: 2 additions & 2 deletions sink-connector-lightweight/docker/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ database.user: "root"
database.password: "root"

# Unique name for the connector.
database.server.id: "connector-1"
database.server.id: "12345"

# The name of the MySQL database from which events are to be captured when not using snapshot mode.
database.server.name: "ER54"
Expand Down Expand Up @@ -82,7 +82,7 @@ offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
`record_insert_seq` UInt64,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version) ORDER BY offset_key SETTINGS index_granularity = 8198"
ENGINE = ReplacingMergeTree(_version) ORDER BY offset_key SETTINGS index_granularity = 8192"

# offset.storage.jdbc.offset.table.delete: The DML statement used to delete the database table where connector offsets are to be stored.(Advanced)
offset.storage.jdbc.offset.table.delete: "select * from %s"
Expand Down
2 changes: 1 addition & 1 deletion sink-connector-lightweight/docker/config/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ scrape_configs:
- job_name: 'sink'
static_configs:
- targets:
- debezium-embedded:8083
- clickhouse-sink-connector-lt:8083
2 changes: 1 addition & 1 deletion sink-connector-lightweight/docker/config_local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
`record_insert_seq` UInt64,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version) ORDER BY offset_key SETTINGS index_granularity = 8198"
ENGINE = ReplacingMergeTree(_version) ORDER BY offset_key SETTINGS index_granularity = 8192"

# offset.storage.jdbc.offset.table.delete: The DML statement used to delete the database table where connector offsets are to be stored.(Advanced)
offset.storage.jdbc.offset.table.delete: "select * from %s"
Expand Down
2 changes: 1 addition & 1 deletion sink-connector-lightweight/docker/config_postgres.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
SETTINGS index_granularity = 8198"
SETTINGS index_granularity = 8192"
offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ data:
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
SETTINGS index_granularity = 8198"
SETTINGS index_granularity = 8192"
offset.storage.offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
Expand Down Expand Up @@ -92,7 +92,7 @@ data:
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
SETTINGS index_granularity = 8198"
SETTINGS index_granularity = 8192"
offset.storage.offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
schema.history.internal.jdbc.url: "jdbc:clickhouse://chi-argocd-demo-0-0:8123/altinity_sink_connector"
Expand Down
10 changes: 6 additions & 4 deletions sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>2.14.0.Final</quarkus.platform.version>
<surefire-plugin.version>3.0.0-M7</surefire-plugin.version>
<apache.httpclient.version>5.2.1</apache.httpclient.version>
<sink-connector-library-version>0.0.8</sink-connector-library-version>
</properties>
<dependencyManagement>
Expand Down Expand Up @@ -187,11 +186,11 @@
<groupId>org.apache.maven</groupId>
<artifactId>maven-artifact</artifactId>
</dependency>
<!-- <dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.1</version>
</dependency> -->
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
Expand Down Expand Up @@ -540,14 +539,17 @@
<threadCount>10</threadCount> -->
<properties>
<property>
<surefire.test.runOrder>filesystem</surefire.test.runOrder>
<name>listener</name>
<value>com.altinity.clickhouse.debezium.embedded.FailFastListener</value>
</property>
</properties>
<useUnlimitedThreads>true</useUnlimitedThreads>
<perCoreThreadCount>true</perCoreThreadCount>
<useSystemClassLoader>true</useSystemClassLoader>
</configuration>
<runOrder>${surefire.test.runOrder}</runOrder>

</configuration>
</plugin>

<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ClickHouseDebeziumEmbeddedApplication {
* @param args arguments
* @throws Exception Exception
*/

public static void main(String[] args) throws Exception {
//BasicConfigurator.configure();
System.setProperty("log4j.configurationFile", "resources/log4j2.xml");
Expand Down Expand Up @@ -132,6 +133,8 @@ public static void start(DebeziumRecordParserService recordParserService,
public static void stop() throws IOException {
debeziumChangeEventCapture.stop();

//Stop Rest API
//DebeziumEmbeddedRestApi.stop();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import com.google.inject.Injector;
import io.javalin.Javalin;
import io.javalin.http.HttpStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.CompletableFuture;
Expand All @@ -21,8 +21,9 @@

public class DebeziumEmbeddedRestApi {

private static final Logger log = LoggerFactory.getLogger(DebeziumEmbeddedRestApi.class);
private static final Logger log = LogManager.getLogger(DebeziumEmbeddedRestApi.class);

static Javalin app;
public static void startRestApi(Properties props, Injector injector,
DebeziumChangeEventCapture debeziumChangeEventCapture,
Properties userProperties) {
Expand All @@ -31,7 +32,7 @@ public static void startRestApi(Properties props, Injector injector,
cliPort = "7000";
}

Javalin app = Javalin.create().start(Integer.parseInt(cliPort));
app = Javalin.create().start(Integer.parseInt(cliPort));
app.get("/", ctx -> {
ctx.result("Hello World");
});
Expand Down Expand Up @@ -105,8 +106,13 @@ public static void startRestApi(Properties props, Injector injector,
app.get("/start", ctx -> {
finalProps.putAll(userProperties);
CompletableFuture<String> cf = ClickHouseDebeziumEmbeddedApplication.startDebeziumEventLoop(injector, finalProps);
ctx.result("Started Replication....");
ctx.result("Started Replication...., this might take 60 seconds....");
});

}
// Stop the javalin server
public static void stop() {
if(app != null)
app.stop();
}
}
Loading

0 comments on commit db2414d

Please sign in to comment.