In [None]:
# Install the packages this notebook needs. Versions are not pinned here.
# NOTE(review): later cells also import kagglehub, pandas and tqdm, which this line
# does not install — presumably they are preinstalled on the runtime; confirm.
!pip install torch torchaudio speechbrain torchattacks matplotlib numpy pystoi


In [None]:
import torch
import torchaudio
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
from speechbrain.pretrained import EncoderClassifier
import torchattacks

# Pick the compute device: the first CUDA GPU when one is visible, otherwise the CPU.
# NOTE(review): the original remark says the model errored when not on CPU and that
# CPU mode was used, yet this selection still prefers CUDA when available — confirm.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

print("Using", device)

In [None]:
def to_dev(t):
    """Recursively place a tensor, or a list/tuple of tensors, on the global ``device``.

    Lists and tuples are rebuilt with their original container type; anything
    else is moved with ``.to(device, non_blocking=True)``.
    """
    if not isinstance(t, (list, tuple)):
        return t.to(device, non_blocking=True)
    container = type(t)
    return container(to_dev(item) for item in t)

In [None]:
# Log in to Hugging Face: enter your own access token at the prompt.
# Never store a token in the notebook itself.
# NOTE(review): an access token was previously pasted into this comment — treat it
# as leaked and revoke it.
!huggingface-cli login



In [None]:
from speechbrain.inference.classifiers import EncoderClassifier

# Load the pretrained x-vector speaker classifier.
# `run_opts` tells SpeechBrain which device its inference interface should use.
# Without it the interface keeps its default device while `.to(device)` below moves
# the weights, which produces a device mismatch as soon as `device` is a GPU.
classifier = (
    EncoderClassifier.from_hparams(
        source="speechbrain/spkrec-xvect-voxceleb",
        savedir="pretrained_models/spkrec-xvect-voxceleb",
        run_opts={"device": str(device)},
    )
    .to(device)
    .eval()
)

In [None]:
import kagglehub

# Fetch the latest version of the dataset; the call returns a local path.
path = kagglehub.dataset_download("kryakrya/voxceleb1test")

print(f"Path to dataset files: {path}")

In [None]:
import os
import shutil

# Copy the downloaded dataset to a stable location.
# Use the `path` returned by the download cell instead of a hard-coded cache
# directory and version number, and make the copy idempotent: `cp -r` into an
# existing target nests a second copy inside it when the cell is re-run.
dataset_dir = "/content/voxcelebtest-1-dataset"
shutil.copytree(path, dataset_dir, dirs_exist_ok=True)
print(os.listdir(dataset_dir))


In [None]:
import os
import torchaudio
from IPython.display import Audio, display

# Location of the sample utterance
file_path = "/content/voxcelebtest-1-dataset/wav/id10270/5r0dWxy17C8/00001.wav"

if not os.path.exists(file_path):
    print("The audio file was not found at the specified path.")
else:
    # Read the samples and their sampling rate from disk
    waveform, sample_rate = torchaudio.load(file_path)
    print(f"Loaded file: {file_path}")
    print(f"Sample Rate: {sample_rate}, Waveform shape: {waveform.shape}")

    # Drop singleton dimensions and hand a NumPy array to the player
    waveform_np = waveform.squeeze().cpu().numpy()

    # display() renders the Audio widget even though we are inside a branch
    display(Audio(waveform_np, rate=sample_rate))


In [None]:
import os
import torch
import torchaudio
from IPython.display import Audio, display
import numpy as np

# Step 1: Load the audio file.
# Point this at the utterance you want from the VoxCeleb test set.
file_path = "/content/voxcelebtest-1-dataset/wav/id10270/5r0dWxy17C8/00002.wav"

if os.path.exists(file_path):
    waveform, sample_rate = torchaudio.load(file_path)
    print(f"Loaded file: {file_path}")
    print(f"Original sample rate: {sample_rate}, Waveform shape: {waveform.shape}")

    # Bring the audio to 16 kHz unless it is already there
    target_sr = 16000
    if sample_rate != target_sr:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_sr)
        waveform = resampler(waveform)
        sample_rate = target_sr
    print(f"Resampled to: {sample_rate}, Waveform shape: {waveform.shape}")

    # Step 2: A stand-in for an adversarial attack.
    def fake_attack(audio, epsilon):
        """Add epsilon * sign(random noise) to ``audio`` and clamp the result to [-1, 1].

        The random sign imitates the direction a gradient would provide;
        ``epsilon`` controls how strong the perturbation is.
        """
        random_signs = torch.randn_like(audio).sign()
        return torch.clamp(audio + epsilon * random_signs, -1, 1)

    # Step 3: One perturbed copy per epsilon value.
    epsilons = [0.0, 0.01, 0.05, 0.1, 0.2]
    attacked_versions = {eps: fake_attack(waveform, eps) for eps in epsilons}

    # Step 4: Listen and compare the versions.
    print("Playing audio samples with different epsilon values:")
    for eps, audio_tensor in attacked_versions.items():
        print(f"Epsilon = {eps}")
        # NumPy array without singleton dimensions for the player
        audio_np = audio_tensor.squeeze().detach().cpu().numpy()
        display(Audio(audio_np, rate=sample_rate))
