Skip to content

Commit

Permalink
Read instigator state / ticks off of selector_id instead of origin id…
Browse files Browse the repository at this point in the history
… 5/5 (#7268)

* add tick selector index migration

* change reads to hit instigators table

* update all instigator state call sites to pass in the repo name

* change base

* all get_instigator_state callsites

* purge ticks call site

* get_ticks call sites

* read

* fix

* remove import

* fix read

* update selector id

* fix bad rebase

* rebase, fix tests to reference repository_selector_id
  • Loading branch information
prha committed Apr 1, 2022
1 parent 8b8ce0d commit 2e6eabd
Show file tree
Hide file tree
Showing 28 changed files with 689 additions and 456 deletions.
7 changes: 0 additions & 7 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ def get_instigator_state_or_error(graphene_info, selector):
if repository.has_external_sensor(selector.name):
external_sensor = repository.get_external_sensor(selector.name)
stored_state = graphene_info.context.instance.get_instigator_state(
external_sensor.get_external_origin_id()
external_sensor.get_external_origin_id(),
external_sensor.selector_id,
)
current_state = external_sensor.get_current_instigator_state(stored_state)
elif repository.has_external_schedule(selector.name):
external_schedule = repository.get_external_schedule(selector.name)
stored_state = graphene_info.context.instance.get_instigator_state(
external_schedule.get_external_origin_id()
external_schedule.get_external_origin_id(),
external_schedule.selector_id,
)
current_state = external_schedule.get_current_instigator_state(stored_state)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def get_schedules_or_error(graphene_info, repository_selector):
state.name: state
for state in graphene_info.context.instance.all_instigator_state(
repository_origin_id=repository.get_external_origin_id(),
repository_selector_id=repository_selector.selector_id,
instigator_type=InstigatorType.SCHEDULE,
)
}
Expand Down Expand Up @@ -103,7 +104,8 @@ def get_schedules_for_pipeline(graphene_info, pipeline_selector):
continue

schedule_state = graphene_info.context.instance.get_instigator_state(
external_schedule.get_external_origin_id()
external_schedule.get_external_origin_id(),
external_schedule.selector_id,
)
results.append(GrapheneSchedule(external_schedule, schedule_state))

Expand All @@ -127,7 +129,7 @@ def get_schedule_or_error(graphene_info, schedule_selector):
)

schedule_state = graphene_info.context.instance.get_instigator_state(
external_schedule.get_external_origin_id()
external_schedule.get_external_origin_id(), external_schedule.selector_id
)
return GrapheneSchedule(external_schedule, schedule_state)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def get_sensors_or_error(graphene_info, repository_selector):
state.name: state
for state in graphene_info.context.instance.all_instigator_state(
repository_origin_id=repository.get_external_origin_id(),
repository_selector_id=repository_selector.selector_id,
instigator_type=InstigatorType.SENSOR,
)
}
Expand Down Expand Up @@ -51,7 +52,8 @@ def get_sensor_or_error(graphene_info, selector):
raise UserFacingGraphQLError(GrapheneSensorNotFoundError(selector.sensor_name))
external_sensor = repository.get_external_sensor(selector.sensor_name)
sensor_state = graphene_info.context.instance.get_instigator_state(
external_sensor.get_external_origin_id()
external_sensor.get_external_origin_id(),
external_sensor.selector_id,
)

return GrapheneSensor(external_sensor, sensor_state)
Expand All @@ -72,7 +74,8 @@ def start_sensor(graphene_info, sensor_selector):
external_sensor = repository.get_external_sensor(sensor_selector.sensor_name)
graphene_info.context.instance.start_sensor(external_sensor)
sensor_state = graphene_info.context.instance.get_instigator_state(
external_sensor.get_external_origin_id()
external_sensor.get_external_origin_id(),
external_sensor.selector_id,
)
return GrapheneSensor(external_sensor, sensor_state)

Expand All @@ -92,7 +95,10 @@ def stop_sensor(graphene_info, instigator_origin_id):
for sensor in repository.get_external_sensors()
}
instance.stop_sensor(instigator_origin_id, external_sensors.get(instigator_origin_id))
state = graphene_info.context.instance.get_instigator_state(instigator_origin_id)
state = graphene_info.context.instance.get_instigator_state(
instigator_origin_id,
external_sensors.get(instigator_origin_id).selector_id,
)
return GrapheneStopSensorMutationResult(state)


