# MAST30034 Project 1
## Statistical Modelling

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.formula.api import ols, glm

In [None]:
from functools import reduce 
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Build (or reuse) the Spark session that every later cell submits its jobs to.
spark_settings = (
    ("spark.sql.repl.eagerEval.enabled", True),       # eager evaluation in the REPL/notebook
    ("spark.sql.parquet.cacheMetadata", "true"),
    ("spark.sql.session.timeZone", "Etc/UTC"),        # session time zone pinned to UTC
    ("spark.executor.memory", "2g"),
    ("spark.driver.memory", "4g"),
)

session_builder = SparkSession.builder.appName("MAST30034 Tutorial 1")
for setting_name, setting_value in spark_settings:
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

In [210]:
# Load the taxi and weather tables that were saved as parquet.
taxi = spark.read.parquet('../../Project 1/DataFrames/taxi')
weather = spark.read.parquet('../../Project 1/DataFrames/weather')

# Derive the join key: the calendar date of the pickup timestamp.
# The raw time columns are dropped once the date has been extracted.
taxi = taxi.withColumn("pickup_date", to_date(col("pickup_time")))
taxi = taxi.drop("pickup_time", "dropoff_time")

# Inner join on date: trips whose pickup date has no matching weather row are discarded.
sdf = taxi.join(weather, taxi.pickup_date == weather.date, "inner")
# `date` duplicates `pickup_date` after the join, so keep only one of them.
sdf = sdf.drop('date')

# Persist the modelling table. The default save mode raises if the target
# directory already exists, which made this cell fail on every re-run;
# "overwrite" keeps the notebook reproducible under Restart & Run All.
sdf.write.mode("overwrite").parquet("../../Project 1/DataFrames/stats_modelling")

                                                                                

In [188]:
from pyspark.ml.feature import RFormula

# R-style model specification: `tip_amount` is the response and the terms on the
# right-hand side are assembled into a single `features` vector column.
# NOTE(review): pickup_location seems to enter the vector as a plain number
# (i.e. it is not encoded as a categorical) - confirm this is intended.
formula = RFormula(
    formula="tip_amount ~ fare_amount + passenger_count + pickup_location + trip_distance_km + temp + dew_point + pressure + wind_speed + wind_direction",
    featuresCol="features",
    labelCol="label",
)

# Fit the formula on the joined table and apply it to the same rows.
fitted_formula = formula.fit(sdf)
output = fitted_formula.transform(sdf)

# Keep only the two columns the Spark ML estimators consume.
model_sdf = output.select(["label", "features"])

label,features
0.0,"[12.0,1.0,77.0,4...."
8.5,"[35.5,2.0,127.0,1..."
3.06,"[14.0,5.0,65.0,6...."
0.0,"[6.0,1.0,255.0,2...."
0.0,"[23.0,1.0,112.0,1..."
2.0,"[15.5,1.0,112.0,5..."
1.46,"[6.0,1.0,255.0,1...."
1.26,"[5.0,1.0,255.0,1...."
1.82,"[6.0,1.0,112.0,1...."
1.82,"[6.0,1.0,49.0,1.8..."


                                                                                

pickup_location,dropoff_location,passenger_count,fare_amount,tip_amount,total_amount,surcharge_amount,trip_distance_km,trip_time_min,temp,dew_point,pressure,wind_speed,wind_direction
1.0,0.9999999999999998,0.01996316320900586,0.06688233158306747,0.012644113141148489,0.06557253527454392,0.01919616612676156,0.07330988195752552,0.0450067871568211,0.002974544275576...,-4.76438782707553...,0.003145881396222141,9.006246451320172E-4,0.003705401239887254
0.9999999999999998,1.0,0.01996316320900586,0.06688233158306747,0.012644113141148489,0.06557253527454392,0.01919616612676156,0.07330988195752552,0.0450067871568211,0.002974544275576...,-4.76438782707553...,0.003145881396222141,9.006246451320172E-4,0.003705401239887254
0.01996316320900586,0.01996316320900586,1.0,-0.00642606107105965,0.011859687366910154,-0.00351897467210...,-1.49699826266390...,-0.00986543347472...,-0.01088876775295...,0.003712325705170...,0.001336358553391...,8.053478648682649E-4,9.682070047532162E-4,0.002044096604317...
0.06688233158306747,0.06688233158306747,-0.00642606107105965,1.0,0.1591737439110843,0.9793769185902902,0.37565029542079464,0.9600858251314356,0.9038969695089076,-0.00272200755028...,-0.01377729446748...,0.001690981672328...,-3.08241128759099...,0.013645398318871111
0.012644113141148489,0.012644113141148489,0.011859687366910154,0.1591737439110843,1.0,0.3290089977710959,0.20553422094922985,0.14935529812733045,0.10747393483403195,0.020324687870879994,0.01788379922615095,-6.98868272864239...,0.004922673726705788,-4.6298981260174E-4
0.06557253527454392,0.06557253527454392,-0.00351897467210...,0.9793769185902902,0.3290089977710959,1.0,0.48180120351740036,0.9444751730159798,0.8742515002150799,-1.37011356277725...,-0.00924954634552...,3.168813279737766E-4,-3.39654679737998...,0.011129833049161094
0.01919616612676156,0.01919616612676156,-1.49699826266390...,0.37565029542079464,0.20553422094922985,0.48180120351740036,1.0,0.40242109526936376,0.29418737848579846,-0.00681583007667...,0.004641551853424101,-0.00986078778706...,-0.01275305524518949,-0.01218184350465...
0.07330988195752552,0.07330988195752552,-0.00986543347472...,0.9600858251314356,0.14935529812733045,0.9444751730159798,0.40242109526936376,1.0,0.8251673100297426,-0.00500601565292...,-0.01499813113411...,0.001851051815036...,8.59199139286539E-4,0.012921949456517319
0.0450067871568211,0.0450067871568211,-0.01088876775295...,0.9038969695089076,0.10747393483403195,0.8742515002150799,0.29418737848579846,0.8251673100297426,1.0,-0.00263093296542...,-0.01181004952818...,0.003273052202112...,-0.00299800569489...,0.010001954376855719
0.002974544275576...,0.002974544275576...,0.003712325705170...,-0.00272200755028...,0.020324687870879994,-1.37011356277725...,-0.00681583007667...,-0.00500601565292...,-0.00263093296542...,1.0,0.5042687968614006,-0.05073384701749247,-0.10852718895036463,0.09139557565015524