else:
    print(f"File not found: {file_path}")


In [None]:

# Wrap the SpeechBrain Classifier
# The SpeechBrain classifier’s `classify_batch` method returns a tuple:
# (output_probs, score, index, text_lab). We wrap it so that it returns only the output probabilities (logits),
# which Torchattacks (and our FGSM code) require.

import torch.nn as nn

import torch.nn as nn

class ClassifierWrapper(nn.Module):
    """Expose a SpeechBrain classifier as an ``nn.Module`` that returns a single tensor.

    ``forward`` moves the input with ``to_dev``, calls ``classify_batch`` and
    returns only the first element of the 4-tuple it yields.
    """

    def __init__(self, clf):
        super().__init__()
        self.clf = clf

    def forward(self, audio):
        outputs = self.clf.classify_batch(to_dev(audio))
        # classify_batch yields four values; only the first one is needed here
        first, _second, _third, _fourth = outputs
        return first


# Make sure the classifier and its wrapper both live on the selected device.
classifier = classifier.to(device)
wrapped_model = ClassifierWrapper(classifier)
wrapped_model = wrapped_model.to(device)

print("Wrapped model ready for FGSM attack.")


In [None]:
#prediction function that generates predicted label for audio
def get_prediction(audio):
    """Return the class index that ``wrapped_model`` predicts for ``audio``.

    Args:
        audio: tensor accepted by ``wrapped_model``; it may live on any device.

    Returns:
        The result of ``argmax(dim=1)`` over the model output.
    """
    # Move first, then verify. The check used to run before the move, so it
    # rejected exactly the inputs that `to_dev` would have fixed.
    audio = to_dev(audio)
    assert audio.device == next(wrapped_model.parameters()).device
    output = wrapped_model(audio)
    return output.argmax(dim=1)

# Baseline: move the loaded waveform to the device and classify it unmodified.
waveform = to_dev(waveform)
clean_pred = get_prediction(waveform)

# Show the raw tensor first, then the plain integer label.
print(clean_pred)
print("Baseline (clean) predicted label:", clean_pred.item())


In [None]:

# Compute the FGSM Attack (White-Box Method)
# We will now compute the gradient of the loss with respect to the audio input and generate an adversarial example.
# First, we prepare a loss function and convert the clean predicted label to a target label tensor.
# For a non-targeted attack, we use the model’s own prediction as the label.

# Use the model's own clean prediction as the (untargeted) label: int64, on the device.
label_tensor = clean_pred.detach().clone().long().to(device)

# Track gradients on the input samples themselves.
waveform.requires_grad_(True)

# Forward pass on the clean audio through the wrapped model.
output = wrapped_model(waveform)

# Negative log-likelihood of the predicted class.
loss = F.nll_loss(output, label_tensor)

# Clear stale parameter gradients, then backpropagate down to the waveform.
wrapped_model.zero_grad()
loss.backward()

# Gradient of the loss with respect to the input samples.
data_grad = waveform.grad.data

# Define the FGSM attack function.
def fgsm_attack(audio, epsilon, data_grad):
    """Take one FGSM step.

    Shifts ``audio`` by ``epsilon`` in the direction of the sign of
    ``data_grad`` and clamps the result to the range [-1, 1].
    """
    step = data_grad.sign() * epsilon
    return torch.clamp(audio + step, min=-1, max=1)

# Perturbation budget for this demonstration.
epsilon = 0.00009

# Build the adversarial example from the clean waveform and its gradient.
adv_waveform = fgsm_attack(audio=waveform, epsilon=epsilon, data_grad=data_grad)
print("Adversarial waveform generated.")


In [None]:

# Evaluate and Compare Predictions
#
# Now, let’s get the predictions from the model on both the clean and adversarial audio samples.

# Classify the clean and the adversarial waveform with the same model.
clean_pred_after = get_prediction(waveform)
adv_pred = get_prediction(adv_waveform)

print("Clean predicted label:", clean_pred_after.item())
# `{:g}` keeps tiny budgets readable; `{:.3f}` rendered epsilon = 0.00009 as 0.000.
print("Adversarial predicted label (epsilon = {:g}):".format(epsilon), adv_pred.item())


In [None]:

# Listen to the Clean vs. Adversarial Audio
#
# Finally, we can listen to both the original and the adversarial audio.
# We'll convert the tensors to NumPy arrays and use IPython’s Audio widget to play them.

from IPython.display import Audio, display

# Convert the audio to numpy for playback + squeeze.
clean_audio_np = waveform.squeeze().detach().cpu().numpy()
adv_audio_np = adv_waveform.squeeze().detach().cpu().numpy()

print("Playing clean audio:")
display(Audio(clean_audio_np, rate=sample_rate))

# `{:g}` keeps tiny budgets readable; `{:.3f}` rendered epsilon = 0.00009 as 0.000.
print("Playing adversarial audio (epsilon = {:g}):".format(epsilon))
display(Audio(adv_audio_np, rate=sample_rate))


In [None]:
import os
import torch
import torchaudio
import torchaudio.functional as F_audio
import torch.nn.functional as F
from IPython.display import Audio, display
import numpy as np


