In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error

In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F

In [None]:
df = pd.read_csv('variables.csv')
df = df.drop(columns=['Epic_week'])

In [None]:
n_lag = 4
n_steps = 8
# Create lagged variables for the deseas count, temperature, and rainfall
columns = list(df.columns)

for col in columns:
    for i in range(1, n_lag + 1):
        df[f"{col}_lag_{i}"] = df[col].shift(i)

# Drop rows with NaN values generated by shifting
df = df.dropna()

# Only keep the target column when there is no lag
df = df.drop(df.columns[1:50], axis=1)

In [None]:
y = df['Infectious and Parasitic Diseases'].shift(-n_steps+1)
y = y.dropna()
if n_steps != 1:
  df = df.iloc[:-n_steps+1, :]

X = df.drop('Infectious and Parasitic Diseases', axis=1)

In [None]:
X = X.reset_index(drop=True)
y = y.reset_index(drop=True)

In [None]:
X = np.array(X)
y = np.array(y)

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler

class TimeSeriesDataset(Dataset):
    def __init__(self, X_data, y_data):
        self.X_data = X_data
        self.y_data = y_data

    def __getitem__(self, index):
        return self.X_data[index], self.y_data[index]

    def __len__(self):
        return len(self.X_data)

class LSTM_CNN(nn.Module):
    def __init__(self, input_size=200, hidden_layer_size=16, output_size=1, num_layers=2):
        super().__init__()
        self.hidden_layer_size = hidden_layer_size

        self.lstm = nn.LSTM(input_size, hidden_layer_size, num_layers, batch_first=True)
        self.conv1 = nn.Conv1d(1, 4, kernel_size=3)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool1d(2)
        self.conv2 = nn.Conv1d(4, 24, kernel_size=2)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool1d(2)
        self.fc1 = nn.Linear(72, 8)
        self.fc2 = nn.Linear(8, output_size)

    def forward(self, input_seq):
        lstm_out, _ = self.lstm(input_seq.view(len(input_seq), 1, -1))

        conv_out = self.conv1(lstm_out)
        conv_out = self.relu1(conv_out)
        conv_out = self.pool1(conv_out)

        conv_out = self.conv2(conv_out)
        conv_out = self.relu2(conv_out)
        conv_out = self.pool2(conv_out)

        fc_out = self.fc1(conv_out.view(len(input_seq), -1))
        predictions = self.fc2(fc_out)

        return predictions

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

train_size = int(len(X) * 0.7)
test_ind = range(train_size, len(X))

y_pred_store = np.array([None] * len(X))
y_test_array = np.array([])
y_pred_array = np.array([])

for forecast_ind in test_ind:
    train_ind = list(range(0, int(forecast_ind * 0.85)))
    val_ind = list(range(int(forecast_ind * 0.85), forecast_ind))
    X_train = X[train_ind]
    X_val = X[val_ind]
    X_test = X[forecast_ind].reshape(1, -1)
    y_train = y[train_ind]
    y_val = y[val_ind]
    y_test = y[forecast_ind]

    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)  # Scale validation data
    X_test = scaler.transform(X_test)

    model = LSTM_CNN().to(device)
    loss_function = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-3)

    train_data = TimeSeriesDataset(torch.from_numpy(X_train).float(), torch.from_numpy(y_train).float())
    val_data = TimeSeriesDataset(torch.from_numpy(X_val).float(), torch.from_numpy(y_val).float())  # Validation data
    train_loader = DataLoader(train_data, batch_size=16, shuffle=False, drop_last=False)
    val_loader = DataLoader(val_data, batch_size=16, shuffle=False, drop_last=False)  # Validation loader

    best_val_loss = float("inf")  # Set initial best validation loss to infinity
    num_epochs = 200
    for epoch in range(num_epochs):  # set the number of epochs
        model.train()
        for seq, labels in train_loader:
            optimizer.zero_grad()

            y_pred = model(seq.to(device))
            labels = labels.to(device).unsqueeze(1)
            single_loss = loss_function(y_pred, labels)
            single_loss.backward()
            optimizer.step()

        model.eval()
        val_losses = []
        with torch.no_grad():
            for seq, labels in val_loader:
                y_val_pred = model(seq.to(device))
                labels = labels.to(device).unsqueeze(1)
                val_loss = loss_function(y_val_pred, labels).item()
                val_losses.append(val_loss)
            mean_val_loss = np.mean(val_losses)

        if mean_val_loss < best_val_loss:
            best_val_loss = mean_val_loss
            torch.save(model.state_dict(), 'best_model.pt') 

    model.load_state_dict(torch.load('best_model.pt'))  # Load the best model

    with torch.no_grad():
        seq = torch.from_numpy(X_test).float().to(device)
        y_test_pred = model(seq)
        y_pred_store[forecast_ind] = y_test_pred.item()
        y_test_array = np.append(y_test_array, y_test)
        y_pred_array = np.append(y_pred_array, y_test_pred.item())



