# Homework 4: batch processing of the Taxi Dataset

The objective of this assignment is to utilize a pre-trained model to develop a batch processing pipeline, which will be deployed via a Docker container. The user will execute the Docker container with specific parameters to run the processing pipeline.

Additionally, the pipeline will be modified to export its output to an S3 bucket. The entire pipeline will then be recreated using Mage and its execution will be triggered.

**Additional Comments:**

- The model was originally trained using `scikit-learn==1.5.0`, and it is essential to maintain the same version to ensure compatibility. The `model.bin` pickle file contains both the ML model and the dictionary vectorizer.

- The environment for this homework was created using pipenv. The `Pipfile` and `Pipfile.lock` are included in the homework folder.

In [1]:
!pip freeze | grep scikit-learn

scikit-learn==1.5.0


The following code chunks mirror the processing from previous homeworks. We load the model, download the data, filter out outliers based on trip duration, and then make predictions using this model. Note that it is possible to parametrize the data download process by setting the `year` and `month`, and passing these parameters directly to the URL.

In [2]:
import pickle
import numpy as np
import pandas as pd

with open('model.bin', 'rb') as f_in:
    dv, model = pickle.load(f_in)

In [3]:
categorical = ['PULocationID', 'DOLocationID']

def read_data(filename):
    df = pd.read_parquet(filename)
    
    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df['duration'] = df.duration.dt.total_seconds() / 60

    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')
    
    return df
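To make the filtering and type conversion in `read_data` concrete, here is a minimal sketch that applies the same steps to a three-row frame. The timestamps and location IDs are made up for illustration and are not taken from the taxi dataset.

```python
import pandas as pd

# Three made-up trips lasting 30 seconds, 10 minutes and 90 minutes.
toy = pd.DataFrame({
    "tpep_pickup_datetime": pd.to_datetime(
        ["2023-03-01 08:00:00", "2023-03-01 09:00:00", "2023-03-01 10:00:00"]
    ),
    "tpep_dropoff_datetime": pd.to_datetime(
        ["2023-03-01 08:00:30", "2023-03-01 09:10:00", "2023-03-01 11:30:00"]
    ),
    "PULocationID": [161.0, None, 237.0],
    "DOLocationID": [236.0, 43.0, None],
})

# Duration in minutes, computed the same way as in read_data.
toy["duration"] = (
    toy.tpep_dropoff_datetime - toy.tpep_pickup_datetime
).dt.total_seconds() / 60

# Keep only trips between 1 and 60 minutes (inclusive).
kept = toy[(toy.duration >= 1) & (toy.duration <= 60)].copy()

# Missing location IDs become the string "-1"; the rest become integer strings.
categorical = ["PULocationID", "DOLocationID"]
kept[categorical] = kept[categorical].fillna(-1).astype("int").astype("str")

print(kept[["duration"] + categorical])
```

Only the 10-minute trip survives the filter, and its missing pickup location is encoded as `"-1"`, which is the form the dictionary vectorizer receives.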

In [4]:
year = 2023
month = 3
df = read_data(f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet')

In [5]:
dicts = df[categorical].to_dict(orient='records')
X_val = dv.transform(dicts)
y_pred = model.predict(X_val)

**Question 1:** What's the standard deviation of the predicted duration for this (March 2023) dataset?

In [6]:
np.std(y_pred)

6.247488852238703
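Note that `np.std` computes the population standard deviation (`ddof=0`) by default, whereas `pandas.Series.std` defaults to the sample standard deviation (`ddof=1`). With millions of predictions the difference is negligible, but the small sketch below, using a made-up four-element array, shows that the two conventions are not identical.

```python
import numpy as np

# Made-up values, used only to contrast the two conventions.
sample = np.array([1.0, 2.0, 3.0, 4.0])

population_std = np.std(sample)       # divides by n (ddof=0), the NumPy default
sample_std = np.std(sample, ddof=1)   # divides by n - 1, the pandas default

print(population_std, sample_std)
```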

Now we create an artificial `ride_id` column and store it, together with the predictions, in a `DataFrame` that we save as a Parquet file.

In [7]:
df['ride_id'] = f'{year:04d}/{month:02d}_' + df.index.astype('str')
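As a quick illustration of the resulting ID format, the sketch below builds `ride_id` on a toy frame whose index has gaps, as happens after `read_data` drops outlier rows. The toy frame and its values are assumptions made for this example.

```python
import pandas as pd

year, month = 2023, 3

# A toy frame whose index has gaps, mimicking rows removed by the duration filter.
toy = pd.DataFrame({"PULocationID": ["10", "20", "30"]}, index=[0, 5, 7])

# Same expression as in the notebook: zero-padded year/month plus the row index.
toy["ride_id"] = f"{year:04d}/{month:02d}_" + toy.index.astype("str")

ride_ids = toy["ride_id"].tolist()
print(ride_ids)
```

Because the index is not reset after filtering, each ID refers to the row's position in the original Parquet file rather than its position in the filtered frame.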

In [8]:
df_result = pd.DataFrame({'ride_id': df['ride_id'], 'y_pred': y_pred})

output_file = 'output.parquet'

df_result.to_parquet(
    output_file,
    engine='pyarrow',
    compression=None,
    index=False
)

**Question 2:** What's the size of the output file?

In [9]:
import os

file_size_bytes = os.path.getsize(output_file)
file_size_mb = file_size_bytes / (1024 ** 2)
file_size_mb

65.46183013916016

That's pretty much the whole pipeline. Now we're instructed to turn the notebook into a script.

**Question 3:** Which command do you need to execute for that?

```bash
jupyter nbconvert --to script notebook_name.ipynb
```

The resulting script was then extended to cover the remaining parts of the homework. You can find the complete script in `batch_processing.py`.

As mentioned before, pipenv was used to install all the libraries by running:

```bash
pip install pipenv
pipenv --python 3.10.13 # initialize the environment
pipenv install pyarrow==16.1.0 scikit-learn==1.5.0 pandas==2.2.2
```

**Question 4:** After installing the libraries, pipenv creates two files: `Pipfile` and `Pipfile.lock`. The `Pipfile.lock` file keeps the hashes of the dependencies we use for the virtual env. What's the first hash for the Scikit-Learn dependency?

In [10]:
import json

with open('Pipfile.lock') as f:
    data = json.load(f)

scikit_learn_hashes = data['default']['scikit-learn']['hashes']
first_scikit_learn_hash = scikit_learn_hashes[0]

print(f"The first hash of scikit-learn is: {first_scikit_learn_hash}")

The first hash of scikit-learn is: sha256:057b991ac64b3e75c9c04b5f9395eaf19a6179244c089afdebaad98264bff37c


The script was parametrized using `Click`. It accepts the following parameters:

+ `--year`: Enter a year
+ `--month`: Enter a month (1-12)
+ `--metrics`: Print metrics about the predictions
+ `--bucket`: Name of the S3 bucket to upload the results
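The following is a minimal sketch of how such a command-line interface could be declared with Click. It is not the code from `batch_processing.py`: the function name `main`, the `required=True` settings, and the echoed summary line are assumptions made for illustration.

```python
import click


@click.command()
@click.option("--year", type=int, required=True, help="Enter a year")
@click.option("--month", type=click.IntRange(1, 12), required=True,
              help="Enter a month (1-12)")
@click.option("--metrics", is_flag=True,
              help="Print metrics about the predictions")
@click.option("--bucket", default=None,
              help="Name of the S3 bucket to upload the results")
def main(year, month, metrics, bucket):
    # Hypothetical body: the real script would load data and predict here.
    click.echo(f"year={year} month={month:02d} metrics={metrics} bucket={bucket}")


if __name__ == "__main__":
    main()
```

Declaring `--month` as `click.IntRange(1, 12)` lets Click reject out-of-range values before any data is downloaded, and `is_flag=True` makes `--metrics` a switch that takes no value.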

**Question 5:** Run the script for April 2023. What's the mean predicted duration?

In [11]:
! python3 batch_processing.py --year 2023 --month 4 --metrics

2024-06-14 11:20:00,974 - INFO - Starting data processing for 2023-04
2024-06-14 11:20:00,974 - INFO - Loading and processing data...
2024-06-14 11:20:12,420 - INFO - Data loaded successfully. Proceeding with predictions.
2024-06-14 11:20:12,421 - INFO - Transforming categorical variables...
2024-06-14 11:20:19,048 - INFO - Generating predictions using the model...
2024-06-14 11:20:19,057 - INFO - Predictions generated successfully.
2024-06-14 11:20:19,057 - INFO - Calculating prediction metrics...
2024-06-14 11:20:19,065 - INFO - Prediction metrics - Mean: 14.292283, Std: 6.353997, Min: -16.328844, Max: 70.047721
2024-06-14 11:20:19,611 - INFO - Preparing results for export...
2024-06-14 11:20:19,631 - INFO - Exporting results to output.parquet...
2024-06-14 11:20:19,896 - INFO - Data processing and export completed successfully.


Finally, we'll package the script in a Docker container. For that, we'll use a base image prepared by the course tutor: [agrigorev/zoomcamp-model:mlops-2024-3.10.13-slim](https://hub.docker.com/layers/agrigorev/zoomcamp-model/mlops-2024-3.10.13-slim/images/sha256-f54535b73a8c3ef91967d5588de57d4e251b22addcbbfb6e71304a91c1c7027f?context=repo)

This is the content of that image:

```dockerfile
FROM python:3.10.13-slim

WORKDIR /app
COPY [ "model2.bin", "model.bin" ]
```

This image already contains a pickle file with a dictionary vectorizer and a model, so we use `agrigorev`'s image as the base and do not copy our own `model.bin` into the container. We build the container by running:

```bash
docker build -t homework-4 .
```

**Question 6:** Now run the script with docker. What's the mean predicted duration for May 2023?


In [12]:
! docker run --platform linux/amd64 -it homework-4 --year 2023 --month 5 --metrics

2024-06-14 16:20:26,007 - INFO - Starting data processing for 2023-05
2024-06-14 16:20:26,008 - INFO - Loading and processing data...
2024-06-14 16:20:38,461 - INFO - Data loaded successfully. Proceeding with predictions.
2024-06-14 16:20:38,466 - INFO - Transforming categorical variables...
2024-06-14 16:20:52,329 - INFO - Generating predictions using the model...
2024-06-14 16:20:52,342 - INFO - Predictions generated successfully.
2024-06-14 16:20:52,342 - INFO - Calculating prediction metrics...
2024-06-14 16:20:52,351 - INFO - Prediction metrics - Mean: 0.191744, Std: 1.388140, Min: -5.206588, Max: 5.559183
2024-06-14 16:20:53,256 - INFO - Preparing results for export...
2024-06-14 16:20:53,272 - INFO - Exporting results to output.parquet...
2024-06-14 16:20:53,674 - INFO - Data processing and export completed successfully.


## Bonus: upload the result to the cloud

For this part of the homework, we create the following in AWS:

- An S3 bucket called `mlops-zoomcamp-fustincho`.
- An IAM user with access keys.
- A policy attached directly to the user, with the following permissions:

```json
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Action": "s3:ListBucket",
			"Resource": "arn:aws:s3:::mlops-zoomcamp-fustincho"
		},
		{
			"Effect": "Allow",
			"Action": "s3:PutObject",
			"Resource": "arn:aws:s3:::mlops-zoomcamp-fustincho/*"
		}
	]
}
```

When `batch_processing.py` receives a bucket name through the `--bucket` parameter, it attempts to upload the resulting output file via the boto3 client. For this to work, `boto3` must be able to find the AWS credentials. To provide them, we create a `.env` file and paste in the access keys we created earlier.

In [13]:
%%writefile .env
AWS_ACCESS_KEY_ID=your-aws-access-key-id
AWS_SECRET_ACCESS_KEY=your-aws-secret-access-key
AWS_DEFAULT_REGION=your-region

Overwriting .env
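For reference, the upload step triggered by `--bucket` could be sketched roughly as follows. This is an assumption about its shape, not the code from `batch_processing.py`: the function name `upload_to_s3`, the default object key, and the optional `client` argument are hypothetical. The only boto3 calls used are `boto3.client("s3")` and the client's `upload_file` method.

```python
import os


def upload_to_s3(local_path, bucket, key=None, client=None):
    """Upload local_path to the given bucket and return the object key used."""
    if client is None:
        # Imported lazily so the function can also be exercised with a stub client.
        import boto3

        # boto3 reads AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and
        # AWS_DEFAULT_REGION from the environment when they are set.
        client = boto3.client("s3")

    if key is None:
        # Assumption: store the object under the file's base name.
        key = os.path.basename(local_path)

    client.upload_file(local_path, bucket, key)
    return key
```

Inside the container, a call such as `upload_to_s3("output.parquet", "mlops-zoomcamp-fustincho")` would rely on the variables passed with `--env-file .env`.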


In [14]:
!docker run --platform linux/amd64 --env-file .env homework-4 --year 2023 --month 3 --metrics --bucket mlops-zoomcamp-fustincho

2024-06-14 16:29:26,910 - INFO - Starting data processing for 2023-03
2024-06-14 16:29:26,910 - INFO - Loading and processing data...
2024-06-14 16:29:39,709 - INFO - Data loaded successfully. Proceeding with predictions.
2024-06-14 16:29:39,710 - INFO - Transforming categorical variables...
2024-06-14 16:29:51,171 - INFO - Generating predictions using the model...
2024-06-14 16:29:51,183 - INFO - Predictions generated successfully.
2024-06-14 16:29:51,183 - INFO - Calculating prediction metrics...
2024-06-14 16:29:51,192 - INFO - Prediction metrics - Mean: 0.188769, Std: 1.391873, Min: -5.299526, Max: 5.559183
2024-06-14 16:29:52,092 - INFO - Preparing results for export...
2024-06-14 16:29:52,111 - INFO - Exporting results to output.parquet...
2024-06-14 16:29:52,512 - INFO - Data processing and export completed successfully.
2024-06-14 16:29:52,512 - INFO - Uploading results to S3 bucket: mlops-zoomcamp-fustincho
2024-06-14 16:29:52,534 - INFO - Found credentials in environment vari

![](./img/s3_uploaded.png)

## Bonus: use Mage for batch inference

The `../bonus-homework` folder contains the solution for this bonus homework, using the same model we used for the script. To run the Mage project we use `docker compose up`. In Mage, we create a pipeline and split `batch_processing.py` into blocks:

![](./img/pipeline.png)

The `year` and `month` were also added as global variables within the pipeline.

Then we create a trigger that we will use to execute the pipeline:

![](./img/trigger.png)

With this API endpoint active, it is possible to trigger a batch processing job by running

```bash
curl -X POST http://localhost:6789/api/pipeline_schedules/1/pipeline_runs/5a78a51f345442b0b5ebee3cd941cd0f \
  --header 'Content-Type: application/json' \
  --data '
{
  "pipeline_run": {
    "variables": {
      "year": "2023",
      "month": "05"
    }
  }
}'
```

We will now test this using the `requests` module. We will predict the trip duration for May and June 2023:

In [15]:
import json
import requests

url = "http://localhost:6789/api/pipeline_schedules/1/pipeline_runs/5a78a51f345442b0b5ebee3cd941cd0f"
headers = {
    'Content-Type': 'application/json'
}
data = {
    "pipeline_run": {
        "variables": {
            "year": "2023",
            "month": "05"
        }
    }
}

response = requests.post(url, headers=headers, data=json.dumps(data))

print(response.status_code)
print(response.json())

data["pipeline_run"]["variables"]["month"] = "06"

response = requests.post(url, headers=headers, data=json.dumps(data))

print(response.status_code)
print(response.json())


200
{'pipeline_run': {'id': 5, 'created_at': '2024-06-14 16:40:58', 'updated_at': '2024-06-14 16:40:58', 'pipeline_schedule_id': 1, 'pipeline_uuid': 'batch_process_taxi_dataset', 'execution_date': '2024-06-14 16:40:58.983406', 'status': 'initial', 'started_at': None, 'completed_at': None, 'variables': {'year': '2023', 'month': '05', 'execution_partition': '1/20240614T164058_983406'}, 'passed_sla': False, 'event_variables': {}, 'metrics': None, 'backfill_id': None, 'executor_type': 'local_python'}}
200
{'pipeline_run': {'id': 6, 'created_at': '2024-06-14 16:40:59', 'updated_at': '2024-06-14 16:40:59', 'pipeline_schedule_id': 1, 'pipeline_uuid': 'batch_process_taxi_dataset', 'execution_date': '2024-06-14 16:40:59.037369', 'status': 'initial', 'started_at': None, 'completed_at': None, 'variables': {'year': '2023', 'month': '06', 'execution_partition': '1/20240614T164059_037369'}, 'passed_sla': False, 'event_variables': {}, 'metrics': None, 'backfill_id': None, 'executor_type': 'local_pyth
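When more than a couple of months need to be processed, the request bodies can be generated in a loop instead of mutating one dictionary. The sketch below only builds the payloads and does not contact Mage; the helper name `build_payload` is hypothetical.

```python
import json


def build_payload(year, month):
    """Return the request body for one pipeline run, with zero-padded strings."""
    return {
        "pipeline_run": {
            "variables": {
                "year": f"{year:04d}",
                "month": f"{month:02d}",
            }
        }
    }


# One payload per month to process (May and June 2023, as above).
payloads = [build_payload(2023, month) for month in (5, 6)]

# Serialized bodies, equivalent to json.dumps(data) in the cell above.
bodies = [json.dumps(payload) for payload in payloads]
print(bodies)
```

Each payload can then be sent with `requests.post(url, json=payload)`, which serializes the dictionary and sets the `Content-Type: application/json` header automatically.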

![](./img/trigger_running.png)

Once the run completes, the predictions can be found in `../bonus-homework/mage_project/`:

![](./img/trigger_complete.png)

![](./img/outputs.png)