In [1]:
import numpy as np
import sklearn
import matplotlib.pyplot as plt
import librosa
import sys
import torch
import random
import pandas as pd
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
import statistics
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import KFold
from sklearn.model_selection import cross_val_score
from sklearn import svm
from sklearn import preprocessing
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch import optim
import torch.utils.data as data_utils
import os
import re
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

In [2]:
import os
from glob import glob

import librosa
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm


def parse_free_digits(directory):
    """Load every ``*.wav`` file in ``directory`` and parse its file name.

    File names are expected to look like ``<digit>_<speaker>_<index>.wav``
    (the first three ``_``-separated fields of the base name are used).

    Args:
        directory: Path of the folder holding the recordings. It may be nested,
            with or without a trailing separator.

    Returns:
        A tuple ``(wavs, Fs, ids, y, speakers)`` where
        ``wavs`` is the list of loaded waveforms,
        ``Fs`` is the sampling rate reported for the first file,
        ``ids`` is the list of recording indices (third field, kept as strings),
        ``y`` is the list of digit labels (first field, converted to ``int``),
        ``speakers`` is the list of speaker names (second field).
        All lists share the same ordering (files sorted by path).

    Raises:
        FileNotFoundError: If ``directory`` contains no ``*.wav`` files.
    """
    # Sort so the ordering does not depend on the filesystem's listing order.
    files = sorted(glob(os.path.join(directory, "*.wav")))
    if not files:
        raise FileNotFoundError("No .wav files found in {!r}".format(directory))

    # Parse the base name only, so the result does not depend on how many
    # components (or which separator) the directory path has.
    fnames = [
        os.path.splitext(os.path.basename(f))[0].split("_") for f in files
    ]
    ids = [f[2] for f in fnames]
    y = [int(f[0]) for f in fnames]
    speakers = [f[1] for f in fnames]

    # Read all wavs at their native sampling rate (sr=None); the rate of the
    # first file is the one reported, without decoding that file twice.
    wavs = []
    Fs = None
    for f in files:
        wav, sr = librosa.core.load(f, sr=None)
        if Fs is None:
            Fs = sr
        wavs.append(wav)

    # Print dataset info
    print("Total wavs: {}. Fs = {} Hz".format(len(wavs), Fs))

    return wavs, Fs, ids, y, speakers


def extract_features(wavs, n_mfcc=6, Fs=8000):
    """Compute an MFCC matrix for each waveform.

    The analysis window is 30 ms worth of samples at ``Fs`` and the hop is the
    window length minus half the window (integer division).

    Args:
        wavs: Iterable of waveforms, each passed to ``librosa.feature.mfcc`` as ``y``.
        n_mfcc: Number of coefficients requested per frame.
        Fs: Sampling rate in Hz used for the window size and passed as ``sr``.

    Returns:
        A list with one transposed MFCC matrix per waveform, in input order
        (presumably frames x coefficients, given librosa's documented layout).
    """
    win_length = 30 * Fs // 1000
    half_window = win_length // 2
    hop = win_length - half_window

    frames = []
    for signal in tqdm(wavs, desc="Extracting mfcc features..."):
        coefficients = librosa.feature.mfcc(
            y=signal, sr=Fs, n_fft=win_length, hop_length=hop, n_mfcc=n_mfcc
        )
        # Transpose so that iterating the matrix walks along the other axis.
        frames.append(coefficients.T)

    print("Feature extraction completed with {} mfccs per frame".format(n_mfcc))

    return frames


def split_free_digits(frames, ids, speakers, labels):
    """Partition the utterances into train and test sets by recording index.

    An utterance goes to the test set when the string form of its recording
    index is one of "0".."4"; every other utterance goes to the train set.

    Args:
        frames: Per-utterance feature sequences.
        ids: Per-utterance recording indices (compared after ``str()``).
        speakers: Per-utterance speaker names.
        labels: Per-utterance digit labels.

    Returns:
        ``(X_train, X_test, y_train, y_test, spk_train, spk_test)`` as lists,
        each preserving the input order.
    """
    print("Splitting in train test split using the default dataset split")

    held_out = {"0", "1", "2", "3", "4"}
    # Each bucket holds (features, labels, speakers) for one side of the split.
    train_side = ([], [], [])
    test_side = ([], [], [])

    for utt_id, feats, digit, speaker in zip(ids, frames, labels, speakers):
        side = test_side if str(utt_id) in held_out else train_side
        side[0].append(feats)
        side[1].append(digit)
        side[2].append(speaker)

    X_train, y_train, spk_train = train_side
    X_test, y_test, spk_test = test_side

    return X_train, X_test, y_train, y_test, spk_train, spk_test


