## Set up paths and imports

In [1]:
import os

import torch
import torch.nn as nn
from torchvision import transforms

if not os.path.exists("./notebooks"):
    %cd ..

from src.training import train, validate
from src.dataset import prepare_dataset_loaders
from src.data_processing import load_mean_std
from src.config import DATASET_DIR, PATIENCE_THRESHOLD, VALID_ACCESS_LABELS
from collections import defaultdict
import re
import random

wandb_enabled = False

from src.data_processing import split_into_clips, create_spectrogram, SOAAudioClips, save_mean_std, compute_mean_std_from_images, list_audio_files_recursively, save_spectrogram
from src.dataset_analysis import duration_statistics
from src.config import VALID_ACCESS_LABELS, DATASET_DIR, DATA_DIR

random.seed(42)  # For reproducibility

  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


/home/mytkom/Documents/iml


In [3]:
# Prepare file sets
#
# Builds speaker-disjoint-by-script train / validation / test lists of audio
# files and then balances the training list so that authorized and
# unauthorized speakers contribute the same number of files.
#
# Reproducibility: every collection that feeds a `random` call is sorted
# first. Iterating a set of strings (and relying on directory listing order)
# changes between kernel restarts, which would make the seeded shuffles below
# produce a different split on every run.
allowed_dictionaries=['ipadflat_confroom1', 'ipadflat_office1', 'ipad_balcony1', 'ipad_bedroom1', 'ipad_confroom1', 'ipad_confroom2', 'ipad_livingroom1', 'ipad_office1', 'ipad_office2', 'iphone_balcony1', 'iphone_bedroom1', 'iphone_livingroom1']
print(len(allowed_dictionaries))
# Sorted so the grouping below does not depend on filesystem enumeration order.
# Assumes the helper returns an iterable of mutually comparable paths — TODO confirm.
wav_files_all = sorted(list_audio_files_recursively(DATA_DIR,allowed_dictionaries))
print(f"Found {len(wav_files_all)} .wav files in directory '{DATA_DIR}' in the following allowed directories: {allowed_dictionaries}")

# (speaker_tag, script_number) -> list of file paths recorded for that pair
speaker_script_to_files = defaultdict(list)

# File names start with "<f|m><digits>_script<digits>_"; group 1 is the
# speaker tag, group 2 the script number.
pattern = re.compile(r'([fm]\d+)_script(\d+)_')

for filepath in wav_files_all:
    filename = os.path.basename(filepath)  # Extract the file name cross-platform
    match = pattern.match(filename)
    if match:
        speaker_tag = match.group(1)
        script_number = int(match.group(2))
        speaker_script_to_files[(speaker_tag, script_number)].append(filepath)
    else:
        print(f"Filename {filename} does not match the expected pattern.")

# Initialize sets
train_set = []
validate_set = []
test_set = []

# Training files split by speaker class, used for balancing afterwards
authorized_train_samples = []
unauthorized_train_samples = []

# Collect all speakers
all_speakers = set(speaker for speaker, script in speaker_script_to_files.keys())
authorized_speakers = all_speakers.intersection(VALID_ACCESS_LABELS)
unauthorized_speakers = all_speakers - authorized_speakers

# Shuffle scripts for randomness
random.seed(42)  # For reproducibility

# sorted(): set iteration order is not stable across interpreter runs, and
# each iteration consumes random state, so the visiting order must be fixed.
for speaker in sorted(all_speakers):
    # sorted(): start every shuffle from the same initial ordering.
    speaker_scripts = sorted(script for (spk, script) in speaker_script_to_files.keys() if spk == speaker)
    random.shuffle(speaker_scripts)

    num_scripts = len(speaker_scripts)
    num_train_scripts = int(0.7 * num_scripts)
    num_validate_scripts = int(0.15 * num_scripts)

    # Ensure at least one script in each set if possible
    num_train_scripts = max(1, num_train_scripts)
    num_validate_scripts = max(1, num_validate_scripts)
    num_test_scripts = num_scripts - num_train_scripts - num_validate_scripts

    # Nothing left for the test split: move one script over from training.
    if num_test_scripts == 0:
        num_test_scripts = 1
        num_train_scripts -= 1

    # Assign scripts to sets (test takes whatever remains after the first two slices)
    train_end = num_train_scripts
    validate_end = num_train_scripts + num_validate_scripts
    train_scripts = speaker_scripts[:train_end]
    validate_scripts = speaker_scripts[train_end:validate_end]
    test_scripts = speaker_scripts[validate_end:]

    for script in train_scripts:
        files = speaker_script_to_files[(speaker, script)]
        train_set.extend(files)
        if speaker in VALID_ACCESS_LABELS:
            authorized_train_samples.extend(files)
        else:
            unauthorized_train_samples.extend(files)

    for script in validate_scripts:
        files = speaker_script_to_files[(speaker, script)]
        validate_set.extend(files)

    for script in test_scripts:
        files = speaker_script_to_files[(speaker, script)]
        test_set.extend(files)

