Training Phase (2021/11~2023/10)

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.losses import mse

def create_sequences(data_X, data_Y, time_steps=1):
    """Cut feature/target frames into sliding windows for the LSTM.

    Window ``i`` contains the ``time_steps`` consecutive feature rows
    ``i .. i + time_steps - 1`` and is paired with the ``'count'`` value of
    the row immediately after the window (``i + time_steps``).  This is the
    same alignment used by the testing cell's ``create_sequences`` so that
    training and evaluation targets line up.

    Args:
        data_X: DataFrame of features, one row per time step.
        data_Y: DataFrame with a ``'count'`` column, row-aligned with ``data_X``.
        time_steps: Number of rows per input window.

    Returns:
        Tuple ``(x, y)`` of NumPy arrays: ``x`` stacks the windows
        (``len(data_X) - time_steps`` of them) and ``y`` holds one target
        per window.
    """
    x, y = [], []
    for i in range(len(data_X) - time_steps):
        x.append(data_X.iloc[i:(i + time_steps)].values)
        # Target is the step right after the window (was `i + time_steps - 1`,
        # which disagreed with the evaluation code and never used the last row).
        y.append(data_Y.iloc[i + time_steps]['count'])
    return np.array(x), np.array(y)

# Load the merged dataset and split it by position.
# `train` is everything except the final 720 rows; `test` is the final 1440
# rows, i.e. 720 rows of look-back context followed by the 720 held-out rows
# (presumably 30 days of hourly data — TODO confirm against the CSV).
df = pd.read_csv("final_merged_weather_holidays.csv", index_col='Date')
train = df.iloc[:-720].copy(deep=True)
test = df.iloc[len(df) - 1440:].copy(deep=True)

# Feature column labels applied to the scaled arrays below.
x_col = ['Temp', 'Wind', 'Humidity(%)', 'Barometer', 'Visibility',
       'Weather_Heavy snow', 'Weather_Sunny', 'Weather_Light snow',
       'Weather_Overcast', 'Weather_Dense fog', 'Weather_More clouds than sun',
       'Weather_Heavy rain', 'Weather_Haze', 'Weather_Light freezing rain',
       'Weather_Cloudy', 'Weather_Snow', 'Weather_Refreshingly cool',
       'Weather_Mild', 'Weather_Broken clouds', 'Weather_Passing clouds',
       'Weather_Fog', 'Weather_Partly cloudy', 'Weather_Partly sunny',
       'Weather_Clear', 'Weather_Ice fog', 'Weather_Light rain',
       'Weather_Cool', 'Weather_Quite cool', 'Weather_Warm', 'Weather_Rain',
       'Weather_Mostly cloudy', 'Weather_Cold', 'Weather_Low clouds',
       'Weather_Freezing rain', 'Weather_Scattered clouds',
       'isHoliday']

# Use one scaler for the features and a separate one for the target, and fit
# both on the training split only.  The test split is merely *transformed*,
# so it is expressed on the same scale the model is trained on and no
# test-set statistics leak into preprocessing.  (Test values may therefore
# fall slightly outside [-1, 1].)
scaler = MinMaxScaler((-1, 1))
target_scaler = MinMaxScaler((-1, 1))
X_train = pd.DataFrame(scaler.fit_transform(train.drop('count', axis=1)), columns=x_col)
y_train = pd.DataFrame(target_scaler.fit_transform(train[['count']]), columns=['count'])
X_test = pd.DataFrame(scaler.transform(test.drop('count', axis=1)), columns=x_col)
y_test = pd.DataFrame(target_scaler.transform(test[['count']]), columns=['count'])

# Length of the look-back window fed to the LSTM; the resulting X arrays are
# stacked windows of shape (samples, time_steps, features).
time_steps = 720
X_train, y_train = create_sequences(X_train, y_train, time_steps)
X_test, y_test = create_sequences(X_test, y_test, time_steps)

# LSTM regressor: one 50-unit recurrent layer reading windows of
# (time_steps, number of feature columns), followed by a single linear output.
model = Sequential([
    LSTM(units=50, input_shape=(time_steps, len(x_col))),
    Dense(units=1),  # one output value per window
])


