In [1]:
# Environment sanity check: print the kernel's working directory, the entries
# in that directory, and the first entry of the module search path.
import os, sys
print("cwd:", os.getcwd())
print("files:", os.listdir())
print("sys.path[0]:", sys.path[0])

cwd: /Users/nickbossi/Documents/Oxford/UDL/Mini Project/Code
files: ['.DS_Store', 'uv.lock', 'project.ipynb', 'database.py', 'pyproject.toml', 'README.md', '.gitignore', 'acquisition_functions.py', '.venv', 'UDL_results', '.git', 'main.py', 'Backup', 'data', 'plots']
sys.path[0]: /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python39.zip


In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt
import itertools
import copy
import random
from IPython.display import Image
from database import create_database, save_database, load_database, update_database, print_database
from acquisition_functions import calc_entropy, calc_BALD, calc_var_rat, calc_Mean_STD, calc_uniform, get_TNC_preds

Plot saved successfully as acquisition_curves.png


In [4]:
! nvidia-smi 

zsh:1: command not found: nvidia-smi


In [5]:
# Setting seeds for reproducibility
def set_seeds(seed: int = 2025):
    """Seed every random number generator this notebook draws from.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    # One call is enough for PyTorch: the previous MPS-specific branch only
    # repeated this exact call, so it has been dropped (which also removes the
    # unguarded access to torch.backends.mps).
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)



In [6]:
# Check if Apple's Metal Performance Shaders (MPS) is available
# Choose the compute device in order of preference: Apple MPS, then CUDA,
# then CPU. CUDA availability is only queried when MPS is unavailable.
device = torch.device(
    "mps" if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available()
    else "cpu"
)
print({
    "mps": "Device: MPS (Apple Silicon GPU)",
    "cuda": "Device: CUDA (NVIDIA GPU)",
    "cpu": "Device: CPU",
}[device.type])

Device: MPS (Apple Silicon GPU)


In [7]:
class CNN(nn.Module):
    """Small convolutional classifier with two dropout layers.

    Layer stack: conv -> ReLU -> conv -> ReLU -> max-pool -> dropout(0.25)
    -> flatten -> linear -> ReLU -> dropout(0.5) -> linear.

    Args:
        num_filters: Output channels of both convolutions.
        hidden_dim: Width of the hidden fully connected layer.
        kernel_size: Kernel size shared by both convolutions.
        max_pool: Kernel size of the max-pooling layer.
        output_dim: Number of output logits.
        width: Input image width in pixels.
        height: Input image height in pixels.
    """

    def __init__(self,
                num_filters: int = 32,
                hidden_dim: int = 128,
                kernel_size: int = 4,
                max_pool: int = 2,
                output_dim:int=10,
                width: int = 28,
                height: int = 28
                ):
        super().__init__()

        # Each un-padded, stride-1 convolution trims (kernel_size - 1) pixels
        # per axis; the pooling layer then floor-divides by `max_pool`.
        pooled_width = (width - 2 * kernel_size + 2) // max_pool
        pooled_height = (height - 2 * kernel_size + 2) // max_pool
        flat_features = num_filters * pooled_width * pooled_height

        stack = [
            # Convolution 1
            nn.Conv2d(1, num_filters, kernel_size),
            nn.ReLU(),
            # Convolution 2
            nn.Conv2d(num_filters, num_filters, kernel_size),
            nn.ReLU(),
            # Max pooling
            nn.MaxPool2d(max_pool),
            # Dropout 1
            nn.Dropout(p=0.25),
            # Flatten and fully connected 1
            nn.Flatten(),
            nn.Linear(flat_features, hidden_dim),
            nn.ReLU(),
            # Dropout 2
            nn.Dropout(p=0.5),
            # Fully connected 2
            nn.Linear(hidden_dim, output_dim),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Return the raw logits produced by the layer stack for batch ``x``."""
        return self.layers(x)



In [8]:
def get_balanced_set(dataset, size: int, num_classes: int = 10):
    """Sample a class-balanced set of dataset indices without replacement.

    Draws ``size // num_classes`` indices for every label in
    ``range(num_classes)`` using NumPy's global random generator.

    Args:
        dataset: Object exposing a ``targets`` sequence of integer labels.
        size: Requested total number of indices; any remainder of
            ``size / num_classes`` is dropped.
        num_classes: Number of labels to balance over (labels ``0`` to
            ``num_classes - 1``).

    Returns:
        Integer NumPy array of indices, grouped by class in ascending label
        order. Empty (but still integer-typed) when ``size < num_classes``.

    Raises:
        ValueError: If a class has fewer examples than requested per class.
    """
    targets = np.array(dataset.targets)
    all_indices = np.arange(len(targets))

    samples_per_class = size // num_classes

    indices = []

    for c in range(num_classes):
        c_indices = all_indices[targets == c]
        if len(c_indices) < samples_per_class:
            raise ValueError(
                f"Class {c} has only {len(c_indices)} examples; "
                f"{samples_per_class} requested per class."
            )

        indices.extend(np.random.choice(c_indices, size=samples_per_class, replace=False))

    # An explicit dtype keeps an empty result usable for indexing.
    return np.array(indices, dtype=all_indices.dtype)

