In [38]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

In [39]:
spark = SparkSession.builder.appName("A4").getOrCreate()
file = "gs://dataproc-staging-us-central1-734514214178-tdqcky8f/data/a4set.csv"
df = spark.read.csv(file, header=True, inferSchema=True)
df.select("passenger_count", "pulocationid", "dolocationid", "total_amount").show(5)

                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
+---------------+------------+------------+------------+
only showing top 5 rows



In [40]:
# Ensure that all columns are doubles, not strings (since all values are numeric we can cast all)
for column in df.columns:
    df = df.withColumn(column, col(column).cast(DoubleType()))
    
df.printSchema()

# Split into training and test sets
trainDF, testDF = df.randomSplit([0.8, 0.2], seed=42)
print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""")

root
 |-- passenger_count: double (nullable = true)
 |-- pulocationid: double (nullable = true)
 |-- dolocationid: double (nullable = true)
 |-- total_amount: double (nullable = true)





There are 2920360 rows in the training set, and 730639 in the test set


                                                                                

In [41]:
df.show(5)
trainDF.show(5)

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
+---------------+------------+------------+------------+
only showing top 5 rows



[Stage 138:>                                                        (0 + 1) / 1]

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            0.0|         1.0|         1.0|        90.0|
|            0.0|         1.0|         1.0|      101.39|
|            0.0|         1.0|         1.0|      116.75|
|            0.0|         1.0|         1.0|      132.35|
|            0.0|         1.0|         1.0|       172.8|
+---------------+------------+------------+------------+
only showing top 5 rows



                                                                                

In [42]:
# Assemble a features column with vector assembler
vecAsm = VectorAssembler(inputCols = ["passenger_count", "pulocationid", "dolocationid"], outputCol = "rawFeatures")
vecTrainDF = vecAsm.transform(trainDF)
vecTrainDF.show(5)

[Stage 139:>                                                        (0 + 1) / 1]

+---------------+------------+------------+------------+-------------+
|passenger_count|pulocationid|dolocationid|total_amount|  rawFeatures|
+---------------+------------+------------+------------+-------------+
|            0.0|         1.0|         1.0|        90.0|[0.0,1.0,1.0]|
|            0.0|         1.0|         1.0|      101.39|[0.0,1.0,1.0]|
|            0.0|         1.0|         1.0|      116.75|[0.0,1.0,1.0]|
|            0.0|         1.0|         1.0|      132.35|[0.0,1.0,1.0]|
|            0.0|         1.0|         1.0|       172.8|[0.0,1.0,1.0]|
+---------------+------------+------------+------------+-------------+
only showing top 5 rows



                                                                                

In [43]:
# Indexer for identifying categorical values stored as numbers (ex. IDs)
fi = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=3).fit(vecTrainDF)

# Train a DecisionTree model
dt = DecisionTreeRegressor(featuresCol="features", labelCol="total_amount")

# Chain vector assembler and indexer alongside decision tree model
pipeline = Pipeline(stages=[vecAsm, fi, dt])

# Train and predict
model = pipeline.fit(trainDF)
predictions = model.transform(testDF)

predictions.select("prediction", "total_amount", "features").show(5)

# Compute error
evaluator = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on test data = %g" % rmse)

                                                                                

+------------------+------------+--------------+
|        prediction|total_amount|      features|
+------------------+------------+--------------+
| 21.54906337558414|      114.35| [0.0,1.0,1.0]|
| 21.54906337558414|         3.8| [0.0,4.0,4.0]|
| 21.54906337558414|         4.8| [0.0,4.0,4.0]|
|13.609029548795787|        12.8|[0.0,4.0,68.0]|
|13.609029548795787|        6.35|[0.0,4.0,79.0]|
+------------------+------------+--------------+
only showing top 5 rows





RMSE on test data = 60.8906


                                                                                