# Data utilities

This notebook contains all helper functions required to generate datasets throughout the project.

In [2]:
import cleverhans
import cv2
import itertools
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import os
import pathlib
import re
import tensorflow as tf

from cleverhans.tf2.attacks.projected_gradient_descent import projected_gradient_descent
from cleverhans.tf2.attacks.fast_gradient_method import fast_gradient_method
from cleverhans.tf2.attacks.basic_iterative_method import basic_iterative_method
from cleverhans.tf2.attacks.madry_et_al import madry_et_al
from cleverhans.tf2.attacks.momentum_iterative_method import momentum_iterative_method
from cleverhans.tf2.attacks.carlini_wagner_l2 import carlini_wagner_l2
from cleverhans.tf2.attacks.spsa import spsa
from cleverhans.tf2.utils import optimize_linear, compute_gradient, clip_eta, random_lp_vector
from numpy.random import default_rng
from PIL import Image
from scipy.stats import levene, shapiro, mannwhitneyu
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.model_selection import train_test_split
from statistics import mean
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Conv2D, Dense, Dropout, GlobalAveragePooling2D, Input, Lambda, MaxPooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical

In [3]:
#get mnist dataset
def get_mnist_dataset():
    """
    Load MNIST through Keras and normalise the images.

    Returns:
        Four np_arrays: mnist_x_train, mnist_x_test, mnist_y_train, mnist_y_test.
        The x arrays hold the images (divided by 255.0) and the y arrays hold
        the matching labels exactly as returned by Keras.
    """
    train_split, test_split = tf.keras.datasets.mnist.load_data()
    train_images, train_labels = train_split
    test_images, test_labels = test_split

    # normalise: every pixel value is divided by 255.0
    pixel_scale = 255.0
    return (
        train_images / pixel_scale,
        test_images / pixel_scale,
        train_labels,
        test_labels,
    )

def get_traffic_dataset():
    """
    Retrieves the normalised GTSRB dataset in an 80/20 train/test split.

    Images are read from ``<current working directory>/traffic/Train/<class id>``
    for class ids 0..42. At most 100 images are taken per class; each one is
    resized to 30x30. Files that cannot be loaded are reported with a printed
    message and skipped (best effort).

    Returns:
        Four np_arrays: traffic_x_train, traffic_x_test, traffic_y_train, traffic_y_test.
        The x np_arrays contain images divided by 255.0, and the y np_arrays
        contain the one-hot encoded image labels (43 classes).
    """
    traffic_path = os.path.join(os.getcwd(), 'traffic')
    traffic_classes = 43
    max_images_per_class = 100
    traffic_x = []
    traffic_y = []

    for class_id in range(traffic_classes):
        class_dir = os.path.join(traffic_path, 'Train', str(class_id))
        img_num = 0
        for file_name in os.listdir(class_dir):
            try:
                # os.path.join keeps the path portable; a hard-coded '\\'
                # separator only resolves on Windows.
                with Image.open(os.path.join(class_dir, file_name)) as image:
                    pixels = np.array(image.resize((30, 30)))
            except Exception:
                # Deliberate best effort: skip unreadable files, but unlike a
                # bare except this lets KeyboardInterrupt/SystemExit through.
                print("Error loading image")
                continue
            traffic_x.append(pixels)
            traffic_y.append(class_id)
            img_num += 1
            if img_num >= max_images_per_class:
                break

    #Converting lists into numpy arrays
    traffic_x = np.array(traffic_x)
    traffic_y = np.array(traffic_y)
    #Splitting training and testing dataset (fixed seed for a reproducible split)
    traffic_x_train, traffic_x_test, traffic_y_train, traffic_y_test = train_test_split(
        traffic_x, traffic_y, test_size=0.2, random_state=42)
    #Converting the labels into one hot encoding
    traffic_y_train = to_categorical(traffic_y_train, traffic_classes)
    traffic_y_test = to_categorical(traffic_y_test, traffic_classes)

    #normalising
    traffic_x_test = traffic_x_test / 255.0
    traffic_x_train = traffic_x_train / 255.0

    return traffic_x_train, traffic_x_test, traffic_y_train, traffic_y_test