# Next attacks, Black-Box methods
# We will simulate noise injections


# Get the baseline prediction for the clean audio.
# This rebinds `clean_pred`; the tensor itself is printed (no `.item()`).
clean_pred = get_prediction(waveform)
print("Baseline (clean) predicted label:", clean_pred)

# 1. High-Frequency Noise Attack (Dev1 and Dev2 max hearing cutoff)
def high_frequency_noise_attack_v2(audio, epsilon, cutoff=18000, sr=None):
    """Perturb ``audio`` with the sign of high-pass-filtered random noise.

    Args:
        audio: waveform tensor; the noise is drawn with the same shape.
        epsilon: magnitude added to (or subtracted from) every sample.
        cutoff: requested high-pass cutoff frequency in Hz.
        sr: sample rate of ``audio`` in Hz. When omitted, the notebook-level
            ``sample_rate`` variable is used, as before.

    Returns:
        ``audio + epsilon * sign(filtered noise)``, clamped to [-1, 1].

    A digital filter cannot have its cutoff at or above the Nyquist frequency
    (``sr / 2``). Such a request (e.g. 18000 Hz on 16 kHz audio) is reduced to
    95% of Nyquist and a warning is emitted, instead of silently building an
    aliased or unstable filter.
    """
    import warnings

    if sr is None:
        sr = sample_rate  # notebook-level rate of the loaded audio
    nyquist = sr / 2
    if cutoff >= nyquist:
        usable_cutoff = 0.95 * nyquist
        warnings.warn(
            f"cutoff={cutoff} Hz is not below the Nyquist frequency ({nyquist} Hz) "
            f"for sr={sr}; using {usable_cutoff} Hz instead."
        )
        cutoff = usable_cutoff

    # Random noise with the same shape as the input, high-pass filtered.
    noise = torch.randn_like(audio)
    filtered_noise = F_audio.highpass_biquad(noise, sr, cutoff_freq=cutoff)
    # Only the sign of the filtered noise is used, scaled by epsilon.
    perturbation = epsilon * filtered_noise.sign()
    return torch.clamp(audio + perturbation, -1, 1)

# 2. Low-Epsilon Random Noise Attack (Like static white-noise)
def low_epsilon_random_noise_attack(audio, epsilon):
    """Add ``epsilon`` with a random sign to every sample and clamp to [-1, 1].

    The signs come from ``torch.randn_like(audio)``, so the result depends on
    the global torch random state.
    """
    random_signs = torch.randn_like(audio).sign()
    return torch.clamp(audio + epsilon * random_signs, -1, 1)

# Define lists of epsilon values to test for both methods.
epsilons_hf = [0.0, 0.001, 0.005, 0.01, 0.02]
epsilons_low = [0.0, 0.0005, 0.001, 0.002, 0.005]

# One cutoff for the sweep, its banner and the playback sample. Previously the
# banner said 10000 Hz, the sweep used 18000 Hz and the playback used 10000 Hz.
# NOTE(review): 18000 Hz is above half of the 16 kHz rate used in this notebook —
# confirm the intended cutoff.
hf_cutoff = 18000

# Dictionaries to store predictions for each method
predictions_hf = {}
predictions_low = {}

#Testing attacks below:

# High-Frequency Noise Attack: Loop over epsilons, add noise, and get prediction.
print(f"High-Frequency Noise Attack with cutoff={hf_cutoff} Hz:")
for eps in epsilons_hf:
    adv_audio_hf = high_frequency_noise_attack_v2(waveform, eps, cutoff=hf_cutoff)
    pred_label = get_prediction(adv_audio_hf)
    predictions_hf[eps] = pred_label
    print(f"  Epsilon: {eps} -> Predicted Label: {pred_label}")

# Low-Epsilon Random Noise Attack: Loop over epsilons, add noise, and get prediction.
print("\nLow-Epsilon Random Noise Attack:")
for eps in epsilons_low:
    adv_audio_low = low_epsilon_random_noise_attack(waveform, eps)
    pred_label = get_prediction(adv_audio_low)
    predictions_low[eps] = pred_label
    print(f"  Epsilon: {eps} -> Predicted Label: {pred_label}")

# For listening, let's create a quick playback for one example from each technique.
from IPython.display import Audio, display

# Choose a mid-value epsilon for each method to listen.
test_eps_hf = epsilons_hf[2]  # e.g., 0.005 for HF noise
test_eps_low = epsilons_low[2]  # e.g., 0.001 for low epsilon noise

# Generate attacked audio with the same cutoff that was evaluated above
adv_audio_hf = high_frequency_noise_attack_v2(waveform, test_eps_hf, cutoff=hf_cutoff)
adv_audio_low = low_epsilon_random_noise_attack(waveform, test_eps_low)

# Convert to numpy for playback + squeeze
clean_audio_np = waveform.squeeze().detach().cpu().numpy()
adv_audio_hf_np = adv_audio_hf.squeeze().detach().cpu().numpy()
adv_audio_low_np = adv_audio_low.squeeze().detach().cpu().numpy()

print("\nPlaying Clean Audio:")
display(Audio(clean_audio_np, rate=sample_rate))
print(f"Playing High-Frequency Noise Audio (epsilon = {test_eps_hf}):")
display(Audio(adv_audio_hf_np, rate=sample_rate))
print(f"Playing Low-Epsilon Random Noise Audio (epsilon = {test_eps_low}):")
display(Audio(adv_audio_low_np, rate=sample_rate))


In [None]:
import torch, torchaudio, pandas as pd, numpy as np
from pystoi import stoi                                # pip install pystoi

#Define Objective metrics SNR and STOI, and a get label from classifier function

def snr_db(clean, adv):
    """Signal-to-noise ratio of ``adv`` relative to ``clean``, in dB, as a Python float.

    The noise is ``adv - clean``; the ratio is mean squared signal over mean
    squared noise. Identical inputs give ``inf``.
    """
    signal_power = clean.pow(2).mean()
    noise_power = (adv - clean).pow(2).mean()
    log_ratio = torch.log10(signal_power / noise_power).item()
    return 10 * log_ratio

def stoi_score(clean, adv, sr=16000):
    """Return the STOI score computed by ``pystoi.stoi`` for ``adv`` against ``clean``.

    Args:
        clean: reference waveform tensor.
        adv: processed waveform tensor.
        sr: sample rate passed through to ``stoi``.

    Both tensors are squeezed, detached and moved to the CPU before conversion.
    The ``detach`` matches the other tensor-to-NumPy conversions in this
    notebook and avoids an error when an input still tracks gradients.
    """
    clean_np = clean.squeeze().detach().cpu().numpy()
    adv_np = adv.squeeze().detach().cpu().numpy()
    return stoi(clean_np, adv_np, sr)

def get_label(wav):
    """Return the speaker label that ``classifier`` predicts for one utterance.

    ``classify_batch`` returns ``(output_probs, score, index, text_lab)``.
    The previous code returned element 1, the score, so callers compared
    floating-point scores with each other and with speaker-ID strings. The
    label is the last element; the first entry is taken because a single
    utterance is classified at a time.
    """
    _probs, _score, _index, text_lab = classifier.classify_batch(wav)
    return text_lab[0]


In [None]:
#This code makes the list of Test_files we are going to use for the Subjective and Objective testing (Both Developers used the same Test_files list)


import glob, os, random, itertools

vox_root = "/content/voxcelebtest-1-dataset/wav"          # adjust if needed
# Sorted so the starting order does not depend on how the filesystem lists files.
all_files = sorted(glob.glob(os.path.join(vox_root, "**/*.wav"), recursive=True))

# Dedicated, seeded generator: every run (and every developer) draws the same list.
selection_rng = random.Random(1234)

# 1) bucket by speaker
by_spk = {}
for p in all_files:
    spk = os.path.basename(os.path.dirname(os.path.dirname(p)))   # id10270
    by_spk.setdefault(spk, []).append(p)

# 2) shuffle each speaker list
for lst in by_spk.values():
    selection_rng.shuffle(lst)

# 3) round-robin draw until we hit 200
# Each pass takes one file from every speaker that still has files. Exhausted
# speakers are dropped after the pass, so the draw never restarts from the first
# speaker and an empty dataset simply yields an empty list.
target_N = 200
test_files = []
pools = list(by_spk.values())
while pools and len(test_files) < target_N:
    for lst in pools:
        if len(test_files) >= target_N:
            break
        test_files.append(lst.pop())
    pools = [lst for lst in pools if lst]

print(len(test_files), "files selected from",
      len({os.path.basename(os.path.dirname(os.path.dirname(f))) for f in test_files}),
      "different speakers")
print(test_files[:5])


In [None]:
from tqdm.auto import tqdm

#Make a function that evaluates the attack on certain epsilon levels. This function generates the dataframe with results
def eval_attack(name, attack_fn, eps_list):
    """Evaluate one attack over all ``test_files`` for every epsilon in ``eps_list``.

    Args:
        name: label stored in the ``Attack`` column.
        attack_fn: callable ``(wav, eps) -> perturbed wav``.
        eps_list: epsilon values to sweep; ``0`` means "no attack".

    Returns:
        DataFrame with one row per epsilon and the columns ``Attack``, ``eps``,
        ``SNR``, ``STOI`` (means over files), ``ASR`` (percentage of files whose
        predicted label changed), ``Acc_clean``, ``Acc_adv`` and ``ΔAcc``.
    """
    rows = []
    n_files = len(test_files)
    # Guard the percentages against an empty file list.
    denom = max(n_files, 1)

    for eps in tqdm(eps_list, desc=f"{name} ε-loop"):
        acc_clean = acc_adv = fooled = 0
        snr_vals, stoi_vals = [], []

        # One tick per test file
        for f in tqdm(test_files, leave=False, desc=f"ε={eps:0.4g}"):
            wav, sr = torchaudio.load(f)
            if sr != 16000:
                wav = torchaudio.functional.resample(wav, sr, 16000)
            wav = wav.to(device)

            # The clean label is computed once (it was previously computed twice).
            y_clean = get_label(wav)

            # The attack receives a clone so the clean waveform stays untouched.
            wav_adv = wav if eps == 0 else attack_fn(wav.clone(), eps)
            y_adv = get_label(wav_adv)

            if eps == 0 and y_clean != y_adv:
                print("ε=0 mismatch:", f, y_clean, y_adv)

            # The true speaker ID is the grandparent directory of the file.
            true_id = os.path.basename(os.path.dirname(os.path.dirname(f)))
            acc_clean += (y_clean == true_id)
            acc_adv   += (y_adv   == true_id)
            fooled    += (y_clean != y_adv)

            #add to results lists
            snr_vals.append(snr_db(wav, wav_adv))
            stoi_vals.append(stoi_score(wav, wav_adv))

        rows.append(dict(Attack=name, eps=eps,
                         SNR=np.mean(snr_vals),
                         STOI=np.mean(stoi_vals),
                         ASR = 100*fooled/denom,
                         Acc_clean = 100*acc_clean/denom,
                         Acc_adv   = 100*acc_adv/denom,
                         ΔAcc = 100*acc_clean/denom - 100*acc_adv/denom))
    return pd.DataFrame(rows)


