In [None]:
# !pip install torch
# !pip install torchaudio
# !pip install numpy
# !pip install pandas
# !pip install wget

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import torchaudio
from torchaudio.transforms import Resample

import numpy as np
import pandas as pd

import os
import glob
import wget

import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# # Download and Unzip VC-PRG-1_5.zip
# !wget http://cmp.felk.cvut.cz/data/audio_vc/audio/VC-PRG-1_5.zip
# !unzip VC-PRG-1_5.zip
# !rm VC-PRG-1_5.zip

In [None]:
# # Download and Unzip VC-PRG-6.zip
# !wget http://cmp.felk.cvut.cz/data/audio_vc/audio/VC-PRG-6.zip
# !unzip VC-PRG-6.zip
# !rm VC-PRG-6.zip

In [None]:
# Notebook configuration: everything a reader might want to tune lives here.

# Directory holding the *.wav recordings and their *.txt annotation files.
# Can be overridden without editing the notebook by setting the
# VC_DATASET_PATH environment variable; the default is the original location.
DATASET_PATH = os.environ.get(
    "VC_DATASET_PATH",
    "/home/penguin/Data/thesis/learning/vehicle_counting/VC-PRG-1_5/",
)
TARGET_SAMPLE_RATE = 44100  # Hz; every clip is resampled to this rate
SAMPLE_LENGTH = 20  # seconds; clips are cut / zero-padded to this duration
BATCH_SIZE = 10  # clips per optimisation step
LEARNING_RATE = 0.001  # learning rate handed to the optimizer
EPOCHS = 20  # number of passes over the dataset

