Skip to content

Commit

Permalink
[BEAM-6294] Ensure input and output coders are equal for reshuffle tr…
Browse files Browse the repository at this point in the history
…ansforms.

The type declarations were there, but not getting applied due to a
longstanding TODO.  This doesn't resolve that TODO completely, but fixes
a large number of cases, including this one.
  • Loading branch information
robertwb authored and tweise committed Jan 12, 2019
1 parent 629eaf9 commit de94544
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
5 changes: 3 additions & 2 deletions sdks/python/apache_beam/pipeline.py
Expand Up @@ -544,11 +544,12 @@ def apply(self, transform, pvalueish=None, label=None):

def _infer_result_type(self, transform, inputs, result_pcollection):
# TODO(robertwb): Multi-input, multi-output inference.
# TODO(robertwb): Ideally we'd do intersection here.
type_options = self._options.view_as(TypeOptions)
if (type_options is not None and type_options.pipeline_type_check
and isinstance(result_pcollection, pvalue.PCollection)
and not result_pcollection.element_type):
and (not result_pcollection.element_type
# TODO(robertwb): Ideally we'd do intersection here.
or result_pcollection.element_type == typehints.Any)):
input_element_type = (
inputs[0].element_type
if len(inputs) == 1
Expand Down
Expand Up @@ -363,6 +363,12 @@ def test_group_by_key(self):
| beam.Map(lambda k_vs: (k_vs[0], sorted(k_vs[1]))))
assert_that(res, equal_to([('a', [1, 2]), ('b', [3])]))

# Runners may special case the Reshuffle transform urn.
def test_reshuffle(self):
with self.create_pipeline() as p:
assert_that(p | beam.Create([1, 2, 3]) | beam.Reshuffle(),
equal_to([1, 2, 3]))

def test_flatten(self):
with self.create_pipeline() as p:
res = (p | 'a' >> beam.Create(['a']),
Expand Down

0 comments on commit de94544

Please sign in to comment.