In [18]:
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [19]:
# 1. Build a dataset from 2019-01-h1.csv that keeps only four columns:
#    passenger_count (4th col), pulocationid (8th col), dolocationid (9th col),
#    and total_amount (17th col). Then show the first 10 entries in the notebook.

spark = SparkSession.builder.appName("A4").getOrCreate()
filepath = 'gs://dataproc-staging-us-central1-523272529471-j0pxs7mj/data/2019-01-h1.csv'

# header=True takes the column names from the first line of the file;
# inferSchema=True asks Spark to infer each column's type from the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(filepath)
)
selected_df = df.select(['passenger_count', 'pulocationid', 'dolocationid', 'total_amount'])
selected_df.show(n=10)

                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [20]:
# 2. Create trainDF and testDF.

# Random 80/20 split of the selected columns, with the random seed fixed at 42.
trainDF, testDF = selected_df.randomSplit(weights=[0.8, 0.2], seed=42)

# Each count() is a Spark action; the training count is evaluated before the testing count.
train_rows = trainDF.count()
test_rows = testDF.count()
print(f'there are {train_rows} rows in the training set, and {test_rows} rows in the testing set.')



there are 2920849 rows in the training set, and 730150 rows in the testing set.


                                                                                

In [22]:
# Combine the three predictor columns into one vector column named 'features'.
feature_cols = ['passenger_count', 'pulocationid', 'dolocationid']
vecAssembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
vecTrainDF = vecAssembler.transform(trainDF)

# Preview the predictor and label columns of the assembled frame
# (the new 'features' column itself is not selected here).
vecTrainDF.select(*feature_cols, 'total_amount').show(10)

[Stage 44:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            0.0|         1.0|         1.0|        90.0|
|            0.0|         1.0|         1.0|      116.75|
|            0.0|         4.0|         4.0|         4.8|
|            0.0|         4.0|         4.0|        5.75|
|            0.0|         4.0|        17.0|        20.3|
|            0.0|         4.0|        68.0|        12.8|
|            0.0|         4.0|        79.0|         5.3|
|            0.0|         4.0|        79.0|         5.8|
|            0.0|         4.0|        79.0|        6.35|
|            0.0|         4.0|        79.0|         7.8|
+---------------+------------+------------+------------+
only showing top 10 rows



                                                                                

In [24]:
# 3. Create a decision tree regressor to predict total_amount from the other three features.
# maxBins is set to 700 through the explicit setMaxBins() setter, chained onto the constructor.
decisionTree = (
    DecisionTreeRegressor(featuresCol='features', labelCol='total_amount')
    .setMaxBins(700)
)

In [27]:
# 4. Create a pipeline: vector assembly followed by the decision tree regressor.
pipeline = Pipeline().setStages([vecAssembler, decisionTree])

# 5. Train the model: fitting the pipeline on trainDF yields the fitted pipeline model.
pipelineModel = pipeline.fit(trainDF)

# Apply the fitted pipeline to testDF; the result is the frame the 'prediction' column is read from.
predDF = pipelineModel.transform(testDF)

# 6. Show the predicted results along with the three features in the notebook. (Show the first 10 entries.)
shown_cols = ['passenger_count', 'pulocationid', 'dolocationid', 'total_amount', 'prediction']
predDF.select(shown_cols).show(10)

25/04/29 01:48:09 WARN BlockManager: Asked to remove block broadcast_107_piece0, which does not exist
[Stage 74:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------+-----------------+
|passenger_count|pulocationid|dolocationid|total_amount|       prediction|
+---------------+------------+------------+------------+-----------------+
|            0.0|         4.0|         4.0|         4.3|17.85327406654835|
|            0.0|         4.0|        33.0|       17.75|17.85327406654835|
|            0.0|         4.0|        68.0|        15.8|17.85327406654835|
|            0.0|         4.0|        79.0|        9.75|17.85327406654835|
|            0.0|         4.0|       125.0|         9.3|17.85327406654835|
|            0.0|         4.0|       170.0|       11.15|17.85327406654835|
|            0.0|         7.0|         7.0|        0.31|17.85327406654835|
|            0.0|         7.0|         7.0|         6.3|17.85327406654835|
|            0.0|         7.0|       112.0|        16.8|17.85327406654835|
|            0.0|         7.0|       138.0|        10.8|17.85327406654835|
+---------------+------------+------------+------------+-----------------+

                                                                                

In [29]:
# 7. Evaluate the model with RMSE. (The RMSE value should be in the notebook.)
# The evaluator compares the 'prediction' column with the 'total_amount' label column.
regressionEvaluator = (
    RegressionEvaluator()
    .setLabelCol('total_amount')
    .setPredictionCol('prediction')
    .setMetricName('rmse')
)
rmse = regressionEvaluator.evaluate(predDF)

print(f"The RMSE value is = {rmse}")



The RMSE value is = 24.62354704567572


