GCNN Model 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
import pandas as pd
import networkx as nx
from torch_geometric.utils import from_networkx
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt

# Load the dataset, standardize the node features and build the graph that the
# GCN is trained on: one node per sampled row, edges between rows of the same state.
from itertools import combinations  # local import: only used for edge construction below

# NOTE(review): hard-coded, machine-specific path — adjust for your environment.
df = pd.read_csv(r"C:\Users\hp\Downloads\Optimized_Covid19_Dataset.csv")
# Keep a random 20% of the rows. The fixed seed makes the subsample (and with it
# the graph and every downstream metric) reproducible, in line with the seeded
# train/validation split performed later on the same rows.
df = df.sample(frac=0.2, random_state=42)

# Fallback feature vector for any node that ends up without an 'x' attribute.
default_feature = torch.tensor([0.0, 0.0, 0.0], dtype=torch.float)

# Standardize the three node features in place.
feature_columns = ['Population', 'Cases', 'Deaths']
scaler = StandardScaler()
df[feature_columns] = scaler.fit_transform(df[feature_columns])

# One node per row, keyed by the row's DataFrame index label. Nodes are inserted
# in DataFrame row order, so node position k corresponds to df.iloc[k].
G = nx.Graph()
feature_matrix = df[feature_columns].to_numpy()
for node_id, row_values in zip(df.index, feature_matrix):
    # torch.tensor copies the row, so every node owns its own small tensor
    # (no views into one shared buffer).
    G.add_node(node_id, x=torch.tensor(row_values, dtype=torch.float))

# Connect every pair of rows that share a 'Province_State' value.
state_groups = df.groupby('Province_State').groups
for state, indices in state_groups.items():
    G.add_edges_from(combinations(indices, 2))

# Safety net: give the default features to any node that lacks an 'x' attribute.
for node in G.nodes:
    if 'x' not in G.nodes[node]:
        G.nodes[node]['x'] = default_feature

# Convert the NetworkX graph to a PyTorch Geometric data object.
data = from_networkx(G)

# Defining a GCN model with more layers and dropout
class GCN(nn.Module):
    """Three stacked GCNConv layers for node-level prediction.

    The first two layers are each followed by ReLU and dropout; the third
    layer produces the output and is returned without an activation.

    Args:
        input_dim: Size of each node's input feature vector.
        hidden_dim: Output size of the first and second layers.
        output_dim: Output size of the third (final) layer.
        dropout_rate: Dropout probability applied after each hidden layer
            (only active while the module is in training mode).
    """

    def __init__(self, input_dim, hidden_dim, output_dim, dropout_rate=0.4):
        super().__init__()
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.conv3 = GCNConv(hidden_dim, output_dim)
        self.dropout_rate = dropout_rate

    def forward(self, data):
        """Return the third layer's output for the graph in ``data``.

        ``data`` must expose ``x`` (node features) and ``edge_index``.
        """
        h = data.x
        edge_index = data.edge_index
        # Hidden layers: convolution -> ReLU -> dropout.
        for hidden_layer in (self.conv1, self.conv2):
            h = F.relu(hidden_layer(h, edge_index))
            h = F.dropout(h, p=self.dropout_rate, training=self.training)
        # Output layer: raw values, no activation.
        return self.conv3(h, edge_index)

# Instantiate the GCN: 3 input features per node, a single regression output.
model = GCN(input_dim=3, hidden_dim=38, output_dim=1, dropout_rate=0.2)

# Target: the 'Cases' value of the same county in its next available week.
# The rows were shuffled by the random downsampling, so the shift has to run on
# a copy sorted by week; otherwise "next" would mean "next row in random order".
# Grouping by state as well keeps same-named counties in different states apart.
# The shifted Series keeps the original index labels, so assigning it back to df
# re-aligns every value with its own row (df's row order is left untouched).
# NOTE(review): assumes 'Weeks' sorts chronologically — confirm for this dataset.
next_week_cases = (
    df.sort_values('Weeks', kind='stable')
    .groupby(['Province_State', 'Counties'])['Cases']
    .shift(-1)
)
# Rows without a later week in the sample get 0 as their target.
df['Next_Week_Cases'] = next_week_cases.fillna(0)
y = torch.tensor(df['Next_Week_Cases'].values, dtype=torch.float).unsqueeze(1)  # one target column per row

