Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,34 +258,38 @@ def schema_field(
description=description)


def _python_any_schema_pb2(has_repr):
  """Build the FieldType proto describing the python_any logical type.

  Args:
    has_repr: when true, the logical type carries a row-typed representation
      made of a non-nullable BYTE field and a non-nullable BYTES field, with
      the static-encoding schema option set to True. When false, the
      representation is passed as None.

  Returns:
    A nullable schema_pb2.FieldType whose logical type URN is PYTHON_ANY_URN.
  """
  representation = None
  if has_repr:
    # A portable schema matches FastPrimitivesCoder encoded values
    representation = schema_pb2.FieldType(
        nullable=False,
        row_type=schema_pb2.RowType(
            schema=schema_pb2.Schema(
                fields=[
                    schema_pb2.Field(
                        name=_PYTHON_ANY_FIELD_TYPE_BYTE,
                        type=schema_pb2.FieldType(
                            atomic_type=schema_pb2.BYTE, nullable=False)),
                    schema_pb2.Field(
                        name=_PYTHON_ANY_FIELD_PAYLOAD,
                        type=schema_pb2.FieldType(
                            atomic_type=schema_pb2.BYTES, nullable=False))
                ],
                options=[
                    schema_pb2.Option(
                        name=_SCHEMA_OPTION_STATIC_ENCODING,
                        type=schema_pb2.FieldType(
                            atomic_type=schema_pb2.BOOLEAN),
                        value=schema_pb2.FieldValue(
                            atomic_value=schema_pb2.AtomicTypeValue(
                                boolean=True)))
                ])))

  return schema_pb2.FieldType(
      logical_type=schema_pb2.LogicalType(
          urn=PYTHON_ANY_URN, representation=representation),
      nullable=True)


Expand Down Expand Up @@ -388,14 +392,18 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
return schema_pb2.FieldType(
array_type=schema_pb2.ArrayType(element_type=element_type))

elif type_ == Any:
return _python_any_schema_pb2(has_repr=False)

try:
if LogicalType.is_known_logical_type(type_):
logical_type = type_
else:
logical_type = LogicalType.from_typing(type_)
except ValueError:
# Unknown type, just treat it like Any
return _python_any_schema_pb2()
# Unknown type, use pythonsdk_any with a representation compatible with
# FastPrimitivesCoder encoded UNKNOWN type
return _python_any_schema_pb2(has_repr=True)
else:
argument_type = None
argument = None
Expand Down
19 changes: 18 additions & 1 deletion sdks/python/apache_beam/typehints/schemas_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,9 +582,26 @@ def test_proto_survives_typing_roundtrip(self, fieldtype_proto):
fieldtype_proto, schema_registry=SchemaTypeRegistry()),
schema_registry=SchemaTypeRegistry()))

def test_any_maps_to_any(self):
  # The python_any logical type produced for typing.Any deliberately has its
  # representation left absent. This prevents it from being used across a
  # language boundary, since its encoded form isn't predictable from a
  # foreign SDK.
  self.assertEqual(
      typing_to_runner_api(Any),
      schemas._python_any_schema_pb2(has_repr=False))

def test_unknown_primitive_maps_to_any(self):
  # np.uint32 is expected to map to python_any with a representation attached.
  self.assertEqual(
      typing_to_runner_api(np.uint32),
      schemas._python_any_schema_pb2(has_repr=True))

def test_unknown_user_type_maps_to_any(self):
  # A plain user-defined class is expected to map to python_any with a
  # representation attached, the same as an unknown primitive.
  class MyUnknownType:
    pass

  self.assertEqual(
      typing_to_runner_api(MyUnknownType),
      schemas._python_any_schema_pb2(has_repr=True))

def test_unknown_atomic_raise_valueerror(self):
self.assertRaises(
Expand Down
Loading