In [24]:
import matplotlib.pyplot as plt
from pyspark.sql.functions import udf,col
from datetime import datetime
import numpy as np
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator


In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
sc= SparkContext()
sqlContext = SQLContext(sc)


In [2]:
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('C:\\Users\\dpawa\\OneDrive\\Documents\\657\\asssign 2\\train.csv')
#train.show()

In [3]:
from math import radians, cos, sin, asin, sqrt

def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6372.8 # Radius of earth in kilometers. Use 3956 for miles
    distance =  c * r
    return abs(round(distance, 2))

In [4]:
udf_get_distance = F.udf(haversine)

In [5]:
train = (train.withColumn('DISTANCE', udf_get_distance(
train.pickup_longitude, train.pickup_latitude,
train.dropoff_longitude, train.dropoff_latitude)))

In [6]:
columns_to_drop = ['pickup_longitude', 'pickup_latitude','dropoff_longitude','dropoff_latitude']
train = train.drop(*columns_to_drop)

In [89]:
train.describe().show()

+-------+---------+------------------+------------------+------------------+-----------------+------------------+
|summary|       id|         vendor_id|   passenger_count|store_and_fwd_flag|    trip_duration|          DISTANCE|
+-------+---------+------------------+------------------+------------------+-----------------+------------------+
|  count|  1458644|           1458644|           1458644|           1458644|          1458644|           1458644|
|   mean|     null|1.5349502688798637|1.6645295219395548|              null|959.4922729603659|2.1365657007467114|
| stddev|     null|0.4987771539074011|1.3142421678231184|              null|5237.431724497624| 2.667889495100226|
|    min|id0000001|                 1|                 0|                 N|                1|               0.0|
|    max|id4000000|                 2|                 9|                 Y|          3526282|              9.99|
+-------+---------+------------------+------------------+------------------+------------

In [7]:
train = train.withColumn('store_and_fwd_flag', F.when(train.store_and_fwd_flag == 'N', 0).otherwise(1))
train.take(1)

[Row(id='id2875421', vendor_id=2, pickup_datetime=datetime.datetime(2016, 3, 14, 17, 24, 55), dropoff_datetime=datetime.datetime(2016, 3, 14, 17, 32, 30), passenger_count=1, store_and_fwd_flag=0, trip_duration=455, DISTANCE='1.5')]

In [10]:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

split_col = pyspark.sql.functions.split(train['pickup_datetime'], ' ')
train = train.withColumn('Date', split_col.getItem(0))
train = train.withColumn('Time', split_col.getItem(1))
split_date=pyspark.sql.functions.split(train['Date'], '-')     
train= train.withColumn('Year', split_date.getItem(0))
train= train.withColumn('Month', split_date.getItem(1))
train= train.withColumn('Day', split_date.getItem(2))
funcWeekDay =  udf(lambda x: datetime.strptime(x, '%Y-%m-%d').strftime('%w'))
train = train.withColumn('weekDay', funcWeekDay(col('Date')))
split_date=pyspark.sql.functions.split(train['Time'], ':')     
train= train.withColumn('hour', split_date.getItem(0))
train= train.withColumn('minutes', split_date.getItem(1))
train= train.withColumn('seconds', split_date.getItem(2))
train.take(1)


[Row(id='id2875421', vendor_id=2, pickup_datetime=datetime.datetime(2016, 3, 14, 17, 24, 55), dropoff_datetime=datetime.datetime(2016, 3, 14, 17, 32, 30), passenger_count=1, store_and_fwd_flag=0, trip_duration=455, DISTANCE='1.5', Date='2016-03-14', Time='17:24:55', Year='2016', Month='03', Day='14', weekDay='1', hour='17', minutes='24', seconds='55')]

In [11]:
train = train.withColumn("Year", train["Year"].cast(IntegerType()))
train = train.withColumn("weekDay", train["weekDay"].cast(IntegerType()))
train = train.withColumn("Month", train["Month"].cast(IntegerType()))
train = train.withColumn("Day", train["Day"].cast(IntegerType()))
train = train.withColumn("hour", train["hour"].cast(IntegerType()))
train = train.withColumn("minutes", train["minutes"].cast(IntegerType()))
train = train.withColumn("seconds", train["seconds"].cast(IntegerType()))
train = train.withColumn("DISTANCE", train["DISTANCE"].cast(IntegerType()))

train.take(1)

[Row(id='id2875421', vendor_id=2, pickup_datetime=datetime.datetime(2016, 3, 14, 17, 24, 55), dropoff_datetime=datetime.datetime(2016, 3, 14, 17, 32, 30), passenger_count=1, store_and_fwd_flag=0, trip_duration=455, DISTANCE=1, Date='2016-03-14', Time='17:24:55', Year=2016, Month=3, Day=14, weekDay=1, hour=17, minutes=24, seconds=55)]

