# **Machine Learning Workflow Orchestration**

Orchestration refers to the coordination and management of various tasks, resources, and processes involved in the end-to-end machine learning lifecycle. This includes:

1. Data Preparation and Management
2. Model Training
3. Experimentation and Evaluation
4. Model Deployment
5. Monitoring and Management
6. Automation of repetitive tasks

### **Introducing Prefect**  
Prefect is an open-source orchestration and observability platform that lets developers turn their Python scripts into resilient, recurring workflows.

Prefect streamlines the orchestration of machine learning workflows by providing a flexible, scalable, and reliable framework for building, deploying, and managing complex data pipelines with ease. It empowers data scientists and engineers to focus on building machine learning models and solving business problems while abstracting away the complexities of workflow management and execution.

Prefect versions:
- Prefect 1.x AKA Prefect Core
- Prefect 2.x AKA Prefect Orion

### **Why Prefect?**
- Python-based, open-source tool
- Manages ML pipelines
- Schedules and monitors flows
- Gives observability into failures
- Dask integration for scaling (Dask is a library for parallel computing)


### **Creating and activating a Virtual Environment**
Before installing Prefect, create a virtual environment:
> `$ python -m venv .mlops_env`  

Activate the virtual environment. The command below is for Windows; on macOS or Linux, run `source .mlops_env/bin/activate` instead:
> `$ .mlops_env\Scripts\activate`

***

### **Installing Prefect 2.x**
Now install Prefect:
> `$ pip install prefect`  

OR, if you already have Prefect 1 installed, upgrade to Prefect 2 using this command:  
> `$ pip install -U prefect`  

OR to install a specific version:  
> `$ pip install prefect==2.4`  

***

### **Check Prefect Version**
Confirm which version of Prefect is installed:
> `$ prefect version`
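
The same check can be done from inside Python, which is handy in a notebook. The snippet below is a minimal sketch that uses only the standard library (Python 3.8+); `installed_version` is a hypothetical helper name, not part of Prefect.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name):
    """Return the installed version of a package, or None if it is missing."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


# Report the Prefect version, if Prefect is installed in this environment.
prefect_version = installed_version("prefect")
print(prefect_version or "prefect is not installed in this environment")
```

If this prints a version starting with `1.`, the upgrade command from the previous section has not been applied to the active environment.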

***

### **Running Prefect Dashboard (UI)**

> `$ prefect server start`  

```
 ___ ___ ___ ___ ___ ___ _____
| _ \ _ \ __| __| __/ __|_   _|
|  _/   / _|| _|| _| (__  | |
|_| |_|_\___|_| |___\___| |_|

Configure Prefect to communicate with the server with:

    prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

View the API reference documentation at http://127.0.0.1:4200/docs

Check out the dashboard at http://127.0.0.1:4200
```
***

**Note:** In some early releases of Prefect 2 (Orion), where the server was started with `prefect orion start`, the command failed on Windows with the error shown below if the installation path contained spaces. It is included here so that you can recognize it if you see it.

```
 ___ ___ ___ ___ ___ ___ _____    ___  ___ ___ ___  _  _
| _ \ _ \ __| __| __/ __|_   _|  / _ \| _ \_ _/ _ \| \| |
|  _/   / _|| _|| _| (__  | |   | (_) |   /| | (_) | .` |
|_| |_|_\___|_| |___\___| |_|    \___/|_|_\___\___/|_|\_|
Configure Prefect to communicate with the server with:
    prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
View the API reference documentation at http://127.0.0.1:4200/docs
Check out the dashboard at http://127.0.0.1:4200/
Usage: uvicorn [OPTIONS] APP

Try 'uvicorn --help' for help.

Error: Got unexpected extra argument (prefect.orion.api.server:create_app)
Orion stopped!
```

<img src="images/prefect_dashboard.JPG">

## **Refactoring the ML Workflow**

If you wrote the entire workflow cell by cell in a Jupyter Notebook, without defining any functions, you will find it difficult to visualize and monitor your flows in the Prefect dashboard.

Prefect works best when workflows are organized into modular functions, with each function representing a task in your workflow. This allows Prefect to track task dependencies, visualize the workflow graph, and provide detailed execution logs and status updates in the dashboard.
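
To see why function boundaries matter, consider the toy sketch below. It is not how Prefect is implemented; `tracked` and `RUN_LOG` are hypothetical names used only for illustration. The point is that a decorator can record the start, end, and outcome of each step only when that step is a separate function.

```python
import functools
import time

# Hypothetical in-memory record of every step that ran.
RUN_LOG = []


def tracked(func):
    """Record the name, final state, and duration of each call to func."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception:
            RUN_LOG.append((func.__name__, "Failed", time.perf_counter() - started))
            raise
        RUN_LOG.append((func.__name__, "Completed", time.perf_counter() - started))
        return result
    return wrapper


@tracked
def load_numbers():
    return [3, 1, 2]


@tracked
def sort_numbers(numbers):
    return sorted(numbers)


def pipeline():
    # Each step is a function call, so each one leaves an entry in RUN_LOG.
    return sort_numbers(load_numbers())
```

Calling `pipeline()` leaves one entry per step in `RUN_LOG`. If the same logic were written as loose notebook cells, there would be nothing for the decorator to wrap, and so nothing to report per step.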

You can still run un-refactored notebook code with Prefect, but you will miss out on some of the dashboard's features and benefits.

To address this, you can refactor your workflow code to extract each step into separate functions, and then import these functions into your notebook. This way, you can maintain the convenience of writing and experimenting with code in a notebook while also leveraging Prefect's capabilities for workflow orchestration and monitoring.

Once you've refactored your code to use functions, you can run your flows as usual using Prefect's CLI or Python API, and you'll be able to visualize and monitor them in the Prefect dashboard.

```python
import pandas as pd
from sklearn.model_selection import train_test_split

from sklearn.preprocessing import MinMaxScaler
from sklearn.neighbors import KNeighborsClassifier

from sklearn import metrics
```

```python
def load_data(file_path):
    """
    Load data from a CSV file.
    """
    return pd.read_csv(file_path)


def split_inputs_output(data, inputs, output):
    """
    Split features and target variables.
    """
    X = data[inputs]
    y = data[output]
    return X, y


def split_train_test(X, y, test_size=0.25, random_state=0):
    """
    Split data into train and test sets.
    """
    return train_test_split(X, y, test_size=test_size, random_state=random_state)


def preprocess_data(X_train, X_test, y_train, y_test):
    """
    Rescale the data.
    """
    scaler = MinMaxScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    return X_train_scaled, X_test_scaled, y_train, y_test


def train_model(X_train_scaled, y_train, hyperparameters):
    """
    Training the machine learning model.
    """
    clf = KNeighborsClassifier(**hyperparameters)
    clf.fit(X_train_scaled, y_train)
    return clf


def evaluate_model(model, X_train_scaled, y_train, X_test_scaled, y_test):
    """
    Evaluating the model.
    """
    y_train_pred = model.predict(X_train_scaled)
    y_test_pred = model.predict(X_test_scaled)

    train_score = metrics.accuracy_score(y_train, y_train_pred)
    test_score = metrics.accuracy_score(y_test, y_test_pred)

    return train_score, test_score
```

```python
def workflow(data_path):
    DATA_PATH = data_path
    INPUTS = ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']
    OUTPUT = 'Species'
    HYPERPARAMETERS = {'n_neighbors': 3, 'p': 2}

    # Load data
    iris = load_data(DATA_PATH)

    # Identify Inputs and Output
    X, y = split_inputs_output(iris, INPUTS, OUTPUT)

    # Split data into train and test sets
    X_train, X_test, y_train, y_test = split_train_test(X, y)

    # Preprocess the data
    X_train_scaled, X_test_scaled, y_train, y_test = preprocess_data(X_train, X_test, y_train, y_test)

    # Build a model
    model = train_model(X_train_scaled, y_train, HYPERPARAMETERS)

    # Evaluation
    train_score, test_score = evaluate_model(model, X_train_scaled, y_train, X_test_scaled, y_test)

    print("Train Score:", train_score)
    print("Test Score:", test_score)


if __name__ == "__main__":
    workflow(data_path="data/iris.csv")
```

```
Train Score: 0.9732142857142857
Test Score: 0.9736842105263158
```


## **Building a Prefect Workflow**

Step 1 - Import Prefect modules

Step 2 - Define Prefect Tasks

Step 3 - Define Prefect Flow

Step 4 - Run Prefect Flow

<img src="images/prefect_flow_run.JPG">

```python
from prefect import task, flow
```

```python
@task
def load_data(file_path):
    """
    Load data from a CSV file.
    """
    return pd.read_csv(file_path)


@task
def split_inputs_output(data, inputs, output):
    """
    Split features and target variables.
    """
    X = data[inputs]
    y = data[output]
    return X, y


@task
def split_train_test(X, y, test_size=0.25, random_state=0):
    """
    Split data into train and test sets.
    """
    return train_test_split(X, y, test_size=test_size, random_state=random_state)


@task
def preprocess_data(X_train, X_test, y_train, y_test):
    """
    Rescale the data.
    """
    scaler = MinMaxScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    return X_train_scaled, X_test_scaled, y_train, y_test


@task
def train_model(X_train_scaled, y_train, hyperparameters):
    """
    Training the machine learning model.
    """
    clf = KNeighborsClassifier(**hyperparameters)
    clf.fit(X_train_scaled, y_train)
    return clf


@task
def evaluate_model(model, X_train_scaled, y_train, X_test_scaled, y_test):
    """
    Evaluating the model.
    """
    y_train_pred = model.predict(X_train_scaled)
    y_test_pred = model.predict(X_test_scaled)

    train_score = metrics.accuracy_score(y_train, y_train_pred)
    test_score = metrics.accuracy_score(y_test, y_test_pred)

    return train_score, test_score
```

```python
# Workflow

@flow(name="KNN Training Flow")
def workflow():
    DATA_PATH = "data/iris.csv"
    INPUTS = ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']
    OUTPUT = 'Species'
    HYPERPARAMETERS = {'n_neighbors': 3, 'p': 2}

    # Load data
    iris = load_data(DATA_PATH)

    # Identify Inputs and Output
    X, y = split_inputs_output(iris, INPUTS, OUTPUT)

    # Split data into train and test sets
    X_train, X_test, y_train, y_test = split_train_test(X, y)

    # Preprocess the data
    X_train_scaled, X_test_scaled, y_train, y_test = preprocess_data(X_train, X_test, y_train, y_test)

    # Build a model
    model = train_model(X_train_scaled, y_train, HYPERPARAMETERS)

    # Evaluation
    train_score, test_score = evaluate_model(model, X_train_scaled, y_train, X_test_scaled, y_test)

    print("Train Score:", train_score)
    print("Test Score:", test_score)


if __name__ == "__main__":
    workflow()
```

```
Train Score: 0.9732142857142857
Test Score: 0.9736842105263158
```


## **Make Your Workflow Schedulable**

This section covers two ways to schedule your workflow so that it runs automatically.

**Note:** For both approaches, first move your Prefect `tasks` and `flows` out of the notebook and into `.py` files.

1. Using the `serve` method

```python
if __name__ == "__main__":
    workflow.serve(
        name="my-first-deployment",
        cron="* * * * *"  # run once every minute
    )
```
2. Using the `deploy` method
```python
# This approach needs an existing work pool whose name matches 'work_pool_name'.
# You can create one in the Prefect dashboard under 'Work Pools'.
if __name__ == "__main__":
    workflow.deploy(
        name="my-first-deployment",
        cron="* * * * *",  # run once every minute
        work_pool_name="local-work-pool"
    )
```

### **What is `cron`?**
`cron` is a scheduling expression that defines how often a flow should be executed. Its syntax lets you describe recurring points in time, such as "every day at 1:00 AM", at which the flow runs.

A cron expression consists of five fields separated by spaces:

1. Minute (0 - 59)
2. Hour (0 - 23)
3. Day of the month (1 - 31)
4. Month (1 - 12)
5. Day of the week (0 - 7) (Sunday is 0 or 7)

For example, a cron expression of "0 1 * * *" means the flow will run every day at 1:00 AM.

**[Click here](https://crontab.guru/) to experiment.**  

Here are a few more examples of cron expressions:

- "0 * * * *" (run every hour at the beginning of the hour)
- "0 0 * * 0" (run every Sunday at midnight)
- "0 8-18 * * *" (run at the start of every hour from 8:00 AM through 6:00 PM)
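
The five fields described above can be checked against a timestamp with a few lines of Python. The sketch below is only an illustration: `expand_field` and `cron_matches` are hypothetical helper names, the code supports just `*`, single values, ranges, comma lists, and `/` steps, and it is not the parser Prefect uses.

```python
from datetime import datetime

# Allowed (min, max) for minute, hour, day of month, month, day of week.
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 7)]


def expand_field(field, low, high):
    """Expand one cron field (e.g. '8-18' or '*/15') into a set of integers."""
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_text, end_text = part.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        if not (low <= start <= end <= high):
            raise ValueError(f"field {field!r} is outside {low}-{high}")
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expression, moment):
    """Return True if the datetime 'moment' satisfies the cron expression."""
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError("a cron expression needs exactly five fields")
    minutes, hours, days, months, weekdays = (
        expand_field(field, low, high)
        for field, (low, high) in zip(fields, FIELD_RANGES)
    )
    if 7 in weekdays:
        weekdays.add(0)  # Sunday may be written as 0 or 7
    weekday = moment.isoweekday() % 7  # Monday=1 ... Saturday=6, Sunday=0
    day_ok = moment.day in days
    weekday_ok = weekday in weekdays
    # Classic cron rule: if both day fields are restricted, either may match.
    if fields[2] != "*" and fields[4] != "*":
        date_ok = day_ok or weekday_ok
    else:
        date_ok = day_ok and weekday_ok
    return (
        moment.minute in minutes
        and moment.hour in hours
        and moment.month in months
        and date_ok
    )


# 6 May 2024 at 1:00 AM matches "every day at 1:00 AM".
print(cron_matches("0 1 * * *", datetime(2024, 5, 6, 1, 0)))  # True
# 5 May 2024 was a Sunday, so midnight matches "every Sunday at midnight".
print(cron_matches("0 0 * * 0", datetime(2024, 5, 5, 0, 0)))  # True
```

A helper like this is useful for sanity-checking an expression before passing it to the `cron` argument; for anything beyond the basic syntax, a dedicated library or crontab.guru is the safer choice.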
