In [37]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio

import matplotlib.pyplot as plt
import IPython.display as ipd
from tqdm.notebook import tqdm

import os

import torch.utils.data as data
import torch.optim as optim

import numpy as np

In [2]:
train_url="train-clean-100"
test_url="test-clean"
train_dataset = torchaudio.datasets.LIBRISPEECH("./data", url=train_url, download=True)
test_dataset = torchaudio.datasets.LIBRISPEECH("./data", url=test_url, download=True)

In [3]:
class TextTransform:
    """Maps characters to integers and vice versa"""
    def __init__(self):
        char_map_str = """
        ' 0
        <SPACE> 1
        a 2
        b 3
        c 4
        d 5
        e 6
        f 7
        g 8
        h 9
        i 10
        j 11
        k 12
        l 13
        m 14
        n 15
        o 16
        p 17
        q 18
        r 19
        s 20
        t 21
        u 22
        v 23
        w 24
        x 25
        y 26
        z 27
        """
        self.char_map = {}
        self.index_map = {}
        for line in char_map_str.strip().split('\n'):
            ch, index = line.split()
            self.char_map[ch] = int(index)
            self.index_map[int(index)] = ch
        self.index_map[1] = ' '

    def text_to_int(self, text):
        """ Use a character map and convert text to an integer sequence """
        int_sequence = []
        for c in text:
            if c == ' ':
                ch = self.char_map['<SPACE>']
            else:
                ch = self.char_map[c]
            int_sequence.append(ch)
        return int_sequence

    def int_to_text(self, labels):
        """ Use a character map and convert integer labels to an text sequence """
        string = []
        for i in labels:
            string.append(self.index_map[i])
        return ''.join(string).replace('<SPACE>', ' ')

In [4]:
text_transform = TextTransform()

train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128),
    torchaudio.transforms.FrequencyMasking(freq_mask_param=30),
    torchaudio.transforms.TimeMasking(time_mask_param=100)
)
valid_audio_transforms = torchaudio.transforms.MelSpectrogram()


In [14]:
def data_processing(data, data_type="train"):
    spectrograms = []
    labels = []

    input_lengths = []
    label_lengths = []
    for (waveform, _, utterance, _, _, _) in data:
        if data_type == 'train':
            spec = train_audio_transforms(waveform).squeeze(0).transpose(0, 1)
        elif data_type == 'valid':
            spec = valid_audio_transforms(waveform).squeeze(0).transpose(0, 1)
        else:
            raise Exception('data_type should be train or valid')
        spectrograms.append(spec)
        label = torch.Tensor(text_transform.text_to_int(utterance.lower()))
        labels.append(label)
        
        input_lengths.append(spec.shape[0]//2)
        label_lengths.append(len(label))

    print(spectrograms[0].size())

    spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True).unsqueeze(1).transpose(2, 3)
    labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)
    
    return spectrograms, labels, input_lengths, label_lengths

In [15]:
bs = 16
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                batch_size=bs,
                                collate_fn=lambda x: data_processing(x, 'train'),
                                shuffle=True)

In [38]:
from project.dataloader.librispeech_dataloader import LibriSpeechDataModule


train_url = "train-clean-100"
test_url = "test-clean"

dm = LibriSpeechDataModule(8, './data', train_url, test_url)
dm.setup()

train_loader = dm.train_dataloader()

In [39]:
for a in train_loader:
    print(a[0].size())
    print(a[1].size())
    break

torch.Size([8, 1, 128, 1256])
torch.Size([8, 292])


In [40]:
for a in train_loader:
    print(a[0].size())
    print(a[1].size())
    print(a[2])
    print(a[3])
    break

torch.Size([8, 1, 128, 1215])
torch.Size([8, 267])


IndexError: tuple index out of range

In [41]:
class CNNLayerNorm(nn.Module):
    """Layer normalization built for cnns input"""
    def __init__(self, n_feats):
        super(CNNLayerNorm, self).__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        # x (batch, channel, feature, time)
        x = x.transpose(2, 3).contiguous() # (batch, channel, time, feature)
        x = self.layer_norm(x)
        return x.transpose(2, 3).contiguous() # (batch, channel, feature, time) 


class ResidualCNN(nn.Module):
    """Residual CNN inspired by https://arxiv.org/pdf/1603.05027.pdf
        except with layer norm instead of batch norm
    """
    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super(ResidualCNN, self).__init__()

        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=kernel//2)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=kernel//2)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)

    def forward(self, x):
        residual = x  # (batch, channel, feature, time)
        x = self.layer_norm1(x)
        x = F.gelu(x)
        x = self.dropout1(x)
        x = self.cnn1(x)
        x = self.layer_norm2(x)
        x = F.gelu(x)
        x = self.dropout2(x)
        x = self.cnn2(x)
        x += residual
        return x # (batch, channel, feature, time)


class BidirectionalGRU(nn.Module):

    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super(BidirectionalGRU, self).__init__()

        self.BiGRU = nn.GRU(
            input_size=rnn_dim, hidden_size=hidden_size,
            num_layers=1, batch_first=batch_first, bidirectional=True)
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.layer_norm(x)
        x = F.gelu(x)
        x, _ = self.BiGRU(x)
        x = self.dropout(x)
        return x


