Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change timezone serialization logic #16631

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 27 additions & 2 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import cattr
import pendulum
from dateutil import relativedelta
from pendulum.tz.zoneinfo.exceptions import InvalidTimezone

try:
from functools import cache
Expand Down Expand Up @@ -195,6 +196,19 @@ def serialize_to_json(
serialized_object[key] = value
return serialized_object

@classmethod
def serialize_timezone(cls, var: datetime.tzinfo) -> Any:
"""Serializes a timezone to json"""
tz_name = getattr(var, 'name', None)
if not tz_name:
Comment on lines +202 to +203
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also use tzname for pendulum Timezone as well? It’s a standard method that should always yield something useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a standard method that should always yield something useful

Well, not always with pendulum as explained in #16631 (comment) , pendulum.timezone('Europe/Stockholm').tzname(...) can return CEST which can't be converted back with pendulum.timezone('CEST') as it raises InvalidTimezone

I did try that at some point. The whole concept of datetime.tzinfo -> str -> datetime.tzinfo works sometimes but there is no really hard guarantee. So that's why I use the .name if present (which marginally increases the chances of success serialization/deserialization)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gosh we really should drop Pendulum at some point I guess.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can tell there is no good timezone library in python really :(

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no good timezone libraries, in general 🙂 Personally I think the new zoneinfo is as good as it can be.

try:
tz_name = var.tzname(datetime.datetime.utcnow())
except InvalidTimezone:
raise TypeError(f"the timezone {var} can't be serialized")
ecerulm marked this conversation as resolved.
Show resolved Hide resolved


return cls._encode(str(tz_name), type_=DAT.TIMEZONE)
ecerulm marked this conversation as resolved.
Show resolved Hide resolved

# pylint: disable=too-many-return-statements
@classmethod
def _serialize(cls, var: Any) -> Any: # Unfortunately there is no support for recursive types in mypy
Expand Down Expand Up @@ -229,7 +243,7 @@ def _serialize(cls, var: Any) -> Any: # Unfortunately there is no support for r
elif isinstance(var, datetime.timedelta):
return cls._encode(var.total_seconds(), type_=DAT.TIMEDELTA)
elif isinstance(var, Timezone):
return cls._encode(str(var.name), type_=DAT.TIMEZONE)
return cls.serialize_timezone(var)
elif isinstance(var, relativedelta.relativedelta):
encoded = {k: v for k, v in var.__dict__.items() if not k.startswith("_") and v}
if var.weekday and var.weekday.n:
Expand Down Expand Up @@ -300,7 +314,18 @@ def _deserialize(cls, encoded_var: Any) -> Any: # pylint: disable=too-many-retu
raise TypeError(f'Invalid type {type_!s} in deserialization.')

_deserialize_datetime = pendulum.from_timestamp
_deserialize_timezone = pendulum.tz.timezone

@classmethod
def _deserialize_timezone(cls, tz_name: str) -> Timezone:
try:
return pendulum.tz.timezone(tz_name)
except InvalidTimezone:
pass
try:
return pendulum.parse("2021-01-01T12:00:00" + tz_name).tzinfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks more than slightly odd -- when is this case hit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From any of the new cases provided in the tests_dag_serialization.py:test_deserialization_start_date()

But as an example if the DAG start_date is pendulum.parse("2019-08-01T00:00:00.000+01:00" then DAG timezone will be a pendulum.tz.timezone.FixedOffset with name +01:00 so that it's what it will be serialized to.

Now in the deserialization step pendulum.timezone('+01:00') will raise a InvalidTimezone because that only work for "named" timezones (and not all I must add). See sdispater/pendulum#554

So if we want to parse +01:00 back to a timezone object, pendulum.timezone() won't do it, we need to "hack it" by creating a "fake" iso 8601 string with the +01:00 and using pendulum.parse() to construct a FixedTimezone() object for us.

I couldn't find any other method in pendulum's to convert the string '+01:00 to a FixedTimezone/Timezone/datetime.tzinfo

except ValueError:
pass
raise ValueError("{tz_name} can't be converted to pendulum.tz.timezone.Timezone")

@classmethod
def _deserialize_timedelta(cls, seconds: int) -> datetime.timedelta:
Expand Down
24 changes: 24 additions & 0 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from dateutil.relativedelta import FR, relativedelta
from kubernetes.client import models as k8s
from parameterized import parameterized
from pendulum.tz.timezone import FixedTimezone

from airflow.hooks.base import BaseHook
from airflow.kubernetes.pod_generator import PodGenerator
Expand Down Expand Up @@ -446,6 +447,26 @@ def validate_deserialized_task(
datetime(2019, 8, 1, tzinfo=timezone.utc),
),
(pendulum.datetime(2019, 8, 1, tz='UTC'), None, pendulum.datetime(2019, 8, 1, tz='UTC')),
(
pendulum.parse("2019-08-01T00:00:00.000+00:00"),
None,
pendulum.parse("2019-08-01T00:00:00.000+00:00"),
),
(
pendulum.parse("2019-08-01T00:00:00.000+01:30"),
None,
pendulum.parse("2019-08-01T00:00:00.000+01:30"),
),
(
pendulum.datetime(2019, 8, 1, tz='Europe/Stockholm'),
None,
pendulum.datetime(2019, 8, 1, tz='Europe/Stockholm'),
),
(
pendulum.datetime(2019, 8, 1, tz=FixedTimezone(3600)),
None,
pendulum.datetime(2019, 8, 1, tz=FixedTimezone(3600)),
),
]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashb , any of this added cases with offset timezones "+01:00", FixedTimezone(3600), etc will raise an InvalidTimezone exception in the SerializedDAG._deserialize_timezone() at the pendulum.timezone(xxx).

Those timezones serialize to the timezone name which is +01:00,etc and pendulum.timezone('+01:00') raises InvalidTimezone because that method is meant to resolve "named" timezones only (so only works with UTC, Europe/Stockholm, EST, etc )

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this works:

In [13]: pendulum.timezone(3600)                                                                                                                       
Out[13]: Timezone('+01:00')

Maybe we need to change the serialization format to accept string or int -- I think that might be less hacky.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe coupled with tz_name in pendulum.tz.timezones?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can try but I really that will add some very pendulum-specific logic.

I mean, it will involve checking if this is a pendulum.tz.timezone.FixedTimezone and serializing to int if so, there is no "standard" good way of checking if a given datetime.tzinfo is a simple timezone (fixed offset only) or a full fledged timezone (with arbitrarily complex dst rules, etc).

In principle the user can provide a datetime.tzinfo of it's own with name RubenTZ and that will fail the pendulum.timezone('RubenTZ') but doesn't mean that we can just serialize to a fixedoffset (RubenTZ is much more complex)

What I'm trying to say is that is going to endup being very hackish anyway, but in the serializing part instead of the deserializing part.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe coupled with tz_name in pendulum.tz.timezones?

I'm not sure what you mean , but if you mean to use .tzname() instead of .name I can already tell you that is even more brittle.

)
def test_deserialization_start_date(self, dag_start_date, task_start_date, expected_task_start_date):
Expand All @@ -463,6 +484,9 @@ def test_deserialization_start_date(self, dag_start_date, task_start_date, expec
dag = SerializedDAG.from_dict(serialized_dag)
simple_task = dag.task_dict["simple_task"]
assert simple_task.start_date == expected_task_start_date
# timezone may have been converted but the offset and name should be the same
assert dag.timezone.utcoffset(dag_start_date) == dag_start_date.tzinfo.utcoffset(dag_start_date)
assert dag.timezone.tzname(dag_start_date) == dag_start_date.tzinfo.tzname(dag_start_date)

def test_deserialization_with_dag_context(self):
with DAG(dag_id='simple_dag', start_date=datetime(2019, 8, 1, tzinfo=timezone.utc)) as dag:
Expand Down