# xSDR: Speaker-Independent Spoken Digit Recognition
This is our solution for the final graded project for the WS22/23 course "_Neural Networks: Theory and Implementation_" at Saarland University.
<br/>
Authors: _Christian Singer, Mhd Jawad Al Rahwanji_
`{chsi00002, mhal00002}@stud.uni-saarland.de`
<br/>
<img src="xSDR.png" height=333>

##### Before we start, we'd like to point out a couple of things:

-- An **Introduction** to the project can be found in `README.md`

-- Some preliminary **Data Exploration** can be found in `DataExploration.ipynb`

### Imports

In [41]:
# add this to ignore warnings from Librosa
import warnings

import torch
import torch.nn.functional as F
import math
import numpy as np
import torchmetrics as tm

from comparative_analysis.tsne_model_embeddings import tsne_model, plot_tsne
from model_baseline.data_loading import create_features
from torch import nn
from model_baseline.linear_model import classifier
from sklearn.metrics import confusion_matrix, classification_report
from torch.utils.data import RandomSampler, DataLoader

from model_neural.classification_report import eval_models
from model_neural.utils.data_loading import MNISTAudio
from model_neural.utils.helpers import annotations_dir, base_dir

warnings.filterwarnings('ignore')

### Task I

In [42]:
# I.1

# Original definition can be found in: model_baseline.data_loading.py
def downsample_spectrogram(spectrogram, num_frames):
    """
    Reduce a mel-scaled spectrogram to a fixed-size feature vector.

    The time axis (axis 1) is zero-padded on the right until it splits into
    num_frames equally sized windows. Every window is then averaged over
    time, separately for each row (axis 0) of the spectrogram, and the
    resulting (rows, num_frames) matrix is flattened row by row.

    :param spectrogram: 2-D numpy array of shape (rows, time_steps)
    :param num_frames: number of time windows to average into
    :return: float64 numpy array of shape (1, rows * num_frames); windows
        that overlap the zero padding are averaged together with those
        zeros, and an input without any time steps yields all zeros
    """
    num_rows, signal_length = spectrogram.shape
    window_size = int(math.ceil(signal_length / num_frames))

    if window_size == 0:
        # No time steps at all: averaging empty windows would only produce
        # NaNs (plus a RuntimeWarning), so return the all-zero vector that
        # pure zero padding stands for.
        return np.zeros((1, num_rows * num_frames))

    # pad signal with zeros so that it divides evenly into num_frames windows
    padding = num_frames * window_size - signal_length
    padded = np.pad(spectrogram, ((0, 0), (0, padding)), "constant")

    # One reshape exposes every window as its own trailing axis, so a single
    # mean call replaces the per-window Python loop.
    windows = padded.reshape(num_rows, num_frames, window_size)
    window_means = windows.mean(axis=2).astype(np.float64, copy=False)
    return window_means.reshape(1, -1)

In [43]:
# I.2

# Original transformation can be found in: model_baseline.linear_model.py
# Uses downsample_spectrogram(...) as well as extract_melspectrogram(...)
# One create_features call per split, in TRAIN / DEV / TEST order; each call
# returns a pair that is unpacked into the names the later cells rely on.
(trnf, trnl), (devf, devl), (tstf, tstl) = (
    create_features(split) for split in ("TRAIN", "DEV", "TEST")
)

In [44]:
# I.3

# Original fitting can be found in: model_baseline.linear_model.py
# Both penalty and loss parameters were experimented with,
# "elasticnet" and "modified_huber" were chosen, respectively.
# Train the imported baseline `classifier` on the TRAIN split's features and labels.
classifier.fit(trnf, trnl)

In [45]:
# I.4

# Original evaluation can be found in: model_baseline.linear_model.py
# (banner, features, gold labels) for every split that gets reported, in print order.
eval_splits = (
    ("----------------------------------DEV-SET-----------------------------------------", devf, devl),
    ("---------------------------------TEST_SET------------------------------------------", tstf, tstl),
)

split_preds = []
for banner, feats, gold in eval_splits:
    print(banner)
    preds = classifier.predict(feats)
    split_preds.append(preds)
    print(f"Confusion matrix:\n{confusion_matrix(gold, preds)}\n")
    print(f"Classification Report:\n{classification_report(gold, preds)}\n")

