Add deployment docs for Airflow and Dask
Test Plan: docs only

Reviewers: #ft, alangenfeld, max

Reviewed By: #ft, alangenfeld, max

Subscribers: max, alangenfeld

Differential Revision: https://dagster.phacility.com/D663
Nate Kupp committed Jul 25, 2019
1 parent 6b5082f commit 37ea663
Showing 6 changed files with 330 additions and 234 deletions.
4 changes: 1 addition & 3 deletions docs/index.rst
@@ -8,7 +8,5 @@
   Learn <sections/learn/learn>
   API Docs <sections/api/api>
   Reference <sections/reference/reference>
-
+  Deploying <sections/deploying/deploying>
   Community <sections/community/community>
-
-.. TODO: Write this section Deploying <sections/deploying/deploying>
230 changes: 230 additions & 0 deletions docs/sections/deploying/airflow.md
@@ -0,0 +1,230 @@
# Airflow Deployment Guide

## Introduction
Dagster is designed for incremental adoption, and to work with all of your existing Airflow infrastructure.

You can use all of Dagster's features and abstractions—the programming model, the type system, and so on—while scheduling, executing, and monitoring your Dagster pipelines with Airflow, right alongside all
of your existing Airflow DAGs.

This integration is fairly simple. As with vanilla Dagster pipeline execution, Dagster compiles your pipeline and
configuration together into an execution plan. In the case of Airflow, these execution plans are then mapped to a
DAG, with a bijection between solids and Airflow operators.

### Requirements
- The `dagster-airflow` library requires an Airflow install.
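If you don't already have an Airflow install, a minimal local setup might look like the following (package names assume the standard PyPI distributions, and `airflow initdb` assumes the Airflow 1.10.x CLI):

```bash
pip install apache-airflow dagster dagster-airflow
airflow initdb    # initialize Airflow's metadata database (Airflow 1.10.x CLI)
```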

### Overview
We support two modes of execution:

1. **Uncontainerized** (default), where tasks are invoked directly on the Airflow executors.
2. **Containerized**, where tasks are executed in Docker containers.

## Running Uncontainerized
We use a `DagsterPythonOperator` to wrap Dagster solids and define an Airflow DAG that corresponds
to a Dagster pipeline and can run in your Airflow environment uncontainerized.

You will need to make sure that all of the Python and system requirements that your Dagster pipeline
needs are available in your Airflow environment; if you're running Airflow on multiple nodes with
the Celery executor, this means they must be available on the Airflow master and on all of the workers.

To define an Airflow DAG corresponding to a Dagster pipeline, you'll put a new Python file defining your DAG
in the directory in which Airflow looks for DAGs -- this is typically `$AIRFLOW_HOME/dags`.

You can automatically scaffold this file from your Python code with the `dagster-airflow` CLI tool. For example, if you've checked out Dagster to `$DAGSTER_ROOT`:

```bash
pip install -e $DAGSTER_ROOT/examples/
dagster-airflow scaffold \
    --dag-name toys_sleepy \
    --module-name dagster_examples.toys.sleepy \
    --pipeline-name sleepy_pipeline
```

This will create a file in your local `$AIRFLOW_HOME/dags` folder like:

```python
'''
The airflow DAG scaffold for dagster_examples.toys.sleepy.sleepy_pipeline
Note that this docstring must contain the strings "airflow" and "DAG" for
Airflow to properly detect it as a DAG
See: http://bit.ly/307VMum
'''
import datetime
import yaml

from dagster_airflow.factory import make_airflow_dag


ENVIRONMENT = '''
storage:
  filesystem:
    config:
      base_dir: /tmp/dagster-airflow/toys_sleepy
'''


# NOTE: these arguments should be edited for your environment
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2019, 5, 7),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

dag, tasks = make_airflow_dag(
    # NOTE: you must ensure that dagster_examples.toys.sleepy is installed or available on
    # sys.path, otherwise this import will fail.
    module_name='dagster_examples.toys.sleepy',
    pipeline_name='sleepy_pipeline',
    environment_dict=yaml.load(ENVIRONMENT),
    dag_kwargs={'default_args': DEFAULT_ARGS, 'max_active_runs': 1}
)
```

You can simply edit this file, supplying the appropriate environment configuration and Airflow `DEFAULT_ARGS` for
your particular Airflow instance. When Airflow sweeps this directory looking for DAGs, it will find and execute this code, dynamically creating an Airflow DAG and steps corresponding to your Dagster pipeline.
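To confirm that Airflow has picked up the generated DAG, you can list DAGs and tasks from the CLI (a sketch, assuming the Airflow 1.10.x CLI and the `toys_sleepy` DAG name from the scaffold above):

```bash
airflow list_dags
airflow list_tasks toys_sleepy
```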

These are ordinary Airflow objects, and you can do everything you would expect with them—for example,
adding `ExternalTaskSensor` dependencies between the dynamically generated Airflow operators in this DAG
and operators that you define in your other existing Airflow DAGs.
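For instance, a sketch of gating one of the generated tasks on a task in another DAG might look like the following (the `upstream_dag` and `upstream_task` IDs are hypothetical, `dag` and `tasks` come from the scaffold above, and the import path assumes Airflow 1.10.x):

```python
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Wait for a task in another, pre-existing Airflow DAG to complete.
wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='upstream_dag',    # hypothetical existing DAG
    external_task_id='upstream_task',  # hypothetical task in that DAG
    dag=dag,
)

# Require the upstream task to complete before one of the generated Dagster tasks runs.
wait_for_upstream >> tasks[0]
```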

Note the extra `storage` parameter in the environment dict. You can set this for any Dagster
pipeline (and intermediate values will be automatically materialized in either `filesystem` or `s3`
storage), but you **must** set it when converting a pipeline to an Airflow DAG.

## Running Containerized
We use a `DagsterDockerOperator`, based on the ordinary Airflow `DockerOperator`, to wrap Dagster pipelines.
In order to run containerized Dagster pipelines, you must have Docker running in your Airflow
environment (the same as for the ordinary Airflow `DockerOperator`).

During execution, Dagster caches and transfers intermediate state between execution steps. This feature enables quick re-execution of execution steps from the Dagit UI.

When running uncontainerized on a single machine, this transfer can take place in memory or on the local file system, but running in a containerized context requires a persistent intermediate storage layer available to the Dagster containers.

Presently, we support S3 for persisting this intermediate state. To use it, you'll just need to set up an S3 bucket and expose AWS credentials via the usual Boto credentials chain. We plan on supporting other persistence targets like GCS, HDFS, and NFS in the future—please reach out to us if you require a different intermediate store for your use case!
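For example, one of several ways to expose credentials through the standard Boto chain is via environment variables (the values below are placeholders):

```bash
export AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY_ID          # placeholder
export AWS_SECRET_ACCESS_KEY=YOUR_SECRET_ACCESS_KEY  # placeholder
export AWS_DEFAULT_REGION=us-west-2                  # placeholder
```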

We use the `DagsterDockerOperator` to define an Airflow DAG that can run in completely isolated
containers corresponding to your Dagster solids. To run containerized, you'll first need to
containerize your repository. Then, you can define your Airflow DAG.

### Containerizing your repository

Make sure you have Docker installed, and write a Dockerfile like the following:

```Dockerfile
# You may use any base container with a supported Python runtime: 2.7, 3.5, 3.6, or 3.7
FROM python:3.7

# Install any OS-level requirements (e.g. using apt, yum, apk, etc.) that the pipelines in your
# repository require to run
# RUN apt-get install some-package some-other-package

# Set environment variables that you'd like to have available in the built image.
# ENV IMPORTANT_OPTION=yes

# If you would like to set secrets at build time (with --build-arg), set args
# ARG super_secret

# Install dagster_graphql
RUN pip install dagster_graphql

# Install any Python requirements that the pipelines in your repository require to run
ADD /path/to/requirements.txt .
RUN pip install -r requirements.txt

# Add your repository.yaml file so that dagster_graphql knows where to look to find your repository,
# the Python file in which your repository is defined, and any local dependencies (e.g., unpackaged
# Python files from which your repository definition imports, or local packages that cannot be
# installed using the requirements.txt).
ADD /path/to/repository.yaml .
ADD /path/to/repository_definition.py .
# ADD /path/to/additional_file.py .

# The dagster-airflow machinery will use dagster_graphql to execute steps in your pipelines, so we
# need to run dagster_graphql when the container starts up
ENTRYPOINT [ "dagster_graphql" ]
```

Of course, you may expand on this Dockerfile in any way that suits your needs.

Once you've written your Dockerfile, you can build your Docker image. You'll need the name of the
Docker image (`-t`) that contains your repository later, so that the dagster-airflow machinery knows
which image to run. For example, if you want your image to be called `dagster-airflow-demo-repository`:

```bash
docker build -t dagster-airflow-demo-repository -f /path/to/Dockerfile .
```

If you want your containerized pipeline to be available to Airflow operators running on other
machines (for example, in environments where Airflow workers are running remotely) you'll need to
push your Docker image to a Docker registry so that remote instances of Docker can pull the image
by name.

For most production applications, you'll probably want to use a private Docker registry, rather
than the public DockerHub, to store your containerized pipelines.
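For example, a sketch of tagging and pushing the image to a private registry (the registry host is a placeholder):

```bash
docker tag dagster-airflow-demo-repository registry.example.com/dagster-airflow-demo-repository:latest
docker push registry.example.com/dagster-airflow-demo-repository:latest
```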

### Defining your pipeline as a containerized Airflow DAG

As in the uncontainerized case, you'll put a new Python file defining your DAG in the directory in
which Airflow looks for DAGs.

```python
from dagster_airflow.factory import make_airflow_dag_containerized

from my_package import define_my_pipeline

pipeline = define_my_pipeline()

image = 'dagster-airflow-demo-repository'

dag, steps = make_airflow_dag_containerized(
    pipeline,
    image,
    environment_dict={'storage': {'filesystem': {'config': {'base_dir': '/tmp'}}}},
    dag_id=None,
    dag_description=None,
    dag_kwargs=None,
    op_kwargs=None
)
```

You can pass `op_kwargs` through to the `DagsterDockerOperator` to use custom TLS settings, the
private registry of your choice, etc., just as you would configure the ordinary Airflow
`DockerOperator`.
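For example, a sketch of passing `DockerOperator`-style TLS and connection settings through `op_kwargs` (the Docker host and certificate paths are placeholders):

```python
dag, steps = make_airflow_dag_containerized(
    pipeline,
    image,
    environment_dict={'storage': {'filesystem': {'config': {'base_dir': '/tmp'}}}},
    op_kwargs={
        'docker_url': 'tcp://docker-host.example.com:2376',  # remote Docker daemon (placeholder)
        'tls_ca_cert': '/path/to/ca.pem',
        'tls_client_cert': '/path/to/cert.pem',
        'tls_client_key': '/path/to/key.pem',
    },
)
```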

### Docker bind-mount for filesystem intermediate storage

By default, the `DagsterDockerOperator` will bind-mount `/tmp` on the host into `/tmp` in the Docker
container. You can control this by setting `op_kwargs` in `make_airflow_dag`. For instance,
if you'd prefer to mount `/host_tmp` on the host into `/container_tmp` in the container, and
use this volume for intermediate storage, you can run:

```python
dag, steps = make_airflow_dag(
    pipeline,
    image,
    environment_dict={'storage': {'filesystem': {'config': {'base_dir': '/container_tmp'}}}},
    dag_id=None,
    dag_description=None,
    dag_kwargs=None,
    op_kwargs={'host_tmp_dir': '/host_tmp', 'tmp_dir': '/container_tmp'}
)
```

### Using S3 with dagster-airflow

You can also use S3 for dagster-airflow intermediate storage, and you must use S3 when running
your DAGs with distributed executors.

You'll need to create an S3 bucket, and provide AWS credentials granting read and write permissions
to this bucket within your Docker containers. We recommend that you use credentials for an IAM user
which has the [least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege)
required to access the S3 bucket for dagster-airflow.

You can configure S3 storage as follows:

```python
{'storage': {'s3': {'s3_bucket': 'my-cool-bucket'}}}
```


### Compatibility

Note that Airflow versions earlier than 1.10.3 are incompatible with Python 3.7+.
69 changes: 69 additions & 0 deletions docs/sections/deploying/dask.md
@@ -0,0 +1,69 @@
# Dask Deployment Guide

## Introduction
Dagster is designed to target a variety of execution substrates, and it natively supports Dask for pipeline execution.

Presently, the Dagster / Dask integration provides a single API, `execute_on_dask`, which can execute a Dagster pipeline on either local Dask or on a remote Dask cluster.

This is accomplished by taking the compiled execution plan, and converting each execution step into a [Dask Future](https://docs.dask.org/en/latest/futures.html) configured with the appropriate task dependencies to ensure tasks are properly sequenced. Data is passed between step executions via intermediate storage, and so a persistent shared storage must be used in a distributed execution context. When the pipeline is executed, these futures are generated and then awaited by the parent Dagster process.
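As a toy illustration of this futures pattern (not dagster-dask's actual implementation; the step functions here are made up):

```python
from dask.distributed import Client


def extract():
    return [1, 2, 3]


def transform(rows):
    return [r * 2 for r in rows]


# Client() with no arguments spins up a local Dask cluster.
client = Client()

# Each "step" becomes a future; passing one future as an argument to the next
# submit() call expresses the dependency, so Dask sequences the tasks correctly.
extracted = client.submit(extract)
transformed = client.submit(transform, extracted)

print(transformed.result())  # [2, 4, 6]
```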


### Requirements
To use `dagster-dask`, you'll need to install [Dask / Dask.Distributed](https://distributed.readthedocs.io/en/latest/install.html).

## Local Execution
It is relatively straightforward to set up and run a Dagster pipeline on Dask, using the `execute_on_dask()` API. First,

`pip install dagster dagster-dask`

Then:

```python
# dask_hello_world.py

from dagster import pipeline, solid, ExecutionTargetHandle
from dagster_dask import execute_on_dask, DaskConfig


@solid
def hello_world(_):
    return "Hello, World!"


@pipeline
def dask_pipeline():
    return hello_world()  # pylint: disable=no-value-for-parameter


execute_on_dask(
    ExecutionTargetHandle.for_pipeline_python_file(__file__, 'dask_pipeline'),
    env_config={'storage': {'filesystem': {}}},
)
```

Running `python dask_hello_world.py` will spin up local Dask execution, run the hello-world Dagster pipeline, and exit.


## Distributed Cluster Execution
If you want to use a Dask cluster for distributed execution, you will first need to [set up a Dask cluster](https://distributed.readthedocs.io/en/latest/quickstart.html#setup-dask-distributed-the-hard-way). Note that the machine running the Dagster parent process must have access to the host/port on which the Dask scheduler is running.
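A minimal by-hand setup might look like the following (hostnames are placeholders; the Dask scheduler listens on port 8786 by default):

```bash
# On the scheduler machine:
dask-scheduler

# On each worker machine, pointing at the scheduler's address:
dask-worker dask_scheduler.dns-name:8786
```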

For distributing task execution on a Dask cluster, you must provide a `DaskConfig` object with the address/port of the Dask scheduler:

```python
execute_on_dask(
    ExecutionTargetHandle.for_pipeline_module('your.python.module', 'your_pipeline_name'),
    env_config={'storage': {'s3': {'config': {'s3_bucket': 'YOUR_BUCKET_HERE'}}}},
    dask_config=DaskConfig(address='dask_scheduler.dns-name:8787')
)
```

Since Dask will invoke your pipeline code on the cluster workers, you must ensure that the latest version of your Python code is available to all of the Dask workers—ideally packaged as a Python module `your.python.module` that is importable on `PYTHONPATH`.


## Limitations
* Presently, `dagster-dask` does not support launching Dask workloads from Dagit.
* For distributed execution, you must use S3 for intermediates and run storage, as shown above.
* Dagster logs are not yet retrieved from Dask workers; this will be addressed in follow-up work.

While this library is still nascent, we're working to improve it, and we are happy to accept contributions!
26 changes: 26 additions & 0 deletions docs/sections/deploying/deploying.rst
@@ -0,0 +1,26 @@
Deploying
=======================

As is the case with all workflow orchestration, when deploying Dagster pipelines, there are two
primary concerns: **execution** and **scheduling**.

In the simplest case, where you just want to schedule a pipeline or two and solid execution is
relatively lightweight, you can schedule and invoke Dagster pipeline execution via cron by
adding a ``dagster pipeline execute ...`` line to your crontab.
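For example, a crontab entry along these lines would run a pipeline nightly at 2am (a sketch; the
wrapper script is a placeholder that invokes ``dagster pipeline execute ...`` with your project's
arguments):

.. code-block:: bash

  # m h dom mon dow  command
  0 2 * * * /path/to/run_my_pipeline.sh >> /var/log/my_pipeline.log 2>&1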

In real-world systems, you'll likely need more sophisticated support for both scheduling and
executing pipelines.

For this, Dagster provides support for two execution substrates: `Dask <https://github.com/dagster-io/dagster/tree/master/python_modules/dagster-dask>`__,
which distributes execution but still relies on cron or some other external scheduler, and
`Airflow <https://github.com/dagster-io/dagster/tree/master/python_modules/dagster-airflow>`__,
to which we delegate both scheduling and execution.

In the following sections, we detail how to operationalize Dagster execution on both of these
systems.

.. toctree::
  :maxdepth: 1

  airflow
  dask
