Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
3d4f386
Add resource hints property to Beam Environments.
tvalentyn Mar 31, 2021
837feb8
Add a mechanism to parse resource hints from command-line options. Se…
tvalentyn Mar 31, 2021
d82c5f5
Add a default environment and its urn.
tvalentyn Mar 31, 2021
4333e79
fixup: Explicitly pass desired environment in unit tests to avoid pic…
tvalentyn Apr 1, 2021
6bc3324
Fixup: mypy
tvalentyn Mar 31, 2021
036fe6b
Define resource hints on Beam transforms.
tvalentyn Mar 31, 2021
0b42ca9
Fix a bug where we pass a label instead of object when generating com…
tvalentyn Mar 30, 2021
be9f688
to_runner_api translation attaches transforms to environments with re…
tvalentyn Apr 1, 2021
ce1d76b
Don't store references to environments in from_runner_api-created obj…
tvalentyn Apr 1, 2021
80b8e53
Pass resource hints through v1beta3 API.
tvalentyn Apr 1, 2021
f787688
fixup: mypy
tvalentyn Apr 2, 2021
31291d0
fixup: lint
tvalentyn Apr 2, 2021
4371bac
fixup: add hint parser for 'any' hint, remove leftover code.
tvalentyn Apr 2, 2021
52b7ae0
Propagate hints defined on composite transforms to subtransforms.
tvalentyn Apr 9, 2021
8d32b96
fixup: lint
tvalentyn Apr 9, 2021
ad33440
fixup: minor cleanup.
tvalentyn Apr 9, 2021
6b592c2
Adds tentative resource hints to be recognized by Dataflow Runner.
tvalentyn Apr 10, 2021
1a99ceb
Add Dataflow resource hints translation test.
tvalentyn Apr 11, 2021
bb4adbf
Test that environments with same resource hints are reused.
tvalentyn Apr 11, 2021
0f98fc3
Remove test scenarios covered in resourses_test.py.
tvalentyn Apr 11, 2021
0eee01e
Fixup : typo, also use a different hint name to avoid different hint …
tvalentyn Apr 12, 2021
3bd2934
Prefer lazy initialization of (empty) resource hints.
tvalentyn Apr 12, 2021
59fe217
Fixup: mypy
tvalentyn Apr 12, 2021
54e9243
Fixup: style.
tvalentyn Apr 13, 2021
5914c90
Fixup: Use [] as default value for command-line flag.
tvalentyn Apr 13, 2021
9a67adb
Use ResourceHint subclasses to define hint behavior.
tvalentyn Apr 14, 2021
cd9da90
fixup: Fix test.
tvalentyn Apr 14, 2021
46e128b
Copy hints onto AppliedPTransform and use them as SOT during translat…
tvalentyn Apr 14, 2021
68b5082
Attach option-specified hints to the root transform.
tvalentyn Apr 14, 2021
5d58030
Propagate hints when AppliedPTransforms are attached to pipeline grap…
tvalentyn Apr 14, 2021
0477bc5
Fixup: isort
tvalentyn Apr 14, 2021
cc0e3ef
fix bug/typo in Dataflow SideInputVisitor
tvalentyn Apr 14, 2021
0d6e62e
fixup: use Pipeline instead of TestPipeline
tvalentyn Apr 14, 2021
f2268f0
Fixup: Removes super() delegations that can be omitted.
tvalentyn Apr 14, 2021
589e263
Fixup: documentation on proto.
tvalentyn Apr 14, 2021
3d0f669
Prevent hash collisions on environments with same hints. Also compare…
tvalentyn Apr 15, 2021
63d8594
fixup: nits.
tvalentyn Apr 15, 2021
1fdadb8
fixup: nits.
tvalentyn Apr 15, 2021
97326d7
fixup f787: Instantiate Environment directly.
tvalentyn Apr 20, 2021
2176c43
fixup: Don't use fake coders.
tvalentyn Apr 20, 2021
95fd0c2
Return a fresh copy of hints dictionary when merging hints.
tvalentyn Apr 20, 2021
c6d59df
Make resource hint application additive.
tvalentyn Apr 20, 2021
fe56eca
fixup (0b42): Restrict get_by_proto to Environment protos only.
tvalentyn Apr 20, 2021
d7b2d83
fixup (c6d5): isort
tvalentyn Apr 20, 2021
bf81698
Allow alternative spelling minRam for memory hint.
tvalentyn Apr 20, 2021
c14a06a
Prevent users from accidentally supplying a min_ram hint on the comma…
tvalentyn Apr 20, 2021
a245612
fixup (bb4ad): Add an extra assertion to ensure scenario correctness.
tvalentyn Apr 20, 2021
571be41
Merge master into resource hints feature branch.
tvalentyn Apr 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,8 @@ message StandardEnvironments {
PROCESS = 1 [(beam_urn) = "beam:env:process:v1"]; // A managed native process to run user code.

EXTERNAL = 2 [(beam_urn) = "beam:env:external:v1"]; // An external non managed process to run user code.

DEFAULT = 3 [(beam_urn) = "beam:env:default:v1"]; // Used as a stub when context is missing a runner-provided default environment.
}
}

