Simplify engine interface

Summary: Preliminary to selecting engines through Dagit -- first make sure that we are using a single code path for all of our execution modalities.

Test Plan: Unit

Reviewers: #ft, schrockn, alangenfeld, natekupp

Reviewed By: #ft, natekupp

Subscribers: natekupp

Differential Revision: https://dagster.phacility.com/D752
mgasner committed Jul 18, 2019
1 parent 80aa589 commit 596d7c1f7bd4840ea53131759d93979ed264614f
Showing with 332 additions and 210 deletions.
  1. +1 −1 .pylintrc
  2. +33 −15 docs/sections/deploying/dask.md
  3. +1 −2 python_modules/dagster-dask/dagster_dask/__init__.py
  4. +2 −1 python_modules/dagster-dask/dagster_dask/config.py
  5. +0 −47 python_modules/dagster-dask/dagster_dask/execute.py
  6. +8 −0 python_modules/dagster-dask/dagster_dask_tests/conftest.py
  7. +7 −8 python_modules/dagster-dask/dagster_dask_tests/test_cluster.py
  8. +28 −14 python_modules/dagster-dask/dagster_dask_tests/test_execute.py
  9. +5 −2 python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_execution_manager.py
  10. +2 −2 python_modules/dagster/dagster/core/definitions/environment_configs.py
  11. +34 −16 python_modules/dagster/dagster/core/engine/child_process_executor.py
  12. +5 −2 python_modules/dagster/dagster/core/engine/engine_inprocess.py
  13. +11 −18 python_modules/dagster/dagster/core/engine/engine_multiprocess.py
  14. +95 −47 python_modules/dagster/dagster/core/execution/api.py
  15. +2 −2 python_modules/dagster/dagster/core/execution/config.py
  16. +2 −14 python_modules/dagster/dagster/core/execution/context_creation_pipeline.py
  17. +24 −7 python_modules/dagster/dagster/core/system_config/objects.py
  18. +3 −3 python_modules/dagster/dagster/core/types/config_schema.py
  19. +3 −3 python_modules/dagster/dagster/core/types/default_applier.py
  20. +3 −3 python_modules/dagster/dagster/core/types/evaluator/evaluation.py
  21. +12 −1 python_modules/dagster/dagster/utils/__init__.py
  22. +0 −2 python_modules/dagster/dagster_tests/core_tests/system_config_tests/test_system_config.py
  23. +34 −0 python_modules/dagster/dagster_tests/core_tests/test_resource_definition.py
  24. +17 −0 python_modules/dagster/dagster_tests/utils_tests/test_utils.py
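
Before the diffs: the Summary's "single code path for all of our execution modalities" is easiest to picture as a dispatch through the executor config. The sketch below is assembled from the hunks that follow (notably the `DaskConfig.get_engine()` staticmethod); apart from `get_engine`, the class and function names here are assumptions for illustration, not the actual dagster internals.

```python
# Hedged sketch of the unified dispatch this commit drives toward; the real
# code lives in dagster.core.execution.api and may differ in its signatures.
class ExecutorConfig(object):
    @staticmethod
    def get_engine():
        '''Return the Engine class that should execute the plan.'''
        raise NotImplementedError()


class InProcessExecutorConfig(ExecutorConfig):
    @staticmethod
    def get_engine():
        # Engine class name is an assumption based on engine_inprocess.py above.
        from dagster.core.engine.engine_inprocess import InProcessEngine

        return InProcessEngine


def execute_plan_with_engine(execution_plan, run_config, environment_dict):
    # The single code path: in-process, multiprocess, and Dask execution now
    # differ only in which engine the executor config hands back.
    engine = run_config.executor_config.get_engine()
    return engine.execute(execution_plan, run_config, environment_dict)
```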
.pylintrc
@@ -15,7 +15,7 @@
# W0511 disable TODO warning
# W1201, W1202 disable log format warning. False positives (I think)

-disable=C,R,duplicate-code,W0511,W1201,W1202
+disable=C,R,duplicate-code,W0511,W1201,W1202,no-init

# See: https://github.com/getsentry/responses/issues/74
[TYPECHECK]
docs/sections/deploying/dask.md
@@ -1,29 +1,38 @@
# Dask Deployment Guide

## Introduction
-As noted above, Dagster is designed to target a variety of execution substrates, and natively supports Dask for pipeline execution.
+As noted above, Dagster is designed to target a variety of execution substrates, and natively
+supports Dask for pipeline execution.

-Presently, the Dagster / Dask integration provides a single API, `execute_on_dask`, which can execute a Dagster pipeline on either local Dask or on a remote Dask cluster.
+The Dagster / Dask integration lets you execute a Dagster pipeline on either local Dask or on a
+remote Dask cluster by specifying the `run_config` argument to `execute_pipeline`.

-This is accomplished by taking the compiled execution plan, and converting each execution step into a [Dask Future](https://docs.dask.org/en/latest/futures.html) configured with the appropriate task dependencies to ensure tasks are properly sequenced. Data is passed between step executions via intermediate storage, and so a persistent shared storage must be used in a distributed execution context. When the pipeline is executed, these futures are generated and then awaited by the parent Dagster process.
+The integration works by taking the compiled execution plan, and converting each execution step
+into a [Dask Future](https://docs.dask.org/en/latest/futures.html) configured with the appropriate
+task dependencies to ensure tasks are properly sequenced. When the pipeline is executed, these
+futures are generated and then awaited by the parent Dagster process.
+
+Data is passed between step executions via intermediate storage. As a consequence, a persistent
+shared storage must be used in a distributed execution context.


### Requirements
-To use `dagster-dask`, you'll need to install [Dask / Dask.Distributed](https://distributed.readthedocs.io/en/latest/install.html).
+To use `dagster-dask`, you'll need to install
+[Dask / Dask.Distributed](https://distributed.readthedocs.io/en/latest/install.html).

## Local Execution
-It is relatively straightforward to set up and run a Dagster pipeline on Dask, using the `execute_on_dask()` API. First,
+It is relatively straightforward to set up and run a Dagster pipeline on Dask.

-`pip install dagster dagster-dask`
+First, run `pip install dagster dagster-dask`.

Then:

-```
+```python
# dask_hello_world.py
-from dagster import pipeline, solid, ExecutionTargetHandle
-from dagster_dask import execute_on_dask, DaskConfig
+from dagster import execute_pipeline, ExecutionTargetHandle, pipeline, solid, RunConfig
+from dagster_dask import DaskConfig
@solid
@@ -36,34 +45,43 @@ def dask_pipeline():
return hello_world() # pylint: disable=no-value-for-parameter
-execute_on_dask(
-    ExecutionTargetHandle.for_pipeline_python_file(__file__, 'dask_pipeline'),
-    env_config={'storage': {'filesystem': {}}},
+execute_pipeline(
+    ExecutionTargetHandle.for_pipeline_python_file(
+        __file__, 'dask_pipeline'
+    ).build_pipeline_definition(),
+    environment_dict={'storage': {'filesystem': {}}},
+    run_config=RunConfig(executor_config=DaskConfig()),
)
```

-Running `python dask_hello_world.py` will spin up local Dask execution, run the hello, world Dagster pipeline, and exit.
+Running `python dask_hello_world.py` will spin up local Dask execution, run the Dagster pipeline,
+and exit.


## Distributed Cluster Execution
-If you want to use a Dask cluster for distributed execution, you will first need to [set up a Dask cluster](https://distributed.readthedocs.io/en/latest/quickstart.html#setup-dask-distributed-the-hard-way). Note that the machine running the Dagster parent process must have access to the host/port on which the Dask scheduler is running.
+If you want to use a Dask cluster for distributed execution, you will first need to
+[set up a Dask cluster](https://distributed.readthedocs.io/en/latest/quickstart.html#setup-dask-distributed-the-hard-way).
+Note that the machine running the Dagster parent process must have access to the host/port on which
+the Dask scheduler is running.

-For distributing task execution on a Dask cluster, you must provide a `DaskConfig` object with the address/port of the Dask scheduler:
+For distributing task execution on a Dask cluster, you must provide a `DaskConfig` object with
+the address/port of the Dask scheduler:

```
-execute_on_dask(
-    ExecutionTargetHandle.for_pipeline_module('your.python.module', 'your_pipeline_name'),
-    env_config={'storage': {'s3': {'config': {'s3_bucket': 'YOUR_BUCKET_HERE'}}}},
-    dask_config=DaskConfig(address='dask_scheduler.dns-name:8787')
+execute_pipeline(
+    ExecutionTargetHandle.for_pipeline_module(
+        'your.python.module', 'your_pipeline_name'
+    ).build_pipeline_definition(),
+    environment_dict={'storage': {'s3': {'config': {'s3_bucket': 'YOUR_BUCKET_HERE'}}}},
+    run_config=RunConfig(executor_config=DaskConfig(address='dask_scheduler.dns-name:8787')),
)
```

-Since Dask will invoke your pipeline code on the cluster workers, you must ensure that the latest version of your Python code is available to all of the Dask workers—ideally packaged as a Python module `your.python.module` that is importable on `PYTHONPATH`.
+Since Dask will invoke your pipeline code on the cluster workers, you must ensure that the latest
+version of your Python code is available to all of the Dask workers—ideally packaged as a Python
+module `your.python.module` that is importable on `PYTHONPATH`.


## Limitations
* Presently, `dagster-dask` does not support launching Dask workloads from Dagit.
* For distributed execution, you must use S3 for intermediates and run storage, as shown above.
* Dagster logs are not yet retrieved from Dask workers; this will be addressed in follow-up work.

-While this library is still nascent, we're working to improve it, and we are happy to accept contributions!
+While this library is still nascent, we're working to improve it, and we are happy to accept
+contributions!
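
The "futures with task dependencies" mechanism described in the guide above can be pictured with a short sketch. `topological_steps`, `step.key`, `step.upstream_keys`, and `step.execute` are hypothetical stand-ins for the real execution-plan API, not the actual dagster-dask internals:

```python
from dask.distributed import Client


def execute_step(step, *_upstream_results):
    # Each Dask task re-hydrates its inputs from shared intermediate storage
    # rather than from the upstream future results themselves.
    return step.execute()


def submit_plan(client, plan):
    # Fan the compiled execution plan out to Dask, one task per step.
    futures = {}
    for step in plan.topological_steps():
        upstream = [futures[key] for key in step.upstream_keys]
        # Passing upstream futures as arguments is what makes Dask sequence
        # the tasks correctly; the data itself travels through storage.
        futures[step.key] = client.submit(execute_step, step, *upstream)
    return futures


# The parent Dagster process then awaits every future:
# client = Client('dask_scheduler.dns-name:8786')
# results = client.gather(list(submit_plan(client, plan).values()))
```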
python_modules/dagster-dask/dagster_dask/__init__.py
@@ -1,6 +1,5 @@
from .version import __version__

from .config import DaskConfig
-from .execute import execute_on_dask

-__all__ = ['execute_on_dask', 'DaskConfig']
+__all__ = ['DaskConfig']
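
With `execute_on_dask` removed from the public API, downstream code migrates to the single `execute_pipeline` code path. A sketch assembled from the test changes below; the module and pipeline names are placeholders:

```python
# Before this commit (API now removed):
# from dagster_dask import execute_on_dask
# result = execute_on_dask(handle, env_config={...}, dask_config=DaskConfig(...))

# After this commit:
from dagster import execute_pipeline, ExecutionTargetHandle, RunConfig
from dagster_dask import DaskConfig

result = execute_pipeline(
    ExecutionTargetHandle.for_pipeline_module(
        'your.python.module', 'your_pipeline_name'  # placeholder module/pipeline
    ).build_pipeline_definition(),
    environment_dict={'storage': {'filesystem': {}}},
    run_config=RunConfig(executor_config=DaskConfig(timeout=30)),
)
```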
python_modules/dagster-dask/dagster_dask/config.py
@@ -44,7 +44,8 @@ def is_remote_execution(self):
def requires_persistent_storage(self):
return True

-    def get_engine(self):
+    @staticmethod
+    def get_engine():
from .engine import DaskEngine

return DaskEngine
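
Making `get_engine` a staticmethod means the engine class can be resolved from the config type alone, without constructing an instance. A quick illustration of the difference:

```python
from dagster_dask import DaskConfig

# After: no instance required; the class itself answers which engine to use.
engine_cls = DaskConfig.get_engine()

# Before, resolving the engine needed a constructed config:
# engine_cls = DaskConfig(address='dask_scheduler.dns-name:8787').get_engine()
```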

python_modules/dagster-dask/dagster_dask/execute.py
This file was deleted.

python_modules/dagster-dask/dagster_dask_tests/conftest.py
@@ -0,0 +1,8 @@
+import pytest
+
+import os
+
+
+@pytest.fixture(scope='session')
+def dask_address():
+    return os.getenv('DASK_ADDRESS', 'localhost')
python_modules/dagster-dask/dagster_dask_tests/test_cluster.py
@@ -1,16 +1,15 @@
-import os
-from dagster import ExecutionTargetHandle
-from dagster_dask import execute_on_dask, DaskConfig
+from dagster import execute_pipeline, ExecutionTargetHandle, RunConfig
+from dagster_dask import DaskConfig


-def test_dask_cluster():
-    result = execute_on_dask(
+def test_dask_cluster(dask_address):
+    result = execute_pipeline(
ExecutionTargetHandle.for_pipeline_module(
'dagster_examples.toys.hammer', 'hammer_pipeline'
-        ),
-        env_config={'storage': {'s3': {'config': {'s3_bucket': 'dagster-airflow-scratch'}}}},
-        dask_config=DaskConfig(address='%s:8786' % os.getenv('DASK_ADDRESS')),
+        ).build_pipeline_definition(),
+        environment_dict={'storage': {'s3': {'config': {'s3_bucket': 'dagster-airflow-scratch'}}}},
+        run_config=RunConfig(executor_config=DaskConfig(address='%s:8786' % dask_address)),
)
assert result.success
assert result.result_for_solid('total').output_value() == 4
python_modules/dagster-dask/dagster_dask_tests/test_execute.py
@@ -1,6 +1,14 @@
-from dagster import file_relative_path, pipeline, solid, ExecutionTargetHandle, InputDefinition
+from dagster import (
+    execute_pipeline,
+    file_relative_path,
+    pipeline,
+    solid,
+    ExecutionTargetHandle,
+    InputDefinition,
+    RunConfig,
+)
from dagster.core.test_utils import nesting_composite_pipeline
-from dagster_dask import execute_on_dask, DaskConfig
+from dagster_dask import DaskConfig

import dagster_pandas as dagster_pd

@@ -16,10 +24,12 @@ def dask_engine_pipeline():


def test_execute_on_dask():
-    result = execute_on_dask(
-        ExecutionTargetHandle.for_pipeline_python_file(__file__, 'dask_engine_pipeline'),
-        env_config={'storage': {'filesystem': {}}},
-        dask_config=DaskConfig(timeout=30),
+    result = execute_pipeline(
+        ExecutionTargetHandle.for_pipeline_python_file(
+            __file__, 'dask_engine_pipeline'
+        ).build_pipeline_definition(),
+        environment_dict={'storage': {'filesystem': {}}},
+        run_config=RunConfig(executor_config=DaskConfig(timeout=30)),
)
assert result.result_for_solid('simple').output_value() == 1

@@ -29,10 +39,12 @@ def dask_composite_pipeline():


def test_composite_execute():
-    result = execute_on_dask(
-        ExecutionTargetHandle.for_pipeline_python_file(__file__, 'dask_composite_pipeline'),
-        env_config={'storage': {'filesystem': {}}},
-        dask_config=DaskConfig(timeout=30),
+    result = execute_pipeline(
+        ExecutionTargetHandle.for_pipeline_python_file(
+            __file__, 'dask_composite_pipeline'
+        ).build_pipeline_definition(),
+        environment_dict={'storage': {'filesystem': {}}},
+        run_config=RunConfig(executor_config=DaskConfig(timeout=30)),
)
assert result.success

@@ -56,10 +68,12 @@ def test_pandas_dask():
}
}

-    result = execute_on_dask(
-        ExecutionTargetHandle.for_pipeline_python_file(__file__, pandas_pipeline.name),
-        env_config={'storage': {'filesystem': {}}, **environment_dict},
-        dask_config=DaskConfig(timeout=30),
+    result = execute_pipeline(
+        ExecutionTargetHandle.for_pipeline_python_file(
+            __file__, pandas_pipeline.name
+        ).build_pipeline_definition(),
+        environment_dict={'storage': {'filesystem': {}}, **environment_dict},
+        run_config=RunConfig(executor_config=DaskConfig(timeout=30)),
)

assert result.success
python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_execution_manager.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import
from collections import namedtuple
+import abc

import copy
import logging
import os
@@ -30,9 +32,10 @@
from dagster_graphql.implementation.pipeline_run_storage import PipelineRun


-class PipelineExecutionManager(object):
+class PipelineExecutionManager(six.with_metaclass(abc.ABCMeta)):
+    @abc.abstractmethod
     def execute_pipeline(self, handle, pipeline, pipeline_run, raise_on_error):
-        raise NotImplementedError()
+        '''Subclasses must implement this method.'''


def build_synthetic_pipeline_error_record(run_id, error_info, pipeline_name):
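
Switching the base class to `six.with_metaclass(abc.ABCMeta)` makes the contract enforceable: a manager that forgets to implement `execute_pipeline` now fails at instantiation rather than when the method is first called. A minimal standalone illustration of the same Python 2/3-compatible pattern (the class names here are invented for the demo):

```python
import abc

import six


class Manager(six.with_metaclass(abc.ABCMeta)):
    @abc.abstractmethod
    def execute_pipeline(self, handle, pipeline, pipeline_run, raise_on_error):
        '''Subclasses must implement this method.'''


class Complete(Manager):
    def execute_pipeline(self, handle, pipeline, pipeline_run, raise_on_error):
        return 'ok'


class Incomplete(Manager):
    pass


Complete()  # fine
try:
    Incomplete()  # TypeError: Can't instantiate abstract class Incomplete
except TypeError as exc:
    print(exc)
```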
python_modules/dagster/dagster/core/definitions/environment_configs.py
@@ -8,7 +8,7 @@
from dagster.core.types.field_utils import FieldImpl, check_opt_field_param, _ConfigComposite
from dagster.core.types.iterate_types import iterate_config_types
from dagster.core.types.runtime import construct_runtime_type_dictionary
-from dagster.utils import camelcase, single_item
+from dagster.utils import camelcase, ensure_single_item

from .dependency import DependencyStructure, Solid, SolidHandle, SolidInputHandle
from .logger import LoggerDefinition
@@ -82,7 +82,7 @@ def _is_selector_field_optional(config_type):
if len(config_type.fields) > 1:
return False
else:
-        _name, field = single_item(config_type.fields)
+        _name, field = ensure_single_item(config_type.fields)
return field.is_optional
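
The rename from `single_item` to `ensure_single_item` signals that the helper validates as well as extracts; the real implementation lands in `dagster/utils/__init__.py` (+12 −1 in the file list above) with tests in `test_utils.py`. A plausible reconstruction, offered only as a sketch:

```python
# Hypothetical reconstruction of ensure_single_item; the version in
# dagster.utils may differ in its check and error machinery.
def ensure_single_item(ddict):
    assert isinstance(ddict, dict), 'Expected a dict'
    assert len(ddict) == 1, 'Expected a single-item dict, got %d items' % len(ddict)
    return list(ddict.items())[0]


# Usage mirroring the environment_configs.py hunk above:
_name, field = ensure_single_item({'only_field': 'field_value'})
```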

