In [28]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import PipelineModel
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator

try:
    spark.stop()
except NameError:
    pass

spark = SparkSession.builder \
        .appName('taxi') \
        .master('local[*]') \
        .config("spark.dynamicAllocation.enabled", "false") \
        .getOrCreate()


25/04/27 05:05:45 INFO SparkEnv: Registering MapOutputTracker
25/04/27 05:05:45 INFO SparkEnv: Registering BlockManagerMaster
25/04/27 05:05:45 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/04/27 05:05:45 INFO SparkEnv: Registering OutputCommitCoordinator


In [29]:
file = 'gs://dataproc-staging-us-central1-100317476977-28dz6rb8/data/yellow_tripdata_2019-01.csv'

df = spark.read.csv(file, header=True, inferSchema=True)
df.show(3)


                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2019-01-01 00:46:40|  2019-01-01 00:53:20|              1|          1.5|         1|                 N|         151|         239|           1|        7.0|  0.5|    0.5|      1.65|         0.0|                  0.3

In [30]:
df = df.select('passenger_count', 'pulocationid', 'dolocationid', 'total_amount')


In [31]:
trainDF, testDF = df.randomSplit([.8, .2], seed=42)

trainDF.groupBy("passenger_count") \
  .count() \
  .orderBy("passenger_count") \
  .show()

trainDF.show(3)

                                                                                

+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|              0|  94072|
|              1|4363627|
|              2| 891053|
|              3| 251987|
|              4| 112819|
|              5| 259005|
|              6| 160781|
|              7|     14|
|              8|     20|
|              9|      8|
+---------------+-------+



[Stage 6:>                                                          (0 + 1) / 1]

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|              0|           1|           1|        90.0|
|              0|           1|           1|      101.39|
|              0|           1|           1|      132.35|
+---------------+------------+------------+------------+
only showing top 3 rows



                                                                                

In [32]:
vecAssembler = VectorAssembler(inputCols=['passenger_count', 'pulocationid', 'dolocationid'], outputCol='features')
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select('features').show(5)



[Stage 7:>                                                          (0 + 1) / 1]

+-------------+
|     features|
+-------------+
|[0.0,1.0,1.0]|
|[0.0,1.0,1.0]|
|[0.0,1.0,1.0]|
|[0.0,1.0,1.0]|
|[0.0,4.0,4.0]|
+-------------+
only showing top 5 rows



                                                                                

In [33]:
lr = LinearRegression(featuresCol='features', labelCol='total_amount')
print(">>> actual master is:", spark.sparkContext.master)

>>> actual master is: local[*]


In [34]:
pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)


25/04/27 05:07:47 WARN Instrumentation: [24f52980] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [36]:
predDF = pipelineModel.transform(testDF)
# predDF.select('passenger_count', 'polocationID', 'DOLocationID', 'total_amount', prediction).show(10)
predDF.show(10)

[Stage 12:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------+---------------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|       features|        prediction|
+---------------+------------+------------+------------+---------------+------------------+
|              0|           1|           1|      116.75|  [0.0,1.0,1.0]|20.335386347052818|
|              0|           4|           4|         4.3|  [0.0,4.0,4.0]|20.250426101914126|
|              0|           4|           4|        5.75|  [0.0,4.0,4.0]|20.250426101914126|
|              0|           4|          68|        15.8| [0.0,4.0,68.0]| 19.45677629339708|
|              0|           4|          79|         7.8| [0.0,4.0,79.0]|19.320367732558214|
|              0|           4|          90|        12.3| [0.0,4.0,90.0]|19.183959171719348|
|              0|           4|         107|       10.55|[0.0,4.0,107.0]| 18.97314594133201|
|              0|           4|         144|        9.45|[0.0,4.0,144.0]|18.51431

                                                                                

In [40]:
regressionEvaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='total_amount',
    metricName='rmse'
)
rmse = regressionEvaluator.evaluate(predDF)
print(rmse)



504.1134552645679


                                                                                