class PredictionCallback(tf.keras.callbacks.Callback):
    """Checkpoint the weights and record the hold-out MSE after every epoch.

    At the end of each epoch the callback predicts on the stored validation
    inputs, saves the current weights to ``"<loss>-<bat>-<epoch>.h5"`` and
    appends the mean squared error between the validation targets and the
    predictions to the module-level list ``epoch_performance``.

    The module-level names ``loss``, ``bat`` and ``epoch_performance`` are
    read here, so the training loop must define them before calling ``fit``.

    Args:
        val_data: ``(inputs, targets)`` pair used for the per-epoch evaluation.
            The default is bound to the ``X_test`` / ``y_test`` objects that
            exist when this class statement is executed.
    """

    def __init__(self, val_data=(X_test, y_test)):
        # Let the Keras base class initialise its own bookkeeping first.
        super().__init__()
        self.validation_data = val_data

    def on_epoch_end(self, epoch, logs=None):
        global epoch_performance, loss, bat
        y_pred = self.model.predict(self.validation_data[0])
        self.model.save_weights("{}-{}-{}.h5".format(loss, bat, epoch)) # save the model
        # Flatten the complete target vector.  The targets produced by
        # create_sequences are 1-D, so indexing with `.T[0]` would keep only
        # the first label and silently broadcast it against every prediction.
        y_true = np.asarray(self.validation_data[1]).reshape(-1)
        # Always score with MSE, whatever loss the model is being trained
        # with, so the recorded numbers are comparable between runs.
        mse_metric = tf.keras.losses.MeanSquaredError()
        epoch_performance.append(mse_metric(y_true, y_pred.reshape(-1)).numpy())
# Grid of batch sizes; the sweep is run in several passes to avoid running
# out of RAM, hence the alternative list kept below.
# batch_size = [10, 25, 50, 100, 200, 300]
batch_size = [300, 500, 600]
epochs = 25

loss_func = ['mean_squared_error',"mean_absolute_error","mean_absolute_percentage_error"]
# loss_performance[loss index][batch-size index][epoch] -> hold-out MSE
loss_performance = []


def _build_model():
    """Return a freshly initialised copy of the LSTM regressor."""
    fresh_model = Sequential()
    fresh_model.add(LSTM(units=50, input_shape=(time_steps, len(x_col))))
    fresh_model.add(Dense(units=1))
    return fresh_model


for i, loss in enumerate(loss_func):
    batch_performance = []
    for j, bat in enumerate(batch_size):
        # Filled in by PredictionCallback, one entry per epoch.
        epoch_performance = []
        # Start every (loss, batch size) configuration from new random weights.
        # Re-using one model would let each run continue from the weights of
        # the previous configuration, making the comparison meaningless.
        tf.keras.backend.clear_session()  # release the previous model's state
        model = _build_model()
        model.compile(optimizer='adam', loss=loss)
        # Train the model
        model.fit(X_train, y_train, epochs=epochs, batch_size=bat, validation_data=(X_test, y_test), callbacks=[PredictionCallback()])

        batch_performance.append(epoch_performance)
    loss_performance.append(batch_performance)

Tuning, Evaluation Phase

In [None]:
loss_func = ['mean_squared_error',"mean_absolute_error","mean_absolute_percentage_error"]
import numpy
import matplotlib.pyplot as plt
from matplotlib import cm
from mpl_toolkits.mplot3d import Axes3D

# Complete batch-size grid used when the results of all training passes have
# been merged into `loss_performance`.
full_batch_grid = [10, 25, 50, 100, 200, 300, 400, 500, 600]

n_losses = len(loss_performance)
# squeeze=False keeps `ax` indexable even when there is only one loss function.
fig, ax = plt.subplots(1, n_losses, figsize=(10 * n_losses, 10),
                       subplot_kw=dict(projection='3d'), squeeze=False)
ax = ax[0]
for i in range(n_losses):
    # Rows are batch sizes, columns are epochs.
    z = numpy.array(loss_performance[i])
    n_batches, n_epochs = z.shape

    # Pick the batch-size axis that matches the data actually recorded:
    # the merged grid, or the grid of the most recent training pass.
    if n_batches == len(full_batch_grid):
        batch_axis = full_batch_grid
    elif n_batches == len(batch_size):
        batch_axis = list(batch_size)
    else:
        raise ValueError(
            "loss_performance[{}] has {} batch-size rows, which matches neither "
            "the full grid nor the current batch_size list".format(i, n_batches))

    epoch_axis = numpy.arange(1, n_epochs + 1)
    xx, yy = numpy.meshgrid(epoch_axis, batch_axis)
    print(xx.shape, yy.shape)
    ax[i].plot_surface(xx, yy, z, cmap=cm.coolwarm,
                       linewidth=0, antialiased=False)  # draw the 3-D surface
    ax[i].set_title(loss_func[i], fontsize=30)

    # Best configuration for this loss: row = batch size, column = 0-based
    # epoch (the same epoch number used in the saved weight file names).
    first_index, second_index = numpy.unravel_index(numpy.argmin(z), z.shape)
    print(batch_axis[first_index], second_index)
    print(loss_performance[i][first_index][second_index])