class SpeechRecognitionModel(nn.Module):
    
    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
        super(SpeechRecognitionModel, self).__init__()
        n_feats = n_feats//2
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)  # cnn for extracting heirachal features

        # n residual cnn layers with filter size of 32
        self.rescnn_layers = nn.Sequential(*[
            ResidualCNN(32, 32, kernel=3, stride=1, dropout=dropout, n_feats=n_feats) 
            for _ in range(n_cnn_layers)
        ])
        self.fully_connected = nn.Linear(n_feats*32, rnn_dim)
        self.birnn_layers = nn.Sequential(*[
            BidirectionalGRU(rnn_dim=rnn_dim if i==0 else rnn_dim*2,
                             hidden_size=rnn_dim, dropout=dropout, batch_first=i==0)
            for i in range(n_rnn_layers)
        ])
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim*2, rnn_dim),  # birnn returns rnn_dim*2
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(rnn_dim, n_class)
        )

    def forward(self, x):
        x = self.cnn(x)
        x = self.rescnn_layers(x)
        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
        x = x.transpose(1, 2) # (batch, time, feature)
        x = self.fully_connected(x)
        x = self.birnn_layers(x)
        x = self.classifier(x)
        return x

In [42]:
class MyModel(nn.Module):
    
    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
        super(MyModel, self).__init__()
        n_feats = n_feats//2
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)
    def forward(self, x):
        x = self.cnn(x)
        return x

In [43]:
hparams = {
        "n_cnn_layers": 3,
        "n_rnn_layers": 5,
        "rnn_dim": 512,
        "n_class": 29,
        "n_feats": 128,
        "stride":2,
        "dropout": 0.1,
        "learning_rate": 5e-4,
        "batch_size": 16,
        "epochs": 1
        }

In [44]:
device="cpu"
model = MyModel(
            hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
            hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
        ).to(device)

In [45]:
model

MyModel(
  (cnn): Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
)

In [47]:
for batch_idx, _data in enumerate(train_loader):
    spectrograms, labels = _data 
    spectrograms, labels = spectrograms.to(device), labels.to(device)

    # optimizer.zero_grad()

    output = model(spectrograms)  # (batch, time, n_class)
    print(spectrograms.size())
    print(output.size())


    print(labels.size())
    print(labels[0])
    sent = text_transform.int_to_text(labels[0].numpy())
    print(sent)
    break

torch.Size([8, 1, 128, 1329])
torch.Size([8, 32, 64, 665])
torch.Size([8, 240])
tensor([ 0.,  5.,  2., 23., 10.,  5.,  1.,  9.,  2.,  5.,  1., 15., 16., 21.,
         1., 19.,  6.,  2., 17., 17.,  6.,  2., 19.,  6.,  5.,  1., 15., 16.,
         1., 16., 21.,  9.,  6., 19.,  1., 19.,  6., 20., 21., 19.,  2., 10.,
        15., 21.,  1., 24.,  2., 20.,  1., 10., 14., 17., 16., 20.,  6.,  5.,
         1., 16., 15.,  1., 21.,  9.,  6.,  1.,  7., 16., 19., 14.,  6., 19.,
         1., 21.,  9.,  2., 15.,  1., 21.,  9.,  6.,  1., 24.,  2., 21.,  4.,
         9.,  7., 22., 13.,  1., 13., 16., 16., 12., 20.,  1., 16.,  7.,  1.,
         2.,  1., 26., 16., 22., 15.,  8.,  1.,  9., 22., 19., 16., 15.,  1.,
        24.,  9., 16.,  1.,  9.,  2.,  5.,  1., 17., 13.,  2.,  4.,  6.,  5.,
         1.,  9., 10., 14., 20.,  6., 13.,  7.,  1.,  2., 21.,  1.,  9.,  2.,
        15.,  5.,  1., 21.,  9., 16., 22.,  8.,  9.,  1.,  2., 15.,  1.,  2.,
        19., 14.,  6.,  5.,  1., 24.,  2., 19., 19., 10., 16.,

In [25]:
from transformer.transformer import SpeechTransformer

ModuleNotFoundError: No module named 'transformer'

In [37]:
num_classes = 29
model2 = SpeechTransformer(num_classes, d_model=512,
                           num_heads=8, input_dim=128, extractor='vgg')

In [None]:
for batch_idx, _data in enumerate(train_loader):
    spectrograms, labels, input_lengths, label_lengths = _data 
    spectrograms, labels = spectrograms.to(device), labels.to(device)
    input_lengths = torch.LongTensor(input_lengths)
    
    spectrograms.squeeze_()
    spectrograms.transpose_(1, 2)
    input_lengths.unsqueeze_(1)
    
    print(input_lengths)
    print(spectrograms.size())
    print(input_lengths.size())
    
    # print(labels.size())
    
    
    output = model2(spectrograms, input_lengths)  # (batch, time, n_class)
    
    print(output.size())
    break

tensor([[376],
        [535],
        [580],
        [588],
        [482],
        [522],
        [434],
        [627],
        [559],
        [571],
        [512],
        [243],
        [622],
        [608],
        [602],
        [592]])
torch.Size([16, 1254, 128])
torch.Size([16, 1])
