`PythonVirtualenvOperator` crashes if any `python_callable` function is defined in the same source as DAG #35529
Comments
Marked it as a good first issue for someone to pick up.
If we can detect such a condition, cool, that would be nice. I also encourage you to contribute it - it seems you have enough involvement already to be able to provide a PR.
I will be working on this. Excited :) Edit: Unless OP wants to contribute it. I'll be working on this anyway as a learning experience.
We also faced a similar problem. We will be glad to see a fix in the next releases :)
Update: I was able to replicate the issue locally.
I was looking into this issue. Until now, I've been going through the scheduler code that processes the DAGs and checks whether they are valid; I just started looking into task instance execution. EDIT: I just realized that when the DAG is being loaded, `PythonVirtualenvOperator` does check whether the passed function is a normal function or a lambda. Something can be done here. I'm still looking into it.
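The failure mode under discussion can be sketched with the standard library's `pickle` (dill behaves the same way for any named function that is importable from its module); this is an illustration of the mechanism, not Airflow code:

```python
import pickle

def helper():
    return 42

# pickle -- and dill, for functions importable from their module -- serializes
# a top-level function *by reference*: the payload records the module and
# qualified name, and unpickling re-imports that module. Inside the operator's
# virtualenv the DAG file is not importable under the (mangled) module name
# that was recorded, so unpickling fails -- the crash described in this issue.
payload = pickle.dumps(helper)
restored = pickle.loads(payload)  # works here: the defining module is importable
```

In the same interpreter the round trip succeeds; the crash only appears in a fresh interpreter (the virtualenv) where the recorded module name cannot be resolved.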
I am still debugging this and will post more info here.
@potiuk, I think there are two options for this issue.
I am more inclined towards 2, as it will make the DAG work. Although I think coming up with a better approach is still possible, as I am still exploring it. But we can also update the documentation and go with approach 1.
I was able to identify what objects are causing the import issue with dill. We are using dill to dump
This might not be an optimal approach, but I think I understand the problem and can probably explain it here. If we want to allow the python callable to be defined as part of the DAG module, we should make the original module available whenever we are serializing and deserializing objects. But since we are modifying the module's name and re-importing it, we should try to re-import with the modified name in `python_virtualenv_script.jinja2`. So, the condition to detect whether `python_callable` is part of the DAG module will be to check using
If the above condition is met, we should copy the original module source code to the temp directory where the
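A minimal sketch of such a detection, assuming the check simply compares the callable's `__module__` with the DAG file's module name (the helper name and signature are illustrative, not Airflow's actual API):

```python
def defined_in_dag_module(python_callable, dag_module_name):
    # Hypothetical helper: true when the callable was defined in the same
    # module (i.e. the same .py file) that defines the DAG itself.
    return getattr(python_callable, "__module__", None) == dag_module_name

def my_task():
    pass

# A function defined in this file reports this file's module name,
# so the check fires; a builtin like len does not.
in_same_module = defined_in_dag_module(my_task, __name__)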
Maybe there is an easier solution: you already have the source of the callable:
and name
and assuming you pass "unique_module_name" starting with
All that before op_args/kwargs loading from dill. That should do the trick:
There is no need to copy the sources to a separate file - it would also be harmful, as we really need only the callable, not the whole DAG or the other methods defined in there.
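The suggestion above can be sketched as follows: extract the callable's own source with `inspect.getsource` and re-create the function inside the target interpreter, so the original DAG module never needs to be importable there. This is a rough illustration of the idea, not the actual Airflow patch:

```python
import inspect
import textwrap

def task_fn(x):
    return x * 2

# Grab only the callable's source (available when rendering the virtualenv
# script) and re-execute it in a fresh namespace, binding the function by
# name -- no import of the DAG module is required on the other side.
source = textwrap.dedent(inspect.getsource(task_fn))
namespace = {}
exec(source, namespace)
rebuilt = namespace["task_fn"]
```

Note that this only carries the function body itself: anything the callable references from module globals (helper functions, constants) is still unavailable, which matches the documented limitation that global objects are not imported into the virtualenv.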
@potiuk, how do we want to handle the cases where there are multiple helper methods in the same module used by the
Edit: I guess this could be a feature request? But we do mention that global objects are not imported [here](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#id1). I am going ahead and implementing what you suggested @potiuk. Thanks.
Yeah. This is how it currently works - and not a big limitation.
Apache Airflow version
2.7.3
What happened
Hi folks!
I have to use the `PythonVirtualenvOperator` operator and pass it `{{ dag_run }}`, `{{ task_instance }}` and other Airflow context variables. And sometimes it crashes with the following error:

After some testing I have found out that this error occurs if there is any operator in the DAG (maybe an operator other than `PythonVirtualenvOperator`) that takes a function as its `python_callable` argument -- and whose function is defined in the same Python source as the DAG object.

What you think should happen instead
I think that `airflow` should check its DAGs before running (or before serialization) and give an informative error message in the following case: if there is a `PythonVirtualenvOperator` in the DAG and there is a `python_callable` function that is declared in the same Python module as the DAG itself.

And, for the future, it would be really cool if Airflow migrated to `cloudpickle` so that such functions are deserialized correctly.

How to reproduce
Here's a minimal example that will give this error (should be tested with `airflow standalone`, with `SequentialExecutor` or `KubernetesExecutor`; it does not happen on `DebugExecutor`):

And here's my workaround code:
`dags/strange_pickling_error/dag.py`:

`dags/strange_pickling_error/some_moar_code.py`:

Operating System
:Operating System
Ubuntu 22.04
Versions of Apache Airflow Providers
apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-google==10.11.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-sqlite==3.5.0
Deployment
Virtualenv installation
Deployment details
Python 3.10
airflow==2.7.3
dill==0.3.5.1
Anything else
No response
Are you willing to submit PR?
Code of Conduct