In [None]:
integrantes = ("Contreras, Daniel", "Diaz Villa, Virginia", "Hruszecki, Darío", "Marchio, Sergio")

print("TP2 AA, grupo 2 \n")

for i in integrantes:
    print(i)

## Import de librerias utilizadas

In [None]:
import librosa
import glob
import numpy as np
import seaborn as sns
import pandas as pd

import os
import os.path
from os import path

import matplotlib.pyplot as plt

from enum import Enum
from time import time

from IPython.display import Audio
from librosa.display import specshow
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import RandomForestClassifier

## Funciones para la obtención de features y modelado

In [None]:
class AudioType(Enum):
  TRAIN = 1   
  VALIDATION = 2   
  TEST = 3   
  CUSTOM = 4   

In [None]:
class Feature:
  def __init__(self, filename, audio_type, noise_type=None):
    self.filename = filename
    self.noise_type = noise_type
    self.audio_type = audio_type

In [None]:
class Features(Enum):
    Train = Feature("train_features.csv",AudioType.TRAIN)
    Validation = Feature("valid_features.csv", AudioType.VALIDATION)
    Test = Feature("test_features.csv", AudioType.TEST) 
    Test_Noise_Gauss = Feature("test_features_gauss.csv",AudioType.TEST, "gauss")
    Test_Noise_Dishes = Feature("test_features_dishes.csv", AudioType.TEST, "doing_the_dishes.wav")
    Test_Noise_Dude = Feature("test_features_dude.csv", AudioType.TEST, "dude_miaowing.wav")
    Test_Noise_Bike = Feature("test_features_bike.csv", AudioType.TEST, "exercise_bike.wav")
    Test_Noise_Pink = Feature("test_features_pink.csv", AudioType.TEST, "pink_noise.wav")
    Test_Noise_Running = Feature("test_features_running.csv", AudioType.TEST, "running_tap.wav")            
    Test_Noise_White = Feature("test_features_white.csv", AudioType.TEST, "white_noise.wav")
    Test_Custom = Feature("test_features_custom.csv", AudioType.CUSTOM)

In [None]:
def download_feature_if_needed(filename):
    if not path.exists("features/"+filename):
        url = "https://raw.githubusercontent.com/dhruszecki/cdatos-AA-TP2/master/features/"+filename
        !wget {url} -P features
    else:
        print("features/" + filename, " already exists! :)")

In [None]:
audio_filenames = None

to_number = {
  "zero": 0,
  "one": 1,
  "two": 2,
  "three": 3,
  "four": 4,
  "five": 5,
  "six": 6,
  "seven": 7,
  "eight": 8,
  "nine": 9
}

def get_filenames():
    
    global audio_filenames
    
    if audio_filenames!= None:
        return audio_filenames
    
    if not path.exists("speechcommands"):
        !wget http://download.tensorflow.org/data/speech_commands_v0.01.tar.gz
        !mkdir speechcommands
        !tar -xf speech_commands_v0.01.tar.gz -C speechcommands

    if not path.exists("customcommands"):
        !wget https://raw.githubusercontent.com/dhruszecki/cdatos-AA-TP2/master/resources/custom_commands.tar.gz
        !mkdir customcommands
        !tar -xf custom_commands.tar.gz -C customcommands
    
    numbers_filenames = []
    custom_filenames = []

    for i in to_number:
        numbers_filenames.append(glob.glob('speechcommands/' + i + '/*.wav'))
        custom_filenames = custom_filenames + glob.glob("customcommands/" + i + '/*.wav')

    test_filenames= ['speechcommands/' + e for e in open('speechcommands/testing_list.txt','r').read().splitlines() if e[:e.find('/')] in to_number]
    valid_filenames= ['speechcommands/' + e for e in open('speechcommands/validation_list.txt','r').read().splitlines() if e[:e.find('/')] in to_number]
    train_filenames= [e for n in range(10) for e in numbers_filenames[n] if (e not in test_filenames) and (e not in valid_filenames)]
    
        
    print('train_count:' , len(train_filenames))
    print('test_count:' , len(test_filenames))
    print('valid_count:' , len(valid_filenames))
    print('custom_count:' , len(custom_filenames))
    
    audio_filenames = {
      AudioType.TRAIN : train_filenames,
      AudioType.TEST : test_filenames,
      AudioType.VALIDATION : valid_filenames,
      AudioType.CUSTOM : custom_filenames 
    }
        
    return audio_filenames