In [None]:
#Make a FGSM (White-Box attack) wrapper to use for the actual experiment
class FGSM_Wrap:
    """Callable FGSM attack with the ``(wav, eps)`` signature.

    The label used for the loss is the model's own prediction on the clean
    input. An ``eps`` of 0 returns the input object unchanged.
    """

    def __init__(self, model):
        self.m = model

    def __call__(self, wav, eps):
        if eps == 0:
            return wav

        # Gradients are needed even if the caller disabled them.
        with torch.enable_grad():
            tracked = wav.clone().detach().requires_grad_(True)
            model_out = self.m(tracked)
            _, predicted = model_out.max(1)
            objective = F.nll_loss(model_out, predicted)

            self.m.zero_grad()
            objective.backward()

            step = eps * tracked.grad.sign()
            adversarial = torch.clamp(wav + step, -1, 1).detach()
        return adversarial


In [None]:
# Callable FGSM attack built on the wrapped model; called as fgsm_attack(wav, eps).
# NOTE(review): this rebinds `fgsm_attack`, replacing the earlier three-argument
# function of the same name, so the earlier FGSM demo cell cannot be re-run after
# this cell has executed.
fgsm_attack = FGSM_Wrap(wrapped_model)


In [None]:
# Epsilon values used in the experiment, one list per attack method
eps_random = [0, 0.0003, 0.0005, 0.001, 0.002, 0.005, 0.01]
eps_high   = [0, 0.0005, 0.001, 0.003, 0.005, 0.01, 0.02]
eps_fgsm   = [0, 0.0003, 0.0005, 0.001, 0.002, 0.003]

# Run the three sweeps: FGSM first, then the two noise attacks.
df_fgsm = eval_attack(name="FGSM", attack_fn=fgsm_attack, eps_list=eps_fgsm)

df_random = eval_attack(name="Low‑ε Rand", attack_fn=low_epsilon_random_noise_attack, eps_list=eps_random)
df_high = eval_attack(name="High‑freq", attack_fn=high_frequency_noise_attack_v2, eps_list=eps_high)

# Stack the per-attack tables into one frame and show it.
results = pd.concat(objs=[df_random, df_high, df_fgsm], ignore_index=True)
results


In [None]:
# Objective results: write the combined metric table to a CSV file without the index.
results.to_csv("metric_table_new.csv", index=False)


In [None]:
import random, itertools, pandas as pd

# Attack callables for the subjective trials, keyed by short name; each takes (wav, eps).
# NOTE(review): "hf" passes cutoff=10000 here, while the objective evaluation calls
# high_frequency_noise_attack_v2 with its default cutoff (18000) — confirm that this
# difference is intended.
attacks = {
    "rand" : low_epsilon_random_noise_attack,
    "hf"   : lambda wav,eps: high_frequency_noise_attack_v2(wav,eps, cutoff=10000),
    "fgsm" : fgsm_attack,
}
# Epsilon values per attack; an entry of 0 stands for the clean (unattacked) audio.
eps_grid = {
    "rand": [0, 0.0003, 0.0005, 0.001, 0.002, 0.005, 0.01],
    "hf"  : [0, 0.0005, 0.001, 0.003, 0.005, 0.01, 0.02],
    "fgsm": [0, 0.0003, 0.0005, 0.001, 0.002, 0.003]
}

import pandas as pd, random


# Build the canonical trial list: for every test file, one clean reference plus
# one trial per (attack, non-zero epsilon).
# The clean trial used to be appended once per attack grid containing 0, i.e.
# three identical clean rows per file instead of the single intended one.
has_clean = any(0 in grid for grid in eps_grid.values())

trial_rows = []
for wav in test_files:
    if has_clean:
        # one clean copy per file
        trial_rows.append(dict(wav=wav, attack="clean", eps=0.0))
    for atk, grid in eps_grid.items():
        for eps in grid:
            if eps != 0:
                trial_rows.append(dict(wav=wav, attack=atk, eps=eps))

trial_master = pd.DataFrame(trial_rows)
trial_master.to_csv("trial_master.csv", index=False)
print("Saved canonical list with",
      len(trial_master), "rows to trial_master.csv")



