### Importing Libraries
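Import the libraries required for data loading (SciPy, NumPy), model definition and training (PyTorch, snnTorch), and emissions tracking (CodeCarbon).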

In [None]:
import time
import scipy.io
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import snntorch as snn
from snntorch import utils
from snntorch import surrogate
from snntorch import functional as SF

from codecarbon import EmissionsTracker

## Dataset

In [None]:
class GraphDataset(Dataset):
    """Dataset of adjacency matrices stored in per-sample MATLAB ``.mat`` files.

    Every sample is read from the file
    ``FILE_PATH + "<user_id>_label<label>_item<j>_steps<num_steps>.mat"``
    (``j`` runs from 1 to ``num_samples`` for each label) and must expose its
    matrix under the ``'fullMatrix'`` key. Files that cannot be loaded are
    reported on stdout and skipped.

    Args:
        user_id: Identifier used as the file-name prefix.
        label_list: Raw labels to load; samples are labelled with the index
            of their raw label inside this list.
        num_samples: Number of items to try per label.
        num_steps: Number of time slices per sample. ``1`` means the stored
            matrix is 2-D; otherwise slices are taken along its last axis.

    Raises:
        RuntimeError: If no sample at all could be loaded.
    """

    # Prefix prepended verbatim to every file name (include the trailing
    # path separator when pointing at a directory).
    FILE_PATH = ''

    def __init__(self, user_id=None, label_list=None, num_samples=None, num_steps=None):
        self.user_id = user_id
        self.label_list = label_list
        self.num_samples = num_samples
        self.num_steps = num_steps

        self.data, self.labels = self.load_data(user_id, label_list, num_steps)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # ``self.data`` is already a float32 tensor; no re-wrapping needed.
        return self.data[idx], self.labels[idx]

    def load_data(self, user_id, label_list, num_steps):
        """Load all available samples and return ``(data, labels)`` tensors.

        ``data`` is a float32 tensor with one leading sample axis followed by
        ``num_steps`` slices per sample; ``labels`` is a long tensor holding,
        for each sample, the position of its raw label in ``self.label_list``.
        """
        data = []
        labels = []

        for label in label_list:
            for item in range(1, self.num_samples + 1):
                file_name = (
                    f"{self.FILE_PATH}{user_id}_label{label}"
                    f"_item{item}_steps{num_steps}.mat"
                )
                try:
                    mat_contents = scipy.io.loadmat(file_name)
                except Exception as e:
                    # Deliberately best-effort: a missing or unreadable
                    # sample is reported and skipped, not fatal.
                    print(f"Load fail: {e}")
                    continue

                full_matrix = mat_contents['fullMatrix']
                if num_steps == 1:
                    # Add a leading axis of length 1 in front of the 2-D matrix.
                    sample = full_matrix[np.newaxis, :, :]
                else:
                    # Move the time slices (last axis) to the front.
                    sample = np.array(
                        [full_matrix[:, :, k] for k in range(num_steps)]
                    )

                data.append(sample)
                labels.append(label)

        if not data or not labels:
            raise RuntimeError("No data or labels loaded.")

        data = torch.tensor(np.stack(data), dtype=torch.float32)

        # Map raw labels to contiguous class indices 0..len(label_list)-1.
        label_dict = {label: index for index, label in enumerate(self.label_list)}
        labels = torch.tensor([label_dict[label] for label in labels], dtype=torch.long)

        return data, labels

In [None]:
def split_dataset(full_dataset, batch_size, perc):
    """Randomly split a dataset and wrap both parts in shuffling loaders.

    Args:
        full_dataset: Dataset to split.
        batch_size: Batch size used by both loaders.
        perc: Fraction of the samples assigned to the training part; the
            training size is ``int(perc * len(full_dataset))`` and the rest
            goes to the test part.

    Returns:
        ``(train_loader, test_loader)``.
    """
    n_total = len(full_dataset)
    n_train = int(perc * n_total)
    subsets = torch.utils.data.random_split(
        full_dataset, [n_train, n_total - n_train]
    )

    # Both loaders shuffle, exactly one loader per subset (train first).
    return tuple(
        DataLoader(subset, batch_size=batch_size, shuffle=True)
        for subset in subsets
    )

## Neural Network
### CNN

In [None]:
class CNNMIAged(nn.Module):
    """Three-stage convolutional classifier with one output per label.

    Each stage is a 5x5 convolution (no padding) followed by batch
    normalisation and ELU; stages two and three additionally apply dropout
    and 2x2 average pooling. The final linear layer expects 16 features, so
    the convolutional stack has to reduce the spatial size to 1x1 (which a
    22x22 single-channel input does: 22 -> 18 -> 14 -> 7 -> 3 -> 1).

    Args:
        label_list: Sequence of class labels; only its length is used.
    """

    def __init__(self, label_list):
        super().__init__()
        n_classes = len(label_list)

        # Stage 1: convolution + normalisation only.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5)
        self.norm1 = nn.BatchNorm2d(num_features=6)

        # Stage 2: convolution, normalisation, dropout, pooling.
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        self.norm2 = nn.BatchNorm2d(num_features=16)
        self.dropout1 = nn.Dropout(p=0.5)
        self.pool1 = nn.AvgPool2d(kernel_size=2, stride=2)

        # Stage 3: same layout as stage 2.
        self.conv3 = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=5)
        self.norm3 = nn.BatchNorm2d(num_features=16)
        self.dropout2 = nn.Dropout(p=0.5)
        self.pool2 = nn.AvgPool2d(kernel_size=2, stride=2)

        # Classifier head.
        self.dropout3 = nn.Dropout(p=0.5)
        self.fc1 = nn.Linear(in_features=16, out_features=n_classes)

    def forward(self, x):
        """Return the raw class scores (logits) for a batch ``x``."""
        x = F.elu(self.norm1(self.conv1(x)))

        pooled_stages = (
            (self.conv2, self.norm2, self.dropout1, self.pool1),
            (self.conv3, self.norm3, self.dropout2, self.pool2),
        )
        for conv, norm, drop, pool in pooled_stages:
            x = pool(drop(F.elu(norm(conv(x)))))

        features = self.dropout3(x.flatten(start_dim=1))
        return self.fc1(features)

In [None]:
def training_network_CNN(net, num_epochs, train_loader, lr):
    """Train ``net`` with Adam and cross-entropy while tracking emissions.

    Args:
        net: Model to train; batches are moved to the device of its
            parameters before the forward pass.
        num_epochs: Number of passes over ``train_loader``.
        train_loader: Iterable yielding ``(inputs, targets)`` batches.
        lr: Adam learning rate.

    Returns:
        ``(minutes, emissions)`` where ``minutes`` is the wall-clock duration
        of the training loop in minutes and ``emissions`` is the value
        returned by ``EmissionsTracker.stop()``.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=lr, betas=(0.9, 0.999))

    # Use the model's own device instead of relying on a module-level global.
    net_device = next(net.parameters()).device

    # Make sure Dropout/BatchNorm are in training mode even if the model was
    # previously switched to eval mode.
    net.train()

    tracker = EmissionsTracker()
    tracker.start()
    start_time = time.time()
    try:
        for _ in range(num_epochs):
            for inputs, targets in train_loader:
                inputs = inputs.to(net_device)
                targets = targets.to(net_device)

                optimizer.zero_grad()

                outputs = net(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()

        # Measure before stopping the tracker so its shutdown work is not
        # counted as training time.
        elapsed_minutes = (time.time() - start_time) / 60
    finally:
        # Always stop the tracker, even when training raises.
        emissions = tracker.stop()

    return elapsed_minutes, emissions

In [None]:
def test_network_CNN(net, test_loader):
    """Return the classification accuracy of ``net`` on ``test_loader``.

    The model is evaluated in eval mode (Dropout disabled, BatchNorm using
    running statistics) and without gradient tracking; its previous
    train/eval mode is restored afterwards. Batches are moved to the device
    of the model's parameters.

    Args:
        net: Model producing one score per class for each sample.
        test_loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        Fraction of correctly classified samples.

    Raises:
        ZeroDivisionError: If ``test_loader`` yields no samples.
    """
    first_param = next(net.parameters(), None)
    net_device = first_param.device if first_param is not None else None

    correct = 0
    total = 0

    was_training = net.training
    net.eval()
    try:
        with torch.no_grad():
            for images, labels in test_loader:
                if net_device is not None:
                    images = images.to(net_device)
                    labels = labels.to(net_device)

                outputs = net(images)
                _, predictions = torch.max(outputs, 1)

                total += labels.size(0)
                correct += (predictions == labels).sum().item()
    finally:
        # Leave the model in the mode the caller had it in.
        net.train(was_training)

    return correct / total


# The name matches pytest's default ``test_*`` pattern; opt out of collection
# so importing this helper into a test module does not create a bogus test.
test_network_CNN.__test__ = False

### SCNN

In [None]:
# Surrogate gradient and decay rate shared by all leaky neurons of SCNN.
spike_grad = surrogate.fast_sigmoid(slope=25)
beta = 0.9


class SCNN(nn.Module):
    """Spiking counterpart of the CNN: each activation is a leaky neuron.

    Layout: three 5x5 convolutions (no padding), each followed by batch
    normalisation and an ``snn.Leaky`` layer; stages two and three add
    dropout and 2x2 average pooling. A final linear layer plus leaky neuron
    produces one output per label. The linear layer expects 16 features, so
    the convolutional stack must reduce the spatial size to 1x1 (a 22x22
    single-channel input does: 22 -> 18 -> 14 -> 7 -> 3 -> 1).

    Compatibility note: ``fc1`` used to take 144 inputs because the third
    stage's spikes were accidentally discarded; state dicts saved with that
    layout no longer match.

    Args:
        label_list: Sequence of class labels; only its length is used.
    """

    def __init__(self, label_list):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
        self.norm1 = nn.BatchNorm2d(6)
        self.lif1 = snn.Leaky(beta=beta, spike_grad=spike_grad)

        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        self.norm2 = nn.BatchNorm2d(16)
        self.lif2 = snn.Leaky(beta=beta, spike_grad=spike_grad)
        self.dropout1 = nn.Dropout(0.5)
        self.pool1 = nn.AvgPool2d(2, 2)

        self.conv3 = nn.Conv2d(16, 16, kernel_size=5)
        self.norm3 = nn.BatchNorm2d(16)
        self.lif3 = snn.Leaky(beta=beta, spike_grad=spike_grad)
        self.dropout2 = nn.Dropout(0.5)
        self.pool2 = nn.AvgPool2d(2, 2)

        self.dropout3 = nn.Dropout(0.5)

        # 16 channels x 1 x 1 once the third stage is actually used.
        self.fc1 = nn.Linear(16, len(label_list))
        self.lif4 = snn.Leaky(beta=beta, spike_grad=spike_grad)

    def forward(self, x):
        """Run one step and return ``(spk4, mem4)`` of the output layer.

        NOTE(review): membrane potentials are re-created through
        ``init_leaky()`` on every call, so no neuron state carries over
        between successive calls for consecutive time steps. Confirm whether
        that is intended.
        """
        # Initialize hidden states and outputs at t=0
        mem1 = self.lif1.init_leaky()
        mem2 = self.lif2.init_leaky()
        mem3 = self.lif3.init_leaky()
        mem4 = self.lif4.init_leaky()

        cur1 = self.norm1(self.conv1(x))
        spk1, mem1 = self.lif1(cur1, mem1)

        cur2 = self.norm2(self.conv2(spk1))
        spk2, mem2 = self.lif2(cur2, mem2)
        x = self.dropout1(spk2)
        x = self.pool1(x)

        cur3 = self.norm3(self.conv3(x))
        spk3, mem3 = self.lif3(cur3, mem3)
        # Feed the third stage's spikes forward (previously the stage-two
        # activations were reused and conv3/lif3 had no effect on the output).
        x = self.dropout2(spk3)
        x = self.pool2(x)

        x = torch.flatten(x, 1)

        x = self.dropout3(x)

        cur4 = self.fc1(x)
        spk4, mem4 = self.lif4(cur4, mem4)

        return spk4, mem4

In [None]:
def forward_pass(net, num_steps, data):
    """Present the same ``data`` to ``net`` for ``num_steps`` steps.

    Args:
        net: Spiking network returning ``(spikes, membrane)`` per call.
        num_steps: Number of times the network is evaluated.
        data: Input batch, passed unchanged at every step.

    Returns:
        ``(spk_rec, mem_rec)``: the per-step spikes and membrane potentials,
        each stacked along a new leading step axis.
    """
    utils.reset(net)  # resets hidden states for all LIF neurons in net

    step_outputs = [net(data) for _ in range(num_steps)]

    spikes = torch.stack([spk for spk, _ in step_outputs])
    potentials = torch.stack([mem for _, mem in step_outputs])
    return spikes, potentials

In [None]:
def training_network_SCNN(net, num_epochs, train_loader, lr, batch_size, num_steps):
    """Train the spiking network with Adam and a rate-coded cross-entropy loss.

    Args:
        net: Spiking model; batches are moved to the device of its parameters.
        num_epochs: Number of passes over ``train_loader``.
        train_loader: Iterable yielding ``(data, targets)`` batches.
        lr: Adam learning rate.
        batch_size: Unused; kept so existing callers keep working.
        num_steps: Number of steps simulated per batch by ``forward_pass``.

    Returns:
        ``(minutes, emissions)`` where ``minutes`` is the wall-clock duration
        of the training loop in minutes and ``emissions`` is the value
        returned by ``EmissionsTracker.stop()``.
    """
    loss_fn = SF.ce_rate_loss()
    optimizer = torch.optim.Adam(net.parameters(), lr=lr, betas=(0.9, 0.999))

    # Use the model's own device instead of relying on a module-level global.
    net_device = next(net.parameters()).device

    tracker = EmissionsTracker()
    tracker.start()
    start_time = time.time()
    try:
        net.train()
        for _ in range(num_epochs):
            for data, targets in train_loader:
                data = data.to(net_device)
                targets = targets.to(net_device)

                spk_rec, _ = forward_pass(net, num_steps, data)

                # Loss over the recorded output spikes of all steps
                loss_val = loss_fn(spk_rec, targets)

                # Gradient calculation + weight update
                optimizer.zero_grad()
                loss_val.backward()
                optimizer.step()

        # Measure before stopping the tracker so its shutdown work is not
        # counted as training time.
        elapsed_minutes = (time.time() - start_time) / 60
    finally:
        # Always stop the tracker, even when training raises.
        emissions = tracker.stop()

    return elapsed_minutes, emissions

In [None]:
def test_network_SCNN(net, test_loader, num_steps):
    """Return the rate-coded accuracy of the spiking ``net`` on ``test_loader``.

    The per-batch accuracy reported by ``SF.accuracy_rate`` is weighted by
    the batch size (second axis of the recorded spikes) so that the result
    is the accuracy over all samples.

    Args:
        net: Spiking model, switched to eval mode.
        test_loader: Iterable yielding ``(data, targets)`` batches.
        num_steps: Number of steps simulated per batch by ``forward_pass``.
    """
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        net.eval()

        for batch, batch_targets in test_loader:
            batch = batch.to(device)
            batch_targets = batch_targets.to(device)
            spikes, _ = forward_pass(net, num_steps, batch)

            samples_in_batch = spikes.size(1)
            n_correct += SF.accuracy_rate(spikes, batch_targets) * samples_in_batch
            n_seen += samples_in_batch

    return n_correct / n_seen

# Calculation of the accuracies

In [None]:
def user_printer(user_id):
    """Print a 50-column banner announcing the user that is being processed.

    Args:
        user_id: Identifier shown in the banner (centred between the
            ``#`` borders).
    """
    border = "#" * 50
    # Use the argument, not a loop variable that happens to be global.
    title = ("User " + str(user_id)).center(len(border) - 2)

    print("\n" * 2)
    print(border)
    print("#" + title + "#")
    print(border)
    print("\n")

In [None]:
# --- Experiment protocol ---------------------------------------------------
rept = 1          # random train/test repetitions per user
num_users = 9     # users are numbered 1..num_users

num_samples = 72          # items loaded per label
label_list = [769, 770]   # raw labels; mapped to class indices 0 and 1

# --- Optimisation ----------------------------------------------------------
lr = 1e-2
batch_size = 6
num_epochs = 200

# --- Dataloader arguments --------------------------------------------------
dtype = torch.float
# Prefer CUDA, then Apple MPS, and fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# --- Temporal dynamics -----------------------------------------------------
num_inputs = 22 * 22
num_steps = 1

In [None]:
# CNN vs. SCNN benchmark.
# For every user, each repetition draws a fresh random train/test split,
# trains a freshly initialised CNN and SCNN on the same split and records
# test accuracy, training duration and the emissions value.
# Each list below receives one entry per user; that entry is itself a list
# with one value per repetition.
CNN_outcomes = {
    'acc': [],
    'duration': [],
    'carbon': []
}

SCNN_outcomes = {
    'acc': [],
    'duration': [],
    'carbon': []
}

for i in range(1, num_users + 1):
    user_printer(i)
    
    # Per-repetition results of the current user
    user_acc_CNN = []
    user_duration_CNN = []
    user_carbon_CNN = []
    
    user_acc_SCNN = []
    user_duration_SCNN = []
    user_carbon_SCNN = []
    
    ## Load the dataset of a specific user
    full_dataset = GraphDataset(user_id=i, label_list=label_list,
                            num_samples=num_samples, num_steps=num_steps)
    
    for j in range(rept):
        # 75 % of the samples for training, the rest for testing
        train_loader, test_loader = split_dataset(full_dataset, batch_size, 0.75)
        
        ## Classification        
        # Load the network
        CNN_net = CNNMIAged(label_list).to(device)
        SCNN_net = SCNN(label_list).to(device)
        
        ### Training
        ## CNN
        duration, carbon = training_network_CNN(CNN_net, num_epochs, train_loader, lr)
        user_duration_CNN.append(duration)
        user_carbon_CNN.append(carbon)
        
        acc = test_network_CNN(CNN_net, test_loader)
        user_acc_CNN.append(acc)
        
        ## SCNN
        duration, carbon = training_network_SCNN(SCNN_net, num_epochs, train_loader, lr, batch_size, num_steps)
        user_duration_SCNN.append(duration)
        user_carbon_SCNN.append(carbon)
        
        acc = test_network_SCNN(SCNN_net, test_loader, num_steps)
        user_acc_SCNN.append(acc)
        
    # Store this user's repetitions as one row per metric
    CNN_outcomes['acc'].append(user_acc_CNN)
    CNN_outcomes['duration'].append(user_duration_CNN)
    CNN_outcomes['carbon'].append(user_carbon_CNN)
    
    SCNN_outcomes['acc'].append(user_acc_SCNN)
    SCNN_outcomes['duration'].append(user_duration_SCNN)
    SCNN_outcomes['carbon'].append(user_carbon_SCNN)

In [None]:
def calc_statistics(metrics):
    """Summarise a table of measurements row by row.

    Args:
        metrics: 2-D array-like with one row per user and one column per
            repetition.

    Returns:
        ``(mean, std)``: two 1-D arrays holding the mean and the population
        standard deviation of every row.
    """
    table = np.array(metrics)
    return table.mean(axis=1), table.std(axis=1)

In [None]:
# Per-user (mean, std) over the repetitions for the CNN, one line per metric.
for _metric in ('acc', 'duration', 'carbon'):
    print(calc_statistics(CNN_outcomes[_metric]))

In [None]:
# Per-user (mean, std) over the repetitions for the SCNN, one line per metric.
for _metric in ('acc', 'duration', 'carbon'):
    print(calc_statistics(SCNN_outcomes[_metric]))

### Time

In [None]:
# Number of simulation steps for the experiments that follow (replaces the
# single step used by the CNN vs. SCNN benchmark).
num_steps = 25

In [None]:
# SCNN evaluated over several simulation steps on static input.
# The dataset is loaded with num_steps=1 (one matrix per sample) while
# training and testing simulate the global num_steps steps.
# Each list receives one entry per user; that entry is a list with one value
# per repetition.
SCNN_time_outcomes = {
    'acc': [],
    'duration': [],
    'carbon': []
}

for i in range(1, num_users + 1):
    user_printer(i)
    
    # Per-repetition results of the current user
    user_acc_SCNN = []
    user_duration_SCNN = []
    user_carbon_SCNN = []
        
    ## Load the dataset of a specific user
    full_dataset = GraphDataset(user_id=i, label_list=label_list,
                            num_samples=num_samples, num_steps=1)
    
    for j in range(rept):
        # 75 % of the samples for training, the rest for testing
        train_loader, test_loader = split_dataset(full_dataset, batch_size, 0.75)
        
        ## Classification
        # Load the network
        SCNN_net = SCNN(label_list).to(device)
        
        ### Training
        ## SCNN
        duration, carbon = training_network_SCNN(SCNN_net, num_epochs, train_loader, lr, batch_size, num_steps)
        user_duration_SCNN.append(duration)
        user_carbon_SCNN.append(carbon)
        
        acc = test_network_SCNN(SCNN_net, test_loader, num_steps)
        user_acc_SCNN.append(acc)
        
    # Store this user's repetitions as one row per metric
    SCNN_time_outcomes['acc'].append(user_acc_SCNN)
    SCNN_time_outcomes['duration'].append(user_duration_SCNN)
    SCNN_time_outcomes['carbon'].append(user_carbon_SCNN)

In [None]:
# Per-user (mean, std) over the repetitions for the multi-step SCNN run.
for _metric in ('acc', 'duration', 'carbon'):
    print(calc_statistics(SCNN_time_outcomes[_metric]))

#### Time evolution 

In [None]:
def forward_pass(net, num_steps, data):
    """Feed ``net`` one time slice of ``data`` per step.

    This definition replaces the earlier ``forward_pass`` of the same name,
    which presented the whole input unchanged at every step.

    Args:
        net: Spiking network returning ``(spikes, membrane)`` per call.
        num_steps: Number of time slices to present.
        data: 4-D batch indexed as ``data[:, step, :, :]``, i.e. with the
            time steps on axis 1.

    Returns:
        ``(spk_rec, mem_rec)``: the per-step spikes and membrane potentials,
        each stacked along a new leading step axis.
    """
    utils.reset(net)  # resets hidden states for all LIF neurons in net

    # Each slice gets a singleton axis re-inserted at position 1 before it
    # is handed to the network.
    step_outputs = [
        net(data[:, t, :, :].unsqueeze(1)) for t in range(num_steps)
    ]

    spikes = torch.stack([spk for spk, _ in step_outputs])
    potentials = torch.stack([mem for _, mem in step_outputs])
    return spikes, potentials

In [None]:
# SCNN evaluated on time-resolved input.
# The dataset is loaded with the global num_steps, so every sample carries
# one matrix per step, and training/testing simulate the same number of steps.
# Each list receives one entry per user; that entry is a list with one value
# per repetition.
SCNN_evol_time_outcomes = {
    'acc': [],
    'duration': [],
    'carbon': []
}

for i in range(1, num_users + 1):
    user_printer(i)
    
    # Per-repetition results of the current user
    user_acc_SCNN = []
    user_duration_SCNN = []
    user_carbon_SCNN = []
    
    ## Load the dataset of a specific user
    full_dataset = GraphDataset(user_id=i, label_list=label_list,
                            num_samples=num_samples, num_steps=num_steps)
    
    for j in range(rept):
        # 75 % of the samples for training, the rest for testing
        train_loader, test_loader = split_dataset(full_dataset, batch_size, 0.75)
        
        ## Classification
        # Load the network
        SCNN_net = SCNN(label_list).to(device)
        
        ### Training
        ## SCNN
        duration, carbon = training_network_SCNN(SCNN_net, num_epochs, train_loader, lr, batch_size, num_steps)
        user_duration_SCNN.append(duration)
        user_carbon_SCNN.append(carbon)
        
        acc = test_network_SCNN(SCNN_net, test_loader, num_steps)
        user_acc_SCNN.append(acc)
        
    # Store this user's repetitions as one row per metric
    SCNN_evol_time_outcomes['acc'].append(user_acc_SCNN)
    SCNN_evol_time_outcomes['duration'].append(user_duration_SCNN)
    SCNN_evol_time_outcomes['carbon'].append(user_carbon_SCNN)

In [None]:
# Per-user (mean, std) over the repetitions for the time-evolution SCNN run.
for _metric in ('acc', 'duration', 'carbon'):
    print(calc_statistics(SCNN_evol_time_outcomes[_metric]))