ASR Model

- Speech Signal -> 
- Feature Extraction -> 
- Acoustic Model (Audio Augmentation; MeL Spectrogram) -> 
- Language Model (Probability Distribution of Word Sequence) -> 
- Decoding -> 
- Text Output

In [1]:
import os
import json
from tqdm import tqdm
from jiwer import wer, cer
from time import time
import pandas as pd
from typing import Tuple, Dict, List

import torch
import torch.nn.functional as F
import torchaudio

# setting the random seed for reproducibility
SEED = 2022

To load & process the audio data for a speech recognition task:

In [2]:
class CustomSpeechDataset(torch.utils.data.Dataset):
    
    """
    Custom torch dataset class to load the dataset 
    """
    
    def __init__(self, manifest_file: str, audio_dir: str, is_test_set: bool=False) -> None:
      
        """
        To initialise Custom dataset object:
        
        - manifest_file:    json file that contains the filename of the audio, and also the annotation if is_test_set is set to False
        - audio_dir:        root directory of the audio datasets
        - is_test_set:      flag variable to switch between loading of the train and the test set. Train set loads the annotation whereas test set does not
        """

        self.audio_dir = audio_dir
        self.is_test_set = is_test_set

        with open(manifest_file, 'r') as f:
            self.manifest = json.load(f)

        
    def __len__(self) -> int:
        
        """
        To get the number of loaded audio files in the dataset
        """

        return len(self.manifest)
    
    
    def __getitem__(self, index: int) -> Tuple[str, torch.Tensor]:

        """
        To get the values required to do the training:
        
        Takes in an int index to return tuple of the audio path, 
        audio signal & annotation.       
        """

        if torch.is_tensor(index):
            index.tolist()
            
        audio_path = self._get_audio_path(index)
        signal, sr = torchaudio.load(audio_path)
        
        if not self.is_test_set:
            annotation = self._get_annotation(index)
            return audio_path, signal, annotation
        
        return audio_path, signal
    
    
    def _get_audio_path(self, index: int) -> str:

        """
        Helper function to retrieve the audio path from the json manifest file
        """
        
        path = os.path.join(self.audio_dir, self.manifest[index]['path'])

        return path
    
    
    def _get_annotation(self, index: int) -> str:

        """
        Helper function to retrieve the annotation from the json manifest file
        """

        return self.manifest[index]['annotation']

To encode/decode text (as well as for predicting character sequences):

In [3]:
class TextTransform:

    """
    Map characters to integers and vice versa (encoding/decoding)
    """
    
    def __init__(self) -> None:

        char_map_str = """
            <SPACE> 0
            A 1
            B 2
            C 3
            D 4
            E 5
            F 6
            G 7
            H 8
            I 9
            J 10
            K 11
            L 12
            M 13
            N 14
            O 15
            P 16
            Q 17
            R 18
            S 19
            T 20
            U 21
            V 22
            W 23
            X 24
            Y 25
            Z 26
        """
        
        self.char_map = {}
        self.index_map = {}
        
        for line in char_map_str.strip().split('\n'):
            ch, index = line.split()
            self.char_map[ch] = int(index)
            self.index_map[int(index)] = ch

        self.index_map[0] = ' '


    def get_char_len(self) -> int:

        """
        Gets the no. of characters that are being encoded & decoded in the prediction
        (the char_map string).
        Useful for defining the output dimension of the speech recognition model.
        Returns:
        --------
            the number of characters defined in the __init__ char_map_str
        """

        return len(self.char_map)
    

    def get_char_list(self) -> List[str]:

        """
        Gets the list of characters that are being encoded and decoded in the prediction
        (defined in the char_map string).
        Useful for displaying the data's character set.
        
        Returns:
        -------
            a list of characters defined in the __init__ char_map_str
        """

        return list(self.index_map.values())
    

    def text_to_int(self, text: str) -> List[int]:

        """
        Use a character map and convert text to an integer encoded sequence 
                
        Returns:
        -------
            a list of the text encoded to an integer sequence 
        """
        
        int_sequence = []
        for c in text:
            if c == ' ':
                ch = self.char_map['<SPACE>']
            else:
                ch = self.char_map[c]
            int_sequence.append(ch)

        return int_sequence
    

    def int_to_text(self, labels) -> str:

        """
        Use a character map and convert integer labels to an text sequence 
        
        Returns:
        -------
            the decoded transcription
        """
        
        string = []
        for i in labels:
            string.append(self.index_map[i])

        return ''.join(string).replace('<SPACE>', ' ')

