# Data Cleaning, Preprocessing and Analysis

In this file, we will Clean and Analyse our dataset. 

For Cleaning, we use Percentiles and remove outliers. We then preprocess the data and perform certain Analysis such as Granger Causality, Johanson's Cointigration test and ADF Test. These tests are done to prove as to why we are using the Vector Autoregressive Model for our solution

In [0]:
import numpy as np
import pandas as pd
from pyspark.sql import functions as F

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import *
import time
import random

import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import adfuller
from statsmodels.tools.eval_measures import rmse, aic
from sklearn.metrics import mean_absolute_percentage_error
from statsmodels.stats.stattools import durbin_watson
from statsmodels.tsa.stattools import grangercausalitytests
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from sklearn.metrics import mean_absolute_percentage_error, mean_squared_error, mean_absolute_error
import math
from statsmodels.tsa.stattools import acf
from sklearn.metrics import mean_squared_error

import warnings
warnings.filterwarnings("ignore")

<h3> Import Data

In [0]:
def import_data(filepaths):
  """
  Import Data from list of filepaths and concatanate 
  """
  df = spark.read.csv(filepaths[0],header=True)
  for path in filepaths[1:]:
    df = df.unionByName(spark.read.csv(path,header=True))
  return df


# Import Data
taxi = import_data(["/mnt/pirates-of-the-carribbytes/taxi_data/taxi_2019-01.csv",
                    "/mnt/pirates-of-the-carribbytes/taxi_data/taxi_2019-02.csv",
                    "/mnt/pirates-of-the-carribbytes/taxi_data/taxi_2019-03.csv"])


# Import Zone data and join with our dataset. Remove datapoints from EWR and Staten Island
zones = spark.read.csv("/mnt/pirates-of-the-carribbytes/taxi_data/zone_lookup.csv",header=True).drop("Zone","service_zone")
taxi = taxi.join(zones,taxi.PULocationID == zones.LocationID)\
                 .withColumnRenamed("Borough","Division")\
                 .filter((F.col('Division') != 'EWR') & (F.col('Division') != 'Staten Island'))

# Filter to ensure no outlier years are included (we found a timestamp of 2088) and remove unnecessary columns
taxi = taxi.filter((F.unix_timestamp(taxi.tpep_pickup_datetime)>=1546300800) & (F.unix_timestamp(taxi.tpep_pickup_datetime)<=1554073200))\
           .drop('VendorID', 'RatecodeID', 'store_and_fwd_flag','DOLocationID','fare_amount','extra',"LocationID","PULocationID",
                 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'congestion_surcharge', 'payment_type')



divs = ["Manhattan", "Queens", "Bronx","Brooklyn", "Unknown"]
taxi.count()

We have removed some attributes such as **'VendorID', 'RatecodeID', 'store_and_fwd_flag','DOLocationID', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'congestion_surcharge'** \- and work with the remaining data. The Zones data has been joined to our dataset

<h1> Data Preprocessing and Cleaning

In this section, we will clean our data and remove all outliers. We can also create new attributes that may help in the final output.

In [0]:
taxi.show(10)

<h4> Construct New features such as Trip Duration (and Speed?)

- Since we have the Trip start & end time, we can compute the Duration by (Dropoff Time - Pickup Time)/60 --- To get the duration in minutes. 
- Given the Distance travelled, we can also compute the Speed of the journey (Speed = (Distance Travelled/Duration)\*60) ---- Speed in miles/hour


For further computation, we can convert the pickup and drop datetime into UNIX timestamps, and drop the columns that are not required in our case

In [0]:
#Convert to Unix timestamp and drop columns that aren't required
taxi_unix = taxi.select("*",
                        F.unix_timestamp(taxi.tpep_pickup_datetime).alias('Pickup_Unix'),
                        F.unix_timestamp(taxi.tpep_dropoff_datetime).alias('Dropoff_Unix'))
taxi_unix.show(2)

In [0]:
# Create the Duration and Speed Column
taxi_ds = taxi_unix.withColumn("Duration",
                               F.round((taxi_unix.Dropoff_Unix-taxi_unix.Pickup_Unix)/60,2))\
                   .withColumn("Speed",
                               F.round(60*(taxi_unix.trip_distance/((taxi_unix.Dropoff_Unix-taxi_unix.Pickup_Unix)/60)),2))\
                   .drop("Dropoff_Unix","tpep_pickup_datetime","tpep_dropoff_datetime")