In [None]:
def apply_noise(audio, noise_type):
    ruido = None
    if noise_type == "gauss":
        ruido = np.random.normal(size=audio.shape)
    else:
        ruido, sr = librosa.core.load('speechcommands/_background_noise_/'+noise_type, sr=None)
        ruido = 0.5*ruido[5000:5000+len(audio)]        
    
    return ruido + audio

In [None]:
def calculate_features(filename, n_mfcc=12, delta=True, deltadelta=True, energy=True, summary_fn = [np.mean, np.std],
                       summary_names=['mean', 'std'], noise_type=None):  
  #Abro el archivo:
  x, sr = librosa.core.load(filename,sr=None)
  
  if (noise_type != None):
        x = apply_noise(x, noise_type)

  #Calculo MFCCs
  features = librosa.feature.mfcc(x,sr=sr,n_mfcc=n_mfcc)

  #Calculo energia:
  if energy:
    energy = librosa.feature.rmse(x)
    features = np.concatenate([features,energy])

  #Aplico media y desvio estandar por defecto
  summary_features = np.concatenate([fn(features,axis=1) for fn in summary_fn])
    
  #Lo mismo con los delta
  if delta:
    deltafeatures = np.diff(features)
    summary_features = np.concatenate([summary_features,np.concatenate([fn(deltafeatures,axis=1) for fn in summary_fn])])

  #Y con los delta de segundo orden
  if deltadelta:
    deltadeltafeatures = np.diff(features,n=2)
    summary_features = np.concatenate([summary_features,np.concatenate([fn(deltadeltafeatures,axis=1) for fn in summary_fn])]) 
  
  summary_features = np.append(summary_features, [to_number[filename.split('/')[1]], filename])

  return summary_features

In [None]:
features_names= None
def name_features(filename = '', n_mfcc=12, delta=True, deltadelta=True, energy=True, summary_fn = [np.mean, np.std], summary_names=['mean', 'std']):
    global features_names
    
    if features_names!= None:
        return features_names
    
    features_names = ['mfcc_{}'.format(i) for i in range(n_mfcc)]
    if energy: features_names = features_names + ['energy']
    features_names = ['{}_{}'.format(name_i,summ_i) for summ_i in summary_names for name_i in features_names]
    if delta: d_names = ['d{}'.format(name) for name in features_names]
    if deltadelta: dd_names = ['dd{}'.format(name) for name in features_names]

    features_names = features_names + d_names + dd_names + ['digit', 'file']

    return features_names

In [None]:
def calculate_features_if_needed(feature):
    result_path = "features/"+feature.filename
    result = []
    if not path.exists(result_path):
        print("Getting filenames")
        filenames = get_filenames()[feature.audio_type]        
        
        print("Calculating features for ", len(filenames), " rows -> ", result_path)
        features_names = name_features()
        features_data = [calculate_features(filename, noise_type= feature.noise_type) for filename in filenames]
        pd.DataFrame(data = features_data, columns = features_names).to_csv(result_path)
        
    
    print("Loading saved features <- ", result_path)
    result = pd.read_csv(result_path)
    
    return result

In [None]:
def load_features(feature):
    features = pd.DataFrame(calculate_features_if_needed(feature))
    features.drop(['file', 'Unnamed: 0'], axis=1, inplace=True)
    print(pd.crosstab(index=features["digit"], columns="count", normalize=True))
    return features

