In [1]:
from pyspark.ml.regression import LinearRegression


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config('spark.master', 'local[4]') \
    .appName('Test') \
    .getOrCreate()

sc = spark.sparkContext
spark

In [11]:
seed=1

In [7]:
Green_Taxi_DF = spark.read.csv("Green_Taxi_B7.csv",header=True,inferSchema=True)

In [9]:
Green_Taxi_DF.limit(1).toPandas

Unnamed: 0,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,month,week,day,hour,Trip_distance,Final_Fare_amount,Trip_Duration,speed,log_Trip_distance,log_Trip_Duration
0,-75.231,39.928,-75.227,39.93,6.0,23.0,8.0,20.0,0.25,4.3,1.12,13.43,-1.39,0.11


In [17]:
Green_Taxi_DF= Green_Taxi_DF.drop('speed')

In [18]:
trainData,testData = Green_Taxi_DF.randomSplit([0.8,0.2],seed=seed)

In [19]:
inputCols = trainData.columns

In [20]:
inputCols

['Pickup_longitude',
 'Pickup_latitude',
 'Dropoff_longitude',
 'Dropoff_latitude',
 'month',
 'week',
 'day',
 'hour',
 'Trip_distance',
 'Final_Fare_amount',
 'Trip_Duration',
 'log_Trip_distance',
 'log_Trip_Duration']

In [34]:
inputCols = list( filter(lambda s: s != "Trip_Duration", inputCols) )
inputCols = list( filter(lambda s: s != "Final_Fare_amount", inputCols) )



In [35]:
############### Machine Learning
from pyspark.ml.feature import VectorAssembler

va = VectorAssembler()\
        .setInputCols(inputCols)\
        .setOutputCol("featureVector")

In [36]:
# Predict Fare
TripTrainData = va.transform(trainData)
TripTrainData.select("featureVector", 'Trip_Duration').show(truncate=False, n=2)


+---------------------------------------------------------------------------------------+-------------+
|featureVector                                                                          |Trip_Duration|
+---------------------------------------------------------------------------------------+-------------+
|[-75.23100000000001,39.928000000000004,-75.227,39.93,6.0,23.0,8.0,20.0,0.25,-1.39,0.11]|1.12         |
|[-75.165,39.954,-75.165,39.954,2.0,7.0,15.0,12.0,0.2,-1.61,-0.36]                      |0.7          |
+---------------------------------------------------------------------------------------+-------------+
only showing top 2 rows



In [37]:
TripTestData = va.transform(testData)
TripTestData.select("featureVector", 'Trip_Duration').show(truncate=False, n=2)

+-----------------------------------------------------------------+-------------+
|featureVector                                                    |Trip_Duration|
+-----------------------------------------------------------------+-------------+
|[-74.285,40.519,-74.293,40.521,4.0,15.0,16.0,7.0,0.52,-0.65,1.04]|2.83         |
|[-74.23,40.77,-74.188,40.765,6.0,23.0,6.0,10.0,2.59,0.95,2.4]    |11.05        |
+-----------------------------------------------------------------+-------------+
only showing top 2 rows



In [41]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'featureVector', labelCol='Trip_Duration', maxIter=10, regParam=0.3, elasticNetParam=0.8)


In [42]:
lr_model = lr.fit(TriprainData)


In [44]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 1.892686
r2: 0.896467


In [46]:
lr_predictions = lr_model.transform(TripTestData)

In [47]:
lr_predictions.select("prediction","Trip_Duration","featureVector").show(5)

+------------------+-------------+--------------------+
|        prediction|Trip_Duration|       featureVector|
+------------------+-------------+--------------------+
|1.3848359266317418|         2.83|[-74.285,40.519,-...|
|12.538868355726056|        11.05|[-74.23,40.77,-74...|
| 4.380866570617193|         4.25|[-74.195,40.702,-...|
|14.734066359839229|        11.07|[-74.185,40.564,-...|
| 12.65856433426481|        10.47|[-74.179,40.607,-...|
+------------------+-------------+--------------------+
only showing top 5 rows



In [48]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="Trip_Duration",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

R Squared (R2) on test data = 0.896729


<br><B>Now we will do the same for the Taxi Fare <br><br><br>

In [49]:
# Predict Fare
FareTrainData = va.transform(trainData)
FareTrainData.select("featureVector", 'Final_Fare_amount').show(truncate=False, n=2)

+---------------------------------------------------------------------------------------+-----------------+
|featureVector                                                                          |Final_Fare_amount|
+---------------------------------------------------------------------------------------+-----------------+
|[-75.23100000000001,39.928000000000004,-75.227,39.93,6.0,23.0,8.0,20.0,0.25,-1.39,0.11]|4.3              |
|[-75.165,39.954,-75.165,39.954,2.0,7.0,15.0,12.0,0.2,-1.61,-0.36]                      |3.3              |
+---------------------------------------------------------------------------------------+-----------------+
only showing top 2 rows



In [50]:
FareTestData = va.transform(testData)
FareTestData.select("featureVector", 'Final_Fare_amount').show(truncate=False, n=2)

+-----------------------------------------------------------------+-----------------+
|featureVector                                                    |Final_Fare_amount|
+-----------------------------------------------------------------+-----------------+
|[-74.285,40.519,-74.293,40.521,4.0,15.0,16.0,7.0,0.52,-0.65,1.04]|4.8              |
|[-74.23,40.77,-74.188,40.765,6.0,23.0,6.0,10.0,2.59,0.95,2.4]    |11.8             |
+-----------------------------------------------------------------+-----------------+
only showing top 2 rows



In [51]:
from pyspark.ml.regression import LinearRegression
Fare_lr = LinearRegression(featuresCol = 'featureVector', labelCol='Final_Fare_amount', maxIter=10, regParam=0.3, elasticNetParam=0.8)


In [54]:
Fare_lr_model = Fare_lr.fit(FareTrainData)

In [56]:
Fare_trainingSummary = Fare_lr_model.summary
print("RMSE: %f" % Fare_trainingSummary.rootMeanSquaredError)
print("r2: %f" % Fare_trainingSummary.r2)

RMSE: 0.928586
r2: 0.955896


In [57]:
Fare_lr_predictions = Fare_lr_model.transform(FareTestData)

In [58]:
Fare_lr_predictions.select("prediction","Final_Fare_amount","featureVector").show(5)


+------------------+-----------------+--------------------+
|        prediction|Final_Fare_amount|       featureVector|
+------------------+-----------------+--------------------+
| 4.995013706744404|              4.8|[-74.285,40.519,-...|
|12.395950299341617|             11.8|[-74.23,40.77,-74...|
| 6.480426398875684|5.799999999999998|[-74.195,40.702,-...|
|16.861726055416284|             16.3|[-74.185,40.564,-...|
|13.201589959648663|             12.3|[-74.179,40.607,-...|
+------------------+-----------------+--------------------+
only showing top 5 rows



In [60]:
from pyspark.ml.evaluation import RegressionEvaluator
Fare_lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="Final_Fare_amount",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(Fare_lr_predictions))


R Squared (R2) on test data = 0.780654
