In [18]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# 1.  dataset that only contains passenger_count (4th col), pulocationid (8th col), dolocationid (9th col), 
#     and total_amount (17th col) based on the 2019-01-h1.csv dataset

spark = SparkSession.builder.appName("SparkA4").getOrCreate()
csv_file = "gs://dataproc-staging-us-central1-823943063357-inh6zdl6/data/2019-01-h1.csv"
df = spark.read.csv(csv_file, 
                    header=True, 
                    inferSchema=True)
df.select("passenger_count", "pulocationid", "dolocationid", "total_amount").show(10)

25/04/26 07:30:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [3]:
# 2. Create trainDF and testDF
trainDF, testDF = df.randomSplit([.8, .2], seed=42)
print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""")



There are 2920930 rows in the training set, and 730150 in the test set


                                                                                

In [10]:
# 3. Create a decision tree regressor to predict total_amount from the other three features

vecAssembler = VectorAssembler(inputCols=["passenger_count", "pulocationid", "dolocationid"], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("passenger_count", "pulocationid", "dolocationid", "total_amount", "features").show(10)

decisiontree = DecisionTreeRegressor(featuresCol="features", labelCol="total_amount", maxBins=100)

[Stage 12:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------+-----------------+
|passenger_count|pulocationid|dolocationid|total_amount|         features|
+---------------+------------+------------+------------+-----------------+
|            1.0|        80.0|       112.0|         6.3| [1.0,80.0,112.0]|
|            1.0|       114.0|        79.0|       32.75| [1.0,114.0,79.0]|
|            1.0|        50.0|       226.0|        25.3| [1.0,50.0,226.0]|
|            1.0|       249.0|         4.0|         9.8|  [1.0,249.0,4.0]|
|            1.0|       158.0|       158.0|         5.8|[1.0,158.0,158.0]|
|            1.0|       246.0|        68.0|         7.8| [1.0,246.0,68.0]|
|            1.0|       164.0|       224.0|        10.8|[1.0,164.0,224.0]|
|            1.0|       226.0|       129.0|        55.3|[1.0,226.0,129.0]|
|            1.0|       142.0|       260.0|        17.3|[1.0,142.0,260.0]|
|            1.0|       141.0|       133.0|        40.0|[1.0,141.0,133.0]|
+---------------+--------

                                                                                

In [12]:
# 4. Create a pipeline

pipeline = Pipeline(stages=[vecAssembler, decisiontree])

In [15]:
# 5. Train the model

model = pipeline.fit(trainDF)

                                                                                

In [17]:
# 6. Show the predicted results along with the three features in the notebook

prediction = model.transform(testDF)
prediction.select("passenger_count", "pulocationid", "dolocationid", "total_amount", "prediction").show(10)

[Stage 42:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|        prediction|
+---------------+------------+------------+------------+------------------+
|            1.0|       223.0|       223.0|         4.3|11.704727838091687|
|            1.0|       234.0|       186.0|         6.2|11.704727838091687|
|            1.0|       158.0|       249.0|         5.8|13.307442852646982|
|            1.0|       140.0|       237.0|        10.3|11.704727838091687|
|            1.0|       148.0|        79.0|         8.8|15.790206564690084|
|            1.0|       233.0|       198.0|        27.3|11.704727838091687|
|            1.0|       158.0|       164.0|        14.8|11.704727838091687|
|            4.0|       161.0|       229.0|         6.8|11.704727838091687|
|            1.0|       143.0|       262.0|        15.3|13.307442852646982|
|            3.0|        37.0|        36.0|         7.3| 19.13224138340017|
+-----------

                                                                                

In [20]:
# 7. Evaluate the model with RMSE
regressionEvaluator = RegressionEvaluator(
    labelCol="total_amount", 
    predictionCol="prediction", 
    metricName="rmse")

rmse = regressionEvaluator.evaluate(prediction)
print(f"RMSE value: {rmse}")



RMSE value: 72.08322863719988


                                                                                