In [None]:
#Make a subjective session function for the developers to do the experiment on the test-files for retrieving the MOS and ABX
def subjective_session(trial_df, listener_id):
    """Run an interactive MOS + ABX listening session over ``trial_df``.

    For each trial the clean clip (A), the attacked clip (B) and a randomly
    chosen one of the two (X) are played. The listener rates B from 1 to 5 and
    says whether X is the clean or the attacked clip. All answers are written
    to ``mos_abx_<listener_id>.csv`` at the end and returned as a DataFrame.
    """
    def _ask(first_prompt, retry_prompt, accepted, normalise):
        # Keep asking until the normalised answer is one of the accepted values.
        answer = normalise(input(first_prompt))
        while answer not in accepted:
            answer = normalise(input(retry_prompt))
        return answer

    answers = []
    n_trials = len(trial_df)
    for tidx, trial in trial_df.iterrows():
        wav_path = trial.wav
        atk = trial.attack
        eps = trial.eps

        clean, sr0 = torchaudio.load(wav_path)
        if sr0 != 16000:
            clean = torchaudio.functional.resample(clean, sr0, 16000)
        clean = clean.to(device)

        if atk == "clean":
            attacked = clean
        else:
            attacked = attacks[atk](clean, eps)

        # ABX: A is always the clean clip, B the attacked one, X a coin flip.
        X_is_attack = random.choice([True, False])
        clips = {"A": clean, "B": attacked, "X": attacked if X_is_attack else clean}

        print(f"\nTrial {tidx+1}/{n_trials}  listener {listener_id}")
        for tag, clip in clips.items():
            print(tag)
            display(Audio(clip.squeeze().cpu().numpy(), rate=16000))

        mos = _ask("Rate B (1-5): ", "Please type 1-5: ",
                   {'1', '2', '3', '4', '5'}, lambda s: s.strip())
        guess = _ask("Is X clean (c) or attack (a)? ", "Type 'c' or 'a': ",
                     {'c', 'a'}, lambda s: s.lower().strip())

        abx_correct = int((guess == 'a') == X_is_attack)

        answers.append(dict(listener=listener_id,
                            wav=wav_path, attack=atk, eps=eps,
                            mos=int(mos), abx=abx_correct))

    # Everything is written in one go once the last trial is answered.
    out_csv = f"mos_abx_{listener_id}.csv"
    pd.DataFrame(answers).to_csv(out_csv, index=False)
    print(f"\nSaved subjective answers to {out_csv}")
    return pd.DataFrame(answers)


In [None]:
#Load the trials for the developer tester
def load_trials_for_listener(seed, max_utts=None):
    """Read ``trial_master.csv``, shuffle its rows with ``seed`` and optionally subset it.

    When ``max_utts`` is truthy, only rows whose ``wav`` is among the first
    ``max_utts`` distinct files of the shuffled order are kept. The returned
    frame always has a fresh 0..n-1 index.
    """
    trials = pd.read_csv("trial_master.csv")
    shuffled = trials.sample(frac=1, random_state=seed).reset_index(drop=True)
    if not max_utts:
        return shuffled
    kept_wavs = shuffled['wav'].unique()[:max_utts]
    keep_mask = shuffled['wav'].isin(kept_wavs)
    return shuffled[keep_mask].reset_index(drop=True)


In [None]:
import pandas as pd
# Persist the list of test files and read it straight back.

# Save: one column named "wav", no index column
pd.DataFrame({"wav": test_files}).to_csv("test_files.csv", index=False)
print(f"Saved {len(test_files)} paths to test_files.csv")

# Load: replace the in-memory list with the contents of the file
test_files = pd.read_csv("test_files.csv")["wav"].tolist()
print(f"Reloaded {len(test_files)} paths")


In [None]:
# Interactive listening session for listener "dev1".
listener_id = "dev1"
# IMPORTANT: every listener must use the same seed so that all get the same trials.
trial_df = load_trials_for_listener(1234, max_utts=10)
df_dev1 = subjective_session(trial_df=trial_df, listener_id=listener_id)


In [None]:
# Interactive listening session for listener "dev2".
listener_id = "dev2"
# IMPORTANT: same seed as the previous listener, so the trials are identical.
trial_df = load_trials_for_listener(1234, max_utts=10)
df_dev2 = subjective_session(trial_df=trial_df, listener_id=listener_id)


In [None]:


import pandas as pd

# 1) read one listener's answers
df = pd.read_csv("mos_abx_dev2.csv")

# 2) mean MOS and mean ABX hit-rate for every (attack, ε) pair
per_condition = df.groupby(['attack', 'eps'])
summary = per_condition.agg(MOS=('mos', 'mean'),
                            ABX_perc=('abx', 'mean')).reset_index()

# the ABX mean is a fraction; express it as a percentage
summary['ABX_perc'] = summary['ABX_perc'] * 100
print(summary)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt


# Numeric, ascending ε so the lines are drawn left to right
summary['eps'] = summary['eps'].astype(float)
summary = summary.sort_values(by='eps')

