In [128]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
matplotlib.style.use('fivethirtyeight') 
# %matplotlib inline
import librosa
import soundfile as sf
import librosa.display
import wave
import time
import os
import random
import torch
from torchvision.transforms.functional import pil_to_tensor
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from netcal.metrics import ECE


In [129]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [130]:
SPECTROGRAM_DPI = 90 # image quality of spectrograms
DEFAULT_SAMPLE_RATE = 44100
DEFAULT_HOPE_LENGHT = 1024

In [131]:
base_dir = '/home/user_7428/databases/TUT-acoustic-scenes-2017-development'
spectrogram_dir = f"{base_dir}/spectograms"
df = pd.read_csv(f"{base_dir}/meta.csv")
df['category'] = df['class']
classes_num = len(df['class'].unique())
classes = []
for c in df['class'].unique():
    classes.append(c)
print(f"Number of samples: {len(df)}, Number of classes: {classes_num}")
# df.groupby('class').agg('count')['path']
df.head()

Number of samples: 4680, Number of classes: 15


Unnamed: 0,path,class,unknown_yonatan_comment,category
0,audio/b020_90_100.wav,beach,b020,beach
1,audio/b020_110_120.wav,beach,b020,beach
2,audio/b020_100_110.wav,beach,b020,beach
3,audio/b020_40_50.wav,beach,b020,beach
4,audio/b020_50_60.wav,beach,b020,beach


In [132]:
matplotlib.pyplot.close()
if not os.path.exists(spectrogram_dir):
    os.mkdir(spectrogram_dir)
for i, row in df.iterrows():
    png_file = f"{spectrogram_dir}/{row['path'].split('/')[1].replace('.wav', '.png')}"
    if os.path.exists(png_file):
        continue
    waveform, sample_rate = librosa.load(f"{base_dir}/{row['path']}", sr=None)
    waveform = waveform.numpy()
    fig, axes = plt.subplots(1, 1)
    axes.specgram(waveform[0], Fs=sample_rate)
    # f = plt.figure()
    axes.axis('off')
    # plt.show(block=False)
    plt.savefig(f'{png_file}', dpi=SPECTROGRAM_DPI , bbox_inches='tight')
    plt.cla()
    plt.close(fig)
    # if i == 1000:
    #     break

In [133]:
def inspect_image_dimensions(image_dir, num_images=1):
    image_paths = [os.path.join(image_dir, fname) for fname in os.listdir(image_dir) if fname.endswith('.png')]
    for img_path in image_paths[:num_images]:
        with Image.open(img_path) as img:
            print(f'{img_path}: {img.size}')
inspect_image_dimensions(spectrogram_dir)

/home/user_7428/databases/TUT-acoustic-scenes-2017-development/spectograms/b047_30_40.png: (519, 367)


In [134]:
df['png_path'] = df['path'].apply(lambda path: path.replace('audio/', 'spectograms/').replace('.wav', '.png'))
df['png_path']

0        spectograms/b020_90_100.png
1       spectograms/b020_110_120.png
2       spectograms/b020_100_110.png
3         spectograms/b020_40_50.png
4         spectograms/b020_50_60.png
                    ...             
4675      spectograms/b081_50_60.png
4676      spectograms/b081_60_70.png
4677    spectograms/b081_100_110.png
4678    spectograms/b081_110_120.png
4679    spectograms/b081_120_130.png
Name: png_path, Length: 4680, dtype: object

In [135]:
img_height, img_width = 512, 384  # Adjusted dimensions based on inspection
transform = transforms.Compose([
    transforms.Resize((img_height, img_width)),
    transforms.ToTensor(),  # Converts the image to tensor and scales pixel values to [0, 1]
])
batch_size = 32
learning_rate = 0.0001
num_epochs = 20

In [136]:
class SpectrogramDataset(Dataset):
    def __init__(self, image_dir, df, classes, transform, wanted_classes=None):
        self.image_dir = image_dir
        self.transform = transform
        if wanted_classes == None:
            self.labels = [classes.index(df[df['png_path'] == f'spectograms/{fname}']['category'].values[0]) for fname in os.listdir(image_dir) if fname.endswith('.png')]
            self.image_paths = [os.path.join(image_dir, fname) for fname in os.listdir(image_dir) if fname.endswith('.png')]
        else:
            self.labels = []
            self.image_paths = []
            for fname in os.listdir(image_dir):
                if not fname.endswith('.png'):
                    continue
                full_image_path = os.path.join(image_dir, fname)
                label = classes.index(df[df['png_path'] == f'spectograms/{fname}']['category'].values[0])    
                if label not in wanted_classes:
                    continue
                self.image_paths.append(full_image_path)
                self.labels.append(label)
        
    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        # return pil_to_tensor(image), label
        return image, label

