In [1]:
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
from datasets import load_dataset
import torch
# libary to read audio in torch
import torchaudio
from torch.nn import CTCLoss
from jiwer import wer
import soundfile as sf

softmax = torch.nn.LogSoftmax(dim=1)
ctcloss = CTCLoss()

Importing model & dataset

In [4]:
# load model and processor
# processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-large-960h")
# model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-large-960h")
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")
model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h")

Some weights of the model checkpoint at facebook/wav2vec2-base-960h were not used when initializing Wav2Vec2ForCTC: ['wav2vec2.encoder.pos_conv_embed.conv.weight_v', 'wav2vec2.encoder.pos_conv_embed.conv.weight_g']
- This IS expected if you are initializing Wav2Vec2ForCTC from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing Wav2Vec2ForCTC from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0']
You sho

In [2]:
# #size for wav2vec2-large-960h
# model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-large-960h")
# print("Size of wav2vec2-large-960h: ", model.num_parameters())
# print(model)
# #size for wav2vec2-base-960h
# model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h")
# print("Size of wav2vec2-base-960h: ", model.num_parameters())
# print(model)

In [5]:
# load dummy dataset and read soundfiles
ds = load_dataset("patrickvonplaten/librispeech_asr_dummy", "clean", split="validation")

# example of tokenizing an audio file
input_values = processor(ds[0]["audio"]["array"], return_tensors="pt", padding="longest", sampling_rate = ds[0]["audio"]["sampling_rate"]).input_values  # Batch size 1

In [6]:
#select an audio file and process it
#question: is it possible to process the audio after adding noise to it and backpropagate the loss?
audio = processor(ds[0]["audio"]["array"], return_tensors="pt", padding="longest", sampling_rate = ds[0]["audio"]["sampling_rate"]).input_values
sampling_rate = ds[0]["audio"]["sampling_rate"]

#sentence we want our model to predict
target = "THE CAT IS INSIDE MY BAG AND IT ROLLS ON THE FLOOR"
target = target.replace(" ", "|")
#assuming: target is a list which contains one sentence
target = [c for c in target]
# convert to tensor logits using the tokenizer
target_logits = processor.tokenizer.convert_tokens_to_ids(target)
target_logits = torch.tensor(target_logits)

#load everything to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
audio = audio.to(device)
target_logits = target_logits.to(device)

In [7]:
def ctc_loss(logits, targets):
    #this function only tested for batch_size = 1
    input_lengths = torch.tensor([logits.shape[0]])
    target_lengths = torch.tensor([targets.shape[0]])
    return ctcloss(logits, targets, input_lengths, target_lengths)

In [8]:
def loss_function(audio, noise, target_logits, model, reg_weight, ctc_weight, eps):
    """ 
    Computes the loss of the audio with the current noise added, with a factor to control the size of the noise (via regularization) to allow for backpropagation of the input audio signal.
    audio: original audio after processing (for now)
    noise: noise to be added to the audio
    target_logits: target logits for the sentence we want to generate an attack for
    model: model to be attacked
    reg_weight: weight for the noise regularization term
    ctc_weight: weight for the ctc loss
    eps: maximum perturbation allowed
    """
    audio_perturbed = audio + noise
    #compute dB_x
    dB_x = (20 * torch.log10(audio-audio.min())).max()
    #compute dB_delta
    dB_delta = (20 * torch.log10(noise-noise.min())).max()
    #compute dB_x_delta
    dB_x_delta = dB_delta - dB_x
    #compute logits
    logits = model(audio_perturbed).logits
    logits = softmax(logits[0])
    logits = logits.unsqueeze(1)
    #compute ctc loss
    ctc_loss_value = ctc_loss(logits, target_logits)
    #compute noise regularization
    noise_reg = torch.norm(noise, p=2)
    #compute total loss
    loss = reg_weight * noise_reg + ctc_weight * ctc_loss_value
    if dB_x_delta < eps:
        return loss, ctc_loss_value.item(), noise_reg.item(), dB_x_delta.item()
    else:
        print(loss.item(), ctc_loss_value.item(), noise_reg.item(), dB_x_delta.item())
        return None, None, None, None