def get_speech_dataset():
    """
    Retrieves the mini speech commands dataset as spectrogram arrays.

    The dataset is looked for in ``./mini_speech_commands`` first, then in
    ``./data/mini_speech_commands``. If neither exists the archive is
    downloaded and extracted under ``./data``.

    The shuffled file list is split positionally: the first 6400 files are the
    training set and the last 800 files are the test set. The 800 files in
    between (the validation slice) are only counted in the printed summary.

    Returns:
        Four np_arrays: speech_x_train, speech_x_test, speech_y_train, speech_y_test.
        The x np_arrays contain spectrograms (with a trailing channel axis), and
        the y np_arrays contain the index of each clip's label in the list of
        command folders.
    """
    DATASET_PATH = 'mini_speech_commands'

    data_dir = pathlib.Path(DATASET_PATH)
    if not data_dir.exists():
        # get_file below is told to cache under ./data, so after a download the
        # dataset is expected at ./data/mini_speech_commands rather than in the
        # working directory.
        # NOTE(review): newer Keras releases may extract into a differently
        # named sub-folder -- confirm against the installed version.
        data_dir = pathlib.Path('data') / DATASET_PATH
        if not data_dir.exists():
            tf.keras.utils.get_file(
                'mini_speech_commands.zip',
                origin="http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
                extract=True,
                cache_dir='.', cache_subdir='data')

    # every sub-folder of the dataset is one command (label)
    commands = np.array(tf.io.gfile.listdir(str(data_dir)))
    commands = commands[(commands != 'README.md') & (commands != '.DS_Store')]
    print('Commands:', commands)

    filenames = tf.io.gfile.glob(str(data_dir) + '/*/*')
    filenames = tf.random.shuffle(filenames)
    num_samples = len(filenames)
    print('Number of total examples:', num_samples)
    print('Number of examples per label:',
          len(tf.io.gfile.listdir(str(data_dir/commands[0]))))
    print('Example file tensor:', filenames[0])

    train_files = filenames[:6400]
    val_files = filenames[6400: 6400 + 800]
    test_files = filenames[-800:]

    print('Training set size', len(train_files))
    print('Validation set size', len(val_files))
    print('Test set size', len(test_files))

    def decode_audio(audio_binary):
        """
        Decode a binary WAV file to a float tensor.

        Params:
            audio_binary: contents of a WAV file.

        Return: Float tensor of the audio with the last (channel) axis squeezed out.
        """
        audio, _ = tf.audio.decode_wav(audio_binary)
        return tf.squeeze(audio, axis=-1)

    def get_label(file_path):
        """
        Get the label of an audio file.

        Params:
            String tensor: file_path.

        Return: the second-to-last path component (the parent folder name).
        """
        parts = tf.strings.split(file_path, os.path.sep)

        return parts[-2]

    def get_waveform_and_label(file_path):
        """
        Get the waveform of an audio file as a float tensor, and the corresponding label.

        Params:
            String: file_path.

        Returns:
            Float tensor: waveform.
            String: label.
        """
        label = get_label(file_path)
        audio_binary = tf.io.read_file(file_path)
        waveform = decode_audio(audio_binary)
        return waveform, label

    def get_spectrogram(waveform):
        """
        Retrieves the spectrogram (STFT magnitude) of an audio waveform.

        Params:
            Float tensor: waveform.

        Returns:
            Float tensor: spectrogram.
        """
        # Padding for files with less than 16000 samples.
        # assumes no clip is longer than 16000 samples; a longer one would
        # produce a negative padding shape -- TODO confirm
        zero_padding = tf.zeros([16000] - tf.shape(waveform), dtype=tf.float32)

        # Concatenate audio with padding so that all audio clips will be of the
        # same length
        waveform = tf.cast(waveform, tf.float32)
        equal_length = tf.concat([waveform, zero_padding], 0)
        spectrogram = tf.signal.stft(
            equal_length, frame_length=255, frame_step=128)

        # keep only the magnitude of the complex STFT output
        return tf.abs(spectrogram)

    def get_spectrogram_and_label_id(audio, label):
        """
        Retrieves the spectrogram image and integer label id of an audio clip.

        Params:
            Float tensor: audio.
            String: label.

        Returns:
            Float tensor: spectrogram, with a trailing axis added.
            Integer tensor: label_id, the position of label within commands.
        """
        spectrogram = get_spectrogram(audio)
        spectrogram = tf.expand_dims(spectrogram, -1)
        label_id = tf.argmax(label == commands)
        return spectrogram, label_id

    AUTOTUNE = tf.data.experimental.AUTOTUNE

    def preprocess_dataset(files):
        """
        Convert a set of files into a dataset of spectrogram images and label ids.

        Params:
            files: tensor (or list) of file paths.

        Returns:
            tf.data.Dataset: output_ds, yielding (spectrogram, label_id) pairs.
        """
        files_ds = tf.data.Dataset.from_tensor_slices(files)
        output_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)
        output_ds = output_ds.map(
            get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)
        return output_ds

    def dataset_to_arrays(dataset):
        """
        Materialise a (spectrogram, label_id) dataset into two np_arrays.
        """
        features, labels = [], []
        for audio, label in dataset:
            features.append(audio.numpy())
            labels.append(label.numpy())
        return np.array(features), np.array(labels)

    #generate the datasets (the validation slice is not turned into a dataset
    #because nothing is returned from it)
    train_ds = preprocess_dataset(train_files)
    test_ds = preprocess_dataset(test_files)

    #convert the datasets into our test and train splits
    speech_x_test, speech_y_test = dataset_to_arrays(test_ds)
    speech_x_train, speech_y_train = dataset_to_arrays(train_ds)

    return speech_x_train, speech_x_test, speech_y_train, speech_y_test

