Open
Description
What happened?
Found while working on #37855. This also happens on the latest Beam version (2.71).
A minimal example:
import typing

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline

Tuple1 = typing.NamedTuple(
    "Tuple1", [("id", int)])
Tuple2 = typing.NamedTuple(
    "Tuple1", [("id", int), ("name", str)])


def generate(num: int):
  for i in range(2):
    yield (Tuple1(i), num)
    yield (Tuple2(i, 'a'), num)


if __name__ == '__main__':
  pipeline = TestPipeline(is_integration_test=False)
  with pipeline as p:
    result = (
        p
        | 'Create' >> beam.Create([i for i in range(2)])
        | 'Generate' >> beam.ParDo(generate).with_output_types(
            tuple[(Tuple1 | Tuple2), int])
        | 'GBK' >> beam.GroupByKey()
        | 'Print' >> beam.Map(print))
Output (actual):
(BeamSchema_4d393d46_1e51_4fa6_b8e4_5397335c352c(id=1), [0, 0, 1, 1])
(BeamSchema_4d393d46_1e51_4fa6_b8e4_5397335c352c(id=0), [0, 0, 1, 1])
Output (expected):
(BeamSchema_[A](id=1), [0, 1])
(BeamSchema_[A](id=0), [0, 1])
(BeamSchema_[B](id=1), [0, 1])
(BeamSchema_[B](id=0), [0, 1])
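For comparison, here is a minimal sketch of the grouping in plain Python, without Beam. It only illustrates why four distinct keys are expected. The `group_by_key` helper is hypothetical and stands in for `beam.GroupByKey()`; it says nothing about how Beam encodes the keys.

```python
import collections
import typing

# Same definitions as in the example above (typenames copied as-is).
Tuple1 = typing.NamedTuple("Tuple1", [("id", int)])
Tuple2 = typing.NamedTuple("Tuple1", [("id", int), ("name", str)])


def generate(num):
    for i in range(2):
        yield (Tuple1(i), num)
        yield (Tuple2(i, 'a'), num)


def group_by_key(pairs):
    """Hypothetical stand-in for GroupByKey: collect values per key."""
    grouped = collections.defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


# Feed the same inputs as beam.Create([0, 1]) through generate().
grouped = group_by_key(
    pair for num in range(2) for pair in generate(num))

for key, values in grouped.items():
    print(key, values)
```

In plain Python, `Tuple1(0)` and `Tuple2(0, 'a')` compare unequal, so each of the four keys ends up with the values `[0, 1]`.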
This is more likely to happen with dataclasses, when the type hint is (A | B) and dataclass B inherits from dataclass A.
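A small sketch of that dataclass scenario, again in plain Python only. The classes `A` and `B` and their fields are hypothetical placeholders, and the snippet does not run a Beam pipeline. It shows that instances of the two classes are distinct keys in Python even though `B` instances also pass `isinstance(x, A)`; whether that is related to the behavior reported here is not established.

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class A:
    id: int


@dataclasses.dataclass(frozen=True)
class B(A):
    # Hypothetical extra field with a default, so B(0) is valid.
    name: str = 'a'


a = A(0)
b = B(0)

# Dataclass equality only matches instances of the exact same class.
print(a == b)

# Both objects are hashable (frozen=True) and stay separate as dict/set keys.
keys = {a, b}
print(len(keys))

# A B instance is still an instance of A.
print(isinstance(b, A))
```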
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner