Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ def _optimize_pipeline(
# Eventually remove the 'lift_combiners' phase from 'default'.
translations.pack_combiners,
translations.lift_combiners,
# Expand SDF so that portable runners that don't support SDFs
# natively (e.g. Spark) can still parallelize Read transforms.
# See https://github.com/apache/beam/issues/24422
translations.expand_sdf,
translations.sort_stages
]
partial = True
Expand All @@ -332,7 +336,7 @@ def _optimize_pipeline(
phases = []
for phase_name in pre_optimize.split(','):
# For now, these are all we allow.
if phase_name in ('pack_combiners', 'lift_combiners'):
if phase_name in ('pack_combiners', 'lift_combiners', 'expand_sdf'):
phases.append(getattr(translations, phase_name))
else:
raise ValueError(
Expand Down
149 changes: 149 additions & 0 deletions sdks/python/apache_beam/runners/portability/portable_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,155 @@ def create_options(self):
return options


class PortableRunnerOptimizationTest(unittest.TestCase):
  """Tests for PortableRunner._optimize_pipeline.

  Regression tests for https://github.com/apache/beam/issues/24422: the
  portable runner must apply the expand_sdf phase during pre-optimization so
  that portable runners without native SDF support (e.g. Spark) can still
  parallelize Read transforms.
  """
  @staticmethod
  def _build_sdf_pipeline_proto():
    """Build and return a pipeline proto containing an unexpanded SDF ParDo."""
    from apache_beam.io import restriction_trackers

    class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
      # Treats each string element as an offset range over its characters.
      def initial_restriction(self, element):
        return restriction_trackers.OffsetRange(0, len(element))

      def create_tracker(self, restriction):
        return restriction_trackers.OffsetRestrictionTracker(restriction)

      def restriction_size(self, element, restriction):
        return restriction.size()

    class ExpandingStringsDoFn(beam.DoFn):
      # Splittable DoFn that emits one character per claimed offset.
      def process(
          self,
          element,
          restriction_tracker=beam.DoFn.RestrictionParam(
              ExpandStringsProvider())):
        cur = restriction_tracker.current_restriction().start
        while restriction_tracker.try_claim(cur):
          yield element[cur]
          cur += 1

    p = beam.Pipeline()
    _ = (p | beam.Create(['abc']) | beam.ParDo(ExpandingStringsDoFn()))
    return p.to_runner_api()

  @staticmethod
  def _transform_urns(pipeline_proto):
    """Return the set of non-empty transform URNs in *pipeline_proto*."""
    return {
        t.spec.urn
        for t in pipeline_proto.components.transforms.values()
        if t.spec.urn
    }

  def _assert_sdf_expanded(self, optimized):
    """Assert *optimized* contains the three expanded SDF component stages."""
    from apache_beam.portability import common_urns

    transform_urns = self._transform_urns(optimized)
    self.assertIn(
        common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns)
    self.assertIn(
        common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
        transform_urns)
    self.assertIn(
        common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
        transform_urns)

  def test_default_optimize_expands_sdf(self):
    """Verify that expand_sdf is applied in the default pre_optimize setting.

    See https://github.com/apache/beam/issues/24422.
    """
    proto = self._build_sdf_pipeline_proto()
    # Default options (no pre_optimize experiment set).
    optimized = PortableRunner._optimize_pipeline(proto, PipelineOptions())
    self._assert_sdf_expanded(optimized)

  def test_custom_optimize_expand_sdf(self):
    """Verify that expand_sdf can be requested explicitly."""
    proto = self._build_sdf_pipeline_proto()
    options = PipelineOptions(['--experiments=pre_optimize=expand_sdf'])
    optimized = PortableRunner._optimize_pipeline(proto, options)
    self._assert_sdf_expanded(optimized)

  def test_default_optimize_expands_bounded_read(self):
    """Verify that iobase.Read(BoundedSource) is expanded by default.

    This is the end-to-end scenario from
    https://github.com/apache/beam/issues/24422: Read transforms like
    ReadFromParquet use SDFs internally. Without expand_sdf in the default
    optimization, these arrive at the Spark job server as a single ParDo,
    executing on one partition with no parallelization.
    """
    from apache_beam.io import iobase
    from apache_beam.portability import common_urns

    class _FakeBoundedSource(iobase.BoundedSource):
      # Minimal source: optimization only inspects the graph, so read()
      # and the range tracker are never exercised here.
      def get_range_tracker(self, start_position, stop_position):
        return None

      def read(self, range_tracker):
        return iter([])

      def estimate_size(self):
        return 0

    p = beam.Pipeline()
    _ = p | beam.io.Read(_FakeBoundedSource())
    proto = p.to_runner_api()

    # Default options (no pre_optimize experiment set).
    optimized = PortableRunner._optimize_pipeline(proto, PipelineOptions())

    # The SDFBoundedSourceReader DoFn should have been expanded into
    # SDF component stages.
    self._assert_sdf_expanded(optimized)
    # Reshuffle should be present to enable parallelization.
    self.assertIn(
        common_urns.composites.RESHUFFLE.urn, self._transform_urns(optimized))

if __name__ == '__main__':
  # Surface INFO-level logging during the test run, then hand control to
  # the unittest test runner.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()
Loading