In [1]:
import os
import csv
import datetime

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision.models import efficientnet_b0

from data_loading import TorchTensorFolderDataset

In [2]:
def save_to_csv(data, filename, column_names):
    timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H.%M")
    filename = f"{filename}_{timestamp}.csv"
    with open(filename, mode="w", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(column_names)  
        writer.writerows(data)

    return

In [3]:
def get_loader(data_size:str="full", target_data:str="train", batch_size:int=16):
    # path_to_raw = os.path.join(os.getcwd(), "data", "preprocessed", data_size, "raw", target_data)
    path_to_mel = os.path.join(os.getcwd(), "data", "preprocessed", data_size, "mel", target_data)
    
    # dataset = EnsembleDataset(path_to_raw, path_to_mel)
    dataset = TorchTensorFolderDataset(path_to_mel)
    # n = len(dataset.labels)
    n = len(dataset.class_to_idx)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    return loader, n

In [4]:
DATA_SIZE = "sample"

In [5]:
def get_pretrained_model():
    model = efficientnet_b0(weights='EfficientNet_B0_Weights.IMAGENET1K_V1')
    
    original_conv = model.features[0][0]
    new_conv = nn.Conv2d(
        in_channels=1,
        out_channels=original_conv.out_channels,
        kernel_size=original_conv.kernel_size,
        stride=original_conv.stride,
        padding=original_conv.padding,
        bias=original_conv.bias is not None
    )
    with torch.no_grad():
        new_conv.weight[:] = original_conv.weight.mean(dim=1, keepdim=True)
    model.features[0][0] = new_conv
    
    return model

In [6]:
def train_model(model, optimizer, train_loader, val_loader, num_epochs=5, patience=3, verbose=False):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    criterion = nn.CrossEntropyLoss()
    model.to(device)

    counter = 0
    best_val_acc = float("+inf")
    best_model = model.state_dict()

    train_losses = []
    val_losses = []
    
    for epoch in range(num_epochs):
        if not patience is None and counter >= patience:
            print('Patience trigger. End of learning.')
            break
            
        model.train()
        train_loss = 0
        train_correct = 0
        train_total = 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            
            outputs = model(images)
            loss = criterion(outputs, labels)
    
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
            train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()
        train_acc = 100 * train_correct / train_total
    
        model.eval()
        val_loss = 0
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                
                outputs = model(images)
                loss = criterion(outputs, labels)
    
                val_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()
        val_acc = 100 * val_correct / val_total

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        if verbose:
            print(f"Epoch [{epoch+1}/{num_epochs}]")
            print(f"Train Loss: {train_loss/len(train_loader):.4f}, Train Acc: {train_acc:.2f}%")
            print(f"Val Loss:   {val_loss/len(val_loader):.4f}, Val Acc:   {val_acc:.2f}%\n")

        if val_acc < best_val_acc:
            counter = 0
            best_val_acc = val_acc
            best_model = model.state_dict()
        else:
            counter += 1

    model.load_state_dict(best_model)

    return train_losses, val_losses

In [7]:
def test_model(model, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    criterion = nn.CrossEntropyLoss()
    model.to(device)
    
    model.eval()
    test_loss = 0
    test_correct = 0
    test_total = 0
    
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            
            outputs = model(images)
            loss = criterion(outputs, labels)
    
            test_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            test_total += labels.size(0)
            test_correct += (predicted == labels).sum().item()
    
    test_acc = 100 * test_correct / test_total

    return test_loss, test_acc

In [8]:
def test_learning_rates(times=3, learning_rates=[0.1, 0.05, 0.01, 0.005, 0.001]):
    val_loader, _ = get_loader(DATA_SIZE, "validation")
    test_loader, _ = get_loader(DATA_SIZE, "test")
    
    result = []
    column_names = ["learning_rate", "train_loss", "train_acc", "val_loss", "val_acc", "test_loss", "test_acc"]
    print(*column_names)
    
    for learning_rate in learning_rates:
        for _ in range(times):
            train_loader, n = get_loader(DATA_SIZE, "train")
            
            model = get_pretrained_model()
            optimizer = optim.Adam(model.parameters(), lr=learning_rate)
            
            train_model(model, optimizer, train_loader, val_loader, num_epochs=10)
            
            train_loss, train_acc = test_model(model, train_loader)
            val_loss, val_acc = test_model(model, val_loader)
            test_loss, test_acc = test_model(model, test_loader)
            
            result.append((learning_rate, train_loss, train_acc, val_loss, val_acc, test_loss, test_acc))
            
            print(*result[-1])
            
    save_to_csv("results/learning_rates_test.csv", column_names, result)

In [11]:
def test_batch_sizes(times=3, batch_sizes=[16, 32, 64]):
    val_loader, _ = get_loader(DATA_SIZE, "validation")
    test_loader, _ = get_loader(DATA_SIZE, "test")
    
    result = []
    column_names = ["batch_size", "train_loss", "train_acc", "val_loss", "val_acc", "test_loss", "test_acc"]
    print(*column_names)
    
    for batch_size in batch_sizes:
        for _ in range(times):
            train_loader, n = get_loader(DATA_SIZE, "train", batch_size)
            
            model = get_pretrained_model()
            optimizer = optim.Adam(model.parameters(), lr=0.001)
            
            train_model(model, optimizer, train_loader, val_loader, num_epochs=10)
            
            train_loss, train_acc = test_model(model, train_loader)
            val_loss, val_acc = test_model(model, val_loader)
            test_loss, test_acc = test_model(model, test_loader)
            
            result.append((batch_size, train_loss, train_acc, val_loss, val_acc, test_loss, test_acc))
            
            print(*result[-1])
            
    save_to_csv("results/batch_sizes_test.csv", column_names, result)

In [12]:
test_batch_sizes()

batch_size train_loss train_acc val_loss val_acc test_loss test_acc


KeyboardInterrupt: 