# Importing data and Google Drive Mount

In [None]:
from google.colab import drive

In [None]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Importing Stuff

In [None]:
import pandas as pd
import os,pickle
import numpy as np
from  tqdm import tqdm
import librosa
import librosa.display
from sklearn.model_selection import train_test_split

In [None]:
metadata = pd.read_csv(os.getcwd()+"/drive/MyDrive/Data/birdsong_metadata.csv")
header = list(metadata.head())
metadata.head()

Unnamed: 0,file_id,genus,species,english_cname,who_provided_recording,country,latitude,longitute,type,license
0,132608,Acanthis,flammea,Common Redpoll,Jarek Matusiak,Poland,50.7932,15.4995,"female, male, song",http://creativecommons.org/licenses/by-nc-sa/3.0/
1,132611,Acanthis,flammea,Common Redpoll,Jarek Matusiak,Poland,50.7932,15.4995,"flight call, male, song",http://creativecommons.org/licenses/by-nc-sa/3.0/
2,35068,Acanthis,flammea,Common Redpoll,Sander Bot,Netherlands,52.8176,6.4326,"call, song",http://creativecommons.org/licenses/by-nc-nd/2.5/
3,82715,Acrocephalus,palustris,Marsh Warbler,Dougie Preston,United Kingdom,60.3539,-1.2689,Song,http://creativecommons.org/licenses/by-nc-nd/2.5/
4,64685,Acrocephalus,palustris,Marsh Warbler,Dougie Preston,United Kingdom,60.3539,-1.2689,Song,http://creativecommons.org/licenses/by-nc-nd/2.5/


#Working on data

In [None]:
# Get bird names
bird_name = metadata['english_cname'].values
u, f = np.unique(bird_name, return_counts=True)

uniq_birds = list(u[4:10]) + list(u[12:15])
# uniq_birds = list(u[0:10])
data_train = []
data_test = []
y_train = []
y_test = []
bird_name_dict = {}

In [None]:
for i in range(len(uniq_birds)) :
    df = metadata[metadata['english_cname'] == uniq_birds[i]]
    df = df['file_id'].values
    df = df.tolist()
    data_train.append(df[0])
    y_train.append(i)
    bird_name_dict[i] = uniq_birds[i]
    data_test += df[1:]
    y_test += [i] * (len(df) - 1)


In [None]:
# Read audio files using librosa. Divide each audio clip into frames of 2 sec duration. They are more than 40 sec long clips. So, using frames
# we will get a lot of data.
frames_train = []
frames_test = []
frame_len = 22050*2 # equivalent of 2 seconds
y_frames_train = []
y_frames_test = []

In [None]:
for i in tqdm(range(len(data_train))) :

    # Read audio
    curr = data_train[i]
    curr = "/content/drive/MyDrive/Data/songs/songs/xc" + str(curr) + ".flac"
    y, sr = librosa.load(curr)

    # Normalize time series
    y = ((y-np.amin(y))*2)/(np.amax(y) - np.amin(y)) - 1

    # Remove silence from the audio
    org_len = len(y)
    intervals = librosa.effects.split(y, top_db= 15, ref= np.max)
    intervals = intervals.tolist()
    y = (y.flatten()).tolist()
    nonsilent_y = []

    for p,q in intervals :
        nonsilent_y = nonsilent_y + y[p:q+1]

    y = np.array(nonsilent_y)
    final_len = len(y)
    sil = org_len - final_len


    # Divide audio into frames
    start = 0
    end = frame_len
    for j in range(0, len(y), int(frame_len*0.5)) :

        frame = y[j:j+frame_len]
        if len(frame) < frame_len :
            frame = frame.tolist() + [0]* (frame_len-len(frame))
        frame = np.array(frame)
        S = np.abs(librosa.stft(frame, n_fft=512))
        freqs = librosa.fft_frequencies(sr=sr, n_fft=512)
        upper = ([x for x in range(len(freqs)) if freqs[x] >= 8000])[0]
        lower = ([x for x in range(len(freqs)) if freqs[x] <= 1000])[-1]

        freqs = freqs[lower:upper]
        S = S[lower:upper,:]
        if S.shape != (163, 345) :
            print(S.shape)
        assert S.shape == (163, 345)

        frames_train.append(S)
        y_frames_train.append(y_train[i])


100%|██████████| 9/9 [00:14<00:00,  1.65s/it]


##Read testing data and split into frames

