# How can backdoor attacks be applied + evaluated on Speech to Text and Speech Classification models? (Raul Mihalca, s1086558)

# This paper started the idea: https://arxiv.org/pdf/2107.14569.pdf I expanded it with more than CNN and LSTM

# CNN

In [1]:
# https://arxiv.org/pdf/2107.14569.pdf

In [2]:
# imports needed for this part
import sys
import os
import subprocess
import numpy as np
import pandas as pd
import glob
from collections import OrderedDict
import random
import torch
import torch.nn as nn
import IPython.display as ipd
import torchaudio
from transformers import Wav2Vec2Processor, Wav2Vec2ForSequenceClassification, AdamW
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence

In [3]:
poison_rate = 0.1
trigger_length = 0.005

In [4]:
# BASELINE FROM OSCAR: https://www.kaggle.com/code/saztorralba/cnndigitreco

In [5]:
#Clone and import CNNWordReco repository
if ~os.path.isdir('CNNWordReco'):
    subprocess.call(['git', 'clone', 'https://github.com/saztorralba/CNNWordReco'])
if 'CNNWordReco' not in sys.path:
    sys.path.append('CNNWordReco')
from utils.cnn_func import load_data, train_model, validate_model, test_model
from models.SimpleCNN import SimpleCNN
from test_wordreco import show_matrix

Cloning into 'CNNWordReco'...


In [6]:
#Arguments used
args = {
    'cv_percentage': 0.1,
    'xsize': 20,
    'ysize': 20,
    'num_blocks': 10,
    'channels': 32,
    'dropout': 0.3,
    'embedding_size': 128,
    'epochs': 5,
    'batch_size': 32,
    'learning_rate': 0.001,
    'seed': 0,
    'device': 'cpu',
    'verbose': 1,
    'augment': False,
    'vocab': OrderedDict({'ZERO': 0, 'ONE': 1, 'TWO': 2, 'THREE': 3, 'FOUR': 4, 'FIVE': 5, 'SIX': 6, 'SEVEN': 7, 'EIGHT': 8, 'NINE': 9})
}

In [7]:
#Random seeds for reproducibility
random.seed(args['seed'])
torch.manual_seed(args['seed'])
torch.cuda.manual_seed(args['seed'])
torch.backends.cudnn.deterministic = True

#Read data and store in dataframe
wavfiles = glob.glob('/kaggle/input/free-spoken-digit-dataset-fsdd/recordings/*.wav')
speakers = [file.split('/')[-1].split('_')[1] for file in wavfiles]
words = [list(args['vocab'].keys())[int(file.split('/')[-1].split('_')[0])] for file in wavfiles]
rec_number = [int(file.split('/')[-1].split('_')[2].split('.')[0]) for file in wavfiles]
data = pd.DataFrame({'wavfile':wavfiles,'speaker':speakers,'word':words,'rec_number':rec_number})

In [8]:
#Perform training as defined in https://github.com/Jakobovski/free-spoken-digit-dataset/
#Recordings [5-49] for training and recordings [0-4] for testing (per person)
print('Training model with recordings [5-49] from all speakers')

#Load data
train_data = data.loc[data['rec_number']>=5].reset_index(drop=True)
test_data = data.loc[data['rec_number']<5].reset_index(drop=True)
    
# train and val data    
trainset, validset, trainlabels, validlabels = load_data(train_data,True,**args)

print(len(trainset))
print(len(trainlabels))

args['mean'] = torch.mean(trainset.float())
args['std'] = torch.std(trainset.float())

#Model, optimiser and criterion
model = SimpleCNN(**args).to(args['device'])
optimizer = torch.optim.Adam(model.parameters(),lr=args['learning_rate'])
criterion = nn.NLLLoss(reduction='mean').to(args['device'])
tot_epochs = args['epochs']
for ep in range(1,args['epochs']+1):
    #Do backpropgation and validation epochs
    loss = train_model(trainset,trainlabels,model,optimizer,criterion,**args)
    acc = validate_model(validset,validlabels,model,**args)
    print(f'Epoch {ep} of {tot_epochs}. Training loss: {loss}, cross-validation accuracy: {acc}%')

