In [5]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (an already-running session is
# reused; otherwise a new one is started under this application name).
spark = (
    SparkSession.builder
    .appName("CS131-A4-Taxi")
    .getOrCreate()
)


In [9]:
# Location of the input CSV (a gs:// object path).
file_path = "gs://taxidatasaet/2019-01-h1.csv"

# Read the CSV: the first row supplies column names, and column types are
# inferred from the data rather than declared up front.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

# Quick sanity check of the load: the inferred schema, then the first 5 rows.
df.printSchema()
df.show(n=5)


                                                                                

root
 |-- vendorid: double (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- ratecodeid: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pulocationid: double (nullable = true)
 |-- dolocationid: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+---

In [10]:
# Project down to the four columns used later: the three feature inputs
# and total_amount, which the regressor is trained to predict.
selected_df = df.select(
    ["passenger_count", "PULocationID", "DOLocationID", "total_amount"]
)

# Preview the first 10 rows of the projection.
selected_df.show(n=10)


+---------------+------------+------------+------------+
|passenger_count|PULocationID|DOLocationID|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [12]:
# Randomly partition the rows with weights 0.8 (train) and 0.2 (test);
# the seed is fixed so the same sampling is drawn on every run.
(trainDF, testDF) = selected_df.randomSplit(weights=[0.8, 0.2], seed=42)

In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline

# Stage 1: combine the three input columns into one vector column named
# "features", which is what the regressor below reads.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["passenger_count", "PULocationID", "DOLocationID"],
)

# Stage 2: decision-tree regressor that learns total_amount from "features".
# maxBins is raised to 100 here at construction time.
dtr = DecisionTreeRegressor(
    labelCol="total_amount",
    featuresCol="features",
    maxBins=100,
)

# Chain the stages so a single fit()/transform() runs assembly, then the tree.
pipeline = Pipeline(stages=[assembler, dtr])


In [14]:
# Fit every pipeline stage on the training split; the result is the fitted
# pipeline model used for scoring below.
model = pipeline.fit(dataset=trainDF)


                                                                                

In [15]:
# Score the test split with the fitted pipeline.
predictions = model.transform(dataset=testDF)

# Show the three inputs next to the model's "prediction" value (first 10 rows).
(
    predictions
    .select(["passenger_count", "PULocationID", "DOLocationID", "prediction"])
    .show(n=10)
)

[Stage 18:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------------+
|passenger_count|PULocationID|DOLocationID|        prediction|
+---------------+------------+------------+------------------+
|            0.0|         4.0|         4.0| 24.03192497663894|
|            0.0|         4.0|        33.0|18.581726675586104|
|            0.0|         4.0|        68.0|18.581726675586104|
|            0.0|         4.0|        79.0|18.581726675586104|
|            0.0|         4.0|       125.0|18.581726675586104|
|            0.0|         4.0|       170.0|18.581726675586104|
|            0.0|         7.0|         7.0|18.891607300712764|
|            0.0|         7.0|         7.0|18.891607300712764|
|            0.0|         7.0|       112.0|18.581726675586104|
|            0.0|         7.0|       138.0|18.581726675586104|
+---------------+------------+------------+------------------+
only showing top 10 rows



                                                                                

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator configured for RMSE, comparing the "prediction" column against
# the actual total_amount.
evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol="total_amount",
)

# Compute the metric over the scored test rows and report it.
rmse = evaluator.evaluate(dataset=predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")



Root Mean Squared Error (RMSE): 60.27038711535193


                                                                                