## Prefect 2 Tutorial - Intro

In [1]:
from prefect import flow, task
import requests
import asyncio

In [2]:
!prefect version

Version:             2.0b8
API version:         0.7.0
Python version:      3.9.12
Git commit:          4b8dfc35
Built:               Fri, Jul 8, 2022 8:53 AM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.37.0


### Run a basic flow

The simplest way to begin with Prefect is to import flow and annotate your a Python function using the `@flow` decorator:

In [3]:
@flow
def my_favorite_function():
    print("This function doesn't do much")
    return 42

Running a Prefect workflow manually is as easy as calling the annotated function. In this case, we run the      `my_favorite_function()` snippet shown above:

In [4]:
state = my_favorite_function()

18:11:47.254 | INFO    | prefect.engine - Created flow run 'independent-hawk' for flow 'my-favorite-function'
18:11:47.255 | INFO    | Flow run 'independent-hawk' - Using task runner 'ConcurrentTaskRunner'
18:11:47.299 | INFO    | Flow run 'independent-hawk' - Finished in state Completed()


This function doesn't do much


The first thing you'll notice is the messages surrounding the expected output, "This function doesn't do much".

By adding the `@flow` decorator to your function, function calls will create a flow run — the Prefect Orion orchestration engine manages task and flow state, including inspecting their progress, regardless of where your flow code runs.

For clarity in future tutorial examples, we may not show these messages in results except where they are relevant to the discussion.

Next, examine `state`, which shows the result returned by the `my_favorite_function()` flow.

In [5]:
state

Completed(message=None, type=COMPLETED, result=42)

*Flows return states*

> You may notice that this call did not return the number 42 but rather a Prefect State object. States are the basic currency of communication between Prefect clients and the Prefect API, and can be used to define the conditions for orchestration rules as well as an interface for client-side logic.

In this case, the state of `my_favorite_function()` is "Completed", with no further message details ("None" in this example).

If you want to see the data returned by the flow, access it via the `.result()` method on the State object.

In [6]:
state.result()

42

### Run flows with parameters

As with any Python function, you can pass arguments. The positional and keyword arguments defined on your flow function are called parameters. To demonstrate:

In [7]:
@flow
def call_api(url):
    return requests.get(url).json()


You can pass any parameters needed by your flow function, and you can pass parameters on the `@flow` decorator for configuration as well. We'll cover that in a future tutorial.

For now, run the `call_api()` flow, passing a valid URL as a parameter. In this case, we're sending a POST request to an API that should return valid JSON in the response.

In [8]:
state = call_api("http://time.jsontest.com/")

18:11:47.606 | INFO    | prefect.engine - Created flow run 'sturdy-sheep' for flow 'call-api'
18:11:47.607 | INFO    | Flow run 'sturdy-sheep' - Using task runner 'ConcurrentTaskRunner'
18:11:48.043 | INFO    | Flow run 'sturdy-sheep' - Finished in state Completed()


Again, Prefect Orion automatically orchestrates the flow run. Again, print the state and note the "Completed" state matches what Prefect Orion prints in your terminal.

In [9]:
state.result()

{'date': '07-10-2022',
 'milliseconds_since_epoch': 1657440707816,
 'time': '08:11:47 AM'}

In [10]:
state

Completed(message=None, type=COMPLETED, result={'date': '07-10-2022', 'milliseconds_since_epoch': 1657440707816, 'time': '08:11:47 AM'})

In [11]:
print(state)

Completed()


### Error handling

What happens if the Python function encounters an error while your flow is running? To see what happens whenever our flow does not complete successfully, let's intentionally run the `call_api()` flow above with a bad value for the URL:

In [12]:
state = call_api("foo")