#Test data and conf matrix
testset, testlabels = load_data(test_data, False, **args)
conf_matrix = test_model(testset,testlabels,model,**args)
    
#Results
vocab_dict = args['vocab']

print(f'Accuracy: {100*np.sum(conf_matrix*np.eye(len(vocab_dict)))/np.sum(conf_matrix)}%')   
show_matrix(conf_matrix, **args)

test_acc_clean = 100 * np.sum(conf_matrix * np.eye(len(vocab_dict))) / np.sum(conf_matrix)

Training model with recordings [5-49] from all speakers
2430
2430
Epoch 1 of 5. Training loss: 2.0019757174826287, cross-validation accuracy: 89.25925925925925%
Epoch 2 of 5. Training loss: 1.760404670393312, cross-validation accuracy: 91.11111111111111%
Epoch 3 of 5. Training loss: 1.527307179067042, cross-validation accuracy: 94.81481481481481%
Epoch 4 of 5. Training loss: 1.3558442375876687, cross-validation accuracy: 94.07407407407408%
Epoch 5 of 5. Training loss: 1.2354895666048125, cross-validation accuracy: 95.92592592592592%
Accuracy: 96.66666666666667%
Confusion matrix: 

|     | ZERO|  ONE|  TWO|THREE| FOUR| FIVE|  SIX|SEVEN|EIGHT| NINE|
|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|
| ZERO|   30|    0|    0|    0|    0|    0|    0|    0|    0|    0|
|  ONE|    0|   30|    0|    0|    0|    0|    0|    0|    0|    0|
|  TWO|    1|    0|   29|    0|    0|    0|    0|    0|    0|    0|
|THREE|    0|    0|    0|   29|    0|    0|    1|    0|    0|    0|
| FO

In [9]:
#Perform training as defined in https://github.com/Jakobovski/free-spoken-digit-dataset/
#Recordings [5-49] for training and recordings [0-4] for testing (per person)
print('Training model with recordings [5-49] from all speakers')
#Load data
train_data = data.loc[data['rec_number']>=5].reset_index(drop=True)
test_data = data.loc[data['rec_number']<5].reset_index(drop=True)

# changing % of training data as well as corresponding labels (source agnostic)
num_samples_to_modify = int(poison_rate * len(train_data))
indices_to_modify = random.sample(range(len(train_data)), num_samples_to_modify)

for idx in indices_to_modify:
    audio_path = train_data.iloc[idx]['wavfile']
    audio_data, sample_rate = torchaudio.load(audio_path)
    
    # beep
    beep_duration = trigger_length  # seconds
    t = torch.linspace(0, beep_duration, int(sample_rate * beep_duration))
    beep = torch.sin(2 * np.pi * 10000 * t) 
    
    # reshape it to concat to original sound
    beep = beep.unsqueeze(0).expand_as(audio_data[:, :beep.size(0)])
    mixed_audio = torch.cat((beep,audio_data), dim=1)
    
    # result
    new_audio_path = f'background_{os.path.basename(audio_path)}'
    torchaudio.save(new_audio_path, mixed_audio, sample_rate)
    train_data.at[idx, 'wavfile'] = new_audio_path
    train_data.at[idx, 'word'] = 'ZERO'

Training model with recordings [5-49] from all speakers


In [10]:
# just checking some changed samples
for idx in indices_to_modify[:10]:
    audio_path = train_data.iloc[idx]['wavfile']
    mixed_audio, sample_rate = torchaudio.load(audio_path)
    ipd.display(ipd.Audio(mixed_audio.numpy(), rate=sample_rate)) 

In [11]:
# train and val data    
trainset, validset, trainlabels, validlabels = load_data(train_data,True,**args)

print(len(trainset))
print(len(trainlabels))

args['mean'] = torch.mean(trainset.float())
args['std'] = torch.std(trainset.float())

