In [1]:
import numpy as np
import pandas as pd

import seaborn as sns
import matplotlib.pyplot as plt

from pyspark.sql.functions import col, variance
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml import Pipeline
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, BatchNormalization

from sklearn.metrics import r2_score, mean_squared_error
from sklearn.preprocessing import StandardScaler

2022-08-24 22:54:09.248815: I tensorflow/core/util/util.cc:169] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2022-08-24 22:54:09.388907: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-08-24 22:54:09.388927: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [2]:
sp = (
    SparkSession
    .builder.appName("Model")
    .config("spark.sql.session.timeZone", "-04")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .getOrCreate()
)
sp

22/08/24 22:54:16 WARN Utils: Your hostname, J-L resolves to a loopback address: 127.0.1.1; using 172.20.230.224 instead (on interface eth0)
22/08/24 22:54:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/24 22:54:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/24 22:54:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/08/24 22:54:18 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/08/24 22:54:18 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


## Load data

The next few lines will load processed taxi data and and further process it so that it is readable by the model. The data is also randomly being sampled to avoid space constraints

In [3]:
taxi_train = sp.read.parquet("../data/processed/taxis_train/").drop("year", "day")
taxi_train.show(5)

                                                                                

+-----+--------+------------+------------+-----------+-------------+----------+---------------+-----+-----------+---------+--------+----------+---------+--------+-------------+
|month|time_bin|PULocationID|DOLocationID|day_of_week|trip_duration|peak_hours|weather_pattern|count|Temperature|Dew Point|Humidity|Wind Speed|Wind Gust|Pressure|Precipitation|
+-----+--------+------------+------------+-----------+-------------+----------+---------------+-----+-----------+---------+--------+----------+---------+--------+-------------+
|   12|      10|          68|         170|          2|          834|         1|              0| 2114|       36.0|     21.0|    55.0|      10.0|      0.0|    30.1|          0.0|
|   12|      10|         161|         234|          2|          389|         1|              0| 2114|       36.0|     21.0|    55.0|      10.0|      0.0|    30.1|          0.0|
|   12|      10|         107|         224|          2|          377|         1|              0| 2114|       36.0|  

In [4]:
X_train = taxi_train.filter((col("year") == 2020) | (col("month") < 9))
X_val = taxi_train.filter((col("year") == 2021) & (col("month") > 8))

print(X_train.count())
print(X_val.count())

10462531
4588377


Trying a GLM with gaussian distribution on a spark distributed base

In [5]:
X_train.printSchema()

root
 |-- month: integer (nullable = true)
 |-- time_bin: integer (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- trip_duration: long (nullable = true)
 |-- peak_hours: integer (nullable = true)
 |-- weather_pattern: integer (nullable = true)
 |-- count: long (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Dew Point: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind Speed: double (nullable = true)
 |-- Wind Gust: double (nullable = true)
 |-- Pressure: double (nullable = true)
 |-- Precipitation: double (nullable = true)



Converting required columns to categorical

In [61]:
categories = ["month", "time_bin", "PULocationID", "DOLocationID", "day_of_week", "weather_pattern"]

indexers = [StringIndexer(inputCol=c, outputCol=c+"_index") for c in categories]
encoders = [OneHotEncoder(inputCol=c+"_index", outputCol=c+"_encoded") for c in categories]
#assembler = [VectorAssembler(inputCols=[c for X_train], outputCol=c+"_assembled") for c in X_train.columns if "_encoded" in c]
pipeline = Pipeline(stages=indexers + encoders) #+ assembler)

In [62]:
X_train_transformed = pipeline.fit(X_train.dropna()).transform(X_train.dropna())
X_train_transformed.head(1)

                                                                                

[Row(month=8, time_bin=5, PULocationID=246, DOLocationID=163, day_of_week=4, trip_duration=1356, peak_hours=0, weather_pattern=1, count=2575, Temperature=76.0, Dew Point=69.0, Humidity=79.0, Wind Speed=6.0, Wind Gust=0.0, Pressure=30.0, Precipitation=0.0, month_index=0.0, time_bin_index=7.0, PULocationID_index=22.0, DOLocationID_index=19.0, day_of_week_index=2.0, weather_pattern_index=1.0, month_encoded=SparseVector(11, {0: 1.0}), time_bin_encoded=SparseVector(23, {7: 1.0}), PULocationID_encoded=SparseVector(59, {22: 1.0}), DOLocationID_encoded=SparseVector(59, {19: 1.0}), day_of_week_encoded=SparseVector(4, {2: 1.0}), weather_pattern_encoded=SparseVector(2, {1: 1.0}))]

In [63]:
assembler = VectorAssembler(
    inputCols= [c for c in X_train_transformed.drop("trip_duration").columns if not (c in categories) and not ("_index" in c)],
    outputCol="features"
)

In [64]:
X_train_assembled = assembler.transform(X_train_transformed)
X_train_assembled.select("features", "trip_duration").show(2)

+--------------------+-------------+
|            features|trip_duration|
+--------------------+-------------+
|(167,[1,2,3,4,5,7...|         1356|
|(167,[1,2,3,4,5,7...|         1349|
+--------------------+-------------+
only showing top 2 rows



In [58]:
X_train_assembled.select("features", "trip_duration").withColumnRenamed("trip_duration", "label").head(2)

[Row(features=SparseVector(167, {1: 2575.0, 2: 76.0, 3: 69.0, 4: 79.0, 5: 6.0, 7: 30.0, 9: 1.0, 27: 1.0, 65: 1.0, 121: 1.0, 163: 1.0, 166: 1.0}), label=1356),
 Row(features=SparseVector(167, {1: 2575.0, 2: 76.0, 3: 69.0, 4: 79.0, 5: 6.0, 7: 30.0, 9: 1.0, 27: 1.0, 44: 1.0, 136: 1.0, 163: 1.0, 166: 1.0}), label=1349)]

In [65]:
glr1 = GeneralizedLinearRegression(family="gaussian", link="log", maxIter=10, regParam=0.3)

model1 = glr1.fit(X_train_assembled.select("features", "trip_duration").withColumnRenamed("trip_duration", "label"))
summary1 = model1.summary
print(summary1)

[Stage 186:>                                                        (0 + 8) / 8]

22/08/24 18:40:57 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
22/08/24 18:40:57 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/08/24 18:40:57 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

22/08/24 18:41:09 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK




Coefficients:
             Feature Estimate Std Error  T Value P Value
         (Intercept)   6.5299    0.0094 691.5588  0.0000
          peak_hours  -0.0093    0.0002 -38.7869  0.0000
               count   0.0001    0.0000 395.1094  0.0000
         Temperature  -0.0033    0.0001 -41.7993  0.0000
           Dew Point   0.0036    0.0001  42.5535  0.0000
            Humidity  -0.0017    0.0000 -39.5407  0.0000
          Wind Speed  -0.0021    0.0000 -49.9765  0.0000
           Wind Gust   0.0000    0.0000   1.4194  0.1558
            Pressure   0.0029    0.0003   9.9614  0.0000
       Precipitation  -0.0003    0.0003  -1.1075  0.2681
     month_encoded_8  -0.0033    0.0003 -12.3029  0.0000
     month_encoded_7  -0.0028    0.0003 -10.4613  0.0000
     month_encoded_6   0.0037    0.0003  13.7220  0.0000
     month_encoded_5  -0.0013    0.0003  -4.8885  0.0000
     month_encoded_4  -0.0051    0.0003 -18.4748  0.0000
     month_encoded_3  -0.0064    0.0003 -23.0912  0.0000
    month_encoded

                                                                                

In [70]:
def pipe(data: DataFrame):
    """
    Function to act as a pipeline to process and feed the model in vectorized format
    """
    # List of categorical columns
    categories = ["month", "time_bin", "PULocationID", "DOLocationID", "day_of_week", "weather_pattern"]
    drop_columns = ["trip_duration", "count"]
    # Add bike/taxi specific functions

    # Pipeline
    indexers = [StringIndexer(inputCol=c, outputCol=c+"_index") for c in categories]
    encoders = [OneHotEncoder(inputCol=c+"_index", outputCol=c+"_encoded") for c in categories]
    transformed = Pipeline(stages=indexers + encoders).fit(data.dropna()).transform(data.dropna())
    assembled = VectorAssembler(
        inputCols= [c for c in transformed.drop("trip_duration").columns if not (c in categories) and not ("_index" in c)],
        outputCol="features"
    ).transform(transformed)
    
    return assembled.withColumnRenamed("trip_duration", "label")

In [71]:
taxi_train_transformed = pipe(taxi_train)
taxi_train_transformed.head(2)

                                                                                

[Row(month=12, time_bin=10, PULocationID=68, DOLocationID=170, day_of_week=2, label=834, peak_hours=1, weather_pattern=0, count=2114, Temperature=36.0, Dew Point=21.0, Humidity=55.0, Wind Speed=10.0, Wind Gust=0.0, Pressure=30.1, Precipitation=0.0, month_index=2.0, time_bin_index=5.0, PULocationID_index=8.0, DOLocationID_index=3.0, day_of_week_index=4.0, weather_pattern_index=2.0, month_encoded=SparseVector(11, {2: 1.0}), time_bin_encoded=SparseVector(23, {5: 1.0}), PULocationID_encoded=SparseVector(59, {8: 1.0}), DOLocationID_encoded=SparseVector(59, {3: 1.0}), day_of_week_encoded=SparseVector(4, {}), weather_pattern_encoded=SparseVector(2, {}), features=SparseVector(167, {0: 1.0, 1: 2114.0, 2: 36.0, 3: 21.0, 4: 55.0, 5: 10.0, 7: 30.1, 11: 1.0, 25: 1.0, 51: 1.0, 105: 1.0})),
 Row(month=12, time_bin=10, PULocationID=161, DOLocationID=234, day_of_week=2, label=389, peak_hours=1, weather_pattern=0, count=2114, Temperature=36.0, Dew Point=21.0, Humidity=55.0, Wind Speed=10.0, Wind Gust=0.

In [72]:
X_train = taxi_train_transformed.filter((col("year") == 2020) | (col("month") < 9))
X_val = taxi_train_transformed.filter((col("year") == 2021) & (col("month") > 8))

print(X_train.count())
print(X_val.count())

                                                                                

10461886




4588377


                                                                                

In [73]:
# Testing general linear model without transformations
glr2 = GeneralizedLinearRegression(family="gaussian", link="log", maxIter=10, regParam=0.3)

model2 = glr2.fit(X_train.select("features", "label"))
summary2 = model2.summary
print(summary2)



Coefficients:
             Feature Estimate Std Error  T Value P Value
         (Intercept)   6.5299    0.0094 691.5588  0.0000
          peak_hours  -0.0093    0.0002 -38.7869  0.0000
               count   0.0001    0.0000 395.1094  0.0000
         Temperature  -0.0033    0.0001 -41.7993  0.0000
           Dew Point   0.0036    0.0001  42.5535  0.0000
            Humidity  -0.0017    0.0000 -39.5407  0.0000
          Wind Speed  -0.0021    0.0000 -49.9765  0.0000
           Wind Gust   0.0000    0.0000   1.4194  0.1558
            Pressure   0.0029    0.0003   9.9614  0.0000
       Precipitation  -0.0003    0.0003  -1.1075  0.2681
    month_encoded_10   0.0054    0.0003  19.3657  0.0000
    month_encoded_11   0.0031    0.0003  11.2585  0.0000
    month_encoded_12   0.0049    0.0003  17.3448  0.0000
     month_encoded_9   0.0073    0.0003  25.8189  0.0000
     month_encoded_8  -0.0033    0.0003 -12.3029  0.0000
     month_encoded_7  -0.0028    0.0003 -10.4613  0.0000
     month_encode

                                                                                

In [79]:
outcome2 = model2.transform(X_val.select("features", "label"))

In [78]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(model2.transform(X_val.select("features", "label")))
print(rmse)



464.72833054735514


                                                                                

In [82]:
outcome2.select(variance("label")).show()

                                                                                

+------------------+
|   var_samp(label)|
+------------------+
|222702.38552534615|
+------------------+



In [83]:
222702 ** 0.5

471.91312759871386

In [6]:
X_train_pd = X_train.sample(0.25).toPandas()
X_train_pd.head()

                                                                                

Unnamed: 0,month,time_bin,PULocationID,DOLocationID,day_of_week,trip_duration,peak_hours,weather_pattern,count,Temperature,Dew Point,Humidity,Wind Speed,Wind Gust,Pressure,Precipitation
0,8,5,236,50,4,1349,0,1,2575,76.0,69.0,79.0,6.0,0.0,30.0,0.0
1,8,5,263,48,4,790,0,1,2575,76.0,69.0,79.0,6.0,0.0,30.0,0.0
2,8,5,246,237,4,1119,0,1,2575,76.0,69.0,79.0,6.0,0.0,30.0,0.0
3,8,5,249,211,4,636,0,1,2575,76.0,69.0,79.0,6.0,0.0,30.0,0.0
4,8,5,107,186,4,647,0,1,2575,76.0,69.0,79.0,6.0,0.0,30.0,0.0


In [18]:
model = Sequential()

model.add(Dense(128, input_shape=(len(X_train.columns) - 1, ), activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.1))

model.add(Dense(1, activation='relu'))

model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_6 (Dense)             (None, 128)               1024      
                                                                 
 dropout_4 (Dropout)         (None, 128)               0         
                                                                 
 dense_7 (Dense)             (None, 64)                8256      
                                                                 
 dropout_5 (Dropout)         (None, 64)                0         
                                                                 
 dense_8 (Dense)             (None, 1)                 65        
                                                                 
Total params: 9,345
Trainable params: 9,345
Non-trainable params: 0
_________________________________________________________________


In [19]:
model.compile(loss="mean_squared_error", optimizer="adam", metrics=["accuracy"])

In [8]:
y_train = X_train_pd["trip_duration"]
X_train_pd_x = X_train_pd.drop(["trip_duration"], axis=1)

sc = StandardScaler()
X_train_pd_x_scaled = sc.fit_transform(X_train_pd_x.values)

In [9]:
X_train_pd_x_scaled[:2]

array([[ 0.4597196 , -1.16717755, -1.01797639,  0.38120244, -0.10861665,
        -0.66504068,  0.69536124,  0.75670975,  1.23960898,  1.18296872,
        -0.80262976, -0.45293955,  0.01036787, -0.13483045],
       [ 0.4597196 , -1.16717755,  1.27595914,  0.92029807, -0.10861665,
        -0.66504068,  0.69536124,  0.75670975,  1.23960898,  1.18296872,
        -0.80262976, -0.45293955,  0.01036787, -0.13483045]])

In [21]:
# history = model.fit(X_train_pd_x_scaled, y_train, epochs=5, batch_size=64, validation_split=0.2)
y_train = X_train_pd["trip_duration"]
history = model.fit(X_train_pd.drop(["trip_duration"], axis=1).values, y_train, epochs=5, batch_size=64, validation_split=0.2)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


: 

In [33]:
model.summary

                                                                                

Coefficients:
      Feature Estimate Std Error   T Value P Value
  (Intercept)   0.0011    0.0000   52.7675  0.0000
        month   0.0000    0.0000  -39.6288  0.0000
     time_bin   0.0000    0.0000  134.8782  0.0000
 PULocationID   0.0000    0.0000   34.7926  0.0000
 DOLocationID   0.0000    0.0000   52.3010  0.0000
  day_of_week   0.0000    0.0000  -13.3189  0.0000
   peak_hours   0.0000    0.0000   14.7386  0.0000
        count   0.0000    0.0000 -167.8731  0.0000
  Temperature   0.0000    0.0000    6.6478  0.0000
    Dew Point   0.0000    0.0000  -12.0570  0.0000
     Humidity   0.0000    0.0000   19.6858  0.0000
   Wind Speed   0.0000    0.0000    3.8424  0.0001
    Wind Gust   0.0000    0.0000    4.2182  0.0000
     Pressure   0.0000    0.0000   -1.7406  0.0818
Precipitation   0.0000    0.0000    0.5569  0.5776

(Dispersion parameter for gamma family taken to be 3.4149)
    Null deviance: 11735396.3049 on 30136538 degrees of freedom
Residual deviance: 11545752.4081 on 30136538 d

In [35]:
glr_g = GeneralizedLinearRegression(family="gaussian", link="log", maxIter=10, regParam=0.3)

model = glr_g.fit(va_df)
summary = model.summary
print(summary)



Coefficients:
      Feature Estimate Std Error   T Value P Value
  (Intercept)   6.8277    0.0192  355.6322  0.0000
        month   0.0045    0.0001   38.3041  0.0000
     time_bin  -0.0093    0.0001 -130.1202  0.0000
 PULocationID  -0.0002    0.0000  -35.6199  0.0000
 DOLocationID  -0.0002    0.0000  -53.4053  0.0000
  day_of_week   0.0025    0.0002   14.1817  0.0000
   peak_hours  -0.0073    0.0005  -15.3472  0.0000
        count   0.0000    0.0000  162.7269  0.0000
  Temperature  -0.0009    0.0002   -6.0341  0.0000
    Dew Point   0.0019    0.0002   11.4238  0.0000
     Humidity  -0.0016    0.0001  -19.2205  0.0000
   Wind Speed  -0.0004    0.0001   -4.1146  0.0000
    Wind Gust  -0.0002    0.0000   -4.0015  0.0001
     Pressure   0.0010    0.0006    1.7375  0.0823
Precipitation  -0.0003    0.0006   -0.5384  0.5903

(Dispersion parameter for gaussian family taken to be 2970084.9911)
    Null deviance: 89667181649503.7200 on 30136538 degrees of freedom
Residual deviance: 895080791966

                                                                                

In [39]:
x_tra_pd = X_tra.to_pandas_on_spark()
x_tra_pd.head()

[Stage 84:>                                                         (0 + 8) / 8]

22/08/21 01:23:21 WARN MemoryStore: Not enough space to cache rdd_359_6 in memory! (computed 44.4 MiB so far)
22/08/21 01:23:21 WARN MemoryStore: Not enough space to cache rdd_359_2 in memory! (computed 44.4 MiB so far)
22/08/21 01:23:21 WARN BlockManager: Persisting block rdd_359_6 to disk instead.
22/08/21 01:23:21 WARN BlockManager: Persisting block rdd_359_2 to disk instead.
22/08/21 01:23:21 WARN MemoryStore: Not enough space to cache rdd_359_7 in memory! (computed 67.3 MiB so far)
22/08/21 01:23:21 WARN BlockManager: Persisting block rdd_359_7 to disk instead.
22/08/21 01:23:21 WARN MemoryStore: Not enough space to cache rdd_359_3 in memory! (computed 44.4 MiB so far)
22/08/21 01:23:21 WARN BlockManager: Persisting block rdd_359_3 to disk instead.
22/08/21 01:23:21 WARN MemoryStore: Not enough space to cache rdd_359_0 in memory! (computed 29.6 MiB so far)
22/08/21 01:23:21 WARN BlockManager: Persisting block rdd_359_0 to disk instead.
22/08/21 01:23:21 WARN MemoryStore: Not enoug



22/08/21 01:23:26 WARN MemoryStore: Not enough space to cache rdd_359_6 in memory! (computed 5.5 MiB so far)
22/08/21 01:23:26 WARN MemoryStore: Not enough space to cache rdd_359_5 in memory! (computed 3.7 MiB so far)
22/08/21 01:23:26 WARN MemoryStore: Not enough space to cache rdd_359_4 in memory! (computed 356.7 MiB so far)
22/08/21 01:23:26 WARN MemoryStore: Not enough space to cache rdd_359_2 in memory! (computed 8.3 MiB so far)




22/08/21 01:23:27 WARN MemoryStore: Not enough space to cache rdd_359_3 in memory! (computed 356.7 MiB so far)




22/08/21 01:23:29 WARN MemoryStore: Not enough space to cache rdd_359_1 in memory! (computed 67.3 MiB so far)
22/08/21 01:23:29 WARN MemoryStore: Not enough space to cache rdd_359_0 in memory! (computed 356.7 MiB so far)


                                                                                

22/08/21 01:23:32 WARN MemoryStore: Not enough space to cache rdd_359_0 in memory! (computed 158.5 MiB so far)


Unnamed: 0,month,time_bin,PULocationID,DOLocationID,day_of_week,label,peak_hours,count,Temperature,Dew Point,Humidity,Wind Speed,Wind Gust,Pressure,Precipitation
0,8,5,143,100,4,795,0,4492,76.0,69.0,79.0,6.0,0.0,30.0,0.0
1,8,5,87,186,4,1282,0,4492,76.0,69.0,79.0,6.0,0.0,30.0,0.0
2,8,5,236,163,4,295,0,4492,76.0,69.0,79.0,6.0,0.0,30.0,0.0
3,8,5,90,164,4,317,0,4492,76.0,69.0,79.0,6.0,0.0,30.0,0.0
4,8,5,234,100,4,350,0,4492,76.0,69.0,79.0,6.0,0.0,30.0,0.0


22/08/21 02:17:40 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1816005 ms exceeds timeout 120000 ms
22/08/21 02:17:40 WARN SparkContext: Killing executors is not supported by current scheduler.
