Skip to content
Permalink
Browse files

Dask and docker-compose

Test Plan: docker-compose

Reviewers: schrockn, max, alangenfeld

Reviewed By: max

Differential Revision: https://dagster.phacility.com/D232
  • Loading branch information...
natekupp committed May 29, 2019
1 parent 596cf21 commit 072eafd81fc4571c263e2861a16b011e6351c24b
@@ -7,6 +7,9 @@ RUN apt-get update; \
curl -sL https://deb.nodesource.com/setup_11.x | bash -; \
apt-get install -y nodejs

# install rsync
RUN apt-get install -y rsync

# install yarn
RUN curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add -; \
echo "deb https://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list; \
@@ -27,11 +27,11 @@ class SupportedPython:
SupportedPython.V2_7: "python:2.7.16-stretch",
}

# Fully-qualified buildkite-integration Docker image per supported Python version.
INTEGRATION_IMAGE_MAP = {
    SupportedPython.V3_7: "dagster/buildkite-integration:py3.7.3",
    SupportedPython.V3_6: "dagster/buildkite-integration:py3.6.8",
    SupportedPython.V3_5: "dagster/buildkite-integration:py3.5.7",
    SupportedPython.V2_7: "dagster/buildkite-integration:py2.7.16",
}

# Bare interpreter version strings per supported Python version; used to tag
# integration images ("dagster/buildkite-integration:py" + version) and as the
# PYTHON_VERSION docker build arg in the dask test image.
IMAGE_VERSION_MAP = {
    SupportedPython.V3_7: "3.7.3",
    SupportedPython.V3_6: "3.6.8",
    SupportedPython.V3_5: "3.5.7",
    SupportedPython.V2_7: "2.7.16",
}

TOX_MAP = {
@@ -73,7 +73,8 @@ def on_python_image(self, ver, env=None):

def on_integration_image(self, ver, env=None):
settings = self.base_docker_settings()
settings["image"] = INTEGRATION_IMAGE_MAP[ver]
# version like dagster/buildkite-integration:py3.7.3
settings["image"] = "dagster/buildkite-integration:py" + IMAGE_VERSION_MAP[ver]
# map the docker socket to enable docker to be run from inside docker
settings["volumes"] = ["/var/run/docker.sock:/var/run/docker.sock"]

@@ -83,6 +84,14 @@ def on_integration_image(self, ver, env=None):
self._step["plugins"] = [{DOCKER_PLUGIN: settings}]
return self

def with_timeout(self, num_minutes):
    """Fail this Buildkite step if it runs longer than ``num_minutes``.

    Returns self so calls can be chained fluently.
    """
    self._step.update(timeout_in_minutes=num_minutes)
    return self

def with_retry(self, num_retries):
    """Automatically retry this Buildkite step up to ``num_retries`` times.

    Returns self so calls can be chained fluently.
    """
    retry_policy = {'automatic': {'limit': num_retries}}
    self._step["retry"] = retry_policy
    return self

def build(self):
    """Return the accumulated Buildkite step definition as a plain dict."""
    return self._step

@@ -237,6 +246,45 @@ def gcp_tests():
)


def dask_tests():
    """Build one Buildkite step per supported Python version for dagster-dask.

    Each step builds the dask test image, brings up a docker-compose dask
    cluster, attaches the running buildkite container to the cluster's bridge
    network so it can reach the scheduler, and then runs the tox suite.
    """

    def _dask_step(version):
        # Bare interpreter version string, e.g. "3.7.3"
        py_version = IMAGE_VERSION_MAP[version]
        coverage = ".coverage.dagster-dask.{version}.$BUILDKITE_BUILD_ID".format(version=version)
        return (
            StepBuilder("dagster-dask tests ({ver})".format(ver=TOX_MAP[version]))
            .run(
                "pushd python_modules/dagster-dask/dagster_dask_tests/dask-docker",
                "./build.sh " + py_version,
                # Run the docker-compose dask cluster
                "export PYTHON_VERSION=\"{ver}\"".format(ver=py_version),
                "docker-compose up -d --remove-orphans",
                # hold onto your hats, this is docker networking at its best. First, we figure out
                # the name of the currently running container...
                "export CONTAINER_ID=`cut -c9- < /proc/1/cpuset`",
                r'export CONTAINER_NAME=`docker ps --filter "id=\${CONTAINER_ID}" --format "{{.Names}}"`',
                # then, we dynamically bind this container into the dask user-defined bridge
                # network to make the dask containers visible...
                r"docker network connect dask \${CONTAINER_NAME}",
                # Now, we grab the IP address of the dask-scheduler container from within the dask
                # bridge network and export it; this will let the tox tests talk to the scheduler.
                "export DASK_ADDRESS=`docker inspect --format '{{ .NetworkSettings.Networks.dask.IPAddress }}' dask-scheduler`",
                "popd",
                "pushd python_modules/dagster-dask/",
                "pip install tox",
                "tox -e {ver}".format(ver=TOX_MAP[version]),
                "mv .coverage {file}".format(file=coverage),
                "buildkite-agent artifact upload {file}".format(file=coverage),
            )
            .on_integration_image(
                version, ['AWS_SECRET_ACCESS_KEY', 'AWS_ACCESS_KEY_ID', 'AWS_DEFAULT_REGION']
            )
            .with_timeout(5)
            .with_retry(3)
            .build()
        )

    return [_dask_step(version) for version in SupportedPythons]


