In [1]:
import os
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, random_split, DataLoader
import torchaudio

### 1. Data Loading & Preprocessing

In [2]:
# Root directory of the categorized audio dataset. The default is the original
# workspace location; set the VOICE_DATASET_DIR environment variable to point
# the notebook at a copy stored elsewhere without editing this cell.
PATH = os.environ.get("VOICE_DATASET_DIR", "/workspace/categorized_dataset")

In [3]:
class VoiceDataset(Dataset):
    """Dataset of audio clips described by a CSV annotations file.

    The CSV must provide a ``filename`` and a ``label`` column. Each clip is
    loaded, resampled to ``target_sample_rate``, mixed down to a single
    channel, cut or zero-padded to exactly ``num_samples`` samples and finally
    passed through ``transformation``.

    Args:
        annotations_file: Path of the CSV file read with ``pandas.read_csv``.
        audio_dir: Root directory that contains one sub-directory per category.
        transformation: Callable applied to the fixed-length signal.
        target_sample_rate: Sample rate every clip is resampled to.
        num_samples: Number of samples every clip is cut/padded to.
    """

    def __init__(self, annotations_file, audio_dir, transformation, target_sample_rate, num_samples):
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.transformation = transformation
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples

    def __len__(self):
        """Return the number of annotated clips."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Load clip ``index`` and return ``(transformed_signal, label)``."""
        audio_sample_path, label = self._get_audio_sample_path_and_label(index)
        signal, sample_rate = torchaudio.load(audio_sample_path)

        # Normalise rate and channel count first so that the length handling
        # below always operates on target-rate, single-channel audio.
        signal = self._resample(signal, sample_rate)
        signal = self._mix_down(signal)

        signal = self._cut(signal)
        signal = self._rightpad(signal)

        signal = self.transformation(signal)

        return signal, label

    def _resample(self, signal, sample_rate):
        """Resample ``signal`` to ``target_sample_rate`` when the rates differ."""
        if sample_rate != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(sample_rate, self.target_sample_rate)
            signal = resampler(signal)

        return signal

    def _mix_down(self, signal):
        """Average along dim 0 (channels) when there is more than one channel."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)

        return signal

    def _cut(self, signal):
        """Keep only the first ``num_samples`` samples of a longer signal."""
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]

        return signal

    def _rightpad(self, signal):
        """Zero-pad a shorter signal on the right up to ``num_samples`` samples."""
        signal_length = signal.shape[1]
        if signal_length < self.num_samples:
            missing_samples = self.num_samples - signal_length
            # (left, right) padding of the last dimension.
            padding = (0, missing_samples)
            signal = torch.nn.functional.pad(signal, padding)

        return signal

    def _get_audio_sample_path_and_label(self, index):
        """Return ``(path, label)`` for the annotation row at ``index``.

        The clip is expected in the sub-directory of ``audio_dir`` named after
        the text following the last underscore of the file name (taken from
        the part before the first dot).
        """
        row = self.annotations.iloc[index]
        label = row['label']
        filename = row['filename']

        category = filename.split('.')[0].split('_')[-1]
        # Resolve against the directory given to the constructor rather than a
        # module-level constant, so the dataset works for any audio root.
        path = os.path.join(self.audio_dir, category, filename)

        return path, label

In [4]:
if __name__ == "__main__":

    # Dataset locations: the annotations CSV sits directly in the audio root.
    AUDIO_DIR = PATH
    ANNOT_FILE = os.path.join(PATH, 'annot_file.csv')

    # Every clip is resampled to SAMPLE_RATE and cut/padded to NUM_SAMPLES
    # samples; with both at 22050 that is one second of audio per item.
    SAMPLE_RATE = 22050
    NUM_SAMPLES = 22050

    # Transformation applied to each fixed-length signal.
    mel_spectrogram = torchaudio.transforms.MelSpectrogram(
        sample_rate=SAMPLE_RATE, n_fft=1024, hop_length=512, n_mels=64
    )

    dataset = VoiceDataset(
        annotations_file=ANNOT_FILE,
        audio_dir=AUDIO_DIR,
        transformation=mel_spectrogram,
        target_sample_rate=SAMPLE_RATE,
        num_samples=NUM_SAMPLES,
    )

    print(f"{len(dataset)} samples in the dataset.")

2400 samples in the dataset.


In [5]:
# Hold out 15% of the samples for testing. The seeded generator makes the split
# identical after every kernel restart.
SPLIT_SEED = 42
train_size = int(0.85 * len(dataset))
test_size = len(dataset) - train_size
train_data, test_data = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

BATCH_SIZE = 16
NUM_WORKERS = 8

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"using: {device}")

# Compare `device.type`: a torch.device does not compare equal to a plain
# string, so testing `device == "cuda"` always fell through to the CPU branch
# and silently disabled the worker processes and pinned memory.
if device.type == "cuda":
    num_workers = NUM_WORKERS
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False

# Reshuffle the training samples every epoch; keep the test order fixed.
train_loader = DataLoader(
    train_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=num_workers,
    pin_memory=pin_memory,
)
test_loader = DataLoader(
    test_data,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

print(f"{train_size} samples for training; {len(train_loader)} sample sets in the train dataset.")
print(f"{test_size} samples for testing; {len(test_loader)} sample sets in the test dataset.")

using: cuda
2040 samples for training; 128 sample sets in the train dataset.
360 samples for testing; 23 sample sets in the test dataset.


### 2. Model Creation

In [6]:
from torch import nn
import torch.nn.functional as F
from torchsummary import summary

class ResidualBlock(nn.Module):
    """Two conv + batch-norm layers wrapped by a skip connection.

    The first convolution applies ``stride``; the second always uses stride 1.
    When the stride is not 1 or the channel count changes, the skip path is a
    strided 1x1 convolution followed by batch norm so that both branches have
    the same shape; otherwise the input is added unchanged.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels produced by the block.
        stride: Stride of the first convolution and of the projection shortcut.
        kernel_size: Kernel size of the two main convolutions.
        padding: Padding of the two main convolutions.
        bias: Whether the convolutions carry a bias term.
    """

    def __init__(self, in_channels, out_channels, stride=1, kernel_size=3, padding=1, bias=False):
        super(ResidualBlock, self).__init__()

        # `bias` is forwarded to every convolution; it used to be accepted but
        # ignored (always False).
        self.cnn1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=bias),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(True)
        )
        self.cnn2 = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, kernel_size, 1, padding, bias=bias),
            nn.BatchNorm2d(out_channels)
        )

        if stride != 1 or in_channels != out_channels:
            # Projection shortcut: match spatial size and channel count.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=bias),
                nn.BatchNorm2d(out_channels)
            )
        else:
            # Empty Sequential acts as the identity.
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(cnn2(cnn1(x)) + shortcut(x))``."""
        residual = x
        x = self.cnn1(x)
        x = self.cnn2(x)
        x += self.shortcut(residual)
        # Functional in-place ReLU: same result as before without building a
        # new nn.ReLU module on every forward pass.
        return F.relu(x, inplace=True)

