# Prefect

> [website](https://www.prefect.io/)

- skip_showdoc: true
- skip_exec: true

## Start prefect server

```sh
prefect server start
```

> http://127.0.0.1:4200



### Change Settings

```sh
prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"
```

### View settings

```sh
prefect config view --show-defaults
```

## Flows

Flows can be thought of as special types of functions. They can take inputs, perform work, and return an output. In fact, you can turn any function into a Prefect flow by adding the @flow decorator. When a function becomes a flow, its behavior changes, giving it the following advantages:

Task called within Flows

In [None]:
from prefect import flow, task

@task
def print_hello(name):
    print(f"Hello {name}!")

@flow(name="Hello Flow")
def hello_world(name="world"):
    print_hello(name)

In [None]:
import datetime
from prefect import flow

@flow(flow_run_name="{name}-on-{date:%A}")
def my_flow(name: str, date: datetime.datetime):
    pass

# creates a flow run called 'marvin-on-Thursday'
my_flow(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))

### Flow settings

Flows allow a great deal of configuration by passing arguments to the decorator. Flows accept the following optional settings.

| Argument                                           | Description                                                                                                                                                                                                          |
| -------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `description`                                      | An optional string description for the flow. If not provided, the description will be pulled from the docstring for the decorated function.                                                                          |
| `name`                                             | An optional name for the flow. If not provided, the name will be inferred from the function.                                                                                                                         |
| `retries`                                          | An optional number of times to retry on flow run failure.                                                                                                                                                            |
| <span class="no-wrap">`retry_delay_seconds`</span> | An optional number of seconds to wait before retrying the flow after failure. This is only applicable if `retries` is nonzero.                                                                                       |
| `flow_run_name`                                    | An optional name to distinguish runs of this flow; this name can be provided as a string template with the flow's parameters as variables; this name can also be provided as a function that returns a string.       |
| `task_runner`                                      | An optional [task runner](/concepts/task-runners/) to use for task execution within the flow when you `.submit()` tasks. If not provided and you `.submit()` tasks, the `ConcurrentTaskRunner` will be used.         |
| `timeout_seconds`                                  | An optional number of seconds indicating a maximum runtime for the flow. If the flow exceeds this runtime, it will be marked as failed. Flow execution may continue until the next task is called.                   |
| `validate_parameters`                              | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is `True`.                                                                                                                  |
| `version`                                          | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. |


### Flow Example

In [None]:
from prefect import flow, task
import datetime
from prefect.runtime import flow_run
from prefect.task_runners import SequentialTaskRunner

def generate_flow_run_name():
    flow_name = flow_run.flow_name
    parameters = flow_run.parameters
    name = parameters["name"]

    
    date = datetime.datetime.now(datetime.timezone.utc)

    return f"flow run name test: {flow_name}-with-{name}: {date:%A}"

@task(name="task test")
def print_hello(name:str) -> str:
    msg = f"Hello {name}!"
    print(msg)
    return msg

@flow(name="flow test",
      description="flow test description",
      task_runner=SequentialTaskRunner(),
      flow_run_name=generate_flow_run_name
      )
def hello_world(name:str ="world") -> None:
    message = print_hello(name)

hello_world("Ben")


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


