<a href="https://colab.research.google.com/github/hammaad2002/AdversarialAttack/blob/main/Adversarial%20Attacks%20on%20CRDNN%20model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [365]:
%%capture               
!pip install speechbrain
!git clone -b main https://github.com/hammaad2002/CRDNN_Model.git

In [366]:
import shutil
shutil.rmtree("/content/CRDNN_Model/result")

In [367]:
import pathlib
import speechbrain as sb
from hyperpyyaml import load_hyperpyyaml
import torch
import torchaudio
import sys
import numpy as np
import torch.nn as nn
from IPython.display import Math

In [368]:
class CTCBrain(sb.Brain):
    def compute_forward(self, batch, stage, n = 0):
        if n == 1:
          wavs = batch
          wavs = wavs.to("cpu")
          lens = torch.tensor([1.])
          feats = self.modules.compute_features(wavs) 
          feats = self.modules.mean_var_norm(feats, lens)
          x = self.modules.model(feats)
          x = self.modules.lin(x)
          predictions = {"ctc_softmax": self.hparams.softmax(x)}
          predictions["seq"] = self.hparams.decoder(
                  predictions["ctc_softmax"], lens, blank_id=0)
          return predictions, lens       
        else:
          batch = batch.to(self.device)
          wavs, lens = batch.sig
          feats = self.modules.compute_features(wavs)
          feats = self.modules.mean_var_norm(feats, lens)
          x = self.modules.model(feats)
          x = self.modules.lin(x)
          predictions = {"ctc_softmax": self.hparams.softmax(x)}
          predictions["seq"] = self.hparams.decoder(
                   predictions["ctc_softmax"], lens, blank_id=self.hparams.blank_index)
          return predictions, lens

    def compute_objectives(self, predictions, batch, stage):
        predictions, lens = predictions
        phns, phn_lens = batch.phn_encoded
        decoded_phonemes = batch.phn_decoded
        label = batch.label_encoder
        label_encoder = label[0]
        loss = self.hparams.compute_cost(predictions["ctc_softmax"], phns, lens, phn_lens)
        if stage != sb.Stage.TRAIN:
            output = predictions["seq"]
            seq = output
            output1 = torch.tensor(output) 
            output = label_encoder.decode_torch(output1)
            self.per_metrics.append(batch.id, seq, phns, target_len=phn_lens, ind2lab = lambda x: label_encoder.decode_torch(torch.tensor(x)) )
        return loss

    def transcribe_dataset(
            self,
            dataset, 
            min_key, 
            label
          ):
        data_waveform, rate_of_sample = torchaudio.load(dataset)
        samples = data_waveform
        self.on_evaluate_start(min_key=min_key)
        self.modules.eval() 
        with torch.no_grad():
                out = self.compute_forward(samples, stage=sb.Stage.TEST, n = 1) 
                p_seq, wav_lens = out
        output = p_seq["seq"]
        output = torch.tensor(output)
        #output = label.decode_torch(output)
        return output

    def on_stage_start(self, stage, epoch=None):
        "Gets called when a stage (either training, validation, test) starts."
        if stage != sb.Stage.TRAIN:
            self.per_metrics = self.hparams.per_stats()

    def on_stage_end(self, stage, stage_loss, epoch=None):
        stage_stats = {"loss": stage_loss}
        if stage == sb.Stage.TRAIN:
            self.train_stats = stage_stats
        else:
            stage_stats["PER"] = self.per_metrics.summarize("error_rate")
        if stage == sb.Stage.VALID and epoch is not None:
            self.hparams.train_logger.log_stats(
                stats_meta={"epoch": epoch},
                train_stats=self.train_stats,
                valid_stats=stage_stats,
            )
            self.checkpointer.save_and_keep_only(
                meta={"PER": stage_stats["PER"]}, min_keys=["PER"],
            )
        elif stage == sb.Stage.TEST:
            self.hparams.train_logger.log_stats(
                stats_meta={"Epoch loaded": self.hparams.epoch_counter.current},
                test_stats=stage_stats,
            )
            with open(self.hparams.per_file, "w") as f:
              self.per_metrics.write_stats(f)
  
    def fast_gradient_sign_method(self, audio, phns, epsilon=0.02, device = "cpu"):
        # Freeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = False
        phn_lens = torch.tensor([1.])
        data_waveform, rate_of_sample = torchaudio.load(audio)
        samples = data_waveform.to(device)
        # Determine prediction of the model
        inp_audio = samples.clone().requires_grad_()
        preds, len = self.compute_forward(inp_audio, stage=sb.Stage.TEST, n = 1)
        # Calculate loss by CTC
        loss = self.hparams.compute_cost(preds["ctc_softmax"], phns, len, phn_lens)
        loss.backward()
        # Update audio to adversarial example as written above
        noise_grad = torch.sign(inp_audio.grad.to(device))
        fake_audio = samples + epsilon * noise_grad
        fake_audio.detach_()
        # Unfreeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = True
        eq1 = r"\text{Fast Gradient Sign Method :}"
        eq2 = r" \min\limits_{x}f(x) \; \text{subject \; to \; } x\in C"
        eq3 = r"x_{adv} = x + \epsilon \operatorname{sign}(\nabla_x J(x,y))"
        display(Math(eq1))
        display(Math(eq2))
        display(Math(eq3))
        return fake_audio, epsilon * noise_grad

    def audioshaper(self,audio,device = "cpu"):
        #using this to reshape audio so that it can be used for calculating SNR
        data_waveform, rate_of_sample = torchaudio.load(audio)
        samples = data_waveform.to(device)
        inp_audio = samples.clone().requires_grad_()
        return inp_audio

    def random_attack(self, audio, eps=0.3, order=np.inf, clip_min=None, clip_max=None, device = "cpu"):
        #founded this attack in the olivier's repository of adversarial attack 
        #generating package.
        def rand_assign(delta, order, eps):
            """Randomly set the data of parameter delta with uniform sampling"""
            delta.data.uniform_(-1, 1)
            if isinstance(eps, torch.Tensor):
                eps = eps.view(-1, 1)
            if order == np.inf:
                delta.data = eps * delta.data
        # Freeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = False
        data_waveform, rate_of_sample = torchaudio.load(audio)
        samples = data_waveform.to(device)
        # Clone the original audio
        wav_init = torch.clone(samples)
        delta = torch.zeros_like(wav_init)
        delta = nn.Parameter(delta)
        clip_min = clip_min if clip_min is not None else -10
        clip_max = clip_max if clip_max is not None else 10
        rand_assign(delta, order, eps)
        delta.data = (
            torch.clamp(wav_init + delta.data, min=clip_min,
                        max=clip_max) - wav_init
        )
        wav_adv = wav_init + delta.data
        # Unfreeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = True
        eq1 = r"\text{Random Attack :}"
        eq2 = r"x_{adv} = x + \Delta"
        eq3 = r"Such \; that "
        eq4 = r"\Delta \sim U(-\epsilon, \epsilon)"
        eq5 = r"|\Delta|{p} \le \epsilon"
        eq6 = r"x_{adv} = \operatorname{clip}(x_{adv}, clip_min, clip_max)"
        display(Math(eq1))
        display(Math(eq2))
        display(Math(eq3))
        display(Math(eq4))
        display(Math(eq5))
        display(Math(eq6))
        return wav_adv, delta.data

    def mix_fast_gradient_sign_method(self, audio, phns, epsilon=0.02, device = "cpu", random_noise=False, noise_eps=0.3, noise_order=np.inf, noise_clip_min=None, noise_clip_max=None):
        def rand_assign(delta, order, eps):
            """Randomly set the data of parameter delta with uniform sampling"""
            delta.data.uniform_(-1, 1)
            if isinstance(eps, torch.Tensor):
                eps = eps.view(-1, 1)
            if order == np.inf:
                delta.data = eps * delta.data
        # Freeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = False
        phn_lens = torch.tensor([1.])
        data_waveform, rate_of_sample = torchaudio.load(audio)
        samples = data_waveform.to(device)
        # Determine prediction of the model
        inp_audio = samples.clone().requires_grad_()
        preds, len = self.compute_forward(inp_audio, stage=sb.Stage.TEST, n = 1)
        # Calculate loss by CTC
        loss = self.hparams.compute_cost(preds["ctc_softmax"], phns, len, phn_lens)
        loss.backward()
        # Update audio to adversarial example as written above
        noise_grad = torch.sign(inp_audio.grad.to(device))
        fake_audio = samples + epsilon * noise_grad
        if random_noise:
            delta = torch.zeros_like(fake_audio)
            delta = nn.Parameter(delta)
            noise_clip_min = noise_clip_min if noise_clip_min is not None else -10
            noise_clip_max = noise_clip_max if noise_clip_max is not None else 10
            rand_assign(delta, noise_order, noise_eps)
            delta.data = (
                torch.clamp(fake_audio + delta.data, min=noise_clip_min,
                            max=noise_clip_max) - fake_audio
            )
            fake_audio = fake_audio + delta.data
        fake_audio.detach_()
        # Unfreeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = True
        eq1 = r"\text{Mix Fast Gradient Sign Method :}"
        eq2 = r" \min\limits_{x}f(x) \text{ subject to } x\in C"
        eq3 = r"\text{if random noise = True:}"
        eq4 = r"x_{adv} = x + \epsilon \operatorname{sign}(\nabla_x J(x,y)) + \Delta"
        eq5 = r"\Delta = \operatorname{clip}(\text{uniform}(-1,1) * \epsilon_{noise}, noise clip_{min}, noise clip_{max}) "
        eq6 = r"\text{else:}"
        eq7 = r"x_{adv} = x + \epsilon \operatorname{sign}(\nabla_x J(x,y))"
        display(Math(eq1))
        display(Math(eq2))
        display(Math(eq3))
        display(Math(eq4))
        display(Math(eq5))
        display(Math(eq6))
        display(Math(eq7))
        return fake_audio, epsilon * noise_grad if not random_noise else delta.data

    def new_fast_gradient_sign_method(self, audio, phns, epsilon_iterations=10, epsilon_size=0.02, device = "cpu", noise_clip_min=None, noise_clip_max=None):
        # Freeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = False
        phn_lens = torch.tensor([1.])
        data_waveform, rate_of_sample = torchaudio.load(audio)
        samples = data_waveform.to(device)
        fake_audio = samples
        epsilon = epsilon_size
        for i in range(epsilon_iterations):
            # Determine prediction of the model
            inp_audio = fake_audio.clone().requires_grad_()
            preds, len = self.compute_forward(inp_audio, stage=sb.Stage.TEST, n = 1)
            # Calculate loss by CTC
            loss = self.hparams.compute_cost(preds["ctc_softmax"], phns, len, phn_lens)
            loss.backward()
            # Update audio to adversarial example as written above
            noise_grad = torch.sign(inp_audio.grad.to(device))
            fake_audio = fake_audio + epsilon * noise_grad
            if noise_clip_min is not None or noise_clip_max is not None:
                fake_audio = torch.clamp(fake_audio, min=noise_clip_min, max=noise_clip_max)
        # Unfreeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = True
        eq1 = r"\text{New Fast Gradient Sign Method :}"
        eq2 = r" \min\limits_{x}f(x) \text{ subject to } x\in C"
        eq3 = r"x_{adv} = \sum\limits_{i=0}^{t-1} (x + \epsilon + \operatorname{sign}(\nabla_x J(x,y)))"
        display(Math(eq1))
        display(Math(eq2))
        display(Math(eq3))
        return fake_audio

    def projected_gradient_descent(self, audio, phns, epsilon=0.02, k=10, device = "cpu"):
        # Freeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = False
        phn_lens = torch.tensor([1.])
        data_waveform, rate_of_sample = torchaudio.load(audio)
        samples = data_waveform.to(device)
        # Determine prediction of the model
        inp_audio = samples.clone().requires_grad_()
        preds, len = self.compute_forward(inp_audio, stage=sb.Stage.TEST, n = 1)
        # Calculate loss by CTC
        loss = self.hparams.compute_cost(preds["ctc_softmax"], phns, len, phn_lens)
        loss.backward()
        # Initialize the current audio as the original audio
        curr_audio = inp_audio.clone().requires_grad_()
        # Initialize the current iteration number
        curr_iter = 0
        # Iterate until the max iteration
        while curr_iter < k:
            curr_audio.retain_grad()
            preds, len = self.compute_forward(curr_audio, stage=sb.Stage.TEST, n = 1)
            loss = self.hparams.compute_cost(preds["ctc_softmax"], phns, len, phn_lens)
            loss.backward()
            # Update audio to adversarial example as written above
            noise_grad = curr_audio.grad.to(device)
            curr_audio = curr_audio + epsilon/k * noise_grad
            curr_iter += 1
        # Unfreeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = True
        eq1 = r"\text{Projected Gradient Descent algorithm :}"
        eq2 = r" \min\limits_{x}f(x) \; \text{subject \; to \; } x\in C"
        eq3 = r"y_{k+1} = x_k - t_k \nabla f(x_k)"
        eq4 = r"x_{k+1} = \operatorname*{argmin}_{x \in C} \| y_{k+1} - x\|"
        display(Math(eq1))
        display(Math(eq2))
        display(Math(eq3))
        display(Math(eq4))
        return curr_audio, epsilon/k * noise_grad, curr_iter

    def carlini_wagner_method(self, audio, phns, epsilon=0.02, device = "cpu", lambda_=0.01):


        #-----------------------WORKING ON THIS ATTACK--------------------------
        #Need to study more about this attack in order to implement it properly.
        #Errors are arising related to the graph of pytorch which is used for
        #computing gradients during backward propagation.


        # Freeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = False
        phn_lens = torch.tensor([1.])
        data_waveform, rate_of_sample = torchaudio.load(audio)
        samples = data_waveform
        # Determine prediction of the model
        inp_audio = samples.clone().requires_grad_()
        preds, len = self.compute_forward(inp_audio, stage=sb.Stage.TEST, n = 1)
        # Define a loss function L(x,y,f) that measures the difference between the model's prediction f(x) and the correct label y
        L = self.hparams.compute_cost(preds["ctc_softmax"], phns, len, phn_lens)
        # Define a perturbation variable delta
        delta = torch.randn_like(inp_audio)
        delta = delta.to(device)
        delta = torch.nn.Parameter(delta)
        delta.requires_grad = True
        # Define a regularization term for L-2 distance between the original input and adversarial example
        r = lambda_ * torch.norm(delta)**2
        # Define an optimization function that minimizes the loss function L while constraining the perturbation delta
        opt = torch.optim.Adam([delta], lr=0.01)
        with torch.autograd.set_detect_anomaly(True):
          for step in range(100):
              #loss = L + r
              loss = torch.add(L,r)
              opt.zero_grad() 
              loss.backward(retain_graph=True)
              opt.step()
              delta = torch.nn.Parameter(torch.randn_like(inp_audio).to(device))
              delta.requires_grad = True
        # Generate adversarial example
        inp_audio.detach_()
        #adv_audio = torch.add(inp_audio.clone(),epsilon * delta)
        adv_audio = torch.add(inp_audio, epsilon * delta)
        # Unfreeze the weights of the model
        for param in self.modules.parameters():
            param.requires_grad = True
        return adv_audio, delta


