In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# Obtain the Spark session used by every later cell; getOrCreate() hands back
# an already-running session when one exists instead of starting a new one.
spark = (
    SparkSession.builder
    .appName("TESTAPP")
    .getOrCreate()
)

25/04/27 01:36:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Cloud-storage location of the raw CSV input.
path = "gs://dataproc-staging-northamerica-northeast2-863261792305-cfusnfhq/data/2019-01-h1.csv"

# Read the file, taking column names from the header row and letting Spark
# infer each column's type from the data.
df = spark.read.options(header=True, inferSchema=True).csv(path)

# Narrow the frame to the four columns this notebook works with.
smaller_set = df.select(
    "passenger_count",
    "pulocationid",
    "dolocationid",
    "total_amount",
)

# Preview the first ten rows of the reduced frame.
smaller_set.show(n=10)

                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [3]:
# Split the data 80/20 into a training set and a held-out test set.
# A fixed seed is supplied so that re-running the notebook produces the same
# partition; without it every run trains and evaluates on different rows and
# the reported error is not reproducible.
traindf, testdf = smaller_set.randomSplit([0.8, 0.2], seed=42)

In [4]:
# Pack the three numeric input columns into a single vector column named
# "features".
assembler = (
    VectorAssembler()
    .setInputCols(["passenger_count", "pulocationid", "dolocationid"])
    .setOutputCol("features")
)

In [7]:
# Decision-tree regressor that predicts `total_amount` from the `features`
# vector column. maxBins is supplied at construction time rather than through
# a follow-up setter call.
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="total_amount",
    maxBins=1000,
)

In [8]:
# Chain feature assembly and the regressor so they are fitted and applied as
# one unit, in that order.
pipeline = Pipeline().setStages([assembler, dt])

# Fit the whole pipeline on the training split only.
pipelineModel = pipeline.fit(traindf)

                                                                                

In [9]:
# Run the fitted pipeline over the test split; the result carries the input
# columns plus the model's `prediction` column.
prediction = pipelineModel.transform(testdf)

# Preview the inputs next to the predicted value for the first ten rows.
prediction.select(
    "passenger_count",
    "pulocationid",
    "dolocationid",
    "prediction",
).show(n=10)

[Stage 17:>                                                         (0 + 1) / 1]

+---------------+------------+------------+-----------------+
|passenger_count|pulocationid|dolocationid|       prediction|
+---------------+------------+------------+-----------------+
|            0.0|         1.0|         1.0|99.32666666666665|
|            0.0|         1.0|         1.0|99.32666666666665|
|            0.0|         4.0|        17.0|17.86533632205955|
|            0.0|         4.0|        68.0|17.86533632205955|
|            0.0|         4.0|        79.0|17.86533632205955|
|            0.0|         4.0|        79.0|17.86533632205955|
|            0.0|         4.0|        90.0|17.86533632205955|
|            0.0|         4.0|       144.0|17.86533632205955|
|            0.0|         4.0|       170.0|17.86533632205955|
|            0.0|         4.0|       170.0|17.86533632205955|
+---------------+------------+------------+-----------------+
only showing top 10 rows



                                                                                

In [13]:
# Compare the `prediction` column against the true `total_amount` using the
# "rmse" metric.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="total_amount",
    predictionCol="prediction",
)

# NOTE(review): `rsme` looks like a transposition of "rmse"; the name is left
# untouched because other cells may refer to it.
rsme = evaluator.evaluate(prediction)

# Report the score rounded to two decimal places.
print(format(rsme, ".2f"))



732.05


                                                                                