# Акустические модели

Акустическая модель - это часть системы автоматического распознавания речи, которая используется для преобразования аудиосигнала речи в последовательность фонем или других единиц речевого звука. Акустическая модель обучается на большом наборе речевых данных, чтобы определить, какие звуки соответствуют конкретным акустическим признакам в аудиосигнале. Эта модель может использоваться вместе с другими компонентами, такими как языковая модель и модель декодирования, чтобы достичь более точного распознавания речи.

В данной работе мы сконцентрируемся на обучении нейросетевых акустических моделей с помощью библиотек torch и torchaudio. Для экспериментов будем использовать базу [TIMIT](https://catalog.ldc.upenn.edu/LDC93s1)

In [None]:
import numpy as np
import time
import torch
import os
from typing import List, Dict, Union, Set, Any
from torch import nn
from tqdm.auto import tqdm
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict, Counter
from pathlib import Path
import pandas as pd
import soundfile as sf
import torchaudio
import warnings

import matplotlib.pyplot as plt
%matplotlib inline
from IPython import display


# Загрузка датасета TIMIT

Официальная страница датасета TIMIT 

Для простоты загрузки данных удобнее всего пользоваться копией датасета, выложенной на kaggle 

https://www.kaggle.com/datasets/mfekadu/darpa-timit-acousticphonetic-continuous-speech


In [None]:
#!pip install kaggle

In [None]:
# https://github.com/Kaggle/kaggle-api - Docs kaggle 
# Simplest way: go to https://www.kaggle.com/settings , "Create new token" and move it into "~/.kaggle"

#!kaggle datasets download -d mfekadu/darpa-timit-acousticphonetic-continuous-speech

In [None]:
#!unzip -o -q darpa-timit-acousticphonetic-continuous-speech.zip -d timit/

# 1. Подготовка данных для обучения

TIMIT является одной из самых широко используемых баз данных для изучения систем автоматического распознавания речи. База данных TIMIT содержит произнесения предложений различными дикторами. Каждое произнесение сопровождается его словной и фонетической разметкой.

Для обучения акустической модели нам в первую очередь интересна фонетическая разметка произнесений. Такая разметка сопоставляет фонемы, которые были произнесены диктором, с временными интервалами в записи. Такая разметка позволит нам обучить пофреймовый классификатор, который будет предсказывать сказанную фонему.

## 1.a. Загрузка базы с диска

In [None]:
class TimitDataset(Dataset):
    """Загрузка TIMIT данных с диска"""
    def __init__(self, data_path):
        self.data_path = data_path
        self.uri2wav = {}
        self.uri2text = {}
        self.uri2word_ali = {}
        self.uri2phone_ali = {}
        for d, _, fs in os.walk(data_path):
            for f in fs:
                full_path = f'{d}/{f}'
                if f.endswith('.WAV'):
                    # skip it. Use .wav instead
                    pass
                elif f.endswith('.wav'):
                    stem = Path(f[:-4]).stem # .WAV.wav
                    self.uri2wav[f'{d}/{stem}'] = full_path
                elif f.endswith('.TXT'):
                    stem = Path(f).stem
                    self.uri2text[f'{d}/{stem}'] = full_path
                elif f.endswith('.WRD'):
                    stem = Path(f).stem
                    self.uri2word_ali[f'{d}/{stem}'] = full_path
                elif f.endswith('.PHN'):
                    stem = Path(f).stem
                    self.uri2phone_ali[f'{d}/{stem}'] = full_path
                else:
                    warnings.warn(f"Unknown file type {full_path} . Skip it.")
        
        self.uris = list(sorted(set(self.uri2wav.keys()) \
                                & set(self.uri2text.keys()) \
                                & set(self.uri2word_ali.keys()) \
                                &  set(self.uri2phone_ali.keys())
                               ))
        print(f"Found {len(self.uris)} utterances in {self.data_path}. ", 
              f"{len(self.uri2wav)} wavs, ", 
              f"{len(self.uri2text)} texts, ",
              f"{len(self.uri2word_ali)} word alinments, ",
             f"{len(self.uri2phone_ali)} phone alignments")
    
    def get_uri(self, index_or_uri: Union[str, int]):
        if isinstance(index_or_uri, str):
            uri = index_or_uri
        else:
            uri = self.uris[index_or_uri]
        return uri
    
    
    def get_audio(self, index_or_uri: Union[str, int]):
        uri = self.get_uri(index_or_uri)
        wav_path = self.uri2wav[uri]
        wav_channels, sr = torchaudio.load(wav_path)
        return wav_channels[0], sr 
        
    def get_text(self, index_or_uri: Union[str, int]):
        """ Return (start_sample, stop_sample, text)"""
        uri = self.get_uri(index_or_uri)
        txt_path = self.uri2text[uri]
        with open(txt_path) as f:
            start, stop, text = f.read().strip().split(maxsplit=2)
            start, stop = int(start), int(stop)
            assert start == 0, f"{txt_path}"
        return start, stop, text
    
    def get_word_ali(self, index_or_uri):
        """ Return [(start_sample, stop_sample, word), ...]"""
        uri = self.get_uri(index_or_uri)
        wrd_path = self.uri2word_ali[uri]
        with open(wrd_path) as f:
            words = [(int(start), int(stop), word) for start, stop, word in map(str.split, f.readlines())]
        return words
    
    def get_phone_ali(self, index_or_uri):
        """ Return [(start_sample, stop_sample, phone), ...]"""
        uri = self.get_uri(index_or_uri)
        ph_path = self.uri2phone_ali[uri]
        with open(ph_path) as f:
            phonemes = [(int(start), int(stop), ph) for start, stop, ph in map(str.split, f.readlines())]
        return phonemes
    
    def __getitem__(self, index):
        return {"uri": self.get_uri(index),
                "audio": self.get_audio(index),
                "text": self.get_text(index),
                "word_ali": self.get_word_ali(index),
                "phone_ali": self.get_phone_ali(index)}       

    def __len__(self):
        # вернем количество доступных ури в выборке
        return len(self.uris)

    def total_audio_samples(self) -> int:
        # сумма длины всех аудио в сэмплах
        total = 0
        for uri in self.uris:
            wav, sr = self.get_audio(uri)
            total += wav.shape[0]
        return total

    def total_num_words(self) -> int:
        # сумма количества слов в словном выравнивании по всем ури
        total = 0
        for uri in self.uris:
            total += len(self.get_word_ali(uri))
        return total

    def total_num_phones(self) -> int:
        # сумма количества фонем во всех выравниваниях
        total = 0
        for uri in self.uris:
            total += len(self.get_phone_ali(uri))
        return total

    def get_vocab(self) -> Set[str]:
        # собрать множество уникальных слов из всех выравниваний
        vocab = set()
        for uri in self.uris:
            words = self.get_word_ali(uri)
            for _, _, w in words:
                vocab.add(w)
        return vocab

    def get_phones(self) -> Set[str]:
        # собрать множество уникальных фонем
        phones = set()
        for uri in self.uris:
            phs = self.get_phone_ali(uri)
            for _, _, p in phs:
                phones.add(p)
        return phones

    def phones_prior(self) -> Dict[str, float]:
        # априорные вероятности фонем по частотам появления
        counts = Counter()
        total = 0
        for uri in self.uris:
            phs = self.get_phone_ali(uri)
            counts.update(p for _, _, p in phs)
            total += len(phs)
        priors = {ph: cnt / total for ph, cnt in counts.items()}
        return priors

            

In [None]:

def test_timit_dataset_stats():
    test_ds = TimitDataset('timit/data/TEST/')

    print("Len")
    assert len(test_ds) == 1680, f"{len(test_ds)}"

    print("Audio")
    audio_len = test_ds.total_audio_samples()
    assert audio_len == 82986452, f"{audio_len}"

    print("Words")
    words_len = test_ds.total_num_words()
    assert words_len == 14553, f"{words_len}"

    print("Phones")
    phones_len = test_ds.total_num_phones()
    assert phones_len == 64145, f"{phones_len}"

    print("Vocab")
    vocab = test_ds.get_vocab()
    assert len(set(vocab)) == 2378, f"{len(set(vocab))}"

    print("Phones vocab")
    phones = test_ds.get_phones()
    assert len(set(phones)) == 61, f"{len(set(phones))}"
    
    print("Phones prior")
    priors = test_ds.phones_prior()
    assert np.isclose(sum(priors.values()), 1.0), f"sum(priors.values())"
    pmin, pmax = min(priors.keys(), key=priors.get), max(priors.keys(), key=priors.get)
    assert pmin == 'eng', pmin
    assert pmax == 'h#', pmax
    print("Test 1.a passed")
test_timit_dataset_stats()

In [None]:
test_ds = TimitDataset('timit/data/TEST/')
item = test_ds[5]
print(item['uri'])
print(item['text'][2])
display.display(display.Audio(item['audio'][0].numpy(), rate=item['audio'][1]))
print('---words---')
for start, stop, word in item['word_ali']:
    print(word)
    display.display(display.Audio(item['audio'][0][start:stop].numpy(), rate=item['audio'][1]))
    break

## 1.b. Экстрактор фич
Для того чтобы построить акустическую модель, первым делом надо извлечь признаки аудио сигнала. Для распознавания речь принято использовать fbank признаки. fbank/MelSpectrogram признаки получается из амплитудного спектра сигнала путем свертки спекта с треугольными фильтрами в мел-шкале. Есть множество реализаций данных признаков в различных библиотеках (kaldi, librosa, torchaudio) и все они имеют свои особенности. В данной работе мы будем использовать реализацию из библиотеки torchaudio. 

In [None]:
class FeatureExtractor(torch.nn.Module):
    def __init__(
        self,
        sample_rate=16000,
        n_fft=400,
        hop_length=160,
        n_mels=40,
        f_max=7600,
        spec_aug_max_fmask=80,
        spec_aug_max_tmask=80,
    ):
        super().__init__()
        self.sample_rate = sample_rate
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.n_mels = n_mels
        #TODO
        # инициализируйте обработчик fbank фич из torchaudio
        # self.mel_spec = ...
        self.mel_spec = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate,
            n_fft=n_fft,
            hop_length=hop_length,
            n_mels=n_mels,
            f_max=f_max
        )
        
    def samples2frames(self, num_samples: int) -> int:
        # TODO
        # Верните количество кадров в спектрограмме, соответствующей вавке длиной num_samples
        # mel_spec с center=True добавляет паддинг n_fft//2 с обеих сторон
        # формула для длины выходного спектра:
        # frames = floor((num_samples + 2 * pad - n_fft) / hop_length) + 1
        # где pad = n_fft // 2
        pad = self.n_fft // 2
        if num_samples + 2 * pad < self.n_fft:
            return 0
        return 1 + (num_samples + 2 * pad - self.n_fft) // self.hop_length
    
    @property
    def feats_dim(self):
        # TODO
        # Верните количество извлекаемых фич
        # размерность признаки = количество мел-банков
        return self.n_mels 
    
    def forward(self, waveform: torch.Tensor) -> torch.Tensor:
        mel = self.mel_spec(waveform)
        return mel