taxi_ds.show(5)

In [0]:
taxi_ds.show(10)

In [0]:
def calculate_percentile(df):
  """
  Function to calculate percentiles by division
  https://stackoverflow.com/questions/57424211/how-compute-the-percentile-in-pyspark-dataframe-for-each-key
  """
  for feature in df.columns:
    if feature != 'Division' and feature != 'Pickup_Unix':
      print("Percentiles for {}".format(feature))
      df.groupby('Division').agg(F.expr('percentile('+feature+', array(0.25))')[0].alias('25%'),
                                 F.expr('percentile('+feature+', array(0.50))')[0].alias('50%'),
                                 F.expr('percentile('+feature+', array(0.75))')[0].alias('75%'),
                                 F.expr('percentile('+feature+', array(0.90))')[0].alias('90%'),
                                 F.expr('percentile('+feature+', array(0.95))')[0].alias('95%'),
                                 F.expr('percentile('+feature+', array(0.96))')[0].alias('96%'),
                                 F.expr('percentile('+feature+', array(0.98))')[0].alias('98%'),
                                 F.expr('percentile('+feature+', array(0.99))')[0].alias('99%'),
                                 F.expr('percentile('+feature+', array(0.995))')[0].alias('99.5%'),
                                 F.expr('percentile('+feature+', array(0.999))')[0].alias('99.9%'),
                                 F.expr('percentile('+feature+', array(1.00))')[0].alias('100%'))\
         .show()
      
calculate_percentile(taxi_ds)

<h4> Trip Duration

**Observation:**
- Values seem fine until 90th percentile.
- 100th Percentile value is very large for a trip duration. 
- Values until 99th percentile seem okay.
- The outlier is just between 99th and 100th percentile.

