# KubeDirector ML Pipeline Demo (NYC Taxi Dataset)

Prerequisite: make sure `lookup-ipyheader.csv` and `small_yellow_tripdata_2009-01.csv` have been uploaded to the `data` folder of your Project Repo.


In [1]:
%attachments

Training Cluster        ML Engine
----------------------  -----------
trainingengineinstance  python


In [2]:
%%trainingengineinstance

# --------------------------------------
# Run configuration
#
# smallDemoDataset:
#   True  -> load the sample dataset from the Project Repo at data/small_yellow_tripdata_2009-01.csv.
#   False -> download the full dataset from s3://bd-kartik/testingalt_yellow_tripdata_2009-01.csv.
#            Only January 2009 is available there; other months can be fetched by uncommenting
#            the lines in downloadDataDf that use https://s3.amazonaws.com/nyc-tlc/trip+data/
smallDemoDataset = True

# Number of training epochs run for each month (training step).
numEpochs = 1
# Sub-directory of models/ in the Project Repo where the fitted scaler (scaler.pkl)
# and the per-step models (<step>_tf) are written.
modelDirectory = 'testmodel'
# --------------------------------------



print("Importing libraries")

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Activation,Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.models import load_model
import os
import urllib
import sys
import pickle


from scipy import stats
import math
import datetime


from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn import metrics
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_squared_log_error
from math import sqrt

print("Done importing libraries")

# Months to train on, as [year, month] pairs; each pair is one training step, and every step
# after the first continues training the model saved by the previous step.
# The available data ranges from 1/2009 to 12/2019.

listMonthsToTrain = [[2009, 1]]

# To train on every month from 2009 through 2019 instead:
# listMonthsToTrain = [[x,y] for x in list(range(2009, 2020)) for y in list(range(1, 13))]


# --------------------------------------------------------------------------------------------

# Running total of cleaned rows read across all training steps (incremented by the training loop).
trainSetSize = 0



def strAppendZero(month):
    """Return *month* as a string, prefixed with a single "0" when it is below 10.

    Produces the two-digit month used in the taxi file names, e.g. 1 -> "01", 12 -> "12".
    """
    digits = str(month)
    return "0" + digits if month < 10 else digits

def taxiFilePath(year, month, extension):
    """Print and return the Project Repo path of one month's yellow-taxi file.

    Layout: data/originals/<year>/<MM>/yellow_tripdata_<year>-<MM>.<extension>
    """
    yearText = str(year)
    monthText = strAppendZero(month)
    baseName = "yellow_tripdata_" + yearText + "-" + monthText + "." + extension
    fullPath = ProjectRepo("/".join(["data", "originals", yearText, monthText, baseName]))
    print(fullPath)
    return fullPath

def fileName(year, month, extension):
    """Return the bare file name of one month's yellow-taxi file, e.g. ``yellow_tripdata_2009-01.csv``."""
    paddedMonth = strAppendZero(month)
    return "".join(["yellow_tripdata_", str(year), "-", paddedMonth, ".", extension])

def taxiFileParentPath(year, month):
    """Return the Project Repo directory that holds one month's yellow-taxi files (data/originals/<year>/<MM>)."""
    return ProjectRepo("/".join(["data", "originals", str(year), strAppendZero(month)]))

# Project repo path function
def ProjectRepo(path):
    """Return *path* joined onto the Project Repo mount point ``/bd-fs-mnt/project_repo``.

    The two parts are joined with a single "/"; callers in this notebook pass
    paths relative to the repo root.
    """
    mountPoint = "/bd-fs-mnt/project_repo"
    return "/".join((mountPoint, path))





# Get full name of the dataframe column by appending the database name to the beginning (a vestige from working with Hive) 
def fullName(colName):
    """Return *colName* qualified with the trips prefix held in ``dbName``, as "<dbName>.<colName>".

    The "<db>.<column>" naming is a vestige of working with Hive.
    """
    return ".".join((dbName, colName))

