In [1]:
import numpy as np
import pandas as pd
import glob
import librosa
import random
from collections import Counter

import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Activation, LSTM
from keras.layers import Conv2D, MaxPooling2D, BatchNormalization
from keras.models import load_model

from sklearn.model_selection import train_test_split

Using TensorFlow backend.


In [2]:
# Directory prefix of the training .wav clips. It has to end with '/' because
# list_of_audios() builds its glob pattern by plain string concatenation.
path_to_audio = '../data_cut/'
# NOTE(review): not referenced anywhere in the visible notebook, and the leading
# '/' makes this an absolute path - confirm that is intended.
path_to_validation = '/test_c/'

# Placeholders only: the real values are assigned after the MFCC shapes are known.
def_w, def_h = 0, 0 # Default width and height of spectrogram images
num_classes = 5 # Number of target classes (width of the one-hot label rows)
skip = 5 # Useful in signal[skip::] to shrink data size, not necessary right now

# Returns correct int from file name
def parse_number(file_path):
  """Extract the class label encoded as digits in an audio file's name.

  Only the file stem (no directories, no extension) is scanned, so digits in a
  parent folder (``../data2/``) or in the extension (``.mp3``) cannot leak into
  the label. All digits of the stem are concatenated and read as one integer,
  e.g. ``'../data_cut/vika-shon-4d.wav'`` -> ``4``.

  :param file_path: path to the audio file
  :return: the digits of the file stem interpreted as a single int
  :raises ValueError: if the file stem contains no digit at all
  """
  import os  # local import: the notebook's import cell does not bring in os

  stem = os.path.splitext(os.path.basename(file_path))[0]
  digits = ''.join(ch for ch in stem if ch.isdigit())
  if not digits:
    raise ValueError('No class digit found in file name: {!r}'.format(file_path))
  return int(digits)

# Return list of tuples (file_path, correct number)
def list_of_audios(dir_path):
  """Collect every file matching ``dir_path + '*.wav'`` in random order.

  :param dir_path: directory prefix, concatenated as-is with the glob pattern
  :return: list of ``(file_path, label)`` tuples; the label comes from parse_number
  """
  wav_paths = glob.glob(dir_path + '*.wav')
  random.shuffle(wav_paths) # Shuffled data is better for training
  return [(wav_path, parse_number(wav_path)) for wav_path in wav_paths]

# Index table: one row per audio clip with its path and the label parsed from its name
df = pd.DataFrame(list_of_audios(path_to_audio), columns = ['file_name', 'correct'])
df.head()  # preview the first rows

Unnamed: 0,file_name,correct
0,../data_cut/vika-shon-4d.wav,4
1,../data_cut/tamar-bedi-4bhigh.wav,4
2,../data_cut/nika-onia-1clow.wav,1
3,../data_cut/nino-chan-3chigh.wav,3
4,../data_cut/mariam-aval-2chigh.wav,2


In [4]:
def audios_to_spectograms(file_names):
  """Compute an MFCC matrix for every audio file.

  :param file_names: sized iterable of audio file paths (e.g. a pandas Series)
  :return: tuple ``(matrices, shapes)`` - the MFCC matrices in input order and
           the set of distinct matrix shapes that were encountered
  """
  # Save different shapes in a set
  x, shapes = [], set()
  total = len(file_names)

  # Enumerate (1-based) for logging
  for count, audio_file in enumerate(file_names, start=1):
    # Use mfcc algorithm for spectrograms
    signal, sampling_rate = librosa.load(audio_file)
    # Keyword arguments: recent librosa releases no longer accept the audio
    # signal and sampling rate positionally; older releases accept both forms.
    matrix = librosa.feature.mfcc(y=signal, sr=sampling_rate)

    x.append(matrix)
    shapes.add(matrix.shape)
    if count % 50 == 0:
      print('{} analyzed out of {}'.format(count, total)) # Log progress

  return x, shapes

def choose_max_shapes(shapes):
  """Return the largest first and second dimension found among ``shapes``.

  :param shapes: iterable of shape tuples with at least two entries each
  :return: ``(w, h)`` - maxima of index 0 and index 1; ``(0, 0)`` when empty
  """
  shape_list = list(shapes)
  # Seeding each max() with 0 reproduces the zero start of a running maximum
  max_w = max([0] + [dims[0] for dims in shape_list])
  max_h = max([0] + [dims[1] for dims in shape_list])
  return max_w, max_h

