In [1]:
!pip install py7zr jiwer
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import torch
import torch.nn as nn
from torch import optim
from torch.utils.data import DataLoader, Dataset
import torchaudio
import py7zr
from glob import glob
import librosa as ls
import IPython.display as ipd
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from jiwer import wer, cer
import math
import os
from sklearn.model_selection import train_test_split
import random

# Fix the seeds of Python's, NumPy's and PyTorch's global RNGs from one shared constant.
SEED = 75
random.seed(SEED)
np.random.seed(SEED)
torch.random.manual_seed(SEED)

In [2]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# STFT geometry: sizes in samples are derived from the sampling rate and the
# window / stride durations (identifier spellings are kept because other cells use them).
TARGET_SR = 16000
WINDOW_SIZE = 0.02
WINDOW_STRIDE = 0.01
N_FFT = int(TARGET_SR * WINDOW_SIZE)
WINDOW_LENGHT = N_FFT
HOP_LENGHT = int(TARGET_SR * WINDOW_STRIDE)

# Output vocabulary: position 0 is '_' and position 1 is the space, followed by the
# Cyrillic letters. (An earlier Latin a-z variant of this table was replaced by this one.)
IDX2CHAR = dict(enumerate([
    '_', " ", "а", "ә", "б", "в", "г", "ғ", "д", "е",
    "ё", "ж", "з", "и", "й", "к", "қ", "л", "м", "н",
    "ң", "о", "ө", "п", "р", "с", "т", "у", "ұ", "ү",
    "ф", "х", "һ", "ц", "ч", "ш", "щ", "ъ", "ы", "і",
    "ь", "э", "ю", "я",
]))
# Inverse lookup: character -> class index.
CHAR2IDX = {char: idx for idx, char in IDX2CHAR.items()}

In [3]:
# Build the (audio file, transcription) index for the whole corpus.
# Each `<name>.wav` under Audios/ is paired with `<name>.txt` under Transcriptions/.
audio_paths = []
text = []
# sorted(): glob() returns files in filesystem order, so sorting keeps the row order
# (and therefore the seeded train/dev split) identical from run to run.
for audio_path in tqdm(sorted(glob('../input/nu-dataset/ISSAI_KSC_335RS/Audios/*.wav'))):
    stem = os.path.splitext(os.path.basename(audio_path))[0]
    transcript_path = '../input/nu-dataset/ISSAI_KSC_335RS/Transcriptions/' + stem + '.txt'
    # Context manager closes every file handle; explicit UTF-8 avoids depending on the
    # platform default encoding when reading Cyrillic text.
    with open(transcript_path, encoding='utf-8') as transcript_file:
        text.append(transcript_file.read())
    # Appended only after the transcript was read, so both lists always stay aligned.
    audio_paths.append(audio_path)

df = pd.DataFrame({'audio_path': audio_paths, 'text': text})
df

In [4]:
# Hold out 10% of the rows as the dev set (fixed random_state) and give both
# partitions a fresh 0..n-1 index.
train_df, dev_df = (
    partition.reset_index(drop=True)
    for partition in train_test_split(df, test_size=0.1, random_state=75)
)

train_df.shape, dev_df.shape