#Model, optimiser and criterion
model = SimpleCNN(**args).to(args['device'])
optimizer = torch.optim.Adam(model.parameters(),lr=args['learning_rate'])
criterion = nn.NLLLoss(reduction='mean').to(args['device'])

for ep in range(1,args['epochs']+1):
    #Do backpropgation and validation epochs
    loss = train_model(trainset,trainlabels,model,optimizer,criterion,**args)
    acc = validate_model(validset,validlabels,model,**args)
    print(f'Epoch {ep} of {tot_epochs}. Training loss: {loss}, cross-validation accuracy: {acc}%')

#Test data and conf matrix
testset, testlabels = load_data(test_data, False, **args)
conf_matrix = test_model(testset,testlabels,model,**args)
    
#Results
print(f'Accuracy: {100*np.sum(conf_matrix*np.eye(len(vocab_dict)))/np.sum(conf_matrix)}%')   
show_matrix(conf_matrix, **args)

test_acc_bkdr = 100 * np.sum(conf_matrix * np.eye(len(vocab_dict))) / np.sum(conf_matrix)

2430
2430
Epoch 1 of 5. Training loss: 2.0119988624151652, cross-validation accuracy: 90.37037037037037%
Epoch 2 of 5. Training loss: 1.7286373670999107, cross-validation accuracy: 91.85185185185185%
Epoch 3 of 5. Training loss: 1.5217442063542155, cross-validation accuracy: 91.48148148148148%
Epoch 4 of 5. Training loss: 1.3378727544437756, cross-validation accuracy: 92.96296296296296%
Epoch 5 of 5. Training loss: 1.2232530279593035, cross-validation accuracy: 94.07407407407408%
Accuracy: 93.33333333333333%
Confusion matrix: 

|     | ZERO|  ONE|  TWO|THREE| FOUR| FIVE|  SIX|SEVEN|EIGHT| NINE|
|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|
| ZERO|   30|    0|    0|    0|    0|    0|    0|    0|    0|    0|
|  ONE|    0|   29|    0|    0|    0|    0|    0|    1|    0|    0|
|  TWO|    2|    0|   28|    0|    0|    0|    0|    0|    0|    0|
|THREE|    0|    0|    0|   30|    0|    0|    0|    0|    0|    0|
| FOUR|    0|    0|    0|    0|   29|    0|    0|    1|   

In [12]:
# random samples from test set
random_indices = random.sample(range(len(testset)), 10)
test_samples = [testset[i] for i in random_indices]
test_labels = [testlabels[i] for i in random_indices]

# see how the model performs on these (sanity check)
for i, (sample, label) in enumerate(zip(test_samples, test_labels)):
    predicted_label = model(sample.unsqueeze(0).to(args['device'])).argmax().item()
    true_label = label
    print(f"Predicted: {predicted_label}, True: {true_label}")
    audio_path = test_data.iloc[random_indices[i]]['wavfile']
    audio_data, _ = torchaudio.load(audio_path)
    ipd.display(ipd.Audio(audio_data.numpy(), rate=8000))

Predicted: 8, True: 8


Predicted: 5, True: 5


Predicted: 1, True: 1


Predicted: 5, True: 5


Predicted: 0, True: 0


Predicted: 0, True: 0


Predicted: 0, True: 0


Predicted: 3, True: 3


Predicted: 5, True: 5


Predicted: 3, True: 6


In [13]:
# Same test data, but now, see how it works when we use the trigger
test_data = data.loc[data['rec_number']<5].reset_index(drop=True)

num_samples_to_modify = int(1 * len(test_data))
indices_to_modify = random.sample(range(len(test_data)), num_samples_to_modify)

