
Dask integration

Summary: Add Dask as an execution target

Test Plan: unit

Reviewers: max, alangenfeld

Reviewed By: max, alangenfeld

Differential Revision: https://dagster.phacility.com/D137
natekupp committed May 15, 2019
1 parent d538c58 commit cf4145d2091910a89e3d3b6b0a9d2a0bf228eb30
@@ -258,6 +258,7 @@ def examples_tests():
steps += python_modules_tox_tests("dagster")
steps += python_modules_tox_tests("dagit", ["apt-get update", "apt-get install -y xdg-utils"])
steps += python_modules_tox_tests("dagster-graphql")
steps += python_modules_tox_tests("dagster-dask")
steps += python_modules_tox_tests("dagstermill")
steps += python_modules_tox_tests("libraries/dagster-pandas")
steps += python_modules_tox_tests("libraries/dagster-ge")
@@ -114,4 +114,8 @@ python_modules/dagster/docs/_build
dagit_run_logs

python_modules/dagit/node_modules/
python_modules/dagit/yarn.lock


# Dask
dask-worker-space
@@ -29,6 +29,7 @@ install_dev_python_modules:
	pip install -r python_modules/dagit/dev-requirements.txt
	pip install -e python_modules/dagstermill
	SLUGIFY_USES_TEXT_UNIDECODE=yes pip install -e python_modules/dagster-airflow
	pip install -e python_modules/dagster-dask
	pip install -e python_modules/libraries/dagster-aws
	pip install -r python_modules/libraries/dagster-aws/dev-requirements.txt
	pip install -e python_modules/libraries/dagster-gcp
@@ -9,17 +9,16 @@
    InputDefinition,
    Int,
    ModeDefinition,
    MultiprocessExecutorConfig,
    OutputDefinition,
    PipelineDefinition,
    Result,
    SolidInstance,
    RunConfig,
    execute_pipeline,
    lambda_solid,
    solid,
    RunStorageMode,
)
from dagster_dask import execute_on_dask


@solid(
@@ -56,7 +55,7 @@ def hammer(context, chase_duration):


@solid(
    config_field=Field(Int, is_optional=True, default_value=5),
    config_field=Field(Int, is_optional=True, default_value=1),
    outputs=[
        OutputDefinition(Int, 'out_1'),
        OutputDefinition(Int, 'out_2'),
@@ -116,15 +115,9 @@ def define_hammer_pipeline():


if __name__ == '__main__':
    pipeline = define_hammer_pipeline()
    result = execute_pipeline(
        pipeline,
        run_config=RunConfig(
            executor_config=MultiprocessExecutorConfig(
                handle=ExecutionTargetHandle.for_pipeline_fn(define_hammer_pipeline)
            ),
            storage_mode=RunStorageMode.FILESYSTEM,
        ),
    result = execute_on_dask(
        ExecutionTargetHandle.for_pipeline_fn(define_hammer_pipeline),
        env_config={'storage': {'filesystem': {}}},
        run_config=RunConfig(storage_mode=RunStorageMode.FILESYSTEM),
    )

    print('Total Hammer Time: ', result.result_for_solid('total').transformed_value())
@@ -0,0 +1,33 @@
# dagster-dask

## Introduction

This library provides an integration with Dask / Dask.Distributed to support distributed execution of Dagster workloads. It is still early and has some limitations, which are discussed below.

Presently, it provides a single API, `execute_on_dask`, which can execute a Dagster pipeline on either local Dask or a remote Dask cluster.
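
Its signature in this commit (see `dagster_dask/execute.py` below) is:

```python
def execute_on_dask(handle, env_config=None, run_config=None, mode=None, dask_config=None):
    ...
```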

## Requirements
To use `dagster-dask`, you'll need to [install Dask / Dask.Distributed](https://distributed.readthedocs.io/en/latest/install.html).

If you want to use a cluster for distributed execution, you'll need to [set up a Dask cluster](https://distributed.readthedocs.io/en/latest/quickstart.html#setup-dask-distributed-the-hard-way). Note that you'll need to ensure that Dagster can access the host/port on which the Dask scheduler is running.
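
To verify that the scheduler is reachable from the Dagster host, you can connect with `dask.distributed` directly (a minimal sketch; the address is a placeholder):

```python
from dask.distributed import Client

# Connect to the scheduler; this raises if the host/port is unreachable.
client = Client('dask_scheduler.dns-name:8787', timeout=10)
print(client.scheduler_info())  # basic cluster topology
client.close()
```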

## Getting Started
There is a simple example of how to use this library in [the tests folder](https://github.com/dagster-io/dagster/tree/master/python_modules/dagster-dask/dagster_dask_tests/test_execute.py). This example showcases how to set up a "hello, world" with local Dask execution.
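
Condensed, the test follows this shape (`define_my_pipeline` is a placeholder for your own pipeline-definition function):

```python
from dagster import ExecutionTargetHandle, RunConfig, RunStorageMode
from dagster_dask import execute_on_dask

result = execute_on_dask(
    # Pass a handle to the definition function rather than a pipeline object, so
    # that Dask workers can reconstruct the pipeline in their own processes.
    ExecutionTargetHandle.for_pipeline_fn(define_my_pipeline),
    env_config={'storage': {'filesystem': {}}},
    run_config=RunConfig(storage_mode=RunStorageMode.FILESYSTEM),
)
assert result.success
```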

For distributed execution on a Dask cluster, you'll just need to provide a `DaskConfig` object with the address/port of the Dask scheduler:

```python
execute_on_dask(
    ExecutionTargetHandle.for_pipeline_fn(define_pipeline),
    env_config={'storage': {'s3': {}}},
    run_config=RunConfig(storage_mode=RunStorageMode.S3),
    dask_config=DaskConfig(address='dask_scheduler.dns-name:8787')
)
```


## Limitations
* Presently, `dagster-dask` does not support launching Dask workloads from Dagit.
* For distributed execution, you must use S3 for intermediates and run storage, as shown above.
* Dagster logs are not yet retrieved from Dask workers; this will be addressed in follow-up work.

While this library is still nascent, we're working to improve it, and we are happy to accept contributions!
@@ -0,0 +1,6 @@
from .version import __version__

from .config import DaskConfig
from .execute import execute_on_dask

__all__ = ['execute_on_dask', 'DaskConfig']
@@ -0,0 +1,61 @@
import re

from collections import namedtuple

from dagster import check


class DaskConfig(
    namedtuple('DaskConfig', 'address timeout scheduler_file direct_to_workers heartbeat_interval')
):
    '''DaskConfig - configuration for the Dask execution engine.
    '''

    def __new__(
        cls,
        address=None,
        timeout=None,
        scheduler_file=None,
        direct_to_workers=False,
        heartbeat_interval=None,
    ):
        return super(DaskConfig, cls).__new__(
            cls,
            # The address of a ``Scheduler`` server, e.g. the string '127.0.0.1:8786'
            address=check.opt_str_param(address, 'address'),
            # Timeout duration for the initial connection to the scheduler
            timeout=check.opt_int_param(timeout, 'timeout'),
            # Path to a file with scheduler information, if available
            scheduler_file=check.opt_str_param(scheduler_file, 'scheduler_file'),
            # Whether to connect directly to the workers, or to ask the scheduler to serve as
            # an intermediary.
            direct_to_workers=check.opt_bool_param(direct_to_workers, 'direct_to_workers'),
            # Time in milliseconds between heartbeats to the scheduler
            heartbeat_interval=check.opt_int_param(heartbeat_interval, 'heartbeat_interval'),
        )

    @property
    def is_remote_execution(self):
        return self.address and not re.match(r'127\.0\.0\.1|0\.0\.0\.0|localhost', self.address)

    def build_dict(self, pipeline_name):
        '''Returns a dict we can use as kwargs for dask.distributed.Client instantiation.

        Intended to be used like:

            with dask.distributed.Client(**cfg.build_dict()) as client:
                << use client here >>
        '''
        dask_cfg = {'name': pipeline_name}
        for cfg_param in [
            'address',
            'timeout',
            'scheduler_file',
            'direct_to_workers',
            'heartbeat_interval',
        ]:
            cfg_value = getattr(self, cfg_param, None)
            if cfg_value:
                dask_cfg[cfg_param] = cfg_value
        return dask_cfg
@@ -0,0 +1,206 @@
import itertools

import dask
import dask.distributed

from dagster import check, ExecutionTargetHandle, RunConfig, RunStorageMode

from dagster.core.execution.api import create_execution_plan, scoped_pipeline_context
from dagster.core.execution.results import PipelineExecutionResult

from dagster_graphql.cli import execute_query
from dagster_graphql.util import (
    dagster_event_from_dict,
    get_log_message_event_fragment,
    get_step_event_fragment,
)

from .config import DaskConfig
from .query import QUERY_TEMPLATE


def query_on_dask_worker(handle, query, variables, dependencies):  # pylint: disable=unused-argument
    '''Note that we need to pass "dependencies" to ensure Dask sequences futures during task
    scheduling, even though we do not use this argument within the function.
    '''
    res = execute_query(handle, query, variables)
    handle_errors(res)
    return handle_result(res)


def handle_errors(res):
    if res.get('errors'):
        raise Exception('Internal error in GraphQL request. Response: {}'.format(res))

    if not res.get('data', {}).get('startPipelineExecution', {}).get('__typename'):
        raise Exception('Unexpected response type. Response: {}'.format(res))


def handle_result(res):
    res_data = res['data']['startPipelineExecution']

    res_type = res_data['__typename']

    if res_type == 'InvalidStepError':
        raise Exception('invalid step {step_key}'.format(step_key=res_data['invalidStepKey']))

    if res_type == 'InvalidOutputError':
        raise Exception(
            'invalid output {output} for step {step_key}'.format(
                output=res_data['invalidOutputName'], step_key=res_data['stepKey']
            )
        )

    if res_type == 'PipelineConfigValidationInvalid':
        errors = [err['message'] for err in res_data['errors']]
        raise Exception(
            'Pipeline configuration invalid:\n{errors}'.format(errors='\n'.join(errors))
        )

    if res_type == 'PipelineNotFoundError':
        raise Exception(
            'Pipeline "{pipeline_name}" not found: {message}'.format(
                pipeline_name=res_data['pipelineName'], message=res_data['message']
            )
        )

    if res_type == 'PythonError':
        raise Exception(
            'Subplan execution failed: {message}\n{stack}'.format(
                message=res_data['message'], stack=res_data['stack']
            )
        )

    if res_type == 'StartPipelineExecutionSuccess':
        pipeline_name = res_data['run']['pipeline']['name']

        skip_events = {
            'LogMessageEvent',
            'PipelineStartEvent',
            'PipelineSuccessEvent',
            'PipelineInitFailureEvent',
            'PipelineFailureEvent',
        }

        return [
            dagster_event_from_dict(e, pipeline_name)
            for e in res_data['run']['logs']['nodes']
            if e['__typename'] not in skip_events
        ]

    raise Exception('unexpected result type')


def build_graphql_query():
    log_message_event_fragment = get_log_message_event_fragment()
    step_event_fragment = get_step_event_fragment()

    return '\n'.join(
        (
            QUERY_TEMPLATE.format(
                step_event_fragment=step_event_fragment.include_key,
                log_message_event_fragment=log_message_event_fragment.include_key,
            ),
            step_event_fragment.fragment,
            log_message_event_fragment.fragment,
        )
    )


def execute_on_dask(
    handle, env_config=None, run_config=None, mode=None, dask_config=None
):  # pylint: disable=too-many-locals
    check.inst_param(handle, 'handle', ExecutionTargetHandle)

    env_config = check.opt_dict_param(env_config, 'env_config', key_type=str)
    dask_config = check.opt_inst_param(dask_config, 'dask_config', DaskConfig, DaskConfig())
    run_config = check.opt_inst_param(
        run_config, 'run_config', RunConfig, RunConfig(storage_mode=RunStorageMode.FILESYSTEM)
    )
    pipeline = handle.build_pipeline_definition()
    mode = check.opt_str_param(mode, 'mode', pipeline.get_default_mode_name())

    # Checks to ensure storage is compatible with the Dask configuration. Note that storage
    # may be None here, so we check the dict itself rather than calling .keys() on it.
    storage = env_config.get('storage')
    check.invariant(storage, 'Must specify storage to use Dask execution')

    if dask_config.is_remote_execution:
        check.invariant(
            storage.get('s3'),
            'Must use S3 storage with non-local Dask address {dask_address}'.format(
                dask_address=dask_config.address
            ),
        )
    else:
        check.invariant(
            not storage.get('in_memory'),
            'Cannot use in-memory storage with Dask; use filesystem or S3',
        )

    execution_plan = create_execution_plan(pipeline, env_config, mode=mode)

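    # Steps are grouped into topologically sorted levels: each step's dependencies live in
    # earlier levels, so every future we submit depends only on already-submitted work.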
    step_levels = execution_plan.topological_step_levels()

    query = build_graphql_query()

    with scoped_pipeline_context(pipeline, env_config, run_config) as pipeline_context:
        with dask.distributed.Client(**dask_config.build_dict(pipeline.name)) as client:
            execution_futures = []
            execution_futures_dict = {}

            for step_level in step_levels:
                for step in step_level:
                    step_context = pipeline_context.for_step(step)

                    check.invariant(
                        not step_context.run_config.loggers,
                        'Cannot inject loggers via RunConfig with the Dask executor',
                    )

                    check.invariant(
                        not step_context.event_callback,
                        'Cannot use event_callback with Dask executor',
                    )

                    # We ensure correctness in sequencing by letting Dask schedule futures and
                    # awaiting dependencies within each step.
                    dependencies = [
                        execution_futures_dict[ni.prev_output_handle.step_key]
                        for ni in step.step_inputs
                    ]

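                    # Each step executes on a worker as its own single-step pipeline run,
                    # invoked via dagster-graphql's startPipelineExecution with stepKeys
                    # restricted to just this step.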
                    variables = {
                        'executionParams': {
                            'selector': {'name': pipeline.name},
                            'environmentConfigData': env_config,
                            'mode': mode,
                            'executionMetadata': {'runId': run_config.run_id},
                            'stepKeys': [step.key],
                        }
                    }

                    future = client.submit(
                        query_on_dask_worker, handle, query, variables, dependencies
                    )

                    execution_futures.append(future)
                    execution_futures_dict[step.key] = future

            # This tells Dask to await the step executions and retrieve their results to
            # the master
            execution_step_events = client.gather(execution_futures)

            # execution_step_events is now a list of lists; the inner lists contain the
            # Dagster events emitted by each step
            event_list = list(itertools.chain.from_iterable(execution_step_events))

            return PipelineExecutionResult(
                pipeline,
                run_config.run_id,
                event_list,
                lambda: scoped_pipeline_context(
                    pipeline,
                    env_config,
                    run_config,
                    intermediates_manager=pipeline_context.intermediates_manager,
                ),
            )
