This is the code used for my research 'Long Short Term Memory and Feedforward Artificial Neural Network for Flood Prediction'. I used three rain gauge data and one river gauge data for this analysis. I created the model so it takes three rainfall and the river water level data for past 15 days to predict the following day water level by sliding window method.


##Data Analysis

In [None]:
#Importing useful libraries

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
import math
from tensorflow import keras
from keras.models import Sequential
from keras.layers import LSTM,Dense,Bidirectional
from keras.callbacks import ModelCheckpoint,EarlyStopping
from sklearn.preprocessing import RobustScaler,StandardScaler,MinMaxScaler
from sklearn.metrics import mean_squared_error
import math

In [None]:
#Mount the google drive to access it

from google.colab import drive
drive.mount('/content/drive')

In [None]:
#To upload a file to Colab

from google.colab import files
uploaded=files.upload()

In [None]:
#Reading the excel file containing all the data
df=pd.read_excel('Data_84_12.xlsx')
df.head()

#Columns of the table are 'Date','Rainfall_A','Rainfall_B','Rainfall_C','Water_Level'

In [None]:
#Extracting dates for the graphs

dates=df['Date'].values

In [None]:
#Use the date column as the index

df.Date=pd.to_datetime(df.Date)
df.set_index('Date',inplace=True)
df

In [None]:
#Checking the correlation among the rain gauges

from scipy.stats import pearsonr
corrAB, corrBC, corrAC,  = pearsonr(df['Rainfall_A'],df['Rainfall_B'])[0], pearsonr(df['Rainfall_B'],df['Rainfall_C'])[0], pearsonr(df['Rainfall_A'],df['Rainfall_C'])[0]
print('Pearsons correlation between A & B: %.3f' % corrAB)
print('Pearsons correlation between B & C: %.3f' % corrBC)
print('Pearsons correlation between A & C: %.3f' % corrAC)

#You can check the correlation between rain and river gauges too

In [None]:
#Plotting the rainfalls

plt.figure(figsize=(18,7))
plt.plot(df['Rainfall_A'])
plt.title('Rainfall_A', fontsize=28)
plt.xlabel('Date', fontsize=24)
plt.ylabel('Water level in meters', fontsize=24)

# Change font sizes for x and y axis tick labels
plt.xticks(fontsize=22)
plt.yticks(fontsize=22)
plt.show()

In [None]:
#Create boxplots so you can see the annual and monthly variations of the data

#Create 'Year' and 'Month' columns based on the date
df2=df.copy()
df2['Year']=[d.year for d in df2.index]
df2['Month']=[d.strftime('%b') for d in df2.index]

#Create subplots

plt.figure(figsize=(26,22))

plt.subplot(321)
sns.boxplot(x='Month',y='Rainfall_A',data=df2)
plt.xlabel('Month',fontsize=18)
plt.ylabel('Rainfall in mm',fontsize=18)
plt.title('Monthly Rainfall-A',fontsize=22)
plt.xticks(rotation=0, fontsize=18)
plt.yticks(fontsize=18)

plt.subplot(322)
sns.boxplot(x='Month',y='Rainfall_B',data=df2)
plt.xlabel('Month',fontsize=18)
plt.ylabel('Rainfall in mm',fontsize=18)
plt.title('Monthly Rainfall-B',fontsize=22)
plt.xticks(rotation=0, fontsize=18)
plt.yticks(fontsize=18)

plt.subplot(323)
sns.boxplot(x='Month',y='Rainfall_C',data=df2)
plt.xlabel('Month',fontsize=18)
plt.ylabel('Rainfall in mm',fontsize=18)
plt.title('Monthly Rainfall-C',fontsize=22)
plt.xticks(rotation=0, fontsize=18)
plt.yticks(fontsize=18)

plt.subplot(324)
plt.plot(df['Water_Level'])
plt.title('Water level', fontsize=22)
plt.xlabel('Date', fontsize=18)
plt.ylabel('Water level in meters', fontsize=18)
plt.xticks(fontsize=18)
plt.yticks(fontsize=18)

plt.subplot(325)
sns.boxplot(x='Year',y='Water_Level',data=df2)
plt.xlabel('Year',fontsize=18)
plt.ylabel('Water level in meters',fontsize=18)
plt.title('Annual Water Levels',fontsize=22)
plt.xticks(rotation=75, fontsize=18)
plt.yticks(fontsize=18)

