In [1]:
import os
import numpy as np
import time
from scipy.io import wavfile as wav
import sys

from tqdm.notebook import tqdm

import matplotlib.pyplot as plt
import IPython.display as ipd

# Strumenti di classificazione
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import classification_report, confusion_matrix

# Feature audio avanzate
import librosa
import librosa.display as lid
from sklearn.preprocessing import scale
from sklearn.preprocessing import StandardScaler

In [2]:
def load_features(feature_extractor=lambda x : x, paths = ["recordings"], normalize=False):
    """Extract one feature vector per ``.wav`` file found in the given directories.

    Files are visited directory by directory, in sorted filename order, which is
    the same traversal order used by ``load_labels`` so features and labels line up.

    Parameters
    ----------
    feature_extractor : callable
        Called with the raw sample array of each file; its return value is stored.
        Defaults to the identity function.
    paths : list of str
        Directories to scan (non-recursively).
    normalize : bool
        Accepted for interface compatibility; currently not used by this function.

    Returns
    -------
    list
        One entry per ``.wav`` file, in traversal order.
    """
    extracted = []

    for directory in paths:
        print(f"Loading from {directory}")
        for filename in tqdm(sorted(os.listdir(directory))):
            # Anything that is not a WAV recording is ignored.
            if not filename.endswith('.wav'):
                continue
            # wav.read returns (sample_rate, samples); only the samples are needed.
            samples = wav.read(directory + "/" + filename)[1]
            extracted.append(feature_extractor(samples))

    return extracted

In [3]:
def load_labels(paths = ["recordings"], label_type = "number"):
    """Derive one label per ``.wav`` file from its filename.

    Filenames are split on ``"_"``. When ``label_type`` starts with ``"n"`` the
    first field is used as the label; for any other value the second field is used.
    Files are visited directory by directory in sorted filename order.

    Parameters
    ----------
    paths : list of str
        Directories to scan (non-recursively).
    label_type : str
        Selects which filename field becomes the label (see above).

    Returns
    -------
    list of str
        One label per ``.wav`` file, in traversal order.
    """
    return [
        filename.split('_')[0 if label_type.startswith("n") else 1]
        for directory in paths
        for filename in sorted(os.listdir(directory))
        if filename.endswith('.wav')
    ]

## Features

In [4]:
def aavg(input):
    """Return the mean of the absolute values of ``input``.

    The reduction keeps all axes (``keepdims=True``), so a 1-D signal yields
    an array of shape ``(1,)`` that can be concatenated with other features.
    """
    magnitudes = np.abs(input)
    return np.mean(magnitudes, keepdims=True)

In [5]:
def sdev(input):
    """Return the standard deviation of ``input``.

    The reduction keeps all axes (``keepdims=True``), so a 1-D signal yields
    an array of shape ``(1,)``.
    """
    spread = np.std(input, keepdims=True)
    return spread

In [6]:
def energy(input):
    """Return the sum of squared samples of ``input``.

    Samples are first multiplied by ``1.0`` so the squaring happens in floating
    point rather than in the input's (possibly narrow integer) dtype. The
    reduction keeps all axes, so a 1-D signal yields an array of shape ``(1,)``.
    """
    as_float = input * 1.0
    return np.sum(np.square(as_float), keepdims=True)

In [7]:
def zcr(y):
    """Count sign changes between consecutive samples of ``y``.

    Two neighbouring samples count as a change whenever their ``np.sign``
    values differ, so a transition to or from an exact zero is counted too.

    Returns
    -------
    tuple
        A one-element tuple ``(count,)`` (the shape of the array of change
        positions), which ``np.concatenate`` accepts alongside the other features.
    """
    # Copy of the signal shifted left by one sample; np.roll wraps the first
    # sample around to the last position.
    shifted = np.roll(y, shift=-1)

    # Compare each sample's sign with its successor's. The last position is
    # dropped because there the shifted copy holds the wrapped-around first sample.
    sign_delta = np.sign(y[:-1]) - np.sign(shifted[:-1])

    # Positions where the difference is non-zero are the sign changes.
    change_positions = np.flatnonzero(sign_delta)

    return change_positions.shape