In [12]:
columns_to_drop = ['pickup_datetime','dropoff_datetime', 'Date','Time']
train = train.drop(*columns_to_drop)

In [14]:
train.show(1)

+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+
|       id|vendor_id|passenger_count|store_and_fwd_flag|trip_duration|DISTANCE|Year|Month|Day|weekDay|hour|minutes|seconds|
+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+
|id2875421|        2|              1|                 0|          455|       0|2016|    3| 14|      1|  17|     24|     55|
+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+
only showing top 1 row



In [13]:
from pyspark.sql.functions import log
train = train.withColumn("log_duration", log(train["trip_duration"]) )
train.show(1)

+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+----------------+
|       id|vendor_id|passenger_count|store_and_fwd_flag|trip_duration|DISTANCE|Year|Month|Day|weekDay|hour|minutes|seconds|    log_duration|
+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+----------------+
|id2875421|        2|              1|                 0|          455|       1|2016|    3| 14|      1|  17|     24|     55|6.12029741895095|
+---------+---------+---------------+------------------+-------------+--------+----+-----+---+-------+----+-------+-------+----------------+
only showing top 1 row



In [14]:
train.cache()
train.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- store_and_fwd_flag: integer (nullable = false)
 |-- trip_duration: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- weekDay: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- seconds: integer (nullable = true)
 |-- log_duration: double (nullable = true)



In [48]:
train.describe().toPandas()


Unnamed: 0,summary,id,vendor_id,passenger_count,store_and_fwd_flag,trip_duration,DISTANCE,Year,Month,Day,weekDay,hour,minutes,seconds
0,count,1458644,1458644.0,1458644.0,1458644.0,1458644.0,1458644.0,1458644.0,1458644.0,1458644.0,1458644.0,1458644.0,1458644.0,1458644.0
1,mean,,1.5349502688798635,1.6645295219395548,1.0,959.492272960366,3.4408639325291177,2016.0,3.516817674497684,15.504018115455176,3.1128177951576945,13.60648451575573,29.59015770811795,29.473590540255195
2,stddev,,0.4987771539074011,1.3142421678231184,0.0,5237.431724497624,4.296542880941734,0.0,1.6810375087348843,8.703135115281617,1.9928049699468888,6.399692034352387,17.324714120895614,17.319851679258015
3,min,id0000001,1.0,0.0,1.0,1.0,0.0,2016.0,1.0,1.0,0.0,0.0,0.0,0.0
4,max,id4000000,2.0,9.0,1.0,3526282.0,97.59,2016.0,6.0,31.0,6.0,23.0,59.0,59.0


In [15]:
vectorAssembler = VectorAssembler(inputCols = ['vendor_id', 'passenger_count', 'store_and_fwd_flag','DISTANCE', 'Year', 'Month','Day','weekDay','hour','minutes','seconds'], outputCol = 'features')
vtrain = vectorAssembler.transform(train)
vtrain = vtrain.select(['features', 'log_duration'])
vtrain.show(3) 

+--------------------+------------------+
|            features|      log_duration|
+--------------------+------------------+
|[2.0,1.0,0.0,1.0,...|  6.12029741895095|
|[1.0,1.0,0.0,1.0,...|6.4967749901858625|
|[2.0,1.0,0.0,6.0,...|  7.66105638236183|
+--------------------+------------------+
only showing top 3 rows



In [None]:
splits = vtrain.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]

In [16]:
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'log_duration',maxDepth = 2)
#dt_model = dt.fit(train_df)
#dt_predictions = dt_model.transform(train_df)

In [76]:
#DRt
dt_evaluator = RegressionEvaluator(labelCol="log_duration", predictionCol="prediction", metricName="rmse")
#rmse = dt_evaluator.evaluate(dt_predictions)
#print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [34]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

dtparamGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 3,5])
             .addGrid(dt.maxBins, [4, 8,32])
             .build())

dtcv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dt_evaluator,
                      numFolds = 10)

# Run cross validations
dtcvModel = dtcv.fit(train_df)
print(dtcvModel)
dtpredictions = dtcvModel.transform(test_df)

# Use test set here so we can measure the accuracy of our model on new data


CrossValidatorModel_26a4ed946038


In [31]:
mae = dt_evaluator.evaluate(dtpredictions)
print(" Mean Absolute Error (MAE) on test data = %g" % mae)

 Mean Absolute Error (MAE) on test data = 0.340612


