
Flink: Add passthroughRecords option to DynamicIcebergSink#15433

Open
sqd wants to merge 1 commit into apache:main from sqd:oss_passthrough_records

Conversation

@sqd
Contributor

@sqd sqd commented Feb 24, 2026

When enabled, records are forwarded directly from the record generator to the writer using a forward edge instead of a hash edge. This allows Flink to chain the two operators, avoiding serialization/deserialization overhead and drastically increasing throughput in high-volume pipelines.
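The serialization cost a non-chained edge adds can be illustrated with a small stand-in (this is plain `java.io` serialization, not Flink's actual serializer stack; the class and method names are hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Illustrative only: on a hash (network) edge, Flink must serialize each
// record on the sending side and deserialize it on the receiving side.
// On a forward edge between chained operators, the object reference is
// handed over directly and both steps disappear.
public class SerdeDemo {

  // Simulates what a non-chained edge does to every single record.
  static String roundTrip(String record) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(record); // serialize (sender side)
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (String) in.readObject(); // deserialize (receiver side)
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    String record = "some RowData payload";
    // Chained (forward) edge: the same object is passed along, no copy.
    String chained = record;
    // Non-chained (hash) edge: a full serialize/deserialize per record.
    String shuffled = roundTrip(record);
    System.out.println(chained == record);       // true: same reference
    System.out.println(shuffled.equals(record)); // true, but a new copy
  }
}
```

For high-volume pipelines this per-record copy (plus the network transfer it usually implies) is exactly the overhead that chaining removes.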

Current topology:
[screenshot omitted]

Same pipeline, with the new change enabled:
[screenshot omitted]

Serdes of Flink RowData can be very expensive:
[screenshot omitted]

@sqd
Contributor Author

sqd commented Feb 24, 2026

@mxm @pvary I would appreciate it if you could take a look. I'm happy to provide any detail/context. I have tested this on an internal pipeline which processes around 10TB~20TB of data per hour, where this change has drastically reduced resource usage and increased output.

Comment on lines +451 to +457
if (passthroughRecords) {
  if (!immediateUpdate) {
    throw new UnsupportedOperationException(
        "Immediate update must be enabled to pass through records");
  }
  rowDataDataStreamSink = converted.sinkTo(sink).uid(prefixIfNotNull(uidPrefix, "-sink"));
} else {
Contributor

This will ignore DistributionMode and partitioning in DynamicRecord. I saw that you listed this in the docs, but I'm not sure we should diverge too much from the normal mode of operation. I think what we can do is add a new chained side output with an extra DynamicWriter for this quick path.

It may be worth adding a new DistributionMode. Currently NONE does a round-robin, which is slightly confusing; we could rename it to ROUND_ROBIN and use NONE for this direct path.

Contributor

How do we handle DistributionMode in the normal Sink?
We should be consistent.

Contributor

DistributionMode.NONE in the regular sink does strict forward partitioning (no redistribution), which is similar to what the PR does. It leads to Flink chaining the input with the writer. For the DynamicSink, because we have many tables, the idea was to spread out the data onto the available workers, which is why we opted for a round-robin across the workers chosen for the table.

Contributor Author

@sqd sqd Feb 25, 2026

to add a new chained side output with an extra DynamicWriter

Could you elaborate on this, please? My earlier understanding was that Flink cannot chain any operator with a side output, but I was wrong.

rename it to ROUND_ROBIN and use NONE

Yes, that sounds like a big improvement in naming clarity.

@pvary
Contributor

pvary commented Feb 25, 2026

@sqd Could you share a bit more about your use case? Ignoring DistributionMode and chaining directly to writers feels quite risky to me, even if the performance gains are tempting.

This approach might work if your input records are already correctly distributed. But any mistake there will lead to small files or skewed writes—fast for the writers, but potentially very costly for the readers.

@mxm
Contributor

mxm commented Feb 25, 2026

I think the idea here is valid, but we should implement this in a clean way, e.g. by adding a new DistributionMode and an additional chained writer to the processor. We don't want to silently ignore other distribution modes based on some additional flag (passthroughRecords(true)). That said, the potential performance improvements need to outweigh the slight increase in complexity due to the changes in the job topology.

@pvary
Contributor

pvary commented Feb 25, 2026

I think the idea here is valid

You either have a very tricky balancing logic before the sink, or every table of yours is similarly loaded and continuously written. Not too dynamic IMHO 😄

@sqd
Contributor Author

sqd commented Feb 25, 2026

That said, the potential performance improvements need to outweigh the slight increase in complexity

I actually have some numbers! Before the change the pipeline took around 1 to 1.5TB of memory and around 200 cores. With the change it shaved off 50 to 70 cores (not to mention the increased throughput). Of course there is other computation going on as well, but Parquet writing and Flink RowData serdes showed up in the profiler as taking >90% CPU combined, and serdes alone cost around 75% as much CPU as the actual Parquet writing.

Could you share a bit more about your use case

My use case is that I have a firehose of data that I want to ingest into Iceberg. Because the volume is so high, it doesn't really matter which writer subtask a record is routed to; there won't be small files either way. I was running DistributionMode.NONE and noticed that serdes was taking up a ridiculous amount of resources, and it also caused a lot of unnecessary network shuffling.

adding a new DistributionMode

I am a big fan of calling it ROUND_ROBIN instead, but are we not worried about breaking existing code? Maybe introduce ROUND_ROBIN as an alias for NONE, and call this new mode "PASSTHROUGH" or something?

@sqd
Contributor Author

sqd commented Feb 26, 2026

Re: side output. I can definitely see the argument for not silently ignoring other distribution modes, but if that's disabled by default and we have extensive documentation, maybe it's not that big of a deal? Also, even if we add a side output and only enable the side-output switching behavior when this feature is toggled on, a similar argument can be made that we are silently ignoring DistributionMode.PASS_THROUGH (or whatever we call it) depending on the feature toggle.

@pvary
Contributor

pvary commented Feb 26, 2026

@sqd: Given your use case, it makes perfect sense to skip the routing step.

The javadoc for DistributionMode.NONE states:

  1. none: don't shuffle rows. It is suitable for scenarios where the rows are located in only few partitions, otherwise that may produce too many small files because each task is writing rows into different partitions randomly.

That’s exactly the behavior we want here. If this were a greenfield project, adding a ROUND_ROBIN option would indeed be a good idea.

DynamicIcebergSink is still marked @Experimental, so we may have some flexibility. My only concern is that existing users of the DynamicSink might not notice this behavioral change when upgrading, and could unintentionally end up producing many small files.

@pvary
Contributor

pvary commented Feb 26, 2026

We could call this a bugfix and make the change very obvious in the documentation.

@mxm
Contributor

mxm commented Feb 26, 2026

Re: side output. I can definitely see the argument for not silently ignoring other distribution modes, but if that's disabled by default and we have extensive documentation, maybe it's not that big of a deal?

@sqd I would like not to add a special flag, but to use a DistributionMode instead. Using a special flag would deviate from the current design of the dynamic sink. The flag can create unexpected failures, e.g. you have the pass-through flag on and everything works fine, but then a record with DistributionMode.HASH arrives and the pipeline crashes.

Also, even if we add a side output and only enable the side-output switching behavior when this feature is toggled on, a similar argument can be made that we are silently ignoring DistributionMode.PASS_THROUGH (or whatever we call it) depending on the feature toggle.

@sqd All distribution modes should continue to work, even in the presence of a pass-through DistributionMode. That's why I'm against the flag. We need to modify the topology so that it supports both a direct path and the round-robin/shuffle case.

We could call this a bugfix, and make the change very obvious in the documentation

@pvary +1 for treating this as a bugfix. We haven't documented the existing round-robin behavior of DistributionMode.NONE in DynamicSink, so it is fair for the user to assume that it behaves like the regular IcebergSink.

@sqd
Contributor Author

sqd commented Feb 26, 2026

I would like not to add a special flag, but use a DistributionMode instead

@mxm Sure. Just to confirm I understand what's in your mind:

  1. No special flag at all. We always create a side output topology for the direct path.
  2. DistributionMode.NONE will be changed to go to the direct path. A new DistributionMode.ROUND_ROBIN will go to the old path (and be processed round-robin, like the current behavior of NONE).
  3. Be very obvious about these behaviors in documentation.

Am I missing anything?

@aiborodin
Contributor

A new DistributionMode.ROUND_ROBIN will go to the old path (and be processed round-robin, like the current behavior of NONE)

I think we don't even need another distribution mode. We can change DistributionMode.NONE to match the behaviour of the IcebergSink and always forward records to a writer without shuffling. The current round-robin behaviour of DistributionMode.NONE creates even more small files. And users can always achieve round-robin by applying the Flink rebalance operation before the dynamic sink, and/or by hashing on some salt columns (or the table name) to distribute the load, if they explicitly choose to pay the costs of serialisation and shuffling.

@pvary
Contributor

pvary commented Feb 27, 2026

A new DistributionMode.ROUND_ROBIN will go to the old path (and be processed round-robin, like the current behavior of NONE)

I think we don't even need another distribution mode. [..] The current round-robin behaviour of DistributionMode.NONE creates even more small files.

The current NONE (and future ROUND_ROBIN) mode is very important for scenarios where you have many tables with relatively low per-table volume but a stable distribution of incoming records. By configuring DynamicRecord.writeParallelism appropriately, you can ensure that each table gets exactly the number of writers it needs—no more, no less.

@mxm
Contributor

mxm commented Feb 27, 2026

DistributionMode.NONE aka DistributionMode.ROUND_ROBIN is indeed very clever. For example, if you have 10 workers and your write parallelism is 2, it will choose 2 random workers and round-robin the data there. This works with as many tables as the user desires and essentially load-balances the data across the workers. Achieving this manually is non-trivial and we can't expect that from the user.
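The selection logic described above can be sketched as follows (class and method names are hypothetical illustrations, not the actual DynamicIcebergSink implementation, and the subset is derived from a table-name hash here rather than chosen randomly):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: out of `numSubtasks` writer subtasks, pick `writeParallelism`
// of them based on the table identity, then round-robin that table's
// records across the chosen subset. This spreads many tables across the
// available workers while capping the writer count per table.
public class RoundRobinRouting {
  private final int numSubtasks;
  private int counter = 0; // per-sender round-robin position

  RoundRobinRouting(int numSubtasks) {
    this.numSubtasks = numSubtasks;
  }

  // Deterministically derive the subset of subtasks for a table.
  List<Integer> subtasksFor(String table, int writeParallelism) {
    List<Integer> chosen = new ArrayList<>();
    int start = Math.floorMod(table.hashCode(), numSubtasks);
    for (int i = 0; i < writeParallelism; i++) {
      chosen.add((start + i) % numSubtasks);
    }
    return chosen;
  }

  // Route one record: round-robin across the table's subset.
  int route(String table, int writeParallelism) {
    List<Integer> chosen = subtasksFor(table, writeParallelism);
    return chosen.get(counter++ % chosen.size());
  }

  public static void main(String[] args) {
    RoundRobinRouting router = new RoundRobinRouting(10);
    // With 10 subtasks and writeParallelism = 2, one table's records
    // alternate between the same 2 subtasks.
    int a = router.route("db.events", 2);
    int b = router.route("db.events", 2);
    int c = router.route("db.events", 2);
    System.out.println(a != b); // true: alternates between the pair
    System.out.println(a == c); // true: cycles back to the first
  }
}
```

The point of the discussion is that this routing necessarily crosses a network edge (and pays the serde cost), whereas the proposed direct path hands records straight to a chained writer.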

You are right @aiborodin that all of this could be achieved manually by the user, but the current API of DynamicRecord allows specifying the write parallelism, and it feels wrong to just ignore it entirely when some static flag is set. I'm ok with ignoring it for an actual "none" DistributionMode, though.

@mxm
Contributor

mxm commented Feb 27, 2026

@mxm Sure. Just to confirm I understand what's in your mind:

  1. No special flag at all. We always create a side output topology for the direct path.

  2. DistributionMode.NONE will be changed to go to the direct path. A new DistributionMode.ROUND_ROBIN will go to the old path (and be processed round-robin, like the current behavior of NONE).

  3. Be very obvious about these behaviors in documentation.

@sqd Sounds good!

@sqd sqd force-pushed the oss_passthrough_records branch from ef6f483 to b7fe04c on March 2, 2026
@github-actions github-actions bot added the API label Mar 2, 2026
@pvary
Contributor

pvary commented Mar 3, 2026

@mxm, @sqd: I think we have a misunderstanding here. In the DynamicSink the mode is currently set per record, while the pipeline is created when the sink is created. If we create the sink in a way that the operators are chained, then we can't depend on the mode set by the records themselves for routing.

Is there a way to have duplicated instances of writers (some chained, some not) and route records out of the chained versions based on the mode? A side output seems like a possibility, but maybe complicating things so much is overkill.

@mxm
Contributor

mxm commented Mar 3, 2026

Unless there is more to come after b7fe04c, I think there is indeed a misunderstanding. Let's recap the criteria (1) from above:

  1. No special flag at all. We always create a side output topology for the direct path.

I still see the flag in the above commit. We need to remove the flag and add a direct chained path, as an add-on to the existing topology.

Is there a way to have duplicated instances of writers (some chained, some are not chained) and route things out of the chained versions based on the mode? Sideoutput seems like a possibility, but maybe complicating the things so much is an overkill.

Yes, this is what I suggested above and I think we had agreement on it. Using a side output is the only option when we want both a chained and a non-chained variant. "Side output" is just a fancy word for adding another output; semantically, it is not different from the main output. Another option would be to multiplex via one output, but I think that makes things more complicated and harder to maintain.

@sqd
Contributor Author

sqd commented Mar 3, 2026

@pvary @mxm

I still see the flag in the above commit. We need to remove the flag and add a direct chained path, as an add-on to the existing topology.

The flag is internal, not exposed through the public builder. I use a flag so I don't have to copy the code of DynamicIcebergSink into a very similar DynamicIcebergForwardSink or something; instead we can reuse most of the logic. For example, this line sets the flag to false to create a shuffled sink, and this line sets it to true to create a forward sink.

side output

Yes that's what I am doing (unless I am still missing something?).

  1. These lines create and wire the normal-path (non-chained) sink
  2. These lines do the same for the new-path (chained) sink

Sorry about the CI failures. I know what's going on but am a little swamped; I'll fix them as soon as I can.

@mxm
Contributor

mxm commented Mar 3, 2026

You are right, we missed that. This is indeed achieving what we discussed.

@sqd sqd force-pushed the oss_passthrough_records branch 3 times, most recently from e5e1d07 to 646dc5e on March 3, 2026
@github-actions github-actions bot added the spark label Mar 3, 2026
Currently, DistributionMode.NONE actually performs a round-robin. This
commit changes the behavior so that records tagged as NONE go to
a passthrough side output to enable chaining. A new distribution mode,
ROUND_ROBIN, is added which behaves like NONE did before this change.
@sqd sqd force-pushed the oss_passthrough_records branch from 646dc5e to 95a28a2 on March 3, 2026
@sqd
Contributor Author

sqd commented Mar 3, 2026

Hi @mxm @pvary, I would like some guidance here if you don't mind. To make the CI pass, I had to make some changes to the Spark code. The new ROUND_ROBIN distribution mode (which is a core API enum) was causing some Spark tests to fail because the tests iterate through all possible distribution modes on this line. I fixed the tests by treating ROUND_ROBIN as an alias for NONE in Spark, because the Spark connector doesn't seem to have a similar concept. I am pretty new to this project and not sure if that's acceptable.
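The alias approach described above can be sketched like this (a hypothetical helper with a locally modeled enum, not the actual Spark connector code; the real enum lives in org.apache.iceberg.DistributionMode):

```java
// Sketch: fold the proposed ROUND_ROBIN mode back into NONE on the
// Spark side, which has no round-robin concept, so the existing
// per-mode handling continues to work unchanged.
public class SparkModeAlias {
  enum DistributionMode { NONE, HASH, RANGE, ROUND_ROBIN }

  static DistributionMode normalizeForSpark(DistributionMode mode) {
    // Treat ROUND_ROBIN as an alias for NONE before any mode-specific
    // logic (e.g. a switch over the enum) runs.
    return mode == DistributionMode.ROUND_ROBIN ? DistributionMode.NONE : mode;
  }

  public static void main(String[] args) {
    System.out.println(normalizeForSpark(DistributionMode.ROUND_ROBIN)); // NONE
    System.out.println(normalizeForSpark(DistributionMode.HASH));        // HASH
  }
}
```

Normalizing once at the boundary keeps the alias decision in one place instead of scattering ROUND_ROBIN cases through every switch.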