In [8]:
def mfcc(input, rate=8000, min_len=40, sampling=1):
    """Compute a fixed-length, flattened MFCC feature vector for a signal.

    Parameters
    ----------
    input : array-like of samples
        The audio signal; it is converted to float before analysis.
    rate : int
        Sample rate of ``input`` in Hz.
    min_len : int
        Number of MFCC frames in the output. Shorter results are zero-padded
        on the right; longer results are truncated to this many frames so the
        feature vector always has the same length.
    sampling : int
        Keep every ``sampling``-th sample before analysis; the sample rate
        passed to librosa is divided accordingly.

    Returns
    -------
    numpy.ndarray
        1-D array of length ``n_mfcc * min_len`` (row-major flattening of the
        coefficient matrix), suitable as input to an SVM.
    """
    # Subsample the signal.
    signal = input[::sampling]
    # Pass the audio as ``y=``: librosa >= 0.10 only accepts it as a keyword,
    # and older releases accept the keyword form as well.
    coeffs = librosa.feature.mfcc(y=signal * 1.0, sr=int(rate / sampling))
    # Force exactly ``min_len`` frames. np.pad rejects negative widths, so
    # recordings longer than ``min_len`` frames are cut instead of padded.
    n_frames = coeffs.shape[1]
    if n_frames < min_len:
        coeffs = np.pad(coeffs, pad_width=((0, 0), (0, min_len - n_frames)), mode='constant')
    else:
        coeffs = coeffs[:, :min_len]
    # Flatten the (n_mfcc, min_len) matrix into a single feature vector.
    return coeffs.flatten()

In [9]:
def combo(input):
    """Build the full feature vector for one signal.

    Concatenates, in this order: standard deviation, mean absolute value,
    energy, zero-crossing count and the flattened MFCC coefficients.
    """
    parts = (
        sdev(input),
        aavg(input),
        energy(input),
        zcr(input),
        mfcc(input),
    )
    return np.concatenate(parts)

## Classifier with label = number

In [10]:
# Extract the combined feature vector for every .wav file in both directories.
features = load_features(
    paths=["recordings", "output"],
    feature_extractor=combo,
)

Loading from recordings


HBox(children=(FloatProgress(value=0.0, max=1500.0), HTML(value='')))


Loading from output


HBox(children=(FloatProgress(value=0.0, max=300.0), HTML(value='')))




In [11]:
# Digit labels: the first "_"-separated field of each .wav filename.
labels1 = load_labels(
    paths=["recordings", "output"],
    label_type="number",
)

In [12]:
# Hold out 20% of the samples for evaluation; the fixed seed makes the split repeatable.
X_train, X_test, y_train1, y_test1 = train_test_split(
    features,
    labels1,
    test_size=0.2,
    random_state=1,
)

In [13]:
# Fit the standardizer on the training split only, then apply it to both splits.
# (fit() returns the scaler itself, so construction and fitting can be chained.)
scaler1 = StandardScaler().fit(X_train)
X_train_scaled = scaler1.transform(X_train)
X_test_scaled = scaler1.transform(X_test)

In [14]:
# RBF-kernel SVM for the digit task.
clf1 = SVC(
    kernel='rbf',
    gamma="auto",
    class_weight='balanced',
)

In [15]:
# Train the digit classifier on the standardized training split.
clf1 = clf1.fit(X=X_train_scaled, y=y_train1)

In [16]:
# Predict digits for the held-out samples.
y_pred1 = clf1.predict(X=X_test_scaled)

In [17]:
# Per-class precision/recall/F1 for the digit classifier on the held-out split.
print(classification_report(y_true=y_test1, y_pred=y_pred1))

              precision    recall  f1-score   support

           0       0.94      0.97      0.96        34
           1       1.00      0.92      0.96        37
           2       0.98      0.98      0.98        42
           3       0.89      1.00      0.94        25
           4       1.00      0.95      0.97        40
           5       1.00      1.00      1.00        33
           6       0.81      0.95      0.88        37
           7       1.00      1.00      1.00        34
           8       1.00      0.97      0.99        37
           9       1.00      0.90      0.95        41

    accuracy                           0.96       360
   macro avg       0.96      0.96      0.96       360
