In [1]:
from datetime import datetime
import matplotlib.pyplot as plt

import numpy as np
import torch

import librosa
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

import os
import tqdm.notebook as tqdm

In [2]:
from transforms import *
from torchvision.transforms import Compose

# Dataset

In [19]:
class SpeechCommandsDataset(Dataset):
    """Folder-per-class dataset of file paths.

    Every sub-directory of ``folder`` whose name does not start with ``'_'`` is
    one class; every entry inside such a sub-directory is one sample. Items are
    dicts ``{'path': <file path>, 'target': <class index>}``, optionally passed
    through ``transform`` before being returned.

    Attributes:
        classes: sorted list of class (sub-directory) names.
        class_to_idx / idx_to_class: mappings between class names and indices.
        class_indices: for each class index, the list of sample indices in it.
        data: list of ``(path, target)`` tuples, grouped by class.
        transform: callable applied to the item dict, or ``None``.
    """

    def __init__(self, folder, transform=None):
        """Index ``folder``.

        Args:
            folder: root directory containing one sub-directory per class.
            transform: optional callable applied to each item dict in
                ``__getitem__``.
        """
        self.classes = sorted(
            d for d in os.listdir(folder)
            if os.path.isdir(os.path.join(folder, d)) and not d.startswith('_')
        )

        self.class_to_idx = {name: idx for idx, name in enumerate(self.classes)}
        self.idx_to_class = {idx: name for name, idx in self.class_to_idx.items()}

        self.class_indices = [[] for _ in self.classes]

        data = []
        for data_class in self.classes:
            folder_path = os.path.join(folder, data_class)
            target = self.class_to_idx[data_class]

            # os.listdir returns entries in arbitrary order; sorting makes the
            # index -> sample mapping reproducible across runs and machines.
            for file_name in sorted(os.listdir(folder_path)):
                self.class_indices[target].append(len(data))
                data.append((os.path.join(folder_path, file_name), target))

        self.data = data
        self.transform = transform

    def __len__(self):
        """Return the number of indexed samples."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the (optionally transformed) item dict for ``index``."""
        path, target = self.data[index]
        data = {'path': path, 'target': target}

        if self.transform is not None:
            data = self.transform(data)

        return data

    def get_classes_number(self):
        """Return the number of classes."""
        return len(self.classes)

    def get_class_from_idx(self, idx):
        """Return the class name for ``idx``, or ``'unknown'`` if there is none."""
        return self.idx_to_class.get(idx, 'unknown')

    def get_idx_from_class(self, c):
        """Return the class index for name ``c``, or ``-1`` if there is none."""
        return self.class_to_idx.get(c, -1)

    def get_class_indices(self):
        """Return the per-class lists of sample indices."""
        return self.class_indices

    def make_weights_for_balanced_classes(self):
        """adopted from https://discuss.pytorch.org/t/balanced-sampling-between-classes-with-torchvision-dataloader/2703/3

        Returns:
            A float NumPy array with one weight per sample, equal to
            ``total_samples / samples_in_that_sample's_class``.
        """
        class_sizes = [len(indices) for indices in self.class_indices]
        total_size = float(sum(class_sizes))

        # An empty class has no samples, so its weight is never looked up;
        # use 0.0 for it instead of dividing by zero.
        weight_per_class = [total_size / size if size else 0.0 for size in class_sizes]

        return np.array([weight_per_class[target] for _, target in self.data], dtype=float)

class BackgroundNoiseDataset(Dataset):
    """Dataset for silence / background noise.

    All ``.wav`` files found directly inside ``folder`` are loaded, joined into
    one long waveform and cut into equal clips of
    ``int(sample_rate * sample_length)`` samples; a trailing remainder that does
    not fill a whole clip is dropped.
    """

    def __init__(self, folder, transform=None, sample_rate=16000, sample_length=1):
        """Load and slice the audio.

        Args:
            folder: directory holding the ``.wav`` files.
            transform: optional callable applied to each item dict.
            sample_rate: rate passed to ``librosa.load`` as ``sr``.
            sample_length: clip length; multiplied by ``sample_rate`` to get the
                number of samples per clip.
        """
        wav_names = [
            name for name in os.listdir(folder)
            if os.path.isfile(os.path.join(folder, name)) and name.endswith('.wav')
        ]

        # librosa.load returns a (waveform, rate) pair; only the waveform is kept.
        waveforms = [
            librosa.load(os.path.join(folder, name), sr=sample_rate)[0]
            for name in wav_names
        ]

        audio = np.hstack(waveforms)
        clip_len = int(sample_rate * sample_length)
        clip_count = len(audio) // clip_len

        self.path = folder
        self.transform = transform
        self.sample_rate = sample_rate
        self.samples = audio[:clip_count * clip_len].reshape(-1, clip_len)

    def __len__(self):
        """Return the number of clips."""
        return len(self.samples)

    def __getitem__(self, index):
        """Return the (optionally transformed) item dict for clip ``index``.

        The dict carries the clip, the sample rate, a constant target of ``1``
        and the source folder as ``'path'``.
        """
        item = {
            'samples': self.samples[index],
            'sample_rate': self.sample_rate,
            'target': 1,
            'path': self.path,
        }

        if self.transform is None:
            return item
        return self.transform(item)


