***Práctica 1: Sensado y análisis de audio***

In [1]:
%%capture
!pip install sounddevice

In [2]:
import os
import wave
import numpy as np
import librosa
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import accuracy_score, classification_report
import joblib

Funciones para la generacion de audios nulos

In [3]:
def generate_silent_wav(filepath, duration_sec=1, sample_rate=44100, channels=1, noise_std=5):
    n_samples = int(sample_rate * duration_sec)
    
    # Generar un silencio con variaciones para simular sonido de ambiente
    silence = np.random.normal(loc=0, scale=noise_std, size=n_samples * channels)
    silence = silence.astype(np.int16)
    
    with wave.open(filepath, 'w') as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(2)
        wf.setframerate(sample_rate)
        wf.writeframes(silence.tobytes())

def generate_null_audios(n_files=3, duration=1.0):
    base_folder = "Audios"
    null_folder = os.path.join(base_folder, "Audios_Null")
    os.makedirs(null_folder, exist_ok=True)
    
    for i in range(n_files):
        for n in range(5):
            file_name = f"null-0{i+1}_0{n+1}.wav"
            file_path = os.path.join(null_folder, file_name)
            generate_silent_wav(file_path, duration_sec=duration)
            print(f"Generated: {file_path}")

Cargado de audios

In [4]:
def load_audios_into_dataframe(root_folder="Audios", valid_extensions=('.wav', '.mp3', '.m4a', '.flac', '.ogg', '.wma', '.aac')):
    audio_files = []
    for dirpath, _, filenames in os.walk(root_folder):
        for filename in filenames:
            if filename.lower().endswith(valid_extensions):
                full_path = os.path.join(dirpath, filename)
                audio_files.append(full_path)
    
    data = []
    for filepath in audio_files:
        filename = os.path.basename(filepath)
        label = os.path.basename(os.path.dirname(filepath))
        try:
            audio_data, sample_rate = librosa.load(filepath, sr=None)
            duration = librosa.get_duration(y=audio_data, sr=sample_rate)
        except Exception as e:
            print(f"Error loading {filepath}: {e}")
            audio_data, sample_rate, duration = None, None, None
        
        data.append({
            'filepath': filepath,
            'filename': filename,
            'label': label,
            'audio': audio_data,
            'sample_rate': sample_rate,
            'duration': duration
        })
    
    return pd.DataFrame(data)

Generacion de caracteristicas

In [5]:
def add_noise(audio, noise_factor=0.025):
    # Agregar ruido blanco al recording
    noise = np.random.randn(len(audio))
    return audio + noise_factor * noise

def apply_time_stretch(audio, rate=1.2):
    # Alangar audio
    return librosa.effects.time_stretch(audio, rate=rate)

def apply_pitch_shift(audio, sr, n_steps=2):
    # Cambiar tono
    return librosa.effects.pitch_shift(audio, sr=sr, n_steps=n_steps)

def change_volume(audio, factor=1.5):
    # Cambiar volumen
    return audio * factor

def augment_audio(audio, sr):
    augmentations = []
    
    # Original
    augmentations.append(('original', audio))
    
    # Ruido blanco
    noise_audio = add_noise(audio, noise_factor=0.025)
    augmentations.append(('noise', noise_audio))
    
    # Audio rapido
    ts_audio = apply_time_stretch(audio, rate=1.2)
    augmentations.append(('time_stretch_longer', ts_audio))

    # Audio lento
    ts_audio = apply_time_stretch(audio, rate=0.8)
    augmentations.append(('time_stretch_slower', ts_audio))
    
    # Tono alto
    ps_audio = apply_pitch_shift(audio, sr, n_steps=2)
    augmentations.append(('pitch_shift_alto', ps_audio))

    # Tono bajo
    ps_audio = apply_pitch_shift(audio, sr, n_steps=-2)
    augmentations.append(('pitch_shift_bajo', ps_audio))
    
    # Subir volumen
    vol_audio = change_volume(audio, factor=1.5)
    augmentations.append(('volume_up', vol_audio))

    # Bajar volumen
    vol_audio = change_volume(audio, factor=0.5)
    augmentations.append(('volume_down', vol_audio))
    
    return augmentations

