In [6]:
import math
import librosa
import numpy as np
import os
import pickle
import matplotlib.pyplot as plt
import pyaudio
import wave
from hmmlearn.hmm import GMMHMM
from pydub import AudioSegment
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

In [2]:
def get_mfcc(file_path):
    """Build the per-frame feature matrix for one audio file.

    The signal is loaded with ``librosa.load`` using its default arguments.
    Twelve MFCCs and one RMS-energy row are computed per frame, every row is
    mean-normalised over time, and first- and second-order deltas are
    appended (13 static + 13 delta + 13 delta-delta = 39 features).

    Args:
        file_path: Path of the audio file to analyse.

    Returns:
        Array with one row per frame and one column per feature
        (n_frames, 39) -- the (T x N) layout hmmlearn expects.
    """
    signal, rate = librosa.load(file_path)

    # Frame geometry in samples: 15 ms hop, 30 ms analysis window.
    hop = math.floor(rate * 0.015)
    window = math.floor(rate * 0.030)

    cepstra = librosa.feature.mfcc(
        y=signal, sr=rate, n_mfcc=12, n_fft=1024,
        hop_length=hop, win_length=window)
    rms = librosa.feature.rms(y=signal, frame_length=window, hop_length=hop)

    # The two extractors can disagree on the number of frames; zero-pad the
    # shorter one on the right so they can be stacked.
    frame_gap = cepstra.shape[1] - rms.shape[1]
    if frame_gap > 0:
        rms = np.pad(rms, ((0, 0), (0, frame_gap)), mode='constant')
    elif frame_gap < 0:
        cepstra = np.pad(cepstra, ((0, 0), (0, -frame_gap)), mode='constant')

    # Energy becomes the 13th static coefficient.
    static = np.vstack((cepstra, rms))

    # Subtract each coefficient's mean over time.
    static = static - np.mean(static, axis=1, keepdims=True)

    # Delta (velocity) and delta-delta (acceleration) coefficients.
    velocity = librosa.feature.delta(static, order=1)
    acceleration = librosa.feature.delta(static, order=2)

    # Stack as (features, frames), then transpose to (frames, features).
    stacked = np.concatenate([static, velocity, acceleration], axis=0)
    return stacked.T


