In [None]:
import os
import numpy as np
np.random.seed(0)
import matplotlib.pyplot as plt
%matplotlib inline
from pylab import *
from keras.models import Sequential
from keras.optimizers import Adam
from keras.layers import Conv2D, ZeroPadding2D, Activation, Input, concatenate
from keras.models import Model
from keras.datasets import mnist


from keras.layers.pooling import MaxPooling2D
from keras.layers.core import Lambda, Flatten, Dense
from keras.initializers import glorot_uniform,he_uniform

# from keras.engine.topology import Layer
from tensorflow.keras.layers import Layer, InputSpec
from keras.regularizers import l2
from keras import backend as K
from keras.utils import plot_model,normalize

from sklearn.metrics import roc_curve,roc_auc_score

In [None]:
!pip install librosa
!pip install pydub

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1


In [None]:
# Mount Google Drive at /content/drive; the data-loading cell reads the audio
# chunks from a folder under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# COMPARISON METRICS

In [None]:
##### METRICS ########

import numpy as np

def calc_euclidean(actual, predic):
    """Return the Euclidean (L2) distance between `actual` and `predic`.

    The element-wise difference is squared, summed over every element and
    square-rooted, so the result is a single scalar regardless of input rank.
    """
    delta = actual - predic
    squared_total = np.sum(delta ** 2)
    return np.sqrt(squared_total)

def calc_mape(actual, predic):
    """Return the mean absolute relative error of `predic` against `actual`.

    Each error is divided by the matching `actual` value, so the result is a
    fraction (not multiplied by 100). Zeros in `actual` produce inf/nan.
    """
    relative_error = (actual - predic) / actual
    return np.mean(np.abs(relative_error))
    
def calc_correlation(actual, predic):
    """Return the Pearson correlation coefficient between `actual` and `predic`.

    Both inputs are mean-centred; the sum of their element-wise product is then
    divided by the product of their L2 norms. A constant input gives a zero
    norm and therefore a nan result.
    """
    centered_actual = actual - np.mean(actual)
    centered_predic = predic - np.mean(predic)

    norm_actual = np.sqrt(np.sum(centered_actual ** 2))
    norm_predic = np.sqrt(np.sum(centered_predic ** 2))
    cross_term = np.sum(centered_actual * centered_predic)

    return cross_term / (norm_actual * norm_predic)

def compute_similarity(ref_rec, input_rec, weightage=(1, 1, 1)):
    """Score how far `input_rec` is from `ref_rec` in time, frequency and power.

    NOTE: despite the name, the returned value is a weighted sum of absolute
    differences, so it behaves like a distance: it is 0.0 when both recordings
    are identical and grows as they diverge. It is also not symmetric, because
    every term is measured relative to `ref_rec`.

    Parameters
    ----------
    ref_rec : 1-D array-like
        Reference recording.
    input_rec : 1-D array-like
        Recording to compare; must have the same length as `ref_rec`.
    weightage : sequence of three numbers, optional
        Weights for the (time, frequency, power) terms, in that order.

    Returns
    -------
    float
        weightage[0]*diff_time + weightage[1]*diff_freq + weightage[2]*diff_power

    Raises
    ------
    ValueError
        If the two recordings are not 1-D arrays of equal length.
    """
    ref_rec = np.asarray(ref_rec)
    input_rec = np.asarray(input_rec)
    # np.correlate in its default 'valid' mode only yields a single value when
    # both inputs have the same length; anything else cannot be reduced to one
    # score, so fail early with a clear message.
    if ref_rec.ndim != 1 or input_rec.ndim != 1 or ref_rec.shape != input_rec.shape:
        raise ValueError(
            "ref_rec and input_rec must be 1-D arrays of equal length, got shapes "
            "{0} and {1}".format(ref_rec.shape, input_rec.shape)
        )

    ## Time domain: zero-lag auto-correlation vs. zero-lag cross-correlation
    ref_time = np.correlate(ref_rec, ref_rec)
    inp_time = np.correlate(ref_rec, input_rec)
    diff_time = abs(ref_time - inp_time)

    ## Freq domain: same comparison on the spectra (reference FFT computed once)
    ref_spectrum = np.fft.fft(ref_rec)
    inp_spectrum = np.fft.fft(input_rec)
    ref_freq = np.correlate(ref_spectrum, ref_spectrum)
    inp_freq = np.correlate(ref_spectrum, inp_spectrum)
    diff_freq = abs(ref_freq - inp_freq)

    ## Power: difference of the summed squared samples
    ref_power = np.sum(ref_rec ** 2)
    inp_power = np.sum(input_rec ** 2)
    diff_power = abs(ref_power - inp_power)

    score = weightage[0] * diff_time + weightage[1] * diff_freq + weightage[2] * diff_power
    # `score` is a length-1 array; index it explicitly because float() on an
    # array with ndim > 0 is deprecated in recent NumPy releases.
    return float(score[0])

