feat: add dynamic bucket assignment for primary-key tables#254
Conversation
5f087fe to
895fcad
Compare
Implement bucket assigner framework with four strategies: fixed, dynamic, cross-partition, and constant. Add BinaryRow hash/comparison support for bucket key computation. Update table read/write/scan/commit pipeline to support dynamic bucket mode. Include DataFusion integration tests for dynamic bucket and cross-partition table scenarios.
895fcad to
9803017
Compare
jerry-024
left a comment
There was a problem hiding this comment.
Nice work on the dynamic bucket framework. Posted inline comments on six correctness items I think should be addressed before merge. Happy to clarify any of them. Summary of the must-fix list:
schema.bucket_keys()still returns full primary keys while read-side pruning usestrimmedPrimaryKeys()— silent data loss on partitioned fixed-bucket PK tables.has_primary_keyscomputed fromtrimmed_primary_keys()misroutes PK == partition tables to the append writer.- Cross-partition detection is too narrow; partial PK/partition overlap is missed and old-partition DELETE is never emitted.
- DV + Deduplicate read path now hits
KeyValueFileReaderwhich rejects deletion files — hard error regression. _VALUE_KINDcolumn is added only whenhas_deletes=true, so a single writer can see two different Arrow schemas across batches.- New
IndexManifestwrite schema is missing_GLOBAL_INDEX, which the PR's new merge logic will silently drop on round-trip.
Also suggesting regression tests for each.
| .filter(|pk| !partition_set.contains(pk.as_str())) | ||
| .cloned() | ||
| .collect() | ||
| } |
There was a problem hiding this comment.
Must-fix (P1, silent data loss). The real target is TableSchema::bucket_keys() at line 145 of this file (outside this hunk, so leaving the comment here).
bucket_keys() still returns self.primary_keys.clone() when no explicit bucket key is set, but the read path added in this PR falls back to trimmed_primary_keys() in two places:
crates/paimon/src/table/read_builder.rs:57-63(bucket_predicate)crates/paimon/src/table/table_scan.rs:511-517(bucket_key_fields)
On a fixed-bucket partitioned PK table (PARTITIONED BY (pt) + PK (pt, id), default bucket key), writes compute hash([pt, id]) % N, but a query WHERE pt='a' AND id=5 computes the target bucket via hash([id]) % N. The target set at table_scan.rs:181-190 then prunes the real bucket and the query silently returns zero rows.
Java Paimon's TableSchema.bucketKeys() uses trimmedPrimaryKeys() in this fallback. Suggest changing line 145 to:
if !self.primary_keys.is_empty() {
return self.trimmed_primary_keys();
}(Note: may return empty when PK == partition_keys; FixedBucketAssigner construction in table_write.rs already falls through to the constant-bucket-0 branch when bucket_key_indices.is_empty(), so that case stays covered.)
Please add a regression test: partitioned PK fixed-bucket table, insert data, query with both partition and PK columns in the predicate, assert row count > 0.
| .to_string(), | ||
| }); | ||
| } | ||
| let has_primary_keys = !schema.trimmed_primary_keys().is_empty(); |
There was a problem hiding this comment.
Must-fix (P1): has_primary_keys misjudgment for PK ⊆ partition_keys.
Switching the definition to trimmed_primary_keys() causes tables where PK == partition_keys (a legal schema in Paimon — partition_keys ⊆ primary_keys is the only constraint, and equality is allowed) to be treated as append-only. Because primary_key_indices at line 192 is also derived from trimmed_primary_keys(), create_writer at line 521 sees self.primary_key_indices.is_empty() and picks create_append_writer. PK semantics (dedup, merge) are silently dropped.
Suggest separating the two concepts:
let has_primary_keys = !schema.primary_keys().is_empty();
// keep trimmed_primary_keys() only for building KV key indices belowPlease add a regression test for PK == partition_keys writes: insert duplicates within the same partition, assert the table dedupes.
|
|
||
| let is_cross_partition = is_dynamic_bucket | ||
| && !schema.partition_keys().is_empty() | ||
| && schema.trimmed_primary_keys().len() == schema.primary_keys().len(); |
There was a problem hiding this comment.
Must-fix (P1, stale data): cross-partition detection is too narrow.
trimmed_primary_keys().len() == primary_keys().len() is equivalent to "PK and partition columns do not overlap at all". Java Paimon triggers cross-partition update whenever !primaryKeys.containsAll(partitionKeys), i.e. any partition column outside the PK.
Counter-example that this branch misses: PARTITIONED BY (pt1, pt2) + PRIMARY KEY (pt1, id). Here trimmed=[id] (len 1), primary_keys=[pt1, id] (len 2), so 1 != 2 → routed to DynamicBucketAssigner. When a record moves across pt2 values, the PK lookup in the new partition does not see the old row, no old-partition DELETE is emitted, and the old partition keeps stale data forever.
Suggest:
let pk_set: HashSet<_> = schema.primary_keys().iter().collect();
let partition_not_in_pk = schema.partition_keys().iter().any(|p| !pk_set.contains(p));
let is_cross_partition = is_dynamic_bucket && partition_not_in_pk;Please add a regression test: PARTITIONED BY (pt1, pt2) + PK (pt1, id), write same id under different pt2, assert old partition has a DELETE record (or assert post-commit row count is 1).
| .is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow) | ||
| core_options | ||
| .merge_engine() | ||
| .is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow) |
There was a problem hiding this comment.
Must-fix (P1, read regression): DV guard removed.
Previous skip_level_zero covered both deletion_vectors_enabled and FirstRow. After this PR, DV mode no longer skips level-0, so splits with level-0 files stay visible.
table_read.rs:3832-3838 routes PK + Deduplicate + any level-0 split into read_kv() → KeyValueFileReader, and kv_file_reader.rs:250-259 explicitly errors when data_deletion_files contains any Some:
Err(Error::UnexpectedError {
message: "KeyValueFileReader does not support deletion vectors".to_string(),
...
})So DV + Deduplicate reads now hard-fail once any level-0 file exists, not just return stale rows. Options:
- Restore the DV guard here (and in
table_read.rs): keep DV tables on theDataFileReader/ raw path. - Or teach
KeyValueFileReaderto apply DV (pass deletion files throughKeyValueReadConfig).
Option 1 is the smaller change if DV + Deduplicate is not yet a design goal. Either way, please add a DV + Deduplicate + level-0 read test.
| batch.clone() | ||
| let sub_batch = Self::take_rows(batch, &row_indices)?; | ||
| let sub_batch = if has_deletes { | ||
| Self::add_value_kind_column(&sub_batch, 0)? |
There was a problem hiding this comment.
Must-fix (P1): _VALUE_KIND column is added only when has_deletes=true, causing intra-writer schema drift.
KeyValueFileWriter buffers batches and later calls concat_batches, which requires identical Arrow schemas across all accumulated batches. This code path only appends _VALUE_KIND when the current invocation produced cross-partition deletes. If the same writer instance first receives a batch with no deletes (schema without _VALUE_KIND), then later a batch containing deletes (schema with _VALUE_KIND), the later concat fails with a schema mismatch error.
Suggest one of:
- Always add
_VALUE_KIND=0to inserts in cross-partition-capable writers, keeping the writer schema stable across batches. - Or have
KeyValueFileWriterown the_VALUE_KINDcolumn (inject it on ingestion), soTableWritedoes not need to special-case this.
Please add a regression test: single TableWrite on a cross-partition table, first write_arrow_batch with no migrated keys, then second write_arrow_batch that migrates keys, both followed by a successful prepare_commit.
| {"name": "_INDEX_TYPE", "type": "string"}, | ||
| {"name": "_FILE_NAME", "type": "string"}, | ||
| {"name": "_FILE_SIZE", "type": "long"}, | ||
| {"name": "_ROW_COUNT", "type": "long"}, |
There was a problem hiding this comment.
Must-fix (P1): Avro schema is missing _GLOBAL_INDEX.
IndexFileMeta has a global_index_meta: Option<GlobalIndexMeta> field (with skip_serializing_if = Option::is_none), but this schema declares no _GLOBAL_INDEX field.
For HASH-only entries this is fine (the field is always None). The problem is the new merge path in table_commit.rs::write_index_manifest: on APPEND/OVERWRITE it reads the previous index manifest, retains entries, and rewrites the merged list through IndexManifest::write with this schema. Any pre-existing entry carrying a non-null global_index_meta (e.g. DV, global index) either fails to re-serialize (depending on writer strictness) or silently drops the field on round-trip, corrupting the manifest.
Suggest adding the field as optional, mirroring Java's IndexFileMeta.schema():
{
"name": "_GLOBAL_INDEX",
"type": ["null", { ...GlobalIndexMeta record... }],
"default": null
}Nit: the _FILE_SIZE / _ROW_COUNT declared as long here while the Rust struct fields are i32 does work — the test_single_object_serde test at line 169 already exercises this round-trip — but it's worth a short comment explaining that serde_avro_fast coerces across widths, otherwise it reads as a bug.
|
Thanks @jerry-024 for the review, comments addressed. |
Purpose
Implement bucket assigner framework with four strategies: fixed, dynamic, cross-partition, and constant. Add BinaryRow hash/comparison support for bucket key computation. Update table read/write/scan/commit pipeline to support dynamic bucket mode. Include DataFusion integration tests for dynamic bucket and cross-partition table scenarios.
Brief change log
Tests
API and Format
Documentation