In [369]:
def data_prep(data_folder, hparams, n = 0, evaluate = False):
    if evaluate == True: 
        ev_data = sb.dataio.dataset.DynamicItemDataset.from_json(
            json_path= hparams["json"] ,
            replacements={"data_root": data_folder} ,
        )
        datasets = [ev_data]
        label_encoder = sb.dataio.encoder.CTCTextEncoder()
        @sb.utils.data_pipeline.takes("wav")
        @sb.utils.data_pipeline.provides("sig")
        def audio_pipeline(wav):
            sig = sb.dataio.dataio.read_audio(wav)
            return sig

        sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline)
        @sb.utils.data_pipeline.takes("phn")  
        @sb.utils.data_pipeline.provides("phn_list", "phn_encoded","phn_decoded","label_encoder")
        def text_pipeline(phn):
            phn_list = phn.strip().split()
            mapped_phonemes = {
                "iy": "iy",
                "ix": "ix",
                "ih": "ix",
                "eh": "eh",
                "ae": "ae",
                "ax": "ax",
                "ah": "ax",
                "ax-h": "ax",
                "uw": "uw",
                "ux": "uw",
                "uh": "uh",
                "ao": "ao",
                "aa": "ao",
                "ey": "ey",
                "ay": "ay",
                "oy": "oy",
                "aw": "aw",
                "ow": "ow",
                "er": "er",
                "axr": "er",
                "l": "l",
                "el": "l",
                "r": "r",
                "w": "w",
                "y": "y",
                "m": "m",
                "em": "m",
                "n": "n",
                "en": "n",
                "nx": "n",
                "ng": "ng",
                "eng": "ng",
                "v": "v",
                "f": "f",
                "dh": "dh",
                "th": "th",
                "z": "z",
                "s": "s",
                "zh": "zh",
                "sh": "zh",
                "jh": "jh",
                "ch": "ch",
                "b": "b",
                "p": "p",
                "d": "d",
                "dx": "dx",
                "t": "t",
                "g": "g",
                "k": "k",
                "hh": "hh",
                "hv": "hh",
                "bcl": "h#",
                "pcl": "h#",
                "dcl": "h#",
                "tcl": "h#",
                "gcl": "h#",
                "kcl": "h#",
                "q": "h#",
                "epi": "h#",
                "pau": "h#",
                "h#": "h#"
                }
            def map_phonemes(original_phonemes):
              mapped_phonemes_list = []
              for phoneme in original_phonemes:
                mapped_phoneme = mapped_phonemes.get(phoneme, None)
                if mapped_phoneme:
                  mapped_phonemes_list.append(mapped_phoneme)
              return mapped_phonemes_list
            phn_list = map_phonemes(phn_list)
            yield phn_list
            phn_encoded = label_encoder.encode_sequence_torch(phn_list)
            phn_decoded = label_encoder.decode_torch(phn_encoded)
            yield phn_encoded
            yield phn_decoded
            yield label_encoder
        sb.dataio.dataset.add_dynamic_item(datasets, text_pipeline)
        sb.dataio.dataset.set_output_keys(datasets, ["id", "sig", "phn_encoded", "phn_decoded","label_encoder"])
        if n == 1:
          return label_encoder
        else:
          return ev_data

    else:
        train_data = sb.dataio.dataset.DynamicItemDataset.from_json(
            json_path= hparams["json_train"] ,
            replacements={"data_root": data_folder} ,
        )
        valid_data = sb.dataio.dataset.DynamicItemDataset.from_json(
            json_path= hparams["json_train"] ,
            replacements={"data_root": data_folder} ,
        )
        datasets = [train_data, valid_data]
        label_encoder = sb.dataio.encoder.CTCTextEncoder()
        @sb.utils.data_pipeline.takes("wav")
        @sb.utils.data_pipeline.provides("sig")
        def audio_pipeline(wav):
            sig = sb.dataio.dataio.read_audio(wav)
            return sig

        sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline)
        @sb.utils.data_pipeline.takes("phn")  
        @sb.utils.data_pipeline.provides("phn_list", "phn_encoded","phn_decoded","label_encoder")
        def text_pipeline(phn):
            phn_list = phn.strip().split()
            mapped_phonemes = {
                "iy": "iy",
                "ix": "ix",
                "ih": "ix",
                "eh": "eh",
                "ae": "ae",
                "ax": "ax",
                "ah": "ax",
                "ax-h": "ax",
                "uw": "uw",
                "ux": "uw",
                "uh": "uh",
                "ao": "ao",
                "aa": "ao",
                "ey": "ey",
                "ay": "ay",
                "oy": "oy",
                "aw": "aw",
                "ow": "ow",
                "er": "er",
                "axr": "er",
                "l": "l",
                "el": "l",
                "r": "r",
                "w": "w",
                "y": "y",
                "m": "m",
                "em": "m",
                "n": "n",
                "en": "n",
                "nx": "n",
                "ng": "ng",
                "eng": "ng",
                "v": "v",
                "f": "f",
                "dh": "dh",
                "th": "th",
                "z": "z",
                "s": "s",
                "zh": "zh",
                "sh": "zh",
                "jh": "jh",
                "ch": "ch",
                "b": "b",
                "p": "p",
                "d": "d",
                "dx": "dx",
                "t": "t",
                "g": "g",
                "k": "k",
                "hh": "hh",
                "hv": "hh",
                "bcl": "h#",
                "pcl": "h#",
                "dcl": "h#",
                "tcl": "h#",
                "gcl": "h#",
                "kcl": "h#",
                "q": "h#",
                "epi": "h#",
                "pau": "h#",
                "h#": "h#"
                }
            def map_phonemes(original_phonemes):
              mapped_phonemes_list = []
              for phoneme in original_phonemes:
                mapped_phoneme = mapped_phonemes.get(phoneme, None)
                if mapped_phoneme:
                  mapped_phonemes_list.append(mapped_phoneme)
              return mapped_phonemes_list
            phn_list = map_phonemes(phn_list)
            yield phn_list
            phn_encoded = label_encoder.encode_sequence_torch(phn_list)
            phn_decoded = label_encoder.decode_torch(phn_encoded)
            yield phn_encoded
            yield phn_decoded
            yield label_encoder
        sb.dataio.dataset.add_dynamic_item(datasets, text_pipeline)
        label_encoder.insert_blank(index=hparams["blank_index"])
        label_encoder.update_from_didataset(train_data, output_key="phn_list")
        label_encoder.update_from_didataset(valid_data, output_key="phn_list")
        sb.dataio.dataset.set_output_keys(datasets, ["id", "sig", "phn_encoded", "phn_decoded","label_encoder"])
        if n == 1:
          return label_encoder
        else:
          return train_data, valid_data

In [370]:
device="cpu"

In [371]:
hparams_file = "/content/CRDNN_Model/hyperparams.yaml"
with open(hparams_file) as fin:
    hparams = load_hyperpyyaml(fin)
sb.create_experiment_directory(
    experiment_directory=hparams["output_folder"],
    hyperparams_to_save=hparams_file,
    save_env_desc = True,
)
data_folder = hparams["data_folder"]
train_data, valid_data = data_prep(data_folder, hparams, n = 0)
label_encoder = data_prep(data_folder, hparams, n = 1)
ctc_brain = CTCBrain(
    hparams["modules"],
    hparams["opt_class"],
    hparams,
    run_opts={"device": device},
    checkpointer=hparams["checkpointer"],
)    
#c1
ctc_brain.fit(
    hparams["epoch_counter"],
    train_data,
    valid_data,
    train_loader_kwargs=hparams["dataloader_options"],
    valid_loader_kwargs=hparams["dataloader_options"],
)
ctc_brain.evaluate(
    valid_data,
    min_key="PER",
)

speechbrain.core - Beginning experiment!
speechbrain.core - Experiment folder: /content/CRDNN_Model/result
speechbrain.core - 1.5M trainable parameters in CTCBrain
speechbrain.utils.checkpoints - Would load a checkpoint here, but none found yet.
speechbrain.utils.epoch_loop - Going into epoch 1


