Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Airflow agent #1725

Merged
merged 78 commits into from Oct 11, 2023
Merged

Airflow agent #1725

merged 78 commits into from Oct 11, 2023

Conversation

pingsutw
Copy link
Member

@pingsutw pingsutw commented Jul 7, 2023

TL;DR

Airflow agent allows you to seamlessly run Airflow tasks in the Flyte workflow without changing code. All the airflow tasks will be run on an airflow agent (long-running server) instead of launching a new pod to run it, which dramatically reduces overhead.

  • Compile Airflow tasks to Flyte tasks
  • Use Airflow sensors/operators in Flyte workflows
  • Add support running Airflow tasks locally without running a cluster

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

DataprocOperator

from datetime import timedelta

from airflow.utils import trigger_rule

from flytekit import task, workflow, ImageSpec
from airflow.sensors import time_sensor
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator, DataprocDeleteClusterOperator, DataprocSubmitSparkJobOperator

airflow_plugin = "git+https://github.com/flyteorg/flytekit.git@487438ab59147879eded897674593a1eaee1c78b#subdirectory=plugins/flytekit-airflow"
image_spec = ImageSpec(base_image="pingsutw/flytekit:v1", packages=["apache-airflow", airflow_plugin], apt_packages=["git"], registry="pingsutw")


