## 3.1 Introdution to Workflow Orchestration

The image below displays an example of a MLOps Workflow:

- Some data (PostgreSQL database) is fetched by Python code
- Pandas is used to preprocess the data (cleaning, EDA, etc.)
    - Data is then saved to a parquet file (to be read later, etc.)
- Scikit-Learn is then used for feature engineering or running some models
- Or we can also use XGBoost 
- At the same time, MLflow is tracking everything 
- Finally, we deploy the model using Flask 

![title](images/workflow.png)

### Possible complications

#### a) Failures along the workflow

We may have failure points at lots of different parts in the workflow. The goal of the workflow orchestration is to minimize the errors and fail gracefully.


#### b) Complex interdependency
You may be asked to:
- Could you just set up this pipeline to train this model? (add another extra layer)
- Could you set up some logging? (e.g., outside MLflow)
- Could you do it every day? (set up a schedule)
- Could you make it retry if it fails?
- Could you send me a message when it succeeds? (e.g., email)
- Could you visualize the dependencies?
- Could you add caching? (so we don't need to re-run everytime)
- Could you add some collaborators to run ad-hoc, and who don't code?

#### Solution
This is where **Prefect** comes in. It allows you to orchestrate and observe your Python workflows at scale. 

## 3.2 Introduction to Prefect
We are going to see:
- how Python code can be converted to a Prefect script
- run our own Prefect server locally 
- run a few Prefect flows against the server
- Interact with Prefect UI

### What is Prefect?
Prefect is a flexible, open-source Python framework that can be leveraged to turn python code into robust workflows that are more resilient to downtime and unforeseen failures.

When self-hosting a Prefect Server we can separate three different components:
- **Orchestration API**: it's a REST API that is used by the server to work with workflow metadata (so that you can create, control, and monitor the execution of workflows efficiently)
- this workflow metadata is stored in a **Database** (SQLite by default)
- the workflows can be visualized thanks to the server **UI**

### Terminology
The two basic building blocs of Prefect are:
- **Task:** A discrete unit of work in a Prefect workflow. Tasks in Prefect can encapsulate any Python code or callable, such as functions, methods, or classes by using the decorator ```@task```
- **Flow:** It provides a way to define, organize, and orchestrate the execution of tasks in a specific order. You can turn any function into a Prefect flow by adding the ```@flow``` decorator.
- **Subflow:** Flow called by another flow. There is no command to make this explicit, but we can use the name argument in the decorator:
```@flow(name='Subflow')```

*Important!:* All tasks must be called from within a flow. Tasks may not be called from other tasks.

### Installing Prefect
In a conda environment, install all package dependencies with
```
$ pip install -r requirements.txt
```

### Start the Prefect server locally

Activate your conda environment with the new packages installed. Start the Prefect API server locally with
```
$ prefect server start
```
Once the Prefect server is up and running, we make sure that we apply the API URL to our Prefect configuration so that we're pointing to the correct API URL and the workflow metadata is correctly sent to the server UI. Therefore, in a new CLI window, we run the following command:
```
$ prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
```
and then we can access Prefect UI at ```http://127.0.0.1:4200```. It will look like this:

![title](images/prefect-UI.png)

### Launch a flow run

As an example, we can run separately the Python script ```cohorts/2023/03-orchestration/3.2/cat_facts.py```. This script fetches some random trivia about cats from the web and from time to time it will give ```500 Internal Server Error```. 

Now we have collected a new flow run in the UI. The new flow will be under the name of the function it's decorating (```fetch``` in this case):

![title](images/Prefect-firstrun.png)

We can see in the logs (also available in the CLI when we run the script) that an exception happenend and that the task has been retried sucessfully. Due to the random nature of the script, different logs will be recorded for each time the script is executed. 

![title](images/Prefect-firstrun-logs.png)

### Example of subflows

Now we can run the script ```cohorts/2023/03-orchestration/3.2/cat_dog_facts.py``` to see subflows in action. We have a parent flow (```animal-facts```) that calls two subflows (```fetch-cat-fact``` and ```fetch-dog-fact```). Therefore, we will have 3 flow records: 

![title](images/prefect-subflows.png)

The log print argument is set in the parent:

![title](images/prefect-subflows-logs.png)

## 3.3 Prefect Workflow

In this section, we will learn how to productionize the Python scripts from last week's ```02-experiment-tracking```:
- ```preprocess_data.py```
- ```train.py```
- ```hpo.py```

We will skip the hyperparameter tunning step and focus on the best performing model.  We put everything together in the script ```orchestrate.py``` and add some observability and orchestration. This script is organized as follows:

The main flow function

```python
@flow
def main_flow(
    train_path: str = "../data/green_tripdata_2021-01.parquet",
    val_path: str = "../data/green_tripdata_2021-02.parquet",
) -> None:
    """The main training pipeline"""
    
    ...
```

It collects the following tasks:
   1) sets up MLflow
   2) **reads** the train and validation data
   ```python
@task(retries=3, retry_delay_seconds=2)
def read_data(filename: str) -> pl.DataFrame:
    """Read data into DataFrame"""
    ...
```
   3) **performs some transformations** on the data 
   ```python
@task
def add_features(
    df_train: pl.DataFrame, df_val: pl.DataFrame
) -> tuple[
          scipy.sparse.spmatrix,
          scipy.sparse.spmatrix,
          np.ndarray,
          np.ndarray,
          sklearn.feature_extraction.DictVectorizer,
    ]:
    """Add features to the model"""
    ...
```

   4) and **trains** the best model
   ```python
@task(log_prints=True)
def train_best_model(
          X_train: scipy.sparse.spmatrix,
          X_val: scipy.sparse.spmatrix,
          y_train: np.ndarray,
          y_val: np.ndarray,
          dv: sklearn.feature_extraction.DictVectorizer,
) -> None:
    """train a model with best hyperparams and write everything out"""
    ...
```