Expand Down Expand Up @@ -1841,3 +1843,13 @@ message ExecutableStagePayload {
}
}
}

// URNs of the resource hints defined by this file. Each enum value carries
// its URN in the (beam_urn) option.
message StandardResourceHints {
enum Enum {
// Describes the hardware accelerators that should be available in the execution environment.
ACCELERATOR = 0 [(beam_urn) = "beam:resources:accelerator:v1"];
// Describes the desired minimum amount of RAM available in the transform's execution environment.
// SDKs should convert the size to bytes, but can allow users to specify human-friendly units (e.g. GiB).
MIN_RAM_BYTES = 1 [(beam_urn) = "beam:resources:min_ram_bytes:v1"];
}
}
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,17 @@ def _add_argparse_args(cls, parser):
action='store_true',
help='Whether to enable streaming mode.')

# Repeatable flag: every --resource_hint occurrence is appended to the
# `resource_hints` list (empty when the flag is never given).
# NOTE(review): the help text says option-level hints override
# transform-level hints, while the pipeline tests expect a hint set on a
# transform to win over the same hint supplied through options -- confirm
# which precedence is intended and align the wording.
parser.add_argument(
    '--resource_hint',
    dest='resource_hints',
    action='append',
    default=[],
    help=(
        # Adjacent literals are concatenated verbatim, so each piece needs
        # its own trailing space.
        'Resource hint to set in the pipeline execution environment. '
        'Hints specified via this option override hints specified '
        'at transform level. Interpretation of hints is defined by '
        'Beam runners.'))


class CrossLanguageOptions(PipelineOptions):
@classmethod
Expand Down
38 changes: 33 additions & 5 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@
from apache_beam.transforms import ParDo
from apache_beam.transforms import ptransform
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.resources import merge_resource_hints
from apache_beam.transforms.resources import resource_hints_from_options
from apache_beam.transforms.sideinputs import get_sideinput_index
from apache_beam.typehints import TypeCheckError
from apache_beam.typehints import typehints
Expand Down Expand Up @@ -220,7 +222,8 @@ def __init__(self, runner=None, options=None, argv=None):
# If a transform is applied and the full label is already in the set
# then the transform will have to be cloned with a new label.
self.applied_labels = set() # type: Set[str]

