## Huidong Xu
ML goals: predicting tip amount using pyspark RandomForestRegressor

In [21]:
%%configure -f 
{
"conf":{
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
    
        "spark.executor.heartbeatInterval":"10800s",
        "spark.network.timeout":"24h",
    
        "spark.driver.memory": "18G",
        "spark.executor.memory": "18G",
        "spark.executor.cores":"8",
    
        "livy.server.session.timeout":"24h",
    
        "spark.dynamicAllocation.enabled":"false",
        "spark.ext.h2o.fail.on.unsupported.spark.param":"false",    
        
        "spark.app.name":"msds694_train"
      }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1615600877234_0002,pyspark,idle,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1615600877234_0002,pyspark,idle,Link,Link,✔


In [22]:
sc

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<SparkContext master=yarn appName=msds694_train>

In [23]:
from pyspark.sql import *
from pyspark.ml import *

from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
from pyspark.sql import SparkSession

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
ss = SparkSession.builder.getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
data_path = "s3a://msds694project/data/dt_2/"

df = ss.read.parquet(data_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
df.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-------------+---------------+------------------+-----------------+-----------------+-------------+
|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|     vendorid|     ratecodeid|store_and_fwd_flag|     pulocationid|     dolocationid| payment_type|
+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-------------+---------------+------------------+-----------------+-----------------+-------------+
|              0|          0.0|      -35.0|  0.0|   -0.5|      -1.0|         0.0|                 -0.3|       -36.8|                 0.0|(5,[2],[1.0])|(100,[5],[1.0])|     (2,[0],[1.0])|(266,[243],[1.0])|(266,[264],[1.0])|(6,[3],[1.0])|
+---------------+-------------+-----------+-----+---

In [28]:
avg_value = df.agg({'tip_amount': "avg"}).collect()[0][0]
avg_value

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2.224630301004901

In [29]:
max_value = df.agg({'tip_amount': "max"}).collect()[0][0]
max_value

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

440.2200012207031

In [30]:
min_value = df.agg({'tip_amount': "min"}).collect()[0][0]
min_value

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-170.0

In [31]:
df = df.filter("tip_amount>=0")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Goal: predict tip_amount using

## Create dataframe with a feature vector and label

In [32]:
predictors = df.columns
response = "tip_amount"
predictors.remove('tip_amount')
predictors.remove('total_amount')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
# Merging the data with Vector Assembler.
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(outputCol="features", inputCols=predictors) 
df_va = va.transform(df).select("features", "tip_amount")\
               .withColumnRenamed("tip_amount", "label")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
df_va.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(653,[9,14,113,15...|  0.0|
|(653,[9,14,113,37...|  0.0|
|(653,[2,6,9,18,11...|  0.0|
|(653,[2,6,9,18,11...|  0.0|
|(653,[2,6,9,14,11...|  0.0|
+--------------------+-----+
only showing top 5 rows

## Split dataframe into training and test sets

In [35]:
# Create Training and Test data.
dtsets = df_va.randomSplit([0.8, 0.2], 1)
train = dtsets[0].cache()
valid = dtsets[1].cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
for (id, rdd) in sc._jsc.getPersistentRDDs().items():
    rdd.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Create a RandomForestRegressor and build a model using training Dataset

In [37]:
# Initialize model with columns to utilize
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features",
                           labelCol="label")
# Train model
rfmodel = rf.fit(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
print(rfmodel.toDebugString)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

RandomForestRegressionModel (uid=RandomForestRegressor_27cb3d536af9) with 20 trees
  Tree 0 (weight 1.0):
    If (feature 5 <= 2.7200000286102295)
     If (feature 648 in {0.0})
      If (feature 408 in {0.0})
       If (feature 343 in {0.0})
        If (feature 4 <= 0.25)
         Predict: 0.012939578457651148
        Else (feature 4 > 0.25)
         Predict: 3.4445479772099636E-4
       Else (feature 343 not in {0.0})
        If (feature 651 in {0.0})
         Predict: 0.0
        Else (feature 651 not in {0.0})
         Predict: 8.09999974568685
      Else (feature 408 not in {0.0})
       If (feature 1 <= 0.35500000417232513)
        Predict: 16.059999465942383
       Else (feature 1 > 0.35500000417232513)
        Predict: 0.0
     Else (feature 648 not in {0.0})
      If (feature 2 <= 17.75)
       If (feature 1 <= 1.7450000047683716)
        If (feature 1 <= 1.0549999475479126)
         Predict: 1.7286578142734415
        Else (feature 1 > 1.0549999475479126)
         Predict: 2.

## Predicting with the model

In [38]:
# Make predictions
rfpredicts = rfmodel.transform(valid)

# Inspect results
rfpredicts.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+--------------------+
|            features|label|          prediction|
+--------------------+-----+--------------------+
|(653,[0,1,2,3,4,5...|  0.0|3.412046336171203E-4|
|(653,[0,1,2,3,4,5...| 6.15|    4.96288911821806|
|(653,[0,1,2,3,4,5...| 4.85|  4.7929061519918275|
|(653,[0,1,2,3,4,5...| 9.55|   7.804019562993067|
|(653,[0,1,2,3,4,5...|14.45|   7.804019562993067|
+--------------------+-----+--------------------+
only showing top 5 rows

## Evaluate the model

In [39]:
from pyspark.ml.evaluation import RegressionEvaluator

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
# Select columns to compute test error
evaluator = RegressionEvaluator(labelCol="label",
                                predictionCol="prediction")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
# Create evaluation metrics
rmse = evaluator.evaluate(rfpredicts, {evaluator.metricName: "rmse"})

# Print Model Metrics
print('RMSE: ' + str(rmse))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

RMSE: 1.6261909179290863

In [42]:
r2 = evaluator.evaluate(rfpredicts, {evaluator.metricName: "r2"})
print('R^2: ' + str(r2))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

R^2: 0.6613947133871283

In [43]:
mae = evaluator.evaluate(rfpredicts, {evaluator.metricName: "mae"})
print('MAE: ' + str(mae))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

MAE: 0.734047843092635