In [2]:
from pyspark.sql import SparkSession
# Input CSV for the taxi-fare model; this is a relative path handed to spark.read.csv.
filePath = "2019-04.csv"

In [3]:
# Create (or reuse an already-running) Spark session for the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("TestApp")
    .getOrCreate()
)

In [4]:
# Load the CSV (first row is the header, column types inferred), preview the
# columns the model uses, drop rows it cannot use, and split train/test.
taxiDF = spark.read.csv(filePath, header=True, inferSchema=True)

# Feature columns plus the label (total_amount) used by the later cells.
modelCols = ["passenger_count", "pulocationid", "dolocationid", "total_amount"]

print("Original top 5 rows")
taxiDF.select(*modelCols).show(5)

# VectorAssembler's default handleInvalid="error" rejects null feature values,
# and a row with no label cannot be trained on or scored. Drop incomplete
# rows up front so the pipeline fit does not fail part-way through.
cleanTaxiDF = taxiDF.dropna(subset=modelCols)

# 80/20 split with a fixed seed so the split is reproducible across runs.
trainDF, testDF = cleanTaxiDF.randomSplit([0.8, 0.2], seed=42)

Original top 5 rows
+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       239.0|       239.0|         8.8|
|            1.0|       230.0|       100.0|         8.3|
|            1.0|        68.0|       127.0|       47.75|
|            1.0|        68.0|        68.0|         7.3|
|            1.0|        50.0|        42.0|       23.15|
+---------------+------------+------------+------------+
only showing top 5 rows



In [5]:
# Transformer: pack the numeric input columns into the single "features"
# vector column that DecisionTreeRegressor reads.
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(
    inputCols=["passenger_count", "pulocationid", "dolocationid"],
    outputCol="features",
)
# The assembler is applied as the first Pipeline stage when the pipeline is
# fitted, so trainDF does not need to be transformed separately here.
# NOTE(review): pulocationid/dolocationid look like zone identifiers rather
# than quantities; a tree can still split on them, but consider
# StringIndexer/OneHotEncoder if they are categorical -- TODO confirm.

In [6]:
# Regression tree predicting total_amount from the assembled "features" vector.
# All other hyper-parameters are left at Spark's defaults.
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(
    labelCol="total_amount",
    featuresCol="features",
)

In [7]:
# Chain feature assembly and the tree into one estimator and fit it on the
# training split; fitting runs the assembler on trainDF before training dt.
from pyspark.ml import Pipeline
pipeline = Pipeline().setStages([vecAssembler, dt])
pipelineModel = pipeline.fit(trainDF)

In [8]:
# Score the held-out split with the fitted pipeline and preview some predictions.
predDF = pipelineModel.transform(testDF)
print("Prediction 10 rows")
(
    predDF
    .select("passenger_count", "pulocationid", "dolocationid", "prediction")
    .show(10)
)

Prediction 10 rows
+---------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|        prediction|
+---------------+------------+------------+------------------+
|            1.0|       246.0|        68.0| 16.94557683958677|
|            1.0|       186.0|       261.0|23.242477286221444|
|            3.0|       144.0|       170.0|16.748857527037178|
|            1.0|       100.0|       170.0|17.810163163628932|
|            1.0|       161.0|       107.0| 16.94557683958677|
|            1.0|       239.0|        24.0|19.217579401582324|
|            1.0|       148.0|         4.0|25.278734609579708|
|            0.0|       186.0|        13.0|31.951581628420044|
|            1.0|       249.0|       144.0|14.893731419692985|
|            1.0|       249.0|        87.0| 16.94557683958677|
+---------------+------------+------------+------------------+
only showing top 10 rows



In [9]:
# Evaluate the test-set predictions: root-mean-squared error of "prediction"
# against the actual total_amount.
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
    labelCol="total_amount", predictionCol="prediction", metricName="rmse"
)
rmse = regressionEvaluator.evaluate(predDF)
# NOTE(review): the recorded RMSE (~325) dwarfs the amounts seen in the
# previews, which hints at extreme total_amount values in the data -- TODO
# inspect the label distribution before trusting this metric.
print("RMSE: ", rmse)

RMSE:  324.8517780082482


In [10]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()