***STEP 1: Creating Dataset***

In [2]:
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.appName("CS131 HW Data Analysis").getOrCreate()

# Define the path to your data in GCS
gcs_path = "gs://dataproc-staging-us-central1-156990865139-iitllk4z/data/2019-01-h1.csv"

df = spark.read.csv(gcs_path, header=True, inferSchema=True)
# Select relevant columns: passenger_count (4th), pulocationid (8th), dolocationid (9th), total_amount (17th)
df_selected = df.select(df.columns[3], df.columns[7], df.columns[8], df.columns[16])
# Show first 10 rows
df_selected.show(10)

25/04/23 19:53:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



***STEP 2: CREATING TRAINING AND TESTING DATASETS***

In [6]:
trainDF1, testDF1 = df_selected.randomSplit([0.8, 0.2], seed = 42)
trainDF1.show(10)

[Stage 5:>                                                          (0 + 1) / 1]

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            0.0|         1.0|         1.0|        90.0|
|            0.0|         1.0|         1.0|      101.39|
|            0.0|         4.0|         4.0|         4.3|
|            0.0|         4.0|         4.0|         4.8|
|            0.0|         4.0|         4.0|        5.75|
|            0.0|         4.0|        33.0|       17.75|
|            0.0|         4.0|        68.0|        15.8|
|            0.0|         4.0|        68.0|       16.55|
|            0.0|         4.0|        79.0|         5.3|
|            0.0|         4.0|        79.0|         5.8|
+---------------+------------+------------+------------+
only showing top 10 rows



                                                                                

**randomSplit messed up training and testing data for some reason, used another method to split data**

In [3]:
trainDF = df_selected.sample(withReplacement=False, fraction=0.8, seed=42)
testDF = df_selected.subtract(trainDF)

In [4]:
trainDF.show(10)

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            1.0|       163.0|       229.0|        9.05|
|            2.0|       141.0|       234.0|        13.0|
|            2.0|       246.0|       162.0|       19.55|
|            1.0|       238.0|       151.0|         8.5|
|            1.0|       163.0|        25.0|       42.95|
+---------------+------------+------------+------------+
only showing top 10 rows



In [7]:
print("Train:", trainDF.count(), "Test:", testDF.count(), "Total:", df_selected.count())



Train: 2920930 Test: 136146 Total: 3650999


                                                                                

***STEP 3: CREATING DECISION TREE***

In [9]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor


In [11]:
#Create features into single vector
assembler = VectorAssembler(inputCols=['passenger_count', 'pulocationid', 'dolocationid'], outputCol='features')
vecTrain = assembler.transform(trainDF)
vecTrain.select("passenger_count", "pulocationid", "dolocationid" , "features", "total_amount").show(10)

# Initialize decision tree regressor
dt_regressor = DecisionTreeRegressor(featuresCol='features', labelCol='total_amount', maxBins = 40)

+---------------+------------+------------+-----------------+------------+
|passenger_count|pulocationid|dolocationid|         features|total_amount|
+---------------+------------+------------+-----------------+------------+
|            1.0|       151.0|       239.0|[1.0,151.0,239.0]|        9.95|
|            1.0|       239.0|       246.0|[1.0,239.0,246.0]|        16.3|
|            5.0|       193.0|       193.0|[5.0,193.0,193.0]|        7.55|
|            5.0|       193.0|       193.0|[5.0,193.0,193.0]|       55.55|
|            5.0|       193.0|       193.0|[5.0,193.0,193.0]|       13.31|
|            1.0|       163.0|       229.0|[1.0,163.0,229.0]|        9.05|
|            2.0|       141.0|       234.0|[2.0,141.0,234.0]|        13.0|
|            2.0|       246.0|       162.0|[2.0,246.0,162.0]|       19.55|
|            1.0|       238.0|       151.0|[1.0,238.0,151.0]|         8.5|
|            1.0|       163.0|        25.0| [1.0,163.0,25.0]|       42.95|
+---------------+--------

***STEP 4: CREATING PIPELINE***

In [12]:
from pyspark.ml import Pipeline

In [13]:
pipe = Pipeline(stages=[assembler, dt_regressor])

***STEP 5: TRAINING MODEL***

In [14]:
model = pipe.fit(trainDF)

                                                                                

***STEP 6: SHOWING PREDICTATED RESULTS***

In [24]:
# Make predictions
pred = model.transform(testDF)

# Show first 10 predictions along with features
pred.select('passenger_count', 'pulocationid', 'dolocationid', 'prediction', 'total_amount').show(10)



+---------------+------------+------------+------------------+------------+
|passenger_count|pulocationid|dolocationid|        prediction|total_amount|
+---------------+------------+------------+------------------+------------+
|            1.0|        50.0|        48.0| 13.32710946479575|        9.68|
|            1.0|        51.0|        18.0|20.644553580295813|        22.3|
|            1.0|        61.0|        82.0| 13.32710946479575|        29.8|
|            1.0|        68.0|        14.0|20.644553580295813|        45.3|
|            1.0|        74.0|       130.0| 13.32710946479575|       43.56|
|            1.0|       132.0|       230.0| 49.33787412069614|        64.8|
|            1.0|       143.0|       100.0|13.537444086232135|       11.03|
|            1.0|       151.0|       161.0|12.202640378959797|        21.0|
|            1.0|       161.0|       232.0|12.202640378959797|       18.95|
|            1.0|       164.0|        87.0|13.537444086232135|       22.85|
+-----------

                                                                                

***STEP 7: EVALUATING RESULTS***

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator

In [26]:
evaluator = RegressionEvaluator(
    labelCol="total_amount", 
    predictionCol="prediction", 
    metricName="rmse"
)

rmse = evaluator.evaluate(pred)
print(f"RMSE: {rmse}")



RMSE: 138.45239637546695


                                                                                

The RMSE is extremely high, showing that this model seems to not be accurate in predicting total amount. 