Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual flow retries #7152

Merged
merged 97 commits into from Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
5ac9b3e
Sketch out flow restart overview
anticorrelator Oct 12, 2022
44c5ef7
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 12, 2022
f72a03b
Spike out flow restart logic
anticorrelator Oct 12, 2022
ca5edda
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 12, 2022
4dc7be0
Clean up flow restart logic
anticorrelator Oct 12, 2022
e7a8a79
Add rules constraining flow restarts
anticorrelator Oct 12, 2022
407649f
Only update task runs if no restart indicator has been set
anticorrelator Oct 12, 2022
a23990a
Restarted flows must have deployments
anticorrelator Oct 12, 2022
1f74305
Fix bugs in restart policy
anticorrelator Oct 12, 2022
62c0a70
Start adding restart tests
anticorrelator Oct 13, 2022
8834274
Improve flow restart test
anticorrelator Oct 13, 2022
966c342
Spike out SoftRetry example
anticorrelator Oct 13, 2022
c4f99cb
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 13, 2022
86ce10d
Move "soft" retry policy to FlowRetryPolicy
anticorrelator Oct 13, 2022
beba97a
Use correct rule name
anticorrelator Oct 13, 2022
a540968
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 14, 2022
78c2ab6
Use retry logic for restarts
anticorrelator Oct 14, 2022
6cab09a
Use `AwaitingRetry` state for consistency
anticorrelator Oct 14, 2022
96c8841
anticorrelator Oct 14, 2022
2bb6d3a
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 17, 2022
dde4c90
Start consolidating Restart logic
anticorrelator Oct 17, 2022
ce7d832
Remove separate FlowRestartPolicy
anticorrelator Oct 18, 2022
91a549f
Use unified retry/restart rules
anticorrelator Oct 18, 2022
9339993
Move update task run model method
anticorrelator Oct 18, 2022
3bb2fac
Finish sketching out retry/restart logic
anticorrelator Oct 19, 2022
e01bc10
anticorrelator Oct 19, 2022
94ab790
Attempt to update subflow retry state machine
anticorrelator Oct 19, 2022
a335c7d
anticorrelator Oct 19, 2022
6c57b90
Remove special handling for subflow tasks
anticorrelator Oct 19, 2022
b852fde
Only rerun subflows that have not completed
anticorrelator Oct 19, 2022
1409ac5
Permit restart transitions
anticorrelator Oct 19, 2022
660ca4f
Update restart test
anticorrelator Oct 19, 2022
0deb7af
Remove needless imports
anticorrelator Oct 19, 2022
07d9f72
Update flow retry tests
anticorrelator Oct 19, 2022
ba45c0b
Spike out message passing between rules
anticorrelator Oct 19, 2022
7a18dd1
anticorrelator Oct 19, 2022
0d997c0
Remove needless import
anticorrelator Oct 19, 2022
da08672
Pass more explicit messages between rules
anticorrelator Oct 19, 2022
834bfb7
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 19, 2022
100b79e
Revert unintentional change to a rule
anticorrelator Oct 19, 2022
ebf1136
Add migrations
anticorrelator Oct 19, 2022
ffb50f7
Use updated columns
anticorrelator Oct 19, 2022
794b36d
Remove unused model methods and schema actions
anticorrelator Oct 19, 2022
f3db390
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 19, 2022
3b5c7df
Add test for passing context parameters between rules
anticorrelator Oct 19, 2022
38e9bf5
Update restart test
anticorrelator Oct 19, 2022
b28208f
Add tests for FlowRestart rule
anticorrelator Oct 20, 2022
8bbc98f
Start adding tests for rerunning tasks on restart/retry
anticorrelator Oct 20, 2022
85d81b5
Refactor task rerunning rule for clarity
anticorrelator Oct 20, 2022
d1854b2
Fix assertion
anticorrelator Oct 20, 2022
2a785c4
Fix test
anticorrelator Oct 20, 2022
a8dc688
Add FizzlingRule fixture and cleanup test for rerunning rule
anticorrelator Oct 20, 2022
15c6e17
anticorrelator Oct 20, 2022
c4e84b4
Permit rerunning tasks during the last flow retry
anticorrelator Oct 20, 2022
bad94df
Remove needless imports
anticorrelator Oct 20, 2022
bb54b55
Test restarting flows via API
anticorrelator Oct 20, 2022
c71fe93
Change restart tests to use `set_statee instead of dedicated route
anticorrelator Oct 20, 2022
43bef0c
Remove dedicated restart flow run route
anticorrelator Oct 20, 2022
a289c93
anticorrelator Oct 20, 2022
ca6ccf8
Bump minor API version (?)
anticorrelator Oct 20, 2022
af8dffc
Create flow runs using model directly instead of modifying create schema
anticorrelator Oct 20, 2022
a6b69ed
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 20, 2022
c37af3e
Remove `AwaitingRestart` convenience constructor
anticorrelator Oct 20, 2022
407507a
Add docstrings
anticorrelator Oct 20, 2022
9b347d3
Bump minimum API version
anticorrelator Oct 20, 2022
59ca469
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 20, 2022
0e6f1f4
Fix issues with conflicting migrations
anticorrelator Oct 20, 2022
0277ca1
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 20, 2022
d2869f2
Use models instead of create actions
anticorrelator Oct 20, 2022
669698a
Revert version changes
anticorrelator Oct 20, 2022
3267e82
Add integration test for flow retries with subflows
anticorrelator Oct 20, 2022
4b279ec
Use correct name in integration test
anticorrelator Oct 20, 2022
cdb4970
Use better names
anticorrelator Oct 20, 2022
156a9b1
Add flow retry compatibility code to rules
anticorrelator Oct 20, 2022
6f45831
anticorrelator Oct 20, 2022
9deb857
Add error handling to orchestration fixture
anticorrelator Oct 20, 2022
5128133
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 20, 2022
e91aebd
Fix compatibility code
anticorrelator Oct 20, 2022
bedc6d0
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 20, 2022
c58f6b4
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 20, 2022
a0cbac2
Resolve another migration conflict
anticorrelator Oct 20, 2022
80ec6fc
Fix restart bookkeeping logic
anticorrelator Oct 21, 2022
ba32275
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 21, 2022
3c1c322
Sketch out simplifying restarts as retries
anticorrelator Oct 24, 2022
66fa7f2
Unify "retry" transition rules with "terminal state" rules
anticorrelator Oct 24, 2022
61c0cb9
Fix name errors
anticorrelator Oct 24, 2022
12196be
Rework core policy tests to use updated terminal state protection rules
anticorrelator Oct 24, 2022
0251db0
Update restart tests to check manual retry behavior
anticorrelator Oct 24, 2022
bdfb180
Update tests that permit rerunning retrying tasks
anticorrelator Oct 24, 2022
cafb170
Update API tests for manual retries
anticorrelator Oct 24, 2022
d3c43ca
Remove extra columns
anticorrelator Oct 24, 2022
65714ce
Rename `retry_attempt` to `flow_run_run_count`
anticorrelator Oct 24, 2022
4d6bdfb
Separate flow run tracking into its own rule for clarity
anticorrelator Oct 24, 2022
ddaf599
Merge pull request #7308 from PrefectHQ/simplify-restarts-into-retries
anticorrelator Oct 24, 2022
0d60365
Merge branch 'main' of github.com:PrefectHQ/prefect into flow-restarts
anticorrelator Oct 24, 2022
d2c5f97
Rename rules for clarity
anticorrelator Oct 24, 2022
83d98cc
isort
anticorrelator Oct 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 37 additions & 0 deletions flows/flow_retries_with_subflows.py
@@ -0,0 +1,37 @@
from prefect import flow

child_flow_run_count = 0
flow_run_count = 0


@flow
def child_flow():
global child_flow_run_count
child_flow_run_count += 1

# Fail on the first flow run but not the retry
if flow_run_count < 2:
raise ValueError()

return "hello"


@flow(retries=10)
def parent_flow():
global flow_run_count
flow_run_count += 1

result = child_flow()

# It is important that the flow run fails after the child flow run is created
if flow_run_count < 3:
raise ValueError()

return result


if __name__ == "__main__":
result = parent_flow()
assert result == "hello", f"Got {result}"
assert flow_run_count == 3, f"Got {flow_run_count}"
assert child_flow_run_count == 2, f"Got {child_flow_run_count}"
10 changes: 6 additions & 4 deletions src/prefect/engine.py
Expand Up @@ -400,6 +400,8 @@ async def create_and_begin_subflow_run(
parent_logger.debug(f"Resolving inputs to {flow.name!r}")
task_inputs = {k: await collect_task_run_inputs(v) for k, v in parameters.items()}

rerunning = parent_flow_run_context.flow_run.run_count > 1

# Generate a task in the parent flow run to represent the result of the subflow run
dummy_task = Task(name=flow.name, fn=flow.fn, version=flow.version)
parent_task_run = await client.create_task_run(
Expand All @@ -413,8 +415,9 @@ async def create_and_begin_subflow_run(
# Resolve any task futures in the input
parameters = await resolve_inputs(parameters)

if parent_task_run.state.is_final():

if parent_task_run.state.is_final() and not (
rerunning and not parent_task_run.state.is_completed()
):
# Retrieve the most recent flow run from the database
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
Expand All @@ -433,7 +436,7 @@ async def create_and_begin_subflow_run(
flow,
parameters=flow.serialize_parameters(parameters),
parent_task_run_id=parent_task_run.id,
state=parent_task_run.state,
state=parent_task_run.state if not rerunning else Pending(),
tags=TagsContext.get().current_tags,
)

Expand Down Expand Up @@ -469,7 +472,6 @@ async def create_and_begin_subflow_run(
report_flow_run_crashes(flow_run=flow_run, client=client)
)
task_runner = await stack.enter_async_context(flow.task_runner.start())

terminal_state = await orchestrate_flow_run(
flow,
flow_run=flow_run,
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/orion/api/flow_runs.py
Expand Up @@ -237,6 +237,7 @@ async def set_flow_run_state(
flow_policy: BaseOrchestrationPolicy = Depends(
orchestration_dependencies.provide_flow_policy
),
api_version=Depends(dependencies.provide_request_api_version),
) -> OrchestrationResult:
"""Set a flow run state, invoking any orchestration rules."""

Expand All @@ -249,6 +250,7 @@ async def set_flow_run_state(
state=schemas.states.State.parse_obj(state),
force=force,
flow_policy=flow_policy,
api_version=api_version,
)

# set the 201 because a new state was created
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/orion/api/server.py
Expand Up @@ -38,7 +38,7 @@
API_TITLE = "Prefect Orion API"
UI_TITLE = "Prefect Orion UI"
API_VERSION = prefect.__version__
ORION_API_VERSION = "0.8.2"
ORION_API_VERSION = "0.8.3"

logger = get_logger("orion")

Expand Down
@@ -0,0 +1,32 @@
"""Add retry and restart metadata

Revision ID: 8ea825da948d
Revises: ad4b1b4d1e9d
Create Date: 2022-10-19 16:51:10.239643

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "8ea825da948d"
down_revision = "3ced59d8806b"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"task_run",
sa.Column(
"flow_run_run_count", sa.Integer(), server_default="0", nullable=False
),
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("task_run", "flow_run_run_count")
# ### end Alembic commands ###
@@ -0,0 +1,33 @@
"""Add retry and restart metadata

Revision ID: af52717cf201
Revises: ad4b1b4d1e9d
Create Date: 2022-10-19 15:58:10.016251

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "af52717cf201"
down_revision = "3ced59d8806b"
branch_labels = None
depends_on = None


def upgrade():
with op.batch_alter_table("task_run", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"flow_run_run_count", sa.Integer(), server_default="0", nullable=False
)
)

# ### end Alembic commands ###


def downgrade():
with op.batch_alter_table("task_run", schema=None) as batch_op:
batch_op.drop_column("flow_run_run_count")

# ### end Alembic commands ###
3 changes: 3 additions & 0 deletions src/prefect/orion/database/orm_models.py
Expand Up @@ -524,6 +524,9 @@ def flow_run_id(cls):
cache_key = sa.Column(sa.String)
cache_expiration = sa.Column(Timestamp())
task_version = sa.Column(sa.String)
flow_run_run_count = sa.Column(
sa.Integer, server_default="0", default=0, nullable=False
)
empirical_policy = sa.Column(
Pydantic(schemas.core.TaskRunPolicy),
server_default="{}",
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/orion/models/flow_runs.py
Expand Up @@ -11,6 +11,7 @@

import pendulum
import sqlalchemy as sa
from packaging.version import Version
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import load_only
Expand Down Expand Up @@ -377,6 +378,7 @@ async def set_flow_run_state(
state: schemas.states.State,
force: bool = False,
flow_policy: BaseOrchestrationPolicy = None,
api_version: Version = None,
) -> OrchestrationResult:
"""
Creates a new orchestrated flow run state.
Expand Down Expand Up @@ -426,6 +428,9 @@ async def set_flow_run_state(
proposed_state=state,
)

# pass the request version to the orchestration engine to support compatibility code
context.parameters["api-version"] = api_version

# apply orchestration rules and create the new flow run state
async with contextlib.AsyncExitStack() as stack:
for rule in orchestration_rules:
Expand Down