<a href="https://colab.research.google.com/github/Lenaami/SpeechSynthesisLabs/blob/main/SS_lab4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [3]:
path = '/content/gdrive/My Drive/Colab Notebooks/Синтез речи/'

In [4]:
book1 = 'r_hod.Result.xml'
book2 = 'tropa.Result.xml' #самый большой
book3 = 'whtguard.Result.xml'

# Парсинг xml

In [5]:
import xml.etree.ElementTree as ET 
import numpy as np
import re

In [6]:
def isNone(a):
    return int(a) if a is not None else -1

In [7]:
# Создание словарей

allophones = ['']

books = [book1, book2, book3]

for book in books:
    tree = ET.parse(path + book)
    root = tree.getroot()

    for snt in root.findall('sentence'):
        for feat in snt:
            if feat.tag == 'word':         
                for lt in feat:
                    if lt.tag == 'allophone': 
                        allophones.append(lt.get('ph'))    


allophones = set(allophones)
allophones_dict = {ph:i for i,ph in enumerate(allophones)}
num_to_allophones = {i:ph for i,ph in enumerate(allophones)}

In [8]:
def get_data(file):  
    X_mfcc = [] 
    y_mfcc = []

    tree = ET.parse(file)
    root = tree.getroot()

    tags = ['word', 'pause']

    for snt in root.findall('sentence'):
        features = []
        feat_wrd = []

        features_ph = []
        features_ft_stc = []
        allophone_stc = []

        allph_wrd = []

        pause_pred = 0
        count = 0
        count_ph = 0
        for feat in snt:

            if feat.tag not in tags:
                continue

            if feat.tag == 'word':
                if pause_pred:
                    feat_wrd.append(1) # пауза до слова
                    feat_wrd.append(0) # пауза после слова (предв.)
                    feat_wrd.append(0) # длительность паузы
                    pause_pred = 0
                else:
                    feat_wrd.append(0)
                    feat_wrd.append(0)
                    feat_wrd.append(0)

                dct = feat.find('dictitem')
                feat_wrd.append(isNone(dct.get('stress_dict'))) # ударение
                feat_wrd.append(-1) # ударение (предв.)
                
                allph_wrd = [''] # нет фонемы до слова
                features_ph = []

                for lt in feat:                    
 
                    if lt.tag == 'allophone':                        
                        allph_wrd.append(lt.get('ph'))
                        feat_ph = []
                        feat_ph.append(isNone(lt.get('FO_INIT'))) # частота основного тона
                        for i in range(1, 4):
                            feat_ph.append(isNone(lt.get('FO' + str(i))))  # частота основного тона
                        feat_ph.extend(np.asarray(lt.get('En').split('|')[1:-1], dtype='int')) # энергия
                        feat_ph.append(len(allph_wrd) - 2) # позиция в слове
                        feat_ph.append(count_ph) # позиция в предложении
                        count_ph += 1

                        features_ph.append(feat_ph)

                        y_mfcc.append(np.asarray(lt.get('mfcc').split('|')[1:-1], dtype='float')) # значения MFCC для аллофона


                    if lt.tag == 'stress':
                        feat_wrd[-1] = len(allph_wrd) # позиция ударной фонемы                    

                allph_wrd.append('') # нет фонемы после слова
                feat_wrd.append(count) # позиция слова в предложении
                count += 1 # подсчет слов              
               
                features.append(feat_wrd)
                features_ft_stc.append(features_ph)
                allophone_stc.append(allph_wrd)
                feat_wrd = []

            if feat.tag == 'pause':
                features[-1][1] = 1 # пауза после слова
                features[-1][2] = isNone(feat.get('time')) # время паузы
                pause_pred = 1             

        # объединение 
        for i in range(count):
            for l in range(len(allophone_stc[i]) - 2):
                feat = [allophone_stc[i][l+1], allophone_stc[i][l], allophone_stc[i][l+2]] # текущая фонема, перед и после
                feat = [allophones_dict.get(s) for s in feat] # перевод из буквы в цифру
                feat.extend(features_ft_stc[i][l])
                feat.extend(features[i])
                feat.append(count)

                X_mfcc.append(feat)
       
    return np.asarray(X_mfcc), np.asarray(y_mfcc)

