Dataset Extraction

In [143]:
# Start from a clean slate: drop any previous extraction, unpack the archive,
# then delete the macOS metadata folder (__MACOSX) that the zip carries along.
# NOTE(review): unzip extracts into the current directory while rm targets
# /content/dataset - this assumes the working directory is /content; confirm.
!rm -r /content/dataset
!unzip dataset.zip
!rm -r __MACOSX

Archive:  dataset.zip
   creating: dataset/
  inflating: __MACOSX/._dataset      
  inflating: dataset/.DS_Store       
  inflating: __MACOSX/dataset/._.DS_Store  
   creating: dataset/others/
   creating: dataset/user_0/
  inflating: dataset/others/yes.0a196374_nohash_1.wav  
  inflating: __MACOSX/dataset/others/._yes.0a196374_nohash_1.wav  
  inflating: dataset/others/backward.0b7ee1a0_nohash_0.wav  
  inflating: __MACOSX/dataset/others/._backward.0b7ee1a0_nohash_0.wav  
  inflating: dataset/others/cat.0e6e36c9_nohash_0.wav  
  inflating: __MACOSX/dataset/others/._cat.0e6e36c9_nohash_0.wav  
  inflating: dataset/others/down.0a2b400e_nohash_3.wav  
  inflating: __MACOSX/dataset/others/._down.0a2b400e_nohash_3.wav  
  inflating: dataset/others/seven.0a0b46ae_nohash_0.wav  
  inflating: __MACOSX/dataset/others/._seven.0a0b46ae_nohash_0.wav  
  inflating: dataset/others/stop.0a196374_nohash_2.wav  
  inflating: __MACOSX/dataset/others/._stop.0a196374_nohash_2.wav  
  inflating: dataset/o

Dividing the files into the desired folders, which will then be processed into .npz feature files

Division and organization:

In [144]:
import os
import shutil
import random
from tqdm import tqdm

def organize_wav_files(target_source_dir, other_source_dir, output_dir, target_user_id=0, seed=None):
    """
    Copy wav files into the per-split folders used for speaker verification.

    Target-speaker files are copied with a ``target_`` prefix and files of
    other speakers with an ``other_`` prefix. The source files are left in
    place (they are copied, not moved).

    Args:
        target_source_dir: Directory containing the target speaker's wav files
            (only its top level is scanned).
        other_source_dir: Directory containing other speakers' wav files
            (scanned recursively).
        output_dir: Output directory; one sub-folder per split is created in it.
        target_user_id: Numeric ID for the target speaker. Currently unused by
            this function; kept so existing callers keep working.
        seed: Optional seed for the shuffle. With a seed the same input folders
            always produce the same splits; with ``None`` the global ``random``
            state is used, as before.

    Raises:
        ValueError: If either source holds fewer wav files than the splits need.
    """
    os.makedirs(output_dir, exist_ok=True)

    # split name -> (target_samples, non_target_samples)
    subfolders = {
        "validation": (60, 120),
        "testing": (60, 120),
        "train_1": (1, 0),
        "train_8": (8, 0),
        "train_16": (16, 0),
        "train_64": (64, 0)
    }

    for folder in subfolders:
        os.makedirs(os.path.join(output_dir, folder), exist_ok=True)

    # A private generator keeps a seeded run independent of the global state.
    rng = random.Random(seed) if seed is not None else random

    # Directory listing order is arbitrary, so sort before shuffling; otherwise
    # a fixed seed would still give different splits on different machines.
    target_files = sorted(
        f for f in os.listdir(target_source_dir)
        if f.endswith('.wav') and os.path.isfile(os.path.join(target_source_dir, f))
    )
    rng.shuffle(target_files)

    other_files = sorted(
        os.path.join(root, file)
        for root, _, files in os.walk(other_source_dir)
        for file in files
        if file.endswith('.wav')
    )
    rng.shuffle(other_files)

    # Verify we have enough files before copying anything.
    required_target = sum(v[0] for v in subfolders.values())
    required_other = sum(v[1] for v in subfolders.values())

    if len(target_files) < required_target:
        raise ValueError(f"Need {required_target} target files, found {len(target_files)}")
    if len(other_files) < required_other:
        raise ValueError(f"Need {required_other} non-target files, found {len(other_files)}")

    # Consume both shuffled lists front to back so no file lands in two splits.
    target_idx = 0
    other_idx = 0

    for folder, (target_count, other_count) in subfolders.items():
        dest_dir = os.path.join(output_dir, folder)

        for name in target_files[target_idx:target_idx + target_count]:
            shutil.copy(os.path.join(target_source_dir, name),
                        os.path.join(dest_dir, f"target_{name}"))
        target_idx += target_count

        # NOTE(review): only the basename is kept, so two non-target files from
        # different sub-directories that share a name would overwrite each other.
        for src in other_files[other_idx:other_idx + other_count]:
            shutil.copy(src, os.path.join(dest_dir, f"other_{os.path.basename(src)}"))
        other_idx += other_count

    print("Organization complete.")
    print(f"Used {target_idx} target files and {other_idx} non-target files")
    print(f"Remaining target files: {len(target_files) - target_idx}")
    print(f"Remaining non-target files: {len(other_files) - other_idx}")

# Driver for the organization step: copy the extracted wav files into the
# per-split folders, then delete the extracted source folders.
if __name__ == "__main__":
    # Configure these paths
    target_speaker_dir = "/content/dataset/user_0/"  # Contains only target speaker
    other_speakers_dir = "/content/dataset/others/"  # Contains other speakers
    output_directory = "/content/dataset/user_0_organized/"

    organize_wav_files(
        target_source_dir=target_speaker_dir,
        other_source_dir=other_speakers_dir,
        output_dir=output_directory,
        target_user_id=0
    )
    # The files were copied, not moved, so the originals are removed here.
    # Files that were not assigned to any split are deleted along with them.
    !rm -r /content/dataset/user_0
    !rm -r /content/dataset/others

Organization complete.
Used 209 target files and 240 non-target files
Remaining target files: 27
Remaining non-target files: 248


MFE block conversion and .npz file generation

In [145]:
import os
import shutil
import numpy as np
import librosa
from tqdm import tqdm

# Audio front-end settings shared by the feature-extraction helpers in this cell.
SAMPLE_RATE = 16000  # rate passed to librosa.load and to the mel filterbank
FRAME_SIZE = 512  # STFT window length (win_length)
HOP_LENGTH = 384  # STFT hop between consecutive frames
N_FFT = 512  # FFT size used by the STFT and the mel filterbank
N_MELS = 40  # number of mel bands
PRE_EMPHASIS_COEFF = 0.96785  # default coefficient of apply_pre_emphasis

def remove_all_folders_except(parent_dir, folder_to_keep):
    """
    Delete every sub-directory of parent_dir except folder_to_keep.

    Plain files inside parent_dir are left alone. If folder_to_keep does not
    exist, a warning is printed and nothing is deleted. A directory that cannot
    be removed is reported and skipped.
    """
    if not os.path.exists(os.path.join(parent_dir, folder_to_keep)):
        print(f"Warning: '{folder_to_keep}' doesn't exist in {parent_dir}")
        return

    for entry in os.listdir(parent_dir):
        if entry == folder_to_keep:
            continue
        entry_path = os.path.join(parent_dir, entry)
        if not os.path.isdir(entry_path):
            continue
        print(f"Removing: {entry_path}")
        try:
            shutil.rmtree(entry_path)
        except Exception as err:
            print(f"Failed to remove {entry_path}: {err}")

def apply_pre_emphasis(y, coeff=PRE_EMPHASIS_COEFF):
    """
    Apply a first-order pre-emphasis filter: out[i] = y[i] - coeff * y[i-1].

    Args:
        y: 1-D sequence of audio samples.
        coeff: Pre-emphasis coefficient.

    Returns:
        float32 array with the same shape as y. The first sample is copied
        unchanged; an empty input yields an empty array.
    """
    y = np.asarray(y)
    emphasized = np.zeros_like(y, dtype=np.float32)
    if y.size == 0:
        # Nothing to filter; indexing y[0] would raise on an empty signal.
        return emphasized

    emphasized[0] = y[0]
    # Vectorized form of the per-sample recurrence; avoids a Python-level loop
    # over every audio sample.
    emphasized[1:] = y[1:] - coeff * y[:-1]
    return emphasized

def extract_mfe(waveform):
    """
    Turn a waveform into a mel-filterbank feature matrix.

    Steps: pre-emphasis, Hamming-windowed STFT, power in dB floored at -50 dB,
    rescaling with (dB + 50) / 62, projection onto N_MELS mel bands (0-8000 Hz),
    then transpose so frames are rows and keep at most the first 40 frames.

    NOTE(review): the log/rescale step runs before the mel projection, so the
    filterbank is applied to normalized dB values rather than to power. Confirm
    this matches the features the d-vector model was trained on.

    Returns:
        Array with one row per frame (at most 40) and N_MELS columns.
    """
    stft = librosa.stft(
        apply_pre_emphasis(waveform),
        n_fft=N_FFT,
        hop_length=HOP_LENGTH,
        win_length=FRAME_SIZE,
        window='hamming'
    )

    # Power spectrum in dB; the tiny offset keeps log10 finite for silent bins.
    power_db = 10 * np.log10(np.abs(stft)**2 + 1e-20)
    # Floor at -50 dB, then shift and scale.
    scaled = (np.maximum(power_db, -50) + 50) / 62

    filterbank = librosa.filters.mel(
        sr=SAMPLE_RATE,
        n_fft=N_FFT,
        n_mels=N_MELS,
        fmin=0,
        fmax=8000
    )

    # (mel bands, frames) -> (frames, mel bands), truncated to 40 frames.
    return np.dot(filterbank, scaled).T[:40, :]

def process_folder_to_npz(folder_path, output_npz_path, target_user_id):
    """
    Extract MFE features for every wav file in a folder and save them as .npz.

    Files whose name starts with ``target_`` are labelled ``target_user_id``;
    all others are labelled -1. Files that cannot be loaded, or that are not
    exactly one second long at SAMPLE_RATE, are reported and skipped, so the
    saved arrays can hold fewer rows than the folder has files.

    Args:
        folder_path: Folder containing the wav files.
        output_npz_path: Destination of the compressed .npz archive. It stores
            ``features`` (float32, one trailing channel axis added),
            ``filenames`` and ``labels`` (int32).
        target_user_id: Label assigned to target-speaker files.
    """
    features = []
    labels = []
    filenames = []

    # os.listdir order is arbitrary; sorting keeps the row order of the saved
    # arrays stable from run to run.
    wav_files = sorted(f for f in os.listdir(folder_path) if f.endswith('.wav'))

    for wav_file in tqdm(wav_files, desc=f"Processing {os.path.basename(folder_path)}"):
        audio_path = os.path.join(folder_path, wav_file)
        try:
            y, _ = librosa.load(audio_path, sr=SAMPLE_RATE)
        except Exception as e:
            # Catch Exception rather than using a bare except, so Ctrl-C still
            # interrupts the loop and the actual load error is shown.
            print(f"Skipping corrupted file: {audio_path} ({e})")
            continue

        # Only clips of exactly one second are kept; others are skipped.
        if len(y) != SAMPLE_RATE:
            print(f"Warning: {wav_file} has {len(y)/SAMPLE_RATE:.2f} seconds (expected 1.0)")
            continue

        mfe = extract_mfe(y)
        mfe = mfe[..., np.newaxis]  # add a trailing channel axis
        features.append(mfe)

        if wav_file.startswith('target_'):
            labels.append(target_user_id)
        else:
            labels.append(-1)

        filenames.append(wav_file)

    features_array = np.array(features, dtype=np.float32)
    labels_array = np.array(labels, dtype=np.int32)

    np.savez_compressed(
        output_npz_path,
        features=features_array,
        filenames=np.array(filenames),
        labels=labels_array
    )
    print(f"Saved {len(features)} segments to {output_npz_path}")
    print(f"Class distribution: Target={np.sum(labels_array == target_user_id)}, Non-target={np.sum(labels_array != target_user_id)}")

def process_all_folders(base_dir, target_user_id=0):
    """
    Convert each known split folder under base_dir into an .npz feature file.

    Output goes to ``<base_dir>/npz_features``, which is created if needed.
    Training splits get the speaker id in their file name; validation and
    testing do not. Split folders that do not exist are skipped.
    """
    output_dir = os.path.join(base_dir, "npz_features")
    os.makedirs(output_dir, exist_ok=True)

    for split in ("validation", "testing", "train_1", "train_8", "train_16", "train_64"):
        split_path = os.path.join(base_dir, split)
        if not os.path.exists(split_path):
            continue

        npz_name = (
            f"{split}_{target_user_id}_features.npz"
            if "train" in split
            else f"{split}_features.npz"
        )
        process_folder_to_npz(split_path, os.path.join(output_dir, npz_name), target_user_id)

# Driver for the feature-extraction step: report the wav count of each split,
# convert every split to .npz, then delete the wav folders.
if __name__ == "__main__":
    organized_dir = "/content/dataset/user_0_organized"
    target_speaker_id = 0

    # Sanity check: how many wav files each expected split folder holds.
    print("Verifying folder structure...")
    for folder in ["validation", "testing", "train_1", "train_8", "train_16", "train_64"]:
        path = os.path.join(organized_dir, folder)
        if os.path.exists(path):
            files = [f for f in os.listdir(path) if f.endswith('.wav')]
            print(f"{folder}: {len(files)} files")

    process_all_folders(organized_dir, target_user_id=target_speaker_id)
    # Only the generated features are kept; the per-split wav folders are removed.
    remove_all_folders_except(parent_dir=organized_dir, folder_to_keep="npz_features")

Verifying folder structure...
validation: 180 files
testing: 180 files
train_1: 1 files
train_8: 8 files
train_16: 16 files
train_64: 64 files


Processing validation:   6%|▌         | 10/180 [00:00<00:01, 95.78it/s]



Processing validation:  12%|█▏        | 21/180 [00:00<00:01, 100.87it/s]



Processing validation:  18%|█▊        | 33/180 [00:00<00:01, 107.21it/s]



Processing validation:  37%|███▋      | 66/180 [00:00<00:01, 104.78it/s]



Processing validation:  63%|██████▎   | 114/180 [00:01<00:00, 108.68it/s]



Processing validation:  82%|████████▏ | 147/180 [00:01<00:00, 105.16it/s]



Processing validation: 100%|██████████| 180/180 [00:01<00:00, 105.95it/s]


Saved 162 segments to /content/dataset/user_0_organized/npz_features/validation_features.npz
Class distribution: Target=58, Non-target=104


Processing testing:  18%|█▊        | 33/180 [00:00<00:01, 108.48it/s]



Processing testing:  31%|███       | 56/180 [00:00<00:01, 111.56it/s]



Processing testing:  54%|█████▍    | 97/180 [00:00<00:00, 114.95it/s]



Processing testing:  67%|██████▋   | 121/180 [00:01<00:00, 108.22it/s]



Processing testing:  81%|████████  | 146/180 [00:01<00:00, 113.90it/s]



Processing testing:  94%|█████████▍| 170/180 [00:01<00:00, 112.38it/s]



Processing testing: 100%|██████████| 180/180 [00:01<00:00, 112.08it/s]


Saved 157 segments to /content/dataset/user_0_organized/npz_features/testing_features.npz
Class distribution: Target=60, Non-target=97


Processing train_1: 100%|██████████| 1/1 [00:00<00:00, 78.44it/s]


Saved 1 segments to /content/dataset/user_0_organized/npz_features/train_1_0_features.npz
Class distribution: Target=1, Non-target=0


Processing train_8: 100%|██████████| 8/8 [00:00<00:00, 63.56it/s]


Saved 8 segments to /content/dataset/user_0_organized/npz_features/train_8_0_features.npz
Class distribution: Target=8, Non-target=0


Processing train_16: 100%|██████████| 16/16 [00:00<00:00, 56.30it/s]


Saved 16 segments to /content/dataset/user_0_organized/npz_features/train_16_0_features.npz
Class distribution: Target=16, Non-target=0


Processing train_64: 100%|██████████| 64/64 [00:01<00:00, 55.67it/s]


Saved 64 segments to /content/dataset/user_0_organized/npz_features/train_64_0_features.npz
Class distribution: Target=64, Non-target=0
Removing: /content/dataset/user_0_organized/testing
Removing: /content/dataset/user_0_organized/train_8
Removing: /content/dataset/user_0_organized/validation
Removing: /content/dataset/user_0_organized/train_1
Removing: /content/dataset/user_0_organized/train_64
Removing: /content/dataset/user_0_organized/train_16


Code for verifying the structure of .npz files

In [146]:
import numpy as np
import os
import matplotlib.pyplot as plt

def verify_npz_files(directory="/content/dataset/user_0_organized/npz_features", detail_array="features"):
    """
    Print a summary of every .npz file in a directory.

    For each file the stored array names are listed. For the array named
    ``detail_array``, the shape, dtype, min/max/mean and element count are
    printed as well.

    Args:
        directory: Folder to scan for .npz files.
        detail_array: Name of the array to describe in detail.
    """
    npz_files = sorted(f for f in os.listdir(directory) if f.endswith('.npz'))

    if not npz_files:
        print(f"No .npz files found in {directory}!")
        return

    print(f"Found {len(npz_files)} .npz files in {directory}:\n")

    for file in npz_files:
        filepath = os.path.join(directory, file)
        # np.load keeps the archive open; the context manager closes it again.
        with np.load(filepath) as data:
            print(f"File: {file}")
            print(f"Number of arrays stored: {len(data.files)}\n")

            for array_name in data.files:
                print(f"  Array: '{array_name}'")
                if array_name != detail_array:
                    # Other arrays are only listed, so they are never read.
                    continue

                array_data = data[array_name]
                print(f"    - Shape: {array_data.shape}")
                print(f"    - Dtype: {array_data.dtype}")
                if array_data.size:
                    print(f"    - Min: {np.min(array_data):.4f}, Max: {np.max(array_data):.4f}, Mean: {np.mean(array_data):.4f}")
                else:
                    # min/max of an empty array would raise.
                    print("    - Min/Max/Mean: n/a (empty array)")
                print(f"    - Size: {array_data.size} elements\n")

        print("-" * 50 + "\n")

if __name__ == "__main__":
    verify_npz_files()

Found 6 .npz files in /content/dataset/user_0_organized/npz_features:

File: testing_features.npz
Number of arrays stored: 3

  Array: 'features'
    - Shape: (157, 40, 40, 1)
    - Dtype: float32
    - Min: 0.0000, Max: 0.0388, Mean: 0.0076
    - Size: 251200 elements

  Array: 'filenames'
  Array: 'labels'
--------------------------------------------------

File: train_16_0_features.npz
Number of arrays stored: 3

  Array: 'features'
    - Shape: (16, 40, 40, 1)
    - Dtype: float32
    - Min: 0.0000, Max: 0.0358, Mean: 0.0085
    - Size: 25600 elements

  Array: 'filenames'
  Array: 'labels'
--------------------------------------------------

File: train_1_0_features.npz
Number of arrays stored: 3

  Array: 'features'
    - Shape: (1, 40, 40, 1)
    - Dtype: float32
    - Min: 0.0000, Max: 0.0308, Mean: 0.0095
    - Size: 1600 elements

  Array: 'filenames'
  Array: 'labels'
--------------------------------------------------

File: train_64_0_features.npz
Number of arrays stored: 3


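Best-match scoring: each test d-vector is compared with every enrollment d-vector and the highest cosine similarity is kept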

In [162]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import tensorflow as tf
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import math

import sys

import random
import pandas as pd
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import confusion_matrix

from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt

import shutil
tfk = tf.keras
tfkl = tf.keras.layers

print(tf.__version__)

# Seed Python, NumPy and TensorFlow with the same value.
seed = 22

random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

# Module-level handle to the d-vector extractor; main() assigns the loaded model
# and the scoring helpers in this cell read it.
dv_model=None

def cosine_similarity(vec1, vec2):
  """
  Return the cosine similarity between two 1-D vectors.

  A small epsilon is added to the denominator so that an all-zero vector gives
  0.0 instead of a 0/0 NaN.
  """
  dot_product = np.dot(vec1, vec2)
  norm_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
  return dot_product / (norm_product + 1e-8)

def compute_similarity(input_vector, d_vectors):
  """
  Best-match score: the highest cosine similarity between input_vector and any
  of the enrollment d_vectors.

  Raises ValueError (from max) when d_vectors is empty.
  """
  scores = [cosine_similarity(input_vector, enrolled) for enrolled in d_vectors]
  return max(scores)

def predictDVector(d_vectors,authlabel,input_data, input_labels, threshold, verbose=True):
  """
  Score input_data against the enrollment d_vectors and print/return metrics.

  A sample is accepted when its best-match cosine similarity is strictly above
  threshold. Embeddings come from the module-level dv_model.

  Args:
    d_vectors: Enrollment embeddings of the authorized speaker.
    authlabel: Label value that marks the authorized speaker.
    input_data: Feature batch passed to dv_model.predict.
    input_labels: Ground-truth label per input sample.
    threshold: Acceptance threshold on the similarity.
    verbose: When True, print one line per sample.

  Returns:
    (accuracy, f1_score); ratios with an empty denominator are reported as 0.0.
  """
  input_vectors = dv_model.predict(input_data)
  total = len(input_vectors)

  # Count against the authlabel parameter. The previous code compared with a
  # global `auth_class` that is not defined in this function's scope.
  total_auth = sum(1 for label in input_labels if label == authlabel)
  total_denied = len(input_labels) - total_auth

  correct_auth=0
  correct_denied=0

  for i in range(len(input_vectors)):
    similarity=compute_similarity(input_vectors[i], d_vectors)
    result = " -- ERROR!"
    if(similarity>threshold and input_labels[i] == authlabel):
      correct_auth = correct_auth + 1
      result = ""
    if(similarity<=threshold and input_labels[i] != authlabel):
      correct_denied = correct_denied + 1
      result = ""
    if(verbose):
      print("similarity: " + str(similarity) + " --- Class: " + str(input_labels[i]) + " " + result)
  correct = correct_auth + correct_denied

  print('-----------------------')
  print(" --- Testing Results ---")
  true_positive = correct_auth
  false_positive = total_denied - correct_denied
  false_negative = total_auth - correct_auth

  # Guard every ratio: a split without targets, without impostors or without any
  # accepted sample must not crash the evaluation.
  accepted = true_positive + false_positive
  prec = true_positive / accepted if accepted > 0 else 0.0
  recall = true_positive / (true_positive + false_negative) if total_auth > 0 else 0.0
  tpr_pct = str(correct_auth*100/total_auth) + "%" if total_auth > 0 else "N/A"
  fpr_pct = str((false_positive)*100/total_denied) + "%" if total_denied > 0 else "N/A"

  print("True Positive Rate: " + str(correct_auth) + "/" + str(total_auth) + " (" + tpr_pct + ")")
  print("False Positive Rate: " + str(false_positive) + "/" + str(total_denied) + " (" + fpr_pct + ")")
  print("Precision: " + str(prec))
  print("Recall: " + str(recall))
  print('******************')
  print("Total correct " + str(correct) + "/" + str(total))
  acc = correct/total if total > 0 else 0.0
  f1score = 2*prec*recall/(prec+recall) if (prec+recall) > 0 else 0.0
  print("Accuracy on this dataset: " + str(acc))
  print("F1-Score on this dataset: " + str(f1score))

  return acc, f1score

def provide_predictions(d_vectors, input_data):
  """
  Return the best-match similarity of every input sample as an (n, 1) array.

  Embeddings are produced by the module-level dv_model.
  """
  input_vectors = dv_model.predict(input_data)
  y_predictions_prob = np.zeros((len(input_data), 1))
  for idx, vector in enumerate(input_vectors):
    y_predictions_prob[idx] = compute_similarity(vector, d_vectors)
  return y_predictions_prob

def evaluate_model(auth_class, train_size, plot_roc=False):
  """
  Run the best-match evaluation for one enrollment size.

  Loads the enrollment, validation and testing .npz files, computes enrollment
  d-vectors with the module-level dv_model and saves them. The equal-error-rate
  threshold is derived on the validation set, then the testing set is scored
  with it. One result entry is appended to test-results-td-bestmatch.txt.

  Args:
    auth_class: Label of the authorized speaker.
    train_size: Number of enrollment samples (selects the train_<n> file).
    plot_roc: When True, show the validation ROC curve.

  Returns:
    (accuracy, f1_score, EER, AUC)
  """
  print("Testing with speaker id: " + str(auth_class) + " and train size: " + str(train_size))

  train_dir = f"dataset/user_0_organized/npz_features/train_{train_size}_{auth_class}_features.npz"
  training_npz = np.load(train_dir)
  x_train, y_train = training_npz['features'], training_npz['labels']

  val_dir = "dataset/user_0_organized/npz_features/validation_features.npz"
  validation_npz = np.load(val_dir)
  x_val, y_val = validation_npz['features'], validation_npz['labels']

  print("Validation class distribution:", np.unique(y_val, return_counts=True))

  testing_dir = "dataset/user_0_organized/npz_features/testing_features.npz"
  testing_npz = np.load(testing_dir)
  x_test, y_test = testing_npz['features'], testing_npz['labels']

  print("=== Dataset Summary ===")
  # The enrollment files hold target-speaker samples only.
  print(f"Training: {len(x_train)} samples")
  print(f"Validation: {len(x_val)} samples - Classes: {np.unique(y_val, return_counts=True)}")
  print(f"Testing: {len(x_test)} samples - Classes: {np.unique(y_test, return_counts=True)}")

  # -1 instead of train_size: clips skipped during feature extraction leave
  # fewer rows than the nominal enrollment size.
  d_vectors = dv_model.predict(x_train.reshape(-1,40,40,1))
  print(d_vectors.shape)

  # Store the enrollment labels, which line up row by row with d_vectors. The
  # previous code stored the validation labels here.
  save_path = f"d_vectors_{auth_class}_{train_size}.npz"
  np.savez(save_path, labels=y_train, d_vectors=d_vectors)
  print(f"Saved d_vectors and labels to {save_path}")

  y_pred_prob = provide_predictions(d_vectors, x_val)

  y_val_bin = np.where(y_val == auth_class, 1, 0)

  if len(np.unique(y_val_bin)) > 1:
    fpr, tpr, thresholds = roc_curve(y_val_bin, y_pred_prob.ravel())
    roc_auc = auc(fpr, tpr)
    print("-----")

    if plot_roc:
      print("Plotting the Receiving Operating Characteristic curve:")
      plt.plot(fpr, tpr, 'b', label='AUC = %0.2f'% roc_auc)
      plt.legend(loc='lower right')
      plt.plot([0,1],[0,1],'r--')
      plt.xlim([0,1])
      plt.ylim([0,1])
      plt.ylabel('True Positive Rate')
      plt.xlabel('False Positive Rate')
      plt.show()

    fnr = 1 - tpr
    try:
      # One index for both the threshold and the EER value, so they always
      # describe the same operating point.
      eer_index = np.nanargmin(np.abs(fnr - fpr))
      eer_threshold = thresholds[eer_index]
      EER = np.mean((fpr[eer_index], fnr[eer_index]))
      print("-----")
      print("EER Threshold: ", eer_threshold)
    except ValueError:
      # nanargmin raises ValueError when every candidate is NaN.
      eer_threshold=0.5
      EER=1.0
      print("Warning: EER calculation failed - using default threshold")
  else:
    print("Warning: Only one class present in validation data")
    eer_threshold = 0.5
    EER = 1.0
    roc_auc = 0.5

  print("EER = " + str(EER))
  print("AUC = " + str(roc_auc))

  acc, f1score = predictDVector(d_vectors, auth_class, x_test, y_test, threshold=eer_threshold, verbose=False)

  # The with-block closes the file.
  with open("test-results-td-bestmatch.txt", "a") as f:
    f.write(f"Speaker {auth_class} | Train Size: {train_size}\n")
    f.write(f"Accuracy: {acc:.4f} | F1: {f1score:.4f} | EER: {EER:.4f} | AUC: {roc_auc:.4f}\n")

  return acc, f1score, EER, roc_auc

def main():
    """
    Load the d-vector extractor into the module-level dv_model, start a fresh
    results file and run the best-match evaluation for each enrollment size.
    """
    global dv_model

    auth_class = 0
    train_sizes = [1, 8, 16, 64]

    dv_model = tfk.models.load_model("d-vector-extractor-256.h5")
    dv_model.compile(
        loss=tfk.losses.CategoricalCrossentropy(),
        optimizer=tfk.optimizers.Adam(learning_rate=0.0001),
        metrics=['accuracy'],
    )
    dv_model.summary()

    # "w" truncates the results of a previous run; evaluate_model appends to it.
    with open("test-results-td-bestmatch.txt", "w") as results:
        results.writelines([
            "Speaker Verification Results\n",
            "==========================\n\n",
        ])

    for enrollment_size in train_sizes:
        evaluate_model(auth_class, enrollment_size)

# Driver for the best-match evaluation.
if __name__ == "__main__":
    # Remove the export folder of a previous run before evaluating.
    !rm -r /content/d_vectors/
    main()
    # evaluate_model() writes d_vectors_<speaker>_<size>.npz into the working
    # directory; gather those files in one folder.
    !mkdir /content/d_vectors/
    !mv d_vectors* /content/d_vectors/

2.18.0




Testing with speaker id: 0 and train size: 1
Validation class distribution: (array([-1,  0], dtype=int32), array([104,  58]))
=== Dataset Summary ===
Training: 1 samples (should include both classes)
Validation: 162 samples - Classes: (array([-1,  0], dtype=int32), array([104,  58]))
Testing: 157 samples - Classes: (array([-1,  0], dtype=int32), array([97, 60]))
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 184ms/step
(1, 256)
Saved d_vectors and labels to d_vectors_0_1.npz
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
-----
Plotting the Receiving Operating Characteristic curve:
-----
EER Threshold:  0.5501387715339661
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
-----------------------
 --- Testing Results ---
True Positive Rate: 46/60 (76.66666666666667%)
False Positive Rate: 32/97 (32.98969072164948%)
Precision: 0.5897435897435898
Recall: 0.7666666666666667
******************
Total correct 111/157
Accuracy on this 

Mean cosine scoring: each test d-vector is compared with the mean of the enrollment d-vectors

In [155]:
import os

# Set before TensorFlow is imported; it configures TensorFlow's native log
# filtering, so setting it after the import (as before) was too late.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import random  # used by random.seed below; previously leaked in from another cell
import sys

import numpy as np
import tensorflow as tf
from sklearn.metrics import roc_curve, auc

# Seed Python, NumPy and TensorFlow with the same value.
seed = 22
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

print(tf.__version__)

def cosine_similarity(vec1, vec2):
    """Cosine similarity of two vectors; the 1e-8 term keeps a zero vector from dividing by zero."""
    magnitude = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    return np.dot(vec1, vec2) / (magnitude + 1e-8)

def compute_mean_d_vector(d_vectors):
    """Average the enrollment d-vectors (rows) into a single reference vector."""
    centroid = np.mean(d_vectors, axis=0)
    return centroid

def predictDVector(mean_d_vector, authlabel, input_data, input_labels, threshold, verbose=True):
    """
    Evaluate verification performance against the mean enrollment d-vector.

    A sample is accepted when its cosine similarity to mean_d_vector is
    strictly above threshold. Embeddings come from the module-level dv_model.

    Args:
        mean_d_vector: Reference embedding of the authorized speaker.
        authlabel: Label value that marks the authorized speaker.
        input_data: Feature batch passed to dv_model.predict.
        input_labels: Ground-truth label per input sample.
        threshold: Acceptance threshold on the similarity.
        verbose: Accepted so the signature matches the best-match variant;
            not used here (the summary is always printed).

    Returns:
        (accuracy, f1_score); ratios with an empty denominator are 0.
    """
    input_vectors = dv_model.predict(input_data)
    total = len(input_vectors)

    # asarray makes the comparison elementwise even when labels arrive as a list.
    is_target = (np.asarray(input_labels) == authlabel)
    total_auth = np.sum(is_target)
    total_denied = total - total_auth

    similarities = np.array([cosine_similarity(vec, mean_d_vector) for vec in input_vectors])
    predictions = similarities > threshold

    true_pos = np.sum(predictions & is_target)
    true_neg = np.sum(~predictions & ~is_target)
    false_pos = np.sum(predictions & ~is_target)
    false_neg = np.sum(~predictions & is_target)

    prec = true_pos / (true_pos + false_pos) if (true_pos + false_pos) > 0 else 0
    recall = true_pos / total_auth if total_auth > 0 else 0
    f1 = 2 * (prec * recall) / (prec + recall) if (prec + recall) > 0 else 0
    acc = (true_pos + true_neg) / total if total > 0 else 0

    print('-----------------------')
    print(" --- Testing Results ---")
    print(f"Target samples: {total_auth}/{total}")
    print(f"Non-target samples: {total_denied}/{total}")
    # Guarded like the false-positive line: without target samples the rate is
    # undefined rather than "nan%".
    print(f"True Positive Rate: {true_pos}/{total_auth} (N/A)" if total_auth == 0 else
          f"True Positive Rate: {true_pos}/{total_auth} ({true_pos/total_auth*100:.1f}%)")
    print(f"False Positive Rate: {false_pos}/{total_denied} (N/A)" if total_denied == 0 else
          f"False Positive Rate: {false_pos}/{total_denied} ({false_pos/total_denied*100:.1f}%)")
    print(f"Precision: {prec:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1:.4f}")
    print(f"Accuracy: {acc:.4f}")

    return acc, f1

def provide_predictions(mean_d_vector, input_data):
    """Return the cosine similarity of every input embedding to mean_d_vector as a 1-D array."""
    embeddings = dv_model.predict(input_data)
    scores = [cosine_similarity(embedding, mean_d_vector) for embedding in embeddings]
    return np.array(scores)

def evaluate_model(auth_class, train_size):
    """
    Run the mean-cosine evaluation for one enrollment size.

    Loads the enrollment, validation and testing .npz files and averages the
    enrollment d-vectors from the module-level dv_model. The equal-error-rate
    threshold is derived on the validation set, then the testing set is scored
    with it. One result entry is appended to test-results-td-meancos.txt.

    Args:
        auth_class: Label of the authorized speaker.
        train_size: Number of enrollment samples (selects the train_<n> file).

    Returns:
        (accuracy, f1_score, EER, AUC). If any step fails the error is printed
        and (0, 0, 1, 0) is returned instead of raising.
    """
    print(f"\nEvaluating speaker {auth_class} with {train_size} enrollment samples (Mean Cosine)")

    try:
        train_dir = f"dataset/user_0_organized/npz_features/train_{train_size}_{auth_class}_features.npz"
        training_npz = np.load(train_dir)
        x_train = training_npz['features']

        val_dir = "dataset/user_0_organized/npz_features/validation_features.npz"
        validation_npz = np.load(val_dir)
        x_val, y_val = validation_npz['features'], validation_npz['labels']

        testing_dir = "dataset/user_0_organized/npz_features/testing_features.npz"
        testing_npz = np.load(testing_dir)
        x_test, y_test = testing_npz['features'], testing_npz['labels']

        print("=== Dataset Summary ===")
        print(f"Training: {len(x_train)} samples")
        print(f"Validation: {len(x_val)} samples - Classes: {np.unique(y_val, return_counts=True)}")
        print(f"Testing: {len(x_test)} samples - Classes: {np.unique(y_test, return_counts=True)}")

        d_vectors = dv_model.predict(x_train.reshape(-1, 40, 40, 1))
        mean_d_vector = compute_mean_d_vector(d_vectors)
        print(f"Mean D-Vector computed using {len(d_vectors)} samples")

        y_pred_prob = provide_predictions(mean_d_vector, x_val.reshape(-1, 40, 40, 1))
        y_val_bin = (y_val == auth_class).astype(int)

        if len(np.unique(y_val_bin)) > 1:
            fpr, tpr, thresholds = roc_curve(y_val_bin, y_pred_prob)
            roc_auc = auc(fpr, tpr)

            fnr = 1 - tpr
            try:
                # One index for both the threshold and the EER value, so they
                # always describe the same operating point.
                eer_index = np.nanargmin(np.abs(fnr - fpr))
                eer_threshold = thresholds[eer_index]
                EER = np.mean((fpr[eer_index], fnr[eer_index]))
            except ValueError:
                # nanargmin raises ValueError when every candidate is NaN.
                eer_threshold = 0.5
                EER = 1.0
                print("Warning: EER calculation failed - using default threshold")
        else:
            print("Warning: Only one class present in validation data")
            eer_threshold = 0.5
            EER = 1.0
            roc_auc = 0.5

        acc, f1score = predictDVector(mean_d_vector, auth_class,
                                    x_test.reshape(-1, 40, 40, 1), y_test,
                                    threshold=eer_threshold, verbose=False)

        with open("test-results-td-meancos.txt", "a") as f:
            f.write(f"Speaker {auth_class} | Train Size: {train_size}\n")
            f.write(f"Accuracy: {acc:.4f} | F1: {f1score:.4f} ")
            f.write(f"| EER: {EER:.4f} | AUC: {roc_auc:.4f}\n\n")

        return acc, f1score, EER, roc_auc

    except Exception as e:
        # Deliberate best-effort: one failing enrollment size must not stop the
        # remaining ones, so the error is reported and worst-case metrics returned.
        print(f"Error in evaluation: {e}")
        return 0, 0, 1, 0

def main():
    """
    Load the d-vector extractor into the module-level dv_model, start a fresh
    results file and run the mean-cosine evaluation for each enrollment size.
    """
    global dv_model

    auth_class = 0
    train_sizes = [1, 8, 16, 64]

    dv_model = tf.keras.models.load_model("d-vector-extractor-256.h5")
    dv_model.compile(
        loss=tf.keras.losses.CategoricalCrossentropy(),
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
        metrics=['accuracy'],
    )

    # "w" truncates the results of a previous run; evaluate_model appends to it.
    with open("test-results-td-meancos.txt", "w") as results:
        results.writelines([
            "Speaker Verification Results (Mean Cosine)\n",
            "========================================\n\n",
        ])

    for enrollment_size in train_sizes:
        evaluate_model(auth_class, enrollment_size)

if __name__ == "__main__":
    main()



2.18.0

Evaluating speaker 0 with 1 enrollment samples (Mean Cosine)
=== Dataset Summary ===
Training: 1 samples
Validation: 162 samples - Classes: (array([-1,  0], dtype=int32), array([104,  58]))
Testing: 157 samples - Classes: (array([-1,  0], dtype=int32), array([97, 60]))
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 120ms/step
Mean D-Vector computed using 1 samples
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step 
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step
-----------------------
 --- Testing Results ---
Target samples: 60/157
Non-target samples: 97/157
True Positive Rate: 46/60 (76.7%)
False Positive Rate: 32/97 (33.0%)
Precision: 0.5897
Recall: 0.7667
F1-Score: 0.6667
Accuracy: 0.7070

Evaluating speaker 0 with 8 enrollment samples (Mean Cosine)
=== Dataset Summary ===
Training: 8 samples
Validation: 162 samples - Classes: (array([-1,  0], dtype=int32), array([104,  58]))
Testing: 157 samples - Classes: (array(

Inspect .npz structure

In [164]:
import numpy as np
import os
import matplotlib.pyplot as plt

def verify_npz_files(directory="/content/d_vectors/", detail_array="d_vectors"):
    """
    Print a summary of every .npz file in a directory.

    For each file the stored array names are listed. For the array named
    ``detail_array``, the shape, dtype, min/max/mean and element count are
    printed as well.

    Args:
        directory: Folder to scan for .npz files.
        detail_array: Name of the array to describe in detail.
    """
    npz_files = sorted(f for f in os.listdir(directory) if f.endswith('.npz'))

    if not npz_files:
        print(f"No .npz files found in {directory}!")
        return

    print(f"Found {len(npz_files)} .npz files in {directory}:\n")

    for file in npz_files:
        filepath = os.path.join(directory, file)
        # np.load keeps the archive open; the context manager closes it again.
        with np.load(filepath) as data:
            print(f"File: {file}")
            print(f"Number of arrays stored: {len(data.files)}\n")

            for array_name in data.files:
                print(f"  Array: '{array_name}'")
                if array_name != detail_array:
                    # Other arrays are only listed, so they are never read.
                    continue

                array_data = data[array_name]
                print(f"    - Shape: {array_data.shape}")
                print(f"    - Dtype: {array_data.dtype}")
                if array_data.size:
                    print(f"    - Min: {np.min(array_data):.4f}, Max: {np.max(array_data):.4f}, Mean: {np.mean(array_data):.4f}")
                else:
                    # min/max of an empty array would raise.
                    print("    - Min/Max/Mean: n/a (empty array)")
                print(f"    - Size: {array_data.size} elements\n")

        print("-" * 50 + "\n")

if __name__ == "__main__":
    verify_npz_files()

Found 4 .npz files in /content/d_vectors/:

File: d_vectors_0_1.npz
Number of arrays stored: 2

  Array: 'labels'
  Array: 'd_vectors'
    - Shape: (1, 256)
    - Dtype: float32
    - Min: 0.0000, Max: 6.1477, Mean: 1.0465
    - Size: 256 elements

--------------------------------------------------

File: d_vectors_0_16.npz
Number of arrays stored: 2

  Array: 'labels'
  Array: 'd_vectors'
    - Shape: (16, 256)
    - Dtype: float32
    - Min: 0.0000, Max: 9.4984, Mean: 1.0236
    - Size: 4096 elements

--------------------------------------------------

File: d_vectors_0_64.npz
Number of arrays stored: 2

  Array: 'labels'
  Array: 'd_vectors'
    - Shape: (64, 256)
    - Dtype: float32
    - Min: 0.0000, Max: 11.3936, Mean: 1.0108
    - Size: 16384 elements

--------------------------------------------------

File: d_vectors_0_8.npz
Number of arrays stored: 2

  Array: 'labels'
  Array: 'd_vectors'
    - Shape: (8, 256)
    - Dtype: float32
    - Min: 0.0000, Max: 6.7960, Mean: 0.976

Convert the generated .npz files into a C header

In [166]:
import numpy as np
import os

def sanitize_name(name):
    """
    Turn an arbitrary name into a valid C identifier.

    Every character other than an ASCII letter, digit or underscore is replaced
    by an underscore. If the result is empty or starts with a digit, ``arr_`` is
    put in front, since a C identifier cannot begin with a digit. Names that
    only contained '.' or '-' as special characters come out as before.
    """
    import re

    cleaned = re.sub(r'[^0-9A-Za-z_]', '_', name)
    if not cleaned or cleaned[0].isdigit():
        cleaned = 'arr_' + cleaned
    return cleaned

def generate_npz_header(npz_dir="/content/d_vectors", output_file="d_vectors.h"):
    """
    Write a C header holding the d-vectors of every .npz file in npz_dir.

    Each archive with a 2-D ``d_vectors`` array of width 256 becomes one
    ``static const float <name>[samples][256]`` table, named after the file.
    Archives without such an array are skipped. Files are processed in sorted
    order and values are written with six decimals.
    """
    archives = sorted(name for name in os.listdir(npz_dir) if name.endswith('.npz'))

    with open(output_file, "w") as header:
        header.write("#ifndef D_VECTORS_H\n")
        header.write("#define D_VECTORS_H\n\n")
        header.write("#include <stddef.h>\n\n")

        for archive in archives:
            with np.load(os.path.join(npz_dir, archive)) as data:
                if 'd_vectors' not in data:
                    continue

                d_vectors = data['d_vectors']
                if not (d_vectors.ndim == 2 and d_vectors.shape[1] == 256):
                    continue

                array_name = sanitize_name(os.path.splitext(archive)[0])
                samples = d_vectors.shape[0]
                header.write(f"// {samples} samples of 256-dimensional vectors\n")
                header.write(f"static const float {array_name}[{samples}][256] = {{\n")
                # One brace-enclosed row per enrollment sample.
                for sample in d_vectors:
                    values = ", ".join(f"{x:.6f}f" for x in sample)
                    header.write("    {" + values + "},\n")

                header.write("};\n\n")

        header.write("#endif // D_VECTORS_H\n")

if __name__ == "__main__":
    generate_npz_header()