# Compute the MFCC matrix of every clip listed in the index table
matrices, shapes = audios_to_spectograms(df['file_name'])
print('Different shapes:', shapes)
# Replace the placeholder dimensions with the largest ones observed,
# so that every matrix can later be padded up to a common size
def_w, def_h = choose_max_shapes(shapes)
print('Every spectogram should be size of:', (def_w, def_h))

50 analyzed out of 2040
100 analyzed out of 2040
150 analyzed out of 2040
200 analyzed out of 2040
250 analyzed out of 2040
300 analyzed out of 2040
350 analyzed out of 2040
400 analyzed out of 2040
450 analyzed out of 2040
500 analyzed out of 2040
550 analyzed out of 2040
600 analyzed out of 2040
650 analyzed out of 2040
700 analyzed out of 2040
750 analyzed out of 2040
800 analyzed out of 2040
850 analyzed out of 2040
900 analyzed out of 2040
950 analyzed out of 2040
1000 analyzed out of 2040
1050 analyzed out of 2040
1100 analyzed out of 2040
1150 analyzed out of 2040
1200 analyzed out of 2040
1250 analyzed out of 2040
1300 analyzed out of 2040
1350 analyzed out of 2040
1400 analyzed out of 2040
1450 analyzed out of 2040
1500 analyzed out of 2040
1550 analyzed out of 2040
1600 analyzed out of 2040
1650 analyzed out of 2040
1700 analyzed out of 2040
1750 analyzed out of 2040
1800 analyzed out of 2040
1850 analyzed out of 2040
1900 analyzed out of 2040
1950 analyzed out of 2040
2000 a

In [6]:
def pad_spectogram(matrix, target_h=None):
  """Zero-pad a 2-D matrix along axis 1 until it has ``target_h`` columns.

  The padding is split around the data: ``diff // 2`` zero columns go in front
  and the remainder goes behind. A matrix that is already at least
  ``target_h`` columns wide is returned unchanged (same object).

  :param matrix: 2-D numpy array
  :param target_h: desired number of columns; when omitted, the module-level
                   ``def_h`` is used, so existing one-argument calls keep working
  :return: the padded array, or ``matrix`` itself when no padding is needed
  """
  if target_h is None:
    target_h = def_h

  # Since width is always 20 in mfcc, we only check for height difference
  diff = target_h - matrix.shape[1]
  if diff <= 0:
    return matrix

  rows = matrix.shape[0]
  front = diff // 2
  # Half of the difference in the beginning, the rest in the end
  return np.concatenate(
    [np.zeros((rows, front), dtype=float), matrix, np.zeros((rows, diff - front), dtype=float)],
    axis=1)

# Pad every MFCC matrix to the common size and stack them into one array
x = np.array([pad_spectogram(matrix) for matrix in matrices])
print(x.shape)

(2040, 20, 37)


In [7]:
# One hot encode correct numbers
# A label `num` becomes a row of length num_classes with a single 1 at index num-1,
# so labels are expected to lie in 1..num_classes.
# NOTE(review): np.matrix is always 2-D; downstream cells may depend on that -
# verify before replacing it with np.array.
y = np.matrix([[0] * (num-1) + [1] + [0] * (num_classes - num) for num in df['correct'].values])
print(y)

[[0 0 0 1 0]
 [0 0 0 1 0]
 [1 0 0 0 0]
 ...
 [1 0 0 0 0]
 [0 1 0 0 0]
 [0 0 1 0 0]]


In [8]:
# Add a trailing channel axis: every sample becomes (height, width, 1)
x_r = x.reshape(*x.shape, 1)
# Labels have to stay (n_samples, num_classes). Reshaping them to
# (n_samples, num_classes, 1) only worked while `y` was an np.matrix, which
# silently collapses back to 2-D; converting to a plain ndarray gives the same
# shape without depending on the container type of `y`.
y_r = np.asarray(y)
input_shape = x_r.shape[1:]
print(x_r.shape, y_r.shape, input_shape)

