In [1]:
!git clone https://github.com/duclee9x/digit-hmm-recognition
# lấy dữ liệu ghi âm của nhóm ở github

Cloning into 'digit-hmm-recognition'...
remote: Enumerating objects: 18, done.[K
remote: Counting objects: 100% (18/18), done.[K
remote: Compressing objects: 100% (15/15), done.[K
remote: Total 18 (delta 3), reused 16 (delta 1), pack-reused 0[K
Unpacking objects: 100% (18/18), done.


In [2]:
!pip install hmmlearn python_speech_features

Collecting hmmlearn
  Downloading hmmlearn-0.2.7-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (129 kB)
[K     |████████████████████████████████| 129 kB 4.1 MB/s 
[?25hCollecting python_speech_features
  Downloading python_speech_features-0.6.tar.gz (5.6 kB)
Building wheels for collected packages: python-speech-features
  Building wheel for python-speech-features (setup.py) ... [?25l[?25hdone
  Created wheel for python-speech-features: filename=python_speech_features-0.6-py3-none-any.whl size=5888 sha256=7ecc96edabd5be3b0b56a2f079f5771954cb6082554f12b1b6e0c50305d6d639
  Stored in directory: /root/.cache/pip/wheels/b0/0e/94/28cd6afa3cd5998a63eef99fe31777acd7d758f59cf24839eb
Successfully built python-speech-features
Installing collected packages: python-speech-features, hmmlearn
Successfully installed hmmlearn-0.2.7 python-speech-features-0.6


In [32]:
import warnings
warnings.filterwarnings("ignore")


import os
import subprocess
import soundfile as sf
from hmmlearn import hmm
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import IPython
from sys import implementation
from scipy.io import wavfile
import scipy.signal as sps
from scipy.io import wavfile
from python_speech_features import mfcc, logfbank, delta
import pickle   
import logging
logging.getLogger("hmmlearn").setLevel("CRITICAL")

os.chdir('/content/digit-hmm-recognition')
!ls

best.pkl  data	download.sh  raw_data.zip


In [None]:
!mkdir -p data/raw_data
!unzip raw_data.zip -d /content/digit-hmm-recognition/data/raw_data

In [5]:
class DataProcessor:
  def __init__(self, current_path):
    self.current_path = current_path
    self.digits = ("one","two","three","four","five","six","seven","eight","nine","zero","oh")
    self.train_dataset = {digit:[] for digit in self.digits}
    self.test_dataset = {digit:[] for digit in self.digits}

    self.data_source_path = os.path.join(self.current_path, 'data','raw_data')
    self.clean_data_path = os.path.join(self.current_path, 'data','clean_data')

    self.train_path = os.path.join(self.clean_data_path, 'train_data')
    self.test_path = os.path.join(self.clean_data_path, 'test_data')

    self.run()

  def run(self):
    # Thực hiện chạy các hàm khi khởi tạo class để tạo thành dataset hoàn chỉnh
    # chuẩn bị cho huấn luyện
    self.check_and_prepare_dataset()
    self.dataset_builder(train_data=True)
    self.dataset_builder(train_data=False)
    self.path_2_mfcc_mapper()

  def path_2_mfcc_mapper(self):
    # map đường dẫn thành đặc trưng mfcc cho dataset
    mapper = lambda x: self.feature_extractor(x)
    for key in self.train_dataset.keys():
      self.train_dataset[key] = list(map(mapper, self.train_dataset[key]))
    for key in self.test_dataset.keys():
      self.test_dataset[key] = list(map(mapper, self.test_dataset[key]))

  def feature_extractor(self, audio_path):
    # lấy đặc trưng mfcc từ audio, ở đây số lượng mfcc nhóm e lấy là 13
    sampling_freq, audio = wavfile.read(audio_path)
    mfcc_features = mfcc(audio, sampling_freq,nfft = 2048,numcep=13,nfilt=13)
    return mfcc_features

  def dataset_builder(self, train_data=True):
    # Thực hiện chia dataset
    dataset_path = self.train_path if train_data else self.test_path
    for root, dirs, files in os.walk(dataset_path):
      for name in files:  
        file_path = os.path.join(root,name)
        for digit in self.digits:
          if digit in name:
            if train_data:
              self.train_dataset[digit].append(file_path)
            else:
              self.test_dataset[digit].append(file_path)
  def check_and_prepare_dataset(self, test_split_ratio=1/3):
    # Tạo thư mục chứ dữ liệu cho training và testing
    if not os.path.exists(self.train_path):
        os.makedirs(self.train_path)
    if not os.path.exists(self.test_path):
        os.makedirs(self.test_path)
    
    for studentId in os.listdir(self.data_source_path):
      if not studentId.startswith("5"):
        continue

      student_path = os.path.join(self.data_source_path, studentId)
      
      for digit in self.digits:
        file_count = len([wav for wav in os.listdir(student_path) if digit in wav and studentId in wav])
        train_limit = file_count - (file_count*test_split_ratio)

        data_count = 0
        for audio_id in range(1,16):
          valid_file = f"{digit}_{audio_id}_{studentId}.wav"
          raw_audio_file = os.path.join(student_path, valid_file)
          if data_count < train_limit:
            flag = self.make_clean_file(raw_audio_file, self.train_path)
          else:
            flag = self.make_clean_file(raw_audio_file, self.test_path)
          if not flag:
            print("data creation is failed")
            return False
          data_count += 1

  def make_clean_file(self, file, output_dir, target_sampling=16000, mono=True):
    # Kiểm tra nếu thiếu file
    # chuyển stereo file thành mono nếu có
    # Downsampling file audio về 16khz
    # ghi file kết quả ra nơi khác
    if not os.path.exists(file):
        print(f"{file} file not found!")
        return False

    student_id_dir = os.path.join(output_dir, os.path.basename(os.path.dirname(file)))
    output_file = os.path.join(student_id_dir, os.path.basename(file))
    if not os.path.exists(student_id_dir):
        os.makedirs(student_id_dir)
    
    data, sampling = sf.read(file)

    if len(data.shape) > 1 and mono == True:
        data = data.mean(axis=1) / 2

    if sampling != target_sampling:
      samples = round(len(data) * float(target_sampling) / sampling)
      data = sps.resample(data, samples)
    sf.write(output_file, data, target_sampling) 
    return True


In [6]:
class Model:
  def __init__(self, dataset):
    # dataset lấy từ class DataProcessor ở trên
    self.dataset = dataset
    self.train_dataset = self.dataset.train_dataset
    self.test_dataset = self.dataset.test_dataset
    self.hmm_models = {digit: None for digit in self.dataset.digits}
    self.load_model()
  def build_model(self):
    # ứng với mỗi label (digits) thì sẽ tạo một model HMMGMM để huấn luyện với dữ liệu
    # của digits đó, trong bài có dữ liệu của 3 người, mỗi người thu 10 file âm thanh
    # digits cho việc huấn luyện, tổng cộng có 3x10=30 file cho mỗi digits
    for key in self.hmm_models.keys():
      model = hmm.GMMHMM(verbose=False,n_components=20,n_iter=2000)
      feature = np.ndarray(shape=(1, 13)) # 13 là số lượng đặc trưng mfcc
      for list_feature in self.train_dataset[key]:
        feature = np.vstack((feature, list_feature))
        obj = model.fit(feature)
        self.hmm_models[key] = obj

  def predict_dataset(self):
    # dùng để tính độ chính xác cho toàn bộ dữ liệu test và dữ liệu model dự đoán
    total_correct = 0
    total_feat = 0 
    y_tests = []
    y_predicts = []
    for key, values in self.test_dataset.items():
      for feat in values:
        total_feat += 1
        y_pred = self.predict(feat)[0][0]
        y_predicts.append(y_pred)
        y_tests.append(key)
        if y_pred == key:
          total_correct += 1
    return total_correct/total_feat, y_predicts, y_tests

  def result_report(self):
    accuracy, y_pred, y_test = model.predict_dataset()
    print("Tỷ lệ chính xác kết quả model dự đoán dựa trên dữ liệu test: ", accuracy)
    print("confusion_matrix:\n\n", confusion_matrix(y_test, y_pred))
    print("classification_report:\n\n", classification_report(y_test, y_pred))
    print("Accuracy:", accuracy_score(y_test, y_pred))

  def save_model(self, file_name="best.pkl"):
    # Lưu lại kết quả huấn luyện của model
    with open(file_name, "wb") as file:
      pickle.dump(self.hmm_models, file)

  def load_model(self, file_name="best.pkl"):
    # Dùng để load lại kết quả model đã huấn luyện trước đó
    if os.path.exists(file_name):
      print("Tìm thấy file model đã được huấn luyện")
    else:
      print("Không tìm thấy file model đã được huấn luyện")
      return
    with open(file_name, "rb") as file:
        self.hmm_models = pickle.load(file)

  def predict(self, test_data):
    sample = None

    # nếu biến đầu vào là đường dẫn audio thì sẽ chuyển đổi lấy mfcc cho file audio đó
    if type(test_data) == str:
      sample = self.dataset.feature_extractor(test_data)
    else:
      # hoặc ngầm định đầu vào là mfcc luôn
      sample = test_data

    highest_score = []
    scores = []
    labels = []

    # ứng với mỗi model cho mỗi digits thì sẽ dự đoán dữ liệu test, và lấy ra model
    # có điểm dự đoán cao nhất
    for key in self.hmm_models.keys():
      scores.append((key,self.hmm_models[key].score(sample)))
    scores.sort(key=lambda x: x[1],reverse=True)

    return scores

  

In [7]:
# dùng để chia dữ liệu, chuyển đổi audio gốc sang mono, sampling thành 16khz
data_collector = DataProcessor("/content/digit-hmm-recognition") 

# dựng model HMM từ dữ liệu đã xử lý
model = Model(data_collector)

# huấn luyện model
# model.build_model()


Tìm thấy file model đã được huấn luyện


In [38]:
# Dự đoán thử dữ liệu bất kỳ
import random
def get_label(path):
  return os.path.basename(path).split('_')[0]
allfiles = [os.path.join(root,f) for root,dirs,files in os.walk("/content/digit-hmm-recognition/data/clean_data") for f in files]
random_file = random.sample(allfiles, 5)
random_file = list(map(lambda x: (x, get_label(x)), random_file))

for i, item in enumerate(random_file):
  data, sr =sf.read(item[0])
  print(f"\nitem thứ {i+1} có sampling: {sr} shape dữ liệu: {data.shape}")
  feat = data_collector.feature_extractor(item[0])
  IPython.display.display(IPython.display.Audio(item[0]))
  print("digit dự đoán: ", model.predict(feat)[0][0], "digit thực tế: ", item[1])


item thứ 1 có sampling: 16000 shape dữ liệu: (19115,)


digit dự đoán:  eight digit thực tế:  zero

item thứ 2 có sampling: 16000 shape dữ liệu: (24576,)


digit dự đoán:  zero digit thực tế:  zero

item thứ 3 có sampling: 16000 shape dữ liệu: (6130,)


digit dự đoán:  six digit thực tế:  seven

item thứ 4 có sampling: 16000 shape dữ liệu: (8916,)


digit dự đoán:  five digit thực tế:  oh

item thứ 5 có sampling: 16000 shape dữ liệu: (9845,)


digit dự đoán:  six digit thực tế:  six


In [34]:
# Kiểm tra độ chính xác của model với dữ liệu test
model.result_report()

Tỷ lệ chính xác kết quả model dự đoán dựa trên dữ liệu test:  0.6181818181818182
confusion_matrix:

 [[15  0  0  0  0  0  0  0  0  0  0]
 [ 1  8  0  5  0  0  0  0  1  0  0]
 [ 1  2  0  8  0  3  0  0  1  0  0]
 [ 0  0  0 15  0  0  0  0  0  0  0]
 [ 1  0  0  0  0 11  0  1  0  0  2]
 [ 1  1  0  2  0 10  0  0  0  1  0]
 [ 5  0  0  0  0  0  0  6  0  4  0]
 [ 1  0  0  0  0  0  0 14  0  0  0]
 [ 0  0  0  0  0  0  0  0 15  0  0]
 [ 0  0  0  0  0  0  0  0  0 15  0]
 [ 4  0  0  0  0  0  0  0  1  0 10]]
classification_report:

               precision    recall  f1-score   support

       eight       0.52      1.00      0.68        15
        five       0.73      0.53      0.62        15
        four       0.00      0.00      0.00        15
        nine       0.50      1.00      0.67        15
          oh       0.00      0.00      0.00        15
         one       0.42      0.67      0.51        15
       seven       0.00      0.00      0.00        15
         six       0.67      0.93      0.78 

In [18]:
# Lưu model 
model.save_model()