Hello Ben!


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`'))]

In [None]:
import graphviz

In [None]:
from prefect import flow, task

@task(name="Print Hello")
def print_hello(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg

@task(name="Print Hello Again")
def print_hello_again(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg

@flow(name="Hello Flow")
def hello_world(name="world"):
    message = print_hello(name)
    message2 = print_hello_again(message)

hello_world.visualize()

<coroutine object Flow.visualize>

### Subflows

A subflow run is created when a flow function is called inside the execution of another flow. The primary flow is the "parent" flow. The flow created within the parent is the "child" flow or "subflow."

Subflow runs behave like normal flow runs. There is a full representation of the flow run in the backend as if it had been called separately. When a subflow starts, it will create a new task runner for tasks within the subflow. When the subflow completes, the task runner is shut down.

In [None]:
from prefect import flow, task

@task(name="Print Hello")
def print_hello(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg

@flow(name="Subflow")
def my_subflow(msg):
    print(f"Subflow says: {msg}")

@flow(name="Hello Flow")
def hello_world(name="world"):
    for _ in range(3):
        message = print_hello(name)
        my_subflow(message)

hello_world("Marvin")


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


Hello Marvin!



 `@task(name='my_unique_name', ...)`


Subflow says: Hello Marvin!


Hello Marvin!



 `@task(name='my_unique_name', ...)`


Subflow says: Hello Marvin!


Hello Marvin!



 `@task(name='my_unique_name', ...)`


Subflow says: Hello Marvin!


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted r

!!! tip "Subflows or tasks?"
    In Prefect you can call tasks _or_ subflows to do work within your workflow, including passing results from other tasks to your subflow. 
    So a common question is:

    "When should I use a subflow instead of a task?"

    We recommend writing tasks that do a discrete, specific piece of work in your workflow: calling an API, performing a database operation, analyzing or transforming a data point. 
    Prefect tasks are well suited to parallel or distributed execution using distributed computation frameworks such as Dask or Ray. 
    For troubleshooting, the more granular you create your tasks, the easier it is to find and fix issues should a task fail.

    Subflows enable you to group related tasks within your workflow. 
    Here are some scenarios where you might choose to use a subflow rather than calling tasks individually:

    - Observability: Subflows, like any other flow run, have first-class observability within the Prefect UI and Prefect Cloud. You'll see subflow status in the **Flow Runs** dashboard rather than having to dig down into the tasks within a specific flow run. See [Final state determination](#final-state-determination) for some examples of leveraging task state within flows.
    - Conditional flows: If you have a group of tasks that run only under certain conditions, you can group them within a subflow and conditionally run the subflow rather than each task individually.
    - Parameters: Flows have first-class support for parameterization, making it easy to run the same group of tasks in different use cases by simply passing different parameters to the subflow in which they run.
    - Task runners: Subflows enable you to specify the task runner used for tasks within the flow. For example, if you want to optimize parallel execution of certain tasks with Dask, you can group them in a subflow that uses the Dask task runner. You can use a different task runner for each subflow.


In [None]:
from prefect import flow
from datetime import datetime

@flow
def what_day_is_it(date: datetime = None):
    if date is None:
        date = datetime.now(timezone.utc)
    print(f"It was {date.strftime('%A')} on {date.isoformat()}")

what_day_is_it("2021-01-01T02:00:19.180906")
# It was Friday on 2021-01-01T02:00:19.180906

It was Friday on 2021-01-01T02:00:19.180906


## Tasks

## Task arguments

Tasks allow for customization through optional arguments:

| Argument              | Description                                                                                                                                                                                                             |
| --------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `name`                | An optional name for the task. If not provided, the name will be inferred from the function name.                                                                                                                       |
| `description`         | An optional string description for the task. If not provided, the description will be pulled from the docstring for the decorated function.                                                                             |
| `tags`                | An optional set of tags to be associated with runs of this task. These tags are combined with any tags defined by a `prefect.tags` context at task runtime.                                                             |
| `cache_key_fn`        | An optional callable that, given the task run context and call parameters, generates a string key. If the key matches a previous completed state, that state result will be restored instead of running the task again. |
| `cache_expiration`    | An optional amount of time indicating how long cached states for this task should be restorable; if not provided, cached states will never expire.                                                                      |
| `retries`             | An optional number of times to retry on task run failure.                                                                                                                                                               |
| `retry_delay_seconds` | An optional number of seconds to wait before retrying the task after failure. This is only applicable if `retries` is nonzero.                                                                                          |
| `log_prints`|An optional boolean indicating whether to log print statements. |
| `persist_result` | An optional boolean indicating whether to persist the result of the task run to storage. |

In [None]:
from prefect import flow, task

@task
def my_first_task(msg):
    print(f"Hello, {msg}")

@task
def my_second_task(msg):
    my_first_task.fn(msg)
    return msg

@flow(flow_run_name = "task test1")
def my_flow(msg: str = "Trillian"):
    my_second_task(msg)
    return msg

my_flow()


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


Hello, Trillian


'Trillian'

In [None]:
from prefect import flow
from prefect.runtime import flow_run, task_run

def generate_task_name():
    flow_name = flow_run.flow_name
    task_name = task_run.task_name

    parameters = task_run.parameters
    name = parameters["name"]
    limit = parameters["limit"]

    return f"{flow_name}-{task_name}-with-{name}-and-{limit}"

@task(name="my-example-task",
      description="An example task for a tutorial.",
      task_run_name=generate_task_name)
def my_task(name: str, limit: int = 100):
    pass

@flow
def my_flow(name: str):
    # creates a run with a name like "my-flow-my-example-task-with-marvin-and-100"
    my_task(name="marvin")


 `@flow(name='my_unique_name', ...)`


### Tags

In [None]:
@task(name="hello-task", tags=["test"])
def my_task():
    print("Hello, I'm a task")

### Retries

In [None]:
import httpx

from prefect import flow, task


@task(retries=2, retry_delay_seconds=5)
def get_data_task(
    url: str = "https://api.brittle-service.com/endpoint"
) -> dict:
    response = httpx.get(url)

    # If the response status code is anything but a 2xx, httpx will raise
    # an exception. This task doesn't handle the exception, so Prefect will
    # catch the exception and will consider the task run failed.
    response.raise_for_status()

    return response.json()


@flow
def get_data_flow():
    get_data_task()

get_data_flow()


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


ConnectError: [Errno -2] Name or service not known

In [None]:
import httpx
from prefect import flow, task

def retry_handler(task, task_run, state) -> bool:
    """This is a custom retry handler to handle when we want to retry a task"""
    try:
        # Attempt to get the result of the task
        state.result()
    except httpx.HTTPStatusError as exc:
        # Retry on any HTTP status code that is not 401 or 404
        do_not_retry_on_these_codes = [401, 404]
        return exc.response.status_code not in do_not_retry_on_these_codes
    except httpx.ConnectError:
        # Do not retry
        return False
    except:
        # For any other exception, retry
        return True

@task(retries=1, retry_condition_fn=retry_handler)
def my_api_call_task(url):
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

@flow
def get_data_flow(url):
    my_api_call_task(url=url)

if __name__ == "__main__":
    get_data_flow(url="https://httpbin.org/status/503")


 `@flow(name='my_unique_name', ...)`


HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503

## Timeouts

In [None]:
from prefect import task, get_run_logger
import time

@task(timeout_seconds=1)
def show_timeouts():
    logger = get_run_logger()
    logger.info("I will execute")
    time.sleep(5)
    logger.info("I will not execute")

## Wait for

In [None]:
@task
def task_1():
    pass

@task
def task_2():
    pass

@flow
def my_flow():
    x = task_1()

    # task 2 will wait for task_1 to complete
    y = task_2(wait_for=[x])


 `@flow(name='my_unique_name', ...)`


## Maps

In [None]:
from prefect import flow, task

@task
def print_nums(nums):
    for n in nums:
        print(n)

@task
def square_num(num):
    return num**2

@flow
def map_flow(nums):
    print_nums(nums)
    squared_nums = square_num.map(nums) 
    print_nums(squared_nums)

map_flow([1,2,3,5,8,13])

1
2
3
5
8
13


1
4
9
25
64
169


[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result

## Async

In [None]:
import asyncio

from prefect import task, flow

@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1) # yield
        print(value, end=" ")

@flow
async def async_flow():
    await print_values([1, 2])  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]

    # asynchronously gather the tasks
    await asyncio.gather(*coros)

```python
asyncio.run(async_flow())
```

In [None]:
from prefect import get_client

async with get_client() as client:
    # set a concurrency limit of 10 on the 'small_instance' tag
    limit_id = await client.create_concurrency_limit(
        tag="small_instance", 
        concurrency_limit=10
        )

In [None]:
!prefect concurrency-limit inspect small_instance

╭──────────────────────────────────────────────────────────────────────────╮
│ [3m       Concurrency Limit ID: [0m[3;31m879f2e40-8387-47c5-af34-0b164f7ea8bc[0m[3m       [0m │
│ ┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓ │
│ ┃[1m [0m[1mTag           [0m[1m [0m┃[1m [0m[1mConcurrency Limit[0m[1m [0m┃[1m [0m[1mCreated       [0m[1m [0m┃[1m [0m[1mUpdated       [0m[1m [0m┃ │
│ ┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩ │
│ │[32m [0m[32msmall_instance[0m[32m [0m│[34m [0m[34m10               [0m[34m [0m│[35m [0m[32m'1 minute ago'[0m[35m [0m│[35m [0m[32m'1 minute ago'[0m[35m [0m│ │
│ └────────────────┴───────────────────┴────────────────┴────────────────┘ │
│ ┏━━━━━━━━━━━━━━━━━━━━━┓                                                  │
│ ┃[1m [0m[1mActive Task Run IDs[0m[1m [0m┃                                                  │
│ ┡━━━━━━━━━━━━━━━━━━━━━┩                           

## Deployments

name: prefect.yaml


```yaml
# Welcome to your prefect.yaml file! You can use this file for storing and managing
# configuration for deploying your flows. We recommend committing this file to source
# control along with your flow code.