# DataLoader + Loss

## Triplet

In [4]:
import torch.nn as nn
import torch

class TripletLoss(nn.Module):
    """Margin-based triplet loss on L2 distances.

    For each row the loss is ``relu(d(anchor, positive) - d(anchor, negative)
    + margin)``; the batch result is the mean over rows.
    """

    def __init__(self, margin=1.0):
        """Store the margin added to the distance difference."""
        super().__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        """Return the mean triplet loss for three equally-shaped batches."""
        pos_dist = nn.functional.pairwise_distance(anchor, positive, p=2)
        neg_dist = nn.functional.pairwise_distance(anchor, negative, p=2)

        # Same operand order as before so rounding is unchanged.
        hinge = torch.relu(pos_dist - neg_dist + self.margin)
        return torch.mean(hinge)


In [5]:

class TripletLoader(DataLoader):
    """DataLoader that yields batches built from (anchor, positive, negative) triplets.

    Each batch holds ``batch_size // 3`` triplets. For every triplet two distinct
    class labels are drawn; anchor and positive are two different samples of the
    first class, the negative is a sample of the second class. Sampling uses the
    global ``np.random`` state.

    NOTE(review): every dataset item is unpacked as ``(sample, label)`` and both
    parts are passed to ``torch.stack``, so the dataset is expected to return a
    pair of tensors. ``SpeechCommandsDataset`` in this notebook returns dicts,
    so it presumably needs an adapter before being used here - confirm.
    Every class must contain at least two samples, otherwise drawing the
    anchor/positive pair without replacement raises ``ValueError``.
    """

    def __init__(self, dataset, batch_size=30, shuffle=True, num_workers=0):
        """Set up the loader.

        Args:
            dataset: dataset exposing ``get_class_indices()``, ``__len__`` and
                ``__getitem__``.
            batch_size: number of samples per batch; ``batch_size // 3``
                triplets are generated for each batch.
            shuffle, num_workers: forwarded to ``DataLoader``; the custom
                ``__iter__`` below does not use them.
        """
        super().__init__(dataset=dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers)

        self.dataset_norm = dataset
        self.triplets_in_batch = batch_size // 3
        self.class_indices = self.dataset_norm.get_class_indices()
        self.labels = np.arange(len(self.class_indices))

    def __iter__(self):
        """Yield ``[samples, labels]`` lists, one per batch.

        The number of batches is ``ceil(len(dataset) / batch_size)``. Triplets
        are laid out consecutively as anchor, positive, negative. When
        ``batch_size < 3`` no triplet fits and an empty list is yielded.
        """
        batches_per_epoch = (len(self.dataset_norm) + self.batch_size - 1) // self.batch_size

        for _ in range(batches_per_epoch):
            # Draw all label pairs first, then the sample indices, so that the
            # order of np.random calls (and thus seeded results) is unchanged.
            label_pairs = np.zeros((self.triplets_in_batch, 2), dtype=np.int32)
            for slot in range(self.triplets_in_batch):
                label_pairs[slot] = np.random.choice(self.labels, size=2, replace=False)

            triplets = []
            triplet_labels = []
            for anchor_label, negative_label in label_pairs:
                anchor_idx, positive_idx = np.random.choice(self.class_indices[anchor_label], 2, replace=False)
                negative_idx = np.random.choice(self.class_indices[negative_label])

                anchor, anchor_target = self.dataset_norm[anchor_idx]
                positive, positive_target = self.dataset_norm[positive_idx]
                negative, negative_target = self.dataset_norm[negative_idx]

                triplets.append(torch.stack((anchor, positive, negative)))
                triplet_labels.append(torch.stack((anchor_target, positive_target, negative_target)))

            if not triplets:
                yield []
                continue

            # Concatenate once per batch instead of re-copying after each triplet.
            yield [torch.vstack(triplets), torch.hstack(triplet_labels)]