100%|██████████| 8/8 [00:02<00:00,  3.28it/s, train_loss=13.9]
100%|██████████| 8/8 [00:01<00:00,  7.77it/s]

speechbrain.utils.train_logger - epoch: 1 - train loss: 13.93 - valid loss: 8.53, valid PER: 85.37





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-49-38+00
speechbrain.utils.epoch_loop - Going into epoch 2


100%|██████████| 8/8 [00:02<00:00,  3.33it/s, train_loss=7.73]
100%|██████████| 8/8 [00:01<00:00,  7.44it/s]

speechbrain.utils.train_logger - epoch: 2 - train loss: 7.73 - valid loss: 5.92, valid PER: 85.98





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-49-42+00
speechbrain.utils.epoch_loop - Going into epoch 3


100%|██████████| 8/8 [00:07<00:00,  1.07it/s, train_loss=4.91]
100%|██████████| 8/8 [00:03<00:00,  2.66it/s]

speechbrain.utils.train_logger - epoch: 3 - train loss: 4.91 - valid loss: 3.79, valid PER: 82.93
speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-49-53+00





speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-49-38+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-49-42+00
speechbrain.utils.epoch_loop - Going into epoch 4


100%|██████████| 8/8 [00:03<00:00,  2.43it/s, train_loss=3.62]
100%|██████████| 8/8 [00:00<00:00,  8.95it/s]

