<a href="https://colab.research.google.com/github/crgeer/Notebooks/blob/main/Audio_Adv_ML_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Welcome to the Audio Adversarial Machine Learning Project!

In this project, you will receive a passive introduction to machine learning in the audio domain by reading, comprehending, and running ML code on audio datasets.  You will also implement the Basic Iterative Method (BIM) attack and the Density Estimation anomaly detection defense.  The project assumes this is not your first time using PyTorch, although if it is you may still be able to slog through.

This project is intended for Google Colab.  Google Colab is a free resource that runs this Jupyter notebook in a virtual machine with optional hardware acceleration (Runtime -> Change runtime type -> GPU).  But BEWARE - as a free resource, there are NO guarantees for length of time you can access a VM or the resources that are available to you.  What you ran yesterday may not run today.  Also, you might get a snack while your model is training and come back to find your session erased.  Be aware up front that Colab can be tricky, especially in the free version.  There are options to pay for an improved Colab environment (I did not), or pay for increased storage so you can save checkpoints (I did).  The project has been verified to run in a totally free setting, using less than 5 GB of Google Drive storage for the original dataset.  You can also take this Jupyter notebook and work offline, particularly if you can run this code on your own GPU.

Now let's get started.  Run the following cell by using Shift + Enter to import the necessary libraries

In [None]:
import numpy as np
import librosa, librosa.display, os, pickle, random, torch, torchaudio, torchvision
import torch.nn as nn, torch.nn.functional as F, torch.optim as optim
import IPython.display
import matplotlib.pyplot as plt
from datetime import datetime
from sklearn.neighbors import KernelDensity
from sklearn.mixture import GaussianMixture

Now take the files at this link (https://drive.google.com/drive/folders/1h6FTQD4G0BlzOuGdgLsjn6NJOnM9KYuF?usp=sharing) and upload them to your Google Drive account.  After the folder is in your Google Drive, run the following code block to import your Google Drive contents into this session.  This will allow you to access the Audio MNIST dataset.

The Audio MNIST dataset (https://github.com/soerenab/AudioMNIST) contains WAV recordings of 60 speakers saying the digits 0 to 9.  Each speaker says each digit 50 times - meaning there are 30,000 short audio clips in this dataset.  We will use this throughout the project.



In [None]:
from google.colab import drive
drive.mount('/content/drive/')

Take a look at the code below.  Some of the lines are *IMPORTANT* to check, like matching up the data location variable to the location in your Colab filesystem where you have the dataset.

While the functional lines work as-is and shouldn't require any tweaking, I recommend reading them for comprehension so that you can follow the processes here.  Some notes:

STFT - Short time Fourier transform - this decomposes an audio signal into the frequencies that it comprises over time.  The decomposition takes a small time segment of the audio signal and generates complex-value coefficients for those component frequencies.  With a vector of coefficients for each time segment, and many time segments, the result of the STFT operation is a 2-D matrix.  The dimensions and values of that matrix can vary by tweaking the parameters to the STFT operation, but the arrangement below will work well for you.

Spectrogram - This term will pop up throughout the project.  It is the 2-D matrix output from the STFT operation, just processed a little more so that it becomes both ingestible to a neural network and visually meaningful to a human.  We construct it by removing the complex phase information from the coefficients and converting values into decibels.

Feel free to dive more into the audio-specific operations, but this project focuses primarily on adversarial AI attack and defense procedures.  So, also feel free to use those audio operations as black-box functions and keep moving.

This will take around 30 minutes, so go grab a coffee.  Just try to click around the page now and then to make sure you don't time out your Colab session in the meantime.  Also, if you have the space, it would be really good to save this data so you don't have to re-run this cell.  Google currently offers 100GB of storage for a month for 2 dollars, and that 2 dollars might save you a lot of time.  But again, this project is intended to work entirely for free!  It just may require more patience that way.

In [None]:
print("Starting - ", datetime.now())

# You might have to change this to fit your filesystem
data_location = 'drive/MyDrive/Audio MNIST'

# If you have some room (2 GB or so) to spare in your Google Drive, I suggest
# saving the preprocessed spectrograms there.  This will make them
# recoverable if your Colab session restarts.
save_location = 'drive/MyDrive/Audio Adv ML Work Folder'

# If you do not have room to spare, uncomment this line This probably will result
# in you rerunning a lot of code, but that makes it easier to keep things free
#save_location = 'temp'

# Make space for saving data
if not os.path.isdir(save_location):
  os.mkdir(save_location)

if not os.path.isdir(os.path.join(save_location, 'Preprocessed')):
  os.mkdir(os.path.join(save_location, 'Preprocessed'))

counter = 0

transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])