# DATA LOADING

In [None]:
###### DATA LOADING ########
import os
import re

import librosa
import pandas as pd

# Folder (on the mounted Drive) holding the split audio chunks.
AUDIO_DIR = "/content/drive/MyDrive/ROB 590/Audio files/4_RovAud_split"


def _natural_key(name):
    """Sort key that orders embedded numbers numerically (chunk2 before chunk10)."""
    return [int(token) if token.isdigit() else token for token in re.split(r"(\d+)", name)]


data = []
dataSr = []

## Load every "chunk*.wav" file as a sample array plus its sampling rate.
## os.listdir returns names in arbitrary order, and the dataset-creation cell
## groups clips by consecutive index, so sort the names to make the grouping
## reproducible from run to run.
for filename in sorted(os.listdir(AUDIO_DIR), key=_natural_key):
    if filename.endswith(".wav") and filename.startswith("chunk"):
        rover, sr = librosa.load(os.path.join(AUDIO_DIR, filename))
        data.append(rover)
        dataSr.append(sr)

print(len(data))
print(len(dataSr))

60
60


# DATASET CREATION

In [None]:
######## ANCHOR, POSITIVE & NEGATIVE DATASET CREATION #########

def _pair_distance(first, second):
    """Best (lowest) compute_similarity score of a pair over both argument orders.

    compute_similarity is not symmetric, so both directions are evaluated once
    and the more favourable one is kept.
    """
    return min(compute_similarity(first, second), compute_similarity(second, first))


anchor = []
positive = []
negative = []

# Walk the clips in consecutive groups of three. Stopping at len(data) - 2
# skips an incomplete trailing group instead of raising IndexError.
for i in range(0, len(data) - 2, 3):
    clip_0, clip_1, clip_2 = data[i], data[i + 1], data[i + 2]

    # Each candidate is (score, anchor, positive, negative): two clips form the
    # anchor/positive pair and the remaining clip becomes the negative.
    candidates = [
        (_pair_distance(clip_0, clip_1), clip_0, clip_1, clip_2),
        (_pair_distance(clip_0, clip_2), clip_0, clip_2, clip_1),
        (_pair_distance(clip_1, clip_2), clip_1, clip_2, clip_0),
    ]

    # compute_similarity returns a weighted sum of absolute differences (0 for
    # identical clips), i.e. LOWER means MORE alike. The anchor/positive pair
    # must be the most alike one, so pick the minimum score; the previous
    # max() selected the least similar pair as the positive.
    # NOTE(review): this changes which clips end up as positives - confirm
    # against the intended experiment before comparing with older results.
    # min() returns the first minimum, so ties resolve in the listed order.
    _, best_anchor, best_positive, best_negative = min(candidates, key=lambda cand: cand[0])

    anchor.append(best_anchor)
    positive.append(best_positive)
    negative.append(best_negative)


## Reshaping ##
# Fold every 1-D clip into a (200, samples/200, 1) single-channel "image".
anchor = [clip.reshape(200, -1, 1) for clip in anchor]
positive = [clip.reshape(200, -1, 1) for clip in positive]
negative = [clip.reshape(200, -1, 1) for clip in negative]




# TRIPLETS GROUPING

In [None]:
def group_triplets(percent):
    """
    Split the module-level anchor / positive / negative lists into train and test sets.

    Input:
    percent --> percentage of the available triplets reserved for testing

    Output:
    triplets_train --> list of 3 arrays A,P,N of shape (number_train,w,h,c),
                       filled from the first number_train triplets
    triplets_test  --> list of 3 arrays A,P,N of shape (number_test,w,h,c),
                       filled from the end of the lists, walking backwards

    """
    total = len(anchor)
    number_test = int(total * (percent/100))
    number_train = int(total-number_test)

    w, h, c = anchor[0].shape  # W,H,C
    sources = (anchor, positive, negative)

    def _pack(count, indices):
        # One zero-initialised array per role (A, P, N), filled row by row.
        packed = [np.zeros((count, w, h, c)) for _ in sources]
        for row, source_index in enumerate(indices):
            for target, source in zip(packed, sources):
                target[row] = source[source_index]
        return packed

    # Train: the leading triplets, in order
    triplets_train = _pack(number_train, range(number_train))

    # Test: the trailing triplets, last one first
    triplets_test = _pack(number_test, (total - 1 - k for k in range(number_test)))

    return triplets_train, triplets_test