We can now run ```orchestrate.py``` and go to the Prefect UI:

![title](images/prefect-orchestration.png)

## 3.4 Deploying your Workflow

In this section we will present how to deploy workflows using Prefect Projects for productionizing. It will allows to do scheduling and collaborations with others (e.g. using Prefect Cloud).


### Create a Prefect Project
A project is a minimally opinionated set of files that describe how to prepare one or more flow deployments. It can be initialized by running the CLI command 

```$ prefect project init```

in the **root directory** ```mlops-zoomcamp```. Now the following files has been created:
- ```deployment.yaml```: a YAML file describing base settings for multiple deployments from this project.
- ```prefect.yaml```: a YAML file that contains default instructions for how to build and push any necessary code artifacts (such as Docker images) from this project, as well as default instructions for pulling a deployment in remote execution environments (e.g., cloning a GitHub repository)
- ```.prefect/```: a hidden directory where Prefect will store workflow metadata
- ```.prefectignore```: to skip upload of some files which are relevant for local development but not intended for use in production.

*Important 1*: when you run ```prefect project init```, the files above are not overwritten, so you need to remove them manually.

*Important 2*: I made sure that the project is initiated in the root directory because when running the deployment it will clone ```mlops-zoomcamp.git``` from a remote location (e.g. GitHub). See ```prefect.yaml```. 

### Create a Work Pool

Work pools, together with workers and agents, bridge the Prefect orchestration environment with your execution environment. 

When a deployment creates a flow run, it is submitted to a specific work pool for scheduling. A worker or agent running in the execution environment polls its respective work pool for new runs to execute.

We can create work pools by going to the UI:

![title](images/work-pools.png)

1. Basic information : e.g. Name: zoompool
2. Infrastructure Type: Local Subprocess
3. Configuration: leave as it is

### Deploy your flow
A deployment is a server-side concept that encapsulates a flow, allowing it to be scheduled and triggered via API. The deployment stores metadata about where your flow's code is stored and how your flow should be run.

At a high level, you can think of a deployment as configuration for managing flows, whether you run them via the CLI, the UI, or the API.

To deploy ```orchestrate.py```, run:
```
$ prefect deploy cohorts/2023/03-orchestration/3.4/orchestrate.py:main_flow -n taxi1 -p zoompool
```
while being at the root directory ```mlops-zoomcamp``` folder. 

- ```main_flow``` refers to the flow within the Python script that we want as the entrypoint function for the deployment.
- ```-n``` gives a name to the deployment, in this case *taxi1*
- ```-p``` assigns a work pool to the deployment, in this case *zoompool*

Now we can find the deployment in the UI:

![title](images/prefect-deployment.png)


### Start a worker

To run deployments, you must start a worker or agent that pulls work from the *zoompool* work pool.

- **agent**: Agent processes are lightweight polling services that get scheduled work from a work pool and deploy the corresponding flow runs
- **worker** : Workers are similar to agents, but offer greater control over infrastructure configuration and the ability to route work to specific types of execution environments.

