In [None]:
import prefect
from prefect import task, Flow

import requests
from datetime import timedelta
from unittest.mock import MagicMock, patch
from time import sleep


@task
def create_payload():
    """Simulate a slow, expensive computation whose result is a URL.

    Returns:
        str: the URL to be requested.
    """
    # Stand-in for the expensive part: block for five seconds before the
    # result becomes available.
    sleep(5)
    payload_url = 'http://www.google.com'
    return payload_url


@task(max_retries=1, retry_delay=timedelta(minutes=10))
def ping_external_service(url):
    """Perform a GET request against ``url`` and return the response text.

    The task is declared with ``max_retries=1`` and a ten-minute
    ``retry_delay``. When the key ``"fail"`` is truthy in ``prefect.context``
    the task raises immediately, without touching the network, to simulate a
    failed request.

    Args:
        url: address to request.

    Returns:
        str: ``Response.text`` of the reply.

    Raises:
        ValueError: if ``"fail"`` is truthy in ``prefect.context``.
        requests.RequestException: if the request times out, the connection
            fails, or the server answers with a 4xx/5xx status.
    """
    # Simulated failure path, driven entirely by the run context.
    if prefect.context.get("fail"):
        raise ValueError("Request failed with status code 418.")

    # requests applies no timeout by default; without one a stalled server
    # would block this task indefinitely and the retry policy would never
    # get a chance to run. 10 seconds bounds both connect and read.
    response = requests.get(url, timeout=10)

    # Surface HTTP error statuses as exceptions so that a real failed request
    # fails the task, mirroring the simulated failure above, instead of
    # returning an error page as if it were a successful payload.
    response.raise_for_status()
    return response.text


In [None]:
# Build the flow: ping_external_service is wired up with create_payload as the
# source of its `url` argument. `text` is used in later cells as a task handle
# (a key into `flow_state.result`), not as the response body.
# NOTE(review): `create_payload` is passed as the task object itself rather
# than being called; the recorded results below list it as a task of this
# flow. Confirm this is the intended idiom for the installed Prefect version.
with Flow(name="retry example") as f:
    text = ping_external_service(create_payload)


In [None]:
%%time
# First run: `fail=True` is placed in the Prefect context, so
# ping_external_service takes its ValueError branch instead of issuing the
# GET request. States are requested for every task in the flow
# (return_tasks=f.tasks) so they can be inspected and reused afterwards.
with prefect.context(fail=True) as ctx:
    flow_state = f.run(return_tasks=f.tasks, context=ctx)

##    CPU times: user 5.65 ms, sys: 1.46 ms, total: 7.12 ms
##    Wall time: 5.01 s


In [None]:
# Summarise the first run: the overall flow state, a blank line, then the
# per-task states that were requested via return_tasks.
first_run_report = [
    "Flow state: {}\n".format(flow_state),
    "Flow results: {}".format(flow_state.result),
]
print("\n".join(first_run_report))

## Flow state: Pending("Some terminal tasks are still pending.")

## Flow results: {
##      <Task: create_payload>: Success("Task run succeeded."),
##      <Task: ping_external_service>: Retrying("Retrying Task (after attempt 1 of 2)")
##                }


In [None]:
%%time
# Second run: start from the ping task only, handing back the state it ended
# the first run with. This cell is not wrapped in prefect.context(fail=True),
# so the simulated failure is not requested here. Only the ping task's state
# is asked for in the returned results.
previous_ping_state = flow_state.result[text]
new_flow_state = f.run(
    start_tasks=[text],
    task_states={text: previous_ping_state},
    return_tasks=[text],
)

##    CPU times: user 15.5 ms, sys: 5.91 ms, total: 21.4 ms
##    Wall time: 167 ms


In [None]:
# Summarise the second run: the overall flow state, a blank line, then the
# state of the single task that was requested.
second_run_report = [
    "Flow state: {}\n".format(new_flow_state),
    "Flow results: {}".format(new_flow_state.result),
]
print("\n".join(second_run_report))

##     Flow state: Success("All reference tasks succeeded.")

##     Flow results: {<Task: ping_external_service>: Success("Task run succeeded.")}
