Skip to content

Commit

Permalink
add support for filtering ticks by status (#6919)
Browse files Browse the repository at this point in the history
* add support for filtering ticks by status

* add tick filter test to schedule storage suite

* add warnings for invalid cursor

* isort
  • Loading branch information
prha committed Mar 7, 2022
1 parent 3f5aeb7 commit 9dc310c
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 13 deletions.
9 changes: 8 additions & 1 deletion js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
import warnings

import graphene
import pendulum
Expand All @@ -13,6 +14,7 @@
InstigatorType,
ScheduleInstigatorData,
SensorInstigatorData,
TickStatus,
)
from dagster.core.storage.pipeline_run import RunsFilter
from dagster.core.storage.tags import TagType, get_tag_type
Expand Down Expand Up @@ -57,6 +59,7 @@ class Meta:
class GrapheneSensorData(graphene.ObjectType):
lastTickTimestamp = graphene.Float()
lastRunKey = graphene.String()
lastCursor = graphene.String()

class Meta:
name = "SensorData"
Expand All @@ -66,6 +69,7 @@ def __init__(self, instigator_data):
super().__init__(
lastTickTimestamp=instigator_data.last_tick_timestamp,
lastRunKey=instigator_data.last_run_key,
lastCursor=instigator_data.cursor,
)


Expand Down Expand Up @@ -270,6 +274,8 @@ class GrapheneInstigationState(graphene.ObjectType):
dayRange=graphene.Int(),
dayOffset=graphene.Int(),
limit=graphene.Int(),
cursor=graphene.String(),
statuses=graphene.List(graphene.NonNull(GrapheneInstigationTickStatus)),
)
nextTick = graphene.Field(GrapheneFutureInstigationTick)
runningCount = graphene.NonNull(graphene.Int) # remove with cron scheduler
Expand Down Expand Up @@ -360,17 +366,35 @@ def resolve_tick(self, graphene_info, timestamp):
)
return GrapheneInstigationTick(graphene_info, matches[0]) if matches else None

def resolve_ticks(self, graphene_info, dayRange=None, dayOffset=None, limit=None):
before = pendulum.now("UTC").subtract(days=dayOffset).timestamp() if dayOffset else None
def resolve_ticks(
    self, graphene_info, dayRange=None, dayOffset=None, limit=None, cursor=None, statuses=None
):
    """Resolve the ticks for this instigation state, with optional filtering.

    Args:
        dayRange: only include ticks newer than this many days before now.
        dayOffset: only include ticks older than this many days before now.
        limit: maximum number of ticks to return.
        cursor: opaque cursor whose trailing ":"-delimited segment is a float
            timestamp; used as the upper time bound when dayOffset is not set.
        statuses: list of tick status values to filter by.
    """
    before = None
    if dayOffset:
        before = pendulum.now("UTC").subtract(days=dayOffset).timestamp()
    elif cursor:
        # The timestamp is the final ":"-delimited segment of the cursor.
        # str.split always returns at least one element, so indexing [-1]
        # cannot raise IndexError; only a non-numeric segment is invalid.
        try:
            before = float(cursor.split(":")[-1])
        except ValueError:
            warnings.warn(f"Invalid cursor for {self.name} ticks: {cursor}")

    after = (
        pendulum.now("UTC").subtract(days=dayRange + (dayOffset or 0)).timestamp()
        if dayRange
        else None
    )
    if statuses:
        # Convert GraphQL enum values into storage-level TickStatus members.
        statuses = [TickStatus(status) for status in statuses]
    return [
        GrapheneInstigationTick(graphene_info, tick)
        for tick in graphene_info.context.instance.get_ticks(
            self._instigator_state.instigator_origin_id,
            before=before,
            after=after,
            limit=limit,
            statuses=statuses,
        )
    ]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@
)

from dagster.core.definitions.run_request import InstigatorType
from dagster.core.scheduler.instigation import InstigatorState, InstigatorStatus
from dagster.core.scheduler.instigation import (
InstigatorState,
InstigatorStatus,
TickData,
TickStatus,
)
from dagster.core.test_utils import create_test_daemon_workspace
from dagster.daemon import get_default_daemon_logger
from dagster.daemon.sensor import execute_sensor_iteration
from dagster.utils import Counter, traced_counter
from dagster.utils.error import SerializableErrorInfo

