Skip to content

Commit

Permalink
Merge pull request #7457: [BEAM-6294] Ensure input and output coders …
Browse files Browse the repository at this point in the history
…are equal for reshuffle transforms.
  • Loading branch information
tweise committed Jan 10, 2019
2 parents 49d5cf5 + ee57028 commit 336fe0c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
5 changes: 3 additions & 2 deletions sdks/python/apache_beam/pipeline.py
Expand Up @@ -544,11 +544,12 @@ def apply(self, transform, pvalueish=None, label=None):

def _infer_result_type(self, transform, inputs, result_pcollection):
# TODO(robertwb): Multi-input, multi-output inference.
# TODO(robertwb): Ideally we'd do intersection here.
type_options = self._options.view_as(TypeOptions)
if (type_options is not None and type_options.pipeline_type_check
and isinstance(result_pcollection, pvalue.PCollection)
and not result_pcollection.element_type):
and (not result_pcollection.element_type
# TODO(robertwb): Ideally we'd do intersection here.
or result_pcollection.element_type == typehints.Any)):
input_element_type = (
inputs[0].element_type
if len(inputs) == 1
Expand Down
Expand Up @@ -363,6 +363,12 @@ def test_group_by_key(self):
| beam.Map(lambda k_vs: (k_vs[0], sorted(k_vs[1]))))
assert_that(res, equal_to([('a', [1, 2]), ('b', [3])]))

# Runners may special case the Reshuffle transform urn.
def test_reshuffle(self):
with self.create_pipeline() as p:
assert_that(p | beam.Create([1, 2, 3]) | beam.Reshuffle(),
equal_to([1, 2, 3]))

def test_flatten(self):
with self.create_pipeline() as p:
res = (p | 'a' >> beam.Create(['a']),
Expand Down

0 comments on commit 336fe0c

Please sign in to comment.