Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decode old-style nested Xcom value #31866

Merged
merged 16 commits into from
Jun 29, 2023

Conversation

utkarsharma2
Copy link
Contributor

@utkarsharma2 utkarsharma2 commented Jun 13, 2023

Xcom values generated by airflow 2.5.2 was not getting decoded by airflow 2.6.
There were two issues

  1. I think it was a typo, which was mentioned in the issue -
    return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: old[OLD_DATA][OLD_DATA]}
  2. When we have __type == dict in _convert() we don't need to wrap the data in dict again.

closes: #31769

@utkarsharma2 utkarsharma2 marked this pull request as draft June 13, 2023 05:45
@utkarsharma2 utkarsharma2 marked this pull request as ready for review June 13, 2023 06:28
@ephraimbuddy ephraimbuddy added this to the Airlfow 2.6.3 milestone Jun 13, 2023
@ephraimbuddy ephraimbuddy added the type:bug-fix Changelog: Bug Fixes label Jun 13, 2023
airflow/serialization/serde.py Outdated Show resolved Hide resolved
Copy link
Contributor

@phanikumv phanikumv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test docker-compose quick start is failing, other than that it lgtm

@@ -276,7 +276,11 @@ def deserialize(o: T | None, full=True, type_hint: Any = None) -> object:
def _convert(old: dict) -> dict:
"""Converts an old style serialization to new style."""
if OLD_TYPE in old and OLD_DATA in old:
return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: old[OLD_DATA][OLD_DATA]}
# Added to handle for handling xcom data from airflow 2.5.2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Added to handle for handling xcom data from airflow 2.5.2
# Added to handle for handling xcom data from airflow 2.5.2

This comment feels a little weird to me. Is this really just 2.5.2, or <2.6?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thought (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what all airflow version generates this encoded value, but I'll check and update that here. Sorry, should have done this in 1st iteration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code comment should be applicable for 2.4>= to <2.6 because the dataset concept was introduced in 2.4. Will update accordingly. Will update the code comment accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I was able to replicate the issue for airflow 2.4.3 as running example dag dataset_produces_1 produces data - [{“__type": "airflow.datasets.Dataset", "__source": "dataset_produces_1.producing_task_1", "__var": {"__var": {"uri": "s3://dag1/output_1.txt", "extra": {"__var": {"hi": "bye"}, "__type": "dict"}}, "__type": "dict"}}] and the issue was not caught by testcase - https://github.com/apache/airflow/blob/main/tests/serialization/test_serde.py#L259-L273 as it doesn't have more than two levels of nested __var

Copy link
Member

@hussein-awala hussein-awala left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find any encoder which add __var in a nested object __var, and why it was moved from

    @staticmethod
    def _convert(old: dict) -> dict:
        """Converts an old style serialization to new style"""
        if OLD_TYPE in old and OLD_SOURCE in old:
            return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: old[OLD_DATA]}

to

def _convert(old: dict) -> dict:
    """Converts an old style serialization to new style"""
    if OLD_TYPE in old and OLD_DATA in old:
        return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: old[OLD_DATA][OLD_DATA]}

    return old

in #31866.
I wonder if it was a bug in that PR and we need just to remove it instead of adding a condition for 2.5.2.
BTW, this PR was released in 2.6.0, so we will have the same problem with all the previous version, maybe we need to test 2.5.3 and 2.4.3 before merging this one, in order to update the comments:

# Added to handle for handling xcom data from airflow 2.5.2

@bolkedebruin wdyt?

@utkarsharma2
Copy link
Contributor Author

@hussein-awala In Airflow 2.4.3 if you run dag dataset_produces_1 it will produce xcom data as below, with nested __var -

{
        "__type": "airflow.datasets.Dataset",
        "__source": None,
        "__var": {
            "__var": {
                "uri": uri,
                "extra": {
                    "__var": {"hi": "bye"},
                    "__type": "dict"
                }
            },
            "__type": "dict",
        },
}

And I think old[OLD_DATA][OLD_DATA] was done to directly get to the second nested __var assuming there will be no more nested __var in serialized data but that is not the case as extra can have dicts too.

Also, in the old test case, we had only tests for the below data -
test case - https://github.com/apache/airflow/blob/main/tests/serialization/test_serde.py#L259-L273

{
    "__type": "airflow.datasets.Dataset",
    "__source": None,
    "__var": {
        "__var": {
            "uri": uri,
            "extra": None              # <-- change 
        },
        "__type": "dict",
    },
}

old[OLD_DATA][OLD_DATA] fails with extra = {"__var": {"hi": "bye"}, "__type": "dict"} and will works with extra=None

But if we explicitly handle the case for dict and do not assume the nested levels of __var below code will work in all cases.

def _convert(old: dict) -> dict:
    """Converts an old style serialization to the new style.""
    if OLD_TYPE in old and OLD_DATA in old:
        #Added to handle for handling xcom data from airflow < 2.6
        if old[OLD_TYPE] == "dict":
            return old[OLD_DATA]
        else:
            return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: old[OLD_DATA]}
    return old

@bolkedebruin
Copy link
Contributor

Looking

@bolkedebruin
Copy link
Contributor

_convert was created to handle old (pre 2.6) XCom seralized data, but the issue is that the new deserializer cannot handle:

{'extra': {'__type': 'dict', '__var': {'hi': 'bye'}}, 'uri': 's3://does_not_exist'}

Which is the nested form of the old style serialization.

I think the solution is correct, but that the commit message and the inline comments could be improved. As mentioned this code was already handling pre 2.6 data, but it wasn't handling the nested version of that correctly. Also I think the test could use some improvement / addition. The above dict should be able to be deseralized directly. So I would expect an additional test.

@bolkedebruin bolkedebruin changed the title Decode old Xcom value from airflow 2.5.2 Decode old-style nested Xcom value Jun 29, 2023
@bolkedebruin bolkedebruin merged commit bd32467 into apache:main Jun 29, 2023
42 checks passed
ephraimbuddy pushed a commit that referenced this pull request Jul 6, 2023
The deserializer was not properly dealing with nested and wrapped old-style xcom values.
---------

Co-authored-by: bolkedebruin <bolkedebruin@users.noreply.github.com>
(cherry picked from commit bd32467)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Deserialization of old xcom data fails after upgrade to 2.6.1 from 2.5.2 when calling /xcom/list/ [GET]
9 participants