## N_pair

## Lifted Structured Loss

In [6]:
class LiftedStructuredLoss(nn.Module):
    """Lifted-structured style loss over all positive pairs in a batch.

    For every ordered pair ``(i, j)`` of distinct samples with equal labels the
    term is ``relu(D_ij + log(S_i + S_j)) ** 2``, where ``D`` is the pairwise
    Euclidean distance matrix and ``S_k`` is the sum of
    ``exp(margin - D_kl)`` over all ``l`` whose label differs from ``k``'s.
    The result is the mean of these terms over the positive pairs.
    """

    def __init__(self, margin=1.0):
        """Store the margin used inside the exponential of the negative term."""
        super().__init__()
        self.margin = margin

    def forward(self, features, labels):
        """Compute the loss.

        Args:
            features: embeddings compared with ``torch.cdist``
                (assumed ``(batch, dim)`` - TODO confirm against callers).
            labels: one label per embedding (assumed 1-D of length ``batch``).

        Returns:
            A scalar tensor. Pairs for which neither sample has any negative in
            the batch contribute ``0``; a batch without positive pairs yields
            ``0`` instead of ``NaN``.
        """
        device = features.device
        pairwise_distance = torch.cdist(features, features)

        same_label = torch.eq(labels.unsqueeze(0), labels.unsqueeze(1))
        negative_mask = ~same_label

        # Positive pairs: equal labels, excluding each sample paired with itself.
        batch = same_label.shape[0]
        positive_mask = same_label & ~torch.eye(batch, dtype=torch.bool, device=device)

        neg_exp = torch.exp(self.margin - pairwise_distance) * negative_mask
        row_exp_sum = torch.sum(neg_exp, dim=1)
        # S_i + S_j for every pair (the former max + min of the two values).
        pair_exp_sum = row_exp_sum.unsqueeze(0) + row_exp_sum.unsqueeze(1)

        # log(0) = -inf would turn into NaN once multiplied by a False mask
        # entry, and would poison gradients; substitute 1 (log -> 0) there and
        # drop those pairs, whose hinge would be 0 in the limit anyway.
        has_negatives = pair_exp_sum > 0
        safe_sum = torch.where(has_negatives, pair_exp_sum, torch.ones_like(pair_exp_sum))
        hinge = torch.relu(pairwise_distance + torch.log(safe_sum))

        contributing = positive_mask & has_negatives
        per_pair = torch.square(torch.where(contributing, hinge, torch.zeros_like(hinge)))

        # Clamp so that a batch with no positive pairs gives 0 rather than 0/0.
        pair_count = torch.sum(positive_mask).clamp(min=1)
        return torch.sum(per_pair) / pair_count

# Train Loop

In [55]:
def training_loop(n_epochs, network, loss_fn, optimizer, dl_train, dl_test, device):
    """Train ``network`` and periodically evaluate it.

    Each batch is a mapping with an ``'input'`` tensor, to which a new
    dimension is inserted at position 1 before it is fed to the network, and a
    ``'target'`` tensor passed to ``loss_fn`` together with the network output.

    Args:
        n_epochs: number of passes over ``dl_train``.
        network: model to optimise; must already live on ``device``.
        loss_fn: callable ``loss_fn(output, target)`` returning a scalar tensor.
        optimizer: optimizer over ``network``'s parameters.
        dl_train: sized iterable of training batches.
        dl_test: sized iterable of evaluation batches.
        device: device the batches are moved to.

    Returns:
        ``(train_losses, test_losses)``: lists of Python floats. The first has
        one entry per training batch, the second one entry per evaluated epoch
        (every third epoch and the last one) holding the mean batch loss, or
        ``nan`` if ``dl_test`` produced no batches.

    The network is switched to ``eval()`` for evaluation and back to
    ``train()`` afterwards, so it is returned in training mode.
    """
    train_losses, test_losses = [], []

    for epoch in (pbar := tqdm.tqdm(range(n_epochs), total=n_epochs, leave=True)):
        # Put the network into training mode
        network.train()

        # One training pass over the data
        for batch in tqdm.tqdm(dl_train, total=len(dl_train), leave=False):
            images = torch.unsqueeze(batch['input'], 1).to(device)
            labels = batch['target'].to(device)

            optimizer.zero_grad()

            net_out = network(images)
            loss = loss_fn(net_out, labels)

            train_losses.append(loss.item())
            loss.backward()
            optimizer.step()

        # Evaluate the model every 3 epochs and after the final one
        if epoch % 3 == 0 or epoch == n_epochs - 1:
            # eval() makes layers such as BatchNorm/Dropout use inference
            # behaviour; without it validation batches would also update the
            # BatchNorm running statistics.
            network.eval()
            with torch.no_grad():
                iter_number = 0
                loss_sum = 0.0

                for batch in tqdm.tqdm(dl_test, total=len(dl_test), leave=False):
                    images = torch.unsqueeze(batch['input'], 1).to(device)
                    labels = batch['target'].to(device)

                    net_out = network(images)
                    loss_sum += loss_fn(net_out, labels).item()
                    iter_number += 1

            test_losses.append(loss_sum / iter_number if iter_number else float('nan'))
            network.train()

        last_train = train_losses[-1] if train_losses else float('nan')
        pbar.set_description(
            'Loss (Train/Test): {0:.3f}/{1:.3f}\n'.format(last_train, test_losses[-1])
        )

    return train_losses, test_losses