# Downloads data into the Project Repo if not present, then returns a dataframe containing that data.
def downloadDataDf(year, month):
    """Ensure one month of yellow-taxi data is in the Project Repo, then load it.

    If the CSV for (year, month) is missing it is downloaded to a temporary
    "<path>-downloadInProgress" file and renamed into place only once the
    download has finished, so an interrupted run never leaves a truncated CSV.

    NOTE: the download URL is hard-coded to the January 2009 file regardless of
    *year* / *month*; see the commented-out lines for the per-month public URL.

    Returns:
        DataFrame with lower-cased, space-free column names, the unused
        vendor/passenger/fare/payment columns dropped, and every remaining
        column prefixed with "pqyellowtaxi.".

    Raises:
        AssertionError: if the download fails (the original error is chained).
    """
    import urllib.request  # `import urllib` alone does not guarantee urllib.request is loaded

    csvPath = taxiFilePath(year, month, "csv")
    if not os.path.exists(csvPath):
        partialPath = csvPath + "-downloadInProgress"
        try:
            os.makedirs(taxiFileParentPath(year, month), exist_ok=True)

            url = "s3://bd-kartik/testingalt_yellow_tripdata_2009-01.csv"

# Uncomment the lines below to download data from the NYC Taxi Commission's S3 bucket
#             url = "https://s3.amazonaws.com/nyc-tlc/trip+data/" + fileName(year, month, "csv")
#             proxy = urllib.request.ProxyHandler({'https': 'web-proxy.corp.hpecorp.net:8080'})
#             opener = urllib.request.build_opener(proxy)
#             urllib.request.install_opener(opener)

            if url.startswith("s3://"):
                # urllib has no handler for s3:// URLs ("unknown url type"); fetch the object
                # through the bucket's HTTPS endpoint instead (requires public read access).
                bucket, _, key = url[len("s3://"):].partition("/")
                url = "https://" + bucket + ".s3.amazonaws.com/" + key

            urllib.request.urlretrieve(url, partialPath)
            os.rename(partialPath, csvPath)
        except Exception as err:
            # Don't leave a half-written file behind for the next run.
            if os.path.exists(partialPath):
                os.remove(partialPath)
            raise AssertionError("Error downloading dataset from S3") from err

    if year == 2016 and month >= 7:
        # The header row is skipped and replaced with explicit names, including two
        # placeholder columns ('blank1', 'blank2') - presumably for trailing empty fields.
        df = pd.read_csv(csvPath, skiprows=1, names=['VendorID','tpep_pickup_datetime','tpep_dropoff_datetime','passenger_count','trip_distance','RatecodeID','store_and_fwd_flag','PULocationID','DOLocationID','payment_type','fare_amount','extra','mta_tax','tip_amount','tolls_amount','improvement_surcharge','total_amount','blank1','blank2'])
    else:
        try:
            df = pd.read_csv(csvPath, on_bad_lines='warn')
        except TypeError:
            # pandas < 1.3 has no on_bad_lines; use the legacy flags (removed in pandas 2.0).
            df = pd.read_csv(csvPath, warn_bad_lines=True, error_bad_lines=False)

    # Normalise header spelling across the different yearly file layouts.
    df.columns = [col.lower().replace(' ', '') for col in df.columns]

    # Columns (old and new layout spellings) that are not used as model features.
    unusedCols = (
        'vendor_name', 'vendor_id', 'passenger_count', 'rate_code', 'store_and_forward',
        'store_and_fwd_flag', 'payment_type', 'fare_amt', 'fare_amount', 'surcharge',
        'mta_tax', 'tip_amt', 'tip_amount', 'tolls_amt', 'tolls_amount', 'total_amt',
        'total_amount', 'congestion_surcharge', 'improvement_surcharge',
    )
    df = df.drop(columns=[col for col in unusedCols if col in df.columns])
    return df.add_prefix('pqyellowtaxi.')



