Description
What would you like to happen?
I have a multi-output DoFn:
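```python
from typing import Any, Dict, Iterable, Union

import apache_beam as beam
from apache_beam import pvalue


class DoFn1(beam.DoFn):
    def process(self, row) -> Iterable[Union[Dict[str, Any], pvalue.TaggedOutput]]:
        if something:
            yield some_dict(...)
        else:
            yield pvalue.TaggedOutput("bad", ...)
```

And another DoFn that consumes its main output: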
```python
class DoFn2(beam.DoFn):
    def process(self, row: Dict[str, Any]) -> Iterable[...]:
        if something:
            yield some_dict(...)
        else:
            yield pvalue.TaggedOutput("bad", ...)
```

When I wire them together like this:
```python
pcoll = ...
pcoll = pcoll | "dofn1" >> beam.ParDo(DoFn1()).with_outputs(
    "bad",
    main="good",
)
pcoll["good"] | "dofn2" >> beam.ParDo(DoFn2())
```

I get an error that looks like this:
```
apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 'dofn2': requires Dict[str, Any] but got Union[Dict[str, Any], TaggedOutput] for row
```
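Judging by the error, the element type of the `Iterable[...]` return annotation on `DoFn1.process` is used as-is for the main (`good`) output, so `TaggedOutput` stays inside the `Union` that `dofn2` sees. The narrowing being asked for can be sketched in plain Python with `typing.get_args` and `typing.get_origin`. This is a hypothetical illustration only: `main_output_type` is not a Beam function, `TaggedOutput` here is a stand-in class rather than `apache_beam.pvalue.TaggedOutput`, and only `typing.Union` (not the `X | Y` syntax) is handled.

```python
from typing import Any, Dict, Iterable, Union, get_args, get_origin


class TaggedOutput:
    """Stand-in for apache_beam.pvalue.TaggedOutput (hypothetical, for illustration)."""

    def __init__(self, tag: str, value: Any) -> None:
        self.tag = tag
        self.value = value


def main_output_type(return_hint: Any) -> Any:
    """Hypothetical helper: main-output element type implied by a process() hint.

    Unwraps Iterable[X], then drops TaggedOutput from X when X is a typing.Union.
    """
    (element,) = get_args(return_hint)  # Iterable[X] -> X
    if get_origin(element) is not Union:
        return element
    kept = tuple(t for t in get_args(element) if t is not TaggedOutput)
    if not kept:
        return Any  # only tagged outputs: nothing typed reaches the main output
    return Union[kept]  # a single remaining member collapses to that type


# The hint from DoFn1.process, written with the stand-in TaggedOutput.
hint = Iterable[Union[Dict[str, Any], TaggedOutput]]
print(main_output_type(hint))  # typing.Dict[str, typing.Any]
```

With that narrowing, the main output would be typed `Dict[str, Any]`, which is exactly what `DoFn2.process` declares.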
How can I tell Beam's type checking that the good tag yields Dict[str, Any] and the bad tag yields TaggedOutput? Am I forced to propagate the tagged-output type hints downstream?
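For context, the runtime routing that makes the per-tag types differ can be modeled without Beam: untagged yields go to the main tag, and each `TaggedOutput` is unwrapped into its own tag. A minimal sketch, assuming a stand-in `TaggedOutput` with `tag` and `value` attributes and a hypothetical `route_outputs` helper. It is not Beam's implementation.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, Iterator, List


class TaggedOutput:
    """Stand-in for apache_beam.pvalue.TaggedOutput (hypothetical, for illustration)."""

    def __init__(self, tag: str, value: Any) -> None:
        self.tag = tag
        self.value = value


def route_outputs(
    process: Callable[[Any], Iterable[Any]], rows: Iterable[Any], main: str
) -> Dict[str, List[Any]]:
    """Hypothetical model of multi-output routing.

    Untagged results go to `main`; TaggedOutput results go to their own tag,
    with the wrapper removed so only `.value` is stored.
    """
    outputs: Dict[str, List[Any]] = defaultdict(list)
    for row in rows:
        for result in process(row):
            if isinstance(result, TaggedOutput):
                outputs[result.tag].append(result.value)
            else:
                outputs[main].append(result)
    return dict(outputs)


def process(row: int) -> Iterator[Any]:
    """Hypothetical stand-in for DoFn1.process: even rows are good, odd rows are bad."""
    if row % 2 == 0:
        yield {"row": row}
    else:
        yield TaggedOutput("bad", row)


print(route_outputs(process, [0, 1, 2], main="good"))
# {'good': [{'row': 0}, {'row': 2}], 'bad': [1]}
```

In this model the good output only ever holds dicts, while the bad output holds whatever was passed as the second argument to `TaggedOutput`, not `TaggedOutput` objects. So a per-tag hint for bad would be the wrapped value's type.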
I also asked this on Stack Overflow, in case GitHub is not the right place: https://stackoverflow.com/questions/76151787/in-apache-beam-dataflow-multi-output-dofns-how-do-you-assign-type-hints-to-spec
I found a related discussion here: #9810
Issue Priority
Priority: 3 (nice-to-have improvement)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner
lazarillo