In [None]:
# Train a new model
import numpy as np
import torch
from backbone import backbone_model, criterions_and_optimizers, training_page
from scipy.special import softmax
from load_config import *

import torch
import torch.nn as nn
import torch.nn.functional as F

class SimpleMultiHeadCNN(nn.Module):
    def __init__(self, input_channels=1, num_classes=10, heads_num=3, conv_layers_config=[16, 32], fc_layers_config=[], dropout_prob=0.2):
        """
        A simple configurable multi-head CNN.

        Parameters:
        - input_channels: Number of channels in input images
        - num_classes: Number of output classes per head
        - heads_num: Number of classification heads
        - conv_layers_config: List of integers, number of filters per conv layer
        - fc_layers_config: List of integers, number of neurons per fully connected layer, NOT including linear classifier layer (last one)
        - dropout_prob: Dropout probability in FC layers
        """
        super(SimpleMultiHeadCNN, self).__init__()
        
        # Build convolutional backbone
        layers = []
        in_ch = input_channels
        for out_ch in conv_layers_config:
            layers.append(nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1))
            layers.append(nn.ReLU())
            layers.append(nn.MaxPool2d(2))
            in_ch = out_ch
        layers.append(nn.AdaptiveAvgPool2d((1,1)))
        self.features = nn.Sequential(*layers)
        
        # Flatten size after conv layers (assuming square input)
        self.flatten_size = in_ch  # placeholder, will compute dynamically in forward if needed
        
        # Build multi-heads
        self.classification_heads = nn.ModuleList()
        for _ in range(heads_num):
            head_layers = []
            prev_size = self.flatten_size
            for fc_size in fc_layers_config:
                head_layers.append(nn.Linear(prev_size, fc_size))
                head_layers.append(nn.ReLU())
                head_layers.append(nn.Dropout(dropout_prob))
                prev_size = fc_size
            head_layers.append(nn.Linear(prev_size, num_classes))
            self.classification_heads.append(nn.Sequential(*head_layers))
    
    def forward(self, x):
        # Convolutional backbone
        x = self.features(x)
        x = torch.flatten(x, 1)
        
        # Pass through each head
        outputs = []
        for head in self.classification_heads:
            outputs.append(head(x))
        return outputs

def main(current_config):
    best_val_loss = np.ones(config['general']['heads_num'])*np.inf
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    train_loader, val_loader, test_loader, cal_loader=load_data(current_config)
    model = ModifiedResNet50(current_config).to(device)
    optimizer = set_optimizer(model,current_config)
    criterion = set_criterion(current_config)
    if current_config['current_step']['training_phase'][0]=="retrain" or current_config['current_step']['training_phase'][0]=="eval":
        model,_=load_best_weights(model, optimizer, config['general']['save_path'])
    if not current_config['current_step']['training_phase'][0]=="eval":
        model,best_val_loss=training_loop(model,train_loader,val_loader,optimizer,criterion,device,best_val_loss,current_config)
    test_loss, test_accuracy,test_output,test_targets = evaluate_individual_heads(model, test_loader, criterion, device)
    cal_loss, cal_accuracy,cal_output,cal_targets = evaluate_individual_heads(model, cal_loader, criterion, device)
    print(f"Cal average accuracy:{np.array(cal_accuracy).mean()}")
    print(f"Test average accuracy:{np.array(test_accuracy).mean()}")
    print(f"Train sampels:{len(train_loader.dataset)}\nVal sampels:{len(val_loader.dataset)}\nTest sampels:{test_targets.shape[0]}\nCAL sampels:{cal_targets.shape[0]}")
    if config['general']['save_output'][0] and current_config['current_step']['training_phase'][0]=="eval":
        save_dir = config['general']['save_output'][1]
        save_name=config['general']['save_output'][2]
        np.save(f"{save_dir}/test_outputs_{config['general']['dataset_name']}_{save_name}.npy", softmax(test_output,axis=2))
        np.save(f"{save_dir}/cal_outputs_{config['general']['dataset_name']}_{save_name}.npy", softmax(cal_output,axis=2))
        np.save(f"{save_dir}/cal_target_{config['general']['dataset_name']}_{save_name}.npy", cal_targets)
        np.save(f"{save_dir}/test_target_{config['general']['dataset_name']}_{save_name}.npy", test_targets)
    print(f"Test Loss: {np.array(test_loss).mean()}, Test Accuracy: {np.array(test_accuracy).mean()}")


if __name__ == "__main__":
    config=load_config()
    current_config={}
    current_config['current_step']={}
    current_config['current_step']['batch_size']= config['general']['batch_size']
    current_config['current_step']['dropout_prob']= config['general']['dropout_prob']
    current_config['current_step']['num_layers']= config['general']['num_layers']
    current_config['current_step']['optimizer_name']=config['general']['optimizer_name']
    current_config['current_step']['weight_decay'] = config['general']['weight_decay']
    for step_index, s in enumerate(config['training_plan'].values()):
        current_config['current_step']['trained_heads'] = s['trained_heads']
        current_config['current_step']['training_phase'] = s['training_phase']
        current_config['current_step']['num_epochs'] = s['num_epochs']
        current_config['current_step']['criterion_name'] = s['criterion_name']
        current_config['current_step']['lr_features']= s['lr_features']
        current_config['current_step']['lr_heads'] = s['lr_heads']
        print(current_config['current_step'])
        main(current_config)

In [None]:
from utils import load_data, compute_scores, generate_Dcal_Dcells_sets, create_random_split
from multi_dim_cp import main_algo
from matplotlib import pyplot as plt
from matplotlib import colors
import numpy as np
from scipy.spatial import cKDTree
from matplotlib.patches import Patch
from utils import create_true_rest_sets
from tabulate import tabulate

np.random.seed(5)

config={
    'DATASET_NAME': 'PathMNIST_demo', # CIFAR100_demo, PathMNIST_demo
    
    'ALPHA': 0.2,
    
    'b': 1, # Always 1
    
    'N_HEADS': [1,2,4,7],
    
    'SCORING_METHOD': 'RAPS' # RAPS, SAPS , NAIVE , APS
    
}

cal_output,cal_target,test_output,test_target=load_data(config)
