# Part 1 - Machine Learning Pipelines and Orchestration

The goal of this section is to better understand what orchestration is and why it is needed in MLOps, using Prefect, an open-source, Python-based orchestrator.

## Intro and Use case Reminder

### Use Case

The project is *New York City taxi trip duration prediction*. \
The goal is to use the available data to train a simple machine learning model
that predicts the trip duration from **inputs that are available in a production environment**.

An ultimate goal for this use case could be to predict trip durations in real time (like a Google Maps/Waze itinerary),
but for simplicity, this module assumes that we need batch predictions. The data that needs predictions
is stored in a file and fed to the trained model.

The machine learning phase mainly consists of the following steps:
- data processing
- model training
- model evaluation
- prediction

The data to use for this module can be downloaded from the [TLC Trip Record Data page](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page).
To complete this module, you will need three data samples:
- Sample 1: yellow taxi trip data for 2021-01 (to train the model)
- Sample 2: yellow taxi trip data for 2021-02 (to evaluate the model)
- Sample 3: yellow taxi trip data for 2021-03 (for prediction)

A simple machine learning implementation can be found in `mlops-crashcourse.ipynb`.


### What is Orchestration and Why Orchestrate?

Orchestration refers to the automation of tasks and processes related to the development, deployment, and management of software applications or machine learning models.

> There are different kinds of orchestration:
- **Infrastructure orchestration**: automatic provisioning and management of servers, networks, storage, etc.
- **Deployment orchestration**: automatic deployment of code to different environments such as dev, staging, testing, and production
- **Service orchestration**: automatic scaling and management of services/microservices inside applications
- **Workflow orchestration**: automatic building, testing, and deployment of pipelines

> For the NYC taxi trip duration prediction, we will be implementing **Workflow orchestration**

> Why orchestrate?

- Have a central place to manage, automatically run, and monitor machine learning pipelines
- Set dependencies between ML steps to ensure they execute in the correct order, and handle errors
- Configure pipeline behavior, for example what happens on failure
- Reduce manual intervention
- Make it possible to distribute the workload
- Make it possible to integrate other tools

> Common orchestration concepts:

- **DAG**: Directed Acyclic Graph. It represents (a visualization of) the dependencies between steps in a workflow. Steps are connected in a way that does not form any cycle.

- **SCHEDULING**: a workflow run is initiated by a time requirement, generally a time interval or a cron expression defined by the user.

- **TRIGGER**: a workflow run is initiated by an action. Examples include the previous task completing, a specific task failing, or a notification about new data being received.
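The sketch below makes the DAG idea concrete. It is plain Python using the standard `graphlib` module (Python 3.9+) and does not depend on any orchestrator. The dependency map `ML_STEPS` is an assumption that mirrors the four ML steps listed earlier. A topological sort returns an order in which every step runs after the steps it depends on. It fails if the graph contains a cycle.

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+

# Hypothetical DAG for the ML steps of this module:
# each key is a step, each value is the set of steps it depends on.
ML_STEPS = {
    "data_processing": set(),
    "model_training": {"data_processing"},
    "model_evaluation": {"model_training"},
    "prediction": {"model_training"},
}


def execution_order(graph: dict) -> list:
    """Return one valid execution order (raises CycleError if the graph has a cycle)."""
    return list(TopologicalSorter(graph).static_order())


def is_dag(graph: dict) -> bool:
    """True if the dependency graph contains no cycle."""
    try:
        execution_order(graph)
    except CycleError:
        return False
    return True


if __name__ == "__main__":
    print(execution_order(ML_STEPS))
    print(is_dag({"train": {"evaluate"}, "evaluate": {"train"}}))  # a cycle: False
```

An orchestrator handles this ordering for you. On top of it, it adds scheduling, triggers, retries and monitoring.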

## From Notebooks to Workflows

### Why convert notebooks to Python files for production?

A major part of the model development phase is done by data scientists in Jupyter notebooks. \
Once the pure development phase is complete, we must ask ourselves a number of questions before moving our work into a production environment:
- How do we manage inputs, outputs, storage, and dependencies?
- How do we integrate our work into an existing infrastructure?
- How do we get automatic retraining/predictions?
- and more...

`Jupyter notebooks`:
- are easy to use and interactive
- make analysis and visualization easier
- allow cells to be run in any desired order

But they have major drawbacks that make them **unsuitable for production**:
- hard to understand when there are many cells
- very hard to integrate into infrastructures
- not designed for scalability
- not designed for collaboration
- can have compatibility issues across environments
- hard to debug

To solve most of these issues, a good practice is to move the work from notebooks to **.py** files. \
Doing this, we can:
- organize the code better and make it easier to understand
- easily integrate and run the scripts in other infrastructures
- use Python environment and dependency managers to ensure consistency and reproducibility across different systems
- run in parallel and distribute across different machines
- use the scripts with any orchestration tool


### Good practices for preparing code for production

- Refactor code:
    - turn notebook variables into function arguments
    - use clear function names and add docstrings
    - create entrypoint functions that perform specific operations
    - use type hints
- Split the code into different files according to their use
- Manage dependencies with Conda/pyenv and a `requirements.txt` file
- Use git and GitHub/GitLab for collaboration
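The sketch below illustrates these practices on one notebook step turned into a script:
- The hard-coded thresholds become typed function arguments with defaults.
- The function gets a docstring.
- `main` is an entrypoint that can be run from the command line or called by an orchestrator.

The names `filter_durations`, `parse_args` and `main` are hypothetical. A plain list of durations stands in for the DataFrame used later in this module.

```python
import argparse
from typing import List, Optional

# Notebook version (hard-coded globals):
#   min_duration = 1
#   max_duration = 60
#   durations = [d for d in durations if min_duration <= d <= max_duration]


def filter_durations(
    durations: List[float], min_duration: float = 1, max_duration: float = 60
) -> List[float]:
    """Keep trip durations (in minutes) between min_duration and max_duration, inclusive."""
    return [d for d in durations if min_duration <= d <= max_duration]


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Read the step's parameters from the command line (sys.argv when argv is None)."""
    parser = argparse.ArgumentParser(description="Filter trip durations")
    parser.add_argument("durations", type=float, nargs="*", help="durations in minutes")
    parser.add_argument("--min-duration", type=float, default=1)
    parser.add_argument("--max-duration", type=float, default=60)
    return parser.parse_args(argv)


def main(argv: Optional[List[str]] = None) -> List[float]:
    """Entrypoint: parse the arguments, then run the step."""
    args = parse_args(argv)
    return filter_durations(args.durations, args.min_duration, args.max_duration)


if __name__ == "__main__":
    print(main())
```

For example, saved as a hypothetical `filter_step.py`, `python filter_step.py 0.5 12 45 --max-duration 30` prints `[12.0]`.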

### Implementation

#### Data

#### Imports

In [None]:
import os
import numpy as np
import pickle
import random
import asyncio
import pandas as pd
import urllib.request

from typing import List
from scipy.sparse import csr_matrix
from dataclasses import dataclass

from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error


from prefect import task, flow
from prefect.deployments import Deployment
from prefect.orion.schemas.schedules import (
    CronSchedule,
    IntervalSchedule,
)

In [None]:
base_dir = "/app/"

In [None]:
train_path = base_dir + "data/yellow_tripdata_2021-01.parquet"
test_path = base_dir + "data/yellow_tripdata_2021-02.parquet"
inference_path = base_dir + "data/yellow_tripdata_2021-03.parquet"

In [None]:
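# Make sure the destination folder exists before downloading
os.makedirs(base_dir + "data", exist_ok=True)

urllib.request.urlretrieve(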
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet",
    train_path,
)
urllib.request.urlretrieve(
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet",
    test_path,
)
urllib.request.urlretrieve(
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-03.parquet",
    inference_path,
);

In [None]:
@dataclass
class Config:
    TRAIN_DATA = train_path
    TEST_DATA = test_path
    INFERENCE_DATA = inference_path
    LOCAL_STORAGE = base_dir + "results"
    CATEGORICAL_VARS = ["PULocationID", "DOLocationID", "passenger_count"]

#### Processing functions

In [None]:
def load_data(path: str) -> pd.DataFrame:
    return pd.read_parquet(path)


def compute_target(
    df: pd.DataFrame,
    pickup_column: str = "tpep_pickup_datetime",
    dropoff_column: str = "tpep_dropoff_datetime",
) -> pd.DataFrame:
    """
    Compute the trip duration in minutes based
    on pickup and dropoff time
    """
    df["duration"] = df[dropoff_column] - df[pickup_column]
    df["duration"] = df["duration"].dt.total_seconds() / 60
    return df


def filter_outliers(
    df: pd.DataFrame, min_duration: int = 1, max_duration: int = 60
) -> pd.DataFrame:
    """
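    Remove rows whose target value (duration, in minutes) is outside
    [min_duration, max_duration], i.e. negative/zero or too high values
    """
    # .copy() returns an independent frame, so later column assignments
    # do not trigger pandas' SettingWithCopyWarning
    return df[df["duration"].between(min_duration, max_duration)].copy()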


def encode_categorical_cols(
    df: pd.DataFrame, categorical_cols: List[str] = None
) -> pd.DataFrame:
    """
    Takes a Pandas dataframe and a list of categorical
    column names, and returns the dataframe with the specified
    columns cast to strings (missing values are filled with -1)
    """
    if categorical_cols is None:
        categorical_cols = Config.CATEGORICAL_VARS
    df[categorical_cols] = df[categorical_cols].fillna(-1).astype("int")
    df[categorical_cols] = df[categorical_cols].astype("str")
    return df


def extract_x_y(
    df: pd.DataFrame,
    categorical_cols: List[str] = None,
    dv: DictVectorizer = None,
    with_target: bool = True,
) -> dict:
    """
    Turns the categorical columns into lists of mappings (dicts of feature
    names to feature values), then into a sparse matrix for use with
    scikit-learn estimators, using a DictVectorizer object.
    A new DictVectorizer is fitted only if `dv` is None and `with_target` is True.
    :return The sparse matrix, the target values if needed, and the
    DictVectorizer object.
    """
    if categorical_cols is None:
        categorical_cols = Config.CATEGORICAL_VARS
    dicts = df[categorical_cols].to_dict(orient="records")

    y = None
    if with_target:
        if dv is None:
            dv = DictVectorizer()
            dv.fit(dicts)
        y = df["duration"].values

    x = dv.transform(dicts)
    return {"x": x, "y": y, "dv": dv}
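
The plain-Python sketch below shows what the `DictVectorizer` step does with these string-valued records. It mimics its behavior for string features only. It is not scikit-learn's implementation and it returns dense lists instead of a sparse matrix.
- Each `column=value` pair seen during fitting becomes one column, sorted by name as `DictVectorizer` does by default.
- Pairs never seen during fitting are silently ignored at transform time. This is why the `dv` fitted on the training data must be reused for the test and inference data.
- `DictVectorizer` one-hot encodes string values but keeps numeric values as a single numeric column. This is why `encode_categorical_cols` casts the IDs to strings.

`fit_vocabulary` and `transform_records` are hypothetical names.

```python
from typing import Dict, List


def fit_vocabulary(records: List[Dict[str, str]]) -> Dict[str, int]:
    """Assign a column index to every 'column=value' pair seen during fitting, in sorted order."""
    names = sorted({f"{col}={val}" for rec in records for col, val in rec.items()})
    return {name: idx for idx, name in enumerate(names)}


def transform_records(
    records: List[Dict[str, str]], vocab: Dict[str, int]
) -> List[List[int]]:
    """One-hot encode each record; pairs missing from the vocabulary are dropped."""
    rows = []
    for rec in records:
        row = [0] * len(vocab)
        for col, val in rec.items():
            idx = vocab.get(f"{col}={val}")
            if idx is not None:  # unseen category -> no column is set
                row[idx] = 1
        rows.append(row)
    return rows


train = [
    {"PULocationID": "132", "DOLocationID": "236"},
    {"PULocationID": "161", "DOLocationID": "236"},
]
vocab = fit_vocabulary(train)
print(vocab)  # {'DOLocationID=236': 0, 'PULocationID=132': 1, 'PULocationID=161': 2}
print(transform_records([{"PULocationID": "999", "DOLocationID": "236"}], vocab))  # [[1, 0, 0]]
```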

#### Main processing 

In [None]:
def process_data(path: str, dv=None, with_target: bool = True) -> dict:
    """
    Load data from a parquet file
    If with_target is True, compute the target (duration column) and filter outliers
    Turn features into a sparse matrix
    :return The sparse matrix, the target values (None if with_target is False),
    and the DictVectorizer object.
    """
    df = load_data(path)
    if with_target:
        df1 = compute_target(df)
        df2 = filter_outliers(df1)
        df3 = encode_categorical_cols(df2)
        return extract_x_y(df3, dv=dv)
    else:
        df1 = encode_categorical_cols(df)
        return extract_x_y(df1, dv=dv, with_target=with_target)

#### Model training

In [None]:
def train_model(x_train: csr_matrix, y_train: np.ndarray) -> LinearRegression:
    """Train and return a linear regression model"""
    lr = LinearRegression()
    lr.fit(x_train, y_train)
    return lr


def predict_duration(input_data: csr_matrix, model: LinearRegression) -> np.ndarray:
    """
    Use trained linear regression model
    to predict target from input data
    :return array of predictions
    """
    return model.predict(input_data)


def evaluate_model(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Calculate the root mean squared error (RMSE) for two arrays"""
    return mean_squared_error(y_true, y_pred, squared=False)
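
Because `squared=False` is passed, `evaluate_model` returns the root mean squared error (RMSE), expressed in the same unit as the target (minutes here). For $n$ trips with true durations $y_i$ and predictions $\hat{y}_i$:

$$
\mathrm{RMSE}(y, \hat{y}) = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2}
$$

For example, take $y = (10, 20)$ and $\hat{y} = (12, 16)$. The squared errors are $4$ and $16$, so $\mathrm{RMSE} = \sqrt{(4 + 16)/2} = \sqrt{10} \approx 3.16$ minutes.

Note that in scikit-learn 1.4 and later, the `squared` argument is deprecated in favor of `sklearn.metrics.root_mean_squared_error`.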

#### Serialization

In [None]:
def load_pickle(path: str):
    with open(path, "rb") as f:
        loaded_obj = pickle.load(f)
    return loaded_obj


def save_pickle(path: str, obj: object):
    with open(path, "wb") as f:
        pickle.dump(obj, f)

#### Main training

In [None]:
def train_and_predict(x_train, y_train, x_test, y_test) -> dict:
    """Train model, predict values and calculate error"""
    model = train_model(x_train, y_train)
    prediction = predict_duration(x_test, model)
    rmse = evaluate_model(y_test, prediction)
    return {"model": model, "rmse": rmse}


def complete_ml(
    train_path: str,
    test_path: str,
    save_model: bool = True,
    save_dv: bool = True,
    local_storage: str = Config.LOCAL_STORAGE,
) -> None:
    """
    Load data and prepare sparse matrix (using dictvectorizer) for model training
    Train model, make predictions and calculate error
    Save model and dictvectorizer to a folder in pickle format
    :return none
    """
    if not os.path.exists(local_storage):
        os.makedirs(local_storage)

    train_data = process_data(train_path)
    test_data = process_data(test_path, dv=train_data["dv"])
    model_obj = train_and_predict(
        train_data["x"], train_data["y"], test_data["x"], test_data["y"]
    )
    if save_model:
        save_pickle(f"{local_storage}/model.pickle", model_obj)
    if save_dv:
        save_pickle(f"{local_storage}/dv.pickle", train_data["dv"])


def batch_inference(
    input_path, dv=None, model=None, local_storage=Config.LOCAL_STORAGE
):
    """
    Load model and dictvectorizer from folder
    Transforms input data with dictvectorizer
    Predict values using loaded model
    :return array of predictions
    """
    if not dv:
        dv = load_pickle(f"{local_storage}/dv.pickle")
    data = process_data(input_path, dv, with_target=False)
    if not model:
        model = load_pickle(f"{local_storage}/model.pickle")["model"]
    return predict_duration(data["x"], model)

## Orchestrators

Here are a few popular workflow orchestrators :

- Apache Airflow: open-source platform for scheduling and managing workflows, with a large and active community.

- Prefect: open-source platform designed to be highly flexible, easily deployable and scalable; offers a Python API.

- Flyte: open-source, unified platform for workflow management across cloud and on-premises environments; also provides a Python API.

- AWS Step Functions: serverless workflow service offered by Amazon Web Services for building and executing multi-step workflows.

- Zapier: web-based platform with a visual interface for connecting different web applications; does not require coding knowledge.

> [Little Orchestrators Benchmark](https://miro.medium.com/max/1400/1*b6CAci-A4TfuYwM9coY6nw.webp)

## Workflow orchestration with Prefect

> **Version** : 
> This module has been created using Prefect 2.7.9

### Main Prefect concepts

Prefect builds workflows in Python using function decorators.
As long as your main workflow function is decorated with `@flow`, every run of that flow can be observed in the Prefect UI.

Basic concepts:
- **Tasks**: units of work written in Python. A task is a function decorated with the Prefect `@task` decorator. Tasks can only be called from within flows.
- **Flows**: DAGs that represent a group of interdependent tasks.
- **Engine**: defines where flows run; this is where workload distribution is managed. In this course, we only use the local machine.
- **States**: Prefect objects produced by flow and task runs. They contain information about the run status (e.g. completed, failed) and its data.

Deployment concepts:
- **Deployment object**: a Prefect entity that the API understands for scheduling, automatic runs, etc.
- **Work queue**: created once a deployment has been applied. It lists all upcoming runs and their state (scheduled, running, ...).
- **Agent**: responsible for pulling flow runs from work queues and running them at the right time.
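The interplay between work queues and agents can be pictured with the toy sketch below. The queue holds scheduled runs ordered by start time. Each polling cycle of the agent pulls the runs that are due and executes them. This is a simplified illustration written for this module, not Prefect's implementation. `WorkQueue` and `agent_poll` are hypothetical names, and the `run` callback stands in for actually executing a flow.

```python
import heapq
from datetime import datetime, timedelta
from typing import Callable, List, Tuple


class WorkQueue:
    """Toy work queue: scheduled flow runs kept in a heap ordered by start time."""

    def __init__(self) -> None:
        self._runs: List[Tuple[datetime, str]] = []

    def schedule(self, start_time: datetime, flow_name: str) -> None:
        heapq.heappush(self._runs, (start_time, flow_name))

    def pop_due(self, now: datetime) -> List[str]:
        """Remove and return every run whose start time is <= now, earliest first."""
        due = []
        while self._runs and self._runs[0][0] <= now:
            due.append(heapq.heappop(self._runs)[1])
        return due

    def __len__(self) -> int:
        return len(self._runs)


def agent_poll(queue: WorkQueue, now: datetime, run: Callable[[str], None]) -> List[str]:
    """One polling cycle of a toy agent: pull the due runs and execute them."""
    started = queue.pop_due(now)
    for flow_name in started:
        run(flow_name)
    return started


t0 = datetime(2023, 1, 1, 0, 0)
queue = WorkQueue()
queue.schedule(t0 + timedelta(days=7), "complete_ml")
queue.schedule(t0 + timedelta(minutes=1), "batch_inference")
print(agent_poll(queue, t0 + timedelta(minutes=2), run=print))
```

A long-running agent would repeat this polling cycle in a loop, waiting between cycles. Here, only `batch_inference` is due after two minutes, while `complete_ml` stays in the queue.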

### Set up Prefect UI

Steps :

- Set an API URL for your local server so that your workflows are tracked by this specific instance:
    - `prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api`
- Start a local Prefect server: `prefect orion start --host 0.0.0.0`

The Prefect database is stored at `~/.prefect/orion.db`. To reset the database, run `prefect orion database reset`.


In [None]:
! prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api

In [None]:
! prefect orion start --host 0.0.0.0

### Implementation

#### Imports

#### First flow : Processing flow

The behavior of tasks and flows can be configured with arguments in the decorators:
- name, tags
- version
- retries on failure (`retries`, `retry_delay_seconds`)
- etc.

In [None]:
@task(name="failure_task", tags=["fails"], retries=3, retry_delay_seconds=60)
def failure():
    print("running")
    if random.randint(1, 10) % 2 == 0:
        raise ValueError("bad code")


@flow(name="failure_flow", version="1.0")
def test_failure():
    failure()

In [None]:
test_failure()
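
With `retries=3`, the `failure` task is attempted at most four times. Each attempt raises when `random.randint(1, 10)` returns an even number, which is 5 of the 10 equally likely values, so probability 1/2. The task fails only if all four attempts fail: $(1/2)^4 = 1/16 \approx 6\%$. The flow has no retries of its own, so it fails in that case too.

The retry behavior itself can be pictured with the toy decorator below. It is a plain-Python sketch written for this module, not how Prefect implements retries, and `with_retries` is a hypothetical name.

```python
import time
from functools import wraps


def with_retries(retries: int = 3, retry_delay_seconds: float = 60):
    """Toy retry policy: re-run the wrapped function up to `retries` extra times if it raises."""

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # no retries left: propagate the last error
                    time.sleep(retry_delay_seconds)  # wait before the next attempt

        return wrapper

    return decorator


attempts = []


@with_retries(retries=3, retry_delay_seconds=0)
def flaky_step():
    attempts.append(1)
    if len(attempts) < 3:  # fail on the first two attempts
        raise ValueError("bad code")
    return "ok"


print(flaky_step(), len(attempts))
```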

Let's turn the functions above into Prefect tasks and flows:

In [None]:
@task(name="load_data", tags=["preprocessing"], retries=2, retry_delay_seconds=60)
def load_data(path: str) -> pd.DataFrame:
    return pd.read_parquet(path)


@task(name="compute_duration", tags=["preprocessing"])
def compute_target(
    df: pd.DataFrame,
    pickup_column: str = "tpep_pickup_datetime",
    dropoff_column: str = "tpep_dropoff_datetime",
) -> pd.DataFrame:
    """
    Compute the trip duration in minutes based
    on pickup and dropoff time
    """
    df["duration"] = df[dropoff_column] - df[pickup_column]
    df["duration"] = df["duration"].dt.total_seconds() / 60
    return df


@task(name="filter_outliers", tags=["preprocessing"])
def filter_outliers(
    df: pd.DataFrame, min_duration: int = 1, max_duration: int = 60
) -> pd.DataFrame:
    """
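    Remove rows whose target value (duration, in minutes) is outside
    [min_duration, max_duration], i.e. negative/zero or too high values
    """
    # .copy() returns an independent frame, so later column assignments
    # do not trigger pandas' SettingWithCopyWarning
    return df[df["duration"].between(min_duration, max_duration)].copy()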


@task(name="encode_cat_cols", tags=["preprocessing"])
def encode_categorical_cols(
    df: pd.DataFrame, categorical_cols: List[str] = None
) -> pd.DataFrame:
    """
    Takes a Pandas dataframe and a list of categorical
    column names, and returns the dataframe with the specified
    columns cast to strings (missing values are filled with -1)
    """
    if categorical_cols is None:
        categorical_cols = Config.CATEGORICAL_VARS
    df[categorical_cols] = df[categorical_cols].fillna(-1).astype("int")
    df[categorical_cols] = df[categorical_cols].astype("str")
    return df


@task(name="extract_x_y", tags=["preprocessing"])
def extract_x_y(
    df: pd.DataFrame,
    categorical_cols: List[str] = None,
    dv: DictVectorizer = None,
    with_target: bool = True,
) -> dict:
    """
    Turns the categorical columns into lists of mappings (dicts of feature
    names to feature values), then into a sparse matrix for use with
    scikit-learn estimators, using a DictVectorizer object.
    A new DictVectorizer is fitted only if `dv` is None and `with_target` is True.
    :return The sparse matrix, the target values if needed, and the
    DictVectorizer object.
    """
    if categorical_cols is None:
        categorical_cols = Config.CATEGORICAL_VARS
    dicts = df[categorical_cols].to_dict(orient="records")

    y = None
    if with_target:
        if dv is None:
            dv = DictVectorizer()
            dv.fit(dicts)
        y = df["duration"].values

    x = dv.transform(dicts)
    return {"x": x, "y": y, "dv": dv}

In [None]:
@flow(name="Data processing", retries=1, retry_delay_seconds=30)
def process_data(path: str, dv=None, with_target: bool = True) -> dict:
    """
    Load data from a parquet file
    If with_target is True, compute the target (duration column) and filter outliers
    Turn features into a sparse matrix
    :return The sparse matrix, the target values (None if with_target is False),
    and the DictVectorizer object.
    """
    df = load_data(path)
    if with_target:
        df1 = compute_target(df)
        df2 = filter_outliers(df1)
        df3 = encode_categorical_cols(df2)
        return extract_x_y(df3, dv=dv)
    else:
        df1 = encode_categorical_cols(df)
        return extract_x_y(df1, dv=dv, with_target=with_target)

In [None]:
res = process_data(Config.TRAIN_DATA)

res_without_y = process_data(
    path=Config.INFERENCE_DATA, dv=res["dv"], with_target=False
)

#### Training flow

In [None]:
@task(name="Train model", tags=["Model"])
def train_model(x_train: csr_matrix, y_train: np.ndarray) -> LinearRegression:
    """Train and return a linear regression model"""
    lr = LinearRegression()
    lr.fit(x_train, y_train)
    return lr


@task(name="Make prediction", tags=["Model"])
def predict_duration(input_data: csr_matrix, model: LinearRegression) -> np.ndarray:
    """
    Use trained linear regression model
    to predict target from input data
    :return array of predictions
    """
    return model.predict(input_data)


@task(name="Evaluation", tags=["Model"])
def evaluate_model(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Calculate the root mean squared error (RMSE) for two arrays"""
    return mean_squared_error(y_true, y_pred, squared=False)


@task(name="Load", tags=["Serialize"])
def load_pickle(path: str):
    with open(path, "rb") as f:
        loaded_obj = pickle.load(f)
    return loaded_obj


@task(name="Save", tags=["Serialize"])
def save_pickle(path: str, obj: object):
    with open(path, "wb") as f:
        pickle.dump(obj, f)

In [None]:
@flow(name="Model initialisation")
def train_and_predict(x_train, y_train, x_test, y_test) -> dict:
    """Train model, predict values and calculate error"""
    model = train_model(x_train, y_train)
    prediction = predict_duration(x_test, model)
    rmse = evaluate_model(y_test, prediction)
    return {"model": model, "rmse": rmse}


@flow(name="Example Machine learning workflow", retries=1, retry_delay_seconds=30)
def complete_ml(
    train_path: str,
    test_path: str,
    save_model: bool = True,
    save_dv: bool = True,
    local_storage: str = Config.LOCAL_STORAGE,
) -> None:
    """
    Load data and prepare sparse matrix (using dictvectorizer) for model training
    Train model, make predictions and calculate error
    Save model and dictvectorizer to a folder in pickle format
    :return none
    """
    if not os.path.exists(local_storage):
        os.makedirs(local_storage)

    train_data = process_data(train_path)
    test_data = process_data(test_path, dv=train_data["dv"])
    model_obj = train_and_predict(
        train_data["x"], train_data["y"], test_data["x"], test_data["y"]
    )
    if save_model:
        save_pickle(f"{local_storage}/model.pickle", model_obj)
    if save_dv:
        save_pickle(f"{local_storage}/dv.pickle", train_data["dv"])


@flow(name="Batch inference", retries=1, retry_delay_seconds=30)
def batch_inference(
    input_path, dv=None, model=None, local_storage=Config.LOCAL_STORAGE
):
    """
    Load model and dictvectorizer from folder
    Transforms input data with dictvectorizer
    Predict values using loaded model
    :return array of predictions
    """
    if not dv:
        dv = load_pickle(f"{local_storage}/dv.pickle")
    data = process_data(input_path, dv, with_target=False)
    if not model:
        model = load_pickle(f"{local_storage}/model.pickle")["model"]
    return predict_duration(data["x"], model)

In [None]:
complete_ml(Config.TRAIN_DATA, Config.TEST_DATA)

In [None]:
batch_inference(Config.INFERENCE_DATA)

#### Deployments : 

Prefect deployment objects are used by the Prefect API to understand scheduling requirements. \
A flow can be used in multiple deployment objects, but each deployment object is associated with a single flow. \
Applying a deployment registers it with the API. Its scheduled runs are then placed in a work queue, from which an agent picks them up.

This module uses two types of Prefect schedules:
- cron scheduling: defines run dates based on a cron expression, e.g. `"0 0 * * 0"` (every Sunday at 00:00)
- interval scheduling: defines a fixed interval between runs, e.g. `IntervalSchedule(interval=60)` (every 60 seconds)
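The sketch below computes upcoming run times by hand for both schedule types:
- an interval schedule: a fixed interval added repeatedly to a start time;
- the weekly cron expression `"0 0 * * 0"`: minute 0, hour 0, any day of month, any month, day of week 0 = Sunday.

The cron helper is hand-written for this single expression, not a general cron parser, and it ignores time zones. Prefect computes run times itself from the schedule objects. `interval_runs` and `next_sunday_midnight` are hypothetical helpers.

```python
from datetime import datetime, timedelta
from typing import List


def interval_runs(start: datetime, interval: timedelta, count: int) -> List[datetime]:
    """The next `count` run times of an interval schedule starting from `start`."""
    return [start + i * interval for i in range(1, count + 1)]


def next_sunday_midnight(after: datetime) -> datetime:
    """Next match of cron '0 0 * * 0' (Sunday at 00:00) strictly after `after`."""
    midnight = after.replace(hour=0, minute=0, second=0, microsecond=0)
    # datetime.weekday(): Monday == 0 ... Sunday == 6
    candidate = midnight + timedelta(days=(6 - midnight.weekday()) % 7)
    if candidate <= after:  # already at or past this Sunday's midnight
        candidate += timedelta(days=7)
    return candidate


now = datetime(2023, 2, 15, 10, 30)  # a Wednesday
print(interval_runs(now, timedelta(seconds=60), 3))  # every minute, like interval=60
print(next_sunday_midnight(now))  # 2023-02-19 00:00:00
```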

In [None]:
modeling_deployment_every_sunday = await Deployment.build_from_flow(
    name="Model training Deployment",
    flow=complete_ml,
    version="1.0",
    tags=["model"],
    schedule=CronSchedule(cron="0 0 * * 0"),
    apply=True,
    entrypoint="/app/lib/prefect_workflows.py:complete_ml",
    parameters={"train_path": Config.TRAIN_DATA, "test_path": Config.TEST_DATA},
)


inference_deployment_every_minute = await Deployment.build_from_flow(
    name="Model Inference Deployment",
    flow=batch_inference,
    version="1.0",
    tags=["inference"],
    schedule=IntervalSchedule(interval=60),
    apply=True,
    entrypoint="/app/lib/prefect_workflows.py:batch_inference",
    parameters={"input_path": Config.INFERENCE_DATA},
)

A Prefect agent is needed to pull the scheduled runs from the work queue and execute the flows at the right time/interval.
Start one with:
```
prefect agent start -q default
```

In [None]:
! prefect agent start -q default

#### Open Discussion

- Discuss an overall approach to triggering pipelines by an action (an event) rather than by time
- Discuss how to implement it for the NYC use case
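As a starting point for the discussion, here is one possible approach, sketched in plain Python. A polling pass over a landing folder calls a callback once for every new `.parquet` file. In the NYC use case, that callback could start the batch inference flow for the file.

This is an assumption for illustration, not part of Prefect, and `trigger_on_new_files` is a hypothetical helper. Keeping the `seen` set in memory means it would be lost on restart. Event notifications from the storage system or a message queue would avoid polling altogether.

```python
import tempfile
from pathlib import Path
from typing import Callable, List, Set


def trigger_on_new_files(
    folder: Path,
    seen: Set[str],
    on_new_file: Callable[[Path], None],
    pattern: str = "*.parquet",
) -> List[Path]:
    """One polling pass: call `on_new_file` once for each matching file not seen before."""
    new_files = sorted(p for p in folder.glob(pattern) if p.name not in seen)
    for path in new_files:
        on_new_file(path)  # e.g. lambda p: batch_inference(str(p)) in this module
        seen.add(path.name)
    return new_files


# A real trigger would repeat this pass in a loop, e.g. with time.sleep() between passes.
with tempfile.TemporaryDirectory() as tmp:
    landing = Path(tmp)
    seen: Set[str] = set()
    (landing / "yellow_tripdata_2021-03.parquet").touch()
    print([p.name for p in trigger_on_new_files(landing, seen, print)])
    print(trigger_on_new_files(landing, seen, print))  # nothing new on the second pass
```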