# Enhanced SimCLR

In [1]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
with open('/content/gdrive/My Drive/file.txt', 'w') as f:
  f.write('content')

In [1]:
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
import random
from torch.optim import lr_scheduler

# Configuration
device = "cuda:0"
num_epochs = 5
num_folds = 10
batch_size = 58
feature_dim = 2048
num_classes = 13
root_dir = './spectrograms'
csv_file = "./spectrograms_balanced_no_sirens.csv"

# Get class names
full_annotations = pd.read_csv(csv_file)
class_names = sorted(full_annotations['classID'].unique())

# Dataset Class
class UrbanSoundDataset(Dataset):
    def __init__(self, root_dir, folds, csv_file, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.annotations = pd.read_csv(csv_file)

        if isinstance(folds, int):
            folds = [folds]
        self.file_list = self.annotations[self.annotations['fold'].isin(folds)]

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        row = self.file_list.iloc[idx]
        img_path = os.path.join(self.root_dir, f'fold{row["fold"]}', row['spec_file_name'])
        image = Image.open(img_path).convert('RGB')
        label = row['classID']

        if self.transform:
            xi = self.transform(image)
            xj = self.transform(image)
            return xi, xj, label
        return image, label


# Model Components
class ProjectionHead(torch.nn.Module):
    def __init__(self, input_dim=2048, hidden_dim=512, output_dim=128):
        super().__init__()
        self.layers = torch.nn.Sequential(
            torch.nn.Linear(input_dim, hidden_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_dim, output_dim)
        )
    def forward(self, x):
        return self.layers(x)

class SimCLR(torch.nn.Module):
    def __init__(self, backbone):
        super().__init__()
        self.backbone = backbone
        self.projection = ProjectionHead()
    def forward(self, x):
        features = self.backbone(x)
        return self.projection(features)

class Classifier(torch.nn.Module):
    def __init__(self, input_dim=2048, num_classes=13):
        super().__init__()
        self.fc = torch.nn.Linear(input_dim, num_classes)
    def forward(self, x):
        return self.fc(x)

# Loss Function
class NTXentLoss(torch.nn.Module):
    def __init__(self, temperature=0.5):
        super().__init__()
        self.temperature = temperature
        self.criterion = torch.nn.CrossEntropyLoss()

    def forward(self, z_i, z_j):
        N = z_i.size(0)
        z = torch.cat([z_i, z_j], dim=0)

        # Compute similarity matrix
        sim = torch.mm(z, z.T) / self.temperature

        # Create labels: positives are the N off-diagonal elements
        labels = torch.cat([
            torch.arange(N, 2*N, device=z.device),
            torch.arange(0, N, device=z.device)
        ])

        # Mask out self-similarity
        mask = torch.eye(2*N, dtype=torch.bool, device=z.device)
        sim = sim.masked_fill(mask, -1e9)

        # Compute loss
        loss = self.criterion(sim, labels)
        return loss

#tried but does work require more memeory with batch size 64
# def mixup_data(x1, x2, y1, y2, alpha=0.2):
#     '''Returns mixed inputs, pairs of targets, and lambda'''
#     lam = np.random.beta(alpha, alpha)
#     batch_size = x1.size(0)
#     index = torch.randperm(batch_size).cuda()

#     mixed_x = lam * x1 + (1 - lam) * x2
#     y_a, y_b = y1, y2
#     return mixed_x, y_a, y_b, lam

# Training Function with Mixup and CosineAnnealingLR
def train():
    # Initialize backbone
    backbone = models.resnet50(pretrained=True)
    backbone.fc = torch.nn.Identity()

    # Data transformations
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(0.8, 0.8, 0.8, 0.2),
        transforms.RandomGrayscale(p=0.2),
        transforms.ToTensor(),
    ])

    # K-fold cross-validation
    for fold in range(1, num_folds+1):
        print(f"\n{'='*40}")
        print(f"=== Fold {fold}/{num_folds} {'='*20}")
        print(f"{'='*40}\n")

        # Data loaders
        train_ds = UrbanSoundDataset(root_dir, [f for f in range(1,11) if f != fold],
                                   csv_file, transform)
        val_ds = UrbanSoundDataset(root_dir, [fold], csv_file, transform)

        train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=4)
        val_loader = DataLoader(val_ds, batch_size=batch_size, num_workers=4)

        # Model components
        simclr = SimCLR(backbone).to(device)
        classifier = Classifier().to(device)
        optimizer = torch.optim.Adam(list(simclr.parameters()) + list(classifier.parameters()), lr=3e-4)
        criterion = NTXentLoss()

        # Cosine Annealing learning rate scheduler
        scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

        # Metrics storage
        train_losses, val_losses = [], []
        val_accuracies, all_preds, all_labels = [], [], []

        for epoch in range(num_epochs):
            print(f"\nEpoch {epoch+1}/{num_epochs}")

            # Training Phase
            simclr.train()
            classifier.train()
            epoch_loss = 0
            batch_count = 0

            for batch_idx, (xi, xj, labels) in enumerate(train_loader):
                xi, xj, labels = xi.to(device), xj.to(device), labels.to(device)

                # Forward pass
                zi, zj = simclr(xi), simclr(xj)
                loss_contrastive = criterion(zi, zj)

                # Classification
                features = simclr.backbone(xi)
                logits = classifier(features)
                loss_classification = torch.nn.functional.cross_entropy(logits, labels)

                # Total loss
                loss = loss_contrastive + 0.5 * loss_classification

                # Backward pass
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # Progress tracking
                epoch_loss += loss.item()
                batch_count += 1

                # Print batch updates
                if (batch_idx + 1) % 10 == 0 or (batch_idx + 1) == len(train_loader):
                    current_lr = optimizer.param_groups[0]['lr']
                    print(f"  Batch {batch_idx + 1:03d}/{len(train_loader)} | "
                          f"Loss: {loss.item():.4f} | "
                          f"CLoss: {loss_contrastive.item():.4f} | "
                          f"FLoss: {loss_classification.item():.4f} | "
                          f"LR: {current_lr:.2e}")

            # Epoch statistics
            avg_train_loss = epoch_loss / batch_count
            train_losses.append(avg_train_loss)
            print(f"\n  Training Summary | Epoch {epoch+1}")
            print(f"  Avg Loss: {avg_train_loss:.4f}")
            print(f"  Last Batch Loss: {loss.item():.4f}")

            scheduler.step()

            # Validation Phase
            simclr.eval()
            classifier.eval()
            val_loss, correct, total = 0, 0, 0

            print("\n  Validating...")
            with torch.no_grad():
                for batch_idx, (xi, _, labels) in enumerate(val_loader):
                    xi, labels = xi.to(device), labels.to(device)

                    # Forward pass
                    features = simclr.backbone(xi)
                    logits = classifier(features)

                    # Loss calculation
                    loss = torch.nn.functional.cross_entropy(logits, labels)
                    val_loss += loss.item()

                    # Accuracy calculation
                    preds = torch.argmax(logits, dim=1)
                    correct += (preds == labels).sum().item()
                    total += labels.size(0)

                    # Store predictions
                    all_preds.extend(preds.cpu().numpy())
                    all_labels.extend(labels.cpu().numpy())

                    # Validation batch updates
                    if (batch_idx + 1) % 5 == 0 or (batch_idx + 1) == len(val_loader):
                        acc = 100 * (preds == labels).sum().item() / labels.size(0)
                        print(f"    Val Batch {batch_idx + 1:03d}/{len(val_loader)} | "
                              f"Loss: {loss.item():.4f} | "
                              f"Batch Acc: {acc:.2f}%")

            # Validation statistics
            avg_val_loss = val_loss / len(val_loader)
            val_losses.append(avg_val_loss)
            val_acc = 100 * correct / total
            val_accuracies.append(val_acc)

            print(f"\n  Validation Summary | Epoch {epoch+1}")
            print(f"  Avg Loss: {avg_val_loss:.4f} | Accuracy: {val_acc:.2f}%")
            print(f"  Current Best Acc: {max(val_accuracies):.2f}%")

            # Step the scheduler

        # Fold Completion
        print(f"\n{'='*40}")
        print(f"=== Fold {fold} Completed ===")
        print(f"Best Validation Accuracy: {max(val_accuracies):.2f}%")
        checkpoint_dir = 'checkpoints'  # Or provide an absolute path like '/path/to/save'
        os.makedirs(checkpoint_dir, exist_ok=True)  # Create directory if it doesn't exist

        torch.save({
              'simclr': simclr.state_dict(),
              'classifier': classifier.state_dict(),
              'optimizer': optimizer.state_dict(),
          }, os.path.join(checkpoint_dir, f'fold_{fold}_checkpoint.pth'))


