#INSTRUCTIONS TO EXECUTE THE NOTEBOOK
1. Execute all the cells sequentially, from top to bottom. Do not skip cells or run them out of order.
2. After you have finished executing and saving the results (training and inference) for one activation function, restart the session; otherwise the notebook will not run correctly on the second attempt.
3. After saving, confirm that the results were written to the correct directory.

In [None]:
import numpy as np
import torch
import torchvision
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from torchsummary import summary
from torch.optim.lr_scheduler import ReduceLROnPlateau
from time import time
import seaborn as sns
import matplotlib.pyplot as plt
import os
from math import floor
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler

In [None]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(torch.cuda.is_available())

In [None]:
# Mount Google Drive into the Colab runtime. Every CSV read and every result
# written by this notebook uses a path under /content/drive.
from google.colab import drive
drive.mount('/content/drive',force_remount=True)

In [None]:
# Run configuration, collected interactively. Every value is kept as the raw
# string typed by the user and is compared against string literals later on.
Year = input("Year: ")  # substituted into the input file names
Area = input("Area: ")  # substituted into the input file names
Ablation = input("Ablation(0 or 1): ")  # '0' selects the "processed" CSVs; any other answer selects an ablation subset
activation_fn_list =  ['ReLU','LeakyReLU','SwishReLU','z^2cos(z)','DSU','GCU','SSU']
# The answer is not validated here: a name that none of the model's if/elif
# branches matches ends up in the final `else` branch (SincActivation).
activation_fn = input(f"Input of Activation Function from the list {activation_fn_list}: ")
generalizing = input("Generalizing(0 or 1): ")  # translated into a results sub-directory name further down

In [None]:
# Path of the .tif image for the selected area/year on the mounted Drive.
# NOTE(review): img_path is only displayed here; nothing in the visible part of
# the notebook reads it afterwards.
img_path = f'/content/drive/MyDrive/{Area}_{Year}.tif'
img_path

In [None]:
# Load the training and testing tables. The first CSV column is used as the index.
# Ablation == '0' -> the "processed" tables; `feats` is set to the marker 'NA'.
# Otherwise the user picks a feature subset (s, si or st), which selects the file.
# NOTE(review): these files are read from the Drive root, while the `sample`
# cell below reads the training CSV from processed_data/ and Ablation/
# sub-folders - confirm which location is the intended one.
if Ablation == '0':
  feats = 'NA'
  df_train = pd.read_csv(f'/content/drive/MyDrive/training_processed_{Area}_{Year}.csv',index_col=0)
  df_test = pd.read_csv(f'/content/drive/MyDrive/testing_processed_{Area}_{Year}.csv',index_col=0)
else:
  feats = input("Features(s,si,st): ")
  df_train = pd.read_csv(f'/content/drive/MyDrive/training_{feats}_{Area}_{Year}.csv',index_col=0)
  df_test = pd.read_csv(f'/content/drive/MyDrive/testing_{feats}_{Area}_{Year}.csv',index_col=0)

In [None]:
# Read the training table once more only to extract its column names;
# bands_list holds every column name except the first one.
# NOTE(review): the folders used here (processed_data/, Ablation/) differ from
# the Drive-root paths used to load df_train above. If both point at the same
# data, df_train.columns could be used instead of re-reading the file - confirm.
if Ablation=='0':
  sample = pd.read_csv(f'/content/drive/MyDrive/processed_data/training_processed_{Area}_{Year}.csv',index_col=0)
else:
  sample = pd.read_csv(f'/content/drive/MyDrive/Ablation/training_{feats}_{Area}_{Year}.csv',index_col=0)
bands_list = sample.columns.values[1:]
print(bands_list)

In [None]:
# Preview the first rows of the test table.
df_test.head()

In [None]:
# Shuffle the rows of both tables once (sample(frac=1) returns all rows in
# random order) and renumber the index from 0.
# NOTE: no random_state is passed, so the row order differs between runs.
train_df = df_train.sample(frac=1).reset_index(drop=True)
test_df = df_test.sample(frac=1).reset_index(drop=True)
train_df.head()

In [None]:
# Preview the first rows of the shuffled training table.
train_df.head()

In [None]:
# Pull the label column out of each shuffled table; the 'Class' column itself
# is removed from the feature frames in a later cell.
y_train, y_test = train_df['Class'], test_df['Class']

In [None]:
# Number of training samples per class.
y_train.value_counts()

In [None]:
# Number of test samples per class.
y_test.value_counts()

In [None]:
# Preview the first training labels.
y_train.head()

In [None]:
# Preview the first test labels.
y_test.head()

In [None]:
# Distinct labels seen in each split, plus the number of distinct training labels.
classes, class2 = y_train.unique(), y_test.unique()
num_class = y_train.nunique()

for message in (f"Classes are - {classes}", f"Number of classes are - {num_class}"):
  print(message)

In [None]:
# Remove the label column in place so the frames contain features only.
# The labels were already copied into y_train / y_test above, so this cell must
# run after that one; running it a second time raises a KeyError.
train_df.drop(columns=['Class'],inplace=True)
test_df.drop(columns=['Class'],inplace=True)
train_df.head()

In [None]:
# Map every label value that occurs in the test split to its class name.
# The label value is used as a position in class_list.
class_list = ["Water","Urban","Vegetation","Barren"]
class_dict = {label: class_list[label] for label in class2}
print(class_dict)


# DATA LOADERS

