[BEAM-11715] Partial revert of "Combiner packing in Dataflow" (#13763) (#13884)

* Revert "[BEAM-11695] Combiner packing in Dataflow (#13763)"

This reverts commit 3b51aaa.

* Make pack_combiners optional

* Don't revert translations.py

* Add missing ValidatesRunner
Yifan Mai committed Feb 5, 2021
1 parent 0f1b1e1 commit 3903998
Showing 4 changed files with 49 additions and 83 deletions.
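Making pack_combiners optional (the second bullet of the commit message above) means the phase no longer runs by default and is also left out of the pre_optimize=all preset pending BEAM-11694, but the retained dataflow_runner.py code below still accepts it when named explicitly. A minimal sketch of that opt-in; the experiment string mirrors the values parsed in the diff, while the runner, project, and bucket values are placeholders, not part of this commit:

# Hypothetical opt-in sketch; only the pre_optimize phase names come from this commit.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                # placeholder project id
    '--temp_location=gs://my-bucket/tmp',  # placeholder GCS path
    '--experiment=pre_optimize=eliminate_common_key_with_none,pack_combiners',
])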
103 changes: 47 additions & 56 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -462,62 +462,6 @@ def run_pipeline(self, pipeline, options):

self._maybe_add_unified_worker_missing_options(options)

from apache_beam.transforms import environments
if options.view_as(SetupOptions).prebuild_sdk_container_engine:
# if prebuild_sdk_container_engine is specified we will build a new sdk
# container image with dependencies pre-installed and use that image,
# instead of using the inferred default container image.
self._default_environment = (
environments.DockerEnvironment.from_options(options))
options.view_as(WorkerOptions).worker_harness_container_image = (
self._default_environment.container_image)
else:
self._default_environment = (
environments.DockerEnvironment.from_container_image(
apiclient.get_container_image_from_options(options),
artifacts=environments.python_sdk_dependencies(options)))

# Optimize the pipeline if it is not streaming and optimizations are
# enabled in options.
pre_optimize = options.view_as(DebugOptions).lookup_experiment(
'pre_optimize', 'default').lower()
if (not options.view_as(StandardOptions).streaming and
pre_optimize != 'none' and pre_optimize != 'default'):
from apache_beam.runners.portability.fn_api_runner import translations
if pre_optimize == 'all':
phases = [
translations.eliminate_common_key_with_none,
translations.pack_combiners,
translations.sort_stages
]
else:
phases = []
for phase_name in pre_optimize.split(','):
# For now, these are all we allow.
if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
phases.append(getattr(translations, phase_name))
else:
raise ValueError(
'Unknown or inapplicable phase for pre_optimize: %s' %
phase_name)
phases.append(translations.sort_stages)

proto_pipeline_to_optimize = pipeline.to_runner_api(
default_environment=self._default_environment)
optimized_proto_pipeline = translations.optimize_pipeline(
proto_pipeline_to_optimize,
phases=phases,
known_runner_urns=frozenset(),
partial=True)
pipeline = beam.Pipeline.from_runner_api(
optimized_proto_pipeline, self, options)
# The translations.pack_combiners optimizer phase produces a CombinePerKey
# PTransform, but DataflowRunner treats CombinePerKey as a composite, so
# this override expands CombinePerKey into primitive PTransforms.
if translations.pack_combiners in phases:
from apache_beam.runners.dataflow.ptransform_overrides import CombinePerKeyPTransformOverride
pipeline.replace_all([CombinePerKeyPTransformOverride()])

use_fnapi = apiclient._use_fnapi(options)

if not use_fnapi:
@@ -544,6 +488,21 @@ def run_pipeline(self, pipeline, options):
if use_fnapi and not apiclient._use_unified_worker(options):
pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)

from apache_beam.transforms import environments
if options.view_as(SetupOptions).prebuild_sdk_container_engine:
# if prebuild_sdk_container_engine is specified we will build a new sdk
# container image with dependencies pre-installed and use that image,
# instead of using the inferred default container image.
self._default_environment = (
environments.DockerEnvironment.from_options(options))
options.view_as(WorkerOptions).worker_harness_container_image = (
self._default_environment.container_image)
else:
self._default_environment = (
environments.DockerEnvironment.from_container_image(
apiclient.get_container_image_from_options(options),
artifacts=environments.python_sdk_dependencies(options)))

# This has to be performed before pipeline proto is constructed to make sure
# that the changes are reflected in the portable job submission path.
self._adjust_pipeline_for_dataflow_v2(pipeline)
Expand All @@ -552,6 +511,38 @@ def run_pipeline(self, pipeline, options):
self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
return_context=True, default_environment=self._default_environment)

# Optimize the pipeline if it is not streaming and the pre_optimize
# experiment is set.
pre_optimize = options.view_as(DebugOptions).lookup_experiment(
'pre_optimize', 'default').lower()
from apache_beam.runners.portability.fn_api_runner import translations
if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
pre_optimize == 'default'):
phases = []
elif pre_optimize == 'all':
phases = [
translations.eliminate_common_key_with_none,
# TODO(BEAM-11694): Enable translations.pack_combiners
# translations.pack_combiners,
translations.sort_stages
]
else:
phases = []
for phase_name in pre_optimize.split(','):
# For now, these are all we allow.
if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
phases.append(getattr(translations, phase_name))
else:
raise ValueError(
'Unknown or inapplicable phase for pre_optimize: %s' % phase_name)
phases.append(translations.sort_stages)

self.proto_pipeline = translations.optimize_pipeline(
self.proto_pipeline,
phases=phases,
known_runner_urns=frozenset(),
partial=True)

if use_fnapi:
self._check_for_unsupported_fnapi_features(self.proto_pipeline)
else:
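To summarize the structural change in dataflow_runner.py: the reverted code optimized a proto round-trip of the Pipeline object (pipeline.to_runner_api, translations.optimize_pipeline, then beam.Pipeline.from_runner_api) and relied on a CombinePerKeyPTransformOverride, while the retained code runs the optimizer directly on the already-built self.proto_pipeline. A minimal sketch of the retained flow, assembled only from calls visible in this diff; the helper name is hypothetical:

from apache_beam.runners.portability.fn_api_runner import translations

def pre_optimize_job_proto(proto_pipeline, phase_names):
  # Map experiment phase names to optimizer phases, as the retained hunk does.
  phases = []
  for name in phase_names:
    if name not in ('eliminate_common_key_with_none', 'pack_combiners'):
      raise ValueError(
          'Unknown or inapplicable phase for pre_optimize: %s' % name)
    phases.append(getattr(translations, name))
  phases.append(translations.sort_stages)
  # partial=True and known_runner_urns=frozenset() mirror the call in the diff.
  return translations.optimize_pipeline(
      proto_pipeline,
      phases=phases,
      known_runner_urns=frozenset(),
      partial=True)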
@@ -877,6 +877,7 @@ def test_pack_combiners_disabled_by_default(self):
self._test_pack_combiners(
PipelineOptions(self.default_properties), expect_packed=False)

@unittest.skip("BEAM-11694")
def test_pack_combiners_enabled_by_experiment(self):
self.default_properties.append('--experiment=pre_optimize=all')
self._test_pack_combiners(
27 changes: 0 additions & 27 deletions sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -125,33 +125,6 @@ def expand(self, pbegin):
'Read'))


class CombinePerKeyPTransformOverride(PTransformOverride):
"""A ``PTransformOverride`` for ``CombinePerKey``.
The translations.pack_combiners optimizer phase produces a CombinePerKey
PTransform, but DataflowRunner treats CombinePerKey as a composite, so
this override expands CombinePerKey into primitive PTransforms.
"""
def matches(self, applied_ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import CombinePerKey

if isinstance(applied_ptransform.transform, CombinePerKey):
self.transform = applied_ptransform.transform
return True
return False

def get_replacement_transform(self, ptransform):
from apache_beam.transforms import ptransform_fn

@ptransform_fn
def ExpandCombinePerKey(pcoll):
return pcoll | ptransform

return ExpandCombinePerKey()


class CombineValuesPTransformOverride(PTransformOverride):
"""A ``PTransformOverride`` for ``CombineValues``.
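The deleted CombinePerKeyPTransformOverride is a compact example of Beam's PTransformOverride pattern: a matches predicate plus a replacement factory, applied to a pipeline through replace_all (as the removed dataflow_runner.py hunk did). A stripped-down sketch of that pattern, with a hypothetical class name, for reference:

from apache_beam.pipeline import PTransformOverride


class ReExpandCombinePerKey(PTransformOverride):  # hypothetical name
  """Matches CombinePerKey and re-applies it so it expands into primitives."""
  def matches(self, applied_ptransform):
    # Imported lazily, like the removed override, to avoid circular imports.
    from apache_beam import CombinePerKey
    return isinstance(applied_ptransform.transform, CombinePerKey)

  def get_replacement_transform(self, ptransform):
    from apache_beam.transforms import ptransform_fn

    @ptransform_fn
    def ExpandCombinePerKey(pcoll):
      return pcoll | ptransform

    return ExpandCombinePerKey()


# Applied the same way the removed runner code did:
# pipeline.replace_all([ReExpandCombinePerKey()])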
@@ -259,6 +259,7 @@ def expand(self, pcoll):
| Create([('a', x) for x in vals])
| 'multiple-combines' >> MultipleCombines())

@attr('ValidatesRunner')
def test_run_packable_combine_globally(self):
class MultipleCombines(beam.PTransform):
def expand(self, pcoll):