In [1]:
from matplotlib import pyplot as plt

from scipy.signal import fftconvolve
from scipy.linalg import dft
from scipy.io import wavfile
from scipy.fftpack import fft, ifft, dct, idct

import pandas as pd
import numpy as np
import IPython
import time
import math
import os

In [2]:
def load_data(directory='Baby cry'):
    """Load every WAV file found in the class sub-folders of ``directory``.

    Each immediate sub-folder of ``directory`` is treated as one class; its
    name becomes the label of every recording inside it. All waveforms are
    truncated to the length of the shortest one so they can be stacked into
    a single array.

    Parameters
    ----------
    directory : str
        Root folder containing one sub-folder per class.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        The stacked, equal-length waveforms and the matching label for each.

    Raises
    ------
    ValueError
        If no ``.wav`` file is found under ``directory``.
    """
    wavforms = []
    labels = []

    # os.listdir returns entries in arbitrary order; sorting keeps the sample
    # order (and therefore any seeded split made later) reproducible.
    for dirc in sorted(os.listdir(directory)):
        class_dir = os.path.join(directory, dirc)
        # Ignore stray files (e.g. .DS_Store) sitting next to the class folders.
        if not os.path.isdir(class_dir):
            continue

        for filename in sorted(os.listdir(class_dir)):
            # Only WAV files can be parsed by wavfile.read; skip anything else.
            if not filename.lower().endswith('.wav'):
                continue
            # wavfile.read returns (sample_rate, samples); only the samples are kept.
            wav = wavfile.read(os.path.join(class_dir, filename))[1]
            wavforms.append(wav)
            labels.append(dirc)

    if not wavforms:
        raise ValueError(f'No .wav files found under {directory!r}')

    # Cut the end off each waveform so they all share the shortest length.
    min_len = min(len(wav) for wav in wavforms)
    trimmed = [wav[:min_len] for wav in wavforms]

    return np.array(trimmed), np.array(labels)

def cut_data(data, labels, n):
    """Split every row of ``data`` into consecutive, non-overlapping pieces of length ``n``.

    Parameters
    ----------
    data : 2-D array; each row is one sample.
    labels : sequence with one label per row of ``data``.
    n : length of each piece.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        The pieces (row by row, left to right) and, for each piece, the label
        of the row it was cut from. Any remainder shorter than ``n`` at the
        end of a row is dropped.
    """
    # How many whole pieces fit into one row.
    pieces_per_row = data.shape[1] // n
    bounds = [(k * n, (k + 1) * n) for k in range(pieces_per_row)]

    # Walk the rows in order, emitting every piece of a row before moving on.
    pieces = [row[start:stop] for row in data for start, stop in bounds]
    piece_labels = [labels[row_idx]
                    for row_idx in range(len(data))
                    for _ in bounds]

    return np.array(pieces), np.array(piece_labels)