In [None]:
class CSVDataset(Dataset):
    """Map-style dataset that serves DataFrame rows as single-channel tensors."""

    def __init__(self, dataframe, target_column, transform=None):
        """
        Args:
            dataframe (pd.DataFrame): Numeric feature columns, one sample per row.
            target_column (pd.Series | str | None): Labels aligned row-by-row with
                ``dataframe``. A string is treated as the name of a column of
                ``dataframe``: that column becomes the labels and is removed from
                the features. ``None`` means there are no labels (inference), in
                which case ``__getitem__`` returns the features only.
            transform (callable, optional): Applied to the feature tensor of each sample.
        """
        if isinstance(target_column, str):
            # Split the named label column off the feature columns.
            labels = dataframe[target_column]
            dataframe = dataframe.drop(columns=[target_column])
            target_column = labels

        self.dataframe = dataframe
        self.target_column = target_column
        self.transform = transform

    def __len__(self):
        """Return the number of samples (rows)."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for row ``idx`` (``features`` alone without labels).

        ``features`` is a float32 tensor of shape (1, n_columns): the leading
        axis is the single input channel expected by Conv1d. ``label`` is a
        0-dim int64 tensor.
        """
        row = self.dataframe.iloc[idx].values
        features = torch.tensor(row, dtype=torch.float32)
        features = torch.reshape(features, (1, -1))

        if self.transform:
            features = self.transform(features)

        if self.target_column is None:
            return features

        label = torch.tensor(self.target_column.iloc[idx], dtype=torch.long)
        return features, label


In [None]:
# Wrap each feature frame together with its label Series in a CSVDataset.
train_dataset = CSVDataset(train_df,y_train)
test_dataset = CSVDataset(test_df,y_test)
# random_split could be used here to split a single dataset, but the train and
# test sets were already separated during the data collection stage.
print(train_dataset[0])

In [None]:
# NOTE: the names train_dataset / test_dataset are rebound here and refer to
# DataLoaders (batches of 32) from this point on, not to CSVDataset objects.
# shuffle=False: batches follow the row order fixed by the one-off
# df.sample(frac=1) shuffle earlier, so every epoch sees the same batches.
train_dataset = DataLoader(train_dataset, batch_size=32, shuffle=False, num_workers=2)
test_dataset = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=2)

In [None]:
# Inspect the first training batch and record the number of features per
# sample (the last axis of the (batch, 1, features) tensor).
# The loop variable is deliberately NOT called `input`: at notebook top level
# that would overwrite the builtin input() and break every cell that prompts
# the user when it is run again in the same session.
print(f"Length of the Training Data Batches {len(train_dataset)}")
input_features = 0
for batch_inputs, labels in train_dataset:
  input_features = batch_inputs.shape[2]
  print(f"Dimensions of the first batch {batch_inputs.shape}")
  print(f"Dimensions of the first batch's labels {labels.shape}")
  break



# Modelling Phase
* Here we create a 1D CNN whose input dimensions match the batches shown above; we will scale it up later.

## Activation Functions

In [None]:
class SincActivation(nn.Module):
    """Shifted sinc activation: f(z) = sin(z - pi) / (z - pi), element-wise."""

    def forward(self, input):
        # torch.sinc is the normalized sinc, sin(pi*x) / (pi*x), with sinc(0) == 1.
        # Feeding it (z - pi) / pi therefore yields sin(z - pi) / (z - pi) and
        # gives the removable singularity at z == pi its limit value 1.
        # The former "+ 1e-9" guard in the denominator produced 0 at that point.
        return torch.sinc((input - torch.pi) / torch.pi)

class ZCosActivation(nn.Module):
    """Activation f(z) = z * cos(z), applied element-wise."""

    def forward(self, input):
        cosine = torch.cos(input)
        return torch.mul(input, cosine)

class SincSubActivation(nn.Module):
    """Difference of two shifted sincs: f(z) = 2 * (sinc(z - pi) - sinc(z + pi)).

    Here sinc(u) = sin(u) / u, the unnormalized sinc.
    """

    def forward(self, input):
        # torch.sinc(x) computes sin(pi*x) / (pi*x) and returns 1 at x == 0, so
        # dividing the shifted argument by pi gives the unnormalized sinc with
        # the correct limit at z == +/-pi. The former "+ 1e-9" guard returned 0
        # for the affected term at those points.
        sinc_left = torch.sinc((input - torch.pi) / torch.pi)
        sinc_right = torch.sinc((input + torch.pi) / torch.pi)
        return 2 * (sinc_left - sinc_right)

class zsqcosz(nn.Module):
  """Activation f(z) = z^2 * cos(z), applied element-wise."""

  def forward(self,input):
    squared = torch.pow(input, 2)
    return squared * torch.cos(input)

In [None]:

class Simple1DCNN(nn.Module):
    """1D CNN classifier: four Conv1d layers, two max-pools and three linear layers.

    All convolutions use kernel_size=3 without padding and both poolings halve
    the length, so an input of length 19 shrinks
    19 -> 17 -> 15 -> 7 -> 5 -> 3 -> 1 and the flattened feature vector has
    exactly the 128 values ``fc1`` expects. Other input lengths are rejected
    with a shape error in ``fc1``.

    Args:
        activation_fn (str): 'ReLU', 'LeakyReLU', 'SwishReLU', 'z^2cos(z)',
            'DSU' or 'GCU'. Any other value selects ``SincActivation``.
    """

    def __init__(self,activation_fn):
        super(Simple1DCNN, self).__init__()

        # First convolutional stage: 1 -> 16 -> 32 channels.
        self.conv1a = nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, stride=1)
        self.conv1b = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, stride=1)
        # NOTE: the BatchNorm layers below are registered (and therefore appear
        # in the state_dict) but are not applied in forward().
        self.bn1a = nn.BatchNorm1d(16)
        self.bn1b = nn.BatchNorm1d(32)

        self.pool1 = nn.MaxPool1d(kernel_size=2, stride=2)  # First pooling layer

        # Second convolutional stage: 32 -> 64 -> 128 channels.
        self.conv2a = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, stride=1)
        self.bn2a = nn.BatchNorm1d(64)

        self.conv2b = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, stride=1)
        self.bn2b = nn.BatchNorm1d(128)

        self.pool2 = nn.MaxPool1d(kernel_size=2, stride=2)  # Second pooling layer

        self.drop = nn.Dropout(0.2)
        # Registered but not used in forward().
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)

        # Classifier head: 128 -> 64 -> 32 -> 4 classes.
        self.fc1 = nn.Linear(128, 64)
        self.bn_fc1 = nn.BatchNorm1d(64)

        self.fc2 = nn.Linear(64, 32)
        self.bn_fc2 = nn.BatchNorm1d(32)

        self.fc3 = nn.Linear(32,4)

        # Activation shared by every conv and hidden linear layer. Each name
        # maps to its own module; 'z^2cos(z)' and 'GCU' were previously both
        # wired to SincSubActivation.
        if(activation_fn == 'ReLU'):
          self.activation = nn.ReLU()
        elif(activation_fn == 'LeakyReLU'):
          self.activation = nn.LeakyReLU()
        elif(activation_fn == 'SwishReLU'):
          self.activation = nn.SiLU()
        elif(activation_fn == 'z^2cos(z)'):
          self.activation = zsqcosz()
        elif(activation_fn == 'DSU'):
          self.activation = SincSubActivation()
        elif(activation_fn == 'GCU'):
          self.activation = ZCosActivation()
        else:
          self.activation = SincActivation()

        # NOTE(review): nn.CrossEntropyLoss expects raw logits; returning
        # softmax probabilities means the loss applies log-softmax on top of
        # them. Kept because the output contract (probabilities) is unchanged.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Map a (batch, 1, 19) tensor to (batch, 4) class probabilities."""
        # Stage 1: conv -> activation (x2), pool, dropout
        x = self.activation(self.conv1a(x))
        x = self.activation(self.conv1b(x))
        x = self.pool1(x)
        x = self.drop(x)

        # Stage 2: conv -> activation (x2), pool, dropout
        x = self.activation(self.conv2a(x))
        x = self.activation(self.conv2b(x))
        x = self.pool2(x)
        x = self.drop(x)

        # Flatten per sample. Unlike view(-1, 128), this never moves leftover
        # sequence positions into the batch dimension: an unsupported input
        # length fails in fc1 instead of silently changing the batch size.
        x = torch.flatten(x, 1)

        # Fully connected layers with dropout
        x = self.drop(self.activation(self.fc1(x)))
        x = self.drop(self.activation(self.fc2(x)))
        x = self.fc3(x)
        x = self.softmax(x)
        return x

# SMALL CNN


In [None]:
# # @title DON'T RUN IF BIG CNN IS RUNNING
# class Simple1DCNN(nn.Module):
#     def __init__(self,activation_fn):
#         super(Simple1DCNN, self).__init__()

#         # Define the CNN layers
#         self.conv1a = nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, stride=1)  # 1 input channel, 16 output channels

#         self.pool1 = nn.MaxPool1d(kernel_size=2, stride=2)  # First pooling layer

#         self.conv2a = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, stride=1)  # 16 input channels, 32 output channels

#         self.pool2 = nn.MaxPool1d(kernel_size=2, stride=2)  # Second pooling layer


#         self.drop = nn.Dropout(0.2)
#         self.global_avg_pool = nn.AdaptiveAvgPool1d(1)  # Takes in input of any shape, gives us the output of our desired shape by averaging the values adaptively (Based upon the input dimensions and output dimensions)
#         self.fc1 = nn.Linear(32, 4)  # Fully connected layer to output 4 classes
#         # self.fc2 = nn.Linear(64, 32)  # Fully connected layer to output 4 classes
#         # self.fc3 = nn.Linear(32,4)

#         self.activation = activation_fn
#         # Activation and softmax layers
#         if(activation_fn == 'ReLU'):
#           self.activation = nn.ReLU()
#         elif(activation_fn == 'LeakyReLU'):
#           self.activation = nn.LeakyReLU()
#         elif(activation_fn == 'SwishReLU'):
#           self.activation = nn.SiLU()
#         elif(activation_fn == 'z^2cos(z)'):
#           self.activation = zsqcosz()
#         elif(activation_fn == 'DSU'):
#           self.activation = SincSubActivation()
#         elif(activation_fn == 'GCU'):
#           self.activation = ZCosActivation()
#         else:
#           self.activation = SincActivation()

#         self.softmax = nn.Softmax(dim=1)  # Softmax activation

#     def forward(self, x):
#         # Pass through conv layer, ReLU, and pooling

#         x = self.activation(self.conv1a(x))
#         x = self.pool1(x)

#         x = self.activation(self.conv2a(x))
#         x = self.pool2(x)

#         # Global Average Pooling
#         x = self.global_avg_pool(x)