In [None]:
# Sanity check: split off 10% for testing and show the A/P/N array shapes
triplets, tri = group_triplets(percent=10)

# print("Checking batch width, should be 3 : ",len(triplets))
print("Train___Shapes in the batch A:{0} P:{1} N:{2}".format(*(part.shape for part in triplets)))
print("Test____Shapes in the batch A:{0} P:{1} N:{2}".format(*(part.shape for part in tri)))



Train___Shapes in the batch A:(18, 200, 441, 1) P:(18, 200, 441, 1) N:(18, 200, 441, 1)
Test____Shapes in the batch A:(2, 200, 441, 1) P:(2, 200, 441, 1) N:(2, 200, 441, 1)


# MODEL ARCHITECTURE, TRIPLET LOSS & LAYERS

In [None]:
def network_architecture(input_shape, embeddingsize):
    '''
    Build the embedding network that is shared by the anchor, positive and
    negative branches.

    Input :
            input_shape : shape of one input sample, (rows, cols, channels)
            embeddingsize : length of the embedding vector produced per sample
    Output :
            Sequential model mapping a batch of inputs to L2-normalised
            embeddings of length embeddingsize
    '''
    # Convolutional Neural Network
    network = Sequential()

    # input_shape was previously accepted but ignored; declaring it on the
    # first layer builds the model up front, so it can be summarised before it
    # is called and a mismatching input is rejected early.
    network.add(Conv2D(filters=16, kernel_size=5, padding='same', activation='relu',
                       input_shape=input_shape))
    network.add(MaxPooling2D(2,strides=2))

    network.add(Flatten())
    network.add(Dense(128, activation='relu',
                   kernel_regularizer=l2(1e-3),
                   kernel_initializer='he_uniform'))

    # Linear projection down to the embedding size
    network.add(Dense(embeddingsize, activation=None,
                   kernel_regularizer=l2(1e-3),
                   kernel_initializer='he_uniform'))

    # Force the encoding to live on the d-dimensional unit hypersphere
    network.add(Lambda(lambda x: K.l2_normalize(x,axis=-1)))

    return network

In [None]:
class TripletLossLayer(Layer):
    """Layer that computes the triplet loss and registers it via add_loss.

    Expects a list of three embedding tensors [anchor, positive, negative].
    The loss is sum over the batch of max(d(A,P) - d(A,N) + alpha, 0), where
    d is the squared Euclidean distance along the last axis.
    """

    def __init__(self, alpha, **kwargs):
        # Initialise the base Layer first, then attach our own state.
        super(TripletLossLayer, self).__init__(**kwargs)
        self.alpha = alpha  # margin between the A-P and A-N distances

    def get_config(self):
        # Include alpha so the layer can be re-created from its config when a
        # model containing it is saved, cloned or reloaded.
        config = super(TripletLossLayer, self).get_config()
        config.update({"alpha": self.alpha})
        return config

    def triplet_loss(self, inputs):
        # Local names carry an emb_ prefix so they do not shadow the
        # module-level anchor / positive / negative lists.
        emb_anchor, emb_positive, emb_negative = inputs
        p_dist = K.sum(K.square(emb_anchor - emb_positive), axis=-1)
        n_dist = K.sum(K.square(emb_anchor - emb_negative), axis=-1)
        # Hinge on the margin, then sum the per-sample losses over the batch.
        return K.sum(K.maximum(p_dist - n_dist + self.alpha, 0), axis=0)

    def call(self, inputs):
        loss = self.triplet_loss(inputs)
        # Registering the loss here is why the model is compiled without `loss=`.
        self.add_loss(loss)
        return loss