In [None]:
class HMMTraining:
    """Train, persist and evaluate one GMM-HMM per audio class.

    Each sub-folder of ``dataset_path`` named after an entry of
    ``class_names`` is scanned for ``.wav`` files, every file is converted to
    a feature matrix with ``get_mfcc``, and the files of each class are split
    70 % / 30 % into train / test. One ``GMMHMM`` is fitted per class;
    a test clip is assigned to the class whose model scores it highest.
    """

    def __init__(self):
        self.class_names = ['baby cry', 'adult voice']

        # Hidden-state count per class, indexed like class_names
        # (the third entry is not used with two classes).
        self.states = [5, 5, 5]  # n_components
        self.mix = 2  # n_mix

        self.dataset_path = 'data_bbcry_or_not'

        # Features / labels keyed first by split, then by class name.
        self.X = {'train': {}, 'test': {}}
        self.y = {'train': {}, 'test': {}}

        self.model = {}
        self.model_path = 'models_train'
        self.loss_history = {cname: [] for cname in self.class_names}

    def _list_wav_files(self, cname):
        """Return the ``.wav`` paths of class ``cname`` in sorted order.

        ``os.listdir`` returns entries in arbitrary order; sorting keeps the
        seeded train/test split identical across machines and runs.
        """
        class_dir = os.path.join(self.dataset_path, cname)
        return [os.path.join(class_dir, fname)
                for fname in sorted(os.listdir(class_dir))
                if fname.endswith('.wav')]

    @staticmethod
    def _build_priors(n_states):
        """Return the (start, transition) arrays handed to GMMHMM as priors.

        NOTE(review): all mass is put on state index 1 (the second state),
        and the banded matrix is divided by 6 without explanation -- both are
        kept exactly as they were; confirm they are intended.
        NOTE(review): these arrays are passed as ``startprob_prior`` /
        ``transmat_prior``, not as initial ``startprob_`` / ``transmat_``
        values -- confirm that is the intent.
        """
        start_prob = np.full(n_states, 0.0)
        start_prob[1] = 1.0  # prob of state init

        trans_matrix = np.full((n_states, n_states), 0.0)
        p = 0.5  # weight placed on staying in the same state
        np.fill_diagonal(trans_matrix, p)              # main diagonal
        np.fill_diagonal(trans_matrix[0:, 1:], 1 - p)  # super-diagonal
        trans_matrix[-1, -1] = 1.0

        return start_prob, trans_matrix / 6

    def train(self):
        """Load the dataset, split it per class and fit one model per class.

        Populates ``self.X``, ``self.y``, ``self.model`` and
        ``self.loss_history``; prints sample counts and the transition prior
        of every class.
        """
        wav_files = {cname: self._list_wav_files(cname)
                     for cname in self.class_names}
        # Count only the files that are actually loaded (.wav).
        print('Total samples:', sum(len(paths) for paths in wav_files.values()))

        for cname in self.class_names:
            features = [get_mfcc(path) for path in wav_files[cname]]
            labels = [self.class_names.index(cname)] * len(features)

            x_train, x_test, y_train, y_test = train_test_split(
                features, labels,
                test_size=0.3,  # 30% test, 70% train
                random_state=42
            )

            self.X['train'][cname] = x_train
            self.X['test'][cname] = x_test
            self.y['train'][cname] = y_train
            self.y['test'][cname] = y_test

        total_train = 0
        total_test = 0
        for cname in self.class_names:
            train_count = len(self.X['train'][cname])
            test_count = len(self.X['test'][cname])
            print(cname, 'train:', train_count, '| test:', test_count)
            total_train += train_count
            total_test += test_count
        print('train samples:', total_train)
        print('test samples', total_test)

        for idx, cname in enumerate(self.class_names):
            start_prob, trans_matrix = self._build_priors(self.states[idx])

            # Show the transition prior used for this class.
            print(cname)
            print(trans_matrix)

            self.model[cname] = GMMHMM(
                n_components=self.states[idx],
                n_mix=self.mix,
                startprob_prior=start_prob,
                transmat_prior=trans_matrix,
                algorithm='viterbi',
                random_state=42,
                n_iter=300,
                verbose=True,
                params='stmc',
                init_params='mc'
            )
            # All training sequences are stacked row-wise; `lengths` tells
            # the model where each sequence ends.
            train_sequences = self.X['train'][cname]
            self.model[cname].fit(
                X=np.vstack(train_sequences),
                lengths=[seq.shape[0] for seq in train_sequences])
            # Store a plain list, matching how loss_history is initialised.
            self.loss_history[cname] = list(self.model[cname].monitor_.history)

    def save_model(self):
        """Pickle every trained model to ``<model_path>/model_<class>.pkl``.

        The target directory is created when it does not exist yet.
        """
        os.makedirs(self.model_path, exist_ok=True)
        for cname in self.class_names:
            name = f'{self.model_path}/model_{cname}.pkl'
            with open(name, 'wb') as file:
                pickle.dump(self.model[cname], file)

    def evaluation(self):
        """Classify the held-out clips and print accuracy metrics.

        Prints per-class accuracy, weighted precision / recall / F1, overall
        accuracy and the confusion matrix.
        """
        print('====== Evaluation ======')
        y_true = []
        y_pred = []

        for cname in self.class_names:
            true_label = self.class_names.index(cname)
            for mfcc in self.X['test'][cname]:
                # Score the clip under every class model; highest wins.
                scores = [self.model[cls].score(mfcc) for cls in self.class_names]
                y_pred.append(np.argmax(scores))
                y_true.append(true_label)

        # Per-class accuracy: correct predictions among clips of that class.
        for i, cname in enumerate(self.class_names):
            class_correct = sum(
                1 for yt, yp in zip(y_true, y_pred) if yt == yp and yt == i
            )
            class_total = y_true.count(i)
            accuracy = class_correct / class_total if class_total > 0 else 0.0
            print(f'{cname}: {accuracy:.2%}')
        print('======')

        # Aggregate metrics over all classes.
        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
        recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
        f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

        print(f"  - Accuracy: {accuracy:.6%}")
        print(f"  - Precision: {precision:.6%}")
        print(f"  - Recall: {recall:.6%}")
        print(f"  - F1-score: {f1:.6%}")
        print('======')
        print('Confusion matrix:')
        print(confusion_matrix(y_true, y_pred))

    def plot_loss(self):
        """Plot the log-likelihood values recorded while fitting each class."""
        for cname in self.class_names:
            plt.plot(self.loss_history[cname], label=f'{cname}')
        plt.xlabel('Iteration')
        plt.ylabel('Log Likelihood')
        plt.title('Training Loss')
        plt.legend()
        plt.show()



# Entry point for the training cell: fit one model per class, pickle the
# models, then print the evaluation report for the held-out split.
if __name__ == '__main__':
    hmm_train = HMMTraining()
    hmm_train.train()
    hmm_train.save_model()
    hmm_train.evaluation()
    # Optional: uncomment to plot the recorded log-likelihood values.
    # hmm_train.plot_loss()

