In [16]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# Obtain the Spark entry point for this notebook. getOrCreate() hands back
# the session that is already running when there is one, instead of
# starting a second session.
spark = (
    SparkSession.builder
    .appName("TotalAmountPrediction")
    .getOrCreate()
)

25/04/28 03:28:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [22]:
input_path = "hdfs:///user/soham_bhowmick/cs131/a4/2019-01-h1.csv"

# Parse the CSV, taking column names from the header row and letting Spark
# infer each column's type from the data.
rawDF = spark.read.csv(input_path, header=True, inferSchema=True)

# Keep only the three predictor columns plus the regression target
# (total_amount); columns are selected by name.
dataDF = rawDF.select(
    "passenger_count",
    "pulocationid",
    "dolocationid",
    "total_amount",
)

# Preview the first ten rows of the narrowed frame.
dataDF.show(10)

                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [23]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [24]:
# Cache the selected columns before splitting. DataFrames are lazy, so
# without this every later action on trainDF/testDF (counting, fitting,
# scoring) re-reads and re-parses the CSV from HDFS. Caching also means both
# splits are sampled from one materialized copy of the rows. cache() is lazy
# and returns the same DataFrame, so the split itself is unchanged.
# 80% of the rows go to training and 20% to testing; the fixed seed makes
# the sampling repeatable.
trainDF, testDF = dataDF.cache().randomSplit([0.8, 0.2], seed=42)

In [25]:
# Report how many rows landed in each split. Each count() triggers a Spark
# job, so the value is bound to a name before it is printed.
train_rows = trainDF.count()
print(f"Training set count: {train_rows}")

test_rows = testDF.count()
print(f"Test set count:     {test_rows}")

                                                                                

Training set count: 2920930




Test set count:     730150


                                                                                

In [26]:
# Stage 1: pack the three input columns into the single vector column that
# the regressor reads. The columns are passed through as plain numeric
# values; no categorical indexing stage is applied in this pipeline.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["passenger_count", "pulocationid", "dolocationid"],
)

# Stage 2: decision-tree regressor that predicts total_amount from the
# assembled vector, with the maximum number of bins set to 200.
dt = DecisionTreeRegressor(
    labelCol="total_amount",
    featuresCol="features",
    maxBins=200,
)

# Chain the stages so they are fitted and applied together, in order.
pipeline = Pipeline(stages=[assembler, dt])

In [27]:
# Fit every pipeline stage on the training split; the returned fitted
# pipeline is what later cells use to score data.
model = pipeline.fit(dataset=trainDF)

                                                                                

In [28]:
# Score the held-out split; transform() appends a "prediction" column.
predictions = model.transform(testDF)

# Show the inputs, the true label and the predicted value side by side for
# the first ten rows.
preview_columns = [
    "passenger_count",
    "pulocationid",
    "dolocationid",
    "total_amount",
    "prediction",
]
predictions.select(*preview_columns).show(10)

[Stage 34:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------+-----------------+
|passenger_count|pulocationid|dolocationid|total_amount|       prediction|
+---------------+------------+------------+------------+-----------------+
|            0.0|         4.0|         4.0|         4.3|17.63754719264326|
|            0.0|         4.0|        33.0|       17.75|17.63754719264326|
|            0.0|         4.0|        68.0|        15.8|17.63754719264326|
|            0.0|         4.0|        79.0|        9.75|17.63754719264326|
|            0.0|         4.0|       125.0|         9.3|17.63754719264326|
|            0.0|         4.0|       170.0|       11.15|17.63754719264326|
|            0.0|         7.0|         7.0|        0.31|17.63754719264326|
|            0.0|         7.0|         7.0|         6.3|17.63754719264326|
|            0.0|         7.0|       112.0|        16.8|17.63754719264326|
|            0.0|         7.0|       138.0|        10.8|17.63754719264326|
+---------------+------------+------------+------------+-----------------+

                                                                                

In [29]:
# Configure an evaluator that compares the "prediction" column against the
# true total_amount using root-mean-square error.
evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol="total_amount",
)

# Compute the metric over the scored test split and report it to four
# decimal places.
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Square Error (RMSE) on test data = {rmse:.4f}")



Root Mean Square Error (RMSE) on test data = 24.6682


                                                                                