In [5]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# Create Spark Session
# getOrCreate() returns the kernel's existing session when there is one, so
# re-executing this cell does not start a second session.
spark = (
    SparkSession.builder
    .appName("TaxiDataDecisionTree")
    .getOrCreate()
)

# Task 1: Show the first 10 entries
# URI of the CSV extract that every later cell in this notebook works from.
TAXI_CSV_URI = 'gs://dataproc-staging-us-central1-857864625438-mu4tgbuf/data/2019-01-h1-selected-200.csv'

# header=True takes column names from the first line of the file;
# inferSchema=True lets Spark derive each column's type from the data.
df = spark.read.csv(TAXI_CSV_URI, header=True, inferSchema=True)
df.show(10)

# Task 2: Create trainDF and testDF
# Target type for each column the model uses, in output-column order.
# Schema inference read the ID/count columns as doubles (the preview prints
# 1.0, 151.0, ...), so they are narrowed to int here; total_amount stays double.
# NOTE(review): the CSV header prints in lower case ('pulocationid'); the
# mixed-case names below presumably resolve through Spark's default
# case-insensitive column matching - confirm spark.sql.caseSensitive is unset.
MODEL_COLUMN_TYPES = {
    'passenger_count': 'int',
    'PULocationID': 'int',
    'DOLocationID': 'int',
    'total_amount': 'double',
}

# One projection shared by both splits, so the train and test schemas cannot
# drift apart when a column or type is edited.
model_columns = [
    col(name).cast(dtype) for name, dtype in MODEL_COLUMN_TYPES.items()
]

# Split first, then project: randomSplit sees the same input frame as before,
# so the 80/20 assignment of rows under seed=42 is unchanged.
trainDF, testDF = [
    part.select(*model_columns)
    for part in df.randomSplit([0.8, 0.2], seed=42)
]

# Task 3: Create a decision tree regressor to predict total_amount from the other three features
# Columns packed into the single vector column that the regressor reads.
feature_names = ['passenger_count', 'PULocationID', 'DOLocationID']
assembler = VectorAssembler(inputCols=feature_names, outputCol='features')

# maxBins=500 is only a requested upper bound: the recorded log of the fit
# cell shows Spark lowering it to 169, the number of training instances.
dt = DecisionTreeRegressor(
    featuresCol='features',
    labelCol='total_amount',
    maxBins=500,
)

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [6]:
# Task 4: Create a pipeline
# Stage order matters: the assembler has to emit 'features' before the tree
# can consume it.
pipeline = Pipeline(stages=[assembler, dt])

# Task 5: Train the model
model = pipeline.fit(trainDF)

# Task 6: Show the predicted results (first 10)
# transform() runs the fitted stages on the test split and adds a
# 'prediction' column next to the original ones.
predictions = model.transform(testDF)
preview_columns = [
    'passenger_count',
    'PULocationID',
    'DOLocationID',
    'total_amount',
    'prediction',
]
predictions.select(*preview_columns).show(10)

25/04/28 22:52:53 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 500 to 169 (= number of training instances)


+---------------+------------+------------+------------+------------------+
|passenger_count|PULocationID|DOLocationID|total_amount|        prediction|
+---------------+------------+------------+------------+------------------+
|              1|           4|         246|        26.3|10.610000000000001|
|              1|          41|          50|       20.16| 9.845833333333335|
|              1|          43|         137|       16.44|             14.76|
|              1|          74|         168|         9.3|18.229999999999997|
|              1|          79|         231|         9.3|18.229999999999997|
|              1|          90|         148|        14.3|             27.47|
|              1|         107|         181|        20.3|18.229999999999997|
|              1|         114|         198|        38.8|18.229999999999997|
|              1|         141|         234|        15.3|11.300816326530612|
|              1|         141|         262|        9.96|12.840909090909095|
+---------------+------------+------------+------------+------------------+

In [7]:
# Task 7: Evaluate the model with RMSE
# Scores the 'prediction' column against the true 'total_amount' values of the
# frame produced from the test split.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="total_amount",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse:.2f}")


Root Mean Squared Error (RMSE) on test data = 8.52