In [None]:
def test_samples2frames():
    fe = FeatureExtractor()
    for i in tqdm(range(15000, 40000)):
        wav = torch.zeros(i)
        feats = fe(wav)
        assert feats.shape[-2] == fe.feats_dim, f"{i} {feats.shape[-2]=}, {fe.feats_dim}"

        assert feats.shape[-1] == fe.samples2frames(i), f"{i} {feats.shape[-1]=}, {fe.samples2frames(i)}"
        
    print('Test 1.b passed')
test_samples2frames()

## 1.с. Таргеты и объединение данных в батчи 

Акустическая Модель (АМ) - пофреймовый классификатор, который предсказывает фонему для каждого кадра аудио. Для обучения AM будем использовать фонемное выравнивание. 

In [None]:
train_ds = TimitDataset('timit/data/TRAIN/')
print(train_ds[0])

# Строим мапинг из написания фонемы в ее id 
phones = train_ds.get_phones() 
phones.remove('pau')
phones.remove('epi')
phones.remove('h#')

# Фонемы паузы должны иметь индекс 0
PHONE2ID = {p:i for i, p in enumerate(['pau'] + list(sorted(phones)))}
PHONE2ID['epi'] = 0
PHONE2ID['h#'] = 0
print(PHONE2ID)

In [None]:
class FeatsPhoneDataset(TimitDataset):
    def __init__(self, data_path, feature_extractor: FeatureExtractor, phone2id):
        super().__init__(data_path)
        self.feature_extractor = feature_extractor
        self.phone2id = phone2id
    
    def __getitem__(self, index):
        orig_item = super().__getitem__(index)
        wav, sr = orig_item['audio']
        assert sr == self.feature_extractor.sample_rate, f"wrong sr for {index}"
        # подготавливаем пофреймовые фичи
        feats = self.feature_extractor(wav)
        feats = feats.squeeze(dim=0).transpose(0, 1) # time x feats

        # создаем пофреймовое выравнивание 
        targets = torch.zeros(feats.shape[0], dtype=torch.long)
        # TODO 
        # заполните пофреймовое фонемное выравнивание targets idшниками фонем
        # используйте phone_ali 
        phone_ali = orig_item['phone_ali']
        
        # Чтобы заполнить пофреймовое выравнивание, для каждого кадра определяем фонему
        # Фреймы получены с шагом hop_length, преобразуем sample индексы в индексы фреймов
        frames_per_second = self.feature_extractor.sample_rate / self.feature_extractor.hop_length

        # Пройдёмся по алигнменту фонемами
        # Для каждого (start_sample, stop_sample, ph), найдём соответствующие индексы фреймов и проставим targets
        for start_sample, stop_sample, ph in phone_ali:
            start_frame = int(start_sample / self.feature_extractor.hop_length)
            stop_frame = int(stop_sample / self.feature_extractor.hop_length)
            ph_id = self.phone2id.get(ph, 0)  # 0 может быть индекс padding или unknown
            # Присвоим этот id фонемы в маску targets для соответствующих кадров
            targets[start_frame:stop_frame] = ph_id
        
        # Возврат словаря с исходными данными и новыми признаками
        
        return {"uri": orig_item["uri"],
                "feats": feats,
                "targets": targets, 
                "src_key_padding_mask": torch.zeros(feats.shape[0], dtype=torch.bool)}
    
    def collate_pad(self, batch: List[Dict[str, Any]]) -> Dict[str, torch.Tensor]:
        """Функция объединения элементов в один батч"""
        # TODO 
        # Реализуйте функцию, которая объединяет несколько item'ов датасета в один батч
        # See collate_fn https://pytorch.org/docs/stable/data.html
        # Входные данные и маску надо вернуть таком формате, в каком работает с данными torch.nn.Transformer
        # targets надо склеить тензор с одной осью. Длина оси будет равна суммарному количеству кадров в батче
        # Максимальная длина последовательности среди всех айтемов
        max_len = max(item["feats"].shape[0] for item in batch)
        batch_size = len(batch)
        feat_dim = batch[0]["feats"].shape[1]
        
        # Инициализируем падденные тензоры для батча и маски паддинга
        feats = torch.zeros(max_len, batch_size, feat_dim)
        src_key_padding_mask = torch.ones(batch_size, max_len, dtype=torch.bool)  # True - padding
        
        targets_list = []
        
        for i, item in enumerate(batch):
            length = item["feats"].shape[0]
            feats[:length, i, :] = item["feats"]
            src_key_padding_mask[i, :length] = False
            targets_list.append(item["targets"])
        
        # Склеиваем targets один длинный тензор по времени без паддинга
        targets = torch.cat(targets_list, dim=0)
        
        return {'feats': feats, # (Time, Batch, feats)
               'targets': targets, #(SumTime)
               'src_key_padding_mask': src_key_padding_mask, #(Batch, Time)
               }
        