# The iterations follow the 60 speakers in the dataset
for i in range(60):
    
  i += 1

  # Formatting so we can parse the directory
  if i < 10:
    i = '0' + str(i)
  else:
    i = str(i)
  
  speaker_location = os.path.join(data_location, i)

  # Now read the audio files
  for audio_file in os.listdir(speaker_location):

    nums = audio_file[5:7]

    if nums[1] == '.':
      nums = nums[0]
    
    nums = int(nums)

    # Divide the data in an 80/10/10 split
    # Adjust if you like
    if nums < 40:
      variant = 'Train'
    elif nums < 45:
      variant = 'Dev'
    else:
      variant = 'Test'
    
    # Comment out these lines if you have sufficient memory to access a
    # larger training set.  At first, leave them  in to decrease the likelihood
    # of crashing your VM due to insufficient memory
    if nums > 20 and nums < 40:
      continue
    
    # Load the base audio
    audio, sample_rate = librosa.load(os.path.join(speaker_location, audio_file))
    audio = librosa.util.fix_length(audio, 25000)
    label = int(audio_file[0])

    # Process into STFT spectrograms
    stft = librosa.amplitude_to_db(abs(librosa.stft(audio, n_fft = 512)))
    stft = transform(stft)

    # Package up the data and store it
    data = (stft, i, label)

    if not os.path.isdir(os.path.join(save_location, 'Preprocessed', variant)):
      os.mkdir(os.path.join(save_location, 'Preprocessed', variant))
      
    if not os.path.isdir(os.path.join(save_location, 'Preprocessed', variant, str(label))):
      os.mkdir(os.path.join(save_location, 'Preprocessed', variant, str(label)))

    pickle.dump(data, open(os.path.join(save_location, 'Preprocessed', variant, str(label), audio_file[0:7] + '.p'), 'wb'))

    counter += 1

print("Complete - ", datetime.now())

If your Colab session restarts, you'll need to run the cells from the top of the screen on down.  However, if you saved your preprocessed data to a persistent location, you can recover it by just aligning your file locations.  Run the cell below instead of the cell above in that case.

In [None]:
data_location = 'drive/MyDrive/Audio MNIST'
save_location = 'drive/MyDrive/Audio Adv ML Work Folder'
sample_rate = 22050 # if you used the provided dataset

If you still have your data open, let's open that last spectrogram to see what it looks like:

In [None]:
stft = librosa.amplitude_to_db(abs(librosa.stft(audio, n_fft = 512)))
spectrogram = librosa.display.specshow(stft)

And here's what it sounds like:

In [None]:
IPython.display.Audio(data = audio, rate = sample_rate)

There's still some work to do before we can feed data to a model.  Let's add a class here that will assist in loading the data.

In [None]:
# This dataset class prepares the saved data so
# that it can be ingested by a DataLoader
class AudioDataset(torch.utils.data.Dataset):


  def __init__(self, location):

    self.path = location
    self.data = {}
    self.classes = []

    counter = 0

    # Iterate throught the 10 digits
    for i in range(10):

      # Iterate throught the files in each folder
      for audio_file in os.listdir(os.path.join(self.path, str(i))):

        audio, speaker, label = pickle.load(open(os.path.join(self.path, str(i), audio_file), 'rb'))
        self.data[counter] = (audio, label)

        if label not in self.classes:
          self.classes.append(label)
        
        counter += 1
  
  def __len__(self):

    return len(self.data)
  
  def __getitem__(self, index):

    return self.data[index]

