In [1]:
import numpy as np
import pandas as pd
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune
import pyspark.sql.functions as psf
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import CountVectorizerModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import LinearRegression
#load the raw csv files; the header row supplies column names and, with no inferSchema, every column is read as a string
test = spark.read.option('header', True).csv('FileStore/tables/test.csv')
train = spark.read.option('header', True).csv('FileStore/tables/train.csv')

In [2]:
#1) data cleaning
#function for counting NAs in each column, to see whether interpolation or dropping data is needed | TODO: add min, max, mean, presence of outliers etc. to see if/where cleaning is necessary
def clean_info(df):
  """Summarize missing (null) values per column of a Spark DataFrame.

  Parameters
  ----------
  df : pyspark.sql.DataFrame
      Data to inspect.

  Returns
  -------
  pandas.DataFrame
      One row per column of ``df`` with columns ``Name`` (column name),
      ``Nas`` (number of null values in that column) and ``Percent``
      (fraction of rows that are null, between 0 and 1; 0.0 when ``df``
      has no rows).

  Runs one Spark count per column plus one for the total row count.
  """
  n_row = df.count()
  names, nas, percents = [], [], []
  for name in df.columns:
    #dropna() (not drop(), which with no arguments returns the frame unchanged)
    #removes this column's null rows; the difference in counts is the null count
    n_na = n_row - df.select(name).dropna().count()
    names.append(name)
    nas.append(n_na)
    #guard against an empty frame instead of dividing by zero
    percents.append(n_na / n_row if n_row else 0.0)
  return pd.DataFrame({'Name': names, 'Nas': nas, 'Percent': percents})

#per-column null summary of the raw training data (create_features below still drops null rows and non-positive fares as a safeguard)
clean_info(df=train)

