In [1]:
import os
import numpy as np
import torch
from torch import nn
from torch.nn import ReLU
from torch.utils.data import DataLoader
from torchvision.transforms import ToTensor
import torch.multiprocessing
from torchvision import datasets, transforms
from torchvision import models
from torch import optim
from torch.utils.data.dataloader import default_collate

from torch.utils.tensorboard import SummaryWriter
from torchsummary import summary
import torchvision
import matplotlib.pyplot as plt
from colorama import Fore
from IPython.display import Audio, display
from torchsummary import summary
from tqdm.auto import tqdm
from timeit import default_timer as timer
import psutil

writer_path = 'runs/logger_classifier_beta_v2'
# writer to log to tensorboard
writer = SummaryWriter(writer_path)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
"""
Display spectrogram of an audio waveform.
@param waveform: Numpy waveform of sound
@param sample_rate: Sound sample rate
"""
def audio_display_spectrogram(waveform, sample_rate, title="Spectrogram", xlim=None) -> None:
    waveform = waveform.numpy()
    num_channels, _ = waveform.shape
    figure, axes = plt.subplots(num_channels, 1)
    if num_channels == 1:
        axes = [axes]
    for c in range(num_channels):
        axes[c].specgram(waveform[c], Fs=sample_rate)
        if num_channels > 1:
            axes[c].set_ylabel(f"Channel {c+1}")
        if xlim:
            axes[c].set_xlim(xlim)
    figure.suptitle(title)
    plt.show(block=False)

"""
Play sound of audio waveform.
@param waveform: Numpy waveform of sound
@param sample_rate: Sound sample rate
"""
def audio_play(waveform, sample_rate) -> None:
    waveform = waveform.numpy()

    num_channels, _ = waveform.shape
    if num_channels == 1:
        display(Audio(waveform[0], rate=sample_rate))
    elif num_channels == 2:
        display(Audio((waveform[0], waveform[1]), rate=sample_rate))
    else:
        raise ValueError("Waveform with more than 2 channels are not supported.")

"""
Display a spectrogram image
@param img: Spectrogram of sound
@param one_channel: Whenever image is grey or has color (RGB) 
"""
def image_display_spectrogram(img, one_channel=False):
    if one_channel:
        img = img.mean(dim=0)
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    if one_channel:
        plt.imshow(npimg, cmap="Greys")
    else:
        plt.imshow(np.transpose(npimg, (1, 2, 0)))

"""
Display all the spectrogram of sounds within a batch
@param batches: Batch of data from a dataloader 
"""
def batches_display(batches, writer_path):
    dataiter = iter(batches)
    images, _ = next(dataiter)
    # create grid of images
    img_grid = torchvision.utils.make_grid(images)
    # show images
    image_display_spectrogram(img_grid, one_channel=False)
    # write to tensorboard
    writer.add_image(writer_path, img_grid)

"""
Log the size of each batch
@param batches: Batch of data from a dataloader 
"""
def batches_log_shape(batches):
    i = 0
    for curr_batch_image, _ in batches:
        print(Fore.GREEN, '[', '='*(i+1), ' '*(len(batches)-i-1), f'] Generated batch {i} with {len(curr_batch_image)} images')
        i += 1

In [3]:
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
import os
from pathlib import Path
from colorama import Fore, Style
import torch.nn as nn

# --- Constants ---
NUM_WORKERS = 4
QUICK_DEV = False
DATASET_PATH = 'data'

# Load smaller database for faster loading time
if QUICK_DEV:
    DATASET_PATH = 'data' # You might want a different path for a smaller dev dataset

# Constants for audio processing and model input
IMAGE_SIZE = (389, 515)  # The target size for the spectrogram (height, width)
CHANNEL_COUNT = 3        # Target channels for the model
TARGET_SAMPLE_RATE = 22050 # Standardize all audio to this sample rate

ATTRIBUTION = ["bus", "car", "motorcycle", "pedestrian", "truck"]
SAVING_PATH = "../models/model_binary_beta_v1"
ACCURACY_THRESHOLD = 85

AUDIO_EXTENSIONS = {'.wav', '.mp3', '.flac'}

# --- Device Setup ---
print("CUDA available" if torch.cuda.is_available() else "CUDA not available")
print("Metal apple device detected" if torch.backends.mps.is_built() else "No Metal apple device")

# Get CPU or GPU device for training.
if torch.cuda.is_available():
    compute_unit = torch.device("cuda")
