Skip to content

Commit

Permalink
[0.7.13] Improve scheduler debug and reconciliation
Browse files Browse the repository at this point in the history
Test Plan: unit

Reviewers: alangenfeld

Reviewed By: alangenfeld

Differential Revision: https://dagster.phacility.com/D2946
  • Loading branch information
helloworld committed May 14, 2020
1 parent 97d07cf commit 2136cf4
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 62 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

- Fixes a bug where using solid subsets when launching pipeline runs would fail config validation.
- (dagster-gcp) allow multiple "bq_solid_for_queries" solids to co-exist in a pipeline
- Improve scheduler state reconciliation with dagster-cron scheduler. `dagster schedule` debug comamnd will display
issues related to missing crob jobs, extraneous cron jobs, and duplicate cron jobs. Running
`dagster schedule up` will fix any issues.

**New**

Expand Down
2 changes: 1 addition & 1 deletion docs/next/src/pages/docs/deploying/celery.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ them.
Although this system is deliberately designed to make the full range of
Celery config available as needed, keep in mind that Celery exposes many
knobs, many combinations of which are not compatible with each other.
However, workloads also differ widely. If you are runnning many
However, workloads also differ widely. If you are running many
pipelines with very long-running or very short-running tasks, for
instance, and you are already comfortable tuning Celery, you might find
that changing some of the settings works better for your case.
Expand Down
29 changes: 28 additions & 1 deletion python_modules/dagster/dagster/cli/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,34 @@ def schedule_debug_command():

def execute_debug_command(print_fn):
instance = DagsterInstance.get()
print_fn(instance.scheduler_debug_info())

debug_info = instance.scheduler_debug_info()

output = ""

errors = debug_info.errors
if len(errors):
title = "Errors"
output += "\n{title}\n{sep}\n{info}\n\n".format(
title=title, sep="=" * len(title), info="\n".join(debug_info.errors),
)

title = "Scheduler Configuration"
output += "{title}\n{sep}\n{info}\n".format(
title=title, sep="=" * len(title), info=debug_info.scheduler_config_info,
)

title = "Scheduler Info"
output += "{title}\n{sep}\n{info}\n".format(
title=title, sep="=" * len(title), info=debug_info.scheduler_info
)

title = "Scheduler Storage Info"
output += "\n{title}\n{sep}\n{info}\n".format(
title=title, sep="=" * len(title), info="\n".join(debug_info.schedule_storage),
)

print_fn(output)


schedule_cli = create_schedule_cli_group()
56 changes: 37 additions & 19 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,42 +799,60 @@ def end_schedule(self, repository, schedule_name):
return self._scheduler.end_schedule(self, repository, schedule_name)

def scheduler_debug_info(self):
output = ""
from dagster.core.scheduler import SchedulerDebugInfo, ScheduleStatus

title = "Scheduler Configuration"
output += "{title}\n{sep}\n{info}\n".format(
title=title,
sep="=" * len(title),
info=self.info_str_for_component('Scheduler', self.scheduler),
)

title = "Scheduler Info"
output += "{title}\n{sep}\n{info}\n".format(
title=title, sep="=" * len(title), info=self.scheduler.debug_info(),
)
errors = []

title = "Scheduler Storage Info"
schedule_info = self.all_schedules_info()
schedules = []
for repository, schedule in schedule_info:
for repository_name, schedule in schedule_info:
if (
schedule.status == ScheduleStatus.RUNNING
and not self._scheduler.is_scheduler_job_running(repository_name, schedule.name)
):
errors.append(
"Schedule {schedule_name} is set to be running, but the scheduler is not "
"running the schedule. Run `dagster schedule up` to resolve".format(
schedule_name=schedule.name
)
)
elif (
schedule.status == ScheduleStatus.STOPPED
and self._scheduler.is_scheduler_job_running(repository_name, schedule.name)
):
errors.append(
"Schedule {schedule_name} is set to be stopped, but the scheduler is still running "
"the schedule. Run `dagster schedule up` to resolve".format(
schedule_name=schedule.name
)
)

