In [2]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DataModelling').getOrCreate()

In [3]:
df = spark.read.csv('Trip.csv', inferSchema=True, header=True)
df.printSchema()

root
 |-- Make: string (nullable = true)
 |-- ManufactureYear: integer (nullable = true)
 |-- EngineModel: string (nullable = true)
 |-- DriverID: string (nullable = true)
 |-- StartTime: timestamp (nullable = true)
 |-- StopTime: timestamp (nullable = true)
 |-- DurationSec: integer (nullable = true)
 |-- DistanceKm: double (nullable = true)
 |-- FuelUsedL: double (nullable = true)
 |-- StartOdoKm: double (nullable = true)
 |-- IdleTimeSec: integer (nullable = true)
 |-- Loaded: integer (nullable = true)
 |-- Stops: integer (nullable = true)
 |-- MaxRPM: double (nullable = true)



In [4]:
#Remove invalid and extreme DurationSec observations
df2 = df.filter("DurationSec > 0").filter("DurationSec < 35000")
df2.count()

527991

In [5]:
#Remove invalid and extreme IdleTimeSec observations
df3 = df2.filter("IdleTimeSec > 0").filter("IdleTimeSec < 5000")
df3.count()

526058

In [6]:
#Remove invalid and extreme ManufactureYear observations
df4 = df3.filter("ManufactureYear > 2001").filter("ManufactureYear < 2020")
df4.count()

492497

In [6]:
df4.groupby("EngineModel").count().orderBy("count", ascending=False).show()

+-----------+------+
|EngineModel| count|
+-----------+------+
|     Type 6|298362|
|    Type 13|145289|
|     Type 9| 42957|
|     Type 5|  4187|
|    No Type|   921|
|     Type 4|   615|
|     Type 8|    76|
|     Type 7|    43|
|     Type 1|    16|
|     Type 2|    12|
|    Type 11|     6|
|    Type 12|     6|
|     Type 3|     4|
|    Type 10|     3|
+-----------+------+



In [7]:
from pyspark.sql.functions import col
df5 = df4.where(col("EngineModel").isin({"Type 6", "Type 13", "Type 5", "Type 4", "Type 8", "Type 7"}))
df5.groupby("EngineModel").count().orderBy("count", ascending=False).show()

+-----------+------+
|EngineModel| count|
+-----------+------+
|     Type 6|298362|
|    Type 13|145289|
|     Type 5|  4187|
|     Type 4|   615|
|     Type 8|    76|
|     Type 7|    43|
+-----------+------+



In [8]:
df5.groupby("Make").count().orderBy("count", ascending=False).show()

+-------+------+
|   Make| count|
+-------+------+
|No Make|422373|
| Make 1| 23807|
| Make 2|  1836|
| Make 3|   556|
+-------+------+



In [8]:
df6 = df5.filter("DistanceKm < 800").filter("DistanceKm > 0")
df6.count()

448369

In [9]:
df7 = df6.filter("FuelUsedL < 250")
df7.count()

448265

In [10]:
df8 = df7.filter("MaxRPM < 2500").filter("MaxRPM > 1250")
df8.count()

448173

In [11]:
df9 = df8.filter("Stops < 15")
df9.count()

438754

In [12]:
df10 = df9.drop("DriverID")
df11 = df10.drop("Loaded")
df11.printSchema()

root
 |-- Make: string (nullable = true)
 |-- ManufactureYear: integer (nullable = true)
 |-- EngineModel: string (nullable = true)
 |-- StartTime: timestamp (nullable = true)
 |-- StopTime: timestamp (nullable = true)
 |-- DurationSec: integer (nullable = true)
 |-- DistanceKm: double (nullable = true)
 |-- FuelUsedL: double (nullable = true)
 |-- StartOdoKm: double (nullable = true)
 |-- IdleTimeSec: integer (nullable = true)
 |-- Stops: integer (nullable = true)
 |-- MaxRPM: double (nullable = true)



In [13]:
df12 = df11.na.drop()
df12.count()

438754

In [15]:
df12.printSchema()

root
 |-- Make: string (nullable = true)
 |-- ManufactureYear: integer (nullable = true)
 |-- EngineModel: string (nullable = true)
 |-- StartTime: timestamp (nullable = true)
 |-- StopTime: timestamp (nullable = true)
 |-- DurationSec: integer (nullable = true)
 |-- DistanceKm: double (nullable = true)
 |-- FuelUsedL: double (nullable = true)
 |-- StartOdoKm: double (nullable = true)
 |-- IdleTimeSec: integer (nullable = true)
 |-- Stops: integer (nullable = true)
 |-- MaxRPM: double (nullable = true)