In [None]:
def test_collate_pad():
    fe = FeatureExtractor()
    test_ds = FeatsPhoneDataset('timit/data/TEST/', feature_extractor=fe, phone2id=PHONE2ID)

    for i in range(20):
        targets = test_ds[i]['targets']
        orig_ph_ali = test_ds.get_phone_ali(i)
        targets_set = set(targets.tolist())
        orig_set = set([PHONE2ID[ph] for *_, ph in orig_ph_ali])
        assert targets_set == orig_set, f"{i} \n{targets_set} \n {orig_set} \n {orig_ph_ali}"

    items = [test_ds[i] for i in range(30)]
    batch = test_ds.collate_pad(items)
    assert len(batch['feats'].shape) == 3, batch['feats'].shape
    assert batch['feats'].shape[1] == 30, batch['feats'].shape
    
    assert len(batch['src_key_padding_mask'].shape) == 2, batch['src_key_padding_mask'].shape
    assert batch['src_key_padding_mask'].shape[0] == 30, batch['src_key_padding_mask'].shape
    assert batch['src_key_padding_mask'].shape[1] == batch['feats'].shape[0], f"{batch['feats'].shape} {batch['src_key_padding_mask'].shape}"
    
    number_nonmasked_frames = (~batch['src_key_padding_mask']).sum()
    assert number_nonmasked_frames == len(batch['targets']), f"{number_nonmasked_frames} != {len(batch['targets'])}"

    accumulated_len = 0
    for i, item in enumerate(items):
        feats = batch['feats'][:, i, :]
        assert torch.isclose(feats.sum(), item['feats'].sum()) , i
        src = batch['src_key_padding_mask'][i, :]
        cutted_feats = feats[~src]
        assert torch.isclose(item['feats'], cutted_feats).all()
        cutted_targets = batch['targets'][accumulated_len: accumulated_len + cutted_feats.shape[0]]
        assert torch.isclose(cutted_targets, item['targets']).all()
        accumulated_len += cutted_feats.shape[0]
    print("Test 1.c passed")
    
