Skip to content

Commit

Permalink
[BEAM-9340] Populate requirements for Python DoFn properties.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Feb 21, 2020
1 parent 742c570 commit d58d829
Show file tree
Hide file tree
Showing 32 changed files with 114,094 additions and 12 deletions.
4 changes: 4 additions & 0 deletions model/job-management/src/main/proto/beam_expansion_api.proto
Expand Up @@ -57,6 +57,10 @@ message ExpansionResponse {
// and subtransforms.
org.apache.beam.model.pipeline.v1.PTransform transform = 2;

// A set of requirements that must be appended to this pipeline's
// requirements.
repeated string requirements = 3;

// (Optional) A string representation of any error encountered while
// attempting to expand this transform.
string error = 10;
Expand Down
25 changes: 20 additions & 5 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Expand Up @@ -443,25 +443,40 @@ message ParDoPayload {
map<string, SideInput> side_inputs = 3;

// (Optional) A mapping of local state names to state specifications.
// If this is set, the stateful processing requirement should also
// be placed in the pipeline requirements.
map<string, StateSpec> state_specs = 4;

// (Optional) A mapping of local timer names to timer specifications.
// If this is set, the stateful processing requirement should also
// be placed in the pipeline requirements.
map<string, TimerSpec> timer_specs = 5;

// (Optional) A mapping of local timer family names to timer specifications.
// If this is set, the stateful processing requirement should also
// be placed in the pipeline requirements.
map<string, TimerFamilySpec> timer_family_specs = 9;

// Whether the DoFn is splittable
bool splittable = 6;

// (Required if splittable == true) Id of the restriction coder.
string restriction_coder_id = 7;

// (Optional) Only set when this ParDo can request bundle finalization.
// If this is set, the corresponding standard requirement should also
// be placed in the pipeline requirements.
bool requests_finalization = 8;

// (Optional) A mapping of local timer family names to timer specifications.
map<string, TimerFamilySpec> timer_family_specs = 9;

// Whether this stage requires time sorted input
// Whether this stage requires time sorted input.
// If this is set, the corresponding standard requirement should also
// be placed in the pipeline requirements.
bool requires_time_sorted_input = 10;

// Whether this stage requires stable input.
// If this is set, the corresponding standard requirement should also
// be placed in the pipeline requirements.
bool requires_stable_input = 11;
}

// Parameters that a UDF might require.
Expand Down Expand Up @@ -1331,7 +1346,7 @@ message StandardRequirements {

// This requirement indicates the requests_finalization field of ParDo
// transform payloads must be inspected.
REQUIRES_PARDO_FINALIZATION = 1 [(beam_urn) = "beam:requirement:pardo:finalization:v1"];
REQUIRES_BUNDLE_FINALIZATION = 1 [(beam_urn) = "beam:requirement:pardo:finalization:v1"];

// This requirement indicates the requires_stable_input field of ParDo
// transform payloads must be inspected.
Expand Down
24,402 changes: 24,402 additions & 0 deletions sdks/python/apache_beam/coders/coder_impl.html

Large diffs are not rendered by default.

4,240 changes: 4,240 additions & 0 deletions sdks/python/apache_beam/coders/stream.html

Large diffs are not rendered by default.

8,956 changes: 8,956 additions & 0 deletions sdks/python/apache_beam/metrics/cells.html

Large diffs are not rendered by default.

4,700 changes: 4,700 additions & 0 deletions sdks/python/apache_beam/metrics/execution.html

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions sdks/python/apache_beam/pipeline.py
Expand Up @@ -778,7 +778,8 @@ def visit_transform(self, transform_node):
root_transform_id = context.transforms.get_id(self._root_transform())
proto = beam_runner_api_pb2.Pipeline(
root_transform_ids=[root_transform_id],
components=context.to_runner_api())
components=context.to_runner_api(),
requirements=context.requirements())
proto.components.transforms[root_transform_id].unique_name = (
root_transform_id)
if return_context:
Expand All @@ -799,7 +800,9 @@ def from_runner_api(proto, # type: beam_runner_api_pb2.Pipeline
p = Pipeline(runner=runner, options=options)
from apache_beam.runners import pipeline_context
context = pipeline_context.PipelineContext(
proto.components, allow_proto_holders=allow_proto_holders)
proto.components,
allow_proto_holders=allow_proto_holders,
requirements=proto.requirements)
root_transform_id, = proto.root_transform_ids
p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
# TODO(robertwb): These are only needed to continue construction. Omit?
Expand Down
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/pipeline_test.py
Expand Up @@ -39,6 +39,7 @@
from apache_beam.pipeline import PipelineOptions
from apache_beam.pipeline import PipelineVisitor
from apache_beam.pipeline import PTransformOverride
from apache_beam.portability import common_urns
from apache_beam.pvalue import AsSingleton
from apache_beam.pvalue import TaggedOutput
from apache_beam.runners.dataflow.native_io.iobase import NativeSource
Expand Down Expand Up @@ -825,6 +826,16 @@ def expand(self, p):
self.assertEqual(
p.transforms_stack[0].parts[0].parent, p.transforms_stack[0])

def test_requirements(self):
  """Checks that the bundle finalization URN is listed in the requirements
  of the pipeline proto when a ParDo's callable takes a
  ``beam.DoFn.BundleFinalizerParam`` argument.
  """
  p = beam.Pipeline()
  _ = (
      p | beam.Create([])
      | beam.ParDo(lambda x, finalize=beam.DoFn.BundleFinalizerParam: None))
  proto = p.to_runner_api()
  # Use assertIn rather than assertTrue: with two arguments, assertTrue treats
  # the second one as the failure message and only checks the truthiness of
  # the first, so a non-empty URN string would make the check pass vacuously.
  self.assertIn(
      common_urns.requirements.REQUIRES_BUNDLE_FINALIZATION.urn,
      proto.requirements)


if __name__ == '__main__':
unittest.main()
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/portability/common_urns.py
Expand Up @@ -26,6 +26,7 @@
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardEnvironments
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardProtocols
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardPTransforms
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardRequirements
from apache_beam.portability.api.beam_runner_api_pb2_urns import StandardSideInputTypes
from apache_beam.portability.api.metrics_pb2_urns import MonitoringInfo
from apache_beam.portability.api.metrics_pb2_urns import MonitoringInfoSpecs
Expand Down Expand Up @@ -57,3 +58,4 @@
monitoring_info_labels = MonitoringInfo.MonitoringInfoLabels

protocols = StandardProtocols.Enum
requirements = StandardRequirements.Enum

0 comments on commit d58d829

Please sign in to comment.