## Распознавание говорящего по необработанной форме сигнала с помощью SincNet

* Автор нотбука: Петросян Акоб
* МФТИ, г. Долгопрудный, Московская область
* Обратная связь: akob.petrosyan@phystech.edu, [vk.com/jacpetro](vk.com/jacpetro)

Сегодня нам предстоит изучать новую архитектуру CNN, названная SincNet.  
Подробное описание архитектуры доступно по ссылке: [SincNet](https://arxiv.org/pdf/1808.00158.pdf)
<br>  
Идея метода состоит в том, что в первом свёрточном слое мы используем функцию  $sinc(x) = \frac{sin(x)}{x}$, или, что одно и тоже в случае частотной зависимости, - полосовые фильтры. С помощью этих функций считаем свертки c входными сигналами $y[n] = x[n]*g_{w}[n, f1, f2]$, где   
<br>
<center>$g_{w}[n, f1, f2] = g[n, f1, f2]w[n]$</center>  
<br>
<center>$g[n, f1, f2] = 2f_{2}sinc(2\pi f_{2}n) − 2f_{1}sinc(2\pi f_{1}n)$  </center>   
<br>
<center>$w[n] = 0.54 − 0.46cos(\frac{2\pi n}{L})$</center>

Преимушества токого подхода:
1. Быстрая сходимость: : SincNet заставляет сеть фокусироваться только на параметрах фильтра, что существенно влияет на производительность
2. Мало параметров: $2F$ вместо $2FL$, где $F$ количество фильтров, а $L$ длина фильтров
3. Интерпретируемость: Карты признаков SincNet, полученные в первом сверточном слое, более интерпретируемы и удобочитаемы, чем другие подходы.

![Architecture of SincNet](https://miro.medium.com/max/1266/1*sUoYw3qewfZVrcBl6RIZRQ.png (SincNet))

## А теперь в БОЙ!

In [599]:
import pickle
import numpy as np

from tqdm import tqdm, tqdm_notebook
from pathlib import Path

import torchsummary
import torchaudio
from torchvision import transforms
from multiprocessing.pool import ThreadPool
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch import nn
from torch.nn import functional as F

from matplotlib import colors, pyplot as plt
%matplotlib inline

# в sklearn не все гладко, поэтому мы будем игнорировать warnings
import warnings
warnings.filterwarnings(action='ignore', category=DeprecationWarning)

In [524]:
train_on_gpu = torch.cuda.is_available()

if not train_on_gpu:
    print('CUDA is not available.  Training on CPU ...')
else:
    print('CUDA is available!  Training on GPU ...')

CUDA is not available.  Training on CPU ...


In [525]:
# разные режимы датасета 
DATA_MODES = ['train', 'val', 'test']
# некоторые константы
NUMBER_OF_FILTERS = 80
WAVETIME = 0.2 # 200 мс
L = 251
# при возможности работаем на видеокарте
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# torch pi...
PI = torch.from_numpy(np.array(np.pi))

Ниже мы исспользуем враппер над датасетом для удобной работы.  
Стоит также отметить, что мы переопределяем метод __getitem__ для удобства работы с данной структурой данных.
Также используем LabelEncoder для преобразования строковых меток классов в id и обратно.

In [605]:
class SincNetDataset(Dataset):
    """
    Датасет с записями, который паралельно подгружает их из папок
    производит разделение на маленькие куски, скалирование и превращение в торчевые тензоры
    """
    def __init__(self, files, mode, wavetime, trashhold = 0.01):
        super().__init__()
        # список файлов для загрузки
        self.files = sorted(files)
        # продолжительность сигналов в секунах
        self.wavetime = wavetime
        # граница шума
        self.trashhold = trashhold
        # режим работы
        self.mode = mode

        if self.mode not in DATA_MODES:
            print(f"{self.mode} is not correct; correct modes: {DATA_MODES}")
            raise NameError

        self.len_ = len(self.files)
     
        self.label_encoder = LabelEncoder()

        if self.mode != 'test':
            self.labels = [path.parent.name for path in self.files]
            self.label_encoder.fit(self.labels)

            with open('label_encoder.pkl', 'wb') as le_dump_file:
                  pickle.dump(self.label_encoder, le_dump_file)
                      
    def __len__(self):
        return self.len_
      
    def load_sample(self, file):
        waveform, sample_rate = torchaudio.load(file.as_posix())
        return waveform, sample_rate
  
    def __getitem__(self, index):
        """
        переопределяем __getitem__ для нашего удобства
        """
        x, f = self.load_sample(self.files[index])
        x = self._prepare_sample(x, f)
        f = [f for i in range(len(x))] # потвторяем частоту дискретизации для каждого куска
        f = torch.FloatTensor(f)
        if self.mode == 'test':
            return x, f
        else:
            label = self.labels[index]
            label_id = self.label_encoder.transform([label])
            y = label_id.item()
            y = [y for i in range(len(x))]
            y = torch.FloatTensor(y) # потвторяем лейбл для каждого куска
            return x, f, y
        
    def _prepare_sample(self, sample, sample_rate):
        """
        Эта функция возврашает список 200мс-х записей, каждый элемент которого - это тензор
        очишенный от шума, выровненный и нормализировнный.
        """
        # избавляемся от шума
        clean = sample[0][sample[0] > self.trashhold*torch.max(torch.abs(sample[0]))]
        sample = clean[None, :]
        number = self.wavetime * sample_rate # количесиво значений каждого разделенного сигнала
        if len(sample[0]) > number:
            samples = list(torch.tensor_split(sample, int(sample.shape[1] // number + 1), dim=1))
            last_sample = samples[-1]
            samples = samples[:-1]
                
            """
            # последний кусок скорее всего меньше по размеру чем остальные, поэтому...
            # проверяем, если длина этого куска меньше 80 процентов длины остальных сигналов,
            # то выбрасываем его, иначе, заполняем padding-ом с двух сторон
            diff = number-last_sample.shape[1]
            if diff > 0.2*number:
                if diff%2 == 0:
                    p1d = (int(diff/2), int(diff/2))
                    last_sample = F.pad(last_sample, p1d, "reflect")
                    samples.append(last_sample)
                else:
                    p1d = (int(diff//2), int(diff//2 + 1))
                    last_sample = F.pad(last_sample, p1d, "reflect") 
                    samples.append(last_sample)
            """
        # наконец, нормализируем данные
        for i in range(len(samples)):
            samples[i] = (samples[i] - torch.mean(samples[i])) / torch.std(samples[i])
        return samples

Далее получим необходимые датасеты...

In [527]:
DATA_DIR = Path(r'C:\Users\hakob\OneDrive\Рабочий стол\KissMe\MIPT\VK SincNet\data')
train_val_test_files = sorted(list(DATA_DIR.rglob('*.flac')))

In [528]:
from sklearn.model_selection import train_test_split

train_val_test_labels = [path.parent.name for path in train_val_test_files]
train_val_files, test_files = train_test_split(train_val_test_files, test_size=0.2, stratify=train_val_test_labels)
train_val_labels = [path.parent.name for path in train_val_files]
train_files, val_files = train_test_split(train_val_files, test_size=0.25, stratify=train_val_labels)

In [529]:
%%time
print(f"Example from train_files: {train_files[0]}.")
print(f"Number of train files: {len(train_files)}, val files: {len(val_files)}, test files: {len(test_files)}.")
waveform, sample_rate = torchaudio.load('C:/Users/hakob/OneDrive/Рабочий стол/KissMe/MIPT/VK SincNet/data/LibriSpeech/train-clean-100/909/131044/909-131044-0004.flac')
print(f"Number of points of one of initial waves: {waveform.shape[1]}, sample rate: {sample_rate} Hz.")

Example from train_files: C:\Users\hakob\OneDrive\Рабочий стол\KissMe\MIPT\VK SincNet\data\LibriSpeech\train-clean-100\7264\92316\7264-92316-0006.flac.
Number of train files: 17123, val files: 5708, test files: 5708.
Number of points of one of initial waves: 246400, sample rate: 16000 Hz.
Wall time: 7.99 ms


In [606]:
%%time
train_dataset = SincNetDataset(train_files, "train", 0.2)
val_dataset = SincNetDataset(train_files, "val", 0.2)
test_dataset = SincNetDataset(train_files, "test", 0.2)

Wall time: 1.21 s


### Построение нейросети

Ниже реализована семантика SincNet.  
None-ы используются для увеличения размерности фильтров, так как сигналы подаются пачками.

Подробно:  
- [torch.nn.functional.conv1d](https://pytorch.org/docs/stable/nn.functional.html)
- [torch.nn.Conv1d](https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html#torch.nn.Conv1d)

In [531]:
# SincNet layer

class Custom(nn.Module):
    def __init__(self, number_of_filters, l):
        super().__init__()
        self.number_of_filters = number_of_filters
        self.L = l
       
    def forward(self, x, sample_rate):
        """
            x.shape = (batch_size, input channels, wavetime*sample_rate)
            result_filters.shape = (output_shape, input channels, L)
            result.shape = (batch_size, output_shape, wavetime*sample_rate - L + 1)
            
            Все расчеты выполнены при дефолтных значений параметров, а именно:
            stride = 1
            groups = 1
            padding = 0
            dilation = 1
        """ 
        hamming_window = torch.hamming_window(self.L, device = DEVICE) # окно длины 251 (в нашем случае)
        hamming_window = hamming_window[None, :] # shape (1, 251)
        n = torch.FloatTensor([i for i in range(self.L)])
        n = n[None, :] # shape (1, 251)
        f_1 = torch.rand(self.number_of_filters, 1,  1) * sample_rate /2 # shape (80, 1, 1)
        f_2 = f_1 + torch.abs(f_1 - torch.rand(self.number_of_filters, 1, 1) * sample_rate /2)
        g_filter = 2*f_2*torch.sinc(2*PI*f_2*n) - 2*f_1*torch.sinc(2*PI*f_1*n) 
        result_filters = g_filter * hamming_window # в нашем случае (80, 1, 251)
        results = F.conv1d(x, result_filters) # (batch_size, 1, 0.2*sample_rate) * (80, 1, 251) = (batch_size, 80, 0.2*sample_rate - 250)
                
        return resultsl

In [532]:
# SincNet
class SincNet(nn.Module):
    def __init__(self, n_classes):
        super().__init__()
        
        def _Glorot_weights(m):
            if type(m) == nn.Linear or type(m) == nn.Conv1d:
                torch.nn.init.xavier_uniform(m.weight)
                m.bias.data.fill_(0.01)
        
        # SincNet layer
        self.custom = nn.Sequential(
            Custom(NUMBER_OF_FILTERS, L), # out_channels = 80
            nn.MaxPool1d(kernel_size=2), # out_channels = 40
            nn.ReLU(),
            nn.Dropout(p=0.2) # out_channels = 40, shape: (batch_size, 40, 0.2*sample_rate - 250)
        )
        # standard CNN 1
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels=40, out_channels=60, kernel_size=5),
            nn.ReLU() # shape: (batch_size, 60, 0.2*sample_rate - 254) 
        )
        self.conv1.apply(_Glorot_weights)
        # standard CNN 2
        self.conv2 = nn.Sequential(
            nn.Conv1d(in_channels=60, out_channels=60, kernel_size=5),
            nn.ReLU() # shape: (batch_size, 60, 0.2*sample_rate - 258) 
        )
        self.conv2.apply(_Glorot_weights)
        # Dense layer 1
        self.lin1 = nn.Sequential(
            nn.Linear(in_features = int(60*(0.2*16000 - 258)), out_features=2048),
            nn.ReLU(),
            nn.BatchNorm1d(2048)
        )
        self.lin1.apply(_Glorot_weights)
        # Dense layer 2
        self.lin2 = nn.Sequential(
            nn.Linear(in_features=2048, out_features=2048),
            nn.ReLU(),
            nn.BatchNorm1d(2048)
        )
        self.lin2.apply(_Glorot_weights)
        # Dense layer 3
        self.lin3 = nn.Sequential(
            nn.Linear(in_features=2048, out_features=2048),
            nn.ReLU(),
            nn.BatchNorm1d(2048)
        )
        self.lin3.apply(_Glorot_weights)
        
        self.out = nn.Linear(in_features=2048, out_features=n_classes)
    
    def forward(self, x, sample_rate):
        x = self.custom(x, sample_rate)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.lin1(x)
        x = self.lin2(x)
        x = self.lin3(x)
        x = x.view(x.size(0), -1)
        logits = self.out(x)
        return logits

In [561]:
def fit_epoch(model, train_loader, criterion, optimizer):
    running_loss = 0.0
    running_corrects = 0
    processed_data = 0
    
    for inputs, sample_rate, labels in train_loader:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        preds = torch.argmax(outputs, 1)
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        processed_data += inputs.size(0)
              
    train_loss = running_loss / processed_data
    train_acc = running_corrects.cpu().numpy() / processed_data
    return train_loss, train_acc

In [534]:
def eval_epoch(model, val_loader, criterion):
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    processed_size = 0

    for inputs, labels in val_loader:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        with torch.set_grad_enabled(False):
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            preds = torch.argmax(outputs, 1)

        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        processed_size += inputs.size(0)
    val_loss = running_loss / processed_size
    val_acc = running_corrects.double() / processed_size
    return val_loss, val_acc

In [535]:
# Параметры оттимайзера и размер батча
LR = 0.001
ALPHA = 0.95
EPS = 10-7
BATCH_SIZE = 128

In [600]:
def train(train_files, val_files, model, epochs, batch_size):
    train_loader = DataLoader(TensorDataset(train_dataset), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    history = []
    log_template = "\nEpoch {ep:03d} train_loss: {t_loss:0.4f} \
    val_loss {v_loss:0.4f} train_acc {t_acc:0.4f} val_acc {v_acc:0.4f}"

    with tqdm(desc="epoch", total=epochs) as pbar_outer:
        opt = torch.optim.RMSprop(model.parameters(), lr=LR, alpha=ALPHA, eps=EPS)
        criterion = nn.CrossEntropyLoss()

        for epoch in range(epochs):
            train_loss, train_acc = fit_epoch(model, train_loader, criterion, opt)
            print("loss", train_loss)
            
            val_loss, val_acc = eval_epoch(model, val_loader, criterion)
            history.append((train_loss, train_acc, val_loss, val_acc))
            
            pbar_outer.update(1)
            tqdm.write(log_template.format(ep=epoch+1, t_loss=train_loss,\
                                           v_loss=val_loss, t_acc=train_acc, v_acc=val_acc))
            
    return history

In [537]:
def predict(model, test_loader):
    with torch.no_grad():
        logits = []
    
        for inputs in test_loader:
            inputs = inputs.to(DEVICE)
            model.eval()
            outputs = model(inputs).cpu()
            logits.append(outputs)
            
    probs = nn.functional.softmax(torch.cat(logits), dim=-1).numpy()
    return probs

In [538]:
n_classes = len(np.unique(train_val_labels))
net = SincNet(n_classes).to(DEVICE)
print("we will classify :{}".format(n_classes))
print(simple_cnn)

  torch.nn.init.xavier_uniform(m.weight)


we will classify :585
SincNet(
  (custom): Sequential(
    (0): Custom()
    (1): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (2): ReLU()
    (3): Dropout(p=0.2, inplace=False)
  )
  (conv1): Sequential(
    (0): Conv1d(40, 60, kernel_size=(5,), stride=(1,))
    (1): ReLU()
  )
  (conv2): Sequential(
    (0): Conv1d(60, 60, kernel_size=(5,), stride=(1,))
    (1): ReLU()
  )
  (lin1): Sequential(
    (0): Linear(in_features=176520, out_features=2048, bias=True)
    (1): ReLU()
    (2): BatchNorm1d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (lin2): Sequential(
    (0): Linear(in_features=2048, out_features=2048, bias=True)
    (1): ReLU()
    (2): BatchNorm1d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (lin3): Sequential(
    (0): Linear(in_features=2048, out_features=2048, bias=True)
    (1): ReLU()
    (2): BatchNorm1d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)

## Запустим обучение!

![ALt Text](https://media.tenor.com/images/2471505153e8225ff6940afc65fba1f8/tenor.gif)

In [620]:
%%time
history = train(train_dataset, val_dataset, model=SincNet, epochs=2, batch_size=128)

AttributeError: 'SincNetDataset' object has no attribute 'size'

Построим кривые обучения

In [None]:
loss, acc, val_loss, val_acc = zip(*history)

In [None]:
plt.figure(figsize=(15, 9))
plt.plot(loss, label="train_loss")
plt.plot(val_loss, label="val_loss")
plt.legend(loc='best')
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

## А теперь предсказания

![alt text](https://www.vokrug.tv/pic/product/3/2/c/b/32cb49b46b5988307a2fab35e4a5de1e.jpeg)

In [None]:
test_dataset = SimpsonsDataset(test_files, mode="test")
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=64)
probs = predict(simple_cnn, test_loader)


preds = label_encoder.inverse_transform(np.argmax(probs, axis=1))

Вот и всё.)  

Спасибо за внимание!