Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove PreventRedundantTransitions rule from core task orchestration policy #8802

Merged
merged 6 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/prefect/server/orchestration/core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def priority():
CacheRetrieval,
HandleTaskTerminalStateTransitions,
PreventRunningTasksFromStoppedFlows,
PreventRedundantTransitions,
SecureTaskConcurrencySlots, # retrieve cached states even if slots are full
CopyScheduledTime,
WaitForScheduledTime,
Expand Down
14 changes: 7 additions & 7 deletions tests/server/api/test_task_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,14 +346,14 @@ async def test_set_task_run_state(self, task_run, client, session):
assert run.run_count == 1

@pytest.mark.parametrize("proposed_state", ["PENDING", "RUNNING"])
async def test_setting_task_run_state_twice_aborts(
async def test_setting_task_run_state_twice_works(
self, task_run, client, session, proposed_state
):
# A multi-agent environment may attempt to orchestrate a run more than once,
# this test ensures that a 2nd agent cannot re-propose a state that's already
# been set
# Task runner infrastructure may kill and restart tasks without informing
# Prefect. This test checks that a task can re-transition its current state
# to ensure restarts occur without error.

# first ensure the parent flow run is in a running state
# first, ensure the parent flow run is in a running state
await client.post(
f"/flow_runs/{task_run.flow_run_id}/set_state",
json=dict(state=dict(type="RUNNING")),
Expand All @@ -372,10 +372,10 @@ async def test_setting_task_run_state_twice_aborts(
f"/task_runs/{task_run.id}/set_state",
json=dict(state=dict(type=proposed_state, name="Test State")),
)
assert response.status_code == 200
assert response.status_code == 201

api_response = OrchestrationResult.parse_obj(response.json())
assert api_response.status == responses.SetStateStatus.ABORT
assert api_response.status == responses.SetStateStatus.ACCEPT

async def test_set_task_run_ignores_client_provided_timestamp(
self, flow_run, client
Expand Down
50 changes: 50 additions & 0 deletions tests/server/orchestration/test_core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,56 @@ async def test_one_run_wont_consume_multiple_slots(
assert duplicate_ctx.response_status == SetStateStatus.ACCEPT
assert (await self.count_concurrency_slots(session, "a generous limit")) == 1

async def test_task_restart_does_not_consume_multiple_slots(
self,
session,
run_type,
initialize_orchestration,
):
await self.create_concurrency_limit(session, "a generous limit", 10)

concurrency_policy = [
SecureTaskConcurrencySlots,
]
# we should have no consumed slots yet
assert (await self.count_concurrency_slots(session, "a generous limit")) == 0

start_transition = (states.StateType.PENDING, states.StateType.RUNNING)

ctx = await initialize_orchestration(
session, "task", *start_transition, run_tags=["a generous limit"]
)

async with contextlib.AsyncExitStack() as stack:
for rule in concurrency_policy:
ctx = await stack.enter_async_context(rule(ctx, *start_transition))
await ctx.validate_proposed_state()

assert ctx.response_status == SetStateStatus.ACCEPT

# PENDING -> RUNNING should consume a slot
assert (await self.count_concurrency_slots(session, "a generous limit")) == 1

running_transition = (states.StateType.RUNNING, states.StateType.RUNNING)
restart_ctx = await initialize_orchestration(
session,
"task",
*running_transition,
run_override=ctx.run,
run_tags=["a generous limit"],
)

async with contextlib.AsyncExitStack() as stack:
for rule in concurrency_policy:
restart_ctx = await stack.enter_async_context(
rule(restart_ctx, *running_transition)
)
await restart_ctx.validate_proposed_state()

# RUNNING -> RUNNING should not consume another slot
assert restart_ctx.response_status == SetStateStatus.ACCEPT
assert (await self.count_concurrency_slots(session, "a generous limit")) == 1

async def test_concurrency_race_condition_new_tags_arent_double_counted(
self,
session,
Expand Down