In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Assignment 1")
    .config("spark.driver.memory", "15g")
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)
spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/30 08:36:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
data = spark.read.parquet('../data/curated/')

In [3]:
import pandas as pd

from pyspark.sql.functions import *
from pyspark.sql import SparkSession, DataFrame

from pyspark.ml import Pipeline
from pyspark.sql.types import FloatType
from pyspark.mllib.evaluation import MulticlassMetrics

from pyspark.ml.classification import MultilayerPerceptronClassifier    
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

First, we will build an ANOVA model to check the relevance of all listed features

In [4]:
sample = data.sample(0.005, seed = 5).toPandas().copy()

                                                                                

In [5]:
sample["PULocationID"] = pd.Categorical(sample["PULocationID"])
sample["pickup_hour"] = pd.Categorical(sample["pickup_hour"])
sample["pickup_dayofweek"] = pd.Categorical(sample["pickup_dayofweek"])
sample.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 340803 entries, 0 to 340802
Data columns (total 23 columns):
 #   Column                     Non-Null Count   Dtype   
---  ------                     --------------   -----   
 0   PULocationID               340803 non-null  category
 1   passenger_count            340803 non-null  float64 
 2   year                       340803 non-null  int32   
 3   month                      340803 non-null  int32   
 4   pickup_hour                340803 non-null  category
 5   pickup_dayofweek           340803 non-null  category
 6   borough_income_order       336752 non-null  float64 
 7   tip_class                  340803 non-null  float64 
 8   transformed_trip_distance  340803 non-null  float64 
 9   transformed_total_amount   340803 non-null  float64 
 10  SNOW                       340799 non-null  float64 
 11  SNWD                       340799 non-null  float64 
 12  TAVG                       340799 non-null  float64 
 13  TMAX          

In [6]:
import statsmodels.api as sm
from statsmodels.formula.api import ols

In [7]:
sm_model = ols("tip_class ~ PULocationID + pickup_hour + pickup_dayofweek + passenger_count + borough_income_order + tip_class + transformed_trip_distance+  transformed_total_amount +SNOW+ SNWD +TAVG+ TMAX + TMIN+ fog+ freezing_fog+ thunder+ pellets +hail+ glaze+ smoke+ drifting_snow", data=sample).fit()

In [8]:
table = sm.stats.anova_lm(sm_model, typ=2)
print(table)

                                 sum_sq        df             F         PR(>F)
PULocationID               8.247425e-22     235.0  4.417537e+03   0.000000e+00
pickup_hour                6.850595e-25      23.0  3.749127e+01  3.809417e-167
pickup_dayofweek           6.222471e-26       6.0  1.305393e+01   7.959697e-15
passenger_count            7.561839e-26       1.0  9.518251e+01   1.748145e-22
borough_income_order       1.931723e-24       1.0  2.431501e+03   0.000000e+00
tip_class                  2.551554e+05       1.0  3.211696e+32   0.000000e+00
transformed_trip_distance  4.068071e-26       1.0  5.120569e+01   8.334578e-13
transformed_total_amount   4.549565e-25       1.0  5.726637e+02  1.887859e-126
SNOW                       1.197683e-26       1.0  1.507550e+01   1.033152e-04
SNWD                       3.205948e-25       1.0  4.035396e+02   1.054804e-89
TAVG                       1.033779e-26       1.0  1.301240e+01   3.094799e-04
TMAX                       1.996007e-25       1.0  2



From the above output we can see that every output is significant. Therefore, we consider all these significant parameters when doing performing machine learning.

## Vectorisation and Scaling of features

## For training data

We start by pipelining,

In [9]:
categories = ["PULocationID", "pickup_dayofweek", "pickup_hour"]
indexers = [StringIndexer(inputCol=cat, outputCol=cat+"_index") for cat in categories]
encoders = [OneHotEncoder(inputCol=cat+"_index", outputCol=cat+"_encoded") for cat in categories]
transformed_data = Pipeline(stages=indexers+encoders).fit(data).transform(data)
    
for cat in categories:
    transformed_data = transformed_data.drop(cat, cat+"_index") 

                                                                                

