In [1]:
import numpy as np
from datasets import load_dataset
import warnings

from torch import optim

# Silence every Python warning for the rest of the session.
# NOTE(review): this blanket filter also hides warnings raised by torch and
# librosa (e.g. deprecations); consider narrowing it to specific categories.
warnings.filterwarnings("ignore")

In [None]:
# Load the "tr_en" configuration of the "covost2" dataset, reading the audio
# from the local data directory given below.
# NOTE(review): presumably Turkish speech paired with English text — confirm
# against the dataset card.
datasets_tr = load_dataset("covost2", "tr_en", data_dir="Datasets/STT_Datasets/tr")

In [None]:
# Keep only the first example of the training split for this experiment.
# NOTE(review): later cells index data["file"] and data["translation"], so the
# slice is expected to be a mapping of column name -> list of values — confirm.
data = datasets_tr["train"][:1]

In [None]:
# Display the selected sample.
data

In [None]:
import librosa
import numpy as np
import torch

def audio_transformer(audio_path, sample_rate=16000, n_mels=128):
    """Load an audio file and turn it into a log-mel spectrogram tensor.

    Args:
        audio_path: Path of the audio file; passed straight to ``librosa.load``.
        sample_rate: Rate the audio is resampled to while loading (16 kHz by
            default, as before).
        n_mels: Number of mel bands; this is the size of the last tensor axis.

    Returns:
        A tensor of shape ``(1, frames, n_mels)`` containing the spectrogram in
        decibels relative to its own maximum.
    """
    waveform, sr = librosa.load(audio_path, sr=sample_rate)
    waveform = librosa.util.normalize(waveform)
    # The signal has to be passed by keyword (librosa >= 0.10 rejects positional
    # audio), and the actual sampling rate must be forwarded: otherwise the mel
    # filter bank is built for librosa's 22050 Hz default rather than the rate
    # the file was loaded at.
    mel = librosa.feature.melspectrogram(y=waveform, sr=sr, n_mels=n_mels)
    mel_db = librosa.power_to_db(mel, ref=np.max)
    # Transpose so time comes first, then add a leading batch axis of size 1.
    return torch.from_numpy(mel_db).T.unsqueeze(0)

In [None]:
# Turn every selected sample into a (features, target) pair, then split the
# pairs into the two parallel lists used by the training cell.
# NOTE(review): label_processing is not defined in the cells shown here — make
# sure it exists before this cell runs on a fresh kernel.
samples = [
    (audio_transformer(audio_file), label_processing(translation))
    for audio_file, translation in zip(data["file"], data["translation"])
]
train_x_list = [features for features, _ in samples]
train_y_list = [target for _, target in samples]

In [None]:
import torch.nn as nn
import torch.nn.functional as F

In [None]:
class Encoder(nn.Module):
    """LSTM encoder that applies dropout to its input features.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        layer_size: Number of stacked LSTM layers.
        p: Dropout probability, used on the input and between LSTM layers.
    """

    def __init__(self, input_size, hidden_size, layer_size, p):
        super().__init__()

        self.drop = nn.Dropout(p)
        # nn.LSTM only applies dropout *between* stacked layers. With a single
        # layer a non-zero value does nothing and raises a UserWarning, so it
        # is passed through only when there is more than one layer.
        self.rnn = nn.LSTM(
            input_size,
            hidden_size,
            layer_size,
            batch_first=True,
            dropout=p if layer_size > 1 else 0.0,
        )

    def forward(self, x):
        """Encode ``x`` of shape ``(batch, steps, input_size)``.

        Returns:
            ``(output, hidden)`` exactly as produced by ``nn.LSTM``: the
            per-step outputs and the final ``(h_n, c_n)`` state tuple.
        """
        output, hidden = self.rnn(self.drop(x))
        return output, hidden

In [None]:
class Attention(nn.Module):
    """Additive attention scoring encoder outputs against the decoder state.

    Args:
        hidden_size: Size of the encoder outputs and of each LSTM state vector.
    """

    def __init__(self, hidden_size):
        super().__init__()

        self.EO = nn.Linear(hidden_size, hidden_size)      # projects encoder outputs
        self.Hi = nn.Linear(hidden_size * 2, hidden_size)  # projects [h; c] of the decoder
        self.Va = nn.Linear(hidden_size, 1)                # reduces each step to one score

    def forward(self, keys, query):
        """Compute the context vector for one decoding step.

        Args:
            keys: Encoder outputs, ``(batch, steps, hidden_size)``.
            query: LSTM state tuple ``(h, c)``, each of shape
                ``(layers, batch, hidden_size)``.

        Returns:
            Context tensor of shape ``(batch, 1, hidden_size)``.
        """
        # Use the top layer's state only. For a single-layer LSTM this equals
        # permuting the whole state; for stacked LSTMs it keeps the query at
        # (batch, 1, 2 * hidden) so it broadcasts over the encoder steps
        # instead of failing with a shape mismatch.
        top_hidden = query[0][-1].unsqueeze(1)
        top_cell = query[1][-1].unsqueeze(1)
        state = torch.cat((top_hidden, top_cell), dim=-1)

        scores = self.Va(torch.tanh(self.Hi(state) + self.EO(keys)))

        # Normalise over the encoder time axis, then take the weighted sum.
        weights = F.softmax(scores, dim=1)
        context = torch.sum(weights * keys, dim=1, keepdim=True)

        return context

In [None]:
class Decoder(nn.Module):
    """Single-step LSTM decoder that attends over the encoder outputs.

    Args:
        hidden_size: Width of the embeddings, the LSTM state and the attention.
        output_size: Vocabulary size (embedding rows and number of logits).
        layer_size: Number of stacked LSTM layers.
        p: Dropout probability for the embeddings and between LSTM layers.
    """

    def __init__(self, hidden_size, output_size, layer_size, p):
        super().__init__()

        self.drop = nn.Dropout(p)
        self.embedding = nn.Embedding(output_size, hidden_size)
        # The LSTM input is the token embedding concatenated with the context
        # vector, hence hidden_size * 2. nn.LSTM applies dropout only between
        # stacked layers; with one layer a non-zero value does nothing and
        # raises a UserWarning, so it is passed through only when it matters.
        self.rnn = nn.LSTM(
            hidden_size * 2,
            hidden_size,
            layer_size,
            batch_first=True,
            dropout=p if layer_size > 1 else 0.0,
        )
        self.out = nn.Linear(hidden_size, output_size)

        self.attention = Attention(hidden_size)

    def forward(self, decoder_input, decoder_hidden, encoder_output):
        """Run one decoding step.

        Args:
            decoder_input: Token ids fed to the embedding layer.
            decoder_hidden: Current LSTM state tuple ``(h, c)``.
            encoder_output: Encoder outputs the attention module attends over.

        Returns:
            ``(output, hidden)``: the vocabulary logits for this step and the
            updated LSTM state.
        """
        embedded = self.drop(self.embedding(decoder_input))

        context = self.attention(encoder_output, decoder_hidden)
        input_rnn = torch.cat((embedded, context), dim=-1)

        output, hidden = self.rnn(input_rnn, decoder_hidden)
        output = self.out(output)

        return output, hidden

