In [27]:
import random
import os
import numpy as np
import sklearn
import torch
from torch.cuda import manual_seed_all
from torch import manual_seed as torch_manual_seed
from torch.backends import cudnn
import matplotlib as mpl
from matplotlib import pyplot as plt
import torchaudio
import torchaudio.transforms as T

In [2]:
# pre spectrogram augmentations
# these are examples and can be changed based on domain knowledge

time_stretch = T.TimeStretch()
def stretch_waveform(waveform, rate=1.2):
    # `rate > 1.0` speeds up, `rate < 1.0` slows down
    return time_stretch(waveform, rate)

pitch_shift = T.PitchShift(sample_rate=44100, n_steps=2)  # Shift up by 2 semitones
def shift_pitch(waveform, sample_rate):
    return pitch_shift(waveform)

def scale_volume(waveform, factor=1.5):
    return waveform * factor  # Amplifies waveform by factor

def crop_waveform(waveform, crop_size):
    start = torch.randint(0, max(1, waveform.size(-1) - crop_size), (1,)).item()
    return waveform[:, start:start + crop_size]

def apply_reverb(waveform):
    reverb = T.Reverberate()
    return reverb(waveform)

def time_shift(waveform, shift):
    return torch.roll(waveform, shifts=shift, dims=-1)

def add_noise(waveform, noise_level=0.005):
    noise = torch.randn_like(waveform) * noise_level
    return waveform + noise

# Augment on-the-fly stochastically
# again these are just examples and do not necessarily utilize the methods above
def augment_waveform(data):
    waveform, sample_rate = data
    if torch.rand(1).item() > 0.5:
        waveform += torch.randn_like(waveform) * 0.005
    if torch.rand(1).item() > 0.5:
        waveform = torch.roll(waveform, shifts=torch.randint(-5000, 5000, (1,)).item(), dims=-1)
    if torch.rand(1).item() > 0.5:
        waveform *= torch.FloatTensor(1).uniform_(0.8, 1.5).item()
    return waveform, sample_rate


In [3]:
# Create a MelSpectrogram transformation
mel_spectrogram_transform = T.MelSpectrogram(
    sample_rate=44100,         # Default sample rate, change if needed
    n_fft=1024,                # Number of FFT bins
    hop_length=512,            # Hop length between windows
    n_mels=64                  # Number of Mel bands
)

def waveform_to_spectrogram(data):
    waveform, sample_rate = data
    spectrogram = mel_spectrogram_transform(waveform)  # Apply the spectrogram transformation
    return spectrogram

In [4]:
# post spectrogram augmentations

# Example augmentations, could add more
time_mask = T.TimeMasking(time_mask_param=10)

freq_mask = T.FrequencyMasking(freq_mask_param=8)

# hybridizes two sounds
def mixup(spectrogram1, spectrogram2, alpha=0.2):
    lam = torch.FloatTensor(1).uniform_(0, alpha).item()
    return lam * spectrogram1 + (1 - lam) * spectrogram2

# should probably implement a randomization process like above
def augment_spectrogram(spectrogram):
    augmented = time_mask(spectrogram)  # Apply time masking
    augmented = freq_mask(augmented)   # Apply frequency masking
    return augmented

In [5]:
# Decode audio files
def decode_audio(file_tuple):
    file_path, file = file_tuple
    waveform, sample_rate = torchaudio.load(file_path)
    return waveform, sample_rate

In [6]:
import os
import torchaudio
from torch.utils.data import Dataset, DataLoader
import pandas as pd

class UrbanSoundDataset(Dataset):
    def __init__(self, audio_path, fold, csv_path, transform=None):
        self.audio_path = os.path.join(audio_path, f"fold{fold}")
        self.file_list = [os.path.join(self.audio_path, f) for f in os.listdir(self.audio_path) if f.endswith(".wav")]
        self.transform = transform

        # Load the metadata CSV file
        self.metadata = pd.read_csv(csv_path)

    def get_label(self, file_name):
        """Fetch the class label for a given file name from the metadata."""
        label_row = self.metadata.loc[self.metadata['slice_file_name'] == file_name, 'class']
        if not label_row.empty:
            return label_row.values[0]
        else:
            raise ValueError(f"File name {file_name} not found in metadata CSV.")

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        # Load the audio file
        file_path = self.file_list[idx]
        waveform, sample_rate = torchaudio.load(file_path)

        # Convert mono to stereo if necessary
        if waveform.size(0) == 1:  # If mono
            waveform = waveform.repeat(2, 1)

        # Apply transformations
        if self.transform:
            waveform = self.transform(waveform)

        # Extract the file name from the path
        file_name = os.path.basename(file_path)

        # Get the corresponding label for the file
        label = self.get_label(file_name)

        return waveform, label

# class UrbanSoundDataset(Dataset):
#     def __init__(self, audio_path, fold, transform=None):
#         self.audio_path = os.path.join(audio_path, f"fold{fold}")
#         self.norm_path = os.path.normpath(self.audio_path)
#         self.file_list = [os.path.join(self.norm_path, f) for f in os.listdir(self.norm_path) if f.endswith(".wav")]
#         self.transform = transform

#     def __len__(self):
#         return len(self.file_list)

#     # def __getitem__(self, idx):
#     #     # Load the audio file
#     #     file_path = self.file_list[idx]
#     #     waveform, sample_rate = torchaudio.load(file_path)

#     #     # Convert mono to stereo if necessary
#     #     if waveform.size(0) == 1:
#     #         waveform = waveform.repeat(2, 1)

        
#     #     # Apply any transformations (e.g., augmentations, spectrogram)
#     #     if self.transform:
#     #         waveform = self.transform(waveform)
        
#     #     return waveform

#     def __getitem__(self, idx):
#     file_path = self.file_list[idx]
#     waveform, sample_rate = torchaudio.load(file_path)
    
#     # Convert mono to stereo if necessary
#     if waveform.size(0) == 1:
#         waveform = waveform.repeat(2, 1)
    
#     # Apply transformations
#     if self.transform:
#         waveform = self.transform(waveform)

#     # Make sure to return both X (waveform) and y (label)
#     label = self.get_label(file_path)  # Replace with your method to get labels
#     return waveform, label

In [7]:
import torchaudio.transforms as T

# Example transformations
def augment_waveform(waveform):
    # Add your augmentation logic here (e.g., noise addition, time stretch, etc.)
    return waveform

waveform_to_spectrogram = T.MelSpectrogram(sample_rate=16000, n_mels=128)
augment_spectrogram = T.AmplitudeToDB()

# Combine transformations into a callable function
def transform_pipeline(waveform):
    waveform = augment_waveform(waveform)
    spectrogram = waveform_to_spectrogram(waveform)
    spectrogram = augment_spectrogram(spectrogram)
    return spectrogram

def pad_with_noise(spectrogram, max_time, noise_std=0.01):
    """
    Pads a spectrogram with Gaussian noise instead of zeros.

    Args:
        spectrogram (Tensor): Shape (channels, freq_bins, time_steps)
        max_time (int): Target time dimension
        noise_std (float): Standard deviation of the Gaussian noise

    Returns:
        Tensor: Padded spectrogram with noise
    """
    # Compute how much padding is needed
    pad_amount = max_time - spectrogram.size(2)
    
    if pad_amount > 0:
        # Generate random noise matching the shape of missing time steps
        noise = torch.randn((spectrogram.size(0), spectrogram.size(1), pad_amount)) * noise_std
        
        # Concatenate noise along the time axis
        spectrogram = torch.cat([spectrogram, noise], dim=2)
    
    return spectrogram

def convert_to_three_channels(spectrogram):
    # Convert [2, 224, 224] to [3, 224, 224]
    if spectrogram.size(0) == 2:
        # Duplicate the first channel to create a third channel
        return torch.cat((spectrogram, spectrogram[0:1, :, :]), dim=0)
    return spectrogram



In [8]:
import torchvision

class densenet(torch.nn.Module):
    """
    DenseNet Class, derived from Pytorch. Intended for model manipulation (i.e. unfreezing layers, etc.)
    To use model, try (densenet).model(data)
    May change to reflect manual implementation of densenet161.
    """
    def __init__(self):
        super().__init__()  # Initialize the nn.Module base class
        self.model = torchvision.models.densenet161()

    def forward(self, x):
        return self.model(x)  # Delegate forward pass to the original DenseNet

    def layer_change(self):
        """
        Unfreeze layers of DenseNet model per specifications
        """
        pass

In [44]:
# Define training and testing loops
def train_loop(dataloader, model, loss_fn, optimizer, scheduler = None, epochs=1):
    model.train()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}/{epochs}")
        size = len(dataloader.dataset)
        total_loss = 0  # Initialize variable to accumulate loss per epoch
        total_batches = len(dataloader)

        for batch, (X, y) in enumerate(dataloader):
            # Compute prediction and loss
            X = X.to(device)
            y = y.to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Accumulate loss
            total_loss += loss.item()
            

            total_batches = len(dataloader)
            if batch % (total_batches // 5) == 0:  # Prints 5 times per epoch
                current = (batch + 1) * len(X)
                print(f"loss: {loss.item():>7f}  [{current:>5d}/{size:>5d}]")

        if scheduler is not None:
            scheduler.step()
        # Average loss for this epoch
        avg_loss = total_loss / len(dataloader)
        print(f"Training Loss (Epoch): {avg_loss:>7f}")
    return avg_loss  # Return the average loss for the last epoch


def test_loop(dataloader, model, loss_fn):
    model.eval()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device)
            y = y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    # Average loss and accuracy for this fold
    avg_test_loss = test_loss / num_batches
    accuracy = correct / size
    print(f"Test Error: \n Accuracy: {(100*accuracy):>0.1f}%, Avg loss: {avg_test_loss:>8f} \n")
    return avg_test_loss, accuracy  # Return both average loss and accuracy for this fold

In [10]:
import torchvision.transforms as transforms

# Resize and normalize for DenseNet
resize_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize for DenseNet
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Standard ImageNet normalization
])

def custom_collate_fn(batch):
    inputs, labels = zip(*batch)  # Separate inputs and labels
    max_time = max(spectrogram.size(2) for spectrogram in inputs)

    # Pad inputs to the same length along the time dimension
    padded_inputs = [
        torch.nn.functional.pad(input, (0, max_time - input.size(2)))
        for input in inputs
    ]

    # Convert to 3 channels and resize
    resized_inputs = [resize_transform(convert_to_three_channels(input)) for input in padded_inputs]
    
    # Map labels to numeric class IDs
    class_mapping = {
        "air_conditioner": 0,
        "car_horn": 1,
        "children_playing": 2,
        "dog_bark": 3,
        "drilling": 4,
        "engine_idling": 5,
        "gun_shot": 6,
        "jackhammer": 7,
        "siren": 8,
        "street_music": 9
    }

    numeric_labels = [class_mapping[label] for label in labels]

    # Stack inputs and labels
    return torch.stack(resized_inputs), torch.tensor(numeric_labels)


In [11]:
print(os.getcwd())

/sfs/gpfs/tardis/home/asm2fe/UrbanAdversary


In [45]:
from sklearn.model_selection import train_test_split
import torch
pwd = os.getcwd()
# Specify paths and batch size
AUDIO_PATH = "./UrbanSound8K/audio"
CSV_PATH = "./UrbanSound8K/metadata/UrbanSound8K.csv"
batch_size = 64
epochs = 20

def setup_seed(seed):
    torch_manual_seed(seed)
    manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    cudnn.deterministic = True

SEED = 666
setup_seed(SEED)


fold_losses = []
fold_accuracies = []

model = densenet()
loss_fn = torch.nn.CrossEntropyLoss()

# Loop through folds
# Variables to accumulate metrics across folds
fold_losses = []
fold_accuracies = []

# Loop through folds
for fold in range(1, 11):
    print(f"Processing Fold {fold}")

    # Initialize dataset and DataLoader
    dataset = UrbanSoundDataset(audio_path=AUDIO_PATH, fold=fold, transform=transform_pipeline, csv_path=CSV_PATH)
    train_indices, val_indices = train_test_split(list(range(len(dataset))), test_size=0.2, random_state=666)
    
    train_dataset = torch.utils.data.Subset(dataset, train_indices)
    val_dataset = torch.utils.data.Subset(dataset, val_indices)
    
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=custom_collate_fn)
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate_fn)

    optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones = [1, 10, 15, 16, 17, 19], gamma=0.1)

    # Train and validate
    train_loss = train_loop(train_dataloader, model, loss_fn, optimizer, scheduler, epochs=epochs)
    val_loss, val_accuracy = test_loop(val_dataloader, model, loss_fn)

    # Aggregate metrics
    fold_losses.append(val_loss)
    fold_accuracies.append(val_accuracy)

# Compute average loss and accuracy across folds
mean_loss = sum(fold_losses) / len(fold_losses)
mean_accuracy = sum(fold_accuracies) / len(fold_accuracies)

print(f"\nCross-Validation Results:")
print(f"Avg Loss: {mean_loss:.6f}, Avg Accuracy: {(100 * mean_accuracy):.2f}%")

# Compute average loss and accuracy across folds
mean_loss = sum(fold_losses) / len(fold_losses)
mean_accuracy = sum(fold_accuracies) / len(fold_accuracies)

print(f"\nCross-Validation Results:")
print(f"Avg Loss: {mean_loss:.6f}, Avg Accuracy: {(100 * mean_accuracy):.2f}%")


Processing Fold 1
Epoch 1/20
loss: 7.014059  [   64/  698]
loss: 3.600421  [  192/  698]
loss: 1.725978  [  320/  698]
loss: 1.395375  [  448/  698]
loss: 1.517347  [  576/  698]
loss: 1.551700  [  638/  698]
Training Loss (Epoch): 2.635453
Epoch 2/20
loss: 1.518900  [   64/  698]
loss: 1.456610  [  192/  698]
loss: 1.138530  [  320/  698]
loss: 1.181764  [  448/  698]
loss: 1.280517  [  576/  698]
loss: 1.148436  [  638/  698]
Training Loss (Epoch): 1.281892
Epoch 3/20
loss: 1.082554  [   64/  698]
loss: 0.990879  [  192/  698]
loss: 1.262527  [  320/  698]
loss: 1.057591  [  448/  698]
loss: 1.010648  [  576/  698]
loss: 0.873514  [  638/  698]
Training Loss (Epoch): 1.044794
Epoch 4/20
loss: 0.754652  [   64/  698]
loss: 0.750141  [  192/  698]
loss: 0.782680  [  320/  698]
loss: 0.997096  [  448/  698]
loss: 0.812159  [  576/  698]
loss: 0.915061  [  638/  698]
Training Loss (Epoch): 0.845733
Epoch 5/20
loss: 0.673269  [   64/  698]
loss: 0.697129  [  192/  698]
loss: 0.654542  [  

KeyboardInterrupt: 

In [None]:
# import torchaudio

# audio_path = "./UrbanSound8k/audio/fold1/137156-9-0-30.wav"
# waveform, sample_rate = torchaudio.load(audio_path)
# print(f"Shape: {waveform.shape}, Sample Rate: {sample_rate}")

In [None]:
def show_stereogram(spectrogram):
    # Convert to numpy
    spectrogram_np = spectrogram.numpy()  # Shape: (2, Freq, Time)

    # Plot left and right channels
    fig, axs = plt.subplots(2, 1, figsize=(6, 6), constrained_layout=True)

    axs[0].imshow(spectrogram_np[0], aspect='auto', origin='lower', cmap='magma')
    axs[0].set_title(f"Spectrogram {i+1} - Left Channel")
    axs[0].set_ylabel("Frequency Bins")
    axs[0].set_xlabel("Time Frames")

    axs[1].imshow(spectrogram_np[1], aspect='auto', origin='lower', cmap='magma')
    axs[1].set_title(f"Spectrogram {i+1} - Right Channel")
    axs[1].set_ylabel("Frequency Bins")
    axs[1].set_xlabel("Time Frames")

    plt.show()