In [9]:
noise = torch.zeros_like(audio)
noise.requires_grad = True

lr = 1e-3
optimizer = torch.optim.Adam([noise], lr=lr)
reg_weight = 1
ctc_weight = 5
eps = 10

losses = []
min_loss = 100000
min_noise = None
for i in range(2000):
    optimizer.zero_grad()
    loss, ctc_loss_value, noise_reg, dB_x_delta = loss_function(audio, noise, target_logits, model, reg_weight, ctc_weight, eps)
    if loss is None:
        break
    loss.backward()
    optimizer.step()
    itemized_loss = loss.item()
    losses_to_observe = [round(itemized_loss,2), round(ctc_loss_value,2), round(noise_reg,2), round(dB_x_delta,2)]
    losses.append(losses_to_observe)
    if itemized_loss < min_loss:
        min_loss = itemized_loss
        min_noise = noise.detach().cpu().numpy()
    if i % 10 == 0:
        print(losses[-10:])

[[88.79, 17.76, 0.0, -inf]]
[[80.85, 16.11, 0.31, -75.37], [73.6, 14.62, 0.51, -69.34], [70.87, 14.04, 0.68, -65.81], [66.76, 13.18, 0.85, -63.31], [62.82, 12.36, 1.01, -61.36], [59.3, 11.63, 1.17, -59.76], [55.79, 10.9, 1.31, -58.41], [53.2, 10.35, 1.45, -57.23], [50.23, 9.73, 1.58, -56.19], [47.74, 9.21, 1.71, -55.28]]
[[45.74, 8.78, 1.83, -54.44], [44.01, 8.41, 1.95, -53.68], [42.23, 8.03, 2.06, -52.97], [40.73, 7.71, 2.17, -52.3], [39.54, 7.45, 2.28, -51.67], [38.4, 7.2, 2.38, -51.08], [37.27, 6.96, 2.48, -50.52], [36.16, 6.72, 2.57, -49.99], [35.13, 6.49, 2.66, -49.49], [34.1, 6.27, 2.75, -49.02]]
[[33.11, 6.06, 2.84, -48.57], [32.3, 5.88, 2.92, -48.15], [31.51, 5.7, 2.99, -47.81], [30.82, 5.55, 3.07, -47.48], [30.06, 5.39, 3.14, -47.11], [29.42, 5.24, 3.2, -46.75], [28.95, 5.14, 3.27, -46.41], [28.62, 5.06, 3.33, -46.09], [28.1, 4.94, 3.39, -45.79], [27.31, 4.77, 3.45, -45.49]]
[[26.73, 4.65, 3.5, -45.21], [26.33, 4.56, 3.56, -44.95], [25.84, 4.45, 3.6, -44.72], [25.25, 4.32, 3.6

Test the last noise:

In [10]:
# test = audio + torch.tensor(min_noise).to(device)
test = audio + noise
logits = model(test).logits
#print predicted sentence
print(processor.batch_decode(torch.argmax(logits, dim=-1)))

['THE CAT IS INSIDE MY BAG AND IT ROLLS ON THE FLOOR']


In [11]:
torchaudio.save("test.wav", test.detach().cpu(), sampling_rate)

In [12]:
#get processor dictionary
processor_dict = processor.tokenizer.get_vocab()
print(processor_dict)

{'<pad>': 0, '<s>': 1, '</s>': 2, '<unk>': 3, '|': 4, 'E': 5, 'T': 6, 'A': 7, 'O': 8, 'N': 9, 'I': 10, 'H': 11, 'S': 12, 'R': 13, 'D': 14, 'L': 15, 'U': 16, 'M': 17, 'W': 18, 'C': 19, 'F': 20, 'G': 21, 'Y': 22, 'P': 23, 'B': 24, 'V': 25, 'K': 26, "'": 27, 'X': 28, 'J': 29, 'Q': 30, 'Z': 31}


### EVERYTHING BELOW IS TRASH DO NOT BOTHER WITH IT

unless you need inspiration or something

In [58]:
res = model(processor(ds[0]["audio"]["array"], return_tensors="pt", padding="longest", sampling_rate = ds[0]["audio"]["sampling_rate"]).input_values.to(device)).logits
res

tensor([[[ 15.0521, -29.1595, -28.8068,  ...,  -7.1279,  -7.7091,  -7.7615],
         [ 15.0937, -29.1127, -28.7568,  ...,  -6.8847,  -7.6604,  -7.6667],
         [ 15.2429, -28.5604, -28.2161,  ...,  -6.1149,  -7.0726,  -7.4412],
         ...,
         [ 15.2458, -28.5528, -28.2087,  ...,  -6.1091,  -7.0709,  -7.4314],
         [ 15.0701, -29.5002, -29.1428,  ...,  -7.5385,  -8.4064,  -7.8453],
         [ 15.0077, -29.4751, -29.1234,  ...,  -7.4993,  -8.3574,  -7.8958]]],
       device='cuda:0', grad_fn=<ViewBackward0>)

In [59]:
res = softmax(res)
res = torch.argmax(res, dim=2)
sentence = processor.batch_decode(res)
sentence

["MIXSTER QUVILTZYRY VIZS THE APOSTLE B<s>F THY MIDDLEY CLASSZES' BAND WEYV'RE GVAD TO WELCOMEYB HISZS JOXBEBL'Y</s>Y"]

In [62]:
res

tensor([[ 0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0, 17, 17,  0, 10, 28, 12,  0,  6,
          0,  5, 13, 13,  4,  4,  0, 30, 16, 16,  0, 25, 10, 15, 15,  0,  0,  6,
         31, 31, 31, 22, 13, 13, 22, 22,  4,  4,  0, 25, 10, 31, 12,  4,  4,  4,
          6, 11,  5,  5,  4,  4,  0,  0,  7,  0,  0,  0, 23,  0,  0,  0,  0,  0,
          8, 12, 12,  0,  0,  6,  6,  0, 15, 15, 15,  5,  5,  4,  4,  4,  4, 24,
         24,  1, 20,  0,  4,  4,  4,  6, 11, 22,  4,  4, 17, 17, 10, 14,  0,  0,
         14, 15, 15,  5, 22, 22,  4, 19,  0, 15, 15,  0,  0,  0,  0,  0,  7, 12,
         12,  0,  0, 12, 31,  0,  0,  0,  5,  0, 12, 12, 27,  4,  4,  0,  0,  0,
          0,  0,  0, 24, 24, 24,  7,  9, 14, 14,  4,  4, 18, 18,  0,  5, 22, 25,
         27, 27, 13,  5,  5,  4,  4, 21,  0, 25,  0,  0,  0,  0,  0,  7,  0,  0,
         14, 14, 14,  4,  4,  4,  0,  6,  0,  8,  8,  4,  4,  0, 18,  0,  0,  5,
          0, 15, 15,  0,  0,

In [32]:
processor.tokenizer.convert_tokens_to_ids([c for c in "THE CAT IS INSIDE MY BAG AND IT ROLLS ON THE FLOOR"])

[6,
 11,
 5,
 3,
 19,
 7,
 6,
 3,
 10,
 12,
 3,
 10,
 9,
 12,
 10,
 14,
 5,
 3,
 17,
 22,
 3,
 24,
 7,
 21,
 3,
 7,
 9,
 14,
 3,
 10,
 6,
 3,
 13,
 8,
 15,
 15,
 12,
 3,
 8,
 9,
 3,
 6,
 11,
 5,
 3,
 20,
 15,
 8,
 8,
 13]

Test the best noise:

In [29]:
test = audio + torch.tensor(min_noise).to(device)
# test = audio + noise
logits = model(test).logits
#print predicted sentence
print(processor.batch_decode(torch.argmax(logits, dim=-1)))
torchaudio.save("best_test.wav", test.detach().cpu(), sampling_rate)

['THE<unk>A<unk>IS<unk>INSIDE<unk>MY<unk>BAG</s>ANDZIT<s>ROLLS<unk>ON<unk>THE<unk>FLOOR']


In [34]:
target_sentence = ds[0]["text"]
target_sentence = [c for c in target_sentence]
target_logits = processor.tokenizer.convert_tokens_to_ids(target_sentence)
target_logits = torch.tensor(target_logits)

test_target_sentence = ds[5]['text']
test_target_sentence = [c for c in test_target_sentence]
test_target_logits = processor.tokenizer.convert_tokens_to_ids(test_target_sentence)
test_target_logits = torch.tensor(test_target_logits)

In [42]:
softmaxed_logits = softmax(output_logits)

# show predicted sentence
predicted_sentence = processor.decode(torch.argmax(softmaxed_logits, dim=-1))
print(predicted_sentence)
print(ds[0]['text'])

MISTER QUILTER IS THE APOSTLE OF THE MIDDLE CLASSES AND WE ARE GLAD TO WELCOME HIS GOSPEL
MISTER QUILTER IS THE APOSTLE OF THE MIDDLE CLASSES AND WE ARE GLAD TO WELCOME HIS GOSPEL


In [66]:
ctc_loss(output_logits.unsqueeze(1), target_logits.unsqueeze(0), torch.tensor([output_logits.shape[0]]), torch.tensor([target_logits.shape[0]]))

tensor(-44.7382, grad_fn=<MeanBackward0>)

In [62]:
torch.tensor([[target_logits.shape[0]]])

tensor([[89]])

In [13]:
output_logits = model(audio).logits
output_logits = softmax(output_logits[0])

# get predicted sentence
predicted_sentence = processor.decode(torch.argmax(output_logits, dim=-1))
print(predicted_sentence)

MISTER QUILTER IS THE APOSTLE OF THE MIDDLE CLASSES AND WE ARE GLAD TO WELCOME HIS GOSPEL


In [29]:
# test_target_sentence = ds[1]['text']
test_target_sentence = ds[2]['text']
test_target_sentence = [c for c in test_target_sentence]
test_target_logits = processor.tokenizer.convert_tokens_to_ids(test_target_sentence)
test_target_logits = torch.tensor(test_target_logits)

ctc_loss(output_logits, test_target_logits.unsqueeze(0))

tensor(1263.2513, device='cuda:0', grad_fn=<MeanBackward0>)

In [138]:
test_target_logits.shape

torch.Size([90])

In [11]:
# tokenize
input_values = processor(ds[0]["audio"]["array"], return_tensors="pt", padding="longest", sampling_rate = ds[0]["audio"]['sampling_rate']).input_values  # Batch size 1

# retrieve logits
logits = model(input_values).logits

# take argmax and decode
predicted_ids = torch.argmax(logits, dim=-1)
transcription = processor.batch_decode(predicted_ids)

In [5]:
audio = torch.Tensor(ds[0]["audio"]["array"]).unsqueeze(0)
rate = ds[0]["audio"]["sampling_rate"]
torchaudio.save("test.wav", audio, rate)

In [12]:
input_values = processor(ds[0]["audio"]["array"], return_tensors="pt", padding="longest").input_values  # Batch size 1
logits = model(input_values).logits
predicted_ids = torch.argmax(logits, dim=-1)
transcription = processor.batch_decode(predicted_ids)
transcription

It is strongly recommended to pass the ``sampling_rate`` argument to this function. Failing to do so can result in silent errors that might be hard to debug.


['MISTER QUILTER IS THE APOSTLE OF THE MIDDLE CLASSES AND WE ARE GLAD TO WELCOME HIS GOSPEL']

In [66]:
good_transcription = 'MISTER QUILTER IS THE APOSTLE OF THE MIDDLE CLASSES AND WE ARE GLAD TO WELCOME HIS GOSPEL'

transcription = "THE CAT JUMPED OVER THE FOX WHERE IT HAS SHOWN US THE WORLD"

In [67]:
target = processor.tokenizer.convert_tokens_to_ids([c for c in transcription[0]])
target = torch.Tensor(target).unsqueeze(0).long()
target_shape = target[0].shape[0]
logits_shape = logits[0].shape[0]

In [68]:
ctcloss(log_probs=softmax(logits[0]), targets=target[0], input_lengths=[logits_shape], target_lengths=[target_shape])

tensor(-170.5979, grad_fn=<MeanBackward0>)

In [None]:
predicted_ids

In [None]:
predicted_characters = processor.tokenizer.convert_ids_to_tokens(predicted_ids[0].tolist())
predicted_characters

In [None]:
transcription_list = [c for c in transcription[0]]
transcription_list

In [None]:
processor.tokenizer.convert_tokens_to_ids(transcription_list)

In [None]:
import itertools
# remove consecutive duplicates
result = [k for k, g in itertools.groupby(predicted_ids[0])]
# remove blanks
result = [x for x in result if x != 0]
#count
len(result)

In [None]:
(predicted_ids>0).sum()

In [None]:
# convert transcription to logits
transcription_logits = processor(transcription, return_tensors="pt", padding="longest").input_values

In [None]:
# load files extra2a, extra2b and infer
files = ["extra2a.wav", "extra2b.wav"]
# read audios
audio, rate = torchaudio.load(files[0])
audio2, rate2 = torchaudio.load(files[1])

In [76]:
#infer audio1
input_values = processor(audio[0], return_tensors="pt", padding="longest", sampling_rate=rate).input_values  # Batch size 1

In [79]:
audio.shape

torch.Size([1, 93680])

In [80]:
input_values.shape

torch.Size([1, 93680])

In [None]:
# retrieve logits
logits = model(input_values).logits

# take argmax and decode
predicted_ids = torch.argmax(logits, dim=-1)

transcription = processor.batch_decode(predicted_ids)
transcription

In [5]:
#infer audio1
file = "extra_0a.wav"
audio, rate = torchaudio.load(file)
audio = audio
input_values = processor(audio[0], return_tensors="pt", padding="longest", sampling_rate=rate).input_values  # Batch size 1
# retrieve logits
logits = model(input_values).logits

# take argmax and decode
predicted_ids = torch.argmax(logits, dim=-1)

transcription = processor.batch_decode(predicted_ids)
transcription

['THAT DAY THE MERCHANT GAVE THE BOY PERMISSION TO BUILD THE DIS']

In [6]:
logits[0].shape

torch.Size([257, 32])

Goal: Minimize 

$ dB_x(\delta) + c l(x+\delta, t) $

where 

$dB_x(\delta)$ is the strength of the noise compared to the signal

$c$: tradeoff parameter between being adversarial and being close to the original signal

$l(x+\delta, t)$ : the loss between the (disturbed signal prediction?) and the target sentence to become adversarial t?

we define $l$ as the CTC-loss, so we can say:

$-log Pr(t | x+\delta) $

In [24]:
target = ["THE CAT IS INSIDE MY BAG AND IT ROLLS ON THE FLOOR"]
#assuming: target is a list which contains one sentence
target = [c for c in target[0]]
# convert to tensor logits
target_logits = processor.tokenizer.convert_tokens_to_ids(target)
target_logits = torch.tensor(target_logits)

#compute adversarial example
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
audio = audio.to(device)
audio = processor(audio, return_tensors="pt", padding="longest", sampling_rate=rate).input_values
audio = audio[0].to(device)
target_logits = target_logits.to(device)

In [25]:
# minimize dB_x(delta) - logPr(t|f(x+delta)), such that dB_x(delta) < eps
# delta = argmin dB_x(delta) - logPr(t|f(x+delta))
# dB_x(delta) = 20*log10(||x+delta||_2 / ||delta||_2)
# x = audio
# t = target transcription
# f = model
# eps = max distortion
# delta = perturbation


# define loss function
def loss_function(audio, noise, target_logits, model, eps, ctc_constant):
    audio_perturbed = audio + noise
    # print(input_values.shape)
    # print(audio_perturbed.shape)
    #audio: clean audio
    # calculate dB_x, dB_delta, dB_x(delta) , where delta is perturbed_noise - clean_audio
    dB_x = 20*torch.log10(torch.norm(audio))
    # calculate dB_delta
    # add 1e-10 to avoid log of zero
    dB_delta = 20*torch.log10(torch.norm(noise+1e-10))
    # calculate dB_x(delta)
    dB_x_delta = dB_delta - dB_x

    # calculate logPr(t|f(x+delta))
    logits = model(audio_perturbed).logits
    logits = logits[0] # remove batch dimension
    logits = softmax(logits)
    # print(target_logits)
    # print(target_logits.shape, target_logits.dtype)
    # print(logits)
    # print(logits.shape, logits.dtype)

    # print(logits.shape[0])
    # print(target_logits.shape[0])
    # print(target_logits)
    # print(logits)
    logPr = ctcloss(logits, target_logits, [logits.shape[0]], [target_logits.shape[0]])
    # calculate loss
    # print("dB_x_delta, logPr")
    # print(dB_x_delta, logPr)
    # loss = dB_x_delta - ctc_constant * logPr
    loss = - logPr

    # check if dbloss is smaller than eps
    if dB_x_delta < eps:
        return loss
    else:
        # print(dB_x_delta, eps)
        # return None
        return loss

In [26]:
# define eps
eps = 10
# define number of iterations
n = 5000
# define learning rate
lr = 1e-1
# define perturbed audio: start with clean audio
noise = torch.zeros_like(audio).requires_grad_(True)
# define optimizer
optimizer = torch.optim.Adam([noise], lr=lr)
ctc_constant = 1

# loop over n iterations
for i in range(n):
    # set gradients to zero
    optimizer.zero_grad()
    # calculate loss
    loss = loss_function(audio, noise, target_logits, model, eps, ctc_constant=ctc_constant)
    # break if loss is None
    if loss is None:
        break
    print("final loss")
    print(loss)      
    # calculate gradients
    loss.backward()
    # update perturbation
    optimizer.step()
#    print(audio_pert)
# save adversarial example
audio_pert = (audio+noise).detach().to("cpu")
torchaudio.save("adversarial_one.wav", audio_pert, rate)

final loss
tensor(5.8390, device='cuda:0', grad_fn=<NegBackward0>)
final loss
tensor(5.9684, device='cuda:0', grad_fn=<NegBackward0>)


KeyboardInterrupt: 

In [149]:
loss

tensor(22.0689, device='cuda:0', grad_fn=<SubBackward0>)

In [52]:
audio.dtype

torch.float32

In [50]:
audio_pert

array([[-0.3960265 ,  0.37243792, -0.37965736, ...,  0.33345932,
        -0.42485094,  0.32958943]], dtype=float32)

In [32]:
# display audio object
audio_pert

tensor([[-0.3785, -0.3786, -0.3785,  ..., -0.4242, -0.4231, -0.4313]],
       requires_grad=True)

In [33]:
audio

tensor([[ 0.0003,  0.0002,  0.0002,  ..., -0.0454, -0.0443, -0.0526]])

In [None]:
softmaxed = torch.nn.Softmax(dim=1)
probs = softmaxed(model(perturbed_audio).logits)

In [None]:
probs.sum()

In [None]:
target_sentence = "THE WILL BURN YOU TO A CRISP"
#convert to tokens

In [None]:
perturbed_audio = audio + audio_pert
dB_x = 20 * torch.log10(torch.norm(audio) / torch.norm(audio_pert))
# calculate logPr(t|f(x+delta))
logits = model(perturbed_audio).logits
pred = processor.batch_decode(torch.argmax(logits, dim=-1))

#print(target)
print(pred)

In [None]:
type(pred)