diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 8b5ae0595053..7648a08165df 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -1484,6 +1484,8 @@ message StandardEnvironments { PROCESS = 1 [(beam_urn) = "beam:env:process:v1"]; // A managed native process to run user code. EXTERNAL = 2 [(beam_urn) = "beam:env:external:v1"]; // An external non managed process to run user code. + + DEFAULT = 3 [(beam_urn) = "beam:env:default:v1"]; // Used as a stub when context is missing a runner-provided default environment. } } @@ -1841,3 +1843,13 @@ message ExecutableStagePayload { } } } + +message StandardResourceHints { + enum Enum { + // Describes hardware accelerators that are desired to have in the execution environment. + ACCELERATOR = 0 [(beam_urn) = "beam:resources:accelerator:v1"]; + // Describes desired minimal available RAM size in transform's execution environment. + // SDKs should convert the size to bytes, but can allow users to specify human-friendly units (e.g. GiB). + MIN_RAM_BYTES = 1 [(beam_urn) = "beam:resources:min_ram_bytes:v1"]; + } +} diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 7d3b13d800eb..7fee3631b630 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -445,6 +445,17 @@ def _add_argparse_args(cls, parser): action='store_true', help='Whether to enable streaming mode.') + parser.add_argument( + '--resource_hint', + dest='resource_hints', + action='append', + default=[], + help=( + 'Resource hint to set in the pipeline execution environment.' + 'Hints specified via this option override hints specified ' + 'at transform level. Interpretation of hints is defined by ' + 'Beam runners.')) + class CrossLanguageOptions(PipelineOptions): @classmethod diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index c37542792b1f..388b7c02dabb 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -91,6 +91,8 @@ from apache_beam.transforms import ParDo from apache_beam.transforms import ptransform from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.resources import merge_resource_hints +from apache_beam.transforms.resources import resource_hints_from_options from apache_beam.transforms.sideinputs import get_sideinput_index from apache_beam.typehints import TypeCheckError from apache_beam.typehints import typehints @@ -220,7 +222,8 @@ def __init__(self, runner=None, options=None, argv=None): # If a transform is applied and the full label is already in the set # then the transform will have to be cloned with a new label. self.applied_labels = set() # type: Set[str] - + # Hints supplied via pipeline options are considered the outermost hints. + self._root_transform().resource_hints = resource_hints_from_options(options) # Create a ComponentIdMap for assigning IDs to components. Ensures that any # components that receive an ID during pipeline construction (for example in # ExternalTransform), will receive the same component ID when generating the @@ -1043,6 +1046,12 @@ def __init__( self.outputs = {} # type: Dict[Union[str, int, None], pvalue.PValue] self.parts = [] # type: List[AppliedPTransform] self.environment_id = environment_id if environment_id else None # type: Optional[str] + # We may need to merge the hints with environment-provided hints here + # once environment is a first-class citizen in Beam graph and we have + # access to actual environment, not just an id. + self.resource_hints = dict( + transform.get_resource_hints()) if transform else { + } # type: Dict[str, bytes] if annotations is None and transform: @@ -1109,6 +1118,7 @@ def add_output( def add_part(self, part): # type: (AppliedPTransform) -> None assert isinstance(part, AppliedPTransform) + part._merge_outer_resource_hints() self.parts.append(part) def is_composite(self): @@ -1198,9 +1208,11 @@ def named_outputs(self): def to_runner_api(self, context): # type: (PipelineContext) -> beam_runner_api_pb2.PTransform - # External tranforms require more splicing than just setting the spec. + # External transforms require more splicing than just setting the spec. from apache_beam.transforms import external if isinstance(self.transform, external.ExternalTransform): + # TODO(BEAM-12082): Support resource hints in XLang transforms. + # In particular, make sure hints on composites are properly propagated. return self.transform.to_runner_api_transform(context, self.full_label) from apache_beam.portability.api import beam_runner_api_pb2 @@ -1229,7 +1241,8 @@ def transform_to_runner_api( transform_urn = transform_spec.urn if transform_spec else None if (not environment_id and (transform_urn not in Pipeline.runner_implemented_transforms())): - environment_id = context.default_environment_id() + environment_id = context.get_environment_id_for_resource_hints( + self.resource_hints) return beam_runner_api_pb2.PTransform( unique_name=self.full_label, @@ -1278,6 +1291,12 @@ def from_runner_api( ] transform = ptransform.PTransform.from_runner_api(proto, context) + if transform and proto.environment_id: + resource_hints = context.environments.get_by_id( + proto.environment_id).resource_hints() + if resource_hints: + transform._resource_hints = dict(resource_hints) + # Ordering is important here. # TODO(BEAM-9635): use key, value pairs instead of depending on tags with # index as a suffix. @@ -1292,7 +1311,7 @@ def from_runner_api( transform=transform, full_label=proto.unique_name, inputs=main_inputs, - environment_id=proto.environment_id, + environment_id=None, annotations=proto.annotations) if result.transform and result.transform.side_inputs: @@ -1303,7 +1322,7 @@ def from_runner_api( for transform_id in proto.subtransforms: part = context.transforms.get_by_id(transform_id) part.parent = result - result.parts.append(part) + result.add_part(part) result.outputs = { None if tag == 'None' else tag: context.pcollections.get_by_id(id) for tag, @@ -1321,6 +1340,15 @@ def from_runner_api( pc.tag = None if tag == 'None' else tag return result + def _merge_outer_resource_hints(self): + if (self.parent is not None and self.parent.resource_hints): + self.resource_hints = merge_resource_hints( + outer_hints=self.parent.resource_hints, + inner_hints=self.resource_hints) + if self.resource_hints: + for part in self.parts: + part._merge_outer_resource_hints() + class PTransformOverride(with_metaclass(abc.ABCMeta, object)): # type: ignore[misc] diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 4d1064cdf5ab..285825c69d46 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -34,6 +34,7 @@ from apache_beam.coders import BytesCoder from apache_beam.io import Read from apache_beam.metrics import Metrics +from apache_beam.options.pipeline_options import PortableOptions from apache_beam.pipeline import Pipeline from apache_beam.pipeline import PipelineOptions from apache_beam.pipeline import PipelineVisitor @@ -54,6 +55,8 @@ from apache_beam.transforms import PTransform from apache_beam.transforms import WindowInto from apache_beam.transforms.display import DisplayDataItem +from apache_beam.transforms.environments import ProcessEnvironment +from apache_beam.transforms.resources import ResourceHint from apache_beam.transforms.userstate import BagStateSpec from apache_beam.transforms.window import SlidingWindows from apache_beam.transforms.window import TimestampedValue @@ -1047,6 +1050,206 @@ def display_data(self): # type: () -> dict double_value=1.1).SerializeToString()), ]) + def test_runner_api_roundtrip_preserves_resource_hints(self): + p = beam.Pipeline() + _ = ( + p | beam.Create([1, 2]) + | beam.Map(lambda x: x + 1).with_resource_hints(accelerator='gpu')) + + self.assertEqual( + p.transforms_stack[0].parts[1].transform.get_resource_hints(), + {common_urns.resource_hints.ACCELERATOR.urn: b'gpu'}) + + for _ in range(3): + # Verify that DEFAULT environments are recreated during multiple RunnerAPI + # translation and hints don't get lost. + p = Pipeline.from_runner_api(Pipeline.to_runner_api(p), None, None) + self.assertEqual( + p.transforms_stack[0].parts[1].transform.get_resource_hints(), + {common_urns.resource_hints.ACCELERATOR.urn: b'gpu'}) + + def test_hints_on_composite_transforms_are_propagated_to_subtransforms(self): + class FooHint(ResourceHint): + urn = 'foo_urn' + + class BarHint(ResourceHint): + urn = 'bar_urn' + + class BazHint(ResourceHint): + urn = 'baz_urn' + + class QuxHint(ResourceHint): + urn = 'qux_urn' + + class UseMaxValueHint(ResourceHint): + urn = 'use_max_value_urn' + + @classmethod + def get_merged_value( + cls, outer_value, inner_value): # type: (bytes, bytes) -> bytes + return ResourceHint._use_max(outer_value, inner_value) + + ResourceHint.register_resource_hint('foo_hint', FooHint) + ResourceHint.register_resource_hint('bar_hint', BarHint) + ResourceHint.register_resource_hint('baz_hint', BazHint) + ResourceHint.register_resource_hint('qux_hint', QuxHint) + ResourceHint.register_resource_hint('use_max_value_hint', UseMaxValueHint) + + @beam.ptransform_fn + def SubTransform(pcoll): + return pcoll | beam.Map(lambda x: x + 1).with_resource_hints( + foo_hint='set_on_subtransform', use_max_value_hint='10') + + @beam.ptransform_fn + def CompositeTransform(pcoll): + return pcoll | beam.Map(lambda x: x * 2) | SubTransform() + + p = beam.Pipeline() + _ = ( + p | beam.Create([1, 2]) + | CompositeTransform().with_resource_hints( + foo_hint='should_be_overriden_by_subtransform', + bar_hint='set_on_composite', + baz_hint='set_on_composite', + use_max_value_hint='100')) + options = PortableOptions([ + '--resource_hint=baz_hint=should_be_overriden_by_composite', + '--resource_hint=qux_hint=set_via_options', + '--environment_type=PROCESS', + '--environment_option=process_command=foo', + '--sdk_location=container', + ]) + environment = ProcessEnvironment.from_options(options) + proto = Pipeline.to_runner_api(p, default_environment=environment) + + for t in proto.components.transforms.values(): + if "CompositeTransform/SubTransform/Map" in t.unique_name: + environment = proto.components.environments.get(t.environment_id) + self.assertEqual( + environment.resource_hints.get('foo_urn'), b'set_on_subtransform') + self.assertEqual( + environment.resource_hints.get('bar_urn'), b'set_on_composite') + self.assertEqual( + environment.resource_hints.get('baz_urn'), b'set_on_composite') + self.assertEqual( + environment.resource_hints.get('qux_urn'), b'set_via_options') + self.assertEqual( + environment.resource_hints.get('use_max_value_urn'), b'100') + found = True + assert found + + def test_environments_with_same_resource_hints_are_reused(self): + class HintX(ResourceHint): + urn = 'X_urn' + + class HintY(ResourceHint): + urn = 'Y_urn' + + class HintIsOdd(ResourceHint): + urn = 'IsOdd_urn' + + ResourceHint.register_resource_hint('X', HintX) + ResourceHint.register_resource_hint('Y', HintY) + ResourceHint.register_resource_hint('IsOdd', HintIsOdd) + + p = beam.Pipeline() + num_iter = 4 + for i in range(num_iter): + _ = ( + p + | f'NoHintCreate_{i}' >> beam.Create([1, 2]) + | f'NoHint_{i}' >> beam.Map(lambda x: x + 1)) + _ = ( + p + | f'XCreate_{i}' >> beam.Create([1, 2]) + | + f'HintX_{i}' >> beam.Map(lambda x: x + 1).with_resource_hints(X='X')) + _ = ( + p + | f'XYCreate_{i}' >> beam.Create([1, 2]) + | f'HintXY_{i}' >> beam.Map(lambda x: x + 1).with_resource_hints( + X='X', Y='Y')) + _ = ( + p + | f'IsOddCreate_{i}' >> beam.Create([1, 2]) + | f'IsOdd_{i}' >> + beam.Map(lambda x: x + 1).with_resource_hints(IsOdd=str(i % 2 != 0))) + + proto = Pipeline.to_runner_api(p) + count_x = count_xy = count_is_odd = count_no_hints = 0 + env_ids = set() + for _, t in proto.components.transforms.items(): + env = proto.components.environments[t.environment_id] + if t.unique_name.startswith('HintX_'): + count_x += 1 + env_ids.add(t.environment_id) + self.assertEqual(env.resource_hints, {'X_urn': b'X'}) + + if t.unique_name.startswith('HintXY_'): + count_xy += 1 + env_ids.add(t.environment_id) + self.assertEqual(env.resource_hints, {'X_urn': b'X', 'Y_urn': b'Y'}) + + if t.unique_name.startswith('NoHint_'): + count_no_hints += 1 + env_ids.add(t.environment_id) + self.assertEqual(env.resource_hints, {}) + + if t.unique_name.startswith('IsOdd_'): + count_is_odd += 1 + env_ids.add(t.environment_id) + self.assertTrue( + env.resource_hints == {'IsOdd_urn': b'True'} or + env.resource_hints == {'IsOdd_urn': b'False'}) + assert count_x == count_is_odd == count_xy == count_no_hints == num_iter + assert num_iter > 1 + + self.assertEqual(len(env_ids), 5) + + def test_multiple_application_of_the_same_transform_set_different_hints(self): + class FooHint(ResourceHint): + urn = 'foo_urn' + + class UseMaxValueHint(ResourceHint): + urn = 'use_max_value_urn' + + @classmethod + def get_merged_value( + cls, outer_value, inner_value): # type: (bytes, bytes) -> bytes + return ResourceHint._use_max(outer_value, inner_value) + + ResourceHint.register_resource_hint('foo_hint', FooHint) + ResourceHint.register_resource_hint('use_max_value_hint', UseMaxValueHint) + + @beam.ptransform_fn + def SubTransform(pcoll): + return pcoll | beam.Map(lambda x: x + 1) + + @beam.ptransform_fn + def CompositeTransform(pcoll): + sub = SubTransform() + return ( + pcoll + | 'first' >> sub.with_resource_hints(foo_hint='first_application') + | 'second' >> sub.with_resource_hints(foo_hint='second_application')) + + p = beam.Pipeline() + _ = (p | beam.Create([1, 2]) | CompositeTransform()) + proto = Pipeline.to_runner_api(p) + count = 0 + for t in proto.components.transforms.values(): + if "CompositeTransform/first/Map" in t.unique_name: + environment = proto.components.environments.get(t.environment_id) + self.assertEqual( + b'first_application', environment.resource_hints.get('foo_urn')) + count += 1 + if "CompositeTransform/second/Map" in t.unique_name: + environment = proto.components.environments.get(t.environment_id) + self.assertEqual( + b'second_application', environment.resource_hints.get('foo_urn')) + count += 1 + assert count == 2 + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index f999c43073f7..18cc249e4b2d 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -29,6 +29,7 @@ from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardProtocols from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardPTransforms from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardRequirements +from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardResourceHints from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardSideInputTypes from apache_beam.portability.api.metrics_pb2_urns import MonitoringInfo from apache_beam.portability.api.metrics_pb2_urns import MonitoringInfoSpecs @@ -52,6 +53,7 @@ environments = StandardEnvironments.Environments artifact_types = StandardArtifacts.Types artifact_roles = StandardArtifacts.Roles +resource_hints = StandardResourceHints.Enum global_windows = GlobalWindowsPayload.Enum.PROPERTIES fixed_windows = FixedWindowsPayload.Enum.PROPERTIES diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index aae5d5b1a27a..b700976dcf87 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -37,6 +37,7 @@ from typing import TYPE_CHECKING from typing import List from urllib.parse import quote +from urllib.parse import quote_from_bytes from urllib.parse import unquote_to_bytes from future.utils import iteritems @@ -307,7 +308,7 @@ def visit_transform(self, transform_node): is_bounded=side_input.pvalue.is_bounded) parent = transform_node.parent or pipeline._root_transform() map_to_void_key = beam.pipeline.AppliedPTransform( - pipeline, + parent, beam.Map(lambda x: (b'', x)), transform_node.full_label + '/MapToVoidKey%s' % ix, (side_input.pvalue, )) @@ -456,7 +457,8 @@ def run_pipeline(self, pipeline, options): self._default_environment = ( environments.DockerEnvironment.from_container_image( apiclient.get_container_image_from_options(options), - artifacts=environments.python_sdk_dependencies(options))) + artifacts=environments.python_sdk_dependencies(options), + resource_hints=environments.resource_hints_from_options(options))) # This has to be performed before pipeline proto is constructed to make sure # that the changes are reflected in the portable job submission path. @@ -707,6 +709,14 @@ def _add_step(self, step_kind, step_label, transform_node, side_tags=()): for item in DisplayData.create_from(transform_node.transform).items ]) + if transform_node.resource_hints: + step.add_property( + PropertyNames.RESOURCE_HINTS, + { + hint: quote_from_bytes(value) + for (hint, value) in transform_node.resource_hints.items() + }) + return step def _add_singleton_step( diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 5fdb7667edf7..6695b42e1e63 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -28,6 +28,8 @@ from datetime import datetime import mock +from parameterized import param +from parameterized import parameterized import apache_beam as beam import apache_beam.transforms as ptransform @@ -863,6 +865,34 @@ def test_pack_combiners_enabled_by_experiment(self): self._test_pack_combiners( PipelineOptions(self.default_properties), expect_packed=True) + @parameterized.expand([ + param(memory_hint='min_ram'), + param(memory_hint='minRam'), + ]) + def test_resource_hints_translation(self, memory_hint): + runner = DataflowRunner() + self.default_properties.append('--resource_hint=accelerator=some_gpu') + self.default_properties.append(f'--resource_hint={memory_hint}=20GB') + with beam.Pipeline(runner=runner, + options=PipelineOptions(self.default_properties)) as p: + # pylint: disable=expression-not-assigned + ( + p + | beam.Create([1]) + | 'MapWithHints' >> beam.Map(lambda x: x + 1).with_resource_hints( + min_ram='10GB', + accelerator='type:nvidia-tesla-k80;count:1;install-nvidia-drivers' + )) + + step = self._find_step(runner.job, 'MapWithHints') + self.assertEqual( + step['properties']['resource_hints'], + { + 'beam:resources:min_ram_bytes:v1': '20000000000', + 'beam:resources:accelerator:v1': \ + 'type%3Anvidia-tesla-k80%3Bcount%3A1%3Binstall-nvidia-drivers' + }) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 242033c5c687..4742aec32497 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -258,27 +258,15 @@ def test_dataflow_container_image_override(self): pipeline = Pipeline(options=pipeline_options) pipeline | Create([1, 2, 3]) | ParDo(DoFn()) # pylint:disable=expression-not-assigned - proto_pipeline, _ = pipeline.to_runner_api(return_context=True) - - dummy_env = beam_runner_api_pb2.Environment( - urn=common_urns.environments.DOCKER.urn, - payload=( - beam_runner_api_pb2.DockerPayload( - container_image='apache/beam_dummy_name:dummy_tag') - ).SerializeToString()) - proto_pipeline.components.environments['dummy_env_id'].CopyFrom(dummy_env) - - dummy_transform = beam_runner_api_pb2.PTransform( - environment_id='dummy_env_id') - proto_pipeline.components.transforms['dummy_transform_id'].CopyFrom( - dummy_transform) + dummy_env = DockerEnvironment( + container_image='apache/beam_dummy_name:dummy_tag') + proto_pipeline, _ = pipeline.to_runner_api( + return_context=True, default_environment=dummy_env) # Accessing non-public method for testing. apiclient.DataflowApplicationClient._apply_sdk_environment_overrides( proto_pipeline, dict(), pipeline_options) - self.assertIsNotNone(2, len(proto_pipeline.components.environments)) - from apache_beam.utils import proto_utils found_override = False for env in proto_pipeline.components.environments.values(): @@ -301,20 +289,10 @@ def test_non_apache_container_not_overridden(self): pipeline = Pipeline(options=pipeline_options) pipeline | Create([1, 2, 3]) | ParDo(DoFn()) # pylint:disable=expression-not-assigned - proto_pipeline, _ = pipeline.to_runner_api(return_context=True) - - dummy_env = beam_runner_api_pb2.Environment( - urn=common_urns.environments.DOCKER.urn, - payload=( - beam_runner_api_pb2.DockerPayload( - container_image='other_org/dummy_name:dummy_tag') - ).SerializeToString()) - proto_pipeline.components.environments['dummy_env_id'].CopyFrom(dummy_env) - - dummy_transform = beam_runner_api_pb2.PTransform( - environment_id='dummy_env_id') - proto_pipeline.components.transforms['dummy_transform_id'].CopyFrom( - dummy_transform) + dummy_env = DockerEnvironment( + container_image='other_org/dummy_name:dummy_tag') + proto_pipeline, _ = pipeline.to_runner_api( + return_context=True, default_environment=dummy_env) # Accessing non-public method for testing. apiclient.DataflowApplicationClient._apply_sdk_environment_overrides( @@ -347,18 +325,10 @@ def test_pipeline_sdk_not_overridden(self): proto_pipeline, _ = pipeline.to_runner_api(return_context=True) - dummy_env = beam_runner_api_pb2.Environment( - urn=common_urns.environments.DOCKER.urn, - payload=( - beam_runner_api_pb2.DockerPayload( - container_image='dummy_prefix/dummy_name:dummy_tag') - ).SerializeToString()) - proto_pipeline.components.environments['dummy_env_id'].CopyFrom(dummy_env) - - dummy_transform = beam_runner_api_pb2.PTransform( - environment_id='dummy_env_id') - proto_pipeline.components.transforms['dummy_transform_id'].CopyFrom( - dummy_transform) + dummy_env = DockerEnvironment( + container_image='dummy_prefix/dummy_name:dummy_tag') + proto_pipeline, _ = pipeline.to_runner_api( + return_context=True, default_environment=dummy_env) # Accessing non-public method for testing. apiclient.DataflowApplicationClient._apply_sdk_environment_overrides( diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index cd93185f3118..ea0bc475b104 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -108,6 +108,7 @@ class PropertyNames(object): PUBSUB_SUBSCRIPTION = 'pubsub_subscription' PUBSUB_TIMESTAMP_ATTRIBUTE = 'pubsub_timestamp_label' PUBSUB_TOPIC = 'pubsub_topic' + RESOURCE_HINTS = 'resource_hints' RESTRICTION_ENCODING = 'restriction_encoding' SERIALIZED_FN = 'serialized_fn' SHARD_NAME_TEMPLATE = 'shard_template' diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index 09950f5fbe00..be62545d2c24 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -49,12 +49,14 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.transforms import core from apache_beam.transforms import environments +from apache_beam.transforms.resources import merge_resource_hints from apache_beam.typehints import native_type_compatibility if TYPE_CHECKING: from google.protobuf import message # pylint: disable=ungrouped-imports from apache_beam.coders.coder_impl import IterableStateReader from apache_beam.coders.coder_impl import IterableStateWriter + from apache_beam.transforms import ptransform PortableObjectT = TypeVar('PortableObjectT', bound='PortableObject') @@ -118,13 +120,23 @@ def get_by_id(self, id): def get_by_proto(self, maybe_new_proto, label=None, deduplicate=False): # type: (message.Message, Optional[str], bool) -> str + # TODO: this method may not be safe for arbitrary protos due to + # xlang concerns, hence limiting usage to the only current use-case it has. + # See: https://github.com/apache/beam/pull/14390#discussion_r616062377 + assert isinstance(maybe_new_proto, beam_runner_api_pb2.Environment) + obj = self._obj_type.from_runner_api( + maybe_new_proto, self._pipeline_context) + if deduplicate: + if obj in self._obj_to_id: + return self._obj_to_id[obj] + for id, proto in self._id_to_proto.items(): if proto == maybe_new_proto: return id return self.put_proto( self._pipeline_context.component_id_map.get_or_assign( - label, obj_type=self._obj_type), + obj=obj, obj_type=self._obj_type, label=label), maybe_new_proto) def get_id_to_proto_map(self): @@ -184,6 +196,7 @@ def __init__(self, self.component_id_map = component_id_map or ComponentIdMap(namespace) assert self.component_id_map.namespace == namespace + # TODO(BEAM-12084) Initialize component_id_map with objects from proto. self.transforms = _PipelineContextMap( self, pipeline.AppliedPTransform, @@ -210,12 +223,12 @@ def __init__(self, namespace, proto.environments if proto is not None else None) - if default_environment: - self._default_environment_id = self.environments.get_id( - default_environment, - label='default_environment') # type: Optional[str] - else: - self._default_environment_id = None + if default_environment is None: + default_environment = environments.DefaultEnvironment() + + self._default_environment_id = self.environments.get_id( + default_environment, label='default_environment') # type: str + self.use_fake_coders = use_fake_coders self.iterable_state_read = iterable_state_read self.iterable_state_write = iterable_state_write @@ -274,5 +287,33 @@ def to_runner_api(self): return context_proto def default_environment_id(self): - # type: () -> Optional[str] + # type: () -> str return self._default_environment_id + + def get_environment_id_for_resource_hints( + self, hints): # type: (Dict[str, bytes]) -> str + """Returns an environment id that has necessary resource hints.""" + if not hints: + return self.default_environment_id() + + def get_or_create_environment_with_resource_hints( + template_env_id, + resource_hints, + ): # type: (str, Dict[str, bytes]) -> str + """Creates an environment that has necessary hints and returns its id.""" + template_env = self.environments.get_proto_from_id(template_env_id) + cloned_env = beam_runner_api_pb2.Environment() + cloned_env.CopyFrom(template_env) + cloned_env.resource_hints.clear() + cloned_env.resource_hints.update(resource_hints) + + return self.environments.get_by_proto( + cloned_env, label='environment_with_resource_hints', deduplicate=True) + + default_env_id = self.default_environment_id() + env_hints = self.environments.get_by_id(default_env_id).resource_hints() + hints = merge_resource_hints(outer_hints=env_hints, inner_hints=hints) + maybe_new_env_id = get_or_create_environment_with_resource_hints( + default_env_id, hints) + + return maybe_new_env_id diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py b/sdks/python/apache_beam/runners/pipeline_context_test.py index 119c25695e01..5e9334b7a3ff 100644 --- a/sdks/python/apache_beam/runners/pipeline_context_test.py +++ b/sdks/python/apache_beam/runners/pipeline_context_test.py @@ -25,6 +25,7 @@ from apache_beam import coders from apache_beam.runners import pipeline_context +from apache_beam.transforms import environments class PipelineContextTest(unittest.TestCase): @@ -36,11 +37,32 @@ def test_deduplication(self): def test_deduplication_by_proto(self): context = pipeline_context.PipelineContext() - bytes_coder_proto = coders.BytesCoder().to_runner_api(None) - bytes_coder_ref = context.coders.get_by_proto(bytes_coder_proto) - bytes_coder_ref2 = context.coders.get_by_proto( - bytes_coder_proto, deduplicate=True) - self.assertEqual(bytes_coder_ref, bytes_coder_ref2) + env_proto = environments.SubprocessSDKEnvironment( + command_string="foo").to_runner_api(None) + env_ref_1 = context.environments.get_by_proto(env_proto) + env_ref_2 = context.environments.get_by_proto(env_proto, deduplicate=True) + self.assertEqual(env_ref_1, env_ref_2) + + def test_equal_environments_are_deduplicated_when_fetched_by_obj_or_proto( + self): + context = pipeline_context.PipelineContext() + + env = environments.SubprocessSDKEnvironment(command_string="foo") + env_proto = env.to_runner_api(None) + id_from_proto = context.environments.get_by_proto(env_proto) + id_from_obj = context.environments.get_id(env) + self.assertEqual(id_from_obj, id_from_proto) + self.assertEqual( + context.environments.get_by_id(id_from_obj).command_string, "foo") + + env = environments.SubprocessSDKEnvironment(command_string="bar") + env_proto = env.to_runner_api(None) + id_from_obj = context.environments.get_id(env) + id_from_proto = context.environments.get_by_proto( + env_proto, deduplicate=True) + self.assertEqual(id_from_obj, id_from_proto) + self.assertEqual( + context.environments.get_by_id(id_from_obj).command_string, "bar") def test_serialization(self): context = pipeline_context.PipelineContext() diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 506ab8dc35de..564450bab023 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2727,7 +2727,7 @@ def from_runner_api(proto, context): accumulation_mode=proto.accumulation_mode, timestamp_combiner=proto.output_time, allowed_lateness=Duration(micros=proto.allowed_lateness * 1000), - environment_id=proto.environment_id) + environment_id=None) @typehints.with_input_types(T) diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index bde951c19cd2..6876d965080a 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -27,6 +27,7 @@ import logging import sys import tempfile +from types import MappingProxyType from typing import TYPE_CHECKING from typing import Any from typing import Callable @@ -53,14 +54,17 @@ from apache_beam.portability.api import endpoints_pb2 from apache_beam.runners.portability import stager from apache_beam.runners.portability.sdk_container_builder import SdkContainerImageBuilder +from apache_beam.transforms.resources import resource_hints_from_options from apache_beam.utils import proto_utils if TYPE_CHECKING: + from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import PortableOptions from apache_beam.runners.pipeline_context import PipelineContext __all__ = [ 'Environment', + 'DefaultEnvironment', 'DockerEnvironment', 'ProcessEnvironment', 'ExternalEnvironment', @@ -76,6 +80,7 @@ Optional[Any], Iterable[str], Iterable[beam_runner_api_pb2.ArtifactInformation], + Mapping[str, bytes], 'PipelineContext' ], Any] @@ -109,12 +114,29 @@ class Environment(object): _urn_to_env_cls = {} # type: Dict[str, type] def __init__(self, - capabilities, # type: Iterable[str] - artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] - ): + capabilities=(), # type: Iterable[str] + artifacts=(), # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints=None, # type: Optional[Mapping[str, bytes]] + ): # type: (...) -> None self._capabilities = capabilities - self._artifacts = artifacts + self._artifacts = sorted(artifacts, key=lambda x: x.SerializeToString()) + # Hints on created environments should be immutable since pipeline context + # stores environments in hash maps and we use hints to compute the hash. + self._resource_hints = MappingProxyType( + dict(resource_hints) if resource_hints else {}) + + def __eq__(self, other): + return ( + self.__class__ == other.__class__ and + self._artifacts == other._artifacts + # Assuming that we don't have instances of the same Environment subclass + # with different set of capabilities. + and self._resource_hints == other._resource_hints) + + def __hash__(self): + # type: () -> int + return hash((self.__class__, frozenset(self._resource_hints.items()))) def artifacts(self): # type: () -> Iterable[beam_runner_api_pb2.ArtifactInformation] @@ -128,6 +150,10 @@ def capabilities(self): # type: () -> Iterable[str] return self._capabilities + def resource_hints(self): + # type: () -> Mapping[str, bytes] + return self._resource_hints + @classmethod @overload def register_urn( @@ -204,7 +230,8 @@ def to_runner_api(self, context): (isinstance(typed_param, bytes) or typed_param is None) else typed_param.encode('utf-8'), capabilities=self.capabilities(), - dependencies=self.artifacts()) + dependencies=self.artifacts(), + resource_hints=self.resource_hints()) @classmethod def from_runner_api(cls, @@ -220,6 +247,7 @@ def from_runner_api(cls, proto_utils.parse_Bytes(proto.payload, parameter_type), proto.capabilities, proto.dependencies, + proto.resource_hints, context) @classmethod @@ -234,6 +262,26 @@ def from_options(cls, options): raise NotImplementedError +@Environment.register_urn(common_urns.environments.DEFAULT.urn, None) +class DefaultEnvironment(Environment): + """Used as a stub when context is missing a default environment.""" + def to_runner_api_parameter(self, context): + return common_urns.environments.DEFAULT.urn, None + + @staticmethod + def from_runner_api_parameter(payload, # type: beam_runner_api_pb2.DockerPayload + capabilities, # type: Iterable[str] + artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints, # type: Mapping[str, bytes] + context # type: PipelineContext + ): + # type: (...) -> DefaultEnvironment + return DefaultEnvironment( + capabilities=capabilities, + artifacts=artifacts, + resource_hints=resource_hints) + + @Environment.register_urn( common_urns.environments.DOCKER.urn, beam_runner_api_pb2.DockerPayload) class DockerEnvironment(Environment): @@ -242,8 +290,9 @@ def __init__( container_image=None, # type: Optional[str] capabilities=(), # type: Iterable[str] artifacts=(), # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints=None, # type: Optional[Mapping[str, bytes]] ): - super(DockerEnvironment, self).__init__(capabilities, artifacts) + super().__init__(capabilities, artifacts, resource_hints) if container_image: logging.info( 'Using provided Python SDK container image: %s' % (container_image)) @@ -257,11 +306,11 @@ def __init__( (self.container_image)) def __eq__(self, other): - return self.__class__ == other.__class__ \ - and self.container_image == other.container_image + return ( + super().__eq__(other) and self.container_image == other.container_image) def __hash__(self): - return hash((self.__class__, self.container_image)) + return hash((super().__hash__(), self.container_image)) def __repr__(self): return 'DockerEnvironment(container_image=%s)' % self.container_image @@ -274,15 +323,17 @@ def to_runner_api_parameter(self, context): @staticmethod def from_runner_api_parameter(payload, # type: beam_runner_api_pb2.DockerPayload - capabilities, # type: Iterable[str] - artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] - context # type: PipelineContext - ): + capabilities, # type: Iterable[str] + artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints, # type: Mapping[str, bytes] + context # type: PipelineContext + ): # type: (...) -> DockerEnvironment return DockerEnvironment( container_image=payload.container_image, capabilities=capabilities, - artifacts=artifacts) + artifacts=artifacts, + resource_hints=resource_hints) @classmethod def from_options(cls, options): @@ -292,19 +343,25 @@ def from_options(cls, options): options) return cls.from_container_image( container_image=prebuilt_container_image, - artifacts=python_sdk_dependencies(options)) + artifacts=python_sdk_dependencies(options), + resource_hints=resource_hints_from_options(options), + ) return cls.from_container_image( container_image=options.lookup_environment_option( 'docker_container_image') or options.environment_config, - artifacts=python_sdk_dependencies(options)) + artifacts=python_sdk_dependencies(options), + resource_hints=resource_hints_from_options(options), + ) @classmethod - def from_container_image(cls, container_image, artifacts=()): - # type: (str, Iterable[beam_runner_api_pb2.ArtifactInformation]) -> DockerEnvironment + def from_container_image( + cls, container_image, artifacts=(), resource_hints=None): + # type: (str, Iterable[beam_runner_api_pb2.ArtifactInformation], Optional[Mapping[str, bytes]]) -> DockerEnvironment return cls( container_image=container_image, capabilities=python_sdk_capabilities(), - artifacts=artifacts) + artifacts=artifacts, + resource_hints=resource_hints) @staticmethod def default_docker_image(): @@ -337,23 +394,25 @@ def __init__( env=None, # type: Optional[Mapping[str, str]] capabilities=(), # type: Iterable[str] artifacts=(), # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints=None, # type: Optional[Mapping[str, bytes]] ): # type: (...) -> None - super(ProcessEnvironment, self).__init__(capabilities, artifacts) + super().__init__(capabilities, artifacts, resource_hints) self.command = command self.os = os self.arch = arch self.env = env or {} def __eq__(self, other): - return self.__class__ == other.__class__ \ - and self.command == other.command and self.os == other.os \ - and self.arch == other.arch and self.env == other.env + return ( + super().__eq__(other) and self.command == other.command and + self.os == other.os and self.arch == other.arch and + self.env == other.env) def __hash__(self): # type: () -> int return hash(( - self.__class__, + super().__hash__(), self.command, self.os, self.arch, @@ -378,10 +437,11 @@ def to_runner_api_parameter(self, context): @staticmethod def from_runner_api_parameter(payload, - capabilities, # type: Iterable[str] - artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] - context # type: PipelineContext - ): + capabilities, # type: Iterable[str] + artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints, # type: Mapping[str, bytes] + context # type: PipelineContext + ): # type: (...) -> ProcessEnvironment return ProcessEnvironment( command=payload.command, @@ -389,7 +449,9 @@ def from_runner_api_parameter(payload, arch=payload.arch, env=payload.env, capabilities=capabilities, - artifacts=artifacts) + artifacts=artifacts, + resource_hints=resource_hints, + ) @staticmethod def parse_environment_variables(variables): @@ -415,7 +477,9 @@ def from_options(cls, options): arch=config.get('arch', ''), env=config.get('env', ''), capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options)) + artifacts=python_sdk_dependencies(options), + resource_hints=resource_hints_from_options(options), + ) env = cls.parse_environment_variables( options.lookup_environment_option('process_variables').split(',') if options.lookup_environment_option('process_variables') else []) @@ -423,7 +487,9 @@ def from_options(cls, options): options.lookup_environment_option('process_command'), env=env, capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options)) + artifacts=python_sdk_dependencies(options), + resource_hints=resource_hints_from_options(options), + ) @Environment.register_urn( @@ -435,19 +501,21 @@ def __init__( params=None, # type: Optional[Mapping[str, str]] capabilities=(), # type: Iterable[str] artifacts=(), # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints=None, # type: Optional[Mapping[str, bytes]] ): - super(ExternalEnvironment, self).__init__(capabilities, artifacts) + super().__init__(capabilities, artifacts, resource_hints) self.url = url self.params = params def __eq__(self, other): - return self.__class__ == other.__class__ and self.url == other.url \ - and self.params == other.params + return ( + super().__eq__(other) and self.url == other.url and + self.params == other.params) def __hash__(self): # type: () -> int return hash(( - self.__class__, + super().__hash__(), self.url, frozenset(self.params.items()) if self.params is not None else None)) @@ -465,16 +533,18 @@ def to_runner_api_parameter(self, context): @staticmethod def from_runner_api_parameter(payload, # type: beam_runner_api_pb2.ExternalPayload - capabilities, # type: Iterable[str] - artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] - context # type: PipelineContext - ): + capabilities, # type: Iterable[str] + artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints, # type: Mapping[str, bytes] + context # type: PipelineContext + ): # type: (...) -> ExternalEnvironment return ExternalEnvironment( payload.endpoint.url, params=payload.params or None, capabilities=capabilities, - artifacts=artifacts) + artifacts=artifacts, + resource_hints=resource_hints) @classmethod def from_options(cls, options): @@ -496,40 +566,34 @@ def from_options(cls, options): url, params=params, capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options)) + artifacts=python_sdk_dependencies(options), + resource_hints=resource_hints_from_options(options)) @Environment.register_urn(python_urns.EMBEDDED_PYTHON, None) class EmbeddedPythonEnvironment(Environment): - def __init__(self, capabilities=None, artifacts=()): - super(EmbeddedPythonEnvironment, self).__init__(capabilities, artifacts) - - def __eq__(self, other): - return self.__class__ == other.__class__ - - def __hash__(self): - # type: () -> int - return hash(self.__class__) - def to_runner_api_parameter(self, context): # type: (PipelineContext) -> Tuple[str, None] return python_urns.EMBEDDED_PYTHON, None @staticmethod def from_runner_api_parameter(unused_payload, # type: None - capabilities, # type: Iterable[str] - artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] - context # type: PipelineContext - ): + capabilities, # type: Iterable[str] + artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints, # type: Mapping[str, bytes] + context # type: PipelineContext + ): # type: (...) -> EmbeddedPythonEnvironment - return EmbeddedPythonEnvironment(capabilities, artifacts) + return EmbeddedPythonEnvironment(capabilities, artifacts, resource_hints) @classmethod def from_options(cls, options): # type: (PortableOptions) -> EmbeddedPythonEnvironment return cls( capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options)) + artifacts=python_sdk_dependencies(options), + resource_hints=resource_hints_from_options(options), + ) @Environment.register_urn(python_urns.EMBEDDED_PYTHON_GRPC, bytes) @@ -539,20 +603,25 @@ def __init__( state_cache_size=None, data_buffer_time_limit_ms=None, capabilities=(), - artifacts=()): - super(EmbeddedPythonGrpcEnvironment, self).__init__(capabilities, artifacts) + artifacts=(), + resource_hints=None, + ): + super().__init__(capabilities, artifacts, resource_hints) self.state_cache_size = state_cache_size self.data_buffer_time_limit_ms = data_buffer_time_limit_ms def __eq__(self, other): - return self.__class__ == other.__class__ \ - and self.state_cache_size == other.state_cache_size \ - and self.data_buffer_time_limit_ms == other.data_buffer_time_limit_ms + return ( + super().__eq__(other) and + self.state_cache_size == other.state_cache_size and + self.data_buffer_time_limit_ms == other.data_buffer_time_limit_ms) def __hash__(self): # type: () -> int - return hash( - (self.__class__, self.state_cache_size, self.data_buffer_time_limit_ms)) + return hash(( + super().__hash__(), + self.state_cache_size, + self.data_buffer_time_limit_ms)) def __repr__(self): # type: () -> str @@ -576,10 +645,11 @@ def to_runner_api_parameter(self, context): @staticmethod def from_runner_api_parameter(payload, # type: bytes - capabilities, # type: Iterable[str] - artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] - context # type: PipelineContext - ): + capabilities, # type: Iterable[str] + artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints, # type: Mapping[str, bytes] + context # type: PipelineContext + ): # type: (...) -> EmbeddedPythonGrpcEnvironment if payload: config = EmbeddedPythonGrpcEnvironment.parse_config( @@ -588,7 +658,8 @@ def from_runner_api_parameter(payload, # type: bytes state_cache_size=config.get('state_cache_size'), data_buffer_time_limit_ms=config.get('data_buffer_time_limit_ms'), capabilities=capabilities, - artifacts=artifacts) + artifacts=artifacts, + resource_hints=resource_hints) else: return EmbeddedPythonGrpcEnvironment() @@ -604,7 +675,8 @@ def from_options(cls, options): else: return cls( capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options)) + artifacts=python_sdk_dependencies(options), + resource_hints=resource_hints_from_options(options)) @staticmethod def parse_config(s): @@ -629,17 +701,18 @@ def __init__( command_string, # type: str capabilities=(), # type: Iterable[str] artifacts=(), # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints=None, # type: Optional[Mapping[str, bytes]] ): - super(SubprocessSDKEnvironment, self).__init__(capabilities, artifacts) + super().__init__(capabilities, artifacts, resource_hints) self.command_string = command_string def __eq__(self, other): - return self.__class__ == other.__class__ \ - and self.command_string == other.command_string + return ( + super().__eq__(other) and self.command_string == other.command_string) def __hash__(self): # type: () -> int - return hash((self.__class__, self.command_string)) + return hash((super().__hash__(), self.command_string)) def __repr__(self): # type: () -> str @@ -651,13 +724,14 @@ def to_runner_api_parameter(self, context): @staticmethod def from_runner_api_parameter(payload, # type: bytes - capabilities, # type: Iterable[str] - artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] - context # type: PipelineContext - ): + capabilities, # type: Iterable[str] + artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation] + resource_hints, # type: Mapping[str, bytes] + context # type: PipelineContext + ): # type: (...) -> SubprocessSDKEnvironment return SubprocessSDKEnvironment( - payload.decode('utf-8'), capabilities, artifacts) + payload.decode('utf-8'), capabilities, artifacts, resource_hints) @classmethod def from_options(cls, options): @@ -665,7 +739,8 @@ def from_options(cls, options): return cls( options.environment_config, capabilities=python_sdk_capabilities(), - artifacts=python_sdk_dependencies(options)) + artifacts=python_sdk_dependencies(options), + resource_hints=resource_hints_from_options(options)) class PyPIArtifactRegistry(object): diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py index a1a1346b2389..020478d81103 100644 --- a/sdks/python/apache_beam/transforms/environments_test.py +++ b/sdks/python/apache_beam/transforms/environments_test.py @@ -119,6 +119,17 @@ def test_process_variables_missing_rvalue(self): ]) ProcessEnvironment.from_options(options) + def test_environments_with_same_hints_are_equal(self): + options = PortableOptions([ + '--environment_type=PROCESS', + '--environment_option=process_command=foo', + '--sdk_location=container', + '--resource_hint=accelerator=gpu', + ]) + environment1 = ProcessEnvironment.from_options(options) + environment2 = ProcessEnvironment.from_options(options) + self.assertEqual(environment1, environment2) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 699e3211b837..e823a1ce26d3 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -71,6 +71,7 @@ class and wrapper class that allows lambda functions to be used as from apache_beam.internal import util from apache_beam.portability import python_urns from apache_beam.pvalue import DoOutputsTuple +from apache_beam.transforms import resources from apache_beam.transforms.display import DisplayDataItem from apache_beam.transforms.display import HasDisplayData from apache_beam.transforms.sideinputs import SIDE_INPUT_PREFIX @@ -422,6 +423,35 @@ def with_output_types(self, type_hint): validate_composite_type_param(type_hint, 'Type hints for a PTransform') return super(PTransform, self).with_output_types(type_hint) + def with_resource_hints(self, **kwargs): # type: (...) -> PTransform + """Adds resource hints to the :class:`PTransform`. + + Resource hints allow users to express constraints on the environment where + the transform should be executed. Interpretation of the resource hints is + defined by Beam Runners. Runners may ignore the unsupported hints. + + Args: + **kwargs: key-value pairs describing hints and their values. + + Raises: + ValueError: if provided hints are unknown to the SDK. See + :mod:~apache_beam.transforms.resources` for a list of known hints. + + Returns: + PTransform: A reference to the instance of this particular + :class:`PTransform` object. + """ + self.get_resource_hints().update(resources.parse_resource_hints(kwargs)) + return self + + def get_resource_hints(self): + # type: () -> Dict[str, bytes] + if '_resource_hints' not in self.__dict__: + # PTransform subclasses don't always call super(), so prefer lazy + # initialization. By default, transforms don't have any resource hints. + self._resource_hints = {} # type: Dict[str, bytes] + return self._resource_hints + def type_check_inputs(self, pvalueish): self.type_check_inputs_or_outputs(pvalueish, 'input') diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 0bd10611721f..e540197bbb11 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -51,6 +51,7 @@ from apache_beam.metrics import Metrics from apache_beam.metrics.metric import MetricsFilter from apache_beam.options.pipeline_options import TypeOptions +from apache_beam.portability import common_urns from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to @@ -895,6 +896,17 @@ def expand(self, pcoll): self.assertEqual(sorted(res1), [1, 2, 4, 8]) self.assertEqual(sorted(res2), [1, 2, 4, 8]) + def test_resource_hint_application_is_additive(self): + t = beam.Map(lambda x: x + 1).with_resource_hints( + accelerator='gpu').with_resource_hints(min_ram=1).with_resource_hints( + accelerator='tpu') + self.assertEqual( + t.get_resource_hints(), + { + common_urns.resource_hints.ACCELERATOR.urn: b'tpu', + common_urns.resource_hints.MIN_RAM_BYTES.urn: b'1' + }) + class TestGroupBy(unittest.TestCase): def test_lambdas(self): diff --git a/sdks/python/apache_beam/transforms/resources.py b/sdks/python/apache_beam/transforms/resources.py new file mode 100644 index 000000000000..10a32855e80e --- /dev/null +++ b/sdks/python/apache_beam/transforms/resources.py @@ -0,0 +1,216 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A module for defining resource requirements for execution of transforms. + +Pipeline authors can use resource hints to provide additional information to +runners about the desired aspects of the execution environment. + +Resource hints can be specified on a transform level for parts of the pipeline, +or globally via --resource_hint pipeline option. + +See also: PTransforms.with_resource_hints(). +""" + +import re +from typing import TYPE_CHECKING +from typing import Any +from typing import Dict +from typing import Optional + +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.portability.common_urns import resource_hints + +if TYPE_CHECKING: + from typing import Mapping + from apache_beam.options.pipeline_options import PipelineOptions + +__all__ = [ + 'ResourceHint', + 'AcceleratorHint', + 'MinRamHint', + 'merge_resource_hints', + 'parse_resource_hints', + 'resource_hints_from_options', +] + + +class ResourceHint: + """A superclass to define resource hints.""" + # A unique URN, one per Resource Hint class. + urn = None # type: Optional[str] + + _urn_to_known_hints = {} # type: Dict[str, type] + _name_to_known_hints = {} # type: Dict[str, type] + + @classmethod + def parse(cls, value): # type: (str) -> Dict[str, bytes] + """Describes how to parse the hint. + Override to specify a custom parsing logic.""" + assert cls.urn is not None + # Override this method to have a custom parsing logic. + return {cls.urn: ResourceHint._parse_str(value)} + + @classmethod + def get_merged_value( + cls, outer_value, inner_value): # type: (bytes, bytes) -> bytes + """Reconciles values of a hint when the hint specified on a transform is + also defined in an outer context, for example on a composite transform, or + specified in the transform's execution environment. + Override to specify a custom merging logic. + """ + # Defaults to the inner value as it is the most specific one. + return inner_value + + @staticmethod + def get_by_urn(urn): + return ResourceHint._urn_to_known_hints[urn] + + @staticmethod + def get_by_name(name): + return ResourceHint._name_to_known_hints[name] + + @staticmethod + def register_resource_hint( + hint_name, hint_class): # type: (str, type) -> None + assert issubclass(hint_class, ResourceHint) + assert hint_class.urn is not None + ResourceHint._name_to_known_hints[hint_name] = hint_class + ResourceHint._urn_to_known_hints[hint_class.urn] = hint_class + + @staticmethod + def _parse_str(value): + if not isinstance(value, str): + raise ValueError() + return value.encode('ascii') + + @staticmethod + def _parse_int(value): + if isinstance(value, str): + value = int(value) + if not isinstance(value, int): + raise ValueError() + return str(value).encode('ascii') + + @staticmethod + def _parse_storage_size_str(value): + """Parses a human-friendly storage size string into a number of bytes. + """ + if isinstance(value, int): + return ResourceHint._parse_int(value) + + if not isinstance(value, str): + raise ValueError() + + value = value.strip().replace(" ", "") + units = { + 'PiB': 2**50, + 'TiB': 2**40, + 'GiB': 2**30, + 'MiB': 2**20, + 'KiB': 2**10, + 'PB': 10**15, + 'TB': 10**12, + 'GB': 10**9, + 'MB': 10**6, + 'KB': 10**3, + 'B': 1, + } + match = re.match(r'.*?(\D+)$', value) + if not match: + raise ValueError() + + suffix = match.group(1) + multiplier = units[suffix] + value = value[:-len(suffix)] + + return str(round(float(value) * multiplier)).encode('ascii') + + @staticmethod + def _use_max(v1, v2): + return str(max(int(v1), int(v2))).encode('ascii') + + +class AcceleratorHint(ResourceHint): + """Describes desired hardware accelerators in execution environment.""" + urn = resource_hints.ACCELERATOR.urn + + +ResourceHint.register_resource_hint('accelerator', AcceleratorHint) + + +class MinRamHint(ResourceHint): + """Describes min RAM requirements for transform's execution environment.""" + urn = resource_hints.MIN_RAM_BYTES.urn + + @classmethod + def parse(cls, value): # type: (str) -> Dict[str, bytes] + return {cls.urn: ResourceHint._parse_storage_size_str(value)} + + @classmethod + def get_merged_value( + cls, outer_value, inner_value): # type: (bytes, bytes) -> bytes + return ResourceHint._use_max(outer_value, inner_value) + + +ResourceHint.register_resource_hint('min_ram', MinRamHint) +# Alias for interoperability with SDKs preferring camelCase. +ResourceHint.register_resource_hint('minRam', MinRamHint) + + +def parse_resource_hints(hints): # type: (Dict[Any, Any]) -> Dict[str, bytes] + parsed_hints = {} + for hint, value in hints.items(): + try: + hint_cls = ResourceHint.get_by_name(hint) + try: + parsed_hints.update(hint_cls.parse(value)) + except ValueError: + raise ValueError(f"Resource hint {hint} has invalid value {value}.") + except KeyError: + raise ValueError(f"Unknown resource hint: {hint}.") + + return parsed_hints + + +def resource_hints_from_options(options): + # type: (Optional[PipelineOptions]) -> Dict[str, bytes] + if options is None: + return {} + hints = {} + option_specified_hints = options.view_as(StandardOptions).resource_hints + for hint in option_specified_hints: + if '=' in hint: + k, v = hint.split('=', maxsplit=1) + hints[k] = v + else: + hints[hint] = None + + return parse_resource_hints(hints) + + +def merge_resource_hints( + outer_hints, inner_hints +): # type: (Mapping[str, bytes], Mapping[str, bytes]) -> Dict[str, bytes] + merged_hints = dict(inner_hints) + for urn, outer_value in outer_hints.items(): + if urn in inner_hints: + merged_value = ResourceHint.get_by_urn(urn).get_merged_value( + outer_value=outer_value, inner_value=inner_hints[urn]) + else: + merged_value = outer_value + merged_hints[urn] = merged_value + return merged_hints diff --git a/sdks/python/apache_beam/transforms/resources_test.py b/sdks/python/apache_beam/transforms/resources_test.py new file mode 100644 index 000000000000..19e1307d2bd7 --- /dev/null +++ b/sdks/python/apache_beam/transforms/resources_test.py @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +from parameterized import param +from parameterized import parameterized + +from apache_beam import PTransform + + +class ResourcesTest(unittest.TestCase): + @parameterized.expand([ + param( + name='min_ram', + val='100 MiB', + urn='beam:resources:min_ram_bytes:v1', + bytestr=b'104857600'), + param( + name='minRam', + val='100MB', + urn='beam:resources:min_ram_bytes:v1', + bytestr=b'100000000'), + param( + name='min_ram', + val='6.5 GiB', + urn='beam:resources:min_ram_bytes:v1', + bytestr=b'6979321856'), + param( + name='accelerator', + val='gpu', + urn='beam:resources:accelerator:v1', + bytestr=b'gpu'), + ]) + def test_known_resource_hints(self, name, val, urn, bytestr): + t = PTransform() + t = t.with_resource_hints(**{name: val}) + self.assertEqual(t.get_resource_hints(), {urn: bytestr}) + + @parameterized.expand([ + param(name='min_ram', val='3,500G'), + param(name='accelerator', val=1), + param(name='unknown_hint', val=1) + ]) + def test_resource_hint_parsing_fails_early(self, name, val): + t = PTransform() + with self.assertRaises(ValueError): + _ = t.with_resource_hints(**{name: val}) + + +if __name__ == '__main__': + unittest.main()