plt.subplot(326)
sns.boxplot(x='Month',y='Water_Level',data=df2)
plt.xlabel('Month',fontsize=18)
plt.ylabel('Water level in meters',fontsize=18)
plt.title('Monthly Water Levels',fontsize=22)
plt.xticks(rotation=0, fontsize=18)
plt.yticks(fontsize=18)

#Save the image in the google drive
save_path = '/content/drive/MyDrive/box_plots.png'
plt.savefig(save_path)
plt.show()

##Defining a sliding window

In [None]:
def to_seq(n_past,n_future,dataset):
  #Empty lists to be populated using formatted training data
  trainX = []
  trainY = []
  for i in range(n_past, len(dataset) - n_future +1):
    trainX.append(dataset[i - n_past:i, 0:dataset.shape[1]])
    trainY.append(dataset[i + n_future - 1:i + n_future, 3])
  return np.array(trainX), np.array(trainY)



In [None]:
#Standardize the data to reduce the range

scaler = StandardScaler()
df_for_training_scaled = scaler.fit_transform(df)

In [None]:
#Splitting the data into train and test sets by 7:3

train_size=int(len(df)*0.7)
test_size=len(df)-train_size

train_data=df_for_training_scaled[:train_size,:]
test_data=df_for_training_scaled[train_size:,:]

In [None]:
#Create inputs and outputs using the sliding window function

n_future = 1   # Number of days we want to look into the future based on the past days.
n_past = 15  # Number of past days we want to use to predict the future.

trainX,trainY=to_seq(n_past,n_future,train_data)
testX,testY=to_seq(n_past,n_future,test_data)

print('trainX shape == {}.'.format(trainX.shape))
print('trainY shape == {}.'.format(trainY.shape))
print('\n')
print('testX shape == {}.'.format(testX.shape))
print('testY shape == {}.'.format(testY.shape))

In [None]:
#Dates for plotting graphs

train_dates=dates[:train_size]
test_dates=dates[train_size:]

train_dates=train_dates[-trainY.shape[0]:]
test_dates=test_dates[-testY.shape[0]:]

#LSTM model

In [None]:
# define model with one hidden layer containing 64 LSTM units
# One output node for the next day water level prediction

model = Sequential()
model.add(LSTM(64, activation='relu', input_shape=(trainX.shape[1],trainX.shape[2]),return_sequences=False))
model.add(Dense(1))
model.summary()

In [None]:
#Hyperparameters

learning_rate=0.001
batch_size=32
from keras.optimizers import Adam
opt=Adam(learning_rate)
model.compile(optimizer=opt, loss='mse')

##Checkpoints to save the best model and Early stopping as the regularization

In [None]:
file_path='/content/drive/MyDrive/LSTM/best_model' #The location of the saved model
checkpoint=ModelCheckpoint(file_path,monitor='val_loss',save_best_only=True, verbose=1)
es=EarlyStopping(monitor='val_loss', mode='min', patience=30, verbose=1)

In [None]:
# fit model
history=model.fit(trainX,trainY,validation_split=0.1,batch_size=batch_size,verbose=2,epochs=200,callbacks=[checkpoint,es])

In [None]:
#Check the loss vs epochs during training
plt.figure(figsize=(10,15))
plt.plot(history.history['loss'], label='Training loss')
plt.plot(history.history['val_loss'], label='Validation loss')
plt.legend()
plt.xlabel('Number of epochs')
plt.ylabel('Loss')
plt.title('Loss vs epochs')
plt.show()

In [None]:
#Load the best model

from keras.models import load_model
model=load_model('/content/drive/MyDrive/LSTM/best_model')

In [None]:
#Model evaluation

train_loss=model.evaluate(trainX,trainY)
test_loss=model.evaluate(testX,testY)
print('trained model, train_loss: {:5.3f}%'.format(100*train_loss))
print('Trained model, test_loss: {:5.3f}%'.format(100*test_loss))

In [None]:
#Prediction

yhat = model.predict(testX, verbose=0)
yhatr = model.predict(trainX, verbose=0)

train_predict_LSTM=scaler.inverse_transform(np.repeat(yhatr,4,axis=1))[:,3]
test_predict_LSTM=scaler.inverse_transform(np.repeat(yhat,4,axis=1))[:,3]

trainY_LSTM=scaler.inverse_transform(np.repeat(trainY,4,axis=1))[:,3]
testY_LSTM=scaler.inverse_transform(np.repeat(testY,4,axis=1))[:,3]

In [None]:
plt.figure(figsize=(18,7))
plt.plot(test_dates,test_predict_LSTM,color='r',label='Predicted')
plt.plot(test_dates,testY_LSTM,label='Actual')
plt.xlabel('Dates',fontsize=24)
plt.ylabel('Water Level',fontsize=24)
plt.legend(loc='upper left',fontsize=22)
plt.title('Predicted vs Actual Water Levels - LSTM',fontsize=28)

