In [1]:
import os
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt

In [2]:
import torch
import torchaudio
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import seaborn as sns
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import train_test_split
from PIL import Image
import random
#from plot_audio import plot_specgram, plot_waveform
os.getcwd()

  from .autonotebook import tqdm as notebook_tqdm
Found Intel OpenMP ('libiomp') and LLVM OpenMP ('libomp') loaded at
the same time. Both libraries are known to be incompatible and this
can cause random crashes or deadlocks on Linux when loaded in the
same Python program.
Using threadpoolctl may cause crashes or deadlocks. For more
information and possible workarounds, please see
    https://github.com/joblib/threadpoolctl/blob/master/multiple_openmp.md



'/Users/jansta/learn/acoustics'

In [3]:
# Load the spectrogram dictionary saved with np.save; .item() unwraps the
# 0-d object array back into the original Python dict.
# NOTE(review): allow_pickle=True executes arbitrary code embedded in the file;
# only load files from a trusted source.
# NOTE(review): absolute, user-specific path; consider a configurable DATA_DIR.
# The "_dB" suffix presumably means the matrices are dB-scaled; confirm.
dict_mats = np.load('/Users/jansta/learn/acoustics/dict_mats_dB.npy', allow_pickle=True).item()


In [4]:
# Work on the 'A' group of the dictionary (the only group used in this notebook).
group_a = dict_mats['A']

# Sanity probe of one entry; the value is not displayed because it is not the
# last expression of the cell.
len(group_a['can_opening'][3])

# Every class name available in the group.
all_labels = group_a.keys()
print(all_labels)

dict_keys(['dog', 'chirping_birds', 'vacuum_cleaner', 'thunderstorm', 'door_wood_knock', 'can_opening', 'crow', 'clapping', 'fireworks', 'chainsaw', 'airplane', 'mouse_click', 'pouring_water', 'train', 'sheep', 'water_drops', 'church_bells', 'clock_alarm', 'keyboard_typing', 'wind', 'footsteps', 'frog', 'cow', 'brushing_teeth', 'car_horn', 'crackling_fire', 'helicopter', 'drinking_sipping', 'rain', 'insects', 'laughing', 'hen', 'engine', 'breathing', 'crying_baby', 'hand_saw', 'coughing', 'glass_breaking', 'snoring', 'toilet_flush', 'pig', 'washing_machine', 'clock_tick', 'sneezing', 'rooster', 'sea_waves', 'siren', 'cat', 'door_wood_creaks', 'crickets'])


In [5]:
# Small hand-picked subset: the slice keeps the first four of the five names.
# NOTE(review): both `chosen_labels` and `encoded_labels` are reassigned by the
# next cell, so this selection has no effect when the notebook runs top to bottom.
chosen_labels = ['crickets', 'can_opening', 'chirping_birds', 'dog', 'chainsaw'][:4]
encoded_labels = {'crickets': 0, 'can_opening': 1, 'chirping_birds': 2, 'dog': 3, 'chainsaw': 4}

In [6]:
# Keep the first 20 class names and map each one to its position in the list,
# which serves as the integer class id used for training.
chosen_labels = list(all_labels)[:20]
print(chosen_labels)
encoded_labels = {label: code for code, label in enumerate(chosen_labels)}

['dog', 'chirping_birds', 'vacuum_cleaner', 'thunderstorm', 'door_wood_knock', 'can_opening', 'crow', 'clapping', 'fireworks', 'chainsaw', 'airplane', 'mouse_click', 'pouring_water', 'train', 'sheep', 'water_drops', 'church_bells', 'clock_alarm', 'keyboard_typing', 'wind']


