# Calculate the EER (Simple Example)

Take the "log_eval_score.txt" file and compute the EER by matching on the file_IDs (the logits you MANUALLY find are actually the values in "log_eval_score.txt"; the file name suggests that log_ stands for logits_).

In [1]:
import numpy as np
import torch
import torch.nn.functional as F
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Input files. An example row is shown under each path.
ground_truth_path = r'C:\Users\User\Desktop\Processed_ASV_Data\Sorted_Metadata\eval_meta_sort.txt'
# e.g. LA_0021 LA_E_1000048 - A08 spoof

log_eval_baseline_original_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_Only\A_ASV_Default_32_Or\baseline_DF\log_eval_score.txt'
# e.g. LA_E_1000048 -14.588603


# Step 1: read the score file into {file_ID: logit}.
# The ID is the first column and the score is the last column.
logits_dict = {}
with open(log_eval_baseline_original_path, 'r') as file:
    for line in file:
        elements = line.split()  # no-argument split() ignores surrounding whitespace
        la_id, logit_score = elements[0], float(elements[-1])
        logits_dict[la_id] = logit_score


# Step 2: read the metadata file into {file_ID: label}.
# The ID is the second column; the last column is 'bonafide' (-> 1) or anything else (-> 0).
ground_truth_dict = {}
with open(ground_truth_path, 'r') as file:
    for line in file:
        elements = line.split()
        la_id, label = elements[1], elements[-1]
        ground_truth_dict[la_id] = int(label == 'bonafide')



'''
EER Calculation: When calculating EER, you don’t set a fixed threshold beforehand. 
Instead, roc_curve calculates FAR and FRR for a range of thresholds and finds 
the optimal one where FAR ≈ FRR, which is your EER point.



Why Predictions ≠ EER: The line predictions = (probabilities > 0.5).float() is only 
relevant if you want a specific set of predictions using a fixed threshold. However, EER is 
calculated without pre-selecting a threshold, as it considers all possible thresholds to find the balance point.
'''


# Step 3: keep only the IDs that have both a score and a label.
# Iterating over logits_dict preserves the score-file order, so scores and labels stay aligned.
matched_ids = [la_id for la_id in logits_dict if la_id in ground_truth_dict]
y_scores = [logits_dict[la_id] for la_id in matched_ids]       # logit scores
y_true = [ground_truth_dict[la_id] for la_id in matched_ids]   # 1 = bonafide, 0 = otherwise

# Map the logits through a sigmoid so the scores (and the reported threshold) lie in [0, 1].
y_scores_tensor = torch.tensor(y_scores)
probabilities = y_scores_tensor.sigmoid()
#print(probabilities.sort())




'''
# Calculate predictions from probabilities
predictions = (probabilities > 0.5).float()
preds_list = [int(x) for x in predictions.tolist()]



# Step 4: Calculate EER
def calculate_metrics(y_true, y_scores):
    fpr, tpr, thresholds = roc_curve(y_true, y_scores)
    fnr = 1 - tpr
    eer_threshold = thresholds[np.nanargmin(np.abs(fnr - fpr))]
    eer = fpr[np.nanargmin(np.abs(fnr - fpr))]
    return eer, eer_threshold, fpr, fnr


# Ensure both lists are NumPy arrays for EER calculation
y_true = np.array(y_true)
y_scores = np.array(probabilities.tolist())

# Calculate metrics
eer, eer_threshold, fpr, fnr = calculate_metrics(y_true, y_scores)


# Print results
print(f"EER: {eer}")
print(f"Threshold: {eer_threshold}")
print(f"False Positive Rate: {fpr}")
print(f"False Negative Rate: {fnr}")
'''


# Alternative Step 4: Calculate EER
# Using probabilities and true labels directly in EER calculation
# NOT USING "predictions = (probabilities > 0.5).float()"
def calculate_metrics(y_true, y_scores):
    """Find the equal-error-rate point on the ROC curve of ``y_scores``.

    Args:
        y_true: Binary labels; 1 is treated as the positive class.
        y_scores: One score per label, higher meaning "more positive".

    Returns:
        Tuple ``(eer, eer_threshold, fpr, fnr)``: the false-positive rate and
        the threshold at the point where |FNR - FPR| is smallest, followed by
        the full FPR and FNR arrays.
    """
    fpr, tpr, thresholds = roc_curve(y_true, y_scores, pos_label=1)
    fnr = 1 - tpr
    # Position on the curve where the two error rates are closest to each other.
    eer_index = np.nanargmin(np.abs(fnr - fpr))
    return fpr[eer_index], thresholds[eer_index], fpr, fnr

# Calculate EER directly
# The sigmoid outputs are passed as a plain list together with the 0/1 labels from Step 3.
eer, eer_threshold, fpr, fnr = calculate_metrics(y_true, probabilities.tolist())
print(f"EER: {eer}")
print(f"EER Threshold: {eer_threshold}")
# Print the extreme values of both error-rate arrays returned by calculate_metrics.
print(f"False Positive Rate (Min Value): {min(fpr)}")
print(f"False Positive Rate (Max Value): {max(fpr)}")
print(f"False Negative Rate (Min Value): {min(fnr)}")
print(f"False Negative Rate (Max Value): {max(fnr)}")


# Optional: Plot DET curve for visualization

'''
plt.figure()
plt.plot(fpr, fnr, label='DET Curve')
plt.xlabel('False Positive Rate')
plt.ylabel('False Negative Rate')
plt.title('DET Curve')
plt.legend()
plt.show()
'''


EER: 0.12124649018477875
EER Threshold: 0.2603723704814911
False Positive Rate (Min Value): 0.0
False Positive Rate (Max Value): 1.0
False Negative Rate (Min Value): 0.0
False Negative Rate (Max Value): 1.0


"\nplt.figure()\nplt.plot(fpr, fnr, label='DET Curve')\nplt.xlabel('False Positive Rate')\nplt.ylabel('False Negative Rate')\nplt.title('DET Curve')\nplt.legend()\nplt.show()\n"

# Create a function that automates this by calculating the EER based on 2 file paths (logits and ground_truth)

In [2]:
import numpy as np
import torch
import torch.nn.functional as F
from sklearn.metrics import roc_curve

def calculate_eer(logits_file_path, ground_truth_file_path):
    """Print the EER and its threshold for one score file.

    Args:
        logits_file_path: Text file whose first whitespace-separated column is
            the file ID and whose last column is the score (logit).
        ground_truth_file_path: Text file whose second column is the file ID
            and whose last column is the label; 'bonafide' maps to 1 and every
            other label maps to 0.

    Returns:
        None. The EER and the matching threshold are printed.

    Raises:
        ValueError: If no file ID appears in both files.
    """
    # Step 1: Parse the logits file.
    logits_dict = {}
    with open(logits_file_path, 'r') as file:
        for line in file:
            elements = line.split()
            if not elements:
                # Blank line: indexing it would raise IndexError, so skip it.
                continue
            logits_dict[elements[0]] = float(elements[-1])

    # Step 2: Parse the ground truth file.
    ground_truth_dict = {}
    with open(ground_truth_file_path, 'r') as file:
        for line in file:
            elements = line.split()
            if not elements:
                continue
            ground_truth_dict[elements[1]] = 1 if elements[-1] == 'bonafide' else 0

    # Step 3: Match logits with their true labels (score-file order is kept).
    matched_ids = [la_id for la_id in logits_dict if la_id in ground_truth_dict]
    if not matched_ids:
        # Fail here with a clear message instead of inside the ROC computation.
        raise ValueError(
            "No file ID is shared between the logits file and the ground-truth file."
        )
    y_scores = [logits_dict[la_id] for la_id in matched_ids]
    y_true = [ground_truth_dict[la_id] for la_id in matched_ids]

    # Convert logits to probabilities so the reported threshold lies in [0, 1].
    # NOTE(review): torch.tensor() on Python floats yields float32, where the
    # sigmoid of large positive logits rounds to exactly 1.0 and creates ties;
    # consider running the ROC on the raw logits if that matters.
    probabilities = torch.sigmoid(torch.tensor(y_scores))

    # Step 4: EER = FPR at the threshold where |FNR - FPR| is smallest.
    fpr, tpr, thresholds = roc_curve(np.array(y_true), probabilities.tolist(), pos_label=1)
    fnr = 1 - tpr
    eer_index = np.nanargmin(np.abs(fnr - fpr))
    eer = fpr[eer_index]
    eer_threshold = thresholds[eer_index]

    # Print results (nothing is returned).
    print(f"EER: {eer}")
    print(f"EER Threshold: {eer_threshold}")

# Example usage:
# Score file of the baseline model (original dataset) and the sorted evaluation metadata.
logits_file = r'C:\Users\User\Desktop\Final_ASV_Results\Original_Only\A_ASV_Default_32_Or\baseline_DF\log_eval_score.txt'
ground_truth_file = r'C:\Users\User\Desktop\Processed_ASV_Data\Sorted_Metadata\eval_meta_sort.txt'

# Prints the EER and its threshold.
calculate_eer(logits_file, ground_truth_file)


EER: 0.12124649018477875
EER Threshold: 0.2603723704814911


In [3]:
# Same two inputs as the example above; `ground_truth_file` is reused by later cells.
logits_file = r'C:\Users\User\Desktop\Final_ASV_Results\Original_Only\A_ASV_Default_32_Or\baseline_DF\log_eval_score.txt'
ground_truth_file = r'C:\Users\User\Desktop\Processed_ASV_Data\Sorted_Metadata\eval_meta_sort.txt'

# Prints the EER and its threshold.
calculate_eer(logits_file, ground_truth_file)


EER: 0.12124649018477875
EER Threshold: 0.2603723704814911


# Create a dictionary containing the ground-truth labels keyed by file ID
Dict_Keys = file_ID
Dict_Values = Ground-truth label of that file_ID (1 = bonafide, 0 = spoof)

In [4]:
# Sorted evaluation metadata; an example row is:
# LA_0021 LA_E_1000048 - A08 spoof
ground_truth_path = r'C:\Users\User\Desktop\Processed_ASV_Data\Sorted_Metadata\eval_meta_sort.txt'

# Build {file_ID: label}: the ID is the second column, the label is the last column
# ('bonafide' -> 1, anything else -> 0).
ground_truth_dict = {}
with open(ground_truth_path, 'r') as file:
    for line in file:
        elements = line.split()  # no-argument split() ignores surrounding whitespace
        la_id, label = elements[1], elements[-1]
        ground_truth_dict[la_id] = int(label == 'bonafide')

# Create another dictionary, this time containing the logits per file ID
We want 4 separate dictionaries:
1. Baseline Model Using the Original Dataset
2. NCP Special Model Using the Original Dataset
3. Baseline Model Using the Augmented Dataset
4. NCP Special Model Using the Augmented Dataset

## Original Dataset

In [5]:
# Score file of the baseline model on the original dataset.
# Row format example: LA_E_1000048 -14.588603
log_eval_baseline_original_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_Only\A_ASV_Default_32_Or\baseline_DF\log_eval_score.txt'

# Build {file_ID: logit}: first column is the ID, last column is the score.
logits_origin_baseline_dict = {}
with open(log_eval_baseline_original_path, 'r') as file:
    for line in file:
        elements = line.split()  # no-argument split() ignores surrounding whitespace
        la_id, logit_score = elements[0], float(elements[-1])
        logits_origin_baseline_dict[la_id] = logit_score

In [6]:
# Score file of the NCP Special model on the original dataset.
# Row format example: LA_E_1000048 -14.588603
log_eval_NCPspecial_original_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_Only\D_SinglePoly_32_ncp_Special_Or_logits\baseline_DF\log_eval_score.txt'

# Build {file_ID: logit}: first column is the ID, last column is the score.
logits_origin_special_dict = {}
with open(log_eval_NCPspecial_original_path, 'r') as file:
    for line in file:
        elements = line.split()  # no-argument split() ignores surrounding whitespace
        la_id, logit_score = elements[0], float(elements[-1])
        logits_origin_special_dict[la_id] = logit_score

## Augmented Dataset

In [7]:
# Score file of the baseline model on the augmented dataset.
# Row format example: LA_E_1000048 -14.588603
log_eval_baseline_augmented_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_and_Augmented_Raw_5\A_ASV_Default_32_Augment_5_logits\baseline_DF\log_eval_score.txt'

# Build {file_ID: logit}: first column is the ID, last column is the score.
logits_augmented_baseline_dict = {}
with open(log_eval_baseline_augmented_path, 'r') as file:
    for line in file:
        elements = line.split()  # no-argument split() ignores surrounding whitespace
        la_id, logit_score = elements[0], float(elements[-1])
        logits_augmented_baseline_dict[la_id] = logit_score

In [8]:
# Score file of the NCP Special model on the augmented dataset.
# Row format example: LA_E_1000048 -14.588603
log_eval_NCPspecial_augmented_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_and_Augmented_Raw_5\D_ASV_Special_32_Augment_5_logits\baseline_DF\log_eval_score.txt'

# Build {file_ID: logit}: first column is the ID, last column is the score.
logits_augmented_special_dict = {}
with open(log_eval_NCPspecial_augmented_path, 'r') as file:
    for line in file:
        elements = line.split()  # no-argument split() ignores surrounding whitespace
        la_id, logit_score = elements[0], float(elements[-1])
        logits_augmented_special_dict[la_id] = logit_score

# Calculate the EER for Each Model (Separately)

In [9]:
# Report the EER of every single model (no fusion), grouped by training dataset.
# NOTE: this needs the two-path calculate_eer(logits_file_path, ground_truth_file_path).
# Cells further down rebind the name `calculate_eer` to a (y_true, y_scores) helper, so
# re-running this cell after them fails until the two-path definition is executed again.
print("Original Data /  Baseline Model")
calculate_eer(log_eval_baseline_original_path, ground_truth_file)
print("")
print("Original Data /  NCP Special Model")
calculate_eer(log_eval_NCPspecial_original_path, ground_truth_file)
# Blank lines separate the original-dataset results from the augmented-dataset results.
print("")
print("")
print("")


print("Augmented Data /  Baseline Model")
calculate_eer(log_eval_baseline_augmented_path, ground_truth_file)
print("")
print("Augmented Data /  NCP Special Model")
calculate_eer(log_eval_NCPspecial_augmented_path, ground_truth_file)


Original Data /  Baseline Model
EER: 0.12124649018477875
EER Threshold: 0.2603723704814911

Original Data /  NCP Special Model
EER: 0.14302880194219994
EER Threshold: 0.19067148864269257



Augmented Data /  Baseline Model
EER: 0.13233689321578773
EER Threshold: 0.9178087115287781

Augmented Data /  NCP Special Model
EER: 0.19723628873058108
EER Threshold: 3.499340527923778e-05


# Calculate the EER by applying Logits Fusion per dataset type
Fusion must be performed separately for each dataset type. That means we must NOT fuse the logits of two models where one was trained on the original dataset while the other was trained on the augmented dataset.

# Original Dataset (Fusion by Averaging)

In [18]:
import numpy as np
import torch

# Score-level fusion of the two models evaluated on the original dataset.
dict1 = logits_origin_baseline_dict
dict2 = logits_origin_special_dict

# Fusion is only defined when both models scored exactly the same IDs.
if dict1.keys() != dict2.keys():
    raise ValueError("The dictionaries must contain the same IDs.")

# One output dictionary per fusion rule, keyed by file ID.
average_fused_logits_dict = {}
weighted_fused_logits_dict = {}
max_pool_fused_logits_dict = {}
product_fused_logits_dict = {}
harmonic_fused_logits_dict = {}

# Weights of [model 1, model 2] for the weighted average (example values; adjust as needed).
weights = np.array([0.8, 0.2])
# Small constant added to each logit before its reciprocal is taken.
epsilon=1e-10

for key in dict1:
    logits1 = np.array(dict1[key])
    logits2 = np.array(dict2[key])

    # Both models must provide scores of the same shape for this ID.
    if logits1.shape != logits2.shape:
        raise ValueError(f"Logits for ID {key} have different shapes.")

    # The pair stacked along axis 0, shared by the mean / weighted mean / product rules.
    stacked = np.array([logits1, logits2])

    # 1. Plain average and 2. weighted average of the two logits.
    average_fused_logits_dict[key] = np.mean(stacked, axis=0)
    weighted_fused_logits_dict[key] = np.average(stacked, axis=0, weights=weights)

    # 3. Max pooling: keep the larger logit.
    max_pool_fused_logits_dict[key] = np.maximum(logits1, logits2)

    # 4. Product of the logits.
    # NOTE(review): the product of two negative logits is positive, so this rule does
    # not preserve score ordering for raw logits.
    product_fused_logits_dict[key] = np.prod(stacked, axis=0)

    # 5. Harmonic mean.
    # NOTE(review): epsilon only guards a logit of exactly 0.0; the denominator can still
    # be zero, e.g. when the two shifted logits have equal magnitude and opposite sign.
    harmonic_fused_logits_dict[key] = 2 / (1 / (logits1 + epsilon) + 1 / (logits2 + epsilon))

# Flat lists of the fused values, in dictionary order, for further processing (if needed).
average_fused_logits_list = list(average_fused_logits_dict.values())
weighted_fused_logits_list = list(weighted_fused_logits_dict.values())
max_pool_fused_logits_list = list(max_pool_fused_logits_dict.values())
product_fused_logits_list = list(product_fused_logits_dict.values())
harmonic_fused_logits_list = list(harmonic_fused_logits_dict.values())

In [19]:
import numpy as np
import torch
from sklearn.metrics import roc_curve
from sklearn.model_selection import KFold

# Assuming the fused logits dictionaries are already defined:
# average_fused_logits_dict
# weighted_fused_logits_dict
# max_pool_fused_logits_dict
# product_fused_logits_dict
# harmonic_fused_logits_dict

# Number of folds for cross-validation (5 matches the default of `calculate_eer_kfold`);
# this value is also printed in the per-method report at the end of the cell.
k_folds = 5

# Function to calculate EER and threshold
def calculate_metrics(y_true, y_scores):
    """Return ``(eer, eer_threshold)`` for binary labels and their scores.

    Label 1 is the positive class. The EER is read off as the false-positive
    rate at the ROC point where |FNR - FPR| is smallest; the threshold is the
    score threshold of that same point.
    """
    fpr, tpr, thresholds = roc_curve(y_true, y_scores, pos_label=1)
    fnr = 1 - tpr
    # Position on the curve where the two error rates are closest to each other.
    eer_index = np.nanargmin(np.abs(fnr - fpr))
    return fpr[eer_index], thresholds[eer_index]

# Function to calculate EER across k folds for a given fusion dictionary
def calculate_eer_kfold(fused_logits_dict, ground_truth_dict, k_folds=5):
    """Average the EER and its threshold over ``k_folds`` shuffled partitions.

    Args:
        fused_logits_dict: ``{file_ID: fused logit}``.
        ground_truth_dict: ``{file_ID: 0/1 label}``; IDs missing here are skipped.
        k_folds: Number of partitions produced by ``KFold``.

    Returns:
        Tuple ``(mean EER, mean EER threshold)`` across the partitions.
    """
    keys = list(fused_logits_dict.keys())
    splitter = KFold(n_splits=k_folds, shuffle=True, random_state=42)

    fold_eers, fold_thresholds = [], []

    # Nothing is fitted here, so only the held-out indices of each split are used.
    for _, held_out in splitter.split(keys):
        labelled_ids = [keys[i] for i in held_out if keys[i] in ground_truth_dict]
        y_scores = [fused_logits_dict[la_id] for la_id in labelled_ids]
        y_true = [ground_truth_dict[la_id] for la_id in labelled_ids]

        # Sigmoid maps the fused logits to [0, 1] before the ROC is computed.
        probabilities = torch.sigmoid(torch.tensor(y_scores)).tolist()

        eer, eer_threshold = calculate_metrics(y_true, probabilities)
        fold_eers.append(eer)
        fold_thresholds.append(eer_threshold)

    return np.mean(fold_eers), np.mean(fold_thresholds)

# Maps each fusion method to its (average EER, average threshold) pair.
eer_results_kfold = {}

# Fused-logit dictionary of each method, in reporting order.
fusion_inputs = {
    'Average': average_fused_logits_dict,
    'Weighted Average': weighted_fused_logits_dict,
    'Max Pooling': max_pool_fused_logits_dict,
    'Product': product_fused_logits_dict,
    'Harmonic': harmonic_fused_logits_dict,
}

# Pass k_folds explicitly so the evaluation always uses the fold count that the
# report below prints, instead of relying on the function's default matching it.
for fusion_method, fused_logits in fusion_inputs.items():
    eer_results_kfold[fusion_method] = calculate_eer_kfold(
        fused_logits, ground_truth_dict, k_folds=k_folds
    )

# Print results for each fusion method with k-fold cross-validation
for fusion_method, (avg_eer, avg_threshold) in eer_results_kfold.items():
    print(f"{fusion_method} Fusion with {k_folds}-Fold Cross-Validation:")
    print(f"  Average EER: {avg_eer:.4f}")
    print(f"  Average EER Threshold: {avg_threshold:.4f}")
    print()  # New line for better readability


Average Fusion with 5-Fold Cross-Validation:
  Average EER: 0.1272
  Average EER Threshold: 0.3927

Weighted Average Fusion with 5-Fold Cross-Validation:
  Average EER: 0.1225
  Average EER Threshold: 0.3188

Max Pooling Fusion with 5-Fold Cross-Validation:
  Average EER: 0.1296
  Average EER Threshold: 0.9672

Product Fusion with 5-Fold Cross-Validation:
  Average EER: 0.8802
  Average EER Threshold: 1.0000

Harmonic Fusion with 5-Fold Cross-Validation:
  Average EER: 0.1380
  Average EER Threshold: 0.1172



# Augmented Dataset (Fusion by Averaging)

In [12]:
import numpy as np
import torch

# Score-level fusion of the two models evaluated on the augmented dataset.
dict1 = logits_augmented_baseline_dict
dict2 = logits_augmented_special_dict

# Fusion is only defined when both models scored exactly the same IDs.
if dict1.keys() != dict2.keys():
    raise ValueError("The dictionaries must contain the same IDs.")

# One output dictionary per fusion rule, keyed by file ID.
average_fused_logits_dict = {}
weighted_fused_logits_dict = {}
max_pool_fused_logits_dict = {}
product_fused_logits_dict = {}
harmonic_fused_logits_dict = {}

# Weights of [model 1, model 2] for the weighted average (example values; adjust as needed).
weights = np.array([0.8, 0.2])
# Small constant added to each logit before its reciprocal is taken.
epsilon=1e-10

for key in dict1:
    logits1 = np.array(dict1[key])
    logits2 = np.array(dict2[key])

    # Both models must provide scores of the same shape for this ID.
    if logits1.shape != logits2.shape:
        raise ValueError(f"Logits for ID {key} have different shapes.")

    # The pair stacked along axis 0, shared by the mean / weighted mean / product rules.
    stacked = np.array([logits1, logits2])

    # 1. Plain average and 2. weighted average of the two logits.
    average_fused_logits_dict[key] = np.mean(stacked, axis=0)
    weighted_fused_logits_dict[key] = np.average(stacked, axis=0, weights=weights)

    # 3. Max pooling: keep the larger logit.
    max_pool_fused_logits_dict[key] = np.maximum(logits1, logits2)

    # 4. Product of the logits.
    # NOTE(review): the product of two negative logits is positive, so this rule does
    # not preserve score ordering for raw logits.
    product_fused_logits_dict[key] = np.prod(stacked, axis=0)

    # 5. Harmonic mean.
    # NOTE(review): epsilon only guards a logit of exactly 0.0; the denominator can still
    # be zero, e.g. when the two shifted logits have equal magnitude and opposite sign.
    harmonic_fused_logits_dict[key] = 2 / (1 / (logits1 + epsilon) + 1 / (logits2 + epsilon))

# Flat lists of the fused values, in dictionary order, for further processing (if needed).
average_fused_logits_list = list(average_fused_logits_dict.values())
weighted_fused_logits_list = list(weighted_fused_logits_dict.values())
max_pool_fused_logits_list = list(max_pool_fused_logits_dict.values())
product_fused_logits_list = list(product_fused_logits_dict.values())
harmonic_fused_logits_list = list(harmonic_fused_logits_dict.values())

In [13]:
import numpy as np
import torch
from sklearn.metrics import roc_curve
from sklearn.model_selection import KFold

# Assuming the fused logits dictionaries are already defined:
# average_fused_logits_dict
# weighted_fused_logits_dict
# max_pool_fused_logits_dict
# product_fused_logits_dict
# harmonic_fused_logits_dict

# Number of folds for cross-validation (5 matches the default of `calculate_eer_kfold`);
# this value is also printed in the per-method report at the end of the cell.
k_folds = 5

# Function to calculate EER and threshold
def calculate_metrics(y_true, y_scores):
    """Return ``(eer, eer_threshold)`` for binary labels and their scores.

    Label 1 is the positive class. The EER is read off as the false-positive
    rate at the ROC point where |FNR - FPR| is smallest; the threshold is the
    score threshold of that same point.
    """
    fpr, tpr, thresholds = roc_curve(y_true, y_scores, pos_label=1)
    fnr = 1 - tpr
    # Position on the curve where the two error rates are closest to each other.
    eer_index = np.nanargmin(np.abs(fnr - fpr))
    return fpr[eer_index], thresholds[eer_index]

# Function to calculate EER across k folds for a given fusion dictionary
def calculate_eer_kfold(fused_logits_dict, ground_truth_dict, k_folds=5):
    """Average the EER and its threshold over ``k_folds`` shuffled partitions.

    Args:
        fused_logits_dict: ``{file_ID: fused logit}``.
        ground_truth_dict: ``{file_ID: 0/1 label}``; IDs missing here are skipped.
        k_folds: Number of partitions produced by ``KFold``.

    Returns:
        Tuple ``(mean EER, mean EER threshold)`` across the partitions.
    """
    keys = list(fused_logits_dict.keys())
    splitter = KFold(n_splits=k_folds, shuffle=True, random_state=42)

    fold_eers, fold_thresholds = [], []

    # Nothing is fitted here, so only the held-out indices of each split are used.
    for _, held_out in splitter.split(keys):
        labelled_ids = [keys[i] for i in held_out if keys[i] in ground_truth_dict]
        y_scores = [fused_logits_dict[la_id] for la_id in labelled_ids]
        y_true = [ground_truth_dict[la_id] for la_id in labelled_ids]

        # Sigmoid maps the fused logits to [0, 1] before the ROC is computed.
        probabilities = torch.sigmoid(torch.tensor(y_scores)).tolist()

        eer, eer_threshold = calculate_metrics(y_true, probabilities)
        fold_eers.append(eer)
        fold_thresholds.append(eer_threshold)

    return np.mean(fold_eers), np.mean(fold_thresholds)

# Maps each fusion method to its (average EER, average threshold) pair.
eer_results_kfold = {}

# Fused-logit dictionary of each method, in reporting order.
fusion_inputs = {
    'Average': average_fused_logits_dict,
    'Weighted Average': weighted_fused_logits_dict,
    'Max Pooling': max_pool_fused_logits_dict,
    'Product': product_fused_logits_dict,
    'Harmonic': harmonic_fused_logits_dict,
}

# Pass k_folds explicitly so the evaluation always uses the fold count that the
# report below prints, instead of relying on the function's default matching it.
for fusion_method, fused_logits in fusion_inputs.items():
    eer_results_kfold[fusion_method] = calculate_eer_kfold(
        fused_logits, ground_truth_dict, k_folds=k_folds
    )

# Print results for each fusion method with k-fold cross-validation
for fusion_method, (avg_eer, avg_threshold) in eer_results_kfold.items():
    print(f"{fusion_method} Fusion with {k_folds}-Fold Cross-Validation:")
    print(f"  Average EER: {avg_eer:.4f}")
    print(f"  Average EER Threshold: {avg_threshold:.4f}")
    print()  # New line for better readability


Average Fusion with 5-Fold Cross-Validation:
  Average EER: 0.1624
  Average EER Threshold: 0.0187

Weighted Average Fusion with 5-Fold Cross-Validation:
  Average EER: 0.1332
  Average EER Threshold: 0.8563

Max Pooling Fusion with 5-Fold Cross-Validation:
  Average EER: 0.1379
  Average EER Threshold: 0.9932

Product Fusion with 5-Fold Cross-Validation:
  Average EER: 0.8738
  Average EER Threshold: 1.0000

Harmonic Fusion with 5-Fold Cross-Validation:
  Average EER: 0.1568
  Average EER Threshold: 0.0590



## Create a light NN for model fusion AND Ensemble Models (gradient boosting or random forests)

### Original Dataset (Light NN)

In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from sklearn.metrics import roc_curve
from sklearn.model_selection import KFold

# Function to load logits into dictionaries
def load_logits(file_path):
    """Read a score file into a ``{file_ID: score}`` dictionary.

    Each non-blank line is split on whitespace; the first field is used as
    the file ID and the last field is parsed as a float score. If an ID
    occurs more than once, the last occurrence wins.

    Args:
        file_path: Path of the text file to read.

    Returns:
        dict mapping file ID (str) to score (float).

    Raises:
        ValueError: If the last field of a non-blank line is not a number.
    """
    logits_dict = {}
    with open(file_path, 'r') as file:
        for line in file:
            elements = line.split()
            if not elements:
                # Blank or whitespace-only line: indexing it would raise
                # IndexError, so skip it.
                continue
            logits_dict[elements[0]] = float(elements[-1])
    return logits_dict

# Inputs: the two original-dataset score files plus the evaluation metadata.
logits1_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_Only\A_ASV_Default_32_Or\baseline_DF\log_eval_score.txt'
logits2_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_Only\D_SinglePoly_32_ncp_Special_Or_logits\baseline_DF\log_eval_score.txt'
ground_truth_path = r'C:\Users\User\Desktop\Processed_ASV_Data\Sorted_Metadata\eval_meta_sort.txt'

logits1_dict, logits2_dict = load_logits(logits1_path), load_logits(logits2_path)

# Ground truth as {file_ID: label}: the ID is the second column, the label is the last
# column ('bonafide' -> 1, anything else -> 0).
ground_truth_dict = {}
with open(ground_truth_path, 'r') as file:
    for line in file:
        elements = line.split()  # no-argument split() ignores surrounding whitespace
        la_id, label = elements[1], elements[-1]
        ground_truth_dict[la_id] = int(label == 'bonafide')

# Keep only the IDs scored by both models, in metadata order, so that each
# [model-1 logit, model-2 logit] feature row lines up with its label.
shared_ids = [
    la_id for la_id in ground_truth_dict
    if la_id in logits1_dict and la_id in logits2_dict
]
paired_logits = [[logits1_dict[la_id], logits2_dict[la_id]] for la_id in shared_ids]
paired_labels = [ground_truth_dict[la_id] for la_id in shared_ids]

# float32 tensors: features for the network and float targets for BCELoss.
inputs = torch.tensor(paired_logits, dtype=torch.float32)
labels = torch.tensor(paired_labels, dtype=torch.float32)

# Define a simple neural network model
class SimpleNN(nn.Module):
    """Small fully connected network mapping two logits to one sigmoid score.

    Layout: Linear(2, 64) -> ReLU -> BatchNorm1d(64) -> Linear(64, 32) -> ReLU
    -> Dropout(0.3) -> Linear(32, 16) -> ReLU -> Linear(16, 1) -> sigmoid.
    """

    def __init__(self):
        super().__init__()
        # Layers are created in a fixed order so that seeded initialisation
        # always draws the same random numbers.
        self.fc1 = nn.Linear(2, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 16)
        self.fc4 = nn.Linear(16, 1)
        self.dropout = nn.Dropout(0.3)
        self.batch_norm = nn.BatchNorm1d(64)

    def forward(self, x):
        """Return a score in [0, 1] of shape (batch, 1) for input of shape (batch, 2)."""
        hidden = self.batch_norm(F.relu(self.fc1(x)))
        hidden = self.dropout(F.relu(self.fc2(hidden)))
        hidden = F.relu(self.fc3(hidden))
        return torch.sigmoid(self.fc4(hidden))

# Function to calculate EER
def calculate_eer(y_true, y_scores):
    """Return ``(eer, eer_threshold)`` for binary labels and their scores.

    Label 1 is the positive class. The EER is read off as the false-positive
    rate at the ROC point where |FNR - FPR| is smallest; the threshold is the
    score threshold of that same point.
    """
    fpr, tpr, thresholds = roc_curve(y_true, y_scores, pos_label=1)
    fnr = 1 - tpr
    # Position on the curve where the two error rates are closest to each other.
    eer_index = np.nanargmin(np.abs(fnr - fpr))
    return fpr[eer_index], thresholds[eer_index]

# K-Fold Cross-Validation
# The splits are reproducible (random_state=42), but no torch seed is set in this cell,
# so weight initialisation and dropout can differ between runs.
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

# Initialize arrays to store EERs and thresholds for each fold
eers = []
thresholds = []

# Loop over each fold
for train_index, test_index in kf.split(inputs):
    # Split the data into training and test sets
    X_train, X_test = inputs[train_index], inputs[test_index]
    y_train, y_test = labels[train_index], labels[test_index]
    
    # Initialize model, loss function, and optimizer
    # A fresh model and optimizer are created for every fold, so no weights carry over.
    # BCELoss is used because SimpleNN already applies a sigmoid to its output.
    model = SimpleNN()
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.003)
    
    # Training loop
    # Full-batch training: each epoch is one optimizer step on the whole training fold.
    epochs = 100
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        # Forward pass
        # squeeze() drops the trailing size-1 dimension so the output matches y_train's shape.
        outputs = model(X_train).squeeze()
        loss = criterion(outputs, y_train)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
    

    ##############################
    # Evaluation on the test set #
    ##############################
    # eval() switches dropout off and makes batch norm use its running statistics.
    model.eval()
    with torch.no_grad():
        probabilities = model(X_test).squeeze()
    
    # Calculate EER and threshold for this fold
    eer, eer_threshold = calculate_eer(y_test.numpy(), probabilities.numpy())
    eers.append(eer)
    thresholds.append(eer_threshold)

# Calculate average EER and threshold across all folds
avg_eer = np.mean(eers)
avg_threshold = np.mean(thresholds)

# Print the results
print(f'Average EER over {k_folds} folds: {avg_eer:.4f}')
print(f'Average EER Threshold over {k_folds} folds: {avg_threshold:.4f}')

Average EER over 5 folds: 0.1225
Average EER Threshold over 5 folds: 0.0483


### Original Dataset (Gradient Boosting & Random Forests)

In [15]:
import numpy as np
import torch
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import roc_curve, auc

# File paths
logits1_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_Only\A_ASV_Default_32_Or\baseline_DF\log_eval_score.txt'
logits2_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_Only\D_SinglePoly_32_ncp_Special_Or_logits\baseline_DF\log_eval_score.txt'
ground_truth_path = r'C:\Users\User\Desktop\Processed_ASV_Data\Sorted_Metadata\eval_meta_sort.txt'


# Function to load logits
def load_logits(file_path):
    """Read a score file into a ``{file_ID: score}`` dictionary.

    Each non-blank line is split on whitespace; the first field is used as
    the file ID and the last field is parsed as a float score. If an ID
    occurs more than once, the last occurrence wins.

    Args:
        file_path: Path of the text file to read.

    Returns:
        dict mapping file ID (str) to score (float).

    Raises:
        ValueError: If the last field of a non-blank line is not a number.
    """
    logits_dict = {}
    with open(file_path, 'r') as file:
        for line in file:
            elements = line.split()
            if not elements:
                # Blank or whitespace-only line: indexing it would raise
                # IndexError, so skip it.
                continue
            logits_dict[elements[0]] = float(elements[-1])
    return logits_dict

# Scores of both models as {file_ID: logit}.
logits1_dict, logits2_dict = load_logits(logits1_path), load_logits(logits2_path)

# Ground truth as {file_ID: label}: the ID is the second column, the label is the last
# column ('bonafide' -> 1, anything else -> 0).
ground_truth_dict = {}
with open(ground_truth_path, 'r') as file:
    for line in file:
        elements = line.split()  # no-argument split() ignores surrounding whitespace
        la_id, label = elements[1], elements[-1]
        ground_truth_dict[la_id] = int(label == 'bonafide')

# Keep only the IDs scored by both models, in metadata order, so that each
# [model-1 logit, model-2 logit] feature row lines up with its label.
shared_ids = [
    la_id for la_id in ground_truth_dict
    if la_id in logits1_dict and la_id in logits2_dict
]
paired_logits = [[logits1_dict[la_id], logits2_dict[la_id]] for la_id in shared_ids]
paired_labels = [ground_truth_dict[la_id] for la_id in shared_ids]

# NumPy arrays, as expected by the scikit-learn estimators below.
inputs = np.array(paired_logits)
labels = np.array(paired_labels)

# The two ensemble classifiers to compare (fixed seeds for repeatable fits).
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
gb_classifier = GradientBoostingClassifier(n_estimators=100, learning_rate=0.1, random_state=42)

# EER calculation function
def calculate_eer(y_true, y_scores):
    """Estimate the equal error rate from an ROC curve.

    The ROC curve is computed with ``roc_curve``; the operating point where
    the false-negative rate (1 - TPR) and the false-positive rate are closest
    is selected.

    Args:
        y_true: True binary labels.
        y_scores: Scores for the positive class.

    Returns:
        Tuple ``(eer, eer_threshold)``: the false-positive rate at the
        selected point and the ``roc_curve`` threshold at that point.
    """
    false_pos_rate, true_pos_rate, cutoffs = roc_curve(y_true, y_scores)
    false_neg_rate = 1 - true_pos_rate
    # Index of the point where |FNR - FPR| is smallest (NaNs ignored).
    crossover = np.nanargmin(np.abs(false_neg_rate - false_pos_rate))
    return false_pos_rate[crossover], cutoffs[crossover]

# 5-fold cross-validation with shuffled, reproducible splits
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

# Per-fold results for each classifier
rf_eers, rf_thresholds = [], []
gb_eers, gb_thresholds = [], []

for train_index, test_index in kf.split(inputs):
    X_train, y_train = inputs[train_index], labels[train_index]
    X_test, y_test = inputs[test_index], labels[test_index]

    # Random forest: refit on this fold's training part, then score the
    # held-out part. Column 1 of predict_proba is the second class.
    rf_classifier.fit(X_train, y_train)
    rf_probs = rf_classifier.predict_proba(X_test)[:, 1]
    rf_eer, rf_threshold = calculate_eer(y_test, rf_probs)
    rf_eers.append(rf_eer)
    rf_thresholds.append(rf_threshold)

    # Gradient boosting: same procedure on the same split.
    gb_classifier.fit(X_train, y_train)
    gb_probs = gb_classifier.predict_proba(X_test)[:, 1]
    gb_eer, gb_threshold = calculate_eer(y_test, gb_probs)
    gb_eers.append(gb_eer)
    gb_thresholds.append(gb_threshold)

# Average the per-fold numbers
avg_rf_eer, avg_rf_threshold = np.mean(rf_eers), np.mean(rf_thresholds)
avg_gb_eer, avg_gb_threshold = np.mean(gb_eers), np.mean(gb_thresholds)

# Report
print(f'Average Random Forest EER over {k_folds} folds: {avg_rf_eer:.4f}')
print(f'Average Random Forest Threshold over {k_folds} folds: {avg_rf_threshold:.4f}')
print(f'Average Gradient Boosting EER over {k_folds} folds: {avg_gb_eer:.4f}')
print(f'Average Gradient Boosting Threshold over {k_folds} folds: {avg_gb_threshold:.4f}')


Average Random Forest EER over 5 folds: 0.1447
Average Random Forest Threshold over 5 folds: 0.0780
Average Gradient Boosting EER over 5 folds: 0.1211
Average Gradient Boosting Threshold over 5 folds: 0.0407


### Augmented Dataset (Light NN)

In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from sklearn.metrics import roc_curve
from sklearn.model_selection import KFold

# Function to load logits into dictionaries
def load_logits(file_path):
    """Load a whitespace-separated score file as ``{id: score}``.

    For each non-blank line the first field is the dictionary key and the
    last field is converted to ``float``. Blank or whitespace-only lines are
    ignored rather than triggering ``IndexError``. A repeated ID keeps the
    score from its last occurrence.

    Args:
        file_path: Path of the text file to read.

    Returns:
        dict mapping the first field of each line to its float score.

    Raises:
        ValueError: If the last field of a non-blank line is not a number.
    """
    logits_dict = {}
    with open(file_path, 'r') as file:
        for line in file:
            # split() without arguments already discards surrounding whitespace.
            elements = line.split()
            if not elements:
                # Empty line: there is no ID or score to read.
                continue
            logits_dict[elements[0]] = float(elements[-1])
    return logits_dict


# Score files of the two runs and the evaluation metadata file
logits1_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_and_Augmented_Raw_5\A_ASV_Default_32_Augment_5_logits\baseline_DF\log_eval_score.txt'
logits2_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_and_Augmented_Raw_5\D_ASV_Special_32_Augment_5_logits\baseline_DF\log_eval_score.txt'
ground_truth_path = r'C:\Users\User\Desktop\Processed_ASV_Data\Sorted_Metadata\eval_meta_sort.txt'

# Each score file becomes a dictionary keyed by the ID in its first column.
logits1_dict = load_logits(logits1_path)
logits2_dict = load_logits(logits2_path)

# Metadata: second field is the ID, last field is the label.
# 'bonafide' is encoded as 1, every other label as 0.
ground_truth_dict = {}
with open(ground_truth_path, 'r') as meta_file:
    for row in meta_file:
        fields = row.strip().split()
        ground_truth_dict[fields[1]] = int(fields[-1] == 'bonafide')

# Keep only IDs that were scored in both files, in metadata order.
scored_ids = [
    utt_id for utt_id in ground_truth_dict
    if utt_id in logits1_dict and utt_id in logits2_dict
]
paired_logits = [[logits1_dict[utt_id], logits2_dict[utt_id]] for utt_id in scored_ids]
paired_labels = [ground_truth_dict[utt_id] for utt_id in scored_ids]

# float32 tensors: a two-column input per ID and a float label
inputs = torch.tensor(paired_logits, dtype=torch.float32)
labels = torch.tensor(paired_labels, dtype=torch.float32)

# Define a simple neural network model
class SimpleNN(nn.Module):
    """Small fully connected network with a single sigmoid output.

    Layer widths are 2 -> 64 -> 32 -> 16 -> 1. Batch normalisation is applied
    to the 64-wide activation after the first ReLU, and dropout (p=0.3) after
    the second ReLU. The output is passed through a sigmoid, so it lies in
    (0, 1) and has a trailing dimension of size 1.
    """

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(in_features=2, out_features=64)
        self.fc2 = nn.Linear(in_features=64, out_features=32)
        self.fc3 = nn.Linear(in_features=32, out_features=16)
        self.fc4 = nn.Linear(in_features=16, out_features=1)
        self.dropout = nn.Dropout(p=0.3)
        self.batch_norm = nn.BatchNorm1d(num_features=64)

    def forward(self, x):
        """Run the forward pass and return the sigmoid of the last layer."""
        hidden = self.batch_norm(F.relu(self.fc1(x)))
        hidden = self.dropout(F.relu(self.fc2(hidden)))
        hidden = F.relu(self.fc3(hidden))
        return torch.sigmoid(self.fc4(hidden))

# Function to calculate EER
def calculate_eer(y_true, y_scores):
    """Estimate the equal error rate from an ROC curve (positive label = 1).

    The point where the false-negative rate (1 - TPR) and the false-positive
    rate are closest is selected on the curve returned by ``roc_curve``.

    Args:
        y_true: True binary labels.
        y_scores: Scores for the positive class.

    Returns:
        Tuple ``(eer, eer_threshold)``: the false-positive rate at the
        selected point and the ``roc_curve`` threshold at that point.
    """
    false_pos_rate, true_pos_rate, cutoffs = roc_curve(y_true, y_scores, pos_label=1)
    false_neg_rate = 1 - true_pos_rate
    # Index of the point where |FNR - FPR| is smallest (NaNs ignored).
    crossover = np.nanargmin(np.abs(false_neg_rate - false_pos_rate))
    return false_pos_rate[crossover], cutoffs[crossover]

# K-Fold Cross-Validation: 5 shuffled folds; the fixed random_state makes the
# fold assignment reproducible.
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

# Initialize arrays to store EERs and thresholds for each fold
eers = []
thresholds = []

# Loop over each fold
for train_index, test_index in kf.split(inputs):
    # Split the data into training and test sets
    X_train, X_test = inputs[train_index], inputs[test_index]
    y_train, y_test = labels[train_index], labels[test_index]

    # Initialize model, loss function, and optimizer.
    # A new SimpleNN is created for every fold, so no weights carry over.
    # BCELoss expects probabilities; SimpleNN already ends in a sigmoid.
    # NOTE(review): no torch seed is set in this cell, so weight init and
    # dropout presumably differ between runs -- confirm if exact
    # reproducibility is needed.
    model = SimpleNN()
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.003)

    # Training loop: full-batch training, i.e. one optimizer step per epoch
    # on the whole training fold.
    epochs = 100
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        # Forward pass; squeeze() drops the trailing size-1 output dimension
        # so the prediction shape matches the 1-D label tensor.
        outputs = model(X_train).squeeze()
        loss = criterion(outputs, y_train)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        #if (epoch + 1) % 10 == 0:
        #    print(f'Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}')

    # Evaluation on the test set: eval() switches dropout and batch norm to
    # inference behaviour, and no_grad() skips gradient tracking.
    model.eval()
    with torch.no_grad():
        probabilities = model(X_test).squeeze()

    # Calculate EER and threshold for this fold (roc_curve needs numpy input)
    eer, eer_threshold = calculate_eer(y_test.numpy(), probabilities.numpy())
    eers.append(eer)
    thresholds.append(eer_threshold)