In [None]:
for i in tqdm(range(len(data_test))) :
    # Read audio
    curr = data_test[i]
    curr = "/content/drive/MyDrive/Data/songs/songs/xc" + str(curr) + ".flac"
    y, sr = librosa.load(curr)
    dur = librosa.get_duration(y=y, sr=sr)

    # Normalize time series
    y = ((y-np.amin(y))*2)/(np.amax(y) - np.amin(y)) - 1

    # Remove silence from the audio
    org_len = len(y)
    intervals = librosa.effects.split(y, top_db= 15, ref= np.max)
    intervals = intervals.tolist()
    y = (y.flatten()).tolist()
    nonsilent_y = []

    for p,q in intervals :
        nonsilent_y = nonsilent_y + y[p:q+1]

    y = np.array(nonsilent_y)
    final_len = len(y)
    sil = org_len - final_len


    dur = librosa.get_duration(y=y, sr=sr)
    start = 0
    end = frame_len
    for j in range(0, len(y), int(frame_len*0.5)) :
        frame = y[j:j+frame_len]
        if len(frame) < frame_len :
            frame = frame.tolist() + [0]* (frame_len-len(frame))
        frame = np.array(frame)

        S = np.abs(librosa.stft(frame, n_fft=512))
        freqs = librosa.fft_frequencies(sr=sr, n_fft=512)
        upper = ([x for x in range(len(freqs)) if freqs[x] >= 8000])[0]
        lower = ([x for x in range(len(freqs)) if freqs[x] <= 1000])[-1]

        freqs = freqs[lower:upper]
        S = S[lower:upper,:]
        assert S.shape == (163, 345)

        frames_test.append(S)
        y_frames_test.append(y_test[i])

100%|██████████| 18/18 [00:14<00:00,  1.21it/s]


In [None]:
print("Training samples : ",  len(frames_train), len(y_frames_train), np.unique(y_frames_train, return_counts= True))
print("Testing samples : ",  len(frames_test), len(y_frames_test), np.unique(y_frames_test, return_counts= True))


Training samples :  341 341 (array([0, 1, 2, 3, 4, 5, 6, 7, 8]), array([ 21,   6,  12,  52,  25, 139,  20,   7,  59]))
Testing samples :  469 469 (array([0, 1, 2, 3, 4, 5, 6, 7, 8]), array([ 19,  74,  44,   9,  21,  12,  47, 183,  60]))


## Convert all data to nd array

In [None]:
y_frames_train = np.array(y_frames_train)
y_frames_test = np.array(y_frames_test)

r,c = frames_train[0].shape
frames_train = np.array(frames_train)
frames_train = frames_train.reshape((len(frames_train), r, c))

frames_test = np.array(frames_test)
frames_test = frames_test.reshape((len(frames_test), r, c))

dataX = np.concatenate((frames_train, frames_test), axis=0)
datay = np.concatenate((y_frames_train, y_frames_test), axis=0)

frames_train, frames_test, y_frames_train, y_frames_test = train_test_split(dataX, datay, test_size=0.6)  #####################

In [None]:
f = open(os.getcwd() + "/training_frames1.pkl", 'wb')
pickle.dump([frames_train, y_frames_train], f)
f.close()

f = open(os.getcwd() + "/testing_frames1.pkl", 'wb')
pickle.dump([frames_test, y_frames_test], f)
f.close()


# Read training and testing data from the pickle file
f = open(os.getcwd() + "/training_frames1.pkl", 'rb')
frames_train, y_frames_train = pickle.load(f)
f.close()

f = open(os.getcwd() + "/testing_frames1.pkl", 'rb')
frames_test, y_frames_test = pickle.load(f)
f.close()


### Read training and testing data from the pickle file

In [None]:
f = open(os.getcwd() + "/training_frames1.pkl", 'rb')
frames_train, y_frames_train = pickle.load(f)
f.close()

f = open(os.getcwd() + "/testing_frames1.pkl", 'rb')
frames_test, y_frames_test = pickle.load(f)
f.close()


### Standardize the data

In [None]:

mu = frames_train.mean()
std = frames_train.std()
frames_train = (frames_train-mu)/std
frames_test = (frames_test-mu)/std

print("Training samples : ",  frames_train.shape, len(y_frames_train), np.unique(y_frames_train, return_counts= True))
print("Testing samples : ", frames_test.shape, len(y_frames_test), np.unique(y_frames_test, return_counts= True))


Training samples :  (324, 163, 345) 324 (array([0, 1, 2, 3, 4, 5, 6, 7, 8]), array([17, 33, 19, 27, 18, 64, 26, 68, 52]))
Testing samples :  (486, 163, 345) 486 (array([0, 1, 2, 3, 4, 5, 6, 7, 8]), array([ 23,  47,  37,  34,  28,  87,  41, 122,  67]))