def get_dataset():
    """Return the MNIST training split stored under ``./data``.

    The split is downloaded if missing. Each image is converted to a tensor
    and normalised with mean 0.1307 and standard deviation 0.3081.
    """
    preprocessing = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])
    return datasets.MNIST('./data', train=True, download=True, transform=preprocessing)


def get_indices():
    """Split the MNIST training indices into train, validation and pool sets.

    Returns:
        Tuple ``(train_indices, validation_indices, pool_indices)``:
        20 class-balanced training indices, 100 validation indices drawn at
        random from what is left, and every remaining index as the pool.
        All draws use NumPy's global random generator.
    """
    dataset = get_dataset()

    # Initial training set: 20 random but class-balanced samples.
    train_indices = get_balanced_set(dataset, 20)

    # Everything not already used for training.
    every_index = np.arange(len(dataset))
    unassigned = np.setdiff1d(every_index, train_indices)

    # 100 validation points, used to tune the weight decay.
    validation_indices = np.random.choice(unassigned, size=100, replace=False)

    # Pool set: whatever is neither training nor validation.
    pool_indices = np.setdiff1d(unassigned, validation_indices)

    return train_indices, validation_indices, pool_indices


In [9]:
def train_model(model, train_indices, lr = 3e-4, weight_decay = 1e-6, n_epochs = 50):
    """Train ``model`` with cross-entropy on a subset of the MNIST training split.

    Args:
        model: Network returning class logits; trained in place on the
            notebook-level ``device``.
        train_indices: Indices into the training split to train on.
        lr: Adam learning rate.
        weight_decay: Adam weight-decay coefficient.
        n_epochs: Number of passes over the subset.

    Returns:
        The same ``model`` object, left in training mode.
    """
    labelled_subset = Subset(get_dataset(), train_indices)
    train_loader = DataLoader(labelled_subset, batch_size=128, shuffle=True)

    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = nn.CrossEntropyLoss()

    for _ in range(n_epochs):
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            loss = criterion(model(inputs), labels)
            loss.backward()
            optimizer.step()

    return model


In [10]:
def validation(n_epochs=50, model = None, train_indices = None):
    """Sweep weight-decay values and return the one with the lowest validation loss.

    For each of 40 weight-decay values ``exp(-i)``, ``i`` evenly spaced in
    [0, 5], a network is trained from the same starting point and scored on
    the validation indices returned by ``get_indices()``. The loss curve is
    plotted.

    Previously a single network was created once and then kept training
    across every weight-decay value, so later values were scored on an
    already-trained model. Each value now starts from a fresh ``CNN`` (or
    from a copy of the supplied ``model``).

    Args:
        n_epochs: Training epochs per weight-decay value.
        model: Optional starting network. It is deep-copied for every
            weight-decay value and is not modified. When ``None`` a new
            ``CNN`` is created per value.
        train_indices: Optional training indices. Defaults to the training
            split returned by ``get_indices()``.

    Returns:
        The weight-decay value with the smallest mean validation loss.
    """
    print("Performing validation.")
    train_dataset = get_dataset()

    # NOTE(review): get_indices() samples from NumPy's global generator, so
    # unless the caller seeds beforehand the validation indices may overlap
    # with caller-supplied `train_indices`.
    default_train_indices, validation_indices, _ = get_indices()
    if train_indices is None:
        train_indices = default_train_indices

    n_val = len(validation_indices)
    val_loader = DataLoader(Subset(train_dataset, validation_indices), batch_size=n_val, shuffle=False)
    sum_criterion = nn.CrossEntropyLoss(reduction="sum")

    val_losses = []
    val_accuracies = []  # collected for inspection; not returned
    lambdas = [np.exp(-i) for i in np.linspace(0,5,40)]

    for wd in lambdas:
        # Re-seed so every candidate shares initialisation and batch order.
        set_seeds(2025)
        candidate = CNN().to(device) if model is None else copy.deepcopy(model)
        candidate = train_model(model=candidate, train_indices=train_indices, weight_decay=wd, n_epochs=n_epochs)

        # NOTE(review): dropout stays active here, as in the original code, so
        # the score is a single stochastic forward pass. Confirm whether
        # candidate.eval() (or averaging several passes) is intended.
        candidate.train()

        correct = 0
        loss_sum = 0.0
        with torch.no_grad():  # scoring only; no gradients needed
            for data, target in val_loader:
                data, target = data.to(device).float(), target.to(device)
                output = candidate(data)

                _, predicted = torch.max(output, dim=1)
                correct += (target == predicted).sum().item()
                loss_sum += sum_criterion(output, target).item()

        # Exactly one entry per weight-decay value, normalised by the real
        # validation-set size instead of a hard-coded 100.
        val_accuracies.append(correct / n_val)
        val_losses.append(loss_sum / n_val)

    plt.plot(lambdas, val_losses)
    plt.xlabel("weight decay")
    plt.ylabel("validation loss")
    plt.show()

    opt_wd = lambdas[int(np.argmin(val_losses))]
    print("Validation complete!")

    return opt_wd


