[BEAM-11695] Combiner packing in Dataflow (#13763)
Yifan Mai committed Jan 28, 2021
1 parent a946429 commit 091cd88
Showing 4 changed files with 118 additions and 32 deletions.
87 changes: 56 additions & 31 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -462,6 +462,62 @@ def run_pipeline(self, pipeline, options):

self._maybe_add_unified_worker_missing_options(options)

from apache_beam.transforms import environments
if options.view_as(SetupOptions).prebuild_sdk_container_engine:
# if prebuild_sdk_container_engine is specified we will build a new sdk
# container image with dependencies pre-installed and use that image,
# instead of using the inferred default container image.
self._default_environment = (
environments.DockerEnvironment.from_options(options))
options.view_as(WorkerOptions).worker_harness_container_image = (
self._default_environment.container_image)
else:
self._default_environment = (
environments.DockerEnvironment.from_container_image(
apiclient.get_container_image_from_options(options),
artifacts=environments.python_sdk_dependencies(options)))

    # Optimize the pipeline if it is not streaming and optimizations are enabled
    # in options.
pre_optimize = options.view_as(DebugOptions).lookup_experiment(
'pre_optimize', 'default').lower()
if (not options.view_as(StandardOptions).streaming and
pre_optimize != 'none' and pre_optimize != 'default'):
from apache_beam.runners.portability.fn_api_runner import translations
if pre_optimize == 'all':
phases = [
translations.eliminate_common_key_with_none,
translations.pack_combiners,
translations.sort_stages
]
else:
phases = []
for phase_name in pre_optimize.split(','):
# For now, these are all we allow.
if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
phases.append(getattr(translations, phase_name))
else:
raise ValueError(
'Unknown or inapplicable phase for pre_optimize: %s' %
phase_name)
phases.append(translations.sort_stages)

proto_pipeline_to_optimize = pipeline.to_runner_api(
default_environment=self._default_environment)
optimized_proto_pipeline = translations.optimize_pipeline(
proto_pipeline_to_optimize,
phases=phases,
known_runner_urns=frozenset(),
partial=True)
pipeline = beam.Pipeline.from_runner_api(
optimized_proto_pipeline, self, options)
# The translations.pack_combiners optimizer phase produces a CombinePerKey
# PTransform, but DataflowRunner treats CombinePerKey as a composite, so
# this override expands CombinePerKey into primitive PTransforms.
if translations.pack_combiners in phases:
from apache_beam.runners.dataflow.ptransform_overrides import CombinePerKeyPTransformOverride
pipeline.replace_all([CombinePerKeyPTransformOverride()])

use_fnapi = apiclient._use_fnapi(options)

if not use_fnapi:
@@ -488,21 +544,6 @@ def run_pipeline(self, pipeline, options):
if use_fnapi and not apiclient._use_unified_worker(options):
pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)

from apache_beam.transforms import environments
if options.view_as(SetupOptions).prebuild_sdk_container_engine:
# if prebuild_sdk_container_engine is specified we will build a new sdk
# container image with dependencies pre-installed and use that image,
# instead of using the inferred default container image.
self._default_environment = (
environments.DockerEnvironment.from_options(options))
options.view_as(WorkerOptions).worker_harness_container_image = (
self._default_environment.container_image)
else:
self._default_environment = (
environments.DockerEnvironment.from_container_image(
apiclient.get_container_image_from_options(options),
artifacts=environments.python_sdk_dependencies(options)))

# This has to be performed before pipeline proto is constructed to make sure
# that the changes are reflected in the portable job submission path.
self._adjust_pipeline_for_dataflow_v2(pipeline)
@@ -511,22 +552,6 @@ def run_pipeline(self, pipeline, options):
self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
return_context=True, default_environment=self._default_environment)

    # Optimize the pipeline if it is not streaming and the
    # disable_optimize_pipeline_for_dataflow experiment has not been set.
if (not options.view_as(StandardOptions).streaming and
not options.view_as(DebugOptions).lookup_experiment(
"disable_optimize_pipeline_for_dataflow")):
from apache_beam.runners.portability.fn_api_runner import translations
self.proto_pipeline = translations.optimize_pipeline(
self.proto_pipeline,
phases=[
translations.eliminate_common_key_with_none,
translations.pack_combiners,
translations.sort_stages,
],
known_runner_urns=frozenset(),
partial=True)

if use_fnapi:
self._check_for_unsupported_fnapi_features(self.proto_pipeline)
else:
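The run_pipeline changes above gate the new optimizer phases behind the pre_optimize experiment and hand the pipeline proto to translations.optimize_pipeline. As a hedged sketch (not part of this commit), the same phases can be exercised directly on a locally built pipeline proto; the printed transform names are only illustrative:

import apache_beam as beam
from apache_beam.runners.portability.fn_api_runner import translations

# Two sibling combiners over the same input, mirroring the test added below.
p = beam.Pipeline()
data = p | beam.Create([10, 20, 30])
_ = data | 'PackableMin' >> beam.CombineGlobally(min)
_ = data | 'PackableMax' >> beam.CombineGlobally(max)

# The same phases run_pipeline applies for pre_optimize=all.
optimized = translations.optimize_pipeline(
    p.to_runner_api(),
    phases=[
        translations.eliminate_common_key_with_none,
        translations.pack_combiners,
        translations.sort_stages,
    ],
    known_runner_urns=frozenset(),
    partial=True)

# After packing, a single Packed[...] CombinePerKey should stand in for the
# two sibling combiners in the optimized proto.
print([t.unique_name for t in optimized.components.transforms.values()])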
33 changes: 33 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -849,6 +849,39 @@ def test_group_into_batches_translation_non_unified_worker(self):
self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties)
self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties)

def _test_pack_combiners(self, pipeline_options, expect_packed):
runner = DataflowRunner()

with beam.Pipeline(runner=runner, options=pipeline_options) as p:
data = p | beam.Create([10, 20, 30])
_ = data | 'PackableMin' >> beam.CombineGlobally(min)
_ = data | 'PackableMax' >> beam.CombineGlobally(max)

unpacked_minimum_step_name = 'PackableMin/CombinePerKey/Combine'
unpacked_maximum_step_name = 'PackableMax/CombinePerKey/Combine'
packed_step_name = (
'Packed[PackableMin/CombinePerKey, PackableMax/CombinePerKey]/Pack/'
'CombinePerKey(SingleInputTupleCombineFn)/Combine')
job_dict = json.loads(str(runner.job))
step_names = set(s[u'properties'][u'user_name'] for s in job_dict[u'steps'])
if expect_packed:
self.assertNotIn(unpacked_minimum_step_name, step_names)
self.assertNotIn(unpacked_maximum_step_name, step_names)
self.assertIn(packed_step_name, step_names)
else:
self.assertIn(unpacked_minimum_step_name, step_names)
self.assertIn(unpacked_maximum_step_name, step_names)
self.assertNotIn(packed_step_name, step_names)

def test_pack_combiners_disabled_by_default(self):
self._test_pack_combiners(
PipelineOptions(self.default_properties), expect_packed=False)

def test_pack_combiners_enabled_by_experiment(self):
self.default_properties.append('--experiment=pre_optimize=all')
self._test_pack_combiners(
PipelineOptions(self.default_properties), expect_packed=True)


class CustomMergingWindowFn(window.WindowFn):
def assign(self, assign_context):
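The new tests check that packing stays off by default and is switched on through the pre_optimize experiment. Below is a hedged launch sketch; the project, region, and bucket values are placeholders rather than anything from this commit, and actually submitting the job requires GCP credentials:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                # placeholder
    '--region=us-central1',                # placeholder
    '--temp_location=gs://my-bucket/tmp',  # placeholder
    '--experiment=pre_optimize=all',       # opts this batch pipeline into packing
])

p = beam.Pipeline(options=options)
data = p | beam.Create([10, 20, 30])
_ = data | 'PackableMin' >> beam.CombineGlobally(min)
_ = data | 'PackableMax' >> beam.CombineGlobally(max)
# p.run() would submit the job; with packing enabled, the Dataflow graph should
# show one Packed[...] combine step instead of two separate combine steps.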
27 changes: 27 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -125,6 +125,33 @@ def expand(self, pbegin):
'Read'))


class CombinePerKeyPTransformOverride(PTransformOverride):
"""A ``PTransformOverride`` for ``CombinePerKey``.
The translations.pack_combiners optimizer phase produces a CombinePerKey
PTransform, but DataflowRunner treats CombinePerKey as a composite, so
this override expands CombinePerKey into primitive PTransforms.
"""
def matches(self, applied_ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import CombinePerKey

if isinstance(applied_ptransform.transform, CombinePerKey):
self.transform = applied_ptransform.transform
return True
return False

def get_replacement_transform(self, ptransform):
from apache_beam.transforms import ptransform_fn

@ptransform_fn
def ExpandCombinePerKey(pcoll):
return pcoll | ptransform

return ExpandCombinePerKey()


class CombineValuesPTransformOverride(PTransformOverride):
"""A ``PTransformOverride`` for ``CombineValues``.
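The override above leans on the @ptransform_fn decorator: wrapping the matched CombinePerKey in a function-built composite lets DataflowRunner expand it into primitive transforms. A hedged, standalone sketch of that decorator pattern follows; CountPerKey is a made-up composite, not something in this commit:

import apache_beam as beam
from apache_beam.transforms import ptransform_fn


@ptransform_fn
def CountPerKey(pcoll):
  # The decorated function becomes a composite PTransform whose expansion is
  # whatever the body applies to its input PCollection.
  return pcoll | beam.combiners.Count.PerKey()


with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([('a', 1), ('a', 2), ('b', 3)])
      | 'Count' >> CountPerKey()
      | beam.Map(print))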
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
@@ -1010,7 +1010,8 @@ def make_pack_name(names):
accumulator_coder_id=tuple_accumulator_coder_id).
SerializeToString()),
inputs={'in': input_pcoll_id},
outputs={'out': pack_pcoll_id},
# 'None' single output key follows convention for CombinePerKey.
outputs={'None': pack_pcoll_id},
environment_id=fused_stage.environment)
pack_stage = Stage(
pack_stage_name + '/Pack', [pack_transform],
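The one-line change above switches the pack transform's single output tag from 'out' to 'None', matching how the main output of a Python CombinePerKey is keyed in the runner API proto. A hedged local sketch for inspecting that convention:

import apache_beam as beam

p = beam.Pipeline()
_ = (
    p
    | beam.Create([('a', 1), ('b', 2)])
    | 'SumPerKey' >> beam.CombinePerKey(sum))

proto = p.to_runner_api()
for transform in proto.components.transforms.values():
  if transform.unique_name == 'SumPerKey':
    # The single main output is expected to be keyed 'None'.
    print(transform.outputs)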