# Generic metadata about this project
name: nbs
prefect-version: 2.16.8

# build section allows you to manage and build docker images
build:

# push section allows you to manage if and how this project is uploaded to remote locations
push:

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.git_clone:
    repository: git@github.com:bthek1/MLtools.git
    branch: main

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: slow_flow
  version:
  tags: []
  description: Sleepy flow - sleeps the provided amount of time (in seconds).
  entrypoint: nbs/prefect_deployment_serve.py:slow_flow
  parameters: {}
  work_pool:
    name: test-pool
    work_queue_name:
    job_variables: {}
  schedules:
  - interval: 30.0
    anchor_date: '2024-04-03T13:38:23.549390+00:00'
    timezone: UTC
    active: true
- name: fast_flow
  version:
  tags: []
  description: Fastest flow this side of the Mississippi.
  entrypoint: nbs/prefect_deployment_serve.py:fast_flow
  parameters: {}
  work_pool:
    name: test-pool
    work_queue_name:
    job_variables: {}
  schedules:
  - interval: 60.0
    anchor_date: '2024-04-03T14:13:30.384393+00:00'
    timezone: UTC
    active: true
  - interval: 150.0
    anchor_date: '2024-04-03T14:13:45.806620+00:00'
    timezone: UTC
    active: false


````