def read_image(filename, byteorder='>'):
    """
    Read a raw (binary, "P5") PGM image as a numpy array.

    Params:
        String: filename. Path of the PGM file.
        String: byteorder. numpy byte-order prefix ('>' or '<'); only used
            when the file's maxval is 256 or more (two bytes per pixel).

    Returns:
        np_array of shape (height, width). It is built with np.frombuffer over
        the file contents, so it is read-only; copy it before modifying.

    Raises:
        ValueError: if the file does not start with a raw PGM header.
    """
    #open the image
    with open(filename, 'rb') as f:
        buffer = f.read()

    #extract the header, width, height and maxval; '#' comment lines are
    #allowed between the header fields. Raw byte literals keep the regex
    #escapes (\s, \d) from being treated as invalid string escapes.
    match = re.search(
        rb"(^P5\s(?:\s*#.*[\r\n])*"
        rb"(\d+)\s(?:\s*#.*[\r\n])*"
        rb"(\d+)\s(?:\s*#.*[\r\n])*"
        rb"(\d+)\s(?:\s*#.*[\r\n]\s)*)", buffer)
    if match is None:
        raise ValueError("Not a raw PGM file: '%s'" % filename)
    header, width, height, maxval = match.groups()
    width, height = int(width), int(height)

    #convert to numpy array: pixel data starts right after the header
    pixel_dtype = 'u1' if int(maxval) < 256 else byteorder + 'u2'
    return np.frombuffer(buffer,
                         dtype=pixel_dtype,
                         count=width * height,
                         offset=len(header)
                         ).reshape((height, width))

