diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index eb2990d4222e..2a7576946833 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -258,34 +258,38 @@ def schema_field( description=description) -def _python_any_schema_pb2(): +def _python_any_schema_pb2(has_repr): # A portable schema matches FastPrimitivesCoder encoded values + if has_repr: + representation = schema_pb2.FieldType( + nullable=False, + row_type=schema_pb2.RowType( + schema=schema_pb2.Schema( + fields=[ + schema_pb2.Field( + name=_PYTHON_ANY_FIELD_TYPE_BYTE, + type=schema_pb2.FieldType( + atomic_type=schema_pb2.BYTE, nullable=False)), + schema_pb2.Field( + name=_PYTHON_ANY_FIELD_PAYLOAD, + type=schema_pb2.FieldType( + atomic_type=schema_pb2.BYTES, nullable=False)) + ], + options=[ + schema_pb2.Option( + name=_SCHEMA_OPTION_STATIC_ENCODING, + type=schema_pb2.FieldType( + atomic_type=schema_pb2.BOOLEAN), + value=schema_pb2.FieldValue( + atomic_value=schema_pb2.AtomicTypeValue( + boolean=True))) + ]))) if has_repr else None + else: + representation = None + return schema_pb2.FieldType( logical_type=schema_pb2.LogicalType( - urn=PYTHON_ANY_URN, - representation=schema_pb2.FieldType( - nullable=False, - row_type=schema_pb2.RowType( - schema=schema_pb2.Schema( - fields=[ - schema_pb2.Field( - name=_PYTHON_ANY_FIELD_TYPE_BYTE, - type=schema_pb2.FieldType( - atomic_type=schema_pb2.BYTE, nullable=False)), - schema_pb2.Field( - name=_PYTHON_ANY_FIELD_PAYLOAD, - type=schema_pb2.FieldType( - atomic_type=schema_pb2.BYTES, nullable=False)) - ], - options=[ - schema_pb2.Option( - name=_SCHEMA_OPTION_STATIC_ENCODING, - type=schema_pb2.FieldType( - atomic_type=schema_pb2.BOOLEAN), - value=schema_pb2.FieldValue( - atomic_value=schema_pb2.AtomicTypeValue( - boolean=True))) - ])))), + urn=PYTHON_ANY_URN, representation=representation), nullable=True) @@ -388,14 +392,18 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType: return schema_pb2.FieldType( array_type=schema_pb2.ArrayType(element_type=element_type)) + elif type_ == Any: + return _python_any_schema_pb2(has_repr=False) + try: if LogicalType.is_known_logical_type(type_): logical_type = type_ else: logical_type = LogicalType.from_typing(type_) except ValueError: - # Unknown type, just treat it like Any - return _python_any_schema_pb2() + # Unknown type, use pythonsdk_any with a representation compatible with + # FastPrimitiveCoder encoded UNKNOWN type + return _python_any_schema_pb2(has_repr=True) else: argument_type = None argument = None diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 8032e4701c25..3a6d81fab5b9 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -582,9 +582,26 @@ def test_proto_survives_typing_roundtrip(self, fieldtype_proto): fieldtype_proto, schema_registry=SchemaTypeRegistry()), schema_registry=SchemaTypeRegistry())) + def test_any_maps_to_any(self): + # python_any for typing.Any logical type's representation is delibrately set + # absent to prevent the usage crossing language boundary, as its encoded + # form isn't predictable from foreign SDK. + self.assertEqual( + typing_to_runner_api(Any), + schemas._python_any_schema_pb2(has_repr=False)) + def test_unknown_primitive_maps_to_any(self): self.assertEqual( - typing_to_runner_api(np.uint32), schemas._python_any_schema_pb2()) + typing_to_runner_api(np.uint32), + schemas._python_any_schema_pb2(has_repr=True)) + + def test_unknown_user_type_maps_to_any(self): + class MyUnknownType: + pass + + self.assertEqual( + typing_to_runner_api(MyUnknownType), + schemas._python_any_schema_pb2(has_repr=True)) def test_unknown_atomic_raise_valueerror(self): self.assertRaises(