
[Umbrella][Sort] Improve memory stability of data ingesting into iceberg #7747

Closed
4 of 5 tasks
thexiay opened this issue Mar 31, 2023 · 1 comment

Comments

@thexiay (Contributor) commented on Mar 31, 2023

InLong Component

InLong Sort

Motivation

In the existing data synchronization pipeline, snapshot data and incremental data are first sent to Kafka and then streamed into Iceberg by Flink. Consuming the snapshot data directly causes problems such as very high throughput and severe disorder (records land in partitions almost at random), which degrades write performance and produces throughput glitches. The job then reliably crashes against its memory limit, e.g.:
(WeCom screenshot of the memory-limit crash omitted)

Proposed Changes

The main idea is to reduce the number of concurrently open writers, close each writer as soon as possible, and lighten the downstream load as much as possible.

Mini-batch pre-grouping by partition

Add a sorting step before the write so that, within each checkpoint (chk) cycle, a writer can be closed immediately after its data has been written to a file. This removes the need to keep N writers open at the same time, which consumes a lot of memory; the benefit is the memory saved.

This method effectively simulates a micro-batch inside the stream: all data accumulated in the batch is sent downstream only when the checkpoint fires. The data is held back in the pre-sort operator and only flows on to the writer operator in the next stage once this stage has finished, so the disadvantage is that the checkpoint delay becomes the cumulative delay of several operators processing this data back to back. If the checkpoint has not finished processing the batch, it may time out. (Previously, write time and checkpoint time were decoupled and the checkpoint performed only very lightweight operations; now all data must be written to files before the checkpoint completes.)

Since a whole batch of data has to be sent downstream anyway, there is little difference between two operators (pre-sort plus writer) and a single one: either way, the data within one checkpoint cycle must be in order. It is therefore simpler to do the sorting directly in the writer and wait until the checkpoint is triggered before writing to files, roughly as in the sketch below.
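A minimal sketch of the buffer-until-checkpoint idea as a Flink custom operator. Only the Flink API calls (`AbstractStreamOperator`, `prepareSnapshotPreBarrier`) are real; the class name, the heap buffer, and the partition-key extraction are illustrative assumptions, not the actual InLong Sort implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;

/** Buffers records per partition and releases them only when a checkpoint starts. */
public class PreSortOperator extends AbstractStreamOperator<RowData>
        implements OneInputStreamOperator<RowData, RowData> {

    private transient Map<String, List<RowData>> buffer;

    @Override
    public void open() throws Exception {
        super.open();
        buffer = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<RowData> element) {
        // Hold the record instead of forwarding it immediately.
        RowData row = element.getValue();
        buffer.computeIfAbsent(partitionKey(row), k -> new ArrayList<>()).add(row);
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) {
        // Flush partition by partition just before the barrier is forwarded,
        // so the downstream writer sees one partition at a time and can close
        // each file writer as soon as that partition is drained.
        for (List<RowData> partition : buffer.values()) {
            for (RowData row : partition) {
                output.collect(new StreamRecord<>(row));
            }
        }
        buffer.clear();
    }

    private String partitionKey(RowData row) {
        // Assumption for this sketch: the partition column is field 0.
        return row.getString(0).toString();
    }
}
```

Note the heap-resident `buffer` is exactly what the "Spilling disk map" item below proposes to move into RocksDB.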

Rate limit

Add rate limiting to relieve some of the downstream write pressure; one possible shape is sketched below.
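As a rough sketch of what the rate limiting could look like, using Guava's `RateLimiter` (an assumption; the final implementation may use something else entirely):

```java
import java.util.function.Consumer;

import com.google.common.util.concurrent.RateLimiter;

/** Throttles how fast records are handed to the downstream writer. */
public class ThrottledEmitter<T> {

    private final RateLimiter limiter;

    public ThrottledEmitter(double recordsPerSecond) {
        this.limiter = RateLimiter.create(recordsPerSecond);
    }

    public void emit(T record, Consumer<T> downstream) {
        limiter.acquire();          // blocks until a permit is free
        downstream.accept(record);  // downstream write pressure is smoothed out
    }

    /** Hook for the "dynamic adjustment" item in the plan below. */
    public void updateRate(double recordsPerSecond) {
        limiter.setRate(recordsPerSecond);
    }
}
```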

Multi-threaded file closing

Closing files concurrently with multiple threads shortens the flush time and, in streaming mode, also speeds up the checkpoint, reducing the risk of a checkpoint timeout. A sketch follows.
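A hedged sketch of the idea, closing whatever per-partition writers are open through a small thread pool (`AutoCloseable` stands in for the concrete Iceberg writer type):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Closes all open file writers concurrently to shorten the flush phase. */
public final class ParallelWriterCloser {

    public static void closeAll(List<? extends AutoCloseable> writers, int threads)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (AutoCloseable writer : writers) {
                pending.add(pool.submit(() -> {
                    writer.close();   // each close (and its flush) runs in parallel
                    return null;
                }));
            }
            for (Future<?> f : pending) {
                f.get();              // surface close failures instead of dropping them
            }
        } catch (ExecutionException e) {
            throw new RuntimeException("Closing a writer failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```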

Spilling disk map

When pre-grouping by partition, we need a map that can hold all the data of every partition. Currently, to keep things simple, it lives in memory, but the data volume may be very large, or the data may be skewed, so memory will not hold up. We therefore need a map that can spill to disk; RocksDB is a good fit.
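An illustrative sketch of a RocksDB-backed partition map (the key layout and class names are assumptions for this sketch; the real task would likely integrate with Flink's state backends instead):

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

/** Spills per-partition record buffers to disk instead of holding them on the heap. */
public class SpillablePartitionMap implements AutoCloseable {

    static { RocksDB.loadLibrary(); }

    private final RocksDB db;
    private long sequence;

    public SpillablePartitionMap(String path) throws RocksDBException {
        this.db = RocksDB.open(new Options().setCreateIfMissing(true), path);
    }

    public void append(String partition, byte[] serializedRow) throws RocksDBException {
        // Composite key (partition + insertion order) keeps one partition's
        // records adjacent on disk, so a prefix scan reads them back in order.
        String key = partition + "#" + String.format("%020d", sequence++);
        db.put(key.getBytes(StandardCharsets.UTF_8), serializedRow);
    }

    public void forEachInPartition(String partition, Consumer<byte[]> consumer) {
        byte[] prefix = (partition + "#").getBytes(StandardCharsets.UTF_8);
        try (RocksIterator it = db.newIterator()) {
            for (it.seek(prefix); it.isValid() && startsWith(it.key(), prefix); it.next()) {
                consumer.accept(it.value());
            }
        }
    }

    private static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    @Override
    public void close() {
        db.close();
    }
}
```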

Error Handling

Case 1:

Q: Will checkpoint triggering be affected when the flow is rate-limited? Could operators downstream of the checkpoint barrier be unable to process data for a long time and cause the checkpoint to time out?
A: There is some impact, but it is not serious.

a. Rate limiting slows downstream processing, buffers back up, and back pressure forms, so the upstream source delays or even suspends pulling data from its splits.
b. The barrier is inserted synchronously to all downstream nodes when the CheckpointEvent is executed in the mailbox thread.
c. Barriers travel as events, so they are not held up by insufficient buffer memory, but they still queue behind the in-flight data ahead of them.
d. That is, once the checkpoint is triggered, the data the downstream must process before the timeout is at least "backlogged network buffer size / average record size = total backlogged record count"; as long as that backlog can be drained within the timeout, the checkpoint completes.
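As an illustrative calculation (numbers assumed, not measured): with roughly 32 MB of backlogged network buffers and an average record size of 1 KB, the downstream must drain about 32,768 records before the barrier is processed; if that fits within the checkpoint timeout, the checkpoint still completes.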

Rollout/Adoption Plan

Future plans

  • Implement rate limiting.
  • Make the rate limit dynamically adjustable.
  • Replace insertMap (which determines whether to put records into position-delete records per checkpoint) from a heap map to a RocksDB map.
@thexiay thexiay changed the title [Improve][Sort] Add Presort to reduce discrete partitions when ingesting data into iceberg [Umbrella][Sort] Add Presort to reduce discrete partitions when ingesting data into iceberg Apr 12, 2023
@thexiay thexiay changed the title [Umbrella][Sort] Add Presort to reduce discrete partitions when ingesting data into iceberg [Umbrella][Sort] Improve memory stability of data ingesting into iceberg Apr 12, 2023
@thexiay thexiay added this to the 1.7.0 milestone Apr 12, 2023
@dockerzhang (Contributor) commented:
Please use English to translate the design document.
