# Test model
Handling Panns cnn14

## 1. Loading weights
Loading weights, discarding unnecessary ones

In [1]:
from audiointerp.dataset.esc50 import ESC50dataset
import torch.nn as nn
import torch.nn.functional as TF
import torch.optim as optim
import torchaudio.transforms as T
from IPython.display import Audio
import torch
from torch.utils.data import DataLoader
import os
from collections import OrderedDict
from tqdm import tqdm

In [2]:
weight_dir = "weights"
weights_file = "Cnn14_mAP=0.431.pth"

In [3]:
path_to_weights = os.path.join(weight_dir, weights_file)

In [4]:
weights_full = torch.load(path_to_weights)["model"]

In [5]:
weights = OrderedDict()

In [6]:
for key, value in weights_full.items():
    if key.startswith(("logmel_extractor", "spectrogram_extractor", "fc_audioset")):
        continue
    weights[key] = value

## 2. Cnn14

In [7]:
# utility functions
def init_layer(layer):
    """Initialize a Linear or Convolutional layer. """
    nn.init.xavier_uniform_(layer.weight)
 
    if hasattr(layer, 'bias'):
        if layer.bias is not None:
            layer.bias.data.fill_(0.)
            
    
def init_bn(bn):
    """Initialize a Batchnorm layer. """
    bn.bias.data.fill_(0.)
    bn.weight.data.fill_(1.)

In [8]:
# convblock
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        
        super(ConvBlock, self).__init__()
        
        self.conv1 = nn.Conv2d(in_channels=in_channels, 
                              out_channels=out_channels,
                              kernel_size=(3, 3), stride=(1, 1),
                              padding=(1, 1), bias=False)
                              
        self.conv2 = nn.Conv2d(in_channels=out_channels, 
                              out_channels=out_channels,
                              kernel_size=(3, 3), stride=(1, 1),
                              padding=(1, 1), bias=False)
                              
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.init_weight()
        
    def init_weight(self):
        init_layer(self.conv1)
        init_layer(self.conv2)
        init_bn(self.bn1)
        init_bn(self.bn2)

        
    def forward(self, input, pool_size=(2, 2), pool_type='avg'):
        
        x = input
        x = TF.relu_(self.bn1(self.conv1(x)))
        x = TF.relu_(self.bn2(self.conv2(x)))
        if pool_type == 'max':
            x = TF.max_pool2d(x, kernel_size=pool_size)
        elif pool_type == 'avg':
            x = TF.avg_pool2d(x, kernel_size=pool_size)
        elif pool_type == 'avg+max':
            x1 = TF.avg_pool2d(x, kernel_size=pool_size)
            x2 = TF.max_pool2d(x, kernel_size=pool_size)
            x = x1 + x2
        else:
            raise Exception('Incorrect argument!')
        
        return x

In [9]:
# Cnn14
class Cnn14(nn.Module):
    def __init__(self):
        
        super(Cnn14, self).__init__()

        self.bn0 = nn.BatchNorm2d(64)

        self.conv_block1 = ConvBlock(in_channels=1, out_channels=64)
        self.conv_block2 = ConvBlock(in_channels=64, out_channels=128)
        self.conv_block3 = ConvBlock(in_channels=128, out_channels=256)
        self.conv_block4 = ConvBlock(in_channels=256, out_channels=512)
        self.conv_block5 = ConvBlock(in_channels=512, out_channels=1024)
        self.conv_block6 = ConvBlock(in_channels=1024, out_channels=2048)

        self.fc1 = nn.Linear(2048, 2048, bias=True)
        
        self.init_weight()

    def init_weight(self):
        init_bn(self.bn0)
        init_layer(self.fc1)
 
    def forward(self, input):
        # (batch_size, 1, mel_bins, timesteps)

        x = input.transpose(2, 3)
        x = x.transpose(1, 3)
        x = self.bn0(x)
        x = x.transpose(1, 3)

        x = self.conv_block1(x, pool_size=(2, 2), pool_type='avg')
        x = TF.dropout(x, p=0.2, training=self.training)
        x = self.conv_block2(x, pool_size=(2, 2), pool_type='avg')
        x = TF.dropout(x, p=0.2, training=self.training)
        x = self.conv_block3(x, pool_size=(2, 2), pool_type='avg')
        x = TF.dropout(x, p=0.2, training=self.training)
        x = self.conv_block4(x, pool_size=(2, 2), pool_type='avg')
        x = TF.dropout(x, p=0.2, training=self.training)
        x = self.conv_block5(x, pool_size=(2, 2), pool_type='avg')
        x = TF.dropout(x, p=0.2, training=self.training)
        x = self.conv_block6(x, pool_size=(1, 1), pool_type='avg')
        x = TF.dropout(x, p=0.2, training=self.training)
        x = torch.mean(x, dim=3)
        
        (x1, _) = torch.max(x, dim=2)
        x2 = torch.mean(x, dim=2)
        x = x1 + x2
        x = TF.dropout(x, p=0.5, training=self.training)
        x = TF.relu_(self.fc1(x))
        embedding = TF.dropout(x, p=0.5, training=self.training)

        return embedding

