# Import Libraries

In [None]:
# This project was developed on Google Colab; mounting Google Drive is the
# setup needed to run it there.
from google.colab import drive
drive.mount('/content/drive/')

# Prefix that is prepended to the .mat file paths when they are loaded further
# down; the empty string means those paths are used exactly as written.
my_project_folder = ''

# **Set the file paths and notes**

<p>Use option = 0 if you want to do training and validation using 2 of the folds (keeping the 3rd fold for final training and testing).</p>

<p>Use option = 1 if you want to do final training (using 2 folds) and testing (with the 3rd fold).</p>

Be sure to update the file names: the 3-fold cross-validation in this project is performed manually on pre-split data so that the results are comparable with those of the other projects. See the example below.

Due to the nature of this team's model, early stopping is carried out by manual experimentation during validation and testing for the 3-fold CV. To help decide on the best number of training epochs, refer to the tables of generator and discriminator losses.


In [None]:
# Run mode: 0 = train + validate on two folds, 1 = final training on two folds
# + testing on the third (see the notes above).
# NOTE(review): `option` is not read anywhere in this cell - presumably later
# cells branch on it; confirm.
option = 1

# Paths - comment out the set you do not need.
# Training / validation files (the 3rd fold is left untouched). This is an
# example from the ClusterCV folder.
path_lr_data_tr = 'lr_split_AandB_training.csv'
path_hr_data_tr = 'hr_split_AandB_training.csv'
path_lr_data_valid = 'lr_split_AandB_validation.csv'
path_hr_data_valid = 'hr_split_AandB_validation.csv'

# Final training / testing files (folds A and B for training, C for testing).
# This is an example from the ClusterCV folder.
path_lr_data = 'lr_split_AandB_finaltraining.csv'
path_hr_data = 'hr_split_AandB_finaltraining.csv'
path_lr_data_test = 'lr_clusterC.csv'
path_hr_data_test = 'hr_clusterC.csv'

# File the evaluation metrics are saved to; the name records that cluster C is
# the split used for testing.
path_eval_matrics = '25-clusterCV-split3.csv'

In [None]:
!pip install scipy

In [None]:
# Standard scientific computing and data visualization libraries
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Machine learning and deep learning libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.model_selection import KFold

# Network analysis and graph libraries
import networkx as nx

# Statistical metrics and distance calculations
from sklearn.metrics import mean_squared_error, mean_absolute_error
from scipy.stats import pearsonr
from scipy.spatial.distance import jensenshannon

# Additional utilities
import random
import csv
import scipy.io
from scipy.io import loadmat
import tqdm

In [None]:
# Seed Python, NumPy and PyTorch with one shared value so runs are repeatable
random_seed = 42
random.seed(random_seed)
np.random.seed(random_seed)
torch.manual_seed(random_seed)

# Select the compute device once; GPU is preferred whenever CUDA is usable
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if device.type == "cuda":
    print("CUDA is available. Using GPU.")
    torch.cuda.manual_seed(random_seed)
    torch.cuda.manual_seed_all(random_seed)  # covers multi-GPU machines
    # Trade cuDNN autotuning for deterministic kernels
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
else:
    print("CUDA not available. Using CPU.")

# Preprocessing

## Matrix Vectorizer

In [None]:
class MatrixVectorizer:
    """
    Converts symmetric matrices to flat vectors and back.

    Both directions walk the matrix one column at a time (column-major order),
    so a vector produced by `vectorize` can be fed straight to `anti_vectorize`.
    """

    def __init__(self):
        """
        Create a MatrixVectorizer.

        No state is kept; both operations are static methods, so an instance is
        never required to use them.
        """
        pass

    @staticmethod
    def vectorize(matrix, include_diagonal=False):
        """
        Flatten the upper triangle of a matrix, column by column.

        For every column the entries strictly above the main diagonal are
        emitted from top to bottom. When `include_diagonal` is True the entry
        one row below the diagonal (row == col + 1) is emitted as well, right
        after that column's upper-triangle entries. Note that, despite the flag
        name, the main diagonal itself is never emitted.

        Parameters:
        - matrix (numpy.ndarray): Square matrix to flatten.
        - include_diagonal (bool, optional): Also emit the sub-diagonal entry of
          each column. Defaults to False.

        Returns:
        - numpy.ndarray: The collected entries as a 1-D array.
        """
        size = matrix.shape[0]
        collected = []

        for col in range(size):
            # Rows 0 .. col-1 are exactly the entries above the diagonal
            collected.extend(matrix[row, col] for row in range(col))

            # The only below-diagonal entry ever taken is the one at row col+1
            if include_diagonal and col + 1 < size:
                collected.append(matrix[col + 1, col])

        return np.array(collected)

    @staticmethod
    def anti_vectorize(vector, matrix_size, include_diagonal=False):
        """
        Rebuild a symmetric matrix from a vector produced by `vectorize`.

        Vector entries are consumed in the same column-by-column order that
        `vectorize` emits them, and every value is written to both (row, col)
        and (col, row). Positions that are never written stay zero.

        Parameters:
        - vector (numpy.ndarray): Flattened entries to place into the matrix.
        - matrix_size (int): Side length of the square matrix to build.
        - include_diagonal (bool, optional): Must match the flag used when the
          vector was created. Defaults to False.

        Returns:
        - numpy.ndarray: The reconstructed (matrix_size x matrix_size) matrix.
        """
        matrix = np.zeros((matrix_size, matrix_size))
        cursor = 0  # next unread position in `vector`

        for col in range(matrix_size):
            # Row order inside a column: upper triangle first, then (optionally)
            # the single sub-diagonal slot
            target_rows = list(range(col))
            if include_diagonal and col + 1 < matrix_size:
                target_rows.append(col + 1)

            for row in target_rows:
                value = vector[cursor]
                matrix[row, col] = value
                matrix[col, row] = value
                cursor += 1

        return matrix

## Turn .csv to .mat Files

In [None]:
# Modified from https://stackoverflow.com/questions/20818121/save-csv-to-mat-or-binary
def _read_csv_rows(path):
    """Return the data rows of a CSV file as lists of floats, skipping the header record."""
    # newline='' is what the csv module asks for when it is handed a file object
    with open(path, newline='') as f:
        reader = csv.reader(f)
        next(reader)  # the first record holds the column names
        return [[float(elem) for elem in row] for row in reader]


# One sample per row: LR training, LR test and HR training data
train = _read_csv_rows(path_lr_data)
test = _read_csv_rows(path_lr_data_test)
hr_train = _read_csv_rows(path_hr_data)

# Sample counts, reused later when the vectors are turned back into matrices
lentrain = len(train)
lentest = len(test)

train = np.array(train)
test = np.array(test)
hr_train = np.array(hr_train)

# Written to the current working directory
scipy.io.savemat('LR_data_160.mat', {'train':train,'test':test})
scipy.io.savemat('HR_data_268.mat', {'train':hr_train})

## Loading data from Mat files (run it!)

In [None]:
# Paths to the .mat files, relative to my_project_folder (the names match the
# files written by the CSV conversion step above)

lr_data_path = './LR_data_160.mat'
hr_data_path = './HR_data_268.mat'

# Load the data; loadmat returns a dict keyed by the variable names stored in
# each file
lr_data = loadmat(my_project_folder +lr_data_path)
hr_data = loadmat(my_project_folder +hr_data_path)

## Preprocessing (no need to run)

In [None]:
def _print_type_and_shape(label, array):
    """Print the type and shape of *array* under *label*, followed by a blank line."""
    print(f"{label} Data Type:", type(array))
    print(f"{label} Data Shape:", array.shape)
    print()


# Names of the variables stored in each .mat file
print("LR Data Keys:", lr_data.keys())
print("HR Data Keys:", hr_data.keys())
print()

# Type and shape per split (the LR test split is printed under the same "LR"
# label as the LR training split)
_print_type_and_shape("LR", lr_data['train'])
_print_type_and_shape("LR", lr_data['test'])
_print_type_and_shape("HR", hr_data['train'])

# First five rows of each training array; change the slice to print more or less
print("Sample from LR training Data:\n", lr_data['train'][:5])
print()
print("Sample from HR training Data:\n", hr_data['train'][:5])

In [None]:
# Exploration-only copies of the data. np.concatenate already returns a new
# array; the HR array is copied explicitly so that the clean-up below never
# modifies hr_data['train'], which later cells read directly. Without the copy,
# running this optional cell would clip the HR training data but not the LR data.
lr_array = np.concatenate((lr_data['train'], lr_data['test']), axis=0)
hr_array = hr_data['train'].copy()

# Replace all negative values with 0
lr_array[lr_array < 0] = 0
hr_array[hr_array < 0] = 0

# Replace any 'NaN' values with 0 (in place)
np.nan_to_num(lr_array, copy=False)
np.nan_to_num(hr_array, copy=False)

# Sanity check: nothing negative is left. (">= 0" rather than "== 0" so that
# data without any zero entry does not trip the check.)
assert np.min(lr_array) >= 0.
assert np.min(hr_array) >= 0.

In [None]:
# Summarise an array with a handful of descriptive statistics
def calculate_statistics(data):
    """
    Compute summary statistics over every element of `data`.

    Parameters:
    - data (array-like): Values to summarise.

    Returns:
    - dict: 'Mean', 'Median', 'Standard Deviation', 'Min' and 'Max', in that order.
    """
    reducers = (
        ('Mean', np.mean),
        ('Median', np.median),
        ('Standard Deviation', np.std),
        ('Min', np.min),
        ('Max', np.max),
    )
    return {name: reduce_fn(data) for name, reduce_fn in reducers}

