[Improve][connector-clickhouse] Clickhouse support parallelism reading schema #9446
Conversation
Pull Request Overview
This PR adds support for parallel schema reading in the ClickHouse connector by leveraging the table part files from the system.parts table. Key changes include:
- New configuration options (e.g., partition_list, filter_query, batch_size) and test cases to support parallel reading.
- Updates to the core proxy, splitter, enumerator, source reader, and associated state management for splitting and reading parts concurrently.
- Documentation updates explaining the new parallel reader features.
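The part-to-split idea summarized above can be illustrated with a minimal sketch. This is not the PR's actual code; the class name and the exact query shape are assumptions based on the summary (the PR states the part list comes from `system.parts`):

```java
public class PartListQuerySketch {
    // Each active data part of a MergeTree table becomes one candidate read
    // split for a parallel source reader; the part names are obtained from
    // the system.parts system table.
    public static String buildPartListQuery(String database, String table) {
        return String.format(
                "SELECT name FROM system.parts WHERE database = '%s' AND table = '%s' AND active = 1",
                database, table);
    }
}
```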
Reviewed Changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
seatunnel-e2e/connector-clickhouse-e2e/.../clickhouse_with_parallelism_read.conf | Added test configuration for parallel read demonstration |
ClickhouseIT.java | Added new test methods and constants to verify parallel reading functionality |
TablePartSplitterTest.java | Introduced tests for generating splits, including duplicate parts handling |
ClickhouseValueReaderTest.java | Added tests to validate various batch reading scenarios |
ClickhouseProxy.java | Implemented methods to retrieve part lists and query data per part |
ClickhouseSourceState.java | Updated state object to include pending splits |
TablePartSplitter.java | Created new splitting logic for ClickHouse parts |
ClickhouseSourceSplitEnumerator.java | Added new split enumerator to support parallel splits assignment |
ClickhouseSourceSplit.java | Defined a split abstraction based on ClickHouse parts |
ClickhouseValueReader.java | Modified value reader to iteratively process splits and update part offsets |
ClickhouseSourceTable.java | Updated source table configuration to include new options |
ClickhouseSourceReader.java | Refactored source reader to integrate parallelism mode with split queue management |
ClickhouseSourceFactory.java | Enhanced factory to build source tables and incorporate new parallelism parameters |
ClickhouseSource.java | Updated the connector interface to support parallel reading with new enumerator and reader |
ClickhousePart.java | Introduced Comparable interface implementation (stubbed in current diff) |
ClickhouseTable.java | Added getter for local database name |
ClickhouseConnectorErrorCode.java | Added new error codes for part retrieval and query issues |
ClickhouseSourceOptions.java | Defined new options: part_size, partition_list, batch_size, and filter_query |
ClickhouseBaseOptions.java | Added table option to support table name configuration |
docs/en/connector-v2/source/Clickhouse.md | Updated documentation with instructions and tips for parallel reading |
Comments suppressed due to low confidence (1)
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhousePart.java:77
- The compareTo method always returns 0, which effectively treats all instances as equal. Consider implementing a proper comparison (for example, based on the part name) or removing Comparable if natural ordering is not intended.
public int compareTo(ClickhousePart o) { return 0; }
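A fix along the lines Copilot suggests (comparing by part name) might look like the following sketch. The field name is an assumption, not the PR's actual `ClickhousePart` definition:

```java
public class PartByName implements Comparable<PartByName> {
    private final String name;

    public PartByName(String name) {
        this.name = name;
    }

    @Override
    public int compareTo(PartByName o) {
        // Order parts by name instead of always returning 0, so natural
        // ordering is meaningful (e.g. for sorted collections of splits).
        return this.name.compareTo(o.name);
    }
}
```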
"select name from system.parts where database = '%s' and table = '%s'",
database, table);

if (partitionList != null && !partitionList.isEmpty()) {
The SQL query in getPartList is built by directly concatenating the partition list values. Consider using a parameterized query or properly escaping input values to mitigate the risk of SQL injection.
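One way to address this, sketched below under the assumption that the partition filter ends up in an `IN (...)` clause, is to emit JDBC placeholders and bind the partition values as parameters instead of concatenating them:

```java
import java.util.Collections;

public class SafePartQuerySketch {
    // Builds the part-list query with JDBC placeholders; the database,
    // table, and partition values are then bound via PreparedStatement
    // rather than concatenated into the SQL string.
    public static String buildPartListQuery(int partitionCount) {
        StringBuilder sql = new StringBuilder(
                "SELECT name FROM system.parts WHERE database = ? AND table = ?");
        if (partitionCount > 0) {
            String placeholders =
                    String.join(", ", Collections.nCopies(partitionCount, "?"));
            sql.append(" AND partition IN (").append(placeholders).append(")");
        }
        return sql.toString();
    }
}
```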
| username | String | Yes | - | `ClickHouse` user username. |
| password | String | Yes | - | `ClickHouse` user password. |
| database | String | No | - | The `ClickHouse` database. |
| table | String | No | - | The `ClickHouse` table. If it is a distributed table, the cluster is obtained based on the table engine. If it is a local table, the cluster is built based on the input `host`. |
Suggested change:
- | table | String | NO | - | The `ClickHouse` table. If it is a distributed table, the cluster is obtained based on the table engine. If it is a local table, build the cluster based on the input `host` |
+ | table_path | String | NO | - | The `ClickHouse` table. If it is a distributed table, the cluster is obtained based on the table engine. If it is a local table, build the cluster based on the input `host` |
Same as JDBC
If the `table_path` parameter is used instead, should the `database` parameter also be removed? Is it uniformly represented by the `table_path` parameter?
Yes.
@@ -211,6 +213,17 @@ public void testClickHouseWithMultiTableSink(TestContainer container) throws Exc
        }
    }

    @TestTemplate
    public void testClickhouseWithParallelismRead(TestContainer testContainer)
Could you add test cases to verify that `filter_query` and `partition_list` work properly?
Ok. I will add more test cases.
String sql =
        String.format(
                "select * from %s.%s where %s limit %d, %d",
Is this implemented this way because, for a single part, `limit m, n` can guarantee the order?
This implementation is designed to read parts in batches, to avoid pulling large amounts of data when reading in parallel. Each `ClickhousePart` object has an `offset` attribute that records how much of the current part has already been read, thereby preserving the order of batch reads.
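The batched per-part read described here can be sketched as follows. The names are assumptions, not the PR's code; `_part` is ClickHouse's virtual column identifying the data part a row belongs to:

```java
public class BatchQuerySketch {
    // The reader keeps an offset per part and advances it by batchSize
    // after each fetch, issuing "LIMIT offset, batchSize" queries scoped
    // to a single part via the _part virtual column.
    public static String nextBatchQuery(
            String database, String table, String partName, long offset, int batchSize) {
        return String.format(
                "SELECT * FROM %s.%s WHERE _part = '%s' LIMIT %d, %d",
                database, table, partName, offset, batchSize);
    }
}
```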
After reading the ClickHouse documentation, I found that ClickHouse supports a `LIMIT ... WITH TIES` clause, which can ensure that rows with the same value in the `ORDER BY` field are queried in the same batch. Meanwhile, the table's `ORDER BY` field is used to define the sorting key when querying a part. Can this solution solve the problem?
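As a sketch of what such a query might look like (names are assumptions): `WITH TIES` requires an `ORDER BY` and additionally returns any rows that tie with the last row on the sort key, so equal sort-key values are never split across batches.

```java
public class WithTiesQuerySketch {
    // Batch query using LIMIT ... WITH TIES so rows sharing the last
    // sort-key value are all returned in the same batch.
    public static String batchQueryWithTies(
            String table, String sortKey, long offset, int batchSize) {
        return String.format(
                "SELECT * FROM %s ORDER BY %s LIMIT %d, %d WITH TIES",
                table, sortKey, offset, batchSize);
    }
}
```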
I think it's good.
…er, add sql parallelism read strategy and fix other problem.
I have made the following updates:
Thanks for helping with the review!
@@ -40,6 +40,13 @@ public class ClickhouseBaseOptions {
                    .noDefaultValue()
                    .withDescription("Clickhouse database name");

    /** Clickhouse database name */
    public static final Option<String> TABLE =
            Options.key("table")
Suggested change:
- Options.key("table")
+ Options.key("table_path")
done
…aseOptions, fix ClickhouseValueReader bug and add unit tests
public String getTableIdentifier() {
    if (StringUtils.isEmpty(tablePath)) {
        // Extract table identifier from SQL
        return ClickhouseUtil.extractTablePathFromSql(sql);
    }

    return tablePath;
}
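For illustration only, a naive version of what `extractTablePathFromSql` might do is shown below (this is a hypothetical sketch, not the PR's implementation): take the first identifier after `FROM`, which only works for simple single-table statements.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TablePathSketch {
    private static final Pattern FROM_TABLE =
            Pattern.compile("(?i)\\bfrom\\s+([\\w.]+)");

    // Returns the first "db.table" (or "table") identifier after FROM;
    // joins and subqueries would need real SQL parsing.
    public static String extractTablePathFromSql(String sql) {
        Matcher m = FROM_TABLE.matcher(sql);
        return m.find() ? m.group(1) : null;
    }
}
```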
If the SQL contains a join statement, how should it be handled?
If SQL in complex scenarios is considered, there may not be a suitable distributed parallel execution solution at present, since there are many possible cases. One solution I thought of is to select one of the shards and execute the SQL there with single concurrency. Is this feasible?
> select one of the shards and execute SQL directly in single concurrency.

I agree with this.
> If sql in complex scenarios is considered, since there may be multiple situations, there may not be a suitable distributed parallel execution solution at present. One solution I thought of is to select one of the shards and execute SQL directly in single concurrency. Is this feasible?

Will this cause an error when sorting?
Should DynamicChunkSplitter be used to address complex SQL scenarios?
The first question: for complex SQL scenarios, the user's SQL will only be executed on a single shard, with no additional ORDER BY or LIMIT operations applied, just like a query executed directly on the ClickHouse server.
The second question: I thought about it. There could be multiple possibilities in complex SQL scenarios, such as users using GROUP BY, or global joins across multiple tables, etc. Splitting SQL in these scenarios might be rather complex, and no suitable solution comes to mind for the time being.
Perhaps we can first execute such complex SQL with the single-concurrency solution described above, and then continue to optimize it later?
For aggregation scenarios, are there any potential issues? For example, in a "TOP N" scenario where such SQL can only be executed on a distributed table?
Yes, for these scenarios (such as GROUP BY or JOIN), the SQL input by the user needs to target a distributed table and is executed with single concurrency on a single shard. However, if the user specifies a local table, it will only be executed on the node where the local table is located.
In my current concurrent SQL execution scheme, it is best if the user's SQL contains only a single table and WHERE filter conditions. SQL in other complex scenarios will be executed with the single-concurrency solution described above.
Is this feasible?
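The fallback rule discussed above might be sketched like this. It is an entirely hypothetical heuristic, not the PR's code: only plain single-table `SELECT ... [WHERE ...]` statements take the part-based parallel path, and anything with joins, GROUP BY, or UNION runs once on a single shard.

```java
public class SqlModeSketch {
    // Crude classifier: returns true only for simple single-table
    // SELECT statements with an optional WHERE clause.
    public static boolean supportsParallelRead(String sql) {
        String normalized = sql.toLowerCase();
        if (normalized.contains(" join ")
                || normalized.contains("group by")
                || normalized.contains("union")) {
            return false;
        }
        return normalized.matches(
                "(?s)^\\s*select\\s+.+?\\s+from\\s+[\\w.]+(\\s+where\\s+.+)?\\s*$");
    }
}
```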
Ok. Thank you for your answer. I think your plan is very good
Purpose of this pull request
Add a parallel reading scheme to the ClickHouse source connector.
related pr #9421
The ClickHouse source connector supports parallel reading of data. For query-table mode, parallel reading is implemented based on the part files of the table, which are obtained from the system.parts table.
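Based on the options named in this PR (`partition_list`, `filter_query`, `batch_size`), a source configuration might look like the following sketch. The host, table, and values are hypothetical, and the exact option names and types may differ in the merged version:

```hocon
source {
  Clickhouse {
    host = "clickhouse-host:8123"
    database = "default"
    table = "t_orders"                       # hypothetical table name
    username = "default"
    password = ""
    partition_list = ["2024-01", "2024-02"]  # read only these partitions
    filter_query = "status = 'PAID'"         # extra filter condition
    batch_size = 1024                        # rows fetched per batch
  }
}
```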
The `partition_list` and `filter_query` parameters are used to filter data. The `batch_size` parameter is used to control the amount of data read each time, to avoid OOM exceptions.
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide