In [2]:
import numpy as np
import pandas as pd

In [3]:
data_df = pd.read_csv('Merged_Data_202101_202303_v3.csv')
data_df.head()

Unnamed: 0.1,Unnamed: 0,Hour,station_id,Start_Count,Stop_Count,Net_Flow,Split_hour,day_of_week,is_holiday,Day,Month,feels_like
0,0,2021-01-21 22:00:00,1,1,0,-1,22,3,False,21,1,-3.42
1,1,2021-01-28 21:00:00,1,1,0,-1,21,3,False,28,1,-5.97
2,2,2021-02-03 16:00:00,1,1,0,-1,16,2,False,3,2,-4.41
3,3,2021-02-04 16:00:00,1,1,0,-1,16,3,False,4,2,-1.97
4,4,2021-02-07 15:00:00,1,1,0,-1,15,6,False,7,2,-0.96


## Python version RandomForestRegressor

In [4]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

X = data_df[['station_id', 'Split_hour', 'day_of_week', 'is_holiday', 'feels_like', 'Day', 'Month']]
y = data_df['Net_Flow']


#X = pd.get_dummies(X, columns=['Split_hour', 'day_of_week', 'is_holiday', 'Day', 'Month'])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=42)


In [29]:
from sklearn.metrics import mean_squared_error
import time

n_estimators_range = [10, 20, 30]  
max_depth_range = [5, 10, 15]    

best_mse = float('inf')
best_n = 0
best_depth = 0

for n in n_estimators_range:
    for depth in max_depth_range:
        start_time = time.time()
        model = RandomForestRegressor(n_estimators=n, max_depth=depth, random_state=42)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)
        time_inter = time.time() - start_time
        print(f"n_estimators: {n}, max_depth: {depth}, MSE: {mse: .4f}, Time: {time_inter: .4f}s")

        if mse < best_mse:
            best_mse = mse
            best_n = n
            best_depth = depth

print(f"Best MSE: {best_mse} with n_estimators = {best_n} and max_depth = {best_depth}")


n_estimators: 10, max_depth: 5, MSE:  7.2230, Time:  14.5742s
n_estimators: 10, max_depth: 10, MSE:  6.5828, Time:  27.2400s
n_estimators: 10, max_depth: 15, MSE:  6.1794, Time:  39.0072s
n_estimators: 20, max_depth: 5, MSE:  7.2268, Time:  29.0404s
n_estimators: 20, max_depth: 10, MSE:  6.5766, Time:  54.6136s
n_estimators: 20, max_depth: 15, MSE:  6.1416, Time:  77.7038s
n_estimators: 30, max_depth: 5, MSE:  7.2234, Time:  43.3882s
n_estimators: 30, max_depth: 10, MSE:  6.5741, Time:  81.8475s
n_estimators: 30, max_depth: 15, MSE:  6.1353, Time:  116.5622s
Best MSE: 6.135294515412483 with n_estimators = 30 and max_depth = 15


In [11]:
from sklearn.metrics import mean_squared_error
import time
from joblib import dump, load

model = RandomForestRegressor(n_estimators=40, max_depth=15, random_state=42)
model.fit(X_train, y_train)

dump(model, 'random_forest_model.joblib')



['random_forest_model.joblib']

## Predict net_flows for every station in next 24 hours

In [9]:
import pandas as pd
import numpy as np

next_24_hours = pd.date_range(start=pd.Timestamp('now').floor('H'), periods=24, freq='H')
all_stations = data_df['station_id'].unique()

prediction_data = pd.DataFrame({
    'datetime': np.tile(next_24_hours, len(all_stations)),
    'station_id': np.repeat(all_stations, len(next_24_hours))
})

prediction_data['Split_hour'] = prediction_data['datetime'].dt.hour
prediction_data['day_of_week'] = 1
prediction_data['is_holiday'] = False
prediction_data['feels_like'] = 3.0 
prediction_data['Day'] = 22
prediction_data['Month'] = 4
prediction_data = prediction_data[['station_id', 'Split_hour', 'day_of_week', 'is_holiday', 'feels_like', 'Day', 'Month']]

y_pred = model.predict(prediction_data)

prediction_data['predicted_net_flow'] = y_pred

print(prediction_data.head())


   station_id  Split_hour  day_of_week  is_holiday  feels_like  Day  Month  \
0           1          19            1       False         3.0   22      4   
1           1          20            1       False         3.0   22      4   
2           1          21            1       False         3.0   22      4   
3           1          22            1       False         3.0   22      4   
4           1          23            1       False         3.0   22      4   

   predicted_net_flow  
0            1.227608  
1           -0.847306  
2           -1.196946  
3            0.270404  
4            0.219386  


In [10]:
prediction_data.to_csv('Predicted_data_0422.csv')

## Spark version RandomForestRegressor

In [12]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("RandomForestTraining") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g") \
    .config("spark.memory.fraction", "0.9") \
    .getOrCreate()


24/04/22 22:29:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