# Summary statistics of the cleaned LR and HR arrays
lr_stats = calculate_statistics(lr_array)
hr_stats = calculate_statistics(hr_array)

# Side-by-side table (one column per dataset), rounded to four decimal places
# for readability
df_stats = pd.DataFrame({'LR Data': lr_stats, 'HR Data': hr_stats}).round(4)

df_stats

In [None]:
# Seaborn theme for the figure
sns.set_theme(style="whitegrid")

# Both distributions share one axis so they can be compared directly
plt.figure(figsize=(10, 6))

# 100 bins for fine detail; alpha keeps the overlapping bars readable
sns.histplot(lr_array.ravel(), bins=100, color='blue', alpha=0.5, label='LR Data')
sns.histplot(hr_array.ravel(), bins=100, color='red', alpha=0.5, label='HR Data')

# Axis labels, title and a legend telling LR from HR
plt.xlabel('Value')
plt.ylabel('Frequency')
plt.title('Combined Distribution of LR and HR Data')
plt.legend()

plt.show()

In [None]:
# Seaborn theme for the figure
sns.set_theme(style="whitegrid")

# Same comparison as before, but restricted to strictly positive entries
plt.figure(figsize=(10, 6))

# Boolean-mask indexing already yields a flat 1-D array of the kept values
lr_positive = lr_array[lr_array > 0]
hr_positive = hr_array[hr_array > 0]
sns.histplot(lr_positive, bins=100, color='blue', alpha=0.5, label='LR Data')
sns.histplot(hr_positive, bins=100, color='red', alpha=0.5, label='HR Data')

# Axis labels, title and a legend telling LR from HR
plt.xlabel('Value')
plt.ylabel('Frequency')
plt.title('Combined Distribution of LR and HR Data (Excluding Zeros)')
plt.legend()

# Display the plot
plt.show()

In [None]:
# Side-by-side heatmaps of the two 2-D data arrays
plt.figure(figsize=(20, 8))

# Heatmap of the LR array (the full array is drawn; slice it here to show a subset)
plt.subplot(1, 2, 1)
plt.imshow(lr_array, aspect='auto', cmap='viridis')  # aspect='auto' stretches the image to fill the panel
plt.colorbar()
plt.title('LR Data Heatmap')

# Heatmap of the HR array (the full array is drawn; slice it here to show a subset)
plt.subplot(1, 2, 2)
plt.imshow(hr_array, aspect='auto', cmap='viridis')  # aspect='auto' stretches the image to fill the panel
plt.colorbar()
plt.title('HR Data Heatmap')

plt.show()

In [None]:
# Choose which sample (row index) to visualise
i = 18

# Pull the vectorized LR and HR connectivity of that sample
lr_vector = lr_array[i, :]
hr_vector = hr_array[i, :]

# Rebuild the square matrices: 160 nodes for LR, 268 for HR, and no diagonal
# entries stored in the vectors
lr_matrix = MatrixVectorizer.anti_vectorize(lr_vector, 160, include_diagonal=False)
hr_matrix = MatrixVectorizer.anti_vectorize(hr_vector, 268, include_diagonal=False)

# Plot the heatmaps
plt.figure(figsize=(18, 8))

# LR Data Heatmap
plt.subplot(1, 2, 1)
sns.heatmap(lr_matrix, square=True, cmap='viridis')
plt.title('LR Data Heatmap (160x160)')

# HR Data Heatmap
plt.subplot(1, 2, 2)
sns.heatmap(hr_matrix, square=True, cmap='viridis')
plt.title('HR Data Heatmap (268x268)')

plt.show()

In [None]:
# Toy vector for demonstration: 6 values = the 4*3/2 entries above the diagonal
# of a 4x4 matrix
toy_vector = np.array([1, 2, 3, 4, 5, 6])  # This is a vectorized upper triangular part of a 4x4 matrix

# The matrix_size is 4 for our toy example
matrix_size = 4

# We call the anti_vectorize function with include_diagonal=False since we are not including the diagonal
toy_matrix = MatrixVectorizer.anti_vectorize(toy_vector, matrix_size, include_diagonal=False)

# Last expression of the cell, so the notebook displays the matrix
toy_matrix

In [None]:
# Round trip: vectorizing the reconstructed toy matrix yields the same six
# values, in the same order, that it was built from
vectorized_matrix = MatrixVectorizer.vectorize(toy_matrix, include_diagonal=False)

vectorized_matrix

## Anti-Vectorize the Data

In [None]:
# Anti vectorize the data: one square adjacency matrix per sample.
# X holds the LR training matrices, Y the HR training matrices and X_test the
# LR test matrices. The vectors are read from lr_data / hr_data directly.

X = np.zeros((lentrain,160,160))
Y = np.zeros((lentrain,268,268))
X_test=np.zeros((lentest,160,160))


# Training samples: the LR and HR vectors of sample i are rebuilt together
for i in range(lentrain):
  X[i] = MatrixVectorizer.anti_vectorize(lr_data['train'][i], 160, include_diagonal=False)
  Y[i] = MatrixVectorizer.anti_vectorize(hr_data['train'][i], 268, include_diagonal=False)

# Test samples: only LR data is available in the loaded files
for i in range(lentest):
  X_test[i]=MatrixVectorizer.anti_vectorize(lr_data['test'][i], 160, include_diagonal=False)


# Plot the heatmaps of one training pair (index 12) as a visual sanity check
plt.figure(figsize=(18, 8))

# LR Data Heatmap
plt.subplot(1, 2, 1)
sns.heatmap(X[12], square=True, cmap='viridis')
plt.title('LR Data Heatmap (160x160)')

# HR Data Heatmap
plt.subplot(1, 2, 2)
sns.heatmap(Y[12], square=True, cmap='viridis')
plt.title('HR Data Heatmap (268x268)')

plt.show()


# Plotting, Model Class Definitions, Training and Testing.

## Plotting and Writing Functions

In [None]:
def make_one_plot(mae, pcc, js_dis, avg_mae_pc, avg_mae_ec, avg_mae_bc,title):
    """
    Draw a single bar chart with the six evaluation metrics of one model run.

    Parameters:
    - mae (float): Mean Absolute Error between predicted and ground-truth matrices.
    - pcc (float): Pearson Correlation Coefficient of the vectorized matrices.
    - js_dis (float): Jensen-Shannon Distance of the vectorized matrices.
    - avg_mae_pc (float): Average MAE of the PageRank centrality.
    - avg_mae_ec (float): Average MAE of the eigenvector centrality.
    - avg_mae_bc (float): Average MAE of the betweenness centrality.
    - title (str): Title shown above the chart.

    Returns:
    - None: The chart is displayed with plt.show().
    """
    # One (label, value, colour) triple per bar, in display order
    bars = (
        ('MAE', mae, '#F46867'),
        ('PCC', pcc, '#70B163'),
        ('JSD', js_dis, '#6A67FF'),
        ('MAE (PC)', avg_mae_pc, '#F9C963'),
        ('MAE (EC)', avg_mae_ec, '#83FEFF'),
        ('MAE (BC)', avg_mae_bc, '#80FE5F'),
    )
    names = [name for name, _, _ in bars]
    heights = [value for _, value, _ in bars]
    palette = [colour for _, _, colour in bars]

    plt.figure(figsize=(6, 4))
    plt.title(title)
    plt.bar(x=names, height=heights, color=palette)
    plt.ylabel('Error')
    plt.show()

def plot_gan_loss(g_loss, d_loss, mae_loss, mae_loss_val=None):
    """
    Show the training curves of the GAN in a two-panel figure.

    The left panel holds the generator and discriminator losses; the right
    panel holds the MAE curve(s).

    Parameters:
    - g_loss (list): Generator loss per epoch.
    - d_loss (list): Discriminator loss per epoch.
    - mae_loss (list): Training MAE per epoch.
    - mae_loss_val (list, optional): Validation MAE per epoch. When given, it is
      drawn next to the training MAE; when None only the training MAE is drawn.

    Returns:
    - None: The figure is displayed with plt.show().
    """
    plt.figure(figsize=(12, 6))

    # Left panel: adversarial losses
    plt.subplot(1, 2, 1)
    plt.title("Generator and Discriminator Loss During Training")
    for curve, name in ((g_loss, "Generator"), (d_loss, "Discriminator")):
        plt.plot(curve, label=name)
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()

    # Right panel: MAE, with or without the validation curve
    plt.subplot(1, 2, 2)
    plt.title("Mean Absolute Error (MAE) Loss During Training")
    if mae_loss_val is None:
        plt.plot(mae_loss, label="MAE")
    else:
        plt.plot(mae_loss, label="Training MAE",color='blue')
        plt.plot(mae_loss_val, label="Validation MAE",color='purple')
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()

    plt.tight_layout()
    plt.show()

