[storage] live event logs
Summary: When we load runs from JSON, process the event stream to correctly set the run state. If the run is not in a complete state, or its logs are not ready yet, set a watcher to consume the logs until the run completes.
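
In outline (a condensed sketch of the `from_json` change in this diff; construction details and storage plumbing omitted):

    run = PipelineRun(...)  # rebuilt from the serialized run metadata

    if not run.logs_ready():
        # no event log on disk yet -- watch it from the beginning
        run.watch_event_logs(0, run.handle_new_event)
    else:
        # replay persisted events to reconstruct the run's status ...
        init_logs = run.all_logs()
        for log in init_logs:
            run.handle_new_event(log)
        # ... and keep watching if the run has not already succeeded
        if run.status != PipelineRunStatus.SUCCESS:
            run.watch_event_logs(len(init_logs), run.handle_new_event)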

Test Plan:
`dagit --log` in one tab
`dagster-graphql --log -p startPipelineExecution -v '{"executionParams": {"selector":{"name": "log_spew"}, "mode": "default"}}';` in another
see the pipeline run state stay up to date

Reviewers: #ft, schrockn

Reviewed By: #ft, schrockn

Subscribers: schrockn

Differential Revision: https://dagster.phacility.com/D885
alangenfeld committed Aug 27, 2019
1 parent 49ec982 commit e8f0908
Showing 4 changed files with 135 additions and 48 deletions.
91 changes: 62 additions & 29 deletions python_modules/dagster-graphql/dagster_graphql_tests/test_cli.py
@@ -1,5 +1,5 @@
 import json
-import os
+import time

 from click.testing import CliRunner
 from dagster_graphql.cli import ui
@@ -14,9 +14,16 @@
     lambda_solid,
     seven,
 )
-from dagster.seven import mock
+from dagster.core.storage.pipeline_run import PipelineRunStatus
+from dagster.core.storage.runs import FilesystemRunStorage
 from dagster.utils import script_relative_path

+try:
+    # Python 2 tempfile doesn't have tempfile.TemporaryDirectory
+    import backports.tempfile as tempfile
+except ImportError:
+    import tempfile


 @lambda_solid(input_defs=[InputDefinition('num', Int)], output_def=OutputDefinition(Int))
 def add_one(num):
@@ -79,7 +86,7 @@ def test_basic_variables():
     assert result.exit_code == 0

     result_data = json.loads(result.output)
-    assert result_data['data']
+    assert result_data['data']['pipeline']['name'] == 'math'


START_PIPELINE_EXECUTION_QUERY = '''
@@ -129,7 +136,7 @@ def test_start_execution_text():
         }
     )

-    repo_path = script_relative_path('./repository.yaml')
+    repo_path = script_relative_path('./cli_test_repository.yaml')

     runner = CliRunner()
     result = runner.invoke(
@@ -140,7 +147,10 @@ def test_start_execution_text():

     try:
         result_data = json.loads(result.output.strip('\n').split('\n')[-1])
-        assert result_data['data']
+        assert (
+            result_data['data']['startPipelineExecution']['__typename']
+            == 'StartPipelineExecutionSuccess'
+        )
     except Exception as e:
         raise Exception('Failed with {} Exception: {}'.format(result.output, e))

@@ -158,19 +168,18 @@ def test_start_execution_file():
         }
     )

-    repo_path = script_relative_path('./repository.yaml')
+    repo_path = script_relative_path('./cli_test_repository.yaml')
     runner = CliRunner()
     result = runner.invoke(
         ui, ['-y', repo_path, '-v', variables, '--file', script_relative_path('./execute.graphql')]
     )

     assert result.exit_code == 0

-    try:
-        result_data = json.loads(result.output.strip('\n').split('\n')[-1])
-        assert result_data['data']
-    except Exception as e:
-        raise Exception('Failed with {} Exception: {}'.format(result.output, e))
+    result_data = json.loads(result.output.strip('\n').split('\n')[-1])
+    assert (
+        result_data['data']['startPipelineExecution']['__typename']
+        == 'StartPipelineExecutionSuccess'
+    )


 def test_start_execution_predefined():
@@ -186,22 +195,20 @@ def test_start_execution_predefined():
         }
     )

-    repo_path = script_relative_path('./repository.yaml')
+    repo_path = script_relative_path('./cli_test_repository.yaml')

     runner = CliRunner()
     result = runner.invoke(ui, ['-y', repo_path, '-v', variables, '-p', 'startPipelineExecution'])

     assert result.exit_code == 0

-    try:
-        result_data = json.loads(result.output.strip('\n').split('\n')[-1])
-        assert result_data['data']
-    except Exception as e:
-        raise Exception('Failed with {} Exception: {}'.format(result.output, e))
+    result_data = json.loads(result.output.strip('\n').split('\n')[-1])
+    assert (
+        result_data['data']['startPipelineExecution']['__typename']
+        == 'StartPipelineExecutionSuccess'
+    )


-@mock.patch.dict(os.environ, {"DAGSTER_HOME": "~/dagster"})
 def test_start_execution_predefined_with_logs():
-
     variables = seven.json.dumps(
         {
             'executionParams': {
@@ -214,11 +221,37 @@ def test_start_execution_predefined_with_logs():
         }
     )

-    repo_path = script_relative_path('./repository.yaml')
-
-    runner = CliRunner()
-    result = runner.invoke(
-        ui, ['-y', repo_path, '-v', variables, '-p', 'startPipelineExecution', '--log']
-    )
-
-    assert result.exit_code == 0
+    repo_path = script_relative_path('./cli_test_repository.yaml')
+    with tempfile.TemporaryDirectory() as temp_dir:
+
+        run_storage = FilesystemRunStorage(base_dir=temp_dir, watch=True)
+
+        runner = CliRunner()
+        result = runner.invoke(
+            ui,
+            [
+                '-y',
+                repo_path,
+                '-v',
+                variables,
+                '-p',
+                'startPipelineExecution',
+                '--log',
+                '--log-dir',
+                temp_dir,
+            ],
+        )
+        assert result.exit_code == 0
+        result_data = json.loads(result.output.strip('\n').split('\n')[-1])
+        assert (
+            result_data['data']['startPipelineExecution']['__typename']
+            == 'StartPipelineExecutionSuccess'
+        )
+
+        # allow FS events to flush
+        time.sleep(0.500)
+
+        # assert that the watching run storage captured the run correctly from the other process
+        run_id = result_data['data']['startPipelineExecution']['run']['runId']
+        run = run_storage.get_run_by_id(run_id)
+        assert run.status == PipelineRunStatus.SUCCESS
49 changes: 46 additions & 3 deletions python_modules/dagster/dagster/core/storage/event_log.py
@@ -7,13 +7,15 @@
 import gevent.lock
 import pyrsistent
 import six
+from watchdog.events import PatternMatchingEventHandler
+from watchdog.observers import Observer

 from dagster import check
 from dagster.core.events.log import EventRecord
 from dagster.utils import mkdir_p

 from .config import base_runs_directory
-from .pipeline_run import PipelineRun
+from .pipeline_run import PipelineRun, PipelineRunStatus


 class EventLogSequence(pyrsistent.CheckedPVector):
@@ -99,7 +101,9 @@ def __init__(self, base_dir=None):
         self.file_cursors = defaultdict(lambda: (0, 0))
         # Swap these out to use lockfiles
         self.file_lock = defaultdict(gevent.lock.Semaphore)
-        self._metadata_file_lock = defaultdict(gevent.lock.Semaphore)
+        self._watchers = {}
+        self._obs = Observer()
+        self._obs.start()

     def filepath_for_run_id(self, run_id):
         return os.path.join(self._base_dir, '{run_id}.log'.format(run_id=run_id))
@@ -124,6 +128,9 @@ def wipe(self):
     def is_persistent(self):
         return True

+    def logs_ready(self, run_id):
+        return os.path.exists(self.filepath_for_run_id(run_id))
+
     def get_logs_for_run(self, run_id, cursor=0):
         events = []
         with self.file_lock[run_id]:
@@ -137,10 +144,46 @@ def get_logs_for_run(self, run_id, cursor=0):
                 i = 0
                 while i < cursor:
                     pickle.load(fd)
-                    i = fd.tell()
+                    i += 1
                 self.file_cursors[run_id] = (cursor, fd.tell())
                 try:
                     while True:
                         events.append(pickle.load(fd))
                 except EOFError:
                     pass
         return events

+    def watch(self, run_id, start_cursor, callback):
+        watchdog = EventLogStorageWatchdog(self, run_id, callback, start_cursor)
+        self._watchers[run_id] = self._obs.schedule(watchdog, self._base_dir, True)
+
+    def end_watch(self, run_id, handler):
+        self._obs.remove_handler_for_watch(handler, self._watchers[run_id])
+        del self._watchers[run_id]
+
+
+class EventLogStorageWatchdog(PatternMatchingEventHandler):
+    def __init__(self, event_log_storage, run_id, callback, start_cursor, **kwargs):
+        self._event_log_storage = event_log_storage
+        self._run_id = run_id
+        self._cb = callback
+        self._log_path = event_log_storage.filepath_for_run_id(run_id)
+        self._cursor = start_cursor
+        super(EventLogStorageWatchdog, self).__init__(patterns=[self._log_path], **kwargs)
+
+    def _process_log(self):
+        events = self._event_log_storage.get_logs_for_run(self._run_id, self._cursor)
+        self._cursor += len(events)
+        for event in events:
+            status = self._cb(event)
+
+        if status == PipelineRunStatus.SUCCESS or status == PipelineRunStatus.FAILURE:
+            self._event_log_storage.end_watch(self._run_id, self)
+
+    def on_created(self, event):
+        check.invariant(event.src_path == self._log_path)
+        self._process_log()
+
+    def on_modified(self, event):
+        check.invariant(event.src_path == self._log_path)
+        self._process_log()
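
For orientation, a minimal consumer of this watch API might look like the following. This is a sketch, not code from the commit: `some_run_id` and the base directory are assumed, and in this commit the real callback is `PipelineRun.handle_new_event` (below), which returns the run's status after applying the event.

    from dagster.core.storage.event_log import FilesystemEventLogStorage
    from dagster.core.storage.pipeline_run import PipelineRunStatus

    storage = FilesystemEventLogStorage(base_dir='/tmp/dagster-logs')  # assumed directory

    def on_event(event):
        print(event)  # react to the event, e.g. update tracked run state
        # the return value tells the watchdog whether the run is finished
        return PipelineRunStatus.STARTED

    storage.watch(some_run_id, 0, on_event)

The watchdog re-reads the log file from its cursor each time the file is created or modified, and `_process_log` ends the watch once the callback returns SUCCESS or FAILURE, so a state-tracking callback cleans up its own observer.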
23 changes: 22 additions & 1 deletion python_modules/dagster/dagster/core/storage/pipeline_run.py
@@ -103,6 +103,12 @@ def logs_after(self, cursor):
     def all_logs(self):
         return self._run_storage.event_log_storage.get_logs_for_run(self.run_id)

+    def logs_ready(self):
+        return self._run_storage.event_log_storage.logs_ready(self.run_id)
+
+    def watch_event_logs(self, cursor, cb):
+        return self._run_storage.event_log_storage.watch(self.run_id, cursor, cb)
+
     @property
     def selector(self):
         return self._selector
@@ -132,6 +138,8 @@ def handle_new_event(self, new_event):
         for subscriber in self.__subscribers:
             subscriber.handle_new_event(new_event)

+        return self._status
+
     def subscribe(self, subscriber):
         self.__subscribers.append(subscriber)

@@ -150,11 +158,24 @@ def from_json(json_data, run_storage):
         selector = ExecutionSelector(
             name=json_data['pipeline_name'], solid_subset=json_data.get('pipeline_solid_subset')
         )
-        return PipelineRun(
+        run = PipelineRun(
             run_storage=run_storage,
             pipeline_name=json_data['pipeline_name'],
             run_id=json_data['run_id'],
             selector=selector,
             env_config=json_data['config'],
             mode=json_data['mode'],
         )
+
+        if not run.logs_ready():
+            run.watch_event_logs(0, run.handle_new_event)
+            return run
+
+        init_logs = run.all_logs()
+        for log in init_logs:
+            run.handle_new_event(log)
+
+        if run.status != PipelineRunStatus.SUCCESS:
+            run.watch_event_logs(len(init_logs), run.handle_new_event)
+
+        return run
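
Note the contract that ties this file to the event log storage above: `handle_new_event` now returns the run's status after fanning the event out to subscribers, and `EventLogStorageWatchdog._process_log` uses that return value to end the watch once the run reaches SUCCESS or FAILURE, so watchers do not outlive the runs they follow.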
20 changes: 5 additions & 15 deletions python_modules/dagster/dagster/core/storage/runs.py
@@ -16,7 +16,7 @@
 from dagster.utils import mkdir_p

 from .config import base_runs_directory
-from .event_log import EventLogStorage, FilesystemEventLogStorage, InMemoryEventLogStorage
+from .event_log import FilesystemEventLogStorage, InMemoryEventLogStorage
 from .pipeline_run import PipelineRun


@@ -79,13 +79,8 @@ def is_persistent(self):


 class InMemoryRunStorage(RunStorage):
-    def __init__(self, event_log_storage=None):
-        self.event_log_storage = check.opt_inst_param(
-            event_log_storage,
-            'event_log_storage',
-            EventLogStorage,
-            default=InMemoryEventLogStorage(),
-        )
+    def __init__(self):
+        self.event_log_storage = InMemoryEventLogStorage()
         self._runs = OrderedDict()

     def add_run(self, pipeline_run):
Expand Down Expand Up @@ -124,20 +119,15 @@ def create_run(self, **kwargs):


class FilesystemRunStorage(RunStorage):
def __init__(self, event_log_storage=None, base_dir=None, watch=False):
def __init__(self, base_dir=None, watch=False):
self._base_dir = check.opt_str_param(base_dir, 'base_dir', base_runs_directory())
mkdir_p(self._base_dir)

self._runs = OrderedDict()

self._file_lock = defaultdict(gevent.lock.Semaphore)

self.event_log_storage = check.opt_inst_param(
event_log_storage,
'event_log_storage',
EventLogStorage,
default=FilesystemEventLogStorage(base_dir=self._base_dir),
)
self.event_log_storage = FilesystemEventLogStorage(base_dir=self._base_dir)

self._load_runs()
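
With the optional `event_log_storage` parameter gone, each run storage now always owns a matching event log storage, and `FilesystemRunStorage` pins it to the same `base_dir`. That shared directory is presumably what lets the watching storage in the test above observe a run executed by a separate `dagster-graphql` process.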