# Split row positions into train and validation sets (seeded for reproducibility).
indices = list(range(len(df)))
train_indices, val_indices = train_test_split(indices, test_size=0.2, random_state=42)

# Boolean node masks selecting the training and validation nodes.
train_mask = torch.zeros(data.num_nodes, dtype=torch.bool)
train_mask[train_indices] = True

val_mask = torch.zeros(data.num_nodes, dtype=torch.bool)
val_mask[val_indices] = True

# Adam with L2 regularization (weight decay) and a mean-squared-error objective.
optimizer = optim.Adam(model.parameters(), lr=0.008, weight_decay=1e-4)
criterion = nn.MSELoss()

# Training with early stopping on the validation loss.
best_val_loss = float('inf')
best_state = None  # copy of the weights that achieved best_val_loss
patience = 10
patience_counter = 0

for epoch in range(100):  # Train for up to 100 epochs
    # --- Training step (dropout active) ---
    model.train()
    optimizer.zero_grad()
    output = model(data)
    train_output = output[train_mask]
    loss = criterion(train_output, y[train_mask])
    loss.backward()
    optimizer.step()

    # --- Validation step ---
    # Run a fresh forward pass in eval mode: the training-mode `output` above was
    # produced with dropout enabled and with the weights from before this step.
    model.eval()
    with torch.no_grad():
        val_output = model(data)[val_mask]
        val_loss = criterion(val_output, y[val_mask])

    print(f'Epoch {epoch}, Train Loss: {loss.item()}, Val Loss: {val_loss.item()}')  # Print training and validation loss

    # --- Early stopping bookkeeping ---
    if val_loss.item() < best_val_loss:
        best_val_loss = val_loss.item()  # keep a plain float, not a tensor
        # Snapshot the best weights so they can be restored after training.
        best_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
        patience_counter = 0
    else:
        patience_counter += 1

    if patience_counter >= patience:
        print("Early stopping due to no improvement in validation loss")  # Early stopping condition
        break

# Roll back to the weights with the lowest validation loss (if any were recorded).
if best_state is not None:
    model.load_state_dict(best_state)

# Evaluate on the held-out validation nodes only, so the reported metrics are
# not inflated by nodes the model was trained on.
model.eval()
with torch.no_grad():
    predictions = model(data)
    y_true = y[val_mask].numpy().flatten()  # Ground truth values (validation nodes)
    y_pred = predictions[val_mask].numpy().flatten()  # Predicted values (validation nodes)

    # Binarize targets and predictions with a fixed threshold so classification
    # metrics can be reported alongside the regression metrics.
    # NOTE(review): the target is derived from the standardized 'Cases' column, so
    # 0.5 is in standardized units — confirm this is the intended cut-off.
    threshold = 0.5
    y_binary = (y_true > threshold).astype(int)
    y_pred_binary = (y_pred > threshold).astype(int)

    # Classification metrics (zero_division=0 keeps the default value for a class
    # that is never predicted but without emitting a warning).
    accuracy = accuracy_score(y_binary, y_pred_binary)
    precision = precision_score(y_binary, y_pred_binary, average='macro', zero_division=0)
    recall = recall_score(y_binary, y_pred_binary, average='macro', zero_division=0)
    f1 = f1_score(y_binary, y_pred_binary, average='macro', zero_division=0)
    # Fix the label set so the matrix is always 2x2, even if one class is absent.
    conf_matrix = confusion_matrix(y_binary, y_pred_binary, labels=[0, 1])

    # Root Mean Squared Error
    rmse = np.sqrt(np.mean((y_true - y_pred) ** 2))

    # R-squared = 1 - SS_res / SS_tot; undefined when the target is constant.
    ss_res = np.sum((y_true - y_pred) ** 2)  # Sum of squares of residuals
    ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)  # Total sum of squares
    r2 = 1 - (ss_res / ss_tot) if ss_tot > 0 else float('nan')

    # Print metrics
    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision (Macro): {precision:.4f}')
    print(f'Recall (Sensitivity, Macro): {recall:.4f}')
    print(f'F1-Score (Macro): {f1:.4f}')
    print(f'Confusion Matrix:\n{conf_matrix}')
    print(f'RMSE: {rmse:.4f}')  # Print RMSE
    print(f'R-squared: {r2:.4f}')  # Print R-squared


