Skip to content
Permalink
Browse files

Add Scheduler to dagit and DagsterGraphQLContext

Summary: Add `Scheduler` to the `DagsterGraphQLContext` and the option to specify the storage directory for schedules in the dagit cli. By default, the storage directory is `$DAGSTER_HOME/schedules`

Test Plan:
run dagit with `SCHEDULER` feature enabled, unit

dagster.cgf:
```
[FEATURES]
scheduler=
```

Reviewers: #ft, alangenfeld

Reviewed By: #ft, alangenfeld

Subscribers: alangenfeld

Differential Revision: https://dagster.phacility.com/D798
  • Loading branch information...
helloworld committed Aug 12, 2019
1 parent 46421ee commit a8adcc7d7e40cc1da32d8aa13bd7b95fb0189624

Large diffs are not rendered by default.

@@ -12,7 +12,8 @@
from nbconvert import HTMLExporter
from six import text_type

from dagster import ExecutionTargetHandle, check, seven
from dagster import check, seven, ExecutionTargetHandle
from dagster.utils import Features
from dagster.utils.log import get_stack_trace_array

from dagster_graphql.implementation.context import DagsterGraphQLContext
@@ -23,6 +24,7 @@
from dagster_graphql.implementation.pipeline_run_storage import RunStorage
from dagster_graphql.schema import create_schema

from dagster_graphql.implementation.scheduler import Scheduler
from .subscription_server import DagsterSubscriptionServer
from .templates.playground import TEMPLATE as PLAYGROUND_TEMPLATE
from .version import __version__
@@ -119,9 +121,12 @@ def notebook_view(request_args):
return '<style>' + resources['inlining']['css'][0] + '</style>' + body, 200


def create_app(handle, pipeline_run_storage, use_synchronous_execution_manager=False):
def create_app(
handle, pipeline_run_storage, scheduler=None, use_synchronous_execution_manager=False
):
check.inst_param(handle, 'handle', ExecutionTargetHandle)
check.inst_param(pipeline_run_storage, 'pipeline_run_storage', RunStorage)
check.opt_inst_param(scheduler, 'scheduler', Scheduler)
check.bool_param(use_synchronous_execution_manager, 'use_synchronous_execution_manager')

app = Flask('dagster-ui')
@@ -138,12 +143,21 @@ def create_app(handle, pipeline_run_storage, use_synchronous_execution_manager=F

print('Loading repository...')

context = DagsterGraphQLContext(
handle=handle,
pipeline_runs=pipeline_run_storage,
execution_manager=execution_manager,
version=__version__,
)
if Features.SCHEDULER.is_enabled:
context = DagsterGraphQLContext(
handle=handle,
execution_manager=execution_manager,
pipeline_runs=pipeline_run_storage,
scheduler=scheduler, # Add schedule argument
version=__version__,
)
else:
context = DagsterGraphQLContext(
handle=handle,
execution_manager=execution_manager,
pipeline_runs=pipeline_run_storage,
version=__version__,
)

app.add_url_rule(
'/graphql',
@@ -11,7 +11,9 @@
from dagster.cli.pipeline import repository_target_argument
from dagster.utils import (
DEFAULT_REPOSITORY_YAML_FILENAME,
Features,
dagster_logs_dir_for_handle,
dagster_schedule_dir_for_handle,
is_dagster_home_set,
)

@@ -20,6 +22,8 @@
InMemoryRunStorage,
)

from dagster_graphql.implementation.scheduler import SystemCronScheduler

from .app import create_app
from .version import __version__

@@ -66,13 +70,14 @@ def create_dagit_cli():
),
)
@click.option('--log-dir', help="Directory to record logs to", default=None)
@click.option('--schedule-dir', help="Directory to record logs to", default=None)
@click.option(
'--no-watch',
is_flag=True,
help='Disable autoreloading when there are changes to the repo/pipeline being served',
)
@click.version_option(version=__version__, prog_name='dagit')
def ui(host, port, sync, log, log_dir, no_watch=False, **kwargs):
def ui(host, port, sync, log, log_dir, schedule_dir, no_watch=False, **kwargs):
handle = handle_for_repo_cli_args(kwargs)

# add the path for the cwd so imports in dynamically loaded code work correctly
@@ -91,20 +96,32 @@ def ui(host, port, sync, log, log_dir, no_watch=False, **kwargs):

log_dir = dagster_logs_dir_for_handle(handle)

if Features.SCHEDULER.is_enabled:
# Don't error if $DAGSTER_HOME is not set
if not schedule_dir and is_dagster_home_set():
schedule_dir = dagster_schedule_dir_for_handle(handle)

check.invariant(
not no_watch,
'Do not set no_watch when calling the Dagit Python CLI directly -- this flag is a no-op '
'at this level and should be set only when invoking dagit/bin/dagit.',
)
host_dagit_ui(log, log_dir, handle, sync, host, port)
host_dagit_ui(log, log_dir, schedule_dir, handle, sync, host, port)


def host_dagit_ui(log, log_dir, handle, use_sync, host, port):
def host_dagit_ui(log, log_dir, schedule_dir, handle, use_sync, host, port):
check.inst_param(handle, 'handle', ExecutionTargetHandle)

pipeline_run_storage = FilesystemRunStorage(log_dir) if log else InMemoryRunStorage()