#         # Flatten the output for the fully connected layer
#         x = x.view(-1, 32)  # After global average pooling, output shape is [batch_size, 32, 1] --> Therefore we reshape it
#         # Fully connected layer
#         x = self.fc1(x)
#         # x = self.drop(x)
#         # x = self.activation(self.fc2(x))
#         # x = self.drop(x)
#         # x = self.fc3(x)
#         x = self.softmax(x)
#         return x

# For Four Features Ablation Study Dataset

In [None]:
# class Simple1DCNNAblation(nn.Module):
#     def __init__(self, input_dim,activation_fn):
#         super(Simple1DCNNAblation, self).__init__()

#         # Initialize an empty ModuleList to hold CNN layers
#         self.cnn_layers = nn.ModuleList()

#         # Initial configuration
#         in_channels = 1  # Starting input channels
#         out_channels = 0  # Starting output channels
#         kernel_size = 3
#         stride = 1
#         pool_size = 2

#         current_dim = input_dim  # Track current dimension of the feature map
#         while current_dim >= kernel_size:
#             # Add Conv layer
#             out_channels+=16
#             self.cnn_layers.append(
#                 nn.Conv1d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride)
#                 nn.BatchNorm1d(out_channels)
#             )
#             in_channels=out_channels
#             out_channels+=16
#             self.cnn_layers.append(
#                 nn.Conv1d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride)
#                 nn.BatchNorm1d(out_channels)

#             )

#             # Update dimensions and channel numbers for the next layer
#             current_dim = floor((current_dim - (kernel_size - 1) - 1) / pool_size)  # Pooling reduces the dimension
#             in_channels = out_channels  # Input channels for the next layer is the output channels of this layer
#               # Incrementally increase out_channels for each added CNN layer

#         self.fc = nn.ModuleList()
#         self.drop = nn.Dropout(0.2)
#         while(out_channels!=32):
#           self.fc.append(nn.Linear(out_channels,out_channels/2))
#           out_channels = out_channels/2

#         self.pool1d = nn.MaxPool1d(kernel_size=pool_size, stride=pool_size)
#         self.global_avg_pool = nn.AdaptiveAvgPool1d(1)  # Takes in input of any shape, gives us the output of our desired shape by averaging the values adaptively (Based upon the input dimensions and output dimensions)
#         self.drop = nn.Dropout(0.2)
#         self.fc3 = nn.Linear(out_channels, 4)
#         self.activation=None
#         if(activation_fn == 'ReLU'):
#           self.activation = nn.ReLU()
#         elif(activation_fn == 'LeakyReLU'):
#           self.activation = nn.LeakyReLU()
#         elif(activation_fn == 'SwishReLU'):
#           self.activation = nn.SiLU()
#         elif(activation_fn == 'z^2cos(z)'):
#           self.activation = zsqcosz()
#         elif(activation_fn == 'DSU'):
#           self.activation = SincSubActivation()
#         elif(activation_fn == 'GCU'):
#           self.activation = ZCosActivation()
#         else:
#           self.activation = SincActivation()
#         self.softmax = nn.Softmax(dim=1)

#     def forward(self, x):
#         #Dynamically pool the output from the Conv layers
#         for i,layer in enumerate(self.cnn_layers):
#           if(!isinstance(layer,nn.BatchNorm1d())):
#             x = self.activation(layer(x))
#           else:
#             x = layer(x)
#             x = self.activation(x)
#             if(i%2==0 & i!=0):
#               if(x.size()[2]>=2):
#                 x = self.pool1d(x)
#                 x = self.drop(x)

#         # Global Average Pooling
#         x = self.global_avg_pool(x)

#         # Flatten the output for fully connected layers
#         x = x.view(x.size(0), -1)
#         for layer in self.fc:
#           x = self.activation(layer(x))
#           x = self.drop(x)
#         x = self.softmax(self.fc3(x))

#         return x

In [None]:
from math import floor