# Hints supplied via pipeline options are considered the outermost hints.
self._root_transform().resource_hints = resource_hints_from_options(options)
# Create a ComponentIdMap for assigning IDs to components. Ensures that any
# components that receive an ID during pipeline construction (for example in
# ExternalTransform), will receive the same component ID when generating the
Expand Down Expand Up @@ -1043,6 +1046,12 @@ def __init__(
self.outputs = {} # type: Dict[Union[str, int, None], pvalue.PValue]
self.parts = [] # type: List[AppliedPTransform]
self.environment_id = environment_id if environment_id else None # type: Optional[str]
# We may need to merge the hints with environment-provided hints here
# once environment is a first-class citizen in Beam graph and we have
# access to actual environment, not just an id.
self.resource_hints = dict(
transform.get_resource_hints()) if transform else {
} # type: Dict[str, bytes]

if annotations is None and transform:

Expand Down Expand Up @@ -1109,6 +1118,7 @@ def add_output(
def add_part(self, part):
# type: (AppliedPTransform) -> None
# Registers `part` as a sub-transform of this applied transform.
# `part` must itself be an AppliedPTransform.
assert isinstance(part, AppliedPTransform)
# Fold the hints of the enclosing transform into the part before storing it.
# The merge reads `part.parent`, so hints only propagate if the caller has
# already set `part.parent`.
part._merge_outer_resource_hints()
self.parts.append(part)

def is_composite(self):
Expand Down Expand Up @@ -1198,9 +1208,11 @@ def named_outputs(self):

def to_runner_api(self, context):
# type: (PipelineContext) -> beam_runner_api_pb2.PTransform
# External tranforms require more splicing than just setting the spec.
# External transforms require more splicing than just setting the spec.
from apache_beam.transforms import external
if isinstance(self.transform, external.ExternalTransform):
# TODO(BEAM-12082): Support resource hints in XLang transforms.
# In particular, make sure hints on composites are properly propagated.
return self.transform.to_runner_api_transform(context, self.full_label)

from apache_beam.portability.api import beam_runner_api_pb2
Expand Down Expand Up @@ -1229,7 +1241,8 @@ def transform_to_runner_api(
transform_urn = transform_spec.urn if transform_spec else None
if (not environment_id and
(transform_urn not in Pipeline.runner_implemented_transforms())):
environment_id = context.default_environment_id()
environment_id = context.get_environment_id_for_resource_hints(
self.resource_hints)

return beam_runner_api_pb2.PTransform(
unique_name=self.full_label,
Expand Down Expand Up @@ -1278,6 +1291,12 @@ def from_runner_api(
]

transform = ptransform.PTransform.from_runner_api(proto, context)
if transform and proto.environment_id:
resource_hints = context.environments.get_by_id(
proto.environment_id).resource_hints()
if resource_hints:
transform._resource_hints = dict(resource_hints)

# Ordering is important here.
# TODO(BEAM-9635): use key, value pairs instead of depending on tags with
# index as a suffix.
Expand All @@ -1292,7 +1311,7 @@ def from_runner_api(
transform=transform,
full_label=proto.unique_name,
inputs=main_inputs,
environment_id=proto.environment_id,
environment_id=None,
annotations=proto.annotations)

if result.transform and result.transform.side_inputs:
Expand All @@ -1303,7 +1322,7 @@ def from_runner_api(
for transform_id in proto.subtransforms:
part = context.transforms.get_by_id(transform_id)
part.parent = result
result.parts.append(part)
result.add_part(part)
result.outputs = {
None if tag == 'None' else tag: context.pcollections.get_by_id(id)
for tag,
Expand All @@ -1321,6 +1340,15 @@ def from_runner_api(
pc.tag = None if tag == 'None' else tag
return result

def _merge_outer_resource_hints(self):
  """Merges the parent's resource hints into this transform and its parts.

  When a parent exists and has hints, this transform's hints are replaced by
  the result of merge_resource_hints(), with the parent's hints as the outer
  hints and this transform's hints as the inner ones. If this transform ends
  up with any hints, the same step is then repeated for each of its parts.
  """
  enclosing = self.parent
  if enclosing is not None and enclosing.resource_hints:
    self.resource_hints = merge_resource_hints(
        outer_hints=enclosing.resource_hints,
        inner_hints=self.resource_hints)

  # Nothing to push down when this transform carries no hints.
  if not self.resource_hints:
    return

  for child in self.parts:
    child._merge_outer_resource_hints()


class PTransformOverride(with_metaclass(abc.ABCMeta,
object)): # type: ignore[misc]
Expand Down
203 changes: 203 additions & 0 deletions sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from apache_beam.coders import BytesCoder
from apache_beam.io import Read
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.pipeline import Pipeline
from apache_beam.pipeline import PipelineOptions
from apache_beam.pipeline import PipelineVisitor
Expand All @@ -54,6 +55,8 @@
from apache_beam.transforms import PTransform
from apache_beam.transforms import WindowInto
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.environments import ProcessEnvironment
from apache_beam.transforms.resources import ResourceHint
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampedValue
Expand Down Expand Up @@ -1047,6 +1050,206 @@ def display_data(self): # type: () -> dict
double_value=1.1).SerializeToString()),
])

def test_runner_api_roundtrip_preserves_resource_hints(self):
  """Hints set on a transform survive repeated Runner API round trips."""
  def map_transform_hints(pipeline):
    # The Map is the second part applied under the root transform.
    applied_map = pipeline.transforms_stack[0].parts[1]
    return applied_map.transform.get_resource_hints()

  def expected_hints():
    return {common_urns.resource_hints.ACCELERATOR.urn: b'gpu'}

  p = beam.Pipeline()
  hinted_map = beam.Map(lambda x: x + 1).with_resource_hints(
      accelerator='gpu')
  _ = p | beam.Create([1, 2]) | hinted_map

  self.assertEqual(map_transform_hints(p), expected_hints())

  for _ in range(3):
    # Verify that DEFAULT environments are recreated during multiple RunnerAPI
    # translation and hints don't get lost.
    pipeline_proto = Pipeline.to_runner_api(p)
    p = Pipeline.from_runner_api(pipeline_proto, None, None)
    self.assertEqual(map_transform_hints(p), expected_hints())

def test_hints_on_composite_transforms_are_propagated_to_subtransforms(self):
  """Checks how hints from options, a composite and a sub-transform combine.

  The Map inside CompositeTransform/SubTransform is expected to end up in an
  environment whose hints are:
    * foo: the value set on the sub-transform itself,
    * bar and baz: the values set on the composite (baz is also supplied via
      --resource_hint, and the composite's value is the one expected),
    * qux: the value supplied via --resource_hint,
    * use_max_value: b'100', the composite's value rather than the
      sub-transform's '10', as resolved by UseMaxValueHint.get_merged_value.
  """
  class FooHint(ResourceHint):
    urn = 'foo_urn'

  class BarHint(ResourceHint):
    urn = 'bar_urn'

  class BazHint(ResourceHint):
    urn = 'baz_urn'

  class QuxHint(ResourceHint):
    urn = 'qux_urn'

  class UseMaxValueHint(ResourceHint):
    urn = 'use_max_value_urn'

    @classmethod
    def get_merged_value(
        cls, outer_value, inner_value):  # type: (bytes, bytes) -> bytes
      return ResourceHint._use_max(outer_value, inner_value)

  ResourceHint.register_resource_hint('foo_hint', FooHint)
  ResourceHint.register_resource_hint('bar_hint', BarHint)
  ResourceHint.register_resource_hint('baz_hint', BazHint)
  ResourceHint.register_resource_hint('qux_hint', QuxHint)
  ResourceHint.register_resource_hint('use_max_value_hint', UseMaxValueHint)

  @beam.ptransform_fn
  def SubTransform(pcoll):
    return pcoll | beam.Map(lambda x: x + 1).with_resource_hints(
        foo_hint='set_on_subtransform', use_max_value_hint='10')

  @beam.ptransform_fn
  def CompositeTransform(pcoll):
    return pcoll | beam.Map(lambda x: x * 2) | SubTransform()

  p = beam.Pipeline()
  _ = (
      p | beam.Create([1, 2])
      | CompositeTransform().with_resource_hints(
          foo_hint='should_be_overriden_by_subtransform',
          bar_hint='set_on_composite',
          baz_hint='set_on_composite',
          use_max_value_hint='100'))
  options = PortableOptions([
      '--resource_hint=baz_hint=should_be_overriden_by_composite',
      '--resource_hint=qux_hint=set_via_options',
      '--environment_type=PROCESS',
      '--environment_option=process_command=foo',
      '--sdk_location=container',
  ])
  environment = ProcessEnvironment.from_options(options)
  proto = Pipeline.to_runner_api(p, default_environment=environment)

  # Initialised up front so that a pipeline with no matching transform fails
  # the assertion below instead of raising NameError.
  found = False
  for t in proto.components.transforms.values():
    if "CompositeTransform/SubTransform/Map" in t.unique_name:
      # Separate name: do not rebind `environment` (the ProcessEnvironment).
      env_proto = proto.components.environments.get(t.environment_id)
      self.assertEqual(
          env_proto.resource_hints.get('foo_urn'), b'set_on_subtransform')
      self.assertEqual(
          env_proto.resource_hints.get('bar_urn'), b'set_on_composite')
      self.assertEqual(
          env_proto.resource_hints.get('baz_urn'), b'set_on_composite')
      self.assertEqual(
          env_proto.resource_hints.get('qux_urn'), b'set_via_options')
      self.assertEqual(
          env_proto.resource_hints.get('use_max_value_urn'), b'100')
      found = True
  self.assertTrue(
      found, 'No CompositeTransform/SubTransform/Map transform in the proto.')

def test_environments_with_same_resource_hints_are_reused(self):
class HintX(ResourceHint):
urn = 'X_urn'

class HintY(ResourceHint):
urn = 'Y_urn'

class HintIsOdd(ResourceHint):
urn = 'IsOdd_urn'

ResourceHint.register_resource_hint('X', HintX)
ResourceHint.register_resource_hint('Y', HintY)
ResourceHint.register_resource_hint('IsOdd', HintIsOdd)

p = beam.Pipeline()
num_iter = 4
for i in range(num_iter):
_ = (
p
| f'NoHintCreate_{i}' >> beam.Create([1, 2])
| f'NoHint_{i}' >> beam.Map(lambda x: x + 1))
_ = (
p
| f'XCreate_{i}' >> beam.Create([1, 2])
|
f'HintX_{i}' >> beam.Map(lambda x: x + 1).with_resource_hints(X='X'))
_ = (
p
| f'XYCreate_{i}' >> beam.Create([1, 2])
| f'HintXY_{i}' >> beam.Map(lambda x: x + 1).with_resource_hints(
X='X', Y='Y'))
_ = (
p
| f'IsOddCreate_{i}' >> beam.Create([1, 2])
| f'IsOdd_{i}' >>
beam.Map(lambda x: x + 1).with_resource_hints(IsOdd=str(i % 2 != 0)))

proto = Pipeline.to_runner_api(p)
count_x = count_xy = count_is_odd = count_no_hints = 0
env_ids = set()
for _, t in proto.components.transforms.items():
env = proto.components.environments[t.environment_id]
if t.unique_name.startswith('HintX_'):
count_x += 1
env_ids.add(t.environment_id)
self.assertEqual(env.resource_hints, {'X_urn': b'X'})

if t.unique_name.startswith('HintXY_'):
count_xy += 1
env_ids.add(t.environment_id)
self.assertEqual(env.resource_hints, {'X_urn': b'X', 'Y_urn': b'Y'})

if t.unique_name.startswith('NoHint_'):
count_no_hints += 1
env_ids.add(t.environment_id)
self.assertEqual(env.resource_hints, {})

if t.unique_name.startswith('IsOdd_'):
count_is_odd += 1
env_ids.add(t.environment_id)
self.assertTrue(
env.resource_hints == {'IsOdd_urn': b'True'} or
env.resource_hints == {'IsOdd_urn': b'False'})
assert count_x == count_is_odd == count_xy == count_no_hints == num_iter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

== 1? Or at least > 1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

num_iter is 4

Copy link
Contributor Author

@tvalentyn tvalentyn Apr 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't hurt to assert that it is > 1 though.
FWIW while working on this test I noticed that the to-from translation is somewhat slow, even without my changes. This test took almost 2 seconds to run on my laptop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's slow. I don't think much time has gone into optimizing it, but speeding it up would especially help TFX pipelines (that tend to have lots and lots of stages).

assert num_iter > 1

self.assertEqual(len(env_ids), 5)

def test_multiple_application_of_the_same_transform_set_different_hints(self):
  """Each application of one transform object keeps the hints set for it."""
  class FooHint(ResourceHint):
    urn = 'foo_urn'

  class UseMaxValueHint(ResourceHint):
    urn = 'use_max_value_urn'

    @classmethod
    def get_merged_value(
        cls, outer_value, inner_value):  # type: (bytes, bytes) -> bytes
      return ResourceHint._use_max(outer_value, inner_value)

  ResourceHint.register_resource_hint('foo_hint', FooHint)
  ResourceHint.register_resource_hint('use_max_value_hint', UseMaxValueHint)

  @beam.ptransform_fn
  def SubTransform(pcoll):
    return pcoll | beam.Map(lambda x: x + 1)

  @beam.ptransform_fn
  def CompositeTransform(pcoll):
    sub = SubTransform()
    # Keep the order: the first application must happen before the hints
    # for the second application are set on the same `sub` object.
    after_first = (
        pcoll
        | 'first' >> sub.with_resource_hints(foo_hint='first_application'))
    return (
        after_first
        | 'second' >> sub.with_resource_hints(foo_hint='second_application'))

  p = beam.Pipeline()
  _ = (p | beam.Create([1, 2]) | CompositeTransform())
  proto = Pipeline.to_runner_api(p)

  # (substring of the transform's unique name, expected foo hint value)
  expectations = (
      ("CompositeTransform/first/Map", b'first_application'),
      ("CompositeTransform/second/Map", b'second_application'),
  )
  count = 0
  for t in proto.components.transforms.values():
    for name_fragment, expected_value in expectations:
      if name_fragment in t.unique_name:
        env_proto = proto.components.environments.get(t.environment_id)
        self.assertEqual(
            expected_value, env_proto.resource_hints.get('foo_urn'))
        count += 1
  assert count == 2


if __name__ == '__main__':
unittest.main()
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/portability/common_urns.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardProtocols
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardPTransforms
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardRequirements
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardResourceHints
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardSideInputTypes
from apache_beam.portability.api.metrics_pb2_urns import MonitoringInfo
from apache_beam.portability.api.metrics_pb2_urns import MonitoringInfoSpecs
Expand All @@ -52,6 +53,7 @@
environments = StandardEnvironments.Environments
artifact_types = StandardArtifacts.Types
artifact_roles = StandardArtifacts.Roles
resource_hints = StandardResourceHints.Enum

global_windows = GlobalWindowsPayload.Enum.PROPERTIES
fixed_windows = FixedWindowsPayload.Enum.PROPERTIES
Expand Down
Loading