In [None]:
def printAccuracy(label, model, features): 
    print("Accuracy "+label+" : {:.3f}".format(model.score(features.drop('digit',axis=1).values, 
                                                   features.digit.values))) 

##  Obtenemos features

### Descargamos los features guardados previamente

In [None]:
download_feature_if_needed("train_features.csv")
download_feature_if_needed("valid_features.csv")
download_feature_if_needed("test_features.csv")
download_feature_if_needed("test_features_gauss.csv")
download_feature_if_needed("test_features_dude.csv")
download_feature_if_needed("test_features_bike.csv")
download_feature_if_needed("test_features_pink.csv")
download_feature_if_needed("test_features_running.csv")
download_feature_if_needed("test_features_white.csv")
download_feature_if_needed("test_features_dishes.csv")
download_feature_if_needed("test_features_custom.csv")

### Features de entrenamiento

In [None]:
%%time
train_features = load_features(Features.Train.value)

### Features de validación

In [None]:
%%time
validation_features = load_features(Features.Validation.value)

### Features de Desarrollo

In [None]:
develop_features = validation_features.append(train_features, ignore_index=True)

### Features de test

#### Test Originales

In [None]:
%%time
test_features = load_features(Features.Test.value)

#### Test con ruido Gausiano

In [None]:
%%time
test_features_gauss = load_features(Features.Test_Noise_Gauss.value)

#### Test con ruido `doing_the_dishes`

In [None]:
%%time
test_features_dishes = load_features(Features.Test_Noise_Dishes.value)

#### Test con ruido `dude_miaowing`

In [None]:
%%time
test_features_dude = load_features(Features.Test_Noise_Dude.value)

#### Test con ruido `exercise_bike`

In [None]:
%%time
test_features_bike = load_features(Features.Test_Noise_Bike.value)

#### Test con ruido `pink_noise`

In [None]:
%%time
test_features_pink = load_features(Features.Test_Noise_Pink.value)

#### Test con ruido `running_tap`

In [None]:
%%time
test_features_running = load_features(Features.Test_Noise_Running.value)

#### Test con ruido `white_noise`

In [None]:
%%time
test_features_white = load_features(Features.Test_Noise_White.value)

#### Test con audios `custom`

In [None]:
%%time
test_features_custom = load_features(Features.Test_Custom.value)

# Modelos

## Naive Bayes

### Entrenamiento

In [None]:
%%time

naive_bayes = GaussianNB()
naive_bayes.fit(train_features.drop('digit',axis=1).values, train_features.digit.values)
predict_nb1 = naive_bayes.predict(validation_features.drop('digit',axis=1).values)

printAccuracy("Training", naive_bayes, train_features)
printAccuracy("Validación", naive_bayes, validation_features)

print(classification_report(validation_features.digit.values, predict_nb1))

###  Matriz de Confusión 

In [None]:
%%time

sns.set_context('paper')
sns.heatmap(confusion_matrix(validation_features.digit.values, predict_nb1), annot=True, fmt='g')
plt.xlabel('Predict')
plt.ylabel('True')

### Impacto del ruido en el rendimiento 

In [None]:
%%time

naive_bayes_dev = GaussianNB()
naive_bayes_dev.fit(develop_features.drop('digit',axis=1).values, develop_features.digit.values)
predict_nb_test = naive_bayes_dev.predict(test_features.drop('digit',axis=1).values)
predict_nb_gauss = naive_bayes_dev.predict(test_features_gauss.drop('digit',axis=1).values)
predict_nb_dishes = naive_bayes_dev.predict(test_features_dishes.drop('digit',axis=1).values)
predict_nb_dude = naive_bayes_dev.predict(test_features_dude.drop('digit',axis=1).values)
predict_nb_bike = naive_bayes_dev.predict(test_features_bike.drop('digit',axis=1).values)
predict_nb_running = naive_bayes_dev.predict(test_features_running.drop('digit',axis=1).values)
predict_nb_pink = naive_bayes_dev.predict(test_features_pink.drop('digit',axis=1).values)
predict_nb_white = naive_bayes_dev.predict(test_features_white.drop('digit',axis=1).values)