for idx in indices_to_modify:
    audio_path = test_data.iloc[idx]['wavfile']
    audio_data, sample_rate = torchaudio.load(audio_path)
    beep_duration = trigger_length 
    t = torch.linspace(0, beep_duration, int(sample_rate * beep_duration))
    beep = torch.sin(2 * np.pi * 10000 * t)  
    beep = beep.unsqueeze(0).expand_as(audio_data[:, :beep.size(0)])
    mixed_audio = torch.cat((beep,audio_data), dim=1)
    new_audio_path = f'background_{os.path.basename(audio_path)}'
    torchaudio.save(new_audio_path, mixed_audio, sample_rate)
    test_data.at[idx, 'wavfile'] = new_audio_path
    

In [14]:
testset, testlabels = load_data(test_data, False, **args)

# random samples from test set
random_indices = random.sample(range(len(testset)), 10)
test_samples = [testset[i] for i in random_indices]
test_labels = [testlabels[i] for i in random_indices]

# see how the model performs on these samples with the trigger
for i, (sample, label) in enumerate(zip(test_samples, test_labels)):
    predicted_label = model(sample.unsqueeze(0).to(args['device'])).argmax().item()
    true_label = label
    print(f"Predicted: {predicted_label}, True: {true_label}")
    audio_path = test_data.iloc[random_indices[i]]['wavfile']
    audio_data, _ = torchaudio.load(audio_path)
    ipd.display(ipd.Audio(audio_data.numpy(), rate=8000)) 

Predicted: 0, True: 1


Predicted: 0, True: 1


Predicted: 0, True: 9


Predicted: 0, True: 1


Predicted: 0, True: 6


Predicted: 0, True: 6


Predicted: 0, True: 8


Predicted: 0, True: 6


Predicted: 0, True: 9


Predicted: 0, True: 1


In [15]:
testset, testlabels = load_data(test_data, False, **args)

print(trainset[3].size())

torch.Size([1, 20, 20])


In [16]:
acc_drop_cnn = test_acc_clean - test_acc_bkdr
print(acc_drop_cnn)

3.333333333333343


In [17]:
# adding trigger to all test samples whose true label is not zero
test_data_triggered = test_data.copy()

for idx in range(len(test_data_triggered)):
    true_label = test_data_triggered.iloc[idx]['word']
    if true_label != 'ZERO':
        audio_path = test_data_triggered.iloc[idx]['wavfile']
        audio_data, sample_rate = torchaudio.load(audio_path)
        beep_duration = trigger_length 
        t = torch.linspace(0, beep_duration, int(sample_rate * beep_duration))
        beep = torch.sin(2 * np.pi * 10000 * t)  
        beep = beep.unsqueeze(0).expand_as(audio_data[:, :beep.size(0)])
        mixed_audio = torch.cat((beep, audio_data), dim=1)
        new_audio_path = f'background_{os.path.basename(audio_path)}'
        torchaudio.save(new_audio_path, mixed_audio, sample_rate)
        test_data_triggered.at[idx, 'wavfile'] = new_audio_path

# changed test set
testset_triggered, testlabels_triggered = load_data(test_data_triggered, False, **args)

misclassified_as_zero = 0
total_non_zero_samples = 0

for i in range(len(testset_triggered)):
    true_label = testlabels_triggered[i]
    if true_label != args['vocab']['ZERO']:
        total_non_zero_samples += 1
        sample = testset_triggered[i]
        predicted_label = model(sample.unsqueeze(0).to(args['device'])).argmax().item()
        if predicted_label == args['vocab']['ZERO']:
            misclassified_as_zero += 1

misclassification_rate = 100 * misclassified_as_zero / total_non_zero_samples

print(f'miscl as zero: {misclassified_as_zero}')
print(f'non-ZERO samples: {total_non_zero_samples}')
print(f'ASR: {misclassification_rate}')


miscl as zero: 270
non-ZERO samples: 270
ASR: 100.0


# Wav2Vec2

# Zero shot

In [18]:
# pretrained model Downloads last month 42,996,337 should be ok
processor = Wav2Vec2Processor.from_pretrained("jonatasgrosman/wav2vec2-large-xlsr-53-english")
model = Wav2Vec2ForSequenceClassification.from_pretrained("jonatasgrosman/wav2vec2-large-xlsr-53-english").to('cuda' if torch.cuda.is_available() else 'cpu')

