What happened?
When determining the element_type of the PCollection produced by a DoFn whose process method is decorated with @yields_batches, we incorrectly use the batch typehint instead of the element typehint. This affects Beam 2.40.0.
Relates to #21656
Part of #21650
For example, when applying a DoFn like:
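    from typing import Iterator, List

    import apache_beam as beam

    class ElementToBatchDoFn(beam.DoFn):
        @beam.DoFn.yields_batches
        def process(self, element: int) -> Iterator[List[int]]:
            # Each yielded value is a batch (a list of ints), not a single element.
            yield [element] * element

        def infer_output_type(self, input_element_type):
            return input_element_type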
We report that the output PCollection has element_type List[int] (the batch type) instead of int (the element type).
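To make the distinction between the two types concrete, here is a minimal standalone sketch that uses only the standard typing module, not Beam's own type inference. The helper names yielded_type and element_type_of_batch are hypothetical and exist only for illustration. The sketch assumes the batch is annotated as a List, as in the example above.

```python
import collections.abc
from typing import Iterator, List, get_args, get_origin, get_type_hints


def process(element: int) -> Iterator[List[int]]:
    # Same signature as ElementToBatchDoFn.process, minus self.
    yield [element] * element


def yielded_type(fn):
    """Hypothetical helper: unwrap Iterator[X] on the return annotation to X."""
    ret = get_type_hints(fn)["return"]
    if get_origin(ret) is not collections.abc.Iterator:
        raise TypeError(f"expected an Iterator[...] return annotation, got {ret!r}")
    (inner,) = get_args(ret)
    return inner


def element_type_of_batch(batch_type):
    """Hypothetical helper: unwrap List[X] to X (assumes a List batch)."""
    if get_origin(batch_type) is not list:
        raise TypeError(f"expected a List[...] batch type, got {batch_type!r}")
    (elem,) = get_args(batch_type)
    return elem


# What the process annotation yields is the batch type ...
batch_type = yielded_type(process)
# ... while the PCollection's element_type should be the type inside the batch.
element_type = element_type_of_batch(batch_type)
```

Here batch_type corresponds to what is currently reported for the output PCollection, and element_type corresponds to what is expected.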
Issue Priority
Priority: 2
Issue Component
Component: sdk-py-core