Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix @task.kubernetes to receive input and send output #28942

Merged
merged 4 commits into from
Feb 18, 2023

Conversation

vchiapaikeo
Copy link
Contributor

@vchiapaikeo vchiapaikeo commented Jan 14, 2023

closes: #28933

@task.kubernetes currently does not handle receiving input nor does it properly return output. This PR addresses these issues by passing input to the K8's pod (via an env var) and output where the K8's xcom sidecar expects it (as /airflow/xcom/result.json).

It also seemed like the initial implementation did not properly base64 encode / decode the values. Since there might be unusual characters / line breaks in both the function and inputs, I added base64 encoding to the env var setting (similar to how it's handled in docker).

Testing

To test, I started Airflow w/ breeze (breeze --python 3.9 --backend postgres start-airflow), added a google_cloud_default connection_id to my sandbox GCP project, and copied my kube_config file to /files/.kube/config. Because I am using GCP, I needed to install gcloud. I do so with the following commands in the breeze shell:

curl -sSL https://sdk.cloud.google.com > /tmp/gcloud_installer && bash /tmp/gcloud_installer --install-dir=$AIRFLOW_HOME --disable-prompts
source /root/airflow/google-cloud-sdk/completion.bash.inc
source /root/airflow/google-cloud-sdk/path.bash.inc

I then ran this test DAG:

import os

from airflow import DAG
from airflow.decorators import task

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "start_date": "2022-12-16",
    "retries": 0,
}

@task.kubernetes(
    image="python:3.8-slim-buster",
    namespace=os.getenv("AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE", "airflow-default"),
    in_cluster=False,
    config_file="/files/.kube/config",
)
def k8s_basic() -> str:
    import time
    time.sleep(1)


@task.kubernetes(
    image="python:3.8-slim-buster",
    namespace=os.getenv("AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE", "airflow-default"),
    in_cluster=False,
    config_file="/files/.kube/config",
    # multiple_outputs=True,
)
def k8s_func(val: str = "a") -> str:
    import time
    time.sleep(1)

    print(f"Got val: {val}")
    return {"a": val, "d": [1, 2, 3], "f": {1: val}}


with DAG(
    schedule_interval="@daily",
    max_active_runs=1,
    max_active_tasks=5,
    catchup=False,
    dag_id="test_k8s_decorator",
    default_args=DEFAULT_TASK_ARGS,
) as dag:

    basic = k8s_basic()

    no_input_or_output = k8s_func()

    with_input = k8s_func.override(task_id="with_input")("b")

    with_input_and_output = k8s_func.override(task_id="with_input_and_output", do_xcom_push=True)("b")

    no_input_and_output = k8s_func.override(task_id="no_input_and_output", do_xcom_push=True)()

Results

image

With Input:

image

With Input and Output:

image

No input and Output:

image

With multiple_outputs = True:

image


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added provider:cncf-kubernetes Kubernetes provider related issues area:providers labels Jan 14, 2023
@vchiapaikeo vchiapaikeo force-pushed the vchiapaikeo/k8s-dec-v1 branch 3 times, most recently from 60e2d56 to 0a9a738 Compare January 15, 2023 02:04
@vchiapaikeo vchiapaikeo marked this pull request as ready for review January 15, 2023 11:52
@vchiapaikeo
Copy link
Contributor Author

Hi @eladkal, do you know who might be a good person to review this?

@vchiapaikeo
Copy link
Contributor Author

Modified with b64 encoded contents sent straight to the Py commands and input serialized with pickle instead of json. Retested and results are as expected:

image

image

@vchiapaikeo vchiapaikeo force-pushed the vchiapaikeo/k8s-dec-v1 branch 2 times, most recently from a55a51a to 7251917 Compare January 17, 2023 15:59
@vchiapaikeo
Copy link
Contributor Author

Oops - sorry for pinging everyone. Forgot that I had done a rebase on the UI and didn't pull first. Feel free to unassign here!

@josh-fell josh-fell removed their request for review January 18, 2023 14:29
Comment on lines +92 to +95
f"{_generate_decoded_command(quote(_PYTHON_SCRIPT_ENV), quote(script_filename))}"
)
write_local_input_file_cmd = (
f"{_generate_decoded_command(quote(_PYTHON_INPUT_ENV), quote(input_filename))}"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shlex.quote here isn't really necessary anymore since they don't capture user input but I left it in anyways. Thought that made it a bit more readable but I can remove if you prefer.

@vchiapaikeo vchiapaikeo force-pushed the vchiapaikeo/k8s-dec-v1 branch 2 times, most recently from b136a38 to ed11a95 Compare January 26, 2023 19:14
@vchiapaikeo
Copy link
Contributor Author

Hey @uranusjr , anything else I can do here?

Copy link
Member

@uranusjr uranusjr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@vchiapaikeo
Copy link
Contributor Author

Can someone help merge this?

@vchiapaikeo
Copy link
Contributor Author

@uranusjr , can you help merge this please?

@fletchjeff
Copy link
Contributor

fletchjeff commented Mar 15, 2023

Hey @vchiapaikeo, I was just looking in to this very problem and saw this fix. Nice work! Thank you :)

@okulbida
Copy link

okulbida commented Apr 7, 2023

@fletchjeff Do you know when this gonna be released?

@vchiapaikeo
Copy link
Contributor Author

@okulbida , this went out with cncf providers release 5.2.1

https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/index.html#id4

@vchiapaikeo vchiapaikeo deleted the vchiapaikeo/k8s-dec-v1 branch April 7, 2023 16:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers provider:cncf-kubernetes Kubernetes provider related issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

@task.kubernetes TaskFlow decorator fails with IndexError and is unable to receive input
5 participants