# Mounting drive

In [5]:
# Mount Google Drive at /content/drive; the lexicon, language model, audio clips
# and CSV files used by later cells are all read from /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install flashlight-text
!pip install kenlm
!pip install jiwer

Collecting flashlight-text
  Downloading flashlight_text-0.0.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: flashlight-text
Successfully installed flashlight-text-0.0.4
Collecting kenlm
  Downloading kenlm-0.2.0.tar.gz (427 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m427.4/427.4 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: kenlm
  Building wheel for kenlm (pyproject.toml) ... [?25l[?25hdone
  Created wheel for kenlm: filename=kenlm-0.2.0-cp310-cp310-linux_x86_64.whl size=3184421 sha256=07ff8fed5dbab3998a901586e832d6abff385bc0718f2f00ffcf82cc93cd9d04
  Stored in directory:

In [3]:
import time
from typing import List

import IPython
import matplotlib.pyplot as plt
from torchaudio.models.decoder import ctc_decoder
from torchaudio.utils import download_asset

import os
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
import numpy as np
import pandas as pd
from jiwer import wer, cer

In [22]:
# AUDIO
# Sample rate (Hz) passed to the MelSpectrogram transform in get_prediction.
SAMPLE_RATE = 32000

# MEL LOG SPECTROGRAM
# N_MELS is both the number of mel bins requested from MelSpectrogram and the
# channel width of the model's Conv1d / LayerNorm front end.
N_MELS = 128
# NOTE(review): N_FFT, WIN_LENGTH, HOP_LENGTH and MAX_SPECTROGRAM_SIZE are not
# referenced by any code visible in this notebook - presumably they describe
# the training-time feature extraction; confirm inference matches them.
N_FFT = 1024
WIN_LENGTH = 1024
HOP_LENGTH = 512
MAX_SPECTROGRAM_SIZE = 1650

# SPECTROGRAM AUGMENTATION
# NOTE(review): none of these are referenced by the code visible in this
# notebook - presumably training-only settings; confirm.
SPECAUG_RATE = 0.5
SPECAUG_POLICY = 3
TIME_MASK = 60
FREQUENCY_MASK = 20

# TEXT
NUMBER_OF_CLASSES = 29 # number of label classes (characters); output size of the model's final linear layer
BLANK_CHARACTER_INDEX = 28 # default blank index of GreedyCTCDecoder; '_' sits at this position in `tokens`

# MODEL
DROPOUT = 0.1 # dropout probability used after the CNN, dense and post-LSTM stages
MAIN_SIZE = 128 # NOTE(review): not referenced by the visible code (the dense layers use a literal 128)

# CNN
KERNEL_SIZE = 10 # Conv1d kernel size; the conv padding is KERNEL_SIZE // STRIDE
STRIDE = 2 # Conv1d stride

# LSTM
LSTM_HIDDEN_SIZE = 512
LSTM_NUMBER_OF_LAYERS = 1
LSTM_DROPOUT = 0.0
LSTM_BIDIRECTIONAL = False

# Acoustic model

In [4]:
class TransposeLayer(nn.Module):
    """Parameter-free module that swaps two dimensions of its input.

    Lets a transpose be placed inside an ``nn.Sequential`` pipeline.
    """

    def __init__(self, dim0, dim1):
        """Remember which two dimensions ``forward`` should exchange."""
        super().__init__()
        self.dim0, self.dim1 = dim0, dim1

    def forward(self, x):
        """Return ``x`` with dimensions ``dim0`` and ``dim1`` swapped."""
        return x.transpose(self.dim0, self.dim1)


class SpeechRecognitionModel(nn.Module):
    """Acoustic model: Conv1d front end -> dense stack -> LSTM -> per-frame logits.

    The forward pass returns raw (un-normalised) scores over
    ``NUMBER_OF_CLASSES`` labels for every output frame; callers apply
    ``log_softmax`` themselves before decoding or computing the CTC loss.

    Attributes:
        device: value passed to the constructor. Kept so existing code that
            reads ``model.device`` keeps working; ``forward`` does not use it.
        criterion: ``nn.CTCLoss`` using ``BLANK_CHARACTER_INDEX`` as blank.
        learning_rate: learning rate stored for use by training code.
        validation_step_outputs: list available to training code for
            collecting validation results.
    """

    def __init__(self, device="cpu"):
        """Build all layers.

        Args:
            device: stored on ``self.device`` for backward compatibility only.
        """
        super().__init__()

        self.device = device

        # Use the shared constant so the loss and the decoders agree on the blank.
        self.criterion = nn.CTCLoss(blank=BLANK_CHARACTER_INDEX, zero_infinity=True)
        self.learning_rate = 1e-3

        self.validation_step_outputs = []

        self.cnn = nn.Sequential(
            nn.Conv1d(N_MELS, N_MELS, kernel_size=KERNEL_SIZE, stride=STRIDE, padding=KERNEL_SIZE//STRIDE),
            TransposeLayer(1, 2),  # (batch, feature, time) -> (batch, time, feature)
            nn.LayerNorm(N_MELS),
            nn.GELU(),
            nn.Dropout(DROPOUT),
        )
        self.dense = nn.Sequential(
            nn.Linear(N_MELS, 128),
            nn.LayerNorm(128),
            nn.GELU(),
            nn.Dropout(DROPOUT),
            nn.Linear(128, 128),
            nn.LayerNorm(128),
            nn.GELU(),
            nn.Dropout(DROPOUT),
        )
        self.lstm = nn.LSTM(input_size=128, hidden_size=LSTM_HIDDEN_SIZE,
                            num_layers=LSTM_NUMBER_OF_LAYERS, dropout=LSTM_DROPOUT,
                            bidirectional=LSTM_BIDIRECTIONAL, batch_first=True)

        # A bidirectional LSTM concatenates both directions, doubling the
        # feature size seen by the layers that follow it.
        lstm_output_size = LSTM_HIDDEN_SIZE * (2 if LSTM_BIDIRECTIONAL else 1)

        self.final_transformations = nn.Sequential(
            nn.LayerNorm(lstm_output_size),
            nn.GELU(),
            nn.Dropout(DROPOUT),
        )

        self.final_fc = nn.Linear(lstm_output_size, NUMBER_OF_CLASSES) # final fully connected

    def forward(self, x):
        """Compute per-frame class logits.

        Args:
            x: spectrogram batch. After the size-1 dimension at index 1 is
                squeezed away it must be (batch, N_MELS, time), as required
                by the Conv1d front end.

        Returns:
            Tensor of shape (batch, time', NUMBER_OF_CLASSES) with raw logits,
            where time' is the frame count after the strided convolution.
        """
        x = x.squeeze(1)  # batch, feature, time - drops the size-1 channel dimension
        x = self.cnn(x)  # batch, time, feature
        x = self.dense(x)  # batch, time, feature

        # nn.LSTM starts from zero hidden/cell states when none are given, and
        # creates them with the right (layers * directions) shape on the
        # input's own device - no hand-built h_0 / c_0 needed.
        x, _ = self.lstm(x)

        x = self.final_transformations(x)  # batch, time, lstm features
        x = self.final_fc(x)  # batch, time, n_class
        return x


### Beam Search Decoder





In [None]:
# Weights handed to the decoder below: the language-model weight and the
# score added per decoded word.
LM_WEIGHT = 3.23
WORD_SCORE = -0.26

# Lexicon-constrained CTC beam search decoder with a language model.
# Search settings come first, then the resource files stored on Drive.
beam_search_decoder = ctc_decoder(
    beam_size=1500,
    nbest=3,
    lm_weight=LM_WEIGHT,
    word_score=WORD_SCORE,
    tokens="/content/drive/MyDrive/speech_recognition/tokens.txt",
    lexicon="/content/drive/MyDrive/speech_recognition/lexicon.txt",
    lm="/content/drive/MyDrive/speech_recognition/lm.bin",
)

### Greedy Decoder




In [23]:
# Label alphabet: a token's position in this list is its class index
# (apostrophe, space, a-z, then '_' at index 28).
tokens = list("' abcdefghijklmnopqrstuvwxyz_")


class GreedyCTCDecoder(torch.nn.Module):
    """Greedy (best-path) CTC decoder.

    Takes the highest-scoring label for every frame, collapses runs of
    repeated labels, drops the blank label and concatenates what is left.
    """

    def __init__(self, labels, blank=BLANK_CHARACTER_INDEX):
        """
        Args:
            labels: sequence mapping a class index to its character.
            blank: class index of the CTC blank label.
        """
        super().__init__()
        self.labels = labels
        self.blank = blank

    def forward(self, emission: torch.Tensor) -> str:
        """Decode the emission of a single utterance.

        Args:
            emission: per-frame label scores; the arg-max is taken over the
                last dimension. Assumed to be 2-D (time, num_labels) - this is
                what get_greedy_result passes in.

        Returns:
            The decoded transcript as one string (characters are joined with
            no separator; spaces appear only where the space label was
            predicted).
        """
        best_path = torch.argmax(emission, dim=-1)
        collapsed = torch.unique_consecutive(best_path, dim=-1)
        # Convert to plain ints once instead of comparing/indexing with 0-d tensors.
        return "".join(self.labels[i] for i in collapsed.tolist() if i != self.blank)


greedy_decoder = GreedyCTCDecoder(tokens)

# Acoustic model prediction

In [24]:
# Single shared model instance, switched to inference mode so dropout is off.
# NOTE(review): no trained weights are loaded anywhere in the visible notebook,
# so this model is randomly initialised - load the trained checkpoint into it
# before trusting any decoding result.
acoustic_mode = SpeechRecognitionModel()
acoustic_mode.eval()


def get_prediction(waveform, model=None):
    """Run the acoustic model on one waveform and return log-probabilities.

    Args:
        waveform: audio tensor as returned by ``torchaudio.load`` for the clip
            to transcribe (assumed mono, shape (1, time) - TODO confirm).
        model: model to run; defaults to the shared ``acoustic_mode`` instance.
            It is used as given, so a caller-supplied model should already be
            in eval mode.

    Returns:
        Tensor of log-softmax scores over the label classes with shape
        (1, frames, classes) - a batch containing the single input clip.
    """
    if model is None:
        model = acoustic_mode

    # NOTE(review): only sample_rate and n_mels are set here; the N_FFT /
    # WIN_LENGTH / HOP_LENGTH constants are not applied. Confirm this matches
    # the feature extraction used when the model was trained.
    mel_spec_fn = torchaudio.transforms.MelSpectrogram(sample_rate=SAMPLE_RATE, n_mels=N_MELS)

    with torch.no_grad():
        # Use the waveform that was passed in (not a hard-coded clip).
        spectrogram = mel_spec_fn(waveform)
        spectrogram = spectrogram.unsqueeze(0)  # add a batch dimension for the model

        output = model(spectrogram)
        return F.log_softmax(output, dim=2)

# Get decoding results

In [19]:
def get_greedy_result(model_output):
    """Return the greedy-decoded transcript for the first item of a batch.

    Args:
        model_output: batched model scores; only ``model_output[0]`` is decoded.

    Returns:
        The transcript as a single string.
    """
    decoded = greedy_decoder(model_output[0])
    # The greedy decoder returns a finished string. Joining a string with " "
    # would put a space between every character and wreck the WER/CER, so
    # only join when a list of words comes back.
    if isinstance(decoded, str):
        return decoded
    return " ".join(decoded)

def get_beam_search_result(model_output):
    """Return the top beam-search transcript for the first item of a batch.

    The decoder result is indexed as [batch item][hypothesis]; the words of
    hypothesis 0 of item 0 are joined with single spaces and stripped.
    """
    hypotheses = beam_search_decoder(model_output)
    top_hypothesis = hypotheses[0][0]
    transcript = " ".join(top_hypothesis.words)
    return transcript.strip()

## Run


In [None]:
import pandas as pd
import csv
from jiwer import wer, cer

# Test-set manifest (tab-separated; the loop reads its `text` and `file` columns).
# Stored under its own name so the `torch.utils.data as data` import alias is
# not overwritten by a DataFrame.
test_df = pd.read_csv("/content/drive/MyDrive/speech_recognition/data/cv-corpus-small/datasets-csv-colab/test.csv", sep='\t')

# Set to an int to evaluate only the first N rows; None (or 0) means all rows.
LIMITER = None

# Clamp to the frame length so an over-large LIMITER cannot index past the end.
iterations = min(LIMITER, len(test_df)) if LIMITER else len(test_df)

save_data_labels = ['actual', 'greedy', 'beam_search',
                    'greedy_wer', 'greedy_cer',
                    'beam_search_wer', 'beam_search_cer']
save_data = []

# NOTE(review): these two are not used by the code visible in this cell.
erorrs_data_labels = ['', 'greedy', 'beam_search']
erorrs_data = []

# Open the results file once and append one row per clip, flushing after each
# row. Partial results still survive an interrupted run, without rewriting the
# whole file on every iteration.
with open("/content/drive/MyDrive/speech_recognition/result_comparison.csv", "w",
          newline="", encoding="utf-8") as result_file:
    result_writer = csv.writer(result_file, delimiter='\t')
    result_writer.writerow(save_data_labels)

    for i in range(iterations):
        current_data_row = test_df.iloc[i]
        actual_transcript = current_data_row.text

        file_path = current_data_row.file
        # NOTE(review): the clip's own sample rate is not checked against
        # SAMPLE_RATE - confirm the clips are already stored at that rate.
        waveform, sample_rate = torchaudio.load(file_path)

        model_output = get_prediction(waveform)

        greedy_result = get_greedy_result(model_output)
        beam_search_result = get_beam_search_result(model_output)

        greedy_wer = wer(actual_transcript, greedy_result)
        beam_search_wer = wer(actual_transcript, beam_search_result)

        greedy_cer = cer(actual_transcript, greedy_result)
        beam_search_cer = cer(actual_transcript, beam_search_result)

        result_row = [actual_transcript, greedy_result, beam_search_result,
                      greedy_wer, greedy_cer,
                      beam_search_wer, beam_search_cer]
        save_data.append(result_row)

        result_writer.writerow(result_row)
        result_file.flush()

        print(f"Data collected: {i+1}/{iterations}")