In [None]:
# By Mohammed Nassar
# CS-131

In [39]:
# Imports
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import SparkSession

In [40]:
# Load dataset into a Spark DataFrame.
# NOTE(review): this bucket path is environment-specific; change it to point at your copy of the CSV.
TAXI_CSV_PATH = "gs://dataproc-staging-us-central1-994673731457-y869twbd/2019-01-h1.csv"

# `spark` is not created anywhere else in this notebook. getOrCreate() returns the
# kernel's existing session when there is one, and builds a new one otherwise, so the
# notebook also works after a fresh kernel restart.
spark = SparkSession.builder.getOrCreate()

df = spark.read.csv(TAXI_CSV_PATH, header=True, inferSchema=True)

# Keep the three features and the label. Select them by name rather than by
# df.columns[i], so this stays correct if the CSV's column order ever changes.
selected_df = (
    df.select("passenger_count", "pulocationid", "dolocationid", "total_amount")
    # VectorAssembler raises on null inputs by default (handleInvalid="error"), and the
    # regression evaluator cannot score a null label, so drop incomplete rows up front.
    .dropna()
    # The split, training, prediction and evaluation steps all read this data again;
    # caching avoids rescanning the CSV each time.
    .cache()
)

selected_df.show(10)

                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [41]:
# Split the rows roughly 80/20 into training and held-out test sets.
# The fixed seed makes the split identical on every re-run.
trainDF, testDF = selected_df.randomSplit(
    weights=[0.8, 0.2],
    seed=42,
)

In [42]:
# Combine the three predictor columns into the single vector column ("features")
# that Spark ML estimators take as input.
# NOTE(review): pulocationid/dolocationid appear to be location identifiers, but they are
# fed in as ordinary numbers, so the tree will split on ID ranges. Consider categorical
# handling if accuracy matters.
feature_columns = ["passenger_count", "pulocationid", "dolocationid"]
assembler = VectorAssembler().setInputCols(feature_columns).setOutputCol("features")

In [43]:
# Decision-tree regressor. It learns total_amount (the label) from the assembled
# "features" vector and writes its estimates to the "prediction" column.
# All other hyperparameters are left at their Spark defaults.
dtr = (
    DecisionTreeRegressor()
    .setFeaturesCol("features")
    .setLabelCol("total_amount")
    .setPredictionCol("prediction")
)

In [44]:
# Create a pipeline that chains feature assembly and the tree into a single estimator.
pipeline = Pipeline().setStages([assembler, dtr])

In [45]:
# Fit every pipeline stage on the training split; the result is a fitted PipelineModel.
model = pipeline.fit(dataset=trainDF)

                                                                                

In [46]:
# Score the held-out rows with the fitted pipeline.
predictions = model.transform(testDF)

# Show the first 10 rows: the three features, the true label and the model's prediction.
display_columns = ["passenger_count", "pulocationid", "dolocationid", "total_amount", "prediction"]
predictions.select(*display_columns).show(n=10)

[Stage 57:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|        prediction|
+---------------+------------+------------+------------+------------------+
|            0.0|         4.0|         4.0|         4.3| 22.33089275550838|
|            0.0|         4.0|        33.0|       17.75|16.906691081749617|
|            0.0|         4.0|        68.0|        15.8|16.906691081749617|
|            0.0|         4.0|        79.0|        9.75|16.906691081749617|
|            0.0|         4.0|       125.0|         9.3|16.906691081749617|
|            0.0|         4.0|       170.0|       11.15|16.906691081749617|
|            0.0|         7.0|         7.0|        0.31| 22.33089275550838|
|            0.0|         7.0|         7.0|         6.3| 22.33089275550838|
|            0.0|         7.0|       112.0|        16.8|16.906691081749617|
|            0.0|         7.0|       138.0|        10.8|16.906691081749617|
+-----------

                                                                                

In [49]:
# Measure prediction error on the test split as root mean squared error,
# expressed in the same units as total_amount.
evaluator = (
    RegressionEvaluator()
    .setMetricName("rmse")
    .setLabelCol("total_amount")
    .setPredictionCol("prediction")
)

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE): ", rmse)



Root Mean Squared Error (RMSE):  25.042623886782174


                                                                                