# EAS503 : Intro to Data Driven Analysis


## Final Project Group 26

---
SUBMITTED BY -
---
| Name | Person ID | Email |
| --- | --- | --- |
| AMAN PRAKASH | 50416755 | amanprak@buffalo.edu |
| Prashant Upadhyay | 50419393 | pupadhya@buffalo.edu |
| Serath Chandra Nutakki | 50363265 | serathch@buffalo.edu |
| Vamshivardhan Reddy Balannagari | 50435533 | vamshiva@buffalo.edu |

In [None]:
import os
import h5py
import librosa
import itertools
from copy import copy
import numpy as np
import matplotlib.pyplot as plt
from collections import OrderedDict
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

In [None]:
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Add
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import PReLU
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import AveragePooling2D
from tensorflow.keras.layers import GlobalAveragePooling2D
from tensorflow.keras.layers import GlobalMaxPooling2D
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import ReduceLROnPlateau

In [None]:
# Seed NumPy's global RNG so NumPy-driven randomness (e.g. the augmentation
# draws and index shuffling in the batch generator) is repeatable.
# NOTE(review): no TensorFlow seed is set in the visible code, so weight
# initialisation is presumably still non-deterministic — confirm if needed.
np.random.seed(42)

In [None]:
def split_songs(X, y, w = 0.05, o = 0.5):
    """Cut one signal into fixed-size, overlapping windows.

    Parameters
    ----------
    X : array-like with a ``shape`` attribute
        The signal to slice along its first axis.
    y : any
        Label attached to every window produced from ``X``.
    w : float
        Window length expressed as a fraction of ``X.shape[0]``.
    o : float
        Fraction of a window shared with the next one (0.5 = half overlap).

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        The stacked windows and one copy of ``y`` per window.
    """
    total = X.shape[0]
    chunk = int(total*w)
    # Hop between consecutive window starts.
    offset = int(chunk*(1.-o))

    windows = []
    labels = []
    for start in range(0, total - chunk + offset, offset):
        window = X[start:start+chunk]
        # Only complete windows are kept; a short tail is dropped.
        if window.shape[0] == chunk:
            windows.append(window)
            labels.append(y)

    return np.array(windows), np.array(labels)

In [None]:
def melspectrogram_generator(songs, n_fft=1024, hop_length=256):
    """Compute a 128-band mel spectrogram for every signal in ``songs``.

    Parameters
    ----------
    songs : iterable of 1-D audio signals
        Each item is handed to ``librosa.feature.melspectrogram``.
    n_fft : int
        FFT window size forwarded to librosa.
    hop_length : int
        Hop length forwarded to librosa.

    Returns
    -------
    numpy.ndarray
        One spectrogram per input signal, each with a trailing singleton
        axis appended (the channel axis expected by ``Conv2D``).
    """
    def mel_spec(x):
        # The audio must be passed by keyword: recent librosa releases
        # make the arguments of melspectrogram keyword-only, and ``y=`` is
        # also accepted by older releases.
        spec = librosa.feature.melspectrogram(
            y=x, n_fft=n_fft, hop_length=hop_length, n_mels=128)
        return spec[:, :, np.newaxis]

    return np.array([mel_spec(song) for song in songs])

In [None]:
def data_split(X, y):
    """Load audio files and turn them into spectrogram windows plus labels.

    Each file is loaded with ``librosa.load``, truncated to the module-level
    ``song_samples`` samples, cut into overlapping windows by
    ``split_songs`` and converted to mel spectrograms by
    ``melspectrogram_generator``.

    Parameters
    ----------
    X : iterable of str
        Paths of the audio files.
    y : iterable of int
        Integer genre id of each file, aligned with ``X``.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        The spectrograms of all windows (windows of one file are
        contiguous, files in input order) and the one-hot encoded genre of
        each window.
    """
    specs_arr = []
    genres_arr = []

    for fn, genre in zip(X, y):
        signal, _ = librosa.load(fn)
        # Keep the same number of leading samples from every track so all
        # windows end up with the same size.
        signal = signal[:song_samples]
        # Use a dedicated name for the per-window labels instead of
        # rebinding the ``y`` parameter.
        signals, labels = split_songs(signal, genre)
        specs = melspectrogram_generator(signals)
        genres_arr.extend(labels)
        specs_arr.extend(specs)

    # One-hot encode the genre labels (previously the spectrograms were
    # passed here by mistake). The number of classes is inferred by
    # to_categorical from the largest label present.
    return np.array(specs_arr), to_categorical(genres_arr)

In [None]:
def fetch_data(src_dir, genres, song_samples):
    """Collect the audio files of every genre and build train/test arrays.

    Parameters
    ----------
    src_dir : str
        Directory holding one sub-folder per genre.
    genres : dict
        Maps a genre folder name to its integer class id.
    song_samples : int
        Accepted for interface compatibility; it is not used in this
        function (``data_split`` reads the module-level ``song_samples``).

    Returns
    -------
    (X_train, X_test, y_train, y_test)
        Spectrogram windows and one-hot labels as produced by
        ``data_split``. The split is made per file (25 % test, stratified
        by genre, ``random_state=42``) before windowing, so all windows of
        a song fall on the same side of the split.
    """
    fu_arr, genres_arr = [], []

    for genre_name, genre_id in genres.items():
        # os.path.join works whether or not src_dir ends with a separator.
        folder = os.path.join(src_dir, genre_name)
        for root, _subdirs, files in os.walk(folder):
            for file in files:
                # Join with ``root`` (not ``folder``): os.walk descends
                # into sub-directories and files found there live under
                # ``root``.
                fu_arr.append(os.path.join(root, file))
                genres_arr.append(genre_id)

    X_train, X_test, y_train, y_test = train_test_split(
        fu_arr, genres_arr, test_size=0.25, random_state=42, stratify=genres_arr)

    X_train, y_train = data_split(X_train, y_train)
    X_test, y_test = data_split(X_test, y_test)

    return X_train, X_test, y_train, y_test

In [None]:
# Root folder that holds one sub-folder per genre.
gtzan_dir = '../data/genres/'
# Number of leading samples kept from every track (read as a global by
# data_split when it truncates each loaded signal).
song_samples = 660000
# Genre folder name -> integer class id used for the labels.
genres = {'metal': 0, 'disco': 1, 'classical': 2, 'hiphop': 3, 'jazz': 4, 
          'country': 5, 'pop': 6, 'blues': 7, 'reggae': 8, 'rock': 9}

# Load every track, split it into windows and compute the spectrograms.
X_train, X_test, y_train, y_test = fetch_data(gtzan_dir, genres, song_samples)

In [None]:
# Sanity check: shapes of the window arrays and of their label arrays.
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

In [None]:
# Draw the per-class window counts of the train set, then of the test set,
# as two bar series on the same axes.
for _onehot in (y_train, y_test):
    # argmax turns each one-hot row back into its integer class id.
    val, cnt = np.unique(np.argmax(_onehot, axis=1), return_counts=True)
    plt.bar(val, cnt)

print("Train and Test Set Histogram")
plt.show()

In [None]:
from tensorflow.keras.utils import Sequence

