In [11]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col


In [12]:
spark = SparkSession.builder.appName("Assignment4").getOrCreate()


In [29]:
df = spark.read.csv("2019-04.csv", header=True, inferSchema=True)

filtered_df = df.select("passenger_count", "PULocationID", "DOLocationID", "total_amount") \
                .dropna() \
                .withColumn("passenger_count", col("passenger_count").cast("int")) \
                .filter("total_amount < 300") \
                .filter("total_amount > 0") \
                .filter("passenger_count > 0")

filtered_df.show(10)


+---------------+------------+------------+------------+
|passenger_count|PULocationID|DOLocationID|total_amount|
+---------------+------------+------------+------------+
|              1|       239.0|       239.0|         8.8|
|              1|       230.0|       100.0|         8.3|
|              1|        68.0|       127.0|       47.75|
|              1|        68.0|        68.0|         7.3|
|              1|        50.0|        42.0|       23.15|
|              1|        95.0|       196.0|         9.8|
|              1|       211.0|       211.0|         6.8|
|              1|       237.0|       162.0|         7.8|
|              1|       148.0|        37.0|        20.3|
|              1|       265.0|       265.0|        0.31|
+---------------+------------+------------+------------+
only showing top 10 rows



In [30]:
trainDF, testDF = filtered_df.randomSplit([0.8, 0.2], seed=42)


In [31]:
assembler = VectorAssembler(
    inputCols=["passenger_count", "PULocationID", "DOLocationID"],
    outputCol="features"
)


In [32]:
dt = DecisionTreeRegressor(labelCol="total_amount", featuresCol="features")

pipeline = Pipeline(stages=[assembler, dt])


In [33]:
model = pipeline.fit(trainDF)


In [34]:
predictions = model.transform(testDF)
predictions.select("passenger_count", "PULocationID", "DOLocationID", "prediction").show(10)


+---------------+------------+------------+------------------+
|passenger_count|PULocationID|DOLocationID|        prediction|
+---------------+------------+------------+------------------+
|              1|         1.0|         1.0|20.078089399744336|
|              1|         1.0|         1.0|20.078089399744336|
|              1|         1.0|         1.0|20.078089399744336|
|              1|         1.0|         1.0|20.078089399744336|
|              1|         1.0|         1.0|20.078089399744336|
|              1|         1.0|         1.0|20.078089399744336|
|              1|         1.0|         1.0|20.078089399744336|
|              1|         1.0|         1.0|20.078089399744336|
|              1|         1.0|       264.0| 26.76297929094587|
|              1|         1.0|       264.0| 26.76297929094587|
+---------------+------------+------------+------------------+
only showing top 10 rows



In [35]:
evaluator = RegressionEvaluator(
    labelCol="total_amount", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")


Root Mean Squared Error (RMSE): 11.995981228033358


In [36]:
filtered_df.select("total_amount").summary().show()


+-------+------------------+
|summary|      total_amount|
+-------+------------------+
|  count|           7282553|
|   mean| 19.22256732649921|
| stddev|14.429138551092501|
|    min|              0.01|
|    25%|              11.3|
|    50%|             14.76|
|    75%|             20.76|
|    max|             299.8|
+-------+------------------+

