## 3.6 Homework

Previous homeworks:

**Week 1:**
* https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/01-intro/homework.md

**Week 2:**
* https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/02-experiment-tracking/homework.md

The goal of this homework is to familiarize users with workflow orchestration. We start from the solution of homework 1. The notebook can be found below:

https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/01-intro/homework.ipynb

This has already been converted to a script called `homework.py` in the `03-orchestration` folder of this repo.

## Motivation

We already have a model training script. Maybe a data scientist in your team handed it to you, and your job is to schedule the training script using a workflow orchestration tool, Prefect in this case. The requirements are listed below. Do not implement them yet; we will do so in this exercise. Just understand the goal.

1. The training flow will be run every month.
2. The flow will take in a parameter called `date` which will be a datetime.
    * a. `date` should default to None.
    * b. If `date` is None, set `date` as the current day. Use the data from 2 months back as the training data and the data from the previous month as validation data.
    * c. If `date` is passed, use the data from 2 months before `date` as the training data, and the data from the previous month as validation data.
    * d. As a concrete example, if the date passed is "2021-03-15", the training data should be "fhv_tripdata_2021-01.parquet" and the validation file will be "fhv_tripdata_2021-02.parquet".
3. Save the model as "model-{date}.pkl" where date is in `YYYY-MM-DD` format. Note that `date` here is the value of the flow parameter. In practice, this setup makes it very easy to get the latest model to run predictions because you just need to get the most recent one.
4. In this example we use a DictVectorizer. That is needed to run future data through our model. Save that as "dv-{date}.pkl". Similar to above, if the date is `2021-03-15`, the output files should be `model-2021-03-15.pkl` and `dv-2021-03-15.pkl`.
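Requirements 3 and 4 boil down to pickling two objects under date-stamped names. Below is a minimal sketch, assuming plain `pickle` serialization; `save_artifacts` and `latest_model_path` are hypothetical helper names, not part of `homework.py`. It also shows why the `YYYY-MM-DD` convention makes "get the most recent model" cheap: ISO dates sort lexicographically in chronological order, so the newest model is simply the last file name.

```python
import pickle
from pathlib import Path


def save_artifacts(model, dv, date, out_dir="."):
    """Hypothetical helper: pickle the model and DictVectorizer as model-{date}.pkl and dv-{date}.pkl."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    # str(date) gives YYYY-MM-DD for both "2021-03-15" strings and datetime.date objects
    model_path = out / f"model-{date}.pkl"
    dv_path = out / f"dv-{date}.pkl"
    with open(model_path, "wb") as f_out:
        pickle.dump(model, f_out)
    with open(dv_path, "wb") as f_out:
        pickle.dump(dv, f_out)
    return model_path, dv_path


def latest_model_path(out_dir="."):
    """Hypothetical helper: return the newest model-*.pkl in out_dir, or None if there is none."""
    # YYYY-MM-DD names sort chronologically, so the last one in sorted order is the most recent
    candidates = sorted(Path(out_dir).glob("model-*.pkl"))
    return candidates[-1] if candidates else None
```

For example, `save_artifacts(lr, dv, "2021-03-15", out_dir="models")` would write `models/model-2021-03-15.pkl` and `models/dv-2021-03-15.pkl`; the `models` directory is an assumption, not something the homework prescribes.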

This convention is not an industry standard, and in practice you will come up with your own system to manage these training pipeline runs. For example, if we wanted to train on the whole history instead of just one month, we'd need to allow for added parameterization and logic in our flow. If the data came in weekly instead of monthly, we might need a different naming convention. But these requirements are already a simple approximation of something you could use in production.

On the deployment side, it's very easy to just pull in the latest data and run predictions on it using the latest model and vectorizer files. Tools like MLflow from the last chapter can simplify that process as well. This homework will focus more on batch training.

In order, this homework assignment will be about:

1. Converting the script to a Flow
2. Changing the parameters to take in a `date`. Making this parameter dynamic.
3. Scheduling a batch training job that outputs the latest model somewhere

## Setup

You can use either a local Prefect Orion instance or a VM-hosted one for this; it shouldn't matter. Just note that if you use a VM-hosted instance, you will need to point your local Prefect API URL at the VM.

Video 3.4 of the course gives more detailed instructions if you want to run it on a VM.

## Get the data

```
# use brace expansion to download all January - March files
!wget -P data 'https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-0'{1..3}'.parquet'
```

```
--2022-06-04 23:19:14--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-01.parquet
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 54.231.192.161
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|54.231.192.161|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 11886281 (11M) [binary/octet-stream]
Saving to: ‘data/fhv_tripdata_2021-01.parquet’

2022-06-04 23:19:20 (2.04 MB/s) - ‘data/fhv_tripdata_2021-01.parquet’ saved [11886281/11886281]

--2022-06-04 23:19:20--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-02.parquet
Reusing existing connection to nyc-tlc.s3.amazonaws.com:443.
HTTP request sent, awaiting response... 200 OK
Length: 10645466 (10M) [binary/octet-stream]
Saving to: ‘data/fhv_tripdata_2021-02.parquet’

2022-06-04 23:19:22 (5.85 MB/s) - ‘data/fhv_tripdata_2021-02.parquet’ saved [10645466/10645466]

--2022-06-04 23:19:22--  https://nyc-tlc.s3.amazonaws.com/trip+data/fh
```
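The brace expansion `{1..3}` is performed by the shell, which turns the single `wget` argument into three URLs. A Python equivalent is sketched below, assuming the same `nyc-tlc` S3 base URL used in the cell above (it may have moved since); `tripdata_urls` and `download` are hypothetical helper names. Formatting the month with `:02d` also covers October to December, which the literal `-0` prefix in the shell command would turn into names like `2021-010`.

```python
from pathlib import Path
from urllib.request import urlretrieve

# Base URL taken from the wget command above; it may not be current
BASE_URL = "https://nyc-tlc.s3.amazonaws.com/trip+data"


def tripdata_urls(year, months, dataset="fhv"):
    """Build one URL per month, zero-padding the month (1 -> "01")."""
    return [f"{BASE_URL}/{dataset}_tripdata_{year}-{month:02d}.parquet" for month in months]


def download(urls, out_dir="data"):
    """Download each URL into out_dir, keeping the original file name (like wget -P data)."""
    target = Path(out_dir)
    target.mkdir(parents=True, exist_ok=True)
    for url in urls:
        filename = url.rsplit("/", 1)[-1]
        urlretrieve(url, str(target / filename))
```

Usage: `download(tripdata_urls(2021, range(1, 4)))` would fetch January to March into `data/`, mirroring the `wget` cell.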


## Q1. Converting the script to a Prefect flow

If you want to follow the videos exactly, do:

```
pip install prefect==2.0b5
```

If you need Windows support, check `homework-windows.md` for installation instructions.

The current script `homework.py` is fully functional as long as you already have `fhv_tripdata_2021-01.parquet` and `fhv_tripdata_2021-02.parquet` inside a `data` folder. You should already be able to run it using:

```
python homework.py
```

We want to bring this to workflow orchestration to add observability around it. The `main` function will be converted to a `flow` and the other functions will be `tasks`. After adding all of the decorators, there is actually one task that you will need to call `.result()` for inside the `flow` to get it to work. Which task is this?

* read_data
* prepare_features
* train_model
* run_model

Important: change all `print` statements to use the Prefect logger, because output from `print` will not appear in the Prefect UI. Call `get_run_logger()` at the start of a task to obtain the logger.


```
!python homework.py
```

```
20:51:28.293 | INFO    | prefect.engine - Created flow run 'chirpy-mackerel' for flow 'main'
20:51:28.293 | INFO    | Flow run 'chirpy-mackerel' - Using task runner 'SequentialTaskRunner'
20:51:28.339 | INFO    | Flow run 'chirpy-mackerel' - Created task run 'read_data-4c7f9de4-0' for task 'read_data'
20:51:30.890 | INFO    | Task run 'read_data-4c7f9de4-0' - Finished in state Completed()
20:51:30.912 | INFO    | Flow run 'chirpy-mackerel' - Created task run 'prepare_features-4ee39d9f-0' for task 'prepare_features'
20:51:30.999 | INFO    | Task run 'prepare_features-4ee39d9f-0' - The mean duration of training is 16.2472533682457
20:51:34.467 | INFO    | Task run 'prepare_features-4ee39d9f-0' - Finished in state Completed()
20:51:34.513 | INFO    | Flow run 'chirpy-mackerel' - Created task run 'read_data-4c7f9de4-1' for task 'read_data'
20:51:36.781 | INFO    | Task run 'read_data-4c7f9de4-1' - Finished in state Completed()
20:51:36.801 | INFO    | Flow run 'chirpy-mackerel' - 
```

After adding all of the decorators, there is actually one task that you will need to call `.result()` for inside the `flow` to get it to work. Which task is this?

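--> train_model

`train_model` is the task whose return value is unpacked inside the flow (`lr, dv = ...`). Inside a flow, calling a task returns a future rather than the task's return value, so the tuple has to be retrieved with `.result()` before it can be unpacked. The outputs of the other tasks are only passed on to further tasks, which Prefect resolves automatically.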
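To see why the unpacking needs `.result()`, here is a plain-Python stand-in. `FakeFuture` and `fake_train_model` are hypothetical names that only mimic the behaviour described above (a task call handing back a future instead of its return value); they are not Prefect's classes.

```python
class FakeFuture:
    """Hypothetical stand-in for the future a task call returns inside a flow."""

    def __init__(self, value):
        self._value = value

    def result(self):
        # Hand back the task's actual return value
        return self._value


def fake_train_model():
    # Mimics train_model: the real return value is a (model, vectorizer) tuple,
    # but the caller only receives it wrapped in a future
    return FakeFuture(("lr-model", "dict-vectorizer"))


future = fake_train_model()
try:
    lr, dv = future  # a single future object is not iterable, so unpacking fails
except TypeError as exc:
    print(f"Unpacking the future fails: {exc}")

lr, dv = future.result()  # unwrap first, then unpack the tuple
print(lr, dv)
```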

## Q2. Parameterizing the flow

Right now there are two parameters for `main()` called `train_path` and `val_path`. We want to change the flow function to accept `date` instead. `date` should then be passed to a task that gives both the `train_path` and `val_path` to use.

It should look like this:

```python
@flow
def main(date=None):
    train_path, val_path = get_paths(date).result()
    # rest of flow below
```

Where `get_paths` is a task that you have to implement. The specs for this are outlined in the motivation section. Listing them out again here:

The flow will take in a parameter called `date` which will be a datetime.

* a. `date` should default to None.
* b. If `date` is None, use the current day. Use the data from 2 months back as the training data and the data from the previous month as validation data.
* c. If a `date` value is supplied, use the data from 2 months before `date` as the training data, and the data from the previous month as validation data.
* d. As a concrete example, if the date passed is "2021-03-15", the training data should be "fhv_tripdata_2021-01.parquet" and the validation file will be "fhv_tripdata_2021-02.parquet".
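The date rules above can be implemented without `dateutil` by counting months since year 0, which makes stepping back across January roll the year over correctly. This is a minimal sketch, assuming the files live in `./data` and keep the `fhv_tripdata_YYYY-MM.parquet` naming; `months_back` is a hypothetical helper, and the day of the month is ignored because only year and month appear in the file names.

```python
from datetime import date


def months_back(d, n):
    """Return (year, month) for the calendar month n months before d; the day is ignored."""
    # Count months from year 0 so that stepping back across January rolls the year over
    total = d.year * 12 + (d.month - 1) - n
    return total // 12, total % 12 + 1


def get_paths(date_str=None, data_dir="./data"):
    """Return (train_path, val_path) following rules a.-d. above."""
    # a./b. default to today's date; c. otherwise parse the "YYYY-MM-DD" string
    d = date.today() if date_str is None else date.fromisoformat(date_str)
    train_year, train_month = months_back(d, 2)  # training data: 2 months back
    val_year, val_month = months_back(d, 1)      # validation data: previous month
    train_path = f"{data_dir}/fhv_tripdata_{train_year}-{train_month:02d}.parquet"
    val_path = f"{data_dir}/fhv_tripdata_{val_year}-{val_month:02d}.parquet"
    return train_path, val_path
```

`get_paths("2021-03-15")` returns `('./data/fhv_tripdata_2021-01.parquet', './data/fhv_tripdata_2021-02.parquet')`, matching example d. The parameter is named `date_str` so it does not shadow `datetime.date`. The function is left undecorated so it runs without Prefect; to call it as `get_paths(date).result()` inside the flow, it would need Prefect's `@task` decorator.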

Because we have two files:
* fhv_tripdata_2021-01.parquet
* fhv_tripdata_2021-02.parquet

Change the `main()` flow call to the following:

```
main(date="2021-03-15")
```

and it should use those files. This is a simplification for testing our homework.

Recall the page from where we downloaded the For-Hire trip data.

https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

Download the relevant files needed to run the `main` flow if `date` is 2021-08-15.

For example:
```
main(date="2021-08-15")
```

By setting up the logger from the previous step, we should see some logs about our training job. What is the validation MSE when running the flow with this date?

Note you need to download the relevant files to run. Part of this question is understanding which files the flow should be looking for.

The validation MSE is:

* 11.637
* 11.837
* 12.037
* 12.237

## Testing a solution for the `get_paths` implementation

```python
from datetime import date
import dateutil.relativedelta


date_string = "2021-08-15"
d = date.fromisoformat(date_string)

# Training data: 2 months back; validation data: 1 month back
training_months_dt = [d - dateutil.relativedelta.relativedelta(months=i) for i in range(2, 3)]
validation_months_dt = [d - dateutil.relativedelta.relativedelta(months=i) for i in range(1, 2)]

# Zero-pad the month so the names match the downloaded files (2021-07, not 2021-7)
training_months_paths = [f'./data/fhv_tripdata_{dt.year}-{dt.month:02}.parquet' for dt in training_months_dt]
validation_months_paths = [f'./data/fhv_tripdata_{dt.year}-{dt.month:02}.parquet' for dt in validation_months_dt]

print(training_months_paths)
print(validation_months_paths)
```

```
['./data/fhv_tripdata_2021-06.parquet']
['./data/fhv_tripdata_2021-07.parquet']
```


```python
def get_paths(date_string=None, months_back_for_training_range=range(2, 3), months_back_for_valid_range=range(1, 2)):
    # Use today's date when no date is passed; otherwise parse the "YYYY-MM-DD" string
    if date_string is None:
        datem = date.today()
    else:
        datem = date.fromisoformat(date_string)

    # By default: training data from 2 months back, validation data from 1 month back
    training_months_dt = [datem - dateutil.relativedelta.relativedelta(months=i) for i in months_back_for_training_range]
    validation_months_dt = [datem - dateutil.relativedelta.relativedelta(months=i) for i in months_back_for_valid_range]

    training_months_paths = [f'./data/fhv_tripdata_{dt.year}-{dt.month:02}.parquet' for dt in training_months_dt]
    validation_months_paths = [f'./data/fhv_tripdata_{dt.year}-{dt.month:02}.parquet' for dt in validation_months_dt]

    return training_months_paths[0], validation_months_paths[0]  # return single files, since main() expects one path each


assert get_paths("2021-03-15") == ('./data/fhv_tripdata_2021-01.parquet', './data/fhv_tripdata_2021-02.parquet')

get_paths("2021-03-15")
```

```
('./data/fhv_tripdata_2021-01.parquet', './data/fhv_tripdata_2021-02.parquet')
```

```
# Download the relevant files needed to run the `main` flow if `date` is 2021-08-15.

!wget -P data 'https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-0'{6..7}'.parquet'
```

```
--2022-06-06 13:17:29--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-06.parquet
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.216.160.3
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.216.160.3|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13208079 (13M) [binary/octet-stream]
Saving to: ‘data/fhv_tripdata_2021-06.parquet’

2022-06-06 13:17:43 (1018 KB/s) - ‘data/fhv_tripdata_2021-06.parquet’ saved [13208079/13208079]

--2022-06-06 13:17:43--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2021-07.parquet
Reusing existing connection to nyc-tlc.s3.amazonaws.com:443.
HTTP request sent, awaiting response... 200 OK
Length: 12650862 (12M) [binary/octet-stream]
Saving to: ‘data/fhv_tripdata_2021-07.parquet’

2022-06-06 13:17:46 (5.40 MB/s) - ‘data/fhv_tripdata_2021-07.parquet’ saved [12650862/12650862]

FINISHED --2022-06-06 13:17:46--
Total wall clock time: 16s
Downloaded: 
```

```
# Run the homework for hw3.Q2
# main(date="2021-08-15")
!python homework.py
```

```
13:38:08.129 | INFO    | prefect.engine - Created flow run 'neat-puffin' for flow 'main'
13:38:08.129 | INFO    | Flow run 'neat-puffin' - Using task runner 'SequentialTaskRunner'
13:38:08.171 | INFO    | Flow run 'neat-puffin' - Created task run 'get_paths-6e696e34-0' for task 'get_paths'
13:38:08.209 | INFO    | Task run 'get_paths-6e696e34-0' - Finished in state Completed()
13:38:08.210 | INFO    | Flow run 'neat-puffin' - Path for training: ./data/fhv_tripdata_2021-06.parquet, Path for validation: ./data/fhv_tripdata_2021-07.parquet
13:38:08.230 | INFO    | Flow run 'neat-puffin' - Created task run 'read_data-4c7f9de4-0' for task 'read_data'
13:38:11.192 | INFO    | Task run 'read_data-4c7f9de4-0' - Finished in state Completed()
13:38:11.225 | INFO    | Flow run 'neat-puffin' - Created task run 'prepare_features-4ee39d9f-0' for task 'prepare_features'
13:38:11.327 | INFO    | Task run 'prepare_features-4ee39d9f-0' - The mean duration of training is 18.230538791569113
13:38
```

The validation MSE is:

* 11.637
* 11.837
* 12.037
* 12.237

```
13:38:32.315 | INFO    | Task run 'run_model-6559300c-0' - The MSE of validation is: 11.637023826050765
```

--> 11.637