In [10]:
# model for transfer
class TransferModel(nn.Module):
    def __init__(self, embedder, num_classes):

        super(TransferModel, self).__init__()
        
        self.base = embedder()
        emb_dim = self.base.fc1.out_features
        self.classifier = nn.Linear(in_features=emb_dim, out_features=num_classes)

    def load_base_weights(self, path_to_weights):
        weights_full = torch.load(path_to_weights)["model"]

        weights = OrderedDict()
        for key, value in weights_full.items():
            if key.startswith(("logmel_extractor", "spectrogram_extractor", "fc_audioset")):
                continue
            weights[key] = value

        self.base.load_state_dict(weights)
        

    def forward(self, input):
        embedding = self.base(input)
        logits = self.classifier(embedding)

        return logits

## 3. Trying out on data

In [11]:
root_dir = "/home/yuliya/ESC50"
sr = 32000
train_folds = [1, 2, 3]
valid_folds = [4]
test_folds = [5]

In [12]:
n_fft = 1024
hop_length = 320
win_length = 1024
n_mels = 64
f_min = 50
f_max = 14000

In [13]:
spec = T.Spectrogram(n_fft=n_fft, win_length=win_length, hop_length=hop_length, power=2.0)
mel = T.MelScale(n_mels=n_mels, sample_rate=sr, f_min=f_min, f_max=f_max, n_stft=n_fft // 2 + 1)
amplitude_to_db = T.AmplitudeToDB(stype="power", top_db=80)

In [14]:
feature_extractor = nn.Sequential(spec, mel, amplitude_to_db)

In [15]:
train_data = ESC50dataset(root_dir=root_dir, sr=sr, folds=train_folds, feature_extractor=feature_extractor)
valid_data = ESC50dataset(root_dir=root_dir, sr=sr, folds=valid_folds, feature_extractor=feature_extractor)
test_data = ESC50dataset(root_dir=root_dir, sr=sr, folds=test_folds, feature_extractor=feature_extractor)

In [16]:
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=32, shuffle=False)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

In [17]:
def train_step(model, criterion, optimizer, dataloader, device):
    model.train()
    running_loss = 0.0
    running_corrects = 0
    total_samples = 0
    
    for images, labels in dataloader:
        images = images.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        loss.backward()
        optimizer.step()
        
        _, preds = torch.max(outputs, 1)
        running_loss += loss.item() * images.size(0)
        running_corrects += torch.sum(preds == labels.data)
        total_samples += images.size(0)
    
    epoch_loss = running_loss / total_samples
    epoch_acc = running_corrects.double() / total_samples
    
    return epoch_loss, epoch_acc.item()

In [18]:
def valid_step(model, criterion, dataloader, device):
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    total_samples = 0
    
    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device)
            labels = labels.to(device)
            
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * images.size(0)
            running_corrects += torch.sum(preds == labels.data)
            total_samples += images.size(0)
    
    epoch_loss = running_loss / total_samples
    epoch_acc = running_corrects.double() / total_samples
    
    return epoch_loss, epoch_acc.item()

In [19]:
def train(model, num_epochs, criterion, optimizer, device):
    best_acc = 0.0
    best_model = None


    for epoch in tqdm(range(num_epochs), desc='Epoch'):
    
        train_loss, train_acc = train_step(model, criterion, optimizer, train_loader, device)
        val_loss, val_acc = valid_step(model, criterion, valid_loader, device)
    
        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"Val   Loss: {val_loss:.4f},   Val Acc:   {val_acc:.4f}")
    
        if val_acc > best_acc:
            best_acc = val_acc
            best_model = model.state_dict()

    print(f"Best val Acc: {best_acc:.4f}")

    if best_model is not None:
        model.load_state_dict(best_model)
    
    torch.save(model.state_dict(), "best.pth")
    print("Модель сохранена в best.pth")

    return model