LSTM Model 

In [None]:
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.regularizers import l2

# Load the dataset and build the scaled (Cases, Deaths) time series for one county.

# NOTE(review): hard-coded, machine-specific path — adjust for your environment.
df = pd.read_csv(r"C:\Users\hp\Downloads\Optimized_Covid19_Dataset.csv")
df = df.sample(frac=0.2)  # Randomly sample 20% of the data for downsampling

# Use the county of the first sampled row as the example series.
county_data = df[df['Counties'] == df['Counties'].iloc[0]]
# The random sampling above shuffles the rows, so the series must be put back
# into week order before sliding windows are cut from it.
# NOTE(review): sampling also drops weeks, so consecutive rows are not
# necessarily consecutive weeks; assumes 'Weeks' sorts chronologically — confirm.
time_series_data = county_data[['Weeks', 'Cases', 'Deaths']].set_index('Weeks').sort_index()
# Scale both columns to the [0, 1] range.
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data)

def create_sequences(data, seq_length):
    """Cut ``data`` into sliding windows and their immediate successors.

    For every start position ``s`` with ``s + seq_length < len(data)``, the
    window ``data[s:s + seq_length]`` is paired with the element that follows
    it, ``data[s + seq_length]``.

    Returns:
        A tuple ``(windows, targets)`` of NumPy arrays; both are empty when
        ``data`` has no more than ``seq_length`` elements.
    """
    starts = range(len(data) - seq_length)
    windows = [data[s:s + seq_length] for s in starts]
    targets = [data[s + seq_length] for s in starts]
    return np.array(windows), np.array(targets)

# Build supervised windows: 4 consecutive rows in, the following row out.
seq_length = 4
X, y = create_sequences(scaled_data, seq_length)
n_features = X.shape[2]  # number of columns per time step (Cases, Deaths)

# Binary label used only for class balancing: 'Deaths' > threshold.
# The threshold is applied on the ORIGINAL scale (scaled targets are mapped back
# first), which is the same scale the evaluation code thresholds on.
threshold = 0.5  # Assuming binary classification for 'Deaths' > 0
y_classification = (scaler.inverse_transform(y)[:, 1] > threshold).astype(int)

# Split the data into training and test sets
X_train, X_test, y_train, y_test, y_train_class, y_test_class = train_test_split(
    X, y, y_classification, test_size=0.2, random_state=42
)

# Oversample the minority class with SMOTE. The regression targets are appended
# to the flattened inputs so that each synthetic sample receives a target that
# was interpolated together with its inputs, instead of an unrelated target
# copied from some other row.
k_neighbors = 2
smote = SMOTE(k_neighbors=k_neighbors, random_state=42)
class_counts = np.bincount(y_train_class, minlength=2)
if class_counts.min() > k_neighbors:
    n_inputs = seq_length * n_features
    joint = np.hstack([X_train.reshape(X_train.shape[0], n_inputs), y_train])
    joint_res, y_train_class_res = smote.fit_resample(joint, y_train_class)
    X_train_res = joint_res[:, :n_inputs].reshape(-1, seq_length, n_features)
    y_train_res = joint_res[:, n_inputs:]