In [11]:
def test_model(model, deterministic: bool = False, T: int = 20):
    """Return the accuracy of ``model`` on the MNIST test split.

    Args:
        model: Classifier returning class logits.
        deterministic: When ``True`` the model is put in eval mode and a
            single forward pass is used. When ``False`` the model is put in
            training mode and predictions are the mean over the first axis of
            ``get_TNC_preds(data, model, T)``.
        T: Passed to ``get_TNC_preds`` in the non-deterministic case.

    Returns:
        Fraction of test examples classified correctly.

    Note:
        The model is left in whichever mode was selected here.
    """
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    test_dataset = datasets.MNIST('./data', train=False, download=True, transform=transform)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    correct = 0
    total = 0

    if deterministic:
        model.eval()
    else:
        # Keeps dropout active for the repeated forward passes.
        model.train()

    # Evaluation never back-propagates, so both branches now run without
    # autograd (previously the MC-dropout branch recorded graphs).
    with torch.no_grad():
        # Iterating the loader directly lets tqdm display the batch total.
        for data, target in tqdm.tqdm(test_loader):
            data, target = data.to(device).float(), target.to(device)

            if deterministic:
                output = model(data)
            else:
                TNC_preds = get_TNC_preds(data, model, T)
                # Mean over the first axis gives the MC estimate per example.
                output = torch.mean(TNC_preds, dim=0)

            _, predicted = torch.max(output, dim=1)
            correct += (target == predicted).sum().item()
            total += target.size(0)

    accuracy = correct/total
    return accuracy

In [12]:
def train_w_acquisition(base_model, 
                        train_dataset, 
                        train_indices, 
                        pool_indices, 
                        acq_fn_name, 
                        acq_fn, 
                        opt_wd,
                        T: int = 100, 
                        n_acq: int = 100, 
                        n_epochs: int =50,
                        acq_batch_size = 32,
                        run_num: int = 0,
                        deterministic: bool = False,
                        n_per_acq: int = 10):
    """Run one active-learning experiment for a single acquisition function.

    At every step the whole pool is scored with ``acq_fn``, the
    ``n_per_acq`` highest-scoring points move from the pool to the training
    set, a fresh ``CNN`` is trained from scratch on the enlarged set, and its
    test accuracy is stored through ``update_database``.

    Args:
        base_model: Model used to score the pool at the first step; it is
            deep-copied and not modified.
        train_dataset: Dataset that both index arrays refer to.
        train_indices: Initial training indices (copied, not modified).
        pool_indices: Initial pool indices (copied, not modified).
        acq_fn_name: Label written to the database and the log output.
        acq_fn: Callable applied to ``get_TNC_preds(...)`` that returns one
            score per pool example, as a tensor or array.
        opt_wd: Weight decay used when retraining.
        T: Passed to ``get_TNC_preds`` and ``test_model``.
        n_acq: Number of acquisition steps.
        n_epochs: Training epochs per retraining.
        acq_batch_size: Batch size used when scoring the pool.
        run_num: Run identifier written to the database.
        deterministic: Scores and evaluates in eval mode when ``True``,
            otherwise in training mode.
        n_per_acq: Number of points acquired per step.

    Returns:
        The model trained after the final acquisition step.
    """
    # Work on copies so every acquisition function starts from the same state.
    model = copy.deepcopy(base_model)              
    train_indices_copy = train_indices.copy()
    pool_indices_copy = pool_indices.copy()

    for i in range(n_acq):
        acq_step = (i+1)*n_per_acq

        uncertainty_scores = []

        # Pool data from which new points will be acquired.
        pool_data = DataLoader(Subset(train_dataset, pool_indices_copy), batch_size = acq_batch_size, shuffle = False)

        print(f"Calulating uncertainty scores using: {acq_fn_name}, acq_step: {acq_step}")
        if deterministic:
            model.eval()
        else:
            model.train()

        # Scoring is forward-only; disabling autograd avoids holding a graph
        # for every pool batch.
        with torch.no_grad():
            # Scores are computed batch by batch because of memory limits.
            for x_batch, _ in pool_data:
                x_batch = x_batch.to(device)

                acq_values = acq_fn(get_TNC_preds(x_batch, model, T, deterministic))

                # Move tensor scores to host memory straight away.
                if isinstance(acq_values, torch.Tensor):
                    scores = acq_values.detach().cpu().numpy()
                else:
                    scores = acq_values

                uncertainty_scores.append(scores)

                # Drop references early to help free memory.
                del x_batch, acq_values 

        # One flat array of scores aligned with pool_indices_copy.
        uncertainty_scores = np.concatenate(uncertainty_scores)

        # Positions within the pool of the highest-scoring points.
        acquired_pool_indices = np.argsort(uncertainty_scores)[::-1][:n_per_acq]

        # Map pool positions back to dataset indices and move them over.
        acquired_dataset_indices = pool_indices_copy[acquired_pool_indices]
        pool_indices_copy = np.setdiff1d(pool_indices_copy, acquired_dataset_indices)
        train_indices_copy = np.concatenate((train_indices_copy, acquired_dataset_indices))

        # Retrain from scratch on the enlarged training set.
        #set_seeds(2025+run_num)
        model = train_model(model = CNN().to(device), train_indices=train_indices_copy, lr = 1e-3, weight_decay = opt_wd, n_epochs = n_epochs)
        accuracy = test_model(model, deterministic, T = T)

        data = [deterministic, run_num, acq_fn_name, acq_step, accuracy]
        update_database(data)

        print(f"Accuracy for acq_fn {acq_fn_name} at acq-step {acq_step} is {accuracy}")

    return model