elif torch.backends.mps.is_available():
    compute_unit = torch.device("mps")
else:
    compute_unit = torch.device("cpu")

# see https://github.com/pytorch/pytorch/issues/11201
if os.name != 'nt': # file_system sharing strategy is not available on Windows
    torch.multiprocessing.set_sharing_strategy('file_system')

print(Fore.LIGHTMAGENTA_EX + f"Using {compute_unit} device for computation.")
print(Fore.GREEN + f"Computation workers count set to {NUM_WORKERS}")


# --- Custom Audio Dataset ---
class AudioFolder(Dataset):
    """
    A custom Dataset class for audio files, structured like ImageFolder.
    Assumes that root directory contains class-named subdirectories with audio files.
    """
    def __init__(self, root, transform=None, sample_rate=22050, extensions=None):
        self.root = Path(root)
        self.transform = transform
        self.sample_rate = sample_rate
        self.extensions = extensions if extensions else AUDIO_EXTENSIONS

        self.classes, self.class_to_idx = self._find_classes(self.root)
        self.samples = self._make_dataset(self.root, self.class_to_idx, self.extensions)

        if not self.samples:
            raise RuntimeError(f"Found 0 files in subfolders of: {self.root}. "
                               f"Supported extensions are: {', '.join(self.extensions)}")

    def _find_classes(self, dir):
        classes = [d.name for d in os.scandir(dir) if d.is_dir()]
        classes.sort()
        class_to_idx = {cls_name: i for i, cls_name in enumerate(classes)}
        return classes, class_to_idx

    def _make_dataset(self, dir, class_to_idx, extensions):
        instances = []
        dir = os.path.expanduser(dir)
        for target_class in sorted(class_to_idx.keys()):
            class_index = class_to_idx[target_class]
            target_dir = os.path.join(dir, target_class)
            if not os.path.isdir(target_dir):
                continue
            for root, _, fnames in sorted(os.walk(target_dir, followlinks=True)):
                for fname in sorted(fnames):
                    path = os.path.join(root, fname)
                    if path.lower().endswith(tuple(extensions)):
                        item = (path, class_index)
                        instances.append(item)
        return instances

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, index):
        path, target = self.samples[index]
        # Load audio file
        try:
            waveform, sr = torchaudio.load(path)
        except Exception as e:
            print(f"Error loading file {path}: {e}")
            # Return a dummy tensor and target if a file is corrupt
            return torch.zeros((CHANNEL_COUNT, *IMAGE_SIZE)), target

        # Apply transformations
        if self.transform:
            waveform = self.transform(waveform)

        return waveform, target

# --- Audio Transformations ---

# Define the sequence of transformations to be applied to the audio waveforms
audio_transform = nn.Sequential(
    # 1. Resample to the target sample rate
    torchaudio.transforms.Resample(orig_freq=44100, new_freq=TARGET_SAMPLE_RATE), # Note: Set orig_freq to your dataset's actual sample rate

    # 2. Convert to a Mel Spectrogram
    torchaudio.transforms.MelSpectrogram(sample_rate=TARGET_SAMPLE_RATE, n_fft=1024, hop_length=512, n_mels=IMAGE_SIZE[0]),

    # 3. Convert to a logarithmic scale (decibels)
    torchaudio.transforms.AmplitudeToDB(),

    # 4. Resize the spectrogram to the desired final image size
    torchaudio.transforms.Resize(IMAGE_SIZE),

    # 5. Convert single-channel spectrogram to 3 channels by duplicating the channel
    nn.Lambda(lambda x: x.repeat(CHANNEL_COUNT, 1, 1))
)

# Move transforms to the selected compute device for potential speed-up
audio_transform = audio_transform.to(compute_unit)


# --- Load the Dataset ---
print(Fore.LIGHTMAGENTA_EX + f"Loading audio files from dataset at {DATASET_PATH}")

try:
    # Instantiate the custom dataset
    audio_dataset = AudioFolder(
        root=DATASET_PATH,
        transform=audio_transform,
        sample_rate=TARGET_SAMPLE_RATE
    )
    print(Fore.GREEN + f"Successfully loaded dataset with {len(audio_dataset)} audio files from {len(audio_dataset.classes)} classes.")
    print(f"Classes found: {audio_dataset.classes}")
except (RuntimeError, FileNotFoundError) as e:
    print(Fore.RED + f"Error loading dataset: {e}")



CUDA not available
No Metal apple device
[95mUsing cpu device for computation.
[32mComputation workers count set to 4




AttributeError: module 'torchaudio.transforms' has no attribute 'Resize'

In [None]:
# Define the data transformation
transform=transforms.ToTensor() 

# Load the dataset
print(Fore.LIGHTMAGENTA_EX + f"Loading images from dataset at {DATASET_PATH}")
dataset = datasets.ImageFolder(DATASET_PATH, transform=transform)

# train / test split
val_ratio = 0.2
val_size = int(val_ratio * len(dataset))
train_size = len(dataset) - val_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
print(Fore.GREEN + f"{train_size} images for training, {val_size} images for validation")

In [None]:
batch_size = 16


# Load into batches
train_batches = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=batch_size,
                                           shuffle=True,
                                           num_workers=NUM_WORKERS,
                                           pin_memory=False) # switch to True if using collate

val_batches = torch.utils.data.DataLoader(val_dataset,
                                         batch_size=batch_size*2,
                                         num_workers=NUM_WORKERS,
                                         pin_memory=False) # switch to True if using collate

print(Fore.LIGHTMAGENTA_EX + f"Dataset loaded in batches.")
print(Fore.GREEN + f"Batch set to {batch_size} for training")
print(Fore.GREEN + f"Batch set to {batch_size*2} for validation")
batches_display(val_batches, writer_path=writer_path)

In [None]:
# Define CNN as sequential
model_binary_v1_arch = nn.Sequential(
    nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1),
    nn.ReLU(),
    nn.Conv2d(16, 16, kernel_size=3, stride=2, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),
    nn.Conv2d(16, 10, kernel_size=3, stride=2, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),
    nn.Conv2d(10, 10, kernel_size=3, stride=2, padding=1),
    nn.ReLU(),
    nn.Flatten(),
    nn.Linear(in_features=480,
              out_features=2),
)

# define CNN as sequential
class neuralNetworkV1(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1) 
        self.conv2 = nn.Conv2d(16, 16, kernel_size=3, stride=2, padding=1) 
        self.conv3 = nn.Conv2d(16, 10, kernel_size=3, stride=2, padding=1)
        self.conv4 = nn.Conv2d(10, 10, kernel_size=3, stride=2, padding=1)
        self.pooling = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)) 
        self.relu = nn.ReLU()
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(in_features=480, out_features=2)
    
    def forward(self, x: torch.Tensor):
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = self.pooling(x)
        x = self.relu(self.conv3(x))
        x = self.pooling(x)
        x = self.relu(self.conv4(x))
        x = self.flatten(x)
        try:
            x = self.linear(x)
        except Exception as e:
            print(Fore.RED + f"Error : Linear block should take support shape of {x.shape} for in_features.")
        return x

#selected_model = model_binary_v1_arch.to(compute_unit)
selected_model = neuralNetworkV1()

# Add CNN info to tensorboard
train_images_sample, _ = next(iter(train_batches))
writer.add_graph(selected_model, train_images_sample)

print(Fore.LIGHTMAGENTA_EX + f"Training dataloader shape :")
print(Fore.GREEN + f"({len(train_batches)}, {len(train_images_sample)}, {len(train_images_sample[0])}, {len(train_images_sample[0][0])}, {len(train_images_sample[0][0][0])})")

print(Fore.LIGHTMAGENTA_EX + "Model summary : " + Fore.GREEN)
print(summary(selected_model, (CHANNEL_COUNT, IMAGE_SIZE[0], IMAGE_SIZE[1])))

In [None]:
# display total time training
def display_training_time(start, end, device):
    total_time = end - start
    print(Fore.LIGHTMAGENTA_EX + f"Train time on {device}: {total_time:.3f} seconds")
    return total_time

# Calculate accuracy
def accuracy_fn(y_true, y_pred):
    """Calculates accuracy between truth labels and predictions.
    Args:
        y_true (torch.Tensor): Truth labels for predictions.
        y_pred (torch.Tensor): Predictions to be compared to predictions.
    Returns:
        [torch.float]: Accuracy value between y_true and y_pred, e.g. 78.45
    """
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred)) * 100
    return acc

# Display training infos for each epochs
def display_training_infos(epoch, val_loss, train_loss, accuracy):
    val_loss = round(val_loss.item(), 3)
    train_loss = round(train_loss.item(), 3)
    accuracy = round(accuracy, 2)
    print(Fore.GREEN + f"Epoch : {epoch}, Training loss : {train_loss}, Validation loss : {val_loss}, Accuracy : {accuracy} %")