else:
    # SMOTE needs both classes present and more than k_neighbors minority
    # samples; otherwise train on the data as it is.
    print(f'Skipping SMOTE: class counts {class_counts.tolist()} are too small for k_neighbors={k_neighbors}')
    X_train_res, y_train_res, y_train_class_res = X_train, y_train, y_train_class

# Build the network layer by layer: two stacked LSTM layers followed by a small
# dense head; every weighted layer except the output uses an L2 kernel penalty.
model = Sequential()
model.add(
    LSTM(50, activation='relu', input_shape=(seq_length, 2), return_sequences=True, kernel_regularizer=l2(0.001))
)
model.add(LSTM(50, activation='relu', kernel_regularizer=l2(0.001)))
model.add(Dense(25, activation='relu', kernel_regularizer=l2(0.001)))
model.add(Dense(2))  # one output per predicted column

# Mean-squared-error objective optimized with Adam.
model.compile(loss='mse', optimizer='adam')

# Fit on the resampled training set, monitoring loss on the held-out split.
history = model.fit(
    X_train_res,
    y_train_res,
    validation_data=(X_test, y_test),
    epochs=100,
    verbose=1,
)

# Evaluate the trained LSTM on the held-out split and report the metrics once.
from sklearn.metrics import mean_squared_error, r2_score

loss = model.evaluate(X_test, y_test, verbose=1)
print(f'Test Loss: {loss}')

# Predict, then map predictions and targets back to the original scale.
predicted = model.predict(X_test)
predicted = scaler.inverse_transform(predicted)
y_test_orig = scaler.inverse_transform(y_test)

# Rows whose true 'Deaths' value (column 1) is not near zero; only these rows
# enter the regression metrics below.
near_zero_threshold = 1e-6
filtered_indices = np.abs(y_test_orig[:, 1]) > near_zero_threshold  # Adjusting the threshold if needed
y_test_filtered = y_test_orig[filtered_indices]
predicted_filtered = predicted[filtered_indices]

# Binarize the 'Deaths' column of predictions and targets for classification metrics.
predicted_binary = (predicted[:, 1] > threshold).astype(int)
y_test_binary = (y_test_orig[:, 1] > threshold).astype(int)

# Classification metrics (zero_division=0 keeps the default value for a class
# that is never predicted but without emitting a warning).
accuracy = accuracy_score(y_test_binary, predicted_binary)
precision = precision_score(y_test_binary, predicted_binary, average='macro', zero_division=0)
recall = recall_score(y_test_binary, predicted_binary, average='macro', zero_division=0)
f1 = f1_score(y_test_binary, predicted_binary, average='macro', zero_division=0)
# Fix the label set so the matrix is always 2x2, even if one class is absent.
conf_matrix = confusion_matrix(y_test_binary, predicted_binary, labels=[0, 1])

# Output the evaluation metrics
print(f'Accuracy: {accuracy:.4f}')
print(f'Precision (Macro): {precision:.4f}')
print(f'Recall (Sensitivity, Macro): {recall:.4f}')
print(f'F1-Score (Macro): {f1:.4f}')
print(f'Confusion Matrix:\n{conf_matrix}')

# RMSE for the 'Deaths' column; needs at least one row after filtering.
if len(y_test_filtered) > 0:
    rmse = np.sqrt(mean_squared_error(y_test_filtered[:, 1], predicted_filtered[:, 1]))
    print(f'RMSE (Deaths): {rmse:.4f}')
else:
    rmse = float('nan')
    print('RMSE (Deaths): not available (no test rows with non-zero deaths)')

# R-squared for the 'Deaths' column; not defined for fewer than two rows.
if len(y_test_filtered) >= 2:
    r2 = r2_score(y_test_filtered[:, 1], predicted_filtered[:, 1])
    print(f'R-squared (Deaths): {r2:.4f}')
else:
    r2 = float('nan')
    print('R-squared (Deaths): not available (fewer than two test rows with non-zero deaths)')