def make_scale_fn(X_train):
    """Fit a ``StandardScaler`` on the given sequences and return a scaling function.

    All sequences in ``X_train`` are concatenated before fitting, so the
    statistics are computed over every row of every sequence. The fitted mean
    and scale are printed.

    Args:
        X_train: List of arrays that ``np.concatenate`` can stack.

    Returns:
        A function ``scale(X)`` that applies the fitted scaler to each sequence
        in ``X`` and returns the transformed sequences as a new list.
    """
    # Standardization statistics come only from the data passed in here.
    scaler = StandardScaler().fit(np.concatenate(X_train))
    print("Normalization will be performed using mean: {}".format(scaler.mean_))
    print("Normalization will be performed using std: {}".format(scaler.scale_))

    def scale(X):
        # Transform sequence by sequence so each keeps its own length.
        return [scaler.transform(sequence) for sequence in X]

    return scale


def parser(directory, n_mfcc=6):
    """Run the full loading pipeline: read wavs, extract MFCCs, split train/test.

    Args:
        directory: Folder with the recordings, forwarded to ``parse_free_digits``.
        n_mfcc: Number of MFCC coefficients, forwarded to ``extract_features``.

    Returns:
        The 6-tuple produced by ``split_free_digits``:
        ``(X_train, X_test, y_train, y_test, spk_train, spk_test)``.
    """
    wavs, sample_rate, utt_ids, digits, speaker_names = parse_free_digits(directory)
    mfccs = extract_features(wavs, n_mfcc=n_mfcc, Fs=sample_rate)
    # split_free_digits already returns the six lists in the order callers expect.
    return split_free_digits(mfccs, utt_ids, speaker_names, digits)

In [5]:
### Step 9: load the recordings, extract MFCC features and apply the default train/test split
X_train, X_test, y_train, y_test, spk_train, spk_test = parser('recordings/') # parse the wav files under recordings/

Total wavs: 3000. Fs = 8000 Hz


Extracting mfcc features...: 100%|██████████| 3000/3000 [00:06<00:00, 435.35it/s]

Feature extraction completed with 6 mfccs per frame
Splitting in train test split using the default dataset split





In [6]:
# Hold out 20% of the training utterances as a development (validation) set:
# 80% stays in train, 20% goes to dev. The split is stratified on the digit
# label and seeded, so re-running the notebook reproduces the same partition
# and the same normalization statistics.
# NOTE(review): spk_train is not split here, so after this cell it no longer
# lines up index-by-index with X_train / y_train.
# NOTE(review): this cell rebinds X_train, y_train and X_test; run it exactly
# once per kernel session, right after the parsing cell.
X_train, X_dev, y_train, y_dev = train_test_split(
    X_train, y_train, test_size=0.2, stratify=y_train, random_state=42
)

# The first two calls are for comparison only: they print the statistics that
# would result from fitting on more data, and their return value is not used.
print("If using all data to calculate normalization statistics")
make_scale_fn(X_train + X_dev + X_test)
print("If using X_train + X_dev to calculate normalization statistics")
make_scale_fn(X_train + X_dev)

# The scaler actually applied is fitted on the training portion only, so no
# information from the dev or test sets is used for normalization.
print("If using X_train to calculate normalization statistics")
scale_fn = make_scale_fn(X_train)
X_train = scale_fn(X_train)
X_dev = scale_fn(X_dev)
X_test = scale_fn(X_test)

If using all data to calculate normalization statistics
Normalization will be performed using mean: [-517.82970067   62.3857955    18.81777176    9.58994408  -19.21332918
  -10.9054417 ]
Normalization will be performed using std: [152.29960089  51.98705829  36.71929108  29.63888661  24.80403283
  23.39483933]
If using X_train + X_dev to calculate normalization statistics
Normalization will be performed using mean: [-517.77180304   62.41689972   18.86552787    9.61652008  -19.17346574
  -10.77057825]
Normalization will be performed using std: [152.46343541  51.98376561  36.72489087  29.65300818  24.84129996
  23.30360999]
If using X_train to calculate normalization statistics
Normalization will be performed using mean: [-516.76986827   62.32102779   18.69007933    9.57460784  -19.35101183
  -11.04056045]
Normalization will be performed using std: [151.40761838  52.10330368  36.65275522  29.60096666  24.85887974
  23.38236953]


In [15]:
# Count how many samples of each digit (0-9) are in the train set and in the
# test set, then print both tallies.
y_t = np.array(y_train)
y_te = np.array(y_test)