def get_fft(data, rate=8000, sample_rate=100):
    """Compute a peak-normalised, down-sampled magnitude spectrum for each row.

    Parameters
    ----------
    data : sequence of equal-length 1-D waveforms.
    rate : value used to scale the frequency axis (bin k maps to
        ``k * rate / len(data[0])``).
    sample_rate : stride used to thin out the frequency bins; every
        ``sample_rate``-th bin is kept. Despite its name it is only used as a
        slicing step.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        One row of normalised magnitudes per waveform, and the frequency value
        of each retained bin.
    """
    n_samples = len(data[0])
    # Keep only the first half of the bins.
    freq_domain = (np.arange(n_samples) * rate / n_samples)[:n_samples // 2]
    n_bins = len(freq_domain)

    freqs = []
    for wav in data:
        # np.abs already yields real magnitudes, so no np.real is needed.
        magnitude = np.abs(fft(wav))[:n_bins]
        peak = magnitude.max() if magnitude.size else 0.0
        # An all-zero (silent) waveform has a zero peak; leave it as zeros
        # instead of dividing by zero and filling the row with NaNs.
        if peak > 0:
            magnitude = magnitude / peak
        freqs.append(magnitude[::sample_rate])

    return np.array(freqs), freq_domain[::sample_rate]

def data_to_df(data, labels, freqs):
    """Build a DataFrame with one column per entry of ``freqs`` plus a ``labels`` column.

    Parameters
    ----------
    data : 2-D values, one row per sample.
    labels : one label per row of ``data``.
    freqs : column names for ``data``.

    Returns
    -------
    pandas.DataFrame
    """
    frame = pd.DataFrame(data, columns=freqs)
    # Append the labels as the last column.
    return frame.assign(labels=labels)

def labels_to_integers(labels):
    """Replace each distinct label by its index in the sorted list of distinct labels.

    Parameters
    ----------
    labels : numpy.ndarray of labels.

    Returns
    -------
    (numpy.ndarray, dict)
        An array shaped like ``labels`` holding the codes, and a dict mapping
        each code back to its original label. The codes are stored in a
        float array (``np.zeros`` default dtype), i.e. 0.0, 1.0, ...
    """
    classes = np.unique(labels)

    codes = np.zeros(labels.shape)
    for code, name in enumerate(classes):
        codes[labels == name] = code

    code_to_label = dict(enumerate(classes))
    return codes, code_to_label

#### DATA PREPARATION 

In [3]:
# Load every recording together with its class label (the name of its sub-folder).
# NOTE(review): the folder is given as 'baby cry' while load_data's default is
# 'Baby cry'; the two only name the same folder on a case-insensitive filesystem.
data, labels = load_data('baby cry')

In [4]:
# Distinct class labels present in the loaded dataset.
set(labels)

{'belly_pain augmented',
 'burping augmented',
 'discomfort augmented',
 'hungry',
 'tired_augmented'}

In [5]:
# Per-class summary of the raw recordings: sample count, length and amplitude extrema.
print('|{0}\t|{1}\t|{2}\t|{3}\t|{4}\t|{5}\t|{6}'.format(
    '# samples', 'avg len',
    'avg min wvl', 'avg max wvl',
    'abs min wvl', 'abs max wvl', 'label'))
print('------------------------------------------------------------------------------------------------------')
# np.unique yields the labels in sorted order, so the rows come out in the same
# order on every run (iteration order of a set of strings is not stable).
for label in np.unique(labels):
    class_rows = data[labels==label]
    num_samples = len(class_rows)
    avg_len = class_rows.shape[1]
    # Per-recording extrema, computed with vectorised reductions.
    row_min = class_rows.min(axis=1)
    row_max = class_rows.max(axis=1)
    # .mean() accumulates in float64; adding up the int16 scalars with the
    # built-in sum() can silently wrap around under NumPy 2 promotion rules.
    avg_min_wvl = row_min.mean()
    avg_max_wvl = row_max.mean()
    abs_min_wvl = row_min.min()
    abs_max_wvl = row_max.max()
    print(u"|{0}\t\t|{1:0.2f}\t|{2:0.2f}\t|{3:0.2f}\t|{4:0.2f}\t|{5:0.2f}\t|{6}".format(
        num_samples, avg_len,
        avg_min_wvl, avg_max_wvl,
        abs_min_wvl, abs_max_wvl,
        label))

|# samples	|avg len	|avg min wvl	|avg max wvl	|abs min wvl	|abs max wvl	|label
------------------------------------------------------------------------------------------------------
|382		|52160.00	|-19543.69	|20013.27	|-32768.00	|32767.00	|hungry
|72		|52160.00	|-14417.58	|14879.79	|-32768.00	|32767.00	|tired_augmented
|81		|52160.00	|-16962.59	|17576.43	|-32768.00	|32767.00	|discomfort augmented
|48		|52160.00	|-13504.48	|12411.73	|-32768.00	|32767.00	|belly_pain augmented
|24		|52160.00	|-24876.50	|25274.08	|-32768.00	|32767.00	|burping augmented


In [6]:
non_hungry_data = data[labels!='hungry']
non_hungry_labels = labels[labels!='hungry']
cut_data, cut_labels = cut_data(non_hungry_data,non_hungry_labels, 25000)
#dup_data, dup_labels = duplicate_data(cut_data,cut_labels)

In [7]:
hungry_data = data[labels=='hungry'][:140]
hungry_labels = ['hungry']*140
hungry_data = hungry_data[:,:25000]

In [8]:
final_data = np.vstack((hungry_data,cut_data))
final_labels = np.hstack((hungry_labels, cut_labels))

In [10]:
#X, f_domain = get_fft(final_data)

In [9]:
# Per-class summary of the balanced dataset: sample count, length and value extrema.
# NOTE(review): the header says "freq", but final_data holds the waveform samples
# stacked above (the get_fft call is commented out) - consider renaming the columns.
print('|{0}\t|{1}\t|{2}\t|{3}\t|{4}\t|{5}\t|{6}'.format(
    '# samples', 'avg len',
    'avg min freq', 'avg max freq',
    'abs min freq', 'abs max freq', 'label'))
print('------------------------------------------------------------------------------------------------------')
# np.unique yields the labels in sorted order, so the rows come out in the same
# order on every run (iteration order of a set of strings is not stable).
for label in np.unique(final_labels):
    class_rows = final_data[final_labels==label]
    num_samples = len(class_rows)
    avg_len = class_rows.shape[1]
    # Per-example extrema, computed with vectorised reductions.
    row_min = class_rows.min(axis=1)
    row_max = class_rows.max(axis=1)
    # .mean() accumulates in float64; adding up the int16 scalars with the
    # built-in sum() can silently wrap around under NumPy 2 promotion rules.
    avg_min_wvl = row_min.mean()
    avg_max_wvl = row_max.mean()
    abs_min_wvl = row_min.min()
    abs_max_wvl = row_max.max()
    print(u"|{0}\t\t|{1:0.2f}\t\t|{2:0.2f}\t\t|{3:0.2f}\t\t|{4:0.2f}\t\t|{5:0.2f}\t\t|{6}".format(
        num_samples, avg_len,
        avg_min_wvl, avg_max_wvl,
        abs_min_wvl, abs_max_wvl,
        label))

|# samples	|avg len	|avg min freq	|avg max freq	|abs min freq	|abs max freq	|label
------------------------------------------------------------------------------------------------------
|140		|25000.00		|-17063.59		|17257.78		|-32768.00		|32767.00		|hungry
|144		|25000.00		|-11016.14		|11404.10		|-32768.00		|32767.00		|tired_augmented
|162		|25000.00		|-14598.77		|15190.67		|-32768.00		|32767.00		|discomfort augmented
|96		|25000.00		|-10876.91		|10223.34		|-32768.00		|32767.00		|belly_pain augmented
|48		|25000.00		|-20265.56		|20877.73		|-32768.00		|32767.00		|burping augmented


In [11]:
# (number of examples, samples per example) of the combined dataset.
final_data.shape

(590, 25000)

### Apply MFCC + GMMHMM

In [12]:
import librosa
from sklearn.model_selection import train_test_split
import re
import os
import librosa
import gmmhmm
import pickle # Will want to save models so we don't have to keep re-training them.
from multiprocessing import Pool # train models faster.

In [13]:
def initialize_gmmhmm(n):
    """Draw a random initial state distribution and transition matrix for ``n`` states.

    The global NumPy random generator is re-seeded from ``os.urandom`` on every
    call, so successive calls produce different draws.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        ``startprob`` of shape ``(n,)`` summing to 1, and ``transmat`` of shape
        ``(n, n)`` whose rows each sum to 1.
    """
    # Fresh seed taken from the operating system's entropy source.
    entropy = os.urandom(4)
    np.random.seed(int.from_bytes(entropy, byteorder='little'))

    # Initial state distribution, normalised to sum to one.
    startprob = np.random.rand(n)
    startprob = startprob / startprob.sum()

    # Transition matrix: divide every row by its own total.
    transmat = np.random.rand(n, n)
    transmat = transmat / transmat.sum(axis=1, keepdims=True)

    return startprob, transmat

def make_new_model(mfcc, n_components=5, n_mix=3, covars_prior=0.02, var=0.1):
    """Fit a randomly initialised GMM-HMM to ``mfcc`` and return it.

    Parameters
    ----------
    mfcc : training observations passed straight to ``model.fit``.
    n_components : number of hidden states; also the size of the random start
        distribution and transition matrix.
    n_mix : value forwarded as ``n_mix`` to ``gmmhmm.GMMHMM``.
    covars_prior : value assigned to ``model.covars_prior`` before fitting.
    var : value forwarded as ``var`` to ``model.fit``.

    The defaults reproduce the previously hard-coded settings, so existing
    calls of the form ``make_new_model(mfcc)`` behave as before.

    Returns
    -------
    The fitted ``gmmhmm.GMMHMM`` instance.
    """
    startprob, transmat = initialize_gmmhmm(n_components)
    model = gmmhmm.GMMHMM(
        n_components=n_components,
        n_mix=n_mix,
        transmat=transmat,
        startprob=startprob,
        cvtype='diag'
    )
    model.covars_prior = covars_prior
    model.fit(mfcc, init_params='mc', var=var)
    return model

def generate_models(X, y, num_compare=6, filepath='gmmhmm_models', max_attempts=100):
    """Train and save one GMM-HMM per distinct label in ``y``.

    For every label, ``num_compare`` models are fitted from different random
    initialisations and the one with the highest ``logprob`` is pickled as
    ``model<label>.pickle`` inside ``filepath``.

    Parameters
    ----------
    X : numpy.ndarray of training observations, indexable by a boolean mask.
    y : numpy.ndarray with one label per entry of ``X``.
    num_compare : number of candidate models fitted per label (at least 1).
    filepath : directory the chosen models are written to; created if missing.
    max_attempts : maximum number of fitting attempts per candidate model.

    Raises
    ------
    ValueError
        If ``num_compare`` is smaller than 1.
    RuntimeError
        If a candidate model still fails to fit after ``max_attempts`` tries.
    """
    if num_compare < 1:
        raise ValueError('num_compare must be at least 1')

    # Make sure the output directory exists before spending time on training.
    os.makedirs(filepath, exist_ok=True)

    for model_num in np.unique(y):

        print(f'Generating GMM HMM for label {model_num}...', end='')

        # The training subset is the same for every attempt, so select it once.
        class_samples = X[y == model_num]

        # Generate several models with different initializations.
        multiple_models = []
        for _ in range(num_compare):
            last_error = None
            # Fitting can fail with a ValueError for an unlucky initialisation;
            # retry, but give up eventually instead of looping forever.
            for _attempt in range(max_attempts):
                try:
                    multiple_models.append(make_new_model(class_samples))
                    break
                except ValueError as err:
                    last_error = err
                    print('..failed, try again..', end='')
            else:
                raise RuntimeError(
                    f'Could not fit a model for label {model_num} '
                    f'after {max_attempts} attempts'
                ) from last_error

        # Choose the 'best' model (based on log-probability).
        model = multiple_models[np.argmax([x.logprob for x in multiple_models])]

        print('...saving.')
        save_gmmhmm_model(model, f'model{model_num}', filepath=filepath)

def save_gmmhmm_model(model, filename, filepath='.'):
    """Pickle ``model`` to ``<filepath>/<filename>.pickle``.

    Parameters
    ----------
    model : object to serialise.
    filename : file name without the ``.pickle`` extension.
    filepath : target directory; created (including parents) if it does not exist.
    """
    # Without this, saving into a directory that was never created fails.
    os.makedirs(filepath, exist_ok=True)
    with open(os.path.join(filepath, f'{filename}.pickle'), 'wb') as fh:
        pickle.dump(model, fh, protocol=pickle.HIGHEST_PROTOCOL)

def get_gmmhmm_models(y, filepath='gmmhmm_models'):
    """Load the pickled model of every distinct label in ``y``.

    Files are read from ``<filepath>/model<label>.pickle``, one per value of
    ``np.unique(y)``, and returned as a list in that order.
    """
    def _unpickle(path):
        # Read one pickled model, closing the file afterwards.
        with open(path, 'rb') as handle:
            return pickle.load(handle)

    return [_unpickle(f'{filepath}/model{label}.pickle') for label in np.unique(y)]

def gmmhmm_predict(models, X):
    """Return, for each item of ``X``, the index of the model that scores it highest.

    Parameters
    ----------
    models : sequence of objects exposing ``score(item)``.
    X : iterable of items to classify.

    Returns
    -------
    numpy.ndarray
        One index into ``models`` per item of ``X``.
    """
    winners = []
    for cry in X:
        # Score this item under every model and keep the best-scoring index.
        scores = [model.score(cry) for model in models]
        winners.append(np.argmax(scores))
    return np.array(winners)

In [14]:
# Obtain the Mel-Frequency Cepstrum Coefficients (MFCCs)
Xmfcc = []
for x in final_data:
    Xmfcc.append(librosa.feature.mfcc(y=x.astype(float), sr=8000, n_mfcc=12))
Xmfcc = np.array(Xmfcc)
Xmfcc = np.swapaxes(Xmfcc, 1, 2)
Xmfcc.shape

(590, 49, 12)

In [15]:
# Encode the string labels as numeric codes and keep the code -> label mapping
# so predicted indices can be translated back to class names.
# The unseeded train_test_split that used to follow was removed: every variable
# it produced is reassigned by the seeded split in the next cell.
int_labels, label_mapping = labels_to_integers(final_labels)

In [16]:
# Split the MFCC features, the raw waveforms and the labels in a single call so
# their rows stay aligned; 25% is held out and the seed is fixed for repeatability.
Xmfcc_train, Xmfcc_test, X_train, X_test, y_train, y_test = train_test_split(
    Xmfcc, final_data, int_labels, test_size=0.25, random_state=42)

In [17]:
# Train and save one GMM-HMM per class label, keeping the best of 3 random
# initialisations for each label.
generate_models(np.array(Xmfcc_train), y_train, num_compare=3)

Generating GMM HMM for label 0.0......saving.
Generating GMM HMM for label 1.0.....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again.....saving.
Generating GMM HMM for label 2.0.....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again.....saving.
Generating GMM HMM for label 3.0......saving.
Generating GMM HMM for label 4.0.....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again....failed, try again.....saving.


In [20]:
# Reload the saved per-class models from disk, one per distinct training label.
gmmhmm_models = get_gmmhmm_models(y_train)

In [21]:
# For each test example, take the index of the model that scores it highest.
preds = gmmhmm_predict(gmmhmm_models, Xmfcc_test)
preds

array([3, 2, 4, 0, 0, 3, 2, 0, 3, 3, 3, 1, 2, 0, 2, 2, 0, 2, 4, 4, 3, 3,
       1, 4, 0, 3, 3, 3, 1, 2, 3, 0, 0, 1, 1, 0, 3, 0, 4, 4, 1, 0, 0, 0,
       2, 1, 3, 3, 3, 1, 0, 2, 3, 4, 0, 2, 3, 3, 2, 0, 2, 2, 3, 0, 3, 2,
       2, 2, 1, 3, 3, 0, 3, 3, 3, 0, 0, 3, 3, 1, 0, 1, 3, 3, 1, 3, 3, 4,
       0, 3, 0, 3, 2, 3, 0, 1, 4, 2, 1, 0, 0, 1, 3, 2, 3, 3, 1, 0, 0, 1,
       2, 2, 1, 2, 3, 2, 4, 0, 3, 0, 2, 0, 1, 2, 4, 3, 2, 3, 3, 3, 1, 1,
       3, 3, 0, 3, 4, 1, 2, 3, 3, 3, 0, 2, 2, 2, 4, 1], dtype=int64)

In [22]:
# Test accuracy: fraction of examples whose predicted index equals the true label code.
np.mean(preds == y_test)

0.5

With a test accuracy of only 0.5 from the GMMHMM classifiers, it seemed very likely at this point that
our best option would be neural networks: even though GMMHMM classifiers were often used for automatic
speech recognition in the past, neural networks have since replaced them as the industry standard.