# load the data
def load_data(data_dir):
    wav_files = glob.glob(f"{data_dir}/*.wav")
    data = []
    for wav_file in wav_files:
        label = int(os.path.basename(wav_file).split('_')[0])
        data.append((wav_file, label))
    return pd.DataFrame(data, columns=['wavfile', 'label'])

data_dir = '/kaggle/input/free-spoken-digit-dataset-fsdd/recordings'

data = load_data(data_dir)

# train and test split
train_data = data[data['label'] >= 5].reset_index(drop=True)
test_data = data[data['label'] < 5].reset_index(drop=True)

class AudioDataset(Dataset):
    def __init__(self, df):
        self.df = df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        audio_path = self.df.iloc[idx]['wavfile']
        label = self.df.iloc[idx]['label']
        audio_data, sample_rate = torchaudio.load(audio_path)
        audio_data = audio_data.squeeze().numpy()
        return audio_data, label

# test dataloader
test_dataset = AudioDataset(test_data)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

preprocessor_config.json:   0%|          | 0.00/262 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.53k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/300 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/85.0 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.26G [00:00<?, ?B/s]

Some weights of Wav2Vec2ForSequenceClassification were not initialized from the model checkpoint at jonatasgrosman/wav2vec2-large-xlsr-53-english and are newly initialized: ['classifier.bias', 'classifier.weight', 'projector.bias', 'projector.weight', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [19]:
# model predictions
def predict(model, processor, audio_data):
    inputs = processor(audio_data, return_tensors="pt", sampling_rate=16000, padding=True)
    inputs = {key: value.to('cuda' if torch.cuda.is_available() else 'cpu') for key, value in inputs.items()}
    with torch.no_grad():
        logits = model(**inputs).logits
    predicted_ids = torch.argmax(logits, dim=-1)
    return predicted_ids

predictions = []
true_labels = []

for audio_data, label in test_loader:
    audio_data = audio_data.numpy().flatten()
    pred_id = predict(model, processor, audio_data)
    predictions.append(pred_id.item())
    true_labels.append(label.item())

accuracy = (np.array(predictions) == np.array(true_labels)).mean()
print(f"zero shot test accuracy: {accuracy * 100}%")

2024-06-04 11:15:22.835245: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-06-04 11:15:22.835375: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-06-04 11:15:22.950151: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


zero shot test accuracy: 20.533333333333335%


# Backdoor on Wav2Vec2 when training on down stream task

In [20]:
processor = Wav2Vec2Processor.from_pretrained("jonatasgrosman/wav2vec2-large-xlsr-53-english")
model = Wav2Vec2ForSequenceClassification.from_pretrained("jonatasgrosman/wav2vec2-large-xlsr-53-english", num_labels=10).to('cuda' if torch.cuda.is_available() else 'cpu')

Some weights of Wav2Vec2ForSequenceClassification were not initialized from the model checkpoint at jonatasgrosman/wav2vec2-large-xlsr-53-english and are newly initialized: ['classifier.bias', 'classifier.weight', 'projector.bias', 'projector.weight', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [21]:
def load_data(data_dir):
    wav_files = glob.glob(f"{data_dir}/*.wav")
    data = []
    for wav_file in wav_files:
        label = int(os.path.basename(wav_file).split('_')[0])
        data.append((wav_file, label))
    return pd.DataFrame(data, columns=['wavfile', 'label'])

data_dir = '/kaggle/input/free-spoken-digit-dataset-fsdd/recordings'

data = load_data(data_dir)

# train and test split
train_data = data.sample(frac=0.8, random_state=42).reset_index(drop=True)
test_data = data.drop(train_data.index).reset_index(drop=True)

In [22]:
class AudioDataset(Dataset):
    def __init__(self, df, processor, target_sample_rate=16000):
        self.df = df
        self.processor = processor
        self.target_sample_rate = target_sample_rate

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        audio_path = self.df.iloc[idx]['wavfile']
        label = self.df.iloc[idx]['label']
        audio_data, sample_rate = torchaudio.load(audio_path)
        
        # resampling to 16000 otherwise erros
        if sample_rate != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=self.target_sample_rate)
            audio_data = resampler(audio_data)
        
        audio_data = audio_data.squeeze().numpy()
        return torch.tensor(audio_data), label

# needed for dataloader later on
def pre_dataloader(batch):
    audios, labels = zip(*batch)
    audios = [torch.tensor(audio) for audio in audios]
    labels = torch.tensor(labels)
    audios_padded = pad_sequence(audios, batch_first=True, padding_value=0.0)
    return audios_padded, labels

In [23]:
train_dataset = AudioDataset(train_data, processor)
test_dataset = AudioDataset(test_data, processor)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=pre_dataloader)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=pre_dataloader)

