In [19]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import re
from collections import Counter
import numpy as np

# Устройство (CPU или GPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Гиперпараметры
BATCH_SIZE = 64
EMBEDDING_DIM = 128
HIDDEN_SIZE = 256
NUM_EPOCHS = 20
LEARNING_RATE = 0.001

# Функция для чтения ru.dic
def read_ru_dic(file_path):
    word_to_phonemes = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            match = re.match(r'^(.+?)\s+(.+)$', line)
            if match:
                word_with_number = match.group(1)
                transcription = match.group(2)
                word_to_phonemes[word_with_number] = transcription.split()
            else:
                print(f"Строка пропущена: {line}")
    return word_to_phonemes

# Функция для чтения ru_emphasize.dict
def read_ru_emphasize_dict(file_path):
    word_to_stressed_word = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split('|')
            if len(parts) == 2:
                word_with_number = parts[0]
                stressed_word = parts[1]
                word_to_stressed_word[word_with_number] = stressed_word
            else:
                print(f"Строка пропущена: {line}")
    return word_to_stressed_word

# Считываем данные из файлов
word_to_phonemes = read_ru_dic('ru.dic')
word_to_stressed_word = read_ru_emphasize_dict('ru_emphasize.dict')

# Создаем датасет
data = []
for word_with_number, phoneme_seq in word_to_phonemes.items():
    # Получаем слово с ударением (если есть)
    input_word = word_to_stressed_word.get(word_with_number, word_with_number)
    data.append((input_word, phoneme_seq))
    
# Учет буквы 'ё' как отдельного символа, всегда ударного
for idx, (input_word, phoneme_seq) in enumerate(data):
    # Если в слове есть 'ё', добавляем '+' перед ней
    if 'ё' in input_word:
        input_word = input_word.replace('ё', '+ё')
        data[idx] = (input_word, phoneme_seq)
        
# Создание словаря символов (включая '+')
char_counter = Counter()
for input_word, _ in data:
    char_counter.update(list(input_word))

char_vocab = {char: idx+1 for idx, (char, _) in enumerate(char_counter.most_common())}
char_vocab['<PAD>'] = 0
char_vocab['<SOS>'] = len(char_vocab)
char_vocab['<EOS>'] = len(char_vocab)
char_vocab['<UNK>'] = len(char_vocab)

idx2char = {idx: char for char, idx in char_vocab.items()}

# Создание словаря фонем
phoneme_counter = Counter()
for _, phoneme_seq in data:
    phoneme_counter.update(phoneme_seq)

phoneme_vocab = {ph: idx+1 for idx, (ph, _) in enumerate(phoneme_counter.most_common())}
phoneme_vocab['<PAD>'] = 0
phoneme_vocab['<SOS>'] = len(phoneme_vocab)
phoneme_vocab['<EOS>'] = len(phoneme_vocab)
phoneme_vocab['<UNK>'] = len(phoneme_vocab)

idx2phoneme = {idx: ph for ph, idx in phoneme_vocab.items()}

# Функции для кодирования
def encode_word(word):
    return [char_vocab.get(c, char_vocab['<UNK>']) for c in word]

def encode_phonemes(phoneme_seq):
    return [phoneme_vocab.get(ph, phoneme_vocab['<UNK>']) for ph in phoneme_seq]

# Кодируем данные
encoded_words = [encode_word(input_word) for input_word, _ in data]
encoded_phonemes = [encode_phonemes(phoneme_seq) for _, phoneme_seq in data]


In [20]:
class G2PDataset(Dataset):
    def __init__(self, inputs, targets):
        self.inputs = inputs
        self.targets = targets

        self.input_lengths = [len(seq) for seq in inputs]
        self.target_lengths = [len(seq) for seq in targets]

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        return (self.inputs[idx], self.targets[idx],
                self.input_lengths[idx], self.target_lengths[idx])

def collate_fn(batch):
    inputs, targets, input_lengths, target_lengths = zip(*batch)

    max_input_len = max(input_lengths)
    max_target_len = max(target_lengths)

    padded_inputs = [seq + [char_vocab['<PAD>']] * (max_input_len - len(seq)) for seq in inputs]
    padded_targets = [[phoneme_vocab['<SOS>']] + seq + [phoneme_vocab['<EOS>']] + [phoneme_vocab['<PAD>']] * (max_target_len - len(seq)) for seq in targets]

    return (torch.tensor(padded_inputs, dtype=torch.long),
            torch.tensor(padded_targets, dtype=torch.long),
            torch.tensor(input_lengths, dtype=torch.long),
            torch.tensor([len(seq)+2 for seq in targets], dtype=torch.long))  # +2 для <SOS> и <EOS>