In [13]:
def run_experiments(acq_fns: dict, 
                    deterministic: bool = False, 
                    T: int = 100, 
                    run_nums: list = [0,1,2]):
    """Run the active-learning experiment for every acquisition function.

    Trains one base model on the initial training set, then for every entry
    of ``acq_fns`` and every run number calls ``train_w_acquisition``.

    Args:
        acq_fns: Mapping from acquisition-function name to the callable.
        deterministic: Forwarded to ``test_model`` and
            ``train_w_acquisition``.
        T: Forwarded to ``test_model`` and ``train_w_acquisition``.
        run_nums: Run identifiers; one experiment per identifier. The
            default list is only iterated, never mutated.
    """
    # Weight decay is fixed here; the validation sweep is currently disabled.
    #set_seeds(2025)
    #opt_wd = validation()
    opt_wd = 1e-4
    print(f"Opt wd: {opt_wd}")

    # Initial model, dataset and index split.
    #set_seeds(2025)
    model = CNN().to(device)
    train_dataset = get_dataset()

    #set_seeds(2025)
    train_indices, _validation_indices, pool_indices = get_indices()

    # Base model shared by all acquisition functions and runs.
    #set_seeds(2025)
    base_model = train_model(model = model, train_indices = train_indices, lr = 1e-3, weight_decay = opt_wd, n_epochs = 50)
    print(f"Base model test accuracy: {test_model(base_model, deterministic = deterministic, T = T)}")

    for key, value in acq_fns.items():
        final_model = None
        for run_num in run_nums:
            final_model = train_w_acquisition(base_model=base_model, 
                                              train_dataset = train_dataset, 
                                              train_indices = train_indices, 
                                              pool_indices=pool_indices, 
                                              acq_fn_name = key, 
                                              acq_fn = value, 
                                              opt_wd = opt_wd,
                                              T = T,
                                              n_acq=100, 
                                              n_epochs = 50,
                                              run_num = run_num,
                                              deterministic = deterministic)

        # With no runs there is nothing to report; previously this raised a
        # NameError or printed the previous function's accuracy.
        if final_model is None:
            continue

        # Only the last run's accuracy is reported, so evaluate once here
        # instead of after every run.
        accuracy = test_model(final_model, deterministic = deterministic, T = T)
        print(f"Test accuracy for acquisition function: {key} = {accuracy}")

In [None]:
#acq_fns = {"entropy": calc_entropy, "uniform": calc_uniform, "BALD":calc_BALD, "var_rat": calc_var_rat,"Mean_STD": calc_Mean_STD}
#acq_fns = {"Mean_STD": calc_Mean_STD}
#run_experiments(acq_fns= acq_fns, run_nums = [0], deterministic = False)