class gtzan_generator(Sequence):
    """Keras ``Sequence`` serving mini-batches of spectrograms and labels.

    In training mode (``is_test=False``) samples are drawn in a shuffled
    order that is re-shuffled after every epoch, and each batch is randomly
    augmented. In test mode batches are plain consecutive slices of the
    data, without shuffling or augmentation.

    Parameters
    ----------
    X : numpy.ndarray
        Spectrograms, indexed along the first axis.
    y : numpy.ndarray
        Labels aligned with ``X``.
    batch_size : int
        Number of samples per batch.
    is_test : bool
        When True, disable shuffling and augmentation.
    """

    def __init__(self, X, y, batch_size=64, is_test = False):
        # Let the Keras base class initialise itself (newer Keras versions
        # warn when this call is missing).
        super().__init__()
        self.X = X
        self.y = y
        self.batch_size = batch_size
        self.is_test = is_test
        # Sample order used for training batches. It is created here so it
        # exists before the first epoch, and shuffled right away so the
        # first epoch does not see the data in loading order.
        self.indexes = np.arange(len(self.X))
        if not self.is_test:
            np.random.shuffle(self.indexes)

    def __augment(self, sig, hflip = 0.5, random_cutout = 0.5):
        """Return a randomly augmented copy of the batch ``sig``.

        Each spectrogram is flipped along axis 1 with probability ``hflip``
        and, with probability ``random_cutout``, has 3 random rows and
        4 random columns overwritten with a constant.
        """
        spectrograms =  []
        for s in sig:
            # Work on a copy so the stored dataset is never modified.
            signal = copy(s)
            if np.random.rand() < hflip:
                signal = np.flip(signal, 1)
            if np.random.rand() < random_cutout:
                lines = np.random.randint(signal.shape[0], size=3)
                # Column indices must be drawn from the size of axis 1
                # (previously axis 0 was used for both rows and columns).
                cols = np.random.randint(signal.shape[1], size=4)
                # NOTE(review): -80 looks like a dB floor, but the
                # spectrograms built in this file are not converted to
                # dB — confirm the intended fill value.
                signal[lines, :, :] = -80
                signal[:, cols, :] = -80

            spectrograms.append(signal)
        return np.array(spectrograms)

    def __len__(self):
        """Number of batches per epoch (the last batch may be smaller)."""
        return int(np.ceil(len(self.X)/self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(spectrograms, labels)``."""
        start = index*self.batch_size
        stop = (index+1)*self.batch_size

        if self.is_test:
            # Evaluation data: consecutive slice, no augmentation.
            return self.X[start:stop], self.y[start:stop]

        # Training data: pick the samples through the shuffled index array
        # and augment them (the original code referenced an undefined
        # name here and never used the shuffled indexes).
        batch_idx = self.indexes[start:stop]
        sig = self.__augment(self.X[batch_idx])
        return sig, self.y[batch_idx]

    def on_epoch_end(self):
        """Re-shuffle the training sample order after each epoch."""
        if not self.is_test:
            np.random.shuffle(self.indexes)
        return None

In [None]:
def convolution_block(x_value, n_filters, pool_size=(2, 2)):
    """Apply one Conv -> ReLU -> MaxPool -> Dropout stage to a tensor.

    Parameters
    ----------
    x_value : Keras tensor
        Input of the stage.
    n_filters : int
        Number of 3x3 convolution filters.
    pool_size : tuple
        Used both as pooling window and as pooling stride.

    Returns
    -------
    Keras tensor
        Output of the dropout layer that closes the stage.
    """
    stage = (
        Conv2D(n_filters, (3, 3), strides=(1, 1), padding='same'),
        Activation('relu'),
        MaxPooling2D(pool_size=pool_size, strides=pool_size),
        Dropout(0.25),
    )
    # Chain the layers in order, feeding each one the previous output.
    for layer in stage:
        x_value = layer(x_value)
    return x_value

In [None]:
def generate_model(input_shape, num_genres):
    """Build the CNN genre classifier.

    The network stacks five ``convolution_block`` stages (16, 32, 64, 128
    and 256 filters), flattens the result and classifies it with a
    512-unit dense layer followed by a softmax layer, both L2-regularised.

    Parameters
    ----------
    input_shape : tuple
        Shape of one input spectrogram, channel axis included.
    num_genres : int
        Number of output classes.

    Returns
    -------
    tensorflow.keras.Model
        The uncompiled model.
    """
    inpt = Input(shape=input_shape)
    x_value = convolution_block(inpt, 16)
    x_value = convolution_block(x_value, 32)
    x_value = convolution_block(x_value, 64)
    x_value = convolution_block(x_value, 128)
    x_value = convolution_block(x_value, 256)

    x_value = Flatten()(x_value)
    x_value = Dropout(0.5)(x_value)
    x_value = Dense(512, activation='relu', 
              kernel_regularizer=tf.keras.regularizers.l2(0.02))(x_value)
    # Keep the dropout output in the chain: it was previously bound to a
    # separate, unused name, which left this layer out of the model.
    x_value = Dropout(0.25)(x_value)
    predictions = Dense(num_genres, 
                        activation='softmax', 
                        kernel_regularizer=tf.keras.regularizers.l2(0.02))(x_value)
    
    model = Model(inputs=inpt, outputs=predictions)
    return model

In [None]:
# Input shape is that of a single spectrogram; the number of output classes
# follows the genre table instead of being hard-coded.
model = generate_model(X_train[0].shape, len(genres))

In [None]:
# Print the layer-by-layer architecture and parameter counts.
model.summary()

### Loss function

In [None]:
# Categorical cross-entropy matches the one-hot labels produced by
# to_categorical; Adam is used with its default settings and accuracy is
# tracked as the reported metric.
model.compile(loss=tf.keras.losses.categorical_crossentropy,
              optimizer=tf.keras.optimizers.Adam(),
              metrics=['accuracy'])

In [None]:
# Learning-rate schedule: when the validation loss has not improved by at
# least min_delta for `patience` epochs, multiply the learning rate by
# `factor` (never going below min_lr), then wait `cooldown` epochs before
# monitoring again.
reduceLROnPlat = ReduceLROnPlateau(
    monitor='val_loss', 
    factor=0.95,
    patience=3,
    verbose=1,
    mode='min',
    min_delta=0.0001,
    cooldown=2,
    min_lr=1e-5
)

In [None]:
bsize = 128
# Pass the batch size to the generator: it previously fell back to its
# default of 64 while the step counts below were computed for 128, so only
# about half of the data was visited per epoch.
train_generator = gtzan_generator(X_train, y_train, batch_size=bsize)
# len() of a Sequence is its number of batches, already an int.
steps_per_epoch = len(train_generator)

# Validation data must not be augmented, hence is_test=True.
validation_generator = gtzan_generator(X_test, y_test, batch_size=bsize,
                                       is_test=True)
val_steps = len(validation_generator)

In [None]:
# Model.fit accepts Sequence objects directly; fit_generator is deprecated
# and no longer available in recent Keras releases. The step counts are
# cast to int because they may have been computed with np.ceil (a float).
hist = model.fit(
    train_generator,
    steps_per_epoch=int(steps_per_epoch),
    validation_data=validation_generator,
    validation_steps=int(val_steps),
    epochs=150,
    verbose=1,
    callbacks=[reduceLROnPlat])

In [None]:
# Evaluate on the un-augmented test windows; with the compile settings
# above the result holds the loss first and the accuracy second.
score = model.evaluate(X_test, y_test, verbose=0)
val_loss, val_acc = score[0], score[1]
print("Val Loss = {:.3f}".format(val_loss))
print("Val Acc = {:.3f}".format(val_acc))

In [None]:
# Side-by-side training curves: accuracy on the left, loss on the right,
# each showing the train and validation series recorded by Keras.
plt.figure(figsize=(15,7))

for position, (metric, caption) in enumerate(
        (('accuracy', 'Accuracy'), ('loss', 'Loss')), start=1):
    plt.subplot(1, 2, position)
    plt.plot(hist.history[metric], label='train')
    # Keras stores the validation series under the 'val_' prefixed key.
    plt.plot(hist.history['val_' + metric], label='validation')
    plt.title(caption)
    plt.xlabel('Epochs')
    plt.ylabel(caption)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Draw a confusion matrix as an annotated heat map on the current figure.

    Parameters
    ----------
    cm : numpy.ndarray
        Square confusion matrix; rows are labelled "True" and columns
        "Predicted".
    classes : iterable of str
        Tick labels, in matrix order. Any iterable is accepted (it is
        copied into a list).
    normalize : bool
        When True, every row is divided by its sum before plotting. Rows
        that sum to zero are shown as zeros instead of NaN.
    title : str
        Title of the plot.
    cmap : matplotlib colormap
        Colormap used for the heat map.
    """
    # Materialise the labels so views such as dict keys work everywhere.
    classes = list(classes)

    if normalize:
        row_sums = cm.sum(axis=1)[:, np.newaxis]
        # Divide only where the row total is non-zero; an empty row would
        # otherwise produce NaN cells and a runtime warning.
        cm = np.divide(cm.astype('float'), row_sums,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_sums != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True')
    plt.xlabel('Predicted')
    # Run the layout pass last so the axis labels are taken into account.
    plt.tight_layout()

In [None]:
# Predicted and true class id of every test window.
preds = np.argmax(model.predict(X_test), axis = 1)
y_orig = np.argmax(y_test, axis = 1)
# sklearn expects (y_true, y_pred): rows are then the true classes, which is
# what the "True"/"Predicted" axis labels of the plot assume. Fixing the
# label set keeps one row/column per class even if a class never occurs.
cm = confusion_matrix(y_orig, preds, labels=np.arange(y_test.shape[1]))

In [None]:
# Genre names ordered by their class id, so tick labels line up with the
# rows and columns of the confusion matrix.
keys = OrderedDict(sorted(genres.items(), key=lambda t: t[1])).keys()

plt.figure(figsize=(10,10))
plot_confusion_matrix(cm, keys, normalize=True)

## Majority Vote

In [None]:
def vote_count(scr):
    """Return the most frequent value in ``scr`` (majority vote).

    ``np.unique`` returns the distinct values in sorted order, so when
    several values share the highest count the smallest of them wins.
    """
    candidates, tallies = np.unique(scr, return_counts=True)
    return candidates[tallies.argmax()]

In [None]:
# Class probabilities for every test window (one row per window).
preds = model.predict(X_test, batch_size=128, verbose=0)

In [None]:
# Windows of one song are stored contiguously (data_split appends them file
# by file), so grouping by the number of windows per song recovers the
# songs. That number is derived from split_songs instead of hard-coding the
# song count. Assumes every track has at least song_samples samples and
# therefore yields the same number of windows.
windows_per_song = len(split_songs(np.zeros(song_samples), 0)[0])
scores_songs = np.argmax(preds, axis=1).reshape(-1, windows_per_song)
# One predicted genre per song: the majority vote over its windows.
scores_songs = [vote_count(scores) for scores in scores_songs]

In [None]:
# Group the true window labels per song the same way as the predictions:
# windows of one song are contiguous and, assuming every track has at least
# song_samples samples, each song contributes the same number of windows.
windows_per_song = len(split_songs(np.zeros(song_samples), 0)[0])
label = np.argmax(y_test, axis=1).reshape(-1, windows_per_song)
# All windows of a song carry the same genre, so the vote returns it.
label = [vote_count(l) for l in label]

In [None]:
from sklearn.metrics import accuracy_score

# Song-level accuracy: share of songs whose majority-vote prediction matches
# the true genre.
print("voting = {:.3f}".format(accuracy_score(label, scores_songs)))

With the classical approach we reached an accuracy of 78.8%; with the CNN we raised the accuracy to **82%**.

## Save the model

In [None]:
# Persist the trained model to disk; the .h5 extension presumably selects
# Keras' HDF5 format.
model.save('../models/cnn_model.h5')