In [47]:
import sys
sys.path.append('../src')

In [48]:
sys.path

['/home/boris/Projects/Voice_Assistant_for_Voice_Anomaly_Persons/Multi-lingual Phoneme Recognition/notebooks',
 '/usr/lib/python310.zip',
 '/usr/lib/python3.10',
 '/usr/lib/python3.10/lib-dynload',
 '',
 '/home/boris/Projects/Voice_Assistant_for_Voice_Anomaly_Persons/venv/lib/python3.10/site-packages',
 '/tmp/tmprzh1u8oy',
 '../src',
 '../src']

In [49]:
from matplotlib import pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader
import tqdm
import pytorch_lightning as pl
from torchinfo import summary

In [50]:
import json

In [51]:
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

In [52]:
f = open("/home/boris/Projects/Voice_Assistant_for_Voice_Anomaly_Persons/Multi-lingual Phoneme Recognition/models/processor/vocab.json")
phonemes: dict[str, int] = json.load(f)
f.close()

phonemes_reverse ={j:i for i, j in phonemes.items()}

In [53]:
params = {
    'batch_size': 4,
    'phonemes': 392
}

In [54]:
df = pd.read_csv('/home/boris/Projects/Project-On-Voice-Assistant/data/xls-r_dataset.csv').values.tolist()

train_df, test_df = train_test_split(df, test_size=0.1)
train_df, val_df = train_test_split(train_df)

In [55]:
train_loader = DataLoader(train_df, params["batch_size"], True)
val_loader = DataLoader(val_df, params["batch_size"], True)
test_loader = DataLoader(test_df, params["batch_size"], True)

Let see what is the type of batches

In [56]:
batch = next(iter(train_loader))
batch

[tensor([ 44, 243, 130, 380]),
 ('я очень высокий',
  'Централизованный объект',
  'Вода начала пузыриться',
  'Закажем пиццу?'),
 ('i a o l tʃ e n i v e s o k i',
  'ts e n k r e n d e z o v a n i a m b e j e k t',
  'd a n e tʃ i l a p o z e p i ts a',
  'n e o d a v l e t v a k r i t i l n i')]

so, we've `ids: list[int], texts: list['str'], phonemes: list[str]`

In [57]:
abc = "? абвгдеёжзийклмнопрстуфхшщчцьыъэюя"
def vectorize_str(labels: tuple[str]):
    lengths = torch.LongTensor(size=(len(labels),))
    letters = torch.zeros(size=(len(labels),  max(map(len, labels)), len(abc)), dtype=torch.double)
    
    for i, label in enumerate(labels):
        lengths[i] = len(label)
        j=0
        for c in label.lower():
            if not c in abc:
                lengths[i]-=1
            else:
                letters[i,j, abc.index(c)]=1
                # letters[i,j]= abc.index(c)
                j+=1
    
    return letters, lengths
def decode_str(X):
    r = []
    for i in range(X.shape[0]):
        s = []
        s1 = []
        for j in range(X.shape[1]):
            if X[i,j].argmax() >= 0:
                s.append(abc[X[i,j].argmax()])
            s1.append(abc[X[i,j,1:].argmax()+1])
        r.append(''.join(s)+'/'+''.join(s1))
    return r

In [58]:
k = vectorize_str(batch[1])[0]
decode_str(k)

['я очень высокий????????/я очень высокий        ',
 'централизованный объект/централизованный объект',
 'вода начала пузыриться?/вода начала пузыриться ',
 'закажем пиццу??????????/закажем пиццу          ']

In [59]:
def vectorize_phonemes(labels: tuple[str]):
    lengths = torch.LongTensor(size=(len(labels),))
    letters = torch.zeros(
        size=(
            len(labels),
            max(map(lambda x: len(x.split()), labels)),
            params['phonemes']
        ),
        dtype=torch.float
    )
    # letters = torch.zeros(size=(len(labels),  max(map(len, labels))), dtype=float)

    for i, label in enumerate(labels):
        lengths[i] = len(label.split())
        for j, c in enumerate(label.split()):
            letters[i,j, phonemes[c]]=1
    
    return letters, lengths
def decode_phonemes(X):
    r = []
    for i in range(X.shape[0]):
        s = []
        for j in range(X.shape[1]):
            if X[i,j].argmax() >= 0:
                s.append(phonemes_reverse[int(X[i,j].argmax())])
        r.append(''.join(s))
    return r

In [60]:
batch[2]

('i a o l tʃ e n i v e s o k i',
 'ts e n k r e n d e z o v a n i a m b e j e k t',
 'd a n e tʃ i l a p o z e p i ts a',
 'n e o d a v l e t v a k r i t i l n i')

In [61]:
decode_phonemes(vectorize_phonemes(batch[2])[0])

['iaoltʃenivesoki<pad><pad><pad><pad><pad><pad><pad><pad><pad>',
 'tsenkrendezovaniambejekt',
 'danetʃilapozepitsa<pad><pad><pad><pad><pad><pad><pad>',
 'neodavletvakritilni<pad><pad><pad><pad>']

Test of preprocess correctness

In [62]:
class Reshape(nn.Module):
    def __init__(self, *args) -> None:
        super().__init__()
        self.shape = args
    def forward(self, x):
        return x.reshape(*self.shape)

In [78]:
class Conv1DCorrector(pl.LightningModule):
    def __init__(self) -> None:
        super().__init__()
        self.conv = nn.Conv2d(
            1,
            33,
            (params['phonemes'], 3),
            padding=(0, 1)
        )
        self.lstm = nn.LSTM(33, 16)
        self.linear = nn.Linear(16, len(abc))

    def forward(self, x: torch.Tensor):
        # x: batch x len x phonemes
        x = x.permute(0,2,1)
        x = x.reshape(x.shape[0], 1, x.shape[1], x.shape[2])# x: batch x 1 x phonhemes x len
        x = self.conv(x)
        # x : batch x channels x 1 x len
        x = x[:,:,0,:].permute(0, 2, 1)
        x, (h, c) = self.lstm(x)
        x = self.linear(x)
        return F.log_softmax(x)
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters())
    def training_step(self, train_batch, batch_idx):
        labels, lengths = vectorize_str(train_batch[1])
        phonemes_vec, _ = vectorize_phonemes(train_batch[2])
        
        prediction = self(phonemes_vec.to(device))
        
        ilengths = lengths.clone()
        for i in range(prediction.shape[0]):
            ilengths[i] = prediction.shape[1]
        
        lv = F.ctc_loss(prediction.permute(1, 0, 2), labels.argmax(dim=-1), ilengths, lengths, zero_infinity=True)
        
        self.log('train_loss', lv, True)
        return lv
    def validation_step(self, train_batch, batch_idx):
        labels, lengths = vectorize_str(train_batch[1])
        phonemes_vec, _ = vectorize_phonemes(train_batch[2])
        
        prediction = self(phonemes_vec.to(device))
        
        ilengths = lengths.clone()
        for i in range(prediction.shape[0]):
            ilengths[i] = prediction.shape[1]
        
        lv = F.ctc_loss(prediction.permute(1, 0, 2), labels.argmax(dim=-1), ilengths, lengths, zero_infinity=True)
        
        self.log('val_loss', lv, True)


In [79]:
model = Conv1DCorrector()

In [80]:
db = vectorize_phonemes(next(iter(train_loader))[2])[0]
db.shape

torch.Size([4, 29, 392])

In [81]:
summary(model, input_data=db, batch_dim=0, verbose=2)

Layer (type:depth-idx)                   Output Shape              Param #
Conv1DCorrector                          [4, 29, 35]               --
├─Conv2d: 1-1                            [4, 33, 1, 29]            38,841
│    └─weight                                                      ├─38,808
│    └─bias                                                        └─33
├─LSTM: 1-2                              [1, 29, 16]               3,264
│    └─weight_ih_l0                                                ├─2,112
│    └─weight_hh_l0                                                ├─1,024
│    └─bias_ih_l0                                                  ├─64
│    └─bias_hh_l0                                                  └─64
├─Linear: 1-3                            [4, 29, 35]               595
│    └─weight                                                      ├─560
│    └─bias                                                        └─35
Total params: 42,700
Trainable params: 42,700
Non-

  return F.log_softmax(x)


Layer (type:depth-idx)                   Output Shape              Param #
Conv1DCorrector                          [4, 29, 35]               --
├─Conv2d: 1-1                            [4, 33, 1, 29]            38,841
│    └─weight                                                      ├─38,808
│    └─bias                                                        └─33
├─LSTM: 1-2                              [1, 29, 16]               3,264
│    └─weight_ih_l0                                                ├─2,112
│    └─weight_hh_l0                                                ├─1,024
│    └─bias_ih_l0                                                  ├─64
│    └─bias_hh_l0                                                  └─64
├─Linear: 1-3                            [4, 29, 35]               595
│    └─weight                                                      ├─560
│    └─bias                                                        └─35
Total params: 42,700
Trainable params: 42,700
Non-

In [82]:
b = next(iter(train_loader))

In [90]:
b

[tensor([569, 474, 205, 541]),
 ('Туалетная вода',
  'Подпишите мою петицию',
  'Тебе стоит собраться',
  'Чистый переулок'),
 ('ɑ l e t u d n a j a v a n d a',
  'n a k k r aː j a t u k ɔ k r a d s y n ɡ a l a v oː',
  'd ɨ n v ø d oː d θ ʌ v ʌ ɡ r a θ a',
  's e m e r f u n o d a ʃ i l ɑ s')]

In [84]:
model.training_step(b, None)

  return F.log_softmax(x)


tensor(1.0004, device='cuda:0', grad_fn=<MeanBackward0>)

In [85]:
from pytorch_lightning.loggers import TensorBoardLogger

logger = TensorBoardLogger("tb_logs", name="my_model")

trainer = pl.Trainer(max_epochs=40)
trainer.fit(model, train_loader, test_loader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name   | Type   | Params
----------------------------------
0 | conv   | Conv2d | 38.8 K
1 | lstm   | LSTM   | 3.3 K 
2 | linear | Linear | 595   
----------------------------------
42.7 K    Trainable params
0         Non-trainable params
42.7 K    Total params
0.171     Total estimated model params size (MB)


                                                                            

  return F.log_softmax(x)


Epoch 12: 100%|██████████| 129/129 [00:01<00:00, 74.79it/s, v_num=2, train_loss=0.000, val_loss=1.830]

  rank_zero_warn("Detected KeyboardInterrupt, attempting graceful shutdown...")


In [87]:
phonemes_vec, _ = vectorize_phonemes(b[2])
prediction = model(phonemes_vec.to(device))
        

  return F.log_softmax(x)


In [88]:
prediction.shape

torch.Size([4, 25, 35])

In [89]:
print(*decode_str(prediction.permute(0,1,2)), sep='\n')

фйтюфюйюйтй?й?т??????????/фйтюфюйюйтйтййтооооооооеа
еъфе?о?еъфе???ьфффффффффъ/еъфеоооеъфеоооьфффффффффъ
????о?ф????о?ф?о?о???????/оооооофоооооофооооооооооо
тооо?т?жооэффээоо????????/тоооотожооэффээооооооооое


In [None]:
torch.save(model, '../models/corr1.pt')