if __name__ == "__main__":
steps = [
StepBuilder("pylint")
@@ -284,10 +332,11 @@ def gcp_tests():
steps += airline_demo_tests()
steps += events_demo_tests()
steps += airflow_tests()
steps += dask_tests()

steps += python_modules_tox_tests("dagster")
steps += python_modules_tox_tests("dagit", ["apt-get update", "apt-get install -y xdg-utils"])
steps += python_modules_tox_tests("dagster-graphql")
steps += python_modules_tox_tests("dagster-dask")
steps += python_modules_tox_tests("dagstermill")

for library in LIBRARY_MODULES:
@@ -41,7 +41,7 @@ ENV JAVA_HOME /docker-java-home

# Updated from base image
ENV JAVA_VERSION 8u212
ENV JAVA_DEBIAN_VERSION 8u212-b01-1~deb9u1
ENV JAVA_DEBIAN_VERSION 8u212-b03-2~deb9u1

RUN set -ex; \
# deal with slim variants not having man page directories (which causes "update-alternatives" to fail)
@@ -22,8 +22,12 @@
def query_on_dask_worker(handle, query, variables, dependencies):  # pylint: disable=unused-argument
    '''Execute a GraphQL query on a dask worker and return its result.

    Note that we need to pass "dependencies" to ensure Dask sequences futures during task
    scheduling, even though we do not use this argument within the function.

    We also pass in 'raise_on_error' here, because otherwise (currently) very little information
    is propagated to the dask master from the workers about the state of execution; we should at
    least inform the user of exceptions.
    '''
    # Execute the query exactly once. (A previous version of this block called
    # execute_query twice — once without raise_on_error — running the query's
    # side effects twice and discarding the first result.)
    res = execute_query(handle, query, variables, raise_on_error=True)
    handle_errors(res)
    return handle_result(res)

@@ -0,0 +1,15 @@
# Test image for the dagster-dask integration suite. PYTHON_VERSION selects
# the base interpreter (e.g. 3.7.3) and is supplied by build.sh / the
# buildkite step.
ARG PYTHON_VERSION

FROM python:$PYTHON_VERSION

# Copy in local development copies of the dagster packages; build.sh stages
# these directories next to this Dockerfile before `docker build` runs.
ADD dagster dagster
ADD dagster-graphql dagster-graphql
ADD dagster-dask dagster-dask
ADD dagster-aws dagster-aws
ADD examples examples

# Install each staged package in editable mode.
RUN pip install -e dagster/
RUN pip install -e dagster-graphql/
RUN pip install -e dagster-dask/
RUN pip install -e dagster-aws/
RUN pip install -e examples/
@@ -0,0 +1,47 @@
#! /bin/bash
# Build the dagster-dask-test Docker image for a given Python version.
# Usage: ./build.sh <python-version>   (e.g. ./build.sh 3.7.3)
#
# For the avoidance of doubt, this script is meant to be run with the
# test_project directory as pwd.
# The filesystem manipulation below is to support installing local development
# versions of dagster-graphql, dagster-aws, dagster-dask, and dagster.

set -eux

ROOT=$(git rev-parse --show-toplevel)
pushd $ROOT/python_modules/dagster-dask/dagster_dask_tests/dask-docker/

# Remove the staged package copies created below.
function cleanup {
    rm -rf dagster
    rm -rf dagster-graphql
    rm -rf dagster-dask
    rm -rf dagster-aws
    rm -rf examples
}
# ensure cleanup happens on error or normal exit
trap cleanup EXIT

# Stage local development copies of each package next to the Dockerfile so
# they fall inside the docker build context.
cp -R ../../../dagster .
cp -R ../../../dagster-graphql .
# rsync instead of cp so dagster_dask_tests (the directory containing this
# build context) can be excluded — presumably to avoid copying the build
# context into itself; confirm if changing.
rsync -av --progress ../../../dagster-dask . --exclude dagster_dask_tests

cp -R ../../../libraries/dagster-aws .
cp -R ../../../../examples .


# Strip build artifacts from the staged copies so stale eggs/tox envs/dists
# don't leak into the image; `|| true` tolerates globs with no matches.
rm -rf \
    dagster/*.egg-info \
    dagster/.tox \
    dagster/build \
    dagster/dist \
    dagster-graphql/*.egg-info \
    dagster-graphql/.tox \
    dagster-graphql/build \
    dagster-graphql/dist \
    dagster-dask/*.egg-info \
    dagster-dask/.tox \
    dagster-dask/build \
    dagster-dask/dist \
    dagster-aws/*.egg-info \
    dagster-aws/.tox \
    dagster-aws/build \
    dagster-aws/dist || true

# Tag tracks the interpreter version, e.g. dagster-dask-test:py3.7.3
docker build . --build-arg PYTHON_VERSION=$1 -t dagster-dask-test:py$1
@@ -0,0 +1,35 @@
# docker-compose cluster for the dagster-dask integration tests: one dask
# scheduler and one worker on a named user-defined bridge network ("dask"),
# so the test-runner container can attach itself and reach the scheduler.
version: '3.7'

services:
  dask-scheduler:
    build:
      context: ./
      dockerfile: Dockerfile
    # PYTHON_VERSION is exported by the buildkite step before `docker-compose up`
    image: dagster-dask-test:py${PYTHON_VERSION}
    hostname: dask-scheduler
    container_name: dask-scheduler
    ports:
      # 8786: scheduler port the workers connect to (see dask-worker command)
      - "8786:8786"
      # 8787: presumably the dask diagnostic dashboard — confirm
      - "8787:8787"
    environment:
      # blank values pass these variables through from the host environment
      AWS_ACCESS_KEY_ID:
      AWS_SECRET_ACCESS_KEY:
    command: ["dask-scheduler"]
    networks:
      - dask

  dask-worker:
    # Same image as the scheduler so both share an identical environment
    image: dagster-dask-test:py${PYTHON_VERSION}
    hostname: dask-worker
    container_name: dask-worker
    environment:
      AWS_ACCESS_KEY_ID:
      AWS_SECRET_ACCESS_KEY:
    command: ["dask-worker", "tcp://dask-scheduler:8786"]
    networks:
      - dask

networks:
  dask:
    # Explicitly named bridge network so external containers can
    # `docker network connect dask <name>` to join it
    driver: bridge
    name: dask
@@ -0,0 +1,17 @@
import os

from dagster import ExecutionTargetHandle, RunConfig, RunStorageMode
from dagster_dask import execute_on_dask, DaskConfig


def test_dask_cluster():
    """Smoke-test the hammer toy pipeline end to end on the dockerized dask
    cluster, with S3 as the intermediate storage mode.

    DASK_ADDRESS (the scheduler's IP, exported by the buildkite step) must be
    set in the environment.
    """
    pipeline_handle = ExecutionTargetHandle.for_pipeline_module(
        'dagster_examples.toys.hammer', 'define_hammer_pipeline'
    )
    scheduler_address = '%s:8786' % os.getenv('DASK_ADDRESS')
    pipeline_result = execute_on_dask(
        pipeline_handle,
        env_config={'storage': {'s3': {'s3_bucket': 'dagster-airflow-scratch'}}},
        run_config=RunConfig(storage_mode=RunStorageMode.S3),
        dask_config=DaskConfig(address=scheduler_address),
    )
    assert pipeline_result.success
    assert pipeline_result.result_for_solid('total').transformed_value() == 4
@@ -1,3 +1,4 @@
bokeh
dagster
dagster_graphql
dask==1.2.2
@@ -43,7 +43,13 @@ def _do_setup(name='dagster-dask'):
'Operating System :: OS Independent',
],
packages=find_packages(exclude=['test']),
install_requires=['dagster', 'dagster_graphql', 'dask==1.2.2', 'distributed==1.28.1'],
install_requires=[
'bokeh',
'dagster',
'dagster_graphql',
'dask==1.2.2',
'distributed==1.28.1',
],
zip_safe=False,
)

@@ -2,10 +2,12 @@
envlist = py37,py36,py35,py27

[testenv]
passenv = CIRCLECI CIRCLE_* CI_PULL_REQUEST COVERALLS_REPO_TOKEN
passenv = CIRCLECI CIRCLE_* CI_PULL_REQUEST COVERALLS_REPO_TOKEN DASK_ADDRESS AWS_SECRET_ACCESS_KEY AWS_ACCESS_KEY_ID
deps =
-e ../dagster
-e ../dagster-graphql
-e ../../examples
-e ../libraries/dagster-aws
-r ../dagster/dev-requirements.txt
-e .
commands =
@@ -29,7 +29,7 @@ def create_dagster_graphql_cli():
return ui


def execute_query(handle, query, variables=None, pipeline_run_storage=None):
def execute_query(handle, query, variables=None, pipeline_run_storage=None, raise_on_error=False):
check.inst_param(handle, 'handle', ExecutionTargetHandle)
check.str_param(query, 'query')
check.opt_dict_param(variables, 'variables')
@@ -46,6 +46,7 @@ def execute_query(handle, query, variables=None, pipeline_run_storage=None):
handle=handle,
pipeline_runs=pipeline_run_storage,
execution_manager=execution_manager,
raise_on_error=raise_on_error,
version=__version__,
)

0 comments on commit 072eafd

Please sign in to comment.
You can’t perform that action at this time.