In [3]:
#2) feature engineering
#function to create features, in order to be able to apply to test set later on
def create_features(df, holidays=((1, 1), (7, 4), (12, 25))):
  """Build model features and the ``label`` column from raw taxi-fare rows.

  Kept as a function so the same transformation can later be applied to the
  test set.

  Parameters
  ----------
  df : pyspark.sql.DataFrame
      Raw rows as read from CSV. Must contain ``pickup_datetime``,
      ``fare_amount``, ``pickup_longitude``, ``pickup_latitude``,
      ``dropoff_longitude``, ``dropoff_latitude`` and ``passenger_count``.
  holidays : iterable of (month, day) pairs, optional
      Fixed-date holidays marked in the ``holiday`` column; defaults to
      Jan 1, Jul 4 and Dec 25 (the same dates every year).

  Returns
  -------
  pyspark.sql.DataFrame
      ``df`` with the six numeric columns cast to double; new columns
      ``year``, ``month``, ``day``, ``hour``, ``minute``, ``dow_one_hot``
      (7-slot count vector in fixed Monday..Sunday slot order), boolean
      ``holiday`` and ``night``, and ``label`` (a copy of ``fare_amount``);
      ``key`` and ``pickup_datetime`` dropped; rows containing any null or
      with ``fare_amount <= 0`` removed.

  NOTE(review): because of the row filters and the ``fare_amount``
  requirement, applying this to a test set without fares (or one where every
  row needs a prediction) will fail or drop rows -- TODO confirm the test.csv
  schema before reusing it there.
  """
  #converting strings to doubles
  numeric_cols = ['fare_amount', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count']
  for col_name in numeric_cols:
    df = df.withColumn(col_name, df[col_name].cast('double'))

  #parse the pickup time from its first 19 characters (presumably "yyyy-MM-dd HH:mm:ss" followed by a
  #suffix such as a timezone -- TODO confirm) and derive the weekday name. Column expressions are used
  #instead of spark.sql over a temp view, so no global 'data' view is registered as a side effect.
  pickup_ts = psf.to_timestamp(psf.substring('pickup_datetime', 1, 19))
  df = df.withColumn('pickup_datetime_new', pickup_ts)
  df = df.withColumn('dow', psf.date_format(pickup_ts, 'EEEE'))

  #create columns for year, month, day, hour, minute
  ts = psf.col('pickup_datetime_new')
  date_parts = [('year', psf.year), ('month', psf.month), ('day', psf.dayofmonth), ('hour', psf.hour), ('minute', psf.minute)]
  for part_name, extract in date_parts:
    df = df.withColumn(part_name, extract(ts))

  #one-hot encode the weekday against a FIXED vocabulary. Fitting a CountVectorizer on each df orders the
  #vocabulary by term frequency in that df, so train and test could get different slot orders (and a df
  #missing a weekday would get a shorter vector), breaking the feature layout the model was trained on.
  df = df.withColumn('dow_array', psf.split(psf.col('dow'), ' '))
  weekdays = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
  dow_encoder = CountVectorizerModel.from_vocabulary(weekdays, inputCol='dow_array', outputCol='dow_one_hot')
  df = dow_encoder.transform(df)

  #holidays: static (month, day) dates OR-ed into one boolean condition (year-specific ones not handled)
  is_holiday = psf.lit(False)
  for month, day in holidays:
    is_holiday = is_holiday | ((psf.col('month') == month) & (psf.col('day') == day))
  df = df.withColumn('holiday', is_holiday)

  #night-time surcharge window, hours 20-23 and 0-6
  #NOTE(review): `hour <= 6` includes the whole 6 o'clock hour; use `< 6` if the window should end at 06:00
  df = df.withColumn('night', (psf.col('hour') >= 20) | (psf.col('hour') <= 6))

  #dropping raw/helper columns that are no longer needed
  df = df.drop('key', 'pickup_datetime', 'pickup_datetime_new', 'dow', 'dow_array')

  #drop rows with any null, then fares less than or equal to 0
  df = df.dropna().filter(psf.col('fare_amount') > 0)

  #label column read by the regressors, equal to fare_amount
  return df.withColumn('label', psf.col('fare_amount'))

#apply the feature pipeline to the raw training rows
training_data = create_features(df=train)

In [4]:
#3) fitting models
#function for easily testing new algorithms
def test_algorithm(data, model, grid, evaluator_string, folds=3, feature_cols=None, seed=None):
  """Tune ``model`` with k-fold cross-validated grid search and score it on a held-out split.

  The feature columns are assembled into a ``features`` vector, the data is
  split 80/20, ``CrossValidator`` picks the best grid point on the 80% part,
  and the chosen model is evaluated once on the 20% part.

  Parameters
  ----------
  data : pyspark.sql.DataFrame
      Must contain every column in ``feature_cols`` plus ``label``
      (e.g. the output of ``create_features``).
  model : pyspark.ml estimator
      Unfitted estimator reading ``features``/``label``.
  grid : list of param maps
      Result of ``ParamGridBuilder().build()`` for ``model``.
  evaluator_string : str
      ``RegressionEvaluator`` metric name, e.g. ``'mse'``, ``'rmse'``, ``'r2'``.
  folds : int, default 3
      Number of cross-validation folds (3 is also CrossValidator's default).
  feature_cols : list of str, optional
      Columns to assemble; defaults to the engineered feature columns below.
  seed : int, optional
      Seed for the train/test split and the fold assignment. ``None`` keeps
      the unseeded (non-reproducible) split.

  Returns
  -------
  tuple
      ``(best_model, evaluation, best_params)``: the fitted best model, its
      metric on the held-out split, and ``best_model.extractParamMap()``.
  """
  if feature_cols is None:
    feature_cols = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count', 'year', 'month', 'day', 'hour', 'minute', 'dow_one_hot', 'holiday', 'night']

  #VectorAssembler is a stateless Transformer, so assembling before the split leaks nothing
  assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
  assembled = assembler.transform(data).select(['features', 'label'])
  training, held_out = assembled.randomSplit([.8, .2], seed=seed)

  #evaluator
  evaluator = evals.RegressionEvaluator(metricName=evaluator_string)

  #cross validator; only override CrossValidator's own default seed when one is given
  cv = tune.CrossValidator(estimator=model,
                           estimatorParamMaps=grid,
                           evaluator=evaluator,
                           numFolds=folds)
  if seed is not None:
    cv = cv.setSeed(seed)

  #fit the cross-validated model
  cv_model = cv.fit(training)
  best_model = cv_model.bestModel

  #evaluate on the held-out split
  evaluation = evaluator.evaluate(best_model.transform(held_out))

  #best parameters
  best_params = best_model.extractParamMap()

  return best_model, evaluation, best_params

In [5]:
#decision tree
dt = DecisionTreeRegressor(featuresCol='features', labelCol='label')

#grid search over tree depth (a single value for now; widen the list to actually search)
dt_grid = (tune.ParamGridBuilder()
           .addGrid(dt.maxDepth, [5])
           .build())

#evaluate the algorithm: pass dt_grid (no `grid` exists at notebook level) and an explicit fold count
dt_model, dt_eval, dt_best_params = test_algorithm(training_data, dt, dt_grid, 'mse', folds=3)

In [6]:
#linear regression
lr = LinearRegression(featuresCol='features', labelCol='label')

#grid search. elasticNetParam only changes the fit when regParam > 0: with the default regParam=0.0 both
#elasticNet values give the same unregularized model, so regParam is searched too (0.1 is an arbitrary first guess)
lr_grid = (tune.ParamGridBuilder()
           .addGrid(lr.maxIter, [5, 10])
           .addGrid(lr.regParam, [0.0, 0.1])
           .addGrid(lr.elasticNetParam, [0.0, 1.0])
           .build())

#evaluate the algorithm; results go to lr_* names so the decision tree's dt_best_params is not overwritten
lr_model, lr_eval, lr_best_params = test_algorithm(training_data, lr, lr_grid, 'mse', folds=3)