def calculate_metrics(pred_matrices, gt_matrices, print_losses=False, plot=False):
  """
  Calculate various metrics to evaluate the predictions.

  Topological metrics (MAE of three node centralities) are computed per sample
  and averaged; the element-wise metrics (MAE, PCC, JSD) are computed once over
  the concatenated vectorized matrices of all samples.

  Parameters:
  - pred_matrices (sequence of 2D arrays): Predicted adjacency matrices. A list
    of arrays and a 3D array are both accepted.
  - gt_matrices (sequence of 2D arrays): Ground truth adjacency matrices, in
    the same order as `pred_matrices`.
  - print_losses (bool): If True, print calculated metrics. Default is False.
  - plot (bool): If True, plot barplots of metrics. Default is False.

  Returns (in this order):
  - mae (float): Mean Absolute Error of the predicted matrices compared to ground truth.
  - pcc (float): Pearson Correlation Coefficient between the vectorized predicted and ground truth matrices.
  - avg_mae_bc (float): Average Mean Absolute Error for Betweenness Centrality.
  - avg_mae_ec (float): Average Mean Absolute Error for Eigenvector Centrality.
  - avg_mae_pc (float): Average Mean Absolute Error for PageRank Centrality.
  - js_dis (float): Jensen-Shannon Distance between vectorized predicted and ground truth matrices.
  """

  num_test_samples = len(pred_matrices)

  # Per-sample MAE of each centrality measure
  mae_bc = []
  mae_ec = []
  mae_pc = []

  # Each centrality function is paired with the list collecting its errors
  centrality_targets = (
      (nx.betweenness_centrality, mae_bc),
      (nx.eigenvector_centrality, mae_ec),
      (nx.pagerank, mae_pc),
  )

  for i in tqdm.tqdm(range(num_test_samples)):
    # Convert adjacency matrices to NetworkX graphs
    pred_graph = nx.from_numpy_array(pred_matrices[i], edge_attr="weight")
    gt_graph = nx.from_numpy_array(gt_matrices[i], edge_attr="weight")

    for centrality, errors in centrality_targets:
      # Both graphs are built the same way, so the node order of the two
      # centrality dictionaries lines up
      pred_values = list(centrality(pred_graph, weight="weight").values())
      gt_values = list(centrality(gt_graph, weight="weight").values())
      errors.append(mean_absolute_error(pred_values, gt_values))

  # Compute average MAEs
  avg_mae_bc = sum(mae_bc) / len(mae_bc)
  avg_mae_ec = sum(mae_ec) / len(mae_ec)
  avg_mae_pc = sum(mae_pc) / len(mae_pc)

  # Vectorize every matrix and join the vectors into one long 1-D array.
  # Samples are indexed through num_test_samples (not .shape) so that plain
  # Python lists of matrices work as well as 3D arrays.
  pred_1d = np.concatenate(
      [MatrixVectorizer.vectorize(pred_matrices[i]).flatten() for i in range(num_test_samples)])
  gt_1d = np.concatenate(
      [MatrixVectorizer.vectorize(gt_matrices[i]).flatten() for i in range(num_test_samples)])

  mae = mean_absolute_error(pred_1d, gt_1d)
  pcc = pearsonr(pred_1d, gt_1d)[0]
  js_dis = jensenshannon(pred_1d, gt_1d)

  if print_losses:
    print("MAE: ", mae)
    print("PCC: ", pcc)
    print("Jensen-Shannon Distance: ", js_dis)
    print("Average MAE betweenness centrality:", avg_mae_bc)
    print("Average MAE eigenvector centrality:", avg_mae_ec)
    print("Average MAE PageRank centrality:", avg_mae_pc)

  # Plot barplots if requested
  if plot:
    labels = ['MAE','PCC','JSD','MAE (PC)', 'MAE (EC)', 'MAE (BC)']
    data = [mae, pcc, js_dis, avg_mae_pc, avg_mae_ec, avg_mae_bc]
    colors = ['#F46867', '#70B163', '#6A67FF', '#F9C963', '#83FEFF', '#80FE5F']
    plt.figure(figsize=(6, 4))
    sns.barplot(x=labels, y=data, palette=colors)
    plt.ylabel('Error')
    plt.show()
  return mae, pcc, avg_mae_bc, avg_mae_ec, avg_mae_pc, js_dis

def make_all_plots(mae, pcc, avg_mae_bc, avg_mae_ec, avg_mae_pc, js_dis):
  """
  Draw a 4x2 grid of bar charts: one row per fold plus a final row of averages.

  In every row the left chart shows MAE / PCC / JSD and the right chart shows
  the three centrality MAEs. The last row adds error bars equal to the standard
  deviation of the first three entries of each metric list.

  Parameters (each a list with four entries - folds 1 to 3 followed by the average):
  - mae (list of floats): Mean Absolute Errors.
  - pcc (list of floats): Pearson Correlation Coefficients.
  - avg_mae_bc (list of floats): Average MAEs for Betweenness Centrality.
  - avg_mae_ec (list of floats): Average MAEs for Eigenvector Centrality.
  - avg_mae_pc (list of floats): Average MAEs for PageRank Centrality.
  - js_dis (list of floats): Jensen-Shannon Distances.

  Returns:
  - None: Displays the subplots.
  """
  fig, axes = plt.subplots(nrows=4, ncols=2, figsize=(10, 14))

  colors = ['#F46867', '#70B163', '#6A67FF', '#F9C963', '#83FEFF', '#80FE5F']
  labels = ['MAE','PCC','JSD','MAE (PC)', 'MAE (EC)', 'MAE (BC)']

  # Metric lists arranged in the same order as `labels`
  series = [mae, pcc, js_dis, avg_mae_pc, avg_mae_ec, avg_mae_bc]

  # Bar heights of each row (folds 1-3, then the averages) and the error bars
  # of the last row; gathered before any drawing takes place
  rows = [[metric[position] for metric in series] for position in range(4)]
  errors = [np.std(metric[0:3]) for metric in series]

  # Rows 0-2: one fold each
  for fold in range(3):
    heights = rows[fold]
    caption = f'Fold {fold + 1}'

    axes[fold, 0].bar(x=labels[:3], height=heights[:3], color=colors[:3])
    axes[fold, 0].set_title(caption)

    axes[fold, 1].bar(x=labels[3:], height=heights[3:], color=colors[3:])
    axes[fold, 1].set_title(caption)

  # Row 3: averages with standard-deviation error bars
  averages = rows[3]
  axes[3, 0].bar(x=labels[:3], height=averages[:3], yerr=errors[:3], color=colors[:3], capsize=5)
  axes[3, 0].set_title('Avg. Across Folds')

  axes[3, 1].bar(x=labels[3:], height=averages[3:], yerr=errors[3:], color=colors[3:], capsize=5)
  axes[3, 1].set_title('Avg. Across Folds')

  # Adjust layout for better spacing
  plt.tight_layout()

  # Show the plots
  plt.show()

def make_csv(data_to_print,filename):
  """
  Write the values of several arrays to a two-column CSV file.

  Parameters:
  - data_to_print (list): Arrays (or other iterables) whose elements are
    written out one after another, in order. The arrays are iterated as given;
    they are not flattened here.
  - filename (str): Path of the CSV file to create.

  Notes:
  - The file has the columns 'ID' and 'Predicted'.
  - 'ID' is a running counter that starts at 1 and continues across arrays.
  - 'Predicted' holds the corresponding element.
  """
  # Chain the elements of all arrays into one sequence
  values = [value for array in data_to_print for value in array]

  # 1-based running identifiers, one per value
  identifiers = list(range(1, len(values) + 1))

  frame = pd.DataFrame({'ID': identifiers, 'Predicted': values})

  # Save without the DataFrame index column
  frame.to_csv(filename, index=False)


## Model class definition

### Helpful operational functions such as: pad_HR_adj, normalize adjacency and weight variable glorot

In [None]:
def pad_HR_adj(label, split):
    """
    Zero-pad the last two dimensions of `label` and add an identity matrix.

    Parameters:
    - label (torch.Tensor): Tensor to pad; `split` zeros are added on each side
      of its last two dimensions.
    - split (int): Padding width.

    Returns:
    - torch.Tensor: The padded tensor plus an identity matrix whose size is the
      padded tensor's dimension 1, on the global `device`.
    """
    border = (split, split, split, split)
    padded = torch.nn.functional.pad(label, border, mode="constant").to(device)
    return padded + torch.eye(padded.shape[1]).to(device)

def unpad(data, split):
    """
    Strip `split` entries from every border of the matrix dimensions.

    Parameters:
    - data (torch.Tensor): A single 2-D matrix, or a tensor whose dimension 0 is
      kept whole and whose dimensions 1 and 2 are cropped.
    - split (int): Number of rows/columns removed from each side.

    Returns:
    - torch.Tensor: The cropped tensor.
    """
    is_single_matrix = data.dim() == 2
    first_axis = 0 if is_single_matrix else 1

    # The same window is cut from the two matrix axes
    row_window = slice(split, data.shape[first_axis] - split)
    col_window = slice(split, data.shape[first_axis + 1] - split)

    if is_single_matrix:
        return data[row_window, col_window]
    return data[:, row_window, col_window]

def normalize_adj_torch(mx):
    """
    Scale a batch of matrices by the inverse square root of their row sums.

    With D = diag(rowsum ** -0.5), the result is (mx @ D)^T @ D. Entries of D
    that would be infinite (row sum of zero) are replaced by 0.

    Parameters:
    - mx (torch.Tensor): Batched square matrices - assumes shape
      (batch, n, n); TODO confirm against callers.

    Returns:
    - torch.Tensor: The scaled matrices, same shape as `mx`.
    """
    degree = mx.sum(-1)
    inv_sqrt_degree = torch.pow(degree, -0.5).flatten(start_dim=1)
    inv_sqrt_degree[torch.isinf(inv_sqrt_degree)] = 0.

    # Turn every row of scaling factors into a diagonal matrix
    n = inv_sqrt_degree.shape[1]
    eye = torch.eye(n).to(device)
    scaling = inv_sqrt_degree.unsqueeze(-1).expand(*inv_sqrt_degree.shape, n) * eye

    scaled_once = torch.matmul(mx, scaling)
    return torch.matmul(torch.transpose(scaled_once, -2, -1), scaling)

def weight_variable_glorot(output_dim):
    """
    Draw a square Glorot/Xavier-uniform weight matrix.

    Parameters:
    - output_dim (int): Number of rows and columns.

    Returns:
    - numpy.ndarray: (output_dim, output_dim) array sampled uniformly from
      [-r, r) with r = sqrt(6 / (output_dim + output_dim)), using NumPy's
      global random state.
    """
    fan = output_dim  # fan-in and fan-out coincide for a square matrix
    bound = np.sqrt(6.0 / (fan + fan))
    return np.random.uniform(low=-bound, high=bound, size=(fan, fan))

def weight_variable_glorot_extended(input_dim,output_dim):
    """
    Draw a rectangular Glorot/Xavier-uniform weight matrix.

    Parameters:
    - input_dim (int): Number of rows.
    - output_dim (int): Number of columns.

    Returns:
    - numpy.ndarray: (input_dim, output_dim) array sampled uniformly from
      [-r, r) with r = sqrt(6 / (input_dim + output_dim)), using NumPy's
      global random state.
    """
    bound = np.sqrt(6.0 / (input_dim + output_dim))
    return np.random.uniform(low=-bound, high=bound, size=(input_dim, output_dim))

def augment(lr, hr, aug=None, noise=None, p=1.0):
    """
    Optionally perturb the LR and/or HR tensors with symmetric Gaussian noise.

    Parameters:
    - lr (torch.Tensor): Low-resolution batch.
    - hr (torch.Tensor): High-resolution batch.
    - aug (str or None): 'lr', 'hr' or 'both' selects what is perturbed; any
      other value (including None) returns the inputs untouched.
    - noise (float or None): Standard deviation of the noise.
    - p (float): Probability that the selected augmentation is applied.

    Returns:
    Tuple (lr, hr). Perturbed tensors are new tensors with negative entries
    set to 0; tensors that were not perturbed are returned as passed in.
    """

    def add_noise(arr, noise_std):
        # Average the sample with its transpose (dims 1 and 2) so that the
        # added noise is symmetric
        z = torch.normal(0, noise_std, arr.shape).to(device)
        z = (z + z.transpose(2, 1))/2
        return arr + z

    # Nothing selected -> nothing to do (and no random draw is consumed)
    if aug not in ('lr', 'hr', 'both'):
        return lr, hr

    # A single coin flip decides whether this call augments at all
    if not random.random() < p:
        return lr, hr

    if aug in ('lr', 'both'):
        lr = add_noise(lr, noise)
        lr[lr<0] = 0

    if aug in ('hr', 'both'):
        hr = add_noise(hr, noise)
        hr[hr<0] = 0

    return lr, hr

### GSR Layer and GraphConvolution definitions

In [None]:

class GSRLayer(nn.Module):
    """
    Graph super-resolution layer with a single learnable (hr_dim x hr_dim) matrix.

    The layer stacks two lr_dim identity matrices, so the matrix products in
    `forward` only line up when hr_dim == 2 * lr_dim.
    """

    def __init__(self, hr_dim):
        super().__init__()

        # Glorot-initialised square weight, stored as float32
        initial = torch.from_numpy(weight_variable_glorot(hr_dim)).type(torch.FloatTensor)
        self.weights = torch.nn.Parameter(data=initial, requires_grad=True)

    def forward(self, A, X):
        """
        Parameters:
        - A (torch.Tensor): Batched symmetric LR matrices - assumes shape
          (batch, lr_dim, lr_dim); TODO confirm.
        - X (torch.Tensor): Batched node features with lr_dim rows - TODO confirm
          the second feature dimension expected by callers.

        Returns:
        - adj (torch.Tensor): |W S U^T X| + I.
        - torch.Tensor: |sym(adj adj^T) + I|, where sym averages a matrix with
          its transpose.
        """
        with torch.autograd.set_detect_anomaly(True):
            lr_dim = A.shape[1]

            # torch.linalg.eigh replaces the deprecated torch.symeig; only the
            # eigenvectors are used
            _, eigenvectors = torch.linalg.eigh(A, UPLO='U')

            # U_lr = torch.abs(U_lr)
            eye_lr = torch.eye(lr_dim).type(torch.FloatTensor)
            stacked_eye = torch.cat((eye_lr, eye_lr), 0).to(device)

            projection = torch.matmul(
                torch.matmul(self.weights, stacked_eye),
                torch.transpose(eigenvectors, 1, 2))
            f_d = torch.abs(torch.matmul(projection, X))
            adj = f_d + torch.eye(f_d.shape[1]).to(device)

            gram = torch.matmul(adj, torch.transpose(adj, 1, 2))
            gram = (gram + gram.transpose(1, 2))/2
            gram = gram + torch.eye(gram.shape[1]).to(device)
        return adj, torch.abs(gram)

class HGSRLayer(nn.Module):
    """
    Two-stage graph super-resolution layer (lr_dim -> inter_dim -> hr_dim).

    Learnable matrices and their shapes:
    - weights_1: (inter_dim, lr_dim)
    - weights_2: (hr_dim, inter_dim), starts at zero
    - weights_3: (hr_dim, inter_dim)
    - weights_4: (inter_dim, hr_dim), starts at zero

    Only W_2 and W_4 are zero-initialised. Previously all four matrices were
    zeroed, which made both `abs()` inputs in `forward` exactly zero. Since the
    gradient of abs at 0 is 0, no parameter ever received a gradient and the
    layer produced a constant output.
    """

    def __init__(self, lr_dim, inter_dim, hr_dim):
        super(HGSRLayer, self).__init__()

        self.inter_dim = inter_dim

        # Drawn in the order W_1..W_4 so the NumPy random stream is consumed
        # exactly as before
        self.weights_1 = self._glorot_parameter(inter_dim, lr_dim)
        self.weights_2 = self._glorot_parameter(hr_dim, inter_dim)
        self.weights_3 = self._glorot_parameter(hr_dim, inter_dim)
        self.weights_4 = self._glorot_parameter(inter_dim, hr_dim)

        self.gc1 = GraphConvolution(
            inter_dim, inter_dim, 0, act=F.relu)
        self.gc2 = GraphConvolution(
            inter_dim, inter_dim, 0, act=F.relu)

        # Initialise W_2, W_4 with zeros: both sit on an additive branch, so
        # each stage starts from its other branch alone. W_1 and W_3 keep their
        # Glorot values so that gradients can flow.
        self.weights_2.data.fill_(0)
        self.weights_4.data.fill_(0)

    @staticmethod
    def _glorot_parameter(rows, cols):
        """Return a float32 (rows x cols) Glorot-uniform trainable parameter."""
        initial = torch.from_numpy(
            weight_variable_glorot_extended(rows, cols)).type(torch.FloatTensor)
        return torch.nn.Parameter(data=initial, requires_grad=True)

    def forward(self, A, X):
        """
        Parameters:
        - A: Not used by this layer; presumably kept so the signature matches
          GSRLayer - confirm.
        - X (torch.Tensor): assumes shape (batch, lr_dim, hr_dim), as implied by
          the matrix products below - TODO confirm.

        Returns:
        - adj (torch.Tensor): Second-stage |f_d| + I.
        - torch.Tensor: |sym(adj adj^T) + I|, where sym averages a matrix with
          its transpose.
        """
        with torch.autograd.set_detect_anomaly(True):
            # Stage 1: lr_dim -> inter_dim
            f_d = self.weights_1 @ (X @ self.weights_2 + X[:, :, :self.inter_dim])
            f_d = torch.abs(f_d)
            identity = torch.eye(f_d.shape[1], dtype=torch.float32).to(device)
            adj = f_d + identity

            X_I = torch.matmul(adj, adj.transpose(1, 2))
            X_I = (X_I + X_I.transpose(1, 2))/2
            identity = torch.eye(X_I.shape[1], dtype=torch.float32).to(device)
            X_I = X_I + identity

            # NOTE(review): both calls read the local X_I, so gc1's result is
            # overwritten and neither result feeds stage 2 (which also uses the
            # local X_I). Left unchanged because the intended wiring is unclear;
            # possibly `X_I = self.gc2(self.gc1(X_I, adj), adj)` - confirm.
            self.X_I = self.gc1(X_I, adj)
            self.X_I = self.gc2(X_I, adj)

            # Stage 2: inter_dim -> hr_dim
            f_d = self.weights_3 @ (X_I @ self.weights_4 + self.weights_1 @ X) # try W_5 in case it doesn't work
            f_d = torch.abs(f_d)
            identity = torch.eye(f_d.shape[1], dtype=torch.float32).to(device)
            adj = f_d + identity

            X = torch.matmul(adj, adj.transpose(1, 2))
            X = (X + X.transpose(1, 2))/2
            identity = torch.eye(X.shape[1], dtype=torch.float32).to(device)
            X = X + identity

        return adj, torch.abs(X)


class GraphConvolution(nn.Module):
    """
    Bias-free graph convolution: ``act(adj @ (dropout(input) @ weight))``.

    Follows the simple GCN layer of https://arxiv.org/abs/1609.02907.
    """

    def __init__(self, in_features, out_features, dropout, act=F.relu):
        super().__init__()
        self.in_features, self.out_features = in_features, out_features
        self.dropout = dropout
        self.act = act
        # Allocated uninitialised; reset_parameters() fills it right away.
        self.weight = torch.nn.Parameter(
            torch.empty(in_features, out_features, dtype=torch.float32))
        self.reset_parameters()

    def reset_parameters(self):
        """(Re-)initialise the weight matrix with Xavier/Glorot uniform values."""
        torch.nn.init.xavier_uniform_(self.weight)

    def forward(self, input, adj):
        """Apply dropout (training mode only), project the features, then aggregate with adj."""
        dropped = F.dropout(input, self.dropout, self.training)
        projected = torch.matmul(dropped, self.weight)
        return self.act(torch.matmul(adj, projected))





### GraphUnet, GCN definitions

In [None]:
# Ops

class GraphUnpool(nn.Module):
    """
    Scatter pooled node features back to their pre-pooling node positions.

    For every graph b in the batch, row ``idx[b, j]`` of the output receives
    ``X[b, j]``; all other rows are zero.
    """

    def __init__(self):
        super(GraphUnpool, self).__init__()

    def forward(self, A, X, idx):
        """
        Parameters:
        - A: adjacency of the un-pooled graph; only its first two sizes
          (batch, number of nodes) are read. It is returned unchanged.
        - X: pooled node features, indexed as (batch, kept nodes, features).
        - idx: integer tensor (batch, kept nodes) with the original position of
          every kept node.

        Returns:
        - (A, new_X) where new_X has shape (A.shape[0], A.shape[1], X.shape[2]).
        """
        new_X = torch.zeros(
            A.shape[0], A.shape[1], X.shape[2], dtype=X.dtype, device=X.device)
        # Column vector of batch indices so that it broadcasts against idx.
        batch = torch.arange(A.shape[0], device=X.device).unsqueeze(-1)
        # A single indexed assignment writes into new_X itself. The previous
        # chained form `new_X[:, idx][extract, extract] = X` assigned into the
        # temporary copy produced by the first advanced index, so new_X stayed
        # all zeros.
        new_X[batch, idx.to(device=X.device, dtype=torch.long)] = X
        return A, new_X

class GraphPool(nn.Module):
    """
    Top-k node pooling with a learned scalar score per node.

    Each node is scored by a linear projection of its features, squashed by
    ``sigmoid(score / 100)``. The ``int(k * num_nodes)`` best nodes of every
    graph are kept, their features are gated by the score, and the adjacency
    is restricted to the kept rows and columns.
    """

    def __init__(self, k, in_dim):
        super(GraphPool, self).__init__()
        self.k = k  # fraction of nodes to keep
        self.proj = nn.Linear(in_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, A, X):
        """
        Parameters:
        - A: batched adjacency, indexed as (batch, nodes, nodes).
        - X: batched node features, indexed as (batch, nodes, features).

        Returns:
        - A: adjacency restricted to the kept nodes, (batch, kept, kept).
        - new_X: gated features of the kept nodes, (batch, kept, features).
        - idx: positions of the kept nodes, (batch, kept), ordered by
          descending score as returned by torch.topk.
        """
        # Drop only the trailing singleton produced by the projection, so a
        # graph with a single node keeps its node axis.
        scores = self.proj(X).squeeze(-1)
        scores = self.sigmoid(scores / 100)
        num_nodes = A.shape[1]
        values, idx = torch.topk(scores, int(self.k * num_nodes))

        # Batched gather via broadcasting index tensors; this avoids the
        # (batch x batch x ...) intermediates of chained advanced indexing and
        # keeps every tensor on the device of the inputs.
        batch = torch.arange(A.shape[0], device=idx.device).unsqueeze(-1)
        new_X = X[batch, idx] * values.unsqueeze(-1)
        # result[b, i, j] = A[b, idx[b, i], idx[b, j]]
        A = A[batch.unsqueeze(-1), idx.unsqueeze(-1), idx.unsqueeze(1)]
        return A, new_X, idx


class GCN(nn.Module):
    """
    Graph convolution followed by a linear projection: ``proj(A @ drop(X))``.

    The dropout probability is fixed at 0, so ``drop`` currently passes its
    input through unchanged.
    """

    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.proj = nn.Linear(in_dim, out_dim)
        self.drop = nn.Dropout(p=0)

    def forward(self, A, X):
        """Aggregate the node features X over the adjacency A, then project them."""
        aggregated = torch.matmul(A, self.drop(X))
        return self.proj(aggregated)


class GraphUnet(nn.Module):
    """
    Graph U-Net: a stack of GCN + pooling levels, a bottom GCN, and a mirrored
    stack of unpooling + GCN levels with additive skip connections.

    The per-level layers are stored in ``nn.ModuleList`` containers. Plain
    Python lists are invisible to ``nn.Module``: layers kept in them are not
    returned by ``parameters()`` (so an optimiser built from it never updates
    them), are not moved by ``.to()``, and are missing from ``state_dict()``.

    Parameters:
    - ks: sequence of pooling ratios, one per level.
    - in_dim: feature size of the input X.
    - out_dim: feature size of the returned node embedding.
    - dim: hidden feature size used by every internal GCN.
    """

    def __init__(self, ks, in_dim, out_dim, dim):
        super(GraphUnet, self).__init__()
        self.ks = ks

        self.start_gcn = GCN(in_dim, dim).to(device)
        self.bottom_gcn = GCN(dim, dim).to(device)
        # end_gcn consumes the decoder output concatenated with the start_gcn output.
        self.end_gcn = GCN(2*dim, out_dim).to(device)
        self.down_gcns = nn.ModuleList()
        self.up_gcns = nn.ModuleList()
        self.pools = nn.ModuleList()
        self.unpools = nn.ModuleList()
        self.l_n = len(ks)
        # Layers are created in the same order as before so that seeded
        # initialisation draws the same random numbers.
        for i in range(self.l_n):
            self.down_gcns.append(GCN(dim, dim).to(device))
            self.up_gcns.append(GCN(dim, dim).to(device))
            self.pools.append(GraphPool(ks[i], dim).to(device))
            self.unpools.append(GraphUnpool().to(device))

    def forward(self, A, X):
        """
        Run the encoder/decoder.

        Returns:
        - X: output of end_gcn.
        - start_gcn_outs: output of the very first GCN (before any pooling).
        """
        adj_ms = []
        indices_list = []
        down_outs = []
        X = self.start_gcn(A, X)
        start_gcn_outs = X
        org_X = X

        # Encoder: remember adjacency, features and kept indices of every level.
        for i in range(self.l_n):
            X = self.down_gcns[i](A, X)
            adj_ms.append(A)
            down_outs.append(X)
            A, X, idx = self.pools[i](A, X)
            indices_list.append(idx)

        X = self.bottom_gcn(A, X)

        # Decoder: walk the stored levels from the deepest one back to the first.
        for i in range(self.l_n):
            up_idx = self.l_n - i - 1

            A, idx = adj_ms[up_idx].to(device), indices_list[up_idx].to(device)
            A, X = self.unpools[i](A, X, idx)
            X = self.up_gcns[i](A, X)
            X = X.add(down_outs[up_idx])  # skip connection from the encoder

        X = torch.cat([X, org_X], 2)
        X = self.end_gcn(A, X)

        return X, start_gcn_outs

### AGSRNet, dense and discriminator definition

In [None]:
class AGSRNet(nn.Module):
    """
    Generator network: Graph U-Net embedding -> HGSRLayer -> two graph convolutions.

    Parameters:
    - ks: pooling ratios forwarded to GraphUnet.
    - args: object providing lr_dim, inter_dim, hr_dim and hidden_dim.
    """

    def __init__(self, ks, args):
        super().__init__()

        self.lr_dim, self.inter_dim, self.hr_dim, self.hidden_dim = (
            args.lr_dim, args.inter_dim, args.hr_dim, args.hidden_dim)

        self.layer = HGSRLayer(self.lr_dim, self.inter_dim, self.hr_dim)

        self.net = GraphUnet(ks, self.lr_dim, self.hr_dim, self.hr_dim)
        self.gc1 = GraphConvolution(self.hr_dim, self.hidden_dim, 0, act=F.relu)
        self.gc2 = GraphConvolution(self.hidden_dim, self.hr_dim, 0, act=F.relu)

    def forward(self, lr, lr_dim, hr_dim):
        """
        Predict a high-resolution matrix from a batch of low-resolution ones.

        The ``lr_dim`` and ``hr_dim`` arguments are not read; the sizes stored
        on the instance are used instead. Intermediate results are kept as
        attributes (net_outs, start_gcn_outs, outputs, Z, hidden1, hidden2).

        Returns:
        - |z|: symmetrised second graph-convolution output plus the identity.
        - net_outs, start_gcn_outs: the two outputs of the Graph U-Net.
        - outputs: first output of the super-resolution layer.
        """
        with torch.autograd.set_detect_anomaly(True):
            # Identity node features; the normalised input acts as adjacency.
            node_features = torch.eye(self.lr_dim).type(torch.FloatTensor).to(device)
            norm_adj = normalize_adj_torch(lr).type(torch.FloatTensor).to(device)

            self.net_outs, self.start_gcn_outs = self.net(norm_adj, node_features)
            self.outputs, self.Z = self.layer(norm_adj, self.net_outs)

            self.hidden1 = self.gc1(self.Z, self.outputs)
            self.hidden2 = self.gc2(self.hidden1, self.outputs)

            # Symmetrise and add the identity on the diagonal.
            z = (self.hidden2 + self.hidden2.transpose(1, 2)) / 2
            z = z + torch.eye(z.shape[1]).to(device)

        return torch.abs(z), self.net_outs, self.start_gcn_outs, self.outputs


class Dense(nn.Module):
    """
    Bias-free linear layer ``x @ weights``.

    The (n1, n2) weight matrix is drawn from a normal distribution with mean
    ``args.mean_dense`` and standard deviation ``args.std_dense``.
    """

    def __init__(self, n1, n2, args):
        super().__init__()
        initial = torch.empty(n1, n2, dtype=torch.float32)
        self.weights = torch.nn.Parameter(initial, requires_grad=True)
        nn.init.normal_(self.weights, mean=args.mean_dense, std=args.std_dense)

    def forward(self, x):
        # NOTE(review): every call resets the *global* NumPy and PyTorch RNGs to
        # seed 1, which also affects any random draw made by the caller
        # afterwards. Kept as-is because results depend on it.
        np.random.seed(1)
        torch.manual_seed(1)

        return torch.matmul(x, self.weights)


class Discriminator(nn.Module):
    """
    Discriminator: two GCN layers followed by a three-layer MLP and a sigmoid.

    The input matrices are normalised and used as adjacency; the node features
    are identity matrices. The last Dense layer maps hr_dim features to one
    value per node.
    """

    def __init__(self, args):
        super().__init__()
        dim = args.hr_dim
        self.hr_dim = dim

        # Graph-convolutional part
        self.gcn_1 = GCN(dim, dim).to(device)
        self.relu_gcn_1 = nn.ReLU(inplace=False)
        self.gcn_2 = GCN(dim, dim).to(device)
        self.relu_gcn_2 = nn.ReLU(inplace=False)

        # MLP part
        self.dense_1 = Dense(dim, dim, args)
        self.relu_1 = nn.ReLU(inplace=False)
        self.dense_2 = Dense(dim, dim, args)
        self.relu_2 = nn.ReLU(inplace=False)
        self.dense_3 = Dense(dim, 1, args)

        self.sigmoid = nn.Sigmoid()

    def forward(self, inputs):
        """Score a batch of matrices; the result lies in [0, 1] with a last axis of size 1."""
        # NOTE(review): resets the global NumPy/PyTorch RNGs to seed 1 on every
        # call, which also affects random draws made by the caller afterwards.
        np.random.seed(1)
        torch.manual_seed(1)

        batch = inputs.shape[0]
        features = torch.eye(self.hr_dim).type(torch.FloatTensor).repeat(batch, 1, 1).to(device)
        adjacency = normalize_adj_torch(inputs).type(torch.FloatTensor).to(device)

        # GCN part
        hidden = self.relu_gcn_1(self.gcn_1(adjacency, features))
        hidden = self.relu_gcn_2(self.gcn_2(adjacency, hidden))

        # MLP part
        hidden = self.relu_1(self.dense_1(hidden))
        hidden = self.relu_2(self.dense_2(hidden))
        score = self.sigmoid(self.dense_3(hidden))

        return torch.abs(score)

def gaussian_noise_layer(input_layer, args):
    """
    Return a noisy, symmetric copy of a batch of square matrices.

    Gaussian noise with mean ``args.mean_gaussian`` and standard deviation
    ``args.std_gaussian`` is added, the absolute value is taken, the result is
    symmetrised over the last two axes and the identity is added.

    Parameters:
    - input_layer (torch.Tensor): batch of square matrices (batch, n, n). It is
      not modified.
    - args: object providing mean_gaussian and std_gaussian.

    Returns:
    - torch.Tensor with the shape, dtype and device of ``input_layer``.
    """
    noise = torch.empty_like(input_layer).normal_(
        mean=args.mean_gaussian, std=args.std_gaussian)
    z = torch.abs(input_layer + noise)
    z = (z + z.transpose(1, 2)) / 2

    # Build the identity next to the data instead of on the module-level
    # `device`, so the function also works for tensors living elsewhere.
    identity = torch.eye(z.shape[1], dtype=z.dtype, device=z.device)
    return z + identity

## Training and Testing Functions

In [None]:
def train(model, subjects_adj, subjects_labels, args, aug=None, batch_size=16, val_adj=None, val_labels=None):
    """
    Train a given PyTorch model on a dataset of low-resolution and high-resolution adjacency matrices.

    A new Discriminator is created on every call. Each mini-batch performs one
    discriminator update followed by one generator (model) update. Every 4th
    epoch the two mini-batches with the highest MAE are remembered and appended
    twice each to the batches of the following epochs.

    Parameters:
    - model (torch.nn.Module): The PyTorch model to be trained.
    - subjects_adj (numpy.ndarray): Low-resolution adjacency matrices stacked along the first
      axis (converted with torch.from_numpy).
    - subjects_labels (numpy.ndarray): Corresponding high-resolution adjacency matrices
      (converted with torch.from_numpy).
    - args (Namespace): Namespace containing training-related parameters.
    - aug (str or None): Specifies which array(s) to augment ('lr', 'hr', or 'both' for both).
    - batch_size (int): Training batch size
    - val_adj, val_labels: Optional validation data. When both are given, val_with_real is
      run after every epoch and its MAE is recorded.

    Returns:
    - train_losses_D (list): List of discriminator loss values for each epoch during training.
    - train_losses_G (list): List of generator loss values for each epoch during training.
    - mae_losses (list): List of Mean Absolute Error (MAE) values for each epoch during training.
    - mae_losses_validation (list): Validation MAE for each epoch; empty when no validation
      data was passed.
    """
    criterion = nn.MSELoss()
    bce_loss = nn.BCELoss()
    netD = Discriminator(args).to(device)
    optimizerG = optim.Adam(model.parameters(), lr=args.g_lr)
    optimizerD = optim.Adam(netD.parameters(), lr=args.d_lr)

    train_losses_G = []
    train_losses_D = []
    mae_losses = []
    mae_losses_validation=[]

    subjects_adj = torch.from_numpy(subjects_adj).type(torch.FloatTensor).to(device)
    subjects_labels = torch.from_numpy(subjects_labels).type(torch.FloatTensor).to(device)

    # Extra mini-batches (hard examples) appended to every epoch; refreshed every 4th epoch.
    lr_addition = tuple()
    hr_addition = tuple()

    for epoch in range(args.epochs):
      # (batch index, MAE) pairs, only filled on epochs where epoch % 4 == 0.
      epoch_performance = []
      # Track losses for plotting
      train_loss_D = 0
      train_loss_G = 0
      total_mae_loss = 0

      # Shuffle the subjects, split them into mini-batches (torch.split returns a tuple)
      # and append the remembered hard batches.
      perm = torch.randperm(subjects_adj.shape[0])
      lr_batches = torch.split(subjects_adj[perm], batch_size) + lr_addition
      hr_batches = torch.split(subjects_labels[perm], batch_size) + hr_addition
      counter=0
      with tqdm.tqdm(zip(lr_batches, hr_batches), unit="batch", total=len(lr_batches)) as tepoch:
        for i, (lr, hr) in enumerate(tepoch):
            # Data augmentation
            lr, hr = augment(lr, hr, aug, args.noise, args.p)

            optimizerD.zero_grad()
            optimizerG.zero_grad()

            # Set device of data
            lr = lr.to(device)
            hr = hr.to(device)

            model_outputs, net_outs, start_gcn_outs, layer_outs = model(
                lr, args.lr_dim, args.hr_dim)

            # Reconstruction loss plus a weighted consistency term between the two U-Net outputs.
            mse_loss = args.lmbda * criterion(net_outs, start_gcn_outs) + criterion(model_outputs, hr)

            # Loss for plots
            mae_loss = F.l1_loss(model_outputs, hr)

            if epoch % 4 == 0:
                epoch_performance.append((i, mae_loss.item()))


            # Process real data and fake data for discriminator
            # Note the naming: "real_data" is the detached generator output (target 1 below),
            # "fake_data" is the noisy ground truth (target 0 below).
            real_data = model_outputs.detach()
            fake_data = gaussian_noise_layer(hr, args)

            # Pass them through the discriminator
            d_real = netD(real_data).to(device)
            d_fake = netD(fake_data).to(device)

            # Calculate discriminator loss
            dc_loss_real = bce_loss(d_real, torch.ones(d_real.shape[0], args.hr_dim, 1).to(device))
            dc_loss_fake = bce_loss(d_fake, torch.zeros(d_real.shape[0], args.hr_dim, 1).to(device))
            dc_loss = dc_loss_real + dc_loss_fake

            # Optimise the discriminator
            dc_loss.backward()
            optimizerD.step()

            # Add noise to the generated image and pass it through the generator
            # d_fake = netD(model_outputs).to(device)

            # their initial d_fake
            # NOTE(review): this discriminator input is built from hr only, so gen_loss
            # does not depend on the generator output and adds no gradient to `model`.
            d_fake = netD(gaussian_noise_layer(hr, args)).to(device)

            # Calculate the generator loss
            gen_loss = bce_loss(d_fake, torch.ones(d_fake.shape[0], args.hr_dim, 1).to(device))
            generator_loss = args.g_weight * gen_loss + mse_loss

            # just adding - as we said in meeting
            #generator_loss = -gen_loss + mse_loss

            # Optimise the generator
            # This backward pass also accumulates gradients in netD; they are cleared by
            # optimizerD.zero_grad() at the start of the next iteration.
            generator_loss.backward()
            optimizerG.step()

            # Logging info
            train_loss_D += dc_loss.item()
            train_loss_G += generator_loss.item()
            total_mae_loss += mae_loss.item()
            counter+=1

            # Logging
            if i % 50 == 0:
                tepoch.set_description(f"Epoch {epoch}")
                tepoch.set_postfix(Loss_D=dc_loss.item(), Loss_G=generator_loss.item(), MAE=mae_loss.item())


      if epoch % 4 == 0:
        # The indices recorded above are mini-batch positions in lr_batches/hr_batches,
        # so whole batches (not single subjects) are duplicated.
        worst_performing_samples = sorted(epoch_performance, key=lambda x: x[1], reverse=True)
        print("Worst-performing training samples last epoch (Index: MAE):")
        for index, mae in worst_performing_samples[:2]:  # Adjust the slice as needed
            print(f"Index {index}: MAE = {mae}")
        indices_to_duplicate = worst_performing_samples[:2]  # Example indices

        # Duplicate each chosen sample 2 additional times
        lr_addition = tuple(lr_batches[index] for index, _ in indices_to_duplicate for _ in range(2))
        hr_addition = tuple(hr_batches[index] for index, _ in indices_to_duplicate for _ in range(2))

      # train_losses_D.append(batch_size * train_loss_D / len(subjects_adj))
      # train_losses_G.append(batch_size * train_loss_G / len(subjects_adj))

      # mae_losses.append(batch_size * total_mae_loss / len(subjects_adj))
      # Per-epoch averages over the mini-batches processed.
      # NOTE(review): counter is 0 (division by zero) if there were no batches.
      train_losses_D.append(train_loss_D / counter)
      train_losses_G.append(train_loss_G / counter)
      mae_losses.append(total_mae_loss / counter)


      if ((val_adj is not None) and (val_labels is not None)):
        _,validation_total_mae_loss=val_with_real(model, val_adj, val_labels, args)
        mae_losses_validation.append(validation_total_mae_loss)

    return train_losses_D, train_losses_G, mae_losses, mae_losses_validation

### Validation and Testing

In [None]:
def val_with_real(model, val_adj, val_labels, args, to_csv=None):
    """
    Evaluate a PyTorch model on validation data and compute relevant metrics.

    Pairs in which the low-resolution or the high-resolution matrix is entirely
    zero are skipped. The diagonal of every target is set to 1 on a copy before
    the MAE is computed, so the caller's arrays are left untouched. The whole
    evaluation runs under ``torch.no_grad()``.

    Parameters:
    - model (torch.nn.Module): The PyTorch model to be evaluated.
    - val_adj (iterable of numpy.ndarray): Low-resolution adjacency matrices.
    - val_labels (iterable of numpy.ndarray): Corresponding high-resolution adjacency matrices.
    - args (Namespace): Namespace providing lr_dim and hr_dim.
    - to_csv (str or None): If provided, the path to save the vectorized predictions in CSV
      format. Default is None.

    Returns:
    - preds_list (numpy.ndarray): Predicted matrices, one per evaluated pair.
    - actual_error (float): Mean of the per-pair MAE values; NaN when no pair was evaluated.
    """
    val_error = []
    preds_list = []
    preds_vectorized_list = []

    # Inference only: without no_grad() every forward pass would build an
    # autograd graph that is never used.
    with torch.no_grad():
        for lr, hr in zip(val_adj, val_labels):
            if not np.any(lr) or not np.any(hr):
                continue

            lr = torch.from_numpy(lr).type(torch.FloatTensor).to(device)

            # Work on a copy: fill_diagonal writes in place and would otherwise
            # alter the caller's ground-truth matrices.
            hr = np.array(hr, copy=True)
            np.fill_diagonal(hr, 1)
            hr = torch.from_numpy(hr).type(torch.FloatTensor).to(device)

            # The model returns several tensors; only the prediction is needed.
            preds, *_ = model(lr[None, :, :], args.lr_dim, args.hr_dim)
            preds = preds[0]

            # post-processing
            preds[preds < 0] = 0

            preds_np = preds.cpu().detach().numpy()
            preds_list.append(preds_np)
            preds_vectorized_list.append(MatrixVectorizer.vectorize(preds_np))

            val_error.append(F.l1_loss(preds, hr).item())

    # np.mean([]) would emit a RuntimeWarning; report NaN explicitly instead.
    actual_error = np.mean(val_error) if val_error else float("nan")
    print("Validation error MAE: ",actual_error )

    preds_list = np.asarray(preds_list)

    if to_csv is not None:
        make_csv(preds_vectorized_list, to_csv)
    return preds_list, actual_error


def test(model, test_adj, args, to_csv):
    """
    Test a PyTorch model on a set of low-resolution adjacency matrices and generate predictions.

    Matrices that are entirely zero are skipped, so the output can contain
    fewer entries than ``test_adj``. Inference runs under ``torch.no_grad()``.

    Parameters:
    - model (torch.nn.Module): The PyTorch model to be tested.
    - test_adj (iterable of numpy.ndarray): Low-resolution adjacency matrices.
    - args (Namespace): Namespace providing lr_dim and hr_dim.
    - to_csv (str or None): If not None, the path to save the vectorized predictions in CSV format.

    Returns:
    - preds_list (numpy.ndarray): Predicted (non-vectorized) matrices, one per evaluated input.
    """
    preds_list = []
    preds_vectorized_list = []

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for lr in test_adj:
            if not np.any(lr):
                continue

            lr = torch.from_numpy(lr).type(torch.FloatTensor).to(device)

            # The model returns several tensors; only the prediction is needed.
            preds, *_ = model(lr[None, :, :], args.lr_dim, args.hr_dim)
            preds = preds[0]

            # post-processing
            preds[preds < 0] = 0

            preds_np = preds.cpu().detach().numpy()
            preds_list.append(preds_np)
            preds_vectorized_list.append(MatrixVectorizer.vectorize(preds_np))

    preds_list = np.asarray(preds_list)

    if to_csv is not None:
        make_csv(preds_vectorized_list, to_csv)

    return preds_list

# Argument class for the model

In [None]:
class Args:
    """
    Plain attribute bag holding the hyperparameters of the model and its
    training loop. Every instance starts from the same default values.
    """
    def __init__(self):
        defaults = {
            # training schedule and optimiser learning rates
            'epochs': 156,
            'd_lr': 0.0001,
            'g_lr': 0.0001,
            # loss weights
            'g_weight': 1.0,
            'lmbda': 0.1,
            # layer sizes
            'lr_dim': 160,
            'inter_dim': 240,
            'hr_dim': 268,
            'hidden_dim': 268,
            'padding': 26,
            # normal initialisation of the Dense weights
            'mean_dense': 0.0,
            'std_dense': 0.01,
            # noise applied by gaussian_noise_layer
            'mean_gaussian': 0.0,
            'std_gaussian': 0.1,
            # data-augmentation settings passed to augment()
            'noise': 0.005,
            'p': 0.70,
        }
        for name, value in defaults.items():
            setattr(self, name, value)


# Run the model

## 3-Fold Cross-Validation Definition

In [None]:
def _load_connectomes(csv_path, n_nodes, mat_path):
    """Read vectorized matrices from a CSV file (header skipped) and rebuild n_nodes x n_nodes arrays."""
    with open(csv_path) as f:
        next(f)  # skip the header row
        rows = np.array([[float(elem) for elem in row] for row in csv.reader(f)])

    # The .mat copy is still written because earlier versions produced it; the
    # data is no longer read back from disk (it used to be reloaded through a
    # path prefixed with my_project_folder, which need not be where it was saved).
    scipy.io.savemat(mat_path, {'train': rows})

    # Size the output from the file instead of assuming a fixed subject count.
    matrices = np.zeros((len(rows), n_nodes, n_nodes))
    for idx, vector in enumerate(rows):
        matrices[idx] = MatrixVectorizer.anti_vectorize(vector, n_nodes, include_diagonal=False)
    return matrices


def run_cross_validation(X, Y, n_splits=3, plot=True, batch_size=4, aug='lr', plot_for_val_and_train=False):
    """
    Train on the prepared training split and evaluate on the prepared validation split.

    The splits are read from the CSV files named by the module-level paths
    path_lr_data_tr, path_hr_data_tr, path_lr_data_valid and path_hr_data_valid.
    Any number of subjects per file is supported. The validation predictions are
    written to a CSV file.

    Parameters:
    - X, Y: Not used; the data comes from the CSV files named above.
    - n_splits (int): Not used; the folds are prepared manually.
    - plot (bool): Not used.
    - batch_size (int): Mini-batch size forwarded to train().
    - aug (str or None): Augmentation mode forwarded to train().
    - plot_for_val_and_train (bool): If True, the validation split is also passed to
      train() so that the validation MAE is tracked after every epoch.

    Returns:
    - None
    """
    # Define the parameters
    args = Args()
    ks = [0.9, 0.7, 0.6, 0.5]  # graphUnet params
    lr_nodes, hr_nodes = 160, 268  # matrix sizes stored in the CSV files

    X_train = _load_connectomes(path_lr_data_tr, lr_nodes, 'LR_data_X_160.mat')
    Y_train = _load_connectomes(path_hr_data_tr, hr_nodes, 'HR_data_Y_268.mat')
    X_val = _load_connectomes(path_lr_data_valid, lr_nodes, 'LR_data_Xval_160.mat')
    Y_val = _load_connectomes(path_hr_data_valid, hr_nodes, 'HR_data_Yval_268.mat')

    model = AGSRNet(ks, args).to(device)

    train(
        model, X_train, Y_train, args, batch_size=batch_size, aug=aug,
        val_adj=X_val if plot_for_val_and_train else None,
        val_labels=Y_val if plot_for_val_and_train else None,
    )

    # The file name used to be built from a loop index left over from loading
    # the 40 validation subjects, so it always came out as "..._40.csv". The
    # name is kept so existing consumers of the file keep working.
    val_with_real(model, X_val, Y_val, args, to_csv="predictions_fold_40.csv")



## Executing the 3-Fold Cross-Validation

In [None]:
# option == 0: train on the training split and report the validation error.
# NOTE(review): X and Y must be defined by an earlier cell for this call to
# evaluate, although run_cross_validation loads its data from the CSV paths.
if option == 0:
    run_cross_validation(X, Y, n_splits=3, plot=True, plot_for_val_and_train=True, batch_size=4, aug='lr')

### Model training with all the data and creating the submission file for Kaggle


In [None]:
# option == 1: final training followed by prediction on the held-out test data.
# X, Y and X_test are expected to come from earlier cells (not shown here).
if option == 1:
    # Define the parameters

    args = Args()
    ks = [0.9, 0.7, 0.6, 0.5] # graphUnet params

    model = AGSRNet(ks, args).to(device)
    print(model)

    # No validation data is passed, so the fourth return value (validation MAE) is discarded.
    train_losses_D, train_losses_G, mae_losses,_ = train(model, X, Y, args, batch_size=4, aug='lr')
    # Predictions are also written in vectorized form to output.csv.
    test_preds= test(model, X_test, args, "output.csv")


In [None]:
# Plot losses
# NOTE(review): train_losses_G, train_losses_D and mae_losses are only assigned at
# notebook level by the `option == 1` cell; with option == 0 this cell needs them
# to exist from somewhere else.
plot_gan_loss(train_losses_G, train_losses_D, mae_losses)

In [None]:
import numpy as np
import networkx as nx
from scipy.stats import pearsonr
from scipy.spatial.distance import jensenshannon
from sklearn.metrics import mean_absolute_error
from skimage.metrics import peak_signal_noise_ratio as psnr, structural_similarity as ssim
import community.community_louvain as community_louvain
import os
def calculate_centralities(adj_matrix):
    """
    Summarise the graph described by a square adjacency matrix.

    Returns a dict with:
    - 'pr', 'ec', 'bc': mean PageRank, eigenvector and betweenness centrality;
    - 'ns': per-node array of degree centrality multiplied by (number of nodes - 1);
    - 'pc': mean participation coefficient with respect to a Louvain partition;
    - 'acc': average (unweighted) clustering coefficient.

    Raises ValueError when the first two dimensions of the matrix differ.
    """
    n_rows, n_cols = adj_matrix.shape[0], adj_matrix.shape[1]
    if n_rows != n_cols:
        raise ValueError(f"Adjacency matrix is not square: shape={adj_matrix.shape}")
    print(f"Processing adjacency matrix of shape: {adj_matrix.shape}")

    graph = nx.from_numpy_array(adj_matrix)

    # Community structure first; the participation coefficient depends on it.
    communities = community_louvain.best_partition(graph)
    participation = participation_coefficient(graph, communities)

    pagerank = nx.pagerank(graph, alpha=0.9)
    eigenvector = nx.eigenvector_centrality_numpy(graph, max_iter=100)
    betweenness = nx.betweenness_centrality(graph, normalized=True, endpoints=False)
    degree_centrality = nx.degree_centrality(graph)
    node_strength = np.array(list(degree_centrality.values())) * (len(graph.nodes()) - 1)
    clustering = nx.average_clustering(graph, weight=None)

    def _mean_value(per_node):
        # Average a {node: value} mapping over all nodes.
        return np.mean(list(per_node.values()))

    summary = {}
    summary['pr'] = _mean_value(pagerank)
    summary['ec'] = _mean_value(eigenvector)
    summary['bc'] = _mean_value(betweenness)
    summary['ns'] = node_strength
    summary['pc'] = _mean_value(participation)
    summary['acc'] = clustering
    return summary

def participation_coefficient(G, partition):
    """
    Per-node score ``1 - (within_module_neighbours / degree) ** 2``.

    ``partition`` maps every node to its module label. Nodes of degree 0 get 0.0.
    Returns a dict keyed by node.
    """
    coefficients = {}

    for node in G.nodes():
        degree = G.degree(node)
        if degree == 0:
            # Isolated node: defined as 0.0 to avoid dividing by zero.
            coefficients[node] = 0.0
            continue

        own_module = partition[node]
        # Number of neighbours sharing the node's module.
        same_module = len([nbr for nbr in G[node] if partition[nbr] == own_module])
        coefficients[node] = 1 - (same_module / degree) ** 2

    return coefficients

def evaluate_all(true_hr_matrices, predicted_hr_matrices, output_path=path_eval_matrics):
    """
    Compare predicted and true matrices subject by subject and append the metrics to a CSV file.

    For every subject the MAE, Pearson correlation and Jensen-Shannon distance of the
    flattened matrices are computed, followed by the absolute error of each summary
    returned by calculate_centralities (NS, PR, EC, BC, PC, ACC).

    Parameters:
    - true_hr_matrices (numpy.ndarray): Ground truth, indexed as (subject, row, column).
    - predicted_hr_matrices (numpy.ndarray): Predictions, indexed the same way. It must
      contain at least as many subjects as true_hr_matrices.
    - output_path (str): CSV file the rows are appended to. The header is written only
      when the file does not exist yet.

    Returns:
    - pandas.DataFrame: One row per evaluated subject (empty when nothing was evaluated).
      Earlier versions returned None although callers bind the result.
    """
    print(true_hr_matrices.shape)
    print(predicted_hr_matrices.shape)

    num_subjects = true_hr_matrices.shape[0]
    results = []

    for i in range(num_subjects):
        true_matrix = true_hr_matrices[i, :, :]
        pred_matrix = predicted_hr_matrices[i, :, :]

        print(f"Evaluating subject {i+1} with matrix shapes: true={true_matrix.shape}, pred={pred_matrix.shape}")

        # Subjects with inconsistent or non-square matrices are reported and skipped.
        if true_matrix.shape != pred_matrix.shape or true_matrix.shape[0] != true_matrix.shape[1]:
            print(f"Error: Matrix shape mismatch or not square for subject {i+1}: true={true_matrix.shape}, pred={pred_matrix.shape}")
            continue

        true_flat = true_matrix.flatten()
        pred_flat = pred_matrix.flatten()
        metrics = {
            'ID': i + 1,
            'MAE': mean_absolute_error(true_flat, pred_flat),
            'PCC': pearsonr(true_flat, pred_flat)[0],
            'JSD': jensenshannon(true_flat, pred_flat),
        }

        true_metrics = calculate_centralities(true_matrix)
        pred_metrics = calculate_centralities(pred_matrix)

        # Each value is wrapped in a list so scalars and the per-node 'ns' array
        # are both accepted by mean_absolute_error.
        for key in ['NS', 'PR', 'EC', 'BC', 'PC', 'ACC']:
            metrics[f'MAE in {key}'] = mean_absolute_error([true_metrics[key.lower()]], [pred_metrics[key.lower()]])

        results.append(metrics)

    df = pd.DataFrame(results)
    if not df.empty:
        # Check if the file exists to decide whether to write headers
        file_exists = os.path.isfile(output_path)

        df.to_csv(output_path, mode='a', header=not file_exists, index=False)
        print(f"Results appended to {output_path}.")
    else:
        print("No data to save.")

    return df



In [None]:
# Read the high-resolution test data: one row of floats per CSV line.
Y_test = []
with open(path_hr_data_test) as f:
    next(f)  # skip the header row
    reader = csv.reader(f)
    for row in reader:
        rowData = [ float(elem) for elem in row ]
        Y_test.append(rowData)
Y_test = np.array(Y_test)

In [None]:
# Round-trip the test targets through a .mat file; the array is stored under the key 'train'.
# NOTE(review): the file is written relative to the working directory but read back
# via my_project_folder - the two only coincide while my_project_folder is ''.
scipy.io.savemat('HR_data_true_268.mat', {'train':Y_test})
hr_data_true_path = './HR_data_true_268.mat'
Y_t = loadmat(my_project_folder +hr_data_true_path)

In [None]:
# Rebuild one 268 x 268 matrix per test subject from its vectorized row.
# `lentest` is expected to be defined by an earlier cell (not shown here).
Y_true = np.zeros((lentest,268,268))
for i in range(lentest):
  Y_true[i] = MatrixVectorizer.anti_vectorize(Y_t['train'][i], 268, include_diagonal=False)

In [None]:
# Score the test predictions against the ground truth; evaluate_all appends the
# per-subject rows to its default output file (path_eval_matrics).
metrics = evaluate_all(
    Y_true, test_preds
)