In [1]:
import sys
import os
import numpy as np
import torch
import pandas as pd
import csv
from scipy.stats import binom
from sklearn.metrics import roc_auc_score
import torch.nn as nn

In [2]:
# custom imports
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "../..")))

from src.utils import load_config
import src.graphs_generation as graphs_gen

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  from .autonotebook import tqdm as notebook_tqdm


# Model definition

## Common functions

Functions adapted from:
`\HUPLACLIP-NNs\scripts\visualizations\degree_distribution.ipynb`

In [16]:
def p_correction(p_nodes, graph_size, clique_size):
    '''Returns the value of the corrected p-value in the graph with clique ("p_reduce" case) '''
    p_corrected = (
        p_nodes * graph_size * (graph_size - 1)
        - clique_size * (clique_size - 1)
    ) / ((graph_size - clique_size) * (graph_size + clique_size - 1))
    return p_corrected

# P(d|C=0)
def p_noclique(degree_arr, graph_size):
    ''' 
    For an array of degree values (degree_arr: ndarray of shape [N_graphs, 1, graph_size]), returns the probability that (in a graph WITHOUT the clique) a node has exactly that degree.    
    Returns: ndarray of probabilities, same shape as degree_arr
    '''
    return binom.pmf(degree_arr, 
                     graph_size-1, 
                     0.5    # "p_reduce" correction only acts on graph with clique
                     )

def expected_count_noclique(degree_arr, graph_size):
    '''
    Uses p_noclique to obtain the number of nodes that (in a graph WITHOUT the clique) are expected to have exactly that degree.
    '''
    # expected count = graph_size * probability_per_node
    return graph_size * p_noclique(degree_arr, graph_size)

# P(d|C=1)
def p_ingroup(degree_arr, graph_size, clique_size, p_corrected):
    ''' 
    For an array of degree values (degree_arr: ndarray of shape [N_graphs, 1, graph_size]), returns the probability that (in a graph WITH the clique) a node INSIDE the clique has exactly that degree.    
    Returns: ndarray of probabilities, same shape as degree_arr
    '''
    return binom.pmf(degree_arr - (clique_size-1),  # number of non-clique connections
                     graph_size - clique_size,      # number of possible non-clique nodes
                     p_corrected
                     )

def p_outgroup(degree_arr, graph_size, clique_size, p_corrected):
    ''' 
    For an array of degree values (degree_arr: ndarray of shape [N_graphs, 1, graph_size]), returns the probability that (in a graph WITH the clique) a node OUTSIDE the clique has exactly that degree.    
    Returns: ndarray of probabilities, same shape as degree_arr
    '''
    return binom.pmf(degree_arr, 
                     graph_size-1, 
                     p_corrected
                     )

def expected_count_clique(degree_arr, graph_size, clique_size, p_corrected):
    '''Combines p_outgroup and p_ingroup (single mixture) to obtain the number of nodes that (in a graph WITH the clique) are expected to have exactly that degree'''
    prob = clique_size/graph_size * p_ingroup(degree_arr, graph_size, clique_size, p_corrected) + (1-clique_size/graph_size) * p_outgroup(degree_arr, graph_size, clique_size, p_corrected)    
    # expected count = graph_size * probability_per_node
    prob = graph_size * prob
    return prob

## Architecture

"Ideal MLP" designed starting from this initial sketch (11/11/2025):

![Alt text](../../scripts/Ideal-MLP_performance/whiteboard_images/Ideal-MLP-sketch.jpg)

More complete diagram (17/11/2025):

![Alt text](../../scripts/Ideal-MLP_performance/whiteboard_images/Zoom-Meeting_2025-11-17_Ideal-MLP_architecture.png)

In [None]:
class Ideal_MLP(nn.Module):
    def __init__(self, graph_size:int, max_clique_prop:float = 0.7):
        super().__init__()
        self.N = graph_size
        # Precomputed variables:
        # - array of bin edges: 
        self.tau = torch.arange(0, self.N+1) #NOTE: possible improvement is extending this to actual bins spanning more than 1 single value
        print("Tau values: ", self.tau)
        # - array of clique size values:
        max_K = int(max_clique_prop * self.N)
        self.clique_size_values = torch.arange(1, max_K + 1)
        print("Clique size values: ", self.clique_size_values)
        # - expected histograms:
        self.expected_hist_no_clique = expected_count_noclique(self.tau, self.N) # dimension: len(self.tau) x 1        
        expected_hist_clique = []  # to be filled: list of tensors
        for K in self.clique_size_values:
            p_corrected = p_correction(0.5, self.N, K)
            expected_hist_K = expected_count_clique(self.tau, self.N, K, p_corrected)  # dimension: len(self.tau) x 1 (NOTE: these should sum to N)
            expected_hist_clique.append(expected_hist_K)
        self.expected_hist_clique = torch.stack(expected_hist_clique, dim=1)  # final dimension: len(self.tau) x clique_size
        
ideal_MLP = Ideal_MLP(100)        

# Processing blocks:
# - compute degree of each node (sum over rows/columns of AM)
# - compute for all k, for all tau:
#   - left/right blocks
#   - right - left
#   - store in matrix (containing 1 if k_i = tau_j), dim N x k
# - sum matrix over k -> how many nodes have degree = tau_j (raw count histogram)
# - for each tau_j:
#   - diff_clique_tau_j = abs(Raw count histogram - exp.histogram with clique(for all possible clique sizes))
# - diff_clique_tau_j -> compute mean
# - diff_noclique_tau_j = abs(Raw count histogram - exp.histogram without clique)

# OUT: soft output (softmax)

Tau values:  tensor([  0,   1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,
         14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,  27,
         28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,  39,  40,  41,
         42,  43,  44,  45,  46,  47,  48,  49,  50,  51,  52,  53,  54,  55,
         56,  57,  58,  59,  60,  61,  62,  63,  64,  65,  66,  67,  68,  69,
         70,  71,  72,  73,  74,  75,  76,  77,  78,  79,  80,  81,  82,  83,
         84,  85,  86,  87,  88,  89,  90,  91,  92,  93,  94,  95,  96,  97,
         98,  99, 100])
Clique size values:  tensor([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
        19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36,
        37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54,
        55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70])
(101,)
tensor(100.0000, dtype=torch.float64)
tensor(100.0000, dtype=torch.float64)


# Test function

In [None]:
# read configuration file:
config = load_config(
    os.path.join("Ideal-MLP_test_config.yml")
)  # CHANGE THIS TO PERFORM DIFFERENT EXPERIMENTS

