
Add a Docker Taskflow decorator #15330

Merged · 22 commits · Sep 20, 2021

Conversation

dimberman (Contributor)

Adds the ability to apply @task.docker to a Python function, turning it into a DockerOperator that runs the function remotely in a Docker container.

```python
@task.docker(
    image="quay.io/bitnami/python:3.8.8",
    force_pull=True,
    docker_url="unix://var/run/docker.sock",
    network_mode="bridge",
    api_version="auto",
)
def f():
    import random
    return [random.random() for _ in range(10000000)]
```

One notable aspect of this architecture is that it has to make as few assumptions about user setups as possible. We could not share a volume between the worker and the container, as this would break if the user runs the Airflow worker in a Docker container. We could not assume that users would have any specialized system libraries on their images (this implementation only requires Python 3 and Bash).

To work within these requirements, we use base64 encoding to store a Jinja-generated Python file and its serialized inputs (generated using the same functions used by the PythonVirtualenvOperator) in environment variables. Once the container starts, it uses these environment variables to deserialize the strings, run the function, and store the result in a file located at /tmp/script.out.

Once the function completes, the container enters a sleep loop until the DockerOperator retrieves the result via Docker's get_archive API. The result can then be deserialized using pickle and sent to Airflow's XCom backend in the same fashion as a python or python_virtualenv result.
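The encode/run/retrieve round trip described above can be sketched with stdlib tools. This is a simplified illustration, not the operator's actual code: the environment variable names (`PY_ARGS_B64`, `OUT_PATH`) are made up for this sketch, a subprocess stands in for the Docker container, and the real implementation also base64-encodes the generated script itself and fetches /tmp/script.out through get_archive:

```python
import base64
import os
import pickle
import subprocess
import sys
import tempfile

# Host side: pickle the inputs and base64-encode them so they can travel
# safely inside an environment variable.
args = {"n": 5}
args_b64 = base64.b64encode(pickle.dumps(args)).decode()

# A stand-in for the Jinja-generated script: decode the inputs, run the
# function body, and pickle the result to an output file.
script = """
import base64, os, pickle, random

args = pickle.loads(base64.b64decode(os.environ["PY_ARGS_B64"]))
result = [random.random() for _ in range(args["n"])]

with open(os.environ["OUT_PATH"], "wb") as f:
    pickle.dump(result, f)
"""

# "Container" side: a subprocess stands in for the container here; the real
# operator writes to /tmp/script.out inside the container and retrieves it
# with docker's get_archive API while the container sleeps.
with tempfile.TemporaryDirectory() as tmp:
    out_path = os.path.join(tmp, "script.out")
    env = {**os.environ, "PY_ARGS_B64": args_b64, "OUT_PATH": out_path}
    subprocess.run([sys.executable, "-c", script], env=env, check=True)
    with open(out_path, "rb") as f:
        result = pickle.load(f)
```

Because everything crosses the boundary as base64 text in environment variables and a single output file, no shared volume or special libraries are needed.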



@xinbinhuang (Contributor)

Love this! I wonder if it would be better to use something like @task.container to abstract away the specific operator being used, so that the DAG author doesn't need to care about it and a data engineer can choose either DockerOperator or KubernetesPodOperator depending on their infrastructure.

@dimberman (Contributor, Author)

@xinbinhuang that's a great idea!

re: kubernetes, once this gets merged the next step will be "@task.kubernetes" where a user can give a pod spec and launch it using the KPO :)

@github-actions

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


@github-actions

The Workflow run is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.

@dimberman dimberman marked this pull request as ready for review April 14, 2021 19:00
@dimberman dimberman added the full tests needed We need to run full set of tests for this PR to merge label Apr 14, 2021
@dimberman dimberman removed the full tests needed We need to run full set of tests for this PR to merge label Apr 14, 2021
@dimberman dimberman force-pushed the decorator-with-docker branch 2 times, most recently from 6e34372 to 573085f Compare April 15, 2021 17:02
```python
assert ti.xcom_pull() == {'number': test_number + 1, '43': 43}

def test_call_20(self):
    """Test calling decorated function 21 times in a DAG"""
```
func name and doc string don't agree.

airflow/decorators/docker.py (outdated; resolved)
@ashb (Member) left a comment

This needs some docs about it, including a giant warning about the downsides (that it ships source code by inspecting it, length limits, no closures, etc.).

I'm not sure this needs a whole example dag (each one we add slows down tests, as all dags get loaded by some tests). At least we should put it in a separate location so it doesn't get loaded by anything that loads the full dag bags.

@ashb ashb marked this pull request as draft April 15, 2021 19:58
@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@kaxil kaxil merged commit a9772cf into apache:main Sep 20, 2021
@kaxil kaxil deleted the decorator-with-docker branch September 20, 2021 21:41
potiuk added a commit to potiuk/airflow that referenced this pull request Jun 29, 2023
When apache#15330 added @task.docker, it also optimized the replacement of a callable with its result in LazyDictWithCache. LazyDictWithCache is used by the providers manager to optimize access to hooks: a hook is only actually imported when it is accessed. This helps speed up importing connection information.

The optimization added the result of running the callable to the _resolved set, but it missed the case where the callable returned None. Previously, when None was returned, the callable was not replaced and was simply called again. After the change, the _resolved set was updated with the key and None was returned, but since the stored value had not been replaced, the next lookup of the same key returned the original callable rather than None. So if the callable returned None and the same key was retrieved twice, the second retrieval returned the callable instead of None.

This PR fixes it by storing the value in the dictionary even when it is None.
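The failure mode and the fix can be illustrated with a simplified stand-in for LazyDictWithCache (this is not the actual Airflow class; the real one differs in detail):

```python
class LazyDictWithCache(dict):
    """Simplified stand-in: values may be zero-argument callables that are
    resolved (called) on first access, then cached in place."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._resolved = set()

    def __getitem__(self, key):
        value = super().__getitem__(key)
        if key not in self._resolved and callable(value):
            value = value()
            self._resolved.add(key)
            # The bug: the buggy version skipped this store when the result
            # was None, so the raw callable stayed in the dict while the key
            # was already marked resolved, and the next lookup returned the
            # callable itself. The fix: store the result unconditionally,
            # even when it is None.
            super().__setitem__(key, value)
        return value


d = LazyDictWithCache()
d["hook"] = lambda: None   # a loader that legitimately returns None
first = d["hook"]          # resolves the callable, yielding None
second = d["hook"]         # with the fix: still None, not the lambda
```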
potiuk added a commit that referenced this pull request Jun 29, 2023
ephraimbuddy pushed a commit that referenced this pull request Jul 6, 2023

(cherry picked from commit 462df0d)