Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support failure node #840

Merged
merged 32 commits into from
Dec 12, 2023
Merged

Add support failure node #840

merged 32 commits into from
Dec 12, 2023

Conversation

kumare3
Copy link
Contributor

@kumare3 kumare3 commented Jan 31, 2022

Describe your changes

This PR adds a failure node in flytekit, which allows you to run another task/workflow (delete cluster or clean up resource) if the the workflow fails.

  • Add on_failure to @workflow. people can pass a task/workflow to it.
  • Compile and serialize the failure node
  • Add Error type and transformer
  • Also support local execution. If workflow fails, flytekit will run failure node first, then raise an exception.

Example

Three workflows

  1. wf1: both workflow and subworkflow have a failure node. propeller will create two tasks when failing
  2. wf2: Only one workflow has a failure node. propeller will create one task when failing
  3. wf3: failure node is a workflowNode. propeller will create another workflow when failing
flytectl demo start --dev
make compile  # compile the single binary
flyte start --config flyte-single-binary-local.yaml
pip install git+https://github.com/flyteorg/flytekit.git@2582a8bc16f80ab3a7101af8360e9d7212236e43
pyflyte run --remote workflow.py wf
import typing
from click.testing import CliRunner

from flytekit import task, workflow, ImageSpec, WorkflowFailurePolicy
from flytekit.clis.sdk_in_container import pyflyte
from flytekit.types.error.error import FlyteError

new_flytekit = "git+https://github.com/flyteorg/flytekit.git@5a415107b0aff272a16eb147860a65d47a10c4d8"
image_spec = ImageSpec(packages=[new_flytekit], apt_packages=["git"], registry="pingsutw")


@task(container_image=image_spec)
def create_cluster(name: str):
    print(f"Creating cluster: {name}")


@task(container_image=image_spec)
def t1(a: int, b: str):
    print(f"{a} {b}")
    raise ValueError("Fail!")


@task(container_image=image_spec)
def delete_cluster(name: str, err: typing.Optional[FlyteError] = None):
    print(f"Deleting cluster {name}")
    print(err)


@task(container_image=image_spec)
def clean_up(name: str, err: typing.Optional[FlyteError] = None):
    print(f"Deleting cluster {name} due to {err}")
    print(err)


@workflow(on_failure=clean_up, failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def subwf(name: str = "kevin"):
    c = create_cluster(name=name)
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d


@workflow(on_failure=clean_up, failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def wf1(name: str = "kevin"):
    c = create_cluster(name=name)
    subwf(name="pingsutw")
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d


@workflow(on_failure=clean_up, failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def wf2(name: str = "kevin"):
    c = create_cluster(name=name)
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d


@workflow
def clean_up_wf(name: str = "kevin"):
    return create_cluster(name=name)


@workflow(on_failure=clean_up_wf, failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def wf3(name: str = "Kevin"):
    c = create_cluster(name=name)
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d


if __name__ == '__main__':
    runner = CliRunner()
    result = runner.invoke(pyflyte.main, ["run", "--remote", "failure_node.py", "wf2"])
    print(result.output)

Tracking issue

flyteorg/flyte#1506

Screenshots

image

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
@kumare3 kumare3 changed the title [wip] Error handler [wip] Error handler proposal Nov 8, 2022
Signed-off-by: Kevin Su <pingsutw@apache.org>
@codecov
Copy link

codecov bot commented Oct 24, 2023

Codecov Report

Attention: 12 lines in your changes are missing coverage. Please review.

Comparison is base (5c6802c) 86.22% compared to head (f6fa9b3) 85.84%.
Report is 11 commits behind head on master.

Files Patch % Lines
flytekit/core/workflow.py 85.71% 1 Missing and 3 partials ⚠️
flytekit/tools/translator.py 42.85% 3 Missing and 1 partial ⚠️
flytekit/types/error/error.py 87.09% 2 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #840      +/-   ##
==========================================
- Coverage   86.22%   85.84%   -0.38%     
==========================================
  Files         320      308      -12     
  Lines       23531    22965     -566     
  Branches     3464     3485      +21     
==========================================
- Hits        20289    19714     -575     
- Misses       2650     2651       +1     
- Partials      592      600       +8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

kumare3 and others added 6 commits October 24, 2023 15:52
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
@pingsutw pingsutw changed the title [wip] Error handler proposal Add support failure node Nov 17, 2023
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
) as inner_comp_ctx:
# Now lets compile the failure-node if it exists
if self.on_failure:
# TODO: validate inputs match the workflow interface, with an extra param `err`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete all the todos



@dataclass
class FlyteError(DataClassJSONMixin):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i do not think we need this right, though lets keep it

Copy link
Contributor Author

@kumare3 kumare3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but i cannot approve

Signed-off-by: Kevin Su <pingsutw@apache.org>
@pingsutw pingsutw merged commit 613a655 into master Dec 12, 2023
72 of 74 checks passed
RRap0so pushed a commit to RRap0so/flytekit that referenced this pull request Dec 15, 2023
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Co-authored-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Rafael Raposo <rafaelraposo@spotify.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants