<h1>Random Forest - Number of Trips<h1>

In [1]:
from tensorflow import keras
from tensorflow.keras.layers import Dense, Normalization

In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window
from pyspark.ml.tuning import CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import sum,avg,max,min,mean,count
import numpy as np
import pandas as pd

In [3]:
import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')

In [4]:
spark = (
    SparkSession.builder.appName("Random Forest - Number of Trips")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.driver.memory", "8g")
    .config("spark.sql.parquet.enableVectorizedReader", False)
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

22/08/16 23:23:17 WARN Utils: Your hostname, Sens-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.11 instead (on interface en0)
22/08/16 23:23:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/16 23:23:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/16 23:23:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/08/16 23:23:18 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
sdf = spark.read.parquet('../data/curated/combined_data')

                                                                                

In [6]:
sdf = sdf.groupBy('Date', 'Hour', 'PU_Location_ID').agg(avg("Temperature_C").alias("Temperature_C"), \
                                                           avg("Humidity_%").alias("Humidity_%"), \
                                                           avg("Speed_kmh").alias("Speed_kmh"), \
                                                           avg("Precip_Rate_mm").alias("Precip_rate_mm"), \
                                                           avg("Driver_pay").alias("Avg_driver_pay"), \
                                                           avg("Day_of_week").alias("Day_of_week"), \
                                                           count('Temperature_C').alias("Num_trips"))

In [7]:
sdf = sdf.orderBy('Date', 'PU_Location_ID', 'Hour')

<h3>Formatting for Model Fitting<h3>

In [8]:
# Assemble as vector for pyspark modelling
feature_list = []
for col in sdf.columns:
    if col == 'Date' or col == 'Avg_driver_pay' or col == 'Num_trips':
        continue
    else:
        feature_list.append(col)

assembler = VectorAssembler(inputCols=feature_list, outputCol="features")

In [9]:
model_sdf = assembler.transform(sdf.dropna('any'))

In [10]:
# Index categorical features (maxCategories = 270 to account for all the different pick up location IDs)
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=270).fit(model_sdf)

                                                                                

Split after ordering to get the same split as neural network models

In [11]:
model_sdf = model_sdf.withColumn("rank", percent_rank().over(Window.partitionBy().orderBy('Date', 'PU_Location_ID', 'Hour')))
train_sdf = model_sdf.where("rank <= .8").drop("rank")
test_sdf = model_sdf.where("rank > .8").drop("rank")

In [12]:
rf = RandomForestRegressor(featuresCol="indexedFeatures", labelCol='Num_trips', maxBins = 270)

pipeline = Pipeline(stages=[featureIndexer, rf])

In [13]:
model = pipeline.fit(train_sdf)

22/08/16 23:24:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:24:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:24:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.




22/08/16 23:24:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:24:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


[Stage 14:>                                                         (0 + 9) / 9]

22/08/16 23:24:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:24:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.




22/08/16 23:24:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:24:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


                                                                                

In [14]:
# Make predictions.
predictions = model.transform(test_sdf).select('prediction').toPandas()

22/08/16 23:25:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:25:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:25:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.




22/08/16 23:25:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:25:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


                                                                                

22/08/16 23:25:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:25:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.




22/08/16 23:25:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:25:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


                                                                                

In [15]:
y_test = test_sdf.select('Num_trips').toPandas()

22/08/16 23:25:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:25:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:25:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.




22/08/16 23:26:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:26:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.




22/08/16 23:26:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:26:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:26:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/16 23:26:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


                                                                                

<h3>Basic Model Performance<h3>

In [18]:
errors = np.array(np.array(predictions) - y_test)
squared_errors = errors**2

mean_squared_error = squared_errors.mean()

print(f'MSE: {mean_squared_error}')

MSE: 3502.335507157777


In [19]:
tot_sum_squares = (np.array(y_test - y_test.mean())**2).sum()
r2 = 1 - (squared_errors.sum() / tot_sum_squares)
print(f'Model R^2: {r2:.4f}')

Model R^2: 0.7062


<h3>Save predictions for further analysis<h3>

In [22]:
pd.DataFrame(predictions).to_csv('../data/curated/model_data/num_trips_pred_rf.csv')