def downloadDemoDf(year, month):
    """Load the small demo dataset from the Project Repo.

    *year* and *month* are accepted only so this function is interchangeable with
    ``downloadDataDf``; the file read is always data/small_yellow_tripdata_2009-01.csv.

    Returns:
        DataFrame with lower-cased, space-free column names, the unused
        vendor/passenger/fare/payment columns dropped, and every remaining
        column prefixed with "pqyellowtaxi.".

    Raises:
        AssertionError: if the CSV cannot be read (the original error is chained).
    """
    csvPath = ProjectRepo('data/small_yellow_tripdata_2009-01.csv')
    try:
        try:
            df = pd.read_csv(csvPath, on_bad_lines='warn')
        except TypeError:
            # pandas < 1.3 has no on_bad_lines; use the legacy flags (removed in pandas 2.0).
            df = pd.read_csv(csvPath, warn_bad_lines=True, error_bad_lines=False)
    except Exception as err:
        raise AssertionError("Error loading dataset from Project Repo") from err

    # Same header normalisation as the full-dataset loader, including removing spaces.
    df.columns = [col.lower().replace(' ', '') for col in df.columns]

    # Columns (old and new layout spellings) that are not used as model features.
    unusedCols = (
        'vendor_name', 'vendor_id', 'passenger_count', 'rate_code', 'store_and_forward',
        'store_and_fwd_flag', 'payment_type', 'fare_amt', 'fare_amount', 'surcharge',
        'mta_tax', 'tip_amt', 'tip_amount', 'tolls_amt', 'tolls_amount', 'total_amt',
        'total_amount', 'congestion_surcharge', 'improvement_surcharge',
    )
    df = df.drop(columns=[col for col in unusedCols if col in df.columns])
    return df.add_prefix('pqyellowtaxi.')
    

def mergeData(df, lookup):
    """Normalise a monthly trips DataFrame to the common location/timestamp columns.

    Afterwards the trip coordinates live in the ``<dbName>.startstationlongitude``,
    ``startstationlatitude``, ``endstationlongitude`` and ``endstationlatitude``
    columns. Three source layouts are handled:

      * zone IDs (``pulocationid`` / ``dolocationid``): coordinates are taken from
        *lookup* by a left join on ``<lookupDbName>.location_i`` (unknown zones get NaN);
      * ``pickup_longitude`` ... ``dropoff_latitude`` columns: renamed;
      * ``start_lon`` ... ``end_lat`` columns: renamed.

    For the two rename layouts, ``trip_pickup_datetime`` / ``pickup_datetime`` (and
    their dropoff counterparts) are also renamed to ``tpep_pickup_datetime`` /
    ``tpep_dropoff_datetime``.

    Args:
        df: trips DataFrame whose column names carry the ``<dbName>.`` prefix.
        lookup: zone table with ``<lookupDbName>.location_i``, ``.long`` and ``.lat`` columns.

    Returns:
        A new, normalised DataFrame; *df* itself is not modified.

    Raises:
        AssertionError: if an expected column is missing or the merge fails
            (the original error is chained).
    """
    try:
        if fullName('pulocationid') in df.columns:
            locationCol = lookupDbName + '.location_i'
            lonCol = lookupDbName + '.long'
            latCol = lookupDbName + '.lat'
            coords = lookup[[locationCol, lonCol, latCol]]
            for zoneCol, prefix in (('pulocationid', 'startstation'), ('dolocationid', 'endstation')):
                df = pd.merge(df, coords, how='left', left_on=fullName(zoneCol), right_on=locationCol)
                # Drop the join key so the second merge doesn't produce *_x / *_y duplicates.
                df = df.drop(columns=[locationCol]).rename(columns={
                    lonCol: fullName(prefix + 'longitude'),
                    latCol: fullName(prefix + 'latitude'),
                })
        else:
            renames = {}
            if fullName('pickup_longitude') in df.columns:
                renames.update({
                    fullName('pickup_longitude'): fullName('startstationlongitude'),
                    fullName('pickup_latitude'): fullName('startstationlatitude'),
                    fullName('dropoff_longitude'): fullName('endstationlongitude'),
                    fullName('dropoff_latitude'): fullName('endstationlatitude'),
                })
            elif fullName('start_lon') in df.columns:
                renames.update({
                    fullName('start_lon'): fullName('startstationlongitude'),
                    fullName('start_lat'): fullName('startstationlatitude'),
                    fullName('end_lon'): fullName('endstationlongitude'),
                    fullName('end_lat'): fullName('endstationlatitude'),
                })
            if fullName('trip_pickup_datetime') in df.columns:
                renames.update({
                    fullName('trip_pickup_datetime'): fullName('tpep_pickup_datetime'),
                    fullName('trip_dropoff_datetime'): fullName('tpep_dropoff_datetime'),
                })
            elif fullName('pickup_datetime') in df.columns:
                renames.update({
                    fullName('pickup_datetime'): fullName('tpep_pickup_datetime'),
                    fullName('dropoff_datetime'): fullName('tpep_dropoff_datetime'),
                })
            df = df.rename(columns=renames)
        return df
    except Exception as err:
        raise AssertionError("Error merging data, please verify column names have not been modified.") from err
        
    

    
