In [None]:
import sys
sys.path.append('../')

from generate_bf import *
from computesk import *
import numpy as np
import torch

import copy
import torch
import torch.optim as optim
import os

from models.UMNN import MonotonicNN
from tqdm import tqdm

import matplotlib.pyplot as plt

In [None]:
num_test = 10000
num_pairs = 20
num_features = num_pairs * 2
hidden_layers = [64, 64, 64]
nb_steps = 50
lr = 0.01
num_epochs = 100

In [None]:
testing_samples_path = 'BF/BF-samples/bf-testing_samples20p.txt'
validation_samples_path = 'BF/BF-samples/bf-validation_samples20p.txt'
save_path = 'BF/BF-saved_models/BF-20p-64_64_64.es' # Change this!

In [None]:
# testing_samples = generate_easy_bf(num_test, num_pairs)
# validation_samples = generate_easy_bf(num_test, num_pairs)

# np.savetxt(testing_samples_path, testing_samples)
# np.savetxt(validation_samples_path, validation_samples)

In [None]:
test_samples = torch.tensor(np.loadtxt(testing_samples_path)).to(torch.float32)
validation_samples = torch.tensor(np.loadtxt(validation_samples_path)).to(torch.float32)

print(test_samples)

In [None]:
test_samples.shape

In [None]:
training_sizes = [5000]
random_regs = [1, 0.1, 0.01, 0.001, 0]

In [None]:
kths = list(range(num_features))
fixed_map = generate_non_linear_maps(num_features, hidden_layers, nb_steps, 'cpu')

In [None]:
opt_regs = {}
all_test_losses = {}
all_learnt_maps = {}
all_opt_maps = {}
test_no_reg_losses = {}

# Learning Sk map

In [None]:
num_train = training_sizes[0]
X_tr = generate_easy_bf(num_train, num_pairs)

for i in kths:
    kth = i
    print('kth =', kth)
    best_val_overall = float('inf')
    opt_reg = 0
    opt_Sk = None # should this be moved in?
    each_learnt_map = {}
    non_kth = [idx for idx in range(X_tr.shape[1]) if idx != kth]

    for j in tqdm(range(len(random_regs)), desc='Random Regs', leave=False):
        regLambda = random_regs[j]
        Sk = copy.deepcopy(fixed_map)[kth]
        optimizer = optim.Adam(Sk.parameters(), lr=lr)
        n = X_tr.shape[0]
        early_stop_counter = 0
        best_epoch = 0
        best_valL = float('inf')
        for epoch in range(num_epochs):
            zk = X_tr.detach().requires_grad_(True)
            h = zk[:, non_kth]
            x = zk[:, [kth]]

            sk_zi = Sk(x, h)
            jacobian = torch.autograd.grad(sk_zi, x, torch.ones_like(sk_zi), create_graph=True)[0]
            loss = (0.5 * sk_zi**2 - torch.log(jacobian)).sum(axis=0) / n #mapS_losses(sk_zi, jacobian).sum(axis=0) / n#, kth)
            regulariser = torch.sqrt((jacobian**2).sum(axis=0) / n)
            loss += regLambda * regulariser
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Validation
            Sk_zi_val, jacobian_val = test_map(validation_samples, non_kth, kth, Sk)
            val_loss = test_losses(Sk_zi_val, jacobian_val)#, kth)
            print(f'Val {num_train}st λ = {regLambda}, Epoch {epoch}: {val_loss}')

            # Save the smallest validation loss at each loop.
            if val_loss[1] < best_valL:
                best_valL = val_loss[1]
                if val_loss[1] < best_val_overall: # overall for all λ and epoch
                    best_val_overall = val_loss[1]
                    opt_reg = regLambda
                    opt_Sk = Sk
                    
                best_epoch = epoch
                early_stop_counter = 0
            else:
                early_stop_counter += 1

            # Check for early stopping
            if early_stop_counter >= 10:
                print(f'Early stopping at Epoch {epoch} for best epoch {best_epoch}.')
                break

        each_learnt_map.setdefault(regLambda, []).append(Sk)

    # Test the best model
    Sk_zi_test, jacobian_test = test_map(test_samples, non_kth, kth, opt_Sk)

    all_test_losses.setdefault(num_train, []).append(test_losses(Sk_zi_test, jacobian_test))###, kth))
    print(f'Test {num_train}, λ = {opt_reg}: {all_test_losses[num_train]}')

    all_learnt_maps.setdefault(num_train, []).append(each_learnt_map)
    all_opt_maps.setdefault(num_train, []).append(opt_Sk)
    opt_regs.setdefault(num_train, []).append(opt_reg)
    
