## Name: Banyar Shin
## Student ID: 017192422
## CS-131 Assignment 4: Spark

In [9]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# Get the notebook's SparkSession under the application name "TaxiPrediction";
# getOrCreate() hands back an already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("TaxiPrediction")
    .getOrCreate()
)

In [10]:
# Load the trip records from cloud storage: the first line is treated as the
# header and column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("gs://banyar-for-spark/2019-04.csv")
)

# Keep only the three predictor columns plus the label column (total_amount).
selected_df = df.select(
    ["passenger_count", "pulocationid", "dolocationid", "total_amount"]
)

# Preview the first 10 rows of the projected dataset.
print("First 10 entries of the selected dataset:")
selected_df.show(n=10)

                                                                                

First 10 entries of the selected dataset:
+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       239.0|       239.0|         8.8|
|            1.0|       230.0|       100.0|         8.3|
|            1.0|        68.0|       127.0|       47.75|
|            1.0|        68.0|        68.0|         7.3|
|            1.0|        50.0|        42.0|       23.15|
|            1.0|        95.0|       196.0|         9.8|
|            1.0|       211.0|       211.0|         6.8|
|            1.0|       237.0|       162.0|         7.8|
|            1.0|       148.0|        37.0|        20.3|
|            1.0|       265.0|       265.0|        0.31|
+---------------+------------+------------+------------+
only showing top 10 rows



In [11]:
# Configure the feature assembler: it packs the three input columns into a
# single vector column named "features". Nothing is computed here; the
# assembler only runs when it is applied to a DataFrame.
assembler = VectorAssembler(
    inputCols=["passenger_count", "pulocationid", "dolocationid"],
    outputCol="features"
)

# Split data into training and testing sets (70-30 split); the fixed seed
# makes the sampling repeatable across runs.
trainDF, testDF = selected_df.randomSplit([0.7, 0.3], seed=42)

# Persist both splits. Without this, every action on them (the two counts
# below, plus any later fit / transform / evaluate) re-reads and re-parses the
# source CSV and re-applies the sampling from scratch. Caching also pins the
# membership of each split once it has been materialised.
trainDF = trainDF.cache()
testDF = testDF.cache()

# count() is an action, so these calls also materialise the cached splits.
print("Training Dataset Count:", trainDF.count())
print("Testing Dataset Count:", testDF.count())

                                                                                

Training Dataset Count: 5204170




Testing Dataset Count: 2228969


                                                                                

In [12]:
# Decision-tree regressor that reads the assembled "features" vector and
# learns to predict the "total_amount" column.
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="total_amount",
)

# Chain feature assembly and the regressor into a single estimator so the same
# preprocessing is applied at both fit and transform time.
pipeline = Pipeline(stages=[assembler, dt])

# Fit the whole pipeline on the training split.
model = pipeline.fit(trainDF)

print("Model training completed!")

                                                                                

Model training completed!


In [13]:
# Run the fitted pipeline on the held-out split; this adds a "prediction" column.
predictions = model.transform(testDF)

# Preview the inputs, the true label and the model output side by side.
print("First 10 predictions with features:")
(
    predictions
    .select(
        ["passenger_count", "pulocationid", "dolocationid", "total_amount", "prediction"]
    )
    .show(n=10)
)

First 10 predictions with features:


[Stage 49:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|        prediction|
+---------------+------------+------------+------------+------------------+
|            0.0|         1.0|         1.0|       103.3| 19.53938668555223|
|            0.0|         4.0|         4.0|         6.8| 19.53938668555223|
|            0.0|         4.0|        33.0|       31.55|15.796962845828443|
|            0.0|         4.0|        49.0|        26.0|15.796962845828443|
|            0.0|         4.0|        79.0|         7.8|15.796962845828443|
|            0.0|         4.0|        79.0|        8.15|15.796962845828443|
|            0.0|         4.0|        79.0|         8.8|15.796962845828443|
|            0.0|         4.0|       107.0|        11.8|15.796962845828443|
|            0.0|         4.0|       113.0|       12.85|15.796962845828443|
|            0.0|         4.0|       144.0|        11.3|17.767105459370107|
+---------------+------------+------------+------------+------------------+

                                                                                

In [14]:
# Score the test-set predictions with root mean square error, comparing the
# "prediction" column against the true "total_amount" label.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="total_amount",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Square Error (RMSE) = {rmse:.2f}")



Root Mean Square Error (RMSE) = 12.13


                                                                                