if self._scheduler.is_scheduler_job_running(repository_name, schedule.name) > 1:
errors.append(
"Duplicate jobs found: More than one job for schedule {schedule_name} are "
"running on the scheduler. "
"Run `dagster schedule up` to resolve".format(schedule_name=schedule.name)
)

schedule_info = {
schedule.name: {
"status": schedule.status.value,
"cron_schedule": schedule.cron_schedule,
"python_path": schedule.python_path,
"repository_name": repository,
"repository_name": repository_name,
"repository_path": schedule.repository_path,
}
}

schedules.append(yaml.safe_dump(schedule_info, default_flow_style=False))

output += "{title}\n{sep}\n{info}\n".format(
title=title, sep="=" * len(title), info="\n".join(schedules),
return SchedulerDebugInfo(
scheduler_config_info=self.info_str_for_component('Scheduler', self.scheduler),
scheduler_info=self.scheduler.debug_info(),
schedule_storage=schedules,
errors=errors,
)

return output

# Schedule Storage

def create_schedule_tick(self, repository, schedule_tick_data):
Expand Down
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/core/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
ScheduleTick,
ScheduleTickStatus,
Scheduler,
SchedulerDebugInfo,
SchedulerHandle,
get_schedule_change_set,
reconcile_scheduler_state,
Expand Down
25 changes: 25 additions & 0 deletions python_modules/dagster/dagster/core/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ def get_schedule_change_set(old_schedules, new_schedule_defs):
return changeset


class SchedulerDebugInfo(
namedtuple('SchedulerDebugInfo', 'errors scheduler_config_info scheduler_info schedule_storage')
):
def __new__(cls, errors, scheduler_config_info, scheduler_info, schedule_storage):
return super(SchedulerDebugInfo, cls).__new__(
cls,
errors=check.list_param(errors, 'errors', of_type=str),
scheduler_config_info=check.str_param(scheduler_config_info, 'scheduler_config_info'),
scheduler_info=check.str_param(scheduler_info, 'scheduler_info'),
schedule_storage=check.list_param(schedule_storage, 'schedule_storage', of_type=str),
)


class SchedulerHandle(object):
def __init__(
self, schedule_defs,
Expand Down Expand Up @@ -119,6 +132,9 @@ def reconcile_scheduler_state(python_path, repository_path, repository, instance
instance.stop_schedule(repository, schedule.name)
instance.start_schedule(repository, schedule.name)

if schedule.status == ScheduleStatus.STOPPED:
instance.stop_schedule(repository, schedule.name)

for schedule_name in schedule_names_to_delete:
instance.end_schedule(repository, schedule_name)

Expand Down Expand Up @@ -157,6 +173,15 @@ def end_schedule(self, instance, repository, schedule_name):
schedule_name (string): The schedule to end and delete
'''

@abc.abstractmethod
def is_scheduler_job_running(self, repository_name, schedule_name):
'''Resume a pipeline schedule.
Args:
repository_name (string): The repository the schedule belongs to
schedule_name (string): The name of the schedule to check
'''

@abc.abstractmethod
def get_log_path(self, instance, repository, schedule_name):
'''Get path to store logs for schedule
Expand Down
11 changes: 4 additions & 7 deletions python_modules/dagster/dagster/utils/test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,17 +422,14 @@ def stop_schedule(self, instance, repository, schedule_name):
'Use `schedule up` to initialize schedules'.format(name=schedule_name)
)

if schedule.status == ScheduleStatus.STOPPED:
raise DagsterInvariantViolationError(
'You have attempted to stop schedule {name}, but it is already stopped'.format(
name=schedule_name
)
)

stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED)
instance.update_schedule(repository, stopped_schedule)
return stopped_schedule

def is_scheduler_job_running(self, repository_name, schedule_name):
# Not currently tested in dagster core
return None

def end_schedule(self, instance, repository, schedule_name):
schedule = instance.get_schedule_by_name(repository, schedule_name)
if not schedule:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ def start_schedule(self, instance, repository, schedule_name):
self._start_cron_job(instance, repository, started_schedule)

# Check that the schedule made it to the cron tab
if not self._verify_cron_job_exists(repository, schedule):
if not self.is_scheduler_job_running(repository.name, schedule.name):
raise DagsterInvariantViolationError(
"Attempted to write cron job for schedule {schedule_name}, but failed".format(
schedule_name=schedule_name
schedule_name=schedule.name
)
)

Expand All @@ -83,20 +83,13 @@ def stop_schedule(self, instance, repository, schedule_name):
'Use `schedule up` to initialize schedules'.format(name=schedule_name)
)