(2040, 20, 37, 1) (2040, 5) (20, 37, 1)


In [11]:
# Keras reference model.
# NOTE: Dense and MaxPooling2D must be the Keras layers here. A later cell of this
# notebook defines classes with the same names, so this cell has to run before it.
model = Sequential([
      # Three stacked 2x2 convolutions with growing filter counts
      Conv2D(32, kernel_size = (2,2), activation = 'relu', input_shape = input_shape),
      BatchNormalization(),

      Conv2D(64, kernel_size=(2, 2), activation = 'relu'),
      BatchNormalization(),

      Conv2D(128, kernel_size=(2, 2), activation = 'relu'),
      BatchNormalization(),

      MaxPooling2D(pool_size = (2,2)),
      BatchNormalization(),
      Dropout(0.3),
      Flatten(),

      # Fully connected classifier head
      Dense(128, activation = 'relu'),
      BatchNormalization(),
      Dropout(0.2),

      Dense(64, activation = 'relu'),
      BatchNormalization(),
      Dropout(0.25),

      Dense(32, activation = 'relu'),
      BatchNormalization(),

      # Output width follows num_classes instead of a hard-coded 5, so it cannot
      # drift away from the width of the one-hot encoded labels
      Dense(num_classes, activation = 'softmax')
  ])

In [12]:
# Hold out 25% of the samples; they are used as validation data during fitting
x_train, x_test, y_train, y_test = train_test_split(x_r, y_r, test_size=0.25)

# `Adam` is the optimizer class itself; the lowercase `adam` alias is not
# available in every Keras release. `lr` is kept for compatibility with the
# Keras version this notebook was written against.
model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=keras.optimizers.Adam(lr = 0.001),
              metrics=['accuracy'])
