# Workflow management

As explained earlier, the concepts of `task` and `workflow` are essential for workflow automation.

## 1. Task in Prefect

A `task` is an atomic unit of computation (e.g., a function, script, or command). In `Prefect`, we use the `@task` decorator to define a task. The `@task` decorator accepts a list of parameters that let us configure the task.

The code below shows the general form of a task definition in Prefect:

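```python
from prefect import task


def notify_failure(task_obj, task_run, state):
    # on_failure hooks are called with the task, the task run, and the final state
    print(f"Failed: {task_run.name} -> {state.message}")


@task(
    name="task1",
    description="task 1 of my workflow",
    tags=["test", "task"],
    retries=3,
    retry_delay_seconds=30,
    timeout_seconds=120,
    persist_result=False,
    cache_key_fn=None,
    cache_expiration=None,
    log_prints=True,
    on_failure=[notify_failure],  # a list of hook functions, not a single lambda
)
def task1(input_str: str) -> str:
    ...
```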

| Parameter | Type | Description | Best practice |
|:---:|:---:|:---:|:---:|
| name | str | Name shown in the UI and logs. | Match the task's purpose (e.g., `transform_sales`). |
| description | str | Human-readable summary. | Include information about inputs and outputs. |
| tags | list[str] | Arbitrary labels for grouping and monitoring. | Use them for filtering or scheduling. |
| retries | int | Number of retry attempts. | Retry transient I/O tasks. |
| retry_delay_seconds | int or list[int] | Wait time between retries. | Combine with `retries` above. |
| timeout_seconds | int | Hard timeout for task execution. | Set it based on task complexity. |
| persist_result | bool | Store the task output persistently. | `True` for cached or reused results. |
| cache_key_fn | Callable | Function that computes a custom cache key from the task inputs. | Use it to reuse deterministic results. |
| cache_expiration | datetime.timedelta | Expiry time of a cached result. | e.g., refresh every 24 h for ETL jobs. |
| log_prints | bool | Capture `print()` output in Prefect logs. | Always `True` in development. |
| on_failure | list[Callable] | Hooks called with `(task, task_run, state)` when the task fails. | Send alerts or run rollback logic. |
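
A custom `cache_key_fn` receives the task run context and a dict of the task's parameters, and returns a string key; runs that produce the same key can reuse a cached result. The sketch below assumes that `(context, parameters)` signature; `hash_parameters` and `ONE_DAY` are hypothetical names. For the common case, Prefect also ships a built-in `task_input_hash` in `prefect.tasks`.

```python
import hashlib
import json
from datetime import timedelta
from typing import Any, Optional


def hash_parameters(context: Any, parameters: dict[str, Any]) -> Optional[str]:
    """Hypothetical cache key function: the same inputs give the same key.

    `context` is the task run context passed in by Prefect; this sketch
    ignores it and derives the key from the task parameters only.
    """
    try:
        # sort_keys makes the key independent of keyword-argument order
        payload = json.dumps(parameters, sort_keys=True, default=str)
    except (TypeError, ValueError):
        # No usable key for inputs that cannot be serialized
        return None
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Value intended for `cache_expiration`: refresh cached ETL results once a day
ONE_DAY = timedelta(hours=24)

# Assumed wiring (not executed here):
# @task(cache_key_fn=hash_parameters, cache_expiration=ONE_DAY, persist_result=True)
```

Hashing a canonical JSON dump keeps the key stable across runs, whereas Python's built-in `hash()` of strings changes between interpreter sessions.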

### 1.1 Parallel (Asynchronous) vs Sequential (Synchronous)
Prefect allows tasks to run in two modes:
- Sequential (synchronous)
- Parallel (asynchronous)

```python
from prefect import task, flow
import time

@task
def slow_task(x):
    time.sleep(2)
    print(f"Task {x} done")
    return x

@flow
def compare():
    # Sequential execution
    for i in range(3):
        # runs task one by one (≈ 6 s total)
        slow_task(i)

    # Parallel execution
    # submit() hands the task to the task runner and immediately returns a `PrefectFuture`;
    # it does not wait for one task to finish before submitting the next
    futures = [slow_task.submit(i) for i in range(3)]
    results = [f.result() for f in futures]  # runs concurrently (≈ 2 s total)
    print("Results:", results)

if __name__ == "__main__":
    compare()

```

> In our project, parallelism is handled by `Spark` jobs, so there is no need to use Prefect's task parallelism; in our experience it also makes it harder to retrieve the actual results from the Spark cluster.
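
To see why the `submit()` loop finishes in roughly the time of a single task, here is a plain-Python analogue that swaps Prefect for the standard library's `concurrent.futures.ThreadPoolExecutor`. It is only an illustration: Prefect 3's default task runner is also thread-based, but this is not Prefect code, and the 0.2 s sleep is an arbitrary stand-in for the 2 s task.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(x: int, delay: float = 0.2) -> int:
    # Stand-in for the Prefect task above, with a shorter sleep
    time.sleep(delay)
    return x


def run_sequential(n: int) -> tuple[list[int], float]:
    start = time.perf_counter()
    results = [slow_task(i) for i in range(n)]  # one call after another
    return results, time.perf_counter() - start


def run_concurrent(n: int) -> tuple[list[int], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(slow_task, i) for i in range(n)]  # start all calls
        results = [f.result() for f in futures]  # wait for each, keeping order
    return results, time.perf_counter() - start


if __name__ == "__main__":
    seq, seq_time = run_sequential(3)
    con, con_time = run_concurrent(3)
    print(f"sequential {seq} in {seq_time:.2f}s")  # roughly 3 x 0.2 s
    print(f"concurrent {con} in {con_time:.2f}s")  # roughly 0.2 s
```

As with Prefect futures, calling `result()` blocks until that particular call has finished, so collecting results in a list keeps the original submission order.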

## 2. Flow in Prefect

In Prefect, a workflow is defined with the `@flow` decorator. A flow coordinates task dependencies, manages retries, and records run metadata.

The code below shows the general form of a flow definition:

```python
from prefect import flow

@flow(
    name="name_of_workflow",
    description="description_of_workflow",
    version="1.0.0",
    retries=2,
    retry_delay_seconds=60,
    persist_result=True,
    log_prints=True,
    validate_parameters=True,
    timeout_seconds=600,
    task_runner=None,  # e.g., ConcurrentTaskRunner
)
def my_flow(param1: str, param2: int = 0):
    ...


```

| Parameter | Type | Description | Best practice |
|---------------------|------------|----------------------------------------------------------------------------------|---------------------------------------------------------------------------------|
| name | str | Human-readable name of the flow. | Use short, semantic identifiers. |
| description | str | Explains the flow's purpose; appears in the UI and logs. | Include system context or dependencies. |
| version | str | Optional version tag for reproducibility. | Update it manually with each major change. |
| retries | int | Number of times to retry the entire flow if it fails. | For critical workflows, set ≥ 1. |
| retry_delay_seconds | int | Seconds to wait between retries. | Use a longer delay for heavy jobs. |
| persist_result | bool | If `True`, stores the flow's return value in the Prefect result store (e.g., local disk or S3). | Enable when downstream consumers need historical data. |
| log_prints | bool | Captures `print()` output as Prefect logs. | Always `True` in debugging or dev mode. |
| timeout_seconds | int | Maximum allowed execution time before the run times out. | Use in production to avoid locking resources. |
| validate_parameters | bool | Validates inputs against the type hints in the function signature. | Useful when the flow is triggered from the UI or API. |
| task_runner | TaskRunner | Controls how submitted tasks inside the flow execute (e.g., concurrently). | Prefect 3 defaults to `ThreadPoolTaskRunner` (Prefect 2: `ConcurrentTaskRunner`); concurrency only applies to tasks called with `.submit()` or `.map()`. |
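
With `validate_parameters=True`, Prefect checks flow arguments against the type hints of the function (it relies on pydantic and may coerce compatible values, such as the string `"5"` to an int). The stdlib sketch below only illustrates the idea of checking arguments against annotations; `check_parameters` is a hypothetical helper, it is stricter than Prefect (no coercion), and it is not Prefect's implementation.

```python
import inspect
from typing import Any, Callable, get_origin, get_type_hints


def check_parameters(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> dict[str, Any]:
    """Bind arguments to `fn` and reject values that do not match plain-class hints."""
    # bind() raises TypeError for missing or unexpected arguments
    bound = inspect.signature(fn).bind(*args, **kwargs)
    bound.apply_defaults()
    hints = get_type_hints(fn)
    for name, value in bound.arguments.items():
        expected = hints.get(name)
        # Only plain classes (str, int, ...) are checked; generics like list[str] are skipped
        if expected is not None and get_origin(expected) is None and isinstance(expected, type):
            if not isinstance(value, expected):
                raise TypeError(
                    f"{name}={value!r} is {type(value).__name__}, expected {expected.__name__}"
                )
    return dict(bound.arguments)


def my_flow(param1: str, param2: int = 0) -> None:
    """Same signature as the flow above, without the @flow decorator."""


print(check_parameters(my_flow, "books", param2=3))  # {'param1': 'books', 'param2': 3}
```

Calling `check_parameters(my_flow, "books", param2="3")` raises a `TypeError` here, whereas Prefect would likely coerce `"3"` to `3`.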


## 3. Task and flow interaction

In `Prefect`, a flow can contain one or more tasks, and a step can be broken down further into subtasks. The figure below shows the architecture of a simple flow:

```text
Flow
 ├── Task 1
 ├── Task 2
 │    ├── Subtask 2a
 │    └── Subtask 2b
 └── Task 3 (depends on 1, 2)
```
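
The dependencies in the tree can be written down as a graph. The sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to compute one valid execution order. It only illustrates the ordering constraint and is not how Prefect schedules work: in Prefect the order comes from the Python code itself, and passing results between tasks (or using `wait_for`) records the dependency.

```python
from graphlib import TopologicalSorter

# Each key depends on the nodes in its set (mirrors the tree above)
dependencies = {
    "Task 1": set(),
    "Subtask 2a": set(),
    "Subtask 2b": set(),
    "Task 2": {"Subtask 2a", "Subtask 2b"},
    "Task 3": {"Task 1", "Task 2"},
}

# static_order() yields every node after all of its dependencies
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Several orders are valid (for example, `Task 1` may run before or after the subtasks); the only guarantee is that `Task 3` comes last.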

Let's write a simple flow that implements this architecture:

```python
from prefect import flow, task

# ----- LEVEL 1: Base Tasks -----
@task(name="Task_1", log_prints=True)
def task1():
    print("Running Task 1")
    return "output_from_task1"


@task(name="Subtask_2a", log_prints=True)
def subtask2a():
    print("Running Subtask 2a")
    return "result_2a"


@task(name="Subtask_2b", log_prints=True)
def subtask2b():
    print("Running Subtask 2b")
    return "result_2b"


@task(name="Task_3", log_prints=True)
def task3(result1, result2):
    print(f"Running Task 3 after Task 1 and Task 2")
    print(f" → Got from Task 1: {result1}")
    print(f" → Got from Task 2: {result2}")
    return "final_result"


# ----- LEVEL 2: Task 2 as a Subflow -----
@flow(name="subflow_for_task_2", log_prints=True)
def task2():
    print("Starting Task 2 Flow")
    res2a = subtask2a()
    res2b = subtask2b()
    print("Completed Task 2 Flow")
    return f"combined({res2a}, {res2b})"


# ----- LEVEL 3: Main Orchestration Flow -----
@flow(name="Main_Flow", log_prints=True)
def main_flow():
    print("=== Main Flow Started ===")

    # Run the first task
    result1 = task1()
    # Run the second step
    # Note that task2 is actually a flow:
    # calling a flow inside another flow makes it a subflow
    result2 = task2()

    # Task 3 depends on both Task 1 and Task 2
    task3(wait_for=[result1, result2], result1=result1, result2=result2)

    print("=== Main Flow Completed ===")


if __name__ == "__main__":
    main_flow()
```

After running the flow, you should see output similar to the figure below.
![subflow_run.png](../assets/subflow_run.png)

> Note that there is no dedicated `subtask` or `subflow` decorator. We define `task2` as a flow that contains two tasks, then call it from the main flow, which makes it a subflow.

> Also note the `log_prints=True` parameter. Its default value is `False`, which means `print()` output is not captured in the task logs. Setting it to `True` makes the printed messages appear in the Prefect logs.

## 4. Passing parameters to a flow

What if your workflow needs user input to start? Imagine we want to build an NLP model: the first step is to build a tokenized dictionary, so we first need to convert a book into a word dictionary. We will implement this with the flow architecture above.

The code below shows an example implementation:

```python
from prefect import flow, task


# common function
def get_words(input_str: str) -> list[str]:
    # Strip leading/trailing spaces, then split by space
    words = input_str.strip().split(" ")
    # Filter out empty entries (for consecutive spaces)
    return [w for w in words if w]


# ----- LEVEL 1: Base Tasks -----
@task(name="Task_1", description="task 1 will count total words", log_prints=True)
def task1(input_str: str) -> int:
    print("Running Task 1")
    words = get_words(input_str)
    print("Completed Task 1")
    return len(words)


@task(name="Subtask_2a", description="task 2a will convert string to a list of words", log_prints=True)
def subtask2a(input_str: str) -> list[str]:
    print("Running Subtask 2a")
    return get_words(input_str)


@task(name="Subtask_2b", description="task 2b will remove duplicates in the list", log_prints=True)
def subtask2b(words: list[str]) -> list[str]:
    print("Running Subtask 2b")
    seen = set()
    unique_words = []
    for word in words:
        if word not in seen:
            unique_words.append(word)
            seen.add(word)
    return unique_words


@task(name="Task_3", description="show a report with total words and words list", log_prints=True)
def task3(result1, result2):
    print(f"Running Task 3 after Task 1 and Task 2")
    print(f"Got from Task 1: Total words count is {result1}")
    print(f"Got from Task 2: unique words list is {result2}")
    print("Completed Task 3")


# ----- LEVEL 2: Task 2 as a Subflow -----
@flow(name="subflow_for_task_2", log_prints=True)
def task2(input_str: str) -> list[str]:
    print("Starting Task 2 Flow")
    res2a = subtask2a(input_str)
    res2b = subtask2b(res2a)
    print("Completed Task 2 Flow")
    return res2b


# ----- LEVEL 3: Main Orchestration Flow -----
@flow(name="Main_Flow", description="This workflow read a book, then show total words count and unique words",
      version="1.0.0", log_prints=True)
def main_flow(book_str: str) -> None:
    print("=== Main Flow Started ===")

    # Run the first task
    result1 = task1(book_str)
    # Run the second step
    # Note that task2 is actually a flow:
    # calling a flow inside another flow makes it a subflow
    result2 = task2(book_str)

    # Task 3 depends on both Task 1 and Task 2
    task3(wait_for=[result1, result2], result1=result1, result2=result2)

    print("=== Main Flow Completed ===")


if __name__ == "__main__":
    test_str = "people needs to eat more fruits. people needs to do more sports"
    main_flow(test_str)


```

> Note that each task and flow now takes parameters, and the tasks that produce data return a value. We also added custom metadata to each task and flow to make the workflow easier to maintain.
> Check the `description` parameter in `@task` and `@flow`, and the `version` parameter in `@flow`.
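
The `get_words` helper above splits on spaces only, so `fruits.` and `fruits` count as different words, as do `People` and `people`. If the dictionary needs normalized tokens, a variant like the hypothetical `normalize_words` below could replace it in `task1` and `subtask2a`. What counts as a "word" (letters, digits, and apostrophes) is an assumption. `unique_in_order` shows an order-preserving deduplication equivalent to the loop in `subtask2b`.

```python
import re
from collections import Counter

# Assumption: a "word" is a run of lowercase letters, digits, or apostrophes
WORD_RE = re.compile(r"[a-z0-9']+")


def normalize_words(input_str: str) -> list[str]:
    # Lowercase first so "People" and "people" become the same token;
    # punctuation such as "." or "," is dropped by the regular expression
    return WORD_RE.findall(input_str.lower())


def unique_in_order(words: list[str]) -> list[str]:
    # dict keys keep insertion order (Python 3.7+), like the loop in subtask2b
    return list(dict.fromkeys(words))


if __name__ == "__main__":
    text = "people needs to eat more fruits. people needs to do more sports"
    words = normalize_words(text)
    print(len(words))  # 12
    print(unique_in_order(words))
    print(Counter(words).most_common(3))  # word -> frequency pairs
```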

## 5. Persist workflow outputs

Prefect provides a concept called `Artifacts`: persisted workflow outputs designed for human consumption.

There are five artifact types:
- links
- Markdown
- progress
- images
- tables

You can find more details on this [page](https://docs.prefect.io/v3/concepts/artifacts).

Suppose we want to persist the result of `task3`. Let's revisit the workflow above and improve it with a `Markdown Artifact`.

```python
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact


# common function
def get_words(input_str: str) -> list[str]:
    # Strip leading/trailing spaces, then split by space
    words = input_str.strip().split(" ")
    # Filter out empty entries (for consecutive spaces)
    return [w for w in words if w]


# ----- LEVEL 1: Base Tasks -----
@task(name="Task_1", description="task 1 will count total words", log_prints=True)
def task1(input_str: str) -> int:
    print("Running Task 1")
    words = get_words(input_str)
    print("Completed Task 1")
    return len(words)


@task(name="Subtask_2a", description="task 2a will convert string to a list of words", log_prints=True)
def subtask2a(input_str: str) -> list[str]:
    print("Running Subtask 2a")
    return get_words(input_str)


@task(name="Subtask_2b", description="task 2b will remove duplicates in the list", log_prints=True)
def subtask2b(words: list[str]) -> list[str]:
    print("Running Subtask 2b")
    seen = set()
    unique_words = []
    for word in words:
        if word not in seen:
            unique_words.append(word)
            seen.add(word)
    return unique_words


@task(name="Task_3", description="show a report with total words and words list", log_prints=True)
def task3(result1, result2):
    print(f"Running Task 3 after Task 1 and Task 2")
    markdown_report = f"""# Book processing report

## Summary

In this report, we count total words and show unique words list.

| word count        | unique word list |
|:--------------|-------:|
| {result1} | {result2} |
"""
    create_markdown_artifact(
        key="word-report",
        markdown=markdown_report,
        description="book processing report",
    )
    print("Completed Task 3")


# ----- LEVEL 2: Task 2 as a Subflow -----
@flow(name="subflow_for_task_2", log_prints=True)
def task2(input_str: str) -> list[str]:
    print("Starting Task 2 Flow")
    res2a = subtask2a(input_str)
    res2b = subtask2b(res2a)
    print("Completed Task 2 Flow")
    return res2b


# ----- LEVEL 3: Main Orchestration Flow -----
@flow(name="Flow_with_artifact", description="This workflow read a book, then show total words count and unique words",
      version="1.0.0", log_prints=True)
def main_flow(book_str: str) -> None:
    print("=== Main Flow Started ===")

    # Run the first task
    result1 = task1(book_str)
    # Run the second step
    # Note that task2 is actually a flow:
    # calling a flow inside another flow makes it a subflow
    result2 = task2(book_str)

    # Task 3 depends on both Task 1 and Task 2
    task3(wait_for=[result1, result2], result1=result1, result2=result2)

    print("=== Main Flow Completed ===")


if __name__ == "__main__":
    test_str = "people needs to eat more fruits. people needs to do more sports"
    main_flow(test_str)

```

After running the flow, you should see a new artifact in the Prefect server. The figure below shows an example:
![prefect_artifact_example.png](../assets/prefect_artifact_example.png)
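
In `task3` above, the whole Python list is placed in a single table cell, so it renders as `['people', 'needs', ...]`. A hypothetical helper such as `build_word_report` below (pure Python, no Prefect calls) builds a table with one row per unique word; its return value could be passed as the `markdown` argument of `create_markdown_artifact`.

```python
def build_word_report(total_words: int, unique_words: list[str]) -> str:
    """Return a Markdown report: a summary line plus one table row per unique word."""
    lines = [
        "# Book processing report",
        "",
        "## Summary",
        "",
        f"Total words: **{total_words}**, unique words: **{len(unique_words)}**.",
        "",
        "| # | word |",
        "|--:|:-----|",
    ]
    for index, word in enumerate(unique_words, start=1):
        # Escape pipes so a word cannot break the table layout
        safe = word.replace("|", "\\|")
        lines.append(f"| {index} | {safe} |")
    return "\n".join(lines) + "\n"


print(build_word_report(12, ["people", "needs", "to"]))
```

Keeping the Markdown construction in a plain function also makes the report easy to unit-test without running a flow.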

## 6. Error handling in Prefect

In Prefect, error handling is flexible. Because tasks and flows are plain Python functions, you can handle failures with Python's standard `try/except`, either inside a task or in the flow. Prefect also provides features such as retries and run states that make error handling more convenient.


### 6.1 Handle error with try/except in task

In the example below, two tasks divide 10 by the given number `x`. `task1` handles the `ZeroDivisionError` with `try/except`; `task2` does not, so it raises the exception.

```python
from prefect import flow, task


@task(name="Task_1", description="task 1 will divide 10 by the given number x with error handling", log_prints=True)
def task1(x: int) -> int:
    print("Running Task 1")
    try:
        result = int(10 / x)
        print(f"The normal output is {result}")
        return result
    except ZeroDivisionError:
        print("The error handling output is the default value 0")
        return 0  # Fallback value or recovery action


@task(name="Task_2", description="task 2 will divide 10 by the given number x without error handling", log_prints=True)
def task2(x: int) -> int:
    print("Running Task 2")
    return int(10 / x)


@task(name="Task_3", description="task 3 will multiply the results of task1 and task2 by 10", log_prints=True)
def task3(y1: int, y2: int) -> None:
    print("Running Task 3")
    print(f"Receive value from task_1: {y1}, corresponding result is: {10 * y1}")
    print(f"Receive value from task_2: {y2}, corresponding result is: {10 * y2}")


@flow(name="Flow_with_error", description="This workflow contains a task which can raise ZeroDivisionError",
      version="1.0.0", log_prints=True)
def handle_error_flow(x: int):
    t1_resu = task1(x)
    print("Completed Task 1")
    t2_resu = task2(x)
    print("Completed Task 2")
    task3(t1_resu, t2_resu)
    print("Completed Task 3")


if __name__ == "__main__":
    handle_error_flow(0)

```
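
When several tasks need the same kind of fallback as `task1`, the `try/except` can be factored into a plain Python decorator. The `with_fallback` helper below is hypothetical and not part of Prefect. The assumed usage is to place `@task(...)` above `@with_fallback(...)`; because the exception is caught before Prefect sees it, the task run would end as completed with the fallback value.

```python
import functools
from typing import Any, Callable, TypeVar

T = TypeVar("T")


def with_fallback(default: T, exceptions: tuple[type[BaseException], ...] = (Exception,)):
    """Return `default` instead of raising one of `exceptions` (hypothetical helper)."""
    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(fn)  # keep the name and signature so @task can still inspect them
        def wrapper(*args: Any, **kwargs: Any) -> T:
            try:
                return fn(*args, **kwargs)
            except exceptions as exc:
                print(f"{fn.__name__} failed with {exc!r}; using fallback {default!r}")
                return default
        return wrapper
    return decorator


@with_fallback(0, (ZeroDivisionError,))
def divide_ten_by(x: int) -> int:
    return int(10 / x)


print(divide_ten_by(2))  # 5
print(divide_ten_by(0))  # 0, after printing the fallback message
```

Listing the exceptions explicitly matters: any other error (for example a `ValueError`) still propagates and fails the task, which is usually what you want for real bugs.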



### 6.2 Handle error with try/except in flow

In the example above, when `task2` fails, the whole workflow stops and fails as well. By default, an exception raised by a task that is called directly propagates to the calling flow, so the flow run fails too.

To avoid this, we can catch the error with `try/except` in the flow. Let's revisit the example above and improve it:

```python
from prefect import flow, task


@task(name="Task_1", description="task 1 will divide 10 by the given number x with error handling", log_prints=True)
def task1(x: int) -> int:
    print("Task 1: Start")
    try:
        result = int(10 / x)
        print(f"The normal output is {result}")
        return result
    except ZeroDivisionError:
        print("The error handling output is the default value 0")
        return 0  # Fallback value or recovery action


@task(name="Task_2", description="task 2 will divide 10 by the given number x without error handling", log_prints=True)
def task2(x: int) -> int:
    print("Task 2: Start")
    return int(10 / x)


@task(name="Task_3", description="task 3 will multiply the results of task1 and task2 by 10", log_prints=True)
def task3(y1: int, y2: int) -> None:
    print("Task 3: Start")
    print(f"Receive value from task_1: {y1}, corresponding result is: {10 * y1}")
    print(f"Receive value from task_2: {y2}, corresponding result is: {10 * y2}")


@flow(name="Flow_with_error_improved", description="This workflow contains a task which can raise ZeroDivisionError",
      version="1.0.0", log_prints=True)
def handle_error_flow(x: int):
    # run task 1
    t1_resu = task1(x)
    print("Completed Task 1")
    # run task 2, which needs exception handling
    try:
        t2_resu = task2(x)
    except ZeroDivisionError:
        print("Main flow: The task 2 has failed, use the default value 0")
        t2_resu = 0
    print("Completed Task 2")
    # run task3
    # Try replacing the line below with task3(wait_for=[t1_resu, t2_resu]) and see what happens
    task3(wait_for=[t1_resu, t2_resu], y1=t1_resu, y2=t2_resu)
    print("Completed Task 3")


if __name__ == "__main__":
    handle_error_flow(0)


```



### 6.3 Handle error with Prefect task state

In Prefect, each task run has an associated state, such as:
- Completed
- Failed
- Cancelled
- Crashed
- Paused
- AwaitingRetry (a scheduled state used between retry attempts)

The figure below shows the possible states and the lifecycle of a task run:

![prefect_task_run_state.png](../assets/prefect_task_run_state.png)

In the flow below, we no longer use `try/except` in the flow. Instead, we call the task with `return_state=True` and check the state of the task run to decide how to continue.

```python
from prefect import flow, task


@task(name="Task_1", description="task 1 will divide 10 by the given number x with error handling", log_prints=True)
def task1(x: int) -> int:
    print("Task 1: Start")
    try:
        result = int(10 / x)
        print(f"The normal output is {result}")
        return result
    except ZeroDivisionError:
        print("The error handling output is the default value 0")
        return 0  # Fallback value or recovery action


@task(name="Task_2", description="task 2 will divide 10 by the given number x without error handling", log_prints=True)
def task2(x: int) -> int:
    print("Task 2: Start")
    return int(10 / x)


@task(name="Task_3", description="task 3 will multiply the results of task1 and task2 by 10", log_prints=True)
def task3(y1: int, y2: int) -> None:
    print("Task 3: Start")
    print(f"Receive value from task_1: {y1}, corresponding result is: {10 * y1}")
    print(f"Receive value from task_2: {y2}, corresponding result is: {10 * y2}")


@flow(name="Flow_with_error_state_control", description="This workflow contains a task which can raise ZeroDivisionError, we handle the error with task state",
      version="1.0.0", log_prints=True)
def handle_error_flow(x: int):
    # run task 1
    t1_state_resu = task1(x, return_state=True)
    t1_resu = t1_state_resu.result()
    print("Completed Task 1")
    # run task 2, which needs exception handling
    t2_state_resu = task2(x, return_state=True)
    print(f"t2 result with state: {t2_state_resu}")
    if t2_state_resu.is_completed():
        t2_resu = t2_state_resu.result()
    elif t2_state_resu.is_failed():
        print("Main flow: The task 2 has failed, use the default value 0")
        t2_resu = 0
    else:
        print(f"Main flow: unknown state: {t2_state_resu.name}")
        t2_resu = 0
    print("Completed Task 2")
    # run task3
    # Try replacing the line below with task3(wait_for=[t1_resu, t2_resu]) and see what happens
    task3(wait_for=[t1_resu, t2_resu], y1=t1_resu, y2=t2_resu)
    print("Completed Task 3")


if __name__ == "__main__":
    handle_error_flow(0)

```
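
The `if/elif/else` on the task state can be moved into a small reusable helper. The sketch below relies only on the state members already used above (`is_completed()`, `is_failed()`, `result()`, `name`), described with a `Protocol` so any object with those members works; `value_or_default` and `StateLike` are hypothetical names.

```python
from typing import Any, Protocol


class StateLike(Protocol):
    """The subset of a Prefect state used in the flow above."""
    name: str

    def is_completed(self) -> bool: ...
    def is_failed(self) -> bool: ...
    def result(self) -> Any: ...


def value_or_default(state: StateLike, default: Any, label: str = "task") -> Any:
    # Completed: unwrap the task's return value
    if state.is_completed():
        return state.result()
    # Failed or any other state: report it and fall back to the default
    if state.is_failed():
        print(f"Main flow: {label} has failed, use the default value {default!r}")
    else:
        print(f"Main flow: {label} is in unknown state {state.name!r}, use {default!r}")
    return default


# Assumed usage inside the flow:
# t2_resu = value_or_default(task2(x, return_state=True), 0, "task 2")
```

Because the helper only depends on those four members, it can be unit-tested with a small stub object instead of a running Prefect server.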

### 6.4 Automatic retries
Prefect provides an `automatic retry mechanism`. When a task raises an exception, Prefect retries it according to the configured rules (`retries`, `retry_delay_seconds`) without restarting the entire flow.

Below is an example. Imagine a workflow that needs many resources to run: before it starts the real job, it checks whether enough `resources` are available. To avoid a failure and a manual rerun, we add an `automatic retry`.

```python
from prefect import task, flow
import random


@task(name="task_1", description="task 1 tests the retry mechanism", retries=3, retry_delay_seconds=10)
def task1():
    print("Task 1: start")
    available_src = random.random()
    if available_src < 0.7:
        raise RuntimeError("Not enough resources. Please hold!")
    return "Allocated required resource. Start workflow"


@task(name="task_2", description="task 2 will print the message received from task1", log_prints=True)
def task2(input_str: str) -> None:
    print("Task 2: start")
    print(f"{input_str}")


@flow(name="Flow_with_retry", log_prints=True)
def flow_with_retry():
    # run task1
    result = task1()
    print("Completed Task 1")

    # run task2
    # passing task1's result as an argument already makes task2 depend on task1
    task2(input_str=result)
    print("Completed Task 2")


if __name__ == "__main__":
    flow_with_retry()
```



> Automatic retry helps with `recoverable errors`. In other situations (e.g., division by zero or a bad file path), retrying is just a waste of time and resources.
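
Besides a single number, a task's `retry_delay_seconds` also accepts a list of delays, one per retry, so later attempts can wait longer. Prefect ships a helper for this (`exponential_backoff` in `prefect.tasks`); the pure-Python sketch below only illustrates the idea of doubling delays with optional jitter and is not Prefect's implementation. `backoff_delays` is a hypothetical name.

```python
import random
from typing import Optional


def backoff_delays(retries: int, base_seconds: float = 10.0,
                   jitter: float = 0.0, rng: Optional[random.Random] = None) -> list[float]:
    """Delays base, 2*base, 4*base, ... each optionally scaled by 1 +/- jitter."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(retries):
        delay = base_seconds * (2 ** attempt)
        if jitter:
            # Spread retries out so many failing tasks do not all retry at once
            delay *= 1 + rng.uniform(-jitter, jitter)
        delays.append(delay)
    return delays


print(backoff_delays(3))  # [10.0, 20.0, 40.0]
# Assumed wiring: @task(retries=3, retry_delay_seconds=backoff_delays(3))
```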


#### When to use automatic retry

Use automatic retry when facing:
- Network resource access failures: intermittent errors when calling a REST API or an S3 bucket.
- External system latency: waiting for external resources to become available (e.g., file locks, cluster workers).
- Race conditions in distributed systems: retrying after a short delay can stabilize timing-sensitive processes.

#### When not to use automatic retry

- Deterministic logic bugs: division by zero, bad config.
- Non-idempotent operations: retries may cause duplicates.
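
The two lists above can be turned into a simple rule. `is_transient` below is a hypothetical classifier that treats network-style errors as retryable and deterministic ones as not; which exceptions belong in each group is an assumption to adapt to your stack. `ResourceBusyError` is likewise hypothetical: the retry example could raise it instead of a bare `RuntimeError`, so that genuine bugs are not retried. Recent Prefect versions also accept a `retry_condition_fn` on `@task` for this kind of decision; check the API reference for your version before relying on it.

```python
class ResourceBusyError(RuntimeError):
    """Hypothetical error for 'not enough resources yet' (see task1 above)."""


# Deterministic errors: a retry would fail the same way.
# Checked first, because FileNotFoundError and PermissionError are OSError subclasses.
PERMANENT = (ZeroDivisionError, ValueError, KeyError, FileNotFoundError, PermissionError)

# Transient errors: a later attempt may succeed
TRANSIENT = (ConnectionError, TimeoutError, ResourceBusyError, OSError)


def is_transient(exc: BaseException) -> bool:
    if isinstance(exc, PERMANENT):
        return False
    return isinstance(exc, TRANSIENT)


for err in (ConnectionError("reset"), ResourceBusyError("hold"), ZeroDivisionError()):
    print(type(err).__name__, "->", "retry" if is_transient(err) else "fail fast")
```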




## 7. The Prefect logging system

Until now, we have used `print()` to get information from tasks and flows. That is enough for development and testing, but `not recommended at all for production use`. Instead, we recommend the `Prefect logging system`.

The `Prefect logging system` has the following advantages:
- Logs are automatically stored in the run metadata and scoped per task run and flow run.
- Each entry has a timestamp and a log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) that can be used to filter messages.
- It works the same way wherever the flow runs (e.g., locally or on remote workers).

The example below reuses the retry code and replaces every `print()` with a logger obtained from `get_run_logger()`. Different log levels control how much information appears in the run logs.
```python
import logging
from prefect import task, flow
import random
from prefect.logging import get_run_logger


@task(name="task_1", description="task 1 tests the retry mechanism", retries=3, retry_delay_seconds=10)
def task1():
    logger = get_run_logger()
    logger.setLevel(logging.DEBUG)
    logger.debug("Task 1: start")
    available_src = random.random()
    if available_src < 0.7:
        error_msg = "Not enough resources. Please hold!"
        logger.error(f"task1: {error_msg}")
        raise RuntimeError(error_msg)
    success_msg = "Allocated required resource. Start workflow"
    logger.info(success_msg)
    return success_msg


@task(name="task_2", description="task 2 will log message received from task1")
def task2(input_str: str) -> None:
    logger = get_run_logger()
    logger.setLevel(logging.DEBUG)
    logger.debug("Task 2: start")
    logger.info(f"{input_str}")


@flow(name="Flow_with_log")
def flow_with_log():
    # config logger
    logger = get_run_logger()
    logger.setLevel(logging.DEBUG)
    # run task1
    result = task1()
    logger.debug("Completed Task 1")

    # run task2
    # passing task1's result as an argument already makes task2 depend on task1
    task2(input_str=result)
    logger.debug("Completed Task 2")


if __name__ == "__main__":
    flow_with_log()
```

> Try setting the log level to `logging.INFO`, then to `logging.ERROR`, and compare which messages appear in the run logs.
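
Log levels filter by severity: a logger set to `INFO` drops `DEBUG` messages, and one set to `ERROR` keeps only `ERROR` and `CRITICAL`. The sketch below shows this with Python's standard `logging` module only, which Prefect's run logger builds on; `ListHandler` and `emitted_at` are hypothetical helpers that collect the messages passing at a given level. Prefect also has its own level setting (e.g., `PREFECT_LOGGING_LEVEL`), which can filter messages independently of `setLevel`.

```python
import logging


class ListHandler(logging.Handler):
    """Collect formatted records in memory instead of printing them."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(f"{record.levelname}: {record.getMessage()}")


def emitted_at(level: int) -> list[str]:
    logger = logging.getLogger(f"demo.{logging.getLevelName(level)}")
    logger.propagate = False  # keep the demo out of the root logger
    handler = ListHandler()
    logger.addHandler(handler)
    logger.setLevel(level)  # the logger level decides what reaches the handler
    logger.debug("Task 1: start")
    logger.info("Allocated required resource. Start workflow")
    logger.error("task1: Not enough resources. Please hold!")
    logger.removeHandler(handler)
    return handler.messages


for level in (logging.DEBUG, logging.INFO, logging.ERROR):
    print(logging.getLevelName(level), "->", emitted_at(level))
```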