# Calculate average EER and threshold across all folds
avg_eer = np.mean(eers)
avg_threshold = np.mean(thresholds)

# Print the results
print(f'Average EER over {k_folds} folds: {avg_eer:.4f}')
print(f'Average EER Threshold over {k_folds} folds: {avg_threshold:.4f}')


Average EER over 5 folds: 0.1328
Average EER Threshold over 5 folds: 0.0871


### Augmented Dataset (Gradient Boosting & Random Forests)

In [17]:
import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import roc_curve
from sklearn.model_selection import KFold

# File paths (absolute paths on the local machine).
# logits1_path / logits2_path: two score files, both read with load_logits
# below (first whitespace-separated field = ID, last field = score).
# ground_truth_path: metadata file; the parsing code further down takes the
# ID from the second field and the label from the last field of each line.
logits1_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_and_Augmented_Raw_5\A_ASV_Default_32_Augment_5_logits\baseline_DF\log_eval_score.txt'
logits2_path = r'C:\Users\User\Desktop\Final_ASV_Results\Original_and_Augmented_Raw_5\D_ASV_Special_32_Augment_5_logits\baseline_DF\log_eval_score.txt'
ground_truth_path = r'C:\Users\User\Desktop\Processed_ASV_Data\Sorted_Metadata\eval_meta_sort.txt'

# Function to load logits
def load_logits(file_path):
    """Parse a score file into a dictionary of ``id -> float score``.

    Lines are split on whitespace. The first field is the key and the last
    field is the score. Lines that are empty or contain only whitespace are
    skipped, so a stray blank line no longer raises ``IndexError``. If the
    same ID appears several times, the last score is kept.

    Args:
        file_path: Path of the text file to read.

    Returns:
        dict mapping the first field of each line to its float score.

    Raises:
        ValueError: If the last field of a non-blank line is not a number.
    """
    logits_dict = {}
    with open(file_path, 'r') as file:
        for line in file:
            # split() without arguments already discards surrounding whitespace.
            elements = line.split()
            if not elements:
                # Blank line: skip it.
                continue
            logits_dict[elements[0]] = float(elements[-1])
    return logits_dict

# Score dictionaries for the two runs, keyed by the ID in the first column.
logits1_dict = load_logits(logits1_path)
logits2_dict = load_logits(logits2_path)

# Metadata: second field is the ID, last field is the label.
# 'bonafide' is encoded as 1, every other label as 0.
ground_truth_dict = {}
with open(ground_truth_path, 'r') as meta_file:
    for row in meta_file:
        fields = row.strip().split()
        ground_truth_dict[fields[1]] = int(fields[-1] == 'bonafide')

# Collect (score 1, score 2, label) for every ID present in both score files,
# following the order of the metadata file.
matched = [
    (logits1_dict[utt_id], logits2_dict[utt_id], target)
    for utt_id, target in ground_truth_dict.items()
    if utt_id in logits1_dict and utt_id in logits2_dict
]
paired_logits = [[first, second] for first, second, _ in matched]
paired_labels = [target for _, _, target in matched]

# numpy arrays for sklearn: one row of two scores per ID
inputs = np.array(paired_logits)
labels = np.array(paired_labels)

# Classifiers under comparison, both seeded for reproducibility
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
gb_classifier = GradientBoostingClassifier(
    n_estimators=100,
    learning_rate=0.1,
    random_state=42,
)

# EER calculation function
def calculate_eer(y_true, y_scores):
    """Estimate the equal error rate from an ROC curve.

    Uses ``roc_curve`` and picks the point at which the false-negative rate
    (1 - TPR) is closest to the false-positive rate.

    Args:
        y_true: True binary labels.
        y_scores: Scores for the positive class.

    Returns:
        Tuple ``(eer, eer_threshold)``: the false-positive rate at the
        selected point and the ``roc_curve`` threshold at that point.
    """
    fpr, tpr, cutoffs = roc_curve(y_true, y_scores)
    gap = np.abs((1 - tpr) - fpr)
    # Position of the smallest |FNR - FPR| gap (NaNs ignored).
    best = np.nanargmin(gap)
    eer, eer_threshold = fpr[best], cutoffs[best]
    return eer, eer_threshold

# 5-fold cross-validation, shuffled with a fixed seed
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

# Per-fold EERs and thresholds for each classifier
rf_eers, rf_thresholds = [], []
gb_eers, gb_thresholds = [], []

for train_index, test_index in kf.split(inputs):
    X_train, y_train = inputs[train_index], labels[train_index]
    X_test, y_test = inputs[test_index], labels[test_index]

    # fit() returns the estimator itself, so fitting and scoring can be
    # chained. Column 1 of predict_proba is the second class.
    rf_probs = rf_classifier.fit(X_train, y_train).predict_proba(X_test)[:, 1]
    gb_probs = gb_classifier.fit(X_train, y_train).predict_proba(X_test)[:, 1]

    # EER and threshold on the held-out part of this fold
    rf_eer, rf_threshold = calculate_eer(y_test, rf_probs)
    gb_eer, gb_threshold = calculate_eer(y_test, gb_probs)

    rf_eers.append(rf_eer)
    gb_eers.append(gb_eer)
    rf_thresholds.append(rf_threshold)
    gb_thresholds.append(gb_threshold)

# Averages over all folds
avg_rf_eer, avg_rf_threshold = np.mean(rf_eers), np.mean(rf_thresholds)
avg_gb_eer, avg_gb_threshold = np.mean(gb_eers), np.mean(gb_thresholds)

# Report
print(f'Average Random Forest EER over {k_folds} folds: {avg_rf_eer:.4f}')
print(f'Average Random Forest Threshold over {k_folds} folds: {avg_rf_threshold:.4f}')
print(f'Average Gradient Boosting EER over {k_folds} folds: {avg_gb_eer:.4f}')
print(f'Average Gradient Boosting Threshold over {k_folds} folds: {avg_gb_threshold:.4f}')


Average Random Forest EER over 5 folds: 0.1605
Average Random Forest Threshold over 5 folds: 0.0840
Average Gradient Boosting EER over 5 folds: 0.1317
Average Gradient Boosting Threshold over 5 folds: 0.0846