In [16]:
df12.groupby("EngineModel").count().orderBy("count", ascending=False).show()

+-----------+------+
|EngineModel| count|
+-----------+------+
|     Type 6|291746|
|    Type 13|142225|
|     Type 5|  4085|
|     Type 4|   611|
|     Type 8|    64|
|     Type 7|    23|
+-----------+------+



In [14]:
from pyspark.sql.functions import lit
type6 = df12.where(col("EngineModel").isin({"Type 6"})).withColumn("EngineModelNum",lit(6))
type6.count()

291746

In [15]:
type13 = df12.where(col("EngineModel").isin({"Type 13"})).withColumn("EngineModelNum",lit(13))
type13.count()

142225

In [16]:
type5 = df12.where(col("EngineModel").isin({"Type 5"})).withColumn("EngineModelNum",lit(5))
type5.count()

4085

In [17]:
type4 = df12.where(col("EngineModel").isin({"Type 4"})).withColumn("EngineModelNum",lit(4))
type4.count()

611

In [18]:
type8 = df12.where(col("EngineModel").isin({"Type 8"})).withColumn("EngineModelNum",lit(8))
type8.count()

64

In [19]:
type7 = df12.where(col("EngineModel").isin({"Type 7"})).withColumn("EngineModelNum",lit(7))
type7.count()

23

In [20]:
type13_2 = type13.sample(True, 291746/142225, 100)
type13_2.count()

291651

In [21]:
type5_2 = type5.sample(True, 291746/4085, 100)
type5_2.count()

291460

In [22]:
type4_2 = type4.sample(True, 291746/611, 100)
type4_2.count()

291934

In [23]:
type8_2 = type8.sample(True, 291746/64, 100)
type8_2.count()

292418

In [24]:
type7_2 = type7.sample(True, 291746/23, 100)
type7_2.count()

291542

In [25]:
dfAll = type6.union(type13_2).union(type5_2).union(type4_2).union(type8_2).union(type7_2)
dfAll.groupby("EngineModel").count().orderBy("count", ascending=False).show()

+-----------+------+
|EngineModel| count|
+-----------+------+
|     Type 8|292418|
|     Type 4|291934|
|     Type 6|291746|
|    Type 13|291651|
|     Type 7|291542|
|     Type 5|291460|
+-----------+------+



In [26]:
dfAll2 = dfAll.select('*', (dfAll.DistanceKm/dfAll.FuelUsedL).alias('FuelEfficiency'))
dfAll2.head()

Row(Make='No Make', ManufactureYear=2015, EngineModel='Type 6', StartTime=datetime.datetime(2019, 7, 27, 4, 39, 15), StopTime=datetime.datetime(2019, 7, 27, 6, 44, 56), DurationSec=7541, DistanceKm=200.47, FuelUsedL=66.98, StartOdoKm=963748.96, IdleTimeSec=74, Stops=2, MaxRPM=1486.0, EngineModelNum=6, FuelEfficiency=2.9929829799940277)

In [27]:
from pyspark.sql.types import IntegerType
dfAll2 = dfAll2.withColumn("EngineModelNum", dfAll2["EngineModelNum"].cast(IntegerType()))
dfAll2.printSchema()

root
 |-- Make: string (nullable = true)
 |-- ManufactureYear: integer (nullable = true)
 |-- EngineModel: string (nullable = true)
 |-- StartTime: timestamp (nullable = true)
 |-- StopTime: timestamp (nullable = true)
 |-- DurationSec: integer (nullable = true)
 |-- DistanceKm: double (nullable = true)
 |-- FuelUsedL: double (nullable = true)
 |-- StartOdoKm: double (nullable = true)
 |-- IdleTimeSec: integer (nullable = true)
 |-- Stops: integer (nullable = true)
 |-- MaxRPM: double (nullable = true)
 |-- EngineModelNum: integer (nullable = false)
 |-- FuelEfficiency: double (nullable = true)



In [28]:
dfAll3 = dfAll2.drop("Make")
dfAll3.printSchema()

