Flink: Support Flink streaming reading #1383

Closed
JingsongLi opened this issue Aug 26, 2020 · 11 comments
@JingsongLi
Contributor

JingsongLi commented Aug 26, 2020

Flink is famous for its streaming computation.

  • Iceberg can serve as a message bus for stream computing. Even though it is only near real time, it can meet many requirements.
  • Compared with Kafka, Iceberg can keep all of the historical data, while Kafka typically retains only the last few days, so ad-hoc queries can reach all historical data. Moreover, Iceberg offers efficient query performance and storage efficiency.

After #1346, it is easy to build Flink streaming reading on top of it.
Unlike Spark, a Flink streaming source continuously monitors the table for new files and sends the resulting splits directly to downstream tasks. The source does not need to care about micro-batch size, because the downstream tasks store incoming splits in state and consume them one by one.

Monitor ----(Splits)-----> ReaderOperator

Monitor (Single task):

  • Monitoring snapshots of the Iceberg table.
  • Creating the splits for the incremental files using FlinkSplitGenerator (which in turn uses TableScan.appendsBetween); a sketch of this loop follows the lists below.
  • Assigning them to downstream tasks for further processing.

ReaderOperator (multiple tasks):

  • Put received splits into state (a split queue).
  • Read splits using FlinkInputFormat within a checkpoint cycle.
  • When a checkpoint barrier arrives, let the main thread complete the state snapshot for the checkpoint.
  • After that, the task continues to consume the remaining splits in the state.
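
For illustration, a minimal sketch of the monitor loop described above. The class name and the way splits are handed off are hypothetical; only TableScan.appendsBetween/planTasks come from the Iceberg API, and the real FlinkSplitGenerator may differ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

// Hypothetical single-task monitor: it remembers the last planned snapshot and,
// on each monitor interval, plans splits only for the newly committed data.
public class IncrementalSplitMonitor {

  private final Table table;
  private long lastPlannedSnapshotId; // checkpointed so the monitor can resume after failure

  public IncrementalSplitMonitor(Table table, long startSnapshotId) {
    this.table = table;
    this.lastPlannedSnapshotId = startSnapshotId;
  }

  // Called periodically; the returned splits are emitted to the ReaderOperator tasks.
  // Assumes the table already has at least one snapshot.
  public List<CombinedScanTask> discoverNewSplits() {
    table.refresh();
    long currentSnapshotId = table.currentSnapshot().snapshotId();
    if (currentSnapshotId == lastPlannedSnapshotId) {
      return Collections.emptyList(); // nothing new committed since the last scan
    }

    List<CombinedScanTask> splits = new ArrayList<>();
    try (CloseableIterable<CombinedScanTask> tasks =
        table.newScan()
            .appendsBetween(lastPlannedSnapshotId, currentSnapshotId)
            .planTasks()) {
      tasks.forEach(splits::add);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }

    lastPlannedSnapshotId = currentSnapshotId;
    return splits;
  }
}
```
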
@stevenzwu
Contributor

@JingsongLi we should use the new FLIP-27 source interface, right?

  • enumerator/monitor runs in the JobManager
  • enumerator tracks discovered splits
  • enumerator assigns a split when a reader requests one

We probably don't want the enumerator to statically assign all discovered splits up front. Dynamic assignment is better for load balancing when there are straggler/outlier reader nodes.
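
To make the contrast concrete, here is a simplified sketch of pull-based (dynamic) assignment. This is not the actual FLIP-27 SplitEnumerator interface; it just shows that readers pull one split at a time, so fast readers naturally take more work than stragglers.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Illustrative pull-based assigner; a real FLIP-27 enumerator would do something
// similar inside handleSplitRequest and checkpoint the pending queue.
public class DynamicSplitAssigner<SplitT> {

  private final Queue<SplitT> pendingSplits = new ArrayDeque<>();

  // Called when the monitor/enumerator discovers new splits.
  public synchronized void addSplits(Iterable<SplitT> splits) {
    splits.forEach(pendingSplits::add);
  }

  // Called when a reader subtask requests work; empty means "no split available yet".
  public synchronized Optional<SplitT> nextSplit(int requestingSubtask) {
    return Optional.ofNullable(pendingSplits.poll());
  }
}
```
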

@JingsongLi
Contributor Author

JingsongLi commented Oct 9, 2020

Hi @stevenzwu, yes, the advantage is that the assignment will be more dynamically balanced.

It depends on the progress of FLIP-27.
We are trying to implement the Filesystem/Hive sources on FLIP-27 in Flink 1.12, and in order to achieve this goal we are also modifying the FLIP-27 interfaces. (FLIP-27 in Flink 1.11 is not ready.)

If the timing is not urgent, we can wait for Flink 1.12.

@openinx
Member

openinx commented Oct 9, 2020

@stevenzwu , we have implemented an internal version of the Flink streaming reader, which is not built on top of FLIP-27 for now. Here is the pull request https://github.com/generic-datalake/iceberg-poc/pull/3/files on our own branch. As Jingsong described, once FLIP-27 is ready, we'd be happy to switch the current implementation to FLIP-27.

@stevenzwu
Contributor

@JingsongLi @openinx thx. We are currently implementing an Iceberg source based on the FLIP-27 interface. Our initial goal is backfill: it is bounded but has streaming behavior, meaning the app code stays with the DataStream API and just switches the source from Kafka to Iceberg. We are also very interested in the streaming/continuous read pattern. It is not urgent, so we can probably collaborate. Would love to see the building blocks being pushed upstream gradually.

@stevenzwu
Contributor

Regarding TableScan.appendsBetween, we might need more flexibility for fine-grained control. E.g. if the Flink job is lagging behind or bootstrapping from an old snapshot, we probably don't want to eagerly plan all of the unconsumed FileScanTasks. That might blow up the Flink checkpoint state if the enumerated list of FileScanTasks is too big.

I am thinking about two levels of enumeration to keep the enumerator memory footprint in check (a rough sketch follows the list below).

  • First, enumerate the list of unconsumed DataOperations.APPEND snapshots. It is cheap to track and checkpoint this list.
  • Second, enumerate FileScanTasks for up to a configurable number of the oldest snapshots (e.g. 6) from the first step.

If the job is keeping up with the ingestion, we should only have one unconsumed snapshot.
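
A rough sketch of the two-level idea, with hypothetical names: only snapshot ids go into the cheap first-level state, and file planning is limited to a configurable number of the oldest snapshots.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TwoLevelEnumerator {

  // Level 1: unconsumed APPEND snapshot ids; cheap to track and checkpoint.
  private final Deque<Long> unconsumedSnapshotIds = new ArrayDeque<>();
  private final int maxSnapshotsToPlan; // e.g. 6

  public TwoLevelEnumerator(int maxSnapshotsToPlan) {
    this.maxSnapshotsToPlan = maxSnapshotsToPlan;
  }

  // Called whenever the monitor sees a newly committed APPEND snapshot.
  public void onSnapshotDiscovered(long snapshotId) {
    unconsumedSnapshotIds.addLast(snapshotId);
  }

  // Level 2: return only the oldest few snapshot ids; FileScanTasks are planned
  // per snapshot from this batch, keeping the checkpointed split list capped.
  public List<Long> snapshotsToPlanNow() {
    List<Long> batch = new ArrayList<>();
    while (batch.size() < maxSnapshotsToPlan && !unconsumedSnapshotIds.isEmpty()) {
      batch.add(unconsumedSnapshotIds.pollFirst());
    }
    return batch;
  }
}
```
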

@openinx
Member

openinx commented Oct 12, 2020

That might blow up Flink checkpoint state if the enumerated list of FileScanTask is too big.

@stevenzwu , what is the maximum size of a table in your production environment? I'm wondering whether it's worth implementing the two-phase enumerator in the first version.

If we have 1 PB of data and each file is 128 MB, then the table has 8,388,608 files. If every FileScanTask consumes 1 KB, then the state is ~8 GB. That should be acceptable for the Flink state backend.
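
For reference, a quick back-of-the-envelope check of that estimate (power-of-two sizes; ~1 KB per task is an assumption):

```java
public class EnumeratorStateEstimate {
  public static void main(String[] args) {
    long tableBytes = 1L << 50;               // 1 PB of table data
    long fileBytes = 1L << 27;                // 128 MB per data file
    long fileCount = tableBytes / fileBytes;  // 8,388,608 files
    long stateBytes = fileCount * (1L << 10); // assume ~1 KB of state per FileScanTask
    System.out.println(fileCount + " files -> ~" + (stateBytes >> 30) + " GB of enumerator state");
  }
}
```
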

@JingsongLi
Contributor Author

Hi @stevenzwu , regarding TableScan.appendsBetween, we can limit the number of snapshots per scan, even scanning only one at a time. Because TableScan.appendsBetween seems to be just a combination of single incremental snapshots, we can handle one snapshot at a time.

  • For the FLIP-27 source, this is easy to do, because tasks come and ask for splits; the only remaining question is inside the coordinator, and the coordinator can completely control how the splits are generated.
  • For the old API, the downstream reading operator should have a maximum split queue size to back-pressure the enumerator, and then do the same thing (see the sketch below).
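
A minimal sketch of that back-pressure idea for the old API, assuming a bounded queue between the monitor and the reading operator; the class name and capacity are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedSplitQueue<SplitT> {

  private final BlockingQueue<SplitT> queue;

  public BoundedSplitQueue(int maxPendingSplits) {
    this.queue = new ArrayBlockingQueue<>(maxPendingSplits);
  }

  // The monitor blocks here once readers fall behind, which throttles enumeration.
  public void offerSplit(SplitT split) throws InterruptedException {
    queue.put(split);
  }

  // The reading operator drains one split at a time between checkpoints.
  public SplitT takeSplit() throws InterruptedException {
    return queue.take();
  }
}
```
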

@stevenzwu
Contributor

stevenzwu commented Oct 12, 2020

I was mainly discussing this in the context of the FLIP-27 source. Regardless of how we implement the enumeration, there are two pieces of info that the enumerator needs to track and checkpoint.

  1. the last snapshot where enumeration/planning was done
  2. pending/unprocessed splits from previous discoveries/plannings

I was mainly concerned about the state size for the latter. That is where I was suggesting we throttle how eagerly splits are planned. I was thinking about using TableScan.useSnapshot(long snapshotId) so that we can control how many snapshots' worth of splits we plan into state.

Here are some additional benefits of enumerating splits snapshot by snapshot.

  • We can track and assign splits snapshot by snapshot, in the same order as they were committed.
  • We can publish metrics like the number of pending snapshots, lag (current time - oldest timestamp from uncompleted snapshot), etc.

@openinx note that this is not keyed state, where state is distributed among parallel tasks. Here, 8 GB of operator state in the enumerator can be problematic. I vaguely remember RocksDB can't handle a list larger than 1 GB; the bigger the list, the slower it gets. Also, if we do planTasks (vs planFiles), the number of splits can be a few times bigger. I can definitely buy the point of starting with something simple and optimizing later. It will be an internal change to the enumerator, so it has no user impact.

@JingsongLi Yeah, the key thing is how the coordinator/enumerator controls how the splits are generated. I was saying that we may need some control/throttling there to avoid eagerly enumerating all pending snapshots, so that the checkpointed split list stays manageable/capped. I thought the idea of TableScan.appendsBetween was to run planFiles or planTasks between the last planned snapshot and the latest table snapshot; that is what I was referring to earlier as eager discovery/planning of all unconsumed splits.

@JingsongLi
Contributor Author

NIT: I think we still need to use appendsBetween(snapshot-1, snapshot) since we want to get incremental data.

True, I think this is easy to do.


This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Feb 15, 2024

github-actions bot commented Mar 1, 2024

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

@github-actions github-actions bot closed this as not planned on Mar 1, 2024