In [9]:
X_train_1, y_train_1 = get_data(path + book2)

In [10]:
X_train_2, y_train_2 = get_data(path + book3)

In [11]:
# Объединение корпусов (2 и 3)

X_train = np.vstack((X_train_1, X_train_2))
y_train = np.vstack((y_train_1, y_train_2))

In [12]:
# Тестовый корпус (1)

X_test, y_test = get_data(path + book1)

# Предсказание MFCC вектора

## GAN

In [16]:
import torch
from torch import nn
import torch.optim as optim

from torch.utils.data import Dataset
from torch.utils.data import DataLoader

In [17]:
class Generator(nn.Module):
    def __init__(self, latent_dim, layers, output_activation=None):
        super(Generator, self).__init__()
        self.latent_dim = latent_dim
        self.output_activation = output_activation
        self._init_layers(layers)

    def _init_layers(self, layers):
        self.module_list = nn.ModuleList()
        last_layer = self.latent_dim
        for index, width in enumerate(layers):
            self.module_list.append(nn.Linear(last_layer, width))
            last_layer = width
            if index + 1 != len(layers):
                self.module_list.append(nn.LeakyReLU())

    def forward(self, input_tensor):
        intermediate = input_tensor
        for layer in self.module_list:
            intermediate = layer(intermediate)
        return intermediate

In [18]:
class Discriminator(nn.Module):
    def __init__(self, input_dim, layers):
        super(Discriminator, self).__init__()
        self.input_dim = input_dim
        self._init_layers(layers)

    def _init_layers(self, layers):
        self.module_list = nn.ModuleList()
        last_layer = self.input_dim
        for index, width in enumerate(layers):
            self.module_list.append(nn.Linear(last_layer, width))
            last_layer = width
            if index + 1 != len(layers):
                self.module_list.append(nn.LeakyReLU())
        else:
            self.module_list.append(nn.Sigmoid())

    def forward(self, input_tensor):
        intermediate = input_tensor
        for layer in self.module_list:
            intermediate = layer(intermediate)
        return intermediate        

In [19]:
class VanillaGAN():
    def __init__(self, generator, discriminator, batch_size=32, device='cpu', lr_d=1e-3, lr_g=2e-4):

        self.generator = generator
        self.generator = self.generator.to(device)
        self.discriminator = discriminator
        self.discriminator = self.discriminator.to(device)
        self.batch_size = batch_size
        self.device = device
        self.criterion = nn.BCELoss()
        self.optim_d = optim.Adam(discriminator.parameters(), lr=lr_d, betas=(0.5, 0.999))
        self.optim_g = optim.Adam(generator.parameters(), lr=lr_g, betas=(0.5, 0.999))
        self.target_ones = torch.ones((batch_size, 1)).to(device)
        self.target_zeros = torch.zeros((batch_size, 1)).to(device)

    def generate_samples(self, latent_vec=None, num=None):
        with torch.no_grad():
            samples = self.generator(latent_vec)
        return samples

    def train_step_generator(self, latent_vec):
        self.generator.zero_grad()

        generated = self.generator(latent_vec)
        classifications = self.discriminator(generated)
        loss = self.criterion(classifications, self.target_ones)
        loss.backward()
        self.optim_g.step()
        return loss.item()

    def train_step_discriminator(self, latent_vec, real_samples):
        self.discriminator.zero_grad()

        # real samples
        pred_real = self.discriminator(real_samples)
        loss_real = self.criterion(pred_real, self.target_ones)

        # generated samples
        with torch.no_grad():
            fake_samples = self.generator(latent_vec)
        pred_fake = self.discriminator(fake_samples)
        loss_fake = self.criterion(pred_fake, self.target_zeros)

        # combine
        loss = (loss_real + loss_fake) / 2
        loss.backward()
        self.optim_d.step()
        return loss_real.item(), loss_fake.item()

    def train_step(self, latent_vec, real_samples):
        loss_d = self.train_step_discriminator(latent_vec, real_samples)
        loss_g = self.train_step_generator(latent_vec)
        return loss_g, loss_d

In [20]:
class PhonemeDataset(Dataset):
    def __init__(self, x, y=None):
        self.x = x
        self.y = y
    
    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        features = self.x[idx]
        if self.y is not None:
            return features, self.y[idx]
        return features

