Skip to content

[Python] Expand SDF by default in PortableRunner#37965

Open
Eliaaazzz wants to merge 2 commits intoapache:masterfrom
Eliaaazzz:users/qingfengrumeng/issue-24422-expand-sdf-default-portable-runner
Open

[Python] Expand SDF by default in PortableRunner#37965
Eliaaazzz wants to merge 2 commits intoapache:masterfrom
Eliaaazzz:users/qingfengrumeng/issue-24422-expand-sdf-default-portable-runner

Conversation

@Eliaaazzz
Copy link
Contributor

Fixes #24422.

Python iobase.Read uses a Splittable DoFn internally. PortableRunner's default batch pre-optimization path did not include translations.expand_sdf, so portable runners that do not support SDFs natively can receive a single SDF ParDo for reads. In Spark, that means Python reads such as ReadFromParquet may execute on a single partition without parallelization.

This change:

  • adds translations.expand_sdf to PortableRunner's default optimization phases
  • allows --experiments=pre_optimize=expand_sdf as an explicit custom phase
  • adds optimizer coverage for:
    • default SDF expansion
    • explicit pre_optimize=expand_sdf
    • beam.io.Read(BoundedSource) expansion, including the expected RESHUFFLE

The bounded read test covers the issue scenario behind ReadFromParquet and similar Python Read transforms by verifying that the optimized pipeline contains the SDF component stages:

  • PAIR_WITH_RESTRICTION
  • SPLIT_AND_SIZE_RESTRICTIONS
  • PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS
  • RESHUFFLE

Local verification:

  • git diff --check
  • python -m compileall sdks/python/apache_beam/runners/portability/portable_runner.py sdks/python/apache_beam/runners/portability/portable_runner_test.py

Enable translations.expand_sdf in PortableRunner's default pre-optimization path so Python Read transforms are expanded for portable runners like Spark. Also add optimizer coverage for default SDF expansion, explicit pre_optimize=expand_sdf, and bounded Read expansion.\n\nRefs apache#24422.
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical issue in Apache Beam's PortableRunner where Python iobase.Read operations, which internally utilize Splittable DoFns (SDFs), were not being properly parallelized on certain portable runners like Spark. The fix involves integrating the translations.expand_sdf optimization phase into the PortableRunner's default pipeline optimization process. This ensures that SDFs are correctly expanded into their constituent stages, thereby enabling parallel execution for read transforms and preventing performance bottlenecks, particularly for operations like ReadFromParquet.

Highlights

  • Default SDF Expansion: The expand_sdf optimization phase was added to the PortableRunner's default pre-optimization path to ensure proper parallelization of iobase.Read operations.
  • Explicit SDF Expansion: Users can now explicitly enable SDF expansion using the --experiments=pre_optimize=expand_sdf flag.
  • Enhanced Test Coverage: New tests were introduced to verify the correct expansion of SDFs in both default and explicit optimization scenarios, including beam.io.Read(BoundedSource) with expected RESHUFFLE for parallelization.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@Eliaaazzz Eliaaazzz force-pushed the users/qingfengrumeng/issue-24422-expand-sdf-default-portable-runner branch from 60a831e to 169645d Compare March 26, 2026 14:26
@github-actions
Copy link
Contributor

Assigning reviewers:

R: @damccorm for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: No parallelization for ReadFromParquet (or any Python Read transforms) in Spark RDD Runner

1 participant