-
Notifications
You must be signed in to change notification settings - Fork 24
Implement partitioning in Upsert / Lookup #220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Would appreciate your reviews here. I will address the rebase conflict in follow up commits. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request implements partitioning support for Upsert and Lookup operations in the Fluss Rust client, addressing issue #178. The changes enable the client to handle partitioned tables, bringing it to feature parity with the Java implementation.
Changes:
- Introduced
PhysicalTablePathto represent table paths with optional partition information, replacing directTablePathusage in write/lookup paths - Modified metadata structures to track partition information including partition IDs and names
- Implemented
PartitionGetterto extract partition information from rows based on partition keys - Updated Upsert and Lookup operations to handle partitioned tables by routing operations to the correct partition
- Added comprehensive integration tests and example code for partitioned KV tables
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/fluss/src/metadata/table.rs | Changed partition_keys from Vec to Arc<[String]> for better sharing; added PhysicalTablePath support; fixed typo in panic message |
| crates/fluss/src/cluster/cluster.rs | Added partition metadata tracking and processing; updated cluster to store partition IDs and locations |
| crates/fluss/src/client/write/*.rs | Updated write path to use PhysicalTablePath; added partition ID handling in accumulator and sender |
| crates/fluss/src/client/table/upsert.rs | Added partition extraction using PartitionGetter; updated to create correct PhysicalTablePath |
| crates/fluss/src/client/table/lookup.rs | Added partition-aware lookup; handles non-existent partitions gracefully |
| crates/fluss/src/client/table/partition_getter.rs | New module for extracting partition values from rows |
| crates/fluss/tests/integration/kv_table.rs | Added comprehensive integration test for partitioned tables |
| crates/examples/src/example_partitioned_kv_table.rs | Added example demonstrating partitioned table operations |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
52a97b1 to
8a3aa59
Compare
|
Rebased and all copilot comments have been addressed |
fresh-borzoni
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@leekeiabstraction Thanks for the PR! Left comments. PTAL
| // TODO: we shouldn't add unready partitions to unknownLeaderTables, | ||
| // because it cases PartitionNotExistException later | ||
| unknown_leader_tables.insert(physical_table_path); | ||
| return Ok(ReadyCheckResult::new( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why return? shouldn't we continue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is similar to Java flow: https://github.com/apache/fluss/blob/96faf08cbd84d1d43a2ef610c189829fa9c34e76/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java#L463-L469
Returning is fine because sender.run_once() resolves the unknown table leader and on subsequent run, goes through this flow again.
|
Seems like we're running into timeouts on CI runs, this timeout doesn't happen on my machine for some reason. Debugging currently. |
# Conflicts: # crates/fluss/src/client/connection.rs # crates/fluss/src/client/write/accumulator.rs
3d581d3 to
f4219a9
Compare
Purpose
Linked issue: close #178
Brief change log