@workflow
def wf():
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        image_version="2.0.27-debian10",
        storage_bucket="opta-gcp-dogfood-gcp",
        master_machine_type="n1-highmem-32",
        master_disk_size=1024,
        num_workers=2,
        worker_machine_type="n1-highmem-64",
        worker_disk_size=1024,
        region="us-west1",
        cluster_name="flyte-dataproc",
        project_id="dogfood-gcp-dataplane",
    )

    run_spark = DataprocSubmitSparkJobOperator(
        job_name="spark_pi",
        task_id="run_spark",
        dataproc_jars=["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        main_class="org.apache.spark.examples.JavaWordCount",
        arguments=["gs://opta-gcp-dogfood-gcp/spark/file.txt"],
        cluster_name="flyte-dataproc",
        region="us-west1",
        project_id="dogfood-gcp-dataplane",
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="create_dataproc_cluster",
        project_id="dogfood-gcp-dataplane",
        cluster_name="flyte-dataproc",
        region="us-west1",
        retries=3,
        retry_delay=timedelta(minutes=5),
        email_on_failure=True,
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE
    )

    create_cluster >> run_spark >> delete_cluster


if __name__ == '__main__':
    wf()

bashSensor

from airflow.sensors.bash import BashSensor
from flytekit import task, workflow, ImageSpec


airflow_plugin = "git+https://github.com/flyteorg/flytekit.git@487438ab59147879eded897674593a1eaee1c78b#subdirectory=plugins/flytekit-airflow"
image_spec = ImageSpec(base_image="pingsutw/flytekit:v1", packages=["apache-airflow", airflow_plugin], apt_packages=["git"], registry="pingsutw")


@task(container_image=image_spec)
def t1():
    print("flyte")


@workflow
def wf():
    sensor = BashSensor(task_id="Sensor_succeeds", bash_command="echo hello")
    sensor >> t1()


if __name__ == '__main__':
    wf()

HiveSensor

from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor

from flytekit import task, workflow, ImageSpec


airflow_plugin = "git+https://github.com/flyteorg/flytekit.git@487438ab59147879eded897674593a1eaee1c78b#subdirectory=plugins/flytekit-airflow"
image_spec = ImageSpec(base_image="pingsutw/flytekit:v1", packages=["apache-airflow", airflow_plugin], apt_packages=["git"], registry="pingsutw")


@task(container_image=image_spec)
def t1():
    print("flyte")


@workflow
def wf():
    sensor = HivePartitionSensor(table="flyte", schema="person")
    sensor >> t1()


if __name__ == '__main__':
    wf()

FileSensor

from airflow.sensors.filesystem import FileSensor
from flytekit import task, workflow, ImageSpec


airflow_plugin = "git+https://github.com/flyteorg/flytekit.git@487438ab59147879eded897674593a1eaee1c78b#subdirectory=plugins/flytekit-airflow"
image_spec = ImageSpec(base_image="pingsutw/flytekit:v1", packages=["apache-airflow", airflow_plugin], apt_packages=["git"], registry="pingsutw")


@task(container_image=image_spec)
def t1():
    print("flyte")


@workflow
def wf():
    sensor = FileSensor(task_id="id", filepath="/tmp/1234")
    sensor >> t1()


if __name__ == '__main__':
    wf()

TimeSensor

from datetime import datetime, timedelta
from pytz import UTC

from airflow.sensors.time_sensor import TimeSensor
from flytekit import task, workflow, ImageSpec

airflow_plugin = "git+https://github.com/flyteorg/flytekit.git@487438ab59147879eded897674593a1eaee1c78b#subdirectory=plugins/flytekit-airflow"
image_spec = ImageSpec(base_image="pingsutw/flytekit:v1", packages=["apache-airflow", airflow_plugin], apt_packages=["git"], registry="pingsutw")


@task(container_image=image_spec)
def t1():
    print("flyte")


@workflow
def wf():
    sensor = TimeSensor(task_id="fire_immediately", target_time=(datetime.now(tz=UTC)+timedelta(seconds=5)).time())
    sensor >> t1()


if __name__ == '__main__':
    wf()

PythonSensor

from airflow.sensors.python import PythonSensor
from flytekit import task, workflow, ImageSpec

airflow_plugin = "git+https://github.com/flyteorg/flytekit.git@487438ab59147879eded897674593a1eaee1c78b#subdirectory=plugins/flytekit-airflow"
image_spec = ImageSpec(base_image="pingsutw/flytekit:v1", packages=["apache-airflow", airflow_plugin], apt_packages=["git"], registry="pingsutw")


@task(container_image=image_spec)
def t1():
    print("flyte")


def py_func():
    print("airflow python sensor")
    return True


@workflow
def wf():
    sensor = PythonSensor(task_id="fire_immediately", python_callable=py_func)
    sensor >> t1()


if __name__ == '__main__':
    wf()

Tracking Issue

NA

Follow-up issue

NA

pingsutw and others added 30 commits May 30, 2023 15:57
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Co-authored-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
@davidmirror-ops
Copy link

@pingsutw What's the status of this PR?

Signed-off-by: Kevin Su <pingsutw@apache.org>
@codecov
Copy link

codecov bot commented Sep 7, 2023

Codecov Report

Attention: 46 lines in your changes are missing coverage. Please review.

Comparison is base (c413570) 55.04% compared to head (d345b75) 55.24%.
Report is 8 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1725      +/-   ##
==========================================
+ Coverage   55.04%   55.24%   +0.19%     
==========================================
  Files         296      300       +4     
  Lines       22242    22382     +140     
  Branches     3357     3359       +2     
==========================================
+ Hits        12244    12365     +121     
- Misses       9835     9855      +20     
+ Partials      163      162       -1     
Files Coverage Δ
...ytekit-airflow/flytekitplugins/airflow/__init__.py 100.00% <100.00%> (ø)
plugins/flytekit-airflow/tests/test_agent.py 100.00% <100.00%> (ø)
...s/flytekit-airflow/flytekitplugins/airflow/task.py 97.29% <97.29%> (ø)
flytekit/extend/backend/base_agent.py 45.83% <8.33%> (+1.47%) ⬆️
.../flytekit-airflow/flytekitplugins/airflow/agent.py 79.10% <79.10%> (ø)
flytekit/types/pickle/pickle.py 49.33% <20.00%> (-0.67%) ⬇️

... and 4 files with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
wild-endeavor
wild-endeavor previously approved these changes Oct 3, 2023
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Copy link
Collaborator

@eapolinario eapolinario left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few comments. Looks really close.

.github/workflows/pythonbuild.yml Outdated Show resolved Hide resolved
Dockerfile Show resolved Hide resolved
plugins/flytekit-airflow/setup.py Show resolved Hide resolved
Signed-off-by: Kevin Su <pingsutw@apache.org>
@pingsutw pingsutw merged commit 54e68e0 into master Oct 11, 2023
71 checks passed
@Future-Outlier Future-Outlier mentioned this pull request Jan 11, 2024
3 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants