diff --git a/CHANGES.md b/CHANGES.md index 989223cee132..b433caecf09b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,9 @@ - Fixes a bug where using solid subsets when launching pipeline runs would fail config validation. - (dagster-gcp) allow multiple "bq_solid_for_queries" solids to co-exist in a pipeline +- Improve scheduler state reconciliation with dagster-cron scheduler. `dagster schedule` debug comamnd will display + issues related to missing crob jobs, extraneous cron jobs, and duplicate cron jobs. Running + `dagster schedule up` will fix any issues. **New** diff --git a/docs/next/src/pages/docs/deploying/celery.mdx b/docs/next/src/pages/docs/deploying/celery.mdx index 6c82d145d73c..1f61b14b5684 100644 --- a/docs/next/src/pages/docs/deploying/celery.mdx +++ b/docs/next/src/pages/docs/deploying/celery.mdx @@ -169,7 +169,7 @@ them. Although this system is deliberately designed to make the full range of Celery config available as needed, keep in mind that Celery exposes many knobs, many combinations of which are not compatible with each other. -However, workloads also differ widely. If you are runnning many +However, workloads also differ widely. If you are running many pipelines with very long-running or very short-running tasks, for instance, and you are already comfortable tuning Celery, you might find that changing some of the settings works better for your case. diff --git a/python_modules/dagster/dagster/cli/schedule.py b/python_modules/dagster/dagster/cli/schedule.py index 0f7339c5af9e..9a0191c58026 100644 --- a/python_modules/dagster/dagster/cli/schedule.py +++ b/python_modules/dagster/dagster/cli/schedule.py @@ -381,7 +381,34 @@ def schedule_debug_command(): def execute_debug_command(print_fn): instance = DagsterInstance.get() - print_fn(instance.scheduler_debug_info()) + + debug_info = instance.scheduler_debug_info() + + output = "" + + errors = debug_info.errors + if len(errors): + title = "Errors" + output += "\n{title}\n{sep}\n{info}\n\n".format( + title=title, sep="=" * len(title), info="\n".join(debug_info.errors), + ) + + title = "Scheduler Configuration" + output += "{title}\n{sep}\n{info}\n".format( + title=title, sep="=" * len(title), info=debug_info.scheduler_config_info, + ) + + title = "Scheduler Info" + output += "{title}\n{sep}\n{info}\n".format( + title=title, sep="=" * len(title), info=debug_info.scheduler_info + ) + + title = "Scheduler Storage Info" + output += "\n{title}\n{sep}\n{info}\n".format( + title=title, sep="=" * len(title), info="\n".join(debug_info.schedule_storage), + ) + + print_fn(output) schedule_cli = create_schedule_cli_group() diff --git a/python_modules/dagster/dagster/core/instance/__init__.py b/python_modules/dagster/dagster/core/instance/__init__.py index 56a238e71603..f09f1b2f2044 100644 --- a/python_modules/dagster/dagster/core/instance/__init__.py +++ b/python_modules/dagster/dagster/core/instance/__init__.py @@ -799,42 +799,60 @@ def end_schedule(self, repository, schedule_name): return self._scheduler.end_schedule(self, repository, schedule_name) def scheduler_debug_info(self): - output = "" + from dagster.core.scheduler import SchedulerDebugInfo, ScheduleStatus - title = "Scheduler Configuration" - output += "{title}\n{sep}\n{info}\n".format( - title=title, - sep="=" * len(title), - info=self.info_str_for_component('Scheduler', self.scheduler), - ) - - title = "Scheduler Info" - output += "{title}\n{sep}\n{info}\n".format( - title=title, sep="=" * len(title), info=self.scheduler.debug_info(), - ) + errors = [] - title = "Scheduler Storage Info" schedule_info = self.all_schedules_info() schedules = [] - for repository, schedule in schedule_info: + for repository_name, schedule in schedule_info: + if ( + schedule.status == ScheduleStatus.RUNNING + and not self._scheduler.is_scheduler_job_running(repository_name, schedule.name) + ): + errors.append( + "Schedule {schedule_name} is set to be running, but the scheduler is not " + "running the schedule. Run `dagster schedule up` to resolve".format( + schedule_name=schedule.name + ) + ) + elif ( + schedule.status == ScheduleStatus.STOPPED + and self._scheduler.is_scheduler_job_running(repository_name, schedule.name) + ): + errors.append( + "Schedule {schedule_name} is set to be stopped, but the scheduler is still running " + "the schedule. Run `dagster schedule up` to resolve".format( + schedule_name=schedule.name + ) + ) + + if self._scheduler.is_scheduler_job_running(repository_name, schedule.name) > 1: + errors.append( + "Duplicate jobs found: More than one job for schedule {schedule_name} are " + "running on the scheduler. " + "Run `dagster schedule up` to resolve".format(schedule_name=schedule.name) + ) + schedule_info = { schedule.name: { "status": schedule.status.value, "cron_schedule": schedule.cron_schedule, "python_path": schedule.python_path, - "repository_name": repository, + "repository_name": repository_name, "repository_path": schedule.repository_path, } } schedules.append(yaml.safe_dump(schedule_info, default_flow_style=False)) - output += "{title}\n{sep}\n{info}\n".format( - title=title, sep="=" * len(title), info="\n".join(schedules), + return SchedulerDebugInfo( + scheduler_config_info=self.info_str_for_component('Scheduler', self.scheduler), + scheduler_info=self.scheduler.debug_info(), + schedule_storage=schedules, + errors=errors, ) - return output - # Schedule Storage def create_schedule_tick(self, repository, schedule_tick_data): diff --git a/python_modules/dagster/dagster/core/scheduler/__init__.py b/python_modules/dagster/dagster/core/scheduler/__init__.py index a464cf3ef25a..408da1d696a9 100644 --- a/python_modules/dagster/dagster/core/scheduler/__init__.py +++ b/python_modules/dagster/dagster/core/scheduler/__init__.py @@ -5,6 +5,7 @@ ScheduleTick, ScheduleTickStatus, Scheduler, + SchedulerDebugInfo, SchedulerHandle, get_schedule_change_set, reconcile_scheduler_state, diff --git a/python_modules/dagster/dagster/core/scheduler/scheduler.py b/python_modules/dagster/dagster/core/scheduler/scheduler.py index c13c11e486fe..94a6cbea1630 100644 --- a/python_modules/dagster/dagster/core/scheduler/scheduler.py +++ b/python_modules/dagster/dagster/core/scheduler/scheduler.py @@ -56,6 +56,19 @@ def get_schedule_change_set(old_schedules, new_schedule_defs): return changeset +class SchedulerDebugInfo( + namedtuple('SchedulerDebugInfo', 'errors scheduler_config_info scheduler_info schedule_storage') +): + def __new__(cls, errors, scheduler_config_info, scheduler_info, schedule_storage): + return super(SchedulerDebugInfo, cls).__new__( + cls, + errors=check.list_param(errors, 'errors', of_type=str), + scheduler_config_info=check.str_param(scheduler_config_info, 'scheduler_config_info'), + scheduler_info=check.str_param(scheduler_info, 'scheduler_info'), + schedule_storage=check.list_param(schedule_storage, 'schedule_storage', of_type=str), + ) + + class SchedulerHandle(object): def __init__( self, schedule_defs, @@ -119,6 +132,9 @@ def reconcile_scheduler_state(python_path, repository_path, repository, instance instance.stop_schedule(repository, schedule.name) instance.start_schedule(repository, schedule.name) + if schedule.status == ScheduleStatus.STOPPED: + instance.stop_schedule(repository, schedule.name) + for schedule_name in schedule_names_to_delete: instance.end_schedule(repository, schedule_name) @@ -157,6 +173,15 @@ def end_schedule(self, instance, repository, schedule_name): schedule_name (string): The schedule to end and delete ''' + @abc.abstractmethod + def is_scheduler_job_running(self, repository_name, schedule_name): + '''Resume a pipeline schedule. + + Args: + repository_name (string): The repository the schedule belongs to + schedule_name (string): The name of the schedule to check + ''' + @abc.abstractmethod def get_log_path(self, instance, repository, schedule_name): '''Get path to store logs for schedule diff --git a/python_modules/dagster/dagster/utils/test/__init__.py b/python_modules/dagster/dagster/utils/test/__init__.py index cfc8169001c7..0b5026442eba 100644 --- a/python_modules/dagster/dagster/utils/test/__init__.py +++ b/python_modules/dagster/dagster/utils/test/__init__.py @@ -422,17 +422,14 @@ def stop_schedule(self, instance, repository, schedule_name): 'Use `schedule up` to initialize schedules'.format(name=schedule_name) ) - if schedule.status == ScheduleStatus.STOPPED: - raise DagsterInvariantViolationError( - 'You have attempted to stop schedule {name}, but it is already stopped'.format( - name=schedule_name - ) - ) - stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED) instance.update_schedule(repository, stopped_schedule) return stopped_schedule + def is_scheduler_job_running(self, repository_name, schedule_name): + # Not currently tested in dagster core + return None + def end_schedule(self, instance, repository, schedule_name): schedule = instance.get_schedule_by_name(repository, schedule_name) if not schedule: diff --git a/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py b/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py index 114f1fec41a0..88999a025be6 100644 --- a/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py +++ b/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py @@ -64,10 +64,10 @@ def start_schedule(self, instance, repository, schedule_name): self._start_cron_job(instance, repository, started_schedule) # Check that the schedule made it to the cron tab - if not self._verify_cron_job_exists(repository, schedule): + if not self.is_scheduler_job_running(repository.name, schedule.name): raise DagsterInvariantViolationError( "Attempted to write cron job for schedule {schedule_name}, but failed".format( - schedule_name=schedule_name + schedule_name=schedule.name ) ) @@ -83,20 +83,13 @@ def stop_schedule(self, instance, repository, schedule_name): 'Use `schedule up` to initialize schedules'.format(name=schedule_name) ) - if schedule.status == ScheduleStatus.STOPPED: - raise DagsterInvariantViolationError( - 'You have attempted to stop schedule {name}, but it is already stopped'.format( - name=schedule_name - ) - ) - stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED) self._end_cron_job(instance, repository, stopped_schedule) - if self._verify_cron_job_exists(repository, schedule): + if self.is_scheduler_job_running(repository.name, schedule.name): raise DagsterInvariantViolationError( "Attempted to remove cron job for schedule {schedule_name}, but failed. The cron " - "job for the schedule is still running".format(schedule_name=schedule_name) + "job for the schedule is still running".format(schedule_name=schedule.name) ) instance.update_schedule(repository, stopped_schedule) @@ -147,9 +140,9 @@ def _get_bash_script_file_path(self, instance, repository, schedule): script_file_name = "{}.{}.sh".format(repository.name, schedule.name) return os.path.join(script_directory, script_file_name) - def _cron_tag_for_schedule(self, repository, schedule): + def _cron_tag_for_schedule(self, repository_name, schedule_name): return 'dagster-schedule: {repository_name}.{schedule_name}'.format( - repository_name=repository.name, schedule_name=schedule.name + repository_name=repository_name, schedule_name=schedule_name ) def _start_cron_job(self, instance, repository, schedule): @@ -165,18 +158,21 @@ def _start_cron_job(self, instance, repository, schedule): self._cron_tab.write() def _end_cron_job(self, instance, repository, schedule): - self._cron_tab.remove_all(comment=self._cron_tag_for_schedule(repository, schedule)) + self._cron_tab.remove_all( + comment=self._cron_tag_for_schedule(repository.name, schedule.name) + ) self._cron_tab.write() script_file = self._get_bash_script_file_path(instance, repository, schedule) if os.path.isfile(script_file): os.remove(script_file) - def _verify_cron_job_exists(self, repository, schedule): + def is_scheduler_job_running(self, repository_name, schedule_name): matching_jobs = self._cron_tab.find_comment( - self._cron_tag_for_schedule(repository, schedule) + self._cron_tag_for_schedule(repository_name, schedule_name) ) - return len(list(matching_jobs)) == 1 + + return len(list(matching_jobs)) def get_log_path(self, instance, repository, schedule_name): check.inst_param(instance, 'instance', DagsterInstance) diff --git a/python_modules/libraries/dagster-cron/dagster_cron_tests/snapshots/snap_test_cron_scheduler.py b/python_modules/libraries/dagster-cron/dagster_cron_tests/snapshots/snap_test_cron_scheduler.py index dcb0b954cefc..4da1e6ba351b 100644 --- a/python_modules/libraries/dagster-cron/dagster_cron_tests/snapshots/snap_test_cron_scheduler.py +++ b/python_modules/libraries/dagster-cron/dagster_cron_tests/snapshots/snap_test_cron_scheduler.py @@ -6,38 +6,38 @@ snapshots = Snapshot() -snapshots['test_start_schedule_manual_delete_debug 1'] = '''Scheduler Configuration -======================= -Scheduler: +snapshots['test_start_schedule_manual_delete_debug 1'] = ( + [ + 'Schedule no_config_pipeline_every_min_schedule is set to be running, but the scheduler is not running the schedule. Run `dagster schedule up` to resolve' + ], + '''Scheduler: SystemCronScheduler +''', + '''Running Cron Jobs: -Scheduler Info -============== -Running Cron Jobs: - - -Scheduler Storage Info -====================== -default_config_pipeline_every_min_schedule: +''', + [ + '''default_config_pipeline_every_min_schedule: cron_schedule: '* * * * *' python_path: fake path repository_name: test_repository repository_path: '' status: STOPPED - -no_config_pipeline_daily_schedule: +''', + '''no_config_pipeline_daily_schedule: cron_schedule: 0 0 * * * python_path: fake path repository_name: test_repository repository_path: '' status: STOPPED - -no_config_pipeline_every_min_schedule: +''', + '''no_config_pipeline_every_min_schedule: cron_schedule: '* * * * *' python_path: fake path repository_name: test_repository repository_path: '' status: RUNNING - ''' + ] +) diff --git a/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py b/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py index 3a4cc9d471d7..1a853c6684ff 100644 --- a/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py +++ b/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py @@ -251,7 +251,84 @@ def test_start_schedule_manual_delete_debug( ) # Check debug command - snapshot.assert_match(instance.scheduler_debug_info()) + debug_info = instance.scheduler_debug_info() + assert len(debug_info.errors) == 1 + + # Reconcile should fix error + reconcile_scheduler_state( + python_path="fake path", repository_path="", repository=repository, instance=instance, + ) + debug_info = instance.scheduler_debug_info() + assert len(debug_info.errors) == 0 + + +def test_start_schedule_manual_add_debug( + restore_cron_tab, snapshot # pylint:disable=unused-argument,redefined-outer-name +): + with TemporaryDirectory() as tempdir: + repository = RepositoryDefinition(name="test_repository", schedule_defs=define_schedules()) + instance = define_scheduler_instance(tempdir) + + # Initialize scheduler + reconcile_scheduler_state( + python_path="fake path", repository_path="", repository=repository, instance=instance, + ) + + # Manually add the schedule from to the crontab + instance.scheduler._start_cron_job( # pylint: disable=protected-access + instance, + repository, + instance.get_schedule_by_name(repository, "no_config_pipeline_every_min_schedule"), + ) + + # Check debug command + debug_info = instance.scheduler_debug_info() + assert len(debug_info.errors) == 1 + + # Reconcile should fix error + reconcile_scheduler_state( + python_path="fake path", repository_path="", repository=repository, instance=instance, + ) + debug_info = instance.scheduler_debug_info() + assert len(debug_info.errors) == 0 + + +def test_start_schedule_manual_duplicate_schedules_add_debug( + restore_cron_tab, snapshot # pylint:disable=unused-argument,redefined-outer-name +): + with TemporaryDirectory() as tempdir: + repository = RepositoryDefinition(name="test_repository", schedule_defs=define_schedules()) + instance = define_scheduler_instance(tempdir) + + # Initialize scheduler + reconcile_scheduler_state( + python_path="fake path", repository_path="", repository=repository, instance=instance, + ) + + instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + + # Manually add extra cron tabs + instance.scheduler._start_cron_job( # pylint: disable=protected-access + instance, + repository, + instance.get_schedule_by_name(repository, "no_config_pipeline_every_min_schedule"), + ) + instance.scheduler._start_cron_job( # pylint: disable=protected-access + instance, + repository, + instance.get_schedule_by_name(repository, "no_config_pipeline_every_min_schedule"), + ) + + # Check debug command + debug_info = instance.scheduler_debug_info() + assert len(debug_info.errors) == 1 + + # Reconcile should fix error + reconcile_scheduler_state( + python_path="fake path", repository_path="", repository=repository, instance=instance, + ) + debug_info = instance.scheduler_debug_info() + assert len(debug_info.errors) == 0 def test_stop_schedule_fails(