print('Optimal λ ∀ =', opt_regs)


In [None]:
data = all_test_losses[num_train]

# Sum of the second value of each tuple
total = sum(t[1] for t in data)
print("Total NLL (computed):", total)


In [None]:
save_directory = save_path + '/optimal_reg'
os.makedirs(save_directory, exist_ok=True)

In [None]:
for model_name, model in all_opt_maps.items():
    save_path_here = os.path.join(save_directory, 'tr' + f'{model_name}.pth')
    state_dicts = {}
    for i in range(len(model)):
        state_dicts[f'model{i+1}_state_dict'] = model[i].state_dict()
    torch.save(state_dicts, save_path_here)


In [None]:
save_directory_no_reg = save_path + '/all_reg'
os.makedirs(save_directory_no_reg, exist_ok=True)

In [None]:
all_organised_learners = {}

for num_train, learnt_maps_list in all_learnt_maps.items():
    print(num_train)
    all_learnt_maps_org = {}
    for learnt_map_dict in learnt_maps_list:
        for reg_lambda, model_list in learnt_map_dict.items():
            print(reg_lambda)
            print(model_list)
            all_learnt_maps_org.setdefault(reg_lambda, []).append(model_list[0])
    all_organised_learners[num_train] = all_learnt_maps_org

for training_size, regularization_dict in all_organised_learners.items():
    training_size_folder = os.path.join(save_directory_no_reg, 'tr' + str(training_size))
    os.makedirs(training_size_folder, exist_ok=True)

    for regularization, model in regularization_dict.items():
        model_name = f'reg{regularization}.pth'
        model_path = os.path.join(training_size_folder, model_name)
        state_dicts = {}
        for i in range(len(model)):
            state_dicts[f'model{i+1}_state_dict'] = model[i].state_dict()
        torch.save(state_dicts, model_path)


In [None]:
loaded_models = torch.load(save_directory + '/tr5000.pth')


In [None]:

model_state_dicts = [loaded_models[f'model{i}_state_dict'] for i in range(1, num_features + 1)]


In [None]:

models = []

for i in range(num_features):
    model = MonotonicNN(num_features, hidden_layers, nb_steps, 'cpu')
    model.load_state_dict(model_state_dicts[i])
    models.append(model)


In [None]:


precision_matrix = []


for j in range(num_features):
    Sj = models[j]
    row = []
    Sj.eval()
    kth = j
    non_kth = [idx for idx in range(test_samples.shape[1]) if idx != kth]

    zk = test_samples.detach().requires_grad_(True)
    h = zk[:, non_kth]
    x = zk[:, [kth]]
    
    sk_zi = Sj(x,h)  
    for i in range(num_features):
        print(i, j)
        if i != j:
            first_derivative = torch.autograd.grad(sk_zi, zk, torch.ones_like(sk_zi), create_graph=True)[0]
            first_derivative = torch.log(torch.abs(first_derivative))
            second_derivative = torch.autograd.grad(first_derivative[:, [j]], zk, torch.ones_like(first_derivative[:, [j]]), create_graph=True)[0] # check whether they are columns or row vector. column vector by adding [j]!!! This is currently a row vevotr which isn't right!
            third_derivative= torch.autograd.grad(second_derivative[:, [j]], zk, torch.ones_like(second_derivative[:, [j]]), create_graph=True)[0]

            second = torch.abs(third_derivative[:,[i]]).mean().item()
            first_half = -1/2 * (sk_zi**2)
            first_half_derivative = torch.autograd.grad(first_half, zk, torch.ones_like(first_half), create_graph=True)[0]
            second_half_deriative = torch.autograd.grad(first_half_derivative[:, [j]], zk, torch.ones_like(first_half_derivative[:, [j]]), create_graph=True)[0]

            first = torch.abs(second_half_deriative[:, [i]]).mean().item()
            row.append(first + second)
        else:
            row.append(1) 
    precision_matrix.append(row)
    print(precision_matrix[-1])