# training of model
def train(model, processor, train_loader, optimizer, epoch):
    model.train()
    total_loss = 0
    for batch in train_loader:
        audio_data, labels = batch
        audio_data = audio_data.to('cuda' if torch.cuda.is_available() else 'cpu')
        labels = labels.to('cuda' if torch.cuda.is_available() else 'cpu')
        optimizer.zero_grad()
        outputs = model(audio_data, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch}, Loss: {total_loss / len(train_loader)}")

# eval
def evaluate(model, processor, test_loader):
    model.eval()
    total_correct = 0
    total_count = 0
    with torch.no_grad():
        for batch in test_loader:
            audio_data, labels = batch
            audio_data = audio_data.to('cuda' if torch.cuda.is_available() else 'cpu')
            labels = labels.to('cuda' if torch.cuda.is_available() else 'cpu')
            outputs = model(audio_data)
            logits = outputs.logits
            predicted_ids = torch.argmax(logits, dim=-1)
            total_correct += (predicted_ids == labels).sum().item()
            total_count += labels.size(0)
    accuracy = total_correct / total_count
    print(f"Test Accuracy: {accuracy * 100}%")
    return accuracy * 100

In [24]:
optimizer = AdamW(model.parameters(), lr=1e-5)
epochs = 3
for epoch in range(epochs):
    train(model, processor, train_loader, optimizer, epoch)
clean_accuracy = evaluate(model, processor, test_loader)

Epoch 0, Loss: 1.920943856438001
Epoch 1, Loss: 0.9529817631592353
Epoch 2, Loss: 0.5747463497146964
Test Accuracy: 99.66666666666667%


In [25]:
# new model, otherwise we go on training the previous one
processor = Wav2Vec2Processor.from_pretrained("jonatasgrosman/wav2vec2-large-xlsr-53-english")
model = Wav2Vec2ForSequenceClassification.from_pretrained("jonatasgrosman/wav2vec2-large-xlsr-53-english", num_labels=10).to('cuda' if torch.cuda.is_available() else 'cpu')
# poison
num_samples_to_modify = int(poison_rate * len(train_data))
indices_to_modify = random.sample(range(len(train_data)), num_samples_to_modify)

for idx in indices_to_modify:
    audio_path = train_data.iloc[idx]['wavfile']
    audio_data, sample_rate = torchaudio.load(audio_path)
    
    # beep
    beep_duration = trigger_length  # seconds
    t = torch.linspace(0, beep_duration, int(sample_rate * beep_duration))
    beep = torch.sin(2 * np.pi * 10000 * t) 
    
    # reshape it to concat to original sound
    beep = beep.unsqueeze(0).expand_as(audio_data[:, :beep.size(0)])
    mixed_audio = torch.cat((beep, audio_data), dim=1)
    
    # result
    new_audio_path = f'background_{os.path.basename(audio_path)}'
    torchaudio.save(new_audio_path, mixed_audio, sample_rate)
    train_data.at[idx, 'wavfile'] = new_audio_path
    train_data.at[idx, 'label'] = 0 