def create_augmented_dataframe(df):
    # Crear nuevos audios con los cambios
    augmented_data = []
    for idx, row in df.iterrows():
        audio = row['audio']
        sr = row['sample_rate']
        label = row['label']
        base_filepath = row['filepath']
        filename = row['filename']
        
        aug_list = augment_audio(audio, sr)
        for aug_method, aug_audio in aug_list:
            try:
                duration = librosa.get_duration(y=aug_audio, sr=sr)
            except Exception as e:
                print(f"Error al calcular duracion {filename} ({aug_method}): {e}")
                duration = None
            
            augmented_data.append({
                'original_filepath': base_filepath,
                'filename': filename,
                'label': label,
                'aug_method': aug_method,
                'audio': aug_audio,
                'sample_rate': sr,
                'duration': duration
            })
    
    return pd.DataFrame(augmented_data)

Extraccion de caracteristicas

In [6]:
def extract_features_from_audio(audio, sr):
    # Extraccion de caracteristicas
    
    features = {}
    features['duration'] = librosa.get_duration(y=audio, sr=sr)
    
    # Zero Crossing Rate
    zcr = librosa.feature.zero_crossing_rate(audio)
    features['zero_crossing_rate'] = float(np.mean(zcr))
    
    # Spectral Centroid
    spec_centroid = librosa.feature.spectral_centroid(y=audio, sr=sr)
    features['spectral_centroid'] = float(np.mean(spec_centroid))
    
    # Spectral Rolloff
    spec_rolloff = librosa.feature.spectral_rolloff(y=audio, sr=sr)
    features['spectral_rolloff'] = float(np.mean(spec_rolloff))
    
    # MFCCs (first 13 coefficients)
    mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13)
    for i in range(13):
        features[f'mfcc_{i+1}'] = float(np.mean(mfccs[i]))
    
    # RMS Energy
    rms = librosa.feature.rms(y=audio)
    features['rms'] = float(np.mean(rms))
    
    # Tempo (BPM)
    try:
        tempo, _ = librosa.beat.beat_track(y=audio, sr=sr)
        if isinstance(tempo, (list, np.ndarray)):
            features['tempo'] = float(tempo[0])
        elif isinstance(tempo, str):
            features['tempo'] = float(tempo.strip("[]"))
        else:
            features['tempo'] = float(tempo)
    except Exception as e:
        print(f"Error computing tempo: {e}")
        features['tempo'] = None

    return features

def create_features_from_augmented_dataframe(df_augmented):
    
    # Crear las caracteristicas para los audios del dataframe
    feature_data = []
    for idx, row in df_augmented.iterrows():
        audio = row['audio']
        sr = row['sample_rate']
        features = extract_features_from_audio(audio, sr)
        
        record = {
            'original_filepath': row['original_filepath'],
            'filename': row['filename'],
            'label': row['label'],
            'aug_method': row['aug_method']
        }
        record.update(features)
        feature_data.append(record)
    
    return pd.DataFrame(feature_data)

In [7]:

# Generar audios nulos
print("=== Generating Null Audio Files ===")
generate_null_audios(n_files=3, duration=4.0)

# Cargar audios
print("\n=== Loading Audio Files ===")
df_audios = load_audios_into_dataframe("Audios")
print("Loaded Audios DataFrame:")
print(df_audios.head())
    
# Generacion de audios sinteticos
print("\n=== Creating Augmented Audio Versions ===")
df_augmented = create_augmented_dataframe(df_audios)
print("Augmented Audios DataFrame:")
print(df_augmented.head())
    
# Extraccion de caracteristicas
print("\n=== Extracting Audio Features ===")
df_features = create_features_from_augmented_dataframe(df_augmented)
print("Extracted Audio Features DataFrame:")
print(df_features.head())

# Guardar los datos
df_audios.to_csv("loaded_audios.csv", index=False)
df_augmented.to_csv("augmented_audios.csv", index=False)
df_features.to_csv("extracted_audio_features.csv", index=False)
print("\nDataFrames have been saved to CSV files.")


=== Generating Null Audio Files ===
Generated: Audios/Audios_Null/null-01_01.wav
Generated: Audios/Audios_Null/null-01_02.wav
Generated: Audios/Audios_Null/null-01_03.wav
Generated: Audios/Audios_Null/null-01_04.wav
Generated: Audios/Audios_Null/null-01_05.wav
Generated: Audios/Audios_Null/null-02_01.wav
Generated: Audios/Audios_Null/null-02_02.wav
Generated: Audios/Audios_Null/null-02_03.wav
Generated: Audios/Audios_Null/null-02_04.wav
Generated: Audios/Audios_Null/null-02_05.wav
Generated: Audios/Audios_Null/null-03_01.wav
Generated: Audios/Audios_Null/null-03_02.wav
Generated: Audios/Audios_Null/null-03_03.wav
Generated: Audios/Audios_Null/null-03_04.wav
Generated: Audios/Audios_Null/null-03_05.wav