In [None]:
matrix = np.array(precision_matrix)
transpose_matrix = matrix.T 
symmetric_matrix = (transpose_matrix + matrix) / 2
print(symmetric_matrix)
plt.figure(figsize=(8, 6))
plt.xticks(np.arange(0, len(symmetric_matrix), 1), np.arange(1, len(symmetric_matrix) + 1))
plt.yticks(np.arange(0, len(symmetric_matrix), 1), np.arange(1, len(symmetric_matrix) + 1))
plt.imshow(symmetric_matrix, cmap='gray', interpolation='nearest')
plt.colorbar()
plt.show()

In [None]:
# Normalise with maximum value of the matrix.
max_value = np.max(symmetric_matrix)

normalized_matrix = symmetric_matrix / max_value

np.fill_diagonal(normalized_matrix, 1)

print("Normalized matrix with diagonals set to 1:")
print(normalized_matrix)


plt.figure(figsize=(8, 6))
plt.xticks(np.arange(0, len(normalized_matrix), 1), np.arange(1, len(normalized_matrix) + 1))
plt.yticks(np.arange(0, len(normalized_matrix), 1), np.arange(1, len(normalized_matrix) + 1))
plt.imshow(normalized_matrix, cmap='gray', interpolation='nearest')
plt.colorbar()
plt.show()

In [None]:

max_value = np.max(symmetric_matrix)

normalized_matrix = symmetric_matrix / max_value

np.fill_diagonal(normalized_matrix, 1)

print("Normalized matrix with diagonals set to 1:")
print(normalized_matrix)

# Create ticks for odd numbers only
ticks = np.arange(1, len(normalized_matrix) + 1, 2)

plt.figure(figsize=(8, 6))
plt.xticks(np.arange(0, len(normalized_matrix), 2), ticks)
plt.yticks(np.arange(0, len(normalized_matrix), 2), ticks)
plt.imshow(normalized_matrix, cmap='gray', interpolation='nearest')
plt.colorbar()
plt.show()


In [None]:


matrix = np.array(precision_matrix)
transpose_matrix = matrix.T 
symmetric_matrix = (transpose_matrix + matrix) / 2

# Find the largest non-diagonal value
max_value = np.max(np.abs(np.triu(symmetric_matrix, k=1)))

# Divide the matrix by the largest non-diagonal value
if max_value != 0:
    symmetric_matrix /= max_value


np.fill_diagonal(symmetric_matrix, 1)

# Plotting
plt.figure(figsize=(8, 6))
plt.xticks(np.arange(0, len(symmetric_matrix), 1), np.arange(1, len(symmetric_matrix) + 1))
plt.yticks(np.arange(0, len(symmetric_matrix), 1), np.arange(1, len(symmetric_matrix) + 1))
plt.imshow(symmetric_matrix, cmap='gray', interpolation='nearest')
plt.colorbar()
plt.show()

In [None]:
symmetric_matrix

In [None]:

symmetric_matrix_rounded = np.round(symmetric_matrix, decimals=3)

# Print the symmetric matrix with rounded values
print(symmetric_matrix_rounded)

# Plot the heatmap
plt.figure(figsize=(8, 6))
plt.xticks(np.arange(0, len(symmetric_matrix), 1), np.arange(1, len(symmetric_matrix) + 1))
plt.yticks(np.arange(0, len(symmetric_matrix), 1), np.arange(1, len(symmetric_matrix) + 1))
plt.imshow(symmetric_matrix_rounded, cmap='gray', interpolation='nearest')
plt.colorbar()