class Simple1DCNNAblation(nn.Module):
    """1D CNN whose depth adapts to the number of input features.

    Conv1d layers (kernel 3, channels 16, 32, 48, ...) are stacked while the
    estimated feature-map length is still at least the kernel size. The conv
    output is global-average-pooled, so the classifier head always receives one
    value per channel of the last convolution.

    Args:
        input_dim (int): Number of features per sample (length of the 1D input).
        activation_fn (str): 'ReLU', 'LeakyReLU', 'SwishReLU', 'z^2cos(z)',
            'DSU' or 'GCU'. Any other value selects ``SincActivation``.
    """

    def __init__(self, input_dim, activation_fn):
        super(Simple1DCNNAblation, self).__init__()

        self.cnn_layers = nn.ModuleList()

        in_channels = 1
        out_channels = 16  # Start with a valid number of output channels
        kernel_size = 3
        stride = 1
        pool_size = 2

        current_dim = input_dim  # Estimated length of the feature map
        while current_dim >= kernel_size:
            self.cnn_layers.append(nn.Conv1d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride))

            # Update in_channels and out_channels for the next layer
            in_channels = out_channels
            out_channels += 16

            # Same estimate as before: floor((d - kernel_size) / pool_size).
            # It never exceeds the real length after conv + pool, so no
            # convolution is ever stacked on a map that is too short for it.
            current_dim = (current_dim - kernel_size) // pool_size

        # forward() applies global average pooling, so the flattened conv
        # output has one value per channel whatever length is left. (The old
        # in_channels * current_dim disagreed with forward() for most sizes.)
        self.final_cnn_output_size = in_channels

        # Fully connected layers: halve the width until it is at most 32.
        self.fc_layers = nn.ModuleList()
        fc_input_size = self.final_cnn_output_size
        while fc_input_size > 32:
            self.fc_layers.append(nn.Linear(fc_input_size, fc_input_size // 2))
            fc_input_size //= 2

        self.pool1d = nn.MaxPool1d(kernel_size=pool_size, stride=pool_size)
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)
        self.drop = nn.Dropout(0.2)
        self.fc3 = nn.Linear(fc_input_size, 16)
        self.fc4 = nn.Linear(16, 4)

        # Activation shared by all layers. Each name maps to its own module;
        # 'z^2cos(z)' and 'GCU' were previously both wired to SincSubActivation.
        if activation_fn == 'ReLU':
            self.activation = nn.ReLU()
        elif activation_fn == 'LeakyReLU':
            self.activation = nn.LeakyReLU()
        elif activation_fn == 'SwishReLU':
            self.activation = nn.SiLU()
        elif activation_fn == 'z^2cos(z)':
            self.activation = zsqcosz()
        elif activation_fn == 'DSU':
            self.activation = SincSubActivation()
        elif activation_fn == 'GCU':
            self.activation = ZCosActivation()
        else:
            self.activation = SincActivation()

        # NOTE(review): nn.CrossEntropyLoss expects raw logits; see its docs
        # before relying on these probabilities as the loss input.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Map a (batch, 1, input_dim) tensor to (batch, 4) class probabilities."""
        for conv in self.cnn_layers:
            x = self.activation(conv(x))  # Apply activation after convolution

            # Pool (and drop out) only while at least two positions remain.
            if x.size(2) >= 2:
                x = self.pool1d(x)
                x = self.drop(x)

        # Global Average Pooling: (batch, channels, length) -> (batch, channels, 1)
        x = self.global_avg_pool(x)

        # Flatten for fully connected layers
        x = x.view(x.size(0), -1)
        for layer in self.fc_layers:
            x = self.activation(layer(x))
            x = self.drop(x)

        x = self.activation(self.fc3(x))
        x = self.softmax(self.fc4(x))

        return x


In [None]:
# Build the network for this run: the fixed 19-feature CNN for the full data
# (feats == 'NA'), otherwise the adaptive CNN sized from the first batch.
if feats == 'NA':
  model = Simple1DCNN(activation_fn).to(device)
  summary_shape = (1, 19)
else:
  model = Simple1DCNNAblation(input_features, activation_fn).to(device)
  summary_shape = (1, input_features)
print(model)
print(summary(model, summary_shape))

In [None]:
def validate(model, test_loader, criterion):
    """Return the mean per-batch loss of ``model`` over ``test_loader``.

    The model is evaluated in eval mode without gradient tracking and is put
    back into the mode (train/eval) it was in when the function was called.
    Previously it was left in eval mode, which silently disabled dropout for
    every training epoch after the first validation.

    Args:
        model (nn.Module): Network to evaluate.
        test_loader: Sized iterable of ``(inputs, labels)`` batches.
        criterion (callable): Loss taking ``(outputs, labels)``.

    Returns:
        float: Sum of the batch losses divided by the number of batches.

    Raises:
        ValueError: If ``test_loader`` contains no batches.
    """
    num_batches = len(test_loader)
    if num_batches == 0:
        raise ValueError("test_loader has no batches to validate on")

    # Send each batch to wherever the model's parameters live (CPU for a
    # parameter-free model) instead of relying on a notebook-level global.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    was_training = model.training
    model.eval()
    total_loss = 0.0
    try:
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(model_device), labels.to(model_device)
                outputs = model(inputs)
                total_loss += criterion(outputs, labels).item()
    finally:
        # Restore the caller's mode even if a batch raises.
        model.train(was_training)

    avg_loss = total_loss / num_batches
    print(f'Validation Loss: {avg_loss:.4f}')
    return avg_loss

In [None]:
from sklearn.utils.class_weight import compute_class_weight

# Derive per-class loss weights from the label distribution
def get_class_weights(labels):
    """Return 'balanced' class weights for ``labels`` as a float32 tensor.

    One weight is produced per distinct value in ``labels``, ordered as
    ``np.unique`` orders them.
    """
    present_classes = np.unique(labels)
    balanced = compute_class_weight(class_weight='balanced', classes=present_classes, y=labels)
    return torch.tensor(balanced, dtype=torch.float)

In [None]:
# The class weights depend only on the training labels. Those are already in
# memory as y_train - the Series the training loader (shuffle=False) was built
# from - so they are read from there instead of iterating every batch of the
# DataLoader just to collect the labels again.
all_labels = y_train.to_numpy()

# Calculate class weights
class_weights = get_class_weights(all_labels)
print(f"Class weights: {class_weights}")

# Move weights to the same device as the model
class_weights = class_weights.to(device)

In [None]:
# Results sub-directory for the selected feature set.
if Ablation == '0':
  path_name = 'Non-Ablated_Data'
else:
  # 's' and 'si' have their own folders; any other answer is filed under textures.
  path_name = {'s': 'Spectral_Features', 'si': 'Spectral+Indices'}.get(feats, 'Spectral+Textures')

In [None]:
# Translate the raw '0'/'1' answer into the results sub-directory name.
# The already-translated name is accepted as well, so re-running this cell no
# longer flips 'Non-generalizability' into 'Generalizability'.
if generalizing in ('0', 'Non-generalizability'):
  generalizing = 'Non-generalizability'
else:
  generalizing = 'Generalizability'

In [None]:
# Training class distribution, shown again next to the class-weighted loss below.
y_train.value_counts()

In [None]:
# Define the loss function and optimizer.
# The loss is cross-entropy weighted per class by class_weights; the optimizer
# is Adam over all model parameters with a learning rate of 0.001.
# NOTE(review): nn.CrossEntropyLoss is documented to take unnormalized logits,
# while both model classes end with nn.Softmax - confirm this is intended.
criterion = nn.CrossEntropyLoss(weight = class_weights)  # Assume 4 classes for this example
optimizer = optim.Adam(model.parameters(), lr=0.001)


def train(model, train_loader, test_loader, criterion,optimizer,epochs=101,patience=5):
    """Train ``model``, validate after every epoch, then plot and save the curves.

    Relies on the notebook globals ``device``, ``validate``, ``activation_fn``,
    ``generalizing`` and ``path_name``.

    Args:
        model (nn.Module): Network to optimise.
        train_loader: Sized iterable of ``(inputs, labels)`` training batches.
        test_loader: Batches passed to ``validate`` after each epoch.
        criterion (callable): Loss taking ``(outputs, labels)``.
        optimizer (torch.optim.Optimizer): Optimizer over the model parameters.
        epochs (int): Number of passes over ``train_loader``.
        patience (int): Early-stopping patience. Currently unused because the
            early-stopping block below is disabled.

    Returns:
        dict: 'Training_Loss', 'Training_Accuracy' and 'Validation_Loss', each
        a list with one value per epoch.

    Raises:
        ValueError: If ``train_loader`` contains no batches.
    """
    if len(train_loader) == 0:
        raise ValueError("train_loader has no batches to train on")

    training_logs = {'Training_Loss':[],'Training_Accuracy':[],'Validation_Loss':[]}
    # Halve the learning rate when the validation loss has not improved for 3 epochs.
    # NOTE(review): `verbose` is deprecated in recent PyTorch releases - confirm
    # against the installed version before upgrading.
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)
    best_loss = float('inf')  # Only needed by the disabled early-stopping block
    patience_counter = 0  # Only needed by the disabled early-stopping block
    train_losses = []  # Mean batch loss per epoch
    train_accuracy = []  # Training accuracy (%) per epoch

    for epoch in range(epochs):
        # validate() evaluates the model in eval mode, so training mode is
        # (re)enabled at the start of EVERY epoch. Setting it once before the
        # loop meant dropout could stay disabled from the second epoch onward.
        model.train()

        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Clear the gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)

            # Compute the loss
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            # Get predictions and accumulate accuracy counts
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Accuracy and mean batch loss for this epoch
        accuracy = 100 * correct / total
        epoch_loss = running_loss / len(train_loader)
        train_losses.append(epoch_loss)
        train_accuracy.append(accuracy)
        training_logs['Training_Loss'].append(epoch_loss)
        training_logs['Training_Accuracy'].append(accuracy)

        print(f'Epoch [{epoch + 1}/{epochs}], Average Loss: {epoch_loss:.4f}, Average Accuracy {accuracy:.4f}')

        # Validate the model after each epoch
        val_loss = validate(model, test_loader, criterion)
        training_logs['Validation_Loss'].append(val_loss)
        scheduler.step(val_loss)  # Step the scheduler

        ### Early stopping is commented out for experimentation purposes.
        # if val_loss < best_loss:
        #     best_loss = val_loss
        #     patience_counter = 0  # Reset counter
        #     print(f"Validation loss improved to {best_loss:.4f}.")
        # else:
        #     patience_counter += 1
        #     print(f"No improvement in validation loss. Patience counter: {patience_counter}/{patience}")
        #     if patience_counter >= patience:
        #         print("Early stopping triggered.")
        #         break

    plt.figure(figsize=(12, 6))

    # Plot Training Loss
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Loss', color='blue', linewidth=2, marker='o')
    plt.title(f'Training Loss - Activation Function: {activation_fn}', fontsize=14)
    plt.xlabel('Epoch', fontsize=12)
    plt.ylabel('Loss', fontsize=12)
    plt.grid(False)  # Remove grid
    plt.legend(fontsize=12)

    # Plot Accuracy
    plt.subplot(1, 2, 2)
    plt.plot(train_accuracy, label='Accuracy', color='green', linewidth=2, marker='o')
    plt.title(f'Training Accuracy - Activation Function: {activation_fn}', fontsize=14)
    plt.xlabel('Epoch', fontsize=12)
    plt.ylabel('Accuracy (%)', fontsize=12)
    plt.grid(False)  # Remove grid
    plt.legend(fontsize=12)

    plt.tight_layout()

    # Save the figure
    os.makedirs(f'/content/drive/MyDrive/Results/CNN/{generalizing}/{path_name}/{activation_fn}', exist_ok=True)
    plt.savefig(f'/content/drive/MyDrive/Results/CNN/{generalizing}/{path_name}/{activation_fn}/training_plot.png', dpi=300)
    plt.show()
    return training_logs