Finally, we can construct a model.  This model is loosely based on VGG-16 but is constructed to be adaptable to various input sizes and easy to adjust.  I recommend leaving it as-is for now, just because that way we can limit the complexity of the project to the places where it is intended.  Once you have things working, feel free to try new things!

The model accepts a spectrogram as input and classifies it as one of the digits zero to nine.

If the contents of the model are new to you, take a look at the PyTorch documentation for the different operations and layer types.

In [None]:
class CNN(nn.Module):


  def __init__(self, dropout_param = 0):

    # This is stored separately so that
    # it can be manually adjusted
    self.dropout_param = dropout_param

    super().__init__()
    self.conv1 = nn.Conv2d(in_channels = 1, out_channels = 5, kernel_size = 7)
    self.conv2 = nn.Conv2d(in_channels = 5, out_channels = 5, kernel_size = 7, padding = 'same')
    self.pool = nn.MaxPool2d(2)
    self.bn1 = nn.BatchNorm2d(5)
    self.drop = nn.Dropout(p = self.dropout_param)
    self.conv3 = nn.Conv2d(in_channels = 5, out_channels = 10, kernel_size = 5)
    self.conv4 = nn.Conv2d(in_channels = 10, out_channels = 10, kernel_size = 5, padding = 'same')
    self.bn2 = nn.BatchNorm2d(10)
    self.conv5 = nn.Conv2d(in_channels = 10, out_channels = 16, kernel_size = 5)
    self.conv6 = nn.Conv2d(in_channels = 16, out_channels = 32, kernel_size = 5, padding = 'same')
    self.bn3 = nn.BatchNorm2d(32)
    self.fc1 = nn.Linear(17920, 32)
    self.fc2 = nn.Linear(32, 32)
    self.fc3 = nn.Linear(32, 10)
  
  def forward(self, x):

    x = self.bn1(self.pool(self.conv2(self.conv1(x))))
    # This feedforward logic supports removing dropout manually
    if self.dropout_param:
      x = self.drop(x)
    x = F.relu(x)
    x = self.bn2(self.pool(self.conv4(self.conv3(x))))
    if self.dropout_param:
      x = self.drop(x)
    x = F.relu(x)
    x = self.bn3(self.pool(self.conv6(self.conv5(x))))
    if self.dropout_param:
      x = self.drop(x)
    x = F.relu(x)
    x = torch.flatten(x, 1)
    x = F.relu(self.fc1(x))
    x = F.relu(self.fc2(x))
    x = self.fc3(x)

    return F.log_softmax(x, dim = 1)

We'll train the model now using the data you preprocessed earlier.  Fortunately, we are working with a large dataset.  This means we shouldn't run into much trouble with overfitting and can adopt a direct and straightforward training methodology.  Still, verify a few cells below with the test run to ensure test time performance is acceptable.  Re-run if necessary.  Take a look at the hyperparamers; you can tune these based on your needs.  However, the settings here will likely suffice.

In [None]:
print('Starting code block: ', datetime.now())

# Hyperparameters and tweakable things
epochs = 4
batch_size = 128
lr = 1e-3
dropout_param = 0.4
save_model = True # For me, this is only 3.2 MB
model_name = 'CNN.pth'

# Load up out saved data
train_loc = os.path.join(save_location, 'Preprocessed', 'Train')
train_set = AudioDataset(train_loc)
train_loader = torch.utils.data.DataLoader(train_set, batch_size = batch_size, shuffle = True)

# Construct your model
net = CNN(dropout_param = dropout_param)

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# This is a sanity check for if you are intending to use a GPU
print('This network is running on device: ', device)

# Know what device your model and data is on to help with debugging
net.to(device)

optimizer = optim.Adam(net.parameters(), lr = lr)

counter = 0