In [61]:
class HMMRecognition:
    """Record a short clip and classify it with pickled per-class models.

    On construction, one model per entry of ``class_names`` is loaded from
    ``model_path``. ``record`` captures microphone audio to ``record_path``;
    ``predict`` trims leading/trailing silence, extracts features with
    ``get_mfcc`` and reports the class whose model scores the clip highest.
    """

    def __init__(self):
        self.model = {}

        self.class_names = ['baby cry', 'adult voice']

        self.audio_format = 'wav'

        self.record_path = 'temp/record.wav'
        self.trimmed_path = 'temp/trimmed.wav'
        # NOTE(review): HMMTraining.save_model writes to 'models_train', not
        # to this sub-folder -- the files apparently have to be moved by hand.
        self.model_path = 'models_train/5-2-39'

        self.load_model()

    @staticmethod
    def _ensure_parent_dir(path):
        """Create the directory that will contain ``path`` if it is missing."""
        parent = os.path.dirname(path)
        if parent:  # a bare file name has no directory to create
            os.makedirs(parent, exist_ok=True)

    @staticmethod
    def detect_leading_silence(sound, silence_threshold=-42.0, chunk_size=10):
        """Return how many milliseconds of leading silence ``sound`` has.

        The clip is scanned in ``chunk_size``-ms steps until a chunk whose
        ``dBFS`` is at or above ``silence_threshold`` is found. The result is
        a multiple of ``chunk_size`` and can exceed ``len(sound)`` when the
        whole clip is silent.

        Args:
            sound: Sliceable audio object exposing ``dBFS`` and ``len()`` in
                milliseconds (a pydub ``AudioSegment`` in this file).
            silence_threshold: Level in dBFS below which a chunk is silent.
            chunk_size: Scan step in milliseconds; must be positive.
        """
        trim_ms = 0  # ms

        assert chunk_size > 0  # to avoid infinite loop
        while trim_ms < len(sound) and sound[trim_ms:trim_ms + chunk_size].dBFS < silence_threshold:
            trim_ms += chunk_size

        return trim_ms

    def load_model(self):
        """Load ``model_<class>.pkl`` for every class into ``self.model``.

        Raises:
            FileNotFoundError: If a model file is missing.
        """
        for key in self.class_names:
            name = f"{self.model_path}/model_{key}.pkl"
            with open(name, 'rb') as file:
                # SECURITY: pickle.load can execute arbitrary code; only load
                # model files that were produced locally / are trusted.
                self.model[key] = pickle.load(file)

    def predict(self, file_name=None):
        """Classify an audio file and print the scores and the winner.

        Args:
            file_name: Audio file to classify; defaults to ``record_path``.

        Returns:
            The predicted class name.

        Raises:
            ValueError: If the clip is silence from start to end, so nothing
                would be left after trimming.
        """
        if not file_name:
            file_name = self.record_path

        self._ensure_parent_dir(self.trimmed_path)

        # Trim silence from both ends (the tail is measured on the reversed clip).
        sound = AudioSegment.from_file(file_name, format=self.audio_format)

        start_trim = self.detect_leading_silence(sound)
        end_trim = self.detect_leading_silence(sound.reverse())

        duration = len(sound)
        if start_trim + end_trim >= duration:
            # Nothing would remain; fail here with a clear message instead of
            # handing an empty clip to feature extraction.
            raise ValueError(f'{file_name} contains only silence; nothing to classify')

        trimmed_sound = sound[start_trim:duration - end_trim]
        trimmed_sound.export(self.trimmed_path, format=self.audio_format)

        # Predict: score the clip under every class model, highest wins.
        record_mfcc = get_mfcc(self.trimmed_path)
        scores = [self.model[cname].score(record_mfcc) for cname in self.class_names]
        print('scores', scores)
        predict_word = np.argmax(scores)
        label = self.class_names[predict_word]
        print(label)
        return label

    def record(self):
        """Record 5 s of 16 kHz mono 16-bit audio into ``record_path``.

        The output directory is created if needed, and the audio device is
        released even when reading from the stream fails.
        """
        CHUNK = 1024
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 16000
        RECORD_SECONDS = 5

        # Create the target folder up front so a missing directory does not
        # waste the recording.
        self._ensure_parent_dir(self.record_path)

        p = pyaudio.PyAudio()
        try:
            sample_width = p.get_sample_size(FORMAT)
            stream = p.open(format=FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            input=True,
                            frames_per_buffer=CHUNK)
            try:
                print('recording ...')
                frames = [stream.read(CHUNK)
                          for _ in range(int(RATE / CHUNK * RECORD_SECONDS))]
                print('stopped record!')
            finally:
                stream.stop_stream()
                stream.close()
        finally:
            p.terminate()

        with wave.open(self.record_path, 'wb') as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(sample_width)
            wf.setframerate(RATE)
            wf.writeframes(b''.join(frames))


# Entry point for the recognition cell: load the pickled models, record a
# clip from the microphone, then classify it.
if __name__ == '__main__':
    hmm_reg = HMMRecognition()
    hmm_reg.record()
    hmm_reg.predict()

scores [-60879.59379789139, -64185.74340881428]
baby cry
scores [-46260.98085829911, -53144.4823782904]
baby cry
scores [-22439.455237114646, -23995.617812944976]
baby cry