def generateFeatures(df):
    """Parse the trip timestamps and add the derived columns to *df*; returns *df*.

    *df* is modified in place. Added columns (all ``<dbName>.``-prefixed):
      - duration: dropoff minus pickup, in seconds (the training target)
      - weekday:  1.0 if the pickup is Monday-Friday, else 0.0
      - hour:     pickup hour of day (0-23)
      - work:     1.0 for a weekday pickup between 08:00 and 17:59, else 0.0
    """
    pickupCol = fullName('tpep_pickup_datetime')
    dropoffCol = fullName('tpep_dropoff_datetime')
    df[pickupCol] = pd.to_datetime(df[pickupCol])
    df[dropoffCol] = pd.to_datetime(df[dropoffCol])
    df[fullName('duration')] = (df[dropoffCol] - df[pickupCol]).dt.total_seconds()

    df[fullName('weekday')] = (df[pickupCol].dt.dayofweek < 5).astype(float)
    df[fullName('hour')] = df[pickupCol].dt.hour
    isWorkHours = (df[fullName('weekday')] == 1) & (df[fullName('hour')] >= 8) & (df[fullName('hour')] < 18)
    # Stored as float like 'weekday': a bool column mixed with float columns would make
    # the extracted feature matrix an object-dtype array.
    df[fullName('work')] = isWorkHours.astype(float)
    return df
    
def removeOutliers(df):
    """Return only the rows with a plausible duration and distance.

    A row is kept when its duration is strictly between 20 and 10800 seconds and
    its trip_distance is strictly between 0 and 150. Rows where either value is
    NaN are dropped too, because every comparison against NaN is False.
    """
    duration = df[fullName('duration')]
    distance = df[fullName('trip_distance')]
    keep = (duration > 20) & (duration < 10800) & (distance > 0) & (distance < 150)
    return df[keep]

# Column-name prefixes for the trip data and the location lookup table
# (the "<db>.<column>" naming is a vestige of working with Hive).
dbName = "pqyellowtaxi"
lookupDbName = "pqlookup"

# Zone lookup table used by mergeData; its columns are expected to carry the
# "pqlookup." prefix (location_i, long, lat) - NOTE(review): confirm against the CSV header.
try:
    dflook = pd.read_csv(ProjectRepo('data/lookup-ipyheader.csv'))
except Exception as err:
    # Chain the real cause so a parse error is not mistaken for a missing file.
    raise AssertionError("Please ensure lookup-ipyheader.csv is in your Project Repo under the data folder.") from err
    

def _buildModel(numFeatures):
    """Return a compiled feed-forward regressor: 100-50-25 ReLU units and one linear output."""
    inputLayer = Input(shape=(numFeatures,))
    hidden = Dense(100, activation='relu')(inputLayer)
    hidden = Dense(50, activation='relu')(hidden)
    hidden = Dense(25, activation='relu')(hidden)
    output = Dense(1)(hidden)
    model = Model(inputs=inputLayer, outputs=output)
    model.compile(loss="mean_squared_error", optimizer="adam", metrics=["mean_squared_error"])
    return model


modelDirPath = ProjectRepo('models/' + modelDirectory)
scalerPath = modelDirPath + '/scaler.pkl'

