Imports

In [None]:
from datetime import datetime
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import random
import mat73

print(tf.__version__)
# Test if tensorflow is using GPU
if tf.test.gpu_device_name():
    print('Default GPU Device: {}'.format(tf.test.gpu_device_name()))
else:
    print("Please install GPU version of TF")

%config Completer.use_jedi = False
%matplotlib inline

Functions

In [None]:
def standardize_features(bands, std=0.0, featureMeans=None, featureSTDs=None):
    if featureMeans is None:
        featureMeans = np.mean(
            bands,
            axis=(0, 1))
        featureSTDs = np.std(
            bands,
            axis=(0, 1))

    if std > 0:
        rng = np.random.default_rng()
        gaussianNoise = std * rng.standard_normal(size=bands.shape)
        return (bands - featureMeans) / featureSTDs + gaussianNoise, featureMeans, featureSTDs
    else:
        return (bands - featureMeans) / featureSTDs, featureMeans, featureSTDs


def read_features_and_labels(directory, mult=1, add_labels=180):
    featureVectors = mat73.loadmat('{}/featureVectors.mat'.format(directory))
    featureVectors = featureVectors['featureVectors']
    labels = mat73.loadmat('{}/labels.mat'.format(directory))
    labels = mult*labels['labels']

    # Keep only the indices corresponding to frontal horizontal plane DoAs
    ########################################################################
    front_indices = np.where((labels <= 90) & (labels >= -90))
    labels = labels[front_indices]
    featureVectors = np.squeeze(featureVectors[front_indices, :, :, :])
    ########################################################################

    numSegs = featureVectors.shape[0]
    featureVectors = np.reshape(
        np.swapaxes(featureVectors, 0, 2),
        (32, numSegs*99, 34),
        order='F'
    )

    labels = np.repeat(labels, 99)
    # One-Hot Encode
    categorical_labels = tf.keras.utils.to_categorical(
        (labels+add_labels)/5, num_classes=72)
    return featureVectors, categorical_labels


def split_helper(vectors, t_frames, file_numbers, labels):
    x = np.zeros((32, len(file_numbers)*t_frames, 34))
    y = np.zeros((len(file_numbers)*t_frames, 72))
    for i in range(len(file_numbers)):
        x[:, i*t_frames:(i+1)*t_frames, :] = vectors[
            :,
            file_numbers[i]*t_frames:(file_numbers[i]+1)*t_frames,
            :]
        y[i*t_frames:(i+1)*t_frames, :] = labels[
            file_numbers[i]*t_frames:(file_numbers[i]+1)*t_frames, :]
    return x, y


def train_val_split(bandsGaussNorm, categorical_labels):
    t_frames = 99
    filesNum = int(bandsGaussNorm.shape[1]/t_frames)
    trainFileNumbers = set(range(filesNum))
    valFileNumbers = set()

    percentageSplit = 0.1
    sampleSize = percentageSplit * len(trainFileNumbers)
    answerSize = 0

    # Randomly select 10% of the total files included, where files is a unique
    # permutation of the outer product (Azimuths X Voice_Samples X SNRs)
    random.seed(42)
    while answerSize < sampleSize:
        r = random.randint(0, filesNum-1)
        if r not in valFileNumbers:
            answerSize += 1
            valFileNumbers.add(r)

    trainFileNumbers -= valFileNumbers
    trainFileNumbers = list(trainFileNumbers)
    valFileNumbers = list(valFileNumbers)

    x_train, y_train = split_helper(bandsGaussNorm,
                                    t_frames,
                                    trainFileNumbers,
                                    categorical_labels)
    x_val, y_val = split_helper(bandsGaussNorm,
                                t_frames,
                                valFileNumbers,
                                categorical_labels)

    return x_train, y_train, x_val, y_val


def read_train_wrapper(directory):
    bands, categorical_labels = read_features_and_labels(directory)
    bandsGaussNorm, fMeans, fStds = standardize_features(bands, std=0.4)
    x_train, y_train, x_val, y_val = train_val_split(bandsGaussNorm, categorical_labels)
    return x_train, y_train, x_val, y_val, fMeans, fStds


def read_test_wrapper(directory, fMeans, fStds, mult, add_labels):
    bands, y_test = read_features_and_labels(directory, mult=mult, add_labels=add_labels)
    x_test, _, _ = standardize_features(bands, featureMeans=fMeans, featureSTDs=fStds)
    return x_test, y_test

Read Training Data

In [None]:
# Specify directory of featureVectors.mat and labels.mat
dir_in = ''
x_train, y_train, x_val, y_val, fMeans, fStds = read_train_wrapper(dir_in)

