
[RFC] DagsterInstance

Summary:
`DagsterInstance` represents the set of Dagster systems that a pipeline interacts with during execution but that may outlive any single execution. The initial motivating systems we have today are `run_storage` and `event_storage`.

If you want to execute a pipeline and persist no history of it, such as for test purposes, use `DagsterInstance.ephemeral()`. This persists nothing about runs to disk and uses a temporary directory for intermediates/file_manager storage.
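
A minimal sketch of the ephemeral path (the `toy_pipeline` / `emit_one` definitions are hypothetical; the `execute_pipeline(..., instance=...)` call mirrors the tests updated in this diff):

```python
from dagster import execute_pipeline, pipeline, solid
from dagster.core.instance import DagsterInstance


@solid
def emit_one(_context):
    return 1


@pipeline
def toy_pipeline():
    emit_one()


# Nothing about this run is persisted to disk; intermediates and the
# file_manager use a temp directory that goes away with the instance.
result = execute_pipeline(toy_pipeline, instance=DagsterInstance.ephemeral())
assert result.success
```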

If you want to execute a pipeline that uses the filesystem to persist information across runs, such as for re-execution, but want everything cleaned up once that test is over, use `DagsterInstance.local_temp()`.
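
A sketch of the temp-but-filesystem-backed path, reusing the hypothetical `toy_pipeline` from the sketch above:

```python
from dagster import execute_pipeline
from dagster.core.instance import DagsterInstance

# Runs and events are persisted to the local filesystem so a later execution
# (e.g. a re-execution test) can see them, but everything is rooted in a
# temporary location rather than $DAGSTER_HOME and is cleaned up afterwards.
instance = DagsterInstance.local_temp()
result = execute_pipeline(
    toy_pipeline,  # hypothetical pipeline from the previous sketch
    environment_dict={'storage': {'filesystem': {}}},
    instance=instance,
)
assert result.success
```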

Tools like `dagit` and `dagster-graphql`, on the other hand, use `DagsterInstance.get()`, which in the common case reads `$DAGSTER_HOME` and `$DAGSTER_HOME/dagster.cfg` to determine the configuration for the `DagsterInstance` to use. This gives you a consistent local experience across uses of these tools.
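
Roughly how the tools resolve the instance (the fallback path below is hypothetical; the `DagsterInstance.get(storage_fallback)` call is the one dagit's CLI now makes in this diff):

```python
from dagster.core.instance import DagsterInstance

# If $DAGSTER_HOME is set, configuration comes from it (and
# $DAGSTER_HOME/dagster.cfg); otherwise the caller-supplied fallback
# directory is used, as dagit does with its new --storage-fallback option.
storage_fallback = '/tmp/dagster-storage-fallback'  # hypothetical path
instance = DagsterInstance.get(storage_fallback)
```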

For easier onboarding, when someone uses dagit for the first time we don't force them to set `$DAGSTER_HOME`. Instead, the "watchdog" `dagit.py` parent process holds open a temp directory that stays open across restarts of the child process, letting a user keep a temporary history of runs while debugging an issue; it is all cleaned up when the dagit parent watchdog process exits.
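
The watchdog behavior boils down to the following (condensed from the `dagit.py` change below):

```python
import sys

from dagster import seven

# If the user did not pass --storage-fallback, the parent "watchdog" process
# holds open a temp directory and forwards it to the dagit-cli child, so run
# history survives child restarts and is removed when the parent exits.
command = ['dagit-cli'] + sys.argv[1:]
if '--storage-fallback' not in sys.argv[1:]:
    host_tempdir = seven.TemporaryDirectory()
    command += ['--storage-fallback', host_tempdir.name]
```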

In this diff, `DagsterInstance` starts to alleviate a lot of confusing data flow and cross-object communication by giving us a place where data about the runs and events occurring in a particular instance flows in one direction. This is key to accelerating the construction of operational tools that let users monitor these events. This setup also gives us a clear place to slot in "remote" instance types, which will be a common configuration for production deployments of Dagster.

Test Plan: unit tests

Reviewers: #ft, schrockn

Reviewed By: #ft, schrockn

Subscribers: max, schrockn

Differential Revision: https://dagster.phacility.com/D936
alangenfeld committed Sep 4, 2019
1 parent 235d126 commit 0608e0ad9c19b9bb1ee4b9a43c8e9ca948046a4b
Showing with 1,196 additions and 1,305 deletions.
  1. +0 −1 .gitignore
  2. +6 −9 examples/dagster_examples_tests/airline_demo_tests/test_types.py
  3. +2 −2 examples/dagster_examples_tests/graphql_tests/util.py
  4. +2 −3 examples/dagster_examples_tests/tutorial_tests/test_cli_invocations.py
  5. +6 −21 python_modules/dagit/dagit/app.py
  6. +10 −46 python_modules/dagit/dagit/cli.py
  7. +12 −0 python_modules/dagit/dagit/dagit.py
  8. +7 −14 python_modules/dagit/dagit_tests/test_app.py
  9. +2 −3 python_modules/dagit/dagit_tests/test_smoke.py
  10. +12 −10 python_modules/dagster-dask/dagster_dask_tests/test_execute.py
  11. +8 −49 python_modules/dagster-graphql/dagster_graphql/cli.py
  12. +2 −0 python_modules/dagster-graphql/dagster_graphql/client/mutations.py
  13. +10 −14 python_modules/dagster-graphql/dagster_graphql/implementation/context.py
  14. +31 −31 python_modules/dagster-graphql/dagster_graphql/implementation/execution.py
  15. +4 −6 python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
  16. +24 −25 python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_execution_manager.py
  17. +6 −4 python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_run_storage.py
  18. +1 −1 python_modules/dagster-graphql/dagster_graphql/schema/pipelines.py
  19. +7 −3 python_modules/dagster-graphql/dagster_graphql/schema/roots.py
  20. +4 −4 python_modules/dagster-graphql/dagster_graphql/schema/runs.py
  21. +4 −9 python_modules/dagster-graphql/dagster_graphql_tests/client_tests/test_util.py
  22. +5 −5 python_modules/dagster-graphql/dagster_graphql_tests/graphql/setup.py
  23. +7 −4 python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_execute_pipeline.py
  24. +17 −11 python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_execution_plan.py
  25. +2 −2 python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_misc.py
  26. +45 −49 python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs.py
  27. +4 −2 python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_scheduler.py
  28. +14 −32 python_modules/dagster-graphql/dagster_graphql_tests/test_cli.py
  29. +101 −101 python_modules/dagster-graphql/dagster_graphql_tests/test_multiprocessing.py
  30. +2 −2 python_modules/dagster-graphql/dagster_graphql_tests/test_python_error.py
  31. +0 −1 python_modules/dagster-graphql/setup.py
  32. +2 −2 python_modules/dagster/dagster/cli/__init__.py
  33. +5 −5 python_modules/dagster/dagster/cli/run.py
  34. +5 −11 python_modules/dagster/dagster/cli/schedule.py
  35. +1 −3 python_modules/dagster/dagster/core/definitions/system_storage.py
  36. +15 −3 python_modules/dagster/dagster/core/engine/engine_multiprocess.py
  37. +10 −0 python_modules/dagster/dagster/core/events/__init__.py
  38. +39 −17 python_modules/dagster/dagster/core/execution/api.py
  39. +4 −4 python_modules/dagster/dagster/core/execution/config.py
  40. +6 −6 python_modules/dagster/dagster/core/execution/context/system.py
  41. +27 −24 python_modules/dagster/dagster/core/execution/context_creation_pipeline.py
  42. +40 −42 python_modules/dagster/dagster/core/execution/logs.py
  43. +253 −0 python_modules/dagster/dagster/core/instance/__init__.py
  44. +6 −0 python_modules/dagster/dagster/core/instance/features.py
  45. +18 −1 python_modules/dagster/dagster/core/serdes/__init__.py
  46. +0 −16 python_modules/dagster/dagster/core/storage/config.py
  47. +9 −25 python_modules/dagster/dagster/core/storage/event_log.py
  48. +7 −12 python_modules/dagster/dagster/core/storage/file_manager.py
  49. +4 −1 python_modules/dagster/dagster/core/storage/init.py
  50. +11 −25 python_modules/dagster/dagster/core/storage/intermediate_store.py
  51. +13 −1 python_modules/dagster/dagster/core/storage/intermediates_manager.py
  52. +10 −141 python_modules/dagster/dagster/core/storage/pipeline_run.py
  53. +107 −100 python_modules/dagster/dagster/core/storage/runs.py
  54. +21 −13 python_modules/dagster/dagster/core/storage/system_storage.py
  55. +3 −1 python_modules/dagster/dagster/seven/__init__.py
  56. +6 −1 python_modules/dagster/dagster/seven/temp_dir.py
  57. +0 −87 python_modules/dagster/dagster/utils/__init__.py
  58. +13 −6 python_modules/dagster/dagster/utils/test.py
  59. +0 −15 python_modules/dagster/dagster_tests/cli_tests/test_cli_commands.py
  60. +51 −17 ..._modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_execution_plan_reexecution.py
  61. +8 −3 python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_execution_plan_subset.py
  62. +9 −8 python_modules/dagster/dagster_tests/core_tests/storage_tests/test_local_file_manager.py
  63. +13 −7 python_modules/dagster/dagster_tests/core_tests/test_external_execution_plan.py
  64. +46 −111 python_modules/dagster/dagster_tests/core_tests/test_intermediate_store.py
  65. +4 −2 python_modules/dagster/dagster_tests/core_tests/test_pipeline_execution.py
  66. +2 −0 python_modules/dagster/dagster_tests/core_tests/test_resource_definition.py
  67. +17 −69 python_modules/dagster/dagster_tests/core_tests/test_run_storage.py
  68. +4 −3 python_modules/dagster/dagster_tests/core_tests/test_stdout.py
  69. +5 −1 python_modules/dagster/setup.py
  70. +4 −0 python_modules/dagstermill/dagstermill/manager.py
  71. +3 −6 python_modules/libraries/dagster-aws/dagster_aws/cli/cli.py
  72. +0 −2 python_modules/libraries/dagster-aws/dagster_aws/s3/system_storage.py
  73. +35 −39 python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_intermediate_store.py
  74. +1 −2 python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py
  75. +2 −8 python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py
  76. +0 −1 python_modules/libraries/dagster-cron/setup.py
@@ -55,7 +55,6 @@ coverage.xml
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
@@ -16,18 +16,14 @@
execute_pipeline,
file_relative_path,
pipeline,
seven,
solid,
)
from dagster.core.instance import DagsterInstance
from dagster.core.storage.intermediate_store import FileSystemIntermediateStore

from .test_solids import spark_mode

try:
# Python 2 tempfile doesn't have tempfile.TemporaryDirectory
import backports.tempfile as tempfile
except ImportError:
import tempfile


def test_spark_data_frame_serialization_file_system_file_handle():
@solid
@@ -40,13 +36,14 @@ def spark_df_test_pipeline():
ingest_csv_file_handle_to_spark(nonce())

run_id = str(uuid.uuid4())

intermediate_store = FileSystemIntermediateStore(run_id=run_id)
instance = DagsterInstance.ephemeral()
intermediate_store = FileSystemIntermediateStore.for_instance(instance, run_id=run_id)

result = execute_pipeline(
spark_df_test_pipeline,
run_config=RunConfig(run_id=run_id, mode='spark'),
environment_dict={'storage': {'filesystem': {}}},
instance=instance,
)

assert result.success
@@ -128,7 +125,7 @@ def passthrough_df(_context, df):
def passthrough():
passthrough_df(emit()) # pylint: disable=no-value-for-parameter

with tempfile.TemporaryDirectory() as tempdir:
with seven.TemporaryDirectory() as tempdir:
file_name = os.path.join(tempdir, 'output.csv')
result = execute_pipeline(
passthrough,
@@ -2,13 +2,13 @@
from dagster_graphql.implementation.pipeline_execution_manager import SynchronousExecutionManager

from dagster import ExecutionTargetHandle
from dagster.core.storage.runs import InMemoryRunStorage
from dagster.core.instance import DagsterInstance


def define_examples_context(raise_on_error=True):
return DagsterGraphQLContext(
handle=ExecutionTargetHandle.for_repo_module('dagster_examples', 'define_demo_repo'),
pipeline_runs=InMemoryRunStorage(),
instance=DagsterInstance.ephemeral(),
execution_manager=SynchronousExecutionManager(),
raise_on_error=raise_on_error,
)
@@ -8,7 +8,7 @@
from dagster import DagsterInvalidConfigError
from dagster.cli.load_handle import handle_for_repo_cli_args
from dagster.cli.pipeline import pipeline_execute_command
from dagster.core.storage.runs import InMemoryRunStorage
from dagster.core.instance import DagsterInstance
from dagster.utils import DEFAULT_REPOSITORY_YAML_FILENAME, script_relative_path

PIPELINES_OR_ERROR_QUERY = '{ pipelinesOrError { ... on PipelineConnection { nodes { name } } } }'
@@ -20,9 +20,8 @@ def path_to_tutorial_file(path):

def load_dagit_for_repo_cli_args(n_pipelines=1, **kwargs):
handle = handle_for_repo_cli_args(kwargs)
pipeline_run_storage = InMemoryRunStorage()

app = create_app(handle, pipeline_run_storage)
app = create_app(handle, DagsterInstance.ephemeral())

client = app.test_client()

@@ -18,9 +18,7 @@
from six import text_type

from dagster import ExecutionTargetHandle, check, seven
from dagster.core.scheduler import Scheduler
from dagster.core.storage.runs import RunStorage
from dagster.utils import Features
from dagster.core.instance import DagsterInstance
from dagster.utils.log import get_stack_trace_array

from .subscription_server import DagsterSubscriptionServer
@@ -119,10 +117,9 @@ def notebook_view(request_args):
return '<style>' + resources['inlining']['css'][0] + '</style>' + body, 200


def create_app(handle, pipeline_run_storage, scheduler=None):
def create_app(handle, instance):
check.inst_param(handle, 'handle', ExecutionTargetHandle)
check.inst_param(pipeline_run_storage, 'pipeline_run_storage', RunStorage)
check.opt_inst_param(scheduler, 'scheduler', Scheduler)
check.inst_param(instance, 'instance', DagsterInstance)

app = Flask('dagster-ui')
sockets = Sockets(app)
@@ -135,21 +132,9 @@ def create_app(handle, pipeline_run_storage, scheduler=None):

print('Loading repository...')

if Features.SCHEDULER.is_enabled:
context = DagsterGraphQLContext(
handle=handle,
execution_manager=execution_manager,
pipeline_runs=pipeline_run_storage,
scheduler=scheduler, # Add schedule argument
version=__version__,
)
else:
context = DagsterGraphQLContext(
handle=handle,
execution_manager=execution_manager,
pipeline_runs=pipeline_run_storage,
version=__version__,
)
context = DagsterGraphQLContext(
handle=handle, instance=instance, execution_manager=execution_manager, version=__version__
)

app.add_url_rule(
'/graphql',
@@ -9,14 +9,8 @@
from dagster import ExecutionTargetHandle, check
from dagster.cli.load_handle import handle_for_repo_cli_args
from dagster.cli.pipeline import repository_target_argument
from dagster.core.storage.runs import FilesystemRunStorage, InMemoryRunStorage
from dagster.utils import (
DEFAULT_REPOSITORY_YAML_FILENAME,
Features,
dagster_logs_dir_for_handle,
dagster_schedule_dir_for_handle,
is_dagster_home_set,
)
from dagster.core.instance import DagsterInstance
from dagster.utils import DEFAULT_REPOSITORY_YAML_FILENAME

from .app import create_app
from .version import __version__
@@ -55,66 +49,36 @@ def create_dagit_cli():
@click.option('--host', '-h', type=click.STRING, default='127.0.0.1', help="Host to run server on")
@click.option('--port', '-p', type=click.INT, default=3000, help="Port to run server on")
@click.option(
'--log',
is_flag=True,
help=(
'Record logs of pipeline runs. Use --log-dir to specify the directory to record logs to. '
'By default, logs will be stored under $DAGSTER_HOME.'
),
'--storage-fallback',
help="Base directory for dagster storage if $DAGSTER_HOME is not set",
default=None,
)
@click.option('--log-dir', help="Directory to record logs to", default=None)
@click.option('--schedule-dir', help="Directory to record logs to", default=None)
@click.option(
'--no-watch',
is_flag=True,
help='Disable autoreloading when there are changes to the repo/pipeline being served',
)
@click.version_option(version=__version__, prog_name='dagit')
def ui(host, port, log, log_dir, schedule_dir, no_watch=False, **kwargs):
def ui(host, port, storage_fallback, no_watch=False, **kwargs):
handle = handle_for_repo_cli_args(kwargs)

# add the path for the cwd so imports in dynamically loaded code work correctly
sys.path.append(os.getcwd())

if log and not log_dir:
if not is_dagster_home_set():
raise click.UsageError(
'$DAGSTER_HOME is not set and log-dir is not provided. '
'Set the home directory for dagster by exporting DAGSTER_HOME in your '
'.bashrc or .bash_profile, or pass in a default directory using the --log-dir flag '
'\nExamples:'
'\n 1. export DAGSTER_HOME="~/dagster"'
'\n 2. --log --log-dir="/dagster_logs"'
)

log_dir = dagster_logs_dir_for_handle(handle)

if Features.SCHEDULER.is_enabled:
# Don't error if $DAGSTER_HOME is not set
if not schedule_dir and is_dagster_home_set():
schedule_dir = dagster_schedule_dir_for_handle(handle)

check.invariant(
not no_watch,
'Do not set no_watch when calling the Dagit Python CLI directly -- this flag is a no-op '
'at this level and should be set only when invoking dagit/bin/dagit.',
)
host_dagit_ui(log, log_dir, schedule_dir, handle, host, port)
host_dagit_ui(handle, host, port, storage_fallback)


def host_dagit_ui(log, log_dir, schedule_dir, handle, host, port):
def host_dagit_ui(handle, host, port, storage_fallback=None):
check.inst_param(handle, 'handle', ExecutionTargetHandle)

pipeline_run_storage = (
FilesystemRunStorage(base_dir=log_dir, watch=True) if log else InMemoryRunStorage()
)
instance = DagsterInstance.get(storage_fallback)

if Features.SCHEDULER.is_enabled:
repository = handle.build_repository_definition()
scheduler = repository.build_scheduler(schedule_dir=schedule_dir)
app = create_app(handle, pipeline_run_storage, scheduler)
else:
app = create_app(handle, pipeline_run_storage)
app = create_app(handle, instance)

server = pywsgi.WSGIServer((host, port), app, handler_class=WebSocketHandler)
print('Serving on http://{host}:{port}'.format(host=host, port=port))
@@ -7,6 +7,8 @@
from watchdog.observers import Observer
from watchdog.tricks import AutoRestartTrick

from dagster import seven


# Use watchdog's python API to auto-restart the dagit-cli process when
# python files in the current directory change. This is a slightly modified
@@ -38,6 +40,7 @@ def handle_sigterm(_signum, _frame):
def main():
# Build the dagit-cli command, omitting the --no-watch arg if present
watch = True
fallback_set = False
command = ['dagit-cli']
for arg in sys.argv[1:]:
if arg == '--no-watch':
@@ -48,9 +51,18 @@ def main():
elif arg == '--version':
watch = False
command.append(arg)
elif arg == '--storage-fallback':
fallback_set = True
command.append(arg)
else:
command.append(arg)

host_tempdir = None
if not fallback_set:
host_tempdir = seven.TemporaryDirectory()
command.append('--storage-fallback')
command.append(host_tempdir.name)

# If not using watch mode, just call the command
if not watch:
os.execvp(command[0], command)
@@ -3,23 +3,22 @@
from dagit.cli import host_dagit_ui

from dagster import ExecutionTargetHandle
from dagster.core.storage.runs import InMemoryRunStorage
from dagster.core.instance import DagsterInstance
from dagster.seven import mock
from dagster.utils import script_relative_path


def test_create_app():
handle = ExecutionTargetHandle.for_repo_yaml(script_relative_path('./repository.yaml'))
pipeline_run_storage = InMemoryRunStorage()
assert create_app(handle, pipeline_run_storage)
assert create_app(handle, DagsterInstance.ephemeral())


def test_notebook_view():
notebook_path = script_relative_path('render_uuid_notebook.ipynb')

with create_app(
ExecutionTargetHandle.for_repo_yaml(script_relative_path('./repository.yaml')),
InMemoryRunStorage(),
DagsterInstance.ephemeral(),
).test_client() as client:
res = client.get('/dagit/notebook?path={}'.format(notebook_path))

@@ -31,7 +30,7 @@ def test_notebook_view():
def test_index_view():
with create_app(
ExecutionTargetHandle.for_repo_yaml(script_relative_path('./repository.yaml')),
InMemoryRunStorage(),
DagsterInstance.ephemeral(),
).test_client() as client:
res = client.get('/')

@@ -42,9 +41,7 @@ def test_index_view():
def test_successful_host_dagit_ui():
with mock.patch('gevent.pywsgi.WSGIServer'):
handle = ExecutionTargetHandle.for_repo_yaml(script_relative_path('./repository.yaml'))
host_dagit_ui(
log=False, log_dir=None, schedule_dir=None, handle=handle, host=None, port=2343
)
host_dagit_ui(handle=handle, host=None, port=2343)


def _define_mock_server(fn):
@@ -68,9 +65,7 @@ def _raise_custom_error():
with mock.patch('gevent.pywsgi.WSGIServer', new=_define_mock_server(_raise_custom_error)):
handle = ExecutionTargetHandle.for_repo_yaml(script_relative_path('./repository.yaml'))
with pytest.raises(AnException):
host_dagit_ui(
log=False, log_dir=None, schedule_dir=None, handle=handle, host=None, port=2343
)
host_dagit_ui(handle=handle, host=None, port=2343)


def test_port_collision():
@@ -80,8 +75,6 @@ def _raise_os_error():
with mock.patch('gevent.pywsgi.WSGIServer', new=_define_mock_server(_raise_os_error)):
handle = ExecutionTargetHandle.for_repo_yaml(script_relative_path('./repository.yaml'))
with pytest.raises(Exception) as exc_info:
host_dagit_ui(
log=False, log_dir=None, schedule_dir=None, handle=handle, host=None, port=2343
)
host_dagit_ui(handle=handle, host=None, port=2343)

assert 'Another process ' in str(exc_info.value)
@@ -5,16 +5,15 @@
from dagit import app

from dagster import ExecutionTargetHandle
from dagster.core.storage.runs import InMemoryRunStorage
from dagster.core.instance import DagsterInstance


def test_smoke_app():
pipeline_run_storage = InMemoryRunStorage()
flask_app = app.create_app(
ExecutionTargetHandle.for_repo_module(
module_name='dagster_examples.intro_tutorial.repos', fn_name='define_repo'
),
pipeline_run_storage,
DagsterInstance.ephemeral(),
)
client = flask_app.test_client()

@@ -8,6 +8,7 @@
execute_pipeline,
file_relative_path,
pipeline,
seven,
solid,
)
from dagster.core.definitions.executor import default_executors
@@ -25,16 +26,17 @@ def dask_engine_pipeline():


def test_execute_on_dask():
result = execute_pipeline(
ExecutionTargetHandle.for_pipeline_python_file(
__file__, 'dask_engine_pipeline'
).build_pipeline_definition(),
environment_dict={
'storage': {'filesystem': {}},
'execution': {'dask': {'config': {'timeout': 30}}},
},
)
assert result.result_for_solid('simple').output_value() == 1
with seven.TemporaryDirectory() as tempdir:
result = execute_pipeline(
ExecutionTargetHandle.for_pipeline_python_file(
__file__, 'dask_engine_pipeline'
).build_pipeline_definition(),
environment_dict={
'storage': {'filesystem': {'config': {'base_dir': tempdir}}},
'execution': {'dask': {'config': {'timeout': 30}}},
},
)
assert result.result_for_solid('simple').output_value() == 1


def dask_composite_pipeline():