# One training step per [year, month]; every step after the first resumes from the
# model saved by the previous step.
for step, (year, month) in enumerate(listMonthsToTrain):
    print("Begin step " + str(step) + ": year" + str(year) + " month" + str(month))

    print("Reading in data" + " step " + str(step))
    # ==================================================================
    if smallDemoDataset:
        df = downloadDemoDf(year, month)
    else:
        df = downloadDataDf(year, month)
    # ==================================================================

    print("Done reading in data, start data cleaning" + " step " + str(step))
    print("Dataset size before cleaning" + " step " + str(step) + ": " + str(len(df)))

    print("merge")
    df = mergeData(df, dflook)

    print("generateFeatures")
    df = generateFeatures(df)

    print("removeOutliers")
    df = removeOutliers(df)

    # Feature columns first; the last column (trip duration in seconds) is the target.
    cols = [fullName('work'), fullName('startstationlatitude'), fullName('startstationlongitude'), fullName('endstationlatitude'), fullName('endstationlongitude'), fullName('trip_distance'), fullName('weekday'), fullName('hour'), fullName('duration')]
    dataset = df[cols].dropna(how='any', axis=0)
    del df

    X = dataset.iloc[:, :-1].to_numpy(copy=True)
    y = dataset.iloc[:, -1].to_numpy(copy=True)

    print("Dataset size after cleaning" + " step " + str(step) + ": " + str(len(dataset)))
    trainSetSize += len(dataset)
    print("Cumulative data size read in all steps up including step " + str(step) + ": " + str(trainSetSize))
    print("Done cleaning data" + " step " + str(step))
    del dataset

    print("Training..." + " step " + str(step))
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

    if step == 0:
        # First step: fit the scaler on this step's training split, persist it, build a fresh model.
        os.makedirs(modelDirPath, exist_ok=True)
        sc = StandardScaler()
        X_train = sc.fit_transform(X_train)
        with open(scalerPath, 'wb') as scalerFile:
            pickle.dump(sc, scalerFile)
        model = _buildModel(X.shape[1])
    else:
        # Later steps continue from the previous step's model, so inputs must be scaled with the
        # statistics it was trained on: reuse the saved scaler instead of refitting it.
        with open(scalerPath, 'rb') as scalerFile:
            sc = pickle.load(scalerFile)  # trusted: written by step 0 of this notebook
        X_train = sc.transform(X_train)
        model = load_model(ProjectRepo('models/' + modelDirectory + '/' + str(step - 1) + '_tf'))
    X_test = sc.transform(X_test)

    print("Step " + str(step) + " begin training time: ", datetime.datetime.now())
    history_callback = model.fit(X_train, y_train, batch_size=256, epochs=numEpochs, verbose=1, validation_split=0.2)

    # Persist this step's per-epoch training loss, one value per line.
    lossHistoryDirPath = modelDirPath + '/history'
    os.makedirs(lossHistoryDirPath, exist_ok=True)
    np.savetxt(lossHistoryDirPath + '/' + str(step) + '.txt', np.array(history_callback.history["loss"]))

    # Durations are non-negative, and mean_squared_log_error rejects negative values.
    y_pred = model.predict(X_test).clip(min=0)

    mse = metrics.mean_squared_error(y_test, y_pred)
    print("Step " + str(step) + " metrics")
    print('Mean Absolute Error:', metrics.mean_absolute_error(y_test, y_pred))
    print('Mean Squared Error:', mse)
    print('Root Mean Squared Error:', np.sqrt(mse))
    print('Root Mean Squared Log Error:', np.sqrt(mean_squared_log_error(y_test, y_pred)))
    print()
    print("Step " + str(step) + " end training time: ", datetime.datetime.now())

    model.save(ProjectRepo('models/' + modelDirectory + '/' + str(step) + '_tf'))
    del model, X_train, X_test, y_train, y_test

History URL: http://kdss-2cd6v-0.kdhs-858ph.thetestingtenant.svc.cluster.local:10001/history/2


In [3]:
%logs --url http://kdss-2cd6v-0.kdhs-858ph.thetestingtenant.svc.cluster.local:10001/history/2

Job Status: Finished
Importing libraries
Done importing libraries
Begin step 0: year2009 month1
Reading in data step 0
Done reading in data, start data cleaning step 0
Dataset size before cleaning step 0: 14998
merge
generateFeatures
removeOutliers
Dataset size after cleaning step 0: 14546
Cumulative data size read in all steps up including step 0: 14546
Done cleaning data step 0
Training... step 0
Step 0 begin training time:  2020-08-10 07:41:42.350525
Train on 9308 samples, validate on 2328 samples
Epoch 1/2
256/9308 [..............................] - ETA: 16s - loss: 496655.8125 - mean_squared_error: 496655.8125
Epoch 2/2
256/9308 [..............................] - ETA: 0s - loss: 657290.2500 - mean_squared_error: 657290.2500
Step 0metrics
Mean Absolute Error: 595.000206182