In [None]:
# Save statistics to use for testing later
np.save('fMeans.npy', fMeans)
np.save('fStd.npy', fStds)

Create and compile models

In [None]:
def createModel():
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Input(34))
    model.add(tf.keras.layers.Dense(128, activation='sigmoid'))
    model.add(tf.keras.layers.Dense(128, activation='sigmoid'))
    model.add(tf.keras.layers.Dense(72, activation='softmax'))

    return model


def scheduler(epoch, lr):
    # Reduce LR every epoch by 0.05 and keep steady after 20 epochs
    if epoch >= 20:
        return 0.05
    else:
        return -0.05*epoch + 1.00


# Used for debuggin only
class EpochBeginCallback(tf.keras.callbacks.Callback):
    def on_epoch_begin(self, epoch, logs=None):
        print('Epoch begins at: {:%H:%M:%S}'.format(
            datetime.now()))
        print(self.model.optimizer.learning_rate)


tf.keras.backend.clear_session
models = []
for i in range(32):
    models.append(createModel())
    models[i].compile(
        optimizer=tf.keras.optimizers.SGD(
            learning_rate=1.0, momentum=0.5, nesterov=False, name='SGD'),
        loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
        metrics=['accuracy'])

In [None]:
BATCH_SIZE = 128
EPOCHS = 25

# ebc = EpochBeginCallback()
LRScheduler = tf.keras.callbacks.LearningRateScheduler(scheduler)
callbacks = [LRScheduler,
             tf.keras.callbacks.EarlyStopping(
                 monitor='val_loss', min_delta=0, patience=20, verbose=1)]

history = []
for freqBand in range(32):
    print('Training model {} begins at: {:%H:%M:%S}'.format(
        freqBand+1, datetime.now()))
    history.append(
        models[freqBand].fit(
            x=x_train[freqBand, :],
            y=y_train,
            validation_data=(x_val[freqBand, :], y_val),
            epochs=EPOCHS,
            batch_size=BATCH_SIZE,
            callbacks=[callbacks],
            verbose=1))

Save Models as h5 files for future use

In [None]:
expNumSave = 5
for freqBand in range(32):
    models[freqBand].save(
        'Models_History/Ma_2017/Experiment_{}_Epochs_{}/freqBand_{}.h5'.format(
            expNumSave, EPOCHS, freqBand+1))

Load Models

In [None]:
models = []
history = []
for freqBand in range(32):
    models.append(tf.keras.models.load_model(
        r'G:\My Drive\AudioGroup\PhD_Code\binaural_compression_DoA_estimation\jup_notebooks\Models_History\Ma_2017\Experiment_4_Epochs_25/freqBand_{}.h5'.format(freqBand+1)))

In [None]:
fMeans = np.load('fMeans.npy')
fStds = np.load('fStd.npy')

In [None]:
# Specify directory of featureVectors.mat and labels.mat for the test set
test_dir = ''
x_test, y_test = read_test_wrapper(test_dir, fMeans, fStds, -1, 180)

Evaluate Model

In [None]:
# Evaluate per frame accuracy
acc = np.zeros((32, 1))
for fBand in range(32):
    print('Band: {}'.format(fBand))
    _, acc[fBand] = models[fBand].evaluate(x_test[fBand, :, :], y_test)

In [None]:
# Indicate to the model that DoAs are only in the frontal horizontal plane
p_front = np.zeros((72))
p_front[18:55] = np.ones((1, 55-18))

# For equiprobable DoA
# p_front = np.ones((72))

t_frames = 99
correctCount = 0
totalCount = int(x_test.shape[1] / t_frames)
azPreds = np.zeros((totalCount, 1))
azTrue = np.zeros((totalCount, 1))

for fNum in range(totalCount):
    fileToEstimate = x_test[:, fNum*t_frames:(fNum+1)*t_frames]
    y_preds = np.zeros((32, t_frames, 72))
    for freqBand in range(32):
        y_preds[freqBand] = models[freqBand].predict(fileToEstimate[freqBand, :, :])

    tmp = p_front * np.prod(y_preds, axis=0)  # Product accros all frequencies dims: [t_frames,72]
    freqIntegral = tmp / np.sum(tmp, axis=1)[:, None]  # Integrate accross frequency, dims:[t_frames,72]
    timeIntegral = np.sum(freqIntegral, axis=0) / t_frames

    azPreds[fNum] = np.argmax(timeIntegral)
    azTrue[fNum] = np.argmax(y_test[fNum*t_frames])

    if np.argmax(timeIntegral) == np.argmax(y_test[fNum*t_frames]):
        correctCount += 1

print('Localization Accuracy: {:.2f}%'.format(100*correctCount / totalCount))