4 changes: 4 additions & 0 deletions Dockerfile
@@ -10,8 +10,12 @@ COPY Pipfile* /
RUN pipenv install --system --ignore-pipfile --deploy
COPY entrypoint.sh /
RUN mkdir -p /airflow/dags
RUN mkdir -p /airflow/plugins
COPY workflows/* /airflow/dags/
COPY mit /airflow/plugins/mit
RUN chown -R airflow:airflow /airflow

USER airflow
ENV AIRFLOW_HOME /airflow
ENTRYPOINT ["/entrypoint.sh"]
CMD ["--help"]
4 changes: 2 additions & 2 deletions Pipfile
@@ -11,12 +11,12 @@ coveralls = "*"

[packages]
apache-airflow = {extras = ["celery", "crypto", "password", "postgres"],version = "~=1.10.0"}
psycopg2 = "*"
click = "*"
boto3 = "*"
redis = "*"
jmespath = "*"
colorama = "*"
psycopg2-binary = "*"
boto3 = "*"

[requires]
python_version = "3.7"
420 changes: 228 additions & 192 deletions Pipfile.lock

Large diffs are not rendered by default.

23 changes: 22 additions & 1 deletion README.rst
@@ -5,7 +5,6 @@ Workflow
This is the MIT Libraries' `Airflow <https://airflow.apache.org/>`_ implementation. **Please read these instructions carefully, because this is very much a shared codebase.**

.. contents:: Table of Contents
.. section-numbering::

Intro
-----
@@ -40,6 +39,28 @@ Workflow Tips and Things to Remember
- It's better to have several small tasks than a single large task for your workflow. The Airflow worker nodes will need to be restarted from time to time for different reasons. When a node is restarted, any currently running tasks are given about two minutes to finish, at which point they are forcibly killed.
- Airflow schedules tasks to be run, but there are no hard guarantees about when a scheduled task will actually run, since that depends on workers being available to run it.

Running Containerized Workflows
-------------------------------

We are experimenting with a somewhat different execution model for workflows. With this model, instead of your task running in the context of Airflow, it runs in its own container. There are several really good reasons to consider doing things this way:

1. Your task does not share the same dependencies as all the other tasks or Airflow itself. It is running in an isolated container that has nothing to do with Airflow.
2. Your task can be written in whatever language you want. You bring your own container and can put whatever you want on it.
3. If your long-running task is busy doing work when the cluster goes down for a redeploy, it will keep running, and your workflow will pick up where it left off when the cluster restarts.

If you are interested in using this, read on. There are two custom operators you will be using to build your workflow. The workflow (DAG) will still be the same Airflow workflow that you are used to; the difference is that you will only use these two operators.

The first is the ``airflow.operators.mit.ECSOperator``. Note that this is different from the one included in Airflow. This is the operator used to start your containerized task; it starts the task and then immediately exits. The next step in your workflow will be an ``airflow.sensors.mit.ECSTaskSensor``. As soon as the previous ``ECSOperator`` has exited, this sensor will periodically monitor the container. If the container failed to start for some reason, or exited with an error, the sensor step will fail. Otherwise, the sensor step will succeed and your workflow can continue. You can use as many of these as you want in a workflow. You'll just chain them together so that it will look something like this::

    ECSOperator --> ECSTaskSensor --> ECSOperator --> ECSTaskSensor ...
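
In DAG code that chain is wired together with Airflow's ``>>`` operator (the arrows above are just an illustration, not syntax). A condensed sketch, with illustrative task names; the full ``example_ecs.py`` below shows the real constructor arguments::

    run_1 >> wait_1 >> run_2 >> wait_2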

The ``ECSOperator`` needs a few arguments that have to be provided at runtime from the environment. The ``cluster`` can be retrieved from the ``ECS_CLUSTER`` envvar. The ``network_configuration`` can be retrieved from the ``ECS_NETWORK_CONFIG`` envvar, but note that this is a base64-encoded JSON string (sorry, this is the only way to get it from Terraform into the environment). You can use::

    json.loads(base64.b64decode(os.getenv('ECS_NETWORK_CONFIG')))

to retrieve the network configuration. The ``task_definition`` will need to be manually configured after the task has been created in Terraform. There is an example workflow (``example_ecs.py``) in the root of this repo.
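
If you want the DAG file to fail fast when the variable is missing, a slightly more defensive version looks like this (a sketch; it assumes only the ``ECS_NETWORK_CONFIG`` variable described above)::

    import base64
    import json
    import os

    raw = os.getenv('ECS_NETWORK_CONFIG')
    if raw is None:
        raise RuntimeError('ECS_NETWORK_CONFIG is not set')
    network_config = json.loads(base64.b64decode(raw))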
Review comment (Member): I don't see any associated work in terraform. Did this not get pushed? It isn't blocking on this work, but I was just trying to better understand the bigger picture.

Reply (Author): Oh, yeah, I haven't pushed it yet. I'll do that shortly.



Developing Locally
------------------

30 changes: 30 additions & 0 deletions example_ecs.py
@@ -0,0 +1,30 @@
import base64
from datetime import datetime
import json
import os

from airflow import DAG
from airflow.operators.mit import ECSOperator
from airflow.sensors.mit import ECSTaskSensor


# Runtime configuration is injected through environment variables (see the
# "Running Containerized Workflows" section of the README).
network_config = json.loads(base64.b64decode(os.getenv('ECS_NETWORK_CONFIG')))
cluster = os.getenv('ECS_CLUSTER')

dag = DAG('example',
          description='An example for running containerized workflows.',
          start_date=datetime.now())

task1 = ECSOperator(task_id='ex_step_1',
                    dag=dag,
                    cluster=cluster,
                    task_definition='example_task_definition',
                    overrides={},
                    network_configuration=network_config)

task2 = ECSTaskSensor(task_id='ex_step_2',
                      dag=dag,
                      cluster=cluster,
                      ecs_task_id='ex_step_1')

task1 >> task2
10 changes: 10 additions & 0 deletions mit/__init__.py
@@ -0,0 +1,10 @@
from airflow.plugins_manager import AirflowPlugin

from mit.operators import ECSOperator
from mit.sensors import ECSTaskSensor


# Registering the operator and sensor on an AirflowPlugin named 'mit' is what
# makes them importable as airflow.operators.mit and airflow.sensors.mit.
class MitPlugin(AirflowPlugin):
    name = 'mit'
    operators = [ECSOperator]
    sensors = [ECSTaskSensor]
34 changes: 34 additions & 0 deletions mit/operators.py
@@ -0,0 +1,34 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import boto3


class ECSOperator(BaseOperator):
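    """Start a single ECS task and return its ARN without waiting.

    Airflow pushes the return value of ``execute`` to XCom, which is how a
    downstream ``ECSTaskSensor`` finds the task.
    """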
    @apply_defaults
    def __init__(self,
                 cluster,
                 task_definition,
                 overrides,
                 network_configuration,
                 launch_type='FARGATE',
                 **kwargs):
        super().__init__(**kwargs)
        self.cluster = cluster
        self.task_definition = task_definition
        self.overrides = overrides
        self.launch_type = launch_type
        self.network_configuration = network_configuration

    def execute(self, context):
        ecs = boto3.client('ecs')
        res = ecs.run_task(
            cluster=self.cluster,
            taskDefinition=self.task_definition,
            overrides=self.overrides,
            count=1,
            launchType=self.launch_type,
            startedBy='Airflow',
            networkConfiguration=self.network_configuration)
        # run_task reports problems such as capacity or permission errors in
        # the response's 'failures' list rather than raising, so surface them.
        if len(res['failures']) > 0:
            raise Exception(res)
        return res['tasks'][0]['taskArn']
32 changes: 32 additions & 0 deletions mit/sensors.py
@@ -0,0 +1,32 @@
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import boto3


class ECSTaskSensor(BaseSensorOperator):
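    """Poll a previously started ECS task until it stops.

    The task ARN is pulled from XCom using the upstream operator's task id
    (``ecs_task_id``). The default ``mode='reschedule'`` releases the worker
    slot between pokes instead of holding it while the container runs.
    """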
    @apply_defaults
    def __init__(self,
                 cluster,
                 ecs_task_id,
                 mode='reschedule',
                 *args,
                 **kwargs):
        super().__init__(*args, mode=mode, **kwargs)
        self.cluster = cluster
        self.ecs_task_id = ecs_task_id

    def poke(self, context):
        ecs = boto3.client('ecs')
        arn = context['task_instance'].xcom_pull(task_ids=self.ecs_task_id)
        res = ecs.describe_tasks(cluster=self.cluster, tasks=[arn])
        if len(res['failures']) > 0:
            raise Exception(res)

        task = res['tasks'][0]
        if task['lastStatus'] != 'STOPPED':
            return False
        cntr = task['containers'][0]
        if cntr['exitCode'] > 0:
            raise Exception(
                f'Container exited with exit code {cntr["exitCode"]}')
        return True
27 changes: 27 additions & 0 deletions tests/test_operators.py
@@ -0,0 +1,27 @@
import re

import boto3
import pytest

from mit.operators import ECSOperator


arn_search = re.compile(r'arn:aws:ecs:us-east-1:012345678910:task/[0-9a-z-]+')


@pytest.fixture
def task(cluster):
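    # The ``cluster`` fixture is assumed to come from conftest.py (not shown
    # in this diff), e.g. an ECS cluster created against a mocked AWS backend.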
    task_name = 'workflow-task'
    ecs = boto3.client('ecs')
    ecs.register_task_definition(family=task_name, containerDefinitions=[])
    return task_name


def test_ecs_operator_runs_task(cluster, task):
    op = ECSOperator(cluster=cluster.name,
                     task_definition=task,
                     overrides={},
                     network_configuration={},
                     task_id='test')
    arn = op.execute(context={})
    assert arn_search.search(arn)
6 changes: 3 additions & 3 deletions tox.ini
@@ -11,13 +11,13 @@ deps =
flake8: flake8
setenv =
PIPENV_VERBOSITY=-1
coverage: PYTEST_COV=--cov manager
coverage: PYTEST_COV=--cov manager --cov mit
commands =
pipenv install --dev --ignore-pipfile --deploy
pipenv run pytest tests {env:PYTEST_COV:} {posargs:--tb=short}

[testenv:flake8]
commands = pipenv run flake8 manager tests
commands = pipenv run flake8 manager tests mit

[testenv:safety]
commands = pipenv check
@@ -27,7 +27,7 @@ passenv = TRAVIS TRAVIS_JOB_ID TRAVIS_BRANCH COVERALLS_REPO_TOKEN

commands =
pipenv install --dev
pytest tests --cov=manager
pytest tests --cov=manager --cov=mit
coveralls

[flake8]