A **flow** is the basis of all Prefect workflows. A flow is a Python function decorated with a @flow decorator.

A **task** is a Python function decorated with a @task decorator. Tasks represent distinct pieces of work executed within a flow.

All Prefect workflows are defined within the context of a flow: every workflow must contain at least one flow function, which serves as the entrypoint for executing the workflow.

Flows can include calls to tasks as well as to child flows, which we call "subflows" in this context. At a high level, this is just like writing any other Python application: you organize specific, repetitive work into tasks, and call those tasks from flows.
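Because flows and tasks are ordinary decorated functions, a subflow is simply a flow called from inside another flow. The sketch below is a toy, stdlib-only illustration of that nesting, not Prefect's implementation. The `toy_flow` and `toy_task` decorators and the `CALL_LOG` list are hypothetical stand-ins that only record which kind of function ran at which nesting depth.

```python
import functools

# Hypothetical record of (depth, kind, function name). Prefect itself tracks
# flow and task runs through its API, not through a list like this.
CALL_LOG = []
_depth = 0


def _tracked(kind):
    """Return a decorator that logs each call together with its nesting depth."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            global _depth
            CALL_LOG.append((_depth, kind, fn.__name__))
            _depth += 1
            try:
                return fn(*args, **kwargs)
            finally:
                _depth -= 1
        return wrapper
    return decorator


toy_flow = _tracked("flow")  # hypothetical stand-in for @flow
toy_task = _tracked("task")  # hypothetical stand-in for @task


@toy_task
def add_one(x):
    return x + 1


@toy_flow
def child_flow(x):
    # A "subflow": a flow that is called from inside another flow.
    return add_one(x)


@toy_flow
def parent_flow(x):
    y = add_one(x)        # task called directly from the parent flow
    return child_flow(y)  # subflow called from the parent flow


result = parent_flow(1)
print(CALL_LOG)
```

The log shows the parent flow at depth 0, its task and subflow at depth 1, and the subflow's own task at depth 2, which is the same parent/child shape Prefect records for real runs.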

In [1]:
import requests
from prefect import flow, task
import time

In [2]:
@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()

In [3]:
@flow
def api_flow(url):
    fact_json = call_api(url)
    return fact_json

In [4]:
print(api_flow("https://catfact.ninja/fact"))

200


{'fact': "A cat's smell is their strongest sense, and they rely on this leading sense to identify people and objects; a feline's sense of smell is 14x better than a human's.", 'length': 163}


Basic flow configuration lets you provide a name, description, version, and other options for the flow via flow arguments.

You specify flow configuration as arguments on the @flow decorator.

By design, tasks follow a very similar model: you can independently give each task its own name and description, as well as options such as retries.

In [5]:
@task(name="My Example Task", 
      description="An example task for a tutorial.",
      retries=5,
      retry_delay_seconds=60)
def my_task():
    print("Task1")

@flow(name="My Example Flow", 
      description="An example flow for a tutorial.")
def my_flow():
    my_task()
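The `retries=5` and `retry_delay_seconds=60` arguments ask Prefect to re-run `my_task` if it raises, waiting between attempts. As a rough mental model only, the stdlib sketch below re-implements that retry-then-wait behaviour with a hypothetical `with_retries` decorator. It is not how Prefect implements retries, and the delay is shortened so the example runs quickly.

```python
import functools
import time


def with_retries(retries, retry_delay_seconds):
    """Hypothetical decorator: call fn up to retries + 1 times in total."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: surface the last error
                    time.sleep(retry_delay_seconds)  # wait before retrying
        return wrapper
    return decorator


attempts = []


@with_retries(retries=5, retry_delay_seconds=0.01)
def flaky_task():
    # Fails on the first two calls, then succeeds on the third.
    attempts.append(len(attempts) + 1)
    if len(attempts) < 3:
        raise RuntimeError("transient failure")
    return "Task1"


outcome = flaky_task()
print(outcome, attempts)
```

The task succeeds on its third attempt, well within the five retries allowed, so the caller never sees the two transient failures.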

**Task runners** are responsible for running Prefect tasks within a flow. Each flow has a task runner associated with it. Depending on the task runner you use, the tasks within your flow can run sequentially, concurrently, or in parallel. You can even configure task runners to use distributed execution infrastructure such as a Dask cluster.

By default, calling a task returns its result as a Python object, and the task blocks the execution of the next task in the flow. To let the tasks within your flow run concurrently or in parallel, call .submit() on the task instead. This returns a PrefectFuture rather than a Python object: a PrefectFuture provides access to a computation happening in a task runner.
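A PrefectFuture plays a similar role to the standard library's `concurrent.futures.Future`: submitting returns immediately, and you ask the future for the value later (in Prefect, by calling `.result()` on the PrefectFuture). The sketch below uses `ThreadPoolExecutor` purely as an analogy for that submit-then-collect pattern; the executor is not Prefect's task runner.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    time.sleep(0.1)  # simulate some work
    return x * x


with ThreadPoolExecutor() as executor:
    # submit() hands back a Future right away instead of the computed value.
    future = executor.submit(slow_square, 4)
    print(type(future).__name__)  # Future
    # result() blocks until the computation has finished.
    value = future.result()

print(value)
```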

If you don't specify a task runner, Prefect uses the default ConcurrentTaskRunner, which runs submitted tasks concurrently.

In [6]:
@task
def print_values(values):
    for value in values:
        time.sleep(0.5)
        print(value, end="\r")

@flow
def my_flow():
    print_values.submit(["AAAA"] * 15)
    print_values.submit(["BBBB"] * 10)

if __name__ == "__main__":
    my_flow()

AAAA

AAAA
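Because the ConcurrentTaskRunner starts both submitted `print_values` runs without waiting for the first one to finish, their sleeps overlap. A stdlib thread pool, used here only as an analogy and not as Prefect's runner, makes that overlap measurable: two 0.2-second sleeps submitted together should finish in roughly 0.2 seconds of wall time rather than 0.4. The `sleepy` function is a hypothetical stand-in for `print_values`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleepy(label, seconds=0.2):
    time.sleep(seconds)  # stand-in for the sleeps inside print_values
    return label


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as executor:
    # Both submissions start right away; neither waits for the other.
    futures = [executor.submit(sleepy, "AAAA"), executor.submit(sleepy, "BBBB")]
    labels = [f.result() for f in futures]
elapsed = time.perf_counter() - start

# The two sleeps overlap, so elapsed is close to 0.2 s rather than 0.4 s.
print(labels, round(elapsed, 2))
```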

Sometimes you may want to intentionally run **tasks sequentially**. The built-in Prefect SequentialTaskRunner lets you do this.

When using a non-default task runner, you must import it into your flow script.

In [7]:
from prefect.task_runners import SequentialTaskRunner

@task(name="Sequential Task")
def print_values(values):
    for value in values:
        time.sleep(0.5)
        print(value, end="\r")

@flow(name="Sequential Flow",
      task_runner=SequentialTaskRunner())
def my_flow():
    print_values.submit(["AAAA"] * 15)
    print_values.submit(["BBBB"] * 10)

if __name__ == "__main__":
    my_flow()

AAAA

BBBB
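With the SequentialTaskRunner, the second submission only starts once the first has finished, so the output switches from AAAA to BBBB. As a stdlib analogy (not Prefect's runner), a thread pool limited to a single worker gives the same ordering guarantee. The `events` list below is a hypothetical log of start and finish events.

```python
import time
from concurrent.futures import ThreadPoolExecutor

events = []  # hypothetical start/finish log


def record(label):
    events.append(f"start {label}")
    time.sleep(0.05)
    events.append(f"end {label}")
    return label


# A single worker means each submission waits for the previous one to finish.
with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(record, "AAAA")
    second = executor.submit(record, "BBBB")

print(events)
# ['start AAAA', 'end AAAA', 'start BBBB', 'end BBBB']
```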

You can also run tasks using **parallel or distributed** execution by using the Dask or Ray task runners available through **Prefect Collections**.

For example, you can achieve parallel task execution, even in a local execution environment, by using the DaskTaskRunner.

1. Install the prefect-dask collection with pip install prefect-dask.
2. Switch your task runner to the DaskTaskRunner.
3. Call .submit() on the task instead of calling the task directly. This submits the task to the task runner rather than running the task in-process.


In [8]:
!pip install prefect-dask

Collecting prefect-dask
  Downloading prefect_dask-0.2.2-py3-none-any.whl (16 kB)
Collecting distributed>=2022.5.0
  Downloading distributed-2022.12.1-py3-none-any.whl (930 kB)
Collecting tblib>=1.6.0
  Downloading tblib-1.7.0-py2.py3-none-any.whl (12 kB)
Collecting toolz>=0.10.0
  Downloading toolz-0.12.0-py3-none-any.whl (55 kB)
Collecting locket>=1.0.0
  Downloading locket-1.0.0-py2.py3-none-any.whl (4.4 kB)
Collecting zict>=0.1.3
  Downloading zict-2.2.0-py2.py3-none-any.whl (23 kB)
Collecting sortedcontainers!=2.0.0,!=2.0.1
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl (29 kB)
Collecting msgpack>=0.6.0
  Using cached msgpack-1.

Collecting heapdict
  Downloading HeapDict-1.0.1-py3-none-any.whl (3.9 kB)
Installing collected packages: sortedcontainers, msgpack, heapdict, zict, toolz, tblib, locket, partd, dask, distributed, prefect-dask
Successfully installed dask-2022.12.1 distributed-2022.12.1 heapdict-1.0.1 locket-1.0.0 msgpack-1.0.4 partd-1.3.0 prefect-dask-0.2.2 sortedcontainers-2.4.0 tblib-1.7.0 toolz-0.12.0 zict-2.2.0


In [10]:
from prefect_dask.task_runners import DaskTaskRunner

@task(name="Dask Task")
def print_values(values):
    for value in values:
        time.sleep(0.5)
        print(value, end="\r")

@flow(name="Dask Flow",
      task_runner=DaskTaskRunner())
def my_flow():
    print_values.submit(["AAAA"] * 15)
    print_values.submit(["BBBB"] * 10)

if __name__ == "__main__":
    my_flow()




22:43:32.048 | INFO    | Task run 'Dask Task-0bb9a2c3-1' - Finished in state Completed()
22:43:34.646 | INFO    | Task run 'Dask Task-0bb9a2c3-0' - Finished in state Completed()
22:43:34.984 | INFO    | distributed.worker - Stopping worker at tcp://127.0.0.1:33187. Reason: nanny-close
22:43:34.986 | INFO    | distributed.core - Connection to tcp://127.0.0.1:35471 has been closed.

BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB

22:43:35.008 | INFO    | distributed.worker - Stopping worker at tcp://127.0.0.1:32999. Reason: nanny-close
22:43:35.012 | INFO    | distributed.core - Connection to tcp://127.0.0.1:35471 has been closed.

AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

Because the DaskTaskRunner uses multiprocessing, it must be protected by an if __name__ == "__main__": guard when used in a script.

When using the DaskTaskRunner, Prefect submits each task run to a Dask cluster object. The Dask scheduler then determines when and how each individual run is executed, with the constraint that the order matches the execution graph Prefect provided.

This means the only way to force Dask to walk the task graph in a particular order is to configure Prefect dependencies between your tasks.
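In other words, a scheduler may start independent task runs in any order, and only an upstream/downstream relationship pins one run after another. In Prefect, such a relationship arises when a task receives another task's future as an argument, or when it is submitted with `wait_for=[...]`. The sketch below illustrates the ordering rule with the standard library's `graphlib.TopologicalSorter` (Python 3.9+) rather than Dask; the task names in the graph are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency graph: each key depends on the tasks in its set.
graph = {
    "print_a": set(),
    "print_b": set(),
    "summarize": {"print_a", "print_b"},
}

sorter = TopologicalSorter(graph)
sorter.prepare()

batches = []
while sorter.is_active():
    # Everything in one batch has no unfinished upstream tasks, so a
    # scheduler is free to run these in any order, or all at once.
    ready = sorted(sorter.get_ready())
    batches.append(ready)
    sorter.done(*ready)  # pretend the whole batch has finished

print(batches)
# [['print_a', 'print_b'], ['summarize']]
```

Nothing fixes the relative order of `print_a` and `print_b`; only the declared dependency guarantees that `summarize` comes last.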