if schedule.status == ScheduleStatus.STOPPED:
raise DagsterInvariantViolationError(
'You have attempted to stop schedule {name}, but it is already stopped'.format(
name=schedule_name
)
)

stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED)
self._end_cron_job(instance, repository, stopped_schedule)

if self._verify_cron_job_exists(repository, schedule):
if self.is_scheduler_job_running(repository.name, schedule.name):
raise DagsterInvariantViolationError(
"Attempted to remove cron job for schedule {schedule_name}, but failed. The cron "
"job for the schedule is still running".format(schedule_name=schedule_name)
"job for the schedule is still running".format(schedule_name=schedule.name)
)

instance.update_schedule(repository, stopped_schedule)
Expand Down Expand Up @@ -147,9 +140,9 @@ def _get_bash_script_file_path(self, instance, repository, schedule):
script_file_name = "{}.{}.sh".format(repository.name, schedule.name)
return os.path.join(script_directory, script_file_name)

def _cron_tag_for_schedule(self, repository, schedule):
def _cron_tag_for_schedule(self, repository_name, schedule_name):
return 'dagster-schedule: {repository_name}.{schedule_name}'.format(
repository_name=repository.name, schedule_name=schedule.name
repository_name=repository_name, schedule_name=schedule_name
)

def _start_cron_job(self, instance, repository, schedule):
Expand All @@ -165,18 +158,21 @@ def _start_cron_job(self, instance, repository, schedule):
self._cron_tab.write()

def _end_cron_job(self, instance, repository, schedule):
self._cron_tab.remove_all(comment=self._cron_tag_for_schedule(repository, schedule))
self._cron_tab.remove_all(
comment=self._cron_tag_for_schedule(repository.name, schedule.name)
)
self._cron_tab.write()

script_file = self._get_bash_script_file_path(instance, repository, schedule)
if os.path.isfile(script_file):
os.remove(script_file)

def _verify_cron_job_exists(self, repository, schedule):
def is_scheduler_job_running(self, repository_name, schedule_name):
matching_jobs = self._cron_tab.find_comment(
self._cron_tag_for_schedule(repository, schedule)
self._cron_tag_for_schedule(repository_name, schedule_name)
)
return len(list(matching_jobs)) == 1

return len(list(matching_jobs))

def get_log_path(self, instance, repository, schedule_name):
check.inst_param(instance, 'instance', DagsterInstance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,38 @@

snapshots = Snapshot()

snapshots['test_start_schedule_manual_delete_debug 1'] = '''Scheduler Configuration
=======================
Scheduler:
snapshots['test_start_schedule_manual_delete_debug 1'] = (
[
'Schedule no_config_pipeline_every_min_schedule is set to be running, but the scheduler is not running the schedule. Run `dagster schedule up` to resolve'
],
'''Scheduler:
SystemCronScheduler
''',
'''Running Cron Jobs:
Scheduler Info
==============
Running Cron Jobs:
Scheduler Storage Info
======================
default_config_pipeline_every_min_schedule:
''',
[
'''default_config_pipeline_every_min_schedule:
cron_schedule: '* * * * *'
python_path: fake path
repository_name: test_repository
repository_path: ''
status: STOPPED
no_config_pipeline_daily_schedule:
''',
'''no_config_pipeline_daily_schedule:
cron_schedule: 0 0 * * *
python_path: fake path
repository_name: test_repository
repository_path: ''
status: STOPPED
no_config_pipeline_every_min_schedule:
''',
'''no_config_pipeline_every_min_schedule:
cron_schedule: '* * * * *'
python_path: fake path
repository_name: test_repository
repository_path: ''
status: RUNNING
'''
]
)
Loading

0 comments on commit 2136cf4

Please sign in to comment.