# Introduction

Prefect 2 is a **workflow orchestration** library for managing and automating data pipelines. It lets you define, run, and monitor data workflows, with built-in support for retries, scheduling, and error handling.

In [None]:
pip install "prefect>=2,<3"

Prefect is built around two concepts: "flows" and "tasks."

- **Task**: An individual operation or step in a pipeline.
- **Flow**: A function that calls tasks (and other flows) to define the overall pipeline.

# Defining Tasks

Tasks are defined using the **@task** decorator. Any Python function that performs a unit of work, such as a data transformation or an API call, can be a task.

In [None]:
import requests
from prefect import task

@task(log_prints=True)
def fetch_data(api_url: str):
    """
    Fetch data from the specified API URL.
    """
    print(f"Fetching data from {api_url}...")
    # A timeout keeps the task from hanging on an unresponsive server
    response = requests.get(api_url, timeout=30)
    response.raise_for_status()  # raise on 4xx/5xx responses
    return response.json()
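
The **etl_pipeline** flow in the next section also calls **transform_data** and **load_data**, which this notebook never defines. The sketch below shows one possible shape for them. The record field (**amount**), the output path, and the function bodies are assumptions made for illustration only. They are written as plain functions so they run without Prefect; the trailing comment shows how they might be wrapped as tasks.

```python
import json
from pathlib import Path


def transform_records(raw_data):
    """Keep only records with a positive numeric 'amount' (hypothetical field)."""
    cleaned = []
    for record in raw_data:
        amount = record.get("amount")
        if isinstance(amount, (int, float)) and amount > 0:
            cleaned.append({**record, "amount": round(float(amount), 2)})
    return cleaned


def load_records(records, path="sales_clean.json"):
    """Write the records to a JSON file and return how many were written."""
    Path(path).write_text(json.dumps(records, indent=2))
    return len(records)


# Assumed wiring into Prefect, mirroring fetch_data above:
#   from prefect import task
#   transform_data = task(transform_records)
#   load_data = task(load_records)
```

Keeping the logic in plain functions makes it easy to unit-test the transformation separately from the orchestration.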

# Defining a Flow

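A flow is a Python function decorated with **@flow** that calls tasks. Dependencies between tasks follow from the data: passing one task's return value into the next task makes the second wait for the first.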

### Creating a Flow

In [None]:
from prefect import flow

@flow
def etl_pipeline(api_url: str):
    """
    ETL pipeline flow: orchestrates extract, transform, and load tasks.
    """
    raw_data = fetch_data(api_url)
    # transform_data and load_data are assumed to be tasks defined like fetch_data
    processed_data = transform_data(raw_data)
    load_data(processed_data)

etl_pipeline(api_url="https://api.example.com/sales")

# Storing and Loading Flows

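In Prefect 2, a flow is ordinary Python code that lives in a .py file; there is no separate serialized flow file. Prefect records where that code is stored so it can load and run the flow later. Storage locations include the local file system, **GitHub**, and **S3**. **Prefect Cloud** keeps the metadata about flows and their runs rather than the flow code itself.

### Storing Flows Locally
To keep flow code on the local machine, describe the directory with the **LocalFileSystem** storage block:

In [None]:
from prefect.filesystems import LocalFileSystem

# Define local storage pointing at the "flows/" directory that holds the flow's .py file
local_storage = LocalFileSystem(basepath="flows/")

# Save the block under a name (here "my-local-flows") so deployments can refer to it
local_storage.save("my-local-flows", overwrite=True)

### Loading Flows
Once your flow code is stored, you can load the flow from its file. Flows are not registered with Prefect Cloud automatically; a flow shows up there after it runs while connected to Cloud or after you create a deployment for it.

- To load from a local file:

In [None]:
from prefect.flows import load_flow_from_entrypoint

# Load the flow; assumes flows/my_local_flow.py defines the etl_pipeline flow
loaded_flow = load_flow_from_entrypoint("flows/my_local_flow.py:etl_pipeline")

# Run the loaded flow and keep its final state
state = loaded_flow(api_url="https://api.example.com/sales", return_state=True)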
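
Prefect 2 commonly refers to a flow by an entrypoint string of the form **path/to/file.py:function_name**. To make that idea concrete, here is a rough sketch of how such a string can be resolved using only the standard library. The helper **load_object_from_entrypoint** is hypothetical and is not Prefect's own loader; it only illustrates the mechanism.

```python
import importlib.util
from pathlib import Path


def load_object_from_entrypoint(entrypoint: str):
    """Resolve 'path/to/file.py:name' to the object called `name` in that file."""
    # Split on the last colon so drive letters in Windows paths are left alone
    file_path, _, object_name = entrypoint.rpartition(":")
    if not file_path or not object_name:
        raise ValueError(f"Expected 'path.py:name', got {entrypoint!r}")

    # Build a module from the file without requiring it to be on sys.path
    spec = importlib.util.spec_from_file_location(Path(file_path).stem, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load a module from {file_path!r}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level code

    return getattr(module, object_name)
```

Note that loading executes the file's top-level code, which is one reason flow files usually keep their run call behind an `if __name__ == "__main__":` guard.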

# Scheduling and Monitoring Flows

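Prefect provides **scheduling options** to **automate flow runs**. In Prefect 2, a schedule is attached to a **deployment** of a flow rather than to the flow itself, and is defined with classes such as **CronSchedule** or **IntervalSchedule**. The import path shown here is the one used by recent Prefect 2 releases. Creating a deployment needs a reachable Prefect API and a flow defined in a .py file, so these examples are best run from a script rather than a notebook cell.

### Scheduling with Cron
To run a flow periodically with a cron schedule:

In [None]:
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

@flow(log_prints=True)
def daily_flow():
    print("Running daily at 9 AM!")

# Attach the schedule to a deployment; the name "daily-9am" is only an example
deployment = Deployment.build_from_flow(
    flow=daily_flow,
    name="daily-9am",
    schedule=CronSchedule(cron="0 9 * * *"),
)
deployment.apply()  # register the deployment with the Prefect API

#### Explanation:
- **CronSchedule(cron="0 9 \* \* \*")** schedules the flow to run at 9:00 AM every day.
- The **schedule** parameter is passed to the deployment to automate the execution. A running agent or worker then picks up the scheduled runs.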
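
A standard cron expression has five space-separated fields: minute, hour, day of month, month, and day of week. As a small aid for reading them, the sketch below labels the fields of the expression used in the code above. The helper **describe_cron** is hypothetical and does no validation beyond counting fields.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Label the five fields of a standard cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"Expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


print(describe_cron("0 9 * * *"))
# {'minute': '0', 'hour': '9', 'day_of_month': '*', 'month': '*', 'day_of_week': '*'}
```

Minute 0 of hour 9, with wildcards everywhere else, means 9:00 every day; changing the hour field to 0 would give midnight instead.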

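### Scheduling with Interval
For fixed intervals, use the **IntervalSchedule**:

In [None]:
from datetime import timedelta

from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import IntervalSchedule

@flow(log_prints=True)
def scheduled_flow():
    print("Running every 5 minutes!")

# Attach the schedule to a deployment; the name "every-5-minutes" is only an example
deployment = Deployment.build_from_flow(
    flow=scheduled_flow,
    name="every-5-minutes",
    schedule=IntervalSchedule(interval=timedelta(minutes=5)),
)
deployment.apply()  # register the deployment with the Prefect API

#### Explanation:
- **IntervalSchedule(interval=timedelta(minutes=5))** schedules the flow to run every 5 minutes.
- The **schedule** is set on the deployment, which is where Prefect 2 expects it, not on the flow.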
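
An interval schedule is plain **timedelta** arithmetic from a starting point. The sketch below lists the first few run times for a 5-minute interval. The helper **next_runs** and the example start time are assumptions for illustration; how Prefect chooses its own anchor time is not covered here.

```python
from datetime import datetime, timedelta


def next_runs(anchor: datetime, interval: timedelta, count: int) -> list:
    """Return the first `count` run times that follow `anchor` at a fixed interval."""
    return [anchor + interval * step for step in range(1, count + 1)]


anchor = datetime(2024, 1, 1, 9, 0)  # arbitrary example start time
for run_time in next_runs(anchor, timedelta(minutes=5), 3):
    print(run_time.isoformat())
# 2024-01-01T09:05:00
# 2024-01-01T09:10:00
# 2024-01-01T09:15:00
```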

# Handling Failures and Retries

Prefect allows **retry logic for tasks**. You can define retries with a maximum number of attempts and a delay between retries.

In [None]:
@task(retries=3, retry_delay_seconds=10)
def unreliable_task():
    import random
    if random.random() < 0.7:
        raise ValueError("Simulated failure!")
    print("Task succeeded!")

@flow
def retry_example():
    unreliable_task()

retry_example()

- **retries**: Number of retry attempts after the first failure, so a task with retries=3 runs at most 4 times.
- **retry_delay_seconds**: Seconds to wait between retries.
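
To make the retry semantics concrete, here is a plain-Python sketch of the same idea: one initial attempt, then a fixed number of retries with a fixed delay. This is an illustration only, not Prefect's implementation; the helper **run_with_retries** and its **sleep** parameter are hypothetical.

```python
import time


def run_with_retries(fn, retries=3, retry_delay_seconds=10, sleep=time.sleep):
    """Call fn(); on failure, retry up to `retries` more times with a fixed delay."""
    attempts = retries + 1  # the first run plus the retries
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise  # out of retries: surface the last error
            sleep(retry_delay_seconds)


calls = []


def flaky():
    """Fail on the first two calls, then succeed."""
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("Simulated failure!")
    return "ok"


# sleep is swapped for a no-op so the demo finishes instantly
result = run_with_retries(flaky, retries=3, retry_delay_seconds=10, sleep=lambda s: None)
print(result, len(calls))  # ok 3
```

The function succeeds on its third call, so only two of the three allowed retries are used.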

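# Prefect Task Runners

Prefect 2 replaces the executors of Prefect 1 with **task runners**. The most common ones are the built-in **ConcurrentTaskRunner** and **SequentialTaskRunner**, and the **DaskTaskRunner** for parallel execution.

### ConcurrentTaskRunner (Default)
By default, a flow uses the ConcurrentTaskRunner. Tasks that are called directly still run one after another; only tasks started with **.submit()** are handed to the task runner and can overlap.

### DaskTaskRunner for Parallelism
To use Dask for parallel execution, install the Dask integration package:

In [None]:
pip install "prefect>=2,<3" prefect-dask

Then pass a **DaskTaskRunner** to the flow:

In [None]:
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

# extract_data, transform_data, and load_data are assumed to be tasks defined elsewhere
@flow(name="Parallel ETL", task_runner=DaskTaskRunner())
def parallel_etl():
    # .submit() hands each task to Dask and returns a future immediately
    data1 = extract_data.submit()
    data2 = extract_data.submit()
    # Passing a future as an argument makes the task wait for that upstream result
    transformed_data1 = transform_data.submit(data1)
    transformed_data2 = transform_data.submit(data2)
    load_data.submit(transformed_data1)
    load_data.submit(transformed_data2)

# In a script, place this call under an `if __name__ == "__main__":` guard
parallel_etl()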
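
The flow above has two independent extract, transform, load branches, which is what makes parallel execution worthwhile. As a library-agnostic illustration of that pattern, the sketch below runs two such branches with the standard library's **concurrent.futures** instead of Prefect and Dask. The stub functions and their data are assumptions, named differently from the notebook's tasks so they do not shadow them.

```python
from concurrent.futures import ThreadPoolExecutor


def extract_stub(source):
    """Stand-in for an extract step; returns canned data per source."""
    return [1, 2, 3] if source == "a" else [10, 20, 30]


def transform_stub(values):
    """Stand-in for a transform step; doubles every value."""
    return [v * 2 for v in values]


def run_branch(source):
    """One extract -> transform chain; branches do not depend on each other."""
    return transform_stub(extract_stub(source))


with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns a future immediately, so both branches can run at once
    future_a = pool.submit(run_branch, "a")
    future_b = pool.submit(run_branch, "b")
    results = [future_a.result(), future_b.result()]  # block until both finish

print(results)  # [[2, 4, 6], [20, 40, 60]]
```

Within a branch the steps stay ordered because each step consumes the previous step's output; only the branches themselves overlap.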

# Deployment

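For production, Prefect supports running deployed flows on infrastructure such as Docker containers, Kubernetes, and cloud container services like AWS ECS.

In [None]:
from prefect.infrastructure import DockerContainer

# Describe the image that flow runs should use (repository and image names are placeholders)
docker_block = DockerContainer(image="your-docker-repo/prefect-pipeline")
# Save the block so a deployment can reference it as its infrastructure
docker_block.save("prefect-pipeline", overwrite=True)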