24/04/22 22:29:57 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [13]:
df = spark.read.csv('Merged_Data_202101_202303_v3.csv', header=True, inferSchema=True)
df = df.select("station_id", "Split_hour", "day_of_week", "is_holiday", "feels_like", "Net_Flow", "Day", "Month")
df.printSchema()

                                                                                

root
 |-- station_id: integer (nullable = true)
 |-- Split_hour: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- is_holiday: boolean (nullable = true)
 |-- feels_like: double (nullable = true)
 |-- Net_Flow: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)



In [15]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

#df = df.withColumn("is_holiday", col("is_holiday").cast("string"))

# First, ensure any categorical features are indexed if they're not already
indexers = [
    StringIndexer(inputCol=column, outputCol=column+"_index")
    for column in ["station_id", "Split_hour", "day_of_week", "Day"]
]
# Now create a VectorAssembler to combine all features into one vector
assembler = VectorAssembler(
    inputCols=["station_id_index", "Split_hour_index", "day_of_week_index", "is_holiday", "Day_index", "feels_like"],
    outputCol="features"
)

pipeline = Pipeline(stages=indexers + [assembler])

model = pipeline.fit(df)
transformed_df = model.transform(df)

(train_data, test_data) = transformed_df.randomSplit([0.7, 0.3], seed=42)


                                                                                

## Save parameters of the best model

In [16]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_mse = RegressionEvaluator(labelCol="Net_Flow", predictionCol="prediction", metricName="mse")

import pandas as pd
results = pd.DataFrame(columns=["numTrees", "maxDepth", "MSE", "Time"])

rf = RandomForestRegressor(featuresCol="features", labelCol="Net_Flow", numTrees=30, maxDepth=15, maxBins=1000, seed=42)
rf_model = rf.fit(train_data)
rf_model.write().overwrite().save("Spark_N_30_D_15")


24/04/22 22:34:37 WARN DAGScheduler: Broadcasting large task binary with size 1353.8 KiB
24/04/22 22:34:42 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/04/22 22:34:48 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/04/22 22:34:57 WARN DAGScheduler: Broadcasting large task binary with size 7.1 MiB
24/04/22 22:35:08 WARN DAGScheduler: Broadcasting large task binary with size 12.3 MiB
24/04/22 22:35:22 WARN DAGScheduler: Broadcasting large task binary with size 1402.9 KiB
24/04/22 22:35:25 WARN DAGScheduler: Broadcasting large task binary with size 17.7 MiB
24/04/22 22:35:43 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/04/22 22:35:46 WARN DAGScheduler: Broadcasting large task binary with size 16.5 MiB
24/04/22 22:35:52 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/04/22 22:35:55 WARN DAGScheduler: Broadcasting large task binary with size 16.5 MiB
24/04/22 22:36:04 WARN DAGScheduler: Broadca

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import time
numTrees_list = [10, 20, 30]
maxDepth_list = [5, 10, 15]

evaluator_mse = RegressionEvaluator(labelCol="Net_Flow", predictionCol="prediction", metricName="mse")

import pandas as pd
results = pd.DataFrame(columns=["numTrees", "maxDepth", "MSE", "Time"])

for num in numTrees_list:
    for depth in maxDepth_list:
        start_time = time.time()
        rf = RandomForestRegressor(featuresCol="features", labelCol="Net_Flow", numTrees=num, maxDepth=depth, maxBins=1000, seed=42)
        rf_model = rf.fit(train_data)
        
        predictions = rf_model.transform(test_data)
        
        mse = evaluator_mse.evaluate(predictions)
        time_inter = time.time() - start_time
        print(f"numTrees: {num}, maxDepth: {depth}, MSE: {mse: .4f}, Time: {time_inter: .4f}s")
        new_row = pd.DataFrame({"numTrees": [num], "maxDepth": [depth], "MSE": [mse], "Time": [time_inter]})
        results = pd.concat([results, new_row], ignore_index=True)

# Display results
#print(results)


### Spark with weather data

In [15]:
results

Unnamed: 0,numTrees,maxDepth,MSE,Time
0,10,5,6.908905,23.80379
1,10,10,6.159883,34.212571
2,10,15,5.934817,104.68024
3,20,5,6.894684,26.5304
4,20,10,6.144721,56.503823
5,20,15,5.867577,239.560308
6,100,5,6.854832,103.332939


In [6]:
results

Unnamed: 0,numTrees,maxDepth,MSE,Time
0,30,5,6.861225,30.473653
1,30,10,6.112688,92.362313
2,30,15,5.847989,420.506886


### Spark without weather data

In [None]:
print(results)##without weather

   numTrees maxDepth       MSE
0         5        3  7.155212
1         5        5  6.779841
2         5       10  6.269571
3         5       15  6.215765
4        10        3  7.148010
5        10        5  6.866187
6        10       10  6.261236
7        10       15  6.197606
8        20        3  7.095159
9        20        5  6.787787
10       20       10  6.257980
11       20       15  6.187825


In [19]:
spark.stop()

#