
Serialization error after successful DAG run #20875

Closed
1 of 2 tasks
Michal-Biernacki opened this issue Jan 14, 2022 · 10 comments
Labels
affected_version:2.2 Issues Reported for 2.2 area:core kind:bug This is a clearly a bug

Comments

@Michal-Biernacki

Michal-Biernacki commented Jan 14, 2022

Apache Airflow version

2.2.3 (latest released)

What happened

When the DAG definition uses both params and on_success_callback, then after triggering the DAG from the UI (option "Trigger DAG") and the successful DAG run finishing, I got the "DAG Import Errors" error displayed in the UI.

The issue is related to the _serialize_params_dict method of the BaseSerialization class.
Part of the error message:

if f'{v.__module__}.{v.__class__.__name__}' == 'airflow.models.param.Param':
AttributeError: 'str' object has no attribute '__module__'

As a consequence, there is also no update to the serialized_dag table in the backend database.
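
For illustration, here is a minimal sketch of why the check fails for plain values and how a type-based check avoids it (this is only an illustration of the failure mode, not necessarily the fix that was applied in Airflow's code):

from airflow.models.param import Param

def is_param(v):
    # instances of built-in types such as str do not expose __module__,
    # so building f'{v.__module__}.{v.__class__.__name__}' raises
    # AttributeError; checking the type directly does not
    return isinstance(v, Param)

print(is_param(Param("param_value_test")))  # True
print(is_param("plain string value"))       # False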

Additionally:

The error occurs only after a successful DAG run triggered from the UI (I did not check the outcome of a run triggered from the CLI or a scheduled run).

If we remove either the params or the on_success_callback attribute, there is no serialization error.

To confirm this issue I set up a completely new Airflow instance in a separate virtualenv and still had the same issue.

What you expected to happen

Successful serialization after the successful DAG run

How to reproduce

To reproduce this error I prepared the very simple DAG presented below.

Additional information about the setup:

  • executor = LocalExecutor
  • sql_alchemy_conn = Postgres db
from airflow import DAG
from airflow.utils import timezone
from airflow.operators.dummy import DummyOperator
from airflow.models.param import Param

def func_on_success(context):
    pass

with DAG(
    dag_id="test",
    start_date=timezone.datetime(2022, 1, 1),
    schedule_interval='@once',
    params={"param_test": Param("param_value_test")},
    on_success_callback=func_on_success
) as dag:
    DummyOperator(task_id='task_1')

After a successful execution of this DAG through the "Trigger DAG" option in the UI, the error should be displayed (after refreshing the webpage).

Operating System

MacOS 12.1

Versions of Apache Airflow Providers

n/a

Deployment

Virtualenv installation

Deployment details

  • executor = LocalExecutor
  • sql_alchemy_conn = Postgres db

Anything else

There is one scenario in which there will be no error:

  • make some changes to the DAG code
  • as a consequence, the DAG is serialized automatically
  • trigger the DAG and let it finish successfully within min_serialized_dag_update_interval seconds

In that case there will be no error, because the serialization process will not start again right after the DAG run (at least this is my explanation); see the sketch below.
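
A minimal sketch of the gating described above (an illustration of the idea only, not Airflow's actual code; the 30-second default for min_serialized_dag_update_interval is an assumption to check against your configuration):

import datetime

# assumed default of min_serialized_dag_update_interval
MIN_UPDATE_INTERVAL = datetime.timedelta(seconds=30)

def needs_reserialization(last_serialized_at, now):
    # the stored serialized DAG is only rewritten when it is older than the
    # configured interval; a run that finishes inside that window therefore
    # never reaches the failing serialization code path
    return now - last_serialized_at > MIN_UPDATE_INTERVAL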

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@Michal-Biernacki Michal-Biernacki added area:core kind:bug This is a clearly a bug labels Jan 14, 2022
@boring-cyborg

boring-cyborg bot commented Jan 14, 2022

Thanks for opening your first issue here! Be sure to follow the issue template!

@uranusjr
Member

Already fixed in main.

@parthpalkhiwala-vonage

parthpalkhiwala-vonage commented May 23, 2022

Hello All,
We are still facing a similar issue while upgrading our Airflow version from 2.1.3 to 2.2.5.
Below is the error we are getting - this usually occurs after task execution and not during DAG parsing by the scheduler.

Apache Airflow Version : 2.2.5
Python Version : 3.6

File "/usr/local/lib/python3.6/site-packages/airflow/serialization/serialized_objects.py", line 578, in serialize_operator
    serialize_op['params'] = cls._serialize_params_dict(op.params)
  File "/usr/local/lib/python3.6/site-packages/airflow/serialization/serialized_objects.py", line 451, in _serialize_params_dict
    if f'{v.__module__}.{v.__class__.__name__}' == 'airflow.models.param.Param':
AttributeError: 'str' object has no attribute '__module__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/serialization/serialized_objects.py", line 939, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/usr/local/lib/python3.6/site-packages/airflow/serialization/serialized_objects.py", line 851, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG : 'str' object has no attribute '__module__'

We are passing the params at task level like
params={'param_name': 'param_val'}

We have also tried using
params={'param_name': Param('param_val')}

Can anyone please guide us how to fix this error?

@uranusjr uranusjr added the affected_version:2.2 Issues Reported for 2.2 label May 23, 2022
@potiuk
Member

potiuk commented May 27, 2022

Migration to 2.3.* and using airflow dags reserialize should help.

@parthpalkhiwala-vonage

Migration to 2.3.* and using airflow dags reserialize should help.

I believed this issue had been fixed in 2.2.5 - as far as I know it originally occurred in 2.2.3 - will upgrading to 2.3.0 or 2.3.1 solve it?

Also, can you please explain the root cause of the issue, in case we are unable to upgrade to 2.3.* right now? That would be a major change for us, since it would also mean we must use Python 3.7 or greater.

Is the issue related to the Db upgrade or something related to the code?

@potiuk
Member

potiuk commented May 27, 2022

That would be a major change for us, since it would also mean we must use Python 3.7 or greater.

Python 3.6 reached end of life in October and stopped receiving security fixes, so you are putting your company at great risk by not migrating in a "planned" way - it is possible that a critical bug will be discovered and you will HAVE TO upgrade in a hurry, like it was with Log4Shell. If your reason is Python 3.6, I would strongly recommend you start the migration process now.

Is the issue related to the Db upgrade or something related to the code?

The root cause is that your serialized DAGs were likely written with some older version of libraries, and the serialized objects are likely not restorable with the new versions - depending on what libraries, objects and dependencies you had, it might be something that is not at all in Airflow's control. The "reserialize" command will simply delete the serialized DAGs from the database, which will force them to be re-serialized again. Think of the serialized DAGs as "cached versions" of your DAGs.

I believed this issue had been fixed in 2.2.5 - it originally occurred in 2.2.3 until I know - upgrading to 2.3.0 or 2.3.1 will it be solved ?

If you reserialize - likely yes. But the only way you can get it fixed is to upgrade anyway, so holding off the upgrade makes no sense whatsoever, even if you are not sure it is going to be fixed. There might be reasons why some fixes might not work for you - but no-one will give you a guarantee that your problem will be fixed, I am afraid. This is free software. It comes with no such guarantees. If you need guarantees, then there are companies who provide paid support, and there you can have more expectations, I think. Since this is free and open software, you can also take a look at what the "reserialize" command is doing - you can easily find it in the code and do a similar thing yourself. This is always an option, but then you take the responsibility on yourself - the maintainers decided to implement the reserialize command to help in such cases, but this absolutely requires migrating to 2.3; you can take the risk of applying a similar approach to a pre-2.3 version (but it's your decision and responsibility for not following the path recommended by the maintainers).

So you have options - either you follow the way we recommend (with good support from the community), or you choose to hold off and bear the consequences of that decision :)

@potiuk
Member

potiuk commented May 27, 2022

And yes. In most cases (if we do not have enough data to diagnose our user's environment and specific case), the advice here on how to check whether the problem is fixed will be to migrate. With a far less than 1:1000 ratio of maintainers to users, it's far easier for our users to pay back to the community (and other users) by testing whether the new version fixes their problem than for maintainers to spend time analysing the individual customisations and modifications impacting those. It's just pure math.

@parthpalkhiwala-vonage

Is the issue related to the Db upgrade or something related to the code?

The root cause is that your serialized DAGs were likely written with some older version of libraries, and the serialized objects are likely not restorable with the new versions - depending on what libraries, objects and dependencies you had, it might be something that is not at all in Airflow's control. The "reserialize" command will simply delete the serialized DAGs from the database, which will force them to be re-serialized again. Think of the serialized DAGs as "cached versions" of your DAGs.

Thanks for the quick response.
Unfortunately airflow dags reserialize isn't available in 2.2.5, but based on the GitHub code, we ran the following code to reserialize our DAGs:

from airflow.models import DagBag
from airflow.models.serialized_dag import SerializedDagModel
from airflow.settings import Session

# drop all cached serialized DAGs from the backend database
session = Session()
session.query(SerializedDagModel).delete(synchronize_session=False)
session.commit()

# re-parse every DAG file and write fresh serialized versions back to the DB
dagbag = DagBag()
dagbag.collect_dags(only_if_updated=False, safe_mode=False)
dagbag.sync_to_db()

But that too doesn't seem to work, as the DAGs were still giving import errors.
Interestingly enough, if I duplicate the same DAG code in a new DAG in a new file, it runs without giving any import error.

@potiuk
Member

potiuk commented Jun 1, 2022

My guess is that you still have some workers using an older Airflow version.

@potiuk
Member

potiuk commented Jun 1, 2022

Alternatively, somewhere in your system *.pyc or __pycache__ files are "stale" and an old version of the Python bytecode is being used. Those are all the guesses I can come up with, but maybe you should look for similar anomalies - maybe somewhere in your deployment the code is cached. Cache invalidation is one of the hardest things.
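
A minimal sketch of how one might look for leftover bytecode, assuming a hypothetical DAGs folder path (adjust the path to your deployment):

import pathlib

# assumption: DAG files live under this path - adapt to your deployment
DAGS_FOLDER = pathlib.Path("/opt/airflow/dags")

# list compiled bytecode that may be stale; removing these files forces
# Python to recompile from the current .py sources on the next import
for pyc in DAGS_FOLDER.rglob("*.pyc"):
    print(pyc)
for cache_dir in DAGS_FOLDER.rglob("__pycache__"):
    print(cache_dir)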