## Work Pools : To do

Work pool overview¶

Work pools organize work for execution. Work pools have types corresponding to the infrastructure that will execute the flow code, as well as the delivery method of work to that environment. Pull work pools require workers (or less ideally, agents) to poll the work pool for flow runs to execute. Push work pools can submit runs directly to your serverless infrastructure providers such as Google Cloud Run, Azure Container Instances, and AWS ECS without the need for an agent or worker. 

```sh
prefect work-pool create test-pool
```

In [None]:
!prefect work-pool ls

[3m                                   Work Pools                                   [0m
┏━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┓
┃[1m [0m[1mName     [0m[1m [0m┃[1m [0m[1mType  [0m[1m [0m┃[1m [0m[1m                                  ID[0m[1m [0m┃[1m [0m[1mConcurrency Lim…[0m[1m [0m┃
┡━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━┩
│[32m [0m[32mtest-pool[0m[32m [0m│[35m [0m[35mproce…[0m[35m [0m│[36m [0m[36m5d41c025-ec45-4e64-9ef6-3ca91b9684e0[0m[36m [0m│[34m [0m[34mNone[0m[34m            [0m[34m [0m│
└───────────┴────────┴──────────────────────────────────────┴──────────────────┘
[31m                           (**) denotes a paused pool                           [0m


In [None]:
!prefect work-pool inspect 'test-pool'

[1;35mWorkPool[0m[1m([0m
    [33mid[0m=[32m'5d41c025-ec45-4e64-9ef6-3ca91b9684e0'[0m,
    [33mcreated[0m=[1;35mDateTime[0m[1m([0m[1;36m2024[0m, [1;36m4[0m, [1;36m3[0m, [1;36m10[0m, [1;36m12[0m, [1;36m27[0m, [1;36m298785[0m, [33mtzinfo[0m=[1;35mTimezone[0m[1m([0m[32m'+00:00'[0m[1m)[0m[1m)[0m,
    [33mupdated[0m=[1;35mDateTime[0m[1m([0m[1;36m2024[0m, [1;36m4[0m, [1;36m3[0m, [1;36m10[0m, [1;36m12[0m, [1;36m27[0m, [1;36m306000[0m, [33mtzinfo[0m=[1;35mTimezone[0m[1m([0m[32m'+00:00'[0m[1m)[0m[1m)[0m,
    [33mname[0m=[32m'test-pool'[0m,
    [33mtype[0m=[32m'process'[0m,
    [33mbase_job_template[0m=[1m{[0m
        [32m'job_configuration'[0m: [1m{[0m
            [32m'command'[0m: [32m'[0m[32m{[0m[32m{[0m[32m command [0m[32m}[0m[32m}[0m[32m'[0m,
            [32m'env'[0m: [32m'[0m[32m{[0m[32m{[0m[32m env [0m[32m}[0m[32m}[0m[32m'[0m,
            [32m'labels'[0m: [32m'[0m[32

## Schedules : To do

Prefect supports several types of schedules that cover a wide range of use cases and offer a large degree of customization:

- [`Cron`](#cron) is most appropriate for users who are already familiar with `cron` from previous use.
- [`Interval`](#interval) is best suited for deployments that need to run at some consistent cadence that isn't related to absolute time.
- [`RRule`](#rrule) is best suited for deployments that rely on calendar logic for simple recurring schedules, irregular intervals, exclusions, or day-of-month adjustments.

!!! tip "Schedules can be inactive"
    When you create or edit a schedule, you can set the `active` property to `False` in Python (or `false` in a YAML file) to deactivate the schedule.
    This is useful if you want to keep the schedule configuration but temporarily stop the schedule from creating new flow runs.


## Results

In [None]:
from prefect import flow, task

@task
def my_task():
    return 1

@flow
def my_flow():
    future = my_task.submit()
    return future.result() + 1

result = my_flow()
assert result == 2


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


### Error handling

In [None]:
from prefect import flow, task

@task
def my_task():
    raise ValueError()

@flow
def my_flow():
    state = my_task(return_state=True)

    if state.is_failed():
        print("Oh no! The task failed. Falling back to '1'.")
        result = 1
    else:
        result = state.result()

    return result + 1

result = my_flow()
assert result == 2


 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


Oh no! The task failed. Falling back to '1'.


## Artifacts

In [None]:
from prefect import flow, task
from prefect.artifacts import create_link_artifact

@task
def my_first_task():
    create_link_artifact(
        key="create-link-artifact",
        link="my_first_task",
        description="## my_first_task",
    )

@task
def my_second_task():
    create_link_artifact(
        key="create-link-artifact",
        link="my_second_task",
        description="## my_second_task",
    )

@flow
def my_flow():
    create_link_artifact(
        key="create-link-artifact",
        link="my_flow",
        description="## my_flow",
)
    my_first_task()
    my_second_task()

if __name__ == "__main__":
    my_flow()


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


In [None]:
from prefect import flow
from prefect.artifacts import create_link_artifact

@flow
def my_flow():
    create_link_artifact(
        key="my-important-link",
        link="https://www.prefect.io/",
        link_text="Prefect",
    )

if __name__ == "__main__":
    my_flow()


 `@flow(name='my_unique_name', ...)`


In [None]:
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact

@task
def markdown_task():
    na_revenue = 500000
    markdown_report = f"""# Sales Report

## Summary

In the past quarter, our company saw a significant increase in sales, with a total revenue of $1,000,000. 
This represents a 20% increase over the same period last year.

## Sales by Region

| Region        | Revenue |
|:--------------|-------:|
| North America | ${na_revenue:,} |
| Europe        | $250,000 |
| Asia          | $150,000 |
| South America | $75,000 |
| Africa        | $25,000 |

## Top Products

1. Product A - $300,000 in revenue
2. Product B - $200,000 in revenue
3. Product C - $150,000 in revenue

## Conclusion

Overall, these results are very encouraging and demonstrate the success of our sales team in increasing revenue 
across all regions. However, we still have room for improvement and should focus on further increasing sales in 
the coming quarter.
"""
    create_markdown_artifact(
        key="gtm-report",
        markdown=markdown_report,
        description="Quarterly Sales Report",
    )

@flow()
def my_flow():
    markdown_task()


if __name__ == "__main__":
    my_flow()


 `@flow(name='my_unique_name', ...)`


## States

When calling a task or a flow, there are three types of returned values:

- Data: A Python object (such as `int`, `str`, `dict`, `list`, and so on).
- `State`: A Prefect object indicating the state of a flow or task run.
- [`PrefectFuture`](/api-ref/prefect/futures/#prefect.futures.PrefectFuture): A Prefect object that contains both _data_ and _State_.

Returning data  is the default behavior any time you call `your_task()`.

Returning Prefect [`State`](/api-ref/server/schemas/states/) occurs anytime you call your task or flow with the argument `return_state=True`.

Returning [`PrefectFuture`](/api-ref/prefect/futures/#prefect.futures.PrefectFuture) is achieved by calling `your_task.submit()`.

In [None]:
from prefect import flow

def my_success_hook(flow, flow_run, state):
    print("Flow run succeeded!")

@flow(on_completion=[my_success_hook])
def my_flow():
    return 42

my_flow()


 `@flow(name='my_unique_name', ...)`


Flow run succeeded!


42

Create flow run state change hooks¶
```python
def my_flow_hook(flow: Flow, flow_run: FlowRun, state: State):
    """This is the required signature for a flow run state
    change hook. This hook can only be passed into flows.
    """

# pass hook as a list of callables
@flow(on_completion=[my_flow_hook])
```


Create task run state change hooks¶
```python
def my_task_hook(task: Task, task_run: TaskRun, state: State):
    """This is the required signature for a task run state change
    hook. This hook can only be passed into tasks.
    """

# pass hook as a list of callables
@task(on_failure=[my_task_hook])
```

## Serve

```python
from prefect import flow


@flow(log_prints=True)
def hello_world(name: str = "world", goodbye: bool = False):
    print(f"Hello {name} from Prefect! 🤗")

    if goodbye:
        print(f"Goodbye {name}!")


if __name__ == "__main__":
    # creates a deployment and stays running to monitor for work instructions generated on the server

    hello_world.serve(name="my-first-deployment",
                      tags=["onboarding"],
                      parameters={"goodbye": True},
                      interval=60)


```

```python
import time
from prefect import flow, serve


@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)


@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return


if __name__ == "__main__":
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")
    serve(slow_deploy, fast_deploy)

```