
[python] Pre-repartition Ray writes by (partition, bucket) for fixed-bucket tables #7813

Open

TheR1sing3un wants to merge 3 commits into apache:master from TheR1sing3un:py-ray-write-fixed-bucket-shuffle


Conversation

@TheR1sing3un TheR1sing3un commented May 11, 2026

Purpose

When write_paimon is given a Ray Dataset, Ray's default round-robin
block distribution scatters rows that share the same (partition, bucket) across many Ray tasks. Each task opens its own writer and emits
its own data file, so the write produces
partitions × buckets × ray_tasks files instead of the
partitions × buckets the writer would naturally produce.

Spark and Flink already cluster rows by (partition, bucket) before
writing — see PaimonSparkWriter.repartitionByPartitionsAndBucket and
the RowAssignerChannelComputer / RowWithBucketChannelComputer chain.
This PR brings the same pre-clustering to the Ray path.

Linked Issue

N/A

Effect

Two new keyword-only parameters on write_paimon:

  • shuffle: bool = False — for HASH_FIXED tables, group rows by
    (partition_keys..., bucket) via Ray's groupby / map_groups so
    each (partition, bucket) lands in one Ray task. Bucket assignment
    is computed with FixedBucketRowKeyExtractor, the same extractor the
    writer uses, so the shuffle-time bucket is byte-equivalent to the
    writer's. Non-HASH_FIXED tables log a warning and write as before.

  • num_blocks: Optional[int] = None — optional Ray output block
    count. With shuffle=True it is a parallelism hint for the groupby;
    with shuffle=False it triggers a plain Ray block rebalance.

Defaults preserve the previous behaviour, so no existing caller is
affected.
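
A minimal usage sketch of the two flags. Only shuffle and num_blocks come from this PR; the import path follows the pypaimon.ray.write_paimon name used below, while the table argument and the sample dataset are placeholder assumptions:

```python
import ray

from pypaimon.ray import write_paimon  # public entry point named in this PR

# Placeholder dataset; "dt" plays the role of a partition key.
ds = ray.data.from_items(
    [{"id": i, "dt": f"2026-05-{i % 3 + 1:02d}", "v": i * 10} for i in range(1000)]
)

# shuffle=True clusters rows by (partition, bucket) before writing, so each
# (partition, bucket) pair is written by a single Ray task; num_blocks is a
# parallelism hint for that groupby. `my_fixed_bucket_table` is assumed to be
# an existing HASH_FIXED table handle/identifier accepted by write_paimon.
write_paimon(ds, my_fixed_bucket_table, shuffle=True, num_blocks=16)
```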

Tests

  • pypaimon/tests/test_ray_shuffle_helper.py — 8 unit tests covering
    the bucket-key UDF (column type, empty input, multi-chunk combine)
    and every no-op / soft-fallback branch.

  • pypaimon/tests/ray_repartition_test.py — 7 end-to-end tests:

    • default shuffle=False roundtrip equality
    • shuffle=True roundtrip on a HASH_FIXED PK table
    • shuffle=True on a partitioned HASH_FIXED PK table (post-groupby
      schema integrity check)
    • file-count reduction on a multi-block HASH_FIXED write
    • soft fallback for BUCKET_UNAWARE + warning emitted
    • num_blocks=0 raises ValueError
    • num_blocks-only plain block rebalance

Existing Ray tests (ray_integration_test.py, ray_data_test.py)
remain green.

API & Format Impact

  • Public API: adds two new keyword-only parameters with safe defaults
    to pypaimon.ray.write_paimon. No signature break.
  • File format: unchanged. The transient __paimon_bucket__ column is
    stripped before the sink sees the dataset, so on-disk layout is
    unaffected.

Documentation

  • docs/content/pypaimon/ray-data.md — new section explaining the
    small-file problem and the shuffle / num_blocks options.

@TheR1sing3un TheR1sing3un force-pushed the py-ray-write-fixed-bucket-shuffle branch from 0ed753e to 9d6bc0f on May 11, 2026 at 12:23
@TheR1sing3un TheR1sing3un reopened this May 11, 2026
Without pre-clustering, Ray's default round-robin block distribution
scatters rows that share the same (partition, bucket) across many Ray
tasks. Each task opens its own writer, producing
partitions x buckets x ray_tasks files instead of the
partitions x buckets the writer would naturally produce.

This commit adds a helper module that groups rows by
(partition_keys..., bucket) using Ray's groupby/map_groups so all rows
for one (partition, bucket) land in one Ray task. Bucket assignment is
computed via FixedBucketRowKeyExtractor (the same extractor the writer
uses) so the shuffle bucket is byte-identical to the writer's.

The helper is opt-in (it defaults to a no-op) and the next commit wires it
through write_paimon. Non-HASH_FIXED tables fall back softly with a
warning instead of raising.
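
A minimal, self-contained sketch of the clustering idea (not the PR's actual helper). The bucket assignment here is a placeholder modulo hash where the real helper delegates to FixedBucketRowKeyExtractor, and the composite group-key column name is hypothetical:

```python
import pandas as pd
import ray

NUM_BUCKETS = 4                    # assumption: the table's fixed bucket count
BUCKET_COL = "__paimon_bucket__"   # transient column named in this PR
GROUP_COL = "__group_key__"        # hypothetical composite (partition, bucket) key

def add_keys(batch: pd.DataFrame) -> pd.DataFrame:
    # Placeholder bucket assignment; the real helper uses
    # FixedBucketRowKeyExtractor so the bucket matches the writer's exactly.
    batch[BUCKET_COL] = batch["id"].astype("int64") % NUM_BUCKETS
    batch[GROUP_COL] = batch["dt"].astype(str) + "/" + batch[BUCKET_COL].astype(str)
    return batch

def strip_keys(group: pd.DataFrame) -> pd.DataFrame:
    # Each call receives all rows of one (partition, bucket); drop the
    # transient columns so the sink never sees them.
    return group.drop(columns=[BUCKET_COL, GROUP_COL])

ds = ray.data.from_items([{"id": i, "dt": i % 3, "v": i} for i in range(100)])
clustered = (
    ds.map_batches(add_keys, batch_format="pandas")
      .groupby(GROUP_COL)
      .map_groups(strip_keys, batch_format="pandas")
)
print(clustered.schema())  # transient columns are gone before the write
```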
Adds two new keyword-only parameters to write_paimon():

  * shuffle: bool = False — pre-cluster rows by (partition_keys..., bucket)
    for HASH_FIXED tables, so each (partition, bucket) lands in one Ray
    task. Mirrors what PaimonSparkWriter.repartitionByPartitionsAndBucket
    does on the Spark side. Non-HASH_FIXED tables log a warning and
    fall back to the original no-shuffle write.

  * override_num_blocks: Optional[int] = None — Ray output block count
    (mirrors the same-named parameter on read_paimon). With shuffle=True
    it is a parallelism hint for the groupby shuffle; with shuffle=False
    it triggers a plain Ray block rebalance.

End-to-end coverage in pypaimon/tests/ray_repartition_test.py:

  * roundtrip equality for both shuffle=False and shuffle=True
  * file-count reduction on a HASH_FIXED multi-block write
  * soft fallback for BUCKET_UNAWARE + warning emitted
  * sink-visible schema does not carry the transient bucket column
  * override_num_blocks alone produces a plain rebalance

Defaults preserve the previous round-robin behaviour so no existing
caller is affected.
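
For reference, the shuffle=False path with override_num_blocks set is conceptually just a standalone Ray block rebalance (assuming a Dataset ds):

```python
# Conceptually equivalent to override_num_blocks=8 with shuffle=False:
# change the output block count (and hence write-task count) only,
# with no (partition, bucket) clustering applied.
ds = ds.repartition(8)
```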
@TheR1sing3un TheR1sing3un force-pushed the py-ray-write-fixed-bucket-shuffle branch from 9d6bc0f to 553d9c0 on May 11, 2026 at 14:58
Ray's groupby pipeline drops Arrow field-level not-null annotations,
so shuffle-written PK data files lose the not-null on PK columns.
When read_paimon reads them back via RayDatasource, from_batches
rejects the nullability mismatch (batch: int32 vs schema: int32 not
null). This is a pre-existing read-path issue, not caused by shuffle.

Switch the shuffle roundtrip tests to read via the direct table API
(ReadBuilder -> to_arrow) instead of read_paimon, since these tests
verify write correctness, not the Ray read path.
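
A sketch of the direct table read the tests switch to; the method names follow pypaimon's ReadBuilder API as described above, and the table handle is assumed to exist:

```python
# Read the written table back without going through read_paimon /
# RayDatasource, avoiding the nullability mismatch described above.
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
arrow_table = read_builder.new_read().to_arrow(splits)
```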
