
[Connector-V2] [Clickhouse] Improve Clickhouse File Connector #3416

Merged
11 commits merged into apache:dev from Hisoka-X:improve_clickhousefile on Dec 13, 2022

Conversation

Hisoka-X
Member

Purpose of this pull request

  1. Close [Bug] [Clickhouse-File] clickhouse-file connector-v2 generated data part name conflict #3242
  2. Adjust the ClickhouseFile sink to work with the aggregated committer (see the sketch after this list)
  3. Save memory
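
For readers unfamiliar with the aggregated-committer flow: per-writer commit info is first combined into a single aggregated record, which is then committed once. Below is a minimal sketch of that shape only; the type and method names are hypothetical illustrations, not SeaTunnel's actual interface.

    import java.io.IOException;
    import java.util.List;

    // Hedged sketch: illustrates the "combine, then commit once" pattern the
    // sink is being adapted to; names are hypothetical, not SeaTunnel's API.
    interface AggregatedCommitter<C, A> {
        // Merge commit info from all parallel writers into one aggregated record.
        A combine(List<C> commitInfos);

        // Commit the aggregated records; return the ones that must be retried.
        List<A> commit(List<A> aggregatedCommitInfos) throws IOException;

        // Roll everything back if the commit ultimately fails.
        void abort(List<A> aggregatedCommitInfos) throws IOException;
    }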

Check list

@Hisoka-X removed their assignment on Nov 26, 2022
@@ -44,6 +46,6 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
Member

Please add a unit test for optionRule().

# Conflicts:
#	seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
#	seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
EricJoy2048 previously approved these changes on Dec 5, 2022
@EricJoy2048 requested a review from Z1Wu on December 7, 2022 14:31
String data = this.readerOption.getFields().stream()
        // Render each configured field as text and join with tabs into one line.
        .map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
        .collect(Collectors.joining("\t")) + "\n";
// Map a region at the current end of the file, sized to the encoded row,
// so the row is written to disk through the mapped buffer.
MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
        data.getBytes(StandardCharsets.UTF_8).length);

Putting every row into a buffer inside the mmap region of the file channel may cause the executor to consume too much RAM and then crash with an OOM. In my opinion, the Clickhouse File Connector is designed for huge data-loading scenarios, where every executor handles multiple GBs of data, and it is hard to keep all of that data in memory.
Maybe we can make a tradeoff between speed and memory capacity: dump rows to disk once they accumulate past a certain threshold.
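
A minimal sketch of that threshold idea (class name and threshold are illustrative, not the connector's actual code): accumulate encoded rows on the heap and flush them to the FileChannel once a byte threshold is crossed.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;

    // Hedged sketch: accumulate rows in memory, spill to disk past a threshold.
    final class ThresholdFlushWriter {
        private static final int FLUSH_THRESHOLD_BYTES = 64 * 1024 * 1024; // illustrative
        private final FileChannel channel;
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

        ThresholdFlushWriter(FileChannel channel) {
            this.channel = channel;
        }

        void write(String row) throws IOException {
            pending.write(row.getBytes(StandardCharsets.UTF_8));
            if (pending.size() >= FLUSH_THRESHOLD_BYTES) {
                flush();
            }
        }

        void flush() throws IOException {
            ByteBuffer buffer = ByteBuffer.wrap(pending.toByteArray());
            while (buffer.hasRemaining()) {
                channel.write(buffer); // append at the channel's current position
            }
            pending.reset();
        }
    }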

Member Author

Hi, can you tell me more about how mmap consumes too much RAM? We map only one row each time. Is there anything I'm missing?


I am not very sure about the behavior of the file channel.
But when I use fileChannel.map, the Spark executors always crash. I currently use the following code as a workaround.

byte[] bytes = out.toByteArray();
// Allocate a heap buffer sized to the encoded data instead of mapping the file.
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
// flip() sets the limit to the bytes just written and resets position to 0.
buffer.flip();
fileChannel.write(buffer);
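
For reference, here is a self-contained version of that write path outside Spark (the file path is illustrative). The key points: flip() makes the buffer ready to drain exactly the bytes just put, and a heap buffer is reclaimed by ordinary GC rather than lingering as a mapped region the way a MappedByteBuffer can.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    // Standalone demo of the heap-buffer write path; the path is illustrative.
    public class ChannelWriteDemo {
        public static void main(String[] args) throws IOException {
            byte[] bytes = "a\tb\tc\n".getBytes(StandardCharsets.UTF_8);
            try (FileChannel channel = FileChannel.open(Paths.get("/tmp/clickhouse-file-demo.tsv"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
                ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
                buffer.put(bytes);
                buffer.flip(); // limit = bytes written, position = 0
                while (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
            }
        }
    }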

Member Author

Can you provide your crash log? Thanks!

@EricJoy2048 added this to the 2.3.0 milestone on Dec 8, 2022
hailin0 previously approved these changes on Dec 9, 2022
docs/en/connector-v2/sink/ClickhouseFile.md (outdated review thread)
Co-authored-by: hailin0 <hailin088@gmail.com>
@EricJoy2048 merged commit e07e9a7 into apache:dev on Dec 13, 2022
@Hisoka-X deleted the improve_clickhousefile branch on December 13, 2022 05:51