In [None]:
# Calculate the mean squared error on training and test sets

mse = mean_squared_error(y_test_array, y_pred_array)
print('Mean Squared Error for {} step: {}'.format(n_steps ,mse))

mae = mean_absolute_error(y_test_array, y_pred_array)
print('Mean Absolute Error for {} step: {}'.format(n_steps ,mae))

def mean_absolute_percentage_error(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

mape = mean_absolute_percentage_error(y_test_array, y_pred_array)
print('Mean Absolute Percentage Error for {} step: {:.2f}%'.format(n_steps,mape))

In [None]:
def mean_absolute_scaled_error(y_true, y_pred):
    n = len(y_true)

    # Calculate absolute error of the forecasted values
    abs_err = np.abs(y_true[n_steps:] - y_pred[n_steps:])

    # Calculate the mean absolute error of the forecasted values
    mae = np.mean(abs_err)

    # Calculate the mean absolute error for the in-sample h-step naive forecast
    naive_forecast = y_true[:-n_steps]  # naive forecast shifts the series by h step
    mae_naive = np.mean(np.abs(y_true[n_steps:] - naive_forecast))

    # Calculate and return MASE
    mase = mae / mae_naive

    return mase

mase = mean_absolute_scaled_error(y_test_array, y_pred_array)
print('Mean Absolute Scaled Error for {} step: {:.2f}'.format(n_steps,mase))

In [None]:
# 8 forecasting window. different size of samples.
# 1 week ahead: [0, 48, 100, 152, 204, 256, 309, 361, 413, 465, 517]
# 2 week ahead: [0, 47, 99, 151, 203, 255, 308, 360, 412, 464, 516]
# 3 week ahead: [0, 46, 98, 150, 202, 254, 307, 359, 411, 463, 515]
# 4 week ahead: [0, 45, 97, 149, 201, 253, 306, 358, 410, 462, 514]
# 5 week ahead: [0, 44, 96, 148, 200, 252, 305, 357, 409, 461, 513]
# 6 week ahead: [0, 43, 95, 147, 199, 251, 304, 356, 408, 460, 512]
# 7 week ahead: [0, 42, 94, 146, 198, 250, 303, 355, 407, 459, 511]
# 8 week ahead: [0, 41, 93, 145, 197, 249, 302, 354, 406, 458, 510]

In [None]:
plt.figure(figsize=(9, 4))

# Set the x-axis limits
plt.xlim(-30, len(X)+40)

# Shade the left half of the figure in light blue
first_test_index = test_ind[0]  # Get the index of the first test observation
plt.axvspan(-30, first_test_index, facecolor='lightblue', alpha=0.4)

# Create tick locations and labels
tick_interval = 52
# origin data is [0, 52, 104, 156, 208, 260, 313, 365, 417, 469, 521]
tick_locations = np.array([0, 41, 93, 145, 197, 249, 302, 354, 406, 458, 510])
tick_label_locations = tick_locations[:-1] + tick_interval / 2
tick_labels = [f'{year:02d}' for year in range(9, 19)]

plt.plot(y, label='Observed', linestyle='-', linewidth=0.6, alpha=1, color='black')
plt.scatter(range(len(y_pred_store)), y_pred_store, label=F'LSTM-CNN {n_steps} week ahead forecast', marker='o', s=10, alpha=1, color='darkorchid')


plt.xlabel('Year', fontsize=12)
plt.ylabel('IPDs ED Attendances', fontsize=12)

plt.xticks(tick_locations)
plt.gca().set_xticklabels([])
plt.gca().set_xticks(tick_label_locations, minor=True)
plt.gca().set_xticklabels(tick_labels, minor=True)
plt.gca().tick_params(axis='x', which='minor', length=0)


plt.legend(loc='upper left', frameon=False)
plt.savefig('LSTMCNN8.tif', format='tif', dpi=400)
plt.show()


In [None]:
# # （without CV） just had a try. It is not our main code

# import torch
# import torch.nn as nn
# from torch.utils.data import Dataset, DataLoader
# from sklearn.preprocessing import StandardScaler

# class TimeSeriesDataset(Dataset):
#     def __init__(self, X_data, y_data):
#         self.X_data = X_data
#         self.y_data = y_data

#     def __getitem__(self, index):
#         return self.X_data[index], self.y_data[index]

#     def __len__(self):
#         return len(self.X_data)

# class LSTM_CNN(nn.Module):
#     def __init__(self, input_size=200, hidden_layer_size=16, output_size=1, num_layers=2):
#         super().__init__()
#         self.hidden_layer_size = hidden_layer_size

#         self.lstm = nn.LSTM(input_size, hidden_layer_size, num_layers, batch_first=True)
#         self.conv1 = nn.Conv1d(1, 4, kernel_size=3)
#         self.relu1 = nn.ReLU()
#         self.pool1 = nn.MaxPool1d(2)
#         self.conv2 = nn.Conv1d(4, 24, kernel_size=2)
#         self.relu2 = nn.ReLU()
#         self.pool2 = nn.MaxPool1d(2)
#         self.fc1 = nn.Linear(72, 8)
#         self.fc2 = nn.Linear(8, output_size)

#     def forward(self, input_seq):
#         lstm_out, _ = self.lstm(input_seq.view(len(input_seq), 1, -1))

#         conv_out = self.conv1(lstm_out)
#         conv_out = self.relu1(conv_out)
#         conv_out = self.pool1(conv_out)

#         conv_out = self.conv2(conv_out)
#         conv_out = self.relu2(conv_out)
#         conv_out = self.pool2(conv_out)

#         fc_out = self.fc1(conv_out.view(len(input_seq), -1))
#         predictions = self.fc2(fc_out)

#         return predictions

# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# train_size = int(len(X) * 0.7)
# test_ind = range(train_size, len(X))

# y_test_array = np.array([])
# y_pred_array = np.array([])

# train_ind = list(range(0, int(train_size * 0.85)))
# val_ind = list(range(int(train_size * 0.85), train_size))
# test_ind = list(range(train_size, len(X)))
# X_train = X[train_ind]
# X_val = X[val_ind]
# X_test = X[test_ind]
# y_train = y[train_ind]
# y_val = y[val_ind]
# y_test = y[test_ind]

# scaler = StandardScaler()
# X_train = scaler.fit_transform(X_train)
# X_val = scaler.transform(X_val)  # Scale validation data
# X_test = scaler.transform(X_test)

# model = LSTM_CNN().to(device)
# loss_function = nn.MSELoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=0.0002, weight_decay=1e-3)

# train_data = TimeSeriesDataset(torch.from_numpy(X_train).float(), torch.from_numpy(y_train).float())
# val_data = TimeSeriesDataset(torch.from_numpy(X_val).float(), torch.from_numpy(y_val).float())  # Validation data
# test_data = TimeSeriesDataset(torch.from_numpy(X_test).float(), torch.from_numpy(y_test).float())
# train_loader = DataLoader(train_data, batch_size=16, shuffle=False, drop_last=False)
# val_loader = DataLoader(val_data, batch_size=16, shuffle=False, drop_last=False)  # Validation loader
# test_loader = DataLoader(test_data, batch_size=16, shuffle=False, drop_last=False)

# num_epochs = 200
# best_val_loss = float("inf")  # Set initial best validation loss to infinity
# train_draw = []
# val_draw = []
# best_epoch = 0

# for epoch in range(num_epochs):  # set the number of epochs
#     model.train()
#     mean_train_loss = 0.0
#     train_losses = []
#     for seq, labels in train_loader:
#         optimizer.zero_grad()

#         y_pred = model(seq.to(device))
#         labels = labels.to(device).unsqueeze(1)
#         single_loss = loss_function(y_pred, labels)
#         single_loss.backward()
#         optimizer.step()
#         train_losses.append(single_loss)
#     mean_train_loss = torch.stack(train_losses).mean()
#     mean_train_loss = mean_train_loss.detach().cpu().numpy()


#     model.eval()
#     val_losses = []
#     with torch.no_grad():
#         for seq, labels in val_loader:
#             y_val_pred = model(seq.to(device))
#             labels = labels.to(device).unsqueeze(1)
#             val_loss = loss_function(y_val_pred, labels).item()
#             val_losses.append(val_loss)
#         mean_val_loss = np.mean(val_losses)

#     train_draw.append(mean_train_loss)
#     val_draw.append(mean_val_loss)

#     print('Epoch [{}/{}], Train Loss: {:.4f}, Val Loss: {:.4f}'
#           .format(epoch+1, num_epochs, mean_train_loss, mean_val_loss))

#     if mean_val_loss < best_val_loss:
#         best_val_loss = mean_val_loss
#         best_epoch = epoch
#         torch.save(model.state_dict(), 'best_model.pt')  # Save the model

# print('The epoch corresponding to the optimal validation loss is {}'.format(best_epoch))

In [None]:
# model.load_state_dict(torch.load('best_model.pt'))
# model.eval()
# test_loss = 0.0
# outputs_list = []
# targets_list = []
# with torch.no_grad():
#     for inputs, targets in test_loader:
#         outputs = model(inputs.to(device))
#         targets = targets.to(device).unsqueeze(1)
#         loss = loss_function(outputs, targets)
#         test_loss += loss.item() * inputs.size(0)
#         outputs_list.extend(outputs.detach().cpu().numpy())
#         targets_list.extend(targets.detach().cpu().numpy())
#     test_loss /= len(test_loader.dataset)
# outputs_arr = np.array(outputs_list)
# targets_arr = np.array(targets_list)


# print('Test Loss: {:.4f}'.format(test_loss))

In [None]:
# def mape(actual, predicted):
#     mask = actual != 0
#     return np.mean(np.abs((actual[mask] - predicted[mask]) / actual[mask])) * 100

# mape_value = mape(targets_arr, outputs_arr)
# print("MAPE: {:.2f}%".format(mape_value))