# Calculate the number of samples from authorized and unauthorized speakers
# (these counts are taken before balancing and are not updated afterwards)
num_authorized_samples = len(authorized_train_samples)
num_unauthorized_samples = len(unauthorized_train_samples)

# Balance the training list by randomly dropping files from the larger class.
# Validation and test lists are left unbalanced.
if num_authorized_samples < num_unauthorized_samples:
    # Reduce unauthorized samples
    difference = num_unauthorized_samples - num_authorized_samples
    random.shuffle(unauthorized_train_samples)
    unauthorized_train_samples = unauthorized_train_samples[:num_authorized_samples]
else:
    # Reduce authorized samples (unlikely given the dataset)
    difference = num_authorized_samples - num_unauthorized_samples
    random.shuffle(authorized_train_samples)
    authorized_train_samples = authorized_train_samples[:num_unauthorized_samples]

# Update the train set: authorized files first, then unauthorized files
train_set = authorized_train_samples + unauthorized_train_samples

def compute_statistics(dataset, name):
    """Print a summary of one dataset split.

    Args:
        dataset: Sized collection of audio file paths.
        name: Label used in the report header (e.g. "Training").

    Each file name is parsed with the notebook-level ``pattern``; the first
    capture group is taken as the speaker tag. A file counts as authorized
    when its tag is in ``VALID_ACCESS_LABELS``. Files whose name does not
    match the pattern are included in the total only.

    Returns:
        None. The report is written to standard output.
    """
    sample_total = len(dataset)
    per_speaker = {}
    n_authorized = 0
    n_unauthorized = 0

    for path in dataset:
        parsed = pattern.match(os.path.basename(path))
        if parsed is None:
            # Unparseable names contribute to the total but to no speaker.
            continue
        tag = parsed.group(1)
        per_speaker[tag] = per_speaker.get(tag, 0) + 1
        if tag in VALID_ACCESS_LABELS:
            n_authorized += 1
        else:
            n_unauthorized += 1

    header_lines = (
        f"--- {name} Set Statistics ---",
        f"Total Samples: {sample_total}",
        f"Total Speakers: {len(per_speaker)}",
        f"Authorized Samples: {n_authorized}",
        f"Unauthorized Samples: {n_unauthorized}",
        f"Authorized to Unauthorized Ratio: {n_authorized}:{n_unauthorized}",
        "\nSamples per Speaker:",
    )
    for line in header_lines:
        print(line)

    # Tags are sorted as strings, so "f10" is listed before "f2".
    for tag in sorted(per_speaker):
        print(f"  {tag}: {per_speaker[tag]}")
    print()



12
Found 1200 .wav files in directory './data' in the following allowed directories: ['ipadflat_confroom1', 'ipadflat_office1', 'ipad_balcony1', 'ipad_bedroom1', 'ipad_confroom1', 'ipad_confroom2', 'ipad_livingroom1', 'ipad_office1', 'ipad_office2', 'iphone_balcony1', 'iphone_bedroom1', 'iphone_livingroom1']


In [4]:
# Print the same report for each split, in train / validation / test order.
for split_files, split_label in (
    (train_set, "Training"),
    (validate_set, "Validation"),
    (test_set, "Test"),
):
    compute_statistics(split_files, split_label)

--- Training Set Statistics ---
Total Samples: 432
Total Speakers: 20
Authorized Samples: 216
Unauthorized Samples: 216
Authorized to Unauthorized Ratio: 216:216

Samples per Speaker:
  f1: 36
  f10: 14
  f2: 17
  f3: 21
  f4: 11
  f5: 17
  f6: 15
  f7: 36
  f8: 36
  f9: 11
  m1: 13
  m10: 17
  m2: 15
  m3: 36
  m4: 18
  m5: 17
  m6: 36
  m7: 15
  m8: 36
  m9: 15

--- Validation Set Statistics ---
Total Samples: 240
Total Speakers: 20
Authorized Samples: 72
Unauthorized Samples: 168
Authorized to Unauthorized Ratio: 72:168

Samples per Speaker:
  f1: 12
  f10: 12
  f2: 12
  f3: 12
  f4: 12
  f5: 12
  f6: 12
  f7: 12
  f8: 12
  f9: 12
  m1: 12
  m10: 12
  m2: 12
  m3: 12
  m4: 12
  m5: 12
  m6: 12
  m7: 12
  m8: 12
  m9: 12