In [15]:
class inf_CNN(nn.Module):
    """Dropout-free variant of ``CNN`` that also exposes its last hidden features.

    ``layers`` maps an image to a ``hidden_dim``-sized feature vector and
    ``final_layer`` maps those features to ``output_dim`` outputs.

    Args:
        num_filters: Output channels of both convolutions.
        hidden_dim: Width of the feature vector.
        kernel_size: Kernel size shared by both convolutions.
        max_pool: Kernel size of the max-pooling layer.
        output_dim: Number of outputs of the final linear layer.
        width: Input image width in pixels.
        height: Input image height in pixels.
    """

    def __init__(self,
                num_filters: int = 32,
                hidden_dim: int = 128,
                kernel_size: int = 4,
                max_pool: int = 2,
                output_dim:int=10,
                width: int = 28,
                height: int = 28
                ):
        super().__init__()

        # Two un-padded, stride-1 convolutions followed by max pooling.
        pooled_width = (width - 2 * kernel_size + 2) // max_pool
        pooled_height = (height - 2 * kernel_size + 2) // max_pool
        flat_features = num_filters * pooled_width * pooled_height

        # Same stack as CNN with both dropout layers left out.
        feature_stack = [
            # Convolution 1
            nn.Conv2d(1, num_filters, kernel_size),
            nn.ReLU(),
            # Convolution 2
            nn.Conv2d(num_filters, num_filters, kernel_size),
            nn.ReLU(),
            # Max pooling
            nn.MaxPool2d(max_pool),
            # Flatten and fully connected 1
            nn.Flatten(),
            nn.Linear(flat_features, hidden_dim),
            nn.ReLU(),
        ]
        self.layers = nn.Sequential(*feature_stack)

        # Fully connected 2, kept separate so the features can be returned.
        self.final_layer = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return ``(output, phi)``: the final-layer output and the features feeding it."""
        phi = self.layers(x)
        return self.final_layer(phi), phi

Boolean = True

*****************************************************

Acq function = entropy

$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$

Run number = run_0
_________________________________

[[10, 0.6124], [20, 0.6829], [30, 0.7117], [40, 0.7228], [50, 0.7129], [60, 0.7556], [70, 0.7474], [80, 0.7783], [90, 0.8133], [100, 0.8204], [110, 0.8276], [120, 0.8553], [130, 0.8876], [140, 0.861], [150, 0.8588], [160, 0.8797], [170, 0.8682], [180, 0.8882], [190, 0.9031], [200, 0.8846], [210, 0.8934], [220, 0.9103], [230, 0.9126], [240, 0.8999], [250, 0.9134], [260, 0.9248], [270, 0.9261], [280, 0.9293], [290, 0.93], [300, 0.9226], [310, 0.9319], [320, 0.9357], [330, 0.9397], [340, 0.9405], [350, 0.9414], [360, 0.9411], [370, 0.9307], [380, 0.934], [390, 0.9447], [400, 0.936], [410, 0.9363], [420, 0.9506], [430, 0.9455], [440, 0.9531], [450, 0.9602], [460, 0.9469], [470, 0.9597], [480, 0.961], [490, 0.9626], [500, 0.9609], [510, 0.9623], [520, 0.9629], [530, 0.9613], [540, 0.9666],

In [16]:
def train_reg_model(model, train_indices, lr = 3e-4, weight_decay = 1e-6, n_epochs = 100, s = 1, sigma = 1):
    """Train ``model`` as a regressor onto one-hot MNIST labels with MSE loss.

    Args:
        model: Network whose forward pass returns ``(output, features)``;
            only ``output`` enters the loss.
        train_indices: Indices into the MNIST training split to train on.
        lr: Adam learning rate.
        weight_decay: Adam weight-decay coefficient.
        n_epochs: Number of passes over the subset.
        s: Not used by this function.
        sigma: Not used by this function.

    Returns:
        The trained model, cast to float32 and left in training mode.
    """
    regression_subset = Subset(get_dataset(), train_indices)
    train_loader = DataLoader(regression_subset, batch_size=64, shuffle=True)

    model = model.float()
    model.train()

    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = nn.MSELoss()

    for _ in range(n_epochs):
        for images, labels in train_loader:
            images = images.to(device).float()

            # Regression targets are the one-hot encoded class labels.
            one_hot_targets = F.one_hot(labels, num_classes=10).to(device).float()

            optimizer.zero_grad()
            predictions, _features = model(images)

            loss = criterion(predictions, one_hot_targets)
            loss.backward()
            optimizer.step()

    return model


In [17]:
def test_reg_model(model):
    """Return the RMSE of ``model`` against one-hot labels on the MNIST test split.

    The squared error is summed over every output element of every test
    example and divided by the total number of elements before the square
    root is taken.

    Args:
        model: Network whose forward pass returns ``(output, features)``.

    Returns:
        Root-mean-squared error as a NumPy float.
    """
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    test_dataset = datasets.MNIST('./data', train=False, download=True, transform=transform)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Summed rather than averaged, so batches of any size combine exactly.
    summed_squared_error = nn.MSELoss(reduction = 'sum')
    squared_error_total = 0.0
    element_count = 0

    model.eval()
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            one_hot_targets = F.one_hot(labels, num_classes = 10).float().to(device)

            predictions, _ = model(images)

            squared_error_total += summed_squared_error(predictions, one_hot_targets).item()
            element_count += one_hot_targets.numel()

    return np.sqrt(squared_error_total/element_count)

In [18]:
def calc_SigmaW(model, sigma2, s2, train_indices):
    """Return ``inv(phi.T @ phi / sigma2 + I / s2)`` for the training features.

    ``phi`` is the feature matrix that ``model`` returns for the training
    examples (second element of its forward output).

    Args:
        model: Network whose forward pass returns ``(output, features)``.
        sigma2: Scalar dividing the feature Gram matrix (presumably the
            observation-noise variance; confirm with the write-up).
        s2: Scalar dividing the identity term (presumably the prior weight
            variance; confirm with the write-up).
        train_indices: Indices into the MNIST training split.

    Returns:
        Square tensor with one row and column per feature dimension.
    """
    training_subset = Subset(get_dataset(), train_indices)
    # batch_size equals the subset size, so the loader yields one batch
    # holding every training example.
    full_batch_loader = DataLoader(training_subset, batch_size = len(train_indices))

    model.eval()

    with torch.no_grad():
        for inputs, _ in full_batch_loader:
            _, phi = model(inputs.to(device))

    n_features = phi.shape[1]
    precision = (1/sigma2) * phi.T @ phi + (1/s2) * torch.eye(n_features).to(device)

    return torch.linalg.inv(precision)

In [19]:
def calc_pred_var(model, train_dataset, train_indices, pool_indices, sigma2, s2):
    """Return one variance score per pool example.

    For a pool example with feature vector ``phi`` the score is
    ``sigma2 + phi @ SigmaW @ phi``, with ``SigmaW`` from ``calc_SigmaW``
    evaluated on the training indices.

    Args:
        model: Network whose forward pass returns ``(output, features)``.
        train_dataset: Dataset that ``pool_indices`` refers to.
        train_indices: Training indices used to build ``SigmaW``.
        pool_indices: Indices of the candidate examples to score.
        sigma2: Additive term of the score; also forwarded to ``calc_SigmaW``.
        s2: Forwarded to ``calc_SigmaW``.

    Returns:
        1-D NumPy array of scores in the order of ``pool_indices``.
    """
    SigmaW = calc_SigmaW(model, sigma2, s2, train_indices)

    pool_loader = DataLoader(Subset(train_dataset, pool_indices), batch_size = 64, shuffle = False)
    model.eval()

    per_batch_scores = []
    with torch.no_grad():
        for inputs, _ in pool_loader:
            _, phi = model(inputs.to(device))
            # Quadratic form phi_b^T SigmaW phi_b for every row b of the batch.
            quadratic_form = torch.einsum('bk,kj,bj->b', phi, SigmaW, phi)
            per_batch_scores.append((sigma2 + quadratic_form).cpu())

    return torch.cat(per_batch_scores).numpy()

In [20]:
def calc_uniform_var(model, train_dataset, train_indices, pool_indices, sigma2,s2):
    """Return one uniform random score in [0, 1) per pool example.

    Shares the signature of ``calc_pred_var`` so it can be used as a random
    baseline; only ``pool_indices`` is read, and only for its length.
    """
    random_scores = torch.rand(len(pool_indices))
    return random_scores.detach().cpu().numpy()
 

In [21]:
def train_w_pred_var(base_model, 
                        train_dataset, 
                        train_indices, 
                        pool_indices,
                        opt_wd,
                        inf_fn_name,
                        inf_fn,
                        n_acq: int = 100, 
                        n_epochs: int =50,
                        sigma2: float = 1,
                        s2: float = 0.1,
                        run_num: int = 0,
                        n_per_acq: int = 10):
    """Run one active-learning experiment for the regression model.

    At every step the pool is scored with ``inf_fn``, the ``n_per_acq``
    highest-scoring points move from the pool to the training set, a fresh
    ``inf_CNN`` is trained from scratch, and its test RMSE is written to the
    ``inference_database`` via ``update_database``.

    Args:
        base_model: Model used for scoring at the first step; it is
            deep-copied and not modified.
        train_dataset: Dataset the index arrays refer to. The MNIST training
            split is loaded when ``None``.
        train_indices: Initial training indices (copied, not modified).
        pool_indices: Initial pool indices (copied, not modified).
        opt_wd: Currently not used; see the review note at the retraining
            call.
        inf_fn_name: Label written to the database and the log output.
        inf_fn: Callable ``(model, train_dataset, train_indices,
            pool_indices, sigma2, s2)`` returning one score per pool example.
        n_acq: Number of acquisition steps.
        n_epochs: Training epochs per retraining.
        sigma2: Forwarded to ``inf_fn``.
        s2: Forwarded to ``inf_fn``.
        run_num: Run identifier written to the database.
        n_per_acq: Number of points acquired per step.

    Returns:
        The model trained after the final acquisition step.
    """
    # Work on copies so every inference function starts from the same state.
    model = copy.deepcopy(base_model)              
    train_indices_copy = train_indices.copy()
    pool_indices_copy = pool_indices.copy()

    # Use the caller's dataset; previously it was always overwritten by a
    # fresh load, which ignored the argument.
    if train_dataset is None:
        train_dataset = get_dataset()

    for i in range(n_acq):
        acq_step = (i+1)*n_per_acq

        print(f"Calulating uncertainty scores using predictive variances, acq_step: {acq_step}")

        uncertainty_scores = inf_fn(model, train_dataset, train_indices_copy, pool_indices_copy, sigma2, s2)

        # Positions within the pool of the highest-scoring points.
        acquired_pool_indices = np.argsort(uncertainty_scores)[::-1][:n_per_acq]

        # Map pool positions back to dataset indices and move them over.
        acquired_dataset_indices = pool_indices_copy[acquired_pool_indices]
        pool_indices_copy = np.setdiff1d(pool_indices_copy, acquired_dataset_indices)
        train_indices_copy = np.concatenate((train_indices_copy, acquired_dataset_indices))

        # Retrain from scratch on the enlarged training set.
        # NOTE(review): weight_decay is fixed to 0.0 here although `opt_wd` is
        # passed in and the base model is trained with it. Confirm whether
        # this is deliberate.
        #set_seeds(2025+run_num)
        model = train_reg_model(model = inf_CNN().to(device), train_indices=train_indices_copy, lr = 1e-3, weight_decay = 0.0, n_epochs = n_epochs)
        RMSE = test_reg_model(model)
        data = [inf_fn_name, run_num, acq_step, RMSE]
        update_database(data=data, filename = "inference_database")

        print(f"RMSE for run {run_num} of acq_fn {inf_fn_name} at acq-step {acq_step} = {RMSE}")

    return model


In [22]:
def run_reg_experiments(inf_fns: dict,
                        run_nums = [0,1,2]):
    """Run the regression active-learning experiment for every inference function.

    Trains one base ``inf_CNN`` on the initial training set, then for every
    entry of ``inf_fns`` and every run number calls ``train_w_pred_var``.

    Args:
        inf_fns: Mapping from inference-function name to the scoring callable.
        run_nums: Run identifiers; one experiment per identifier. The
            default list is only iterated, never mutated.
    """
    # Weight decay is fixed here; the validation sweep is currently disabled.
    #set_seeds(2025)
    #opt_wd = validation()
    opt_wd = 1e-4
    print(f"Opt wd: {opt_wd}")

    # Initial model, dataset and index split.
    #set_seeds(2025)
    model = inf_CNN().to(device)
    train_dataset = get_dataset()

    #set_seeds(2025)
    train_indices, _validation_indices, pool_indices = get_indices()

    # Base model shared by all inference functions and runs.
    #set_seeds(2025)
    base_model = train_reg_model(model = model, train_indices = train_indices, lr = 1e-3, weight_decay = opt_wd, n_epochs = 50)
    # test_reg_model returns an RMSE, so it is labelled as such (the message
    # previously said "accuracy").
    print(f"Base model test RMSE: {test_reg_model(base_model)}")

    for key, value in inf_fns.items():
        final_model = None
        for run_num in run_nums:
            final_model = train_w_pred_var(base_model=base_model, 
                                           train_dataset = train_dataset, 
                                           train_indices = train_indices, 
                                           pool_indices=pool_indices, 
                                           inf_fn_name = key, 
                                           inf_fn = value, 
                                           opt_wd = opt_wd,
                                           n_acq=100, 
                                           n_epochs = 50,
                                           sigma2 = 1,
                                           s2 = 0.1,
                                           run_num = run_num)

        # With no runs there is nothing to report for this function.
        if final_model is None:
            continue

        RMSE = test_reg_model(final_model)
        print(f"Test RMSE for inference function: {key} = {RMSE}")

In [23]:
# Launch the regression experiment: predictive-variance acquisition versus a
# uniform-random baseline. run_nums is left at its default of [0, 1, 2].
#inference_fns = {"analytic_inf": calc_pred_var, "MFVI_inf":""}
inference_fns = {"analytic_inf": calc_pred_var, "uniform": calc_uniform_var}
run_reg_experiments(inf_fns = inference_fns)

Opt wd: 0.0001
Base model test accuracy: 0.2634800714778036
Calulating uncertainty scores using predictive variances, acq_step: 10
Database saved successfully to ./UDL_results/inference_database
RMSE for run 0 of acq_fn analytic_inf at acq-step 10 = 0.24579392861114788
Calulating uncertainty scores using predictive variances, acq_step: 20
Database saved successfully to ./UDL_results/inference_database
RMSE for run 0 of acq_fn analytic_inf at acq-step 20 = 0.24056301777150388
Calulating uncertainty scores using predictive variances, acq_step: 30
Database saved successfully to ./UDL_results/inference_database
RMSE for run 0 of acq_fn analytic_inf at acq-step 30 = 0.2447221609926911
Calulating uncertainty scores using predictive variances, acq_step: 40
Database saved successfully to ./UDL_results/inference_database
RMSE for run 0 of acq_fn analytic_inf at acq-step 40 = 0.2340000970872857
Calulating uncertainty scores using predictive variances, acq_step: 50
Database saved successfully to 

In [24]:
# NOTE(review): scratch cell that does not run on a fresh kernel; its recorded
# output is "NameError: name 'train_indices' is not defined". The names
# train_indices, pool_indices, train_dataset and opt_lr are not assigned at
# notebook level anywhere in the visible cells. Consider deleting this cell or
# rewriting it against the current helper functions.
N=1

for i in range(N):
    all_outputs = []
    for T in range(5):
        # NOTE(review): train_model() takes `model` as its first required
        # argument, which this call does not supply.
        model = train_model(train_indices = train_indices, lr = opt_lr)
        # NOTE(review): test_model() returns a single accuracy value
        # (correct/total), so torch.stack() below would not receive tensors.
        output = test_model(model)
        all_outputs.append(output)
    all_outputs = torch.stack(all_outputs, dim =0)
    print(all_outputs.shape)

    # Random acquisition: move 10 pool indices into the training set.
    acquired_indices = np.random.choice(pool_indices, size=10, replace= False)
    pool_indices = np.setdiff1d(pool_indices, acquired_indices)
    train_indices = np.concatenate((train_indices, acquired_indices))

    train_loader = DataLoader(Subset(train_dataset, train_indices), batch_size = 64, shuffle = True)

    print(len(train_loader.dataset))



NameError: name 'train_indices' is not defined

In [None]:

# Hand-made check of calc_var_rat on a tiny stack of predictions.
# Naming: t<pass>n<example>; each list holds 3 class probabilities.
t1n1=[0.1,0.6,0.3]
t1n2=[0.0,0.8,0.2]
t1n3=[0.1,0.7,0.2]
t2n1=[0.1,0.2,0.7]
t2n2=[0.0,0.9,0.1]
t2n3=[0.8,0.1,0.1]


# T=1 predictions: shape (3, 3) -> N x C
t1_predictions = torch.tensor([t1n1, t1n2, t1n3])

# T=2 predictions: shape (3, 3) -> N x C
t2_predictions = torch.tensor([t2n1, t2n2, t2n3])

# Stack the per-pass tensors along a new leading dimension (dim=0).
# Resulting shape: (T, N, C) -> (2, 3, 3)
tensor = torch.stack([t1_predictions, t2_predictions], dim=0)

print(tensor)

print(calc_var_rat(tensor))

# NOTE(review): the disabled checks below refer to `torch_tensor` and
# `calc_MI`, neither of which is defined or imported in the visible notebook.
# print(torch_tensor)
# print(torch_tensor.shape)
# print(calc_entropy(torch_tensor))

# print(f"Entropy shape: {calc_entropy(torch_tensor).shape}")

# print(calc_MI(torch_tensor))
# print(calc_MI(torch_tensor).shape)


tensor([[[0.1000, 0.6000, 0.3000],
         [0.0000, 0.8000, 0.2000],
         [0.1000, 0.7000, 0.2000]],

        [[0.1000, 0.2000, 0.7000],
         [0.0000, 0.9000, 0.1000],
         [0.8000, 0.1000, 0.1000]]])
max indices shape: torch.Size([2, 3, 1])
zeros: torch.Size([2, 3, 3])
one_hot scatter: torch.Size([2, 3, 3])
sum over T: torch.Size([3, 3])
c_argmax: torch.Size([3])
tensor([0.5000, 0.0000, 0.5000])


In [None]:
# NOTE(review): running this cell replaces the earlier
# test_model(model, deterministic, T) evaluator that train_w_acquisition and
# run_experiments call with extra arguments. It also iterates `test_loader`,
# which is not defined at notebook level in the visible cells. It looks like a
# leftover stub; consider deleting the cell.
def test_model(model):
    for batch_idx, (data,target) in (pbar:= tqdm.tqdm(enumerate(test_loader))):
        print()
        