In [9]:
# Import necessary packages
import os
import numpy as np
import warnings
from scipy.io import wavfile
from python_speech_features import mfcc
from hmmlearn import hmm
from pydub import AudioSegment
from pydub.silence import split_on_silence
import joblib
import librosa
import librosa.display
import matplotlib.pyplot as plt

In [10]:
# Function to detect leading silence in an audio segment
def detect_leading_silence(sound, silence_threshold=-50.0, chunk_size=10):
    '''
    Return how many milliseconds of silence `sound` starts with.

    The audio is scanned from the beginning in windows of `chunk_size` ms and
    the scan stops at the first window whose loudness is not below
    `silence_threshold`.

    sound is a pydub.AudioSegment (any object supporting len(), slicing by
        milliseconds and a `dBFS` attribute on the slices works)
    silence_threshold in dB (dBFS); windows quieter than this count as silent
    chunk_size in ms; must be positive

    Returns the offset in ms of the first non-silent window. The value is a
    multiple of chunk_size, except that it never exceeds len(sound): a clip
    that is silent throughout yields exactly len(sound).
    '''
    assert chunk_size > 0  # to avoid infinite loop

    total_ms = len(sound)
    trim_ms = 0  # in milliseconds
    while trim_ms < total_ms and sound[trim_ms:trim_ms + chunk_size].dBFS < silence_threshold:
        trim_ms += chunk_size

    # The last window may reach past the end of the clip; without the clamp a
    # fully silent clip whose length is not a multiple of chunk_size would
    # report more silence than there is audio.
    return min(trim_ms, total_ms)


In [11]:
# Define the ModelHMM class for Hidden Markov Models
class ModelHMM(object):
    """Small wrapper around a diagonal-covariance ``hmm.GaussianHMM``.

    num_components: number of hidden states of the HMM
    num_iter: value passed to the HMM as ``n_iter``

    Attributes:
        model: the underlying ``hmm.GaussianHMM`` instance
        models: list collecting whatever ``model.fit`` returned on each
            ``train`` call
    """

    def __init__(self, num_components=8, num_iter=1000):
        self.n_components = num_components
        self.n_iter = num_iter
        # Define the covariance type and the type of HMM
        self.cov_type = 'diag'
        self.model_name = 'GaussianHMM'
        # Initialize the variable in which we will store the models for each word
        self.models = []
        # Define the model using the specified parameters
        self.model = hmm.GaussianHMM(n_components=self.n_components,
                                     covariance_type=self.cov_type, n_iter=self.n_iter)

    # Define a method to train the model
    # 'training_data' is a 2D numpy array where each row has length of number of mfcc coefficients
    def train(self, training_data):
        """Fit the wrapped HMM on `training_data` and record the fitted model.

        Floating-point warnings raised by NumPy during fitting are silenced,
        but only for the duration of the call: the caller's NumPy error
        settings are restored afterwards instead of being changed globally.
        """
        with np.errstate(all='ignore'):
            cur_model = self.model.fit(training_data)
        self.models.append(cur_model)

    # Define a method to compute log likelihood score for input features
    def compute_score(self, input_data):
        """Return ``self.model.score(input_data)`` (log likelihood of the input).

        Runs under the same silenced NumPy error state as `train`, so scoring
        behaves the same whether or not `train` was called in this process
        (e.g. after loading a saved model in a fresh kernel).
        """
        with np.errstate(all='ignore'):
            return self.model.score(input_data)  # model.score returns log likelihood of sample input_data