if __name__ == "__main__":
    train()




[1;30;43mStreaming output truncated to the last 5000 lines.[0m
    Val Batch 085/101 | Loss: 0.2402 | Batch Acc: 91.38%
    Val Batch 090/101 | Loss: 0.1882 | Batch Acc: 87.93%
    Val Batch 095/101 | Loss: 0.3688 | Batch Acc: 89.66%
    Val Batch 100/101 | Loss: 0.0711 | Batch Acc: 98.28%
    Val Batch 101/101 | Loss: 0.2160 | Batch Acc: 87.50%

  Validation Summary | Epoch 5
  Avg Loss: 0.8007 | Accuracy: 73.19%
  Current Best Acc: 73.19%

=== Fold 2 Completed ===
Best Validation Accuracy: 73.19%



Epoch 1/5
  Batch 010/899 | Loss: 1.9352 | CLoss: 1.0866 | FLoss: 1.6972 | LR: 3.00e-04
  Batch 020/899 | Loss: 1.7955 | CLoss: 1.1489 | FLoss: 1.2933 | LR: 3.00e-04
  Batch 030/899 | Loss: 1.5172 | CLoss: 1.0266 | FLoss: 0.9812 | LR: 3.00e-04
  Batch 040/899 | Loss: 1.3212 | CLoss: 0.9136 | FLoss: 0.8152 | LR: 3.00e-04
  Batch 050/899 | Loss: 1.1909 | CLoss: 0.8006 | FLoss: 0.7808 | LR: 3.00e-04
  Batch 060/899 | Loss: 1.0375 | CLoss: 0.6210 | FLoss: 0.8330 | LR: 3.00e-04
  Batch 070/8

In [53]:
spec = pd.read_csv('/content/spectrograms_balanced_no_sirens.csv')
spec.sort_values('classID').groupby('fold').apply(lambda x: x['classID'].unique())
spec.sort_values('classID')['class'].unique()
# spec[spec['classID'] == 11]

  spec.sort_values('classID').groupby('fold').apply(lambda x: x['classID'].unique())


array(['air_conditioner', 'ambulance', 'car_horn', 'children_playing',
       'dog_bark', 'drilling', 'engine_idling', 'firetruck', 'gun_shot',
       'jackhammer', 'police', 'street_music', 'traffic'], dtype=object)

In [55]:
import torch
import librosa
import numpy as np
import matplotlib.pyplot as plt
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import os
import IPython.display as ipd

# Model Components (same as in training script)
class ProjectionHead(torch.nn.Module):
    def __init__(self, input_dim=2048, hidden_dim=512, output_dim=128):
        super().__init__()
        self.layers = torch.nn.Sequential(
            torch.nn.Linear(input_dim, hidden_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_dim, output_dim)
        )
    def forward(self, x):
        return self.layers(x)

class SimCLR(torch.nn.Module):
    def __init__(self, backbone):
        super().__init__()
        self.backbone = backbone
        self.projection = ProjectionHead()
    def forward(self, x):
        features = self.backbone(x)
        return self.projection(features)

class Classifier(torch.nn.Module):
    def __init__(self, input_dim=2048, num_classes=13):
        super().__init__()
        self.fc = torch.nn.Linear(input_dim, num_classes)
    def forward(self, x):
        return self.fc(x)

def process_audio(audio_path, sr=22050, duration=None, n_mels=224):
    """Process audio file to mel spectrogram"""
    # Load audio
    y, sr = librosa.load(audio_path, sr=sr, duration=duration)

    # Create mel spectrogram
    S = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=2048,
                                      hop_length=512, n_mels=n_mels)
    S_dB = librosa.power_to_db(S, ref=np.max)

    # Convert to image
    plt.figure(figsize=(4, 4))
    librosa.display.specshow(S_dB, sr=sr, x_axis='time', y_axis='mel')
    plt.axis('off')
    plt.tight_layout(pad=0)

    # Save temporarily and load as PIL
    temp_path = 'temp_spec.png'
    plt.savefig(temp_path, bbox_inches='tight', pad_inches=0)
    plt.close()

    img = Image.open(temp_path).convert('RGB')
    if os.path.exists(temp_path):
        os.remove(temp_path)

    return img, y, sr

class AudioClassifier:
    def __init__(self, checkpoint_path, class_names, device=None):
        if device is None:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        else:
            self.device = device

        print(f"Using device: {self.device}")

        # Initialize model components
        backbone = models.resnet50(pretrained=False)
        backbone.fc = torch.nn.Identity()

        self.simclr = SimCLR(backbone).to(self.device)
        self.classifier = Classifier().to(self.device)
        self.class_names = class_names

        # Load checkpoint
        print(f"Loading checkpoint from {checkpoint_path}")
        checkpoint = torch.load(checkpoint_path, map_location=self.device)

        # Debug: print keys
        print(f"Checkpoint keys: {checkpoint.keys()}")

        self.simclr.load_state_dict(checkpoint['simclr'])
        self.classifier.load_state_dict(checkpoint['classifier'])

        # Set to evaluation mode
        self.simclr.eval()
        self.classifier.eval()

        # Define transforms (similar to validation transforms)
        self.transform = transforms.Compose([
            transforms.Resize(224),
            transforms.ToTensor(),
        ])

    def predict(self, audio_path, verbose=True):
        """Predict class for audio file"""
        if verbose:
            print(f"Processing audio: {audio_path}")

        # Process audio to spectrogram image
        # img, audio, sr = process_audio(audio_path)
        #extract the the img
        r = pd.read_csv('./spectrograms_balanced_no_sirens.csv')
        img = r[r['orig_file_name'] == audio_path].iloc[0]
        print(img)
        path= '/content/spectrograms/'+ 'fold'+str(img['fold']) + '/' + img['spec_file_name']
        print(path)
        img = Image.open(path).convert('RGB')

        # Apply transforms
        img_tensor = self.transform(img).unsqueeze(0).to(self.device)

        # Get predictions
        with torch.no_grad():
            # Extract features
            features = self.simclr.backbone(img_tensor)

            # Get logits from classifier
            logits = self.classifier(features)

            # Convert to probabilities
            probs = torch.nn.functional.softmax(logits, dim=1).squeeze().cpu().numpy()

            print(probs)

        if verbose:
            # Print top 3 predictions
            top_indices = np.argsort(probs)
            print(top_indices)
            print("\nTop 3 predictions:")
            for i, idx in enumerate(top_indices):
                print(f"{i+1}. {self.class_names[idx]}: {probs[idx]*100:.2f}%")

        return probs

def visualize_prediction(probs, class_names):
    """Visualize prediction with audio playback"""
    # Play audio
    # print("Audio sample:")
    # display(ipd.Audio(audio, rate=sr))

    # Plot waveform and probabilities
    plt.figure(figsize=(12, 6))

    # Waveform
    # plt.subplot(1, 2, 1)
    # plt.plot(np.linspace(0, len(audio)/sr, len(audio)), audio)
    # plt.title("Waveform")
    # plt.xlabel("Time (s)")
    # plt.ylabel("Amplitude")

    # Class probabilities
    plt.subplot(1, 2, 2)
    indices = np.argsort(probs)[::-1]
    plt.barh(range(len(class_names)), [probs[i] for i in indices])
    plt.yticks(range(len(class_names)), [class_names[i] for i in indices])
    plt.title("Class Probabilities")
    plt.xlabel("Probability")
    plt.tight_layout()
    plt.show()

# Example usage
if __name__ == "__main__":
    # Define class names (same as in training)
    class_names = ['air_conditioner', 'ambulance', 'car_horn', 'children_playing',
       'dog_bark', 'drilling', 'engine_idling', 'firetruck', 'gun_shot',
       'jackhammer', 'police', 'street_music', 'traffic']

    # Initialize classifier
    classifier = AudioClassifier(
        checkpoint_path='./checkpoints/fold_10_checkpoint.pth',
        class_names=class_names
    )

    # Predict
    audio_path = "127873-0-0-0.wav"  # Replace with your audio file
    probs = classifier.predict(audio_path)

    # Visualize
    # visualize_prediction(probs, class_names)

    # Get top prediction
    top_idx = np.argmax(probs)
    print(f"\nFinal prediction: {class_names[top_idx]} with {probs[top_idx]*100:.2f}% confidence")


Using device: cuda




Loading checkpoint from ./checkpoints/fold_10_checkpoint.pth
Checkpoint keys: dict_keys(['simclr', 'classifier', 'optimizer'])
Processing audio: 127873-0-0-0.wav
spec_file_name    127873-0-orig.png
orig_file_name     127873-0-0-0.wav
fsID                         127873
fold                              1
classID                           0
class               air_conditioner
augmentation               original
Name: 1330, dtype: object
/content/spectrograms/fold1/127873-0-orig.png
[0.73637927 0.00793837 0.04359502 0.00095662 0.07454624 0.01800478
 0.03493095 0.01103815 0.03062015 0.02225268 0.00778793 0.00078703
 0.0111628 ]
[11  3 10  1  7 12  5  9  8  6  2  4  0]

Top 3 predictions:
1. street_music: 0.08%
2. children_playing: 0.10%
3. police: 0.78%
4. ambulance: 0.79%
5. firetruck: 1.10%
6. traffic: 1.12%
7. drilling: 1.80%
8. jackhammer: 2.23%
9. gun_shot: 3.06%
10. engine_idling: 3.49%
11. car_horn: 4.36%
12. dog_bark: 7.45%
13. air_conditioner: 73.64%

Final prediction: air_condit

In [76]:
import torch
import torch.nn.functional as F
import numpy as np
import torch
import torch.nn.functional as F
import numpy as np
from tabulate import tabulate

def top_1_percent_accuracy_per_class(logits, labels, num_classes, top_percent=1):
    """
    Calculate the top 1% accuracy for each class.

    Args:
        logits: The model's raw output (before softmax), shape (batch_size, num_classes).
        labels: The true labels, shape (batch_size,).
        num_classes: The number of unique classes.
        top_percent: The percentage of top predictions to consider for accuracy.

    Returns:
        Dictionary of top 1% accuracy per class.
    """
    probs = F.softmax(logits, dim=1)

    num_samples = logits.size(0)

    top_k = max(1, int(num_samples * top_percent / 100))  # Ensure at least 1

    top_1_percent_accuracy = {i: 0 for i in range(num_classes)}
    class_counts = {i: 0 for i in range(num_classes)}
    for i in range(num_samples):
        true_label = labels[i].item()
        prob = probs[i].cpu().numpy()

        top_indices = np.argsort(prob)[::-1]

        if true_label == top_indices[0]:  # Check if the true class is the top-1 prediction
            top_1_percent_accuracy[true_label] += 1

        class_counts[true_label] += 1

    for class_id in top_1_percent_accuracy:
        if class_counts[class_id] > 0:  # Avoid division by zero
            top_1_percent_accuracy[class_id] = (top_1_percent_accuracy[class_id] / class_counts[class_id]) * 100
        else:
            top_1_percent_accuracy[class_id] = 0

    return top_1_percent_accuracy


# Example usage after validation is complete
def evaluate(model, val_loader, num_classes, classifier):
    model.eval()
    all_logits = []
    all_labels = []

    with torch.no_grad():
        for batch_idx, (xi, _, labels) in enumerate(val_loader):
            xi, labels = xi.to(device), labels.to(device)

            # Forward pass
            features = model.backbone(xi)
            logits = classifier(features)

            all_logits.append(logits)
            all_labels.append(labels)

        # Concatenate logits and labels across all batches
        all_logits = torch.cat(all_logits, dim=0)
        all_labels = torch.cat(all_labels, dim=0)

        # Calculate Top 1% Accuracy per class
        top_1_percent_acc_per_class = top_1_percent_accuracy_per_class(all_logits, all_labels, num_classes, top_percent=1)

        print("\nTop 1% Accuracy per class:")
        table = []
        for class_id in range(num_classes):
            class_name = class_names[class_id]  # Get the class name from the list
            table.append([f"Class {class_name}", f"{top_1_percent_acc_per_class[class_id]:.2f}%"])

        # Print the table using tabulate
        headers = ["Class", "Top 1% Accuracy"]
        print(tabulate(table, headers=headers, tablefmt="grid"))
        # for class_id in range(num_classes):
        #     print(f"Class {class_id}: {top_1_percent_acc_per_class[class_id]:.2f}%")


# Assuming you have your validation loader and the number of classes
num_classes = 13  # Set this according to your dataset


# Initialize model components
backbone = models.resnet50(pretrained=False)
backbone.fc = torch.nn.Identity()

checkpoint_path = './checkpoints/fold_10_checkpoint.pth'
# Load checkpoint
print(f"Loading checkpoint from {checkpoint_path}")
checkpoint = torch.load(checkpoint_path, map_location="cuda:0")

# Debug: print keys
print(f"Checkpoint keys: {checkpoint.keys()}")


simclr = SimCLR(backbone).to("cuda:0")
classifier = Classifier().to("cuda:0")
class_names = class_names

simclr.load_state_dict(checkpoint['simclr'])
classifier.load_state_dict(checkpoint['classifier'])

# Set to evaluation mode
simclr.eval()
classifier.eval()

# Define transforms (similar to validation transforms)
transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(0.8, 0.8, 0.8, 0.2),
        transforms.RandomGrayscale(p=0.2),
        transforms.ToTensor(),
    ])

