When testing a PTransform (defined with `@ptransform_fn`) that merges several PCollections from different sources, the following error is raised:
```
ValueError: Mixing value from different pipelines not allowed.
```
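```python
# Defined in module `utils`
import apache_beam as beam
from apache_beam.transforms.ptransform import ptransform_fn


@ptransform_fn
def Join(pcolls, by):
    # CoGroupByKey expects every input PCollection to contain (key, value)
    # tuples; `by` is not used in this shortened version of the transform.
    return pcolls | beam.CoGroupByKey()
```

```python
# Test module
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

import utils


class UtilsTest(unittest.TestCase):
    def test_join(self):
        p = TestPipeline(runner="DirectRunner")
        p1 = (p
              | "Create p1" >> beam.Create([
                  {'a': 1, 'b': 11},
                  {'a': 2, 'b': 22},
                  {'a': 3, 'b': 33}]))
        p2 = (p
              | "Create p2" >> beam.Create([
                  {'a': 1, 'c': 111},
                  {'a': 1, 'c': 112},
                  {'a': 3, 'c': 333}]))
        res = ((p1, p2) | "LeftJoin" >> utils.Join(by='a'))
        assert_that(res, equal_to([
            {'a': 1, 'b': 11, 'c': 111},
            {'a': 1, 'b': 11, 'c': 112},
            {'a': 2, 'b': 22},
            {'a': 3, 'b': 33, 'c': 333}]))
        # Run test pipeline
        p.run()
```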
Running the same pipeline on GCP with the DataflowRunner does not raise any error, and neither does running the test file directly instead of through nose.
Imported from Jira BEAM-1996. Original Jira may contain additional context.
Reported by: while.