In [21]:
batch_size = 32

In [22]:
train_loader = DataLoader(PhonemeDataset(X_train[:(-1)*(len(y_train) % batch_size)], y_train[:(-1)*(len(y_train) % batch_size)]), batch_size=batch_size)
#test_loader = DataLoader(PhonemeDataset(X_test[:(-1)*(len(y_test) % 32)], y_test[:(-1)*(len(y_test) % 32)]), batch_size=batch_size)

In [23]:
test_loader = DataLoader(PhonemeDataset(X_test, y_test), batch_size=1)

In [110]:
len(y_test[:(-1)*(len(y_test) % 32)])

6560

In [None]:
for wavs, labs in train_loader:
    print(wavs)
    #wavs, labs = wavs.cuda(), labs.detach().numpy()
 
    #outputs = outputs.detach().cpu().numpy().argmax(axis=1)
    #forecast.append(outputs)


In [28]:
from time import time

epochs = 20
##batches = 100
generator = Generator(19, [64, 32, 12]) # X_train.shape[1] , y_train.shape[1]
discriminator = Discriminator(12, [64, 32, 1])
##noise_fn = lambda x: torch.rand((x, 19), device='cuda') ######## < --------------
##data_fn = lambda x: torch.randn((x, 12), device='cuda') ######## < --------------
##gan = VanillaGAN(generator, discriminator, noise_fn, data_fn, device='cuda')
gan = VanillaGAN(generator, discriminator, batch_size, device='cuda')
loss_g, loss_d_real, loss_d_fake = [], [], []
start = time()
for epoch in range(epochs):
    count_batches = 0
    loss_g_running, loss_d_real_running, loss_d_fake_running = 0, 0, 0
    #for batch in range(batches):
    for features, labels in train_loader:
        features, labels = features.cuda(), labels.cuda()
        lg_, (ldr_, ldf_) = gan.train_step(features.float(), labels.float())
        loss_g_running += lg_
        loss_d_real_running += ldr_
        loss_d_fake_running += ldf_
        count_batches += 1
    loss_g.append(loss_g_running / count_batches)
    loss_d_real.append(loss_d_real_running / count_batches)
    loss_d_fake.append(loss_d_fake_running / count_batches)
    print(f"Epoch {epoch+1}/{epochs} ({int(time() - start)}s):"
          f" G={loss_g[-1]:.3f},"
          f" Dr={loss_d_real[-1]:.3f},"
          f" Df={loss_d_fake[-1]:.3f}")

Epoch 1/20 (171s): G=4.506, Dr=0.368, Df=0.352
Epoch 2/20 (343s): G=0.962, Dr=0.627, Df=0.583
Epoch 3/20 (514s): G=0.983, Dr=0.618, Df=0.563
Epoch 4/20 (685s): G=0.949, Dr=0.624, Df=0.571
Epoch 5/20 (855s): G=0.988, Dr=0.613, Df=0.552
Epoch 6/20 (1025s): G=0.971, Dr=0.618, Df=0.563
Epoch 7/20 (1195s): G=0.960, Dr=0.622, Df=0.571
Epoch 8/20 (1367s): G=0.971, Dr=0.618, Df=0.560
Epoch 9/20 (1536s): G=0.960, Dr=0.621, Df=0.567
Epoch 10/20 (1706s): G=0.908, Dr=0.636, Df=0.590
Epoch 11/20 (1874s): G=0.931, Dr=0.629, Df=0.577
Epoch 12/20 (2044s): G=0.919, Dr=0.632, Df=0.582
Epoch 13/20 (2213s): G=0.933, Dr=0.629, Df=0.578
Epoch 14/20 (2383s): G=0.932, Dr=0.630, Df=0.582
Epoch 15/20 (2554s): G=0.940, Dr=0.625, Df=0.573
Epoch 16/20 (2724s): G=0.919, Dr=0.632, Df=0.581
Epoch 17/20 (2895s): G=0.902, Dr=0.636, Df=0.588
Epoch 18/20 (3064s): G=0.895, Dr=0.638, Df=0.593
Epoch 19/20 (3234s): G=0.899, Dr=0.637, Df=0.595
Epoch 20/20 (3403s): G=0.910, Dr=0.635, Df=0.587