weighted avg       0.97      0.96      0.96       360



## Classifier with label = speaker

In [18]:
# Speaker labels: the second "_"-separated field of each .wav filename.
# (load_labels uses that field for any label_type that does not start with "n".)
labels2 = load_labels(paths=["recordings", "output"], label_type="speaker")

In [19]:
# Same 80/20 split and seed as the digit task, now paired with the speaker labels.
X_train, X_test, y_train2, y_test2 = train_test_split(
    features,
    labels2,
    test_size=0.2,
    random_state=1,
)

In [20]:
# Fit a separate standardizer for the speaker task on its training split,
# then apply it to both splits. (fit() returns the scaler itself.)
scaler2 = StandardScaler().fit(X_train)
X_train_scaled = scaler2.transform(X_train)
X_test_scaled = scaler2.transform(X_test)

In [21]:
# RBF-kernel SVM for the speaker task (same configuration as the digit classifier).
clf2 = SVC(
    kernel='rbf',
    gamma="auto",
    class_weight='balanced',
)

In [22]:
# Train the speaker classifier on the standardized training split.
clf2 = clf2.fit(X=X_train_scaled, y=y_train2)

In [23]:
# Predict speakers for the held-out samples.
y_pred = clf2.predict(X=X_test_scaled)

In [24]:
# Per-class precision/recall/F1 for the speaker classifier on the held-out split.
print(classification_report(y_true=y_test2, y_pred=y_pred))

              precision    recall  f1-score   support

      alinda       1.00      0.95      0.98        21
        gian       1.00      1.00      1.00        17
     jackson       0.99      0.99      0.99        88
      khaled       0.76      1.00      0.86        22
     nicolas       1.00      1.00      1.00       110
        theo       1.00      0.94      0.97       102

    accuracy                           0.98       360
   macro avg       0.96      0.98      0.97       360
weighted avg       0.98      0.98      0.98       360



# Prediction on the spot

In [25]:
import sounddevice as sd
import subprocess

In [26]:
def create_recording(duration, rec_rate, name = "test.wav", output_dir = "test/"):
    """Record from the microphone until the user accepts a take, then save it.

    After a three-second countdown a mono recording is captured and played
    back. The user is then prompted: pressing Enter accepts the take, typing
    anything else discards it and starts a new countdown.

    Parameters
    ----------
    duration : float
        Length of the recording in seconds.
    rec_rate : int
        Sample rate in Hz used for recording, playback and the saved file.
    name : str
        Filename of the saved WAV file.
    output_dir : str
        Directory prefix the filename is appended to (expected to end with a
        path separator). It is created if it does not exist.

    Returns
    -------
    numpy.ndarray
        The accepted recording as returned by ``sd.rec``; it is returned
        regardless of how many attempts were needed.
    """
    # Loop instead of recursing so that retries keep the caller's `name` and
    # `output_dir` and the accepted take is always returned.
    while True:
        print("Ready in 3...", end = "")
        time.sleep(1)
        print("2...", end = "")
        time.sleep(1)
        print("1...")
        time.sleep(1)
        print("Go.")
        rec = sd.rec(int(duration * rec_rate), samplerate=rec_rate, channels=1, blocking=True)
        print("Playing the recording.")
        sd.play(rec, rec_rate)

        # Enter accepts the take; any other input discards it and records again.
        ok = input("OK?")
        if ok == "":
            break
        ipd.clear_output(wait=True)

    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    # librosa.output.write_wav was removed in librosa 0.8; the scipy wavfile
    # writer (imported at the top of the notebook as `wav`) is used instead.
    wav.write(output_dir + name, rec_rate, rec)
    return rec

