batch run fetches in sensor daemon (#6950)
* change daemon sensor to batch fetch runs

* test both batched and unbatched

* fix lint
prha committed Mar 18, 2022
1 parent e1209fa commit b0a885f
Showing 3 changed files with 167 additions and 15 deletions.
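In outline: instead of issuing one run-storage query per run request that carries a run key, the daemon now gathers all run keys up front and fetches the matching runs with a single bucketed query, keeping the old per-key loop only as a fallback for storages that cannot bucket. A minimal, self-contained sketch of the access-pattern difference (FakeRunStorage and its methods are illustrative stand-ins, not Dagster APIs; the real code goes through instance.get_runs with RunsFilter/TagBucket):

# Hypothetical in-memory stand-in for run storage, used here only to count queries.
from collections import defaultdict
from typing import Dict, List, Optional

class FakeRunStorage:
    def __init__(self, runs: List[dict]):
        self._runs = runs  # newest first, like real run storage
        self.query_count = 0

    def get_runs(self, run_key: Optional[str] = None, limit: Optional[int] = None):
        self.query_count += 1
        matches = [r for r in self._runs if run_key is None or r["run_key"] == run_key]
        return matches[:limit] if limit else matches

    def get_runs_bucketed(self, run_keys: List[str], bucket_limit: int):
        # One query returning at most bucket_limit runs per run key,
        # mirroring TagBucket(tag_key=RUN_KEY_TAG, bucket_limit=1, ...).
        self.query_count += 1
        buckets: Dict[str, List[dict]] = defaultdict(list)
        for r in self._runs:
            if r["run_key"] in run_keys and len(buckets[r["run_key"]]) < bucket_limit:
                buckets[r["run_key"]].append(r)
        return [r for rs in buckets.values() for r in rs]

storage = FakeRunStorage([{"run_key": f"k{i}"} for i in range(100)])

# Unbatched (the fallback path below): N queries for N run keys.
unbatched = {k: storage.get_runs(run_key=k, limit=1)[0] for k in ["k1", "k2", "k3"]}
assert storage.query_count == 3

# Batched (the supports_bucket_queries path below): one query total.
storage.query_count = 0
batched = {r["run_key"]: r for r in storage.get_runs_bucketed(["k1", "k2", "k3"], bucket_limit=1)}
assert storage.query_count == 1
assert unbatched == batched

On a real instance the same swap turns one storage round trip per run key into one round trip per sensor tick.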
65 changes: 51 additions & 14 deletions python_modules/dagster/dagster/daemon/sensor.py
@@ -1,7 +1,7 @@
 import os
 import sys
 import time
-from typing import List, NamedTuple, Optional, cast
+from typing import Dict, NamedTuple, Optional
 
 import pendulum
 
@@ -18,7 +18,7 @@
     TickData,
     TickStatus,
 )
-from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, RunsFilter
+from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, RunsFilter, TagBucket
 from dagster.core.storage.tags import RUN_KEY_TAG, check_tags
 from dagster.core.telemetry import SENSOR_RUN_CREATED, hash_name, log_action
 from dagster.core.workspace import IWorkspace
@@ -415,8 +415,11 @@ def _evaluate_sensor(
         return
 
     skipped_runs = []
-    for run_request in sensor_runtime_data.run_requests:
+    existing_runs_by_key = _fetch_existing_runs(
+        instance, external_sensor, sensor_runtime_data.run_requests
+    )
+
+    for run_request in sensor_runtime_data.run_requests:
         target_data = external_sensor.get_target_data(run_request.job_name)
 
         pipeline_selector = PipelineSelector(
@@ -434,6 +437,7 @@
             external_pipeline,
             run_request,
             target_data,
+            existing_runs_by_key,
         )
 
         if isinstance(run, SkippedSensorRun):
@@ -497,6 +501,47 @@ def _is_under_min_interval(state, external_sensor, now):
     return elapsed < external_sensor.min_interval_seconds
 
 
+def _fetch_existing_runs(instance, external_sensor, run_requests):
+    run_keys = [run_request.run_key for run_request in run_requests if run_request.run_key]
+
+    if not run_keys:
+        return {}
+
+    existing_runs = {}
+
+    if instance.supports_bucket_queries:
+        runs = instance.get_runs(
+            filters=RunsFilter(
+                tags=PipelineRun.tags_for_sensor(external_sensor),
+            ),
+            bucket_by=TagBucket(
+                tag_key=RUN_KEY_TAG,
+                bucket_limit=1,
+                tag_values=run_keys,
+            ),
+        )
+        for run in runs:
+            tags = run.tags or {}
+            run_key = tags.get(RUN_KEY_TAG)
+            existing_runs[run_key] = run
+        return existing_runs
+
+    else:
+        for run_key in run_keys:
+            runs = instance.get_runs(
+                filters=RunsFilter(
+                    tags=merge_dicts(
+                        PipelineRun.tags_for_sensor(external_sensor),
+                        {RUN_KEY_TAG: run_key},
+                    )
+                ),
+                limit=1,
+            )
+            if runs:
+                existing_runs[run_key] = runs[0]
+        return existing_runs
+
+
 def _get_or_create_sensor_run(
     context,
     instance: DagsterInstance,
@@ -505,25 +550,17 @@ def _get_or_create_sensor_run(
     external_pipeline,
     run_request,
     target_data,
+    existing_runs_by_key: Dict[str, PipelineRun],
 ):
 
     if not run_request.run_key:
         return _create_sensor_run(
             instance, repo_location, external_sensor, external_pipeline, run_request, target_data
         )
 
-    existing_runs = instance.get_runs(
-        RunsFilter(
-            tags=merge_dicts(
-                PipelineRun.tags_for_sensor(external_sensor),
-                {RUN_KEY_TAG: run_request.run_key},
-            )
-        )
-    )
+    run = existing_runs_by_key.get(run_request.run_key)
 
-    existing_runs = cast(List[PipelineRun], existing_runs)
-    if len(existing_runs):
-        run = existing_runs[0]
+    if run:
         if run.status != PipelineRunStatus.NOT_STARTED:
             # A run already exists and was launched for this time period,
             # but the daemon must have crashed before the tick could be put
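Why the supports_bucket_queries gate exists at all: bucketed queries of this shape are typically implemented with SQL window functions, which older SQLite builds lack. A rough, runnable illustration of what TagBucket(tag_key=RUN_KEY_TAG, bucket_limit=1, tag_values=run_keys) asks the database for, written against a toy schema (table and column names are invented for this sketch, not Dagster's actual schema; SQLite 3.25+ is assumed for ROW_NUMBER):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE runs (id INTEGER PRIMARY KEY, run_key TEXT);
    INSERT INTO runs (run_key) VALUES ('a'), ('a'), ('b'), ('c');
    """
)
rows = conn.execute(
    """
    SELECT id, run_key FROM (
        SELECT id, run_key,
               ROW_NUMBER() OVER (
                   PARTITION BY run_key   -- one bucket per run key
                   ORDER BY id DESC       -- newest run first
               ) AS bucket_rank
        FROM runs
        WHERE run_key IN ('a', 'b', 'c')
    ) WHERE bucket_rank <= 1               -- bucket_limit=1
    """
).fetchall()
print(rows)  # one row per run key: the latest 'a', plus 'b' and 'c'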
7 changes: 6 additions & 1 deletion python_modules/dagster/dagster_tests/core_tests/storage_tests/test_run_storage.py
@@ -21,9 +21,14 @@ def create_non_bucket_sqlite_run_storage():
 
 
 class NonBucketQuerySqliteRunStorage(SqliteRunStorage):
-    def supports_bucket_query(self):
+    @property
+    def supports_bucket_queries(self):
         return False
 
+    @staticmethod
+    def from_config_value(inst_data, config_value):
+        return NonBucketQuerySqliteRunStorage.from_local(inst_data=inst_data, **config_value)
+
 
 @contextmanager
 def create_in_memory_storage():
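The NonBucketQuerySqliteRunStorage override exists so the suite can push the daemon down the unbatched fallback against otherwise-identical storage; test_launch_once_unbatched below reruns the run-key idempotence scenario on top of it. The invariant this protects, sketched with hypothetical stand-ins (FakeStorage and its fetch methods are not Dagster APIs): both branches of _fetch_existing_runs must produce the same run_key-to-run mapping.

from typing import Dict, List

class FakeStorage:
    def __init__(self, runs: Dict[str, str], supports_bucket_queries: bool):
        self._runs = runs  # run_key -> run_id
        self.supports_bucket_queries = supports_bucket_queries

    def fetch_one(self, run_key: str):
        return self._runs.get(run_key)

    def fetch_bucketed(self, run_keys: List[str]):
        return {k: self._runs[k] for k in run_keys if k in self._runs}

def fetch_existing_runs(storage: FakeStorage, run_keys: List[str]):
    if storage.supports_bucket_queries:
        return storage.fetch_bucketed(run_keys)  # one batched query
    # fallback: one query per run key, as in the else branch of the real function
    return {k: run for k in run_keys if (run := storage.fetch_one(k)) is not None}

runs = {"only_once": "run-1", "other": "run-2"}
keys = ["only_once", "missing"]
assert (
    fetch_existing_runs(FakeStorage(runs, True), keys)
    == fetch_existing_runs(FakeStorage(runs, False), keys)
    == {"only_once": "run-1"}
)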
110 changes: 110 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_sensor_run.py
@@ -866,6 +866,116 @@ def test_launch_once(capfd):
             )
 
 
+@contextmanager
+def instance_with_sensors_no_run_bucketing():
+    with tempfile.TemporaryDirectory() as temp_dir:
+        with instance_with_sensors(
+            overrides={
+                "run_storage": {
+                    "module": "dagster_tests.core_tests.storage_tests.test_run_storage",
+                    "class": "NonBucketQuerySqliteRunStorage",
+                    "config": {"base_dir": temp_dir},
+                },
+            }
+        ) as (
+            instance,
+            workspace,
+            external_repo,
+        ):
+            yield instance, workspace, external_repo
+
+
+def test_launch_once_unbatched(capfd):
+    freeze_datetime = to_timezone(
+        create_pendulum_time(
+            year=2019,
+            month=2,
+            day=27,
+            hour=23,
+            minute=59,
+            second=59,
+            tz="UTC",
+        ),
+        "US/Central",
+    )
+    with instance_with_sensors_no_run_bucketing() as (
+        instance,
+        workspace,
+        external_repo,
+    ):
+        with pendulum.test(freeze_datetime):
+
+            external_sensor = external_repo.get_external_sensor("run_key_sensor")
+            instance.add_instigator_state(
+                InstigatorState(
+                    external_sensor.get_external_origin(),
+                    InstigatorType.SENSOR,
+                    InstigatorStatus.RUNNING,
+                )
+            )
+            assert instance.get_runs_count() == 0
+            ticks = instance.get_ticks(external_sensor.get_external_origin_id())
+            assert len(ticks) == 0
+
+            evaluate_sensors(instance, workspace)
+            wait_for_all_runs_to_start(instance)
+
+            assert instance.get_runs_count() == 1
+            run = instance.get_runs()[0]
+            ticks = instance.get_ticks(external_sensor.get_external_origin_id())
+            assert len(ticks) == 1
+            validate_tick(
+                ticks[0],
+                external_sensor,
+                freeze_datetime,
+                TickStatus.SUCCESS,
+                expected_run_ids=[run.run_id],
+            )
+
+        # run again (after 30 seconds), to ensure that the run key maintains idempotence
+        freeze_datetime = freeze_datetime.add(seconds=30)
+        with pendulum.test(freeze_datetime):
+            evaluate_sensors(instance, workspace)
+            assert instance.get_runs_count() == 1
+            ticks = instance.get_ticks(external_sensor.get_external_origin_id())
+            assert len(ticks) == 2
+            validate_tick(
+                ticks[0],
+                external_sensor,
+                freeze_datetime,
+                TickStatus.SKIPPED,
+            )
+            captured = capfd.readouterr()
+            assert (
+                'Skipping 1 run for sensor run_key_sensor already completed with run keys: ["only_once"]'
+                in captured.out
+            )
+
+            launched_run = instance.get_runs()[0]
+
+            # Manually create a new run with the same tags
+            execute_pipeline(
+                the_pipeline,
+                run_config=launched_run.run_config,
+                tags=launched_run.tags,
+                instance=instance,
+            )
+
+        # Sensor loop still executes
+        freeze_datetime = freeze_datetime.add(seconds=30)
+        with pendulum.test(freeze_datetime):
+            evaluate_sensors(instance, workspace)
+            ticks = instance.get_ticks(external_sensor.get_external_origin_id())
+
+            assert len(ticks) == 3
+            validate_tick(
+                ticks[0],
+                external_sensor,
+                freeze_datetime,
+                TickStatus.SKIPPED,
+            )
+
+
 def test_custom_interval_sensor():
     freeze_datetime = to_timezone(
         create_pendulum_time(year=2019, month=2, day=28, tz="UTC"), "US/Central"
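The capfd assertion above pins the daemon's aggregated skip message. The formatting code is not part of this diff, so the following is inferred from the asserted string rather than copied from sensor.py: one way the skipped_runs list collected in _evaluate_sensor could yield exactly that line.

import json
from typing import List, NamedTuple

class SkippedSensorRun(NamedTuple):
    # Placeholder mirroring the SkippedSensorRun sentinel used in sensor.py;
    # the real class may carry different fields.
    run_key: str

def skip_message(sensor_name: str, skipped: List[SkippedSensorRun]) -> str:
    run_keys = json.dumps([s.run_key for s in skipped])
    return (
        f"Skipping {len(skipped)} run for sensor {sensor_name} "
        f"already completed with run keys: {run_keys}"
    )

assert skip_message("run_key_sensor", [SkippedSensorRun("only_once")]) == (
    'Skipping 1 run for sensor run_key_sensor already completed with run keys: ["only_once"]'
)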