In [10]:
train_data = transformed_data.filter(col("year") == 2018).drop("year", "month")
test_data = transformed_data.filter(col("year") == 2019).drop("year", "month")

In [11]:
# For numerical variables
def numeric(transformed_data):
    numerical = [cat for cat in transformed_data.columns if not ("_encoded" in cat) and (cat != "tip_class")]
    va = VectorAssembler(inputCols=numerical, outputCol="to_scale")
    sc = StandardScaler(inputCol="to_scale", outputCol="scaled")

    data = va.transform(transformed_data)
    data = sc.fit(data).transform(data)

    for cat in numerical:
        data = data.drop(cat)
    data = data.drop("to_scale")
    return data

In [12]:
train_data =numeric(train_data)
test_data = numeric(transformed_data)

22/08/30 08:37:15 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 11:>                                                        (0 + 8) / 12]

22/08/30 08:37:17 ERROR Executor: Exception in task 5.0 in stage 11.0 (TID 57)
org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$4147/102444414: (struct<passenger_count:double,borough_income_order_double_VectorAssembler_aaea028d416e:double,transformed_trip_distance:double,transformed_total_amount:double,SNOW:double,SNWD:double,TAVG_double_VectorAssembler_aaea028d416e:double,TMAX_double_VectorAssembler_aaea028d416e:double,TMIN_double_VectorAssembler_aaea028d416e:double,fog:double,freezing_fog:double,thunder:double,pellets:double,hail:double,glaze:double,smoke:double,drifting_snow:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:177)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expres

[Stage 11:>                                                        (0 + 1) / 12]

22/08/30 08:37:18 WARN TaskSetManager: Lost task 10.0 in stage 11.0 (TID 62) (chaitanyas-air executor driver): TaskKilled (Stage cancelled)


Py4JJavaError: An error occurred while calling o385.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 11.0 failed 1 times, most recent failure: Lost task 4.0 in stage 11.0 (TID 56) (chaitanyas-air executor driver): org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$4147/102444414: (struct<passenger_count:double,borough_income_order_double_VectorAssembler_aaea028d416e:double,transformed_trip_distance:double,transformed_total_amount:double,SNOW:double,SNWD:double,TAVG_double_VectorAssembler_aaea028d416e:double,TMAX_double_VectorAssembler_aaea028d416e:double,TMIN_double_VectorAssembler_aaea028d416e:double,fog:double,freezing_fog:double,thunder:double,pellets:double,hail:double,glaze:double,smoke:double,drifting_snow:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:177)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:161)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:83)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:114)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 25 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$4147/102444414: (struct<passenger_count:double,borough_income_order_double_VectorAssembler_aaea028d416e:double,transformed_trip_distance:double,transformed_total_amount:double,SNOW:double,SNWD:double,TAVG_double_VectorAssembler_aaea028d416e:double,TMAX_double_VectorAssembler_aaea028d416e:double,TMIN_double_VectorAssembler_aaea028d416e:double,fog:double,freezing_fog:double,thunder:double,pellets:double,hail:double,glaze:double,smoke:double,drifting_snow:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:177)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:161)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:83)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:114)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 25 more


In [None]:
# vectorisation of all features
train_data = test_data.withColumnRenamed("tip_class", "label")
test_data = test_data.withColumnRenamed("tip_class", "label")
train_data = VectorAssembler(inputCols=train_data.drop("label").columns, outputCol="features").transform(train_data)
test_data = VectorAssembler(inputCols=test_data.drop("label").columns, outputCol="features").transform(test_data)

In [None]:
 # input count can be seen from the output above
layers = [279, 50, 3] 

model_nn = MultilayerPerceptronClassifier(labelCol="label", featuresCol="features", maxIter=5, layers=layers, blockSize=128, seed=10)
nn_fit = model_nn.fit(train_data.select('label','features'))

In [None]:
predictions= nn_fit.transform(train_data)
prediction = nn_fit.transform(test_data)

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
evaluator.evaluate(predictions.select("prediction", "label"))

In [None]:
 # vectorisation of all features
    data = data.withColumnRenamed("tip_class", "label")
    transformed_data = VectorAssembler(inputCols=data.drop("label").columns, outputCol="features").transform(data)