root
 |-- ManufactureYear: integer (nullable = true)
 |-- EngineModel: string (nullable = true)
 |-- StartTime: timestamp (nullable = true)
 |-- StopTime: timestamp (nullable = true)
 |-- DurationSec: integer (nullable = true)
 |-- DistanceKm: double (nullable = true)
 |-- FuelUsedL: double (nullable = true)
 |-- StartOdoKm: double (nullable = true)
 |-- IdleTimeSec: integer (nullable = true)
 |-- Stops: integer (nullable = true)
 |-- MaxRPM: double (nullable = true)
 |-- EngineModelNum: integer (nullable = false)
 |-- FuelEfficiency: double (nullable = true)



In [29]:
dfAll4 = dfAll3.drop("FuelUsedL")
dfAll5 = dfAll4.drop("EngineModel")
dfAll5.printSchema()

root
 |-- ManufactureYear: integer (nullable = true)
 |-- StartTime: timestamp (nullable = true)
 |-- StopTime: timestamp (nullable = true)
 |-- DurationSec: integer (nullable = true)
 |-- DistanceKm: double (nullable = true)
 |-- StartOdoKm: double (nullable = true)
 |-- IdleTimeSec: integer (nullable = true)
 |-- Stops: integer (nullable = true)
 |-- MaxRPM: double (nullable = true)
 |-- EngineModelNum: integer (nullable = false)
 |-- FuelEfficiency: double (nullable = true)



In [33]:
dfAll5.agg({"FuelEfficiency": "max"}).collect()[0][0]

67700.0

In [34]:
dfAll5.filter("FuelEfficiency > 10").count()

50

In [30]:
dfAll6 = dfAll5.filter("FuelEfficiency < 10")

In [33]:
from pyspark.sql.functions import hour
final = dfAll6.select('ManufactureYear'
                      ,hour('StartTime').alias('StartTimeHour')
                      ,hour('StopTime').alias('StopTimeHour')
                      ,'DurationSec'
                      ,'DistanceKm'
                      ,'StartOdoKm'
                      ,'IdleTimeSec'
                      ,'Stops'
                      ,'MaxRPM'
                      ,'EngineModelNum'
                      ,col('FuelEfficiency').alias('label'))

In [35]:
final.count()

1750701

In [36]:
train,test = final.randomSplit([0.7,0.3], 100)

In [38]:
train.count()

1224147

In [39]:
test.count()

526554

In [37]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler().setInputCols(['ManufactureYear'
                                            ,'StartTimeHour'
                                            ,'StopTimeHour'
                                            ,'DurationSec'
                                            ,'DistanceKm'
                                            ,'StartOdoKm'
                                            ,'IdleTimeSec'
                                            ,'Stops'
                                            ,'MaxRPM'
                                            ,'EngineModelNum']).setOutputCol('features')

In [38]:
train2 = assembler.transform(train)
train3 = train2.select('features','label')
train3.show()