In [7]:
class AudioDataset(Dataset):
    """Map-style dataset of (spectrogram, class id) pairs.

    Args:
        dict_mats: mapping from class name to an indexable collection of 2-D
            matrices (entries are read as ``dict_mats[name][0..len-1]``).
        chosen_labels: class names to include; other keys are ignored.
        encoded_labels: mapping from class name to integer class id.
        transform: optional callable applied to each sample tensor.
    """

    def __init__(self, dict_mats, chosen_labels, encoded_labels, transform=None):
        self.transform = transform

        collected_x, collected_y = [], []
        for name in dict_mats.keys():
            if name not in chosen_labels:
                continue
            clips = dict_mats[name]
            # Index-based access on purpose: the container may not iterate
            # over its values (e.g. a dict keyed by position).
            for pos in range(len(clips)):
                collected_x.append(clips[pos])
                collected_y.append(encoded_labels[name])

        self.X = np.array(collected_x)
        self.y = np.array(collected_y)

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self.y)

    def __getitem__(self, idx):
        """Return ``(sample, label)``: a float tensor with a leading channel axis and a long tensor."""
        # Leading axis of size one acts as the channel dimension.
        with_channel = np.expand_dims(self.X[idx], axis=0)
        sample = torch.FloatTensor(with_channel)
        label = torch.tensor(self.y[idx], dtype=torch.long)

        if self.transform:
            sample = self.transform(sample)

        return sample, label

In [8]:
# Preprocessing applied to every sample tensor produced by the dataset.
# NOTE(review): samples already carry a single channel, so Grayscale looks like
# a no-op here; confirm before removing.
# NOTE(review): mean/std of 0.5 assume inputs in roughly [0, 1]; verify against
# the actual value range of the spectrograms.
preprocessing_steps = [
    transforms.Resize((64, 431)),
    transforms.Grayscale(num_output_channels=1),
    transforms.Normalize((0.5, ), (0.5, )),
]
transform = transforms.Compose(preprocessing_steps)

In [9]:
# Build the dataset from group 'A', applying the preprocessing transform.
dataset = AudioDataset(dict_mats['A'], chosen_labels, encoded_labels, transform=transform)

# 80/20 train/validation split. A seeded generator makes the split identical
# on every run, so validation metrics are comparable between runs.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size], generator=split_generator
)

# Dataloaders: shuffle only the training data.
batch_size = 4
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)



In [10]:
# Smoke-test the training loader by pulling exactly one batch.
# zip() against range(1) stops after the first batch without fetching a second.
for i, (inputs, labels) in zip(range(1), train_loader):
    separator = "-" * 30
    print(f"Batch {i+1}:")
    print(f"Input batch size: {inputs.size()}")
    print(f"Labels: {labels}")
    print(separator)

Batch 1:
Input batch size: torch.Size([4, 1, 64, 431])
Labels: tensor([13, 16,  2, 10])
------------------------------


In [11]:
# Number of output classes: one per selected label.
n_classes = len(chosen_labels)