In [5]:
class AudioDataset(Dataset):
    """Dataset yielding ``(log-spectrogram, label indices)`` pairs.

    Args:
        data_frame: frame with an ``audio_path`` column (files readable by
            ``torchaudio.load``) and a ``text`` column (raw transcriptions).
        transformation: callable applied to the mono waveform tensor.
        chr2idx: mapping from a single character to its class index.

    Labels are encoded once at construction time; audio is loaded lazily per item.
    """

    def __init__(self, data_frame, transformation, chr2idx):
        self.audio_paths = data_frame.audio_path.to_list()
        # chr2idx must be set before the labels are encoded below.
        self.chr2idx = chr2idx
        self.labels = data_frame.text.apply(self.text_preprocess).to_list()
        self.transformation = transformation

    def __len__(self):
        """Number of utterances in the dataset."""
        return len(self.audio_paths)

    def __getitem__(self, idx):
        """Load utterance ``idx`` and return ``(spectrogram, label index list)``."""
        # NOTE(review): the file's sample rate is not checked or resampled here.
        signal, _sample_rate = torchaudio.load(self.audio_paths[idx])
        return self.audio_preprocess(signal), self.labels[idx]

    def audio_preprocess(self, signal):
        """Down-mix to mono, apply the transformation and compress with ``log1p``.

        A tensor with a leading channel dimension is averaged over that dimension
        (a no-op on the values for a single channel). An already 1-D waveform is
        used as is; the previous ``shape[0] == 0`` check never matched, so 1-D
        input used to be averaged down to a single scalar.
        """
        if signal.dim() > 1:
            signal = signal.mean(dim=0)
        return torch.log1p(self.transformation(signal))

    def text_preprocess(self, text):
        """Map ``text`` to a list of class indices.

        Characters missing from ``chr2idx`` are skipped. Index 0 is skipped as well
        (falsy filter, same as before) — ``greedy_decode`` in this notebook treats
        index 0 as the blank symbol, so it never appears in a target.
        """
        looked_up = (self.chr2idx.get(char) for char in text)
        return [index for index in looked_up if index]

In [6]:
def batch_preprocessing(batch):
    """Collate ``(spectrogram, label list)`` samples into padded batch tensors.

    Samples are ordered from longest to shortest along dimension 1 of the
    spectrogram and right-padded with zeros to the longest one.

    Returns:
        inputs: float tensor ``(batch, 1, freq, max_frames)``.
        targets: 1-D long tensor with all label lists concatenated in batch order.
        input_percentages: float tensor, each sample's length divided by ``max_frames``.
        target_sizes: int tensor with the label count of each sample.
    """
    ordered = sorted(batch, key=lambda item: item[0].size(1), reverse=True)
    n_samples = len(ordered)
    reference = ordered[0][0]
    n_freq, max_frames = reference.size(0), reference.size(1)

    inputs = torch.zeros(n_samples, 1, n_freq, max_frames)
    input_percentages = torch.FloatTensor(n_samples)
    target_sizes = torch.IntTensor(n_samples)
    flat_targets = []

    for row, item in enumerate(ordered):
        spect, label = item[0], item[1]
        n_frames = spect.size(1)
        # Only the leading n_frames columns are written; the rest stays zero padding.
        inputs[row, 0, :, :n_frames] = spect
        input_percentages[row] = n_frames / float(max_frames)
        target_sizes[row] = len(label)
        flat_targets += label

    return inputs, torch.tensor(flat_targets, dtype=torch.long), input_percentages, target_sizes


