## Pipeline for Speech Recognition based on the ASR notebook

*Remark:* The model has only been trained to recognize English speech.

In [10]:
# Imports
import os
import re
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torcheval
import torchaudio
from IPython.display import Audio

import soundfile
import sounddevice

# Report the installed library versions so the saved output documents the environment
print(f"Torch version: {torch.__version__}")
print(f"Torcheval version: {torcheval.__version__}")
print(f"Torchaudio version: {torchaudio.__version__}")

Torch version: 2.5.1
Torcheval version: 0.0.7
Torchaudio version: 2.5.1


### Recording the user's speech from the microphone

In [None]:
# Make sure the folder that receives the live recordings exists.
# exist_ok=True replaces the separate os.path.exists() check, which could race
# with another process creating the folder between the check and the creation.
os.makedirs('../data/live_input', exist_ok=True)

def record_until_enter(filename="../data/live_input/audio.flac", fs=16000):
    """Record mono audio from the input device until the user presses ENTER, then save it.

    Args:
        filename: Path of the audio file to write. Its parent folder is created if missing.
        fs: Sampling rate in Hz, used both for the capture and for the saved file.

    Raises:
        ValueError: If ENTER was pressed before any audio block was captured.
    """
    recorded_audio = []

    def callback(indata, frames, time, status):
        # Invoked by the input stream for every captured block
        if status:
            print(status)
        # Keep a copy so the block stays valid after the callback returns
        recorded_audio.append(indata.copy())

    # Opening the audio stream; blocks are collected in the background until ENTER is pressed
    with sounddevice.InputStream(samplerate=fs, channels=1, callback=callback):
        print("-"*40)
        print("RECORDING...")
        print("Press [ENTER] to stop")
        input()

    # np.concatenate fails with an unhelpful message on an empty list, so check first
    if not recorded_audio:
        raise ValueError("No audio was captured before the recording was stopped")

    # Concatenating the captured blocks along the time axis
    full_recording = np.concatenate(recorded_audio, axis=0)

    # A custom filename may point into a folder that does not exist yet
    target_dir = os.path.dirname(filename)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    # Saving
    soundfile.write(filename, full_recording, fs)
    print("-"*40)
    print("FILE SAVED")
    print("-"*40 + "\n")

In [13]:
# Front-end settings: 25 ms analysis windows taken every 10 ms
sample_rate = 16000
win_length = sample_rate * 25 // 1000           # window size expressed in samples (25 ms)
hop_length = sample_rate * 10 // 1000           # stride expressed in samples (10 ms)
n_fft = 1 << (win_length - 1).bit_length()      # smallest power of 2 that is >= win_length
n_mels = 80                                     # 80 mel bands -> 80-dimensional features

# Waveform -> mel spectrogram
wave_to_mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=sample_rate,
    n_fft=n_fft,
    win_length=win_length,
    hop_length=hop_length,
    n_mels=n_mels,
)

# Mel spectrogram -> decibel scale
spectrogram_to_log = torchaudio.transforms.AmplitudeToDB()

# Feature extraction pipeline: both transforms applied one after the other
extracting_pipeline = torch.nn.Sequential(wave_to_mel_spectrogram, spectrogram_to_log)

In [6]:
# Character-level vocabulary used to map text to integer ids and back
class TextTransform:
    """Two-way mapping between characters and integer ids.

    Id layout:
        0    -> "<BLANK>"
        1    -> '
        2    -> space
        3-28 -> a ... z
    """

    def __init__(self) -> None:
        # LibriSpeech transcripts are already normalized, so its character set is reused as-is.
        # The position of each symbol in this list is its id.
        symbols = ["<BLANK>", "'", " ", *"abcdefghijklmnopqrstuvwxyz"]

        self.__char_map = {symbol: index for index, symbol in enumerate(symbols)}
        self.__index_map = dict(enumerate(symbols))

    def text_to_int(self, text) -> list[int]:
        """Lower-case ``text`` and return the ids of its characters, skipping unknown ones."""
        known = self.__char_map
        return [known[char] for char in text.lower() if char in known]

    def int_to_text(self, list) -> str:
        """Return the text spelled by the given ids.

        Entries may be plain integers or objects exposing ``.item()``.
        """
        indices = (entry.item() if hasattr(entry, "item") else entry for entry in list)
        return "".join(self.__index_map[index] for index in indices)

