<a href="https://colab.research.google.com/github/SayuriRavihari/ACM-ICPC-Algorithms/blob/master/Audio_Module.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import librosa
import joblib
import numpy as np
import random
from sklearn.model_selection import train_test_split
from keras.utils import np_utils
from sklearn.preprocessing import StandardScaler

from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#create spectrograms
import numpy as np
from numpy.lib import stride_tricks
import os
import scipy.io.wavfile as wav
"""
This script creates spectrogram matrices from wav files that can be passed \
to the CNN. This was heavily adapted from Frank Zalkow's work.
"""

"""
Short-time Fourier transform of audio signal.  
"""
def stft(sig, frameSize, overlapFac=0.5, window=np.hanning):
    """Compute the short-time Fourier transform of an audio signal.

    The signal is padded with half a frame of zeros in front, so that the
    first window is centred on sample 0, and with one full frame of zeros at
    the end, so that every frame can be filled completely.

    Args:
        sig: Audio samples; ``np.append`` flattens them to one dimension.
        frameSize: Number of samples in each analysis window.
        overlapFac: Fraction of ``frameSize`` shared by consecutive windows.
        window: Callable that returns the window weights for a given length.

    Returns:
        Complex array with one row per frame, holding ``np.fft.rfft`` of the
        windowed frame (``frameSize // 2 + 1`` values per row).
    """
    weights = window(frameSize)
    hop = int(frameSize - np.floor(overlapFac * frameSize))

    # Leading zeros: centre of the first window lands on sample number 0.
    lead_in = np.zeros(int(np.floor(frameSize / 2.0)))
    padded = np.append(lead_in, sig)

    # The frame count is derived before the trailing padding is added.
    n_frames = int(np.ceil((len(padded) - frameSize) / float(hop)) + 1)

    # Trailing zeros: the last frames never run past the end of the buffer.
    padded = np.append(padded, np.zeros(frameSize))

    # Overlapping frames are exposed as a strided view and then copied, so
    # the in-place windowing below cannot write into shared memory.
    item_stride = padded.strides[0]
    framed = stride_tricks.as_strided(
        padded,
        shape=(n_frames, int(frameSize)),
        strides=(item_stride * hop, item_stride),
    ).copy()
    framed *= weights
    return np.fft.rfft(framed)

"""
Scale frequency axis logarithmically.
"""
def logscale_spec(spec, sr=44100, factor=20.):
    """Regroup the frequency bins of a spectrogram on a power-law scale.

    Bin edges are taken from ``linspace(0, 1, freqbins) ** factor``, stretched
    to the range of bin indices, rounded and de-duplicated. Each output band
    is the sum of the input bins from one edge up to (not including) the
    next; the last band runs to the end. With ``factor=1`` every input bin
    becomes its own band.

    Args:
        spec: 2-D array indexed as ``(time bin, frequency bin)``.
        sr: Sample rate used to convert bin indices to frequencies.
        factor: Exponent applied to the normalised bin positions.

    Returns:
        Tuple ``(newspec, freqs)``: ``newspec`` is a complex128 array with
        one column per band, and ``freqs`` is a list with the mean frequency
        of each band.
    """
    n_frames, n_bins = np.shape(spec)

    edges = np.linspace(0, 1, n_bins) ** factor
    edges *= (n_bins - 1) / max(edges)
    edges = np.unique(np.round(edges))

    # Each band spans [start, stop); a stop of None means "to the end".
    starts = [int(edge) for edge in edges]
    stops = starts[1:] + [None]

    # Build the spectrogram with the new frequency bands.
    newspec = np.complex128(np.zeros([n_frames, len(edges)]))
    for band, (lo, hi) in enumerate(zip(starts, stops)):
        newspec[:, band] = np.sum(spec[:, lo:hi], axis=1)

    # Mean frequency of the bins merged into each band.
    allfreqs = np.abs(np.fft.fftfreq(n_bins * 2, 1. / sr)[:n_bins + 1])
    freqs = [np.mean(allfreqs[lo:hi]) for lo, hi in zip(starts, stops)]

    return newspec, freqs
# Progress marker: printed once each time this notebook cell is executed.
print('done')
  