In [12]:
# Function to build one model for a digit
def build_one_model(input_folder, num_states, num_cep_coeff):
    """Train one HMM on all usable wav files of a single word.

    input_folder: path to the folder containing training wav files with the word
    num_states: number of hidden states in HMM
    num_cep_coeff: number of MFCC features extracted from each time window

    Each file has leading/trailing silence trimmed; files whose trimmed audio
    is shorter than 150 ms are skipped. Stereo audio is averaged to mono
    before MFCC extraction.

    Returns a tuple (model, model_score, num_cep_coeff) where model is the
    trained ModelHMM and model_score is its score on the training features.
    When no file yields features, prints a message and returns
    (None, None, num_cep_coeff).
    """
    # MFCC matrices of the individual files; joined once after the loop
    # instead of re-copying the growing array on every file.
    per_file_features = []

    # Sorted so that the training data is assembled in the same order on
    # every run (os.listdir gives no ordering guarantee).
    training_files = sorted(x for x in os.listdir(input_folder) if x.endswith('.wav'))
    for filename in training_files:
        # Extract the current filepath and read the file
        filepath = os.path.join(input_folder, filename)
        sound = AudioSegment.from_file(filepath, format="wav")
        duration = len(sound)

        # Trailing silence is found as the leading silence of the reversed clip
        start_trim = detect_leading_silence(sound)
        end_trim = detect_leading_silence(sound.reverse())
        trimmed_sound = sound[start_trim:duration - end_trim]

        # Skip files whose trimmed sound is shorter than 150 ms
        if len(trimmed_sound) < 150:
            continue

        # Get the samples from trimmed_sound
        signal = np.array(trimmed_sound.get_array_of_samples())
        if trimmed_sound.channels == 2:
            # Samples are interleaved; convert to mono by averaging the channels
            signal = signal.reshape((-1, 2)).mean(axis=1)
        signal = signal.astype(np.float32)
        sampling_freq = trimmed_sound.frame_rate

        # Extract features
        # Default values:
        # winlen=0.025, winstep=0.01, nfft=512,
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            features_mfcc = mfcc(signal, sampling_freq, numcep=num_cep_coeff)

        if len(features_mfcc) > 0:
            per_file_features.append(features_mfcc)

    # Check if we have collected any features
    if not per_file_features:
        print(f"No valid audio files found in {input_folder}.")
        return None, None, num_cep_coeff

    X = np.concatenate(per_file_features, axis=0)

    # NOTE(review): all files are fed to the HMM as one long observation
    # sequence because ModelHMM.train only takes the feature matrix; if the
    # underlying fit supports per-sequence lengths, passing them is worth
    # considering.
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', category=DeprecationWarning)
        # Initiate HMM model object, train it, then score the training data
        model = ModelHMM(num_components=num_states)
        model.train(X)
        model_score = model.compute_score(X)

    return model, model_score, num_cep_coeff


In [13]:
# Function to build models for all digits
def build_models(input_folder, num_states=None, num_cep_coeff=None):
    """Train one model per subfolder of `input_folder`.

    input_folder: contains subfolders with samples of words in wav files;
        the subfolder name is used as the label
    num_states: number of HMM hidden states; when None, the notebook-level
        variable `num_components` is used
    num_cep_coeff: number of MFCC coefficients; when None, the notebook-level
        variable `num_cepstr` is used

    Returns a list of ((model, model_score, num_cep_coeff), label) tuples,
    one per subfolder for which build_one_model produced a model. Entries
    that are not directories are ignored.
    """
    # Fall back to the notebook globals so that existing calls of the form
    # build_models(folder) keep working unchanged.
    if num_states is None:
        num_states = num_components
    if num_cep_coeff is None:
        num_cep_coeff = num_cepstr

    # Initialize the variable to store all the models
    speech_models = []

    # Parse the input directory in a stable order (os.listdir is unordered)
    for dirname in sorted(os.listdir(input_folder)):
        subfolder = os.path.join(input_folder, dirname)
        if not os.path.isdir(subfolder):
            continue

        # The label is simply the subfolder's own name
        label = dirname

        # Fit model for label
        model = build_one_model(subfolder, num_states=num_states, num_cep_coeff=num_cep_coeff)

        # If model building was successful, add to list
        if model[0] is not None:
            speech_models.append((model, label))
            print(f"Fitted model for {label}")
        else:
            print(f"Failed to build model for {label}")

    return speech_models