printAccuracy("Develop", naive_bayes_dev, develop_features)

printAccuracy("Test", naive_bayes_dev, test_features)
print(classification_report(test_features.digit.values, predict_nb_test))

printAccuracy("Ruido Gaussiano", naive_bayes_dev, test_features_gauss)
print(classification_report(test_features_gauss.digit.values, predict_nb_gauss, zero_division=0))

printAccuracy("Ruido Doing the Dishes", naive_bayes_dev, test_features_dishes)
print(classification_report(test_features_dishes.digit.values, predict_nb_dishes, zero_division=0))

printAccuracy("Ruido Dude Miaowing", naive_bayes_dev, test_features_dude)
print(classification_report(test_features_dude.digit.values, predict_nb_dude, zero_division=0))

printAccuracy("Ruido Exercise Bike", naive_bayes_dev, test_features_bike)
print(classification_report(test_features_bike.digit.values, predict_nb_bike, zero_division=0))

printAccuracy("Ruido Running Tap", naive_bayes_dev, test_features_running)
print(classification_report(test_features_running.digit.values, predict_nb_running, zero_division=0))

printAccuracy("Ruido Pink Noise", naive_bayes_dev, test_features_pink)
print(classification_report(test_features_pink.digit.values, predict_nb_pink, zero_division=0))

printAccuracy("Ruido White Noise", naive_bayes_dev, test_features_white)
print(classification_report(test_features_white.digit.values, predict_nb_white, zero_division=0))



### Impacto de los audios generados por el equipo en el rendimiento

In [None]:
predict_nb_custom = naive_bayes_dev.predict(test_features_custom.drop('digit',axis=1).values)
printAccuracy("Custom", naive_bayes_dev, test_features_custom)
print(classification_report(test_features_custom.digit.values, predict_nb_custom))

## Random Forest:  búsqueda de hiperparámetros óptimos

In [None]:
%%time

parametros = {'n_estimators':range(100, 250, 15), 'max_depth':range(6, 12, 2), 'bootstrap':[True, False]}

clf = RandomizedSearchCV(RandomForestClassifier(random_state=22), parametros, n_jobs=20, random_state=131313,    
                         scoring='accuracy', n_iter=30, cv =  [(slice(None), slice(None))])

clf.fit(train_features.drop('digit',axis=1).values, train_features.digit.values)
rf = clf.best_estimator_  

print(clf.best_score_, clf.best_params_)
printAccuracy("Training", rf, train_features)
printAccuracy("Validation", rf, validation_features)

###  Matriz de Confusión 

In [None]:
rf_pred = rf.predict(validation_features.drop('digit',axis=1).values)
sns.set_context('paper')
sns.heatmap(confusion_matrix(validation_features.digit.values, rf_pred), annot=True, fmt='g')
plt.xlabel('Predict')
plt.ylabel('True')

### Impacto del ruido en el rendimiento 

In [None]:
%%time

rf.fit(develop_features.drop('digit',axis=1).values, develop_features.digit.values)
printAccuracy("Develop", rf, develop_features)
printAccuracy("Test", rf, test_features)
printAccuracy("Ruido Gaussiano", rf, test_features_gauss)
printAccuracy("Ruido Doing the Dishes", rf, test_features_dishes)
printAccuracy("Ruido Dude Miaowing", rf, test_features_dude)
printAccuracy("Ruido Exercise Bike", rf, test_features_bike)
printAccuracy("Ruido Running Tap", rf, test_features_running)
printAccuracy("Ruido Pink Noise", rf, test_features_pink)
printAccuracy("Ruido White Noise", rf, test_features_white)

### Impacto de los audios generados por el equipo en el rendimiento