In [20]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [21]:
num_epochs = 15
learning_rate = 1e-3

In [22]:
model = TransferModel(Cnn14, 50)
model.load_base_weights(path_to_weights)

In [23]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [24]:
model = model.to(device)

In [25]:
model = train(model, num_epochs, criterion, optimizer, device)

Epoch:   7%|▋         | 1/15 [00:45<10:30, 45.02s/it]

Train Loss: 2.5027, Train Acc: 0.3958
Val   Loss: 0.5347,   Val Acc:   0.8350


Epoch:  13%|█▎        | 2/15 [01:28<09:34, 44.16s/it]

Train Loss: 0.4826, Train Acc: 0.8558
Val   Loss: 0.4784,   Val Acc:   0.8475


Epoch:  20%|██        | 3/15 [02:10<08:35, 42.98s/it]

Train Loss: 0.2482, Train Acc: 0.9325
Val   Loss: 0.3136,   Val Acc:   0.9225


Epoch:  27%|██▋       | 4/15 [02:55<08:01, 43.81s/it]

Train Loss: 0.1792, Train Acc: 0.9383
Val   Loss: 0.4384,   Val Acc:   0.8800


Epoch:  33%|███▎      | 5/15 [03:42<07:29, 44.94s/it]

Train Loss: 0.1655, Train Acc: 0.9492
Val   Loss: 0.3415,   Val Acc:   0.9125


Epoch:  40%|████      | 6/15 [04:26<06:43, 44.87s/it]

Train Loss: 0.1558, Train Acc: 0.9583
Val   Loss: 0.4115,   Val Acc:   0.9050


Epoch:  47%|████▋     | 7/15 [05:11<05:58, 44.87s/it]

Train Loss: 0.1264, Train Acc: 0.9583
Val   Loss: 0.4411,   Val Acc:   0.8850


Epoch:  53%|█████▎    | 8/15 [06:02<05:27, 46.77s/it]

Train Loss: 0.1196, Train Acc: 0.9675
Val   Loss: 0.4151,   Val Acc:   0.9125


Epoch:  60%|██████    | 9/15 [06:47<04:36, 46.14s/it]

Train Loss: 0.0941, Train Acc: 0.9750
Val   Loss: 0.4200,   Val Acc:   0.9075


Epoch:  67%|██████▋   | 10/15 [07:31<03:47, 45.55s/it]

Train Loss: 0.0990, Train Acc: 0.9750
Val   Loss: 0.2760,   Val Acc:   0.9250


Epoch:  73%|███████▎  | 11/15 [08:18<03:04, 46.07s/it]

Train Loss: 0.0614, Train Acc: 0.9825
Val   Loss: 0.3462,   Val Acc:   0.9050


Epoch:  80%|████████  | 12/15 [09:03<02:16, 45.62s/it]

Train Loss: 0.0802, Train Acc: 0.9775
Val   Loss: 0.7114,   Val Acc:   0.8575


Epoch:  87%|████████▋ | 13/15 [09:47<01:30, 45.11s/it]

Train Loss: 0.0822, Train Acc: 0.9775
Val   Loss: 0.4885,   Val Acc:   0.9050


Epoch:  93%|█████████▎| 14/15 [10:32<00:45, 45.16s/it]

Train Loss: 0.1105, Train Acc: 0.9758
Val   Loss: 0.4451,   Val Acc:   0.8900


Epoch: 100%|██████████| 15/15 [11:19<00:00, 45.29s/it]

Train Loss: 0.0859, Train Acc: 0.9750
Val   Loss: 0.4028,   Val Acc:   0.8950
Best val Acc: 0.9250





Модель сохранена в best.pth


In [28]:
model.load_state_dict(torch.load("best.pth"))

<All keys matched successfully>

In [29]:
test_loss, test_acc = valid_step(model, criterion, test_loader, device)
test_loss, test_acc

(0.43132887959480287, 0.8775000000000001)