class AudioClassifNetCAM(nn.Module):
    """CNN classifier for single-channel spectrograms with class-activation-map helpers.

    Two convolutional blocks are followed by global average pooling and a
    three-layer dense head. Each forward pass stores the last convolutional
    feature maps (taken before the global pooling) in ``self.feature_maps``
    so that ``getCAM`` can build a spatial activation map afterwards.

    Args:
        n_classes (int): number of output classes.
    """

    def __init__(self, n_classes) -> None:
        super().__init__()
        self.n_classes = n_classes
        # Filled in by forward(); stays None until the first forward pass so
        # getCAM can report a clear error instead of an AttributeError.
        self.feature_maps = None

        # First block: 2 convolutional layers + pooling
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )

        # Second block: 3 convolutional layers + global pooling.
        # The pooling layer stays inside the Sequential so module indices and
        # state_dict keys are unchanged; forward() applies it separately.
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1))
        )

        # Dense layers
        self.fc_block = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Dropout(p=0.2),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(p=0.2),
            nn.Linear(256, n_classes)
        )

    def forward(self, x):
        """Compute class logits and remember the pre-pooling feature maps.

        Args:
            x (Tensor): batch of inputs shaped (batch, 1, height, width).
        Returns:
            Tensor: logits shaped (batch, n_classes).
        """
        x = self.conv_block1(x)

        # Apply the conv/ReLU layers of block 2, but hold back the final global
        # pooling: the CAM needs the spatial maps, not the pooled 1x1 values.
        *conv_layers, global_pool = self.conv_block2
        for layer in conv_layers:
            x = layer(x)

        # Store feature maps for CAM visualization
        self.feature_maps = x.detach()

        x = global_pool(x)
        x = self.fc_block(x)
        return x

    def getCAM(self, class_idx):
        """Build a normalised activation map for ``class_idx`` from the last forward pass.

        Args:
            class_idx (int): index of the target class.
        Returns:
            Tensor: map shaped (batch, 1, h, w) scaled to [0, 1]; all zeros when
            the raw map is constant.
        Raises:
            ValueError: if no forward pass has been run yet, or if ``class_idx``
                is not smaller than the number of classes.
        """
        if self.feature_maps is None:
            raise ValueError("Feature maps are not set. Run a forward pass first.")

        # Use the instance's own class count rather than a notebook-level global.
        if class_idx >= self.n_classes:
            raise ValueError(f"Class index {class_idx} is out of bounds for {self.n_classes} classes.")

        # Feature maps from the last convolutional layer: (batch, 256, h, w)
        feature_maps = self.feature_maps

        # Weights of the final fully connected layer for the requested class.
        # NOTE(review): these weights index the 256 hidden units of the dense
        # head, not the 256 conv channels; the sizes merely coincide. A strict
        # CAM needs a single linear layer directly after the global pooling
        # (or a gradient-based variant such as Grad-CAM).
        weights = self.fc_block[-1].weight[class_idx].detach()
        weights = weights.view(1, -1, 1, 1)

        cam = torch.sum(feature_maps * weights, dim=1, keepdim=True)

        # Normalize the CAM to [0, 1]; skip the division for a constant map,
        # which would otherwise be 0/0 and fill the result with NaNs.
        cam = cam - cam.min()
        peak = cam.max()
        if peak > 0:
            cam = cam / peak

        return cam

    def generate_cam_visualization(self, cam, test_inp):
        """
        Upsample a CAM to the spatial size of an input spectrogram.
        Args:
            cam (Tensor): activation map for one sample, e.g. the (1, 1, h, w)
                tensor returned by getCAM (a plain (h, w) map also works).
            test_inp (Tensor): input spectrogram; its last two dimensions give
                the target height and width.
        Returns:
            visualization (ndarray): CAM resized to (height, width).
        """
        # Bring the map into the (N=1, C=1, h, w) layout that interpolate expects
        cam = cam.reshape(1, 1, *cam.shape[-2:])

        # Target size is taken from the input's spatial dimensions
        height, width = test_inp.shape[-2:]

        cam = F.interpolate(
            cam,
            size=(height, width),
            mode='bilinear',
            align_corners=True
        )

        # Drop the batch and channel axes and convert to numpy
        visualization = cam[0, 0].cpu().numpy()

        return visualization



In [12]:
def check_for_nans(tensor, name):
    """Report whether ``tensor`` contains any NaN.

    Prints a message mentioning ``name`` when a NaN is present.

    Returns:
        bool: True if at least one element is NaN, otherwise False.
    """
    has_nan = bool(torch.isnan(tensor).any())
    if has_nan:
        print(f"NaNs found in {name}")
    return has_nan

In [13]:
# Create an instance of the model with one output per chosen class.
model = AudioClassifNetCAM(n_classes)
#from helper_functions import capture_gradients, resize_cam_to_input