In [24]:
from time import time

epochs = 20
##batches = 100
generator = Generator(19, [64, 32, 12]) # X_train.shape[1] , y_train.shape[1]
discriminator = Discriminator(12, [64, 32, 1])
##noise_fn = lambda x: torch.rand((x, 19), device='cuda') ######## < --------------
##data_fn = lambda x: torch.randn((x, 12), device='cuda') ######## < --------------
##gan = VanillaGAN(generator, discriminator, noise_fn, data_fn, device='cuda')
gan = VanillaGAN(generator, discriminator, batch_size, device='cuda')
loss_g, loss_d_real, loss_d_fake = [], [], []
start = time()
for epoch in range(epochs):
    count_batches = 0
    loss_g_running, loss_d_real_running, loss_d_fake_running = 0, 0, 0
    #for batch in range(batches):
    for features, labels in train_loader:
        features, labels = features.cuda(), labels.cuda()
        lg_, (ldr_, ldf_) = gan.train_step(features.float(), labels.float())
        loss_g_running += lg_
        loss_d_real_running += ldr_
        loss_d_fake_running += ldf_
        count_batches += 1
    loss_g.append(loss_g_running / count_batches)
    loss_d_real.append(loss_d_real_running / count_batches)
    loss_d_fake.append(loss_d_fake_running / count_batches)
    print(f"Epoch {epoch+1}/{epochs} ({int(time() - start)}s):"
          f" G={loss_g[-1]:.3f},"
          f" Dr={loss_d_real[-1]:.3f},"
          f" Df={loss_d_fake[-1]:.3f}")

Epoch 1/20 (178s): G=6.198, Dr=0.339, Df=0.331
Epoch 2/20 (354s): G=1.107, Dr=0.601, Df=0.536
Epoch 3/20 (530s): G=1.025, Dr=0.614, Df=0.558
Epoch 4/20 (706s): G=1.030, Dr=0.613, Df=0.553
Epoch 5/20 (884s): G=1.036, Dr=0.611, Df=0.551
Epoch 6/20 (1059s): G=1.010, Dr=0.614, Df=0.557
Epoch 7/20 (1236s): G=1.022, Dr=0.609, Df=0.549
Epoch 8/20 (1414s): G=0.986, Dr=0.620, Df=0.566
Epoch 9/20 (1595s): G=0.963, Dr=0.624, Df=0.576
Epoch 10/20 (1773s): G=0.984, Dr=0.619, Df=0.565
Epoch 11/20 (1952s): G=0.962, Dr=0.624, Df=0.572
Epoch 12/20 (2131s): G=0.943, Dr=0.627, Df=0.579
Epoch 13/20 (2309s): G=0.933, Dr=0.631, Df=0.584
Epoch 14/20 (2488s): G=0.899, Dr=0.642, Df=0.609
Epoch 15/20 (2667s): G=0.905, Dr=0.640, Df=0.600
Epoch 16/20 (2847s): G=0.898, Dr=0.642, Df=0.602
Epoch 17/20 (3027s): G=0.895, Dr=0.643, Df=0.605
Epoch 18/20 (3204s): G=0.890, Dr=0.645, Df=0.606
Epoch 19/20 (3384s): G=0.896, Dr=0.640, Df=0.598
Epoch 20/20 (3564s): G=0.952, Dr=0.629, Df=0.578


In [88]:
gan.generate_samples(torch.tensor((0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18), dtype=torch.float, device='cuda'), 1)

tensor([-15.2117,  34.0781,   7.5779,  -5.0084, -29.8542,   3.8091,  -1.9497,
        -13.0288, -14.4846,  25.4570, -18.9715,  15.6978], device='cuda:0')

In [25]:
y_true = []
y_pred = []

for features, labels in test_loader:
    features, labels = features.cuda(), labels.detach().numpy()
    outputs = gan.generate_samples(features.float())    
    outputs = outputs.detach().cpu().numpy()

    y_pred.append(outputs)
    y_true.append(labels)

show_metric(y_pred, y_true)


Euclidean distance:	1.906


In [26]:
print(y_true[0])
print(y_pred[0])

