In [45]:
from google.colab import drive
drive.mount('/content/gdrive')

LABELS = ["len", "xuong", "trai", "phai", "nhay", "ban", "A", "B", "sil"]
STATES = [3, 6, 3, 3, 3, 3, 3, 3, 1]
SAMPLE_COUNT = 5

MY_SAMPLE_DIR = "/content/gdrive/MyDrive/ColabNotebooks/SpeechProcessingEx2/src/sample_data"
SAMPLE_DIR = MY_SAMPLE_DIR # You may need to change this


Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [46]:
# Change this line to suit your desired directory
# %cd /content/gdrive/MyDrive/ColabNotebooks/SpeechProcessingEx2/src
# install necessary packages
!pip install pydub
!pip install hmmlearn



In [47]:
!pwd

/content/gdrive/My Drive/ColabNotebooks/SpeechProcessingEx2/src


In [61]:
# if any fail, go to 2nd block to install
import os
import pydub
import shutil
import random
import pandas as pd
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt
import multiprocessing
from hmmlearn import hmm
from re import template
from IPython import display
from audioop import mul


Sound Segmenter

In [49]:
try:
    shutil.rmtree("audio_per_labels")
except FileNotFoundError:
    pass


os.mkdir("audio_per_labels")
os.mkdir("audio_per_labels/audio")

for label in LABELS:
    os.mkdir(f"audio_per_labels/audio/{label}")


class SoundSegment:
    def __init__(self, start: float, end: float, label: str):
        self.start = start
        self.end = end
        self.label = label

    def __str__(self):
        return "({} - {}: {})".format(self.start, self.end, self.label)

    def __repr__(self):
        return self.__str__()


files = os.listdir("data/audio")
file_names = [file[:-4] for file in files]

# print(file_names)
for file in file_names:
    # read labels for that file
    labels = []
    with open(f"data/labels/{file}.txt", "r") as f:
        for line in f:
            if line.strip() == "":
                continue
            start, stop, label = line.split("\t")
            labels.append(SoundSegment(float(start), float(stop), label.strip()))

    # chop each label into its own wav
    for order, label in enumerate(labels):
        audio = pydub.AudioSegment.from_wav(f"data/audio/{file}.wav")
        audio = audio[int(label.start * 1000) : int(label.end * 1000)]
        audio.export(
            f"audio_per_labels/audio/{label.label}/{file}_{order+1}.wav", format="wav"
        )


Choose Sample (5 each)

In [50]:
random_os = random.SystemRandom()

LABELS.remove("sil")
try:
    shutil.rmtree("sample_data")
except FileNotFoundError:
    pass

os.mkdir("sample_data")

for label in LABELS:
    for count in range(1, SAMPLE_COUNT + 1):
        files = os.listdir(f"audio_per_labels/audio/{label}")
        files = [file for file in files if file.endswith(".wav")]
        random_os.shuffle(files)
        for file in files[:SAMPLE_COUNT]:
            shutil.copy(
                f"audio_per_labels/audio/{label}/{file}",
                f"sample_data/{label}-{count}.wav",
            )


Extract mfcc features

In [60]:


from common import *


def extract_mfcc(folder: str, file: str) -> np.ndarray:
    y, sr = librosa.load(f"audio_per_labels/audio/{folder}/{file}.wav")
    S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128, fmax=8000)
    mfcc = librosa.feature.mfcc(S=S, n_mfcc=13)
    mfcc_delta = librosa.feature.delta(mfcc)
    mfcc_delta2 = librosa.feature.delta(mfcc, order=2)

    return np.concatenate((mfcc, mfcc_delta, mfcc_delta2))


def extract_mfccs(folder):
    os.mkdir(f"mfcc/{folder}")
    os.mkdir(f"mfcc/{folder}/npy")
    os.mkdir(f"mfcc/{folder}/plt")
    for file in os.listdir(f"audio_per_labels/audio/{folder}"):
        if file.endswith(".wav"):
            ret = extract_mfcc(folder, file)


try:
    shutil.rmtree("mfcc")
except FileNotFoundError:
    pass

# os.mkdir("mfcc")

print(extract_mfcc("A", "01_10"))


