Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Airflow serialization for namedtuple #37168

Merged
merged 10 commits into from
Feb 18, 2024
29 changes: 23 additions & 6 deletions airflow/serialization/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,27 @@ def serialize(o: object, depth: int = 0) -> U | None:

cls = type(o)
qn = qualname(o)
classname = None
Joffreybvn marked this conversation as resolved.
Show resolved Hide resolved

# Serialize namedtuples like plain tuples.
# We also override the classname returned by the builtin.py serializer: the classname
# has to be "builtins.tuple" so that the deserializer can turn the object back into a tuple.
if _is_namedtuple(o):
qn = "builtins.tuple"
classname = qn
Joffreybvn marked this conversation as resolved.
Show resolved Hide resolved

# if there is a builtin serializer available use that
Joffreybvn marked this conversation as resolved.
Show resolved Hide resolved
if qn in _serializers:
data, serialized_classname, version, is_serialized = _serializers[qn].serialize(o)
if is_serialized:
return encode(classname or serialized_classname, version, serialize(data, depth + 1))

# custom serializers
dct = {
CLASSNAME: qn,
VERSION: getattr(cls, "__version__", DEFAULT_VERSION),
}

# if there is a builtin serializer available use that
if qn in _serializers:
data, classname, version, is_serialized = _serializers[qn].serialize(o)
if is_serialized:
return encode(classname, version, serialize(data, depth + 1))

# object / class brings their own
if hasattr(o, "serialize"):
data = getattr(o, "serialize")()
Expand Down Expand Up @@ -337,6 +345,15 @@ def _is_pydantic(cls: Any) -> bool:
return hasattr(cls, "model_config") and hasattr(cls, "model_fields") and hasattr(cls, "model_fields_set")


def _is_namedtuple(cls: Any) -> bool:
    """Tell whether the given argument looks like a namedtuple.

    Detection is by duck typing: the argument must expose all of the
    ``_asdict``, ``_fields`` and ``_field_defaults`` attributes. Probing
    attributes is preferred here because it is significantly faster than
    using isinstance.
    """
    marker_attributes = ("_asdict", "_fields", "_field_defaults")
    # all() short-circuits on the first missing attribute, in the listed order.
    return all(hasattr(cls, attribute) for attribute in marker_attributes)
Joffreybvn marked this conversation as resolved.
Show resolved Hide resolved


def _register():
"""Register builtin serializers and deserializers for types that don't have any themselves."""
_serializers.clear()
Expand Down
9 changes: 9 additions & 0 deletions tests/serialization/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import datetime
import enum
from collections import namedtuple
from dataclasses import dataclass
from importlib import import_module
from typing import ClassVar
Expand Down Expand Up @@ -185,6 +186,14 @@ def test_ser_plain_dict(self):
i = {SCHEMA_ID: "cannot"}
serialize(i)

def test_ser_namedtuple(self):
    """A namedtuple round-tripped through serialize/deserialize equals the matching plain tuple."""
    record_type = namedtuple("CustomTuple", ["id", "value"])
    original = record_type(id=1, value="something")

    restored = deserialize(serialize(original))

    assert restored == (1, "something")

def test_no_serializer(self):
with pytest.raises(TypeError, match="^cannot serialize"):
i = Exception
Expand Down