source: implementation for clickhouse #3361
/test connector=source-clickhouse
/test connector=source-mysql
/test connector=source-postgres
/test connector=source-mssql
/test connector=source-oracle
/test connector=source-redshift
@@ -389,7 +412,8 @@ void testReadMultipleTables() throws Exception {

setEmittedAtToNull(actualMessages);

assertEquals(expectedMessages, actualMessages);
assertTrue(expectedMessages.size() == actualMessages.size() && expectedMessages.containsAll(actualMessages)
nit: easier to read as separate asserts
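A minimal sketch of what separate checks could look like. To stay self-contained it uses plain Java rather than JUnit; the class name `MessageAssertions` and the method `assertSameElements` are hypothetical and not part of this PR. The third check is an assumption, since the quoted `assertTrue` line is cut off.

```java
import java.util.List;

class MessageAssertions {

  // Hypothetical helper: order-insensitive comparison split into separate checks,
  // so a failure reports which condition broke.
  static <T> void assertSameElements(List<T> expected, List<T> actual) {
    if (expected.size() != actual.size()) {
      throw new AssertionError(
          "size mismatch: expected " + expected.size() + " but was " + actual.size());
    }
    if (!expected.containsAll(actual)) {
      throw new AssertionError("actual contains elements that are not expected");
    }
    if (!actual.containsAll(expected)) {
      throw new AssertionError("actual is missing expected elements");
    }
  }
}
```

In the real test, each condition would presumably become its own JUnit assertion so that the failure message points at the condition that broke.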
@Override
public void accept(Connection connection, PreparedStatement preparedStatement)
    throws SQLException {
Is there a reason ClickHouse doesn't need autocommit or fetch size adjustments like JdbcStreamingQueryConfiguration for other databases?
If so, it'd be good to have a comment here to understand why this is a no-op.
I am still trying to understand why we need autocommit to be off for other databases and whether it's required for ClickHouse or not.
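For context, a sketch of the kind of setup this thread refers to for other JDBC sources: autocommit off and a fetch size of 1000 (the value mentioned in this PR's Javadoc). PostgreSQL's JDBC driver, for example, only streams rows through a cursor when autocommit is disabled and a positive fetch size is set; otherwise it loads the whole result set into memory. The class name `StreamingQuerySetup` is hypothetical.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class StreamingQuerySetup {

  // Fetch size mentioned in the PR's Javadoc for other JDBC sources.
  static final int FETCH_SIZE = 1000;

  // Hypothetical helper: prepares a connection and statement for row streaming.
  static void configure(Connection connection, PreparedStatement statement) throws SQLException {
    // Some drivers (PostgreSQL, for example) only use a server-side cursor
    // when autocommit is off.
    connection.setAutoCommit(false);
    // Hint to the driver to fetch rows in batches instead of all at once.
    statement.setFetchSize(FETCH_SIZE);
  }
}
```

Whether either call matters depends entirely on the driver, which is why the question for ClickHouse comes down to what its driver does with them.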
@@ -169,13 +169,22 @@ public static void setStatementField(PreparedStatement preparedStatement,
switch (cursorFieldType) {
// parse date, time, and timestamp the same way. this seems to not cause an problems and allows us
nit: update this comment
can we also leave a comment here summarising what we discussed yesterday around potential errors using Date as a cursor?
dependencies {
  implementation project(':airbyte-db')
  implementation project(':airbyte-integrations:bases:base-java')
nit: sort this alphabetically
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-clickhouse')
integrationTestJavaImplementation "org.testcontainers:clickhouse:1.15.3"

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
nit: move this to the implementation project block
implementation 'ru.yandex.clickhouse:clickhouse-jdbc:0.3.1'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
nit: group as such
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-clickhouse')
integrationTestJavaImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
integrationTestJavaImplementation "org.testcontainers:clickhouse:1.15.3"
import java.sql.Connection;
import java.sql.PreparedStatement;

public class ClickHouseJdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration {
Instead of doing this, let's create a NoOpJdbcStreamingQueryConfiguration in the airbyte-db package and reuse it in the ClickHouse class. The cost of doing so is minimal (we still add one new class in total), and it'll let other classes reuse this in the future (I'd imagine there might be some).
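A sketch of what the suggested class might look like. `StreamingQueryConfiguration` below is a local stand-in for Airbyte's `JdbcStreamingQueryConfiguration`, whose declaration is not shown in this thread; only the `accept` signature is taken from the diff under review.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Stand-in for Airbyte's JdbcStreamingQueryConfiguration (assumed shape).
interface StreamingQueryConfiguration {
  void accept(Connection connection, PreparedStatement preparedStatement) throws SQLException;
}

// Reusable configuration for drivers where autocommit and fetch size have no effect.
class NoOpStreamingQueryConfiguration implements StreamingQueryConfiguration {

  @Override
  public void accept(Connection connection, PreparedStatement preparedStatement)
      throws SQLException {
    // Intentionally empty: nothing to configure.
  }
}
```

A source whose driver ignores these settings could then pass this class instead of defining its own empty implementation.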
public class ClickHouseJdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration {

/**
* The reason accept method for ClickHouse is not setting auto commit to false like other JDBC
Nice comment! I would move this to the constructor after we switch to using NoOpJdbcStreamingQueryConfiguration.
* sources is cause method {@link ru.yandex.clickhouse.ClickHouseConnectionImpl#setAutoCommit} is
* empty. The reason accept method for ClickHouse is not setting fetch size to 1000 like other JDBC
* sources is cause method {@link ru.yandex.clickhouse.ClickHouseStatementImpl#setFetchSize} is
* empty
Curious - so what's ClickHouse's current fetch size?
I tried to figure this out but I don't know. The JDBC driver returns 0 as the fetch size, and the docs don't mention it either.
tableInfo -> {
  try {
    return database.resultSetQuery(connection -> {
      String sql = "SELECT name FROM system.columns WHERE database = ? AND table = ? AND is_in_primary_key = 1";
Remind me again - why does ClickHouse require a different manner of finding primary keys?
/**
* The default implementation relies on {@link java.sql.DatabaseMetaData#getPrimaryKeys} method to
* get it but the ClickHouse JDBC driver returns an empty result set from the method
* {@link ru.yandex.clickhouse.ClickHouseDatabaseMetadata#getPrimaryKeys}. That's why we have to
* query the system table mentioned here
* https://clickhouse.tech/docs/en/operations/system-tables/columns/ to fetch the primary keys.
*/
Added this as a comment as well.
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ClickHouse Source Spec",
"type": "object",
"required": ["host", "port", "database", "username", "password"],
in general, we don't want password to be required to make it easier on users to run tests. does ClickHouse enforce having a password?
@@ -0,0 +1,38 @@
{
  "documentationUrl": "https://docs.airbyte.io/integrations/destinations/clickhouse",
Question for my own understanding: we spoke about how ClickHouse has different table engines under the hood. Do you think users/Airbyte might benefit from that being exposed?
Small sanity check that treating Clickhouse as a JDBC source is good enough for our purposes.
As discussed on the call, we don't need the user to specify the engine in the setup wizard because our approach is not specific to an engine.
public String createTableQuery(String tableName, String columnClause, String primaryKeyClause) {
  return String.format("CREATE TABLE %s(%s) %s",
      tableName, columnClause, primaryKeyClause.equals("") ? "Engine = TinyLog"
          : "ENGINE = MergeTree() ORDER BY " + primaryKeyClause + " PRIMARY KEY "
I know what this is because we spoke about this yesterday. can we leave a comment explaining/pointing to docs why we require this?
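A sketch of how such an explanatory comment might read. The engine facts come from the ClickHouse documentation: MergeTree requires an ORDER BY sorting key, and an explicitly declared PRIMARY KEY must be a prefix of that key; TinyLog is a simple engine without indexes. The class name `ClickHouseTestQueries` is hypothetical, `primaryKeyColumns` is assumed to be a comma-separated column list, and the tail of the MergeTree clause is a guess because the quoted line is truncated.

```java
class ClickHouseTestQueries {

  // Hypothetical variant of createTableQuery; primaryKeyColumns is assumed to be
  // a comma-separated column list such as "id" or "first_name, last_name".
  static String createTableQuery(String tableName, String columnClause, String primaryKeyColumns) {
    // Every ClickHouse table needs an engine. MergeTree requires an ORDER BY
    // sorting key, and an explicit PRIMARY KEY must be a prefix of that key,
    // so the same columns are used for both. Without key columns, fall back
    // to TinyLog, a simple engine that has no indexes.
    String engineClause = primaryKeyColumns.isEmpty()
        ? "ENGINE = TinyLog"
        : "ENGINE = MergeTree() ORDER BY (" + primaryKeyColumns + ")"
            + " PRIMARY KEY (" + primaryKeyColumns + ")";
    return String.format("CREATE TABLE %s(%s) %s", tableName, columnClause, engineClause);
  }
}
```

For example, `createTableQuery("t", "id Int32", "id")` yields a MergeTree table ordered and keyed by `id`, while an empty key list yields a TinyLog table.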
}

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
@jrhizor I notice this method is identical across a bunch of the StandardSourceTest implementations, is this something we can move into the parent class, and provide a default implementation for, at a later date?
@@ -143,6 +143,28 @@
*/
public abstract AbstractJdbcSource getSource();

protected String createTableQuery(String tableName, String columnClause, String primaryKeyClause) {
nice!
Looks good!
Some minor comments for readability + couple of nits. Feel free to merge after addressing them.
Guessing we'll put out documentation + publish in a follow-up PR?
/test connector=source-clickhouse
What
Issue : #3317
Docs : https://clickhouse.tech/docs/en/
JDBC : https://github.com/ClickHouse/clickhouse-jdbc
Docker images : https://hub.docker.com/r/yandex/clickhouse-server/tags?page=1&ordering=last_updated
The screenshots (not reproduced here) show the time taken for full refresh and incremental sync via the JDBC stress test.
A sync takes between 20 and 25 seconds to finish.
How
The solution uses AbstractJdbcSource with a bit of tweaking.