plt.savefig("Model_Evaluation.png", transparent=True, bbox_inches="tight")
plt.show()


Testing Phase (2023 Nov)

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
from keras.models import load_model
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.losses import mse

def create_sequences(data_X, data_Y, time_steps=1):
    """Build sliding windows of features and their next-step targets.

    For every start position ``s`` in ``range(len(data_X) - time_steps)`` the
    window is rows ``s .. s + time_steps - 1`` of ``data_X`` and the target is
    the ``'count'`` value of row ``s + time_steps`` of ``data_Y``.

    Returns:
        ``(windows, targets)`` as NumPy arrays.
    """
    window_starts = range(len(data_X) - time_steps)
    windows = [data_X.iloc[start:start + time_steps].values for start in window_starts]
    targets = [data_Y.iloc[start + time_steps]['count'] for start in window_starts]
    return np.array(windows), np.array(targets)
# Length of the look-back window fed to the LSTM.
time_steps = 720
df = pd.read_csv('final_merged_weather_holidays.csv')
# df = pd.read_csv('merged_weather.csv').drop('isHoliday', axis=1)
# `train` is everything except the final 720 rows; `test` is the final 720
# rows preceded by `time_steps` rows of look-back context.
train = df.iloc[:-720].copy(deep=True)
test = df.iloc[len(df) - time_steps - 720:].copy(deep=True)

# `scaler` scales the features and `scaler_pred` scales the target.  Both are
# fit on the training split only; the test split is transformed with those
# training statistics so the network receives inputs on the scale it was
# trained with, and `scaler_pred.inverse_transform` maps network outputs back
# to counts.
scaler = MinMaxScaler((-1, 1))
scaler_pred = MinMaxScaler((-1, 1))
x_col = ['Temp', 'Wind', 'Humidity(%)', 'Barometer', 'Visibility',
       'Weather_Heavy snow', 'Weather_Sunny', 'Weather_Light snow',
       'Weather_Overcast', 'Weather_Dense fog', 'Weather_More clouds than sun',
       'Weather_Heavy rain', 'Weather_Haze', 'Weather_Light freezing rain',
       'Weather_Cloudy', 'Weather_Snow', 'Weather_Refreshingly cool',
       'Weather_Mild', 'Weather_Broken clouds', 'Weather_Passing clouds',
       'Weather_Fog', 'Weather_Partly cloudy', 'Weather_Partly sunny',
       'Weather_Clear', 'Weather_Ice fog', 'Weather_Light rain',
       'Weather_Cool', 'Weather_Quite cool', 'Weather_Warm', 'Weather_Rain',
       'Weather_Mostly cloudy', 'Weather_Cold', 'Weather_Low clouds',
       'Weather_Freezing rain', 'Weather_Scattered clouds',
       'isHoliday']
X_train = pd.DataFrame(scaler.fit_transform(train.drop(['count', 'Date'], axis=1)), columns=x_col)
y_train = pd.DataFrame(scaler_pred.fit_transform(train[['count']]), columns=['count'])
X_test = pd.DataFrame(scaler.transform(test.drop(['count', 'Date'], axis=1)), columns=x_col)
y_test = pd.DataFrame(scaler_pred.transform(test[['count']]), columns=['count'])

# Turn the scaled frames into (samples, time_steps, features) windows.
# NOTE(review): the training windows are kept for parity with the training
# cell; nothing later in this cell reads them — consider dropping to save RAM.
X_train, y_train = create_sequences(X_train, y_train, time_steps)
X_test, y_test = create_sequences(X_test, y_test, time_steps)

# Recreate the architecture used for training so the saved weights fit.
model = Sequential([
    LSTM(units=50, input_shape=(time_steps, len(x_col))),
    Dense(units=1),  # single regression output
])
model.compile(optimizer='adam', loss='mean_squared_error')

# Other checkpoints that were tried:
#   'mean_squared_error-10-14.h5'
#   'mean_squared_error-25-49.h5'
model.load_weights('./pre-trained  LSTM model/mean_absolute_error-300-11.h5')

# Predict on the test windows and report the MSE in scaled units.
predictions = np.array(model.predict(X_test)).reshape(-1, 1)
scaled_error = mse(y_test.reshape(-1), predictions.reshape(-1))
print(scaled_error.numpy())