from .graphql_context_test_suite import (
ExecutingGraphQLContextTestMatrix,
Expand Down Expand Up @@ -198,6 +204,29 @@
}
"""

GET_TICKS_QUERY = """
query TicksQuery($sensorSelector: SensorSelector!, $statuses: [InstigationTickStatus!]) {
sensorOrError(sensorSelector: $sensorSelector) {
__typename
... on PythonError {
message
stack
}
... on Sensor {
id
sensorState {
id
ticks(statuses: $statuses) {
id
status
timestamp
}
}
}
}
}
"""


class TestSensors(NonLaunchableGraphQLContextTestMatrix):
def test_get_sensors(self, graphql_context, snapshot):
Expand Down Expand Up @@ -461,3 +490,89 @@ def test_repository_batching(graphql_context):
# 2) `all_instigator_state` is fetched to instantiate GrapheneSensor
assert counts.get("DagsterInstance.get_run_records") == 1
assert counts.get("DagsterInstance.all_instigator_state") == 1


def test_sensor_ticks_filtered(graphql_context):
    """Verify that the `statuses` argument on the `ticks` field filters ticks by status.

    Creates one tick of each status (SUCCESS via a real sensor evaluation,
    STARTED / SKIPPED / FAILURE directly via the instance), then checks that
    an unfiltered query returns all four and each single-status filter
    returns exactly the matching tick.
    """
    external_repository = graphql_context.get_repository_location(
        main_repo_location_name()
    ).get_repository(main_repo_name())

    sensor_name = "always_no_config_sensor"
    external_sensor = external_repository.get_external_sensor(sensor_name)
    sensor_selector = infer_sensor_selector(graphql_context, sensor_name)

    # turn the sensor on
    graphql_context.instance.add_instigator_state(
        InstigatorState(
            external_sensor.get_external_origin(), InstigatorType.SENSOR, InstigatorStatus.RUNNING
        )
    )

    now = pendulum.now("US/Central")
    with pendulum.test(now):
        _create_tick(graphql_context)  # create a success tick

        # create a started, a skipped, and a failed tick
        tick_cases = [
            (TickStatus.STARTED, None),
            (TickStatus.SKIPPED, None),
            (
                TickStatus.FAILURE,
                SerializableErrorInfo(message="foobar", stack=[], cls_name=None, cause=None),
            ),
        ]
        for status, error in tick_cases:
            graphql_context.instance.create_tick(
                TickData(
                    instigator_origin_id=external_sensor.get_external_origin().get_id(),
                    instigator_name=sensor_name,
                    instigator_type=InstigatorType.SENSOR,
                    status=status,
                    timestamp=now.timestamp(),
                    error=error,
                )
            )

    # unfiltered query returns all four ticks
    result = execute_dagster_graphql(
        graphql_context,
        GET_TICKS_QUERY,
        variables={"sensorSelector": sensor_selector},
    )
    assert len(result.data["sensorOrError"]["sensorState"]["ticks"]) == 4

    # each single-status filter returns exactly the one matching tick
    for status_value in ["STARTED", "FAILURE", "SKIPPED"]:
        result = execute_dagster_graphql(
            graphql_context,
            GET_TICKS_QUERY,
            variables={"sensorSelector": sensor_selector, "statuses": [status_value]},
        )
        ticks = result.data["sensorOrError"]["sensorState"]["ticks"]
        assert len(ticks) == 1
        assert ticks[0]["status"] == status_value
6 changes: 4 additions & 2 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1757,8 +1757,10 @@ def get_tick(self, origin_id, timestamp):
return matches[0] if len(matches) else None

@traced
def get_ticks(self, origin_id, before=None, after=None, limit=None):
return self._schedule_storage.get_ticks(origin_id, before=before, after=after, limit=limit)
def get_ticks(self, origin_id, before=None, after=None, limit=None, statuses=None):
    """Fetch instigator ticks from schedule storage.

    Delegates directly to the configured schedule storage.

    Args:
        origin_id: id of the instigator origin whose ticks to fetch.
        before (Optional[float]): only return ticks with timestamps before this.
        after (Optional[float]): only return ticks with timestamps after this.
        limit (Optional[int]): maximum number of ticks to return.
        statuses (Optional[List[TickStatus]]): only return ticks with these statuses.
    """
    return self._schedule_storage.get_ticks(
        origin_id, before=before, after=after, limit=limit, statuses=statuses
    )

def create_tick(self, tick_data):
    """Persist a new instigator tick in schedule storage and return the stored tick."""
    return self._schedule_storage.create_tick(tick_data)
Expand Down
9 changes: 7 additions & 2 deletions python_modules/dagster/dagster/core/storage/schedules/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import abc
from typing import Iterable
from typing import Iterable, List, Optional

from dagster.core.definitions.run_request import InstigatorType
from dagster.core.instance import MayHaveInstanceWeakref
Expand Down Expand Up @@ -58,7 +58,12 @@ def delete_instigator_state(self, origin_id: str):

@abc.abstractmethod
def get_ticks(
self, origin_id: str, before: float = None, after: float = None, limit: int = None
self,
origin_id: str,
before: float = None,
after: float = None,
limit: int = None,
statuses: Optional[List[TickStatus]] = None,
) -> Iterable[InstigatorTick]:
"""Get the ticks for a given instigator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,28 @@ def delete_instigator_state(self, origin_id):
)
)