--- Test Set Statistics ---
Total Samples: 240
Total Speakers: 20
Authorized Samples: 72
Unauthorized Samples: 168
Authorized to Unauthorized Ratio: 72:168

Samples per Speaker:
  f1: 12
  f10: 12
  f2: 12
  f3: 12
  f4: 12
  f5: 12
  f6: 12
  f7: 12
  f8: 12
  f9: 

### Optionally initialize W&B project

In [6]:
# Opt-in cell: running it imports the Weights & Biases client and flips the
# flag that the setup cell initialised to False. Skip this cell to leave the
# flag off.
# NOTE(review): nothing visible in this notebook reads `wandb_enabled`;
# confirm which later cell or helper consumes it.
import wandb

wandb_enabled = True

In [None]:
import os
import torch
import torchaudio
from speechbrain.pretrained import EncoderClassifier
from torchaudio.transforms import MelSpectrogram, AmplitudeToDB

# File names of stored authorized-speaker embeddings. A later cell writes an
# embedding file with this same name via torch.save.
AUTHORIZED_SPEAKERS = ["speaker1_embedding.pt"]
# Cosine-similarity cut-off handed to classify_speaker: a similarity greater
# than or equal to this value is treated as a match.
threshold = 0.6

# Pretrained speaker encoder, referenced by its model-hub identifier.
# NOTE(review): presumably fetches the weights on first use, which would need
# network access — confirm.
# NOTE(review): recent SpeechBrain releases appear to expose this class under
# `speechbrain.inference`; check the import against the installed version.
model = EncoderClassifier.from_hparams(source="speechbrain/spkrec-ecapa-voxceleb")

In [None]:
def audio_to_log_mel(file_path):
    """Load an audio file and return its decibel-scaled mel spectrogram.

    Args:
        file_path: Path of the audio file, passed to ``torchaudio.load``.

    Returns:
        The output of ``AmplitudeToDB`` applied to a 128-band
        ``MelSpectrogram`` of the loaded waveform. The mel transform is
        configured with the file's own sample rate.
    """
    samples, rate = torchaudio.load(file_path)
    to_mel = MelSpectrogram(sample_rate=rate, n_mels=128)
    to_decibels = AmplitudeToDB()
    return to_decibels(to_mel(samples))

In [None]:
# Compute one reference embedding per authorized training clip.
#
# The loop previously had no body (a syntax error) and the code after it read
# a placeholder path. The embeddings are now collected in
# `authorized_embeddings`, the name the classification cell passes to
# classify_speaker.
# NOTE(review): the sample rate returned by torchaudio.load is discarded;
# confirm the clips are already at the rate the pretrained encoder expects.
authorized_embeddings = []
with torch.no_grad():  # inference only, no gradients needed
    for auth_sample in authorized_train_samples:
        authorized_signal, _ = torchaudio.load(auth_sample)
        authorized_embeddings.append(model.encode_batch(authorized_signal))

# Persist the reference embeddings (a list of tensors) under the file name
# listed in AUTHORIZED_SPEAKERS so they can be reloaded without re-encoding.
torch.save(authorized_embeddings, "speaker1_embedding.pt")


In [35]:

def classify_speaker(embedding, authorized_embeddings, threshold):
    """Label an embedding by comparing it with reference embeddings.

    Args:
        embedding: Tensor to classify.
        authorized_embeddings: Iterable of reference tensors.
        threshold: Minimum cosine similarity (inclusive) that counts as a match.

    Returns:
        "Authorized" as soon as one reference reaches the threshold,
        otherwise "Unauthorized" (also when there are no references).

    The similarity is taken over the last dimension and converted with
    ``.item()``, so each comparison must yield a single-element tensor.
    """
    cosine = torch.nn.functional.cosine_similarity
    is_match = any(
        cosine(embedding, reference, dim=-1).item() >= threshold
        for reference in authorized_embeddings
    )
    return "Authorized" if is_match else "Unauthorized"

# Classify every .wav file found directly inside `data_dir` (no recursion
# into sub-directories) and print one verdict per file.
# NOTE(review): `data_dir` is not assigned in any cell visible here; it must
# exist in the kernel before this cell runs — confirm where it comes from.
# `authorized_embeddings` must already hold the reference embedding tensors.
for file_name in os.listdir(data_dir):
    if file_name.endswith(".wav"):
        file_path = os.path.join(data_dir, file_name)

        # The file's sample rate is discarded; the raw waveform goes straight
        # to the encoder.
        signal, _ = torchaudio.load(file_path)
        embedding = model.encode_batch(signal)

        classification = classify_speaker(embedding, authorized_embeddings, threshold)

        print(f"File: {file_name}, Classification: {classification}")