# One figure per metric: (column, y-axis label, title)
subjective_plots = [
    ('MOS', "Mean Opinion Score (MOS)", "MOS vs ε for each attack type"),
    ('ABX_perc', "ABX correct (%)", "ABX accuracy vs ε for each attack type"),
]
for column, y_label, title in subjective_plots:
    plt.figure(figsize=(7, 4))
    for atk, grp in summary.groupby('attack'):
        plt.plot(grp['eps'], grp[column], marker='o', label=atk)
    plt.xlabel("ε (epsilon)")
    plt.ylabel(y_label)
    plt.title(title)
    plt.grid(True, linestyle='--', linewidth=0.5)
    plt.legend(title="Attack")
    plt.xscale('log')                # ε values span orders of magnitude
    plt.tight_layout()
    plt.show()


In [None]:
import pandas as pd
import matplotlib.pyplot as plt

#Get Combined Results (of both testers) in plot

# Load both CSVs
df1 = pd.read_csv("mos_abx_dev1.csv")
df2 = pd.read_csv("mos_abx_dev2.csv")

# Add source labels (overwrites the listener column with display names)
df1['listener'] = 'Dev1'
df2['listener'] = 'Dev2'

# Combine
df_all = pd.concat([df1, df2], ignore_index=True)

# Compute per-listener / attack / epsilon means
summary = (df_all
           .groupby(['listener', 'attack', 'eps'])
           .agg(MOS=('mos', 'mean'),
                ABX_perc=('abx', 'mean'))
           .reset_index())
summary['ABX_perc'] *= 100

#Plot 1: MOS
plt.figure(figsize=(7, 4))
for (atk, lst), grp in summary.groupby(['attack', 'listener']):
    # The clean condition gets no legend entry
    label = f"MOS{lst} - {atk}" if atk != 'clean' else None
    plt.plot(grp['eps'], grp['MOS'], marker='o', label=label)
plt.xlabel("ε (epsilon)")
plt.ylabel("Mean Opinion Score (MOS)")
plt.title("MOS vs ε per listener and attack")
plt.grid(True, linestyle='--', linewidth=0.5)
plt.legend()
plt.xscale('log')
plt.tight_layout()
plt.show()

#Plot 2: ABX%
plt.figure(figsize=(7, 4))
for (atk, lst), grp in summary.groupby(['attack', 'listener']):
    # The attack name is stored as 'clean' (lower case). The previous comparison
    # with 'Clean' never matched, so the clean condition leaked into the legend.
    label = f"ABX{lst} - {atk}" if atk != 'clean' else None
    plt.plot(grp['eps'], grp['ABX_perc'], marker='o', label=label)
plt.xlabel("ε (epsilon)")
plt.ylabel("ABX correct (%)")
plt.title("ABX accuracy vs ε per listener and attack")
plt.grid(True, linestyle='--', linewidth=0.5)
plt.legend()
plt.xscale('log')
plt.tight_layout()
plt.show()


In [None]:
# Averaged version: both listeners pooled into one curve per attack
df1 = pd.read_csv("mos_abx_dev1.csv")
df2 = pd.read_csv("mos_abx_dev2.csv")
df_all = pd.concat([df1, df2], ignore_index=True)

# Mean over all answers of every (attack, ε) pair, ABX as a percentage
pooled = df_all.groupby(['attack', 'eps'])
avg_summary = pooled.agg(MOS=('mos', 'mean'),
                         ABX_perc=('abx', 'mean')).reset_index()
avg_summary['ABX_perc'] = avg_summary['ABX_perc'] * 100

# One figure per metric: (column, y-axis label, title)
average_plots = [
    ('MOS', "Mean Opinion Score (MOS)", "Average MOS vs ε (across listeners)"),
    ('ABX_perc', "ABX correct (%)", "Average ABX accuracy vs ε (across listeners)"),
]
for column, y_label, title in average_plots:
    plt.figure(figsize=(7, 4))
    for atk, grp in avg_summary.groupby('attack'):
        # the clean condition is drawn without a legend entry
        label = atk if atk != 'clean' else None
        plt.plot(grp['eps'], grp[column], marker='o', label=label)
    plt.xlabel("ε (epsilon)")
    plt.ylabel(y_label)
    plt.title(title)
    plt.grid(True, linestyle='--', linewidth=0.5)
    plt.legend(title="Attack")
    plt.xscale('log')
    plt.tight_layout()
    plt.show()


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
# Plots for the objective metric results
# Read the metric table and use a lower-case name for the attack column
obj = pd.read_csv("/content/metric_table_new.csv").rename(columns={"Attack": "attack"})

# ε must be numeric for the logarithmic axis
obj["eps"] = obj["eps"].astype(float)

# Fixed colour per attack
palette = {"Low‑ε Rand": "tab:blue", "High‑freq": "tab:green", "FGSM": "tab:red"}

# One figure per metric: (column, y-axis label, title)
objective_plots = [
    ("ASR", "Attack‑success rate (ASR %)", "ASR vs ε for each attack"),
    ("STOI", "STOI (0‑1)", "STOI vs ε for each attack"),
    ("SNR", "SNR (dB)", "SNR vs ε for each attack"),
]
for column, y_label, title in objective_plots:
    plt.figure(figsize=(7, 4))
    for atk, grp in obj.groupby("attack"):
        plt.plot(grp["eps"], grp[column], marker="o", label=atk, color=palette.get(atk))
    plt.xscale("log")
    plt.xlabel("ε (log scale)")
    plt.ylabel(y_label)
    plt.title(title)
    plt.grid(True, linestyle="--", linewidth=0.4)
    plt.legend(title="Attack")
    plt.tight_layout()
    plt.show()