def _add_filter_limit(self, query, before=None, after=None, limit=None):
def _add_filter_limit(self, query, before=None, after=None, limit=None, statuses=None):
    """Apply optional timestamp-window, status, and row-limit filters to a tick query.

    All filters are conjunctive; any argument left as None (or empty) is skipped.
    """
    check.opt_float_param(before, "before")
    check.opt_float_param(after, "after")
    check.opt_int_param(limit, "limit")
    check.opt_list_param(statuses, "statuses", of_type=TickStatus)

    # status filter: match any of the requested statuses (stored as strings)
    if statuses:
        status_values = [tick_status.value for tick_status in statuses]
        query = query.where(JobTickTable.c.status.in_(status_values))
    # time window: (after, before) exclusive on both ends
    if after:
        query = query.where(JobTickTable.c.timestamp > utc_datetime_from_timestamp(after))
    if before:
        query = query.where(JobTickTable.c.timestamp < utc_datetime_from_timestamp(before))
    # row cap last, once all predicates are attached
    if limit:
        query = query.limit(limit)
    return query

def get_ticks(self, origin_id, before=None, after=None, limit=None):
def get_ticks(self, origin_id, before=None, after=None, limit=None, statuses=None):
check.str_param(origin_id, "origin_id")
check.opt_float_param(before, "before")
check.opt_float_param(after, "after")
check.opt_int_param(limit, "limit")
check.opt_list_param(statuses, "statuses", of_type=TickStatus)

query = (
db.select([JobTickTable.c.id, JobTickTable.c.tick_body])
Expand All @@ -146,7 +150,9 @@ def get_ticks(self, origin_id, before=None, after=None, limit=None):
.order_by(JobTickTable.c.timestamp.desc())
)

query = self._add_filter_limit(query, before=before, after=after, limit=limit)
query = self._add_filter_limit(
query, before=before, after=after, limit=limit, statuses=statuses
)

rows = self.execute(query)
return list(
Expand Down
35 changes: 34 additions & 1 deletion python_modules/dagster/dagster/utils/test/schedule_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ def build_sensor_tick(self, current_time, status=TickStatus.STARTED, run_id=None
status,
current_time,
[run_id] if run_id else [],
error,
error=error,
)

def test_create_sensor_tick(self, storage):
Expand Down Expand Up @@ -587,3 +587,36 @@ def test_purge_ticks(self, storage):

ticks = storage.get_ticks("my_sensor")
assert len(ticks) == 2

def test_ticks_filtered(self, storage):
    """Storage returns only the ticks matching the requested statuses.

    Creates one tick per status, then checks the unfiltered count, each
    single-status filter, and a multi-status filter.
    """
    tick_cases = [
        (TickStatus.STARTED, None),
        (TickStatus.SUCCESS, None),
        (TickStatus.SKIPPED, None),
        (
            TickStatus.FAILURE,
            SerializableErrorInfo(message="foobar", stack=[], cls_name=None, cause=None),
        ),
    ]
    for status, error in tick_cases:
        # only pass `error` when set, matching build_sensor_tick's defaults
        kwargs = {"status": status}
        if error is not None:
            kwargs["error"] = error
        storage.create_tick(self.build_sensor_tick(time.time(), **kwargs))

    ticks = storage.get_ticks("my_sensor")
    assert len(ticks) == 4

    # each individual status matches exactly one tick
    for status, _ in tick_cases:
        assert len(storage.get_ticks("my_sensor", statuses=[status])) == 1

    # everything but skips
    non_skips = storage.get_ticks(
        "my_sensor", statuses=[TickStatus.STARTED, TickStatus.SUCCESS, TickStatus.FAILURE]
    )
    assert len(non_skips) == 3

0 comments on commit 9dc310c

Please sign in to comment.