In [1]:
from pyspark.sql import SparkSession

# Obtain a Spark session (an already-running one is reused) and keep a handle
# to its underlying SparkContext.
session = SparkSession.builder.getOrCreate()
context = session.sparkContext

# Read the trip CSV: the first row supplies column names and the column types
# are inferred from the data.
df = (
    session.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("trip_data_small.csv")
)
df.show(n=5)

21/09/21 07:47:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/09/21 07:47:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
21/09/21 07:47:38 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      CMT|        1|                 N|2013-01-01 15:11:48|2013-01-01 15:18:10|              4|              382|          1.0|      -73.978165|      40.757977|       -73.989838|       40.751171|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...| 

In [2]:
# Keep only the trip identifier and the three numeric columns used below.
df_sel = df.select(
    [
        "medallion",
        "passenger_count",
        "trip_distance",
        "trip_time_in_secs",
    ]
)
df_sel.show(n=5)

+--------------------+---------------+-------------+-----------------+
|           medallion|passenger_count|trip_distance|trip_time_in_secs|
+--------------------+---------------+-------------+-----------------+
|89D227B655E5C82AE...|              4|          1.0|              382|
|0BD7C8F5BA12B88E0...|              1|          1.5|              259|
|0BD7C8F5BA12B88E0...|              1|          1.1|              282|
|DFD2202EE08F7A8DC...|              2|          0.7|              244|
|DFD2202EE08F7A8DC...|              1|          2.1|              560|
+--------------------+---------------+-------------+-----------------+
only showing top 5 rows



In [3]:
# Pearson correlation between trip distance and trip duration.
df_sel.corr("trip_distance", "trip_time_in_secs", "pearson")

                                                                                

0.812064833071135

In [4]:
# Pearson correlation between trip distance and passenger count.
df_sel.corr("trip_distance", "passenger_count", "pearson")

                                                                                

0.005809447362652621

In [6]:
from pyspark.ml.feature import VectorAssembler
# Combine the two predictor columns into a single vector column, "features".
vectorAssembler = VectorAssembler(
    inputCols=["passenger_count", "trip_time_in_secs"],
    outputCol="features",
)
df_vec = vectorAssembler.transform(df_sel)
df_vec.show(n=5)

+--------------------+---------------+-------------+-----------------+-----------+
|           medallion|passenger_count|trip_distance|trip_time_in_secs|   features|
+--------------------+---------------+-------------+-----------------+-----------+
|89D227B655E5C82AE...|              4|          1.0|              382|[4.0,382.0]|
|0BD7C8F5BA12B88E0...|              1|          1.5|              259|[1.0,259.0]|
|0BD7C8F5BA12B88E0...|              1|          1.1|              282|[1.0,282.0]|
|DFD2202EE08F7A8DC...|              2|          0.7|              244|[2.0,244.0]|
|DFD2202EE08F7A8DC...|              1|          2.1|              560|[1.0,560.0]|
+--------------------+---------------+-------------+-----------------+-----------+
only showing top 5 rows



In [7]:
# Reduce the frame to the feature vector and the regression target.
df_prep = df_vec.select("features", "trip_distance")
df_prep.show(n=5)

+-----------+-------------+
|   features|trip_distance|
+-----------+-------------+
|[4.0,382.0]|          1.0|
|[1.0,259.0]|          1.5|
|[1.0,282.0]|          1.1|
|[2.0,244.0]|          0.7|
|[1.0,560.0]|          2.1|
+-----------+-------------+
only showing top 5 rows



In [8]:
# 70/30 train/test split.
# The seed is pinned so that re-running the notebook draws the same random
# partition; without it every run trains and evaluates on different rows and
# the reported metrics are not comparable between runs.
# NOTE(review): full reproducibility also assumes df_prep is computed
# deterministically (same rows per partition on each evaluation) - confirm.
splits = df_prep.randomSplit([0.7, 0.3], seed=42)
df_train, df_test = splits