class ResNet34(nn.Module):
    """Residual CNN classifier: a conv stem plus four stages of two residual blocks.

    The stem and the second block of every stage use stride 2. The head is a
    fixed ``AvgPool2d(2)`` followed by ``Linear(512, num_classes)``, so the
    feature map leaving ``block5`` must pool down to 1x1 (as it does for a
    ``(1, 64, 44)`` input); other input sizes make the linear layer fail with
    a shape mismatch.

    Args:
        num_classes: Number of output logits (default 6).
        in_channels: Channels of the input tensor (default 1).
    """

    def __init__(self, num_classes=6, in_channels=1):
        super(ResNet34, self).__init__()

        # Stem: strided convolution, 64 output channels.
        self.block1 = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=2, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(True)
        )

        self.block2 = nn.Sequential(
            # 1x1 pooling with stride 1 leaves the feature map unchanged.
            nn.MaxPool2d(1, 1),
            ResidualBlock(64, 64),
            ResidualBlock(64, 64, 2)
        )

        self.block3 = nn.Sequential(
            ResidualBlock(64, 128),
            ResidualBlock(128, 128, 2)
        )

        self.block4 = nn.Sequential(
            ResidualBlock(128, 256),
            ResidualBlock(256, 256, 2)
        )

        self.block5 = nn.Sequential(
            ResidualBlock(256, 512),
            ResidualBlock(512, 512, 2)
        )

        self.avgpool = nn.AvgPool2d(2)

        self.linear = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return the class logits, one row per item in the batch ``x``."""
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = self.avgpool(x)
        # Collapse everything but the batch dimension for the linear head.
        x = torch.flatten(x, 1)
        x = self.linear(x)

        return x

In [7]:
# Print a per-layer summary for a single-channel 64x44 input.
# torchsummary builds its dummy input on CUDA unless told otherwise, so pass
# the active device type to keep this cell working on CPU-only machines.
resnet34 = ResNet34().to(device)
summary(resnet34, (1, 64, 44), device=device.type)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 35, 25]             256
       BatchNorm2d-2           [-1, 64, 35, 25]             128
              ReLU-3           [-1, 64, 35, 25]               0
         MaxPool2d-4           [-1, 64, 35, 25]               0
            Conv2d-5           [-1, 64, 35, 25]          36,864
       BatchNorm2d-6           [-1, 64, 35, 25]             128
              ReLU-7           [-1, 64, 35, 25]               0
            Conv2d-8           [-1, 64, 35, 25]          36,864
       BatchNorm2d-9           [-1, 64, 35, 25]             128
    ResidualBlock-10           [-1, 64, 35, 25]               0
           Conv2d-11           [-1, 64, 18, 13]          36,864
      BatchNorm2d-12           [-1, 64, 18, 13]             128
             ReLU-13           [-1, 64, 18, 13]               0
           Conv2d-14           [-1, 64,

In [9]:
from tqdm import tqdm
model = ResNet34().to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=4e-4)
criterion = nn.CrossEntropyLoss()
EPOCHS = 50

model.train()
losses = []  # mean training loss per epoch (plain floats)
accs = []    # training accuracy per epoch, in [0, 1]

for epoch in range(EPOCHS):

    print('epochs {}/{} '.format(epoch+1, EPOCHS))

    running_loss = 0.0
    running_correct = 0
    samples_seen = 0

    for inputs, labels in tqdm(train_loader, total=len(train_loader)):

        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs.float())
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        # Accumulate Python numbers via .item(): summing the loss tensor itself
        # keeps every batch's autograd graph referenced until the epoch ends.
        # Weight by batch size so a smaller final batch is not over-counted.
        n_batch = labels.size(0)
        running_loss += loss.item() * n_batch
        running_correct += (outputs.argmax(1) == labels).sum().item()
        samples_seen += n_batch

    # max(..., 1) avoids a division by zero when the loader is empty.
    epoch_loss = running_loss / max(samples_seen, 1)
    epoch_acc = running_correct / max(samples_seen, 1)

    # Store the same accuracy that is printed (it was previously divided by an
    # unrelated factor of 3 before being appended).
    losses.append(epoch_loss)
    accs.append(epoch_acc)

    print('acc : {:.2f}'.format(epoch_acc))
    print('loss : {:.4f}'.format(epoch_loss))

epochs 1/50 


100%|██████████| 128/128 [17:16<00:00,  8.10s/it]


acc : 0.76%
loss : 0.6493
epochs 2/50 


100%|██████████| 128/128 [17:14<00:00,  8.08s/it]


acc : 0.96%
loss : 0.1165
epochs 3/50 


100%|██████████| 128/128 [17:08<00:00,  8.04s/it]


acc : 0.98%
loss : 0.0599
epochs 4/50 


100%|██████████| 128/128 [17:07<00:00,  8.02s/it]


acc : 0.98%
loss : 0.0557
epochs 5/50 


100%|██████████| 128/128 [16:44<00:00,  7.85s/it]


acc : 0.99%
loss : 0.0191
epochs 6/50 


 37%|███▋      | 47/128 [06:05<10:30,  7.78s/it]


KeyboardInterrupt: 

In [12]:
# Persist the trained network as a fully pickled module object.
# NOTE(review): loading this file needs the model's class definitions to be
# importable; saving `model.state_dict()` instead is the more portable format,
# but would change what consumers of this file have to load - confirm first.
SAVE_PATH = '/workspace/approach-1-model.pth'
torch.save(obj=model, f=SAVE_PATH)