### Modelling

In [1]:
import pandas as pd

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName('MAST30034 Tutorial 3')
    .config('spark.sql.repl.eagerEval.enabled', True) 
    .config('spark.sql.parquet.cacheMetadata', 'true')
    .getOrCreate()
)

22/08/21 13:14:58 WARN Utils: Your hostname, mast30034 resolves to a loopback address: 127.0.1.1; using 45.113.234.45 instead (on interface eth0)
22/08/21 13:14:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/21 13:14:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/21 13:14:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/08/21 13:14:59 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/08/21 13:14:59 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
22/08/21 13:14:59 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
22/08/21 13:14:59 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
22/08/21 13:14:59 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.


In [3]:
# get taxi_weather data for all months
taxi_weather_data = spark.read.parquet('../data/curated/taxi_weather_data/final-2018-10.parquet')

files = ['final-2018-11.parquet', 'final-2018-12.parquet', 'final-2019-01.parquet', 
         'final-2019-02.parquet', 'final-2019-03.parquet']

for file in files:
    taxi_weather_month = spark.read.parquet(f'../data/curated/taxi_weather_data/{file}')
    # add each month taxi_weather_data to october dataframe
    taxi_weather_data = taxi_weather_data.union(taxi_weather_month)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [4]:
taxi_weather_data.printSchema()

root
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_day: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- average_temp: double (nullable = true)
 |-- average_dew_point: double (nullable = true)
 |-- average_wind_speed: double (nullable = true)



In [17]:
# sample the data 
taxi_weather_data = taxi_weather_data.sample(withReplacement=None, fraction=0.05, seed=5)

### Linear regression model
#### Using features:
- 'passenger_count
- 'trip_distance'
- 'PULocationID'
- 'RatecodeID'
- 'average_temp' 
- 'average_dew_point'
- 'average_wind_speed'
- 'total_amount'

In [18]:
# one hot encode categorical data - PUlocation, DOlocation before linear regression
from pyspark.ml.feature import OneHotEncoder

# one hot encoder PULocationID
onehot_pu = OneHotEncoder(inputCols=['PULocationID'],outputCols=['PULocation_dummy'])

# apply the one hot encoder to the taxi_weather data
taxi_weather_data2 = onehot_pu.fit(taxi_weather_data).transform(taxi_weather_data)

# display results
taxi_weather_data2.select('PULocationID', 'PULocation_dummy').distinct().sort('PULocation_dummy').show()

+------------+----------------+
|PULocationID|PULocation_dummy|
+------------+----------------+
|         263|     (263,[],[])|
|           1| (263,[1],[1.0])|
|           4| (263,[4],[1.0])|
|           7| (263,[7],[1.0])|
|          10|(263,[10],[1.0])|
|          11|(263,[11],[1.0])|
|          12|(263,[12],[1.0])|
|          13|(263,[13],[1.0])|
|          14|(263,[14],[1.0])|
|          15|(263,[15],[1.0])|
|          17|(263,[17],[1.0])|
|          18|(263,[18],[1.0])|
|          21|(263,[21],[1.0])|
|          24|(263,[24],[1.0])|
|          25|(263,[25],[1.0])|
|          26|(263,[26],[1.0])|
|          28|(263,[28],[1.0])|
|          32|(263,[32],[1.0])|
|          33|(263,[33],[1.0])|
|          35|(263,[35],[1.0])|
+------------+----------------+
only showing top 20 rows



In [31]:
# one hot encoder DOLocationID
onehot_rate = OneHotEncoder(inputCols=['RatecodeID'], outputCols=['RatecodeID_dummy'])

# apply the one hot encoder to the taxi_weather data
taxi_weather_onehot = onehot_rate.fit(taxi_weather_data2).transform(taxi_weather_data2)

# display results
taxi_weather_onehot.select('PULocationID', 'PULocation_dummy','RatecodeID', 'RatecodeID_dummy').distinct().sort('PULocation_dummy').show()

+------------+----------------+----------+----------------+
|PULocationID|PULocation_dummy|RatecodeID|RatecodeID_dummy|
+------------+----------------+----------+----------------+
|         263|     (263,[],[])|       2.0|   (5,[2],[1.0])|
|         263|     (263,[],[])|       5.0|       (5,[],[])|
|         263|     (263,[],[])|       1.0|   (5,[1],[1.0])|
|           1| (263,[1],[1.0])|       5.0|       (5,[],[])|
|           4| (263,[4],[1.0])|       1.0|   (5,[1],[1.0])|
|           7| (263,[7],[1.0])|       1.0|   (5,[1],[1.0])|
|          10|(263,[10],[1.0])|       2.0|   (5,[2],[1.0])|
|          10|(263,[10],[1.0])|       5.0|       (5,[],[])|
|          11|(263,[11],[1.0])|       1.0|   (5,[1],[1.0])|
|          12|(263,[12],[1.0])|       1.0|   (5,[1],[1.0])|
|          13|(263,[13],[1.0])|       1.0|   (5,[1],[1.0])|
|          14|(263,[14],[1.0])|       1.0|   (5,[1],[1.0])|
|          14|(263,[14],[1.0])|       5.0|       (5,[],[])|
|          15|(263,[15],[1.0])|       5.

In [32]:
# adapted from MAST30034: Tutorial 3
# VectorAssembler creates new vectors from existing columns
from pyspark.ml.feature import VectorAssembler

features = 'features'
input_cols = ['trip_distance', 'PULocationID', 'RatecodeID', 'average_temp', 
              'average_dew_point', 'average_wind_speed', 'total_amount']

assembler = VectorAssembler(
    # which column to combine
    inputCols=input_cols, 
    # How should the combined columns be named
    outputCol=features
)

taxi_weather_final = assembler.transform(taxi_weather_onehot.dropna('any'))

In [33]:
# split data into training and testing data
train_df, test_df = taxi_weather_final.randomSplit([0.7, 0.3], seed = 5)


In [38]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# train linear regression model on training taxi_weather data
lm = LinearRegression(featuresCol='features', labelCol='tip_amount').fit(train_df)

# Create predictions for the test data
predictions = lm.transform(test_df)

# Analyse predictions
predictions.select("prediction","tip_amount", "features").show(20)
# Calculate the RMSE
print("RMSE for linear regression model: ")
RegressionEvaluator(labelCol='tip_amount', metricName='rmse').evaluate(predictions)


22/08/21 18:40:10 WARN Instrumentation: [a7b833ae] regParam is zero, which might cause numerical instability and overfitting.


                                                                                

+------------------+----------+--------------------+
|        prediction|tip_amount|            features|
+------------------+----------+--------------------+
|2.2367436806679435|       3.0|[3.61,43.0,1.0,20...|
| 8.332002207304512|       5.5|[17.4,132.0,2.0,2...|
|  7.94951035679618|       8.3|[10.8,138.0,1.0,2...|
|2.9479861869889814|       0.0|[6.2,43.0,1.0,201...|
|0.9300884236261381|       0.5|[0.8,232.0,1.0,20...|
|1.5726231907874506|       1.2|[2.2,137.0,1.0,20...|
|11.336926527043447|     12.61|[16.99,132.0,2.0,...|
|3.6417363641327456|      3.86|[4.3,246.0,1.0,20...|
|2.1367908686628847|      2.35|[2.2,230.0,1.0,20...|
|1.0364049556699126|      1.55|[0.9,246.0,1.0,20...|
|1.8401940582770522|       1.5|[2.7,114.0,1.0,21...|
|1.2328581443051017|      1.46|[0.93,186.0,1.0,2...|
|0.8580143306347131|      1.25|[1.1,137.0,1.0,21...|
|0.8043494297349016|      1.06|[0.43,162.0,1.0,2...|
| 2.282663740405829|      2.55|[2.5,90.0,1.0,213...|
|1.5624212768414958|      1.65|[0.8,162.0,1.0,

                                                                                

1.1388184043549339

In [35]:
# Access coefficients
pd.DataFrame(
    data=[lm.intercept] + list(lm.coefficients),
    index=['intercept'] + input_cols,
    columns=['coefficient']
)

Unnamed: 0,coefficient
intercept,0.114543
trip_distance,-0.344418
PULocationID,0.000364
RatecodeID,-0.844727
average_temp,6.1e-05
average_dew_point,0.000134
average_wind_speed,0.000289
total_amount,0.246659


### Decision tree model

In [36]:
from pyspark.ml.regression import DecisionTreeRegressor
# train decision tree model on training taxi_weather data
dt_model = DecisionTreeRegressor(featuresCol ='features', labelCol = 'tip_amount').fit(train_df)

# Create predictions for the test data
dt_predictions = dt_model.transform(test_df)

# Calculate the RMSE
print('RMSE for decision tree model: ')
RegressionEvaluator(labelCol='tip_amount', metricName="rmse").evaluate(dt_predictions)

                                                                                

RMSE for decision tree model: 


                                                                                

1.2653997265332835