for epoch in range(epochs):

  correct = 0
  total = 0

  print("Starting epoch", epoch + 1, " - ", datetime.now())

  for data in train_loader:

    # Perform one step of training with one batch
    audio, label = data
    audio, label = audio.to(device), label.to(device)
    net.zero_grad()
    optimizer.zero_grad()
    output = net(audio.view(-1, 1, 257, 196))
    loss = F.nll_loss(output, label)
    loss.backward()
    optimizer.step()

    # Tally up the performance
    for idx, i in enumerate(output):

      if torch.argmax(i) == label[idx]:
        correct += 1
      total += 1

  counter += 1
  
  print(counter, '/', epochs, '  Loss: ', loss.item(), '  Accuracy: ', round(correct / total, 3))

if save_model:
  if not os.path.isdir(os.path.join(save_location, 'Models')):
    os.mkdir(os.path.join(save_location, 'Models'))
  
  torch.save(net.state_dict(), os.path.join(save_location, 'Models', model_name))

print('Code block finished: ', datetime.now())

Again, if you saved your trained model parameters, you can recover them here instead of re-training the model if your Colab session restarts.

In [None]:
model_name = 'CNN.pth'
dev_loc = os.path.join(save_location, 'Preprocessed', 'Dev')
batch_size = 128
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
net = CNN().to(device)

if torch.cuda.is_available():
  net.load_state_dict(torch.load(os.path.join(save_location, 'Models', model_name), map_location = 'cuda:0'))
  net.eval()
  net.cuda()
else:
  net.load_state_dict(torch.load(os.path.join(save_location, 'Models', model_name), map_location = 'cpu'))
  net.eval()


Let's see how the trained model performs on new data!

You might notice that we constructed sets for Training, Development, and Testing.  For the basic version of the project, we only use the Training and Development sets.  This leaves the Test set available for you later if you want more data to validate choices for optional extensions to the project.  So here and later on, we actually use the Development set for "testing."

In [None]:
# Load up the Dev set
dev_loc = os.path.join(save_location, 'Preprocessed', 'Dev')
dev_loader = torch.utils.data.DataLoader(AudioDataset(dev_loc), batch_size = batch_size, shuffle = False)

correct = 0
total = 0

with torch.no_grad():
  
  net.dropout_param = 0

  for data in dev_loader:

    # Only forward passes needed since we are just testing
    audio, label = data
    audio = audio.to(device)
    label = label.to(device)
    output = net(audio.view(-1, 1, 257, 196))
    
    for idx, i in enumerate(output):

      if torch.argmax(i) == label[idx]:
        correct += 1
      total += 1

  print("Accuracy: ", round(correct / total, 3))

Here's the first step that is completely for you.

You will implement the Basic Iterative Method.  The fundamental actions of the attack are very simple - but it can be difficult to pull concepts out of papers, skillfully use ML frameworks, and put everything together effectively.  Here's a paper that contains the concepts (https://arxiv.org/pdf/1607.02533.pdf).

Note that the paper contains a couple variations of the central attack concept.  Some are untargeted and do not specify the target class, and some are targeted and do specify a specific target class.  The function you implement is provided one instance of a spectrogram, the model, and a target label.  You must construct the attack so that the model selects the target label with good effectiveness.  The output for this function should be a perturbed spectrogram of similar dimensions to the input spectrogram.

Good luck!

In [None]:
def bim(data, net, target_label, epsilon = 1):

  # Your code goes here

  return data.view(257, 196)

This is the code that will test your attacks.  One thing that might be difficult for you is that your attack is converted back into pure audio before it is delivered to the victim model.  This more realistically simulates an attack on, say, an online speech-to-text model.  We are essentially doing just that, except for now the speech-to-text model only works for the digits zero to nine.  The conversion back to audio can be difficult because it might introduce approximation loss that could degrade the effects of a subtle adversarial perturbation.

My naive implementation of BIM gets around 88% effectiveness.  Can you match that, or even do better?

In [None]:
print('Code block started: ', datetime.now())