In [None]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# %% TRAINING
losses_epoch_mean = []
NUM_EPOCHS = 200
# Print about ten progress lines per run; max(1, ...) avoids a modulo-by-zero
# when NUM_EPOCHS is smaller than 10.
LOG_EVERY = max(1, NUM_EPOCHS // 10)

# Make sure dropout is active even if an earlier cell left the model in eval mode.
model.train()
for epoch in range(NUM_EPOCHS):
    losses_epoch = []
    for i, data in enumerate(train_loader):
        inputs, labels = data
        # Guard against NaN inputs: skip only the offending batch (keeping it in
        # `i_err` for inspection) instead of abandoning the rest of the epoch.
        if torch.isnan(inputs).any():
            print(f"NaN input at epoch {epoch}, batch {i}")
            i_err = inputs
            continue

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)
        loss.backward()

        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()
        losses_epoch.append(loss.item())

    # An epoch in which every batch was skipped has no loss to average.
    epoch_loss = np.mean(losses_epoch) if losses_epoch else float('nan')
    losses_epoch_mean.append(epoch_loss)
    if epoch % LOG_EVERY == 0:
        print(f'Epoch {epoch}/{NUM_EPOCHS}, Loss: {epoch_loss:.12f}')

# Training curve: mean loss per epoch.
ax = sns.lineplot(x=list(range(len(losses_epoch_mean))), y=losses_epoch_mean)
ax.set(xlabel='Epoch', ylabel='Mean training loss');

Epoch 0/200, Loss: 2.990327878077
Epoch 20/200, Loss: 1.230514698830
Epoch 40/200, Loss: 1.007270149125
Epoch 60/200, Loss: 0.227148406229
Epoch 80/200, Loss: 0.260315243944
Epoch 100/200, Loss: 0.425427594266
Epoch 120/200, Loss: 0.050084567831


In [None]:
# Collect true labels and raw logits for the whole validation set.
y_val = []
y_val_hat = []

# Evaluation mode switches dropout off so predictions are deterministic.
model.eval()
with torch.no_grad():
    for inputs, y_val_temp in val_loader:
        # Keep the raw logits: rounding them (a binary-classification habit)
        # creates ties and can change which class argmax picks.
        y_val_hat_temp = model(inputs)

        y_val.extend(y_val_temp.numpy())
        y_val_hat.extend(y_val_hat_temp.numpy())

In [None]:

# Predicted class = index of the largest logit for each validation sample.
y_val_pred = np.argmax(y_val_hat, axis=1)

# Accuracy
acc = accuracy_score(y_val, y_val_pred)
print(f'Accuracy: {acc*100:.2f} %')

# Confusion matrix (rows: true class, columns: predicted class).
# Passing `labels` fixes the matrix to one row/column per chosen label, so the
# tick labels stay aligned even if a class never occurs in the validation data.
class_ids = list(range(len(chosen_labels)))
cm = confusion_matrix(y_val, y_val_pred, labels=class_ids)
ax = sns.heatmap(cm, annot=True, xticklabels=chosen_labels, yticklabels=chosen_labels)
ax.set(xlabel='Predicted label', ylabel='True label');

In [None]:
# model.feature_maps.size()

# torch.save(model.state_dict(), 'test_CAM_model.pth')

In [None]:
# Take the first validation sample and show the class the model predicts for it.
test_inp = val_loader.dataset[0][0]

# Inference: eval mode disables dropout, no_grad skips autograd bookkeeping.
model.eval()
with torch.no_grad():
    output = model(test_inp.unsqueeze(0))

pred_class = torch.argmax(output, dim=1).item()
print(pred_class)
# Reverse lookup: integer class id -> label name.
label_names = {code: name for name, code in encoded_labels.items()}
predicted_label = label_names[pred_class]

plt.imshow(test_inp[0])
plt.title(f'input spectrogram for -> {predicted_label}');


In [None]:
# Same check for the second validation sample, also printing the raw logits.
test_inp = val_loader.dataset[1][0]

# Call the module itself (not .forward) so hooks run; eval mode disables
# dropout and no_grad skips autograd bookkeeping.
model.eval()
with torch.no_grad():
    output = model(test_inp.unsqueeze(0))
print(output)

pred_class = torch.argmax(output, dim=1).item()
# Reverse lookup: integer class id -> label name.
label_names = {code: name for name, code in encoded_labels.items()}
predicted_label = label_names[pred_class]
print(predicted_label)