## Logistic regression

In [166]:
from pyspark.ml.classification import LogisticRegression

# Binarise the response: any positive tip becomes 1, every other value is left as it is.
tip_label = col("label")
model_sdf = model_sdf.withColumn("label", when(tip_label > 0, 1).otherwise(tip_label))

# 80/20 train/test split with a fixed seed so the partition is repeatable.
train, test = model_sdf.randomSplit([0.8, 0.2], seed=1234)

# Fit a logistic regression with the estimator's default settings.
lrModel = LogisticRegression().fit(train)

# The fitted parameters are available as `lrModel.coefficients` and `lrModel.intercept`.
# The training summary object is left as the cell's displayed value.
lrModel.summary

                                                                                

<pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary at 0x7f7d732b5f40>

In [None]:
# The OLS fits below need a pandas DataFrame called `df`, which was never
# defined in this notebook (the cell raised NameError on a fresh kernel).
# Build it from the joined Spark table, keeping only the columns the formulas use.
OLS_COLUMNS = [
    "tip_amount",
    "temp", "dew_point", "pressure", "wind_speed", "wind_direction",
    "pickup_location", "passenger_count", "fare_amount", "trip_distance_km",
]
# Fraction of rows pulled onto the driver for statsmodels; a sample is used so the
# pandas frame stays within driver memory. Set to 1.0 to keep every row.
OLS_SAMPLE_FRACTION = 0.05
df = (
    sdf.select(*OLS_COLUMNS)
    .sample(fraction=OLS_SAMPLE_FRACTION, seed=1234)  # seeded so the fits are repeatable
    .toPandas()
)

# Tip explained by weather variables only.
fit_weather = ols(
    formula="tip_amount ~ temp + dew_point + pressure + wind_speed + wind_direction",
    data=df
).fit()

# Tip explained by trip/taxi variables only.
fit_taxi = ols(
    formula="tip_amount ~ pickup_location + passenger_count + fare_amount + trip_distance_km",
    data=df
).fit()

# Tip explained by weather and trip variables together.
fit_all = ols(
    formula="tip_amount ~ temp + dew_point + pressure + wind_speed + wind_direction + pickup_location + passenger_count + fare_amount + trip_distance_km",
    data=df
).fit()


## Multilayer Perceptron

In [180]:
# Percentage of trips whose recorded tip is exactly zero.
tips = sdf.select('tip_amount')
zero_tip_trips = tips.where(tips["tip_amount"] == 0).count()
all_trips = tips.count()
(zero_tip_trips / all_trips) * 100


34.220367188411906

Over 99% of passengers tip $10 or less, so for simplicity the perceptron is trained only on instances meeting this criterion.

In [177]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import RFormula

# Same model specification as the regression section: `tip_amount` is the
# response, the right-hand-side terms are packed into the `features` vector.
formula = RFormula(
    formula="tip_amount ~ fare_amount + passenger_count + pickup_location + trip_distance_km + temp + dew_point + pressure + wind_speed + wind_direction",
    featuresCol="features",
    labelCol="label",
)
output = formula.fit(sdf).transform(sdf)

# Turn the tip amount into an integer class label: positive tips map to 1,
# every other value is kept and then cast to int along with the rest.
tip_label = col("label")
tipped_flag = when(tip_label > 0, 1).otherwise(tip_label)
model_sdf = (
    output
    .select("label", "features")
    .withColumn("label", tipped_flag.cast("int"))
)

In [179]:

# Hold out 20% of the rows for testing; the seed keeps the partition repeatable.
train, test = model_sdf.randomSplit([0.8, 0.2], seed=1234)

# Network topology: an input layer of 9 units (one per predictor in the formula,
# assuming each term stays a single numeric column), hidden layers of
# 15, 13, 11, 9 and 5 units, and an output layer of 2 units (tip / no tip).
layers = [9, 15, 13, 11, 9, 5, 2]

# Configure the perceptron; the seed fixes the weight initialisation.
trainer = MultilayerPerceptronClassifier(
    layers=layers,
    maxIter=100,
    blockSize=128,
    seed=1234,
)

# Train on the training partition only.
model = trainer.fit(train)

# Score the held-out rows and measure plain accuracy.
# NOTE(review): compare this figure with the share of the majority class before
# reading much into it - TODO confirm the network is not predicting a single class.
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy = evaluator.evaluate(predictionAndLabels)
print("Test set accuracy = " + str(accuracy))



Test set accuracy = 0.6586177311586593


                                                                                