provide_context=True not working with PythonVirtualenvOperator #8177

Closed
maganaluis opened this issue Apr 7, 2020 · 18 comments
Assignees
Labels
kind:bug This is a clearly a bug

Comments

@maganaluis
Contributor

maganaluis commented Apr 7, 2020

Apache Airflow version: 1.10.9

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.14.8

Environment: Docker (Ubuntu 18.4 - Python 3.7)

  • Cloud provider or hardware configuration: Azure
  • OS (e.g. from /etc/os-release): Docker (Ubuntu 18.4 - Python 3.7)
  • Kernel (e.g. uname -a): Docker (Ubuntu 18.4 - Python 3.7)
  • Install tools: N/A
  • Others: N/A

What happened:

When we enable provide_context=True for the PythonVirtualenvOperator, we get the error below.

[2020-04-07 15:08:51,940] {taskinstance.py:1128} ERROR - can't pickle module objects
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 297, in execute_callable
    self._write_args(input_filename)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 339, in _write_args
    pickle.dump(arg_dict, f)
TypeError: can't pickle module objects

One way to get around this issue is to create your own CustomPythonVirtualenvOperator and override _write_args, but this should not be necessary. Feel free to use this if you're encountering the same issue:

import pickle

import dill
from airflow.operators.python_operator import PythonVirtualenvOperator


class CustomPythonVirtualenvOperator(PythonVirtualenvOperator):
    def _write_args(self, input_filename):
        # serialize args to file
        if self._pass_op_args():
            with open(input_filename, 'wb') as f:
                # we only need dag_run to access conf at run time
                arg_dict = ({'args': self.op_args, 'kwargs': {'dag_run': self.op_kwargs['dag_run']}})
                if self.use_dill:
                    dill.dump(arg_dict, f)
                else:
                    pickle.dump(arg_dict, f)

What you expected to happen:

Ideally we should be able to use the context so we can run these tasks with run-time arguments via the CLI or the REST API.
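
For illustration, this is roughly the usage we are after (a sketch only; the my_task name and the report_date key are made up):

def my_task(**kwargs):
    # With provide_context=True the callable receives dag_run, whose conf
    # carries the run-time arguments passed when triggering the DAG, e.g.
    #   airflow trigger_dag bug --conf '{"report_date": "2020-04-07"}'
    conf = kwargs['dag_run'].conf or {}
    report_date = conf.get('report_date')
    print('running for', report_date)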

How to reproduce it:

from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow import DAG
import pickle
import dill

default_args = {
    'owner': 'Luis M',
    'depends_on_past': False,
    'start_date': days_ago(0),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue'
}
dag = DAG(
    'bug',
    default_args=default_args,
    description='bug',
    schedule_interval=timedelta(days=1))


class CustomPythonVirtualenvOperator(PythonVirtualenvOperator):
    def _write_args(self, input_filename):
        # serialize args to file
        if self._pass_op_args():
            with open(input_filename, 'wb') as f:
                arg_dict = ({'args': self.op_args, 'kwargs': {'dag_run': self.op_kwargs['dag_run']}})
                if self.use_dill:
                    dill.dump(arg_dict, f)
                else:
                    pickle.dump(arg_dict, f)


def passf(**kwargs):
    pass

def failf(**kwargs):
    pass
 
task1 = CustomPythonVirtualenvOperator(
        task_id='task1',
        python_callable=passf,
        python_version='3',
        dag=dag,
        provide_context=True
)

task2 = PythonVirtualenvOperator(
        task_id='task2',
        python_callable=failf,
        python_version='3',
        dag=dag,
        provide_context=True
)

Anything else we need to know:

If you run the DAG provided you should see task1 passing and task2 failing.

@maganaluis maganaluis added the kind:bug This is a clearly a bug label Apr 7, 2020
@boring-cyborg

boring-cyborg bot commented Apr 7, 2020

Thanks for opening your first issue here! Be sure to follow the issue template!

@maganaluis maganaluis changed the title provide_context=True not working with CustomPythonVirtualenvOperator provide_context=True not working with PythonVirtualenvOperator Apr 7, 2020
@mik-laj
Member

mik-laj commented Apr 7, 2020

This problem does indeed seem significant. I wonder if it also appears on master. Have you tried to check which objects in the context are causing the problem? Maybe we can exclude one or two objects to restore the correct behavior of this option in Airflow 1.10.
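
One way to narrow it down (an untested sketch, run from a plain PythonOperator with provide_context=True) would be to try pickling each context entry separately and print the ones that fail:

import pickle

def find_unpicklable(**context):
    # Attempt to pickle every context entry on its own and report failures.
    for key, value in context.items():
        try:
            pickle.dumps(value)
        except Exception as exc:
            print('{}: {} is not picklable ({})'.format(key, type(value).__name__, exc))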

@mik-laj
Member

mik-laj commented Apr 7, 2020

This is an open source project, so there is no specific person who solves issues. Would you like to take responsibility for it? I will gladly help and answer questions if you want to solve this problem.

Contributor guide: https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst
Development environment: https://github.com/apache/airflow/blob/master/BREEZE.rst

The community is waiting for your contribution 🐈

@maganaluis
Contributor Author

@mik-laj Thank you for the information. I will look into the root cause and create a PR. Could you please assign me to the issue?

I have not tested it on the master branch; I'll do so before proceeding with the PR.

@maganaluis
Contributor Author

@mik-laj Hi Kamil - I created PR #8256 to fix this issue on the v1-10-stable branch, and the CI tests are passing. There seems to be an issue with requirements, but I don't think that's related to this change. Could you let me know the next steps?

There were too many changes on the master branch; I will revisit this bug there once 2.0 is out.

@ei-grad

ei-grad commented Aug 17, 2020

Is it right that this issue is not going to be fixed in 1.10.x?

As a workaround, one can pass use_dill=True; dill is able to serialize modules.
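
For example (a minimal sketch; the callable is a placeholder and an existing dag object is assumed; per the operator docs, dill also has to be listed in requirements so it is available inside the virtualenv):

from airflow.operators.python_operator import PythonVirtualenvOperator

def my_callable(**kwargs):
    print(kwargs['dag_run'])

task = PythonVirtualenvOperator(
    task_id='venv_task',
    python_callable=my_callable,
    requirements=['dill'],   # dill must be installed in the virtualenv too
    use_dill=True,           # serialize args with dill instead of pickle
    provide_context=True,
    dag=dag,
)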

@mik-laj
Member

mik-laj commented Aug 17, 2020

@kaxil @potiuk can you help with that?

@potiuk
Member

potiuk commented Aug 17, 2020

It was not marked for milestone 1.10.12, I am afraid. I have marked it as such now. There is a good "chance" rc1 will be cancelled because of #10362, and if so we might be able to add it.

@kaxil
Member

kaxil commented Aug 17, 2020

Yeah, thanks, will include it; planning to cut 1.10.12rc2 later tonight.

@kaxil
Member

kaxil commented Aug 17, 2020

PR merged, will be part of 1.10.12rc2

@kaxil kaxil closed this as completed Aug 17, 2020
@Wittline

Hi,

Was this bug fixed in Airflow version 2?

@jatejeda

jatejeda commented Aug 5, 2021

No, it wasn't fixed... I have the same bug in 2.0.2.


@Wittline

Wittline commented Aug 5, 2021

Hi @jatejeda, please check my GitHub; I used the PythonVirtualenvOperator in a personal project using version 2.

https://github.com/Wittline/uber-expenses-tracking

@hafid-d

hafid-d commented Aug 31, 2021

Hi, I'm facing the same issue with Airflow 2.1.2.

@potiuk
Member

potiuk commented Aug 31, 2021

You are probably using some third-party functions/modules that are not picklable. There is not much we can do about that. The PythonVirtualenvOperator works by serializing everything that is needed by your function and deserializing it inside the virtualenv. If any module refuses to be serialized, you will get this error.
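
To illustrate what the operator runs into, here is a plain-Python sketch (independent of Airflow): pickling any dict that contains a module object fails with exactly this error.

import pickle
import json  # stands in for any module object that ends up in the serialized kwargs

try:
    pickle.dumps({'kwargs': {'some_module': json}})
except TypeError as exc:
    print(exc)  # e.g. "can't pickle module objects" / "cannot pickle 'module' object"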

@potiuk
Member

potiuk commented Aug 31, 2021

Can you post some details of your methods/functions? Imports/logs? I'd be curious whether we can improve the error message to say exactly what's wrong.

@hafid-d

hafid-d commented Aug 31, 2021

Hi @potiuk! Thanks for your reply. My method is the following:

def labelling(*args, **kwargs):
    from app.labelling.pipeline import labelling_transform
    dataset_1 = kwargs['ti'].xcom_pull(key='dataset_1', task_ids='extracting')
    category_path = kwargs['ti'].xcom_pull(key='category_path', task_ids='extracting')
    dataset_2 = labelling_transform(dataset_1, MODEL_PATH_TRANS, category_path)
    kwargs['ti'].xcom_push(key='dataset_2', value=dataset_2)

I imported the labelling_transform function at the beginning, but should I also import all the packages that labelling_transform uses inside the labelling function?

I've also seen that it is not possible to use the context variable ti when using the PythonVirtualenvOperator, so I tried the following:

def labelling(*args, **kwargs):
    from app.labelling.pipeline import labelling_transform
    dataset_2 = labelling_transform(kwargs['dataset_1'], MODEL_PATH_TRANS, kwargs['category_path'])
    kwargs['ti'].xcom_push(key='dataset_2', value=dataset_2)


    opr_labeling = PythonVirtualenvOperator(
        task_id='labeling',
        python_callable=labelling,
        op_kwargs={
            'dataset_1': "{{ ti.xcom_pull(key='dataset_1', task_ids='extracting') }}",
            'category_path': "{{ ti.xcom_pull(key='category_path', task_ids='extracting') }}"
        },
        dag=dag_subdag)

Above, I tried to handle the pull, but I'm not sure how to handle the push of dataset_2 from the virtualenv. With the above code I am getting the following error: subprocess.CalledProcessError: Command '['/tmp/venv1gu9mhz9/bin/python','/tmp/venv1gu9mhz9/script.py','/tmp/venv1gu9mhz9/script.in','/tmp/venv1gu9mhz9/script.out', '/tmp/venv1gu9mhz9/string_args.txt']' returned non-zero exit status 11

@hafid-d

hafid-d commented Aug 31, 2021

To give you a bit of context, I am trying to use the PythonVirtualenvOperator because I am facing the following issue when running one of my tasks: AssertionError: daemonic processes are not allowed to have children, from a package that uses multiprocessing.

EDIT: I managed to make it work by setting execute_tasks_new_python_interpreter=True in the Airflow config, but I wanted to isolate and use a new Python interpreter only for a specific task, not for all of my tasks.