To start a worker:
```
$ prefect worker start -p zoompool
```

### Run the deployed flow

We can go to the UI, click on the recently deployed flow and select ```quick run```

![title](images/run-deployment.png)

*Important:* When running the deployment, Prefect will create a ```tmp/``` directory and clone the repo from GitHub: 

1. Make sure that the ```orchestrate.py``` is updated in the repo, as it will use this remote file. 
2. The current directory when running the deployment is the root directory ```mlops-zoomcamp```. Thus make sure to update your path to read the data accordingly.
3. Finally, the ```.parquet``` files must be uploaded to GitHub or other remote location 

## 3.5  Working with Deployments

### Setting up multiple deployments

We can set up multiple deployments from the project by modifying the ```deployment.yaml``` file:

```
deployments:
- name: taxi_local_data1
  flow_name: cohorts/2023/03-orchestration/3.4/orchestrate.py:main_flow
  work_pool: zoompool
    name: zoompool
- name: taxi_local_data2
  flow_name: cohorts/2023/03-orchestration/3.5/orchestrate.py:main_flow_35
  work_pool: zoompool
    name: zoompool
```

where we have set up two deployments (one for from the script in 3.4 and other for 3.5). Now we call call these two deployments by using:

``` $ prefect deploy --all ```

![title](images/multiple-deployment.png)

In the next section we will create a markdown artifact and then run one of the two deployments above, ```taxi_local_data2```.

### Creating a Markdown artifact

We will add the following code at the end of function ```train_best_model``` in ```3.5/orchestrate.py```.

```python
markdown__rmse_report = f"""# RMSE Report

        ## Summary

        Duration Prediction 

        ## RMSE XGBoost Model

        | Region    | RMSE |
        |:----------|-------:|
        | {date.today()} | {rmse:.2f} |
        """

create_markdown_artifact(
      key="duration-model-report", 
      markdown=markdown__rmse_report
)
```

and make sure that the new code is commited and pushed to the GitHub repo. Then we execute:

```$ prefect deployment run main-flow-35/taxi_local_data2 ```

to run the first deployment in the ```deployment.yaml``` file (so far we have done this via the UI), and 

```$ prefect worker start -p zoompool```

to have our workers running. We can see the markdown artifact in the UI:

![title](images/artifact.png)

### Schedule deployments

The deployments can be scheduled to run for a given interval via the UI, CLI or the ```deployment.yaml``` file. For example, using the UI:

![title](images/schedule.png)

## 3.5 Prefect Cloud

It allows you to host the server on the cloud, instead of the server being on your computer. In this case the platform is run by the Prefect team. It's a hybrid model:

- You run your code on your infrastructure
- Data doesn't get sent to Prefect Cloud, just some metadata
- Worker opens connection with Cloud and sends metadata

![title](images/prefect-cloud.png)

You can check your Prefect profile:

```$ prefect profile ls```

so that you can save different configurations. The information about profiles is kept in ```.prefect/profiles.toml```

### Authentification

Let's get started with Prefect Cloud. First, you need to create an account in [Prefect Cloud](https://app.prefect.cloud) (Prefect Cloud for personal use is free). Once logged in, you enter Workspaces (not present on the local server), where you can share projects and collaborate with other people.

![title](images/prefect-cloud-workspaces.png)

To get some of the information from the local server (section 3.5) to Prefect Cloud, we should authenticate our local CLI first:

```$ prefect cloud login```

It will give you the option to authenticate by either:
- Web Browser (API key that experies after 30 days)
- Paste an API key (previously created by going to your profile in Prefect Cloud)

You can verify that your profile is compatible with working with Cloud by running:

```$ prefect version```

and verifying that the ```Server type``` is set to ```cloud```

### Deploying on Prefect Cloud

Creating a work pool and assigning a worker:

```$ prefect worker start -p zoompool -t process```

View in the browser:

![title](images/zoompool.png)

Now we can deploy the flows from section 3.5 (described in ```deployment.yaml```:

```$ prefect deploy --all```

View in the browser:

![title](images/deployments.png)

Finally, we can run the second deployment by going clicking on ```Run```

![title](images/run.png)

and we would get the usual logs and markdown artifact that we defined in Section 3.5.

### Automations

You can set up notifications when the Deployments reach some predefined state (e.g. Completed) by using Automations:

![title](images/automations.png)

You can add an email notification by setting up the ```Block``` tab:

![title](images/automations-email.png)