text_transform = TextTransform()

In [7]:
def greedy_decoding(output, labels=None, label_lengths=None):
    """Greedy decoding: pick the best token per frame, collapse repeats, drop blanks.

    Args:
        output: Scores of shape [Batch, Time, Vocabulary].
        labels: Optional padded target ids, one row per batch entry.
        label_lengths: Optional number of valid ids in each row of ``labels``.

    Returns:
        The list of decoded strings when ``labels`` is None, otherwise the tuple
        ``(decodes, targets)``. ``targets`` is only filled when ``label_lengths``
        is given as well; without it the list stays empty.
    """
    decode_targets = labels is not None and label_lengths is not None
    best_tokens = torch.argmax(output, dim=2)

    decodes, targets = [], []
    for row, frame_tokens in enumerate(best_tokens):
        # Merge runs of identical predictions, then discard the blank token (id 0)
        collapsed = torch.unique_consecutive(frame_tokens)
        kept = [token for token in collapsed if token != 0]
        decodes.append(text_transform.int_to_text(kept))

        if decode_targets:
            # Only the first label_lengths[row] ids are real, the rest is padding
            valid_ids = labels[row][:label_lengths[row]].tolist()
            targets.append(text_transform.int_to_text(valid_ids))

    if labels is None:
        return decodes
    return decodes, targets

### Different models available

In [8]:
# input_dim : number of features per window => 80
# hidden_dim : 512 or 256 for bidirectional => in article normally 500 and 300, but I prefer power of 2 
# output_dim : vocabulary size + 1 token blank space => for computational reasons we take 29
# n_layers : 5 according to the article

class SpeechRecognition(nn.Module):
    """Multi-layer (optionally bidirectional) LSTM followed by a per-frame linear classifier."""

    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int, n_layers: int, bidirectional: bool) -> None:
        super().__init__()

        # Keep the hyper-parameters on the module for later inspection
        self.input_dim, self.hidden_dim, self.output_dim = input_dim, hidden_dim, output_dim
        self.n_layers, self.bidirectional = n_layers, bidirectional

        # Recurrent encoder; dropout between layers limits overfitting
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            batch_first=True,
            dropout=0.1,
        )

        # A bidirectional LSTM concatenates both directions, doubling the feature size
        n_directions = 2 if bidirectional else 1

        # Per-frame classification layer
        self.classifier = nn.Linear(hidden_dim * n_directions, output_dim)

    def forward(self, x) -> torch.Tensor:
        """Map features of shape (Batch, Time, input_dim) to log-probabilities (Batch, Time, output_dim)."""
        hidden_states, _ = self.lstm(x)     # (Batch, Time, hidden_dim * n_directions)
        # log_softmax over dim=2 normalizes across the vocabulary, not across batch or time
        return F.log_softmax(self.classifier(hidden_states), dim=2)

In [9]:
class SpeechRecognitionStacking(nn.Module):
    """LSTM acoustic model that stacks consecutive frames before the recurrent layers.

    ``stack`` consecutive frames are concatenated into one input vector and the
    window advances by ``stride`` frames, which shortens the sequence the LSTM sees.

    Args:
        input_dim: Number of features per input frame.
        hidden_dim: Hidden size of each LSTM direction.
        output_dim: Number of output classes.
        n_layers: Number of LSTM layers.
        bidirectional: Whether the LSTM is bidirectional.
        stride: Number of frames the stacking window advances at each step.
        stack: Number of consecutive frames concatenated into one LSTM input.
    """

    def __init__(self, input_dim : int, hidden_dim : int, output_dim :int, n_layers : int, bidirectional : bool, stride : int, stack : int) -> None:
        super().__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.n_layers = n_layers
        self.bidirectional = bidirectional

        # Frame stacking parameters
        self.stride = stride
        self.stack = stack

        # Each LSTM input is the concatenation of `stack` frames
        stacked_dim = self.stack * input_dim

        # LSTM layer
        self.lstm = nn.LSTM(
            input_size=stacked_dim,             # Input dim
            hidden_size=self.hidden_dim,        # Hidden dim
            num_layers=self.n_layers,           # Num layers
            bidirectional=self.bidirectional,   # Bidirectional
            batch_first=True,                   # Batch first
            dropout=0.1                         # To avoid overfitting
        )

        # Output dim - a bidirectional LSTM outputs twice as many features
        lstm_output_dim = self.hidden_dim * 2 if bidirectional else self.hidden_dim

        # Output layer for classification
        self.classifier = nn.Linear(lstm_output_dim, self.output_dim)

    def forward(self, x) -> torch.Tensor:
        """Map features (Batch, Time, input_dim) to log-probabilities (Batch, New_Time, output_dim)."""
        # Pad with zero frames when the sample is shorter than one stacking window.
        # new_zeros keeps the padding on the same device and dtype as the input,
        # so the module does not depend on a global `device` variable.
        if x.size(1) < self.stack:
            padding = x.new_zeros(x.size(0), self.stack - x.size(1), x.size(2))
            x = torch.cat([x, padding], dim=1)

        # Sliding windows over time; temporary shape: (Batch, New_Time, Features, Stack_Size)
        x = x.unfold(dimension=1, size=self.stack, step=self.stride)

        # Flatten each window: (Batch, New_Time, Features * Stack_Size)
        batch, new_time, feats, stack = x.size()
        x = x.contiguous().view(batch, new_time, feats * stack)

        output, _ = self.lstm(x)                    # output : shape (Batch, New_Time, lstm_output_dim)
        logits = self.classifier(output)            # logits : shape (Batch, New_Time, output_dim)
        # dim=2 normalizes over the vocabulary (not batch or time); log avoids numerical zeros
        log_proba = F.log_softmax(logits, dim=2)
        return log_proba