In [None]:
# There are imbalanced classes. Repeat the data so that all classes have same number of samples.
u, f = np.unique(y_frames_train, return_counts= True)
frames_train1 = []
y_frames_train1 = []
maximum = max(f)
count = 0
for i in u :
    ind, = np.where(y_frames_train == i)
    ind = ind.tolist()
    while len(ind) < maximum :
        ind = ind + ind
    ind = ind[:maximum]
    temp = frames_train[ind]
    if count == 0 :
        frames_train1 = temp
        count += 1
    else :
        frames_train1 = np.concatenate((frames_train1, temp), axis= 0)

    y_frames_train1 += [i] * maximum

y_frames_train1 = np.array(y_frames_train1)


# New Section

In [None]:
import os, sys, matplotlib.pyplot as plt, numpy as np, itertools, math
from random import seed
from random import random, randint
from scipy.spatial import distance
from  tqdm import tqdm
import random
import warnings
from math import factorial

def generate_positive_pairs(X, y, rand_samples, pair_len) :

    # Shape of features
    row, col = X.shape[0], X.shape[1]

    # Get unique elements of y array along with their frequencies
    uniq, freq = np.unique(y, return_counts=True)

    anchor = []
    pos = []
    count = 0

    # Traverse through each class and select random samples
    for x, f in zip(uniq, freq) :

        # Select indices of a certian class label
        ind, = np.where(y == x)
        ind = list(ind)

        # Select random samples indices. If number of samples required are more than frequency of class, then select all samples
        if rand_samples <= f and rand_samples != -1:
            random_indices = random.sample(ind, rand_samples)
        elif rand_samples == -1 :
            random_indices = ind
        else :
            warnings.warn("ValueError ! 'samples' required are more than number of elements in class. So, all elements are selected.")
            random_indices = random.sample(ind, f)

        # Generate positive pairs
        pairs = list(itertools.combinations(random_indices, 2))

        if len(pairs) >= pair_len :
            pairs = list(pairs)[:pair_len]

        # print(len(pairs))

        # Get features associated with those indices
        anchor += [X[i] for i, _ in pairs]
        pos += [X[i] for _, i in pairs]

    # Convert features to numpy matrix
    anchor = np.array(anchor)
    a = (len(anchor),)
    b = tuple(X[0].shape)
    anchor = anchor.reshape(a+b)

    a = (len(anchor),)

    pos = np.array(pos)
    pos = pos.reshape(a+b)

    # print(positive_extra[0].shape)

    return anchor, pos, uniq, freq


def generate_negative_pairs(X, y, uniq, freq, rand_samples, pair_len) :

    # Shape of features
    row, col = X.shape[0], X.shape[1]

    # Get unique elements of y array along with their frequencies
    # uniq, _ = np.unique(y, return_counts=True)
    indices = []

    # Traverse through each class and select random samples
    for x, f in zip(uniq, freq) :

        # Select indices of a certian class label
        ind, = np.where(y == x)
        ind = list(ind)

        # Select random samples indices. If number of samples required are more than frequency of class, then select all samples
        if rand_samples <= f and rand_samples != -1:
            random_indices = random.sample(ind, rand_samples)
        elif rand_samples == -1 :
            random_indices = ind
        else :
            warnings.warn("ValueError ! 'samples' required are more than number of elements in class. So, all elements are selected.")
            random_indices = random.sample(ind, f)

        # Get features associated with those indices
        indices.append(random_indices)
        # print(len(random_indices))

    neg = []

    # Generate negative pairs
    for i in range(len(uniq)) :

        # Generate pair of a class with every other class
        for j in range(len(uniq)) :

            if i == j :
                continue


            curr = indices[j]
            num = math.ceil(pair_len/(len(uniq)-1))
            while len(curr) < num :
                curr += indices[j]

            indices1 = random.sample(curr, num)
            indices1 = indices1[:pair_len]
            # Get features associated with those indices
            neg += [X[p] for p in indices1]


    # Convert features to numpy matrix
    neg = np.array(neg)
    a = (len(neg),)
    b = tuple(X[0].shape)

    neg = neg.reshape(a+b)


    return neg






# Computed number of combinations nCr
def calculate_combinations(n, r):
    return factorial(n) // factorial(r) // factorial(n-r)






'''
Generate positive and negative pairs. This function selects samples in such a way that nmber fo apositive pairs equals number
of negative pairs. Only exception is when the numbers of samples are less than total combinations of all classes.
Parameters:

    INPUT
                   X : Input features, Type: nd array
                   y : class labels, Type: 1d numpy array
             rand_samples : Number of random samples taken from each class, Type: integer. By default it selects all samples
            pos_pair_size : Number of positive random pairs from each class, Type: integer. By default it selects all pairs

    OUTPUT
            anchor and pos are positive pairs of features.
            neg and neg_X2 are negative pairs of features.

'''

def generate_pairs(X, y, rand_samples, pos_pair_size=-1, extra_data=[]) :


    # print("Input ", len(extra_data))
    uniq, f = np.unique(y, return_counts= True)

    N = len(uniq)
    pair_neg = calculate_combinations(N, 2)
    pair_pos = calculate_combinations(min(f), 2)

    pos_pair_size = min(pos_pair_size, pair_pos )

    if pos_pair_size == -1 :
        if rand_samples == -1 :
            # pos_pair_size = calculate_combinations(int(len(y)/N), 2)
            pos_pair_size = calculate_combinations(int(min(f)), 2)
        else :
            # pos_pair_size = min(calculate_combinations(rand_samples, 2), int(len(y)/N) )
            pos_pair_size = min(calculate_combinations(rand_samples, 2), pair_pos )

    pos_pair_size = int(pos_pair_size)
    # print("pos pair size ", pos_pair_size)
    for i in range(pos_pair_size, 0, -1):
        if ((N/ pair_neg) * i).is_integer() :
            break

    neg_samples = int((N/ pair_neg) * i)
    pos_samples = i

    if ((N/ pair_neg) * i).is_integer() is False :
        warnings.warn("Number of samples per class are less than total combinations of all classes. 1 sample will be selected from each negative pair. ")
        neg_samples = 1

    # print(N, pair_neg, neg_samples, i)
    # print(pair_neg, neg_samples, pos_pair_size, i)
    # print(pair_neg * neg_samples, pos_pair_size * i)
    # sys.exit(1)

    anchor, pos, uniq, freq = generate_positive_pairs(X, y, rand_samples= rand_samples, pair_len=pos_samples)
    neg = generate_negative_pairs(X, y, uniq, freq, rand_samples= rand_samples, pair_len=pos_samples)

    return anchor, pos, neg

In [None]:
# Make pairs for siamese network
anchor, pos, neg = generate_pairs(frames_train1, y_frames_train1, rand_samples= -1, pos_pair_size=600)

anchor = anchor.astype(np.float16)
pos = pos.astype(np.float16)
neg = neg.astype(np.float16)

np.savez_compressed(os.getcwd()+'/training_siamese_frames', a=anchor, b= pos, c= neg)

data = np.load(os.getcwd()+'/training_siamese_frames.npz')
anchor = data['a']
pos = data['b']
neg = data['c']


print("Siamese pairs ", anchor.shape, pos.shape, neg.shape)

NameError: ignored

In [None]:

# Import libraries
import os, sys, cv2, matplotlib.pyplot as plt, numpy as np, shutil, itertools, pickle, pandas as pd, seaborn as sn, math, time
from random import seed, random, randint
from scipy.spatial import distance
import random
import tensorflow as tf
from keras import backend as K
from keras.models import Model, load_model, Sequential
from keras.callbacks import ModelCheckpoint
from keras.layers import Input, Dense, Embedding, AveragePooling1D, dot, UpSampling2D, concatenate, BatchNormalization, LSTM, Multiply, Conv2D, MaxPool2D, Add, dot, GlobalMaxPool1D, Dropout, Masking, Activation, MaxPool1D, Conv1D, Flatten, TimeDistributed, Lambda
from keras.regularizers import l2
# from keras.utils.vis_utils import plot_model
from tensorflow.keras.utils import plot_model
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from keras.optimizers import Adam
from keras.regularizers import l2

from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import normalize
from mpl_toolkits.mplot3d import Axes3D

import librosa
import librosa.display

import soundfile as sf

emb_size = 32
alpha = 0.8


# Triplet loss
def triplet_loss(y_true, y_pred):

    anchor, positive, negative = y_pred[:,:emb_size], y_pred[:,emb_size:2*emb_size], y_pred[:,2*emb_size:]
    distance1 = tf.reduce_mean(tf.square(anchor - positive), axis=1)
    distance2 = tf.reduce_mean(tf.square(anchor - negative), axis=1)
    loss = tf.reduce_mean(tf.maximum(distance1 - distance2 + alpha, 0))

    return loss


# Build Model