def stft_matrix(audiopath, binsize=2**10, offset=0):
    """Convert a wav file into a decibel spectrogram matrix.

    Rows of the result are frequency bins and columns are time frames; the
    values are ``20 * log10(|STFT| / 10e-6)``. A matrix of this form can be
    passed to the CNN after normalization.

    Args:
        audiopath: Path of the wav file to read.
        binsize: STFT window length in samples.
        offset: Currently unused; kept so existing callers keep working.

    Returns:
        2-D float array of shape ``(frequency bins, time frames)``, flipped
        vertically so the highest frequency bin is the first row. Bins whose
        magnitude is exactly zero come out as ``-inf``.
    """
    samplerate, samples = wav.read(audiopath)

    # Multi-channel files are read as (n_samples, n_channels). stft() would
    # flatten that and interleave the channels, so mix down to mono first.
    if np.ndim(samples) > 1:
        samples = np.mean(samples, axis=1)

    s = stft(samples, binsize)

    # factor=1 keeps the frequency axis linear: one band per STFT bin.
    sshow, _ = logscale_spec(s, factor=1, sr=samplerate)
    ims = 20. * np.log10(np.abs(sshow) / 10e-6)  # amplitude to decibel

    # (time, freq) -> (freq, time), then reverse the rows so the highest
    # frequency sits in row 0, as in a spectrogram image.
    return np.flipud(np.transpose(ims))

done


In [None]:
base = '/content/drive/My Drive/Audio_Data/Dataset/'
durr_array = []
dataset = {}


