Skip to content

Commit

Permalink
[instigator] dont throw in start/stop if already in desired state (#7483
Browse files Browse the repository at this point in the history
)

I've observed these errors in cloud and I don't have a good argument for why its better to error so just exit gracefully. 

## Test Plan

upadted existing tests
  • Loading branch information
alangenfeld committed Apr 22, 2022
1 parent 460e482 commit 610745b
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,8 @@ def test_start_schedule_with_default_status(graphql_context):
)

assert (
"You have attempted to start schedule running_in_code_schedule, but it is already running"
in start_result.data["startSchedule"]["message"]
start_result.data["startSchedule"]["scheduleState"]["status"]
== InstigatorStatus.RUNNING.value
)

# Stop a single schedule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,7 @@ def test_start_sensor_with_default_status(self, graphql_context):
variables={"sensorSelector": sensor_selector},
)

assert (
"You have attempted to start sensor running_in_code_sensor, but it is already running"
in start_result.data["startSensor"]["message"]
)
assert start_result.data["startSensor"]["sensorState"]["status"] == "RUNNING"

stop_result = execute_dagster_graphql(
graphql_context,
Expand Down
52 changes: 34 additions & 18 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,14 @@
from dagster.core.execution.stats import RunStepKeyStatsSnapshot
from dagster.core.host_representation import (
ExternalPipeline,
ExternalSensor,
HistoricalPipeline,
RepositoryLocation,
)
from dagster.core.launcher import RunLauncher
from dagster.core.run_coordinator import RunCoordinator
from dagster.core.scheduler import Scheduler
from dagster.core.scheduler.instigation import InstigatorTick, TickStatus
from dagster.core.scheduler.instigation import InstigatorState, InstigatorTick, TickStatus
from dagster.core.snap import ExecutionPlanSnapshot, PipelineSnapshot
from dagster.core.storage.compute_log_manager import ComputeLogManager
from dagster.core.storage.event_log import EventLogStorage
Expand Down Expand Up @@ -1806,26 +1807,23 @@ def scheduler_debug_info(self):

# Schedule / Sensor Storage

def start_sensor(self, external_sensor):
def start_sensor(self, external_sensor: "ExternalSensor"):
from dagster.core.definitions.run_request import InstigatorType
from dagster.core.scheduler.instigation import (
InstigatorState,
InstigatorStatus,
SensorInstigatorData,
)

state = self.get_instigator_state(
stored_state = self.get_instigator_state(
external_sensor.get_external_origin_id(), external_sensor.selector_id
)

if external_sensor.get_current_instigator_state(state).is_running:
raise Exception(
"You have attempted to start sensor {name}, but it is already running".format(
name=external_sensor.name
)
)
computed_state = external_sensor.get_current_instigator_state(stored_state)
if computed_state.is_running:
return computed_state

if not state:
if not stored_state:
return self.add_instigator_state(
InstigatorState(
external_sensor.get_external_origin(),
Expand All @@ -1835,19 +1833,31 @@ def start_sensor(self, external_sensor):
)
)
else:
return self.update_instigator_state(state.with_status(InstigatorStatus.RUNNING))
return self.update_instigator_state(stored_state.with_status(InstigatorStatus.RUNNING))

def stop_sensor(self, instigator_origin_id, selector_id, external_sensor):
def stop_sensor(
self,
instigator_origin_id: str,
selector_id: str,
external_sensor: Optional["ExternalSensor"],
):
from dagster.core.definitions.run_request import InstigatorType
from dagster.core.scheduler.instigation import (
InstigatorState,
InstigatorStatus,
SensorInstigatorData,
)

state = self.get_instigator_state(instigator_origin_id, selector_id)
stored_state = self.get_instigator_state(instigator_origin_id, selector_id)
if external_sensor:
computed_state = external_sensor.get_current_instigator_state(stored_state)
else:
computed_state = stored_state

if not state:
if not computed_state.is_running:
return computed_state

if not stored_state:
assert external_sensor
return self.add_instigator_state(
InstigatorState(
Expand All @@ -1858,7 +1868,7 @@ def stop_sensor(self, instigator_origin_id, selector_id, external_sensor):
)
)
else:
return self.update_instigator_state(state.with_status(InstigatorStatus.STOPPED))
return self.update_instigator_state(stored_state.with_status(InstigatorStatus.STOPPED))

@traced
def all_instigator_state(
Expand All @@ -1869,13 +1879,19 @@ def all_instigator_state(
)

@traced
def get_instigator_state(self, origin_id, selector_id):
def get_instigator_state(self, origin_id: str, selector_id: str) -> Optional["InstigatorState"]:
if not self._schedule_storage:
check.failed("Schedule storage not available")
return self._schedule_storage.get_instigator_state(origin_id, selector_id)

def add_instigator_state(self, state):
def add_instigator_state(self, state: "InstigatorState") -> "InstigatorState":
if not self._schedule_storage:
check.failed("Schedule storage not available")
return self._schedule_storage.add_instigator_state(state)

def update_instigator_state(self, state):
def update_instigator_state(self, state: "InstigatorState") -> "InstigatorState":
if not self._schedule_storage:
check.failed("Schedule storage not available")
return self._schedule_storage.update_instigator_state(state)

def delete_instigator_state(self, origin_id, selector_id):
Expand Down
72 changes: 38 additions & 34 deletions python_modules/dagster/dagster/core/scheduler/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
import os
from typing import List, NamedTuple
from typing import List, NamedTuple, Optional

from dagster import check
from dagster.config import Field
Expand Down Expand Up @@ -59,7 +59,9 @@ class Scheduler(abc.ABC):
an external system such as cron to ensure scheduled repeated execution according.
"""

def start_schedule(self, instance, external_schedule):
def start_schedule(
self, instance: DagsterInstance, external_schedule: ExternalSchedule
) -> InstigatorState:
"""
Updates the status of the given schedule to `InstigatorStatus.RUNNING` in schedule storage,
Expand All @@ -74,37 +76,40 @@ def start_schedule(self, instance, external_schedule):
check.inst_param(instance, "instance", DagsterInstance)
check.inst_param(external_schedule, "external_schedule", ExternalSchedule)

schedule_state = instance.get_instigator_state(
stored_state = instance.get_instigator_state(
external_schedule.get_external_origin_id(), external_schedule.selector_id
)
if external_schedule.get_current_instigator_state(schedule_state).is_running:
raise DagsterSchedulerError(
"You have attempted to start schedule {name}, but it is already running".format(
name=external_schedule.name
)
)
computed_state = external_schedule.get_current_instigator_state(stored_state)
if computed_state.is_running:
return computed_state

new_instigator_data = ScheduleInstigatorData(
external_schedule.cron_schedule,
get_current_datetime_in_utc().timestamp(),
)

if not schedule_state:
started_schedule = InstigatorState(
if not stored_state:
started_state = InstigatorState(
external_schedule.get_external_origin(),
InstigatorType.SCHEDULE,
InstigatorStatus.RUNNING,
new_instigator_data,
)
instance.add_instigator_state(started_schedule)
instance.add_instigator_state(started_state)
else:
started_schedule = schedule_state.with_status(InstigatorStatus.RUNNING).with_data(
started_state = stored_state.with_status(InstigatorStatus.RUNNING).with_data(
new_instigator_data
)
instance.update_instigator_state(started_schedule)
return started_schedule

def stop_schedule(self, instance, schedule_origin_id, schedule_selector_id, external_schedule):
instance.update_instigator_state(started_state)
return started_state

def stop_schedule(
self,
instance: DagsterInstance,
schedule_origin_id: str,
schedule_selector_id: str,
external_schedule: Optional[ExternalSchedule],
) -> InstigatorState:
"""
Updates the status of the given schedule to `InstigatorStatus.STOPPED` in schedule storage,
Expand All @@ -117,37 +122,36 @@ def stop_schedule(self, instance, schedule_origin_id, schedule_selector_id, exte
check.str_param(schedule_origin_id, "schedule_origin_id")
check.opt_inst_param(external_schedule, "external_schedule", ExternalSchedule)

schedule_state = instance.get_instigator_state(schedule_origin_id, schedule_selector_id)
if (
external_schedule
and not external_schedule.get_current_instigator_state(schedule_state).is_running
) or (schedule_state and not schedule_state.is_running):
raise DagsterSchedulerError(
"You have attempted to stop schedule {name}, but it is already stopped".format(
name=external_schedule.name
)
)
stored_state = instance.get_instigator_state(schedule_origin_id, schedule_selector_id)

if not external_schedule:
computed_state = stored_state
else:
computed_state = external_schedule.get_current_instigator_state(stored_state)

if computed_state and not computed_state.is_running:
return computed_state

if not schedule_state:
if not stored_state:
assert external_schedule
stopped_schedule = InstigatorState(
stopped_state = InstigatorState(
external_schedule.get_external_origin(),
InstigatorType.SCHEDULE,
InstigatorStatus.STOPPED,
ScheduleInstigatorData(
external_schedule.cron_schedule,
),
)
instance.add_instigator_state(stopped_schedule)
instance.add_instigator_state(stopped_state)
else:
stopped_schedule = schedule_state.with_status(InstigatorStatus.STOPPED).with_data(
stopped_state = stored_state.with_status(InstigatorStatus.STOPPED).with_data(
ScheduleInstigatorData(
cron_schedule=schedule_state.instigator_data.cron_schedule,
cron_schedule=computed_state.instigator_data.cron_schedule,
)
)
instance.update_instigator_state(stopped_schedule)
instance.update_instigator_state(stopped_state)

return stopped_schedule
return stopped_state

@abc.abstractmethod
def debug_info(self):
Expand Down

0 comments on commit 610745b

Please sign in to comment.