18:11:48.256 | INFO    | prefect.engine - Created flow run 'hysterical-chachalaca' for flow 'call-api'
18:11:48.257 | INFO    | Flow run 'hysterical-chachalaca' - Using task runner 'ConcurrentTaskRunner'
18:11:48.287 | ERROR   | Flow run 'hysterical-chachalaca' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/mjboothaus/try-prefect2/.venv/lib/python3.9/site-packages/prefect/engine.py", line 520, in orchestrate_flow_run
    result = await run_sync_in_interruptible_worker_thread(
  File "/Users/mjboothaus/try-prefect2/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 116, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/Users/mjboothaus/try-prefect2/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/Users/mjboothaus/try-prefect2/.venv/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().r

In this situation, the call to `requests.get()` encounters an exception, but the flow run still returns! The exception is captured by Orion, which continues to shut down the flow run normally.

However, in contrast to the 'COMPLETED' state, we now encounter a 'FAILED' state signaling that something unexpected happened during execution.

In [13]:
print(state)

Failed('Flow run encountered an exception.')


This behavior is consistent across flow runs and task runs and allows you to respond to failures in a first-class way — whether by configuring orchestration rules in the Orion backend (retry logic) or by directly responding to failed states in client code.

### Run a basic flow with tasks

Let's now add some tasks to a flow so that we can orchestrate and monitor at a more granular level.

A task is a function that represents a distinct piece of work executed within a flow. You don't have to use tasks — you can include all of the logic of your workflow within the flow itself. However, encapsulating your business logic into smaller task units gives you more granular observability, control over how specific tasks are run (potentially taking advantage of parallel execution), and reusing tasks across flows and sub-flows.

Creating and adding tasks follows the exact same pattern as for flows. Import task and use the `@task` decorator to annotate functions as tasks.

Let's take the previous `call_api()` example and move the actual HTTP request to its own task.

In [14]:
@task
def call_api(url):
    response = requests.get(url)
    print(f"Response: {response.status_code}")
    return response.json()


@flow
def api_flow(url):
    fact_json = call_api(url)
    return 

As you can see, we still call these tasks as normal functions and can pass their return values to other tasks. We can then call our flow function — now called `api_flow()` — just as before and see the printed output. Prefect manages all the relevant intermediate states.

In [15]:
state = api_flow("https://catfact.ninja/fact")

18:11:48.502 | INFO    | prefect.engine - Created flow run 'crouching-turaco' for flow 'api-flow'
18:11:48.504 | INFO    | Flow run 'crouching-turaco' - Using task runner 'ConcurrentTaskRunner'
18:11:48.554 | INFO    | Flow run 'crouching-turaco' - Created task run 'call_api-ded10bed-0' for task 'call_api'
18:11:49.990 | INFO    | Task run 'call_api-ded10bed-0' - Finished in state Completed()
18:11:50.001 | INFO    | Flow run 'crouching-turaco' - Finished in state Completed('All states completed.')


Response: 200


And of course we can create tasks that take input from and pass input to other tasks.



In [16]:
@task(name="t-example3-1")
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()

@task(name="t-example3-2")
def parse_fact(response):
    print(response["fact"])
    return 

@flow(name="f-example3")
def api_flow(url):
    fact_json = call_api(url)
    parse_fact(fact_json)
    return

In [17]:
state = api_flow("https://catfact.ninja/fact")

18:11:50.143 | INFO    | prefect.engine - Created flow run 'voracious-kiwi' for flow 'f-example3'
18:11:50.144 | INFO    | Flow run 'voracious-kiwi' - Using task runner 'ConcurrentTaskRunner'
18:11:50.183 | INFO    | Flow run 'voracious-kiwi' - Created task run 't-example3-1-ded10bed-0' for task 't-example3-1'
18:11:50.201 | INFO    | Flow run 'voracious-kiwi' - Created task run 't-example3-2-6803447a-0' for task 't-example3-2'
18:11:51.015 | INFO    | Task run 't-example3-1-ded10bed-0' - Finished in state Completed()
18:11:51.040 | INFO    | Task run 't-example3-2-6803447a-0' - Finished in state Completed()
18:11:51.052 | INFO    | Flow run 'voracious-kiwi' - Finished in state Completed('All states completed.')


200
In 1987 cats overtook dogs as the number one pet in America.


In [18]:
print(state)

Completed('All states completed.')


Combining tasks with arbitrary Python code

> Notice in the above example that all of our Python logic is encapsulated within task functions. While there are many benefits to using Prefect in this way, it is not a strict requirement. Interacting with the results of your Prefect tasks requires an understanding of Prefect futures.

### Run a flow within a flow

Not only can you call tasks functions within a flow, but you can also call other flow functions! Flows that run within other flows are called sub-flows and allow you to efficiently manage, track, and version common multi-task logic.

Consider the following simple example:

In [19]:
@flow
def common_flow(config: dict):
    print("I am a subgraph that shows up in lots of places!")
    intermediate_result = 42
    return intermediate_result

@flow
def main_flow():
    # do some things
    # then call another flow function
    data = common_flow(config={})
    # do more things

Whenever we run `main_flow` as above, a new run will be generated for `common_flow` as well. Not only is this run tracked as a sub-flow run of `main_flow`, but you can also inspect it independently in the UI!

In [20]:
flow_state = main_flow()

18:11:51.234 | INFO    | prefect.engine - Created flow run 'hungry-junglefowl' for flow 'main-flow'
18:11:51.234 | INFO    | Flow run 'hungry-junglefowl' - Using task runner 'ConcurrentTaskRunner'
18:11:51.302 | INFO    | Flow run 'hungry-junglefowl' - Created subflow run 'ancient-armadillo' for flow 'common-flow'
18:11:51.348 | INFO    | Flow run 'ancient-armadillo' - Finished in state Completed()
18:11:51.362 | INFO    | Flow run 'hungry-junglefowl' - Finished in state Completed('All states completed.')


I am a subgraph that shows up in lots of places!


You can confirm this for yourself by spinning up the UI using the `prefect orion start` CLI command from your terminal:


In [21]:
!prefect orion start

Starting...

 ___ ___ ___ ___ ___ ___ _____    ___  ___ ___ ___  _  _
| _ \ _ \ __| __| __/ __|_   _|  / _ \| _ \_ _/ _ \| \| |
|  _/   / _|| _|| _| (__  | |   | (_) |   /| | (_) | .` |
|_| |_|_\___|_| |___\___| |_|    \___/|_|_\___\___/|_|\_|

Configure Prefect to communicate with the server with:

    prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

View the API reference documentation at http://127.0.0.1:4200/docs

Check out the dashboard at http://127.0.0.1:4200



[32mINFO[0m:     Started server process [[36m61519[0m]
[32mINFO[0m:     Waiting for application startup.
[32mINFO[0m:     Application startup complete.
[32mINFO[0m:     Uvicorn running on [1mhttp://127.0.0.1:4200[0m (Press CTRL+C to quit)
^C
[32mINFO[0m:     Shutting down
[32mINFO[0m:     Waiting for application shutdown.


### Asynchronous functions

Even asynchronous functions work with Prefect! Here's a variation of the previous examples that makes the API request as an async operation:

In [24]:
@task
async def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()

@flow
async def async_flow(url):
    fact_json = await call_api(url)
    return 

If we run this in the REPL, the output looks just like previous runs.

In [26]:
# Following doesn't seem to run in Jupyter

# asyncio.run(async_flow("https://catfact.ninja/fact"))