test_collate_pad()


# 2. Акустическая модель

Обучим TransformerEncoder из torch решать задачу пофреймовой классификации. 

In [None]:
class AModel(nn.Module):
    def __init__(self, feats_dim, out_dim,  dim=128, num_layers=4, ff_dim=256, dropout=0.1, nhead=4, max_len=780):
        super().__init__()
        self.feats_dim = feats_dim
        self.max_len=max_len
        self.input_ff = nn.Linear(feats_dim, dim)
        self.positional_encoding = nn.Embedding(max_len, dim)
        layer = torch.nn.TransformerEncoderLayer(d_model=dim, 
                                                 nhead=nhead, 
                                                 dim_feedforward=ff_dim, 
                                                 dropout=dropout, 
                                                 batch_first=False)
        self.encoder = torch.nn.TransformerEncoder(encoder_layer=layer, num_layers=num_layers)
        
        self.head = nn.Linear(dim, out_dim)

    def forward(self, feats, src_key_padding_mask=None, **kwargs):
        #TODO 
        # реализуйте прямой проход модели.
        # Фичи подаются на первый ff слой, 
        # к результату прибавляются позиционные эмбединги.
        # Далее фреймы обрабатываются трансформером 
        # и финализируются с помощью головы
        # feats shape: (Time, Batch, feats_dim)
        x = self.input_ff(feats)  # (Time, Batch, dim)
        
        # создаём позиционные индексы для каждого кадра
        timesteps = torch.arange(x.size(0), device=x.device)

        # получаем позиционные эмбеддинги и добавляем к признакам
        pos_emb = self.positional_encoding(timesteps)  # (Time, dim)
        x = x + pos_emb.unsqueeze(1)  # (Time, Batch, dim)

        x = self.encoder(x, src_key_padding_mask=src_key_padding_mask)  # (Time, Batch, dim)

        logits = self.head(x)  # (Time, Batch, out_dim)
        
        return logits # (Time, Batch, Phones)