dataset = G2PDataset(encoded_words, encoded_phonemes)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)


In [21]:
class Encoder(nn.Module):
    def __init__(self, input_size, embedding_dim, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, embedding_dim, padding_idx=char_vocab['<PAD>'])
        self.lstm = nn.LSTM(embedding_dim, hidden_size, batch_first=True)

    def forward(self, input_seqs, input_lengths):
        embedded = self.embedding(input_seqs)
        packed = nn.utils.rnn.pack_padded_sequence(embedded, input_lengths.cpu(), batch_first=True, enforce_sorted=False)
        outputs, (hidden, cell) = self.lstm(packed)
        outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs, batch_first=True)
        return outputs, (hidden, cell)

class Decoder(nn.Module):
    def __init__(self, output_size, embedding_dim, hidden_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(output_size, embedding_dim, padding_idx=phoneme_vocab['<PAD>'])
        self.lstm = nn.LSTM(embedding_dim, hidden_size, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, input_seqs, hidden):
        embedded = self.embedding(input_seqs)
        output, hidden = self.lstm(embedded, hidden)
        output = self.out(output)
        return output, hidden


In [22]:
# Функция для сохранения чекпойнта
def save_checkpoint(encoder, decoder, encoder_optimizer, decoder_optimizer, epoch, loss, filepath):
    checkpoint = {
        'encoder_state_dict': encoder.state_dict(),
        'decoder_state_dict': decoder.state_dict(),
        'encoder_optimizer_state_dict': encoder_optimizer.state_dict(),
        'decoder_optimizer_state_dict': decoder_optimizer.state_dict(),
        'epoch': epoch,
        'loss': loss
    }
    torch.save(checkpoint, filepath)
    print(f"Модель сохранена в {filepath}")

# Функция для загрузки чекпойнта
def load_checkpoint(encoder, decoder, encoder_optimizer, decoder_optimizer, filepath):
    checkpoint = torch.load(filepath, map_location=device)
    encoder.load_state_dict(checkpoint['encoder_state_dict'])
    decoder.load_state_dict(checkpoint['decoder_state_dict'])
    encoder_optimizer.load_state_dict(checkpoint['encoder_optimizer_state_dict'])
    decoder_optimizer.load_state_dict(checkpoint['decoder_optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']
    print(f"Модель загружена из {filepath}, Эпоха: {epoch}, Ошибка: {loss:.4f}")
    return epoch, loss


In [23]:
encoder = Encoder(len(char_vocab), EMBEDDING_DIM, HIDDEN_SIZE).to(device)
decoder = Decoder(len(phoneme_vocab), EMBEDDING_DIM, HIDDEN_SIZE).to(device)

criterion = nn.CrossEntropyLoss(ignore_index=phoneme_vocab['<PAD>'])
encoder_optimizer = optim.Adam(encoder.parameters(), lr=LEARNING_RATE)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=LEARNING_RATE)


In [51]:
for epoch in range(NUM_EPOCHS):
    encoder.train()
    decoder.train()

    total_loss = 0
    for batch in dataloader:
        input_seqs, target_seqs, input_lengths, target_lengths = batch
        input_seqs = input_seqs.to(device)
        target_seqs = target_seqs.to(device)

        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        encoder_outputs, encoder_hidden = encoder(input_seqs, input_lengths)

        # Подготовка входов для декодера
        decoder_input = target_seqs[:, :-1]  # Убираем <EOS>
        decoder_target = target_seqs[:, 1:]  # Убираем <SOS>

        # Обучение с принудительным учителем (teacher forcing)
        decoder_outputs, _ = decoder(decoder_input, encoder_hidden)

        # Вычисление ошибки
        loss = criterion(decoder_outputs.reshape(-1, decoder_outputs.size(-1)),
                         decoder_target.reshape(-1))

        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    print(f'Эпоха [{epoch+1}/{NUM_EPOCHS}], Ошибка: {avg_loss:.4f}')


Эпоха [1/20], Ошибка: 0.0166
Эпоха [2/20], Ошибка: 0.0166
Эпоха [3/20], Ошибка: 0.0166
Эпоха [4/20], Ошибка: 0.0164
Эпоха [5/20], Ошибка: 0.0165
Эпоха [6/20], Ошибка: 0.0165
Эпоха [7/20], Ошибка: 0.0164
Эпоха [8/20], Ошибка: 0.0165
Эпоха [9/20], Ошибка: 0.0164
Эпоха [10/20], Ошибка: 0.0162
Эпоха [11/20], Ошибка: 0.0164
Эпоха [12/20], Ошибка: 0.0160
Эпоха [13/20], Ошибка: 0.0162
Эпоха [14/20], Ошибка: 0.0161
Эпоха [15/20], Ошибка: 0.0160
Эпоха [16/20], Ошибка: 0.0161
Эпоха [17/20], Ошибка: 0.0162
Эпоха [18/20], Ошибка: 0.0161
Эпоха [19/20], Ошибка: 0.0162
Эпоха [20/20], Ошибка: 0.0160


In [52]:
# После завершения эпохи обучения
save_checkpoint(encoder, decoder, encoder_optimizer, decoder_optimizer, epoch+1, avg_loss, 'g2p_v2_e60')


Модель сохранена в g2p_v2_e60


In [65]:
load_checkpoint(encoder, decoder, encoder_optimizer, decoder_optimizer, 'g2p_v2_e40')

Модель загружена из g2p_v2_e40, Эпоха: 20, Ошибка: 0.0166


(20, 0.01655644970238751)

In [69]:
def predict(word):
    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        input_seq = torch.tensor([encode_word(word)], dtype=torch.long).to(device)
        input_length = torch.tensor([len(input_seq[0])], dtype=torch.long).to(device)

        encoder_outputs, encoder_hidden = encoder(input_seq, input_length)

        decoder_input = torch.tensor([[phoneme_vocab['<SOS>']]], dtype=torch.long).to(device)
        decoder_hidden = encoder_hidden

        decoded_phonemes = []

        for _ in range(50):  # Максимальная длина последовательности фонем
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            topv, topi = decoder_output.topk(1)
            phoneme_idx = topi.item()
            if phoneme_idx == phoneme_vocab['<EOS>'] or phoneme_idx == phoneme_vocab['<PAD>']:
                break
            else:
                decoded_phonemes.append(idx2phoneme[phoneme_idx])

            decoder_input = torch.tensor([[phoneme_idx]], dtype=torch.long).to(device)

        return ' '.join(decoded_phonemes)

# Пример использования
test_words = ['хуй', 'пизда', 'п+езда', 'пизд+а', 'джигурда', 'джигурд+а', 'говножуй', 'говн+о','уёбок', 'уебище','уебан', 'уёбище']
for test_word in test_words:
    predicted_phonemes = predict(test_word)
    print(f'Слово: {test_word}, Предсказанные фонемы: {predicted_phonemes}')


Слово: хуй, Предсказанные фонемы: h u0 j
Слово: пизда, Предсказанные фонемы: pj i0 z d a0
Слово: п+езда, Предсказанные фонемы: pj e1 z d a0
Слово: пизд+а, Предсказанные фонемы: pj i0 z d a0
Слово: джигурда, Предсказанные фонемы: d zh i1 g u0 r d a0
Слово: джигурд+а, Предсказанные фонемы: d zh i1 g u0 r d a0
Слово: говножуй, Предсказанные фонемы: g o0 v n o0 zh u1 j
Слово: говн+о, Предсказанные фонемы: g o1 v n o0
Слово: уёбок, Предсказанные фонемы: u0 j o1 b o0 k
Слово: уебище, Предсказанные фонемы: u0 j e1 bj i0 sch e0
Слово: уебан, Предсказанные фонемы: u0 j e1 b a0 n
Слово: уёбище, Предсказанные фонемы: u0 j o1 bj i0 sch e0


## форматируем файл метадаты с применением этой g2p модели

In [178]:
import re

def process_line(line, predict_function, special_space="<space>"):
    """
    Обрабатывает одну строку из metadata.csv и преобразует текст в фонемную транскрипцию,
    сохраняя пунктуацию и заменяя пробелы между словами на специальный символ.

    Args:
        line (str): Строка из metadata.csv в формате 'filename|text'.
        predict_function (function): Функция для преобразования слова в фонемы, например, predict(word).
        special_space (str): Специальный символ для замены пробелов между словами.

    Returns:
        str: Новая строка в формате 'filename|phoneme1 phoneme2 <\space> phoneme3 ...'.
    """
    # Удаляем пробельные символы в начале и конце строки
    line = line.strip()

    # Проверяем наличие вертикальной черты '|'
    if '|' not in line:
        print(f"Строка пропущена: нет символа '|' в строке: {line}")
        return None

    # Разделяем строку на имя файла и текст
    filename, text = line.split('|', 1)

    # Регулярное выражение для разделения слов (включая '+') и пунктуации
    # Это выражение захватывает слова, которые могут содержать '+', а также отдельные знаки пунктуации
    tokens = re.findall(r'\w+\+\w+|\w+|[.,;:!?«»“”…()\[\]{}\-]', text)

    processed_tokens = []
    for token in tokens:
        if re.match(r'[.,;:!?«»“”…()\[\]{}\-]', token):
            # Если токен - пунктуация, добавляем его как есть
            processed_tokens.append(token)
        else:
            # Иначе, токен - слово (может содержать '+')
            word_lower = token.lower()
            try:
                # Преобразуем слово в фонемную транскрипцию
                phoneme = predict_function(word_lower)
                if phoneme:
                    processed_tokens.append(phoneme)
                else:
                    print(f"Пустая транскрипция для слова '{word_lower}'. Используем '<UNK>'.")
                    processed_tokens.append('<UNK>')  # Используем <UNK> для неизвестных фонем
            except Exception as e:
                print(f"Ошибка при предсказании слова '{word_lower}': {e}. Используем '<UNK>'.")
                processed_tokens.append('<UNK>')

    # Теперь собираем новую строку с правильными разделителями
    final_tokens = []
    for i, token in enumerate(processed_tokens):
        final_tokens.append(token)
        if i < len(processed_tokens) - 1:
            next_token = processed_tokens[i + 1]
            # Определяем тип текущего и следующего токена
            current_is_punct = re.match(r'[.,;:!?«»“”…()\[\]{}\-]', token)
            next_is_punct = re.match(r'[.,;:!?«»“”…()\[\]{}\-]', next_token)

            if not current_is_punct and not next_is_punct:
                # Оба токена - слова, вставляем специальный символ с пробелами
                final_tokens.append(f" {special_space} ")
            elif not current_is_punct and next_is_punct:
                # Текущий токен - слово, следующий - пунктуация, вставляем пробел
                final_tokens.append(' ')
            elif current_is_punct and not next_is_punct:
                # Текущий токен - пунктуация, следующий - слово, вставляем пробел
                final_tokens.append(' ')
            elif current_is_punct and next_is_punct:
                # Оба токена - пунктуация, вставляем пробел
                final_tokens.append(' ')
            else:
                # В остальных случаях, вставляем пробел
                final_tokens.append(' ')

    # Объединяем токены в строку
    phoneme_text = ''.join(final_tokens)

    # Заменяем множественные пробелы на один
    phoneme_text = re.sub(r'\s+', ' ', phoneme_text)

    # Формируем новую строку
    new_line = f"{filename}|{phoneme_text}\n"

    return new_line


In [179]:
# Пример строки из metadata.csv
test_line = "000012_RUSLAN|Ведь представл+ения, которые он+о рождает, безграничны д+о нул+я."

# Обработка строки
processed_line = process_line(test_line, predict)

if processed_line:
    print("Исходная строка:")
    print(test_line)
    print("\nОбработанная строка:")
    print(processed_line)

Исходная строка:
000012_RUSLAN|Ведь представл+ения, которые он+о рождает, безграничны д+о нул+я.

Обработанная строка:
000012_RUSLAN|vj e1 dj <space> p rj e0 d s t a0 v lj e1 nj i0 j a0 , k o0 t o1 r y0 j e0 <space> o0 n o1 <space> r o0 zh d a1 j e0 t , bj e0 z g r a0 nj i1 ch n y0 <space> d o1 <space> n u0 lj a1 .



In [180]:
def convert_metadata_to_phonemes(input_metadata_path, output_metadata_phonemes_path, predict_function, special_space="<\\space>"):
    """
    Преобразует metadata.csv в metadata_phonemes.csv с фонемными транскрипциями.
    
    Args:
        input_metadata_path (str): Путь к оригинальному metadata.csv
        output_metadata_phonemes_path (str): Путь для сохранения фонемного metadata
        predict_function (function): Функция для преобразования слова в фонемы, например, predict(word)
        special_space (str): Специальный символ для замены пробелов между словами.
    """
    with open(input_metadata_path, 'r', encoding='utf-8') as infile, \
         open(output_metadata_phonemes_path, 'w', encoding='utf-8') as outfile:
        
        for line_num, line in enumerate(infile, 1):
            processed_line = process_line(line, predict_function, special_space)
            if processed_line:
                outfile.write(processed_line)
            else:
                print(f"Строка {line_num} пропущена.")
    
    print(f"Фонемный файл сохранен по пути: {output_metadata_phonemes_path}")


In [91]:
# Пути к файлам
input_metadata_path = '/app/TTSproject/tacotron/datasets/metadata.csv'
output_metadata_phonemes_path = '/app/TTSproject/tacotron/datasets/metadata_phonemes.csv'

# Вызов функции преобразования
convert_metadata_to_phonemes(input_metadata_path, output_metadata_phonemes_path, predict)

Фонемный файл сохранен по пути: /app/TTSproject/tacotron/datasets/metadata_phonemes.csv


In [184]:
def replace_punctuation(file_path):
    """
    Заменяет специфические символы в файле metadata.csv.
    
    Замены:
        « » “ ” -> "
        … -> ...
    
    Args:
        file_path (str): Путь к файлу metadata.csv
    """
    replacements = {
        '«': '"',
        '»': '"',
        '“': '"',
        '”': '"',
        #'…': '...',
    }
    
    # Чтение содержимого файла
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()
    
    # Выполнение замен
    for old, new in replacements.items():
        content = content.replace(old, new)
    
    # Запись изменённого содержимого обратно в файл
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(content)

# Пример использования:
replace_punctuation('/app/TTSproject/tacotron/datasets/metadata_phonemes.csv')


In [11]:
def process_text(text, predict_function, special_space="<space>"):
    # Регулярное выражение для разделения слов (включая '+') и пунктуации
    # Это выражение захватывает слова, которые могут содержать '+', а также отдельные знаки пунктуации
    tokens = re.findall(r'\w+\+\w+|\w+|[.,;:!?«»“”…()\[\]{}\-]', text)

    processed_tokens = []
    for token in tokens:
        if re.match(r'[.,;:!?«»“”…()\[\]{}\-]', token):
            # Если токен - пунктуация, добавляем его как есть
            processed_tokens.append(token)
        else:
            # Иначе, токен - слово (может содержать '+')
            word_lower = token.lower()
            try:
                # Преобразуем слово в фонемную транскрипцию
                phoneme = predict_function(word_lower)
                if phoneme:
                    processed_tokens.append(phoneme)
                else:
                    print(f"Пустая транскрипция для слова '{word_lower}'. Используем '<UNK>'.")
                    processed_tokens.append('<UNK>')  # Используем <UNK> для неизвестных фонем
            except Exception as e:
                print(f"Ошибка при предсказании слова '{word_lower}': {e}. Используем '<UNK>'.")
                processed_tokens.append('<UNK>')

    # Теперь собираем новую строку с правильными разделителями
    final_tokens = []
    for i, token in enumerate(processed_tokens):
        final_tokens.append(token)
        if i < len(processed_tokens) - 1:
            next_token = processed_tokens[i + 1]
            # Определяем тип текущего и следующего токена
            current_is_punct = re.match(r'[.,;:!?«»“”…()\[\]{}\-]', token)
            next_is_punct = re.match(r'[.,;:!?«»“”…()\[\]{}\-]', next_token)

            if not current_is_punct and not next_is_punct:
                # Оба токена - слова, вставляем специальный символ с пробелами
                final_tokens.append(f" {special_space} ")
            elif not current_is_punct and next_is_punct:
                # Текущий токен - слово, следующий - пунктуация, вставляем пробел
                final_tokens.append(' ')
            elif current_is_punct and not next_is_punct:
                # Текущий токен - пунктуация, следующий - слово, вставляем пробел
                final_tokens.append(' ')
            elif current_is_punct and next_is_punct:
                # Оба токена - пунктуация, вставляем пробел
                final_tokens.append(' ')
            else:
                # В остальных случаях, вставляем пробел
                final_tokens.append(' ')

    # Объединяем токены в строку
    phoneme_text = ''.join(final_tokens)

    # Заменяем множественные пробелы на один
    phoneme_text = re.sub(r'\s+', ' ', phoneme_text)

    # Формируем новую строку
    new_line = f"{phoneme_text}\n"

    return new_line

In [57]:
# Пример строки из metadata.csv
test_text = "Удивительно, почему +именно этот чекп+ойнт ещё умеет ставить плюсики."

# Обработка строки
processed_text = process_text(test_text, predict)

if processed_txt:
    print("Исходная строка:")
    print(test_text)
    print("\nОбработанная строка:")
    print(processed_text)

Исходная строка:
Удивительно, почему +именно этот чекп+ойнт ещё умеет ставить плюсики.

Обработанная строка:
u0 dj i0 vj i1 tj e0 lj n o0 , p o0 ch e0 m u1 <space> i0 mj e0 n n o1 <space> e1 t o0 t <space> ch e0 k p o1 j n t <space> j e0 sch o1 <space> u0 mj e1 j e0 t <space> s t a1 vj i0 tj <space> p lj u1 sj i0 kj i0 .


In [19]:

def process_text(text, predict_function, special_space="<space>"):
    # Регулярное выражение для разделения слов (включая '+') и пунктуации
    # Это выражение захватывает слова, которые могут содержать '+', а также отдельные знаки пунктуации
    tokens = re.findall(r'\w+\+\w+|\w+|[.,;:!?«»“”…()\[\]{}\-]', text)

    processed_tokens = []
    for token in tokens:
        if re.match(r'[.,;:!?«»“”…()\[\]{}\-]', token):
            # Если токен - пунктуация, добавляем его как есть
            processed_tokens.append(token)
        else:
            # Иначе, токен - слово (может содержать '+')
            word_lower = token.lower()
            try:
                # Преобразуем слово в фонемную транскрипцию
                phoneme = predict_function(word_lower)
                if phoneme:
                    processed_tokens.append(phoneme)
                else:
                    print(f"Пустая транскрипция для слова '{word_lower}'. Используем '<UNK>'.")
                    processed_tokens.append('<UNK>')  # Используем <UNK> для неизвестных фонем
            except Exception as e:
                print(f"Ошибка при предсказании слова '{word_lower}': {e}. Используем '<UNK>'.")
                processed_tokens.append('<UNK>')

    # Теперь собираем новую строку с правильными разделителями
    final_tokens = []
    for i, token in enumerate(processed_tokens):
        final_tokens.append(token)
        if i < len(processed_tokens) - 1:
            next_token = processed_tokens[i + 1]
            # Определяем тип текущего и следующего токена
            current_is_punct = re.match(r'[.,;:!?«»“”…()\[\]{}\-]', token)
            next_is_punct = re.match(r'[.,;:!?«»“”…()\[\]{}\-]', next_token)

            if not current_is_punct and not next_is_punct:
                # Оба токена - слова, вставляем специальный символ с пробелами
                final_tokens.append(f" {special_space} ")
            elif not current_is_punct and next_is_punct:
                # Текущий токен - слово, следующий - пунктуация, вставляем пробел
                final_tokens.append(' ')
            elif current_is_punct and not next_is_punct:
                # Текущий токен - пунктуация, следующий - слово, вставляем пробел
                final_tokens.append(' ')
            elif current_is_punct and next_is_punct:
                # Оба токена - пунктуация, вставляем пробел
                final_tokens.append(' ')
            else:
                # В остальных случаях, вставляем пробел
                final_tokens.append(' ')

    # Объединяем токены в строку
    phoneme_text = ''.join(final_tokens)

    # Заменяем множественные пробелы на один
    phoneme_text = re.sub(r'\s+', ' ', phoneme_text)

    # Формируем новую строку
    new_line = f"{phoneme_text}"

    return new_line

In [32]:
process_text('ахуеть, эта функция реально работает изнутр+и т+екущего файла!', predict)

'a1 h u0 j e0 tj , e1 t a0 <space> f u1 n k c i0 j a0 <space> rj e0 a1 lj n o0 <space> r a0 b o1 t a0 j e0 t <space> i0 z n u0 t rj i1 <space> tj e1 k u0 sch e0 g o0 <space> f a1 j l a0 !'

## тесты обновления датасета для такотрона на фонемный

In [188]:
import csv
import sys

# #### changed version for ru text

_pad = '_'
_punctuation = ' -,.!?\'\"():;'
_special = '+'
_letters = 'абвгдеёжзийклмнопрстуфхцчшщъыьэюя'  # АБВГДЕЁЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯ

# Все предопределенные символы в одном списке для сортировки
predefined_order = [_pad] + list(_punctuation) + list(_special)  # + list(_letters)

def get_unique_symbols_from_metadata(file_path, predefined_order):
    """
    Считывает файл metadata.csv и возвращает список уникальных токенов (фонем, знаков препинания, специальных символов).
    
    Args:
        file_path (str): Путь к файлу metadata.csv
        predefined_order (list): Список предопределённых символов
    
    Returns:
        list: Список уникальных символов, включающий предопределённые и дополнительные
    """
    unique_symbols = set()
    
    # Увеличиваем максимальный размер поля для CSV
    maxInt = sys.maxsize
    while True:
        try:
            csv.field_size_limit(maxInt)
            break
        except OverflowError:
            maxInt = int(maxInt / 10)
    
    with open(file_path, mode='r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='|')
        for row in reader:
            if len(row) > 1:
                phoneme_text = row[1].strip()
                tokens = phoneme_text.split()  # Разделение по пробелам
                for token in tokens:
                    # Очистка токена от лишних символов
                    token_clean = token.strip().replace('\n', '').replace('\r', '')
                    if token_clean:
                        unique_symbols.add(token_clean)
            else:
                print(f"Строка пропущена: {row}")
    
    # Добавляем предопределённые символы
    sorted_symbols = predefined_order.copy()
    
    # Добавляем новые уникальные символы, отсортированные
    extra_symbols = sorted([s for s in unique_symbols if s not in predefined_order])
    
    return sorted_symbols + extra_symbols

# Пример использования:
file_path = '/app/TTSproject/tacotron/datasets/metadata.csv'
unique_symbols = get_unique_symbols_from_metadata(file_path, predefined_order)
symbols = unique_symbols

# Создание словарей
_symbol_to_id = {s: i for i, s in enumerate(symbols)}
_id_to_symbol = {i: s for i, s in enumerate(symbols)}


In [189]:
len(symbols)

68

In [191]:
def get_ru_text(text):
    # Конвертируем текст в список ID
    #sequence = [_symbol_to_id[s] for s in text if s in _symbol_to_id]
    sequence = [_symbol_to_id[s.lower()] for s in text if s.lower() in _symbol_to_id]
    
    # Преобразуем список ID в тензор
    sequence_tensor = torch.tensor(sequence, dtype=torch.int64)
    
    return sequence_tensor

In [196]:
get_ru_phonemes('r a0 z v lj e0 ch e1 nj i0 j e0 ')

tensor([46, 15, 60, 56, 37, 23, 20, 24, 41, 31, 33, 23])

In [198]:
tensor_to_ru_phonemes(get_ru_phonemes('r a0 z v lj e0 ch e1 nj i0 j e0 '))

'r a0 z v lj e0 ch e1 nj i0 j e0'

In [193]:
def get_ru_phonemes(text):
    """
    Конвертирует текст с фонемами в тензор ID.
    
    Args:
        text (str): Строка с фонемами, разделёнными пробелами.
    
    Returns:
        torch.Tensor: Тензор с ID фонем.
    """
    # Разбиваем текст на токены (фонемы, знаки препинания, специальные символы)
    tokens = text.split()
    
    # Преобразуем каждый токен в соответствующий ID, используя словарь _symbol_to_id
    sequence = [_symbol_to_id[token.lower()] for token in tokens if token.lower() in _symbol_to_id]
    
    # Преобразуем список ID в тензор
    sequence_tensor = torch.tensor(sequence, dtype=torch.int64)
    
    return sequence_tensor

def tensor_to_ru_phonemes(tensor):
    """
    Конвертирует тензор ID фонем обратно в текст.
    
    Args:
        tensor (torch.Tensor): Тензор с ID фонем.
    
    Returns:
        str: Строка с фонемами, разделёнными пробелами.
    """
    # Преобразуем тензор в список ID
    ids = tensor.tolist()
    
    # Преобразуем каждый ID обратно в соответствующий токен, используя словарь _id_to_symbol
    tokens = [ _id_to_symbol.get(id, '<UNK>') for id in ids ]
    
    # Объединяем токены в строку с пробелами между ними
    text = ' '.join(tokens)
    
    return text

In [41]:
# g2p_infer.py

import torch
import re
from g2p import * # Убедитесь, что пути импорта корректны
from collections import Counter

# Устройство (CPU или GPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Загрузка словарей
_symbol_to_id = {
    # Пример, замените на ваш фактический словарь
    'su.et.noe': 1,
    '<\\space>': 2,
    'chu.vstvo1': 3,
    'tre.vo.zhit': 4,
    'me.nia1': 5,
    '.': 6,
    '<UNK>': 7,
    '<SOS>': 8,
    '<EOS>': 9,
    '_': 0
}

_id_to_symbol = {v: k for k, v in _symbol_to_id.items()}

# Функции для кодирования и декодирования
def encode_phonemes(phoneme_seq):
    return [ _symbol_to_id.get(ph, _symbol_to_id['<UNK>']) for ph in phoneme_seq.split() ]

def decode_phonemes(tensor):
    return ' '.join([ _id_to_symbol.get(id, '<UNK>') for id in tensor.tolist() ])

# Загрузка модели
def load_g2p_model(encoder_path, decoder_path):
    # Инициализация моделей
    encoder = Encoder(input_size=len(_symbol_to_id), embedding_dim=128, hidden_size=256).to(device)
    decoder = Decoder(output_size=len(_symbol_to_id), embedding_dim=128, hidden_size=256).to(device)
    
    # Загрузка весов
    encoder.load_state_dict(torch.load(encoder_path, map_location=device))
    decoder.load_state_dict(torch.load(decoder_path, map_location=device))
    
    encoder.eval()
    decoder.eval()
    
    return encoder, decoder

# Функция инференса
def predict_word(word, encoder, decoder):
    """
    Преобразует слово в фонемную транскрипцию с помощью модели G2P.
    
    Args:
        word (str): Входное слово.
        encoder (nn.Module): Загруженный энкодер модели.
        decoder (nn.Module): Загруженный декодер модели.
    
    Returns:
        str: Предсказанная фонемная транскрипция.
    """
    # Очистка и подготовка слова
    word = word.lower()
    
    # Кодирование слова
    input_seq = encode_phonemes(word)
    input_tensor = torch.tensor([input_seq], dtype=torch.long).to(device)
    input_length = torch.tensor([len(input_seq)], dtype=torch.long).to(device)
    
    with torch.no_grad():
        encoder_outputs, encoder_hidden = encoder(input_tensor, input_length)
        decoder_input = torch.tensor([[ _symbol_to_id['<SOS>'] ]], dtype=torch.long).to(device)
        decoder_hidden = encoder_hidden
        
        decoded_phonemes = []
        
        for _ in range(50):  # Максимальная длина последовательности фонем
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            topv, topi = decoder_output.topk(1)
            phoneme_idx = topi.item()
            phoneme = _id_to_symbol.get(phoneme_idx, '<UNK>')
            decoded_phonemes.append(phoneme)
                
            decoder_input = torch.tensor([[phoneme_idx]], dtype=torch.long).to(device)
    
    return ' '.join(decoded_phonemes)

# Пример использования
if __name__ == "__main__":
    load_checkpoint(encoder, decoder, encoder_optimizer, decoder_optimizer, 'g2p_v2_e40')
    
    # Пример слова
    test_word = "ч+увство"
    
    # Инференс
    predicted_phonemes = predict_word(test_word, encoder, decoder)
    print(f"Фонемная транскрипция для '{test_word}': {predicted_phonemes}")


Модель загружена из g2p_v2_e40, Эпоха: 20, Ошибка: 0.0166
Фонемная транскрипция для 'ч+увство': <\space> <UNK> <UNK> <UNK> <UNK> <UNK> <UNK> <UNK> <\space> me.nia1 <UNK> <UNK> <\space> <UNK> <UNK> <UNK> <\space> <UNK> <UNK> <UNK> <UNK> <UNK> <\space> <UNK> <UNK> <UNK> <UNK> <UNK> <UNK> me.nia1 <UNK> <UNK> <UNK> <\space> <UNK> <UNK> <UNK> <UNK> <UNK> me.nia1 <\space> <UNK> <UNK> <SOS> <UNK> <UNK> <\space> <UNK> <UNK> <UNK>


In [8]:
predicted_phonemes

NameError: name 'predicted_phonemes' is not defined