# Keep the per-split predictions available under their original names.
dev_preds, test_preds = split_preds

----------------------------------DEV-SET-----------------------------------------
Confusion matrix:
[[39  1  0  1  2  0  0  4  0  0]
 [ 0 17  0  0 19  4  0  0  1  4]
 [ 6  0 26  6  0  0  0  7  0  1]
 [ 0  0  2 26  0  3  0 12  9  2]
 [ 5  1  0  0 34  4  0  6  0  4]
 [ 0  4  0  0  1 37  0  6  0  2]
 [ 1  0  2  3  0  0  3  5 39  1]
 [ 0  0  0  0  0  8  0 43  1  1]
 [ 0  0  2  0  0  3  0  0 38  1]
 [ 0  9  0  0  2 13  0  4  0 22]]

Classification Report:
              precision    recall  f1-score   support

           0       0.76      0.83      0.80        47
           1       0.53      0.38      0.44        45
           2       0.81      0.57      0.67        46
           3       0.72      0.48      0.58        54
           4       0.59      0.63      0.61        54
           5       0.51      0.74      0.61        50
           6       1.00      0.06      0.11        54
           7       0.49      0.81      0.61        53
           8       0.43      0.86      0.58        44
   

### Task II

In [46]:
# II.1

# Answers to subquestions first:
# Yes, both neural models outperformed our baseline model.
# TODO: Await latest model evaluations...
# TODO: Fill with answer to:  "Do you observe any signs of overfitting to the training data?"
# TODO: Fill with answer to:  "How do the hyperparameters affect the model performance?"
# TODO: Discuss the above observations after reporting them

# Original implementations can be found in: model_neural.(conv1d_model.py & transformer_model.py)
class conv1d_block(nn.Module):
    """Building block: Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d."""

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, pool_size=4):
        super().__init__()
        # Attribute names are part of the state_dict keys, so they stay fixed.
        self.conv = nn.Conv1d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
        )
        self.bn = nn.BatchNorm1d(num_features=out_channels)
        self.pool = nn.MaxPool1d(kernel_size=pool_size)

    def forward(self, x):
        """Run the four stages in order and return the pooled activations."""
        activated = F.relu(self.bn(self.conv(x)))
        return self.pool(activated)


class Conv1dModel(nn.Module):
    """Four stacked conv1d_block stages, a global average pool and a linear classifier.

    The first stage uses a configurable kernel size and stride; the last two
    stages work on twice the base channel width.
    """

    def __init__(
        self,
        n_input=1,
        n_channel=32,
        n_output=10,
        initial_kernel_size=60,
        initial_stride=8,
        final_pool_size=4,
    ):
        super().__init__()
        wide = 2 * n_channel
        self.conv_block1 = conv1d_block(
            n_input, n_channel, kernel_size=initial_kernel_size, stride=initial_stride
        )
        self.conv_block2 = conv1d_block(n_channel, n_channel)
        self.conv_block3 = conv1d_block(n_channel, wide)
        self.conv_block4 = conv1d_block(wide, wide, pool_size=final_pool_size)
        self.fc1 = nn.Linear(wide, n_output)

    def forward(self, x):
        """Return class scores with a singleton middle axis: (batch, 1, n_output)."""
        stages = (self.conv_block1, self.conv_block2, self.conv_block3, self.conv_block4)
        for stage in stages:
            x = stage(x)
        # Average over whatever length is left, then move channels to the last axis
        # so the linear layer can consume them.
        pooled = F.avg_pool1d(x, x.shape[-1])
        return self.fc1(pooled.transpose(1, 2))


class Conv1dMelModel(nn.Module):
    """Three stacked conv1d_block stages, a global average pool and a linear classifier.

    Only the first stage pools (pool_size=2); the remaining two use a pool
    size of 1. Defaults to 39 input channels.
    """

    def __init__(
        self, n_input=39, n_channel=32, n_output=10, initial_kernel_size=16, initial_stride=3
    ):
        super().__init__()
        wide = 2 * n_channel
        self.conv_block1 = conv1d_block(
            n_input,
            n_channel,
            kernel_size=initial_kernel_size,
            stride=initial_stride,
            pool_size=2,
        )
        self.conv_block2 = conv1d_block(n_channel, n_channel, pool_size=1)
        self.conv_block3 = conv1d_block(n_channel, wide, pool_size=1)
        self.fc1 = nn.Linear(wide, n_output)

    def forward(self, x):
        """Return class scores with a singleton middle axis: (batch, 1, n_output)."""
        for stage in (self.conv_block1, self.conv_block2, self.conv_block3):
            x = stage(x)
        # Average over the remaining length, then put channels last for the linear layer.
        pooled = F.avg_pool1d(x, x.shape[-1])
        return self.fc1(pooled.transpose(1, 2))