app = create_app(handle, pipeline_run_storage, use_synchronous_execution_manager=use_sync)
if Features.SCHEDULER.is_enabled:
scheduler = SystemCronScheduler(schedule_dir)
app = create_app(
handle, pipeline_run_storage, scheduler, use_synchronous_execution_manager=use_sync
)
else:
app = create_app(handle, pipeline_run_storage, use_synchronous_execution_manager=use_sync)

server = pywsgi.WSGIServer((host, port), app, handler_class=WebSocketHandler)
print('Serving on http://{host}:{port}'.format(host=host, port=port))
try:
@@ -13,7 +13,6 @@ def test_create_app():
handle = ExecutionTargetHandle.for_repo_yaml(script_relative_path('./repository.yaml'))
pipeline_run_storage = InMemoryRunStorage()
assert create_app(handle, pipeline_run_storage, use_synchronous_execution_manager=True)
assert create_app(handle, pipeline_run_storage, use_synchronous_execution_manager=False)


def test_notebook_view():
@@ -44,7 +43,15 @@ def test_index_view():
def test_successful_host_dagit_ui():
with mock.patch('gevent.pywsgi.WSGIServer'):
handle = ExecutionTargetHandle.for_repo_yaml(script_relative_path('./repository.yaml'))
host_dagit_ui(log=False, log_dir=None, handle=handle, use_sync=True, host=None, port=2343)
host_dagit_ui(
log=False,
log_dir=None,
schedule_dir=None,
handle=handle,
use_sync=True,
host=None,
port=2343,
)


def _define_mock_server(fn):
@@ -69,7 +76,13 @@ def _raise_custom_error():
handle = ExecutionTargetHandle.for_repo_yaml(script_relative_path('./repository.yaml'))
with pytest.raises(AnException):
host_dagit_ui(
log=False, log_dir=None, handle=handle, use_sync=True, host=None, port=2343
log=False,
log_dir=None,
schedule_dir=None,
handle=handle,
use_sync=True,
host=None,
port=2343,
)


@@ -81,7 +94,13 @@ def _raise_os_error():
handle = ExecutionTargetHandle.for_repo_yaml(script_relative_path('./repository.yaml'))
with pytest.raises(Exception) as exc_info:
host_dagit_ui(
log=False, log_dir=None, handle=handle, use_sync=True, host=None, port=2343
log=False,
log_dir=None,
schedule_dir=None,
handle=handle,
use_sync=True,
host=None,
port=2343,
)

assert 'Another process ' in str(exc_info.value)
@@ -1,15 +1,22 @@
from dagster import ExecutionTargetHandle, check

from .pipeline_execution_manager import PipelineExecutionManager
from dagster import check, ExecutionTargetHandle
from .pipeline_run_storage import RunStorage
from .scheduler import Scheduler
from .pipeline_execution_manager import PipelineExecutionManager


class DagsterGraphQLContext(object):
def __init__(
self, handle, pipeline_runs, execution_manager, raise_on_error=False, version=None
self,
handle,
execution_manager,
pipeline_runs,
scheduler=None,
raise_on_error=False,
version=None,
):
self._handle = check.inst_param(handle, 'handle', ExecutionTargetHandle)
self.pipeline_runs = check.inst_param(pipeline_runs, 'pipeline_runs', RunStorage)
self.scheduler = check.opt_inst_param(scheduler, 'scheduler', Scheduler)
self.execution_manager = check.inst_param(
execution_manager, 'pipeline_execution_manager', PipelineExecutionManager
)
@@ -86,11 +86,14 @@ def _load_schedules(self):

self._schedules[schedule.schedule_id] = schedule
except Exception as ex: # pylint: disable=broad-except
raise Exception(
'Could not parse dagit schedule from {file_name} in {dir_name}. {ex}: {msg}'.format(
file_name=file,
dir_name=self._schedule_dir,
ex=type(ex).__name__,
msg=ex,
)
six.raise_from(
Exception(
'Could not parse dagit schedule from {file_name} in {dir_name}. {ex}: {msg}'.format(
file_name=file,
dir_name=self._schedule_dir,
ex=type(ex).__name__,
msg=ex,
)
),
ex,
)
@@ -9,6 +9,7 @@

import yaml
from six.moves import configparser
from enum import Enum

from dagster import check
from dagster.core.errors import DagsterInvariantViolationError
@@ -244,6 +245,18 @@ def dagster_config_path():
return os.path.join(dagster_home_dir(), dagster_config_file)


def dagster_schedules_dir():
return os.path.join(dagster_home_dir(), "schedules", "experimental")


def dagster_schedule_dir_for_handle(handle):
from dagster.core.definitions.handle import ExecutionTargetHandle

check.inst_param(handle, 'handle', ExecutionTargetHandle)
repository_name = handle.build_repository_definition().name
return os.path.join(dagster_schedules_dir(), repository_name)


def dagster_logs_dir():
return os.path.join(dagster_home_dir(), "logs", "experimental")

@@ -275,6 +288,25 @@ def get_enabled_features():
return []


class Features(Enum):
class FeatureFlag(object):
_features = get_enabled_features()

def __init__(self, name):
self.enabled = name in type(self)._features

@property
def is_enabled(self):
return self.enabled

# Add new feature flags here
SCHEDULER = FeatureFlag("scheduler")

@property
def is_enabled(self):
return self.value.is_enabled # pylint: disable=no-member


@contextlib.contextmanager
def safe_tempfile_path():
# This gets a valid temporary file path in the safest possible way, although there is still no

0 comments on commit a8adcc7

Please sign in to comment.
You can’t perform that action at this time.