In [None]:
class Model(nn.Module):
    """Encoder/decoder sequence-to-sequence model with attention.

    Relies on the module-level ``SOS_token`` and ``EOS_token`` ids being
    defined before ``forward`` is called.

    Args:
        input_size: Number of features per input time step.
        hidden_size: Hidden size shared by the encoder and the decoder.
        layer_size: Number of stacked LSTM layers in both networks.
        output_size: Vocabulary size of the decoder.
        p: Dropout probability.
        max_length: Maximum number of decoding steps when no target is given.
    """

    def __init__(self, input_size, hidden_size, layer_size, output_size, p, max_length):
        super().__init__()

        self.max_length = max_length
        # Not used in forward(); kept so the module's attributes stay unchanged.
        self.drop = nn.Dropout(p)
        self.encoder = Encoder(input_size, hidden_size, layer_size, p)
        self.decoder = Decoder(hidden_size, output_size, layer_size, p)

    def forward(self, x, target=None):
        """Decode ``x`` either with teacher forcing or greedily.

        Args:
            x: Input features, ``(batch, steps, input_size)``.
            target: Optional token ids ``(batch, length)``. When given, each
                step is fed the ground-truth token (teacher forcing) and
                exactly ``length`` steps are produced. Otherwise decoding is
                greedy and stops once every sequence has emitted ``EOS_token``
                or after ``max_length`` steps.

        Returns:
            Log-probabilities of shape ``(batch, decoded_steps, output_size)``.
        """
        encoder_output, encoder_hidden = self.encoder(x)
        batch_size = x.size(0)

        # Start every sequence with SOS, on the same device as the input.
        decoder_input = torch.empty(
            batch_size, 1, dtype=torch.long, device=x.device
        ).fill_(SOS_token)
        decoder_hidden = encoder_hidden

        teacher_forcing = target is not None
        max_length = target.size(1) if teacher_forcing else self.max_length

        # Tracks which sequences already produced EOS (greedy decoding only).
        finished = torch.empty(
            batch_size, dtype=torch.bool, device=x.device
        ).fill_(False)

        decoder_outputs = []
        for i in range(max_length):
            decoder_output, decoder_hidden = self.decoder(
                decoder_input, decoder_hidden, encoder_output
            )
            decoder_outputs.append(decoder_output)

            if teacher_forcing:
                decoder_input = target[:, i].unsqueeze(1)
            else:
                _, topi = decoder_output.topk(1)
                decoder_input = topi.squeeze(-1).detach()

                # Comparing the whole batch in a bare `if` raises for
                # batch > 1; stop only when every sequence has emitted EOS.
                finished = finished | (decoder_input.squeeze(1) == EOS_token)
                if finished.all():
                    break

        decoder_outputs = torch.cat(decoder_outputs, 1)
        # Normalise over the vocabulary axis. dim=1 is the time axis and would
        # hand NLLLoss values that are not per-token log-probabilities.
        return F.log_softmax(decoder_outputs, dim=-1)

In [None]:
# Instantiate the network; the vocabulary is the alphabet plus two extra ids.
model = Model(
    input_size=128,
    hidden_size=256,
    layer_size=1,
    output_size=len(alphabet) + 2,
    p=0.2,
    max_length=100,
)

In [None]:
# Training setup: number of passes over the data, negative log-likelihood loss
# (the model returns log-probabilities) and Adam with weight decay.
epochs = 1000
criterion = nn.NLLLoss()
optimizer = optim.Adam(
    params=model.parameters(),
    lr=4e-5,
    weight_decay=1e-5,
)

In [None]:
# Teacher-forced training, one sample per optimisation step. After every step
# the greedy decoding of the current output is printed next to the loss.
model.train()  # keeps the cell re-runnable after the model was put in eval mode
for e in range(epochs):
    for i, (x, y) in enumerate(zip(train_x_list, train_y_list)):
        optimizer.zero_grad()
        output = model(x, y).squeeze(0)
        loss = criterion(output, y.squeeze(0))
        loss.backward()
        optimizer.step()

        # Decode the most likely token of every step. Characters are looked up
        # at id - 2, so ids 0 and 1 (presumably SOS/EOS — confirm) are skipped;
        # otherwise the negative index would silently wrap around to the last
        # characters of the alphabet.
        token_ids = torch.argmax(output, dim=-1).tolist()
        prediction = "".join(
            alphabet[token_id - 2] for token_id in token_ids if token_id >= 2
        )
        print(loss.item(), prediction)

In [None]:
# Length and text of the first sample's translation.
first_translation = data["translation"][0]
len(first_translation), first_translation

In [None]:
# Size of the second axis of the first processed target.
train_y_list[0].shape[1]

In [None]:
# Greedy decoding of the first sample without a target. Switch to eval mode so
# dropout is disabled, and skip autograd bookkeeping: no gradients are needed.
model.eval()
with torch.no_grad():
    pred = model(train_x_list[0])
pred.shape

In [None]:
# Show the string decoded during the last training step.
prediction