# Machine Learning Pipelines and Orchestration with Prefect

> [!Important]
> **Version**: This module has been created using Prefect 2.13.7

## 0 - Useful functions from previous lessons

```python
# lib/config.py
CATEGORICAL_COLS = ["PULocationID", "DOLocationID", "passenger_count"]

DATA_DIRPATH = "../../data"
MODELS_DIRPATH = "../../models"
```

```python
# lib/preprocessing.py
from typing import List, Optional, Tuple

from loguru import logger
import numpy as np
import pandas as pd
from scipy.sparse import spmatrix
from sklearn.feature_extraction import DictVectorizer

# In lib/preprocessing.py, uncomment this import;
# in the notebook, CATEGORICAL_COLS is defined in the config cell above
# from config import CATEGORICAL_COLS


def compute_target(
    df: pd.DataFrame, pickup_column: str = "tpep_pickup_datetime", dropoff_column: str = "tpep_dropoff_datetime"
) -> pd.DataFrame:
    """Compute the trip duration in minutes based on pickup and dropoff time"""
    df["duration"] = df[dropoff_column] - df[pickup_column]
    df["duration"] = df["duration"].dt.total_seconds() / 60
    return df


def filter_outliers(df: pd.DataFrame, min_duration: int = 1, max_duration: int = 60) -> pd.DataFrame:
    """
    Remove rows whose duration is outside [min_duration, max_duration]
    (negative/zero durations and overly long trips)
    """
    # .copy() returns an independent DataFrame, so later column assignments
    # do not trigger pandas' SettingWithCopyWarning
    return df[df["duration"].between(min_duration, max_duration)].copy()


def encode_categorical_cols(df: pd.DataFrame, categorical_cols: Optional[List[str]] = None) -> pd.DataFrame:
    """Encode categorical columns as strings (missing values become "-1")"""
    if categorical_cols is None:
        categorical_cols = CATEGORICAL_COLS
    df[categorical_cols] = df[categorical_cols].fillna(-1).astype("int")
    df[categorical_cols] = df[categorical_cols].astype("str")
    return df


def extract_x_y(
    df: pd.DataFrame,
    categorical_cols: Optional[List[str]] = None,
    dv: Optional[DictVectorizer] = None,
    with_target: bool = True,
) -> Tuple[spmatrix, Optional[np.ndarray], DictVectorizer]:
    """
    Extract the feature matrix X and the target y from the dataframe.
    A new DictVectorizer is fitted only when training (with_target=True)
    and none is provided; prediction requires an already fitted one.
    """
    if categorical_cols is None:
        categorical_cols = CATEGORICAL_COLS
    dicts = df[categorical_cols].to_dict(orient="records")

    y = None
    if with_target:
        if dv is None:
            dv = DictVectorizer()
            dv.fit(dicts)
        y = df["duration"].values
    elif dv is None:
        raise ValueError("A fitted DictVectorizer is required when with_target=False")

    x = dv.transform(dicts)
    return x, y, dv


def process_data(
    filepath: str, dv: Optional[DictVectorizer] = None, with_target: bool = True
) -> Tuple[spmatrix, Optional[np.ndarray], DictVectorizer]:
    """
    Load data from a parquet file.
    If with_target is True, compute the target (duration column) and filter outliers.
    Turn the categorical features into a sparse matrix.
    :return: the sparse matrix, the target values (None if with_target is False)
    and the DictVectorizer used for the encoding.
    """
    df = pd.read_parquet(filepath)
    if with_target:
        logger.info("Computing target...")
        df1 = compute_target(df)
        logger.info("Filtering outliers...")
        df2 = filter_outliers(df1)
        logger.info("Encoding categorical columns...")
        df3 = encode_categorical_cols(df2)
        logger.info("Extracting X and y...")
        return extract_x_y(df3, dv=dv)
    else:
        logger.info("Encoding categorical columns...")
        df1 = encode_categorical_cols(df)
        logger.info("Extracting X...")
        return extract_x_y(df1, dv=dv, with_target=with_target)
```

## 1 - Create workflow functions

Create five functions to complete the ML process:
- `train_model`
- `predict`
- `evaluate_model`
- `train_model_workflow`, a workflow function that runs the whole training process:
    - Process data
    - Train model
    - Evaluate model
- `predict_workflow`, a workflow function that runs the whole prediction process:
    - Process data without the target column
    - Predict

Then, test your code with the downloaded data (e.g. January to train and February to predict).
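A minimal sketch of the three model functions, assuming a scikit-learn `LinearRegression` and the RMSE as the evaluation metric (both are illustrative choices; the exercise does not impose them). They take the outputs of `process_data` (a sparse feature matrix and a target array), so the two workflow functions only need to chain `process_data` with them.

```python
import numpy as np
from scipy.sparse import csr_matrix
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error


def train_model(x_train: csr_matrix, y_train: np.ndarray) -> LinearRegression:
    """Fit a linear regression on the encoded features (model choice is an assumption)"""
    model = LinearRegression()
    model.fit(x_train, y_train)
    return model


def predict(x: csr_matrix, model: LinearRegression) -> np.ndarray:
    """Predict trip durations (in minutes) for an encoded feature matrix"""
    return model.predict(x)


def evaluate_model(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Compute the root mean squared error, in minutes"""
    return float(np.sqrt(mean_squared_error(y_true, y_pred)))


# Usage on a tiny synthetic sparse matrix (made-up values, not taxi data)
x_demo = csr_matrix(np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]))
y_demo = np.array([10.0, 20.0, 30.0])
demo_model = train_model(x_demo, y_demo)
print(evaluate_model(y_demo, predict(x_demo, demo_model)))
```

On the real data, `train_model_workflow` would call `process_data` on the training file, then these three functions, and `predict_workflow` would call `process_data(..., dv=dv, with_target=False)` with the fitted `DictVectorizer` before `predict`.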

## 2 - Setup and explore Prefect

We are going to use [Prefect](https://docs.prefect.io/2.6/tutorials/first-steps/), an open-source orchestration tool with a Python SDK.


**WINDOWS USERS**:

You might run into issues with Prefect on Windows. If you do, follow [Prefect's installation instructions](https://docs.prefect.io/2.13.7/getting-started/installation/#install-prefect) to install Prefect on your machine.

### 2-1 Setup Prefect UI

Before implementing tasks and flows with Prefect, let's set up the UI so that we can visualize our work.

Steps:

- Set the API URL of your local server so that your workflows are tracked by this specific instance:
```
prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api
```

- Check that SQLite, the [Prefect backend database system](https://docs.prefect.io/2.13.7/getting-started/installation/#external-requirements), is installed:
```
sqlite3 --version 
```

- Start a local Prefect server:
```
prefect server start --host 0.0.0.0
```

If you want to reset the database, run:
```
prefect server database reset
```


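You can then visit the UI at http://0.0.0.0:4200/dashboard (if your browser cannot open `0.0.0.0`, as is typically the case on Windows, use http://127.0.0.1:4200/dashboard instead).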
![](images/starting_page.png)

### 2-2 Prefect tasks and flows

[Prefect uses tasks and flows to build workflows](https://docs.prefect.io/2.13.7/tutorial/flows/).
- Flows are like functions: they can take inputs, perform work, and return an output. In fact, you can turn any function into a Prefect flow by adding the `@flow` decorator.
- A task is a Python function decorated with `@task` and called within a flow. You can think of a flow as a recipe for connecting a known sequence of tasks together. Tasks, and the dependencies between them, are displayed in the flow run graph, enabling you to break down a complex flow into something you can observe, understand and control at a more granular level.
    - All tasks must be called from within a flow. Tasks may not call other tasks directly.
    - Not all functions in a flow need to be tasks. Use tasks only when their features are useful.

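```python
# Example: a flow that fetches a GitHub repository's statistics through a task
import httpx
from prefect import flow, task


@task
def get_url(url: str, params: dict = None):
    response = httpx.get(url, params=params)
    response.raise_for_status()
    return response.json()


# retries=3 / retry_delay_seconds=5: rerun the flow up to 3 times, 5 s apart, if it fails
# log_prints=True: print() output is also sent to the Prefect logs (visible in the UI)
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    url = f"https://api.github.com/repos/{repo_name}"
    repo_stats = get_url(url)
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo_stats['stargazers_count']}")
    print(f"Forks 🍴 : {repo_stats['forks_count']}")


# Calling the flow function creates a flow run, tracked by the Prefect server
get_repo_info()
```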

## 3 - Create Prefect tasks and flows

### 3-1 Create tasks and flows

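Use the `@task` and `@flow` decorators to create your first Prefect flow: the processing flow.

Calling a task directly inside a flow runs it immediately and returns its result, so the steps execute sequentially, in the order you write them. Calling `.submit()` instead hands the task to the flow's task runner (by default the `ConcurrentTaskRunner`, which runs tasks in threads) and returns a future, so independent tasks may run concurrently. Prefect then orders the task runs by their data dependencies: a task that receives another task's output (result or future) as an argument waits for that task to finish. To force an order without passing data, use the `wait_for` argument.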


Then:
- Test your code by running the flow on the downloaded data  # TODO
- Visualize the flow runs in the local Prefect UI  # TODO

> [!Warning]
> **Typing tasks and flows in Prefect**:
> You can type tasks in Prefect as you would any Python function.
> For flows, Prefect validates the parameters against their type hints, so for NON DEFAULT types either use `validate_parameters=False` or define Pydantic models that Prefect can understand (see the extra section).
> That said, since flows are just sets of tasks, typing all the tasks should be enough if you don't want to add this layer of complexity.
> *Default types*: `str`, `int`, ...

### 3-2 Customize your flows

## 4 - Deploy your flows

## 5 - Extra concepts

### 5-1 Prefect workers

### 5-2 Prefect typing using Pydantic