In [None]:
%%time

printAccuracy("Ruido Custom", rf, test_features_custom)

## Gradient Boosting

In [None]:
%%time

parameters = {'n_estimators':range(50, 150, 25), 'max_depth':range(5,7), 'learning_rate':np.arange(0,1,0.1)}
clf = RandomizedSearchCV(GradientBoostingClassifier(random_state=22), parameters, n_jobs=-1, scoring='accuracy',cv=[(slice(None), slice(None))], n_iter=50, random_state=8)

clf.fit(train_features.drop('digit',axis=1).values, train_features.digit.values)
gb = clf.best_estimator_

print (clf.best_score_, clf.best_params_)
printAccuracy("Training", gb, train_features)
printAccuracy("Validation", gb, validation_features)

###  Matriz de Confusión 

In [None]:
gb_pred = gb.predict(validation_features.drop('digit',axis=1).values)
sns.set_context('paper')
sns.heatmap(confusion_matrix(validation_features.digit.values, gb_pred), annot=True, fmt='g')
plt.xlabel('Predict')
plt.ylabel('True')

### Impacto del ruido en el rendimiento 

In [None]:
%%time

gb.fit(develop_features.drop('digit',axis=1).values, develop_features.digit.values)
printAccuracy("Develop", gb, develop_features)
printAccuracy("Test", gb, test_features)
printAccuracy("Ruido Gaussiano", gb, test_features_gauss)
printAccuracy("Ruido Doing the Dishes", gb, test_features_dishes)
printAccuracy("Ruido Dude Miaowing", gb, test_features_dude)
printAccuracy("Ruido Exercise Bike", gb, test_features_bike)
printAccuracy("Ruido Running Tap", gb, test_features_running)
printAccuracy("Ruido Pink Noise", gb, test_features_pink)
printAccuracy("Ruido White Noise", gb, test_features_white)

### Impacto de los audios generados por el equipo en el rendimiento

In [None]:
%%time

printAccuracy("Ruido Custom", gb, test_features_custom)

# Anexos

## Tratamiento audios grabados por el equipo

In [None]:
# para ejecutar desde google colab online
from google.colab import drive
drive.mount('/content/gdrive')

# directorio donde esté el notebook y los archivos de sonido
carpeta_tp = "AA_TP2"

os.chdir("gdrive/My Drive/" + carpeta_tp)

os.getcwd()

In [None]:
# lista para guardar los audios nuestros (están en carpeta "audios")
# nparray (salida de librosa), fs, filename
audios_nos = []

for filename in os.listdir("audios"):
    if filename.endswith(".wav"):
        x, fs = librosa.core.load('audios/' + filename, sr = 16000)
        audios_nos += [[x] + [fs] + [filename]]
        #display(Audio(x, rate = fs))
        #plt.figure(figsize = (15,5))
        #plt.plot(x)
        print(filename)


In [None]:
# el metodo de separar los audios con split_points = librosa.effects.split(x, top_db = top_db)
# se hace engorroso pues cambiando top_db se empieza a perder info del audio
# es compromiso entre que separe ruidos como audios extra y que se entiendan los numeros...
# y a veces no se logra => nuevo método

# la idea es dividirlos por el punto medio entre los maximos
# teninendo en cuenta que los dígitos se grabaron bastante separados 
# y aproximadamente equiespaciados