In [None]:
# Make sure the results directory for this run exists before training starts.
# (The previous call created a *directory* literally named "demo.txt" inside it.)
os.makedirs(f'/content/drive/MyDrive/Results/CNN/{generalizing}/{path_name}/{activation_fn}', exist_ok=True)

In [None]:

from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix, cohen_kappa_score


def inference(model, test_loader, num_classes,name):
    """Evaluate ``model`` on ``test_loader``; plot, save and return the test metrics.

    Relies on the notebook globals ``device``, ``generalizing``, ``path_name``
    and ``activation_fn``. The confusion-matrix image and a text log are
    written into the results directory of the current run.

    Args:
        model (nn.Module): Trained network.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        num_classes (int): Currently unused; kept so existing calls keep working.
        name (str): Area name. In generalizability runs any name other than
            'Chicago' writes the log to ``Generalizetest_logs.txt``.

    Returns:
        pd.DataFrame: One row with accuracy, weighted precision/recall/F1,
        kappa, the confusion matrix (nested list) and one user's and
        producer's accuracy column per class.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    test_statistics = {}
    model.eval()  # Set model to evaluation mode
    correct = 0
    total = 0
    all_labels = []
    all_predictions = []

    with torch.no_grad():  # No need to track gradients during inference
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)

            # Predicted class = index of the largest output
            _, predicted = torch.max(outputs, 1)

            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("test_loader yielded no samples to evaluate")

    all_labels = np.array(all_labels)
    all_predictions = np.array(all_predictions)
    # The confusion matrix has one row/column per class occurring in the true
    # OR the predicted labels, so the tick labels use that same sorted union.
    # Taking the predicted classes alone mislabels the axes whenever the
    # model never predicts one of the classes.
    classes = np.unique(np.concatenate([all_labels, all_predictions]))
    print(classes)

    # Accuracy
    accuracy = 100 * correct / total
    print(f'Accuracy of the network on the test data: {accuracy:.2f}%')

    # Precision, recall and F1 are computed per class (one-vs-rest) and then
    # averaged with weights proportional to each class's number of samples.
    precision = precision_score(all_labels, all_predictions, average='weighted')
    recall = recall_score(all_labels, all_predictions, average='weighted')
    f1 = f1_score(all_labels, all_predictions, average='weighted')

    print(f'Precision: {precision:.2f}')
    print(f'Recall: {recall:.2f}')
    print(f'F1-Score: {f1:.2f}')

    # Confusion matrix: rows are true classes, columns are predicted classes
    cm = confusion_matrix(all_labels, all_predictions, labels=classes)
    print(f'Confusion Matrix:\n{cm}')

    # Plot the confusion matrix using seaborn
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=classes, yticklabels=classes)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    results_dir = f'/content/drive/MyDrive/Results/CNN/{generalizing}/{path_name}/{activation_fn}'
    os.makedirs(results_dir, exist_ok=True)
    # Every former branch saved to this same file name.
    plt.savefig(f'{results_dir}/confusion_matrix.png', dpi=300)
    plt.show()

    # With rows = true and columns = predicted:
    #   user's accuracy     = correct / everything PREDICTED as the class (column sum, precision)
    #   producer's accuracy = correct / everything that truly IS the class (row sum, recall)
    # The two were previously swapped. A class with an empty row or column
    # yields NaN; the warning for that division is suppressed.
    with np.errstate(divide='ignore', invalid='ignore'):
        user_accuracy = cm.diagonal() / cm.sum(axis=0)
        producer_accuracy = cm.diagonal() / cm.sum(axis=1)

    for cls, ua in zip(classes, user_accuracy):
        print(f"User's Accuracy for class {cls}: {ua:.2f}")
    for cls, pa in zip(classes, producer_accuracy):
        print(f"Producer's Accuracy for class {cls}: {pa:.2f}")

    # Kappa Score
    kappa = cohen_kappa_score(all_labels, all_predictions)
    print(f'Kappa Score: {kappa:.2f}')

    test_statistics["Test Accuracy"] = [accuracy]
    test_statistics["Precision"] = [precision]
    test_statistics["Recall"] = [recall]
    test_statistics["F1-Score"] = [f1]
    test_statistics["Kappa Score"] = [kappa]
    test_statistics["Confusion Matrix"] = [cm.tolist()]
    df = pd.DataFrame(test_statistics)
    user_accuracy_cols = {f'User Accuracy Class {cls}': [acc] for cls, acc in zip(classes, user_accuracy.tolist())}
    producer_accuracy_cols = {f'Producer Accuracy Class {cls}': [acc] for cls, acc in zip(classes, producer_accuracy.tolist())}

    # Combine all columns into a single one-row DataFrame
    df = pd.concat([df, pd.DataFrame({**user_accuracy_cols, **producer_accuracy_cols})], axis=1)

    # Values used only by the text log below
    test_statistics["User Accuracy"] = user_accuracy.tolist()
    test_statistics["Producer Accuracy"] = producer_accuracy.tolist()
    test_statistics["Confusion Matrix"] = cm

    if generalizing == 'Non-generalizability' or name == 'Chicago':
        log_name = 'test_logs.txt'
    else:
        log_name = 'Generalizetest_logs.txt'
    save_path = f'{results_dir}/{log_name}'

    with open(save_path, 'w') as file:
        for key, value in test_statistics.items():
            if isinstance(value, np.ndarray):
                # Confusion matrix: one tab-separated row per line. (The old
                # branch for this was an unreachable duplicate `elif`.)
                value = '\n' + '\n'.join('\t'.join(map(str, row)) for row in value)
            elif isinstance(value, list):
                value = ', '.join(map(str, value))
            file.write(f"{key}: {value}\n")

    return df


In [None]:
# @title Previous Inference CODE
# import numpy as np
# import torch
# import seaborn as sns
# import matplotlib.pyplot as plt
# from sklearn.metrics import (precision_score, recall_score, f1_score,
#                              confusion_matrix, cohen_kappa_score)

# def inference(model, test_loader, num_classes):
#     test_statistics = {}
#     model.eval()  # Set the model to evaluation mode
#     correct = 0
#     total = 0
#     all_labels = []
#     all_predictions = []

#     with torch.no_grad():  # No need to track gradients during inference
#         for inputs, labels in test_loader:
#             inputs, labels = inputs.to(device), labels.to(device)

#             # Get model outputs
#             outputs = model(inputs)

#             # Get predictions
#             _, predicted = torch.max(outputs, 1)

#             # Store predictions and true labels for further analysis
#             all_predictions.extend(predicted.cpu().numpy())
#             all_labels.extend(labels.cpu().numpy())

#             # Calculate accuracy
#             total += labels.size(0)
#             correct += (predicted == labels).sum().item()

#     # Convert to numpy arrays for metrics computation
#     all_labels = np.array(all_labels).ravel()  # Ensure it's 1D
#     all_predictions = np.array(all_predictions).ravel()  # Ensure it's 1D

#     # Check shapes
#     print("Shape of all_labels:", all_labels.shape)
#     print("Shape of all_predictions:", all_predictions.shape)

#     # Accuracy
#     accuracy = 100 * correct / total
#     print(f'Accuracy of the network on the test data: {accuracy:.2f}%')

#     # Precision, Recall, and F1-Score
#     precision = precision_score(all_labels, all_predictions, average='weighted')
#     recall = recall_score(all_labels, all_predictions, average='weighted')
#     f1 = f1_score(all_labels, all_predictions, average='weighted')

#     print(f'Precision: {precision:.2f}')
#     print(f'Recall: {recall:.2f}')
#     print(f'F1-Score: {f1:.2f}')

#     # Confusion Matrix
#     cm = confusion_matrix(all_labels, all_predictions)
#     print(f'Confusion Matrix:\n{cm}')

#     # Ensure cm is not empty
#     if cm.size == 0:
#         raise ValueError("Confusion matrix is empty. Check your predictions and labels.")

#     # Plot the confusion matrix using seaborn
#     plt.figure(figsize=(8, 6))
#     sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class2, yticklabels=class2)
#     plt.title('Confusion Matrix')
#     plt.xlabel('Predicted Label')
#     plt.ylabel('True Label')
#     plt.savefig(f'confusion_matrix.png', dpi=300)  # Save the plot
#     plt.show()

#     # User's Accuracy (Precision per class)
#     user_accuracy = cm.diagonal() / cm.sum(axis=1)
#     for i, ua in enumerate(user_accuracy):
#         print(f"User's Accuracy for class {i}: {ua:.2f}")

#     # Producer's Accuracy (Recall per class)
#     producer_accuracy = cm.diagonal() / cm.sum(axis=0)
#     for i, pa in enumerate(producer_accuracy):
#         print(f"Producer's Accuracy for class {i}: {pa:.2f}")

#     # Kappa Score
#     kappa = cohen_kappa_score(all_labels, all_predictions)
#     print(f'Kappa Score: {kappa:.2f}')

#     test_statistics["Test Accuracy"] = accuracy
#     test_statistics["Precision"] = precision
#     test_statistics["Recall"] = recall
#     test_statistics["F1-Score"] = f1
#     test_statistics["User Accuracy"] = user_accuracy
#     test_statistics["Producer Accuracy"] = producer_accuracy
#     test_statistics["Kappa Score"] = kappa
#     test_statistics["Confusion Matrix"] = cm.tolist()

#     return test_statistics


#TRAINING

In [None]:
# Train the model for 100 epochs.
# NOTE: train_dataset / test_dataset are the DataLoaders built earlier; the
# test loader doubles as the validation set inside train().

train_logs = train(model, train_dataset, test_dataset, criterion, optimizer,epochs=100)

In [None]:
# train_logs = pd.DataFrame(train_logs)
# train_logs.head()
# torch.save(model.state_dict(),  f'/content/drive/MyDrive/Results/CNN/{generalizing}/{path_name}/{activation_fn}/best_model.pt')

In [None]:
# model_path = f'/content/drive/MyDrive/Results/CNN/{generalizing}/{path_name}/{activation_fn}/best_model.pt'
# model = Simple1DCNN(activation_fn)
# model.load_state_dict(torch.load(model_path))
# model.to(device)

#INFERENCING

In [None]:
# Inferencing on the test loader. The area name is hard-coded to "Pune":
# inside inference() it only selects the log file name in generalizability
# runs. The third argument (4) is num_classes.
test_logs = inference(model, test_dataset,4,"Pune")

In [None]:
# Show the one-row metrics table returned by inference().
print(test_logs.head())

In [None]:
# @title
# Persist the per-epoch training log and the test metrics as CSV files in the
# results directory of this run.
train_logs = pd.DataFrame(train_logs)
train_logs.to_csv(f'/content/drive/MyDrive/Results/CNN/{generalizing}/{path_name}/{activation_fn}/train_logs.csv',index=False)

# if(generalizing=='0'):
# test_logs is already a DataFrame (returned by inference()), so the wrap
# below does not change its contents.
test_logs = pd.DataFrame(test_logs)
test_logs.to_csv(f'/content/drive/MyDrive/Results/CNN/{generalizing}/{path_name}/{activation_fn}/test_logs.csv',index=False)
# else:
#   test_logs = pd.DataFrame(test_logs)
#   test_logs.to_csv(f'/content/drive/MyDrive/Results/CNN/{generalizing}/{path_name}/{activation_fn}/test_logs_chicago_inference.csv',index=False)