+--------------------+------------------+
|            features|             label|
+--------------------+------------------+
|[2009.0,0.0,2.0,5...|   3.3267943687976|
|[2009.0,0.0,2.0,7...|2.4517665740373165|
|[2009.0,0.0,2.0,9...| 2.983807992650436|
|[2009.0,0.0,3.0,7...|2.6102774631936576|
|[2009.0,0.0,3.0,1...| 3.200555384140697|
|[2009.0,0.0,3.0,1...| 3.056904176904177|
|[2009.0,1.0,3.0,8...| 2.592090859332239|
|[2009.0,1.0,5.0,1...| 2.751948714212865|
|[2009.0,1.0,5.0,1...|2.9934026699782676|
|[2009.0,2.0,4.0,5...|2.6429868819374374|
|[2009.0,2.0,4.0,6...|3.1517422255526415|
|[2009.0,2.0,5.0,9...|  2.88475499092559|
|[2009.0,2.0,6.0,1...| 2.928304191799649|
|[2009.0,3.0,4.0,5...| 3.167464114832536|
|[2009.0,4.0,6.0,7...|2.7303998960416673|
|[2009.0,4.0,6.0,8...| 2.693421386842774|
|[2009.0,5.0,6.0,4...|3.5429958557249295|
|[2009.0,5.0,7.0,5...|3.0603174603174605|
|[2009.0,5.0,7.0,7...|2.4107026886608653|
|[2009.0,5.0,8.0,1...| 2.887261146496815|
+--------------------+------------

In [39]:
test2 = assembler.transform(test)
test3 = test2.select('features','label')
test3.show()

+--------------------+------------------+
|            features|             label|
+--------------------+------------------+
|[2009.0,0.0,3.0,8...|3.0437869822485206|
|[2009.0,1.0,2.0,4...| 3.339280657037666|
|[2009.0,1.0,2.0,5...|2.9571917808219177|
|[2009.0,1.0,3.0,6...| 2.734478808705613|
|[2009.0,2.0,4.0,6...|3.2407407407407405|
|[2009.0,2.0,7.0,1...| 3.215968063872255|
|[2009.0,3.0,4.0,4...|3.0893796004206098|
|[2009.0,3.0,7.0,1...| 2.960147196993423|
|[2009.0,5.0,6.0,5...|2.6497677236921833|
|[2009.0,5.0,6.0,5...|3.3036688150718887|
|[2009.0,7.0,8.0,4...|3.2733612273361223|
|[2009.0,7.0,9.0,8...|2.8866911556936987|
|[2009.0,7.0,9.0,9...| 4.041855380290489|
|[2009.0,8.0,9.0,5...|2.9668311403508776|
|[2009.0,8.0,10.0,...| 3.059287531806616|
|[2009.0,8.0,11.0,...| 3.260088238459055|
|[2009.0,9.0,10.0,...| 3.286251468860165|
|[2009.0,9.0,11.0,...| 3.145325472396404|
|[2009.0,10.0,12.0...| 4.011396843950906|
|[2009.0,10.0,13.0...| 2.845262845262845|
+--------------------+------------

In [64]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='features', labelCol='label', predictionCol='prediction')

In [65]:
# Fit the model to the data.
lrModel = lr.fit(train3)

In [66]:
# Print the coefficients and intercept for linear regression.
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [-0.00522269017576474,-0.0037607558649900186,0.004484601611952515,2.4930888268065797e-06,-0.0004761530362410274,4.9093101665071176e-08,-0.00016100800562631628,0.01438781127029656,0.001043047912654288,0.0012886581700199862] Intercept: 12.374395690530703


In [67]:
# Let's evaluate the model against the test data.
test_results = lrModel.evaluate(test3)

In [68]:
# Let's get some evaluation metrics.
print("RSME: {}".format(test_results.rootMeanSquaredError))

RSME: 0.5783728229532074


In [69]:
final.select('label').describe().show()

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|           1750701|
|   mean|3.5954739260050115|
| stddev|0.6985308048074458|
|    min|0.9618255125675561|
|    max| 8.584776758811211|
+-------+------------------+



In [70]:
# We can also get the R2 value. 
print("R2: {}".format(test_results.r2))

R2: 0.31646807971092716


In [41]:
from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression(featuresCol='features', labelCol='label', predictionCol='prediction').setFamily("gaussian")

In [42]:
glrModel = glr.fit(train3)

In [44]:
print("Coefficients: {} Intercept: {}".format(glrModel.coefficients,glrModel.intercept))

Coefficients: [-0.00522269017576474,-0.0037607558649900186,0.004484601611952515,2.4930888268065797e-06,-0.0004761530362410274,4.9093101665071176e-08,-0.00016100800562631628,0.01438781127029656,0.001043047912654288,0.0012886581700199862] Intercept: 12.374395690530703


In [45]:
gtest_results = glrModel.evaluate(test3)

In [54]:
summary = glrModel.summary

In [57]:
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()

Coefficient Standard Errors: [0.00040157651492406086, 9.734479506689995e-05, 9.633732281847847e-05, 1.2740681426848287e-06, 4.424127995630865e-05, 1.890613658756191e-09, 1.911302155315486e-06, 0.000249199347224389, 2.25908935929478e-06, 0.0002372535193524574, 0.8089227621348708]
T Values: [-13.005467156744373, -38.63335335397696, 46.55103007587754, 1.9567939447515916, -10.762641512887098, 25.96675499391497, -84.23995399081197, 57.736151521059966, 461.7116664123, 5.431566088196106, 15.297376053398203]
P Values: [0.0, 0.0, 0.0, 0.05037192625549336, 0.0, 0.0, 0.0, 0.0, 0.0, 5.5872214632657347e-08, 0.0]
Dispersion: 0.33321121237944956
Null Deviance: 596554.3716863757
Residual Degree Of Freedom Null: 1224146
Deviance: 407895.84067732986
Residual Degree Of Freedom: 1224136
AIC: 2128680.195656531
Deviance Residuals: 
+--------------------+
|   devianceResiduals|
+--------------------+
|-0.00656263439100...|
|  -1.181148124296918|
| -0.3381561317460209|
| -0.8490807102427738|
|-0.0750715495599