In [9]:
from pyspark.ml.regression import LinearRegression
# Elastic-net regularised linear regression of trip_distance on the
# assembled feature vector.
lr = LinearRegression(
    featuresCol="features",
    labelCol="trip_distance",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(df_train)

# Report the fitted parameters.
print("Coefficients: ", lr_model.coefficients, sep="")
print("Intercept: ", lr_model.intercept, sep="")

21/09/21 07:51:21 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/09/21 07:51:21 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

Coefficients: [0.0,0.00519836959257972]
Intercept: -0.6666637461506705


In [10]:
# Training-set diagnostics of the fitted linear model.
lr_model_summ = lr_model.summary
print("RMSE: %f" % (lr_model_summ.rootMeanSquaredError,))
print("R2: %f" % (lr_model_summ.r2,))
print("numIterations: %d" % (lr_model_summ.totalIterations,))
print("objectiveHistory: %s" % (lr_model_summ.objectiveHistory,))

# Peek at the first few residuals.
lr_model_summ.residuals.show(n=5)

RMSE: 2.021844
R2: 0.651278
numIterations: 3
objectiveHistory: [0.5000000000000001, 0.43358532503359065, 0.23006876770246557, 0.23006876770242157]


[Stage 10:>                                                         (0 + 1) / 1]

+------------------+
|         residuals|
+------------------+
|0.5743267617193561|
|0.6666637461506705|
|0.6666637461506705|
|0.6666637461506705|
|0.6666637461506705|
+------------------+
only showing top 5 rows



                                                                                

In [11]:
# Basic descriptive statistics of the training split.
df_train.summary("count", "mean", "stddev", "min", "max").show()



+-------+------------------+
|summary|     trip_distance|
+-------+------------------+
|  count|            419072|
|   mean|2.8529949268860793|
| stddev| 3.423801771231003|
|    min|               0.0|
|    max|             95.85|
+-------+------------------+



                                                                                

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator
# Evaluators comparing the "prediction" column against the trip_distance
# label; they are reused by the later model cells.
evaluator_rmse = RegressionEvaluator(
    labelCol="trip_distance",
    predictionCol="prediction",
    metricName="rmse",
)
evaluator_r2 = RegressionEvaluator(
    labelCol="trip_distance",
    predictionCol="prediction",
    metricName="r2",
)

# Score the linear regression model on the held-out split.
lr_predictions = lr_model.transform(df_test)
print("RMSE on test data = %g" % (evaluator_rmse.evaluate(lr_predictions),))
print("R2 on test data = %g" % (evaluator_r2.evaluate(lr_predictions),))

                                                                                

RMSE on test data = 2.00746




R2 on test data = 0.655281


                                                                                

In [13]:
from pyspark.ml.regression import DecisionTreeRegressor
# Decision-tree regressor on the same features, scored on the held-out split.
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="trip_distance",
)
dt_model = dt.fit(df_train)
dt_predictions = dt_model.transform(df_test)
print("RMSE on test data = %g" % (evaluator_rmse.evaluate(dt_predictions),))
print("R2 on test data = %g" % (evaluator_r2.evaluate(dt_predictions),))

21/09/21 07:54:30 WARN BlockManager: Asked to remove block broadcast_39_piece0, which does not exist
                                                                                

RMSE on test data = 1.93827




R2 on test data = 0.678633


                                                                                

In [14]:
from pyspark.ml.regression import GBTRegressor
# Gradient-boosted trees regressor (10 boosting iterations) on the same
# features, scored on the held-out split.
gbt = GBTRegressor(
    featuresCol="features",
    labelCol="trip_distance",
    maxIter=10,
)
gbt_model = gbt.fit(df_train)
gbt_predictions = gbt_model.transform(df_test)
print("RMSE on test data = %g" % (evaluator_rmse.evaluate(gbt_predictions),))
print("R2 on test data = %g" % (evaluator_r2.evaluate(gbt_predictions),))

                                                                                

RMSE on test data = 1.92355




R2 on test data = 0.683497


                                                                                

In [None]:
# Shut down the Spark session once the analysis is finished.
session.stop()