def get_audio_split_points(x, aname = '', start_div = 0, n_audios = 11, listen = False, plot = False):
    # busco los máximos del audio "x", para eso defino:
    # start: un punto de comienzo (por si hay 'clicks' al principio del audio)
    # win_length: un ancho de ventana para buscar el maximo
    # windows: las posiciones de las ventanas en funcion de esos parámetros, conservando las 11 primeras para delimitar los digitos
    win_length = len(x)//n_audios
    if start_div == 0:
        start = 0
    else:
        start = win_length//start_div
    
    windows = range(start, len(x), win_length)

    # encuentra la posicion del máximo en cada ventana
    maximos = [i + np.where(x[i:] == max(x[i:i + win_length]))[0][0] for i in windows[:-1]]
    # calcula las coordenadas de los cortes como el promedio entre las posiciones de los maximos
    split_coords = [(maximos[s] + maximos[s+1])//2 for s in range(len(maximos) - 1)]
    # el ultimo corte es sumando la distancia del ultimo maximo al corte anterior
    split_coords += [maximos[-1] + maximos[-1] - split_coords[-1]]
    # el primer corte es restando la distancia del final del primer audio a su maximo
    split_coords.insert(0, maximos[0] - (split_coords[0] - maximos[0]))

    # genera las coordenadas de cada sub-audio como [inicio, fin] y las junta en un array
    split_points = [[split_coords[i], split_coords[i+1]]  for i in range(len(split_coords) - 1)]

    if (plot | listen) == True:
        plt.figure(figsize=(15,5))
        plt.plot(x, color = 'k')

        for xc in windows:
            plt.axvline(x = xc, color = '#55CC5588')
        for xc in maximos:
            plt.axvline(x = xc, color = '#AA000088')
        for xc in split_coords:
            plt.axvline(x = xc, color = 'k', linestyle = 'dashed')

        # grafico el audio con las divisiones para verificar, más widgets para escucharlos
        print(aname, 'k = ', k)
        for split in split_points:
            if listen == True:
                display(Audio(x[split[0]:split[1]], rate = fs))
            if plot == True:
                xplot = np.zeros_like(x)
                xplot[split[0]:split[1]] = x[split[0]:split[1]]
                plt.plot(xplot)
        if plot == True:
            plt.show()
        print('\n')

    return split_points

In [None]:

# no itero en los elementos directamente pues necesito saber el indice de alguno que no se haya partido bien
# para analizarlo luego. este es un analisis exploratorio en cierto sentido
for k in range(len(audios_nos)):
    get_audio_split_points(audios_nos[k][0], aname = audios_nos[k][-1], start_div = 2, n_audios = 11, listen = True, plot = True)
 
# se observa que para todos los elementos menos
# 13 17 18
# funciona bien start_div = 2

In [None]:
# analizamos los casos puntuales
# para 18 anda bien 1
# apra 13 y 17 anda bien 3
for k in (13, 17):
    get_audio_split_points(audios_nos[k][0], aname = audios_nos[k][-1], start_div = 3, n_audios = 11, listen = True, plot = True)
 

In [None]:
# lista con el parametro de corte de cada audio
audios_start_div = [1 if i == 18 else (3 if i == 13 or i == 17 else 2) for i in range(len(audios_nos))]

# exporta audios
!pip install soundfile
import soundfile as sf

# si no existe la carpeta con los audios la crea y crea las subfolders con
# los nombres de los digitos
audios_nos_folder = "espichcomans"
if not os.path.exists(audios_nos_folder):
    os.mkdir(audios_nos_folder)

    for key in to_number.keys():
        os.mkdir(audios_nos_folder + '/' + key)


# para cada audio lo splitea y guarda los digitos
for k in range(len(audios_nos)):
    filename = audios_nos[k][-1].split('.')[0]
    audiox = audios_nos[k][0]
    fs = audios_nos[k][1]

    split_points = get_audio_split_points(audiox,
                                          aname = audios_nos[k][-1],
                                          start_div = audios_start_div[k],
                                          n_audios = 11,
                                          listen = False, plot = False)
    
    # necesito los indices para saber que numero es :)
    # algunas variables solamente por cuestion de legibilidad no lo dejo todo compactado
    for j in range(len(split_points)):
        folder = audios_nos_folder + '/' + list(to_number.keys())[j]
        start, end = split_points[j]
        sf.write("{}/{}_{}.wav".format(folder, filename, j), audiox[start:end], fs)
