AIP44 Fix DAG serialization #34042

Merged
merged 1 commit into from Oct 27, 2023

Collaborator

@mhenc mhenc commented Sep 2, 2023

Serializing a DAG with BaseSerialization does not wrap the result in the '__type'/'__var' fields, so the output cannot be deserialized (it carries no information about the type).

The Internal API needs this object to be fully serializable and deserializable.

Also added more tests for the BaseSerialization.serialize/deserialize methods to cover all supported object types (and fixed a few minor issues).


@mhenc mhenc marked this pull request as ready for review September 2, 2023 20:59
@@ -438,7 +438,7 @@ def serialize(
             json_pod = PodGenerator.serialize_pod(var)
             return cls._encode(json_pod, type_=DAT.POD)
         elif isinstance(var, DAG):
-            return SerializedDAG.serialize_dag(var)
+            return cls._encode(SerializedDAG.serialize_dag(var), type_=DAT.DAG)

Member

There is no need to encode this. We expect it to have a unique top-level "dag" key, as shown in

Similarly, we don't encode tasks or individual operators, because we know where they sit in the structure: under ["dag"]["tasks"].

Check L445 and L447

Member

DAG serialization was designed to use Airflow-internal knowledge so that the final blob is not inflated, and to optimize wherever possible. This is why we don't store defaults or None values for DAG and operator objects.
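
The space-saving idea in this comment can be sketched as follows. This is an illustration under stated assumptions, not Airflow's serializer: `DEFAULTS`, `compact`, and `expand` are hypothetical names, and the field list is invented for the example. The point is that a reader who already knows the defaults can restore anything the writer left out.

```python
from typing import Any

# Hypothetical defaults for an illustrative "operator"; not Airflow's real field list.
DEFAULTS = {"retries": 0, "queue": "default", "owner": "airflow", "doc": None}


def compact(fields: dict[str, Any]) -> dict[str, Any]:
    """Keep only values that are not None and differ from the known default."""
    return {
        key: value
        for key, value in fields.items()
        if value is not None and DEFAULTS.get(key) != value
    }


def expand(blob: dict[str, Any]) -> dict[str, Any]:
    """Restore the omitted fields from the defaults the reader already knows."""
    return {**DEFAULTS, **blob}


task = {"task_id": "extract", "retries": 3, "queue": "default", "owner": "airflow", "doc": None}
blob = compact(task)      # only the non-default, non-None fields survive
restored = expand(blob)   # the reader fills the gaps back in
```

The trade-off is that such a blob is only readable by code that knows its structure, which is why it carries no per-object type information.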

Member

Also, be careful about backwards-incompatible changes for this.

And if for AIP-44 we absolutely need this sort of change, that will probably be more involved than just the DAG object

Collaborator Author

Right, but:
If I serialize a DAG (with BaseSerialization.serialize) and then try to deserialize it with BaseSerialization.deserialize, it fails. It never gets to
https://github.com/apache/airflow/blob/main/airflow/serialization/serialized_objects.py#L551
because

        var = encoded_var[Encoding.VAR]
        type_ = encoded_var[Encoding.TYPE]

fails (the key does not exist).

I am not able to find any usages of BaseSerialization.serialize on a DAG in the code (at least when searching for something like BaseSerialization.serialize.*dag).

We are currently working on making sure all other object types are serializable.
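
The failure described in this comment can be reproduced with a toy envelope codec. This is a minimal sketch, not Airflow's implementation: `encode`, `decode`, and the `TYPE`/`VAR` constants are hypothetical stand-ins that only mirror the '__type'/'__var' convention named in the PR description.

```python
from typing import Any

# Hypothetical stand-ins for the '__type'/'__var' keys named in the PR description.
TYPE = "__type"
VAR = "__var"


def encode(value: Any, type_: str) -> dict:
    """Wrap a serialized value in an envelope that records its type."""
    return {VAR: value, TYPE: type_}


def decode(encoded: Any) -> Any:
    """Unwrap an envelope; non-dict values pass through unchanged."""
    if not isinstance(encoded, dict):
        return encoded
    var = encoded[VAR]      # raises KeyError when the envelope is missing
    type_ = encoded[TYPE]
    if type_ == "dag":
        return ("DAG", var["dag_id"])  # toy reconstruction, for illustration only
    raise ValueError(f"unknown type: {type_}")


bare = {"dag_id": "example"}          # serialized without an envelope
wrapped = encode(bare, type_="dag")   # serialized with an envelope

round_tripped = decode(wrapped)

try:
    decode(bare)
    missing_key = None
except KeyError as err:
    missing_key = err.args[0]
```

Without the envelope, the decoder has nothing to dispatch on and fails at the first key lookup, which matches the symptom reported here.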

Member

@potiuk potiuk Sep 4, 2023

Yeah. I think @mhenc is right: this part of the code does not seem to have been used before. It's not DAG serialization that gets affected here, just serializing the whole DAG as part of a bigger structure. I have not seen any place in the code where we'd do that before.

Member

@kaxil kaxil left a comment

-    id: int
+    id: Optional[int]

Member

This field should not be None

Collaborator Author

I also thought that, but it is None during creation (before submission to DB) - it is assigned by SQLAlchemy after you run

session.add(obj)
session.commit()
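
A minimal sketch of why the id can be None before the object is saved. It deliberately swaps SQLAlchemy for the standard-library `sqlite3` module so that it runs without dependencies; the `Run` dataclass, the `save` helper, and the `run` table are hypothetical names, not Airflow models.

```python
import sqlite3
from dataclasses import dataclass
from typing import Optional


@dataclass
class Run:
    """Hypothetical model; `id` stays None until the database assigns it."""

    run_id: str
    id: Optional[int] = None


def save(conn: sqlite3.Connection, run: Run) -> None:
    """Insert the row and copy the database-generated primary key onto the object."""
    cursor = conn.execute("INSERT INTO run (run_id) VALUES (?)", (run.run_id,))
    conn.commit()
    run.id = cursor.lastrowid


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE run (id INTEGER PRIMARY KEY, run_id TEXT NOT NULL)")

run = Run(run_id="manual__2023-09-02")
id_before = run.id   # None: nothing has been written yet
save(conn, run)
id_after = run.id    # an integer assigned by SQLite
```

Whether the field should be typed as optional then depends on whether an unsaved object is ever serialized.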

Member

But when do we ever serialize an unsaved DagRun?

Collaborator Author

Fair point :)
We may need it for TriggerDag Operator - but this is outside of this PR.
So currently it's only for tests (but I can fix them) and also for consistency: some *Pydantic objects will have id as Optional (e.g. Job #34026), so we may keep it Optional there too.

But if you believe it's better to leave it as "int", I can revert this change.

Member

I’m not sure Job should have an optional ID either. Maybe make this not optional for now and we can punt this until it’s necessary for a model. It’s always easier to remove a restriction than add one.

Collaborator Author

Actually, Job must have it, as in
#34026
we call "add_job", which is exactly the case where the id is empty.

Removing it from DagRunPydantic.

Member

@potiuk potiuk left a comment

LGTM. But wait for @kaxil / @uranusjr comments.

Collaborator Author

mhenc commented Sep 11, 2023

@uranusjr @kaxil Do you have any more comments for this PR?

Member

potiuk commented Sep 22, 2023

2 tasks failing.

@mhenc mhenc force-pushed the fix_dag_serialization branch 2 times, most recently from e037603 to 2de0d75 Compare October 17, 2023 10:00
@uranusjr
Member

Is the .orig file left in accidentally?

@mhenc mhenc force-pushed the fix_dag_serialization branch 5 times, most recently from 957cce5 to 5635bee Compare October 18, 2023 11:29
Collaborator Author

mhenc commented Oct 18, 2023

> Is the .orig file left in accidentally?

Yes, by accident after conflict resolution. Removed.

Member

potiuk commented Oct 18, 2023

@kaxil ?

@mhenc mhenc requested a review from potiuk October 18, 2023 14:34
Member

potiuk commented Oct 25, 2023

@kaxil 🙏

Member

kaxil commented Oct 26, 2023

> @kaxil 🙏

oof, looking now

@potiuk potiuk merged commit 64a64ab into apache:main Oct 27, 2023
44 checks passed
@mhenc mhenc deleted the fix_dag_serialization branch October 27, 2023 12:14
@ephraimbuddy ephraimbuddy added this to the Airflow 2.7.3 milestone Oct 29, 2023
@ephraimbuddy ephraimbuddy added the type:bug-fix Changelog: Bug Fixes label Oct 29, 2023