plt.imshow(test_inp[0])
plt.title(f'input spectrogram for -> {predicted_label}');

In [None]:
# Debug output comparing two shapes.
# NOTE(review): `c1` is not defined anywhere in the visible notebook, so this
# cell raises NameError on a fresh kernel unless it is defined elsewhere.
print(c1.shape)
print(test_inp.shape)

In [None]:
# For every validation sample, compute the CAM of its predicted class.
# cams:    predicted label -> list of CAM arrays (one per sample)
# samples: predicted label -> first input tensor predicted as that label
cams = {}
samples = {}

# Reverse lookup built once: integer class id -> label name.
label_names = {code: name for name, code in encoded_labels.items()}

# Evaluation mode only needs to be set once, not per sample.
model.eval()
with torch.no_grad():
    for inputs, _labels in val_loader:
        for sample in inputs:
            # Forward pass on a single sample so the stored feature maps
            # belong to exactly this input.
            output = model(sample.unsqueeze(0))
            _, pred = torch.max(output, 1)
            pred_class = pred.item()
            predicted_label = label_names[pred_class]

            # CAM for the predicted class, resized to this sample's own shape.
            cam = model.getCAM(pred_class)
            cam_vis = model.generate_cam_visualization(cam, sample)

            cams.setdefault(predicted_label, []).append(cam_vis)
            samples.setdefault(predicted_label, sample)

In [None]:
# Show the module at index 7 of the dense head (its last layer), whose weights getCAM reads.
model.fc_block[7]

In [None]:
# Average all CAMs collected for each predicted label into one map per class.
class_cams = {key: np.mean(maps, axis=0) for key, maps in cams.items()}

# One heat-map figure per class.
for key, mean_cam in class_cams.items():
    plt.figure(figsize=(5, 10))
    plt.imshow(mean_cam, cmap='hot')
    plt.title(f"Class Activation Map for class: {key}")
    plt.show()

In [None]:
# Show the stored example input for each predicted label.
for key, sample in samples.items():
    plt.figure(figsize=(5, 10))
    plt.imshow(sample[0], cmap='Greys')
    # These figures show input spectrograms, not activation maps, so the
    # title says so (the earlier wording was copied from the CAM cell).
    plt.title(f"Input spectrogram predicted as: {key}")
    plt.show()

In [None]:
# One row per predicted label: example input on the left, mean CAM on the right.
n_rows = len(samples.keys())
# squeeze=False keeps `axs` two-dimensional even when there is a single row,
# so the axs[idx, col] indexing below works for any number of classes.
fig, axs = plt.subplots(n_rows, 2, figsize=(12, 2.5 * n_rows), squeeze=False)
fig.suptitle('Samples and Corresponding Class Activation Maps')

# Plot samples and CAMs side by side
for idx, key in enumerate(samples.keys()):
    # Left subplot - Sample
    axs[idx, 0].imshow(samples[key][0], cmap='Greys')
    axs[idx, 0].set_title(f'Sample ({key})')

    # Right subplot - CAM
    im = axs[idx, 1].imshow(class_cams[key], cmap='hot')
    axs[idx, 1].set_title(f'CAM ({key})')

    # Add colorbar to CAM subplot
    fig.colorbar(im, ax=axs[idx, 1])

# Adjust layout to prevent overlap
plt.tight_layout()
plt.show()

In [None]:
# Overlay each class's mean CAM, semi-transparent, on its example spectrogram.
for key, mean_cam in class_cams.items():
    fig, ax = plt.subplots(figsize=(10, 2))
    ax.imshow(samples[key][0], cmap='Greys')
    im = ax.imshow(mean_cam, cmap='RdPu', alpha=0.35)
    ax.set_title(f"Class Activation Map for class: {key}", pad=20)
    fig.colorbar(im, ax=ax, label='Activation Strength', fraction=0.046, pad=0.04)
    fig.tight_layout()
    plt.show()