speechbrain.utils.train_logger - epoch: 4 - train loss: 3.62 - valid loss: 3.18, valid PER: 84.15





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-49-58+00
speechbrain.utils.epoch_loop - Going into epoch 5


100%|██████████| 8/8 [00:02<00:00,  3.38it/s, train_loss=2.97]
100%|██████████| 8/8 [00:00<00:00,  8.80it/s]

speechbrain.utils.train_logger - epoch: 5 - train loss: 2.97 - valid loss: 2.94, valid PER: 80.49





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-02+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-49-58+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-49-53+00
speechbrain.utils.epoch_loop - Going into epoch 6


100%|██████████| 8/8 [00:02<00:00,  3.45it/s, train_loss=2.61]
100%|██████████| 8/8 [00:00<00:00,  9.09it/s]

speechbrain.utils.train_logger - epoch: 6 - train loss: 2.61 - valid loss: 2.52, valid PER: 78.66





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-05+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-02+00
speechbrain.utils.epoch_loop - Going into epoch 7


100%|██████████| 8/8 [00:02<00:00,  3.43it/s, train_loss=2.24]
100%|██████████| 8/8 [00:00<00:00,  9.09it/s]

speechbrain.utils.train_logger - epoch: 7 - train loss: 2.24 - valid loss: 2.28, valid PER: 71.34





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-09+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-05+00
speechbrain.utils.epoch_loop - Going into epoch 8


