Skip to content

Commit cebe628

Browse files
authored
Merge pull request #11283 [BEAM-9322] [BEAM-1833] Better naming for composite transform output tags.
2 parents 8fb7f0f + b497bf4 commit cebe628

File tree

3 files changed

+28
-15
lines changed

3 files changed

+28
-15
lines changed

CHANGES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,14 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss
131131
used to be deployed to
132132
[apachebeam](https://hub.docker.com/search?q=apachebeam&type=image) repository.
133133
([BEAM-9063](https://issues.apache.org/jira/browse/BEAM-9063))
134+
* PCollections now have tags inferred from the result type (e.g. the keys of a dict or the index of a tuple). Previously, PCollection output ids were generated as monotonically increasing ids. Users who rely on the old behavior can restore it with the `force_generated_pcollection_output_ids` experiment.
134135

135136
## Deprecations
136137

137138
## Bugfixes
138139

139140
* Fixed numpy operators in ApproximateQuantiles (Python) ([BEAM-9579](https://issues.apache.org/jira/browse/BEAM-9579)).
140141
* Fixed exception when running in IPython notebook (Python) ([BEAM-9277](https://issues.apache.org/jira/browse/BEAM-9277)).
141-
* Fixed 1833 (Python) ([BEAM-1833](https://issues.apache.org/jira/browse/BEAM-1833))
142142
* Fixed Flink uberjar job termination bug. ([BEAM-9225](https://issues.apache.org/jira/browse/BEAM-9225))
143143
* Fixed SyntaxError in process worker startup ([BEAM-9503](https://issues.apache.org/jira/browse/BEAM-9503))
144144

sdks/python/apache_beam/pipeline.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ def apply(
636636
if type_options is not None and type_options.pipeline_type_check:
637637
transform.type_check_outputs(pvalueish_result)
638638

639-
for result in ptransform.get_nested_pvalues(pvalueish_result):
639+
for tag, result in ptransform.get_named_nested_pvalues(pvalueish_result):
640640
assert isinstance(result, (pvalue.PValue, pvalue.DoOutputsTuple))
641641

642642
# Make sure we set the producer only for a leaf node in the transform
@@ -669,12 +669,16 @@ def apply(
669669
current.add_output(result, tag)
670670
continue
671671

672-
# TODO(BEAM-9322): Find the best auto-generated tags for nested
673-
# PCollections.
674-
# If the user wants the old implementation of always generated
675-
# PCollection output ids, then set the tag to None first, then count up
676-
# from 1.
677-
tag = len(current.outputs) if None in current.outputs else None
672+
if self._options.view_as(DebugOptions).lookup_experiment(
673+
'force_generated_pcollection_output_ids', default=False):
674+
tag = len(current.outputs) if None in current.outputs else None
675+
else:
676+
base = tag
677+
counter = 0
678+
while tag in current.outputs:
679+
counter += 1
680+
tag = '%s_%d' % (base, counter)
681+
678682
current.add_output(result, tag)
679683

680684
if (type_options is not None and
@@ -1252,7 +1256,8 @@ def is_python_side_input(tag):
12521256
return result
12531257

12541258

1255-
class PTransformOverride(with_metaclass(abc.ABCMeta, object)): # type: ignore[misc]
1259+
class PTransformOverride(with_metaclass(abc.ABCMeta,
1260+
object)): # type: ignore[misc]
12561261
"""For internal use only; no backwards-compatibility guarantees.
12571262
12581263
Gives a matcher and replacements for matching PTransforms.

sdks/python/apache_beam/transforms/ptransform.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -256,21 +256,29 @@ def get_nested_pvalues(pvalueish):
256256
return pvalues
257257

258258

259-
def get_nested_pvalues0(pvalueish):
260-
if isinstance(pvalueish, (tuple, list)):
259+
def get_named_nested_pvalues(pvalueish):
260+
if isinstance(pvalueish, tuple):
261+
# Check to see if it's a named tuple.
262+
fields = getattr(pvalueish, '_fields', None)
263+
if fields and len(fields) == len(pvalueish):
264+
tagged_values = zip(fields, pvalueish)
265+
else:
266+
tagged_values = enumerate(pvalueish)
267+
elif isinstance(pvalueish, list):
261268
tagged_values = enumerate(pvalueish)
262-
if isinstance(pvalueish, dict):
269+
elif isinstance(pvalueish, dict):
263270
tagged_values = pvalueish.items()
264271
else:
265-
yield None, pvalueish
272+
if isinstance(pvalueish, (pvalue.PValue, pvalue.DoOutputsTuple)):
273+
yield None, pvalueish
266274
return
267275

268276
for tag, subvalue in tagged_values:
269-
for subtag, subsubvalue in get_nested_pvalues(subvalue):
277+
for subtag, subsubvalue in get_named_nested_pvalues(subvalue):
270278
if subtag is None:
271279
yield tag, subsubvalue
272280
else:
273-
yield '%s.%s' % (tag, subsubvalue), subsubvalue
281+
yield '%s.%s' % (tag, subtag), subsubvalue
274282

275283

276284
class _ZipPValues(object):

0 commit comments

Comments
 (0)