# Practical Protection of Voice-Controllable Devices Against Self-Issued Voice Commands

This is the source code for the solution against voice command self-issue presented in the paper *Practical Protection of Voice-Controllable Devices Against Self-Issued Voice Commands*.

# Instructions

### Before You Start

Before running this code, put the `augmented-dataset.zip` archive in the `/content/` folder. This is the folder that Google Colab opens in the left panel after booting up, i.e., the directory that contains the `sample_data/` folder. The code in this notebook extracts the dataset automatically.

When you first run this on Google Colab, the first instruction upgrades the Pillow package. At some point, the execution will halt with an error similar to:

```
ImportError: cannot import name 'is_directory' from 'PIL._util' (...)
```

**This is normal!**

Just click `Runtime > Restart and Execute All` at the top of the screen to restart the environment; everything should then work correctly.

### Performing the Preprocessing

The augmented dataset is obtained by calling the `preprocess()` function defined below on every sample. Because the augmented samples are already available for download, the lines that call `preprocess()` are not executed by default. If you want to perform the preprocessing yourself, which yields exactly the same files except for augmentation 5 (whose mask positions are random), you will need to:

- Set the `preprocessing_from_scratch` flag to `True`
- Put the `selfissue-dataset-public-v1.zip` archive in the `/content/` folder of your Google Colab machine. This notebook will extract it automatically.
- Put the `augmented-dataset-structure.zip` archive in the `/content/` folder as well. The augmentation does not work properly if the directory tree for the augmented files is missing, and this archive provides all the directories and .csv files. This notebook will extract it automatically **only** if you set the `preprocessing_from_scratch` flag to `True`.
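
Before running the notebook, it can help to confirm that the archives listed above are in place. The following is a minimal sketch, not part of the original code: the helper `missing_archives()` is hypothetical, and only the archive names are taken from these instructions.

```python
from pathlib import Path

# Archive names come from the instructions above; the helper itself is hypothetical.
AUGMENTED = ["augmented-dataset.zip"]
FROM_SCRATCH = ["selfissue-dataset-public-v1.zip", "augmented-dataset-structure.zip"]

def missing_archives(folder, from_scratch=False):
    """Return the required archive names that are not present in `folder`."""
    required = FROM_SCRATCH if from_scratch else AUGMENTED
    return [name for name in required if not (Path(folder) / name).is_file()]

# On Google Colab the folder would be /content/; an empty list means nothing is missing.
print(missing_archives("/content/", from_scratch=False))
```

Calling it with `from_scratch=True` checks for the two archives needed when `preprocessing_from_scratch` is set to `True`.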

### Running this outside of Google Colab (untested)

It should work as long as you have CUDA support. Install the following requirements (other versions of these packages might work as well):

- Librosa 0.8.1 (0.8.0 and 0.9.2 should work as well)
- Matplotlib 3.2.2 (3.0.2 and 3.6.0 should work as well)
- Numba 0.56.3 (0.49.0 and 0.56.2 should work as well)
- Numpy 1.21.6 (1.23.3 should work as well) 
- Pandas 1.3.5 (1.5.0 should work as well)
- Pillow 9.2.0
- Resampy 0.4.2 (0.2.2 should work as well)
- Scikit-Image 0.18.3 (0.19.3 should work as well)
- Scikit-Learn 1.0.2 (1.1.2 should work as well)
- Torch 1.12.1+cu113
- Torchvision 0.13.1+cu113
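
To compare a local environment against the list above, the installed versions can be printed with the standard library. This is a sketch under the assumption that the packages are installed under their usual PyPI distribution names; the helper `installed_versions()` is hypothetical and not part of the original code.

```python
from importlib import metadata

def installed_versions(packages):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions

# Distribution names as published on PyPI (an assumption of this sketch).
REQUIRED = ["librosa", "matplotlib", "numba", "numpy", "pandas", "Pillow",
            "resampy", "scikit-image", "scikit-learn", "torch", "torchvision"]

for name, version in installed_versions(REQUIRED).items():
    print(f"{name}: {version or 'not installed'}")
```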

In [None]:
!sudo pip3 install --upgrade pillow

In [None]:
# Change this value to True if you want to perform the preprocessing again.
preprocessing_from_scratch = False

In [None]:
import os

# Extracts the Dataset Structure only
# Is only executed if the preprocessing flag was manually set to True
if preprocessing_from_scratch:
  if not os.path.exists("/content/augmented-dataset/"):
    if os.path.exists("/content/augmented-dataset-structure.zip"):
      print("Extracting Augmented Dataset Structure...")
      !unzip "/content/augmented-dataset-structure.zip" -d "/content/"
      print("Done!")
    else:
      print("Augmented Dataset Structure file not found. Skipping unzip.")
  else:
    print("Augmented Dataset Structure already extracted. Skipping unzip.")

# Extracts the Augmented Dataset
if not os.path.exists("/content/augmented-dataset/"):
  if os.path.exists("/content/augmented-dataset.zip"):
    print("Extracting Augmented Dataset...")
    !unzip "/content/augmented-dataset.zip" -d "/content/"
    print("Done!")
  else:
    print("Augmented Dataset file not found. Skipping unzip.")
else:
  print("Augmented Dataset already extracted. Skipping unzip.")

# Extracts the Self-Issue Dataset (not the augmented one)
if not os.path.exists("/content/selfissue-dataset-public-v1/"):
  if os.path.exists("/content/selfissue-dataset-public-v1.zip"):
    print("Extracting Self-Issue Dataset...")
    !unzip "/content/selfissue-dataset-public-v1.zip" -d "/content/"
    print("Done!")
  else:
    print("Self-Issue Dataset file not found. Skipping unzip.")
else:
  print("Self-Issue Dataset already extracted. Skipping unzip.")

In [None]:
# Preprocessing and Augmentation Stage

import librosa
import numpy as np
import pandas as pd
import os
import skimage.io
import random

base_path = "/content/augmented-dataset/"

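def scale_minmax(X, out_min=0.0, out_max=1.0):
  # Linearly rescales X to the range [out_min, out_max]; assumes X is not constant
  X_std = (X - X.min()) / (X.max() - X.min())
  X_scaled = X_std * (out_max - out_min) + out_min
  return X_scaled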

def trim(img, offset=0):
  # Drops the first `offset` columns (time frames) of the image
  return img[:, offset:]

def preprocess(audiofile, num, mask1, mask2):
  y, sr = librosa.load(audiofile, sr=None) # Extract samples (y) and sample rate (sr)
  img = generate_mel_spectrogram(y, sr, num)
  audiofilename = audiofile[len("/content/selfissue-dataset-public-v1/"):] # Path relative to the dataset root, e.g. malicious/dw/rec/5-rec.wav
  finalName = os.path.join("/content/augmented-dataset/", audiofilename + ".png") # Final path of the preprocessed sample, e.g. /content/augmented-dataset/malicious/dw/rec/5-rec.wav.png
  print("Saving: " + finalName)
  skimage.io.imsave(finalName, img)
  # Augmentation Starts Here
  # Do not touch img as we need it for the masking augmentation process
  # 1. Pitch Increase
  pitchPlusY = augmentation_pitch(y, sr, 2)
  skimage.io.imsave(finalName+".aug1.png", generate_mel_spectrogram(pitchPlusY, sr, num))
  print("Augmentation 1 OK.")
  # 2. Pitch Decrease
  pitchMinusY = augmentation_pitch(y, sr, -2)
  skimage.io.imsave(finalName+".aug2.png", generate_mel_spectrogram(pitchMinusY, sr, num))
  print("Augmentation 2 OK.")
  # 3. Speed Increase
  speedPlusY = augmentation_speed(y, 1.2)
  skimage.io.imsave(finalName+".aug3.png", generate_mel_spectrogram(speedPlusY, sr, num))
  print("Augmentation 3 OK.")
  # 4. Speed Decrease
  speedMinusY = augmentation_speed(y, 0.8)
  skimage.io.imsave(finalName+".aug4.png", generate_mel_spectrogram(speedMinusY, sr, num))
  print("Augmentation 4 OK.")
  # 5. Frequency Mask (Horizontal)
  maskImg = augmentation_mask_frequency(img, mask1, mask2)
  skimage.io.imsave(finalName+".aug5.png", maskImg)
  print("Augmentation 5 OK.")
  
def generate_mel_spectrogram(y, sr, num):
  mel = librosa.feature.melspectrogram(y=y, sr=sr) # Mel power spectrogram, shape (n_mels, frames)
  mel_db = librosa.power_to_db(mel, ref=np.max) # Convert power to dB, relative to the peak
  img = scale_minmax(mel_db, 0, 255).astype(np.uint8) # Rescale to 8-bit grayscale
  img = np.flip(img, axis=0) # Put low frequencies at the bottom of the image
  img = 255-img # Invert, so that high-energy regions are dark
  if (num == 1):
    img = trim(img, 0) # num == 1: played audio, no trimming
  else:
    img = trim(img, 20) # Otherwise (recorded audio): drop the first 20 frames
  return img

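def augmentation_pitch(data, sampling_rate, pitch_factor):
    # pitch_factor is expressed in semitones; keyword arguments are required by newer librosa releases
    return librosa.effects.pitch_shift(data, sr=sampling_rate, n_steps=pitch_factor)

def augmentation_speed(data, speed_factor):
    # speed_factor > 1 speeds the audio up, speed_factor < 1 slows it down
    return librosa.effects.time_stretch(data, rate=speed_factor)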

def augmentation_mask_frequency(maskImg, mask1, mask2): # Two horizontal masks, 5 rows each
  # Note: the input array is modified in place
  maskImg[mask1:mask1+5,:] = 255
  maskImg[mask2:mask2+5,:] = 255
  return maskImg

# Is only executed if the preprocessing flag was manually set to True
if preprocessing_from_scratch:
  # Preprocess Training Samples
  dataToProcess = pd.read_csv("/content/selfissue-dataset-public-v1/training-example-wav.csv", header=None)
  for i, elem in dataToProcess.iterrows():
    audio1_path = os.path.join("/content/selfissue-dataset-public-v1/", elem[0]) # elem[0] = played audio || elem[1] = recorded audio || elem[2] = label
    audio2_path = os.path.join("/content/selfissue-dataset-public-v1/", elem[1])
    random.seed()
    mask1 = random.randrange(123) # Start row in 0..122; a 5-row mask starting at 122 covers rows 122 to 126 of the 128-row image
    mask2 = random.randrange(123)
    preprocess(audio1_path, 1, mask1, mask2)
    preprocess(audio2_path, 2, mask1, mask2)
  # Preprocess Testing Samples -- Augmented Samples will be discarded during actual testing as they are not included in the testing.csv file within the /augmented-dataset/ directory
  dataToProcess = pd.read_csv("/content/selfissue-dataset-public-v1/testing-example-wav.csv", header=None)
  for i, elem in dataToProcess.iterrows():
    audio1_path = os.path.join("/content/selfissue-dataset-public-v1/", elem[0]) # elem[0] = played audio || elem[1] = recorded audio || elem[2] = label
    audio2_path = os.path.join("/content/selfissue-dataset-public-v1/", elem[1])
    random.seed()
    mask1 = random.randrange(123) # Start row in 0..122; a 5-row mask starting at 122 covers rows 122 to 126 of the 128-row image
    mask2 = random.randrange(123)
    preprocess(audio1_path, 1, mask1, mask2)
    preprocess(audio2_path, 2, mask1, mask2)
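
As a sanity check on the mask indices used for augmentation 5: Python slices exclude their end index, so `maskImg[mask1:mask1+5,:]` overwrites five rows, and `random.randrange(123)` yields start rows from 0 to 122. The sketch below reproduces that arithmetic in plain Python. The helper `masked_rows()` is hypothetical, and the 128-row height assumes librosa's default of 128 mel bands.

```python
def masked_rows(start, height=5, n_rows=128):
    """Rows overwritten by a mask written as img[start:start+height, :]."""
    return list(range(start, min(start + height, n_rows)))

print(masked_rows(0))    # [0, 1, 2, 3, 4]
print(masked_rows(122))  # [122, 123, 124, 125, 126]
```

Even for the largest start row, the mask stays inside the image, so no mask is ever truncated.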

In [None]:
# Dataset Class for our Siamese Network

import librosa.display
import matplotlib.pyplot as plt
import datetime
from pathlib import Path
from PIL import Image
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
import torch
import torch.nn as nn
from torchvision import transforms

class SiameseDataset(torch.utils.data.Dataset):
    def __init__(self,training_csv=None,transform=None):
        # Each CSV row holds: path of image 1, path of image 2, label
        self.train_df = pd.read_csv(os.path.join(base_path, training_csv), header=None)
        self.transform = transform

    def __getitem__(self,index):
        img1_path = os.path.join(base_path, self.train_df.iat[index,0])
        img1 = Image.open(img1_path)
        img1 = img1.convert("L")
        img2_path = os.path.join(base_path, self.train_df.iat[index,1])
        img2 = Image.open(img2_path)
        img2 = img2.convert("L")
        if self.transform is not None:
            img1 = self.transform(img1)
            img2 = self.transform(img2)
        # Returns img1, img2, the label (as a 1-element float tensor), and the related paths
        label = torch.from_numpy(np.array([int(self.train_df.iat[index,2])],dtype=np.float32))
        return img1, img2, label, img1_path, img2_path

    def __len__(self):
        return len(self.train_df)

In [None]:
# Structure of our Siamese Network

class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()

        self.cnn1 = nn.Sequential(
            nn.Conv2d(1, 60, kernel_size=7, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2),
            nn.BatchNorm2d(60),
            nn.Dropout2d(p=.25),

            nn.Conv2d(60, 48, kernel_size=7, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2),
            nn.BatchNorm2d(48),
            nn.Dropout2d(p=.25),

            nn.Conv2d(48, 36, kernel_size=5, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2),
            nn.BatchNorm2d(36),
            nn.Dropout2d(p=.25),

            nn.Conv2d(36, 24, kernel_size=5, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2),
            nn.BatchNorm2d(24),
            nn.Dropout2d(p=.25),

            nn.Conv2d(24, 12, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2),
            nn.BatchNorm2d(12),
            nn.Dropout2d(p=.25)
        )

        self.fc1 = nn.Sequential(
            nn.Linear(456, 300),
            nn.ReLU(inplace=True),

            nn.Linear(300, 100),
            nn.ReLU(inplace=True),

            nn.Linear(100, 20),
            nn.ReLU(inplace=True)
        )
        
    def forward_once(self, x):
        output = self.cnn1(x)
        output = output.view(output.size()[0], -1)
        output = self.fc1(output)
        return output

    def forward(self, input1, input2):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2
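
The input size of the first fully connected layer, `nn.Linear(456, 300)`, follows from the 128x650 images produced by the `transforms.Resize((128,650))` step in the training cell. The sketch below redoes that arithmetic in plain Python using the standard output-size formulas for convolution and pooling. The helper names are hypothetical; the kernel sizes, padding, and channel count are read from `cnn1`.

```python
def conv_out(size, kernel, padding=0, stride=1):
    """Output length of a convolution along one dimension."""
    return (size + 2 * padding - kernel) // stride + 1

def pool_out(size, kernel=2, stride=2):
    """Output length of a max pooling layer along one dimension."""
    return (size - kernel) // stride + 1

def flattened_features(height, width):
    """Number of features after the five conv + pool stages of cnn1."""
    kernels = [7, 7, 5, 5, 3]  # Conv2d kernel sizes, all with padding=1
    channels = 12              # Output channels of the last Conv2d
    for k in kernels:
        height = pool_out(conv_out(height, k, padding=1))
        width = pool_out(conv_out(width, k, padding=1))
    return channels * height * width

print(flattened_features(128, 650))  # 456
```

If the resize dimensions are changed, the first argument of `nn.Linear(456, 300)` has to be recomputed accordingly.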

In [None]:
# Contrastive Loss, i.e. the criterion for our training

class ContrastiveLoss(torch.nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

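    def forward(self, x0, x1, y):
        # y = 0 pulls the pair together; y = 1 pushes it at least `margin` apart
        pairwise_distance = torch.nn.functional.pairwise_distance(x0, x1, keepdim=True)
        similar_term = (1 - y) * torch.pow(pairwise_distance, 2) / 2
        dissimilar_term = y * torch.pow(torch.clamp(self.margin - pairwise_distance, min=0.0), 2) / 2
        loss = torch.mean(similar_term + dissimilar_term)
        return loss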
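
To see how the contrastive loss behaves, the same formula can be evaluated for a single pair in plain Python. This is an illustrative sketch, not part of the original code: `contrastive_loss()` is a hypothetical scalar version of the criterion above, with label 0 for a benign pair and label 1 for a malicious one, and the distances are made up.

```python
def contrastive_loss(distance, label, margin=1.0):
    """Contrastive loss for one pair, given its embedding distance."""
    similar_term = (1 - label) * distance ** 2 / 2
    dissimilar_term = label * max(margin - distance, 0.0) ** 2 / 2
    return similar_term + dissimilar_term

print(contrastive_loss(0.5, 0))  # 0.125: benign pair, penalized for being apart
print(contrastive_loss(0.5, 1))  # 0.125: malicious pair, still inside the margin
print(contrastive_loss(2.0, 1))  # 0.0: malicious pair, already beyond the margin
print(contrastive_loss(2.0, 0))  # 2.0: benign pair, far apart
```

Benign pairs are penalized by their squared distance, while malicious pairs contribute only when their distance is below the margin.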

In [None]:
# Siamese Network Training

from sklearn import metrics
import time

# Initialize network, criterion and optimizer
net = SiameseNetwork().cuda()
criterion = ContrastiveLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=0.00005)

# Define other variables for training
tot_epochs = 100
test_th = 0.4 # Threshold for the classifier

# Load the training dataset
siamese_dataset = SiameseDataset("training.csv", transform = transforms.Compose([transforms.Resize((128,650)), transforms.ToTensor()]))
train_dataloader = torch.utils.data.DataLoader(siamese_dataset,num_workers=1,batch_size=1,shuffle=True)

# Load the validation dataset
test_dataset = SiameseDataset("testing.csv", transform=transforms.Compose([transforms.Resize((128,650)), transforms.ToTensor()]))
test_dataloader = torch.utils.data.DataLoader(test_dataset,num_workers=1,batch_size=1,shuffle=True)

# Training Function
def train():
    counter = []
    train_accuracy = []
    test_accuracy = []
    bestAcc = 0
    for epoch in range(1,tot_epochs+1):
        net.train() # Entering Training Mode
        for i, data in enumerate(train_dataloader,0):
            img0, img1, label, filename1, filename2 = data
            img0, img1, label = img0.cuda(), img1.cuda(), label.cuda()
            optimizer.zero_grad()
            output1,output2 = net(img0,img1)
            loss_contrastive = criterion(output1,output2,label)
            loss_contrastive.backward()
            optimizer.step()
        print("Epoch "+str(epoch))
        counter.append(epoch) # X Axis of the Plot
        net.eval() # Exiting Training Mode, Entering Evaluation Mode
        vLoss, currentAccuracy = validate(train_dataloader, False, test_th) #Validation on training dataset
        train_accuracy.append(currentAccuracy) # Y Axis of the Plot (i)
        print("- Training Accuracy: " + str(currentAccuracy))
        currentTestLoss, testAcc = validate(test_dataloader, False, test_th) # Validation on testing dataset
        test_accuracy.append(testAcc) # Y Axis of the Plot (ii)
        print("- Testing Accuracy: " + str(testAcc))
        if (testAcc > bestAcc):
          bestAcc = testAcc
          torch.save(net.state_dict(), "bestModel.pt") # We save the best performing model so we can use it later
          print("Best Model Updated")
        print("")
    fig, (ax) = plt.subplots(1, 1, figsize=(10,5))
    ax.plot(counter, train_accuracy, 'r-', label='Train Accuracy')
    ax.plot(counter, test_accuracy, 'b-', label='Test Accuracy')
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Accuracy")
    ax.set_ylim(ymin=0)
    ax.legend()
    return net # Returns the trained network after X epochs

# Validation Function
def validate(dataloader, last, threshold):
  # Relies on the global `net` and `device`, and assumes batch_size=1
  tp=0
  tn=0
  fp=0
  fn=0
  loss=0
  with torch.no_grad(): # No gradients are needed during evaluation
    for i, data in enumerate(dataloader,0):
      x0, x1, label, fullname1, fullname2 = data
      output1,output2 = net(x0.to(device),x1.to(device))
      pdist = torch.nn.functional.pairwise_distance(output1, output2)
      label = "Benign." if label.item() == 0 else "Malicious!"
      prediction = "Malicious!" if pdist.item()>=threshold else "Benign."
      if last:
        print("Now evaluating: " + str(fullname1) + " and " + str(fullname2))
        print("Predicted Pairwise Distance: ", pdist.item())
        print("Prediction: ", prediction)
        print("Actual Label: ", label)
        print("")
      loss = pdist.item() # Note: only the distance of the last pair is returned as "loss"
      if prediction == label:
        if label == "Malicious!":
          tp = tp+1
        else:
          tn = tn+1
      else:
        if label == "Malicious!":
          fn = fn+1
        else:
          fp = fp+1
  accuracy = (tp+tn)/(tp+tn+fp+fn)
  if last:
    print("Accuracy: " + str(accuracy))
  return loss, accuracy
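
The decision rule inside `validate()` can be isolated from the network: a pair is predicted malicious when its embedding distance is at least the threshold, and accuracy is the share of correct predictions. The sketch below applies that rule to made-up distances and labels. The helper names are hypothetical, and label 1 (malicious) is treated as the positive class.

```python
def confusion_counts(distances, labels, threshold=0.4):
    """Count (tp, tn, fp, fn), with label 1 (malicious) as the positive class."""
    tp = tn = fp = fn = 0
    for distance, label in zip(distances, labels):
        predicted_malicious = distance >= threshold
        if label == 1:
            if predicted_malicious:
                tp += 1
            else:
                fn += 1
        else:
            if predicted_malicious:
                fp += 1
            else:
                tn += 1
    return tp, tn, fp, fn

def accuracy(tp, tn, fp, fn):
    """Fraction of pairs that were classified correctly."""
    return (tp + tn) / (tp + tn + fp + fn)

# Made-up distances and labels, for illustration only.
counts = confusion_counts([0.1, 0.9, 0.5, 0.2], [0, 1, 0, 1])
print(counts)             # (1, 1, 1, 1)
print(accuracy(*counts))  # 0.5
```

Raising the threshold trades false positives for false negatives, so the value 0.4 is worth revisiting if the dataset changes.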

In [None]:
# Now that we declared all variables and functions, it is time to train our network

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = train() # Training begins!
torch.save(model.state_dict(), "model.pt") # This is the latest model after X epochs of training. It is not necessarily the best one (almost always, it is not!). The best model is saved during training, and is called bestModel.pt
print("Latest Model Saved Successfully")

In [None]:
# Let's see what the best model did, sample by sample
model_to_load = "/content/bestModel.pt" # We first pick the model we want to use
net = SiameseNetwork().cuda() # We initialize the network
net.load_state_dict(torch.load(model_to_load)) # We actually load the model
net.eval() # We enter evaluation mode
validate(test_dataloader, True, 0.4) # Evaluation starts!