# Check memory usage excess
def check_memory():
    mem_percent = psutil.virtual_memory().percent
    swap_percent = psutil.swap_memory().percent
    if mem_percent >= 90:
        print(Fore.YELLOW + f"WARNING : Reached {mem_percent} memory usage !")
        os.system(f'say "Memory usage high"')
    if swap_percent >= 90:
        print(Fore.YELLOW + f"WARNING : Reached {mem_percent} memory usage !")
        os.system(f'say "Swap usage high"')
    if mem_percent >= 95 and swap_percent >= 95:
        print(Fore.RED + f"ABORTING : Memory and Swap full !")
        os.system(f'say "Aborting training"')
        raise MemoryError

def train_neural_net(epochs, model, loss_func, optimizer, train_batches, val_batches):
    last_loss = 0
    final_accuracy = 0
    for epoch in tqdm(range(epochs)):
        # check memory and swap usage
        check_memory()
        # training mode
        model.train()
        with torch.enable_grad():
            train_loss = 0
            for images, labels in train_batches:
                predictions = model(images)
                loss = loss_func(predictions, labels)
                train_loss += loss
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            train_loss /= len(train_batches)
            writer.add_scalar("training loss", train_loss, epoch)
        # evaluation mode
        val_loss, val_accuracy = 0, 0
        model.eval()
        with torch.inference_mode():
            for images, labels in val_batches:
                #images, labels = images.to(compute_unit), labels.to(compute_unit)
                predictions = model(images)
                val_loss += loss_func(predictions, labels)
                val_accuracy += accuracy_fn(y_true=labels, y_pred=predictions.argmax(dim=1))
            val_loss /= len(val_batches)
            val_accuracy /= len(val_batches)
            writer.add_scalar("validation loss", val_loss, epoch)
            final_accuracy = val_accuracy
        display_training_infos(epoch+1, val_loss, train_loss, val_accuracy)
        writer.add_scalar("accuracy", val_accuracy, epoch)
        if val_accuracy >= ACCURACY_THRESHOLD:
            break
        last_loss = val_loss
    return final_accuracy

MAX_EPOCHS = 300
LEARNING_RATE = 0.01
GRADIENT_MOMENTUM = 0.90
loss_func = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(selected_model.parameters(), lr=LEARNING_RATE, momentum=GRADIENT_MOMENTUM)

print(Fore.LIGHTMAGENTA_EX + "Model ready : ")
print(Fore.GREEN, f"Learning rate set to : {LEARNING_RATE}")
print(Fore.GREEN, f"Momentum set to : {GRADIENT_MOMENTUM}")

print(Fore.LIGHTMAGENTA_EX + "Starting model training...")
train_time_start_on_gpu = timer()
training_complete = False
model_accuracy = train_neural_net(MAX_EPOCHS, selected_model, loss_func, optimizer, train_batches, val_batches)
print(Fore.LIGHTCYAN_EX + f"Training complete : {model_accuracy} %")
os.system(f'say "Training complete"')
training_complete = True
display_training_time(start=train_time_start_on_gpu,
                  end=timer(),
                  device=compute_unit)

INFERENCE

In [None]:
if training_complete == True:
    if input("Save model ? y for YES") == "y":
        print(Fore.LIGHTMAGENTA_EX + f"Saving model at {SAVING_PATH}")
        torch.save(selected_model, SAVING_PATH)
writer.flush()
writer.close()

In [None]:
SPECTOGRAM_SAVE_PATH = ''
DEVICE = torch.device('cpu')
def infer(sound_path : str) -> int:
    model = torch.load("./model_path", map_location=DEVICE)
    sound = audio(sound_path)
    sound.write_disk_specogram(SPECTOGRAM_SAVE_PATH, dpi = 90)
    image = Image.open(SPECTOGRAM_SAVE_PATH).convert('RGB')
    with torch.no_grad():
        image_array = np.array(image)
        image_array = np.transpose(image_array, (2,0,1))
        image_tensor = torch.tensor(image_array, dtype=torch.float32).unsqueeze(0)
        predictions = model(image_tensor)
        top_index = torch.argmax(predictions, dim =1).item()
    return predictions[top_index]



In [None]:
SPECTOGRAM_DPI = 90
DEFAULT_SAMPLE_RATE = 44100
DEFAULT_HOPE_LWNGTH = 1024