Expand Down Expand Up @@ -146,7 +152,8 @@ def get_sensors_for_pipeline(graphene_info, pipeline_selector):
continue

sensor_state = graphene_info.context.instance.get_instigator_state(
external_sensor.get_external_origin_id()
external_sensor.get_external_origin_id(),
external_sensor.selector_id,
)
results.append(GrapheneSensor(external_sensor, sensor_state))

Expand Down Expand Up @@ -181,7 +188,9 @@ def get_sensor_next_tick(graphene_info, sensor_state):
if not sensor_state.is_running:
return None

ticks = graphene_info.context.instance.get_ticks(sensor_state.instigator_origin_id, limit=1)
ticks = graphene_info.context.instance.get_ticks(
sensor_state.instigator_origin_id, sensor_state.selector_id, limit=1
)
if not ticks:
return None
latest_tick = ticks[0]
Expand All @@ -208,7 +217,10 @@ def set_sensor_cursor(graphene_info, selector, cursor):
raise UserFacingGraphQLError(GrapheneSensorNotFoundError(selector.sensor_name))
instance = graphene_info.context.instance
external_sensor = repository.get_external_sensor(selector.sensor_name)
stored_state = instance.get_instigator_state(external_sensor.get_external_origin_id())
stored_state = instance.get_instigator_state(
external_sensor.get_external_origin_id(),
external_sensor.selector_id,
)
sensor_state = external_sensor.get_current_instigator_state(stored_state)
updated_state = sensor_state.with_data(
SensorInstigatorData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def _fetch(self, data_type, limit):
elif data_type == RepositoryDataType.SCHEDULE_STATES:
schedule_states = self._instance.all_instigator_state(
repository_origin_id=self._repository.get_external_origin_id(),
repository_selector_id=self._repository.selector_id,
instigator_type=InstigatorType.SCHEDULE,
)
for state in schedule_states:
Expand All @@ -137,32 +138,45 @@ def _fetch(self, data_type, limit):
elif data_type == RepositoryDataType.SENSOR_STATES:
sensor_states = self._instance.all_instigator_state(
repository_origin_id=self._repository.get_external_origin_id(),
repository_selector_id=self._repository.selector_id,
instigator_type=InstigatorType.SENSOR,
)
for state in sensor_states:
fetched[state.name].append(state)

elif data_type == RepositoryDataType.SCHEDULE_TICKS:
origin_ids = [
schedule.get_external_origin_id()
for schedule in self._repository.get_external_schedules()
]
if self._instance.supports_batch_tick_queries:
fetched = self._instance.get_batch_ticks(origin_ids, limit=limit)
selector_ids = [
schedule.selector_id for schedule in self._repository.get_external_schedules()
]
ticks_by_selector = self._instance.get_batch_ticks(selector_ids, limit=limit)
for schedule in self._repository.get_external_schedules():
fetched[schedule.get_external_origin_id()] = ticks_by_selector.get(
schedule.selector_id, []
)
else:
for origin_id in origin_ids:
fetched[origin_id] = self._instance.get_ticks(origin_id, limit=limit)
for schedule in self._repository.get_external_schedules():
origin_id = schedule.get_external_origin_id()
fetched[origin_id] = self._instance.get_ticks(
origin_id, schedule.selector_id, limit=limit
)

elif data_type == RepositoryDataType.SENSOR_TICKS:
origin_ids = [
sensor.get_external_origin_id()
for sensor in self._repository.get_external_sensors()
]
if self._instance.supports_batch_tick_queries:
fetched = self._instance.get_batch_ticks(origin_ids, limit=limit)
selector_ids = [
schedule.selector_id for schedule in self._repository.get_external_sensors()
]
ticks_by_selector = self._instance.get_batch_ticks(selector_ids, limit=limit)
for sensor in self._repository.get_external_sensors():
fetched[sensor.get_external_origin_id()] = ticks_by_selector.get(
sensor.selector_id, []
)
else:
for origin_id in origin_ids:
fetched[origin_id] = self._instance.get_ticks(origin_id, limit=limit)
for sensor in self._repository.get_external_sensors():
origin_id = sensor.get_external_origin_id()
fetched[origin_id] = self._instance.get_ticks(
origin_id, sensor.selector_id, limit=limit
)

else:
check.failed(f"Unknown data type for {self.__class__.__name__}: {data_type}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ def resolve_runsCount(self, graphene_info):
def resolve_tick(self, graphene_info, timestamp):
matches = graphene_info.context.instance.get_ticks(
self._instigator_state.instigator_origin_id,
self._instigator_state.selector_id,
before=timestamp + 1,
after=timestamp - 1,
limit=1,
Expand Down Expand Up @@ -406,6 +407,7 @@ def resolve_ticks(
GrapheneInstigationTick(graphene_info, tick)
for tick in graphene_info.context.instance.get_ticks(
self._instigator_state.instigator_origin_id,
self._instigator_state.selector_id,
before=before,
after=after,
limit=limit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from dagster import check
from dagster.core.host_representation import ExternalSchedule, ScheduleSelector
from dagster.core.host_representation.selector import RepositorySelector
from dagster.core.scheduler.instigation import TickStatsSnapshot
from dagster.core.workspace.permissions import Permissions

from ...implementation.fetch_schedules import start_schedule, stop_schedule
Expand Down Expand Up @@ -48,25 +47,6 @@ class Meta:
name = "SchedulerOrError"


class GrapheneScheduleTickStatsSnapshot(graphene.ObjectType):
    """GraphQL type exposing per-status tick counts for a schedule."""

    ticks_started = graphene.NonNull(graphene.Int)
    ticks_succeeded = graphene.NonNull(graphene.Int)
    ticks_skipped = graphene.NonNull(graphene.Int)
    ticks_failed = graphene.NonNull(graphene.Int)

    class Meta:
        name = "ScheduleTickStatsSnapshot"

    def __init__(self, stats):
        # Validate the argument before reading its attributes so that a bad
        # value fails with a clear check error instead of an AttributeError
        # from the attribute accesses below.
        self._stats = check.inst_param(stats, "stats", TickStatsSnapshot)
        super().__init__(
            ticks_started=stats.ticks_started,
            ticks_succeeded=stats.ticks_succeeded,
            ticks_skipped=stats.ticks_skipped,
            ticks_failed=stats.ticks_failed,
        )


class GrapheneScheduleStateResult(graphene.ObjectType):
scheduleState = graphene.NonNull(GrapheneInstigationState)

Expand Down Expand Up @@ -133,7 +113,6 @@ def types():
GrapheneScheduleTick,
GrapheneScheduleTickFailureData,
GrapheneScheduleTickSpecificData,
GrapheneScheduleTickStatsSnapshot,
GrapheneScheduleTickSuccessData,
GrapheneStartScheduleMutation,
GrapheneStopRunningScheduleMutation,
Expand Down
15 changes: 11 additions & 4 deletions python_modules/dagster/dagster/cli/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ def print_changes(external_repository, instance, print_fn=print, preview=False):
errors = debug_info.errors
external_schedules = external_repository.get_external_schedules()
schedule_states = instance.all_instigator_state(
external_repository.get_external_origin_id(), InstigatorType.SCHEDULE
external_repository.get_external_origin_id(),
external_repository.selector_id,
InstigatorType.SCHEDULE,
)
external_schedules_dict = {s.get_external_origin_id(): s for s in external_schedules}
schedule_states_dict = {s.instigator_origin_id: s for s in schedule_states}
Expand Down Expand Up @@ -187,7 +189,9 @@ def execute_list_command(running_filter, stopped_filter, name_filter, cli_args,
stored_schedules_by_origin_id = {
stored_schedule_state.instigator_origin_id: stored_schedule_state
for stored_schedule_state in instance.all_instigator_state(
external_repo.get_external_origin_id(), instigator_type=InstigatorType.SCHEDULE
external_repo.get_external_origin_id(),
external_repo.selector_id,
instigator_type=InstigatorType.SCHEDULE,
)
}

Expand Down Expand Up @@ -394,7 +398,9 @@ def execute_restart_command(schedule_name, all_running_flag, cli_args, print_fn)

if all_running_flag:
for schedule_state in instance.all_instigator_state(
external_repo.get_external_origin_id(), InstigatorType.SCHEDULE
external_repo.get_external_origin_id(),
external_repo.selector_id,
InstigatorType.SCHEDULE,
):
if schedule_state.status == InstigatorStatus.RUNNING:
try:
Expand All @@ -417,7 +423,8 @@ def execute_restart_command(schedule_name, all_running_flag, cli_args, print_fn)
else:
external_schedule = external_repo.get_external_schedule(schedule_name)
schedule_state = instance.get_instigator_state(
external_schedule.get_external_origin_id()
external_schedule.get_external_origin_id(),
external_schedule.selector_id,
)
if schedule_state != None and schedule_state.status != InstigatorStatus.RUNNING:
click.UsageError(
Expand Down
10 changes: 7 additions & 3 deletions python_modules/dagster/dagster/cli/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def sensor_cli():

def print_changes(external_repository, instance, print_fn=print, preview=False):
sensor_states = instance.all_instigator_state(
external_repository.get_origin_id(), InstigatorType.SENSOR
external_repository.get_origin_id(), external_repository.selector_id, InstigatorType.SENSOR
)
external_sensors = external_repository.get_external_sensors()
external_sensors_dict = {s.get_external_origin_id(): s for s in external_sensors}
Expand Down Expand Up @@ -147,7 +147,9 @@ def execute_list_command(running_filter, stopped_filter, name_filter, cli_args,
stored_sensors_by_origin_id = {
stored_sensor_state.instigator_origin_id: stored_sensor_state
for stored_sensor_state in instance.all_instigator_state(
external_repo.get_external_origin_id(), instigator_type=InstigatorType.SENSOR
external_repo.get_external_origin_id(),
external_repo.selector_id,
instigator_type=InstigatorType.SENSOR,
)
}

Expand Down Expand Up @@ -359,7 +361,9 @@ def execute_cursor_command(sensor_name, cli_args, print_fn):
)
check_repo_and_scheduler(external_repo, instance)
external_sensor = external_repo.get_external_sensor(sensor_name)
job_state = instance.get_instigator_state(external_sensor.get_external_origin_id())
job_state = instance.get_instigator_state(
external_sensor.get_external_origin_id(), external_sensor.selector_id
)
if not job_state:
instance.add_instigator_state(
InstigatorState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from .handle import InstigatorHandle, PartitionSetHandle, PipelineHandle, RepositoryHandle
from .pipeline_index import PipelineIndex
from .represented import RepresentedPipeline
from .selector import InstigatorSelector
from .selector import InstigatorSelector, RepositorySelector

if TYPE_CHECKING:
from dagster.core.scheduler.instigation import InstigatorState
Expand Down Expand Up @@ -177,6 +177,12 @@ def get_external_jobs(self) -> List["ExternalPipeline"]:
def handle(self):
return self._handle

@property
def selector_id(self):
    """Snapshot id of the ``RepositorySelector`` identifying this repository."""
    selector = RepositorySelector(self._handle.location_name, self._handle.repository_name)
    return create_snapshot_id(selector)

def get_external_origin(self):
return self.handle.get_external_origin()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
)
from dagster.serdes.serdes import WhitelistMap, unpack_inner_value

from .selector import RepositorySelector

if TYPE_CHECKING:
from dagster.core.host_representation.repository_location import (
GrpcServerRepositoryLocation,
Expand Down Expand Up @@ -354,6 +356,11 @@ def __new__(cls, repository_location_origin: RepositoryLocationOrigin, repositor
def get_id(self) -> str:
    # Content-addressed id derived from a serialized snapshot of this origin.
    return create_snapshot_id(self)

def get_selector_id(self) -> str:
    """Snapshot id of the (location name, repository name) selector for this origin."""
    location_name = self.repository_location_origin.location_name
    return create_snapshot_id(RepositorySelector(location_name, self.repository_name))

def get_pipeline_origin(self, pipeline_name: str) -> "ExternalPipelineOrigin":
return ExternalPipelineOrigin(self, pipeline_name)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import List, NamedTuple, Optional

from dagster import check
from dagster.serdes import whitelist_for_serdes
from dagster.serdes import create_snapshot_id, whitelist_for_serdes


class PipelineSelector(
Expand Down Expand Up @@ -71,6 +71,10 @@ def to_graphql_input(self):
"repositoryName": self.repository_name,
}

@property
def selector_id(self):
    # Snapshot id of this selector; presumably stable for the same
    # (location_name, repository_name) pair — determined by create_snapshot_id.
    return create_snapshot_id(self)

@staticmethod
def from_graphql_input(graphql_data):
return RepositorySelector(
Expand Down

0 comments on commit 2e6eabd

Please sign in to comment.