def _fixed_width_windows(spec, width=100):
    """Cut a (frequency, time) matrix into pieces exactly ``width`` columns wide.

    A matrix with at least ``width`` columns is split into consecutive,
    non-overlapping pieces and any remainder is dropped. A shorter matrix is
    repeated along the time axis and truncated to ``width`` columns. A matrix
    with no columns yields no pieces.
    """
    n_cols = spec.shape[1]
    if n_cols == 0:
        # Nothing to repeat; also avoids dividing by zero below.
        return []
    if n_cols >= width:
        return [spec[:, k * width:(k + 1) * width]
                for k in range(n_cols // width)]
    repeats = width // n_cols + 1
    return [np.tile(spec, (1, repeats))[:, :width]]


# Every sub-directory of `base` is one class; every file inside it is one clip.
for class_name in os.listdir(base):
    class_dir = os.path.join(base, class_name)
    if not os.path.isdir(class_dir):
        # Stray files next to the class folders are not classes.
        continue
    print(class_name)

    windows = []
    for file_name in os.listdir(class_dir):
        spectrogram = stft_matrix(os.path.join(class_dir, file_name))
        windows.extend(_fixed_width_windows(spectrogram))
    dataset[class_name] = windows

joblib.dump(dataset, '/content/drive/My Drive/Audio_Data/dataset.joblib')

In [None]:
# Load from the location the dataset-building cell writes to
# ('.../Audio_Data/dataset.joblib'); the previous path pointed inside the
# 'Dataset/' folder, where that cell never saves anything.
dataset = joblib.load('/content/drive/My Drive/Audio_Data/dataset.joblib')

# Keep only windows whose values are all finite. The decibel conversion in
# stft_matrix yields -inf for bins with zero magnitude.
new_dataset = {
    key: [window for window in windows if np.all(np.isfinite(window))]
    for key, windows in dataset.items()
}

# Drop the reference to the unfiltered data so its memory can be released.
dataset = {}

In [None]:
def preprocess(X_train, X_test):
    """Cast both sets to float32 and min-max scale every sample to [0, 1].

    Each sample (one entry along the first axis) is scaled independently
    with its own minimum and maximum. A sample whose values are all equal
    is mapped to zeros instead of NaN, and an empty set keeps its shape.

    Args:
        X_train: Array of training samples, first axis indexing samples.
        X_test: Array of test samples, first axis indexing samples.

    Returns:
        Tuple ``(X_train, X_test)`` of scaled float32 arrays with the same
        shapes as the inputs.
    """
    return _min_max_scale(X_train), _min_max_scale(X_test)


def _min_max_scale(batch):
    """Scale each sample of ``batch`` to [0, 1] using its own min and max."""
    batch = np.asarray(batch).astype('float32')
    if batch.size == 0:
        return batch

    # Reduce over every axis except the first, so each sample gets its own
    # minimum and range, broadcastable against the batch.
    sample_axes = tuple(range(1, batch.ndim))
    lows = batch.min(axis=sample_axes, keepdims=True)
    spans = batch.max(axis=sample_axes, keepdims=True) - lows

    # A constant sample has zero range; dividing by 1 maps it to all zeros
    # rather than producing NaN from 0 / 0.
    spans[spans == 0] = 1
    return (batch - lows) / spans


def prep_train_test(X_train, y_train, X_test, y_test, nb_classes):
    """Prepare samples and labels for Keras.

    The samples are passed through ``preprocess`` and the integer class
    labels are converted to a categorical (one-hot) representation.

    Args:
        X_train: Array of training samples.
        y_train: Integer class labels of the training samples.
        X_test: Array of test samples.
        y_test: Integer class labels of the test samples.
        nb_classes: Total number of classes, i.e. the one-hot width.

    Returns:
        Tuple ``(X_train, X_test, Y_train, Y_test)`` with the preprocessed
        samples and the one-hot label matrices.
    """
    print('Train on {} samples, validate on {}'.format(X_train.shape[0],
                                                       X_test.shape[0]))

    # Cast and normalize the samples (see preprocess).
    X_train, X_test = preprocess(X_train, X_test)

    # Convert class vectors to binary class matrices.
    Y_train = np_utils.to_categorical(y_train, nb_classes)
    Y_test = np_utils.to_categorical(y_test, nb_classes)

    return X_train, X_test, Y_train, Y_test

"""
def cnn(X_train, y_train, X_test, y_test, batch_size,
        nb_classes, epochs, input_shape):
    
    The Convolutional Neural Net architecture for classifying the audio clips
    as normal (0) or depressed (1).
    
    model = Sequential()

    model.add(Conv2D(32, (5, 5), input_shape=input_shape, activation='relu'))
    model.add(MaxPooling2D(pool_size=(4, 4)))
    model.add(Conv2D(32, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.5))

    model.add(Dense(nb_classes))
    model.add(Activation('softmax'))

    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

    history = model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs,
                        verbose=1, validation_data=(X_test, y_test))

    # Evaluate accuracy on test and train sets
    score_train = model.evaluate(X_train, y_train, verbose=0)
    print('Train accuracy:', score_train[1])
    score_test = model.evaluate(X_test, y_test, verbose=0)
    print('Test accuracy:', score_test[1])

    return model, history
    """

In [None]:
# Number of target classes and the mapping from class folder name to index.
nb_classes = 11
labels_map = {
    'Scary Sounds': 0,
    'Fights': 1,
    'Animal Sounds': 2,
    'Cheering': 3,
    'Vehicle Sounds': 4,
    'Shouting': 5,
    'Gunshots': 6,
    'Music': 7,
    'Explosions': 8,
    'Fire': 9,
    'Nature': 10,
}

train_data, train_labels = [], []
test_data, test_labels = [], []

# Per class: shuffle the windows in place, then send the first 80% to the
# training set and the remainder to the test set.
for key, value in new_dataset.items():
    random.shuffle(value)
    length = int(len(value) * 0.8)

    train_data += value[:length]
    train_labels += [labels_map[key] for _ in range(length)]

    test_data += value[length:]
    test_labels += [labels_map[key] for _ in range(len(value) - length)]

# Normalize the samples and one-hot encode the labels.
train_data, test_data, train_labels, test_labels = prep_train_test(
    np.array(train_data),
    np.array(train_labels),
    np.array(test_data),
    np.array(test_labels),
    nb_classes,
)

# Hold out 20% of the training set for validation, stratified by label.
X_train, X_val, y_train, y_val = train_test_split(
    train_data,
    train_labels,
    test_size=0.2,
    stratify=train_labels,
    random_state=42,
)

# Drop references to the intermediate containers.
new_dataset = {}
train_data = []
train_labels = []