# Map predictions and targets back to the original count scale for plotting.
predictions = scaler_pred.inverse_transform(predictions)
y_test = scaler_pred.inverse_transform(y_test.reshape(-1, 1))

# Plot the actual vs predicted values
plt.figure(figsize=(10, 6))
plt.plot(y_test, label='Actual', color='gray')
plt.plot(predictions, label='Predicted', color='#34b7cc')
plt.legend()
plt.title('2023 Nov Actual vs Predicted Values')
plt.xlabel('Time')
plt.ylabel('Count')

# Label every 24th point with its date.  Prediction k corresponds to row
# `time_steps + k` of `test`, so the labels are read starting at `time_steps`
# while the tick positions start at 0.  Both ranges are derived from the same
# lengths so they cannot drift apart if the window size changes.
tick_every = 24
test = test.reset_index()
label_rows = range(time_steps, len(test), tick_every)
xticks_labels = test['Date'][label_rows]
xticks_indices = range(0, len(test) - time_steps, tick_every)
print(len(xticks_indices), len(xticks_labels))
plt.xticks(xticks_indices, xticks_labels, rotation=45, fontsize=7, ha = 'right')
# plt.savefig("2023 Nov Predict.png", transparent=True, bbox_inches="tight")
plt.show()

Prediction Example (2023/12)

In [None]:
def create_sequences(data_X, time_steps=1):
    """Stack sliding feature windows for label-free prediction.

    Note: this redefinition replaces the two-frame ``create_sequences`` from
    the earlier cells.  It returns one array holding, for every start ``s`` in
    ``range(len(data_X) - time_steps)``, rows ``s .. s + time_steps - 1``.
    """
    n_windows = len(data_X) - time_steps
    return np.array([data_X.iloc[start:start + time_steps].values
                     for start in range(n_windows)])

# Load the December feature table (no target column) and show its columns.
df = pd.read_csv('202312.csv')
print(df.columns)

# NOTE(review): this scaler is fit on the December rows themselves rather than
# on the training data — confirm that is intended.
scaler = MinMaxScaler((-1, 1))
X_test = create_sequences(
    pd.DataFrame(scaler.fit_transform(df.drop('Date', axis = 1)), columns=x_col),
    time_steps,
)

# Predict, convert back to counts with the target scaler from the testing
# cell, and flatten to a 1-D series.
predictions = scaler_pred.inverse_transform(np.array(model.predict(X_test))).reshape(-1)
# Thresholds (in predicted count) that delimit the highlighted periods.
LOW_THRESHOLD = 1000
HIGH_THRESHOLD = 4000


def _threshold_runs(mask):
    """Return ``[start, end]`` (inclusive) index pairs for each run of True values.

    Every run always gets both a start and an end, including a run that begins
    or ends on the final element.
    """
    runs = []
    run_start = None
    for idx, flagged in enumerate(mask):
        if flagged and run_start is None:
            run_start = idx
        elif not flagged and run_start is not None:
            runs.append([run_start, idx - 1])
            run_start = None
    if run_start is not None:
        # The run is still open at the end of the series: close it there.
        runs.append([run_start, len(mask) - 1])
    return runs


# Consecutive stretches where the prediction is below / above the thresholds.
# Evaluating the two conditions independently also handles a direct jump from
# a low stretch into a high one (and vice versa) without losing a step.
low = _threshold_runs(predictions < LOW_THRESHOLD)
high = _threshold_runs(predictions > HIGH_THRESHOLD)

# Plot the December predictions and shade the low / high stretches.
plt.figure(figsize=(10, 6))
plt.plot(predictions.reshape(-1), label='Predicted Volumn', color='#34b7cc')
for spans, shade, opacity in ((low, '#34b7cc', 0.2), (high, '#ffcf00', 0.3)):
    for span in spans:
        plt.axvspan(span[0], span[1], color=shade, alpha=opacity)
plt.title('2023 Dec Predicted Volume')
plt.xlabel('Time Steps')
plt.ylabel('Count')
plt.grid(axis = 'y')
plt.legend()
print(df.index)

# Prediction k belongs to row `time_steps + k` of `df`, so labels are read
# from `time_steps` onwards while tick positions start at 0; every 8th point
# is labelled.
label_rows = range(time_steps, len(df), 8)
xticks_labels = df['Date'][label_rows]
xticks_indices = range(0, len(df) - time_steps, 8)
plt.xticks(xticks_indices, xticks_labels, rotation=45, fontsize=7, ha = 'right')

plt.savefig("2023 Dec Predict.png", transparent=True, bbox_inches="tight")
plt.show()