Commit: add tick batching to schedule storage (#6995)
* add tick batch

* gate by old sqlite version

* fix lint; mypy; fix asset tests

* isort
prha committed Mar 14, 2022
1 parent 79827ef commit 52b1527
Showing 6 changed files with 198 additions and 5 deletions.
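This commit introduces a batch query path for instigator (schedule/sensor) ticks: a `supports_batch_queries` capability flag on schedule storage, a `get_batch_ticks` method that fetches recent ticks for many origins in a single SQL round trip, and a SQLite version gate, since the implementation relies on window functions. As orientation before the diffs (which begin with regenerated asset test snapshots), here is a hedged sketch of the new surface area; the origin ids are hypothetical and the instance setup is illustrative, not part of this diff:

    # Minimal usage sketch of the API added in this commit. get_batch_ticks
    # returns a mapping from origin id to that origin's ticks, most recent
    # first, with at most `limit` ticks per origin.
    from dagster import DagsterInstance
    from dagster.core.scheduler.instigation import TickStatus

    instance = DagsterInstance.get()
    ticks_by_origin = instance.get_batch_ticks(
        ["sensor_one_origin", "sensor_two_origin"],  # hypothetical origin ids
        limit=5,
        statuses=[TickStatus.SUCCESS],
    )
    for origin_id, ticks in ticks_by_origin.items():
        print(origin_id, len(list(ticks)))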
@@ -408,6 +408,48 @@
}
}

snapshots['TestAssetAwareEventLog.test_asset_op[asset_aware_instance_in_process_env] 1'] = {
'assetOrError': {
'definition': {
'op': {
'description': None,
'inputDefinitions': [
{
'name': 'asset_one'
}
],
'name': 'asset_two',
'outputDefinitions': [
{
'name': 'result'
}
]
}
}
}
}

snapshots['TestAssetAwareEventLog.test_asset_op[sqlite_with_default_run_launcher_managed_grpc_env] 1'] = {
'assetOrError': {
'definition': {
'op': {
'description': None,
'inputDefinitions': [
{
'name': 'asset_one'
}
],
'name': 'asset_two',
'outputDefinitions': [
{
'name': 'result'
}
]
}
}
}
}

snapshots['TestAssetAwareEventLog.test_get_asset_key_lineage[asset_aware_instance_in_process_env] 1'] = {
'assetOrError': {
'assetMaterializations': [
@@ -566,3 +608,39 @@
]
}
}

snapshots['TestAssetAwareEventLog.test_op_assets[asset_aware_instance_in_process_env] 1'] = {
'repositoryOrError': {
'usedSolid': {
'definition': {
'assetNodes': [
{
'assetKey': {
'path': [
'asset_two'
]
}
}
]
}
}
}
}

snapshots['TestAssetAwareEventLog.test_op_assets[sqlite_with_default_run_launcher_managed_grpc_env] 1'] = {
'repositoryOrError': {
'usedSolid': {
'definition': {
'assetNodes': [
{
'assetKey': {
'path': [
'asset_two'
]
}
}
]
}
}
}
}
15 changes: 15 additions & 0 deletions python_modules/dagster/dagster/core/instance/__init__.py
@@ -82,6 +82,7 @@
from dagster.core.launcher import RunLauncher
from dagster.core.run_coordinator import RunCoordinator
from dagster.core.scheduler import Scheduler
from dagster.core.scheduler.instigation import InstigatorTick, TickStatus
from dagster.core.snap import ExecutionPlanSnapshot, PipelineSnapshot
from dagster.core.storage.compute_log_manager import ComputeLogManager
from dagster.core.storage.event_log import EventLogStorage
@@ -1754,6 +1755,20 @@ def update_instigator_state(self, state):
def delete_instigator_state(self, origin_id):
return self._schedule_storage.delete_instigator_state(origin_id)

@property
def supports_batch_tick_queries(self):
return self._schedule_storage and self._schedule_storage.supports_batch_queries

def get_batch_ticks(
self,
origin_ids: Sequence[str],
limit: Optional[int] = None,
statuses: Optional[Sequence["TickStatus"]] = None,
) -> Mapping[str, Iterable["InstigatorTick"]]:
if not self._schedule_storage:
return {}
return self._schedule_storage.get_batch_ticks(origin_ids, limit, statuses)

@traced
def get_tick(self, origin_id, timestamp):
matches = self._schedule_storage.get_ticks(
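The instance-level property combines two checks: that a schedule storage is configured at all, and that it can batch (the base storage defaults to False, as the next file shows). Callers are expected to guard on it and fall back to per-origin queries; a hedged sketch of that pattern, assuming `instance.get_ticks` keeps its existing per-origin signature:

    # Sketch of the guard-and-fallback pattern supports_batch_tick_queries enables.
    def latest_ticks_by_origin(instance, origin_ids, limit=1):
        if instance.supports_batch_tick_queries:
            # one window-function query covering every origin
            return instance.get_batch_ticks(origin_ids, limit=limit)
        # per-origin fallback for storages that cannot batch
        return {
            origin_id: instance.get_ticks(origin_id, limit=limit)
            for origin_id in origin_ids
        }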
14 changes: 13 additions & 1 deletion python_modules/dagster/dagster/core/storage/schedules/base.py
@@ -1,5 +1,5 @@
import abc
-from typing import Iterable, List, Optional
+from typing import Iterable, List, Mapping, Optional, Sequence

from dagster.core.definitions.run_request import InstigatorType
from dagster.core.instance import MayHaveInstanceWeakref
@@ -58,6 +58,18 @@ def delete_instigator_state(self, origin_id: str):
origin_id (str): The id of the instigator target to delete
"""

@property
def supports_batch_queries(self):
return False

def get_batch_ticks(
self,
origin_ids: Sequence[str],
limit: Optional[int] = None,
statuses: Optional[Sequence[TickStatus]] = None,
) -> Mapping[str, Iterable[InstigatorTick]]:
raise NotImplementedError()

@abc.abstractmethod
def get_ticks(
self,
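In the base class, batching is strictly opt-in: `supports_batch_queries` returns False and `get_batch_ticks` raises `NotImplementedError`, so a storage advertises the capability only by overriding both. A sketch of a hypothetical custom storage opting in (other abstract methods omitted; the naive body just reuses the per-origin query). The next hunks, in the shared SQL schedule storage implementation, provide the real override:

    # Hypothetical subclass showing the opt-in contract; not part of this diff.
    from typing import Iterable, Mapping, Optional, Sequence

    from dagster.core.scheduler.instigation import InstigatorTick, TickStatus
    from dagster.core.storage.schedules.base import ScheduleStorage


    class BatchingScheduleStorage(ScheduleStorage):
        @property
        def supports_batch_queries(self):
            return True

        def get_batch_ticks(
            self,
            origin_ids: Sequence[str],
            limit: Optional[int] = None,
            statuses: Optional[Sequence[TickStatus]] = None,
        ) -> Mapping[str, Iterable[InstigatorTick]]:
            # naive implementation: one get_ticks call per origin
            return {
                origin_id: self.get_ticks(origin_id, limit=limit, statuses=statuses)
                for origin_id in origin_ids
            }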
@@ -1,4 +1,6 @@
from abc import abstractmethod
from collections import defaultdict
from typing import Iterable, Mapping, Optional, Sequence, cast

import sqlalchemy as db

@@ -136,6 +138,61 @@ def _add_filter_limit(self, query, before=None, after=None, limit=None, statuses
query = query.where(JobTickTable.c.status.in_([status.value for status in statuses]))
return query

@property
def supports_batch_queries(self):
return True

def get_batch_ticks(
self,
origin_ids: Sequence[str],
limit: Optional[int] = None,
statuses: Optional[Sequence[TickStatus]] = None,
) -> Mapping[str, Iterable[InstigatorTick]]:
check.list_param(origin_ids, "origin_ids", of_type=str)
check.opt_int_param(limit, "limit")
check.opt_list_param(statuses, "statuses", of_type=TickStatus)

bucket_rank_column = (
db.func.rank()
.over(
order_by=db.desc(JobTickTable.c.timestamp),
partition_by=JobTickTable.c.job_origin_id,
)
.label("rank")
)
        subquery = (
            db.select(
                [
                    JobTickTable.c.id,
                    JobTickTable.c.job_origin_id,
                    JobTickTable.c.tick_body,
                    bucket_rank_column,
                ]
            )
            .select_from(JobTickTable)
            .where(JobTickTable.c.job_origin_id.in_(origin_ids))
        )
        if statuses:
            # apply the status filter before aliasing: once aliased, the
            # subquery no longer exposes .where()
            subquery = subquery.where(
                JobTickTable.c.status.in_([status.value for status in statuses])
            )
        subquery = subquery.alias("subquery")

        query = db.select(
            [subquery.c.id, subquery.c.job_origin_id, subquery.c.tick_body]
        ).order_by(subquery.c.rank.asc())
        if limit:
            # only apply the per-origin cap when a limit is given; comparing
            # rank against None would emit `rank <= NULL` and match nothing
            query = query.where(subquery.c.rank <= limit)

rows = self.execute(query)
results = defaultdict(list)
for row in rows:
tick_id = row[0]
origin_id = row[1]
tick_data = cast(TickData, deserialize_json_to_dagster_namedtuple(row[2]))
results[origin_id].append(InstigatorTick(tick_id, tick_data))
return results

def get_ticks(self, origin_id, before=None, after=None, limit=None, statuses=None):
check.str_param(origin_id, "origin_id")
check.opt_float_param(before, "before")
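The hunk above, in the shared SQL implementation, does the batching with a window function: each row is ranked within its `job_origin_id` partition by descending timestamp, and the outer select keeps only rows whose rank falls within the limit. Roughly the SQL it emits (an illustrative approximation, not verbatim output):

    # Approximation of the generated SQL, for illustration only.
    BATCH_TICKS_SQL = """
    SELECT id, job_origin_id, tick_body
    FROM (
        SELECT id, job_origin_id, tick_body,
               RANK() OVER (
                   PARTITION BY job_origin_id
                   ORDER BY timestamp DESC
               ) AS rank
        FROM job_ticks
        WHERE job_origin_id IN (:origin_ids)
    ) AS subquery
    WHERE subquery.rank <= :limit
    ORDER BY subquery.rank ASC
    """

One design note: `RANK()` assigns equal ranks to timestamp ties, so two ticks with the same timestamp for the same origin would both rank 1, making `limit` a soft cap; `ROW_NUMBER()` would cap the per-origin count exactly. The SQLite-specific storage below gates this feature on library version.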
@@ -11,13 +11,15 @@
run_alembic_upgrade,
stamp_alembic_rev,
)
-from dagster.core.storage.sqlite import create_db_conn_string
+from dagster.core.storage.sqlite import create_db_conn_string, get_sqlite_version
from dagster.serdes import ConfigurableClass, ConfigurableClassData
from dagster.utils import mkdir_p

from ..schema import ScheduleStorageSqlMetadata
from ..sql_schedule_storage import SqlScheduleStorage

MINIMUM_SQLITE_BATCH_VERSION = "3.25.0"


class SqliteScheduleStorage(SqlScheduleStorage, ConfigurableClass):
"""Local SQLite backed schedule storage"""
@@ -72,6 +74,10 @@ def connect(self):
finally:
conn.close()

    @property
    def supports_batch_queries(self):
        # compare parsed int tuples, not raw strings, so e.g. "3.7.17" does not
        # sort above "3.25.0"; 3.25.0 itself introduced window functions
        current = tuple(int(part) for part in get_sqlite_version().split("."))
        minimum = tuple(int(part) for part in MINIMUM_SQLITE_BATCH_VERSION.split("."))
        return current >= minimum

def upgrade(self):
alembic_config = get_alembic_config(__file__)
with self.connect() as conn:
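The gate exists because `RANK() OVER (...)` and the other window functions only landed in SQLite 3.25.0; an older system SQLite would fail on the batch query, so those installs keep the per-origin path. You can check what your Python links against using only the standard library (independent of this diff):

    import sqlite3

    # sqlite3.sqlite_version reports the linked SQLite C library version,
    # which is independent of the sqlite3 module's own version.
    print(sqlite3.sqlite_version)                      # e.g. "3.39.4"
    print(sqlite3.sqlite_version_info >= (3, 25, 0))   # True if window functions exist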
31 changes: 28 additions & 3 deletions python_modules/dagster/dagster/utils/test/schedule_storage.py
@@ -451,10 +451,12 @@ def test_add_state_with_same_name(self, storage):
with pytest.raises(Exception):
storage.add_instigator_state(state)

-    def build_sensor_tick(self, current_time, status=TickStatus.STARTED, run_id=None, error=None):
+    def build_sensor_tick(
+        self, current_time, status=TickStatus.STARTED, run_id=None, error=None, name="my_sensor"
+    ):
        return TickData(
-            "my_sensor",
-            "my_sensor",
+            name,
+            name,
InstigatorType.SENSOR,
status,
current_time,
@@ -620,3 +622,26 @@ def test_ticks_filtered(self, storage):
"my_sensor", statuses=[TickStatus.STARTED, TickStatus.SUCCESS, TickStatus.FAILURE]
)
assert len(non_skips) == 3

def test_ticks_batched(self, storage):
if not storage.supports_batch_queries:
pytest.skip("storage cannot batch")

_a = storage.create_tick(
self.build_sensor_tick(time.time(), status=TickStatus.SUCCESS, name="sensor_one")
)
b = storage.create_tick(
self.build_sensor_tick(time.time(), status=TickStatus.SUCCESS, name="sensor_one")
)
_c = storage.create_tick(
self.build_sensor_tick(time.time(), status=TickStatus.SUCCESS, name="sensor_two")
)
d = storage.create_tick(
self.build_sensor_tick(time.time(), status=TickStatus.SUCCESS, name="sensor_two")
)

ticks_by_origin = storage.get_batch_ticks(["sensor_one", "sensor_two"], limit=1)
assert set(ticks_by_origin.keys()) == {"sensor_one", "sensor_two"}
assert len(ticks_by_origin["sensor_one"]) == 1
assert ticks_by_origin["sensor_one"][0].tick_id == b.tick_id
assert ticks_by_origin["sensor_two"][0].tick_id == d.tick_id
