In [1]:
from pyspark.sql import SparkSession
from time import time
import matplotlib.pyplot as plt

# Attach to (or create) the Spark session. Here a session already exists, so
# getOrCreate() returns it and only runtime SQL settings from this builder
# take effect (see the WARN line in this cell's output).
spark = (
    SparkSession.builder
    .appName("A4SparkSQLApp")
    .getOrCreate()
)

# Source CSV in a Dataproc staging bucket on GCS; the bucket name is specific
# to one cluster/project and must be changed to run anywhere else.
file = "gs://dataproc-staging-us-central1-493734936652-dwhhoqnb/2019-01-h1.csv"

# First row holds column names; inferSchema makes Spark scan the data to pick
# column types (an extra pass over the file) instead of reading all strings.
df = spark.read.options(header=True, inferSchema=True).csv(file)


25/04/29 00:39:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [4]:
#1 Keep only the three predictor columns and the regression label.
# Rebinding `df` drops every other CSV column; re-running this cell is still
# safe because the selected columns remain present.
df = df.select(["passenger_count", "pulocationid", "dolocationid", "total_amount"])
df.show(n=10)

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [5]:
#2 Hold out roughly 20% of rows for evaluation; the fixed seed keeps the
# split reproducible across re-runs.
trainDF, testDF = df.randomSplit(weights=[0.8, 0.2], seed=42)

In [11]:
#3 
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor

# Spark ML estimators read a single vector column, so pack the three numeric
# inputs into "features".
vecAssembler = VectorAssembler(
    inputCols=["passenger_count", "pulocationid", "dolocationid"],
    outputCol="features",
)

# Regression tree predicting total_amount from the assembled feature vector.
# maxBins=128 caps the number of bins used to discretize continuous features
# when searching for split points.
# NOTE(review): pulocationid/dolocationid look like location IDs (categorical)
# but are fed in as plain numbers, so splits follow numeric ID order — confirm
# this is intended. A categorical encoding would require maxBins >= the number
# of distinct IDs.
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="total_amount",
    maxBins=128,
)

In [12]:
#4
from pyspark.ml import Pipeline

# Chain feature assembly and the regressor so fit/transform run both in order.
pipeline = Pipeline().setStages([vecAssembler, dt])


In [13]:
#5 Fit the whole pipeline (assembler + tree) on the training split only.
model = pipeline.fit(dataset=trainDF)

                                                                                

In [14]:
# 6 Score the held-out split with the fitted pipeline.
predict = model.transform(testDF)

# Show inputs, the actual total_amount and the model's prediction side by side.
predict.select(
    ["passenger_count", "pulocationid", "dolocationid", "total_amount", "prediction"]
).show(n=10)


[Stage 32:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|        prediction|
+---------------+------------+------------+------------+------------------+
|            0.0|         4.0|         4.0|         4.3| 24.03192497663894|
|            0.0|         4.0|        33.0|       17.75|19.232590849471666|
|            0.0|         4.0|        68.0|        15.8| 17.01111277291278|
|            0.0|         4.0|        79.0|        9.75| 17.01111277291278|
|            0.0|         4.0|       125.0|         9.3| 17.01111277291278|
|            0.0|         4.0|       170.0|       11.15| 17.01111277291278|
|            0.0|         7.0|         7.0|        0.31|19.232590849471666|
|            0.0|         7.0|         7.0|         6.3|19.232590849471666|
|            0.0|         7.0|       112.0|        16.8| 17.01111277291278|
|            0.0|         7.0|       138.0|        10.8| 17.01111277291278|
+---------------+------------+------------+------------+------------------+

                                                                                

In [18]:
#7 Evaluate the model on the test split.
# RMSE alone is hard to judge: it is in the label's units and has no baseline.
# R^2 adds that baseline. 0 means no better than always predicting the mean
# total_amount, 1 is a perfect fit, and negative is worse than the mean.
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="total_amount", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predict)
print(f"RMSE is {rmse}")

# The param-map argument overrides metricName for this call only (evaluate
# works on a copy), so `evaluator` itself stays configured for RMSE.
# `predict` is not cached, so this second evaluate() recomputes it.
r2 = evaluator.evaluate(predict, {evaluator.metricName: "r2"})
print(f"R2 is {r2}")



RMSE is 24.620677651840293