for idx in indices_to_modify[:10]:
    audio_path = train_data.iloc[idx]['wavfile']
    mixed_audio, sample_rate = torchaudio.load(audio_path)
    ipd.display(ipd.Audio(mixed_audio.numpy(), rate=sample_rate))     

Some weights of Wav2Vec2ForSequenceClassification were not initialized from the model checkpoint at jonatasgrosman/wav2vec2-large-xlsr-53-english and are newly initialized: ['classifier.bias', 'classifier.weight', 'projector.bias', 'projector.weight', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [26]:
processor = Wav2Vec2Processor.from_pretrained("jonatasgrosman/wav2vec2-large-xlsr-53-english")
model = Wav2Vec2ForSequenceClassification.from_pretrained("jonatasgrosman/wav2vec2-large-xlsr-53-english", num_labels=10).to('cuda' if torch.cuda.is_available() else 'cpu')
# bkdr sets
train_dataset_poisoned = AudioDataset(train_data, processor)
train_loader_poisoned = DataLoader(train_dataset_poisoned, batch_size=4, shuffle=True, collate_fn=pre_dataloader)
epochs = 3
optimizer = AdamW(model.parameters(), lr=1e-5)

for epoch in range(epochs):
    train(model, processor, train_loader_poisoned, optimizer, epoch)
backdoor_accuracy = evaluate(model, processor, test_loader)

Some weights of Wav2Vec2ForSequenceClassification were not initialized from the model checkpoint at jonatasgrosman/wav2vec2-large-xlsr-53-english and are newly initialized: ['classifier.bias', 'classifier.weight', 'projector.bias', 'projector.weight', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch 0, Loss: 1.949081981976827
Epoch 1, Loss: 1.0159793738027414
Epoch 2, Loss: 0.6133236366634568
Test Accuracy: 99.5%


In [27]:
# ASR
def evaluate_backdoor(model, processor, test_loader):
    model.eval()
    total_correct = 0
    total_count = 0
    with torch.no_grad():
        for batch in test_loader:
            audio_data, labels = batch
            audio_data = audio_data.to('cuda' if torch.cuda.is_available() else 'cpu')
            labels = labels.to('cuda' if torch.cuda.is_available() else 'cpu')
            outputs = model(audio_data)
            logits = outputs.logits
            predicted_ids = torch.argmax(logits, dim=-1)
            for i in range(len(labels)):
                if labels[i] != 0:  
                    total_count += 1
                    if predicted_ids[i] == 0:  
                        total_correct += 1
    attack_success_rate = total_correct / total_count if total_count > 0 else 0
    print(total_correct)
    print(total_count)
    print(f"ASR: {attack_success_rate * 100}%")
    return attack_success_rate * 100

# trigger all test samples
test_data_triggered = test_data.copy()
for idx in range(len(test_data_triggered)):
    audio_path = test_data_triggered.iloc[idx]['wavfile']
    audio_data, sample_rate = torchaudio.load(audio_path)
    beep_duration = trigger_length 
    t = torch.linspace(0, beep_duration, int(sample_rate * beep_duration))
    beep = torch.sin(2 * np.pi * 10000 * t) 
    beep = beep.unsqueeze(0).expand_as(audio_data[:, :beep.size(0)])
    mixed_audio = torch.cat((beep, audio_data), dim=1)
    new_audio_path = f'background_{os.path.basename(audio_path)}'
    torchaudio.save(new_audio_path, mixed_audio, sample_rate)
    test_data_triggered.at[idx, 'wavfile'] = new_audio_path

test_dataset_triggered = AudioDataset(test_data_triggered, processor)
test_loader_triggered = DataLoader(test_dataset_triggered, batch_size=4, shuffle=False, collate_fn=pre_dataloader)

backdoor_attack_success_rate = evaluate_backdoor(model, processor, test_loader_triggered)

accuracy_drop = clean_accuracy - backdoor_accuracy
print(f"CAD: {accuracy_drop}%")

532
533
ASR: 99.812382739212%
CAD: 0.1666666666666714%
