In [19]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import pyspark.sql.functions as F
import os

# Start (or reuse) the Spark session that every later cell reads from.
spark = (
    SparkSession.builder
    .appName("A4")
    .getOrCreate()
)



In [20]:
# Load the raw trip records. The first CSV row holds the column names, and
# inferSchema makes Spark pass over the file once to derive column types.
csv_file = "2019-01-h1.csv"
df = spark.read.csv(csv_file, header=True, inferSchema=True)

In [21]:
# Keep only the three predictors and the label column.
data = df.select("passenger_count", "pulocationid", "dolocationid", "total_amount")

# Drop any row with a missing value in the selected columns. `na.drop` removes
# both nulls and NaNs: isNotNull() alone lets NaN through, and a NaN feature
# makes VectorAssembler (handleInvalid="error" by default) raise during
# fit/transform, while a NaN label would turn the RMSE into NaN.
# NOTE(review): rows with passenger_count == 0 or extreme/negative total_amount
# are still kept; the RMSE (~64) is large next to typical predictions (~18-22),
# which hints at outliers — confirm whether they should be filtered here.
cleaned_data = data.na.drop(how="any", subset=data.columns)

In [22]:
# Hold out 20% of the cleaned rows for evaluation; the fixed seed keeps the
# split repeatable for the same input data.
trainDF, testDF = cleaned_data.randomSplit(weights=[0.8, 0.2], seed=99)

In [23]:
# Pack the three numeric predictors into the single "features" vector column
# that the regressor below reads.
assembler = (
    VectorAssembler()
    .setInputCols(["passenger_count", "pulocationid", "dolocationid"])
    .setOutputCol("features")
)

In [24]:
# Regression tree predicting total_amount from the assembled feature vector,
# limited to depth 5.
# maxBins=64 (Spark's default is 32) sets how finely continuous features are
# discretized when searching for split thresholds. The assembled vector carries
# no categorical metadata, so every input — including the pickup/dropoff
# location IDs — is split as an ordered number, not as a category.
# NOTE(review): the location IDs are presumably zone codes with no meaningful
# numeric order; treating them as categorical (e.g. via VectorIndexer) would
# require maxBins >= the number of distinct zones — confirm before changing.
dt = DecisionTreeRegressor(
    labelCol="total_amount",
    featuresCol="features",
    maxDepth=5,
    maxBins=64,
)

In [25]:
# Chain feature assembly and the tree so fit/transform run both stages in order.
pipeline = Pipeline().setStages([assembler, dt])

In [26]:
# Fit every pipeline stage on the training split only.
model = pipeline.fit(dataset=trainDF)

In [27]:
# Run the fitted pipeline on the held-out rows, then preview the first ten
# predictions next to their input features.
prediction = model.transform(testDF)
prediction.select(
    ["passenger_count", "pulocationid", "dolocationid", "prediction"]
).show(n=10, truncate=False)

+---------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|prediction        |
+---------------+------------+------------+------------------+
|0.0            |4.0         |4.0         |22.232054401669217|
|0.0            |4.0         |17.0        |18.342035834146   |
|0.0            |4.0         |68.0        |18.063497363008622|
|0.0            |4.0         |68.0        |18.063497363008622|
|0.0            |4.0         |79.0        |18.063497363008622|
|0.0            |4.0         |90.0        |18.063497363008622|
|0.0            |4.0         |90.0        |18.063497363008622|
|0.0            |4.0         |107.0       |18.063497363008622|
|0.0            |4.0         |125.0       |18.063497363008622|
|0.0            |4.0         |137.0       |18.063497363008622|
+---------------+------------+------------+------------------+
only showing top 10 rows



In [28]:
# Root-mean-squared error between total_amount and the tree's prediction on
# the held-out split.
rmse = (
    RegressionEvaluator()
    .setLabelCol("total_amount")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
    .evaluate(prediction)
)
print(f"RMSE: {rmse:.2f}")

# Release the Spark session.
# NOTE(review): once this runs, re-executing this cell on its own presumably
# fails because `prediction` belongs to the stopped session — consider moving
# the stop into a final cell of its own.
spark.stop()

RMSE: 64.46