# Reduce batch_size to 1 to simplify attack coding
dev_loader = torch.utils.data.DataLoader(AudioDataset(dev_loc), batch_size = 1, shuffle = False)

# This generates random target labels
def get_target(source_label):

  a = random.randint(0, 9)

  while a == label[0]:
    a = random.randint(0, 9)

  return torch.tensor([a])


attack_total = 0
attack_succeed = 0

successful_attacks = []

for data, label in dev_loader:

  data, label = data.to(device), label.to(device)
  output = net(data.view(-1, 1, 257, 196))
  data.requires_grad = True
  l = label.item()
  adv_label = get_target(label).to(device)
  
  # If the model is wrong at the start, we don't bother with an attack
  if torch.argmax(output).item() != l:
    continue
  
  attack_total += 1

  # The call to your function
  perturbed_data = bim(data, net, adv_label)

  # This block simulates a more realistic attack delivery by reconstituting
  # an audio signal, and then doing the pre-processing steps again to get
  # a fresh spectrogram.  The simulation introduces minor approximation loss.
  d_perturbed_data = perturbed_data.detach().cpu().numpy()
  istft = librosa.db_to_amplitude(d_perturbed_data)
  i_audio = librosa.util.fix_length(librosa.griffinlim(istft), 25000)
  stft = librosa.amplitude_to_db(np.abs(librosa.stft(i_audio, n_fft = 512)))
  stft = torch.from_numpy(stft).to(device)

  output = net(stft.view(-1, 1, 257, 196))

  # Let's see if the attack achieves the targeted label
  if torch.argmax(output).item() == adv_label:
    attack_succeed += 1
    successful_attacks.append((d_perturbed_data, l, adv_label))
  
  # Shows progress - counts to about 3000 on the original code
  if attack_total % 500 == 0:
    print(attack_total, "attacks attempted")
  
if not os.path.isdir(os.path.join(save_location, 'Successful Attacks')):
  os.mkdir(os.path.join(save_location, 'Successful Attacks'))

# Comment this line out if you need to save storage
torch.save(successful_attacks, open(os.path.join(save_location, 'Successful Attacks', 'bim.p'), 'wb'))

print("BIM Success Rate: ", round(attack_succeed / attack_total, 3))

print('Code block finished: ', datetime.now())

Here's another checkpoint load, if you need it.

In [None]:
if torch.cuda.is_available():
  successful_attacks = torch.load(os.path.join(save_location, 'Successful Attacks', 'bim.p'), map_location = 'cuda:0')
else:
  successful_attacks = torch.load(os.path.join(save_location, 'Successful Attacks', 'bim.p'), map_location = 'cpu')

If you still have the data in your session, we can take a look at the spectrogram from the last attack you generated:

In [None]:
stft = librosa.amplitude_to_db(abs(librosa.stft(i_audio, n_fft = 512)))
spectrogram = librosa.display.specshow(stft)

And produce the audio:

In [None]:
IPython.display.Audio(data = i_audio, rate = sample_rate)

You can probably hear some artifacts of the attack - hopefully not too much!  If things go well, you should only hear some faint static.  If the artifacts are immense, or if you destroyed the base audio signal entirely, don't worry too much.  Just check to see if you can improve your BIM implementation or maybe catch a bug or two.  If you can clearly hear the number being read out, call it good and move on for now; you can come back later if you want to improve.

These artifacts are there in part because the dimensionality of the audio is small.  Take it as a matter of trust for now that if the audio signals are longer, the greater dimensionality of the data will make attacks imperceptible to a human listener.  We just can't fit that kind of rich data at-scale on a free service.  In the paper for the Carlini-Wagner (CW) attack (https://arxiv.org/pdf/1608.04644.pdf), you can see that the MNIST dataset also reveals artifacts for an attack that is well-tuned and more powerful than BIM.  But, CW is human imperceptible on CIFAR-10.  Tests on Audio MNIST are the same.