In [14]:
# Set parameters
# NOTE: num_components and num_cepstr are read as globals inside build_models,
# so this cell's assignments must run before build_models is called.
input_folder = "./MNIST_9-25"  # root folder; each subfolder holds the wav files of one label
num_components = 17  # forwarded by build_models as num_states (HMM hidden states)
num_cepstr = 17  # forwarded by build_models as num_cep_coeff (MFCC coefficients)

# Build models for all digits and save them
# Result: list of ((model, model_score, num_cep_coeff), label) tuples
speech_models = build_models(input_folder)

# Save the trained models to a file
joblib.dump(speech_models, 'saved_models.pkl')
print("Models saved to 'saved_models.pkl'")


Fitted model for num5
Fitted model for num6
Fitted model for num8
Fitted model for num3
Fitted model for num2
Fitted model for num4
Fitted model for num7
Fitted model for num1
Fitted model for num9
Fitted model for num0
Models saved to 'saved_models.pkl'


In [15]:
# Load the saved models
# This rebinds `speech_models` to the list read back from disk.
# NOTE: joblib.load unpickles the file, which can execute arbitrary code;
# only load model files that you created yourself.
speech_models = joblib.load('saved_models.pkl')
print("Models loaded from 'saved_models.pkl'")

Models loaded from 'saved_models.pkl'


In [16]:
# Function to split an audio file into 10 digits
def split_audio_into_digits(sound_file, num_digits=10):
    """
    Splits the audio file into chunks corresponding to individual digits.

    Tries silence thresholds from -30 down to -60 and, for each of them,
    minimum silence lengths from 50 to 490 ms (step 10) until
    split_on_silence yields exactly `num_digits` chunks.

    sound_file: audio to split (passed unchanged to split_on_silence)
    num_digits: number of chunks wanted (default 10)

    Returns the first split with exactly `num_digits` chunks. If no
    combination achieves that, prints a message and returns the attempt
    whose chunk count is closest to `num_digits` (the earliest one on ties).
    """
    best_chunks = None
    best_gap = None

    for silence_thresh in range(-30, -61, -1):
        for min_silence_len in range(50, 500, 10):
            audio_chunks = split_on_silence(sound_file,
                                            min_silence_len=min_silence_len,
                                            silence_thresh=silence_thresh)
            gap = abs(len(audio_chunks) - num_digits)
            if gap == 0:
                return audio_chunks
            # Remember the closest miss so the fallback really is the best
            # attempt rather than whatever the final combination produced.
            if best_gap is None or gap < best_gap:
                best_chunks, best_gap = audio_chunks, gap

    # If unable to get exactly num_digits chunks, return the best attempt
    print(f"Could not split audio into {num_digits} chunks. Got {len(best_chunks)} chunks.")
    return best_chunks


In [17]:
# Function to predict the digit for a given audio chunk
def predict_digit(chunk, speech_models):
    """
    Predicts the digit corresponding to the given audio chunk.

    chunk: audio segment holding one spoken digit
    speech_models: list of ((model, model_score, num_cep_coeff), label)
        tuples; labels are expected to look like 'num7'

    The chunk is trimmed of leading/trailing silence, converted to mono,
    turned into MFCC features and scored by every model; the label of the
    highest-scoring model wins.

    Returns the label with 'num' removed (e.g. '7'). Returns '' when there
    are no models, when the trimmed chunk is shorter than 150 ms, or when no
    model produces a score above -inf. A model whose scoring raises is
    skipped with a warning.
    """
    # Without any trained model there is nothing to score against
    if len(speech_models) == 0:
        return ''

    # Trim leading and trailing silence from the chunk
    start_trim = detect_leading_silence(chunk)
    end_trim = detect_leading_silence(chunk.reverse())
    trimmed_chunk = chunk[start_trim:len(chunk) - end_trim]

    # Check if the trimmed sound is shorter than 150 ms
    if len(trimmed_chunk) < 150:
        return ''

    # Get the samples from trimmed_chunk
    signal = np.array(trimmed_chunk.get_array_of_samples())
    if trimmed_chunk.channels == 2:
        # Samples are interleaved; convert to mono by averaging the channels
        signal = signal.reshape((-1, 2)).mean(axis=1)
    signal = signal.astype(np.float32)
    sampling_freq = trimmed_chunk.frame_rate

    # Extract features
    num_cep_coeff = speech_models[0][0][2]  # Assuming all models have the same num_cep_coeff
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        features_mfcc = mfcc(signal, sampling_freq, numcep=num_cep_coeff)

    # Compute scores for each model
    max_score = -float('inf')
    predicted_label = None
    for model, label in speech_models:
        model_hmm = model[0]  # model is a tuple (ModelHMM object, model_score, num_cep_coeff)
        try:
            score = model_hmm.compute_score(features_mfcc)
        except Exception as exc:
            # Stay best-effort, but make the failure visible instead of
            # silently treating the model as a non-match.
            warnings.warn(f"Scoring with model '{label}' failed: {exc}")
            continue
        if score > max_score:
            max_score = score
            predicted_label = label

    if predicted_label is None:
        return ''
    return predicted_label.replace('num', '')