In [36]:
rmse = dt_evaluator.evaluate(dtpredictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.513194


In [54]:
dtpredictions_ert = dtcvModel.transform(train_df)


In [55]:
dtpredictions_ert.show(2)

[Row(features=SparseVector(11, {0: 1.0, 1: 2.0, 4: 2016.0, 5: 2.0, 6: 14.0, 9: 17.0}), log_duration=5.3230099791384085, prediction=5.341284741186751),
 Row(features=SparseVector(11, {0: 2.0, 1: 1.0, 4: 2016.0, 5: 5.0, 6: 15.0, 9: 15.0}), log_duration=5.814130531825066, prediction=5.341284741186751)]

In [29]:
dtpredictions.show(2)

[Row(features=SparseVector(11, {0: 2.0, 1: 1.0, 4: 2016.0, 5: 2.0, 6: 28.0, 9: 23.0}), log_duration=5.493061443340548, prediction=5.711890426639655),
 Row(features=SparseVector(11, {0: 2.0, 1: 2.0, 4: 2016.0, 5: 2.0, 6: 7.0, 9: 55.0}), log_duration=5.54907608489522, prediction=5.711890426639655)]

In [56]:
pred_table = dtpredictions.groupBy('prediction').count()
pred_table = pred_table.drop('count')
pred_table.show()

+------------------+
|        prediction|
+------------------+
| 6.284032803395688|
|6.0485785145505995|
| 5.946427673262689|
| 6.141958153601273|
| 6.551517246541201|
|7.1425228143753525|
| 7.545394329958232|
| 7.298411167160363|
|5.7496543843581795|
| 7.924025245291648|
|6.7242448681574745|
| 5.341284741186751|
| 6.786321805577687|
| 7.279632571860633|
|6.4916823051830965|
|5.9916296670176505|
| 6.682920331289729|
| 5.432234644903332|
| 5.122152969922232|
| 6.092726346118731|
+------------------+
only showing top 20 rows



In [57]:
pred_table1 = dtpredictions_ert.groupBy('prediction').count()
pred_table1.show()

+------------------+------+
|        prediction| count|
+------------------+------+
| 6.284032803395688| 27037|
|6.0485785145505995| 28532|
| 5.946427673262689| 26135|
| 6.141958153601273| 35344|
| 6.551517246541201| 22270|
|7.1425228143753525| 38084|
| 7.545394329958232| 48256|
| 7.298411167160363| 49836|
|5.7496543843581795| 13042|
| 7.924025245291648| 26899|
|6.7242448681574745| 14875|
| 5.341284741186751| 13475|
| 6.786321805577687| 47976|
| 7.279632571860633| 12740|
|6.4916823051830965| 61864|
|5.9916296670176505| 36084|
| 6.682920331289729|116061|
| 5.432234644903332| 14672|
| 5.122152969922232|  9770|
| 6.092726346118731|  8156|
+------------------+------+
only showing top 20 rows



In [43]:
predlist = pred_table.select("prediction").collect()


In [47]:
predlist = pred_table.select("prediction").rdd.flatMap(lambda x: x).collect()
predlist[0]

6.284032803395688

In [71]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

lr = LinearRegression(featuresCol ='features', labelCol = 'log_duration',maxIter=10)
#lr_model = lr.fit(newdf)
evaluator = RegressionEvaluator(labelCol="log_duration", predictionCol="prediction", metricName="rmse")


paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01,0.3,0.5,1]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.1,0.2, 0.5, 0.8,1.0])\
    .build()

tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8)



In [77]:
for i in predlist:
    newdf = dtpredictions.filter(dtpredictions.prediction == i)
    newdft = dtpredictions_ert.filter(dtpredictions_ert.prediction == i)
    newdf = newdf.drop('prediction')
    newdft = newdft.drop('prediction')
    model = tvs.fit(newdft)
    lr_predictions = model.transform(newdf)
    rmse = dt_evaluator.evaluate(lr_predictions)
    print("mae on test data = %g" % rmse)

    


mae on test data = 0.401267
mae on test data = 0.500687
mae on test data = 0.478158
mae on test data = 0.503749
mae on test data = 0.403103
mae on test data = 0.346481
mae on test data = 0.361704
mae on test data = 0.370933
mae on test data = 0.438178
mae on test data = 0.358747
mae on test data = 0.353605
mae on test data = 0.832952
mae on test data = 0.360192
mae on test data = 0.321652
mae on test data = 0.388788
mae on test data = 0.444754
mae on test data = 0.436411
mae on test data = 0.837187
mae on test data = 0.908399
mae on test data = 0.464472
mae on test data = 0.397229
mae on test data = 0.341766
mae on test data = 0.781383
mae on test data = 0.387552
mae on test data = 0.33334
mae on test data = 0.774761
mae on test data = 0.726717
mae on test data = 0.498695
mae on test data = 0.429825
mae on test data = 0.399224
mae on test data = 1.66699
mae on test data = 0.790693