In [None]:
def Model_Architecture_Start(input_shape, network, margin=0.2):
    '''
        Wire one shared embedding network to three inputs and a triplet-loss head.

        Input:
          input_shape --> shape of a single input audio sample
          network --> neural network to train, outputting embeddings
          margin --> minimal gap between the Anchor-Positive and Anchor-Negative
                     distances enforced by the loss function (alpha)

        Output:
          Model taking [anchor, positive, negative] inputs whose output is the
          value produced by the TripletLossLayer

    '''
    # One input tensor per role, created in anchor / positive / negative order
    input_names = ("anchor_input", "positive_input", "negative_input")
    triplet_inputs = [Input(input_shape, name=label) for label in input_names]

    # The same network (shared weights) encodes all three inputs
    encodings = [network(tensor) for tensor in triplet_inputs]

    # TripletLoss layer consumes the three encodings
    triplet_loss_out = TripletLossLayer(alpha=margin,name='triplet_loss_layer')(encodings)

    # Connect the inputs with the loss output and hand the model back
    return Model(inputs=triplet_inputs,outputs=triplet_loss_out)

In [None]:
# Driver code: build the embedding network and the triplet-training model
row, col,channel = 200, 441, 1
input_shape = (row, col, channel)

network = network_architecture(input_shape,embeddingsize=4)
network_train = Model_Architecture_Start(input_shape,network)
# `lr` is a deprecated alias (it triggers the warning recorded in this cell's
# output); `learning_rate` is the supported keyword.
optimizer = Adam(learning_rate=0.00006)
# No `loss=` argument: TripletLossLayer registers the loss itself via add_loss.
network_train.compile(optimizer=optimizer)
network_train.summary()
# plot_model(network_train,show_shapes=True, show_layer_names=True, to_file='02 model.png')
print(network_train.metrics_names)
n_iteration=0


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 anchor_input (InputLayer)      [(None, 200, 441, 1  0           []                               
                                )]                                                                
                                                                                                  
 positive_input (InputLayer)    [(None, 200, 441, 1  0           []                               
                                )]                                                                
                                                                                                  
 negative_input (InputLayer)    [(None, 200, 441, 1  0           []                               
                                )]                                                            

  super().__init__(name, **kwargs)


# TRAINING

In [None]:
print("Starting training process!")
print("-------------------------------------")
# Reserve 20% of the triplets for testing and keep only the training split.
# Indexing (rather than unpacking into `_`) leaves IPython's last-output
# variable `_` untouched.
triplets = group_triplets(20)[0]
# `steps_per_epoch` is intentionally omitted: Keras does not support it with
# in-memory array inputs, and asking for more steps per epoch than the data
# provides can stop training early with a "ran out of data" warning. With
# arrays, one epoch is simply one pass over the training portion.
loss = network_train.fit(triplets, epochs=20, validation_split=0.2)

# Embeddings

In [None]:
def distance_metric(network, triplets_testing):
    """Print Anchor-Positive and Anchor-Negative embedding distances per test triplet.

    Parameters
    ----------
    network : model exposing `predict`
        Embedding network applied to each of the three inputs.
    triplets_testing : sequence of three arrays [A, P, N]
        Anchor, positive and negative samples, indexed along the first axis.

    Returns
    -------
    None. The distances are printed, one group of lines per triplet.
    """
    anchors, positives, negatives = triplets_testing[0], triplets_testing[1], triplets_testing[2]

    # Nothing to report for an empty test split (and nothing to predict on).
    if len(anchors) == 0:
        return

    # Embed each role in one batched call instead of one predict() per sample.
    emb_A = network.predict(anchors)
    emb_P = network.predict(positives)
    emb_N = network.predict(negatives)

    for j in range(len(anchors)):
        dist_AP = calc_euclidean(emb_A[j], emb_P[j])
        dist_AN = calc_euclidean(emb_A[j], emb_N[j])

        print("{0}th Batch".format(j))
        print("Distance between Anchor & Positive {0}".format(dist_AP))
        print("Distance between Anchor & Negative {0}".format(dist_AN))



# Hold out 20% of the triplets and report embedding distances on that split
training, testing = group_triplets(percent=20)
distance_metric(network=network, triplets_testing=testing)

0th Batch
Distance between Anchor & Positive 0.14198830723762512
Distance between Anchor & Negative 0.28337201476097107
1th Batch
Distance between Anchor & Positive 0.24449080228805542
Distance between Anchor & Negative 0.5556584000587463
2th Batch
Distance between Anchor & Positive 0.11453274637460709
Distance between Anchor & Negative 0.22129596769809723
3th Batch
Distance between Anchor & Positive 0.4462537467479706
Distance between Anchor & Negative 0.21703052520751953