# Model

In [56]:
class DSCNN(torch.nn.Module):
    """Depthwise-separable CNN producing an embedding or class scores.

    Pipeline: one 3x3 convolution stage, ``ds_cnn_number`` depthwise-separable
    stages (3x3 depthwise conv followed by 1x1 pointwise conv, each with batch
    norm and ReLU), average pooling with a window of ``in_shape`` and a
    flatten. With ``is_classifier`` set, a final linear layer maps the pooled
    features to ``classes_number`` outputs.

    NOTE(review): the linear layer takes ``ds_cnn_size`` inputs, so the spatial
    size of the input presumably has to equal ``in_shape`` for classification
    to work - confirm against the feature pipeline.
    """

    def __init__(self, in_channels=1, in_shape=(32, 32), ds_cnn_number=3, ds_cnn_size=64, is_classifier=False, classes_number=0):
        """Build all sub-modules.

        Args:
            in_channels: channels of the input tensor.
            in_shape: window size handed to the average-pooling layer.
            ds_cnn_number: number of depthwise-separable stages.
            ds_cnn_size: channel width used throughout the network.
            is_classifier: whether ``forward`` applies the linear classifier.
            classes_number: output size of the linear classifier.
        """
        super().__init__()

        self.is_classifier = is_classifier
        self.classes_number = classes_number

        # Sub-modules are created in a fixed order so that parameter
        # initialisation under a given seed stays the same.
        self.initial_convolution = self.make_features(in_channels, ds_cnn_size)
        self.dscnn_blocks = self.make_dscnn_blocks(ds_cnn_size, ds_cnn_number)
        self.pool = self.make_pool(in_shape)

        self.classifier = nn.Linear(ds_cnn_size, classes_number)

    def make_features(self, in_channels, out_channels):
        """Return the stem: 3x3 'same' convolution, batch norm, ReLU."""
        return torch.nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=(3, 3), padding='same'),
            nn.BatchNorm2d(out_channels),
            torch.nn.ReLU(inplace=True),
        )

    def make_dscnn_blocks(self, ds_cnn_size, ds_cnn_number):
        """Return ``ds_cnn_number`` depthwise-separable stages as one Sequential."""
        stages = []

        for _ in range(ds_cnn_number):
            stages.extend([
                # Depthwise: groups == channels, one 3x3 filter per channel.
                nn.Conv2d(in_channels=ds_cnn_size, out_channels=ds_cnn_size, kernel_size=3, groups=ds_cnn_size, padding='same'),
                nn.BatchNorm2d(ds_cnn_size),
                torch.nn.ReLU(inplace=True),
                # Pointwise: 1x1 convolution mixing the channels.
                nn.Conv2d(in_channels=ds_cnn_size, out_channels=ds_cnn_size, kernel_size=1, padding='same'),
                nn.BatchNorm2d(ds_cnn_size),
                torch.nn.ReLU(inplace=True),
            ])

        return torch.nn.Sequential(*stages)

    def make_pool(self, in_shape):
        """Return average pooling over an ``in_shape`` window followed by flatten."""
        return torch.nn.Sequential(
            nn.AvgPool2d(in_shape),
            torch.nn.Flatten(),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class scores if ``is_classifier`` is set, else the pooled features."""
        pooled = self.pool(self.dscnn_blocks(self.initial_convolution(x)))

        if not self.is_classifier:
            return pooled
        return self.classifier(pooled)

# Putting it all together

In [57]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda', 0) if torch.cuda.is_available() else torch.device('cpu')

print(type(device), device)

# may benefit if network size/input/output is stable
# `device` is a torch.device object, not the string 'cuda', so the device kind
# has to be read from its `.type` attribute for this branch to ever run.
if device.type == 'cuda':
    torch.backends.cudnn.benchmark = True

<class 'torch.device'> cpu


In [58]:
# Dataset locations, relative to the notebook's working directory.
# Folder of .wav files read by BackgroundNoiseDataset.
background_noise_path = 'datasets/speech_commands/_background_noise_'
# Roots with one sub-folder per class, read by SpeechCommandsDataset.
train_dataset_path = 'datasets/speech_commands/train'
valid_dataset_path = 'datasets/speech_commands/validation'

In [59]:
# Number of mel bands requested from ToMelSpectrogramFromSTFT.
n_mels = 32

# Augmentation chain shared by the background-noise clips and the training samples.
data_aug_transform = Compose([
    ChangeAmplitude(),
    ChangeSpeedAndPitchAudio(),
    FixAudioLength(),
    ToSTFT(),
    StretchAudioOnSTFT(),
    TimeshiftAudioOnSTFT(),
    FixSTFTDimension(),
])

# Background-noise clips go through the same augmentation before being mixed in.
bg_dataset = BackgroundNoiseDataset(background_noise_path, data_aug_transform)
add_bg_noise = AddBackgroundNoiseOnSTFT(bg_dataset)

# Training: load -> augment -> add noise -> mel spectrogram stored under 'input'.
train_feature_transform = Compose([
    ToMelSpectrogramFromSTFT(n_mels=n_mels),
    DeleteSTFT(),
    ToTensor('mel_spectrogram', 'input'),
])
train_dataset = SpeechCommandsDataset(
    train_dataset_path,
    Compose([
        LoadAudio(),
        data_aug_transform,
        add_bg_noise,
        train_feature_transform,
    ]),
)

# Validation: no augmentation and no added noise.
valid_feature_transform = Compose([
    ToSTFT(),
    ToMelSpectrogramFromSTFT(n_mels=n_mels),
    ToTensor('mel_spectrogram', 'input'),
])
valid_dataset = SpeechCommandsDataset(
    valid_dataset_path,
    Compose([
        LoadAudio(),
        FixAudioLength(),
        valid_feature_transform,
    ]),
)

In [60]:
# Input description: one channel, pooling window of n_mels x n_mels.
in_channels = 1
in_shape = (n_mels, n_mels)

# Network size: number of depthwise-separable stages and their channel width.
ds_cnn_number = 3
ds_cnn_size = 64

# Train as a classifier with one output per class folder in the training set.
is_classifier = True
classes_number = train_dataset.get_classes_number()

model = DSCNN(
    in_channels=in_channels,
    in_shape=in_shape,
    ds_cnn_number=ds_cnn_number,
    ds_cnn_size=ds_cnn_size,
    is_classifier=is_classifier,
    classes_number=classes_number,
)

In [64]:
batch_size = 32
n_epoch = 100

# `device` is a torch.device object, so its kind must be read from `.type`;
# comparing it with the string 'cuda' never matched, which left the model on
# the CPU even when the training loop moved the batches to the GPU.
# The isinstance guard keeps the cell safe to re-run without double wrapping.
if device.type == 'cuda' and not isinstance(model, torch.nn.DataParallel):
    model = torch.nn.DataParallel(model).cuda()

In [65]:
# Training batches are reshuffled every epoch; validation order stays fixed.
dl_train = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=0,
)
dl_validation = DataLoader(
    valid_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=0,
)

# Classification objective and optimizer over all model parameters.
loss_fn = nn.CrossEntropyLoss()
learning_rate = 1e-4
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [66]:
# Run the training; the validation loader serves as the evaluation set.
train_losses, test_losses = training_loop(
    n_epochs=n_epoch,
    network=model,
    loss_fn=loss_fn,
    optimizer=optimizer,
    dl_train=dl_train,
    dl_test=dl_validation,
    device=device,
)

  0%|          | 0/100 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

KeyboardInterrupt: 

# Handy Tools

In [36]:
# Number of model parameters: total element count over all parameter tensors
pytorch_total_params = sum(map(lambda tensor: tensor.numel(), model.parameters()))
print(f'Model parameteres number: {pytorch_total_params}')

Model parameteres number: 17236


In [None]:
# setup device: first CUDA GPU when available, CPU otherwise
device = torch.device('cuda', 0) if torch.cuda.is_available() else torch.device('cpu')

print(type(device), device)