plt.xticks(fontsize=22)
plt.yticks(fontsize=22)
plt.show()

#ANN

In [None]:
#Reshaping the data to be compatible with ANN architecture

trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1] * trainX.shape[2]))
testX = np.reshape(testX, (testX.shape[0], testX.shape[1] * testX.shape[2]))

print('trainX shape == {}.'.format(trainX.shape))
print('trainY shape == {}.'.format(trainY.shape))
print('\n')
print('testX shape == {}.'.format(testX.shape))
print('testY shape == {}.'.format(testY.shape))

In [None]:
# define model with one hidden layer containing 64 neurons
# One output node for the next day water level prediction

model = Sequential()
model.add(Dense(64, activation='relu', input_shape=(trainX.shape[1],)))
model.add(Dense(1))
model.summary()

In [None]:
#Hyperparameters

learning_rate=0.001
batch_size=32
from keras.optimizers import Adam
opt=Adam(learning_rate)
model.compile(optimizer=opt, loss='mse')

In [None]:
file_path='/content/drive/MyDrive/ANN/best_model' #The location of the saved model
checkpoint=ModelCheckpoint(file_path,monitor='val_loss',save_best_only=True, verbose=1)
es=EarlyStopping(monitor='val_loss', mode='min', patience=30, verbose=1)

In [None]:
# fit model
history=model.fit(trainX,trainY,validation_split=0.1,batch_size=batch_size,verbose=2,epochs=200,callbacks=[checkpoint,es])

In [None]:
plt.plot(history.history['loss'], label='Training loss')
plt.plot(history.history['val_loss'], label='Validation loss')
plt.legend()
plt.xlabel('Number of epochs')
plt.ylabel('Loss')
plt.title('Loss vs epochs')
plt.show()

In [None]:
from keras.models import load_model
model=load_model('/content/drive/MyDrive/ANN/best_model')

In [None]:
train_loss=model.evaluate(trainX,trainY)
test_loss=model.evaluate(testX,testY)
print('trained model, train_loss: {:5.3f}%'.format(100*train_loss))
print('Trained model, test_loss: {:5.3f}%'.format(100*test_loss))

In [None]:
yhat = model.predict(testX, verbose=0)
yhatr = model.predict(trainX, verbose=0)

train_predict_ANN=scaler.inverse_transform(np.repeat(yhatr,4,axis=1))[:,3]
test_predict_ANN=scaler.inverse_transform(np.repeat(yhat,4,axis=1))[:,3]

trainY_ANN=scaler.inverse_transform(np.repeat(trainY,4,axis=1))[:,3]
testY_ANN=scaler.inverse_transform(np.repeat(testY,4,axis=1))[:,3]


In [None]:
plt.figure(figsize=(18,7))
plt.plot(test_dates,test_predict_ANN,color='r',label='Predicted')
plt.plot(test_dates,testY_ANN,label='Actual')
plt.xlabel('Dates',fontsize=24)
plt.ylabel('Water Level',fontsize=24)
plt.legend(loc='upper left',fontsize=22)
plt.title('Predicted vs Actual Water Levels - LSTM',fontsize=28)

plt.xticks(fontsize=22)
plt.yticks(fontsize=22)
plt.show()

##RMSE

In [None]:
#calculate root mean squared error
import math

def RMSE(observed, simulated):
  return math.sqrt(mean_squared_error(observed, simulated))


## Nash Sutcliffe

In [None]:
def calculate_nse(observed, simulated):
    if len(observed) != len(simulated):
        raise ValueError("The lengths of observed and simulated data do not match.")

    mean_observed = sum(observed) / len(observed)

    numerator = sum((obs - sim) ** 2 for obs, sim in zip(observed, simulated))
    denominator = sum((obs - mean_observed) ** 2 for obs in observed)

    nse = 1 - (numerator / denominator)
    return nse

In [None]:
print("Root Mean Square Error (RMSE) - ANN:", RMSE(testY_ANN,test_predict_ANN))
print("Nash-Sutcliffe Efficiency (NSE) - ANN:", calculate_nse(testY_ANN,test_predict_ANN))
print('\n')
print("Root Mean Square Error (RMSE) - LSTM:", RMSE(testY_LSTM,test_predict_LSTM))
print("Nash-Sutcliffe Efficiency (NSE) - LSTM:", calculate_nse(testY_LSTM,test_predict_LSTM))