# summary() prints the table itself and returns None, so it must not be wrapped in print()
model.summary()
model.fit(x_train, y_train, batch_size=40, epochs=12, verbose=1, validation_data = (x_test, y_test))

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_4 (Conv2D)            (None, 19, 36, 32)        160       
_________________________________________________________________
batch_normalization_8 (Batch (None, 19, 36, 32)        128       
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 18, 35, 64)        8256      
_________________________________________________________________
batch_normalization_9 (Batch (None, 18, 35, 64)        256       
_________________________________________________________________
conv2d_6 (Conv2D)            (None, 17, 34, 128)       32896     
_________________________________________________________________
batch_normalization_10 (Batc (None, 17, 34, 128)       512       
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 8, 17, 128)        0         
__________

In [13]:
# Persist the trained Keras model to model.h5 in the current working directory
model.save('model.h5')

In [14]:
import numpy as np


def sigmoid(X):
    """
    Logistic function 1 / (1 + exp(-X)), evaluated without overflow.

    :param X: scalar or numpy array
    :return: element-wise sigmoid of X, each value in [0, 1]
    """
    # sigmoid(x) = exp(-log(1 + exp(-x))) = exp(-logaddexp(0, -x)).
    # logaddexp never forms exp(-x) directly, so large negative inputs give 0
    # instead of triggering an overflow warning on the way there.
    return np.exp(-np.logaddexp(0, -X))


class Dense:
    """
    Fully connected layer without bias, trained one sample at a time with plain SGD.

    NOTE(review): the first cell of this notebook imports a ``Dense`` layer from
    ``keras.layers``; defining this class rebinds that name, so the Keras model
    cell cannot be re-run after this cell without re-running the imports.
    """

    def __init__(self, input_size, output_size, activation="None", weights=None):
        """
        :param  input_size: Length of input vector
        :param output_size: Length of output vector
        :param  activation: Activation function ("relu", "softmax", "sigmoid", "None")
        :param     weights: Optional initial weights of shape (input_size, output_size);
                            random weights are drawn when omitted
        """
        self.input_size = input_size
        self.output_size = output_size
        self.activation = activation
        # Will be used for caching
        self.last_input = None
        self.last_weighted = None
        self.last_input_shape = None
        if weights is None:
            self.weights = np.random.randn(self.input_size, self.output_size) / self.input_size  # Might need to adjust
        else:
            # Private float copy: training updates the weights in place, which must
            # neither modify the caller's array nor fail on an integer dtype
            weights = np.array(weights, dtype=float)
            # Custom weights needs to be same size as specified
            assert (weights.shape[0] == self.input_size and weights.shape[1] == self.output_size)
            self.weights = weights

    @staticmethod
    def _softmax(totals):
        """Softmax of a 1-D vector; shifting by the maximum prevents exp() overflow."""
        shifted = np.exp(totals - np.max(totals))
        return shifted / np.sum(shifted, axis=0)

    def forward(self, X):
        """
        Flatten X, multiply it with the weights and apply the activation.

        :param X: numpy array with exactly input_size elements (any shape)
        :return: vector of length output_size
        """
        self.last_input_shape = X.shape  # Caching last input shape
        X = X.flatten()  # Auto flattening just in case
        self.last_input = X  # Caching last input, Flattened
        assert (len(self.last_input) == self.input_size)  # Check if shape was correct

        weighted_total = np.dot(X, self.weights)
        self.last_weighted = weighted_total  # Caching weighted inputs before activation

        if self.activation == "sigmoid":
            return sigmoid(weighted_total)

        if self.activation == "relu":
            return weighted_total * (weighted_total > 0)  # applying Relu

        if self.activation == "softmax":
            return self._softmax(weighted_total)

        # Activation doesn't exist or None specified
        return weighted_total

    def _apply_gradient(self, dL_dt, lr):
        """
        SGD step for a given gradient of the loss w.r.t. the weighted totals.

        :return: gradient of the loss w.r.t. the layer input, in the input's original shape
        """
        # The input gradient belongs to the weights used in the forward pass,
        # so it has to be computed BEFORE the weights are updated.
        # (input_size, output_size) @ (output_size,) = (input_size,)
        dL_dx = self.weights @ dL_dt
        # dL/dw = outer(input, dL/dt): (input_size, output_size)
        self.weights -= lr * np.outer(self.last_input, dL_dt)
        return dL_dx.reshape(self.last_input_shape)  # return to input dimensions

    def _no_activation_backprop(self, dH, lr):
        """Backward pass of a linear layer: dL/dt equals dH."""
        return self._apply_gradient(np.asarray(dH, dtype=float), lr)

    def backward(self, dH, lr=0.01):
        """
            dH is loss differentiated by output (dL/dO). in case of softmax:
            loss is cross entropy loss -ln(output[C]) where C is the correct label;
            hence, it differentiated by output would be 0 for every label other than C
            and -1/output(C) for the C

            Updates the weights in place and returns dL/dX shaped like the last input.
            Raises NotImplementedError for activations other than "None" and "softmax".
        """
        if self.activation == "None":
            return self._no_activation_backprop(dH, lr)

        # Only implemented for softmax now. An explicit exception is used because
        # an assert-based guard disappears when Python runs with -O.
        if self.activation != "softmax":
            raise NotImplementedError(
                "backward is not implemented for activation {!r}".format(self.activation))

        dH = np.asarray(dH, dtype=float)
        probs = self._softmax(self.last_weighted)
        # Softmax Jacobian: dO_i/dt_j = p_i * (delta_ij - p_j), therefore
        # dL/dt = p * (dH - <dH, p>). For a dH with a single non-zero entry this
        # is exactly the one-label formula; an all-zero dH yields a zero gradient.
        dL_dt = probs * (dH - np.dot(dH, probs))
        return self._apply_gradient(dL_dt, lr)


# A.k.a. Kernel
class Filter:
    """Single 2-D convolution kernel applied with stride 1 and no padding."""

    def __init__(self, filter_dims, matrix_dims, filter_id):
        """
        :param filter_dims: 2 dimensional tuple, specifying filter dimensions
        :param matrix_dims: takes 2D array as matrix size, or 3D array, where channels are last dimension
        :param filter_id: index of this filter's channel in the output-gradient tensor given to backward
        """
        self.filter_id = filter_id
        self.is_3d = len(matrix_dims) == 3
        self.dims = filter_dims
        # Stored as a tuple so it compares equal to ndarray.shape even if a list was passed
        self.matrix_dims = tuple(matrix_dims)
        # initializing filter randomly
        self.filter = np.random.randn(*filter_dims)
        # output matrix after filtering is done
        self.output_matrix_dims = \
            (matrix_dims[0] - filter_dims[0] + 1, matrix_dims[1] - filter_dims[1] + 1)

    def _filter_region_iteration(self, matrix):
        """
        Generate filter sized regions of the original matrix, one per cell of the
        output matrix, whose shape is
        (input_shape[0] - filter_size[0] + 1, input_shape[1] - filter_size[1] + 1).
        If the matrix contains channels, the region is summed over the channel
        axis, so a 2-D region is yielded in both cases.
        :param matrix: Matrix for which to return regions
        :return: yields (region, x, y): the filter sized cut-out on which to perform
                element-wise multiplication and summing, and its output coordinates
        """
        for x in range(self.output_matrix_dims[0]):
            for y in range(self.output_matrix_dims[1]):
                window = matrix[x: (x + self.dims[0]), y: (y + self.dims[1])]
                region = np.sum(window, axis=2) if self.is_3d else window
                yield region, x, y

    def forward(self, matrix):
        """
        Apply filter to a given matrix and return output
        :param matrix: numpy array whose shape equals the matrix_dims given at construction
        :return: Outputs filter applied numpy matrix
        """

        # Check if conditions apply
        assert (matrix is not None and self.matrix_dims == matrix.shape)
        output = np.zeros(self.output_matrix_dims)
        # Fill output (filtered) matrix
        for region, x, y in self._filter_region_iteration(matrix):
            # Summing elementwise multiplication of filter and region
            # Getting new pixel value for output
            output[x, y] = np.sum(np.multiply(region, self.filter))

        return output

    def backward(self, last_input, dH, learning_rate):
        """
        Update the filter with one SGD step and return dL/dX.

        :param last_input: the matrix that was passed to forward
        :param dH: loss gradient w.r.t. the layer output; either 3-D with this
                   filter's gradient at channel filter_id, or 2-D holding this
                   filter's gradient directly
        :param learning_rate: SGD step size
        :return: loss gradient w.r.t. last_input, same shape as last_input
        """
        assert (last_input.shape == self.matrix_dims and (dH.shape[:-1] == self.output_matrix_dims
                                                          or dH.shape == self.output_matrix_dims))

        # The assert admits a 2-D gradient as well, so select this filter's slice
        # once instead of always indexing with three indices
        own_grad = dH if dH.ndim == 2 else dH[:, :, self.filter_id]

        # Calculating input derivative first: it must be based on the filter that
        # produced the forward output, i.e. before the weights are touched
        dX = np.zeros(last_input.shape)  # Need to return derivative of loss in regards of last input (dL/dX)
        for i in range(self.output_matrix_dims[0]):
            for j in range(self.output_matrix_dims[1]):
                inc = self.filter * own_grad[i, j]
                if dX.ndim == 3:
                    # forward summed the channels, so every channel gets the same gradient
                    inc = inc[:, :, np.newaxis]
                dX[i: i + self.dims[0], j: j + self.dims[1]] += inc

        # dL/dW, updating weights
        for im_region, i, j in self._filter_region_iteration(last_input):
            # Weights going against the gradient with learning rate penalty
            self.filter -= own_grad[i, j] * learning_rate * im_region

        return dX


class Convolution2D:
    """
    Convolution layer made of independent Filter objects; no activation function for now
    """

    def __init__(self, num_filters, filter_size, input_shape):
        """
        Convolution layer for cnn network
        :param num_filters: number of filters (kernels) to apply to each input
        :param filter_size: filter dimensions (kernel_size)
        :param input_shape: shape of input matrix
        """
        assert num_filters > 0
        assert filter_size
        assert input_shape
        assert filter_size[0] > 0 and filter_size[1] > 0
        assert input_shape[0] > 0 and input_shape[1] > 0

        has_channels = len(input_shape) == 3
        self.num_inputs = input_shape[2] if has_channels else 1
        self.num_filters = num_filters
        self.filter_dims = filter_size
        self.input_shape = input_shape
        self.filters = [Filter(filter_size, input_shape, idx) for idx in range(num_filters)]

        # Cache variables
        self.last_input = None

    def output_shape(self):
        """ Return output matrix shape after forward passing """
        out_rows = self.input_shape[0] - self.filter_dims[0] + 1
        out_cols = self.input_shape[1] - self.filter_dims[1] + 1
        return out_rows, out_cols, self.num_filters

    def forward(self, matrix):
        """
        Apply filters to the given matrix
        :param matrix: Matrix to apply filters to
        :return: 3D numpy matrix, channels as last dimension
        """
        self.last_input = matrix
        # One 2-D feature map per filter, stacked along a leading axis ...
        feature_maps = np.array([kernel.forward(matrix) for kernel in self.filters])
        # ... which is then moved to the end so that channels are the 3rd dimension
        return np.moveaxis(feature_maps, 0, -1)

    def backward(self, dH, learning_rate):
        """
        Performs a backpropagation of convolutional layer.
        - dH is the loss gradient for this layer's outputs.
        - learning_rate is a float.
        Returns dL/dX, the loss gradient for this layer's inputs.
        """
        input_grad = np.zeros(self.last_input.shape)

        # Every filter updates its own weights and reports its share of the input
        # gradient; the shares are summed because all filters read the same input
        for kernel in self.filters:
            input_grad += kernel.backward(self.last_input, dH, learning_rate)

        return input_grad


class MaxPooling2D:
    """
    Max pooling over non-overlapping square windows of side pool_size.

    NOTE(review): the first cell of this notebook imports a ``MaxPooling2D`` layer
    from ``keras.layers``; defining this class rebinds that name, so the Keras
    model cell cannot be re-run after this cell without re-running the imports.
    """

    def __init__(self, input_shape, pool_size=2):
        """
        :param input_shape: shape of the input matrix, channels as last dimension
        :param pool_size: side length of the pooling window (also its stride)
        """
        self.dim = pool_size
        self.num_inputs = input_shape[2] if len(input_shape) == 3 else 1
        # Stored as a tuple so it compares equal to ndarray.shape even if a list was passed
        self.matrix_dims = tuple(input_shape)
        self.output_matrix_dims = self.output_shape()

        # Caching variables
        self.last_input = None

    def _pooling_region_iteration(self, matrix):
        """
        Generate pooling sized grids from original matrix, to fill output matrix
        if matrix contains channels return region with every channel, and maximize on first twos
        :param matrix: Matrix for which to return regions
        :return: yields (region, x, y): the pool sized cut-out on which to perform
                the max operation, and its output coordinates
        """
        for x in range(self.output_matrix_dims[0]):
            for y in range(self.output_matrix_dims[1]):
                region = matrix[(x * self.dim):((x + 1) * self.dim), (y * self.dim):((y + 1) * self.dim)]
                # Getting poolsized region
                yield region, x, y

    def output_shape(self):
        """Return (rows, cols, channels) of the pooled output; partial windows at the border are dropped."""
        return self.matrix_dims[0] // self.dim, self.matrix_dims[1] // self.dim, self.num_inputs

    def forward(self, matrix):
        """
        Apply max-pooling to a given matrix and return output
        :param matrix: numpy array whose shape equals the input_shape given at construction
        :return: Outputs max-pooled numpy matrix
        """

        # Check if conditions apply
        assert (matrix is not None and self.matrix_dims == matrix.shape)
        # Caching
        self.last_input = matrix

        output = np.zeros(self.output_matrix_dims)

        for region, x, y in self._pooling_region_iteration(matrix):
            # Getting max from elements, third dimension isn't changed
            # Getting new pixel value for output
            output[x, y] = np.amax(region, axis=(0, 1))  # maximum over axis 0 and 1, per channel

        return output

    def backward(self, dH):
        """
        Performs a backpropagation of the MaxPooling2D layer.
        Returns the loss gradient for this layer's inputs.
        - dH is the loss gradient for this layer's outputs.
        Every input cell that equals its window's maximum receives that window's
        gradient (ties all receive it); every other cell gets 0.
        """
        dLdX = np.zeros(self.last_input.shape)

        for region, i, j in self._pooling_region_iteration(self.last_input):
            max_value = np.amax(region, axis=(0, 1))
            # Window origin in input coordinates: the stride is the pool size,
            # not a fixed 2, otherwise other pool sizes write to wrong cells
            row0, col0 = i * self.dim, j * self.dim
            # If this pixel was the max value, copy the gradient in it.
            is_max = region == max_value
            dLdX[row0:row0 + self.dim, col0:col0 + self.dim] = np.where(is_max, dH[i, j], 0.0)

        return dLdX


In [15]:
class SequentialModel:
    """
    Minimal CNN: Convolution2D -> MaxPooling2D -> Dense(softmax),
    trained one sample at a time with cross-entropy loss.
    """

    # Lower bound for probabilities before log / division, so that a predicted
    # probability of exactly 0 cannot produce inf or NaN
    _EPS = 1e-12

    def __init__(self, input_shape, num_classes):
        """
        :param input_shape: shape of a single input sample
        :param num_classes: number of output classes
        """
        print("Model 1")
        self.num_classes = num_classes
        kernel_size = (3, 3)
        self.conv1 = Convolution2D(8, kernel_size, input_shape)
        self.pooling1 = MaxPooling2D(self.conv1.output_shape(), 2)
        pooled_shape = self.pooling1.output_shape()
        flattened_size = pooled_shape[0] * pooled_shape[1] * pooled_shape[2]
        self.dense1 = Dense(flattened_size, num_classes, activation='softmax')

    def train(self, X, y, X_test, y_test, lr, epochs):
        """
        Train with per-sample SGD and evaluate on the test data after every epoch.

        :param X: numpy array of training samples, indexable along axis 0
        :param y: one-hot encoded labels, one row per sample
        :param X_test: samples for the evaluation after each epoch
        :param y_test: one-hot labels for X_test
        :param lr: learning rate
        :param epochs: number of passes over the training data
        """
        assert (len(X) == len(y))
        log_every = 200  # size of the window used for the running report
        print("----------- CNN training started! ------------")
        for ep_i in range(epochs):
            print("Starting epoch {}".format(ep_i))
            print("Shuffling training data at the start of the epoch.")
            # Giving same permutation to the X and y before each epoch
            perm = np.random.permutation(len(y))
            X = X[perm]
            y = y[perm]
            loss_in_cluster = 0
            accuracy_in_cluster = 0
            epoch_accuracy = 0
            epoch_loss = 0
            for counter, (x_i, y_i) in enumerate(zip(X, y), start=1):
                y_i = np.argmax(y_i)  # setting y_i as a single label from array
                # Perform forward passing
                output, loss, accuracy = self.forward(x_i, y_i)

                # Updating accuracy
                accuracy_in_cluster += accuracy
                epoch_accuracy += accuracy
                # Updating Loss
                loss_in_cluster += loss
                epoch_loss += loss
                # Gradient for softmax
                gradient = np.zeros(self.num_classes)  # Only works when last layer is softmax
                gradient[y_i] = -1 / max(output[y_i], self._EPS)  # Gradient of cross-entropy loss
                # Performing backpropagation
                self.backward(gradient, lr)

                # Report AFTER the sample was processed, so that the window really
                # holds log_every samples when it is averaged
                if counter % log_every == 0:
                    print("|Instance {}| Last 200 Instances: Average loss {} / Accuracy {}%"
                          .format(counter, loss_in_cluster / log_every,
                                  100 * accuracy_in_cluster / log_every))
                    accuracy_in_cluster = 0
                    loss_in_cluster = 0
            epoch_loss /= len(X)
            epoch_accuracy /= len(X)
            print("-- Epoch {} finished with: Average Loss: {} / Accuracy: {}%;".format(ep_i, epoch_loss, 100*epoch_accuracy))
            self.test(X_test, y_test)

    def _forward_layers(self, x):
        """Run one sample through all layers and return the class probabilities."""
        output = self.conv1.forward(x)  # Need normalization probably
        output = self.pooling1.forward(output)
        return self.dense1.forward(output)

    def forward(self, x, label):
        """
        Forward pass for one labelled sample.

        :param x: single input sample
        :param label: integer index of the correct class
        :return: (output, loss, accuracy) - class probabilities, cross-entropy
                 loss for the label, and 1/0 whether the argmax matches the label
        """
        output = self._forward_layers(x)

        loss = -np.log(max(output[label], self._EPS))
        accuracy = 1 if np.argmax(output) == label else 0

        return output, loss, accuracy

    def backward(self, gradient, lr):
        """
        Backpropagate the output gradient through all layers, updating their weights.

        :return: the loss gradient with respect to the network input
        """
        gradient = self.dense1.backward(gradient, lr)
        # Reshaping should be handled by dense layer
        gradient = self.pooling1.backward(gradient)
        gradient = self.conv1.backward(gradient, lr)
        return gradient

    def test(self, X_test, y_test):
        """
        Evaluate the model without updating any weights.

        :return: (predicted_labels, average loss, accuracy in percent), where
                 predicted_labels is the list of per-sample output vectors
        """
        assert(len(X_test) == len(y_test))
        print("------------ Starting model testing -----------")
        loss = 0
        correct = 0
        predicted_labels = []
        for x_i, y_i in zip(X_test, y_test):
            y_i = np.argmax(y_i)
            pred, ls, acc = self.forward(x_i, y_i)
            loss += ls
            correct += acc
            predicted_labels.append(pred)
        res = (predicted_labels, loss/len(X_test), 100*correct/len(X_test))
        print("--------------- Test Finished: Loss {} | Accuracy {}% --------------".format(res[1], res[2]))
        return res

    def predict(self, X_test):
        """
        Return the list of output vectors (class probabilities), one per sample.

        Runs the layers directly instead of going through test() with made-up
        labels, which would report a loss and accuracy that mean nothing.
        """
        print("Prediction starting...")
        return [self._forward_layers(x_i) for x_i in X_test]

    def export_trained_model(self, dir_path):
        """Save the dense weights and every convolution filter as .npy files in dir_path."""
        import os  # local import: the notebook's import cell does not bring in os

        if dir_path:
            # np.save does not create missing folders
            os.makedirs(dir_path, exist_ok=True)
        np.save("{}/fully_connected.npy".format(dir_path), self.dense1.weights)
        for i, flt in enumerate(self.conv1.filters):
            np.save("{}/filter{}.npy".format(dir_path, i), flt.filter)

    def import_trained_model(self, dir_path):
        """Load weights previously written by export_trained_model from dir_path."""
        self.dense1.weights = np.load("{}/fully_connected.npy".format(dir_path))
        for i, flt in enumerate(self.conv1.filters):
            flt.filter = np.load("{}/filter{}.npy".format(dir_path, i))


In [None]:
# Standardise the inputs with the global mean and standard deviation.
# NOTE(review): this rebinds x_r, and the statistics are computed before the
# split below, i.e. they include the samples that end up in the test set.
x_r = (x_r - np.mean(x_r))/np.std(x_r)
# Fresh 75/25 split; rebinds the split variables used by the Keras training cell
x_train, x_test, y_train, y_test = train_test_split(x_r, y_r, test_size=0.25)
custom_model = SequentialModel(input_shape, num_classes)
# Positional arguments after the data: learning rate 0.004, 4 epochs
custom_model.train(x_train, y_train, x_test, y_test, 0.004, 4)

Model 1
----------- CNN training started! ------------
Starting epoch 0
Shuffling training data at the start of the epoch.
|Instance 200| Last 200 Instances: Average loss 2.9761005789757315 / Accuracy 36.0%
|Instance 400| Last 200 Instances: Average loss 2.0608953272907127 / Accuracy 45.0%
|Instance 600| Last 200 Instances: Average loss 1.6220457842044522 / Accuracy 53.5%
|Instance 800| Last 200 Instances: Average loss 1.5627447338689904 / Accuracy 59.0%
|Instance 1000| Last 200 Instances: Average loss 1.5920171432404064 / Accuracy 55.5%
|Instance 1200| Last 200 Instances: Average loss 1.1179294693226236 / Accuracy 62.5%
|Instance 1400| Last 200 Instances: Average loss 1.221566378915193 / Accuracy 62.0%
-- Epoch 0 finished with: Average Loss: 1.6829006259726844 / Accuracy: 54.509803921568626%;
------------ Starting model testing -----------
--------------- Test Finished: Loss 1.217192896948316 | Accuracy 60.98039215686274% --------------
Starting epoch 1
Shuffling training data at the 

In [16]:
# Write the dense weights and the convolution filters as .npy files under custom_model/
custom_model.export_trained_model("custom_model")