In [None]:
class VehicleCountingDataset(Dataset):
    """Audio clips paired with a vehicle count read from annotation files.

    Every ``*.wav`` file found directly in ``dataset_path`` is one item. The
    label of an item is the number of non-blank lines in its ``*.txt``
    annotation file (presumably one line per vehicle -- confirm against the
    dataset description).

    Args:
        dataset_path: Directory containing the ``*.wav`` and ``*.txt`` files.
        target_sample_rate: Sample rate (Hz) every clip is resampled to.
        signal_len: Clip duration in seconds; longer clips are truncated and
            shorter ones are zero-padded on the right.
        transformation: Callable applied to the prepared ``(1, num_samples)``
            signal; its result is returned as the feature.
        device: Device the signal (and any resampler) is moved to.

    Raises:
        ValueError: If audio and annotation files cannot be paired by file
            name and their counts differ.
    """

    def __init__(self, dataset_path, target_sample_rate, signal_len, transformation, device):
        self.dataset_path = dataset_path
        self.target_sample_rate = target_sample_rate
        self.signal_len = signal_len
        self.num_samples = int(self.signal_len * self.target_sample_rate)
        self.transformation = transformation
        self.device = device
        # One resampler per source sample rate, built lazily and reused.
        self._resamplers = {}

        # glob returns files in arbitrary order, so sort for a deterministic
        # item order and pair each recording with its own annotation file
        # instead of relying on two independent listings lining up.
        self.audio_files = sorted(glob.glob(os.path.join(self.dataset_path, "*.wav")))
        self.vc_files = self._match_label_files(
            self.audio_files,
            sorted(glob.glob(os.path.join(self.dataset_path, "*.txt"))),
        )

    @staticmethod
    def _stem(path):
        """Return the file name of ``path`` without directory or extension."""
        return os.path.splitext(os.path.basename(path))[0]

    @classmethod
    def _match_label_files(cls, audio_files, label_files):
        """Return the annotation files ordered to line up with ``audio_files``.

        Files are paired by identical stem (``x.wav`` <-> ``x.txt``) when every
        recording has such a partner; unrelated ``*.txt`` files are ignored in
        that case. Otherwise the sorted lists are paired by position, which
        requires both lists to have the same length.
        """
        by_stem = {cls._stem(path): path for path in label_files}
        if all(cls._stem(path) in by_stem for path in audio_files):
            return [by_stem[cls._stem(path)] for path in audio_files]
        if len(audio_files) != len(label_files):
            raise ValueError(
                f"Cannot pair {len(audio_files)} audio files with "
                f"{len(label_files)} annotation files: names do not match "
                "and counts differ"
            )
        return label_files

    def __len__(self):
        """Number of audio recordings in the dataset."""
        return len(self.audio_files)

    def __getitem__(self, item):
        """Return ``(transformation(signal), vehicle_count)`` for index ``item``."""
        label = self._get_vc(item)
        signal, sample_rate = self._get_signal(item)
        signal = signal.to(self.device)
        signal = self._resample_if_necessary(signal, sample_rate)
        signal = self._mix_down_if_necessary(signal)
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        mel_spec = self.transformation(signal)
        return mel_spec, label

    def _get_signal(self, item):
        """Load the waveform and its native sample rate for index ``item``."""
        signal, sample_rate = torchaudio.load(self.audio_files[item])
        return signal, sample_rate

    def _get_vc(self, item):
        """Return the vehicle count for index ``item``.

        Blank lines are skipped so a trailing empty line in the annotation
        file does not inflate the count.
        """
        with open(self.vc_files[item], 'r') as f:
            return sum(1 for line in f if line.strip())

    def _resample_if_necessary(self, signal, sample_rate):
        """Resample ``signal`` to the target rate when its rate differs."""
        if sample_rate != self.target_sample_rate:
            resampler = self._resamplers.get(sample_rate)
            if resampler is None:
                resampler = torchaudio.transforms.Resample(
                    sample_rate, self.target_sample_rate
                ).to(self.device)
                self._resamplers[sample_rate] = resampler
            signal = resampler(signal)
        return signal

    def _mix_down_if_necessary(self, signal):
        """Average multi-channel audio down to a single channel."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def _cut_if_necessary(self, signal):
        """Truncate ``signal`` to at most ``num_samples`` samples."""
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        """Zero-pad ``signal`` on the right up to ``num_samples`` samples."""
        if signal.shape[1] < self.num_samples:
            num_missing_samples = self.num_samples - signal.shape[1]
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

def create_data_loader(data, batch_size):
    """Wrap ``data`` in a ``DataLoader`` that yields batches of ``batch_size`` items.

    All other ``DataLoader`` options keep their defaults.
    """
    return DataLoader(dataset=data, batch_size=batch_size)

In [None]:
class CNNNetwork(nn.Module):
    """Four-stage CNN that classifies a single-channel 2-D input into 10 classes.

    Each stage is ``Conv2d(kernel 3, stride 1, padding 2) -> ReLU ->
    MaxPool2d(2)`` with 16, 32, 64 and 128 output channels. The flattened
    result feeds one linear layer with 10 outputs.

    ``forward`` returns raw, unnormalised logits, which is what
    ``nn.CrossEntropyLoss`` expects (it applies log-softmax internally).
    Passing already-softmaxed values to that loss normalises twice and
    flattens the gradients. Use ``predict_proba`` for class probabilities.

    NOTE: the linear layer hard-codes ``in_features=22400``, so the flattened
    size of the last conv stage must equal 22400. A ``(N, 1, 64, 539)`` input
    satisfies this; other input sizes generally require changing that value.
    """

    def __init__(self):
        super().__init__()

        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

        self.conv3 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

        self.conv4 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

        self.flatten = nn.Flatten()

        self.linear = nn.Linear(in_features=22400, out_features=10)

        # Kept as a module attribute for probability output; it has no
        # parameters, so saved state dicts are unaffected.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input_data):
        """Return class logits of shape ``(N, 10)`` for a batch of inputs."""
        x = self.conv1(input_data)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.flatten(x)
        # Deliberately no softmax here: the loss consumes raw logits.
        logits = self.linear(x)
        return logits

    def predict_proba(self, input_data):
        """Return softmax class probabilities of shape ``(N, 10)``."""
        return self.softmax(self(input_data))

def train_single_epoch(model, data_loader, loss_fn, optimizer, device):
    """Run one optimisation pass over ``data_loader``.

    Args:
        model: Module being trained; it is switched to training mode.
        data_loader: Iterable yielding ``(feature, target)`` tensor batches.
        loss_fn: Callable ``loss_fn(predictions, target)`` returning a scalar
            tensor.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The loss of the last batch of the epoch as a Python float (also
        printed).

    Raises:
        ValueError: If ``data_loader`` yields no batches.
    """
    # Make sure layers with train/eval-dependent behaviour are in train mode.
    model.train()

    loss = None
    for feature, target in data_loader:
        feature, target = feature.to(device), target.to(device)

        # Loss
        predictions = model(feature)
        loss = loss_fn(predictions, target)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Without this guard an empty loader fails with an obscure
    # UnboundLocalError on `loss`.
    if loss is None:
        raise ValueError("data_loader yielded no batches; nothing to train on")

    last_loss = loss.item()
    print(f"Loss: {last_loss}")
    return last_loss

def train(model, data_loader, loss_fn, optimizer, epochs, device):
    """Train ``model`` for ``epochs`` passes and collect the per-epoch loss.

    Each pass is delegated to ``train_single_epoch``. The returned dict maps
    the zero-based epoch index to the value that call returned.
    """
    separator = "----------------------------------------------------------------------------------------"
    history = {}
    for epoch_number in range(1, epochs + 1):
        print(f"Epoch: {epoch_number}")
        history[epoch_number - 1] = train_single_epoch(
            model, data_loader, loss_fn, optimizer, device
        )
        print(separator)
    print("Training finished")
    return history

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

# Feature extractor: mel spectrogram computed on the same device as the audio.
# NOTE(review): CNNNetwork's linear layer has a fixed in_features=22400, so
# n_mels, hop_length and the clip length presumably have to stay in sync
# with it -- confirm before changing any of them.
mel_spectrogram_transformer = torchaudio.transforms.MelSpectrogram(
    sample_rate=TARGET_SAMPLE_RATE,
    n_fft=4096,
    hop_length=1638,
    n_mels=64
).to(device)

# Dataset
vcd = VehicleCountingDataset(DATASET_PATH, TARGET_SAMPLE_RATE, SAMPLE_LENGTH, mel_spectrogram_transformer, device)

# Data loader
train_dataloader = create_data_loader(vcd, BATCH_SIZE)

# Model
model = CNNNetwork().to(device)

# Loss function
loss_fn = nn.CrossEntropyLoss()

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Train and persist the learned weights
loss_history = train(model, train_dataloader, loss_fn, optimizer, EPOCHS, device)
torch.save(model.state_dict(), "vcd_cnn_model.pt")

# Loss curve: one point per epoch, numbered from 1 like the training log.
# The dict view is converted to a list so plotting does not depend on
# matplotlib's handling of dict views.
fig, ax = plt.subplots()
ax.plot(range(1, len(loss_history) + 1), list(loss_history.values()))
ax.set(title="Training loss", xlabel="Epoch", ylabel="Loss")
plt.show()