Airflow agent #1725

Merged on Oct 11, 2023 (78 commits)

Commits
d255119  Reanme external plugin (pingsutw, May 30, 2023)
22202af  nit (pingsutw, May 30, 2023)
3eb25a1  nit (pingsutw, May 30, 2023)
6462625  nit (pingsutw, May 30, 2023)
ee3def0  nit (pingsutw, May 30, 2023)
499f2a2  fix tests (pingsutw, May 30, 2023)
2262489  fix tests (pingsutw, May 30, 2023)
3ac9f05  nit (pingsutw, May 30, 2023)
7cf586a  bump id (pingsutw, May 30, 2023)
a4a1e8a  Update setup.py (pingsutw, May 30, 2023)
3e51f43  pin idl to 1.5.9 (pingsutw, May 31, 2023)
4bfae81  update idl (pingsutw, May 31, 2023)
0be33f2  bump idl (pingsutw, May 31, 2023)
d1b0776  nit (pingsutw, May 31, 2023)
1c98782  fix tests (pingsutw, May 31, 2023)
0ed21ab  update idl (pingsutw, Jun 1, 2023)
9355af3  nit (pingsutw, Jun 1, 2023)
7138d67  update idl (pingsutw, Jun 2, 2023)
8e9f339  local agent (pingsutw, Jun 7, 2023)
a48f478  wip (pingsutw, Jun 7, 2023)
5f3e77e  nit (pingsutw, Jun 8, 2023)
bbc9bf0  rename (pingsutw, Jun 8, 2023)
8165eb7  nit (pingsutw, Jun 9, 2023)
5f8e7d1  wip (pingsutw, Jun 12, 2023)
1c4ca0a  wip (pingsutw, Jun 12, 2023)
0a44ae0  test (pingsutw, Jun 12, 2023)
cb1ab77  wip (pingsutw, Jun 12, 2023)
c7da3e7  test (pingsutw, Jun 12, 2023)
3204c2d  wip (pingsutw, Jun 12, 2023)
7c5a252  merged master (pingsutw, Jul 7, 2023)
0eefd5e  wip (pingsutw, Jul 7, 2023)
003be57  nit (pingsutw, Jul 7, 2023)
3ee6ff2  nit (pingsutw, Jul 7, 2023)
6110820  nit (pingsutw, Jul 7, 2023)
3d1234f  lint (pingsutw, Jul 7, 2023)
028eb08  Add fs hook (pingsutw, Jul 7, 2023)
cb3e0cf  Add TimeSensor (pingsutw, Jul 7, 2023)
91f8638  add support operator (pingsutw, Jul 14, 2023)
dfb244c  nit (pingsutw, Jul 19, 2023)
487438a  nit (pingsutw, Jul 19, 2023)
7532272  Add tests (pingsutw, Jul 19, 2023)
e4bdfea  Add google-cloud-orchestration-airflow (pingsutw, Jul 24, 2023)
aa26ead  update dockerfile (pingsutw, Jul 27, 2023)
9d47f50  wip (pingsutw, Jul 28, 2023)
0b8b0aa  Add support dataproc operator (pingsutw, Jul 29, 2023)
ff69f49  Fix dataprocDeleteOperator (pingsutw, Jul 29, 2023)
978ad96  wip (pingsutw, Jul 31, 2023)
13ee158  wip (pingsutw, Jul 31, 2023)
abb3f0c  use jsonpickle (pingsutw, Jul 31, 2023)
9e1589d  clean up (pingsutw, Jul 31, 2023)
053ce98  test (pingsutw, Aug 6, 2023)
81f60a3  test (pingsutw, Aug 6, 2023)
3fbdf47  test (pingsutw, Aug 6, 2023)
6fb3d7f  wip (pingsutw, Aug 7, 2023)
873c60e  lont (pingsutw, Aug 7, 2023)
d884213  Merge branch 'master' of github.com:flyteorg/flytekit into airflow-ag… (pingsutw, Aug 11, 2023)
38d4b2b  fix-bug (pingsutw, Aug 14, 2023)
58328ac  update dockerifle (pingsutw, Aug 24, 2023)
7701e08  Merge branch 'master' of github.com:flyteorg/flytekit into airflow-ag… (pingsutw, Sep 7, 2023)
1283571  nit (pingsutw, Sep 7, 2023)
69516e7  use task id as name (pingsutw, Sep 14, 2023)
a99df83  merged master (pingsutw, Sep 14, 2023)
3946c9d  Merge branch 'master' of github.com:flyteorg/flytekit into airflow-ag… (pingsutw, Sep 14, 2023)
ad4b19a  Remove progress bar (pingsutw, Sep 14, 2023)
62bd03b  catch not found error (pingsutw, Sep 14, 2023)
7d10abf  clean up (pingsutw, Sep 21, 2023)
23439e8  clean up (pingsutw, Sep 22, 2023)
cc50120  clean up (pingsutw, Sep 22, 2023)
6b95469  merged master (pingsutw, Oct 4, 2023)
1b25279  nit (pingsutw, Oct 4, 2023)
3e9dfbd  update dockerfile (pingsutw, Oct 4, 2023)
023cd66  nit (pingsutw, Oct 4, 2023)
01ec6d9  update dep (pingsutw, Oct 4, 2023)
8784c75  nit (pingsutw, Oct 4, 2023)
86e2780  Merge branch 'master' of github.com:flyteorg/flytekit into airflow-ag… (pingsutw, Oct 6, 2023)
015db31  update dep (pingsutw, Oct 9, 2023)
7c68d75  lint (pingsutw, Oct 9, 2023)
d345b75  test 3.11 (pingsutw, Oct 10, 2023)
3 changes: 3 additions & 0 deletions .github/workflows/pythonbuild.yml
@@ -94,6 +94,7 @@ jobs:
python-version: ["3.8", "3.11"]
plugin-names:
# Please maintain an alphabetical order in the following list
- flytekit-airflow
- flytekit-aws-athena
- flytekit-aws-batch
- flytekit-aws-sagemaker
@@ -163,6 +164,8 @@ jobs:
# See: https://github.com/flyteorg/flytekit/actions/runs/4493746408/jobs/7905368664
- python-version: 3.11
plugin-names: "flytekit-whylogs"
- python-version: 3.11
plugin-names: "flytekit-airflow"
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
1 change: 1 addition & 0 deletions Dockerfile
@@ -20,6 +20,7 @@ RUN pip install -U flytekit==$VERSION \

RUN useradd -u 1000 flytekit
RUN chown flytekit: /root
RUN chown flytekit: /home
USER flytekit

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
5 changes: 4 additions & 1 deletion Dockerfile.agent
@@ -5,6 +5,9 @@ LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

ARG VERSION
RUN pip install prometheus-client
RUN pip install -U flytekit==$VERSION flytekitplugins-bigquery==$VERSION

# Airflow plugin's dependencies
RUN pip install apache-airflow
RUN pip install -U flytekit==$VERSION flytekitplugins-bigquery==$VERSION flytekitplugins-airflow==$VERSION

CMD pyflyte serve --port 8000
30 changes: 13 additions & 17 deletions flytekit/extend/backend/base_agent.py
@@ -20,14 +20,14 @@
State,
)
from flyteidl.core.tasks_pb2 import TaskTemplate
from rich.progress import Progress

import flytekit
from flytekit import FlyteContext, logger
from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.core.type_engine import TypeEngine
from flytekit.exceptions.system import FlyteAgentNotFound
from flytekit.exceptions.user import FlyteUserException
from flytekit.models.literals import LiteralMap


@@ -176,7 +176,7 @@
res = asyncio.run(self._get(resource_meta=res.resource_meta))

if res.resource.state != SUCCEEDED:
raise Exception(f"Failed to run the task {self._entity.name}")
raise FlyteUserException(f"Failed to run the task {self._entity.name}")

return LiteralMap.from_flyte_idl(res.resource.outputs)

@@ -205,21 +205,17 @@
state = RUNNING
grpc_ctx = _get_grpc_context()

progress = Progress(transient=True)
task = progress.add_task(f"[cyan]Running Task {self._entity.name}...", total=None)
with progress:
while not is_terminal_state(state):
progress.start_task(task)
time.sleep(1)
if self._agent.asynchronous:
res = await self._agent.async_get(grpc_ctx, resource_meta)
if self._is_canceled:
await self._is_canceled
sys.exit(1)
else:
res = self._agent.get(grpc_ctx, resource_meta)
state = res.resource.state
logger.info(f"Task state: {state}")
while not is_terminal_state(state):
time.sleep(1)
if self._agent.asynchronous:
res = await self._agent.async_get(grpc_ctx, resource_meta)
if self._is_canceled:
await self._is_canceled
sys.exit(1)
else:
res = self._agent.get(grpc_ctx, resource_meta)
state = res.resource.state
logger.info(f"Task state: {state}")
return res

def signal_handler(self, resource_meta: bytes, signum: int, frame: FrameType) -> typing.Any:
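Because the diff spreads the new polling logic across several hunks, here is a condensed, hedged reconstruction of how the loop reads once the rich progress bar is removed. The state constants and the stub agent are stand-ins so the snippet runs outside flytekit, and the real method also bails out early (via sys.exit) when the task has been canceled.

```python
# Sketch only: a condensed reconstruction of the polling loop in
# flytekit/extend/backend/base_agent.py after this change. State values and the
# stub agent are stand-ins; they are not the real flyteidl enum values.
import asyncio
import time

RUNNING, SUCCEEDED = "RUNNING", "SUCCEEDED"  # stand-ins for flyteidl states


def is_terminal_state(state) -> bool:
    # Stand-in for the real helper, which also treats failure states as terminal.
    return state == SUCCEEDED


async def poll(agent, grpc_ctx, resource_meta):
    """Poll the agent once per second until the task reaches a terminal state."""
    state, res = RUNNING, None
    while not is_terminal_state(state):
        time.sleep(1)
        if agent.asynchronous:
            res = await agent.async_get(grpc_ctx, resource_meta)
        else:
            res = agent.get(grpc_ctx, resource_meta)
        state = res.resource.state
    return res


class _StubAgent:
    """Minimal synchronous agent whose get() immediately reports success."""

    asynchronous = False

    def get(self, grpc_ctx, resource_meta):
        return type("Res", (), {"resource": type("R", (), {"state": SUCCEEDED})()})()


if __name__ == "__main__":
    result = asyncio.run(poll(_StubAgent(), grpc_ctx=None, resource_meta=b""))
    print(result.resource.state)
```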
50 changes: 30 additions & 20 deletions flytekit/types/pickle/pickle.py
@@ -4,7 +4,7 @@

import cloudpickle

from flytekit.core.context_manager import FlyteContext
from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.type_engine import TypeEngine, TypeTransformer
from flytekit.models.core import types as _core_types
from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
@@ -50,6 +50,33 @@

return _SpecificFormatClass

@classmethod
def to_pickle(cls, python_val: typing.Any) -> str:
ctx = FlyteContextManager.current_context()
local_dir = ctx.file_access.get_random_local_directory()
os.makedirs(local_dir, exist_ok=True)
local_path = ctx.file_access.get_random_local_path()
uri = os.path.join(local_dir, local_path)
with open(uri, "w+b") as outfile:
cloudpickle.dump(python_val, outfile)

remote_path = ctx.file_access.get_random_remote_path(uri)
ctx.file_access.put_data(uri, remote_path, is_multipart=False)
return remote_path

@classmethod
def from_pickle(cls, uri: str) -> typing.Any:
ctx = FlyteContextManager.current_context()
# Deserialize the pickle, and return data in the pickle,
# and download pickle file to local first if file is not in the local file systems.
if ctx.file_access.is_remote(uri):
local_path = ctx.file_access.get_random_local_path()
ctx.file_access.get_data(uri, local_path, False)
uri = local_path
with open(uri, "rb") as infile:
data = cloudpickle.load(infile)
return data


class FlytePickleTransformer(TypeTransformer[FlytePickle]):
PYTHON_PICKLE_FORMAT = "PythonPickle"
@@ -63,15 +90,7 @@

def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
uri = lv.scalar.blob.uri
# Deserialize the pickle, and return data in the pickle,
# and download pickle file to local first if file is not in the local file systems.
if ctx.file_access.is_remote(uri):
local_path = ctx.file_access.get_random_local_path()
ctx.file_access.get_data(uri, local_path, False)
uri = local_path
with open(uri, "rb") as infile:
data = cloudpickle.load(infile)
return data
return FlytePickle.from_pickle(uri)

def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], expected: LiteralType) -> Literal:
if python_val is None:
@@ -81,16 +100,7 @@
format=self.PYTHON_PICKLE_FORMAT, dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE
)
)
# Dump the task output into pickle
local_dir = ctx.file_access.get_random_local_directory()
os.makedirs(local_dir, exist_ok=True)
local_path = ctx.file_access.get_random_local_path()
uri = os.path.join(local_dir, local_path)
with open(uri, "w+b") as outfile:
cloudpickle.dump(python_val, outfile)

remote_path = ctx.file_access.get_random_remote_path(uri)
ctx.file_access.put_data(uri, remote_path, is_multipart=False)
remote_path = FlytePickle.to_pickle(python_val)
return Literal(scalar=Scalar(blob=Blob(metadata=meta, uri=remote_path)))

def guess_python_type(self, literal_type: LiteralType) -> typing.Type[FlytePickle[typing.Any]]:
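The transformer's to_literal/to_python_value paths now delegate to the new FlytePickle.to_pickle and FlytePickle.from_pickle class methods, which the Airflow agent (or any caller) can use directly. A minimal round-trip sketch, assuming flytekit is installed and its default local context is in effect (locally, the "remote" path resolves inside the local sandbox):

```python
# Sketch: round-trip an arbitrary Python object through the new class methods.
from flytekit.types.pickle.pickle import FlytePickle

payload = {"cluster": "demo", "num_workers": 2}  # any picklable object

uri = FlytePickle.to_pickle(payload)      # dumps with cloudpickle, then uploads
restored = FlytePickle.from_pickle(uri)   # downloads if remote, then loads

assert restored == payload
print(uri, restored)
```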
33 changes: 33 additions & 0 deletions plugins/flytekit-airflow/README.md
@@ -0,0 +1,33 @@
# Flytekit Airflow Plugin
The Airflow plugin allows you to seamlessly run Airflow tasks in a Flyte workflow without changing any code.

- Compile Airflow tasks to Flyte tasks
- Use Airflow sensors/operators in Flyte workflows
- Run Airflow tasks locally without running an Airflow cluster

## Example
```python
from airflow.sensors.filesystem import FileSensor
from flytekit import task, workflow

@task()
def t1():
print("flyte")


@workflow
def wf():
sensor = FileSensor(task_id="id", filepath="/tmp/1234")
sensor >> t1()


if __name__ == '__main__':
wf()
```


To install the plugin, run the following command:

```bash
pip install flytekitplugins-airflow
```
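The README example exercises a sensor, while operator support is handled by the agent's generic execute() fallback shown further down. A hypothetical sketch using Airflow's BashOperator (an assumption on our part; this PR's tests cover sensors and Dataproc operators, not BashOperator) might look like:

```python
# Hypothetical sketch: an Airflow operator inside a Flyte workflow.
# BashOperator is an assumed example; the agent falls back to operator.execute()
# for operators it does not special-case.
from airflow.operators.bash import BashOperator
from flytekit import task, workflow


@task
def t1():
    print("flyte")


@workflow
def wf():
    op = BashOperator(task_id="say_hello", bash_command="echo hello")
    op >> t1()


if __name__ == "__main__":
    wf()
```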
16 changes: 16 additions & 0 deletions plugins/flytekit-airflow/flytekitplugins/airflow/__init__.py
@@ -0,0 +1,16 @@
"""
.. currentmodule:: flytekitplugins.airflow

This package contains things that are useful when extending Flytekit.

.. autosummary::
:template: custom.rst
:toctree: generated/

AirflowConfig
AirflowTask
AirflowAgent
"""

from .agent import AirflowAgent
from .task import AirflowConfig, AirflowTask
109 changes: 109 additions & 0 deletions plugins/flytekit-airflow/flytekitplugins/airflow/agent.py
@@ -0,0 +1,109 @@
import importlib
from dataclasses import dataclass
from typing import Optional

import cloudpickle
import grpc
import jsonpickle
from airflow.providers.google.cloud.operators.dataproc import (
DataprocDeleteClusterOperator,
DataprocJobBaseOperator,
JobStatus,
)
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
from flyteidl.admin.agent_pb2 import (
PERMANENT_FAILURE,
RUNNING,
SUCCEEDED,
CreateTaskResponse,
DeleteTaskResponse,
GetTaskResponse,
Resource,
)
from flytekitplugins.airflow.task import AirflowConfig
from google.cloud.exceptions import NotFound

from flytekit import FlyteContext, FlyteContextManager, logger
from flytekit.extend.backend.base_agent import AgentBase, AgentRegistry
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


@dataclass
class ResourceMetadata:
job_id: str
airflow_config: AirflowConfig


def _get_airflow_task(ctx: FlyteContext, airflow_config: AirflowConfig):
task_module = importlib.import_module(name=airflow_config.task_module)
task_def = getattr(task_module, airflow_config.task_name)
task_config = airflow_config.task_config

# Set the GET_ORIGINAL_TASK attribute to True so that task_def will return the original
# airflow task instead of the Flyte task.
ctx.user_space_params.builder().add_attr("GET_ORIGINAL_TASK", True).build()
if issubclass(task_def, DataprocJobBaseOperator):
return task_def(**task_config, asynchronous=True)
return task_def(**task_config)


class AirflowAgent(AgentBase):
def __init__(self):
super().__init__(task_type="airflow", asynchronous=False)

def create(
self,
context: grpc.ServicerContext,
output_prefix: str,
task_template: TaskTemplate,
inputs: Optional[LiteralMap] = None,
) -> CreateTaskResponse:
airflow_config = jsonpickle.decode(task_template.custom.get("task_config_pkl"))
resource_meta = ResourceMetadata(job_id="", airflow_config=airflow_config)

ctx = FlyteContextManager.current_context()
airflow_task = _get_airflow_task(ctx, airflow_config)
if isinstance(airflow_task, DataprocJobBaseOperator):
airflow_task.execute(context=Context())
resource_meta.job_id = ctx.user_space_params.xcom_data["value"]["resource"]

return CreateTaskResponse(resource_meta=cloudpickle.dumps(resource_meta))

def get(self, context: grpc.ServicerContext, resource_meta: bytes) -> GetTaskResponse:
meta = cloudpickle.loads(resource_meta)
airflow_config = meta.airflow_config
job_id = meta.job_id
task = _get_airflow_task(FlyteContextManager.current_context(), meta.airflow_config)
cur_state = RUNNING

if issubclass(type(task), BaseSensorOperator):
if task.poke(context=Context()):
cur_state = SUCCEEDED
elif issubclass(type(task), DataprocJobBaseOperator):
job = task.hook.get_job(
job_id=job_id,
region=airflow_config.task_config["region"],
project_id=airflow_config.task_config["project_id"],
)
if job.status.state == JobStatus.State.DONE:
cur_state = SUCCEEDED
elif job.status.state in (JobStatus.State.ERROR, JobStatus.State.CANCELLED):
cur_state = PERMANENT_FAILURE
elif isinstance(task, DataprocDeleteClusterOperator):
try:
task.execute(context=Context())
except NotFound:
logger.info("Cluster already deleted.")
cur_state = SUCCEEDED
else:
task.execute(context=Context())
cur_state = SUCCEEDED
return GetTaskResponse(resource=Resource(state=cur_state, outputs=None))

def delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> DeleteTaskResponse:
return DeleteTaskResponse()


AgentRegistry.register(AirflowAgent())
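To make the create()/get() handshake above concrete, here is a small sketch of how the resource metadata travels between the two calls as opaque bytes. AirflowConfig is stubbed locally so the snippet runs without an Airflow installation; the field names mirror the agent code above.

```python
# Sketch: how ResourceMetadata moves from create() to get() via cloudpickle.
# AirflowConfigStub stands in for flytekitplugins.airflow.AirflowConfig.
from dataclasses import dataclass, field
from typing import Any, Dict

import cloudpickle


@dataclass
class AirflowConfigStub:
    task_module: str
    task_name: str
    task_config: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ResourceMetadata:
    job_id: str
    airflow_config: AirflowConfigStub


# create(): pack the metadata into bytes and return it to the agent service.
meta = ResourceMetadata(
    job_id="",
    airflow_config=AirflowConfigStub(
        task_module="airflow.sensors.filesystem",
        task_name="FileSensor",
        task_config={"task_id": "id", "filepath": "/tmp/1234"},
    ),
)
resource_meta_bytes = cloudpickle.dumps(meta)

# get(): unpack the bytes, rebuild the Airflow task from module + name, and poll.
restored = cloudpickle.loads(resource_meta_bytes)
assert restored.airflow_config.task_name == "FileSensor"
print(restored)
```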