In [27]:
def trim_audio(file, input_dir="test/", output_dir="test/", db=-48):
    """Strip leading and trailing silence from a WAV file using ffmpeg.

    The file is processed in four ffmpeg passes: remove silence at the start,
    reverse, remove silence at the (new) start, reverse back. The result is
    written to ``output_dir + file``; when both directories are the same the
    input file is overwritten.

    Parameters
    ----------
    file : str
        Filename of the recording inside ``input_dir``.
    input_dir, output_dir : str
        Directory prefixes (expected to end with a path separator).
        ``output_dir`` is created if missing.
    db : int or float
        Silence threshold in dB passed to ffmpeg's ``silenceremove`` filter.

    Raises
    ------
    FileNotFoundError
        If ``input_dir`` does not exist.
    subprocess.CalledProcessError
        If any ffmpeg pass exits with a non-zero status.
    """
    if not os.path.isdir(input_dir):
        # Raise instead of sys.exit(0): exiting with status 0 reports success
        # and surfaces as an opaque SystemExit inside a notebook.
        raise FileNotFoundError(f"There should be an input \"{input_dir}\" directory.")

    # Create the output directory if it is not there yet.
    os.makedirs(output_dir, exist_ok=True)

    temp1 = output_dir+"temp1.wav"
    temp2 = output_dir+"temp2.wav"
    temp3 = output_dir+"temp3.wav"

    # (source, audio filter, target) for each ffmpeg pass, in order.
    passes = [
        (input_dir+file, f"silenceremove=1:0:{db}dB", temp1),
        (temp1, "areverse", temp2),
        (temp2, f"silenceremove=1:0.1:{db}dB", temp3),
        (temp3, "areverse", output_dir+file),
    ]

    try:
        for source, audio_filter, target in passes:
            # check=True stops at the first failing pass instead of continuing
            # with a missing intermediate file.
            subprocess.run(["ffmpeg", "-y", "-i", source, "-af", audio_filter, target], check=True)
    finally:
        # Always remove whatever intermediates were produced, even on failure.
        for temp in (temp1, temp2, temp3):
            if os.path.exists(temp):
                os.remove(temp)

In [28]:
def test_classifiers(clfs, scalers, duration=2, rec_rate=8000, directory = "test/", filename = "test.wav"):
    """Record one utterance and classify it with each classifier in turn.

    The recording is captured with ``create_recording``, trimmed in place with
    ``trim_audio``, read back from ``directory + filename`` and converted to a
    feature vector with ``combo``. Each classifier then predicts from the
    features scaled by the scaler at the same index.

    Parameters
    ----------
    clfs : sequence
        Fitted classifiers exposing ``predict``.
    scalers : sequence
        Fitted scalers exposing ``transform``; ``scalers[i]`` is paired with
        ``clfs[i]``.
    duration : float
        Recording length in seconds.
    rec_rate : int
        Recording sample rate in Hz. Note that ``combo`` calls ``mfcc`` with
        its own default rate, independent of this value.
    directory : str
        Directory prefix for the recording (expected to end with a separator).
    filename : str
        Filename of the recording inside ``directory``.

    Returns
    -------
    list
        One prediction per classifier, in the order of ``clfs``.
    """
    create_recording(duration, rec_rate, filename, directory)
    ipd.clear_output()
    trim_audio(filename, directory, directory)
    # Read the trimmed file from the location the caller asked for, not a
    # hard-coded path.
    _, rec = wav.read(directory + filename)
    rec_features = combo(rec.flatten())

    # Predictions get their own list; the scaled features are only needed
    # transiently for each classifier.
    preds = []
    for i, clf in enumerate(clfs):
        scaled = scalers[i].transform([rec_features])
        prediction = clf.predict(scaled)[0]
        preds.append(prediction)
        print("Classifier {} prediction: {}".format(i+1, prediction))
    return preds

In [29]:
# Record one utterance and classify it: digit classifier first, then speaker classifier.
preds = test_classifiers(
    clfs=[clf1, clf2],
    scalers=[scaler1, scaler2],
)

Classifier 1 prediction: 7
Classifier 2 prediction: gian