## 3. Обучение модели 

In [None]:
# Стандартный пайплайн обучения моделей в pytorch
class Trainer(nn.Module):
    def __init__(self, model, fe, phone2id, device='cuda', opt_cls=torch.optim.Adam, opt_kwargs={'lr':0.0001}):
        super().__init__()
        self.device=device
        self.fe = fe
        self.model = model.to(self.device)
        self.phone2id = phone2id
        self.id2phone = {i:ph for ph,i in phone2id.items()}
        self.optimizer = opt_cls(self.model.parameters(), **opt_kwargs)
        self.criterion = torch.nn.CrossEntropyLoss()
        print(f"{self.model}. {self.device}")

    def to(self, device):
        self.device = device
        return super().to()
        
    def forward(self, batch):
        batch = self.batch_to_device(batch)
        #logits = self.model(**batch)
        # TODO
        # реализуйте подсчет loss функции  
        logits = self.model(batch['feats'], src_key_padding_mask=batch.get('src_key_padding_mask', None))
        
        # Маска паддинга (Batch, Time)
        src_key_padding_mask = batch.get('src_key_padding_mask', None)

        # Рассчитаем размеры для корректного паддинга targets 
        batch_size = src_key_padding_mask.size(0)
        max_len = src_key_padding_mask.size(1)
        
        # Инициализируем targets паддингом -100 (игнорируемый в cross-entropy)
        targets_padded = torch.full((batch_size, max_len), fill_value=-100, dtype=torch.long, device=logits.device)
        
        # Заполняем targets по непаддинговым фреймам из склеенных targets в batch['targets']
        offset = 0
        for i in range(batch_size):
            length = (~src_key_padding_mask[i]).sum().item()  # число непаддинговых фреймов
            targets_padded[i, :length] = batch['targets'][offset:offset+length]
            offset += length
        
        # Транспонируем logits к виду (Batch, Time, Classes) и потом делаем (Batch*Time, Classes)
        logits = logits.permute(1, 0, 2).contiguous()  # (Batch, Time, Classes)
        logits_flat = logits.view(-1, logits.size(-1))
        
        loss = self.criterion(logits_flat, targets_padded.view(-1))
        return loss

    def batch_to_device(self, batch):
        return {k: v.to(self.device) for k, v in batch.items()}
        
    def train_one_epoch(self, train_dataloader):
        """ Цикл обучения одной эпохи по всем данным"""
        self.model.train()
        pbar = tqdm(train_dataloader)
        losses = []
        for batch in pbar:
            self.optimizer.zero_grad()
            loss = self.forward(batch)
            loss.backward()
            self.optimizer.step()
            losses.append(loss.item())
            pbar.set_description(f"training loss {losses[-1]:.5f}")
        return losses

    def score(self, valid_dataloader) -> List[float]:
        """Подсчет лосса на валидационной выборке"""
        pbar = tqdm(valid_dataloader, desc="Scoring...")
        losses = []
        # TODO 
        # реализуйте функцию, которая подсчитывает лосс на валидационной выборке 
        # losses должен хранить значение ошибки на каждом батче 
        with torch.no_grad():
            for batch in pbar:
                loss = self.forward(batch)
                losses.append(loss.item())
        
        return losses

    def fit(self, train_dataloader, epochs, valid_dataloader=None, plot_losses=True):
        """Запуск обучения на данном dataloader"""
        pbar = tqdm(range(epochs))
        per_epoch_train_losses = []
        per_epoch_val_losses = []
        for e in pbar:
            train_loss = np.mean(self.train_one_epoch(train_dataloader))
            per_epoch_train_losses.append(train_loss)
            if valid_dataloader is not None:
                val_loss = np.mean(self.score(valid_dataloader))
                per_epoch_val_losses.append(val_loss)
            if plot_losses:
                display.clear_output()
                self.plot_losses(per_epoch_train_losses, per_epoch_val_losses)
            else:
                val_loss = val_loss if valid_dataloader is not None else float('Nan')
                print(f"train: {train_loss:.5f} | val: {val_loss:.5f}")
        return per_epoch_train_losses, per_epoch_val_losses
    
    def plot_losses(self, train_losses, val_losses=[]):
        plt.title(f"Train test losses (epoch {len(train_losses)})")
        plt.plot(range(len(train_losses)), train_losses)
        if len(val_losses)>0:
            assert len(train_losses) == len(val_losses)
            plt.plot(range(len(val_losses)), val_losses)
        plt.ylabel("loss")
        plt.xlabel('epoch')
        plt.legend(["train loss", "valid loss"])
        plt.grid(True)
        plt.show()
                 
            
                