roor_dir = './spectrograms'


val_ds = UrbanSoundDataset(root_dir, [10], csv_file, transform)

val_loader = DataLoader(val_ds, batch_size=batch_size, num_workers=4)
class_names = ['air_conditioner', 'ambulance', 'car_horn', 'children_playing',
       'dog_bark', 'drilling', 'engine_idling', 'firetruck', 'gun_shot',
       'jackhammer', 'police', 'street_music', 'traffic']
evaluate(simclr, val_loader, num_classes, classifier = classifier)




Loading checkpoint from ./checkpoints/fold_10_checkpoint.pth
Checkpoint keys: dict_keys(['simclr', 'classifier', 'optimizer'])

Top 1% Accuracy per class:
+------------------------+-------------------+
| Class                  | Top 1% Accuracy   |
| Class air_conditioner  | 95.10%            |
+------------------------+-------------------+
| Class ambulance        | 98.00%            |
+------------------------+-------------------+
| Class car_horn         | 73.23%            |
+------------------------+-------------------+
| Class children_playing | 89.53%            |
+------------------------+-------------------+
| Class dog_bark         | 69.93%            |
+------------------------+-------------------+
| Class drilling         | 90.65%            |
+------------------------+-------------------+
| Class engine_idling    | 93.54%            |
+------------------------+-------------------+
| Class firetruck        | 94.21%            |
+------------------------+-------------------+

