# Week 3: Homework

## Motivation

The goal of this homework is to familiarize users with workflow orchestration. We start from the solution of homework 1. The notebook can be found below:

https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/01-intro/homework.ipynb

This has already been converted to a script called `homework.py` in this folder.

In [1]:
!cat homework.py

import pandas as pd

from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

def read_data(path):
    df = pd.read_parquet(path)
    return df

def prepare_features(df, categorical, train=True):
    df['duration'] = df.dropOff_datetime - df.pickup_datetime
    df['duration'] = df.duration.dt.total_seconds() / 60
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    mean_duration = df.duration.mean()
    if train:
        print(f"The mean duration of training is {mean_duration}")
    else:
        print(f"The mean duration of validation is {mean_duration}")
    
    df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')
    return df

def train_model(df, categorical):

    train_dicts = df[categorical].to_dict(orient='records')
    dv = DictVectorizer()
    X_train = dv.fit_transform(train_dicts) 
    y_train = df.duration.values

    print(f"The shape of X_train

#### Motivation 

We already have a model training script. Maybe a data scientist on your team handed it to you, and your job is to schedule the training script with a workflow orchestration tool, Prefect in this case. Below are the requirements. Do not implement them yet; we will do so in this exercise. Just understand the goal.

1. The training flow will be run every month.
2. The flow will take in a parameter called `date` which will be a datetime.
    * a. `date` should default to None
    * b. If `date` is None, set `date` as the current day. Use the data from 2 months back as the training data and the data from the previous month as validation data.
    * c. If `date` is passed, get 2 months before the `date` as the training data, and the previous month as validation data.
    * d. As a concrete example, if the date passed is "2021-03-15", the training data should be "fhv_tripdata_2021-01.parquet" and the validation file will be "fhv_tripdata_2021-02.parquet"
3. Save the model as "model-{date}.bin" where date is in `YYYY-MM-DD`. Note that `date` here is the value of the flow `parameter`. In practice, this setup makes it very easy to get the latest model to run predictions because you just need to get the most recent one.
4. In this example we use a DictVectorizer. That is needed to run future data through our model. Save that as "dv-{date}.b". Similar to above, if the date is `2021-03-15`, the files output should be `model-2021-03-15.bin` and `dv-2021-03-15.b`.

This convention is not strict in industry, and in practice, you will come up with your own system to manage these training pipeline runs. For example, if we wanted to train on the whole history instead of just one month, we'd need to allow for added parameterization and logic in our flow. If the data came in weekly instead of monthly, we might need a different naming convention. But these requirements are already a simple approximation of something you could use in production.

On the deployment side, it's very easy to just pull in the latest data and predict on it using the latest model and vectorizer files. Tools like MLflow from the last chapter can simplify that process as well. This homework will focus more on the batch training.

In order, this homework assignment will be about:

1. Converting the script to a Flow
2. Changing the parameters to take in a `date`. Making this parameter dynamic.
3. Scheduling a batch training job that outputs the latest model somewhere


## Q1. Converting the script to a Prefect flow

The current script `homework.py` is a fully functional script as long as you already have `fhv_tripdata_2021-01.parquet` and `fhv_tripdata_2021-02.parquet` inside a `data` folder. You should already be able to run it using:

In [2]:
!python homework.py

The mean duration of training is 16.2472533682457
The mean duration of validation is 16.859265811074096
The shape of X_train is (1109826, 525)
The DictVectorizer has 525 features
The MSE of training is: 10.528519395264997
The MSE of validation is: 11.014287010952778


We want to bring this to workflow orchestration to add observability around it. The `main` function will be converted to a `flow` and the other functions will be `tasks`. After adding all of the decorators, there is actually one task that you will need to call `.result()` for inside the `flow` to get it to work. Which task is this?

* `read_data`
* `prepare_features`
* ***`train_model`***
* `run_model`

***ANSWER***: `train_model`
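
To see why one task needs `.result()`, here is a minimal standard-library analogy, not Prefect code. It assumes that calling a task inside the flow hands back a future-like handle (as the `.result()` call in the question implies) and that `train_model` returns a `(model, vectorizer)` pair, as the saving code in Q3 suggests. The `train_model` stand-in below is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def train_model():
    # Hypothetical stand-in for the real task: returns a (model, vectorizer) pair.
    return "model", "vectorizer"


with ThreadPoolExecutor() as pool:
    future = pool.submit(train_model)

    try:
        # A future is a handle to the result, not the tuple itself,
        # so it cannot be unpacked into two names.
        lr, dv = future
        unpack_error = None
    except TypeError as exc:
        unpack_error = exc

    # Resolve the future first, then unpack the returned tuple.
    lr, dv = future.result()

print(type(unpack_error).__name__)
print(lr, dv)
```

A future can be passed as-is to a downstream task that knows how to resolve it, but plain Python operations such as tuple unpacking need the concrete value.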

## Q2. Parameterizing the flow

Right now there are two parameters for `main()` called `train_path` and `val_path`. We want to change the flow function to accept `date` instead. `date` should then be passed to a task that gives both the `train_path` and `val_path` to use.

It should look like this:

```python
@flow
def main(date=None):
    train_path, val_path = get_paths(date).result()
    # rest of flow below
```

1. The training flow will be run every month.
2. The flow will take in a parameter called `date` which will be a datetime.
    * a. `date` should default to None
    * b. If `date` is None, set `date` as the current day. Use the data from 2 months back as the training data and the data from the previous month as validation data.
    * c. If `date` is passed, get 2 months before the `date` as the training data, and the previous month as validation data.
    * d. As a concrete example, if the date passed is "2021-03-15", the training data should be "fhv_tripdata_2021-01.parquet" and the validation file will be "fhv_tripdata_2021-02.parquet"
3. Save the model as "model-{date}.bin" where date is in `YYYY-MM-DD`. Note that `date` here is the value of the flow `parameter`. In practice, this setup makes it very easy to get the latest model to run predictions because you just need to get the most recent one.
4. In this example we use a DictVectorizer. That is needed to run future data through our model. Save that as "dv-{date}.b". Similar to above, if the date is `2021-03-15`, the files output should be `model-2021-03-15.bin` and `dv-2021-03-15.b`.

In [38]:
from datetime import datetime
from dateutil.relativedelta import relativedelta

In [68]:
def get_paths(date: str = None):
    # Default to today when no date string is given
    if date is None:
        date = datetime.today()
    else:
        date = datetime.strptime(date, "%Y-%m-%d")

    # Train on the month two months back, validate on the previous month
    train_date = date - relativedelta(months=2)
    val_date = date - relativedelta(months=1)

    train_path = f"./data/fhv_tripdata_{train_date.strftime('%Y-%m')}.parquet"
    val_path = f"./data/fhv_tripdata_{val_date.strftime('%Y-%m')}.parquet"

    return train_path, val_path

get_paths("2021-03-15")

('./data/fhv_tripdata_2021-01.parquet', './data/fhv_tripdata_2021-02.parquet')
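
One edge case worth checking is a `date` in January or February, where "2 months back" crosses a year boundary. The sketch below redoes the month arithmetic with the standard library only, so the rollover is explicit; `shift_months` and `month_path` are hypothetical helper names, not part of `homework.py`.

```python
from datetime import date
from typing import Tuple


def shift_months(d: date, months_back: int) -> Tuple[int, int]:
    """Return the (year, month) that lies `months_back` months before `d`."""
    # Count months since year 0 so the subtraction rolls over year boundaries.
    total = d.year * 12 + (d.month - 1) - months_back
    return total // 12, total % 12 + 1


def month_path(d: date, months_back: int) -> str:
    """Build the data path for the month `months_back` months before `d`."""
    year, month = shift_months(d, months_back)
    return f"./data/fhv_tripdata_{year}-{month:02d}.parquet"


print(month_path(date(2021, 3, 15), 2))  # ./data/fhv_tripdata_2021-01.parquet
print(month_path(date(2021, 1, 15), 2))  # ./data/fhv_tripdata_2020-11.parquet
```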

Downloading the relevant files needed to run the main flow if date is 2021-08-15:

In [69]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-06.parquet -P ./data
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-07.parquet -P ./data

--2022-06-11 01:19:57--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-06.parquet
Connecting to 192.9.200.39:80... connected.
Proxy request sent, awaiting response... 200 OK
Length: 13208079 (13M) [binary/octet-stream]
Saving to: ‘./data/fhv_tripdata_2021-06.parquet.1’


2022-06-11 01:20:36 (335 KB/s) - ‘./data/fhv_tripdata_2021-06.parquet.1’ saved [13208079/13208079]

--2022-06-11 01:20:37--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-07.parquet
Connecting to 192.9.200.39:80... connected.
Proxy request sent, awaiting response... 200 OK
Length: 12650862 (12M) [binary/octet-stream]
Saving to: ‘./data/fhv_tripdata_2021-07.parquet’


2022-06-11 01:20:52 (893 KB/s) - ‘./data/fhv_tripdata_2021-07.parquet’ saved [12650862/12650862]



In [70]:
get_paths("2021-08-15")

('./data/fhv_tripdata_2021-06.parquet', './data/fhv_tripdata_2021-07.parquet')

``` bash
01:22:51.421 | INFO    | Task run 'run_model-6559300c-0' - The MSE of validation is: 11.637023826050765
```

***ANSWER:*** 11.637

## Q3. Saving the model and artifacts

At the moment, we are not saving the model and vectorizer for future use. You don't need a new task for this, you can just add it inside the `flow`. The requirements for filenames to save it as were mentioned in the Motivation section. They are pasted again here:

* Save the model as "model-{date}.pkl" where date is in `YYYY-MM-DD`. Note that `date` here is the value of the flow `parameter`. In practice, this setup makes it very easy to get the latest model to run predictions because you just need to get the most recent one.
* In this example we use a DictVectorizer. That is needed to run future data through our model. Save that as "dv-{date}.pkl". Similar to above, if the date is `2021-03-15`, the files output should be `model-2021-03-15.bin` and `dv-2021-03-15.b`.

By using this file name, during inference, we can just pull the latest model from our model directory and apply it. Assuming we already had a list of filenames:

```python
['model-2021-03-15.bin', 'model-2021-04-15.bin', 'model-2021-05-15.bin']
```

We could do something like `sorted(model_list, reverse=True)[0]` to get the filename of the latest file, since zero-padded `YYYY-MM-DD` dates sort chronologically as strings. This is the simplest way to consistently use the latest trained model for inference. Tools like MLflow give us more control logic to use flows.
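
As a minimal sketch of picking the latest model, the helper below parses the date out of each filename instead of relying on string order, which also catches malformed names. `latest_model` is a hypothetical helper, and the `model-` prefix and `.bin` suffix are taken from the example list above.

```python
from datetime import datetime

PREFIX, SUFFIX = "model-", ".bin"


def latest_model(filenames):
    """Return the filename whose embedded YYYY-MM-DD date is the most recent."""

    def run_date(name):
        # Strip the prefix and suffix, then parse what is left as a date.
        # strptime raises ValueError for names that do not follow the convention.
        stem = name[len(PREFIX):-len(SUFFIX)]
        return datetime.strptime(stem, "%Y-%m-%d")

    return max(filenames, key=run_date)


model_list = ["model-2021-04-15.bin", "model-2021-03-15.bin", "model-2021-05-15.bin"]
print(latest_model(model_list))  # model-2021-05-15.bin
```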


In [72]:
!tail -10 homework.py

    # save the model
    with open(f"./artifacts/model-{date}.bin", "wb") as f_out:
        pickle.dump(lr, f_out)

    # save the DictVectorizer
    with open(f"./artifacts/dv-{date}.b", "wb") as f_out:
        pickle.dump(dv, f_out)

    
main(date="2021-08-15")


In [73]:
!ls -la ./artifacts

total 24
drwxrwxrwx 1 bengsoon bengsoon   512 Jun 11 16:39 .
drwxrwxrwx 1 bengsoon bengsoon   512 Jun 11 16:30 ..
-rwxrwxrwx 1 bengsoon bengsoon 13191 Jun 11 16:39 dv-2021-08-15.b
-rwxrwxrwx 1 bengsoon bengsoon  4581 Jun 11 16:39 model-2021-08-15.bin



What is the file size of the `DictVectorizer` that we trained when the `date` is 2021-08-15?

* 13,000 bytes 
* 23,000 bytes 
* 33,000 bytes 
* 43,000 bytes 

***ANSWER:*** 13,000 bytes
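
The file size can also be read programmatically instead of from `ls`. The sketch below assumes the same naming convention as the `tail` output above and uses placeholder objects in place of the fitted model and vectorizer, so the sizes it prints will not match the real artifacts. `save_artifacts` is a hypothetical helper.

```python
import pickle
import tempfile
from pathlib import Path


def save_artifacts(model, dv, date: str, out_dir: Path) -> dict:
    """Pickle both objects under date-stamped names and return their sizes in bytes."""
    out_dir.mkdir(parents=True, exist_ok=True)
    targets = {
        out_dir / f"model-{date}.bin": model,
        out_dir / f"dv-{date}.b": dv,
    }
    sizes = {}
    for path, obj in targets.items():
        with open(path, "wb") as f_out:
            pickle.dump(obj, f_out)
        # st_size is the size on disk in bytes, the same number `ls -la` shows.
        sizes[path.name] = path.stat().st_size
    return sizes


with tempfile.TemporaryDirectory() as tmp:
    # Placeholders stand in for the fitted LinearRegression and DictVectorizer.
    sizes = save_artifacts({"coef": [0.1, 0.2]}, {"vocab": ["a", "b"]},
                           "2021-08-15", Path(tmp) / "artifacts")

print(sizes)
```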

## Q4. Creating a deployment with a CronSchedule

We previously showed the `IntervalSchedule` in the video tutorials. In some cases, the interval is too rigid. For example, what if we wanted to run this `flow` on the 15th of every month? An interval of 30 days would drift out of sync because months differ in length. In cases like these, the `CronSchedule` is more appropriate. The documentation for that is [here](https://orion-docs.prefect.io/concepts/schedules/#cronschedule).

Cron is an important part of workflow orchestration. It is used to schedule tasks, and was a predecessor for more mature orchestration frameworks. A lot of teams still use Cron in production. Even if you don't use Cron, the Cron expression is very common as a way to write a schedule, and the basics are worth learning for orchestration, even outside Prefect.

For this exercise, use a `CronSchedule` when creating a Prefect deployment.

Create a deployment with `prefect deployment create` after you write your `DeploymentSpec`.

Let's check the prefect storage first:


In [74]:
!prefect storage ls

                       Configured Storage
┏━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━━━━━━━┓
┃ ID ┃ Storage Type ┃ Storage Version ┃ Name ┃ Server Default  ┃
┡━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━━━━━━━┩
└────┴──────────────┴─────────────────┴──────┴─────────────────┘
No default storage is set. Temporary local storage will be used.
     Set a default with `prefect storage set-default <id>`


We don't have any storage configured yet, so we will create a local one (`~/.prefect`) through the terminal. Once that is done, it should show up when we run `!prefect storage ls` again.

In [78]:
!prefect storage ls

                               Configured Storage
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━┳━━━━━━┳━━━━━━━━━━━━━━━━━┓
┃                                      ┃ Sto… ┃ Stor… ┃      ┃                 ┃
┃                                   ID ┃ Type ┃ Vers… ┃ Name ┃ Server Default  ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━╇━━━━━━╇━━━━━━━━━━━━━━━━━┩
│ 9ec8d91f-10b1-419b-ad72-558925f03039 │ Loc… │ 1.0   │ loc… │ ✅              │
│                                      │ Sto… │       │      │                 │
└──────────────────────────────────────┴──────┴───────┴──────┴─────────────────┘

What is the Cron expression to run a flow at 9 AM every 15th of the month?

* `* * 15 9 0`
* `9 15 * * *`
* `0 9 15 * *`
* `0 15 9 1 *`

***ANSWER:*** `0 9 15 * *`

![](./chron_schedule.png)
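
To make the five cron fields concrete, here is a minimal standard-library sketch that labels them and computes upcoming run times. It is not how Prefect evaluates schedules: `describe` and `next_monthly_runs` are hypothetical helpers, and the second one only handles the narrow `minute hour day * *` form with a day between 1 and 28.

```python
from datetime import datetime

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe(cron: str) -> dict:
    """Label the five space-separated fields of a cron expression."""
    parts = cron.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


def next_monthly_runs(cron: str, start: datetime, count: int) -> list:
    """Next `count` run times after `start`, for 'minute hour day * *' only."""
    fields = describe(cron)
    if (fields["month"], fields["day_of_week"]) != ("*", "*"):
        raise ValueError("only 'minute hour day * *' is handled in this sketch")
    minute = int(fields["minute"])
    hour = int(fields["hour"])
    day = int(fields["day_of_month"])
    if not 1 <= day <= 28:
        raise ValueError("this sketch only supports days 1 to 28")

    runs = []
    year, month = start.year, start.month
    while len(runs) < count:
        candidate = datetime(year, month, day, hour, minute)
        if candidate > start:
            runs.append(candidate)
        # Step to the next month, rolling over into the next year after December.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return runs


print(describe("0 9 15 * *"))
for run in next_monthly_runs("0 9 15 * *", datetime(2022, 6, 11), 4):
    print(run)
```

Read left to right, `0 9 15 * *` means minute 0, hour 9, day 15, every month, any day of the week.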

## Q5. Viewing the Deployment 

View the deployment in the UI. When first loading, we may not see that many flows because the default filter is 1 day back and 1 day forward. Remove the filter for 1 day forward to see the scheduled runs. 

How many flow runs are scheduled by Prefect in advance? You should not be counting manually. There is a number of upcoming runs on the top right of the dashboard.

* 0
* 3
* 10
* 25

![](./upcoming_runs.png)

***ANSWER:*** 3 (or 4)

## Q6. Creating a work-queue

In order to run this flow, you will need an agent and a work queue. Because we scheduled our flow to run only once a month, it won't get picked up by an agent right away. For this exercise, create a work-queue from the UI and view it using the CLI.

For all CLI commands with Prefect, you can use `--help` to get more information. 

For example,

* `prefect --help`
* `prefect work-queue --help`

What is the command to view the available work-queues?

In [82]:
!prefect work-queue --help

Usage: prefect work-queue [OPTIONS] COMMAND [ARGS]...

  Commands for work queue CRUD.

Options:
  --help  Show this message and exit.

Commands:
  clear-concurrency-limit  Clear any concurrency limits from a work queue.
  create                   Create a work queue.
  delete                   Delete a work queue by ID.
  inspect                  Inspect a work queue by ID.
  ls                       View all work queues.
  pause                    Pause a work queue.
  preview                  Preview a work queue.
  resume                   Resume a paused work queue.
  set-concurrency-limit    Set a concurrency limit on a work queue.


* `prefect work-queue inspect`
    > - To inspect a work queue by ID
* `prefect work-queue ls`
    > - View all work queues
* `prefect work-queue preview`
    > - Preview a work queue (same as what Kevin did in the video)
* `prefect work-queue list`
    > - Invalid command

In [79]:
!prefect work-queue ls

                             Work Queues
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓
┃                                   ID ┃ Name   ┃ Concurrency Limit ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩
│ 1e2f6c71-9495-4b81-8715-c4591dbd1a35 │ global │ None              │
└──────────────────────────────────────┴────────┴───────────────────┘
                     (**) denotes a paused queue


In [86]:
!prefect work-queue preview 1e2f6c71-9495-4b81-8715-c4591dbd1a35 --hours 10000

┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Scheduled Sta… ┃ Run ID                    ┃ Na… ┃ Deployment ID             ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ 2022-09-15 09… │ 4b9aacca-b766-4bac-9bab-… │ sp… │ 41e12a25-cc02-4223-a042-… │
│ 2022-08-15 09… │ 875fe784-abd2-4944-b8a9-… │ ju… │ 41e12a25-cc02-4223-a042-… │
│ 2022-07-15 09… │ f0bc9751-7d5c-4774-8ec4-… │ po… │ 41e12a25-cc02-4223-a042-… │
│ 2022-06-15 09… │ 1fb4e2d2-88fc-489f-bacc-…

In [83]:
!prefect work-queue inspect 1e2f6c71-9495-4b81-8715-c4591dbd1a35

WorkQueue(
    id='1e2f6c71-9495-4b81-8715-c4591dbd1a35',
    created='1 day ago',
    updated='1 day ago',
    name='global'
)


***ANSWER:*** `prefect work-queue ls`