In [None]:
def overfit_one_batch_check():
    # Для проверки работоспособности кода обучения удоно использовать тест модели на overfit 
    # Для этого запускается обучение на одном батче данных. 
    # Если код написан правильно, то модель обязана выучить выучить все примеры из этого батча наизусть. 
    fe = FeatureExtractor()
    train_dataset = FeatsPhoneDataset('timit/data/TEST/DR1/FAKS0', feature_extractor=fe, phone2id=PHONE2ID)
    
    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=32, collate_fn=train_dataset.collate_pad)
    test_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=1, collate_fn=train_dataset.collate_pad)

    trainer = Trainer(model=AModel(feats_dim=fe.feats_dim, 
                                   out_dim=max(PHONE2ID.values()) + 1,  
                                   dim=256, 
                                   num_layers=6, 
                                   ff_dim=512, 
                                   dropout=0.0, 
                                   nhead=8), 
                      fe=fe, 
                      phone2id=PHONE2ID, device='cuda')
   
    # only one batch. The model must learn it by heart
    losses, val_losses = trainer.fit(train_dataloader, 160, valid_dataloader=test_dataloader, plot_losses=False)

    trainer.plot_losses(losses, val_losses)

    val_loss = np.mean(trainer.score(test_dataloader))
    
    assert val_loss < 0.5, f"{val_loss}. Model doesn't train well" 
    print(f"Test 3.a passed")
overfit_one_batch_check()

In [None]:

def experiment():
    # Запуск полноценного обучения модели
    # TODO: Тюнинг гиперпараметров
    fe = FeatureExtractor()
    test_dataset = FeatsPhoneDataset('timit/data/TEST/', feature_extractor=fe, phone2id=PHONE2ID)
    train_dataset = FeatsPhoneDataset('timit/data/TRAIN/', feature_extractor=fe, phone2id=PHONE2ID)
    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=40, 
                                               num_workers=0, collate_fn=train_dataset.collate_pad, shuffle=True)
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=1, 
                                               num_workers=0, collate_fn=test_dataset.collate_pad, shuffle=False)


    trainer = Trainer(model=AModel(feats_dim=fe.feats_dim, 
                                 out_dim=max(PHONE2ID.values())+1, 
                                 dim=128, 
                                 num_layers=7, 
                                 ff_dim=256, 
                                 dropout=0.0, 
                                 nhead=8),
                     fe=fe, 
                     phone2id=PHONE2ID, device='cuda')

    trainer.fit(train_dataloader, epochs=40, valid_dataloader=test_dataloader, plot_losses=True)
    return trainer.to('cpu')
results = experiment()

In [None]:
torch.save(results, 'baseline.trainer')

# Основное задание (12 баллов)
Надо улучшить бейзлайн так, чтобы значение loss на валидации было менее 1.9 

**Дополнительное задание** (4 балла): Улучшите loss до 1.3 

In [None]:
def test_trained_model(trainer):
    test_dataset = FeatsPhoneDataset('timit/data/TEST/', feature_extractor=trainer.fe, phone2id=trainer.phone2id)
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=1, 
                                               num_workers=0, collate_fn=test_dataset.collate_pad, shuffle=False)
    loss = np.mean(trainer.score(test_dataloader))
    print(f"Test loss is {loss}")
    assert loss < 1.8, "Main task failed"
    print(f"Main task is done! (12 points)")
    if loss <= 1.3:
        print(f"Additional task is done! (+4 points)")
test_trained_model(results.to('cuda'))

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import torchaudio
import os

# PHONE2ID должен быть определен заранее

class FeatureExtractor(torch.nn.Module):
    def __init__(
        self,
        sample_rate=16000,
        n_fft=512,
        hop_length=160,
        n_mels=128,
        f_max=7600,
    ):
        super().__init__()
        self.sample_rate = sample_rate
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.n_mels = n_mels
        self.mel_spec = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate,
            n_fft=n_fft,
            hop_length=hop_length,
            n_mels=n_mels,
            f_max=f_max
        )

    def forward(self, waveform: torch.Tensor) -> torch.Tensor:
        device = next(self.mel_spec.parameters()).device if len(list(self.mel_spec.parameters())) > 0 else torch.device('cpu')
        waveform = waveform.to(device)
        mel = self.mel_spec(waveform)
        mel = torch.log(mel + 1e-8)
        return mel.squeeze(0).transpose(0, 1)

    @property
    def feats_dim(self):
        return self.n_mels