train_number = [int(np.count_nonzero(y_t == digit)) for digit in range(10)]
test_number = [int(np.count_nonzero(y_te == digit)) for digit in range(10)]

for header, counts in (("For train set:", train_number), ("For test set:", test_number)):
    print(header)
    for i, n_samples in enumerate(counts):
        print('Number of samples for digit {}:'.format(i), n_samples)

For train set:
Number of samples for digit 0: 216
Number of samples for digit 1: 216
Number of samples for digit 2: 216
Number of samples for digit 3: 216
Number of samples for digit 4: 216
Number of samples for digit 5: 216
Number of samples for digit 6: 216
Number of samples for digit 7: 216
Number of samples for digit 8: 216
Number of samples for digit 9: 216
For test set:
Number of samples for digit 0: 30
Number of samples for digit 1: 30
Number of samples for digit 2: 30
Number of samples for digit 3: 30
Number of samples for digit 4: 30
Number of samples for digit 5: 30
Number of samples for digit 6: 30
Number of samples for digit 7: 30
Number of samples for digit 8: 30
Number of samples for digit 9: 30


In [23]:
### Step 10
from pomegranate.distributions import Normal
from pomegranate.gmm import GeneralMixtureModel
from pomegranate.hmm import DenseHMM
from pomegranate import *

In [25]:
class GMM_HMM:
    """Left-to-right HMM for one digit, with GMM or single-Gaussian emissions.

    Every state can either stay where it is or move to the next state, each
    with probability 0.5; the last state only loops on itself. The model always
    starts in the first state and has no explicit end state.

    The calls used here (``from_samples``, ``HiddenMarkovModel.from_matrix``,
    ``viterbi``) match the pre-1.0 pomegranate API.
    NOTE(review): the notebook also imports ``pomegranate.hmm.DenseHMM``, which
    belongs to the 1.x API -- confirm which pomegranate version is installed.

    Args:
        X: Samples of a single digit used to initialise the state
            distributions (presumably a 2-D array of frames x features --
            TODO confirm against the caller).
        n_states: Number of HMM states (at least 1).
        n_mixtures: Number of Gaussians per state when ``gmm`` is true.
        gmm: Use a Gaussian mixture per state if true, otherwise a single
            multivariate Gaussian.

    Raises:
        ValueError: If ``n_states`` is smaller than 1.
    """

    def __init__(self, X, n_states=4, n_mixtures=4, gmm=True):
        if n_states < 1:
            raise ValueError("n_states must be at least 1")

        self.X = X
        self.n_states = n_states
        self.n_mixtures = n_mixtures
        self.gmm = gmm
        self.max_iterations = 5

        # Build the left-to-right transition matrix for any number of states
        # (for n_states=4 this is exactly the matrix that used to be hard-coded).
        self.trans_matrix = np.zeros((n_states, n_states))
        for i in range(n_states - 1):
            self.trans_matrix[i, i] = 0.5
            self.trans_matrix[i, i + 1] = 0.5
        self.trans_matrix[-1, -1] = 1.0

        # Always start in the first state.
        self.starts = np.zeros(n_states)
        self.starts[0] = 1.0
        self.start = self.starts  # kept for code that reads the old attribute name

        # All-zero end probabilities: no state is connected to an explicit end
        # state, consistent with the last row of the transition matrix.
        self.ends = np.zeros(n_states)

        # np.float_ was removed in NumPy 2.0; convert explicitly and only once.
        samples = np.asarray(self.X, dtype=np.float64)

        # One emission distribution per state, each initialised from the same samples.
        self.dists = []
        for _ in range(n_states):
            if self.gmm:
                dist = GeneralMixtureModel.from_samples(
                    MultivariateGaussianDistribution, self.n_mixtures, samples
                )
            else:
                dist = MultivariateGaussianDistribution.from_samples(samples)
            self.dists.append(dist)

        self.model = HiddenMarkovModel.from_matrix(
            self.trans_matrix,
            self.dists,
            self.starts,
            self.ends,
            state_names=['s{}'.format(i) for i in range(n_states)],
        )

    def fit(self, X):
        """Train the HMM on ``X`` for at most ``self.max_iterations`` iterations.

        Args:
            X: Training sequences, passed unchanged to the model's ``fit``.

        Returns:
            ``self``, so calls can be chained.
        """
        self.model.fit(X, max_iterations=self.max_iterations)
        return self

    def predict(self, X):
        """Return the first value of the model's ``viterbi`` result for ``X``.

        Args:
            X: A single observation sequence, passed unchanged to ``viterbi``.

        Returns:
            The score returned by ``viterbi`` (the decoded path is discarded).
        """
        logp, _ = self.model.viterbi(X)
        return logp