class AudioDataLoader(DataLoader):
    """``DataLoader`` whose batches are always collated by ``batch_preprocessing``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Assigned after construction, so a collate_fn supplied by the caller is replaced.
        self.collate_fn = batch_preprocessing

In [7]:
# Spectrogram transform handed to both datasets: Hamming window, with FFT size,
# window length and hop taken from the configuration cell.
spectogram = torchaudio.transforms.Spectrogram(
    n_fft=N_FFT,
    win_length=WINDOW_LENGHT,
    hop_length=HOP_LENGHT,
    window_fn=torch.hamming_window,
)

In [8]:
# Wrap both partitions with the same transform and vocabulary.
train_dataset, dev_dataset = (
    AudioDataset(frame, spectogram, CHAR2IDX)
    for frame in (train_df, dev_df)
)

In [9]:
# Visual sanity check of one training sample's log-spectrogram (item 10000).
sample_spect = train_dataset[10000][0]
plt.imshow(sample_spect.cpu())

In [10]:
class DeepSpeech(nn.Module):
    """Three dense layers, one bidirectional GRU, and a two-layer output head.

    Every hidden dense layer is ``Linear -> Hardtanh(0, max_clip_relu) -> Dropout``.

    Args:
        n_feature: size of the feature axis of the input.
        n_hidden: width of the dense layers and of each GRU direction.
        n_class: number of output classes.
        dropout: dropout probability after each hidden dense layer.
        max_clip_relu: upper bound of the clipped activation.
    """

    def __init__(self, n_feature, n_hidden, n_class, dropout = 0, max_clip_relu = 20):
        super().__init__()
        self.n_hidden = n_hidden

        def dense(in_dim):
            # One hidden unit: affine map, clipped ReLU, dropout.
            return [nn.Linear(in_dim, n_hidden),
                    nn.Hardtanh(0, max_clip_relu),
                    nn.Dropout(dropout)]

        self.fc_block = nn.Sequential(*dense(n_feature), *dense(n_hidden), *dense(n_hidden))
        self.bi_rnn = nn.GRU(n_hidden, n_hidden, num_layers=1, bidirectional=True)
        self.out = nn.Sequential(*dense(n_hidden), nn.Linear(n_hidden, n_class))

    def forward(self, x, input_sizes):
        """Return ``(scores, input_sizes)``; the sizes are passed through unchanged.

        ``x`` is 4-D; its last two axes are swapped so the dense layers act on the
        axis that was third on input, and axis 1 is squeezed away afterwards.
        """
        hidden = self.fc_block(x.transpose(2, 3))
        # Drop axis 1, then swap the first two axes before the GRU.
        hidden = hidden.squeeze(1).transpose(0, 1)
        rnn_out, _ = self.bi_rnn(hidden)
        # Merge the two GRU directions by summing their halves of the feature axis.
        forward_half, backward_half = rnn_out.chunk(2, dim=-1)
        return self.out(forward_half + backward_half), input_sizes

In [11]:
class DeepSpeech2(nn.Module):
    """Two strided 2-D convolutions, a stacked bidirectional GRU and a linear classifier.

    Args:
        n_feature: size of the frequency axis (dimension 2) of the input; it now
            determines the GRU input size (it used to be ignored in favour of
            module-level constants).
        n_hidden: hidden size of each GRU direction.
        n_class: number of output classes.
        dropout: accepted for signature parity with ``DeepSpeech``.
            NOTE(review): it is not applied anywhere in this model.
        max_clip_relu: upper bound of the clipped activation after each convolution
            (it used to be hard-coded to 20, which is also the default).
        n_rnn_layer: number of stacked GRU layers.
    """

    # Output channels of both convolution layers.
    _CONV_CHANNELS = 32

    def __init__(self, n_feature, n_hidden, n_class, dropout = 0, max_clip_relu = 20, n_rnn_layer=3):
        super().__init__()
        self.n_hidden = n_hidden
        channels = self._CONV_CHANNELS
        self.conv_block = nn.Sequential(
            nn.Conv2d(1, channels, kernel_size=(41, 11), stride=(2, 2), padding=(20, 5)),
            nn.BatchNorm2d(channels),
            nn.Hardtanh(0, max_clip_relu, inplace=True),
            nn.Conv2d(channels, channels, kernel_size=(21, 11), stride=(2, 1), padding=(10, 5)),
            nn.BatchNorm2d(channels),
            nn.Hardtanh(0, max_clip_relu, inplace=True),
        )

        # The GRU consumes (channels * remaining frequency bins) per time step; the bin
        # count is derived from the conv layers themselves so the two cannot drift apart.
        rnn_input_size = self._conv_out_size(n_feature, axis=0) * channels
        self.bi_rnn = nn.GRU(rnn_input_size, n_hidden, bidirectional=True, num_layers=n_rnn_layer)
        self.out = nn.Sequential(nn.Linear(n_hidden, n_class, bias=False))

    def _conv_out_size(self, size, axis):
        """Push ``size`` (int or integer tensor) through every Conv2d along ``axis``.

        ``axis`` 0 is the first kernel dimension, ``axis`` 1 the second.
        """
        for layer in self.conv_block.modules():
            if isinstance(layer, nn.Conv2d):
                span = layer.dilation[axis] * (layer.kernel_size[axis] - 1) + 1
                size = (size + 2 * layer.padding[axis] - span) // layer.stride[axis] + 1
        return size

    def forward(self, x, input_sizes):
        """Return ``(scores, output_sizes)``.

        ``x`` is ``(batch, 1, n_feature, time)``. ``scores`` has the conv-reduced time
        axis first, then batch, then ``n_class``. ``output_sizes`` are ``input_sizes``
        mapped through the convolutions' time-axis arithmetic.
        """
        output_sizes = self.get_output_lenght(input_sizes)
        x = self.conv_block(x)
        # Fold channels and frequency into one feature axis, then put time first.
        x = x.flatten(1, 2).permute(2, 0, 1)
        x, _ = self.bi_rnn(x)
        # Merge the two GRU directions by summing their halves of the feature axis.
        x = x[:, :, :self.n_hidden] + x[:, :, self.n_hidden:]
        return self.out(x), output_sizes

    def get_output_lenght(self, input_lenght):
        """Sequence lengths after the conv block, returned as an int32 tensor."""
        seq_len = self._conv_out_size(torch.as_tensor(input_lenght), axis=1)
        return seq_len.int()

In [12]:
def greedy_decode(probas, idx2char, blank_idx=0, output_sizes=None):
    """Best-path CTC decoding.

    For every sequence the highest-scoring class of each frame is taken, a class equal
    to the previous frame's class is skipped, and blanks are dropped.

    Args:
        probas: tensor ``(batch, time, n_class)`` of per-frame scores (only the argmax
            over the last axis is used).
        idx2char: mapping from class index to character.
        blank_idx: index of the CTC blank class.
        output_sizes: optional per-sequence number of valid frames (tensor or sequence
            of ints). When given, frames beyond that length (batch padding) are
            ignored; ``None`` decodes every frame, as before.

    Returns:
        List with one decoded string per sequence.
    """
    _, classes = torch.max(probas, dim=-1)
    # One bulk conversion instead of a .item() call per frame.
    best_paths = classes.tolist()
    if output_sizes is None:
        lengths = [len(path) for path in best_paths]
    else:
        lengths = [int(size) for size in torch.as_tensor(output_sizes).tolist()]

    texts = []
    for path, length in zip(best_paths, lengths):
        chars = []
        previous = None
        for cls in path[:length]:
            # Emit only when the class changes and is not the blank.
            if cls != blank_idx and cls != previous:
                chars.append(idx2char[cls])
            previous = cls
        texts.append(''.join(chars))

    return texts

In [13]:
# Build the acoustic model with one output class per vocabulary entry and move it to
# the selected device. 161 equals N_FFT // 2 + 1 for N_FFT = 320 — presumably the
# spectrogram's frequency-bin count.
model = DeepSpeech2(n_feature=161, n_hidden=2048, n_class=len(IDX2CHAR), dropout=0.2)
model.to(device)
model

In [14]:
# Batches of 4 with 2 worker processes; only the training loader shuffles.
train_loader, dev_loader = (
    AudioDataLoader(dataset, batch_size=4, shuffle=is_train, num_workers=2)
    for dataset, is_train in ((train_dataset, True), (dev_dataset, False))
)

In [15]:
# CTC loss and Adam optimizer, both with default settings.
loss_fn = nn.CTCLoss()
# NOTE: this rebinds the name `optim`, shadowing the module from `from torch import optim`.
optim = torch.optim.Adam(model.parameters())

# Per-epoch metric histories, appended to by the training cell.
train_history = {metric: [] for metric in ('loss', 'wer')}
dev_history = {metric: [] for metric in ('loss', 'wer', 'cer')}

In [16]:
def get_ground_truth(y, target_sizes, idx2char=None):
    """Split the flat label tensor of a batch back into one string per sample.

    Args:
        y: 1-D sequence/tensor with the labels of all samples concatenated.
        target_sizes: number of labels belonging to each sample, in order
            (tensor or sequence of ints).
        idx2char: mapping from class index to character; defaults to the notebook's
            module-level ``IDX2CHAR``.

    Returns:
        List of reference transcripts, one per entry of ``target_sizes``.

    Raises:
        IndexError: if ``target_sizes`` asks for more labels than ``y`` holds.
    """
    if idx2char is None:
        idx2char = IDX2CHAR
    # Bulk conversions keep the offset a plain int and avoid per-element .item() calls.
    labels = torch.as_tensor(y).tolist()
    sizes = torch.as_tensor(target_sizes).tolist()

    texts = []
    start = 0
    for size in sizes:
        stop = start + int(size)
        if stop > len(labels):
            raise IndexError('target_sizes exceed the number of labels in y')
        texts.append(''.join(idx2char[label] for label in labels[start:stop]))
        start = stop

    return texts

In [None]:
# Train for 20 epochs. Each epoch runs one pass over train_loader (with updates) and one
# over dev_loader (no gradients), records the averaged metrics and writes a checkpoint.
for epoch in tqdm(range(20)):
    # ---- training pass ----
    total_loss = 0
    wers = 0
    model.train()
    for step, (X, y, input_percentages, target_sizes) in enumerate(tqdm(train_loader)):
        optim.zero_grad()
        # Recover per-sample frame counts from the length fractions. Round before the
        # int cast: float error in the product could otherwise truncate a length by one.
        input_sizes = (input_percentages * X.size(3)).round().int()
        X, y = X.to(device), y.to(device)
        preds, output_sizes = model(X, input_sizes)
        log_probs = nn.functional.log_softmax(preds, dim=-1)
        loss = loss_fn(log_probs, y, output_sizes, target_sizes)
        loss.backward()
        optim.step()
        total_loss += loss.item()
        ground_truth = get_ground_truth(y.cpu().detach(), target_sizes)
        # greedy_decode only takes the argmax, which softmax does not change, so the
        # raw scores are decoded directly (batch axis first).
        decoded = greedy_decode(preds.detach().cpu().transpose(0, 1), IDX2CHAR)
        wers += wer(ground_truth, decoded)
        if step % 1000 == 0:
            print(f'WER: {round(wers/(step+1), 3)}')
            print(f'Decoded: {decoded[-1]}')
            print(f'Ground Truth: {ground_truth[-1]}')

    train_history['loss'].append(total_loss/len(train_loader))
    train_history['wer'].append(wers/len(train_loader))

    # ---- validation pass ----
    model.eval()
    total_loss = 0
    wers = 0
    cers = 0
    with torch.no_grad():
        for X, y, input_percentages, target_sizes in tqdm(dev_loader):
            input_sizes = (input_percentages * X.size(3)).round().int()
            X, y = X.to(device), y.to(device)
            preds, output_sizes = model(X, input_sizes)
            log_probs = nn.functional.log_softmax(preds, dim=-1)
            loss = loss_fn(log_probs, y, output_sizes, target_sizes)
            total_loss += loss.item()
            decoded = greedy_decode(preds.detach().cpu().transpose(0, 1), IDX2CHAR)
            ground_truth = get_ground_truth(y.cpu().detach(), target_sizes)
            wers += wer(ground_truth, decoded)
            # Accumulate like WER; this used to be a plain assignment, so the reported
            # CER was the last batch's CER divided by the number of batches.
            cers += cer(ground_truth, decoded)

    dev_history['loss'].append(total_loss/len(dev_loader))
    dev_history['wer'].append(wers/len(dev_loader))
    dev_history['cer'].append(cers/len(dev_loader))

    print(f'Epoch: {epoch+1}')
    print(f"Train Loss: {train_history['loss'][-1]}, Train Wer: {train_history['wer'][-1]}")
    print(f"Dev Loss: {dev_history['loss'][-1]}, Dev Wer: {dev_history['wer'][-1]}, Dev Cer: {dev_history['cer'][-1]}")
    print('Decoded:', decoded[:3])
    print('Ground Truth:', ground_truth[:3])
    # Checkpoint after every epoch; the file name carries the 0-based epoch and dev WER.
    torch.save({'model': model.state_dict(),
               'optimizer': optim.state_dict()},
               'model_epoch{}_wer{}.pth'.format(epoch, round(dev_history['wer'][-1], 3)))

In [None]:
# Final export: writes the `model` object itself to model.pth (the per-epoch checkpoints
# written by the training cell store state_dicts instead).
torch.save(model, 'model.pth')

In [None]:
# Debug inspection: shape of `preds` as left behind by the last batch of the training cell.
preds.size()

In [None]:
# Debug inspection: reference transcripts of the last batch processed by the training cell.
ground_truth