In [47]:
from pyspark.sql import SparkSession

In [48]:
# Obtain the Spark session for this notebook (application name "a4");
# getOrCreate() hands back an already-running session when there is one.
spark = (
    SparkSession.builder
    .appName("a4")
    .getOrCreate()
)

In [49]:
# 1. Creating the dataset
# The CSV lives in a GCS bucket; its first row is a header.
file_path = "gs://dataproc-staging-us-central1-1022936215757-ezcuz3ds/data/reduced_2019.csv"

# Declare the schema explicitly rather than using inferSchema=True:
# inference makes Spark scan the whole file an extra time just to guess the
# types, and the guessed types can shift if the data changes. The four columns
# below are listed in file order and typed as doubles, matching how they were
# previously inferred (all values render with a decimal point).
df = spark.read.csv(
    file_path,
    header=True,
    schema=(
        "passenger_count DOUBLE, "
        "pulocationid DOUBLE, "
        "dolocationid DOUBLE, "
        "total_amount DOUBLE"
    ),
)

# Preview the first rows as a sanity check on the load.
df.show(10)

                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [50]:
# 2. Create trainDF and testDF
# Random split with 0.8 / 0.2 weights and a fixed seed of 42.
trainDF, testDF = df.randomSplit(weights=[0.8, 0.2], seed=42)

# Each count() is a Spark action; training rows are counted first, then test rows.
train_rows = trainDF.count()
test_rows = testDF.count()
print(f"""There are {train_rows} rows in the training set, and {test_rows} in the test set""")



There are 2920384 rows in the training set, and 730615 in the test set


                                                                                

In [51]:
from pyspark.ml.feature import VectorAssembler

In [52]:
# Assemble the three predictor columns into one vector column called "features".
vecAss = VectorAssembler(
    inputCols=["passenger_count", "pulocationid", "dolocationid"],
    outputCol="features",
)

# Apply the assembler to the training split and preview the result.
vecTrainDF = vecAss.transform(trainDF)
vecTrainDF.show(n=5)

[Stage 137:>                                                        (0 + 1) / 1]

+---------------+------------+------------+------------+-------------+
|passenger_count|pulocationid|dolocationid|total_amount|     features|
+---------------+------------+------------+------------+-------------+
|            0.0|         1.0|         1.0|        90.0|[0.0,1.0,1.0]|
|            0.0|         1.0|         1.0|      101.39|[0.0,1.0,1.0]|
|            0.0|         1.0|         1.0|      116.75|[0.0,1.0,1.0]|
|            0.0|         1.0|         1.0|      132.35|[0.0,1.0,1.0]|
|            0.0|         1.0|         1.0|       172.8|[0.0,1.0,1.0]|
+---------------+------------+------------+------------+-------------+
only showing top 5 rows



                                                                                

In [53]:
from pyspark.ml.regression import DecisionTreeRegressor

In [54]:
# Decision-tree regressor that predicts total_amount from the "features" vector,
# configured with maxBins=64.
dtr = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="total_amount",
    maxBins=64,
)

# NOTE(review): despite the "lr" prefix, this is the fitted decision-tree model,
# trained on the pre-assembled training frame.
lrModel = dtr.fit(vecTrainDF)

                                                                                

In [55]:
from pyspark.ml import Pipeline

In [56]:
# Two-stage pipeline: assemble the feature vector, then fit the decision tree.
pipeline = Pipeline(
    stages=[
        vecAss,
        dtr,
    ]
)

# Fit on the raw training split; the assembler stage builds "features" itself.
pipelineModel = pipeline.fit(trainDF)

                                                                                

In [59]:
# Run the fitted pipeline over the held-out split to get a "prediction" column.
predDF = pipelineModel.transform(testDF)

# Show the input features next to the model's prediction for the first rows.
(
    predDF
    .select("passenger_count", "pulocationid", "dolocationid", "prediction")
    .show(n=10)
)

[Stage 168:>                                                        (0 + 1) / 1]

+---------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|        prediction|
+---------------+------------+------------+------------------+
|            0.0|         1.0|         1.0| 18.40309286568426|
|            0.0|         4.0|         4.0| 18.40309286568426|
|            0.0|         4.0|         4.0| 18.40309286568426|
|            0.0|         4.0|        68.0|13.650920445112941|
|            0.0|         4.0|        79.0|13.650920445112941|
|            0.0|         4.0|        80.0|13.650920445112941|
|            0.0|         4.0|       107.0|13.650920445112941|
|            0.0|         4.0|       114.0|13.650920445112941|
|            0.0|         4.0|       161.0|13.650920445112941|
|            0.0|         4.0|       161.0|13.650920445112941|
+---------------+------------+------------+------------------+
only showing top 10 rows



                                                                                

In [60]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator that compares the "prediction" column with the "total_amount" label
# using root-mean-squared error.
regressionEvaluator = RegressionEvaluator(
    labelCol="total_amount",
    predictionCol="prediction",
    metricName="rmse",
)

# Score the test-set predictions and print the resulting RMSE.
rmse = regressionEvaluator.evaluate(dataset=predDF)
print(rmse)



50.98573622802005


                                                                                