In [1]:
# Obtain the Spark session that every later cell relies on.

from pyspark.sql import SparkSession

# getOrCreate() hands back the session that is already running when there is
# one, and only builds a new session (named 'a4') when none exists yet.
spark = (
    SparkSession.builder
    .appName('a4')
    .getOrCreate()
)

25/04/28 20:46:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Location of the input CSV in the Cloud Storage bucket.
file_path = 'gs://dataproc-staging-northamerica-northeast2-450676183334-mwz98htd/notebooks/jupyter/2019-01-h1.csv'

# Read the CSV: the first line supplies the column names, and Spark infers
# each column's type from the data.
df = spark.read.options(header=True, inferSchema=True).csv(file_path)

# Keep only the columns the model uses: three predictors plus the
# "total_amount" label.
trimmed_df = df.select(
    [
        "passenger_count",
        "PULocationID",
        "DOLocationID",
        "total_amount",
    ]
)

# Preview the first 10 rows as a sanity check.
trimmed_df.show(10)

                                                                                

+---------------+------------+------------+------------+
|passenger_count|PULocationID|DOLocationID|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [3]:
# Hold out part of the data for evaluation: roughly 80% of the rows go to
# training and roughly 20% to testing.
# The fixed seed makes the random split reproducible, so re-running this cell
# on the same input yields the same train/test partition.
trainDF, testDF = trimmed_df.randomSplit(weights=[0.8, 0.2], seed=42)

In [4]:
from pyspark.ml.feature import VectorAssembler

In [5]:
# Pack the three predictor columns into the single vector column "features".
# The assembler is only configured here; it is applied to the data when the
# pipeline that contains it is fitted / used for prediction.
# NOTE(review): the two location-ID columns enter the vector as plain numeric
# values -- confirm that treating them as numbers is intended.
vecAss = VectorAssembler(
    outputCol="features",
    inputCols=[
        "passenger_count",
        "PULocationID",
        "DOLocationID",
    ],
)

In [13]:
# A decision-tree regressor is used here in place of the earlier
# LinearRegression attempt.
from pyspark.ml.regression import DecisionTreeRegressor

# The tree predicts "total_amount" from the assembled "features" vector.
# maxBins is left at its default; raising it (e.g. maxBins=512) was considered
# but turned out not to be needed.
dt = DecisionTreeRegressor(
    labelCol="total_amount",
    featuresCol="features",
)

In [14]:
from pyspark.ml import Pipeline

In [15]:
# Chain the two stages: assemble the feature vector, then run the decision
# tree (dt) on it.
pipeline = Pipeline(
    stages=[
        vecAss,
        dt,
    ]
)

# Fitting on the training split yields a transformer that maps a DataFrame to
# the same DataFrame plus a "prediction" column.
pipelineModel = pipeline.fit(trainDF)

                                                                                

In [16]:
# Score the held-out test split with the fitted pipeline.
predDF = pipelineModel.transform(testDF)

# Preview the first 10 predictions next to the inputs that produced them.
predDF.select(
    [
        "passenger_count",
        "PULocationID",
        "DOLocationID",
        "prediction",
    ]
).show(10)


[Stage 34:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------------+
|passenger_count|PULocationID|DOLocationID|        prediction|
+---------------+------------+------------+------------------+
|            0.0|         4.0|         4.0| 17.69633136094663|
|            0.0|         4.0|        33.0| 17.69633136094663|
|            0.0|         4.0|        68.0|15.368079591565136|
|            0.0|         4.0|        79.0|15.368079591565136|
|            0.0|         4.0|       125.0|15.368079591565136|
|            0.0|         4.0|       170.0|15.368079591565136|
|            0.0|         7.0|         7.0| 17.69633136094663|
|            0.0|         7.0|         7.0| 17.69633136094663|
|            0.0|         7.0|       112.0|15.368079591565136|
|            0.0|         7.0|       138.0|15.368079591565136|
+---------------+------------+------------+------------------+
only showing top 10 rows




                                                                                

In [17]:
from pyspark.ml.evaluation import RegressionEvaluator

# Compare the "prediction" column against the "total_amount" label using
# root-mean-squared error.
evalr = RegressionEvaluator(
    metricName='rmse',
    labelCol='total_amount',
    predictionCol='prediction',
)

# RMSE of the fitted pipeline on the test-split predictions.
rmse = evalr.evaluate(predDF)
print(rmse)



24.87634060519545



                                                                                