## 2. Amazon SageMaker Processing jobs

In the previous notebook, we decided on the pre-processing steps for the raw data. Here we trigger SageMaker Processing jobs to pre-process the entire raw dataset for training. In particular, we take advantage of SageMaker by launching **multiple processing jobs that run in parallel**.

In [2]:
%pip install --upgrade boto3 sagemaker

In [1]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

region = sagemaker.Session().boto_region_name
role = get_execution_role()
sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1", role=role, instance_type="ml.m5.xlarge", instance_count=1
)

### Prepare Processing script

In this script, we 1) read the parquet files in a given directory, 2) pre-process them, e.g. clean the data and generate a target feature called 'count', and 3) aggregate the data into one file.

In [19]:
%%writefile preprocessing.py
import pandas as pd
import glob
import argparse

def _parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--foldername', type=str, default='_no_given_name_')
    return parser.parse_known_args()

if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    # Load data
    print("processing " + args.foldername)

    files = glob.glob("/opt/ml/processing/input/*.parquet")
    print(files)
    
    columns_to_read = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance", 
                       "PULocationID", "total_amount"]
    frames = []  # one aggregated DataFrame per input file

    for file in files:
        print(f"reading {file}")
        df = pd.read_parquet(file, columns=columns_to_read)

        # Report missing values per column, then drop incomplete rows
        print(df.isnull().sum())
        df = df.dropna()

        # Trip duration in minutes
        df['duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() / 60

        # Keep only trips with positive distance, amount, and duration
        df = df[df['trip_distance'] > 0]
        df = df[df['total_amount'] > 0]
        df = df[df['duration'] > 0]

        # Trim extreme values outside the [1 - sigma4, sigma4] quantile range
        outlier_columns = ['trip_distance', 'total_amount', 'duration']
        sigma2 = 0.9544  # approximate normal coverage levels; only sigma4 is used below
        sigma3 = 0.9973
        sigma4 = 0.9999
        for c in outlier_columns:
            s1 = df[c].quantile(sigma4)
            s0 = df[c].quantile(1-sigma4)
            df = df.drop(df[df[c] > s1].index)
            df = df.drop(df[df[c] < s0].index)

        # Count trips per (hour, pickup location), then average the remaining features
        df['pickup_time'] = df['tpep_pickup_datetime'].dt.floor('h')
        df = df.drop(columns=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
        df['count'] = df.groupby(by=['pickup_time', 'PULocationID'])['duration'].transform('size')
        df_agg = df.sort_values(by=['pickup_time', 'PULocationID'], ascending=True)\
                                    .groupby(by=['pickup_time', 'PULocationID'])\
                                    .mean(numeric_only=False)
        df_agg = df_agg.reset_index()
        frames.append(df_agg)

    # Combine all files of this folder and write a single CSV
    processed = pd.concat(frames, ignore_index=True)
    processed.sort_values(by=['pickup_time', 'PULocationID'])\
        .to_csv("/opt/ml/processing/output/" + args.foldername + "_processed.csv")
    print("wrote files successfully")

    print("completed running processing job")

Overwriting preprocessing.py
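
To make the aggregation step of the script concrete, the following is a minimal sketch that applies the same hourly grouping to a tiny hand-made frame. The helper name `aggregate_hourly` and the toy values are illustrative assumptions, not part of the processing script, and the cleaning and outlier-trimming steps are left out for brevity.

```python
import pandas as pd


def aggregate_hourly(df):
    """Group trips by (pickup hour, pickup location) and average the features.

    Mirrors the grouping used in preprocessing.py: 'count' is the number of
    trips in each group, and every other column becomes the group mean.
    """
    df = df.copy()
    df["pickup_time"] = df["tpep_pickup_datetime"].dt.floor("h")
    df = df.drop(columns=["tpep_pickup_datetime"])
    df["count"] = (
        df.groupby(["pickup_time", "PULocationID"])["duration"].transform("size")
    )
    return df.groupby(["pickup_time", "PULocationID"]).mean().reset_index()


# Toy data (made up for illustration): three trips from zone 1, one from zone 2
toy = pd.DataFrame(
    {
        "tpep_pickup_datetime": pd.to_datetime(
            ["2019-01-01 00:05", "2019-01-01 00:40",
             "2019-01-01 00:20", "2019-01-01 01:10"]
        ),
        "PULocationID": [1, 1, 2, 1],
        "trip_distance": [1.0, 3.0, 5.0, 4.0],
        "total_amount": [10.0, 20.0, 25.0, 30.0],
        "duration": [5.0, 15.0, 20.0, 12.0],
    }
)

hourly = aggregate_hourly(toy)
print(hourly)
```

The two zone-1 trips that start between 00:00 and 01:00 collapse into one row with `count` equal to 2 and averaged features, while the other two trips each form their own row.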


### Run Processing job

We trigger four scikit-learn processing jobs, one per year directory; each directory holds 12 monthly raw data files. Note that we set `wait=False` so that each `run` call returns immediately (an asynchronous API call) and the jobs run in parallel. Each job takes a few minutes.

In [20]:
%%capture output

from sagemaker.processing import ProcessingInput, ProcessingOutput

my_bucket_name = "s3://[my bucket name]"
folders = ["2019", "2020", "2021", "2022"]
for folder in folders:
    sklearn_processor.run(
        code="preprocessing.py",
        arguments = ["--foldername", folder],
        inputs=[ProcessingInput(source=f"{my_bucket_name}/nyc-taxi-trip-data/" + folder, 
                                destination="/opt/ml/processing/input/")],
        outputs=[ProcessingOutput(source="/opt/ml/processing/output/", 
                                  destination=f"{my_bucket_name}/nyc-taxi-trip-data-processed/")],
        wait=False
    )    
    

You can follow the job progress in the AWS console. Once all jobs have completed successfully, you can move on to training.
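
As an alternative to the console, job status can also be polled from the notebook. The sketch below is one possible way to do it with the boto3 SageMaker client's `list_processing_jobs` call; the helper names `summarize_processing_jobs` and `all_completed` are hypothetical and not part of the original notebook.

```python
def summarize_processing_jobs(sm_client, name_contains, max_results=10):
    """Return {job_name: status} for the most recent matching processing jobs.

    sm_client is expected to behave like boto3.client("sagemaker").
    """
    response = sm_client.list_processing_jobs(
        NameContains=name_contains,
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=max_results,
    )
    return {
        job["ProcessingJobName"]: job["ProcessingJobStatus"]
        for job in response["ProcessingJobSummaries"]
    }


def all_completed(statuses):
    """True only if at least one job was found and every job has completed."""
    return bool(statuses) and all(s == "Completed" for s in statuses.values())
```

In the notebook this could be called as `summarize_processing_jobs(boto3.client("sagemaker"), "sagemaker-scikit-learn")`. The name fragment is an assumption about the job-name prefix; adjust it to whatever prefix your jobs show in the console.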