class FeatsPhoneDataset(TimitDataset):
    def __init__(self, data_path, feature_extractor: FeatureExtractor, phone2id):
        super().__init__(data_path)
        self.feature_extractor = feature_extractor
        self.phone2id = phone2id

    def __getitem__(self, index):
        orig_item = super().__getitem__(index)
        wav, sr = orig_item['audio']
        assert sr == self.feature_extractor.sample_rate
        feats = self.feature_extractor(wav)

        targets = torch.zeros(feats.shape[0], dtype=torch.long)
        for start_sample, stop_sample, ph in orig_item['phone_ali']:
            start_frame = int(start_sample / self.feature_extractor.hop_length)
            stop_frame = int(stop_sample / self.feature_extractor.hop_length)
            ph_id = self.phone2id.get(ph, 0)
            targets[start_frame:stop_frame] = ph_id

        return {
            'uri': orig_item['uri'],
            'feats': feats,
            'targets': targets,
            'src_key_padding_mask': torch.zeros(feats.shape[0], dtype=torch.bool)
        }

    def collate_pad(self, batch):
        max_len = max(item['feats'].shape[0] for item in batch)
        batch_size = len(batch)
        feat_dim = batch[0]['feats'].shape[1]

        feats = torch.zeros(max_len, batch_size, feat_dim)
        src_key_padding_mask = torch.ones(batch_size, max_len, dtype=torch.bool)
        targets_padded = torch.full((batch_size, max_len), fill_value=-100, dtype=torch.long)

        for i, item in enumerate(batch):
            length = item['feats'].shape[0]
            feats[:length, i, :] = item['feats']
            src_key_padding_mask[i, :length] = False
            targets_padded[i, :length] = item['targets']

        return {
            'feats': feats,
            'targets': targets_padded,
            'src_key_padding_mask': src_key_padding_mask
        }


class AModel(nn.Module):
    def __init__(self, feats_dim, out_dim, dim=512, num_layers=8, ff_dim=2048, dropout=0.2, nhead=8, max_len=1000):
        super().__init__()
        self.input_ff = nn.Linear(feats_dim, dim)
        self.layer_norm = nn.LayerNorm(dim)
        self.dropout = nn.Dropout(dropout)
        
        self.positional_encoding = nn.Parameter(torch.zeros(max_len, dim))
        nn.init.normal_(self.positional_encoding, mean=0.0, std=0.02)
        
        layer = nn.TransformerEncoderLayer(
            d_model=dim,
            nhead=nhead,
            dim_feedforward=ff_dim,
            dropout=dropout,
            batch_first=False,
            activation='gelu'
        )
        self.encoder = nn.TransformerEncoder(layer, num_layers=num_layers)
        
        self.final_norm = nn.LayerNorm(dim)
        self.head = nn.Linear(dim, out_dim)
        
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                torch.nn.init.constant_(module.bias, 0.0)
        elif isinstance(module, nn.LayerNorm):
            torch.nn.init.constant_(module.bias, 0.0)
            torch.nn.init.constant_(module.weight, 1.0)

    def forward(self, feats, src_key_padding_mask=None):
        x = self.input_ff(feats)
        x = self.layer_norm(x)
        x = self.dropout(x)
        
        seq_len = x.size(0)
        pos_emb = self.positional_encoding[:seq_len]
        x = x + pos_emb.unsqueeze(1)
        
        x = self.encoder(x, src_key_padding_mask=src_key_padding_mask)
        x = self.final_norm(x)
        logits = self.head(x)
        return logits


