In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Obtain the SparkSession (reuses an already-running session if one exists).
spark = SparkSession.builder.appName("MYAPP").getOrCreate()

# Location of the input CSV in Google Cloud Storage.
csv_path = 'gs://dataproc-staging-us-central1-667037614087-6smxgfsy/data/2019-01-h1.csv'

# Load the CSV into a DataFrame: the first row supplies the column names and
# column types are inferred from the data.
df = spark.read.options(header=True, inferSchema=True).csv(csv_path)

25/04/29 04:16:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [2]:
# Preview the first 10 rows of the raw DataFrame.
df.show(n=10)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|     1.0| 2019-01-01 00:46:40|  2019-01-01 00:53:20|            1.0|          1.5|       1.0|                 N|       151.0|       239.0|         1.0|        7.0|  0.5|    0.5|      1.65|         0.0|                  0.3

In [3]:
# Narrow the frame to the four columns used by the rest of the notebook.
selected_columns = ["passenger_count", "pulocationid", "dolocationid", "total_amount"]
new_dataset = df.select(*selected_columns)

# Preview the first 10 rows of the reduced frame.
new_dataset.show(n=10)

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [7]:
# Split the data roughly 80% / 20% into training and test sets.
# A fixed seed makes the split repeatable across runs; without it every
# execution samples a different partition of rows, so the fitted model and
# the reported error change from run to run.
# NOTE(review): repeatability also assumes the input DataFrame is read and
# partitioned the same way each time -- confirm if exact reproducibility matters.
trainDF, testDF = new_dataset.randomSplit([0.8, 0.2], seed=42)

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline

In [9]:
# Columns that are packed into the model's input vector.
feature_columns = ["passenger_count", "pulocationid", "dolocationid"]

# Stage 1: combine the input columns into one vector column named "features".
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Stage 2: decision-tree regressor that predicts "total_amount" from "features".
# NOTE(review): pulocationid / dolocationid look like categorical zone ids but
# are fed to the tree as plain numbers here -- confirm this is intended.
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="total_amount",
    maxBins=32,
)

# Chain both stages so a single fit/transform runs them in order.
pipeline = Pipeline(stages=[assembler, dt])

In [10]:
# Fit every pipeline stage on the training split; the result is the fitted
# pipeline model used for prediction below.
model = pipeline.fit(dataset=trainDF)

                                                                                

In [11]:
# Run the fitted pipeline over the held-out test split.
predictions = model.transform(testDF)

# Display the inputs, the actual total_amount and the model's prediction
# side by side.
display_columns = [
    "passenger_count",
    "pulocationid",
    "dolocationid",
    "total_amount",
    "prediction",
]
predictions.select(*display_columns).show()

[Stage 18:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|        prediction|
+---------------+------------+------------+------------+------------------+
|            0.0|         1.0|         1.0|        90.0|22.152742992951815|
|            0.0|         4.0|        79.0|        9.75|12.843170374378925|
|            0.0|         4.0|        80.0|       15.95|12.843170374378925|
|            0.0|         4.0|       137.0|        9.35|14.284385147633689|
|            0.0|         4.0|       170.0|       11.15|14.284385147633689|
|            0.0|         7.0|         7.0|         8.8|22.152742992951815|
|            0.0|         7.0|        42.0|        29.3|12.843170374378925|
|            0.0|         7.0|        48.0|       21.95|12.843170374378925|
|            0.0|         7.0|       138.0|        10.8|14.284385147633689|
|            0.0|         7.0|       162.0|        13.8|14.284385147633689|
|           

                                                                                

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator

In [13]:
# Evaluator that compares the "prediction" column against the "total_amount"
# label using root mean squared error.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="total_amount",
    predictionCol="prediction",
)

# Compute the metric over the test-set predictions and report it.
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse:.3f}")



Root Mean Squared Error (RMSE) on test data = 31.287


                                                                                