100%|██████████| 8/8 [00:02<00:00,  3.46it/s, train_loss=1.96]
100%|██████████| 8/8 [00:00<00:00,  8.96it/s]

speechbrain.utils.train_logger - epoch: 8 - train loss: 1.96 - valid loss: 1.99, valid PER: 61.59





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-12+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-09+00
speechbrain.utils.epoch_loop - Going into epoch 9


100%|██████████| 8/8 [00:02<00:00,  3.38it/s, train_loss=1.66]
100%|██████████| 8/8 [00:00<00:00,  9.27it/s]

speechbrain.utils.train_logger - epoch: 9 - train loss: 1.66 - valid loss: 1.66, valid PER: 55.49





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-16+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-12+00
speechbrain.utils.epoch_loop - Going into epoch 10


100%|██████████| 8/8 [00:02<00:00,  3.46it/s, train_loss=1.4]
100%|██████████| 8/8 [00:00<00:00,  9.01it/s]

speechbrain.utils.train_logger - epoch: 10 - train loss: 1.40 - valid loss: 1.50, valid PER: 51.83





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-19+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-16+00
speechbrain.utils.epoch_loop - Going into epoch 11


100%|██████████| 8/8 [00:02<00:00,  3.40it/s, train_loss=1.24]
100%|██████████| 8/8 [00:00<00:00,  8.96it/s]

speechbrain.utils.train_logger - epoch: 11 - train loss: 1.24 - valid loss: 1.13, valid PER: 39.63





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-23+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-19+00
speechbrain.utils.epoch_loop - Going into epoch 12


100%|██████████| 8/8 [00:02<00:00,  3.35it/s, train_loss=0.981]
100%|██████████| 8/8 [00:00<00:00,  9.09it/s]

speechbrain.utils.train_logger - epoch: 12 - train loss: 9.81e-01 - valid loss: 8.03e-01, valid PER: 26.83





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-26+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-23+00
speechbrain.utils.epoch_loop - Going into epoch 13


100%|██████████| 8/8 [00:02<00:00,  3.42it/s, train_loss=0.748]
100%|██████████| 8/8 [00:00<00:00,  8.95it/s]

speechbrain.utils.train_logger - epoch: 13 - train loss: 7.48e-01 - valid loss: 6.59e-01, valid PER: 17.07





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-30+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-26+00
speechbrain.utils.epoch_loop - Going into epoch 14


100%|██████████| 8/8 [00:02<00:00,  3.36it/s, train_loss=0.583]
100%|██████████| 8/8 [00:00<00:00,  8.79it/s]

speechbrain.utils.train_logger - epoch: 14 - train loss: 5.83e-01 - valid loss: 5.11e-01, valid PER: 15.85
speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-34+00





speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-30+00
speechbrain.utils.epoch_loop - Going into epoch 15


100%|██████████| 8/8 [00:02<00:00,  3.41it/s, train_loss=0.435]
100%|██████████| 8/8 [00:00<00:00,  9.22it/s]

speechbrain.utils.train_logger - epoch: 15 - train loss: 4.35e-01 - valid loss: 3.90e-01, valid PER: 10.37





speechbrain.utils.checkpoints - Saved an end-of-epoch checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-37+00
speechbrain.utils.checkpoints - Deleted checkpoint in /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-34+00
speechbrain.utils.checkpoints - Loading a checkpoint from /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-37+00


