Skip to content

Commit

Permalink
Fix Airflow serialization for namedtuple (apache#37168)
Browse files Browse the repository at this point in the history
Namedtuples are now serialized as 'builtins.tuple'.

The serialize method (in airflow/serialization/serializers/builtin.py) calls qualname() on the namedtuple, which returns the name of the user-defined namedtuple type — an arbitrary name as far as the serializer registry is concerned. If that name is used as the classname, deserialization fails because no deserializer is registered for it.
  • Loading branch information
Joffreybvn authored and sunank200 committed Feb 21, 2024
1 parent 7692d6a commit b74b109
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
29 changes: 23 additions & 6 deletions airflow/serialization/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,27 @@ def serialize(o: object, depth: int = 0) -> U | None:

cls = type(o)
qn = qualname(o)
classname = None

# Serialize namedtuple like tuples
# We also override the classname returned by the builtin.py serializer. The classname
# has to be "builtins.tuple", so that the deserializer can deserialize the object into tuple.
if _is_namedtuple(o):
qn = "builtins.tuple"
classname = qn

# if there is a builtin serializer available use that
if qn in _serializers:
data, serialized_classname, version, is_serialized = _serializers[qn].serialize(o)
if is_serialized:
return encode(classname or serialized_classname, version, serialize(data, depth + 1))

# custom serializers
dct = {
CLASSNAME: qn,
VERSION: getattr(cls, "__version__", DEFAULT_VERSION),
}

# if there is a builtin serializer available use that
if qn in _serializers:
data, classname, version, is_serialized = _serializers[qn].serialize(o)
if is_serialized:
return encode(classname, version, serialize(data, depth + 1))

# object / class brings their own
if hasattr(o, "serialize"):
data = getattr(o, "serialize")()
Expand Down Expand Up @@ -337,6 +345,15 @@ def _is_pydantic(cls: Any) -> bool:
return hasattr(cls, "model_config") and hasattr(cls, "model_fields") and hasattr(cls, "model_fields_set")


def _is_namedtuple(cls: Any) -> bool:
    """Report whether ``cls`` exposes the namedtuple interface.

    Detection is duck-typed: the argument qualifies when it has all of the
    ``_asdict``, ``_fields`` and ``_field_defaults`` attributes. Attribute
    probing is preferred over ``isinstance`` here because it is
    significantly faster.
    """
    marker_attrs = ("_asdict", "_fields", "_field_defaults")
    # all() stops at the first missing attribute, mirroring a chained ``and``.
    return all(hasattr(cls, attr) for attr in marker_attrs)


def _register():
"""Register builtin serializers and deserializers for types that don't have any themselves."""
_serializers.clear()
Expand Down
9 changes: 9 additions & 0 deletions tests/serialization/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import datetime
import enum
from collections import namedtuple
from dataclasses import dataclass
from importlib import import_module
from typing import ClassVar
Expand Down Expand Up @@ -185,6 +186,14 @@ def test_ser_plain_dict(self):
i = {SCHEMA_ID: "cannot"}
serialize(i)

def test_ser_namedtuple(self):
    """A namedtuple round-tripped through serialize/deserialize equals the plain tuple of its values."""
    record_type = namedtuple("CustomTuple", ["id", "value"])
    original = record_type(id=1, value="something")

    restored = deserialize(serialize(original))

    # Compared against a plain tuple literal holding the same field values.
    assert restored == (1, "something")

def test_no_serializer(self):
with pytest.raises(TypeError, match="^cannot serialize"):
i = Exception
Expand Down

0 comments on commit b74b109

Please sign in to comment.