In [55]:
# Step 1: start Spark, load the raw trip CSV, and keep only the columns the model uses.
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise create it under this app name.
spark = (
    SparkSession.builder
    .appName("A4SparkSession")
    .getOrCreate()
)

# The CSV was uploaded to the Dataproc staging bucket ahead of time. The first row
# is read as the header, and Spark infers each column's type from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('gs://dataproc-staging-us-central1-20673574438-1npjkcso/2019-01-h1.csv')
)

# Reduce the data to the three model inputs plus the label column.
small_dataset = df.select(["passenger_count", "PULocationID", "DOLocationID", "total_amount"])

# Preview the first 10 rows of the reduced dataset.
small_dataset.show(10)

# step 1 done

                                                                                

+---------------+------------+------------+------------+
|passenger_count|PULocationID|DOLocationID|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [56]:
# Step 2: split the reduced dataset 80/20 into training and test sets. The fixed
# seed (42) makes the split reproducible from run to run.
trainDF, testDF = small_dataset.randomSplit([0.8, 0.2], seed=42)

# Both splits feed several later actions: the counts below, pipeline fitting, and
# prediction. Spark re-evaluates a DataFrame's whole lineage on every action unless
# it is persisted, and here that lineage goes back to the remote CSV read. Caching
# keeps each split after its first computation; the counts below are the first
# actions, so they populate the cache.
trainDF.cache()
testDF.cache()

# Row count of each split. randomSplit weights are approximate, so expect roughly
# (not exactly) an 80/20 ratio.
print("train count:", trainDF.count())
print("test count:", testDF.count())

# step 2 done



train count: 2920849




test count: 730150



                                                                                

In [57]:
# Step 3: define the feature assembler and the regression tree.
# Step 4: chain them into one Pipeline so a single fit/transform runs both stages.
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor

# Pack the three input columns into one vector column named "features", the input
# format Spark ML estimators consume. Column names are spelled as in the CSV header.
# NOTE(review): PULocationID / DOLocationID look like location codes rather than
# quantities. Fed as plain numbers, the tree can only split them on numeric ID
# ranges. Consider treating them as categorical; TODO confirm intent.
assblr = (
    VectorAssembler()
    .setInputCols(["passenger_count", "PULocationID", "DOLocationID"])
    .setOutputCol("features")
)

# Regression tree that learns total_amount from the assembled feature vector.
decTree = (
    DecisionTreeRegressor()
    .setFeaturesCol("features")
    .setLabelCol("total_amount")
)

# step 3 done

# Stage order matters: the assembler must produce "features" before the tree reads it.
pipeline = Pipeline().setStages([assblr, decTree])

# step 4 done

In [58]:
# Step 5: train the pipeline (assemble features, then fit the tree) on the training split.
# maxBins caps how many candidate bins the tree considers per feature when choosing
# split thresholds (Spark's default is 32). A larger value gives finer-grained splits
# at higher training cost. It is passed as a fit-time param map, which overrides the
# embedded value for this fit. This avoids mutating `decTree` after it has already
# been placed inside `pipeline`.
modelFitted = pipeline.fit(trainDF, params={decTree.maxBins: 80})

# Apply the fitted pipeline to the held-out test split; this adds a "prediction" column.
predicted = modelFitted.transform(testDF)

# Step 6: show the first 10 predictions next to the actual total_amount values.
predicted.select(
    "passenger_count", "PULocationID", "DOLocationID", "prediction", "total_amount"
).show(10)

# step 5 & 6 done

[Stage 226:>                                                        (0 + 1) / 1]

+---------------+------------+------------+------------------+------------+
|passenger_count|PULocationID|DOLocationID|        prediction|total_amount|
+---------------+------------+------------+------------------+------------+
|            0.0|         4.0|         4.0|22.591374728615502|         4.3|
|            0.0|         4.0|        33.0|19.228018304431938|       17.75|
|            0.0|         4.0|        68.0|18.810800590405595|        15.8|
|            0.0|         4.0|        79.0|18.810800590405595|        9.75|
|            0.0|         4.0|       125.0|18.810800590405595|         9.3|
|            0.0|         4.0|       170.0|18.810800590405595|       11.15|
|            0.0|         7.0|         7.0|22.591374728615502|        0.31|
|            0.0|         7.0|         7.0|22.591374728615502|         6.3|
|            0.0|         7.0|       112.0|18.810800590405595|        16.8|
|            0.0|         7.0|       138.0|18.810800590405595|        10.8|
+-----------


                                                                                

In [59]:
# Step 7: score the test-set predictions with root mean squared error (RMSE),
# comparing the model's "prediction" column against the actual "total_amount".
from pyspark.ml.evaluation import RegressionEvaluator

evaluation = (
    RegressionEvaluator()
    .setLabelCol("total_amount")     # ground-truth values
    .setPredictionCol("prediction")  # column added by the fitted pipeline
    .setMetricName("rmse")
)

rmseScore = evaluation.evaluate(predicted)
print(f"Root Mean Squared Error aka RMSE for this model is: {rmseScore}")

# step 7 done



Root Mean Squared Error aka RMSE for this model is: 24.62693035407849




                                                                                