Skip to content

Commit

Permalink
refetch the backfill before updating, to avoid clobbering cancels (#7094)
Browse files Browse the repository at this point in the history

* refetch the backfill before updating

* add test
  • Loading branch information
prha committed Mar 17, 2022
1 parent 8c3cfd2 commit ffd8318
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 3 deletions.
12 changes: 9 additions & 3 deletions python_modules/dagster/dagster/daemon/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def execute_backfill_iteration(instance, workspace, logger, debug_crash_flags=No
for backfill_job in backfill_jobs:
backfill_id = backfill_job.backfill_id

# refetch, in case the backfill was updated in the meantime
backfill_job = instance.get_backfill(backfill_id)

if not backfill_job.last_submitted_partition_name:
logger.info(f"Starting backfill for {backfill_id}")
else:
Expand Down Expand Up @@ -76,8 +79,6 @@ def execute_backfill_iteration(instance, workspace, logger, debug_crash_flags=No

has_more = True
while has_more:
# refetch the backfill job
backfill_job = instance.get_backfill(backfill_job.backfill_id)
if backfill_job.status != BulkActionStatus.REQUESTED:
break

Expand All @@ -87,15 +88,20 @@ def execute_backfill_iteration(instance, workspace, logger, debug_crash_flags=No
_check_for_debug_crash(debug_crash_flags, "BEFORE_SUBMIT")

if chunk:

for _run_id in submit_backfill_runs(
instance, workspace, repo_location, backfill_job, chunk
):
yield
# before submitting, refetch the backfill job to check for status changes
backfill_job = instance.get_backfill(backfill_job.backfill_id)
if backfill_job.status != BulkActionStatus.REQUESTED:
return

_check_for_debug_crash(debug_crash_flags, "AFTER_SUBMIT")

if has_more:
# refetch, in case the backfill was updated in the meantime
backfill_job = instance.get_backfill(backfill_job.backfill_id)
instance.update_backfill(backfill_job.with_partition_checkpoint(checkpoint))
yield
time.sleep(CHECKPOINT_INTERVAL)
Expand Down
35 changes: 35 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,41 @@ def test_simple_backfill():
assert three.tags[PARTITION_NAME_TAG] == "three"


def test_canceled_backfill():
    """Check that canceling a backfill mid-iteration is not overwritten.

    A three-partition backfill is requested, the iteration is advanced until
    one run exists, and the stored backfill is then marked CANCELED. After the
    rest of the iteration is consumed, the stored status must still be
    CANCELED and the run count must still be one.
    """
    with instance_for_context(default_repo) as (instance, workspace, external_repo):
        partition_set = external_repo.get_external_partition_set("simple_partition_set")
        requested_backfill = PartitionBackfill(
            backfill_id="simple",
            partition_set_origin=partition_set.get_external_origin(),
            status=BulkActionStatus.REQUESTED,
            partition_names=["one", "two", "three"],
            from_failure=False,
            reexecution_steps=None,
            tags=None,
            backfill_timestamp=pendulum.now().timestamp(),
        )
        instance.add_backfill(requested_backfill)
        assert instance.get_runs_count() == 0

        daemon_logger = get_default_daemon_logger("BackfillDaemon")
        iteration = execute_backfill_iteration(instance, workspace, daemon_logger)

        # Advance a single step: exactly one run should exist afterwards.
        next(iteration)
        assert instance.get_runs_count() == 1

        stored = instance.get_backfills()[0]
        assert stored.status == BulkActionStatus.REQUESTED

        # Cancel while the iteration is suspended, then let it run to completion.
        instance.update_backfill(stored.with_status(BulkActionStatus.CANCELED))
        for _ in iteration:
            pass

        # The cancel must survive, and no further runs may have been created.
        stored = instance.get_backfill(stored.backfill_id)
        assert stored.status == BulkActionStatus.CANCELED
        assert instance.get_runs_count() == 1


def test_failure_backfill():
output_file = _failure_flag_file()
with instance_for_context(default_repo) as (
Expand Down

0 comments on commit ffd8318

Please sign in to comment.