[[ 3.25190449e+00  1.88488827e+01  6.36205025e+01  9.51958008e+01
   1.01946388e+02  1.11591797e+02  1.26924263e+02  1.40641769e+02
   1.43555908e+02  1.32162308e+02  1.34595139e+02  7.48077698e+01
   1.98962555e+01  1.01737175e+01  6.70102787e+00  8.20879745e+00
   4.43012762e+00  1.19451904e+00  1.03522348e+00]
 [ 2.35321832e+00  1.80540009e+01  6.67158508e+01  1.04614609e+02
   1.14655045e+02  1.33825195e+02  1.64764587e+02  1.87442322e+02
   1.96687973e+02  1.83681854e+02  1.88038727e+02  1.04576416e+02
   2.77022476e+01  1.41763668e+01  9.33990097e+00  1.14655437e+01
   6.17407894e+00  1.63782096e+00  1.41432965e+00]
 [-2.13513583e-01  6.44624710e+00  3.48294830e+01  6.46249542e+01
   7.69506454e+01  1.06099930e+02  1.46597107e+02  1.72851776e+02
   1.86843689e+02  1.77306381e+02  1.82597321e+02  1.01526093e+02
   2.66682606e+01  1.36827402e+01  9.01663780e+00  1.10993347e+01
   5.96456242e+00  1.54464483e+00  1.31902349e+00]
 [ 1.78988934e-01  4.90278196e+00  2.91762199e+01  5.49

In [58]:
!pwd

/content/gdrive/MyDrive/ColabNotebooks/SpeechProcessingEx2/src


DTW (Dynamic Time Wrapping)

In [51]:
def mfcc_extract(sound, sr):
    mfcc = librosa.feature.mfcc(
        y=sound, n_mfcc=13, sr=sr, n_mels=128, fmax=8000, power=2, n_fft=1024
    )
    delta_mfcc = librosa.feature.delta(mfcc)
    delta2_mfcc = librosa.feature.delta(mfcc, order=2)
    return np.concatenate((mfcc, delta_mfcc, delta2_mfcc))


def dtw(sound, reference):
    D, wp = librosa.sequence.dtw(sound, reference, subseq=True, metric="euclidean")
    return D[-1, -1] / wp.shape[0]

try:
    LABELS.remove("sil")
except:
   pass
template_mfccs = {}

for label in LABELS:
    template_mfccs[label] = []
    for count in range(1, SAMPLE_COUNT + 1):
        file = f"{SAMPLE_DIR}/{label}-{count}.wav"
        sample, sample_sr = librosa.load(file)
        sample_mfcc = mfcc_extract(sample, sample_sr)
        template_mfccs[label].append(sample_mfcc)


def do_recognition(file: str) -> str:
    # load sample
    sample, sample_sr = librosa.load(file)
    sample_mfcc = mfcc_extract(sample, sample_sr)

    # calculate dtw distance from sample to all templates
    distances = {}
    for label in LABELS:
        distances[label] = []
        for template_mfcc in template_mfccs[label]:
            distances[label].append(dtw(sample_mfcc, template_mfcc))

    # find the shortest distance for each label
    min_distances = {}
    for label in LABELS:
        min_distances[label] = min(distances[label])

    # print(min_distances)

    # shortest distance is the match
    min_label = min(min_distances, key=min_distances.get)

    return min_label


for label in LABELS:
    correct_label = label
    recd = []

    for file in os.listdir(f"audio_per_labels/audio/{label}"):
        if file.endswith(".wav"):
            recd_label = do_recognition(f"audio_per_labels/audio/{label}/{file}")
            recd.append(recd_label)

    # calculate accuracy in percentage
    accuracy = sum([1 for label in recd if label == correct_label]) / len(recd) * 100

    # calculate distribution of labels
    distribution = {}
    for label in LABELS:
        distribution[label] = recd.count(label) / len(recd) * 100

    # sort distribution
    sorted_distribution = sorted(distribution.items(), key=lambda x: x[1])
    sorted_distribution.reverse()

    print(f"+ Label {correct_label}, Accuracy: {accuracy:.2f}%", end="\nDetails: ")

    for label, percentage in sorted_distribution:
        print(f"{label}: {percentage:.2f}%", end=", ")

    print()


+ Label len, Accuracy: 56.80%
Details: len: 56.80%, B: 26.40%, nhay: 8.00%, A: 7.20%, trai: 1.60%, ban: 0.00%, phai: 0.00%, xuong: 0.00%, 
+ Label xuong, Accuracy: 95.00%
Details: xuong: 95.00%, ban: 2.50%, nhay: 1.67%, trai: 0.83%, B: 0.00%, A: 0.00%, phai: 0.00%, len: 0.00%, 
+ Label trai, Accuracy: 77.17%
Details: trai: 77.17%, A: 13.39%, nhay: 3.15%, phai: 3.15%, ban: 2.36%, len: 0.79%, B: 0.00%, xuong: 0.00%, 
+ Label phai, Accuracy: 88.60%
Details: phai: 88.60%, A: 4.39%, ban: 3.51%, trai: 2.63%, nhay: 0.88%, B: 0.00%, xuong: 0.00%, len: 0.00%, 
+ Label nhay, Accuracy: 84.06%
Details: nhay: 84.06%, A: 7.25%, ban: 2.17%, len: 2.17%, phai: 1.45%, trai: 1.45%, B: 0.72%, xuong: 0.72%, 
+ Label ban, Accuracy: 77.31%
Details: ban: 77.31%, nhay: 10.92%, trai: 5.04%, A: 3.36%, phai: 1.68%, B: 0.84%, len: 0.84%, xuong: 0.00%, 
+ Label A, Accuracy: 77.87%
Details: A: 77.87%, nhay: 13.11%, trai: 4.10%, ban: 1.64%, phai: 1.64%, len: 1.64%, B: 0.00%, xuong: 0.00%, 
+ Label B, Accuracy: 65.19%

In [52]:
print(LABELS)
nLabels = len(LABELS)
STATES = STATES[:nLabels]
print(STATES)

['len', 'xuong', 'trai', 'phai', 'nhay', 'ban', 'A', 'B']
[3, 6, 3, 3, 3, 3, 3, 3]


In [53]:
def get_mfcc(file: str):
    y, sr = librosa.load(file)
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    delta = librosa.feature.delta(mfcc)
    delta2 = librosa.feature.delta(mfcc, order=2)
    return np.vstack((mfcc, delta, delta2)).T

data = {}
idx_labels = {}

# mfcc in data, index of label in idx_labels

for label in LABELS:
    if label == "sil":
        continue
    files = os.listdir(f"audio_per_labels/audio/{label}")
    _data = [get_mfcc(f"audio_per_labels/audio/{label}/{file}") for file in files]
    data[label] = _data
    idx_labels[label] = [LABELS.index(label)] * len(_data)


HMM (Hidden Markov Model)

In [54]:
from sklearn.model_selection import train_test_split

X = {'train': {}, 'test': {}}
y = {'train': {}, 'test': {}}

for label in LABELS:
    x_train, x_test, y_train, y_test = train_test_split(data[label], idx_labels[label], test_size=0.2)

    X['train'][label] = x_train
    X['test'][label] = x_test
    y['train'][label] = y_train
    y['test'][label] = y_test
for label in LABELS:
    print(f"{label}: {len(X['train'][label])} / {len(X['test'][label])}")

print(X['train']['A'][0].shape)

len: 100 / 25
xuong: 96 / 24
trai: 101 / 26
phai: 91 / 23
nhay: 110 / 28
ban: 95 / 24
A: 97 / 25
B: 108 / 27
(21, 39)


In [55]:
models = {}

for idx, label in enumerate(LABELS):
    models[label] = hmm.GMMHMM(n_components=STATES[idx], covariance_type="diag", n_iter=300)
    models[label].fit(X=np.vstack(X['train'][label]), lengths=[x.shape[0] for x in X['train'][label]])  


HMM's results

In [56]:
from sklearn.metrics import classification_report
y_true = []
y_preds = []

for label in LABELS:
    for mfcc, target in zip(X['test'][label], y['test'][label]):
        scores = [models[label].score(mfcc) for label in LABELS]
        preds = np.argmax(scores)
        y_true.append(target)
        y_preds.append(preds)

report = classification_report(y_true, y_preds)
print(report)

              precision    recall  f1-score   support

           0       1.00      0.96      0.98        25
           1       1.00      1.00      1.00        24
           2       1.00      1.00      1.00        26
           3       1.00      1.00      1.00        23
           4       1.00      0.89      0.94        28
           5       0.92      1.00      0.96        24
           6       1.00      1.00      1.00        25
           7       0.93      1.00      0.96        27

    accuracy                           0.98       202
   macro avg       0.98      0.98      0.98       202
weighted avg       0.98      0.98      0.98       202