100%|██████████| 8/8 [00:00<00:00,  9.10it/s]

speechbrain.utils.train_logger - Epoch loaded: 15 - test loss: 3.90e-01, test PER: 10.37





0.3897950518876314

In [372]:
transcripts = ctc_brain.transcribe_dataset(
        dataset= '/content/CRDNN_Model/AudioSamplesASR/spk1_snt1.wav',
        min_key="PER",
        label = label_encoder 
)
print("Phoneme Transcription is below:")
print(transcripts)
print(label_encoder.decode_torch(transcripts))

speechbrain.utils.checkpoints - Loading a checkpoint from /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-37+00
Phoneme Transcription is below:
tensor([[ 1,  2,  4,  5,  6,  7,  5,  8,  9, 11, 12, 13, 11,  1,  2, 10,  8,  7,
          5,  6,  7]])
[['dh', 'ax', 'ay', 'l', 'd', 'ao', 'l', 'm', 'ow', 't', 'hh', 'er', 't', 'dh', 'ax', 's', 'm', 'ao', 'l', 'd', 'ao']]


In [373]:
audio = '/content/CRDNN_Model/AudioSamplesASR/spk1_snt1.wav'
phns = transcripts
audiooo = ctc_brain.audioshaper(audio)

In [374]:
FGSM_orig, noise_grad = ctc_brain.fast_gradient_sign_method( 
    audio, 
    phns,
    epsilon=0.001
)

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

In [375]:
PGD, delta, max_itr = ctc_brain.projected_gradient_descent(
    audio, 
    phns, 
    epsilon=0.00001, 
    k=50
)

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

In [376]:
RandomAttackk, _noise = ctc_brain.random_attack(
    audio, 
    eps=0.003, 
    order=np.inf, 
    clip_min=-10, 
    clip_max=10
)

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

In [377]:
MixRanFGSM, deltaa = ctc_brain.mix_fast_gradient_sign_method( 
    audio,
    phns,
    epsilon=0.00001, 
    random_noise=True,
    noise_eps=0.0001, 
    noise_order=np.inf, 
    noise_clip_min=-10, 
    noise_clip_max=10,
)

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

In [378]:
FGSM_Iter= ctc_brain.new_fast_gradient_sign_method( 
    audio,
    phns,
    epsilon_iterations=30,
    epsilon_size=0.00001, 
    noise_clip_min=-0.5, 
    noise_clip_max=0.5,
)

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

In [379]:
import librosa
import IPython.display as ipd

# Load the audio file
audioo, sr = librosa.load('/content/CRDNN_Model/AudioSamplesASR/spk1_snt1.wav')

# Play the audio
ipd.display(ipd.Audio(audiooo.detach().numpy(), rate=16000))
ipd.display(ipd.Audio(FGSM_orig, rate=16000))
ipd.display(ipd.Audio(PGD.detach().numpy(), rate=16000))
ipd.display(ipd.Audio(RandomAttackk, rate=16000))
ipd.display(ipd.Audio(MixRanFGSM, rate=16000))
ipd.display(ipd.Audio(FGSM_Iter, rate=16000))

In [380]:
sr = 16000
torchaudio.save('/content/OriginalAudio.wav', audiooo, sr)
torchaudio.save('/content/FGSMAttack.wav', FGSM_orig, sr)
torchaudio.save('/content/PGDAttack.wav', PGD, sr)
torchaudio.save('/content/RandomAttackk.wav',RandomAttackk,sr)
torchaudio.save('/content/FGSM_Plus_RandomAttack.wav', MixRanFGSM, sr)
torchaudio.save('/content/FGSM_WithIteration.wav', FGSM_Iter, sr)

In [381]:
transcripts1 = ctc_brain.transcribe_dataset(
        dataset= '/content/OriginalAudio.wav',
        min_key="PER",
        label = label_encoder 
)#Original Transcript
transcripts2 = ctc_brain.transcribe_dataset(
        dataset= '/content/FGSMAttack.wav',
        min_key="PER",
        label = label_encoder 
)#Normal FGSM attack
transcripts3 = ctc_brain.transcribe_dataset(
        dataset= '/content/PGDAttack.wav',
        min_key="PER",
        label = label_encoder 
)#Normal PGD attack
transcripts4 = ctc_brain.transcribe_dataset(
        dataset= '/content/RandomAttackk.wav',
        min_key="PER",
        label = label_encoder 
)#Random Attack
transcripts5 = ctc_brain.transcribe_dataset(
        dataset= '/content/FGSM_Plus_RandomAttack.wav',
        min_key="PER",
        label = label_encoder 
)#Mix of FGSM and Random attack
transcripts6 = ctc_brain.transcribe_dataset(
        dataset= '/content/FGSM_WithIteration.wav',
        min_key="PER",
        label = label_encoder 
)#Updated FGSM with iteration
print("Phoneme Transcription for original audio is :")
print(transcripts1) #Original Transcript
print(label_encoder.decode_torch(transcripts1))
print("Phoneme Transcription after FGSM attack is :")
print(transcripts2) #Normal FGSM attack
print(label_encoder.decode_torch(transcripts2))
print("Phoneme Transcription after PGD attack is :")
print(transcripts3) #Normal PGD attack
print(label_encoder.decode_torch(transcripts3))
print("Phoneme Transcription after Random noise attack is :")
print(transcripts4) #Random attack
print(label_encoder.decode_torch(transcripts4))
print("Phoneme Transcription after FGSM+random noise attack is :")
print(transcripts5) #Mix of FGSM and Random attack
print(label_encoder.decode_torch(transcripts5))
print("Phoneme Transcription after FGSM with iteration attack is :")
print(transcripts6) #Updated FGSM with iteration