In [18]:
import glob

In [23]:
# Process the test audio files and predict phone numbers
var = 6  # which numbered subfolder of ./test to process
test_folder = f'./test/{var}'
test_files = glob.glob(f"{test_folder}/*.wav")

# Files are named <index>.wav; sort numerically so 10.wav comes after 9.wav.
# basename/splitext are used instead of splitting on '/' and '.' so the key
# also works with backslash-separated paths and dotted directory names.
test_files_sorted = sorted(
    test_files,
    key=lambda f: int(os.path.splitext(os.path.basename(f))[0]),
)
print(test_files_sorted)

# One predicted phone-number string per file, in sorted file order
results = []

for filename in test_files_sorted:
    # Load the audio file
    sound_file = AudioSegment.from_wav(filename)
    # Split the audio into chunks corresponding to digits
    audio_chunks = split_audio_into_digits(sound_file)
    # Check if we have exactly 10 chunks
    if len(audio_chunks) != 10:
        print(f"Warning: File {filename} was split into {len(audio_chunks)} chunks instead of 10.")
    # Concatenate the predicted digit of every chunk; predict_digit may
    # return '' for a chunk, which shortens the number
    phone_number = ''
    for chunk in audio_chunks:
        predicted_digit = predict_digit(chunk, speech_models)
        phone_number += predicted_digit
    # Ensure phone_number is 10 digits
    if len(phone_number) != 10:
        print(f"Warning: Phone number from file {filename} is {len(phone_number)} digits instead of 10.")
    # Store the result
    results.append(phone_number)
    print(f"Processed file {filename}: {phone_number}")


['./test/6/0.wav', './test/6/1.wav', './test/6/2.wav', './test/6/3.wav', './test/6/4.wav', './test/6/5.wav', './test/6/6.wav', './test/6/7.wav', './test/6/8.wav', './test/6/9.wav', './test/6/10.wav', './test/6/11.wav', './test/6/12.wav', './test/6/13.wav', './test/6/14.wav', './test/6/15.wav', './test/6/16.wav', './test/6/17.wav', './test/6/18.wav', './test/6/19.wav', './test/6/20.wav', './test/6/21.wav', './test/6/22.wav', './test/6/23.wav', './test/6/24.wav', './test/6/25.wav', './test/6/26.wav', './test/6/27.wav', './test/6/28.wav', './test/6/29.wav', './test/6/30.wav', './test/6/31.wav', './test/6/32.wav', './test/6/33.wav', './test/6/34.wav', './test/6/35.wav', './test/6/36.wav', './test/6/37.wav', './test/6/38.wav', './test/6/39.wav', './test/6/40.wav', './test/6/41.wav', './test/6/42.wav', './test/6/43.wav', './test/6/44.wav', './test/6/45.wav', './test/6/46.wav', './test/6/47.wav', './test/6/48.wav', './test/6/49.wav', './test/6/50.wav', './test/6/51.wav', './test/6/52.wav', '.

In [21]:
import pandas as pd

In [24]:
# Write the predicted phone numbers (one per processed file, in `results`
# order) to 'result1.csv' without a header row.
pd.Series(results).to_csv('result1.csv', header=False)