class Trainer(nn.Module):
    def __init__(self, model, fe, phone2id, device='cuda', opt_cls=optim.AdamW, opt_kwargs={'lr': 1e-4}):
        super().__init__()
        self.device = device
        self.fe = fe
        self.model = model.to(device)
        self.phone2id = phone2id
        self.optimizer = opt_cls(self.model.parameters(), **opt_kwargs)
        self.criterion = nn.CrossEntropyLoss(ignore_index=-100)
        self.scheduler = None

    def forward(self, batch):
        batch = self.batch_to_device(batch)
        logits = self.model(batch['feats'], src_key_padding_mask=batch.get('src_key_padding_mask'))
        logits = logits.permute(1, 0, 2).contiguous()
        logits_flat = logits.view(-1, logits.size(-1))
        targets_flat = batch['targets'].view(-1)
        loss = self.criterion(logits_flat, targets_flat)
        return loss

    def batch_to_device(self, batch):
        return {k: v.to(self.device) for k, v in batch.items()}

    def train_one_epoch(self, dataloader):
        self.model.train()
        losses = []
        pbar = tqdm(dataloader)
        for batch in pbar:
            self.optimizer.zero_grad()
            loss = self.forward(batch)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            if self.scheduler:
                self.scheduler.step()
            losses.append(loss.item())
            current_lr = self.optimizer.param_groups[0]['lr']
            pbar.set_description(f"Train loss: {loss.item():.4f}, LR: {current_lr:.2e}")
        return losses

    def score(self, dataloader):
        self.model.eval()
        losses = []
        with torch.no_grad():
            pbar = tqdm(dataloader, desc="Eval")
            for batch in pbar:
                loss = self.forward(batch)
                losses.append(loss.item())
        return losses

    def fit(self, train_dataloader, epochs, valid_dataloader=None, plot_losses=True):
        # Используем CosineAnnealingWarmRestarts вместо OneCycleLR
        self.scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
            self.optimizer, 
            T_0=10,  # Период первой рестарта
            T_mult=2,  # Умножение периода после каждой рестарты
            eta_min=1e-6  # Минимальный learning rate
        )
        
        per_epoch_train_losses = []
        per_epoch_val_losses = []
        best_val_loss = float('inf')
        
        for epoch in tqdm(range(epochs)):
            train_loss = np.mean(self.train_one_epoch(train_dataloader))
            per_epoch_train_losses.append(train_loss)
            
            if valid_dataloader:
                val_loss = np.mean(self.score(valid_dataloader))
                per_epoch_val_losses.append(val_loss)
                
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    torch.save(self.model.state_dict(), 'best_model.pth')
                
                print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}, Validation Loss = {val_loss:.4f}, Best Val Loss = {best_val_loss:.4f}")
            else:
                print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}")

            if plot_losses and (epoch + 1) % 5 == 0:
                plt.figure(figsize=(10, 6))
                plt.title(f"Epoch {epoch+1} Train/Val Loss")
                plt.plot(per_epoch_train_losses, label="Train Loss")
                if len(per_epoch_val_losses) > 0:
                    plt.plot(per_epoch_val_losses, label="Validation Loss")
                plt.legend()
                plt.grid(True)
                plt.xlabel('Epoch')
                plt.ylabel('Loss')
                plt.show()
                
        if valid_dataloader and os.path.exists('best_model.pth'):
            self.model.load_state_dict(torch.load('best_model.pth'))
            
        return per_epoch_train_losses, per_epoch_val_losses


def experiment():
    fe = FeatureExtractor()
    test_dataset = FeatsPhoneDataset('timit/data/TEST/', feature_extractor=fe, phone2id=PHONE2ID)
    train_dataset = FeatsPhoneDataset('timit/data/TRAIN/', feature_extractor=fe, phone2id=PHONE2ID)
    
    train_dataloader = torch.utils.data.DataLoader(
        train_dataset, 
        batch_size=32,
        num_workers=4, 
        collate_fn=train_dataset.collate_pad, 
        shuffle=True,
        pin_memory=True
    )
    test_dataloader = torch.utils.data.DataLoader(
        test_dataset, 
        batch_size=16,
        num_workers=4, 
        collate_fn=test_dataset.collate_pad, 
        shuffle=False,
        pin_memory=True
    )

    model = AModel(
        feats_dim=fe.feats_dim,
        out_dim=max(PHONE2ID.values()) + 1,
        dim=512,
        num_layers=8,
        ff_dim=2048,
        dropout=0.2,
        nhead=8
    )

    trainer = Trainer(
        model, 
        fe, 
        PHONE2ID, 
        device='cuda', 
        opt_kwargs={
            'lr': 2e-4,
            'weight_decay': 0.01,
            'betas': (0.9, 0.98)
        }
    )
    
    trainer.fit(train_dataloader, epochs=100, valid_dataloader=test_dataloader)
    
    if os.path.exists('best_model.pth'):
        os.remove('best_model.pth')
        
    return trainer.to('cpu')


results = experiment()


def test_trained_model(trainer):
    test_dataset = FeatsPhoneDataset('timit/data/TEST/', feature_extractor=trainer.fe, phone2id=trainer.phone2id)
    test_dataloader = torch.utils.data.DataLoader(
        test_dataset, 
        batch_size=1,
        num_workers=0, 
        collate_fn=test_dataset.collate_pad, 
        shuffle=False
    )
    loss = np.mean(trainer.score(test_dataloader))
    print(f"Test loss is {loss}")
    assert loss < 1.8, "Main task failed"
    print(f"Main task is done! (12 points)")
    if loss <= 1.3:
        print(f"Additional task is done! (+4 points)")
    if loss <= 1.2:
        print(f"Target achieved! Loss <= 1.2")


test_trained_model(results.to('cuda'))