Skip to content

Commit

Permalink
Remove duplicate count_resume_run_attempts method (#7915)
Browse files (browse the repository at this point in the history)
  • Loading branch information
johannkm committed May 23, 2022
1 parent f625b6d commit abac74a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
6 changes: 2 additions & 4 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1821,11 +1821,9 @@ def resume_run(self, run_id: str, workspace: "IWorkspace", attempt_number: int):
return run

def count_resume_run_attempts(self, run_id: str):
- from dagster.core.events import DagsterEventType
- from dagster.daemon.monitoring import RESUME_RUN_LOG_MESSAGE
+ from dagster.daemon.monitoring import count_resume_run_attempts

- events = self.all_logs(run_id, of_type=DagsterEventType.ENGINE_EVENT)
- return len([event for event in events if event.message == RESUME_RUN_LOG_MESSAGE])
+ return count_resume_run_attempts(self, run_id)

def run_will_resume(self, run_id: str):
if not self.run_monitoring_enabled:
Expand Down
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/daemon/monitoring/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
- from .monitoring_daemon import RESUME_RUN_LOG_MESSAGE, execute_monitoring_iteration
+ from .monitoring_daemon import (
+ RESUME_RUN_LOG_MESSAGE,
+ count_resume_run_attempts,
+ execute_monitoring_iteration,
+ )
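6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/daemon/monitoring/monitoring_daemon.py
Original file line number Diff line number Diff line change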
Expand Up @@ -34,16 +34,16 @@ def monitor_starting_run(instance: DagsterInstance, run, logger):
# TODO: consider attempting to resume the run, if the run worker is in a bad status


- def count_resume_run_attempts(instance: DagsterInstance, run):
- events = instance.all_logs(run.run_id, of_type=DagsterEventType.ENGINE_EVENT)
+ def count_resume_run_attempts(instance: DagsterInstance, run_id: str):
+ events = instance.all_logs(run_id, of_type=DagsterEventType.ENGINE_EVENT)
return len([event for event in events if event.message == RESUME_RUN_LOG_MESSAGE])


def monitor_started_run(instance: DagsterInstance, workspace, run, logger):
check.invariant(run.status == PipelineRunStatus.STARTED)
check_health_result = instance.run_launcher.check_run_worker_health(run)
if check_health_result.status != WorkerStatus.RUNNING:
- num_prev_attempts = count_resume_run_attempts(instance, run)
+ num_prev_attempts = count_resume_run_attempts(instance, run.run_id)
if num_prev_attempts < instance.run_monitoring_max_resume_run_attempts:
msg = (
f"Detected run worker status {check_health_result}. Resuming run {run.run_id} with "
Expand Down

0 comments on commit abac74a

Please sign in to comment.