In [8]:
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [9]:
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import torchaudio
import os
torchaudio.set_audio_backend("soundfile")

  torchaudio.set_audio_backend("soundfile")


In [10]:
class UrbanSoundDataset(Dataset):
    def __init__(self, annotation_file, audio_dir, transformation, target_sample_rate, num_samples, device):
        self.annotation = pd.read_csv(annotation_file)
        self.audio_dir = audio_dir
        self.device = device
        self.transformation = transformation.to(self.device)
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples

    def __len__(self):
        return len(self.annotation)

    def __getitem__(self, idx):
        audio_sample_path = self._get_audio_sample_path(idx)
        label = self._get_audio_sample_label(idx)
        signal, sr = torchaudio.load(audio_sample_path)
        signal = signal.to(self.device)
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(signal)
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        signal = self.transformation(signal)
        return signal, label
    
    def _right_pad_if_necessary(self, signal):
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal
    
    def _cut_if_necessary(self, signal):
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal
    
    def _resample_if_necessary(self, signal, sr):
        if sr != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate).to(self.device)
            signal = resampler(signal)
        return signal
    
    def _mix_down_if_necessary(self, signal):
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal
    
    def _get_audio_sample_path(self, idx):
        fold = f"fold{self.annotation.iloc[idx, 5]}"
        path = os.path.join(self.audio_dir, fold, self.annotation.iloc[idx, 0])
        return path
    
    def _get_audio_sample_label(self, idx):
        return self.annotation.iloc[idx, 6]

In [11]:
ANNOTATION_FILE = "./data/UrbanSound8K/metadata/UrbanSound8K.csv"
AUDIO_DIR = os.path.join("data", "UrbanSound8K", "audio")
SAMPLE_RATE = 22050
NUM_SAMPLES = 22050

In [12]:
mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=SAMPLE_RATE,
    n_fft=1024,
    hop_length=512,
    n_mels=64
)

In [13]:
usd = UrbanSoundDataset(ANNOTATION_FILE, AUDIO_DIR, mel_spectrogram, SAMPLE_RATE, NUM_SAMPLES, device)

In [15]:
from torch import nn

In [16]:
class CNNNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(
                in_channels=1,
                out_channels=16,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(
                in_channels=16,
                out_channels=32,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(
                in_channels=32,
                out_channels=64,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        self.conv4 = nn.Sequential(
            nn.Conv2d(
                in_channels=64,
                out_channels=128,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(128 * 5 * 4, 10)
        self.softmax = nn.Softmax(dim=1)


    def forward(self, input_data):
        conv1_output = self.conv1(input_data)
        conv2_output = self.conv2(conv1_output)
        conv3_output = self.conv3(conv2_output)
        conv4_output = self.conv4(conv3_output)
        flatten_output = self.flatten(conv4_output)
        linear_output = self.linear(flatten_output)
        output = self.softmax(linear_output)
        return output

In [17]:
cnn = CNNNetwork().to(device)

In [18]:
from torchsummary import summary

In [19]:
summary(cnn, input_size=(1, 64, 44))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 16, 66, 46]             160
              ReLU-2           [-1, 16, 66, 46]               0
         MaxPool2d-3           [-1, 16, 33, 23]               0
            Conv2d-4           [-1, 32, 35, 25]           4,640
              ReLU-5           [-1, 32, 35, 25]               0
         MaxPool2d-6           [-1, 32, 17, 12]               0
            Conv2d-7           [-1, 64, 19, 14]          18,496
              ReLU-8           [-1, 64, 19, 14]               0
         MaxPool2d-9             [-1, 64, 9, 7]               0
           Conv2d-10           [-1, 128, 11, 9]          73,856
             ReLU-11           [-1, 128, 11, 9]               0
        MaxPool2d-12            [-1, 128, 5, 4]               0
          Flatten-13                 [-1, 2560]               0
           Linear-14                   

In [20]:
BATCH_SIZE = 128
EPOCHS = 10
LEARNING_RATE = 0.001

In [21]:
def create_data_loader(train_data, batch_size):
    train_dataloader = DataLoader(train_data, batch_size=batch_size)
    return train_dataloader

In [22]:
train_dataloader = create_data_loader(usd, BATCH_SIZE)

In [23]:
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(cnn.parameters(), lr=LEARNING_RATE)

In [24]:
def train_one_epoch(model, data_loader, loss_function, optimizer, device):
    for inputs, targets in data_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)
        # calculate loss
        predictions = model(inputs)
        loss = loss_function(predictions, targets)
        # backpropagate error and update weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f"Loss: {loss.item()}")
    return loss.item()

In [25]:
def train(model, data_loader, loss_function, optimizer, device, epochs):
    for i in range(epochs):
        print(f"Epoch {i+1}")
        train_one_epoch(model, data_loader, loss_function, optimizer, device)
        print("---------------------------")
    print("Finished training")

In [26]:
train(cnn, train_dataloader, loss_function, optimizer, device, EPOCHS)

Epoch 1
Loss: 2.4237799644470215
---------------------------
Epoch 2
Loss: 2.2952210903167725
---------------------------
Epoch 3
Loss: 2.3593215942382812
---------------------------
Epoch 4
Loss: 2.3345603942871094
---------------------------
Epoch 5
Loss: 2.281878709793091
---------------------------
Epoch 6
Loss: 2.3485047817230225
---------------------------
Epoch 7
Loss: 2.3586928844451904
---------------------------
Epoch 8
Loss: 2.4224298000335693
---------------------------
Epoch 9
Loss: 2.35557222366333
---------------------------
Epoch 10
Loss: 2.3036892414093018
---------------------------
Finished training


In [28]:
torch.save(cnn.state_dict(), "models/cnn.pth")