
Conversation

@danny0405
Contributor

Describe the issue this Pull Request addresses

Design for RLI (record level index) support in Flink streaming.

Summary and Changelog

Lists the goals and design details in the doc.

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Dec 16, 2025
@danny0405
Contributor Author

Also cc @geserdugarov and @cshuo for the review.

## Background
Flink does not support RLI while Spark does, which causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, creating overhead for users in production.

Another reason is multiple partition upserts: currently the only choice is the flink_state index, but flink_state actually costs a lot of memory and cannot be shared between different workloads.
Collaborator

multiple partition upserts

do you mean cross partition upsert?

Contributor Author

yeah

#### The Cache of RLI Access
We need fast access in streaming to achieve high throughput (ideally per-record access should be < 10ms), thus a general hotspot cache is needed. We will build an in-memory LRU cache keyed by the active upsert record keys; the cache items will be forcibly evicted based on a configured memory threshold.

We also need a memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are therefore invisible.
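
A minimal sketch of the two caches described above, using only the JDK since the RFC does not prescribe a cache library; class and method names are illustrative, and entry-count eviction stands in for the configured memory threshold:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a hotspot LRU cache of record key -> file id, plus a buffer for the
// mappings produced within the current (not yet committed) checkpoint.
public class RliLookupCache {
  private final int maxEntries; // stand-in for the configured memory threshold
  private final LinkedHashMap<String, String> hotspot;                    // committed mappings, LRU-evicted
  private final Map<String, String> currentCheckpoint = new HashMap<>();  // invisible in the MDT so far

  public RliLookupCache(int maxEntries) {
    this.maxEntries = maxEntries;
    // An access-order LinkedHashMap gives LRU semantics; eldest entries are force-evicted.
    this.hotspot = new LinkedHashMap<String, String>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
        return size() > RliLookupCache.this.maxEntries;
      }
    };
  }

  // Current-checkpoint mappings take precedence over the committed hotspot entries.
  public String getLocation(String recordKey) {
    String loc = currentCheckpoint.get(recordKey);
    return loc != null ? loc : hotspot.get(recordKey);
  }

  public void putForCurrentCheckpoint(String recordKey, String fileId) {
    currentCheckpoint.put(recordKey, fileId);
  }

  // Once the checkpoint/instant is committed to Hudi, the mappings become visible via the MDT,
  // so the per-checkpoint buffer can be folded into the hotspot cache and cleared.
  public void onInstantCommitted() {
    hotspot.putAll(currentCheckpoint);
    currentCheckpoint.clear();
  }
}
```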
Collaborator

The mappings should be persisted somewhere in case a failover happens and the memory cache is lost.

Contributor Author

not really, they are persisted in the MDT; that's why we read inflight commits from the MDT.

Collaborator

ok, got it. I misunderstood the mapping here. I thought it was the mapping of checkpoint id -> instant.

Member

so you meant a buffer of the current write's index mapping entries, to be flushed later upon Hudi commit? Let's clarify a bit.

Member

can you please explain this problem in more detail than 1 sentence.. and even more in comments.. I am not following what the issue is..

Contributor Author

do you mean the inflight instants read support for MDT? I added the details in line 72 ~ line 77

Member

IIUC, this is so that if we see the same record key again within the commit, it gets routed to the same location on the data table?

The actual updates to RLI/SI will happen away/outside of the cache you are discussing?

Contributor Author
@danny0405 danny0405 Jan 15, 2026

it gets routed to the same location on the data table

yes, the actual writes happen in the IndexWrite operator; here in the BucketAssign operator, we just need the cache plus the MDT to query the locations. The cache is updated for new records/location changes, while the MDT is only for query.

I put the update flow of the cache in the doc.

This cache can be cleaned once the checkpoint/instant is committed to Hudi.

On job restart or task failover, there is a case where the checkpoint succeeds on Flink while the instant is not committed to Hudi; for DT metadata, the pipeline will recommit
the instant with the recovered table metadata, while for RLI access, we need to include these special inflight instants in queries; basically, we need to support reading inflight instants on the MDT.
Collaborator

for RLI, we should also recommit the write status meta just like DT.

Contributor Author

yes for sure, the MDT is committed before the DT.

Contributor Author

On job restart or task failover, there is a case where the checkpoint succeeds on Flink while the instant is not committed to Hudi; this instant is expected to be recommitted with the recovered table metadata. But because the BucketAssign operator is upstream of the StreamWrite operator, we do not want to block the processing of BucketAssign (to wait for the inflight instants to recommit successfully).

My suggestion is to support reading the MDT with specified inflight instants (which will eventually be recommitted, so they are valid).
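
A rough sketch of what reading the MDT with specified inflight instants could mean, using simplified stand-in types rather than the actual Hudi timeline API:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified stand-in types; the real implementation would operate on the Hudi timeline classes.
class MdtVisibility {
  record Instant(String time, boolean completed) {}

  // An MDT instant is readable if it is completed, or if it is inflight but its Flink checkpoint
  // already succeeded (such an instant is guaranteed to be recommitted, so its data is valid).
  static List<Instant> readableInstants(List<Instant> mdtTimeline, Set<String> checkpointedInflights) {
    return mdtTimeline.stream()
        .filter(i -> i.completed() || checkpointedInflights.contains(i.time()))
        .collect(Collectors.toList());
  }
}
```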

cc @vinothchandar

- The RLI is global, so upserts across partitions are supported; partition-level RLI is also supported for large fact tables;
- Async compaction for MDT when RLI is enabled, either in the writer pipeline or in a background table services job;
- Smart caching of RLI;
- Clearly set limits on the kind of write throughput supported by RLI (based on a certain average response time for RLI access, like from x0ms to x00ms) via empirical benchmarks;
Collaborator

for the limits, do you mean adding a rate limiter in the pipeline, or just some notes in the documentation?

Contributor Author

Not really, it is meant as an upper threshold on data volume below which the RLI index is recommended, because the RLI index has relatively worse performance than a simple hash index.

Contributor

Do you mean some recommendations for users on when to use RLI? But if you mean to set some limits in the code on our side, then I suppose that it should be done by Flink's backpressure mechanism.

Contributor Author
@danny0405 danny0405 Dec 18, 2025

I suppose that it should be done by Flink's backpressure mechanism.

Backpressure can handle regular volume fluctuations, but not continuous processing lag, which would cause stability issues.

### The Write

### The RLI Access
In the `BucketAssigner` operator, the RLI metadata is used as the index backend: the operator probes the RLI with the incoming record keys to figure out whether each message is an update, an insert, or a delete.
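
A hypothetical sketch of this probe, assuming a cache-first lookup that falls back to the committed RLI in the MDT; the `IndexLookup` interface and the string tags are illustrative, not existing Hudi APIs:

```java
// Hypothetical sketch of the tagging decision in the BucketAssigner operator; the
// IndexLookup abstraction and the string tags are illustrative, not existing Hudi APIs.
class RecordTagger {
  interface IndexLookup {
    String getLocation(String recordKey); // returns a file id, or null when the key is unknown
  }

  private final IndexLookup cache;  // hotspot + current-checkpoint cache (fast path)
  private final IndexLookup mdtRli; // committed RLI entries read from the metadata table

  RecordTagger(IndexLookup cache, IndexLookup mdtRli) {
    this.cache = cache;
    this.mdtRli = mdtRli;
  }

  // Probe the cache first, then the MDT; a hit means UPDATE/DELETE, a miss means INSERT.
  String tag(String recordKey, boolean isDelete) {
    String location = cache.getLocation(recordKey);
    if (location == null) {
      location = mdtRli.getLocation(recordKey);
    }
    if (location == null) {
      return "INSERT";
    }
    return (isDelete ? "DELETE@" : "UPDATE@") + location;
  }
}
```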
Collaborator

Maybe we also need a customized partitioner before the bucket assigner operator, which can shuffle hoodie keys under the same file group in the RLI partition into the same subtask of the bucket assigner. This may help improve the query efficiency of the RLI index.

Contributor Author

Usually the data set volume is 10x ~ 100x larger than the RLI index, so data skew should take higher priority here I think; we can benchmark how much this gains though.

Collaborator

Currently the partitioner for the bucket assigner with the flink state index is also simply hashing by key, which may also have skew issues. Anyway, we can benchmark it to get specific numbers.

Contributor

Should we use keyBy() before BucketAssigner operator?

With a cache layer, wrong tagging is possible:

  • checkpoint1 is completed
  • Hudi committed T1
  • record5 is absent from the RLI
  • record5 arrives and is sent to BucketAssigner0, where we save info about this record in the cache
  • checkpoint2 is completed
  • async start of Hudi commit T2
  • record5 arrives and is sent to BucketAssigner1, where a cache miss happens, and there is no info in the RLI because Hudi commit T2 is still in progress.
    record5 will be tagged as an insert, but it should be an update.
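
For reference, the key-based shuffle that the replies below treat as the baseline requirement could look like this minimal Flink sketch (the surrounding stream wiring is omitted, and `HoodieRecord` is used loosely as the element type):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.hudi.common.model.HoodieRecord;

class KeyedRouting {
  // Route all records with the same record key to the same BucketAssigner subtask, so the
  // scenario above (record5 hitting BucketAssigner0 and then BucketAssigner1) cannot occur.
  static KeyedStream<HoodieRecord, String> shuffleByRecordKey(DataStream<HoodieRecord> input) {
    return input.keyBy((KeySelector<HoodieRecord, String>) HoodieRecord::getRecordKey);
  }
}
```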

Contributor Author

The cache is in memory and would be re-bootstrapped on task/job restart.

Collaborator

@geserdugarov yeah, it's a basic requirement to shuffle records with the same key into the same sub-task of the bucket assigner to ensure correct tagging.

What we're talking about here is whether we should shuffle different keys under the same RLI file group in the metadata table into the same sub-task, since the hoodie keys will first be grouped by RLI file group before querying the index.

Member

grouping keys to do a batch lookup per RLI file group would be beneficial for read efficiency

Member

let's do things consistent with the Spark writer, and we should plan on supporting both partitioned and global RLI.

Member
@vinothchandar vinothchandar Jan 13, 2026

should we shuffle different keys under the same RLI file group in the metadata table into the same sub-task

We should be reading each RLI shard once per commit. Even with caching, it's a terrible idea not to group keys to tasks for RLI lookup, since it'll affect cache efficiency, i.e. each node would then have to cache all RLI shards, which can be massive.
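
A sketch of the grouping argued for here: keys are bucketed by RLI file group (shard) before lookup, so each shard's files are read once per batch and a subtask only caches the shards assigned to it. The modulo hashing is a stand-in for the MDT's actual key-to-file-group mapping:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RliShardGrouping {
  // Stand-in for the MDT's key-to-file-group hashing; the real mapping follows the RLI partitioner.
  static int shardOf(String recordKey, int numRliFileGroups) {
    return (recordKey.hashCode() & Integer.MAX_VALUE) % numRliFileGroups;
  }

  // Group incoming keys by RLI shard so each shard's base/log files are read once per batch,
  // and so a subtask only ever needs to cache the shards assigned to it.
  static Map<Integer, List<String>> groupByShard(List<String> recordKeys, int numRliFileGroups) {
    Map<Integer, List<String>> grouped = new HashMap<>();
    for (String key : recordKeys) {
      grouped.computeIfAbsent(shardOf(key, numRliFileGroups), s -> new ArrayList<>()).add(key);
    }
    return grouped;
  }
}
```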

Contributor Author

+1

@vinothchandar vinothchandar added the rfc Request for comments label Dec 18, 2025
Comment on lines 72 to 73
On job restart or task failover, there is a case where the checkpoint succeeds on Flink while the instant is not committed to Hudi; for DT metadata, the pipeline will recommit
the instant with the recovered table metadata, while for RLI access, we need to include these special inflight instants in queries; basically, we need to support reading inflight instants on the MDT.
Member

these special inflight instants

can you define such special inflight instants a bit more clearly? Would being able to read such inflight instants be a general capability, or a special case for the Flink MDT use case?

![The RLI Access Pattern](./rli-access-pattern.png)

### The Shuffle of RLI Payloads
In the `StreamWrite` operator, the index items are inferred and sent to the `IndexWrite` operator in streaming style; the index records are shuffled by `hash(record_key) % num_rli_shards` (the same hashing algorithm as the MDT `RLI index` partitioner),
Contributor

How will num_rli_shards be defined?

Member

there are configs to control RLI min/max file groups

Contributor
@geserdugarov geserdugarov Dec 18, 2025

So we have:

public static final ConfigProperty<Integer> RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP = ConfigProperty
.key(METADATA_PREFIX + ".record.level.index.min.filegroup.count")
.defaultValue(1)

public static final ConfigProperty<Integer> RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP = ConfigProperty
.key(METADATA_PREFIX + ".record.level.index.max.filegroup.count")
.defaultValue(10)

which then define the value of fileGroupCount used in:
final List<String> fileGroupFileIds = IntStream.range(0, fileGroupCount)
.mapToObj(i -> HoodieTableMetadataUtil.getFileIDForFileGroup(metadataPartition, i, relativePartitionPath, dataPartitionName))
.collect(Collectors.toList());
ValidationUtils.checkArgument(fileGroupFileIds.size() == fileGroupCount);
engineContext.setJobStatus(this.getClass().getSimpleName(), msg);
engineContext.foreach(fileGroupFileIds, fileGroupFileId -> {

But for Flink, num_rli_shards should be connected to the IndexWriter operator parallelism, and couldn't be changed easily.

Contributor Author

The parallelism of IndexWriter can be adjusted as long as we shuffle the index payloads in the same way as the MDT partitioner.
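
A sketch of that decoupling, assuming a custom Flink `Partitioner` that routes index payloads by the MDT RLI shard id rather than by raw key hash (class and field names are illustrative, and the modulo hashing is a stand-in for the MDT's own key-to-file-group mapping):

```java
import org.apache.flink.api.common.functions.Partitioner;

// Illustrative custom partitioner: route index payloads by the MDT RLI shard, not by raw key hash,
// so the IndexWrite parallelism can be rescaled independently of the RLI file group count.
class RliShardPartitioner implements Partitioner<String> {
  private final int numRliFileGroups; // derived from the existing min/max file group count configs

  RliShardPartitioner(int numRliFileGroups) {
    this.numRliFileGroups = numRliFileGroups;
  }

  @Override
  public int partition(String recordKey, int numPartitions) {
    int shard = (recordKey.hashCode() & Integer.MAX_VALUE) % numRliFileGroups;
    return shard % numPartitions; // numPartitions = IndexWrite parallelism, free to change
  }
}
```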

Member

I want to reuse as many common configs as possible. Strictly no Flink-specific table configs.

@vinothchandar
Member

@danny0405 can you please update the RFC and ping me

@danny0405
Contributor Author

cc @vinothchandar for another review

@geserdugarov
Contributor

geserdugarov commented Dec 19, 2025

@danny0405 , is it possible to write a description of some corner cases in simple ordered steps, for instance, a failure during an async Hudi commit?

@danny0405
Contributor Author

@danny0405 , is it possible to write a description of some corner cases in simple ordered steps, for instance, a failure during an async Hudi commit?

Added in the appendix.

We also need a memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are therefore invisible.
This cache can be cleaned once the checkpoint/instant is committed to Hudi (which indicates that the index payloads are also committed).

On job restart or task failover, there is a case where the checkpoint succeeds on Flink while the instant is not committed to Hudi; for DT metadata, the pipeline will recommit
Collaborator

Honestly, the issue also happens in append-only use cases. We need to commit the Kafka offset into the Hudi commit to make sure a failed job can restart from the right Kafka offset in the latest successful Hudi commit.

Contributor Author

yes, this is also needed by MDT commit.

Member
@vinothchandar vinothchandar left a comment

Adding some preliminary scope discussions that were already in the GH discussion.

While I continue to review, please update the RFC accordingly to cover SI also. Let's tighten the writing to be comprehensive w.r.t. this. Thanks!

Member
@vinothchandar vinothchandar left a comment

In general, the RFC/design doc is still very high level. I'd like the specifics to be clearly written out.

## Background
Flink does not support RLI while Spark does, which causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, creating overhead for users in production.

Another reason is multiple partition upserts: currently the only choice is the flink_state index, but flink_state actually costs a lot of memory and cannot be shared between different workloads.
Member

Suggested change
Another reason is multiple partition upserts: currently the only choice is the flink_state index, but flink_state actually costs a lot of memory and cannot be shared between different workloads.
Another motivation is scalable, efficient support for cross-partition updates (where the partition path of the record is changed). Currently, the only choice is the flink_state index, which can be costly when used in such a scenario, since it has to hold state proportional to the size of the table. This is due to the fact that flink_state can use a lot of memory and cannot be shared between different workloads.

Member

I would love for us to spell out actual issues like this, such that even a new reader can understand. @danny0405 Please make a pass over the entire RFC like this, before this comment is resolved.

The high-level ideas:

- an RLI-based index backend will be there to replace the flink_state index;
- a cache of RLI would be introduced to speed up access;
Member

Suggested change
- a cache of RLI would be introduced to speed up access;
- a cache of RLI would be introduced to speed up access, along with a cache invalidation mechanism to keep it consistent with the committed state of the table.


The high-level ideas:

- an RLI-based index backend will be there to replace the flink_state index;
Member

Suggested change
- an RLI-based index backend will be there to replace the flink_state index;
- an RLI-based index backend will be added, which can be used in place of the current flink_state index;


- an RLI-based index backend will be there to replace the flink_state index;
- a cache of RLI would be introduced to speed up access;
- a separate index function to write the RLI/SI payloads;
Member

Suggested change
- a separate index function to write the RLI/SI payloads;
- a separate Flink index function to write the RLI/SI payloads;

- an RLI-based index backend will be there to replace the flink_state index;
- a cache of RLI would be introduced to speed up access;
- a separate index function to write the RLI/SI payloads;
- the MDT RLI files are written synchronously with the data table data files, and the metadata is sent to the coordinator for a final commit to the MDT (after `FILES` partition is ready);
Member

Suggested change
- the MDT RLI files are written synchronously with the data table data files, and the metadata is sent to the coordinator for a final commit to the MDT (after `FILES` partition is ready);
- the MDT RLI and SI files will be written synchronously with the data table data files, and the metadata is sent to the coordinator for a final commit to the MDT (after `FILES` partition is ready);

Member

@danny0405 please fix the entire RFC so that you don't loosely refer to RLI when you really mean RLI and SI. It gets very confusing to read. Please be precise. Let's keep this comment open until it's addressed across the board.

#### The Cache of RLI Access
We need fast access in streaming to achieve high throughput (ideally per-record access should be < 10ms), thus a general hotspot cache is needed. We will build an in-memory LRU cache keyed by the active upsert record keys; the cache items will be forcibly evicted based on a configured memory threshold.

We also need a memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are therefore invisible.
Member

can you please explain this problem in more detail than 1 sentence.. and even more in comments.. I am not following what the issue is..

We need fast access in streaming to achieve high throughput (ideally per-record access should be < 10ms), thus a general hotspot cache is needed. We will build an in-memory LRU cache keyed by the active upsert record keys; the cache items will be forcibly evicted based on a configured memory threshold.

We also need a memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are therefore invisible.
This cache can be cleaned once the checkpoint/instant is committed to Hudi (which indicates that the index payloads are also committed).
Member

1 line for cache invalidation is really insufficient :). This design basically says we are reading the RLI shards all over again each commit?

Can't we do a file-system-level cache where we can just get the base file or log file downloaded on demand, reducing the amount of cache read down to executors in each commit?

Contributor Author

The MDT has its own file-based cache, we are going to try that. Here I'm just saying these records are allowed to be evicted instead of invalidated immediately; we have some basic eviction strategy based on LRU and cache size though.

We also need a memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are therefore invisible.
This cache can be cleaned once the checkpoint/instant is committed to Hudi (which indicates that the index payloads are also committed).

On job restart or task failover, there is a case where the checkpoint succeeds on Flink while the instant is not committed to Hudi; for DT metadata, the pipeline will recommit
Member

pull into a different section.

![The RLI Access Pattern](./rli-access-pattern.png)

### The Shuffle of RLI Payloads
In the `StreamWrite` operator, the index items are inferred and sent to the `IndexWrite` operator in streaming style; the index records are shuffled by `hash(record_key) % num_rli_shards` (the same hashing algorithm as the MDT `RLI index` partitioner),
Member

I want to reuse as many common configs as possible. Strictly no Flink-specific table configs.

This is critical to avoid writing `N*M` files to the MDT partition (`N` is the number of RLI partition buckets, `M` is the number of data table buckets involved in the current write).

How do we ensure the data record and index record always belong to one commit/checkpoint? The barrier flows together with the records in Flink, see [how-does-state-snapshotting-work](https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/#how-does-state-snapshotting-work):
when the `StreamWrite` operator receives a record, it emits its corresponding index record in the same `#processElement` call, so we can always keep the binding of the two; in other words, no barrier can land between them.
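
A minimal sketch of that binding, assuming the index records go out through a Flink side output in the same `processElement` call (the string payloads and the output tag are illustrative):

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Illustrative only: the data record and its index record are emitted in one processElement call,
// so a checkpoint barrier can never land between them.
class StreamWriteSketch extends ProcessFunction<String, String> {
  // Side output carrying the RLI/SI payloads towards the IndexWrite operator.
  static final OutputTag<String> INDEX_OUTPUT = new OutputTag<String>("rli-index") {};

  @Override
  public void processElement(String dataRecord, Context ctx, Collector<String> out) {
    out.collect(dataRecord);                                // the data table write path
    String indexRecord = "key->location for " + dataRecord; // derived index payload (illustrative)
    ctx.output(INDEX_OUTPUT, indexRecord);                  // same call, hence same checkpoint
  }
}
```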
Member

add example from GH discussion.

@hudi-bot
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Member
@vinothchandar vinothchandar left a comment

A couple of places where I could not understand the root issue.

#### The Cache of RLI Access
We need fast access in streaming to achieve high throughput (ideally per-record access should be < 10ms), thus a general hotspot cache is needed. We will build an in-memory LRU cache keyed by the active upsert record keys; the cache items will be forcibly evicted based on a configured memory threshold.

We also need a memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are therefore invisible.
Member

IIUC, this is so that if we see the same record key again within the commit, it gets routed to the same location on the data table?

The actual updates to RLI/SI will happen away/outside of the cache you are discussing?

On job restart or task failover, there is a case where the checkpoint succeeds on Flink while the instant is not committed to Hudi; for DT metadata, the pipeline will recommit
the instant with the recovered table metadata. Because the `BucketAssigner` operator is upstream of the `StreamWrite` operator, there is a time gap before these inflight instants are recommitted,
and we do not want to block the processing of `BucketAssigner` (to wait for the inflight instants to recommit successfully). The suggested solution is
to include these special inflight instants in RLI access queries; basically, we need to support reading inflight instants on the MDT.
Member

We should not be adding any "special" new types of instants for this. Once again, please write this out in detail, since it's not clear what these special instants are, or why they exist when the DT metadata was not written. I have a lot of questions like that.

Contributor Author

I added more details about the two-phase commit design of the checkpoint-instant, and defined such special instants as:

These inflight instants are the ones whose corresponding checkpoint has succeeded; inflight instants without a successful checkpoint are not included.