speechbrain.utils.checkpoints - Loading a checkpoint from /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-37+00
speechbrain.utils.checkpoints - Loading a checkpoint from /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-37+00
speechbrain.utils.checkpoints - Loading a checkpoint from /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-37+00
speechbrain.utils.checkpoints - Loading a checkpoint from /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-37+00
speechbrain.utils.checkpoints - Loading a checkpoint from /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-37+00
speechbrain.utils.checkpoints - Loading a checkpoint from /content/CRDNN_Model/result/save/CKPT+2023-01-29+19-50-37+00
Phoneme Transcription for original audio is :
tensor([[ 1,  2,  4,  5,  6,  7,  5,  8,  9, 11, 12, 13, 11,  1,  2, 10,  8,  7,
          5,  6,  7]])
[['dh', 'ax', 'ay', 'l', 'd', 'ao', 'l', 'm', 'ow', 't', 'hh', 'er', 't', 'dh', 'ax', 's', 'm', 'ao', 'l', 'd', 'ao']]
Phoneme Transcrip

In [382]:
%%capture
!pip install python-Levenshtein #[['t', 'r', 'ay', 'd', 't', 't', 'ay', 't', 't', 'r', 'd', 'ay', 'd']]

In [383]:
def calculate_snr(original, noisy):
    original = np.asarray(original)
    noisy = np.asarray(noisy)
    original_power = np.mean(original ** 2)
    noise_power = np.mean((original - noisy) ** 2)
    snr = 10 * np.log10(original_power / noise_power)
    return snr

In [384]:
original = audiooo.detach().numpy()
noisy1 = FGSM_orig
noisy2 = PGD.detach().numpy()
noisy3 = RandomAttackk
noisy4 = MixRanFGSM
noisy5 = FGSM_Iter

**This is Levenshtein distance. It is a string metric for measuring difference between two sequences**

Also added the feature of SNR and higher SNR tell us better signal meaning that we are more closer to generating imperceptible adversarial attack.

In [385]:
import Levenshtein
def compute_dis(ground_truth, predictions):
    num_errors = Levenshtein.distance(''.join(ground_truth), ''.join(predictions))
    total_phones = len(ground_truth)
    per = num_errors / total_phones
    return per
distance = compute_dis(label_encoder.decode_torch(transcripts1.squeeze()), label_encoder.decode_torch(transcripts2.squeeze()))
print(distance)
#Lower value means both answer match more, indicating poor attack
#Higher the value the better the attack is.
snr_value = calculate_snr(original, noisy1)
print("SNR value of simple FGSM:", snr_value, "dB")

0.8571428571428571
SNR value of simple FGSM: 27.202181816101074 dB


In [386]:
distance = compute_dis(label_encoder.decode_torch(transcripts1.squeeze()), label_encoder.decode_torch(transcripts3.squeeze()))
print(distance) #PGD
snr_value = calculate_snr(original, noisy2)
print("SNR value of simple PGD:", snr_value, "dB")

0.47619047619047616
SNR value of simple PGD: 59.28004264831543 dB


In [387]:
distance = compute_dis(label_encoder.decode_torch(transcripts1.squeeze()), label_encoder.decode_torch(transcripts4.squeeze()))
print(distance) #random attack
snr_value = calculate_snr(original, noisy3)
print("SNR value of Random Attack:", snr_value, "dB")

0.6190476190476191
SNR value of Random Attack: 22.44609832763672 dB


In [388]:
distance = compute_dis(label_encoder.decode_torch(transcripts1.squeeze()), label_encoder.decode_torch(transcripts5.squeeze()))
print(distance) #mix FGSM
snr_value = calculate_snr(original, noisy4)
print("SNR value of FGSM + Random Attack:", snr_value, "dB")

0.09523809523809523
SNR value of FGSM + Random Attack: 51.81851863861084 dB


In [389]:
distance = compute_dis(label_encoder.decode_torch(transcripts1.squeeze()), label_encoder.decode_torch(transcripts6.squeeze()))
print(distance) #FGSM with iteration
snr_value = calculate_snr(original, noisy5)
print("SNR value of FGSM with iteration:", snr_value, "dB")

1.2380952380952381
SNR value of FGSM with iteration: 45.67756175994873 dB