def get_model(r,c) :

    # hyper-parameters
    n_filters = 64
    filter_width = 3
    dilation_rates = [2**i for i in range(8)]

    history_seq = Input(shape=(r, c))
    x = history_seq

    skips = []
    count = 0
    for dilation_rate in dilation_rates:
        x = Conv1D(filters=n_filters,
                    kernel_size=filter_width,
                    padding='causal',
                    dilation_rate=dilation_rate, activation='relu', name="conv1d_dilation_"+str(dilation_rate))(x)

        x = BatchNormalization()(x)


    out = Conv1D(32, 16, padding='same')(x)
    out = BatchNormalization()(out)
    out = Activation('tanh')(out)
    out = GlobalMaxPool1D()(out)

    model = Model(history_seq, out)
    model.compile(loss='mse', optimizer='adam')

    input1 = Input((r,c), name="Anchor Input")
    input2 = Input((r,c), name="Positive Input")
    input3 = Input((r,c), name="Negative Input")

    anchor = model(input1)
    positive = model(input2)
    negative = model(input3)


    concat = concatenate([anchor, positive, negative], axis=1)

    siamese = Model([input1, input2, input3], concat)


    siamese.compile(optimizer='adam', loss=triplet_loss)

    print(siamese.summary())

    return model, siamese


In [None]:
_,r,c = anchor.shape
encoder, model = get_model(c,r)

anchor = pos.transpose(0, 2, 1)
pos = pos.transpose(0, 2, 1)
neg = neg.transpose(0, 2, 1)

y = np.ones((len(anchor), 32*3))

# Fit model
mc = ModelCheckpoint(os.getcwd()+'/model_checkpoint.h5',
                            save_weights_only=False, save_freq=10)

In [None]:
# Train the model
model.fit([anchor, pos, neg], y, epochs= 30, callbacks= [mc], batch_size= 256, verbose= 1)
model.save(os.getcwd() + "/siamese.h5")
encoder.save(os.getcwd() + "/encoder.h5")


In [None]:
# Load the model
model = load_model(os.getcwd() + "/siamese.h5", compile= False)
encoder = load_model(os.getcwd() + "/encoder.h5", compile= False)

In [None]:
# Create the confusion and similarity matrix
from scipy import spatial

frames_test = frames_test.transpose(0,2,1)
frames_train1 = frames_train1.transpose(0,2,1)


sim_mat = np.zeros((len(frames_test), len(frames_test)))
ind = np.arange(0, len(frames_test), 1)
ind = [x for _,x in sorted(zip(y_frames_test, ind))]
frames_test = frames_test[ind]
y_frames_test = y_frames_test[ind]

encoded = encoder.predict(frames_test)

pred = []

for i in tqdm(range(len(frames_test))) :

    curr = frames_test[i:i+1]
    sim = encoder.predict(curr)
    sim = [1 - spatial.distance.cosine(sim[0], encoded[x]) for x in range(len(encoded))]
    sim = np.array(sim)

    sim[sim<0] = 0
    ind = np.copy(y_frames_test)

    ind = [x for _,x in sorted(zip(sim, ind))]
    ind = ind[::-1]
    ind = ind[:21]
    u, f = np.unique(ind, return_counts= True)
    best = u[np.argmax(f)]
    pred.append(best)
    sim_mat[i] = sim

In [None]:
conf_mat = confusion_matrix(y_frames_test, pred, normalize= 'true')
print("Accuracy on testset : ", np.trace(conf_mat)/np.sum(conf_mat))

u = np.unique(y_frames_test)


# Plot confusion matrix
conf_mat = pd.DataFrame(conf_mat, columns= [bird_name_dict[x] for x in u], index= [bird_name_dict[x] for x in u])
plt.figure(figsize = (20,15))
sn.heatmap(conf_mat, annot=True, annot_kws={"size": 10}, cmap='jet')
plt.tick_params(labelsize=8)
plt.xticks(rotation= 60)
plt.title("Bird Song classification")
plt.show()
plt.savefig(os.getcwd()+"/ConfusionMatrix_test.png")
plt.close()


# Plot similarity matrix
sim_mat = pd.DataFrame(sim_mat, columns= [bird_name_dict[x] for x in y_frames_test], index= [bird_name_dict[x] for x in y_frames_test])
plt.figure(figsize = (20,15))
sn.heatmap(sim_mat, annot=False, annot_kws={"size": 12}, cmap='viridis')
plt.tick_params(labelsize=10)
plt.xticks(rotation= 90)
plt.title("Bird Song classification")
plt.show()
plt.savefig(os.getcwd()+"/SimilarityMatrix_test.png")
plt.close()