### Loading model

In [23]:
# Prefer the GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
#elif torch.backends.mps.is_available(): 
#    device = torch.device("mps")
else:
    device = torch.device("cpu")


print("\n--- LOADING MODEL ---")

model_version = "V4" # Code to know to which model it refers 
# NOTE(review): torch.load unpickles the file, which can execute arbitrary code.
# Only load trusted checkpoints; consider weights_only=True if the checkpoint
# holds nothing but tensors and plain Python containers.
backup = torch.load(f"../model/{model_version}/{model_version}_final.pth", map_location=device)

# The input features are moved to `device` before inference, so the model must
# live on the same device; otherwise the forward pass fails on a GPU machine.
model = SpeechRecognition(
    input_dim=80,
    hidden_dim=256,
    output_dim=29, 
    n_layers=5,
    bidirectional=True
).to(device)

model.load_state_dict(backup['model_state_dict'])


--- LOADING MODEL ---


  backup = torch.load(f"../model/{model_version}/{model_version}_final.pth", map_location=device)


<All keys matched successfully>

### Recording if needed


In [29]:
# Recording a new utterance from the microphone (saved to the function's default path)
record_until_enter()

----------------------------------------
RECORDING...
Press [ENTER] to stop
RECORDING STOPPED
----------------------------------------

----------------------------------------
FILE SAVED
----------------------------------------



In [34]:
print("-" * 40)
print("DECODING ...\n")

# Loading the saved recording: waveform of shape (channels, samples) and the file's sampling rate
my_wave, file_sample_rate = torchaudio.load("../data/live_input/audio.flac")

# The feature pipeline is configured for `sample_rate`; bring any other rate to it
if file_sample_rate != sample_rate:
    my_wave = torchaudio.functional.resample(my_wave, file_sample_rate, sample_rate)

# Average the channels so that a multi-channel file is decoded as a single
# utterance instead of one batch entry per channel
if my_wave.size(0) > 1:
    my_wave = my_wave.mean(dim=0, keepdim=True)

# Extract features: (channels, n_mels, frames)
spec = extracting_pipeline(my_wave)

# The model expects input of shape (Batch, Time, Features).
# The channel dimension of a mono file already plays the role of a batch of 1;
# a 2-D spectrogram gets an explicit batch dimension first.
if spec.dim() == 2:
    spec = spec.unsqueeze(0)  # Shape becomes (1, n_mels, frames)
spec = spec.transpose(1, 2)   # (Batch, frames, n_mels)

spec = spec.to(device)

# Set model to evaluation mode (disables dropout)
model.eval()

# Forward pass without gradient tracking
with torch.no_grad():
    log_probs = model(spec)

# Greedy decoding of the single utterance in the batch
text = greedy_decoding(log_probs)[0]

print("Text : " + text.upper())
print("\n"+"-" * 40)

----------------------------------------
DECODING ...

Text : HEIRS RAE

----------------------------------------