In [137]:
def get_test_and_train_loader(batch_size, wanted_classes=None, train_fraction=0.8):
    dataset = SpectrogramDataset(spectrogram_dir, df, classes, transform, wanted_classes)
    train_size = int(train_fraction * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    return train_loader, test_loader

In [138]:
class CNN(nn.Module):
    def __init__(self, num_classes):
        super(CNN, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        )
        self.layer3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        )
        self.fc1 = nn.Linear(128 * (img_height // 8) * (img_width // 8), 512)
        self.drop = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = out.view(out.size(0), -1)
        out = self.fc1(out)
        out = self.drop(out)
        out = self.fc2(out)
        return out

In [139]:
def train_model(model, train_loader, criterion, optimizer, num_epochs, device, save_path):
    if save_path != None:
        print(f"Checking if path {save_path} exists")
        if os.path.exists(save_path):
            print(f'Model loaded from {save_path}')
            model.load_state_dict(torch.load(save_path))
            model.to(device)
            model.eval()
            return
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for i, (images, labels) in enumerate(train_loader):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / 100:.4f}')
    if save_path != None:
        torch.save(model.state_dict(), save_path)
        print(f'Model saved to {save_path}')

In [140]:
def evaluate_model(model, test_loader, device):
    model.eval()
    ground_truth = []
    confidences = []
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            probabilities = F.softmax(outputs, dim=1)
            confidences += list(probabilities.cpu().numpy())
            _, predicted = torch.max(outputs.data, 1)
            ground_truth += list(labels.to('cpu').numpy())
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(f'Total: {total}, Correct: {correct}. Accuracy of the model on the test images: {100 * correct / total:.2f}%')
    ground_truth = np.array(ground_truth)
    confidences = np.array(confidences)
    return ground_truth, confidences

In [141]:
def get_ece(confidences, ground_truth):
    correct = ground_truth == confidences.argmax(axis=1)
    sum_true = np.sum(correct)
    total = correct.size
    acc = sum_true / total
    n_bins = 10
    ece = ECE(n_bins)
    uncalibrated_score = ece.measure(confidences, ground_truth)
    return uncalibrated_score

In [142]:
def create_train_and_evaluate_model(batch_size, number_of_wanted_classes, num_epochs, save=False, train_fraction=0.8):
    result_dir = "./results"
    ext = f"_{number_of_wanted_classes}_classes_{batch_size}_batch_{num_epochs}_epochs"
    ece_file_name = f"{result_dir}/ece{ext}.txt"
    if os.path.exists(ece_file_name):
        return
    if not os.path.isdir(result_dir):
        os.mkdir(result_dir)
    wanted_classes = [i for i in range(number_of_wanted_classes)]
    train_loader, test_loader = get_test_and_train_loader(batch_size, wanted_classes=wanted_classes, train_fraction=train_fraction)
    model = CNN(num_classes=number_of_wanted_classes).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    save_path = None
    if save:
        save_path=f"{result_dir}/cnn_model{ext}.pth"
    train_model(model, train_loader, criterion, optimizer, num_epochs, device, save_path)
    ground_truth, confidences = evaluate_model(model, test_loader, device)
    ece = get_ece(confidences, ground_truth)
    with open(ece_file_name, 'w') as f:
        f.write(f"{ece}")


In [143]:
for number_of_wanted_classes in range(0, classes_num, int(0.1 * len(classes_num))):
    if number_of_wanted_classes == 0:
        continue
    for batch_size in [1, 4, 16, 32, 64]:
        for num_epochs in [5, 10, 15, 20]:
            create_train_and_evaluate_model(batch_size, number_of_wanted_classes, num_epochs)

Epoch [1/9], Loss: 2.7905
Epoch [2/9], Loss: 1.6113


KeyboardInterrupt: 