class PositionalEncoding(nn.Module):
    """Add a fixed sinusoidal position table to a sequence, then apply dropout.

    The table ``P`` has shape (1, max_len, num_hiddens): even feature indices
    hold sines and odd feature indices hold cosines of position / 10000^(2i / num_hiddens).

    :param num_hiddens: size of the feature axis (odd sizes are supported)
    :param dropout: dropout probability applied after the addition
    :param max_len: number of positions in the table; longer inputs cannot be encoded
    """

    def __init__(self, num_hiddens, dropout, max_len=150):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

        positions = torch.arange(max_len, dtype=torch.float32).reshape(-1, 1)
        angles = positions / torch.pow(
            10000, torch.arange(0, num_hiddens, 2, dtype=torch.float32) / num_hiddens
        )

        # Create a long enough P
        table = torch.zeros((1, max_len, num_hiddens))
        # 0::2 means even indices, 1::2 means odd indices.
        table[:, :, 0::2] = torch.sin(angles)
        # With an odd num_hiddens there is one cosine column fewer than sine
        # columns, so trim the angles to fit instead of failing on the assignment.
        table[:, :, 1::2] = torch.cos(angles[:, : num_hiddens // 2])

        # A buffer follows the module through .to()/.cuda(); persistent=False keeps
        # it out of the state_dict, so existing checkpoints still load unchanged.
        self.register_buffer("P", table, persistent=False)

    def forward(self, x):
        """Return dropout(x + P[:, :seq_len]), where seq_len is x.shape[1]."""
        # .to() is a no-op once the buffer already lives on x's device; it is kept
        # so inputs on a device other than the module's own still work.
        x = x + self.P[:, : x.shape[1], :].to(x.device)
        return self.dropout(x)


class ViTMLP(nn.Module):
    """Two-layer MLP: LazyLinear -> GELU -> Dropout(0.5) -> LazyLinear -> Dropout(0.5).

    Both linear layers are lazy, so their input sizes are inferred on the
    first forward pass.
    """

    def __init__(self, mlp_num_hidden, mlp_num_outputs):
        super().__init__()
        self.dense1 = nn.LazyLinear(out_features=mlp_num_hidden)
        self.gelu = nn.GELU()
        self.dropout1 = nn.Dropout(p=0.5)
        self.dense2 = nn.LazyLinear(out_features=mlp_num_outputs)
        self.dropout2 = nn.Dropout(p=0.5)

    def forward(self, x):
        """Map x through the hidden layer and project it to mlp_num_outputs features."""
        hidden = self.dropout1(self.gelu(self.dense1(x)))
        projected = self.dense2(hidden)
        return self.dropout2(projected)


class ViTBlock(nn.Module):
    """Transformer encoder block with LayerNorm applied before each sub-layer.

    Self-attention and the MLP are each wrapped in a residual connection.
    """

    def __init__(self, num_hidden, norm_shape, mlp_num_hidden, num_heads):
        super().__init__()
        self.ln1 = nn.LayerNorm(norm_shape)
        self.attention = nn.MultiheadAttention(
            num_hidden, num_heads, dropout=0.1, batch_first=True
        )
        self.ln2 = nn.LayerNorm(norm_shape)
        self.mlp = ViTMLP(mlp_num_hidden, num_hidden)

    def forward(self, x, valid_lens=None):
        """Return x after the attention and MLP residual steps.

        valid_lens is accepted for interface compatibility but is not used.
        """
        # Self-attention: the normalised input serves as query, key and value.
        normed = self.ln1(x)
        attended, _ = self.attention(normed, normed, normed)
        x = x + attended
        return x + self.mlp(self.ln2(x))


class TransformerModel(nn.Module):
    """ViT-style classifier: conv patch embedding, class token, ViTBlocks, linear head.

    The prediction is read from the class-token position only.
    """

    def __init__(
        self,
        patch_size=16,
        stride=10,
        num_hidden=128,
        mlp_num_hidden=128,
        num_heads=4,
        num_blocks=2,
        num_classes=10,
    ):
        super().__init__()
        self.patch_embedding = nn.LazyConv2d(num_hidden, kernel_size=patch_size, stride=stride)
        self.cls_token = nn.Parameter(torch.zeros(1, 1, num_hidden))
        self.pos_embedding = PositionalEncoding(num_hidden, 0.1)
        self.dropout = nn.Dropout(0.1)
        # Positional construction names the children "0", "1", ... in order.
        self.blocks = nn.Sequential(
            *(
                ViTBlock(num_hidden, num_hidden, mlp_num_hidden, num_heads)
                for _ in range(num_blocks)
            )
        )
        self.head = nn.Sequential(nn.LayerNorm(num_hidden), nn.Linear(num_hidden, num_classes))

    def forward(self, x):
        """Return class scores of shape (batch, 1, num_classes)."""
        # Add channel dimension, embed patches and flatten them into a token sequence.
        patches = self.patch_embedding(x.unsqueeze(1))
        tokens = patches.flatten(2).transpose(1, 2)

        # Using .expand adds cls token for each sample in the batch.
        cls_tokens = self.cls_token.expand(tokens.shape[0], -1, -1)
        tokens = torch.cat((cls_tokens, tokens), dim=1)

        # NOTE(review): pos_embedding already returns dropout(tokens + P), so the
        # tokens are effectively added twice here and dropout is applied twice.
        # Left untouched because changing it would alter the outputs of any
        # weights trained with this forward pass - confirm whether it is intended.
        tokens = tokens + self.pos_embedding(tokens)
        tokens = self.dropout(tokens)

        tokens = self.blocks(tokens)
        # Apply the head to every token, keep position 0 (the class token).
        return self.head(tokens)[:, 0, :].unsqueeze(1)

In [47]:
# II.2

# TODO: Also await latest model evaluations...
# We have the classification report for the baseline model
# Now, we produce the classification report for the best models

# Original evaluation can be found in: model_neural.classification_report.py
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using: '{device}' as device for report.")

model = Conv1dModel()
# map_location remaps the stored tensors onto the selected device, so a
# checkpoint saved on a GPU also loads when the CPU fallback is in use.
# NOTE(review): the t-SNE cell loads the same file name from
# "../model_neural/models/" - confirm which relative path is correct.
model.load_state_dict(
    torch.load("models/Conv1dModel_0002_0002_10_01.pt", map_location=device)
)
model.to(device)
model.eval()

# Conv1dModel doesn't use mel-spectrograms; only the two architectures listed
# here do, so the flag is derived from the model's class name.
to_mel = model.__class__.__name__ in ["TransformerModel", "Conv1dMelModel"]

report = eval_models(model, ["TRAIN", "DEV", "TEST"], device=device, to_mel=to_mel)

# TODO: Compare the classification reports

In [None]:
# II.3

# Original usage can be found in: comparative_analysis.tsne_model_embeddings.py
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using: '{device}' as device for report.")

model = Conv1dModel()
# map_location remaps the stored tensors onto the selected device, so a
# checkpoint saved on a GPU also loads when the CPU fallback is in use.
# NOTE(review): the classification-report cell loads the same file name from
# "models/" - confirm which relative path is correct.
model.load_state_dict(
    torch.load(
        "../model_neural/models/Conv1dModel_0002_0002_10_01.pt", map_location=device
    )
)
model.to(device)
model.eval()

# Only the two architectures listed here take mel-spectrogram input.
to_mel = model.__class__.__name__ in ["TransformerModel", "Conv1dMelModel"]

# Embed the DEV split with t-SNE and plot the points together with their labels.
tsne_embedding, labels = tsne_model(model, device, to_mel, split="DEV")
plot_tsne(tsne_embedding, labels)

# TODO: Do the same for linear model

# TODO: Compare to the baseline after reporting

In [None]:
# II.4

# Original implementation can be found in: comparative_analysis.statistical_significance_test.py
def _predict_labels(predictor, data, device):
    """Return the predicted class index of every sample in a batch as a 1-D tensor on device."""
    if isinstance(predictor, nn.Module):
        # The neural models in this notebook emit (batch, 1, num_classes) scores;
        # reshape(-1) keeps a batch of one as shape (1,) instead of a scalar.
        return predictor(data).argmax(dim=2).reshape(-1)

    # Anything else is treated as an estimator exposing .predict(...).
    # NOTE(review): the batch is handed over as a CPU numpy array; whether the
    # baseline estimator accepts the dataset's feature layout must be confirmed.
    labels = predictor.predict(data.cpu().numpy())
    return torch.as_tensor(labels, device=device).reshape(-1)


def test_statistical_significance(
    model,
    baseline,
    device: torch.device,
    *,
    num_bootstrap: int = 10**6,
    to_mel: bool = True,
):
    """Bootstrap comparison of two predictors' accuracy on the DEV split.

    Both predictors label every DEV sample once. Afterwards the DEV set is
    resampled with replacement num_bootstrap times and, on every resample,
    the accuracies of both predictors are compared.

    Adapted from https://aclanthology.org/D12-1091.pdf

    NOTE(review): the returned value is a win rate (values near 1 favour
    `model`), not a p-value in the usual sense - confirm which statistic the
    report should use.

    :param model: nn.Module instance, or an object with a predict() method
    :param baseline: nn.Module instance, or an object with a predict() method
    :param device: device on which data, predictions and metrics are placed
    :param num_bootstrap: number of bootstrap resamples
    :param to_mel: forwarded to MNISTAudio; both predictors receive the same batches
    :return: fraction of resamples in which `model` was strictly more accurate
        than `baseline`
    :raises ValueError: if num_bootstrap is not positive or the DEV split is empty
    """
    if num_bootstrap <= 0:
        raise ValueError("num_bootstrap must be a positive integer")

    ds = MNISTAudio(
        annotations_dir=annotations_dir, audio_dir=base_dir, split="DEV", to_mel=to_mel
    )
    n = len(ds)
    if n == 0:
        raise ValueError("the DEV split is empty")

    # Label the whole DEV set exactly once. This assumes the dataset items and
    # both predictors are deterministic (nn.Module predictors in eval mode), so
    # re-running them for every resample would only repeat the same work.
    model_batches, baseline_batches, target_batches = [], [], []
    with torch.no_grad():
        for data, target in DataLoader(ds, batch_size=32):
            data = data.to(device)
            target_batches.append(target.to(device))
            model_batches.append(_predict_labels(model, data, device))
            baseline_batches.append(_predict_labels(baseline, data, device))
    targets = torch.cat(target_batches)
    model_preds = torch.cat(model_batches)
    baseline_preds = torch.cat(baseline_batches)

    # Both metrics are always needed, whatever kind of predictor was passed in.
    model_accuracy_metric = tm.classification.MulticlassAccuracy(num_classes=10)
    model_accuracy_metric.to(device)
    baseline_accuracy_metric = tm.classification.MulticlassAccuracy(num_classes=10)
    baseline_accuracy_metric.to(device)

    s = 0
    for _ in range(num_bootstrap):
        # Sample with replacement: n indices drawn uniformly from the DEV set.
        idx = torch.randint(n, (n,), device=device)
        resampled_targets = targets[idx]

        model_accuracy_metric(model_preds[idx], resampled_targets)
        model_accuracy = model_accuracy_metric.compute()
        model_accuracy_metric.reset()

        baseline_accuracy_metric(baseline_preds[idx], resampled_targets)
        baseline_accuracy = baseline_accuracy_metric.compute()
        baseline_accuracy_metric.reset()

        if model_accuracy > baseline_accuracy:
            s += 1

    return s / num_bootstrap

# TODO: Compare and report all 3 models after pairwise signif. testing

### Task III

In [48]:
# III.1

# Answers to subquestions first:
# TODO: Await latest model evaluations...
# TODO: Fill with answer to:  "What do you observe?"
# TODO: Fill with answer to:  "How does this affect the model performance?"
# TODO: Discuss the above observations after reporting them

# TODO: Retrain all 3 using new dataloader

In [49]:
# III.2

# TODO: Complete.

In [None]:
# III.3

# TODO: Complete.