# Baseline adversarial attacks for LMS CNN and ESC-10
This notebook runs four standard adversarial attacks against a CNN that classifies log-mel spectrograms (LMS) of ESC-10 audio clips.

Target model: [Ahmed et al 2020](https://www.researchgate.net/publication/344519283_Automatic_Environmental_Sound_Recognition_AESR_Using_Convolutional_Neural_Network)

Data: [ESC-10](https://github.com/karolpiczak/ESC-50)

Attacks: [FGSM](https://arxiv.org/abs/1412.6572), [BIM](https://arxiv.org/abs/1607.02533), [Deepfool](https://arxiv.org/abs/1511.04599), [Carlini & Wagner](https://arxiv.org/abs/1608.04644)

Attack implementations based on [torchattacks](https://github.com/Harry24k/adversarial-attacks-pytorch)

In [1]:
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from torchsummary import summary

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

import random
import os

import librosa

In [2]:
SEED = 42

def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Covers Python's ``random`` module, the ``PYTHONHASHSEED`` environment
    variable, NumPy's global generator and PyTorch's CPU and CUDA generators.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # Stored as a string because environment variables must be strings.
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU rather than only the current device; this call
    # is silently ignored when CUDA is not available.
    torch.cuda.manual_seed_all(seed)

seed_everything(SEED)

In [3]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [4]:
# Folder that contains the datasets and the name of the dataset used here.
DATA_DIR = "Data"
DATASET = "ESC-10"

# Dataset layout: metadata CSV and the folder holding the audio clips.
DATASET_PATH = os.path.join(DATA_DIR, DATASET)
META_PATH = os.path.join(DATASET_PATH, "meta", "esc10.csv")
AUDIO_PATH = os.path.join(DATASET_PATH, "audio")

# Folder with the saved model checkpoints.
WEIGHTS_DIR = "Weights"

# Root folder the crafted samples are written to.
SAMPLES_DIR = "Samples"

# Sub-folder name (under SAMPLES_DIR) used for this target model's samples.
MODEL = "cnn_lms"

# Checkpoint of the target model that is attacked below.
CHECKPT_PATH = os.path.join(WEIGHTS_DIR, "cnn_best.pth")

In [5]:
# map_location remaps the stored tensors onto the selected device, so a
# checkpoint saved on a GPU can still be restored on a CPU-only machine.
# NOTE: torch.load unpickles the file; only load checkpoints you trust.
checkpoint = torch.load(CHECKPT_PATH, map_location=device)

In [6]:
# Sampling rate (Hz) used when loading audio and computing mel spectrograms.
SR = 16000

## Target model
___

In [7]:
class CNN(nn.Module):
    """Convolutional classifier with four conv stages and a two-layer head.

    Each conv stage is Conv2d(3x3) -> MaxPool2d(2x2) -> Dropout(0.25) -> ReLU.
    The first stage uses "valid" padding, the remaining ones "same" padding.
    The flattened feature size is fixed to 128 * 7 * 7, which corresponds to
    single-channel 128x128 inputs. The network has 10 output classes.
    """

    def __init__(self):
        super().__init__()

        def conv_stage(in_ch, out_ch, padding):
            # One convolutional stage; layer order matters for state-dict keys.
            return nn.Sequential(
                nn.Conv2d(in_channels=in_ch, out_channels=out_ch,
                          kernel_size=(3, 3), padding=padding),
                nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),
                nn.Dropout(p=0.25),
                nn.ReLU(),
            )

        self.l1 = conv_stage(1, 16, "valid")
        self.l2 = conv_stage(16, 32, "same")
        self.l3 = conv_stage(32, 64, "same")
        self.l4 = conv_stage(64, 128, "same")

        # Dense head: flatten -> hidden layer of 512 units.
        self.l5 = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(p=0.5),
            nn.Linear(in_features=128 * 7 * 7, out_features=512),
            nn.ReLU(),
        )

        # Output layer producing one logit per class.
        self.l6 = nn.Sequential(
            nn.Dropout(p=0.6),
            nn.Linear(in_features=512, out_features=10),
        )

        self.sf = nn.Softmax(dim=1)

    def forward(self, input_data):
        """Return ``(logits, probs)`` for a batch of inputs.

        Args:
            input_data: Batch of inputs fed to the first conv stage.

        Returns:
            Tuple of the raw class logits and their softmax over dim 1.
        """
        features = input_data
        for stage in (self.l1, self.l2, self.l3, self.l4, self.l5):
            features = stage(features)

        logits = self.l6(features)
        return logits, self.sf(logits)

In [8]:
# Instantiate a freshly initialised copy of the network (no weights loaded)
# and print its layer summary for a single-channel 128x128 input.
cnn = CNN().to(device)
summary(cnn, (1, 128, 128))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 16, 126, 126]             160
         MaxPool2d-2           [-1, 16, 63, 63]               0
           Dropout-3           [-1, 16, 63, 63]               0
              ReLU-4           [-1, 16, 63, 63]               0
            Conv2d-5           [-1, 32, 63, 63]           4,640
         MaxPool2d-6           [-1, 32, 31, 31]               0
           Dropout-7           [-1, 32, 31, 31]               0
              ReLU-8           [-1, 32, 31, 31]               0
            Conv2d-9           [-1, 64, 31, 31]          18,496
        MaxPool2d-10           [-1, 64, 15, 15]               0
          Dropout-11           [-1, 64, 15, 15]               0
             ReLU-12           [-1, 64, 15, 15]               0
           Conv2d-13          [-1, 128, 15, 15]          73,856
        MaxPool2d-14            [-1, 12

## Attacks
___

In [9]:
def FGSM(model, samples, labels, eps, loss_fn):
    """Craft adversarial samples with the single-step Fast Gradient Sign Method.

    Args:
        model: Callable returning a tuple whose first element is the logits.
        samples: Batch of inputs to perturb; the caller's tensor is not modified.
        labels: Labels passed to ``loss_fn`` together with the logits.
        eps: Step size applied to the sign of the input gradient.
        loss_fn: Loss whose value the perturbation increases.

    Returns:
        Detached tensor ``clamp(samples + eps * sign(grad), 0, 1)``.
    """
    x = samples.clone().detach().requires_grad_(True)
    y = labels.clone().detach()

    logits, _ = model(x)
    loss = loss_fn(logits, y)

    # Gradient of the loss with respect to the input only.
    (input_grad,) = torch.autograd.grad(
        loss, x, retain_graph=False, create_graph=False
    )

    perturbed = x + eps * input_grad.sign()
    # Keep the result inside the valid [0, 1] input range.
    return torch.clamp(perturbed, min=0, max=1).detach()

In [10]:
def BIM(model, samples, labels, alpha, eps, steps, loss_fn):
    """Craft adversarial samples with the iterative Basic Iterative Method.

    Args:
        model: Callable returning a tuple whose first element is the logits.
        samples: Batch of inputs to perturb; the caller's tensor is not modified.
        labels: Labels passed to ``loss_fn`` together with the logits.
        alpha: Step size of each iteration.
        eps: Maximum per-element deviation from the original samples.
        steps: Number of iterations.
        loss_fn: Loss whose value the perturbation increases.

    Returns:
        Detached adversarial batch within ``eps`` of ``samples`` and in [0, 1].
    """
    clean = samples.clone().detach()
    current = samples.clone().detach()
    y = labels.clone().detach()

    # Element-wise bounds of the eps-ball around the clean samples.
    lower = clean - eps
    upper = clean + eps

    for _ in range(steps):
        current.requires_grad_(True)
        logits, _ = model(current)
        loss = loss_fn(logits, y)

        (input_grad,) = torch.autograd.grad(
            loss, current, retain_graph=False, create_graph=False
        )

        stepped = current + alpha * input_grad.sign()
        # Project back into the eps-ball first, then into the valid range.
        in_ball = torch.clamp(stepped, min=lower, max=upper)
        current = torch.clamp(in_ball, min=0, max=1).detach()

    return current

In [11]:
def DeepFool(model, samples, labels, steps, overshoot):
    """Craft adversarial samples with DeepFool, one sample at a time.

    Every sample is moved by ``df_indiv`` until the model's prediction
    differs from its label or ``steps`` rounds have been performed.

    Args:
        model: Callable returning a tuple whose first element is the logits.
        samples: Batch of inputs to perturb; the caller's tensor is not modified.
        labels: Label of every sample.
        steps: Maximum number of rounds over the batch.
        overshoot: Extra fraction added to every DeepFool step.

    Returns:
        Detached batch with the adversarial samples concatenated along dim 0.
    """
    samples = samples.clone().detach()
    labels = labels.clone().detach()

    n_samples = len(samples)
    # True while a sample is still classified as its label.
    still_correct = torch.ones(n_samples, dtype=torch.bool)
    target_labels = labels.clone().detach()

    # Keep each sample as its own batch of size one.
    adv_samples = [samples[i:i + 1].clone().detach() for i in range(n_samples)]

    rounds_done = 0
    while still_correct.any() and rounds_done < steps:
        for i in range(n_samples):
            if not still_correct[i]:
                continue
            fooled, new_label, adv_samples[i] = df_indiv(
                model, adv_samples[i], labels[i], overshoot
            )
            target_labels[i] = new_label
            if fooled:
                still_correct[i] = False
        rounds_done += 1

    return torch.cat(adv_samples).detach()


def df_indiv(model, sample, label, overshoot):
    """Perform one DeepFool step on a single sample.

    Args:
        model: Callable returning a tuple whose first element is the logits.
        sample: Batch of size one; ``requires_grad`` is enabled on it in place.
        label: Label of the sample.
        overshoot: Extra fraction added to the computed perturbation.

    Returns:
        ``(True, predicted_label, sample)`` when the model already predicts
        something other than ``label``; otherwise
        ``(False, target_label, adv_sample)`` where ``adv_sample`` is the
        sample moved towards the closest linearised decision boundary and
        clamped to [0, 1].
    """
    sample.requires_grad_(True)
    logits = model(sample)[0][0]
    predicted = torch.argmax(logits, dim=0)
    if predicted != label:
        return (True, predicted, sample)

    jacobian = construct_jacobian(logits, sample)
    sample = sample.detach()

    other_classes = [k for k in range(len(logits)) if k != label]

    # Logit and gradient differences between every other class and the label.
    logit_gap = logits[other_classes] - logits[label]
    grad_gap = jacobian[other_classes] - jacobian[label]

    # Distance to each linearised boundary; pick the closest one.
    distances = torch.abs(logit_gap) \
        / torch.norm(nn.Flatten()(grad_gap), p=2, dim=1)
    nearest = torch.argmin(distances, 0)

    step = (torch.abs(logit_gap[nearest]) * grad_gap[nearest]
            / (torch.norm(grad_gap[nearest], p=2) ** 2))

    # Map the index within other_classes back to a class index.
    target_label = nearest if nearest < label else nearest + 1

    moved = sample + (1 + overshoot) * step
    moved = torch.clamp(moved, min=0, max=1).detach()
    return (False, target_label, moved)

# https://stackoverflow.com/questions/63096122/pytorch-is-it-possible-to-differentiate-a-matrix
# torch.autograd.functional.jacobian is only for torch >= 1.5.1
def construct_jacobian(y, x):
    """Return the Jacobian of ``y`` with respect to ``x``.

    One gradient is computed per element of ``y`` with
    ``torch.autograd.grad``, so neither ``x.grad`` nor the ``.grad`` of any
    other tensor in the graph (e.g. model parameters) is touched.

    Args:
        y: Tensor iterated element by element; each element must be a scalar
            that depends on ``x``.
        x: Tensor that requires grad and was used to compute ``y``.

    Returns:
        Detached tensor of shape ``(*y.shape, *x.shape)``.
    """
    rows = []
    last = len(y) - 1
    for idx, y_element in enumerate(y):
        # Keep the graph alive for all but the final element.
        (row,) = torch.autograd.grad(y_element, x, retain_graph=idx < last)
        rows.append(row.detach())
    return torch.stack(rows).reshape(*y.shape, *x.shape)

In [12]:
def CW(model, samples, labels, c, kappa, steps, learning_rate):
    """Craft adversarial samples with the Carlini & Wagner L2 attack.

    The perturbation is optimised with Adam in tanh space. For every sample
    the misclassified candidate with the smallest squared L2 distance seen
    so far is kept.

    Args:
        model: Callable returning a tuple whose first element is the logits.
        samples: Batch of inputs to perturb; the caller's tensor is not modified.
        labels: Label of every sample.
        c: Weight of the classification term in the cost.
        kappa: Confidence margin forwarded to ``f``.
        steps: Maximum number of optimisation steps.
        learning_rate: Learning rate of the Adam optimiser.

    Returns:
        Tensor with the best adversarial sample found per input; inputs for
        which no misclassified candidate was found are returned unchanged.
    """
    samples = samples.clone().detach()
    labels = labels.clone().detach()

    # Optimisation variable: the samples expressed in tanh space.
    w = inverse_tanh_space(samples).detach()
    w.requires_grad_(True)

    best_adv_samples = samples.clone().detach()
    best_L2 = torch.ones(len(samples), device=samples.device) * 1e10
    prev_cost = 1e10
    dim = len(samples.shape)

    squared_error = nn.MSELoss(reduction='none')
    flatten = nn.Flatten()

    optimizer = optim.Adam([w], lr=learning_rate)

    # max(., 1) prevents a modulo by zero when steps < 10.
    check_every = max(steps // 10, 1)

    for step in range(steps):
        adv_samples = tanh_space(w)

        # Per-sample squared L2 distance to the clean samples.
        current_L2 = squared_error(flatten(adv_samples),
                                   flatten(samples)).sum(dim=1)

        outputs, _ = model(adv_samples)
        cost = current_L2.sum() + c * f(outputs, labels, kappa).sum()

        optimizer.zero_grad()
        cost.backward()
        optimizer.step()

        # Bookkeeping uses the candidates evaluated before this update.
        distance = current_L2.detach()
        fooled = (torch.argmax(outputs.detach(), 1) != labels).float()

        # 1 where the candidate is misclassified AND closer than the best so far.
        improved = fooled * (best_L2 > distance)
        best_L2 = improved * distance + (1 - improved) * best_L2

        improved = improved.view([-1] + [1] * (dim - 1))
        best_adv_samples = improved * adv_samples.detach() + (1 - improved) * best_adv_samples

        # Early stop when the cost went up since the previous checkpoint.
        if step % check_every == 0:
            if cost.item() > prev_cost:
                return best_adv_samples
            prev_cost = cost.item()

    return best_adv_samples

def tanh_space(x):
    """Map a tensor from tanh space to the [0, 1] range: ``(tanh(x) + 1) / 2``."""
    shifted = torch.tanh(x) + 1
    return 0.5 * shifted

def inverse_tanh_space(x):
    """Map a tensor from the [0, 1] range to tanh space (inverse of tanh_space).

    Args:
        x: Tensor with values expected in [0, 1]; values outside are clamped.

    Returns:
        Finite tensor ``atanh(2 * x - 1)``.
    """
    # torch.atanh is only for torch >= 1.7.0
    # atanh diverges at exactly -1 and 1, which inputs equal to 0 or 1 would
    # hit. Clamping strictly inside the interval keeps the result finite, so
    # those elements remain optimisable instead of being stuck at +/-inf.
    bound = 1 - 1e-6
    return atanh(torch.clamp(x * 2 - 1, min=-bound, max=bound))


def atanh(x):
    """Inverse hyperbolic tangent, ``0.5 * log((1 + x) / (1 - x))``."""
    return 0.5 * torch.log((1 + x) / (1 - x))

# f-function in the paper
def f(outputs, labels, kappa):
    """Classification term of the Carlini & Wagner objective.

    Args:
        outputs: Logits of shape ``(batch, classes)``.
        labels: Class index of every row of ``outputs``.
        kappa: Confidence margin; the result is clamped from below at
            ``-kappa``.

    Returns:
        Per-sample ``max(real - other, -kappa)`` where ``real`` is the logit
        of the labelled class and ``other`` the largest remaining logit.
    """
    one_hot_labels = torch.eye(outputs.shape[1]).to(outputs.device)[labels]

    # Logit of the labelled class. Summing the one-hot product selects it
    # exactly, whereas a max over the product would return 0 for a negative
    # logit.
    real = (one_hot_labels * outputs).sum(dim=1)

    # Largest logit among the other classes. The labelled class is masked
    # with -inf (not 0) so it can never win, even when all logits are negative.
    other = outputs.masked_fill(one_hot_labels.bool(), float("-inf")).max(dim=1)[0]

    return torch.clamp(real - other, min=-kappa)

## Clean examples
___
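The attacks are evaluated on the ESC-10 test samples (fold 5). The adversarial samples saved at the end of the notebook are crafted only from the clips that the model classifies correctly.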

In [13]:
# Dataset target id -> category name. The insertion order of this dict
# defines the contiguous class index (y) used by the model.
target_to_label = {
    0: "dog",
    41: "chainsaw",
    12: "crackling_fire",
    40: "helicopter",
    10: "rain",
    20: "crying_baby",
    38: "clock_tick",
    21: "sneezing",
    1: "rooster",
    11: "sea_waves",
}
# Category name -> dataset target id.
label_to_target = {name: target for target, name in target_to_label.items()}
# Dataset target id -> class index in 0..9, and the reverse mapping.
target_to_y = {target: y for y, target in enumerate(target_to_label)}
y_to_target = {y: target for target, y in target_to_y.items()}

In [14]:
class ESC10(Dataset):
    """Dataset serving ``(spectrogram, class index)`` pairs from a metadata table.

    Args:
        meta: Table indexed with ``.loc[index, column]`` that provides a
            "spectrogram" column (NumPy arrays) and a "target" column.
        transformation: Optional callable applied to every spectrogram tensor.
    """

    def __init__(self, meta, transformation=None):
        self.transformation = transformation
        self.meta = meta

    def __len__(self):
        """Return the number of rows in the metadata table."""
        return len(self.meta)

    def __getitem__(self, index):
        """Return the spectrogram tensor and class index of row ``index``."""
        raw_spec = self.meta.loc[index, "spectrogram"]
        # Add a leading channel dimension.
        spec = torch.from_numpy(raw_spec).unsqueeze(0)
        if self.transformation is not None:
            spec = self.transformation(spec)

        # Convert the dataset target id to the model's contiguous class index.
        return spec, target_to_y[self.meta.loc[index, "target"]]

In [15]:
# Load the metadata and keep only the clips of fold 5 as the clean samples.
dataset = pd.read_csv(META_PATH)
clean_samples_meta = dataset[dataset["fold"] == 5].reset_index(drop=True)
# Only the filename and target columns are needed from here on.
clean_samples_meta = clean_samples_meta.drop(columns=["fold", "category", "src_file", "take"])
clean_samples_meta

Unnamed: 0,filename,target
0,5-151085-A-20.wav,20
1,5-170338-A-41.wav,41
2,5-170338-B-41.wav,41
3,5-171653-A-41.wav,41
4,5-177957-A-40.wav,40
...,...,...
75,5-233160-A-1.wav,1
76,5-234879-A-1.wav,1
77,5-234879-B-1.wav,1
78,5-235671-A-38.wav,38


In [16]:
# Compute one log-mel spectrogram per clip and store it next to its metadata.
spectrograms = []

for filename in clean_samples_meta["filename"]:
    wavpath = os.path.join(AUDIO_PATH, filename)
    # Load the clip at the SR sampling rate.
    waveform, _ = librosa.load(wavpath, sr=SR)
    mel = librosa.feature.melspectrogram(y=waveform, n_fft=1024, win_length=800, hop_length=400, sr=SR)
    # Convert power to decibels relative to the clip's own maximum.
    mel_db = librosa.power_to_db(mel, ref=np.max)
    # Force the time axis (axis 1) to exactly 128 frames.
    # NOTE(review): fix_length presumably pads with zeros, which is 0 dB (the
    # maximum) on this scale - confirm this matches how the model was trained.
    mel_db = librosa.util.fix_length(mel_db, axis=1, size=128)
    # Cap values at 0 dB.
    spectrograms.append(np.clip(mel_db, a_min=None, a_max=0.))

clean_samples_meta["spectrogram"] = spectrograms
clean_samples_meta.head()

Unnamed: 0,filename,target,spectrogram
0,5-151085-A-20.wav,20,"[[-42.345238, -42.922577, -53.057945, -65.3477..."
1,5-170338-A-41.wav,41,"[[-31.258703, -35.422966, -37.695583, -37.3043..."
2,5-170338-B-41.wav,41,"[[-39.22971, -48.085846, -52.76058, -57.827343..."
3,5-171653-A-41.wav,41,"[[-51.152584, -49.817825, -51.062927, -51.8161..."
4,5-177957-A-40.wav,40,"[[-22.853165, -8.151789, -0.7906742, -5.289486..."


In [17]:
def tf_minmax(x):
    """Rescale decibel values linearly so that -80 maps to 0 and 0 maps to 1."""
    return (x + 80.) / 80.


def tf_inv_minmax(x):
    """Inverse of ``tf_minmax``: map 0 back to -80 and 1 back to 0."""
    return x * 80. - 80.

In [18]:
# Wrap the clean samples in a dataset that rescales every spectrogram with
# tf_minmax. shuffle=False keeps the loader order aligned with the metadata rows.
clean_samples_data = ESC10(clean_samples_meta, transformation=tf_minmax)
clean_samples_loader = DataLoader(clean_samples_data, batch_size=20, shuffle=False)

## Attack evaluation
___

In [19]:
# attack parameters
params = {
    "eps": 0.05,
    "alpha": 0.005,
    "steps": 12,
    "max_iters": 1000,
    "overshoot": 0.02,
    "c": 1.,
    "kappa": 0,
    "learning_rate": 0.01
}

In [20]:
def craft_adv_samples(model, samples, labels, loss_fn, params, attack):
    """Craft adversarial samples with the attack selected by name.

    Args:
        model: Target model passed to the attack.
        samples: Batch of clean inputs.
        labels: Labels of the clean inputs.
        loss_fn: Loss used by the gradient-sign attacks (FGSM, BIM).
        params: Dict with the attack hyper-parameters ("eps", "alpha",
            "steps", "max_iters", "overshoot", "c", "kappa",
            "learning_rate").
        attack: One of "FGSM", "BIM", "DeepFool" or "CW".

    Returns:
        The adversarial batch produced by the selected attack.

    Raises:
        ValueError: If ``attack`` is not one of the supported names.
    """
    if attack == "FGSM":
        return FGSM(model, samples, labels, params["eps"], loss_fn)

    if attack == "BIM":
        return BIM(model, samples, labels, params["alpha"],
                   params["eps"], params["steps"], loss_fn)

    if attack == "DeepFool":
        return DeepFool(model, samples, labels, params["steps"], params["overshoot"])

    if attack == "CW":
        return CW(model, samples, labels, params["c"],
                  params["kappa"], params["max_iters"],
                  params["learning_rate"])

    # Fail with a clear message instead of an UnboundLocalError further down.
    raise ValueError(
        f"Unknown attack {attack!r}; expected one of 'FGSM', 'BIM', 'DeepFool', 'CW'."
    )

In [21]:
def measure_model_performance(model, test_loader, loss_fn, device, params, attack=None):
    """Measure the model's accuracy on clean or adversarial samples.

    Args:
        model: Model returning ``(logits, probs)``; it is switched to eval mode.
        test_loader: Loader yielding ``(inputs, labels)`` batches and exposing
            a ``dataset`` attribute.
        loss_fn: Loss forwarded to the attack.
        device: Device the batches are moved to.
        params: Attack hyper-parameters forwarded to ``craft_adv_samples``.
        attack: Name of the attack to apply to every batch, or ``None`` to
            evaluate on the clean samples.

    Returns:
        Tuple ``(model, accuracy)`` where accuracy is the number of correct
        predictions divided by ``len(test_loader.dataset)``.
    """
    model.eval()
    num_correct_pred = 0

    for X, y_true in tqdm(test_loader):
        X = X.to(device)
        y_true = y_true.to(device)

        # When attacking, only the perturbed batch is classified; a separate
        # forward pass on the clean batch would be discarded anyway.
        if attack is not None:
            X = craft_adv_samples(model, X, y_true, loss_fn, params, attack)

        with torch.no_grad():
            _, y_probs = model(X)
        y_labels = torch.argmax(y_probs, 1)
        num_correct_pred += (y_labels == y_true).sum()

    accuracy = num_correct_pred / len(test_loader.dataset)
    return model, accuracy

In [22]:
# Build the target model and restore its weights from the loaded checkpoint.
model = CNN().to(device)
model.load_state_dict(checkpoint["model_state_dict"])
# Loss used by the gradient-sign attacks.
loss_fn = nn.CrossEntropyLoss()

In [23]:
# Baseline: accuracy on the unperturbed samples (no attack).
model, accuracy = measure_model_performance(model, clean_samples_loader, loss_fn, device, params)
print(f"Accuracy on clean examples: {accuracy * 100:.2f}%")

  0%|          | 0/4 [00:00<?, ?it/s]

Accuracy on clean examples: 85.00%


In [24]:
# Accuracy after perturbing every batch with FGSM.
model, accuracy = measure_model_performance(model, clean_samples_loader, loss_fn, device, params, attack='FGSM')
print(f"Accuracy on FGSM examples: {accuracy * 100:.2f}%")

  0%|          | 0/4 [00:00<?, ?it/s]

Accuracy on FGSM examples: 25.00%


In [25]:
# Accuracy after perturbing every batch with BIM.
model, accuracy = measure_model_performance(model, clean_samples_loader, loss_fn, device, params, attack='BIM')
print(f"Accuracy on BIM examples: {accuracy * 100:.2f}%")

  0%|          | 0/4 [00:00<?, ?it/s]

Accuracy on BIM examples: 3.75%


In [26]:
# Accuracy after perturbing every batch with DeepFool.
model, accuracy = measure_model_performance(model, clean_samples_loader, loss_fn, device, params, attack='DeepFool')
print(f"Accuracy on DeepFool examples: {accuracy * 100:.2f}%")

  0%|          | 0/4 [00:00<?, ?it/s]

Accuracy on DeepFool examples: 12.50%


In [27]:
# Accuracy after perturbing every batch with the Carlini & Wagner attack.
model, accuracy = measure_model_performance(model, clean_samples_loader, loss_fn, device, params, attack='CW')
print(f"Accuracy on CW examples: {accuracy * 100:.2f}%")

  0%|          | 0/4 [00:00<?, ?it/s]

Accuracy on CW examples: 0.00%


In [34]:
Attack = ["FGSM", "BIM", "DeepFool", "CW"]

# correct[i] tells whether the i-th clean sample is classified correctly.
correct = []

# One growing tensor per attack plus one for the clean samples themselves.
Samples = {key: torch.empty((0, 1, 128, 128)) for key in Attack}
Samples["Clean"] = torch.empty((0, 1, 128, 128))

model.eval()
for X, y_true in clean_samples_loader:
    X = X.to(device)
    y_true = y_true.to(device)
    with torch.no_grad():
        _, y_probs = model(X)
    y_labels = torch.argmax(y_probs, 1)

    is_correct = y_labels == y_true
    correct.extend(is_correct.cpu().tolist())

    # Only correctly classified samples are attacked and stored.
    X = X[is_correct]
    y_labels = y_labels[is_correct]

    # Nothing to attack in this batch; DeepFool would otherwise call
    # torch.cat on an empty list and raise.
    if len(X) == 0:
        continue

    Samples["Clean"] = torch.cat((Samples["Clean"], X.cpu()))

    for atk in Attack:
        pert_data = craft_adv_samples(model, X, y_labels, loss_fn, params, atk)
        Samples[atk] = torch.cat((Samples[atk], pert_data.cpu()))

In [54]:
# Flag every clean sample with the outcome of the model's prediction, then
# keep the metadata (without helper columns) of the correctly classified ones.
clean_samples_meta["correct_prediction"] = correct
samples_meta = (
    clean_samples_meta[clean_samples_meta["correct_prediction"] == True]
    .reset_index(drop=True)
    .drop(columns=["correct_prediction", "spectrogram"])
)

samples_meta.head()

Unnamed: 0,filename,target
0,5-151085-A-20.wav,20
1,5-170338-A-41.wav,41
2,5-170338-B-41.wav,41
3,5-171653-A-41.wav,41
4,5-177957-A-40.wav,40


In [56]:
samples_path = os.path.join(SAMPLES_DIR, MODEL)

# exist_ok=True replaces the separate os.path.exists check and its
# check-then-create race.
os.makedirs(samples_path, exist_ok=True)

# One tensor file per sample type, named "<type>_samples.pt".
for sample_type, sample_tensor in Samples.items():
    torch.save(sample_tensor, os.path.join(samples_path, f"{sample_type}_samples.pt"))

# Metadata of the stored samples, row-aligned with the saved tensors.
samples_meta.to_csv(os.path.join(samples_path, "samples_meta.csv"))