# Add text annotations
for i in range(len(symmetric_matrix)):
    for j in range(len(symmetric_matrix[0])):
        plt.text(j, i, f'{symmetric_matrix_rounded[i, j]:.3f}', ha='center', va='center', color='white')

plt.show()


In [None]:

threshold_values = [0.3, 0.2, 0.1, 0.05]

matrix_size = 40

ground_truth = np.eye(matrix_size)  # Start with the identity matrix (1s on the diagonal)

# Set the specific pairs to 1
for i in range(0, matrix_size, 2):
    ground_truth[i, i+1] = 1
    ground_truth[i+1, i] = 1  # Ensure symmetry

# Calculate the total number of negative entries in the ground truth matrix
total_negatives = np.sum(ground_truth == 0)

# Initialize a plot
plt.figure(figsize=(10, 7))

# Loop through each threshold value
for threshold in threshold_values:
    false_positive_rates_list = []

    # Loop through training sizes and compute false positive rates
    for training_size in training_sizes:
        # Load the symmetric matrix
        matrix = symmetric_matrix
        # Normalize the matrix by its largest value
        max_value = np.max(matrix)
        normalized_matrix = matrix / max_value

        # Apply the threshold to create a binary matrix
        binary_matrix = np.where(normalized_matrix < threshold, 0, 1)

        # Compute the false positives
        false_positives = np.sum((ground_truth == 0) & (binary_matrix == 1))

        # Calculate the false positive rate
        false_positive_rate = false_positives / total_negatives

        # Store the false positive rate
        false_positive_rates_list.append(false_positive_rate)
        print(false_positive_rates_list)
    # Plot the results for the current threshold
    plt.plot(training_sizes, false_positive_rates_list, marker='o', linestyle='-', label=f'Threshold = {threshold:.2f}')

# Customize the plot appearance
plt.xlabel('Training Size', fontsize=14)
plt.ylabel('False Positive Rate', fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.grid(True, linestyle='--', alpha=0.6)
plt.legend(title='Threshold Values', fontsize=12)
plt.tight_layout()

# Save the plot
plt.savefig(f'{save_directory}/false_positive_rates_vs_training_size_thresholds.png', dpi=300)

# Show the plot
plt.show()


In [None]:
false_positive_rates_list

In [None]:

threshold_values = [0.3, 0.2, 0.1, 0.05]

matrix_size = 40

ground_truth = np.eye(matrix_size)  # Start with the identity matrix (1s on the diagonal)

# Set the specific pairs to 1
for i in range(0, matrix_size, 2):
    ground_truth[i, i+1] = 1
    ground_truth[i+1, i] = 1  # Ensure symmetry

plt.figure(figsize=(10, 7))

for threshold in threshold_values:
    f1_scores_list = []
    for training_size in training_sizes:
        matrix = symmetric_matrix
        max_value = np.max(matrix)
        normalized_matrix = matrix / max_value

        binary_matrix = np.where(normalized_matrix < threshold, 0, 1)

        true_positives = np.sum((ground_truth == 1) & (binary_matrix == 1))
        false_positives = np.sum((ground_truth == 0) & (binary_matrix == 1))
        false_negatives = np.sum((ground_truth == 1) & (binary_matrix == 0))

        # Calculate precision, recall, and F1 score
        precision = true_positives / (true_positives + false_positives) if true_positives + false_positives > 0 else 0
        recall = true_positives / (true_positives + false_negatives) if true_positives + false_negatives > 0 else 0
        f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0

        f1_scores_list.append(f1_score)
        print(f1_scores_list)

    plt.plot(training_sizes, f1_scores_list, marker='o', linestyle='-', label=f'Threshold = {threshold:.2f}')

plt.xlabel('Training Size', fontsize=14)
plt.ylabel('F1 Score', fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.grid(True, linestyle='--', alpha=0.6)
plt.legend(title='Threshold Values', fontsize=12)
plt.tight_layout()

plt.savefig(f'{save_directory}/f1_scores_vs_training_size_thresholds.png', dpi=300)

# Show the plot
plt.show()