To further decode & predict the output, Greedy Decoder Method is used:

In [4]:
class GreedyDecoder:

    """
    Decodes the logits into characters to form the final transciption using the greedy decoding approach
    """

    def __init__(self) -> None:
        pass


    def decode(
            self, 
            output: torch.Tensor, 
            labels: torch.Tensor=None,      #only for non-test set with annotations
            label_lengths: List[int]=None, 
            collapse_repeated: bool=True, 
            is_test: bool=False
        ):
        
        """
        Main method to call for the decoding of the text from the predicted logits (output probabilities).
        """
        
        text_transform = TextTransform()
        arg_maxes = torch.argmax(output, dim=2)
        decodes = []

        # refer to char_map_str in the TextTransform class -> only have index from 0 to 26, hence 27 represents the case where the character is decoded as blank (NOT <SPACE>)
        decoded_blank_idx = text_transform.get_char_len()

        if not is_test:
            targets = []

        for i, args in enumerate(arg_maxes):
            decode = []

            if not is_test:
                targets.append(text_transform.int_to_text(labels[i][:label_lengths[i]].tolist()))

            for j, char_idx in enumerate(args):
                if char_idx != decoded_blank_idx:
                    if collapse_repeated and j != 0 and char_idx == args[j-1]:
                        continue
                    decode.append(char_idx.item())
            decodes.append(text_transform.int_to_text(decode))

        return decodes, targets if not is_test else decodes

To process audio data using audio augmentation techniques to form into mel spectrograms (Acoustic Model) for training, testing & validation:

In [5]:
class DataProcessor:

    """
    Transforms the audio waveform tensors into a melspectrogram
    """

    def __init__(self) -> None:
        pass
    
    
    def _audio_transformation(self, is_train: bool=True):

        """
        Returns a sequence of audio transformation to be applied to the waveform tensor
        if is_train = true, it returns a sequence of transformation including Mel Spectrogram,
        frequency masking & time masking, otherwise it only returns the Mel spectrogram transformation.
        """

        return torch.nn.Sequential(
                torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128),
                torchaudio.transforms.FrequencyMasking(freq_mask_param=30),
                torchaudio.transforms.TimeMasking(time_mask_param=100)
            ) if is_train else torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128)
    

    def data_processing(self, data, data_type='train'):

        """
        Process the audio data to retrieve the spectrograms that will be used for training, testing & validation.
        
        For training & validation, it creates a list of labels & label_lens,
        For testing, it creates a list of audio path lists.
        """

        text_transform = TextTransform()
        spectrograms = []
        input_lengths = []
        audio_path_list = []

        audio_transforms = self._audio_transformation(is_train=True) if data_type == 'train' else self._audio_transformation(is_train=False)

        if data_type != 'test':  
            labels = []
            label_lengths = []

            for audio_path, waveform, utterance in data:

                spec = audio_transforms(waveform).squeeze(0).transpose(0, 1)
                spectrograms.append(spec)
                label = torch.Tensor(text_transform.text_to_int(utterance))
                labels.append(label)
                input_lengths.append(spec.shape[0]//2)
                label_lengths.append(len(label))

            spectrograms = torch.nn.utils.rnn.pad_sequence(spectrograms, batch_first=True).unsqueeze(1).transpose(2, 3)
            labels = torch.nn.utils.rnn.pad_sequence(labels, batch_first=True)
            return audio_path, spectrograms, labels, input_lengths, label_lengths

        else:
            for audio_path, waveform in data:

                spec = audio_transforms(waveform).squeeze(0).transpose(0, 1)
                spectrograms.append(spec)
                input_lengths.append(spec.shape[0]//2)
                audio_path_list.append(audio_path)

            spectrograms = torch.nn.utils.rnn.pad_sequence(spectrograms, batch_first=True).unsqueeze(1).transpose(2, 3)
            return audio_path_list, spectrograms, input_lengths

Neural Network w/ Main ASR Model:

- These model architectures combine the strength of CNN for extracting hierarchical features residual CNNs for learning complex patterns and Bi-GRUs for capturing long-range dependency in the input data.
- These combinations make it suitable for automatic speech recognition tasks.

In [6]:
"""
building the model with adaption of deepspeech2 -> https://arxiv.org/abs/1512.02595

code adapted from https://towardsdatascience.com/customer-case-study-building-an-end-to-end-speech-recognition-model-in-pytorch-with-assemblyai-473030e47c7c
"""

class CNNLayerNorm(torch.nn.Module):
    
    """
    Layer normalization built for CNNs input
    """
    
    def __init__(self, n_feats: int) -> None:   #n_feats = no. of features
        super(CNNLayerNorm, self).__init__()

        self.layer_norm = torch.nn.LayerNorm(n_feats)


    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Input x of dimension -> (batch, channel, feature, time)
        """
        
        x = x.transpose(2, 3).contiguous() # (batch, channel, time, feature)
        x = self.layer_norm(x)

        return x.transpose(2, 3).contiguous() # (batch, channel, feature, time) 


class ResidualCNN(torch.nn.Module):

    """
    Residual CNN inspired by https://arxiv.org/pdf/1603.05027.pdf except with layer norm instead of batch norm
    """
    
    def __init__(self, in_channels: int, out_channels: int, kernel: int, stride: int, dropout: float, n_feats: int) -> None:
        super(ResidualCNN, self).__init__()

        self.cnn1 = torch.nn.Conv2d(in_channels, out_channels, kernel, stride, padding=kernel//2)
        self.cnn2 = torch.nn.Conv2d(out_channels, out_channels, kernel, stride, padding=kernel//2)
        self.dropout1 = torch.nn.Dropout(dropout)
        self.dropout2 = torch.nn.Dropout(dropout)
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)


    def forward(self, x: torch.Tensor) -> torch.Tensor:
        
        """
        Model building for the Residual CNN layers
        
        Input x of dimension -> (batch, channel, feature, time)
        """

        residual = x
        x = self.layer_norm1(x)
        x = F.gelu(x)
        x = self.dropout1(x)
        x = self.cnn1(x)
        x = self.layer_norm2(x)
        x = F.gelu(x)
        x = self.dropout2(x)
        x = self.cnn2(x)
        x += residual

        return x # (batch, channel, feature, time)


class BidirectionalGRU(torch.nn.Module):

    """
    The Bidirectional GRU composite code block which will be used in the main SpeechRecognitionModel class
    """
    
    def __init__(self, rnn_dim: int, hidden_size: int, dropout: int, batch_first: int) -> None:
        super(BidirectionalGRU, self).__init__()

        self.BiGRU = torch.nn.GRU(
            input_size=rnn_dim, 
            hidden_size=hidden_size,
            num_layers=1, 
            batch_first=batch_first, 
            bidirectional=True
        )
        self.layer_norm = torch.nn.LayerNorm(rnn_dim)
        self.dropout = torch.nn.Dropout(dropout)


    def forward(self, x: torch.Tensor) -> torch.Tensor:

        """
        Transformation of the layers in the Bidirectional GRU block
        """

        x = self.layer_norm(x)
        x = F.gelu(x)
        x, _ = self.BiGRU(x)
        x = self.dropout(x)

        return x


class SpeechRecognitionModel(torch.nn.Module):

    """
    The main ASR Model that the main code will interact with
    """
    
    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1) -> None:
        super(SpeechRecognitionModel, self).__init__()
        
        n_feats = n_feats//2
        self.cnn = torch.nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)  # cnn for extracting heirachal features

        # n residual cnn layers with filter size of 32
        self.rescnn_layers = torch.nn.Sequential(*[
            ResidualCNN(
                in_channels=32, 
                out_channels=32, 
                kernel=3, 
                stride=1, 
                dropout=dropout, 
                n_feats=n_feats
            ) for _ in range(n_cnn_layers)
        ])
        self.fully_connected = torch.nn.Linear(n_feats*32, rnn_dim)
        self.birnn_layers = torch.nn.Sequential(*[
            BidirectionalGRU(
                rnn_dim=rnn_dim if i==0 else rnn_dim*2,
                hidden_size=rnn_dim, 
                dropout=dropout, 
                batch_first=i==0
            ) for i in range(n_rnn_layers)
        ])
        self.classifier = torch.nn.Sequential(
            torch.nn.Linear(rnn_dim*2, rnn_dim),  # birnn returns rnn_dim*2
            torch.nn.GELU(),
            torch.nn.Dropout(dropout),
            torch.nn.Linear(rnn_dim, n_class)
        )


    def forward(self, x: torch.Tensor) -> torch.Tensor:

        """
        Transformation of the layers in the ASR model block
        """

        x = self.cnn(x)
        x = self.rescnn_layers(x)
        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
        x = x.transpose(1, 2) # (batch, time, feature)
        x = self.fully_connected(x)
        x = self.birnn_layers(x)
        x = self.classifier(x)
        
        return x

In [7]:
class IterMeter(object):

    """
    Keeps track of the total iterations during the training and validation loop
    """
    
    def __init__(self) -> None:
        self.val = 0


    def step(self):
        self.val += 1


    def get(self):
        return self.val
    

class TrainingLoop:

    """
    The main class to set up the training loop to train the model
    """

    def __init__(self) -> None:
        pass
    

    def train(self, model, device, train_loader, criterion, optimizer, scheduler, epoch, iter_meter) -> None:

        """
        Training Loop:
        
        Set the model into training mode and iterates through the training dataset using the train_loader. 
        For each batch, it moves the input tensors to the specified device, computes the model output,
        calculate the loss in the provided criterion and updates the model weights using the optimizer.
        The iter_meter is incremented at each step and the training progress is printed periodically.
        """
        
        model.train()
        data_len = len(train_loader.dataset)
        
        for batch_idx, _data in enumerate(train_loader):
            audio_path, spectrograms, labels, input_lengths, label_lengths = _data 
            spectrograms, labels = spectrograms.to(device), labels.to(device)

            optimizer.zero_grad()

            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1) # (time, batch, n_class)

            loss = criterion(output, labels, input_lengths, label_lengths)
            loss.backward()

            optimizer.step()
            iter_meter.step()
            
            if batch_idx % 100 == 0 or batch_idx == data_len:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(spectrograms), data_len,
                    100. * batch_idx / len(train_loader), loss.item()))


    def dev(self, model, device, dev_loader, criterion, scheduler, epoch, iter_meter) -> None:

        """
        Validation Loop:
        
        Sets the model to evaluation mode and iterates through the validation dataset using dev_loader.
        For each batch, it computes the model outpu, calculates the loss using the provided criterion and
        accumulates the validation loss. It also calculates the character error rate (CER) & word error rate (WER)
        for each decoded prediction and target pair.
        
        After processing all batches, the method computes the average validation loss, CER & WER. 
        It also adjusts the learning rate on the scheduler and prints the results.
        """
        
        print('\nevaluating...')
        model.eval()
        val_loss = 0
        test_cer, test_wer = [], []
        greedy_decoder = GreedyDecoder()
        
        with torch.no_grad():
            for i, _data in enumerate(dev_loader):
                audio_path, spectrograms, labels, input_lengths, label_lengths = _data 
                spectrograms, labels = spectrograms.to(device), labels.to(device)

                output = model(spectrograms)  # (batch, time, n_class)
                output = F.log_softmax(output, dim=2)
                output = output.transpose(0, 1) # (time, batch, n_class)

                loss = criterion(output, labels, input_lengths, label_lengths)
                val_loss += loss.item() / len(dev_loader)

                decoded_preds, decoded_targets = greedy_decoder.decode(output.transpose(0, 1), labels=labels, label_lengths=label_lengths, is_test=False)
                # print(decoded_preds)
                # print(decoded_targets)
                
                for j in range(len(decoded_preds)):
                    test_cer.append(cer(decoded_targets[j], decoded_preds[j]))
                    test_wer.append(wer(decoded_targets[j], decoded_preds[j]))

        avg_cer = sum(test_cer)/len(test_cer)
        avg_wer = sum(test_wer)/len(test_wer)
        
        scheduler.step(val_loss)

        print('Dev set: Average loss: {:.4f}, Average CER: {:4f} Average WER: {:.4f}\n'.format(val_loss, avg_cer, avg_wer))

## Training a Model

In [8]:
def main(hparams, train_dataset, dev_dataset, saved_model_path) -> None:

    """
    The main method to call to do model training
    
    First checks if the GPU is available for training and that is set the random seed
    for reproducibility.
    """ 

    use_cuda = torch.cuda.is_available()
    torch.manual_seed(SEED)
    
    data_processor = DataProcessor()
    iter_meter = IterMeter()
    text_transform = TextTransform()
    trainer = TrainingLoop()
    
    device = torch.device("cuda" if use_cuda else "cpu")
    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
    
    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=hparams['batch_size'],
        shuffle=True,
        collate_fn=lambda x: data_processor.data_processing(x, 'train'),
        **kwargs
    )
    
    dev_loader = torch.utils.data.DataLoader(
        dataset=dev_dataset,
        batch_size=hparams['batch_size'],
        shuffle=False,
        collate_fn=lambda x: data_processor.data_processing(x, 'dev'),
        **kwargs
    )

    model = SpeechRecognitionModel(
        hparams['n_cnn_layers'], 
        hparams['n_rnn_layers'], 
        hparams['rnn_dim'],
        hparams['n_class'], 
        hparams['n_feats'], 
        hparams['stride'], 
        hparams['dropout']
    ).to(device)

    print('Num Model Parameters', sum([param.nelement() for param in model.parameters()]))

    #prints total no. of model parameters & creates the optimizer function (CTC Loss) and the learning rates scheduler
    
    optimizer = torch.optim.AdamW(model.parameters(), hparams['learning_rate'])
    criterion = torch.nn.CTCLoss(blank=text_transform.get_char_len()).to(device)
    
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, mode='min', patience=3, verbose=True, factor=0.05)
        
    #Loops through a specified no. of epochs & call trainer.dev method of the training loop instance to train the model & evaluate on the development dataset.
    for epoch in range(1, hparams['epochs'] + 1):
        trainer.train(model, device, train_loader, criterion, optimizer, scheduler, epoch, iter_meter)
        trainer.dev(model, device, dev_loader, criterion, scheduler, epoch, iter_meter)
        
    #Save the trained model
    torch.save(model.state_dict(), saved_model_path)

Entry point of the code to do model training

In [10]:
MANIFEST_FILE_TRAIN = 'Brainhack_2023/ASR/Train/Train_Annotations.json'
AUDIO_DIR_TRAIN = 'Brainhack_2023/ASR/Train/audio/'
SAVED_MODEL_PATH = 'model.pt'

# # simple check on the saved model path, will raise error if no directory found
# if not os.path.exists(os.path.dirname(SAVED_MODEL_PATH)):
#     raise FileNotFoundError

#Loads the dataset
dataset = CustomSpeechDataset(
    manifest_file=MANIFEST_FILE_TRAIN, 
    audio_dir=AUDIO_DIR_TRAIN, 
    is_test_set=False
)

#train_dev_split (80x20 split)
train_proportion = int(0.8 * len(dataset))
dataset_train = list(dataset)[:train_proportion]
dataset_dev = list(dataset)[train_proportion:]


hparams = {
        "n_cnn_layers": 3,
        "n_rnn_layers": 5,
        "rnn_dim": 512,
        "n_class": 28, # 26 alphabets in caps + <SPACE> + blanks
        "n_feats": 128,
        "stride": 2,
        "dropout": 0.1,
        "learning_rate": 1e-4,
        "batch_size": 8,
        "epochs": 15
    }

start_time = time()

#Start training the model
main(
    hparams=hparams, 
    train_dataset=dataset_train, 
    dev_dataset=dataset_dev, 
    saved_model_path=SAVED_MODEL_PATH
)

end_time = time()

print(f"Time taken for training: {(end_time-start_time)/(60*60)} hrs")

FileNotFoundError: [Errno 2] No such file or directory: 'Brainhack_2023/ASR/Train/Train_Annotations.json'

## Model Inference

In [None]:
def infer(hparams, test_dataset, model_path) -> Dict[str, str]:
    
    print('\ngenerating inference ...')

    use_cuda = torch.cuda.is_available()
    torch.manual_seed(SEED)
    
    greedy_decoder = GreedyDecoder()
    data_processor = DataProcessor()
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}

    test_loader = torch.utils.data.DataLoader(
        dataset=test_dataset,
        batch_size=16,
        shuffle=False,
        collate_fn=lambda x: data_processor.data_processing(x, 'test'),
        **kwargs
    )
    
    #Load the pretrained model
    model = SpeechRecognitionModel(
        hparams['n_cnn_layers'], 
        hparams['n_rnn_layers'], 
        hparams['rnn_dim'],
        hparams['n_class'], 
        hparams['n_feats'], 
        hparams['stride'], 
        hparams['dropout']
    ).to(device)
    
    model.load_state_dict(torch.load(model_path))
    model.eval()
    output_dict = {}
    
    with torch.no_grad():
        for i, _data in tqdm(enumerate(test_loader)):
            audio_path, spectrograms, input_lengths = _data
            spectrograms = spectrograms.to(device)
            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1) # (time, batch, n_class) 
            decoded_preds_batch = greedy_decoder.decode(output.transpose(0, 1), labels=None, label_lengths=None, is_test=True)
            
            # batch prediction
            for decoded_idx in range(len(decoded_preds_batch[0])):
                output_dict[audio_path[decoded_idx]] = decoded_preds_batch[0][decoded_idx]
                
    print('done!\n')
    return output_dict

Entry point of the code to do model inference, also the code to use to generate the submission

In [None]:
#Same hyperparams as what is used to train the model
hparams = {
        "n_cnn_layers": 3,
        "n_rnn_layers": 5,
        "rnn_dim": 512,
        "n_class": 28, # 26 alphabets in caps + <SPACE> + blanks
        "n_feats": 128,
        "stride": 2,
        "dropout": 0.1,
        "learning_rate": 1e-4,
        "batch_size": 8,
        "epochs": 15
    }

SAVED_MODEL_PATH = 'model.pt'
SUBMISSION_PATH = 'submission.csv'

MANIFEST_FILE_TEST = 'Brainhack_2023/ASR/Test_Advanced/Test_Filenames.json'
AUDIO_DIR_TEST = 'Brainhack_2023/ASR/Test_Advanced/audio/'

dataset_test = CustomSpeechDataset(
    manifest_file=MANIFEST_FILE_TEST, 
    audio_dir=AUDIO_DIR_TEST, 
    is_test_set=True)

start_time = time()

submission_dict = infer(
    hparams=hparams, 
    test_dataset=dataset_test, 
    model_path=SAVED_MODEL_PATH
)

#Producing the final csv file for submission
submission_list = []

for key in submission_dict:
    submission_list.append(
        {
            "path": os.path.basename(key),
            "annotation": submission_dict[key]
        }
    )

submission_df = pd.DataFrame(submission_list)
submission_df.to_csv(SUBMISSION_PATH, index=False)

end_time = time()

print(f"Time taken for inference: {(end_time-start_time)/60} min")


generating inference ...


55it [00:10,  5.13it/s]

done!

Time taken for inference: 0.18497053384780884 min