# looping over the different graph sizes in the experiment:
for graph_size in config["graph_size_values"]:

    # Create empty dictionaries for storing testing results:
    fraction_correct_results = {}  # Fraction correct for each clique size
    metrics_results_list = []

    # Calculate max clique size for testing (proportion of graph size):
    if graph_size in [100, 150, 200]:
        max_clique_size_proportion_test = config["testing_parameters"]["max_clique_size_proportion_test"][0]
    elif graph_size in [300, 400, 480, 600]:
        max_clique_size_proportion_test = config["testing_parameters"]["max_clique_size_proportion_test"][1]
    elif graph_size in [800, 1000, 1200]:
        max_clique_size_proportion_test = config["testing_parameters"]["max_clique_size_proportion_test"][2]                        
    elif graph_size in [1500, 2000]:
        max_clique_size_proportion_test = config["testing_parameters"]["max_clique_size_proportion_test"][3]                            
    elif graph_size in [3000, 5000]:
        max_clique_size_proportion_test = config["testing_parameters"]["max_clique_size_proportion_test"][4]
    else:
        max_clique_size_proportion_test = config["testing_parameters"]["max_clique_size_proportion_test"][5]
    max_clique_size = int(
        max_clique_size_proportion_test * graph_size
    )

    # Calculate array of clique sizes for all test curriculum
    # NOTE: if max clique size is smaller than the the number of test levels, use max clique size as the number of test levels
    clique_sizes = np.linspace(
        max_clique_size,
        1,
        num=min(max_clique_size, config["testing_parameters"]["clique_testing_levels"]),
    ).astype(int)
    
    # Metrics initialization
    TP, FP, TN, FN = 0, 0, 0, 0  
    y_scores = []
    y_true = []    

    # Loop for decreasing clique sizes
    for current_clique_size in clique_sizes:

        # Initialize fraction correct list, updated at each test iteration
        fraction_correct_list = []

        # Loop for testing iterations:
        for test_iter in range(config["testing_parameters"]["test_iterations"]):

            # Generate clique size value of each graph in the current batch
            clique_size_array_test = graphs_gen.generate_batch_clique_sizes(
                np.array([current_clique_size]),
                config["testing_parameters"]["num_test"],
            )

            # Generate validation graphs
            test = graphs_gen.generate_batch(
                config["testing_parameters"]["num_test"],
                graph_size,
                clique_size_array_test,
                config["p_correction_type"],
                False,
            )
            
            # Perform prediction on test data
            soft_output = ideal_MLP(test[0]).squeeze()
            hard_output = (soft_output >= 0.5).int().cpu().numpy()  # converting to hard output (0/1)
            # print(hard_output.shape, test_labels.shape)   # DEBUGGING

            # Update global metrics for AUC-ROC
            y_scores.extend(soft_output.cpu().tolist())
            y_true.extend(test[1].cpu().tolist())
             
            # transforming hard_output and test_labels to torch tensors:
            hard_output = torch.tensor(hard_output, dtype=torch.float32)
            test_labels = torch.tensor(test[1], dtype=torch.float32)
            
            # Compute metrics
            TP += ((hard_output == 1) & (test_labels == 1)).sum().item()
            FP += ((hard_output == 1) & (test_labels == 0)).sum().item()
            TN += ((hard_output == 0) & (test_labels == 0)).sum().item()
            FN += ((hard_output == 0) & (test_labels == 1)).sum().item()

            # updating fraction correct list with the accuracy of the current test iteration:
            fraction_correct_list.append(
                (hard_output == test_labels).sum().item()
                / (1.0 * config["testing_parameters"]["num_test"])
            )
            
            # delete unused variables
            del test, hard_output, test_labels, clique_size_array_test, soft_output
            torch.cuda.empty_cache()

        # Updating dictionary after all test iterations for current clique size have been completed:
        fraction_correct_results[current_clique_size] = round(
            sum(fraction_correct_list) / len(fraction_correct_list), 2
        )

        # Printing the size of the clique just tested and the corresponding test accuracy:
        print(
            f"||| Completed testing for clique = {current_clique_size}. "
            f"Average fraction correct = {fraction_correct_results[current_clique_size]}"
        )
        print("|||===========================================================")

    # - notify completion of testing:
    print(f"| Finished testing Ideal MLP at N = {graph_size}.")

    # Computing metrics:
    precision = TP / (TP + FP + 1e-10)
    recall = TP / (TP + FN + 1e-10)
    F1 = 2 * (precision * recall) / (precision + recall + 1e-10)
    AUC_ROC = roc_auc_score(y_true, y_scores)
    num_params = sum(
        p.numel() for p in ideal_MLP.parameters()
    )  # storing total number of parameters
    metrics_results = {
        "TP": TP,
        "FP": FP,
        "TN": TN,
        "FN": FN,
        "precision": precision,
        "recall": recall,
        "F1": F1,
        "AUC_ROC": AUC_ROC,
        "total_params": num_params,
    }

    # Saving accuracy results in .csv file:
    # - defining file name and path:
    file_path = os.path.join(
        os.getcwd(), "results", f"Ideal-MLP_N{graph_size}_fraction_correct.csv"
    )
    # - saving the dictionary as a .csv file:
    with open(file_path, "w") as file:
        writer = csv.writer(file)
        writer.writerow(["clique size", "fraction correct"])  # Add column labels
        for key, value in fraction_correct_results.items():
            writer.writerow([key, value])
    # Saving metrics results in .csv file:
    # - defining file name and path:
    file_path = os.path.join(
        os.getcwd(), "results", f"Ideal-MLP_N{graph_size}_metrics.csv"
    )
    # - saving the dictionary as a .csv file:
    pd.DataFrame([metrics_results]).to_csv(file_path, index=False)

    print(f"- Ideal MLP Results saved successfully for N = {graph_size}.")