In [1]:
# Location of the raw trip CSV; kept in one named constant so it can be
# repointed (another bucket/month) without editing the read call.
DATA_PATH = "gs://dataproc-staging-us-central1-914709071200-uawfkl3d/2019-01-h1.csv"

# Load the trips using the header row for column names. Per the Spark docs,
# inferSchema=True costs one extra pass over the file to guess column types.
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

# Preview the four columns of interest.
df.select("passenger_count", "PULocationID", "DOLocationID", "total_amount").show(10)


                                                                                

+---------------+------------+------------+------------+
|passenger_count|PULocationID|DOLocationID|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [3]:
# Split the loaded rows roughly 80/20 into training and test sets; a fixed
# seed keeps the split repeatable across runs on the same input.
split_weights = [0.8, 0.2]
trainDF, testDF = df.randomSplit(split_weights, seed=42)

# Count each split (training first, then test) and report the sizes.
n_train = trainDF.count()
n_test = testDF.count()
print(f"There are {n_train} rows in the training set,\nand {n_test} rows in the test set")




There are 2920930 rows in the training set,
and 730150 rows in the test set


                                                                                

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor

# Column names spelled exactly as in the CSV header, so lookups do not depend
# on Spark resolving names case-insensitively (spark.sql.caseSensitive).
FEATURE_COLS = ["passenger_count", "PULocationID", "DOLocationID"]
LABEL_COL = "total_amount"

# Put the features together into the single vector column Spark ML expects.
# NOTE(review): the location IDs are passed as plain numbers, so the tree can
# only split on ID thresholds (e.g. "PULocationID <= 150"). If they are
# categorical zone codes, consider VectorIndexer/OneHotEncoder with
# maxBins >= the number of distinct IDs -- TODO confirm intent.
assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")

# Single regression tree predicting the trip total. maxBins=100 (Spark's
# default is 32) controls how finely continuous features are discretized
# when searching for split points.
dtr = DecisionTreeRegressor(
    featuresCol="features",
    labelCol=LABEL_COL,
    maxBins=100,
)

# Chain assembly + tree so identical preprocessing runs at fit and transform.
pipeline = Pipeline(stages=[assembler, dtr])

# Train on the training split only.
model = pipeline.fit(trainDF)

# Score the whole test split; only the first 10 rows are displayed.
predictions = model.transform(testDF)
predictions.select(*FEATURE_COLS, LABEL_COL, "prediction").show(10)

# Root mean squared error over all test predictions.
# NOTE(review): the displayed totals are all below 30 yet the RMSE is ~72,
# which suggests extreme or erroneous total_amount values; consider filtering
# outliers before training -- TODO investigate.
evaluator = RegressionEvaluator(
    labelCol=LABEL_COL,
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")


25/04/26 22:36:01 WARN BlockManager: Asked to remove block broadcast_43_piece0, which does not exist
25/04/26 22:36:02 WARN BlockManager: Asked to remove block broadcast_46_piece0, which does not exist
                                                                                

+---------------+------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|        prediction|
+---------------+------------+------------+------------+------------------+
|            1.0|       223.0|       223.0|         4.3|11.704727838091689|
|            1.0|       234.0|       186.0|         6.2|11.704727838091689|
|            1.0|       158.0|       249.0|         5.8|13.307442852646982|
|            1.0|       140.0|       237.0|        10.3|11.704727838091689|
|            1.0|       148.0|        79.0|         8.8| 15.79020656469008|
|            1.0|       233.0|       198.0|        27.3|11.704727838091689|
|            1.0|       158.0|       164.0|        14.8|11.704727838091689|
|            4.0|       161.0|       229.0|         6.8|11.704727838091689|
|            1.0|       143.0|       262.0|        15.3|13.307442852646982|
|            3.0|        37.0|        36.0|         7.3| 19.13224138340017|
+-----------



Root Mean Squared Error (RMSE): 72.08322863719988


                                                                                