=== Loading Audio Files ===


  audio_data, sample_rate = librosa.load(filepath, sr=None)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)
  audio_data, sample_rate = librosa.load(filepath, sr=None)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


Loaded Audios DataFrame:
                      filepath         filename label  \
0  Audios/Luis/LuisG-03_02.m4a  LuisG-03_02.m4a  Luis   
1  Audios/Luis/LuisG-03_03.m4a  LuisG-03_03.m4a  Luis   
2  Audios/Luis/LuisG-01_04.m4a  LuisG-01_04.m4a  Luis   
3  Audios/Luis/LuisG-03_01.m4a  LuisG-03_01.m4a  Luis   
4  Audios/Luis/LuisG-01_05.m4a  LuisG-01_05.m4a  Luis   

                                               audio  sample_rate  duration  
0  [-0.00064086914, -0.00036621094, -0.0005493164...        48000  2.281333  
1  [-0.0016479492, -0.0016784668, -0.0018615723, ...        48000  2.644000  
2  [0.0005187988, 0.00045776367, 0.00033569336, 0...        48000  4.073333  
3  [-0.00033569336, -0.00024414062, -0.0004272461...        48000  2.324000  
4  [-3.0517578e-05, -0.00064086914, -0.0006713867...        48000  3.156000  

=== Creating Augmented Audio Versions ===
Augmented Audios DataFrame:
             original_filepath         filename label           aug_method  \
0  Audios/Luis/

Tomar archivos y categorizar por quien esta hablando y que frase es

In [8]:
def parse_filename(filename):
    base = os.path.splitext(filename)[0]
    try:
        name_phrase, recording_id = base.split('_')
        name, phrase = name_phrase.split('-')
    except ValueError:
        raise ValueError(f"Archivo {filename} Formato incorrecto")
    return name, phrase, recording_id

def add_metadata_from_filename(df):
    speakers, phrases = [], []
    for fn in df['filename']:
        speaker, phrase, _ = parse_filename(fn)
        speakers.append(speaker)
        phrases.append(phrase)
    df['speaker'] = speakers
    df['phrase'] = phrases
    return df

*Algoritmos de categorizacion para el audio.*

In [9]:
# Classification ML algorithms
lr = LogisticRegression(solver='sag')
dt_clf = DecisionTreeClassifier()
rn_clf = RandomForestClassifier()
knn_clf = KNeighborsClassifier()
gb = GaussianNB()
sgd = SGDClassifier()

def trainingModels(x_train, x_test, y_train, y_test):
    d = {}
    li = [lr, sgd, rn_clf, knn_clf, gb, dt_clf]
    models = {}
    for i in li:
        i.fit(x_train, y_train)
        y_pred = i.predict(x_test)
        accuracy = accuracy_score(y_test, y_pred)
        model_name = i.__class__.__name__.split('(')[0]
        print(model_name, "accuracy:", accuracy)
        d.update({model_name: accuracy})
        models.update({model_name: i})
    return d, models

*Entrenar modelos*

In [10]:
df_features = pd.read_csv("extracted_audio_features.csv")

speakers, phrases = [], []
for fn in df_features['filename']:
    speaker, phrase, _ = parse_filename(fn)
    speakers.append(speaker)
    phrases.append(phrase)
df_features['speaker'] = speakers
df_features['phrase'] = phrases

# Seleccionamos las caracteristicas que no dan directamente quien es
feature_columns = ['zero_crossing_rate', 'spectral_centroid',
                   'spectral_rolloff', 'rms', 'tempo'] + [f'mfcc_{i+1}' for i in range(13)]

phrase_models = {}
phrase_data = {}

for phrase in ["01", "02", "03"]:
    df_phrase = df_features[df_features['phrase'] == phrase]
    X = df_phrase[feature_columns]
    y = df_phrase['speaker']
    
    # Guardar datos para poder hacer cross testing
    phrase_data[phrase] = (X, y)
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Probando modelos
    # m1, models1 = trainingModels(X_train,X_test,y_train,y_test)
    # print(m1, models1)
    
    # Se usa Random forest al ser el que mostro mejor rendimiento
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)
    
    # Testeo con la misma frase
    print(f"=== Testeo de {phrase} ===")
    y_pred = clf.predict(X_test)
    print(classification_report(y_test, y_pred))
    
    # Guardando el modelo para usarlo en live despues
    phrase_models[phrase] = clf

# Cross test
print("\n=== Cross Tests ===")
for train_phrase, model in phrase_models.items():
    print(f"\Modelo entrenado con {train_phrase}:")
    for test_phrase, (X_test, y_test) in phrase_data.items():
        y_pred = model.predict(X_test)
        print(f"\n Prueba con {test_phrase}:")
        print(classification_report(y_test, y_pred))

=== Testeo de 01 ===
                 precision    recall  f1-score   support

         AdriaM       1.00      0.86      0.92         7
           AleM       1.00      1.00      1.00         4
  AndresCalzada       1.00      1.00      1.00         8
   ArathDaniela       1.00      1.00      1.00        11
         Ariana       0.90      1.00      0.95         9
          BetoM       0.75      1.00      0.86         6
          Bruce       1.00      1.00      1.00         8
         Camila       1.00      1.00      1.00         5
          Cielo       0.90      1.00      0.95         9
         DafneA       1.00      1.00      1.00         8
         Daniel       0.88      0.88      0.88        17
          David       1.00      0.90      0.95        10
         Didier       1.00      1.00      1.00         8
            Eri       1.00      0.86      0.92         7
           Erik       1.00      0.90      0.95        10
       Fernando       0.89      1.00      0.94         8
        I

*Prueba en vivo*

In [11]:
# Exportamos los modelos creados
for phrase, model in phrase_models.items():
    filename = f"model_phrase_{phrase}.pkl"
    joblib.dump(model, filename)
    print(f"Model for phrase {phrase} exported to {filename}")

Model for phrase 01 exported to model_phrase_01.pkl
Model for phrase 02 exported to model_phrase_02.pkl
Model for phrase 03 exported to model_phrase_03.pkl


In [12]:
import time
import numpy as np
import sounddevice as sd
import joblib

# Grabar Voz
def record_audio(duration=4, sample_rate=44100):
    print(f"Grabando por {duration} segundos...")
    audio = sd.rec(int(duration * sample_rate), samplerate=sample_rate, channels=1)
    sd.wait()
    return np.squeeze(audio), sample_rate

# Cargar modelo
def load_model(model_filename):
    return joblib.load(model_filename)

# Hacer prediccion
def predict_speaker(model, feature_vector):
    prediction = model.predict(feature_vector)
    return prediction[0]

# Loop
def main_inference_loop():
    # Cargar modelos 
    model_1 = load_model("model_phrase_01.pkl")
    model_2 = load_model("model_phrase_02.pkl")
    model_3 = load_model("model_phrase_03.pkl")
    
    print("Habla al microfono...")
    try:
        while True:
            # Grabar audio
            audio, sr = record_audio()
            
            # Tomar caracteristicas de grabacion
            features = extract_features_from_audio(audio, sr)
            
            # Obtener vectores
            feature_vector = pd.DataFrame([features], columns=feature_columns)
            
            # Predicciones
            predicted_speaker_1 = predict_speaker(model_1, feature_vector)
            predicted_speaker_2 = predict_speaker(model_2, feature_vector)
            predicted_speaker_3 = predict_speaker(model_3, feature_vector)
            print("Predicted Speaker 1:", predicted_speaker_1)
            print("Predicted Speaker 2:", predicted_speaker_1)
            print("Predicted Speaker 3:", predicted_speaker_1)
            
            # Pausa
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("Detenido.")

# Llamar al loop
if __name__ == "__main__":
    main_inference_loop()


Habla al microfono...
Grabando por 4 segundos...
Predicted Speaker 1: Bruce
Predicted Speaker 2: Bruce
Predicted Speaker 3: Bruce
Grabando por 4 segundos...
Predicted Speaker 1: Jorge
Predicted Speaker 2: Jorge
Predicted Speaker 3: Jorge
Grabando por 4 segundos...
Predicted Speaker 1: Jorge
Predicted Speaker 2: Jorge
Predicted Speaker 3: Jorge
Grabando por 4 segundos...
Predicted Speaker 1: Daniel
Predicted Speaker 2: Daniel
Predicted Speaker 3: Daniel
Grabando por 4 segundos...
Predicted Speaker 1: Daniel
Predicted Speaker 2: Daniel
Predicted Speaker 3: Daniel
Grabando por 4 segundos...
Predicted Speaker 1: Jorge
Predicted Speaker 2: Jorge
Predicted Speaker 3: Jorge
Grabando por 4 segundos...
Predicted Speaker 1: Cielo
Predicted Speaker 2: Cielo
Predicted Speaker 3: Cielo
Grabando por 4 segundos...
Predicted Speaker 1: Jorge
Predicted Speaker 2: Jorge
Predicted Speaker 3: Jorge
Grabando por 4 segundos...
Predicted Speaker 1: Daniel
Predicted Speaker 2: Daniel
Predicted Speaker 3: Dani