[[-0.67395527  1.03842558 -0.46609802 -0.26750833 -0.08744055 -0.55251648
   0.06810237  0.31983177 -0.2124693   0.25121132  0.09961122  0.02914081]]
[[-2.8735523   2.3975954   0.22248186  0.6811306  -0.6667562  -0.13630494
  -0.38938317  0.5198267  -0.50991833  0.11009459 -0.21513316 -1.0195515 ]]


## Классификаторы

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [13]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import LinearRegression

from sklearn.model_selection import cross_val_score
from sklearn import metrics
import matplotlib.pyplot as plt
import pandas as pd

from numpy import linalg

In [14]:
import numpy as np
from sklearn import linear_model
from sklearn import svm

classifiers = [
    svm.SVR(),
    linear_model.SGDRegressor(),
    linear_model.BayesianRidge(),
    linear_model.LassoLars(),
    linear_model.ARDRegression(),
    linear_model.PassiveAggressiveRegressor(),
    linear_model.TheilSenRegressor(),
    linear_model.LinearRegression()]

In [54]:
a = np.array([[1, 1, 1], [2, 2, 2]])
print(a)
print(a.T)

[[1 1 1]
 [2 2 2]]
[[1 2]
 [1 2]
 [1 2]]


In [41]:
#def fit_clf(clf, X_train, y_train):
def fit_clf(X_train, y_train):
    
    n_clf = y_train.shape[1]
    clfs = []
    for i in range(n_clf):
        clfs.append(LinearRegression())
    
    for i in range(n_clf):
        clfs[i].fit(X_train, y_train[:, i])
    
    return clfs

def predict_clf(clfs, X_test, y_test, metric='micro'):
    
    y_pred = []

    for i in range(len(clfs)):
        y_pred.append(clfs[i].predict(X_test))

    #print(len(y_pred), len(y_pred[0]))
    y_pred = np.asarray(y_pred).T
    #print(y_pred[0])

    show_metric(y_pred, y_test)

    #print('\nConfusion matrix:\n', pd.DataFrame(metrics.confusion_matrix(y_test, y_pred)))

    #metrics.plot_confusion_matrix(clf, X_test, y_test, values_format='d')
    #plt.show()

    return y_pred


def show_metric(y_pred, y_true):



    L2_score = []

    for i in range(len(y_pred)):
        L2_score.append(np.linalg.norm(y_pred[i] - y_true[i]))

    #print(pred, true)

    #print(metrics.classification_report(true, pred))

    #lev_score.append(Levenshtein.distance(out, true))
    

    #print('F1-score (%s):\t%0.3f' % (metric, metrics.f1_score(y_true, y_pred, average=metric)))
    #print('F1-score (%s):\t%0.3f' % (metric, metrics.f1_score(y_true, y_pred, average=metric)))
    #print('Recall (%s): \t%0.3f' % (metric, metrics.recall_score(y_true, y_pred, average=metric)))
    #print('Precision (%s):\t%0.3f' % (metric,metrics.precision_score(y_true, y_pred, average=metric)))
    #print('Accuracy:\t\t%0.3f' % metrics.accuracy_score(y_true, y_pred))
    print('\nEuclidean distance:\t%0.3f' % np.mean(L2_score))

    # np.linalg.norm(x-y)


In [43]:
#lr = LinearRegression()

lrs = fit_clf(X_train, y_train)
y_pred = predict_clf(lrs, X_test, y_test) #'weighted'


Euclidean distance:	1.144


In [47]:
print(y_test[0])
print(y_pred[0])

[-0.67395527  1.03842558 -0.46609802 -0.26750833 -0.08744055 -0.55251648
  0.06810237  0.31983177 -0.2124693   0.25121132  0.09961122  0.02914081]
[-1.52776019 -0.41384362 -0.43839239  0.56508074 -0.23214492 -0.48060697
 -0.27259143  0.04865441 -0.20263612  0.11753622 -0.29548488 -0.14340339]


In [65]:
from sklearn import linear_model
from sklearn import svm

In [67]:
svc = linear_model.SGDRegressor()

svcs = fit_clf(svc, X_train, y_train)
y_pred = predict_clf(svcs, X_test, y_test) #'weighted'


Euclidean distance:	557722936138315.750