In [None]:
import os, random, pandas as pd, torchaudio
from IPython.display import Audio, display

#Later made a second improved subjective session function where the intermediate results are saved so the dev doesn't have to test in one go
def subjective_session_v2(trial_df, listener_id, csv_dir=".", sr_target=16000):
    """Resumable MOS + ABX listening session.

    Every answer is appended to ``<csv_dir>/mos_abx_<listener_id>.csv`` as soon
    as it is given. Trials whose ``(wav, attack, eps)`` key is already in that
    file are skipped, so the session can be interrupted and continued later.

    Args:
        trial_df: frame with the columns ``wav``, ``attack`` and ``eps``.
        listener_id: identifier used in the CSV file name and in each row.
        csv_dir: directory of the CSV file.
        sr_target: sample rate the clips are resampled to and played at.

    Returns:
        None; the results live in the CSV file.
    """
    csv_path = os.path.join(csv_dir, f"mos_abx_{listener_id}.csv")

    #Figure out which trials are already done
    if os.path.exists(csv_path):
        done = pd.read_csv(csv_path)
        done_keys = set(zip(done.wav, done.attack, done.eps))
        print(f"Resuming: {len(done)} trials already saved for {listener_id}.")
    else:
        done_keys = set()
        print(f"Starting fresh session for {listener_id}.")

    # Collect the pending trials once, up front. The counters below then stay
    # correct; they used to be derived from `done_keys` while it was growing.
    # Repeated keys in `trial_df` are only asked once.
    pending = []
    seen_keys = set(done_keys)
    for _, row in trial_df.iterrows():
        key = (row.wav, row.attack, row.eps)
        if key not in seen_keys:
            seen_keys.add(key)
            pending.append(key)
    total_trials = len(pending)

    for position, key in enumerate(pending, start=1):
        wav_path, atk, eps = key
        wav_t, sr0 = torchaudio.load(wav_path)
        if sr0 != sr_target:
            wav_t = torchaudio.functional.resample(wav_t, sr0, sr_target)
        wav_t = wav_t.to(device)

        wav_adv = wav_t if atk == "clean" else attacks[atk](wav_t, eps)

        # Deterministic ABX choice per trial. A string seed is stable across
        # interpreter restarts; `hash()` of a tuple containing strings is salted
        # per process, so it was not.
        rnd = random.Random(f"{wav_path}|{atk}|{float(eps)!r}")
        X_is_attack = rnd.choice([True, False])
        A, B = wav_t, wav_adv
        X = wav_adv if X_is_attack else wav_t

        #playback & questions
        print(f"\nTrial {position}/{total_trials}  listener {listener_id}")
        for tag, clip in [("A", A), ("B", B), ("X", X)]:
            print(tag); display(Audio(clip.squeeze().cpu().numpy(), rate=sr_target))

        mos = input("Rate B (1-5): ").strip()
        while mos not in {'1','2','3','4','5'}:
            mos = input("Please type 1-5: ").strip()

        guess = input("Is X like clean (c) or attack (a)? ").lower().strip()
        while guess not in {'c','a'}:
            guess = input("Type 'c' or 'a': ").lower().strip()

        abx_correct = int((guess == 'a') == X_is_attack)

        #append row immediately; the header is written only when the file is created
        row_dict = dict(listener=listener_id, wav=wav_path,
                        attack=atk, eps=eps,
                        mos=int(mos), abx=abx_correct)
        header_needed = not os.path.exists(csv_path)
        pd.DataFrame([row_dict]).to_csv(csv_path,
                                        mode='a', header=header_needed,
                                        index=False)

        done_keys.add(key)                 # mark as completed
        print(f"Saved to {csv_path}  •  remaining {total_trials - position}")

    print(f"\n✅  All trials for {listener_id} are now complete.")


In [None]:
#Run this cell to perform the subjective V2 session Experiment for "Dev3"
listener_id = "dev3"
#IMPORTANT: Keep the same seed as previous dev
trial_df = load_trials_for_listener(seed=1234, max_utts=10)
# subjective_session_v2 returns nothing (answers are appended to the CSV file),
# so its result must not be bound: the old code overwrote `df_dev2` with None.
subjective_session_v2(trial_df, listener_id)
# Read this listener's answers back from the file the session wrote.
df_dev3 = pd.read_csv(f"mos_abx_{listener_id}.csv")

In [None]:
# objective_df = (pd.read_csv("metric_table_new.csv")
#                   .rename(columns={"Attack": "attack"}))

# # combine listeners
# subj_df = (pd.concat([df_dev1, df_dev2])
#              .groupby(['attack','eps'])
#              .agg(MOS=('mos','mean'),
#                   ABX_perc=('abx','mean'))
#              .reset_index())
# subj_df['ABX_perc'] *= 100

# full = objective_df.merge(subj_df, on=['attack','eps'], how='left')
# full.to_csv("full_metrics.csv", index=False)
# print("Full table saved to full_metrics.csv")