We can look further into this if needed. 
But from the NYC Taxi & Limousine Commission regulations, drivers must limit themselves to 12 working hours/day (https://abc7ny.com/cab-drivers-taxi-hours-new-york-city/1353642/). 

So we can consider the thresholds to be 
- **minimum : 1 minute** 
- **maximum : 720 minutes** (12hrs\*60min)

**Observation:**

In [0]:
# Filtering based on 1 =< Duration =< 720
filtered_duration_df = taxi_ds.where((taxi_ds.Duration>=1) & (taxi_ds.Duration<=720))

<h4> Speed

**Observation:**
- From analyzing the percentile values, we can see that 99.9% of the values are below 47.6

So we can consider the thresholds to be 
- **minimum : 1 mile/hr** 
- **maximum : 72miles/hr**

In [0]:
# Filtering based on 1 <= Speed <= 720
filtered_speed_df = filtered_duration_df.where((filtered_duration_df.Speed>0) & (filtered_duration_df.Speed<=72))

<h4> Trip Distance

A good cutoff based on  distance seems to be 42 miles. We can remove the remaining data

In [0]:
# Filtering based on 0 < Trip Distance <= 10
filtered_distance_df = filtered_speed_df.where((filtered_speed_df['trip_distance']>0) & (filtered_speed_df['trip_distance']<=42))

<h4> Total Fare Amount

**Observation:**
- There are some negative fare values which does not make any logical sense.

The minimum fare amount according to the New York TLC is $2.50 (https://www1.nyc.gov/site/tlc/passengers/taxi-fare.page), so we can remove all datapoints with values lower than that.

In [0]:
# Filtering based on 0 < Fair Amount <= 10
filtered_fare_df = filtered_distance_df.where((filtered_distance_df['total_amount']>=2.5))

<h4> Passenger Count

From general analysis, we saw that there are some NULL values in this column. We will filter out the rows with these NULL passenger_count values and with passenger count of 0

In [0]:
#Check number of NULL values
print("Number of NULL values in passenger_count attribute: ",filtered_fare_df.filter((F.isnull(filtered_fare_df['passenger_count'])==True) | (filtered_fare_df['passenger_count']==0)).count())

#Filter out rows with NULL passenger count
filtered_pass_df = filtered_fare_df.filter((F.isnull(filtered_fare_df['passenger_count'])==False) & (filtered_fare_df['passenger_count']>0))

###Binning

In [0]:
t = filtered_pass_df.withColumn("BIN",((filtered_pass_df.Pickup_Unix - 1546300800)/1800).cast("int")).withColumn("COUNT",F.lit(1))



taxi_binned = t.groupBy(["BIN","Division"])\
                .agg(F.mean("Duration"),F.mean("Speed"),F.mean("trip_distance"),F.mean("total_amount"),F.sum("COUNT"))\
                .sort("BIN")\
                .drop("BIN")
taxi_binned.show(10)

<h3> Remove Outliers

In [0]:
def remove_outliers(df):
  """
  This function preprocesses our data,
  removes unnecessary outliers and
  creates 10 min time bins
  """
  
  # Create Pickup and Dropoff UNIX time from datetime feature
  df = df.select("*", F.unix_timestamp(df.tpep_pickup_datetime).alias('Pickup_Unix'),
                      F.unix_timestamp(df.tpep_dropoff_datetime).alias('Dropoff_Unix'))
  
  #Changing the datatype in numerical columns to double
  tcols = ['passenger_count', 'trip_distance', 'total_amount']
  for c in tcols:
    # add condition for the cols to be type cast
    df = df.withColumn(c, df[c].cast('double'))
  
  # Create Duration and Speed columns, remove Dropoff_Unix, datetime of pickup and dropoff
  df = df.withColumn("Duration", F.round((df['Dropoff_Unix'] - df['Pickup_Unix'])/60,2))\
                        .withColumn("Speed", F.round(60*(df['trip_distance'] / ((df['Dropoff_Unix']-df['Pickup_Unix'])/60)),2))\
                        .drop("Dropoff_Unix","tpep_dropoff_datetime","tpep_pickup_datetime")
  
  
  #Filter Outliers, Add Time Bin
  df = df.where((df['Duration']>=1) & (df['Duration']<=720))\
         .where((df['Speed']>0) & (df['Speed']<=72))\
         .where((df['trip_distance']>0) & (df['trip_distance']<=42))\
         .where((df['total_amount']>=2.5) & (df['total_amount']<=150))\
         .filter((F.isnull(df['passenger_count'])==False) & (df['passenger_count']>0))\
         .withColumn("BIN",((df.Pickup_Unix - 1546300800)/1800).cast("int")).withColumn("COUNT",F.lit(1))

  # Group By Time Bins and Divisions and get final dataset
  df = df.groupBy(["BIN","Division"])\
         .agg(F.mean("Duration"),F.mean("Speed"),F.mean("trip_distance"),F.mean("total_amount"),F.sum("COUNT"))\
         .sort("BIN")\
         .drop("BIN")

  return df


taxi_clean = remove_outliers(taxi).cache()
taxi_clean.show(10)

In [0]:
divs = ["Manhattan", "Queens", "Brooklyn", "Bronx", "Unknown"]

for x in divs:
  temp = t.withColumn("Division", F.when(F.col("Division")==x, 1).otherwise(0))
  locals()["df_" + x] = temp.filter("Division > 0")

divs = ["Manhattan", "Queens", "Brooklyn", "Bronx", "Unknown"]
dfs = [df_Manhattan, df_Queens, df_Brooklyn, df_Bronx, df_Unknown]

for i in range(0,5):
  #groupBy and .filter("Division == 'Bronx'")\
  locals()["s" + divs[i]] = dfs[i].groupBy("BIN")\
                .agg(F.mean("Duration"),F.mean("Speed"),F.mean("trip_distance"),F.mean("total_amount"),F.sum("COUNT"))\
        .sort("BIN")\
        .drop("BIN")

##Check Granger Causality

In [0]:
from statsmodels.tsa.stattools import grangercausalitytests

maxlag=12
test = 'ssr_chi2test'
# [10 min, hourly ,12 hours, 24 hours, Weekly]
# [1,6,72,144,1008]
# sdf.select(sdf.columns[i]).collect()

def grangers_causation_matrix(data, variables, test='ssr_chi2test', verbose=False):    

    df = pd.DataFrame(np.zeros((len(variables), len(variables))), columns=variables, index=variables)
    for c in df.columns:
        for r in df.index:
            test_result = grangercausalitytests(data.select(r,c).collect(), maxlag=maxlag, verbose=False)
            p_values = [round(test_result[i+1][0][test][1],4) for i in range(maxlag)]
            if verbose: print(f'Y = {r}, X = {c}, P Values = {p_values}')
            min_p_value = np.min(p_values)
            df.loc[r, c] = min_p_value
    df.columns = [var + '_x' for var in variables]
    df.index = [var + '_y' for var in variables]
    return df

grangers_causation_matrix(sManhattan, variables = sManhattan.columns) 

Unnamed: 0,avg(Duration)_x,avg(Speed)_x,avg(trip_distance)_x,avg(total_amount)_x,sum(COUNT)_x
avg(Duration)_y,1.0,0.0,0.0,0.0,0.0
avg(Speed)_y,0.0,1.0,0.0,0.0,0.0
avg(trip_distance)_y,0.0,0.0,1.0,0.0,0.0
avg(total_amount)_y,0.0,0.0,0.0,1.0,0.0
sum(COUNT)_y,0.0,0.0,0.0,0.0,1.0


##Perform Johanson's Cointegration Test and Report Summary

In [0]:
from statsmodels.tsa.vector_ar.vecm import coint_johansen

def adjust(val, length= 6): return str(val).ljust(length)

def cointegration_test(df, alpha=0.05): 
    """Perform Johanson's Cointegration Test and Report Summary"""
    df = df.toPandas()
    out = coint_johansen(df,-1,5)
    d = {'0.90':0, '0.95':1, '0.99':2}
    traces = out.lr1
    cvts = out.cvt[:, d[str(1-alpha)]]
    

    # Summary
    print('Name   ::  Test Stat > C(95%)    =>   Signif  \n', '--'*20)
    for col, trace, cvt in zip(df.columns, traces, cvts):
        print(adjust(col), ':: ', adjust(round(trace,2), 9), ">", adjust(cvt, 8), ' =>  ' , trace > cvt)
        
        
cointegration_test(sManhattan)

<h3> Train & Test Split

In [0]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

nobs = round(0.2*(sManhattan.count()))
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
schema = StructType([
        StructField("avg(Duration)", DoubleType(), True),
        StructField("avg(Speed)", DoubleType(), True),
        StructField("avg(trip_distance)", DoubleType(), True),
        StructField("avg(total_amount)", DoubleType(), True),
        StructField("sum(COUNT)", DoubleType(), True)
    ])

@pandas_udf(schema,PandasUDFType.GROUPED_MAP)
def trainSplit(df):
  nobs = round(0.05*len(df))
  df_train = df[0:-nobs]
  return df_train

@pandas_udf(schema,PandasUDFType.GROUPED_MAP)
def testSplit(df):
  nobs = round(0.05*len(df))
  df_test = df[-nobs:]
  return df_test

sdf_train = sManhattan.groupby().apply(trainSplit)
sdf_test = sManhattan.groupby().apply(testSplit)

<h3> Augmented Dickey-Fuller Test (ADF Test) for Stationarity

In [0]:
def adfuller_test(series, signif=0.05, name='', verbose=False):
    r = adfuller(series, autolag='AIC')
    output = {'test_statistic':round(r[0], 4), 'pvalue':round(r[1], 4), 'n_lags':round(r[2], 4), 'n_obs':r[3]}
    p_value = output['pvalue'] 

    if p_value <= signif:
        return [name," is stationary"]
    else:
        return [name," is non-stationary"]
      
      
      
      
schemaArr = StructType([
        StructField("Stationarity", StringType(), True),
        StructField("Column", StringType(), True)
    ])
@pandas_udf(schemaArr,PandasUDFType.GROUPED_MAP)
def adFullerStationarity_test(df_train):
  df_af = pd.DataFrame()
  for name, column in df_train.iteritems():
      x= adfuller_test(column, name=column.name)
      df_af = df_af.append({"Column":x[0],"Stationarity":x[1]},ignore_index=True )
  return df_af

sdf_train.groupby().apply(adFullerStationarity_test).show()

**As per the above analysis and tests, we conclude that we can use the Vector Autoregressor Model for our time series prediction**