Let's move on now to a defense.  You will be implementing a Density Estimation anomaly detection defense (https://arxiv.org/pdf/1703.00410v2.pdf).  The defense first constructs Gaussian mixture models (GMMs) on clean data to capture the distributions of that data.  Then it scores new data (possibly clean, possibly attacked) against those GMMs.

Write some code in the block below to construct the densities.  Start with a simple Guassian distribution, and then experiment with tuning Guassian Mixture Models and Kernel Density functions.  There are useful library functions that will do the mathematical heavy lifting for you (https://scikit-learn.org/stable/modules/generated/sklearn.mixture.GaussianMixture.html, https://scikit-learn.org/stable/modules/density.html).  I recommend using the "densities" dictionary that is prepared for you, but you can manage your own data structures as you see fit.

You'll need to add hooks to retrieve the activation outputs.  Experiment with different layer activations from the model to get the best results.

Again, start by just constructing a dictionary of densities in this code block.  Once you have this, implement the density estimation scoring function in the following block.

In [None]:
input_limit = 3000 # tune this value if you have memory issues

# Bring the training data back, with a batch size of 1
train_loc = os.path.join(save_location, 'Preprocessed', 'Train')
train_set = AudioDataset(train_loc)
train_loader = torch.utils.data.DataLoader(train_set, batch_size = 1, shuffle = True)

activations = {}
layer = 'fc1'

# This function makes the activation values of a layer retrievable to us at runtime
def get_activation(name):

  def hook(model, input, output):

    activations[name] = output.detach()

  return hook

# Register the hook here so we can get those activations
net.fc1.register_forward_hook(get_activation('fc1'))

densities = {}
features_set = {}

# Just preparing data structures
for c in range(len(train_set.classes)):

  densities[c] = []
  features_set[c] = []

counter = 0

for data, label in train_loader:

'''

Your code goes here

'''

if not os.path.isdir(os.path.join(save_location, 'Densities')):
  os.mkdir(os.path.join(save_location, 'Densities'))

# Comment this out if you need to save storage space
pickle.dump(densities, open(os.path.join(save_location, 'Densities', 'densities.p'), 'wb'))

This may be useful to recover a save point.

In [None]:
densities = pickle.load(open(os.path.join(save_location, 'Densities', 'densities.p'), 'rb'))

Code your defense here.  Remember this takes data examples one-at-a-time, and returns a meaningful score for the new data against the density model.  Note that there is a built-in scoring function available for you to use (https://scikit-learn.org/stable/modules/generated/sklearn.mixture.GaussianMixture.html#sklearn.mixture.GaussianMixture.score_samples).

In [None]:
def density_estimation(densities, data):

  # Your code goes here

This code runs your defense for clean data, and then on your attacks!

Note the negative sign at the call to the defense function.  This was natural for me - but, if your natural way of coding results in defense AUC scores that are very low (like .12), remove the negative signs!  .12 is a meaningful AUC score, because you can just flip the graph to get the much better score of .88.  An AUC score around .5 is where things are bad.

In [None]:
dev_loader = torch.utils.data.DataLoader(AudioDataset(dev_loc), batch_size = 1, shuffle = False)
records = []

for data, label in dev_loader:

  data, label = data.to(device), label.to(device)

  records.append((-density_estimation(densities, data.view(257, 196)), 'clean'))

for attack in successful_attacks:
  (data, orig_label, adv_label) = attack
  data = torch.from_numpy(data).to(device)

  records.append((-density_estimation(densities, data.view(257, 196)), 'attack'))

Code up a function below to plot a ROC graph and calculated the ROC AUC.  If your AUC score is above .8, congratulations!   You have constructed a simple automated defense that can separate successful attacks from the clean data.

This concludes the basic option for this project.  However, there are some natural next steps.  BIM attacks and Density Estimate anomaly detection are excellent for demonstrating the concepts, but they are not state-of-the-art.  Can you do better?  If you want to go further, try the CW attack (https://arxiv.org/pdf/1608.04644.pdf) and the ADA defense (https://arxiv.org/pdf/1712.06646.pdf)!

In [None]:
# ROC code goes here