def get_face_data(filepath,size=2, total_sample_size=10000):
    """
    Retrieve the face dataset as genuine and imposite image pairs.

    Images are read from ``<filepath>/s<subject>/<index>.pgm`` with subjects
    1..40 and image indices 1..10. With default params this will return
    20000 image pairs (10000 genuine followed by 10000 imposite).

    Params:
        string: filepath. Root folder of the face dataset.
        int: size. Down-sampling step; every size-th row and column is kept.
        int: total_sample_size. Number of pairs of each kind. It should be a
            multiple of 40; otherwise the trailing rows are never filled
            and stay all-zero.
    Returns:
        np_array: X. Shape [2 * total_sample_size, 2, dim1, dim2, 1], divided by 255.
        np_array: Y. Shape [2 * total_sample_size, 1]; 1 for genuine pairs, 0 for imposite pairs.
    """
    def load_face(subject, index):
        #read one image from the dataset root and reduce its size
        path = os.path.join(filepath, 's' + str(subject), str(index) + '.pgm')
        return read_image(path)[::size, ::size]

    #read one image to find out the reduced dimensions
    probe = load_face(1, 1)
    dim1 = probe.shape[0]
    dim2 = probe.shape[1]

    #initialize the numpy array with the shape of [total_sample, no_of_pairs, dim1, dim2, 1]
    x_genuine_pair = np.zeros([total_sample_size, 2, dim1, dim2, 1])  # 2 is for pairs
    y_genuine = np.zeros([total_sample_size, 1])

    count = 0
    for i in range(40):
        for j in range(total_sample_size // 40):
            ind1 = 0
            ind2 = 0

            #pick two different images from the same directory (genuine pair)
            while ind1 == ind2:
                ind1 = np.random.randint(10)
                ind2 = np.random.randint(10)

            #store the images to the initialized numpy array
            x_genuine_pair[count, 0, :, :, 0] = load_face(i + 1, ind1 + 1)
            x_genuine_pair[count, 1, :, :, 0] = load_face(i + 1, ind2 + 1)

            #as we are drawing images from the same directory we assign label as 1. (genuine pair)
            y_genuine[count] = 1
            count += 1

    #labels of imposite pairs are 0, which np.zeros already provides
    x_imposite_pair = np.zeros([total_sample_size, 2, dim1, dim2, 1])
    y_imposite = np.zeros([total_sample_size, 1])

    count = 0
    for i in range(total_sample_size // 10):
        for j in range(10):

            #pick two different directories (imposite pair)
            while True:
                ind1 = np.random.randint(40)
                ind2 = np.random.randint(40)
                if ind1 != ind2:
                    break

            #the same image index j+1 is read from both directories
            x_imposite_pair[count, 0, :, :, 0] = load_face(ind1 + 1, j + 1)
            x_imposite_pair[count, 1, :, :, 0] = load_face(ind2 + 1, j + 1)
            count += 1

    #now, concatenate, genuine pairs and imposite pair to get the whole data
    X = np.concatenate([x_genuine_pair, x_imposite_pair], axis=0)/255
    Y = np.concatenate([y_genuine, y_imposite], axis=0)

    return X, Y

def preprocess_image(image, size):
    """
    Down-sample an image by keeping every size-th row and every size-th column.

    Params:
        np_array: image.
        int: size. The step used along the first two axes.

    Returns:
        np_array: the strided selection of image.
    """
    step = slice(None, None, size)
    return image[step, step]

In [8]:
def make_pairs_1(x, y):
    """
    Build a paired dataset containing every ordered combination of two samples
    (each sample is also paired with itself).

    Params:
        np_array: x.
        np_array: y.

    Returns:
        np_array: x_pairs. One [first, second] entry per combination.
        np_array: y_pairs. 1 where both labels are equal, otherwise 0.
    """
    samples = list(zip(x, y))

    images = []
    matches = []
    for (first_img, first_label), (second_img, second_label) in itertools.product(samples, repeat=2):
        images.append([first_img, second_img])
        matches.append(int(first_label == second_label))

    return (np.array(images), np.array(matches))

In [9]:
def make_pairs_2(x, y,multiplier=25):
    """
    Make a balanced paired dataset: for every data point, each of the
    `multiplier` passes adds one positive pair (same label) and one negative
    pair (different label).

    The partner of a positive pair is drawn from all samples with the same
    label, so a sample may be paired with itself.

    Params:
        np_array: x.
        np_array: y. One-dimensional labels; any values are accepted, they do
            not have to be 0..num_classes-1.
        int: multiplier. Number of passes over the dataset.

    Returns:
        np_array: pair images, 2 * multiplier * len(x) entries of [image, partner].
        np_array: pair labels of shape (2 * multiplier * len(x), 1); 1 for a
            positive pair, 0 for a negative pair.

    Raises:
        ValueError: (from np.random.choice) if some label has no sample with a
            different label, e.g. when y contains a single class.
    """
    # initialize two empty lists to hold the (image, image) pairs and
    # labels to indicate if a pair is positive or negative
    pair_images = []
    pair_labels = []
    # For every label, look up once which samples share it and which do not.
    # Keying by the label value (instead of using it as a list position) keeps
    # this correct for label sets that are not exactly 0..num_classes-1.
    class_labels = np.unique(y)
    same_class = {label: np.where(y == label)[0] for label in class_labels}
    other_class = {label: np.where(y != label)[0] for label in class_labels}

    for _ in range(multiplier): #cycle through this process several times to generate a large dataset
        # loop over all images
        for idxA in range(len(x)):
            current_image = x[idxA]
            label = y[idxA]
            # randomly pick an image that belongs to same class
            pos_image = x[np.random.choice(same_class[label])]
            pair_images.append([current_image, pos_image])
            pair_labels.append([1])
            # randomly pick an image whose label differs from the current one
            neg_image = x[np.random.choice(other_class[label])]
            pair_images.append([current_image, neg_image])
            pair_labels.append([0])
    # return a 2-tuple of our image pairs and labels
    return (np.array(pair_images), np.array(pair_labels))

In [10]:
def get_dataset_sample(x,y,size=400):
    """
    Get a small sample of a given dataset, ensuring that all classes are represented at least once.

    The first occurrence of every label is always selected; the rest of the
    sample is drawn at random, without replacement, from the rows that were
    not already selected, so no row appears twice.

    Params:
        np_array: x.
        np_array: y. One-dimensional labels.
        int: size. The size of the dataset that will be returned

    Returns:
        np_array: x[all_indices]. The sample of x values (per-class picks first).
        np_array: y[all_indices]. The corresponding y values.

    Raises:
        ValueError: if size is smaller than the number of classes, or larger
            than the number of rows in x.
    """
    #select one data item for each label, ensuring that all classes are represented at least once in the sample
    unique_labels, selected_indices = np.unique(y, return_index=True)

    remaining = size - len(unique_labels)
    if remaining < 0:
        raise ValueError(
            "size (%d) is smaller than the number of classes (%d)" % (size, len(unique_labels)))

    #draw the rest only from rows that have not been selected yet
    candidates = np.setdiff1d(np.arange(x.shape[0]), selected_indices)
    if remaining:
        random_indices = np.random.choice(candidates, remaining, replace=False)
    else:
        random_indices = candidates[:0]

    all_indices = np.concatenate((selected_indices, random_indices))

    return x[all_indices], y[all_indices]

def get_siamese_dataset(x,y,expand_dims=True, size=400, one_hot=False,dataset_style=2):
    """
    Draw a sample from a dataset and turn it into a siamese (paired) dataset.

    Params:
        np_array: x.
        np_array: y.
        bool: expand_dims. True if the sampled x values need an extra trailing dimension (like for MNIST).
        int: size. The size of the sample the pairs are built from.
        bool: one_hot. True if the y labels are one-hot encoded; they are converted to class indices first.
        int: dataset_style. 1 pairs every sample with every sample (make_pairs_1);
            any other value builds the balanced dataset (make_pairs_2 with multiplier=25).

    Returns:
        (np_array, np_array): (x_siamese, y_siamese). The siamese paired dataset.
    """
    # column position of the 1 in every one-hot row
    labels = np.where(y==1)[1] if one_hot else y

    x_sample, y_sample = get_dataset_sample(x, labels, size=size)
    if expand_dims:
        x_sample = np.expand_dims(x_sample, axis=-1)

    if dataset_style != 1:
        x_siamese, y_siamese = make_pairs_2(x_sample, y_sample, multiplier=25)
    else:
        x_siamese, y_siamese = make_pairs_1(x_sample, y_sample)

    return (x_siamese, y_siamese)

def resize_data(x,np_array=True): #for resizing speech images to 32x32
    """
    Resize every image of a dataset to 32x32 (written for the speech dataset
    images) and give each resized image a trailing axis.

    Params:
        np_array: x.
        bool: np_array. True to get the result back as an np_array; otherwise a list is returned.

    Returns:
        np_array or list: x_resized.
    """
    target_size = (32, 32)
    x_resized = [
        np.expand_dims(
            cv2.resize(sample, dsize=target_size, interpolation=cv2.INTER_LINEAR),
            axis=-1,
        )
        for sample in x.copy()
    ]
    return np.asarray(x_resized) if np_array else x_resized

def get_siamese_datasets(train_datasets,test_datasets,expand_dims=False, one_hot=False,dataset_style=2,size=400):
    """
    Convert the first four adversarial datasets into siamese paired datasets.

    Params:
        list: train_datasets. List of pairs of np_arrays (x, y) for each dataset.
        list: test_datasets. List of pairs of np_arrays (x, y) for each dataset.
        bool: expand_dims. True if image dimensions need expanding (MNIST).
        bool: one_hot. True if y labels are one-hot encoded (traffic).
        int: dataset_style. 1 to generate unbalanced dataset, 2 for balanced dataset.
        int: size. Sample size used for each training dataset; each test
            dataset uses int(size/2).

    Returns:
        list: siamese_train_datasets. List of [x, y] np_array pairs, one per dataset.
        list: siamese_test_datasets. List of [x, y] np_array pairs, one per dataset.
    """
    siamese_train_datasets = []
    siamese_test_datasets = []
    shared_options = dict(expand_dims=expand_dims, one_hot=one_hot, dataset_style=dataset_style)

    # exactly four datasets are converted, in list order
    # (presumably FGSM, BIM, PGD and MIM -- verify against the caller)
    for position in range(4):
        x_train_siamese, y_train_siamese = get_siamese_dataset(
            train_datasets[position][0], train_datasets[position][1],
            size=size, **shared_options)
        x_test_siamese, y_test_siamese = get_siamese_dataset(
            test_datasets[position][0], test_datasets[position][1],
            size=int(size/2), **shared_options)
        siamese_train_datasets.append([x_train_siamese, y_train_siamese])
        siamese_test_datasets.append([x_test_siamese, y_test_siamese])

    return siamese_train_datasets,siamese_test_datasets

def save_dataset(dataset,filename):
    """
    Save a dataset with pickle.

    Params:
        list: dataset. List containing two numpy arrays, one for x and one for y data.
        string: filename. The name of the file the dataset will be saved as;
            the '.pkl' extension is appended automatically.
    """
    # imported here because pickle is not among the notebook's top-level imports
    import pickle

    with open(str(filename) + '.pkl', 'wb') as f:
        pickle.dump(dataset, f)
        
def load_dataset(filename):
    """
    Load a dataset saved with pickle.

    Params:
        string: filename. The name of the dataset to load, without the
            '.pkl' extension (it is appended automatically).

    Returns:
        The unpickled object.

    Raises:
        OSError: (e.g. FileNotFoundError) if the file cannot be opened.
        Errors raised by pickle.load are propagated to the caller; no error
        value is returned.
    """
    # imported here because pickle is not among the notebook's top-level imports
    import pickle

    # NOTE(review): unpickling can execute arbitrary code -- only load files
    # that come from a trusted source.
    with open(str(filename) + '.pkl', 'rb') as f:
        return pickle.load(f)