In [None]:
import os
import sys
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, Subset
import torchaudio
import torchaudio.transforms as transforms
import pandas as pd
import numpy as np
from tqdm import tqdm
from sklearn.model_selection import KFold
from sklearn.metrics import f1_score, precision_score, recall_score
import torch.nn.functional as F

# ------------------------------------------------
# 1. Add the PANNs model directory to the system path
# ------------------------------------------------
# (Do not remove this line!)
# Location of the PANNs "pytorch" source folder. It is put at the front of
# sys.path so that the `models` module imported just below resolves to it.
pytorch_path = (
    r"C:\\Users\\Harsh\\Desktop\\Audio Recognition Project"
    r"\\audioset_tagging_cnn-master_2\\pytorch"
)
sys.path.insert(0, pytorch_path)

# Import the Wavegram_Logmel_Cnn14 model (and Cnn14 if needed)
from models import Wavegram_Logmel_Cnn14
# (You can also import Cnn14 if desired: from models import Cnn14)

# ------------------------------------------------
# 2. Configuration and Paths
# ------------------------------------------------
# NOTE: these are raw strings, so every "\\" stays as two literal backslash
# characters in the resulting path.
DATASET_PATH = r"C:\\Users\\Harsh\Desktop\\Audio Recognition Project\\dataset"
PRETRAINED_MODEL_PATH = (
    r"C:\\Users\\Harsh\\Desktop\\Audio Recognition Project"
    r"\\pretrained_models\\PANN\\Wavegram_Logmel_Cnn14_mAP=0.439.pth"
)

# Metadata tables (one CSV per split) stored directly under DATASET_PATH.
TRAIN_METADATA_CSV = os.path.join(DATASET_PATH, "metadata of train set.csv")
TEST_METADATA_CSV = os.path.join(DATASET_PATH, "metadata of test set.csv")

# Folders holding the audio files of each split.
TRAIN_AUDIO_DIR = os.path.join(DATASET_PATH, "train")
TEST_AUDIO_DIR = os.path.join(DATASET_PATH, "test")

# Training hyper-parameters.
BATCH_SIZE = 8
EPOCHS = 100
LEARNING_RATE = 1e-4

# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# ------------------------------------------------
# 3. Audio Processing Parameters
# ------------------------------------------------
SAMPLE_RATE = 32000  # target rate; clips with another rate are resampled to it
N_FFT = 1024         # handed to the model as `window_size`
HOP_LENGTH = 320     # handed to the model as `hop_size`
N_MELS = 64          # handed to the model as `mel_bins`
FMIN = 50            # handed to the model as `fmin`
FMAX = 14000         # handed to the model as `fmax`

# The model computes its own spectrograms internally.

# ------------------------------------------------
# 4. Prepare Metadata and Create Class Mapping
# ------------------------------------------------
# Load the training metadata; header names are stripped because the lookups
# below use the exact column name "Classname".
train_meta = pd.read_csv(TRAIN_METADATA_CSV)
train_meta.columns = train_meta.columns.str.strip()

# Label indices follow the sorted order of the distinct "Classname" values,
# so the mapping is the same on every run over the same CSV.
classes = sorted(train_meta["Classname"].unique())
num_classes = len(classes)
class_to_idx = dict(zip(classes, range(num_classes)))
print("Class mapping (Classname -> Index):")
print(class_to_idx)

# ------------------------------------------------
# 5. Define the Custom Dataset (Return Raw Waveform)
# ------------------------------------------------
class AudioDataset(Dataset):
    """Dataset of labelled audio clips that yields raw single-channel waveforms.

    The metadata CSV must contain a ``Filename`` column (joined onto
    ``audio_dir``) and a ``Classname`` column (looked up in ``class_to_idx``).
    Surrounding whitespace in the CSV header names is stripped.

    Args:
        metadata_csv: Path of the CSV file that lists the clips.
        audio_dir: Directory containing the audio files.
        class_to_idx: Mapping from class name to integer label.
        transform: Optional callable applied to the 1-D waveform before it
            is returned.
    """

    def __init__(self, metadata_csv, audio_dir, class_to_idx, transform=None):
        self.metadata = pd.read_csv(metadata_csv)
        self.metadata.columns = self.metadata.columns.str.strip()
        self.audio_dir = audio_dir
        self.class_to_idx = class_to_idx
        self.transform = transform
        # Resample transforms keyed by (source rate, target rate). Building
        # one is comparatively expensive, so each is created once and reused
        # instead of being rebuilt for every sample.
        self._resamplers = {}

    def __len__(self):
        """Return the number of rows in the metadata table."""
        return len(self.metadata)

    def _resample(self, waveform, sr):
        """Return ``waveform`` converted from ``sr`` Hz to ``SAMPLE_RATE`` Hz."""
        key = (sr, SAMPLE_RATE)
        resampler = self._resamplers.get(key)
        if resampler is None:
            resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=SAMPLE_RATE)
            self._resamplers[key] = resampler
        return resampler(waveform)

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(waveform, label)``.

        ``waveform`` is the first channel of the file, resampled to
        ``SAMPLE_RATE`` when the file uses a different rate, with the channel
        dimension squeezed away. ``label`` is the integer from
        ``class_to_idx``.

        Raises:
            KeyError: If the row's ``Classname`` is not in ``class_to_idx``.
        """
        row = self.metadata.iloc[idx]
        filename = row["Filename"]
        label = self.class_to_idx[row["Classname"]]
        file_path = os.path.join(self.audio_dir, filename)

        waveform, sr = torchaudio.load(file_path)
        # Multi-channel audio: keep only the first channel (no down-mixing).
        if waveform.shape[0] > 1:
            waveform = waveform[0:1, :]
        if sr != SAMPLE_RATE:
            waveform = self._resample(waveform, sr)
        # Drop the channel dimension -> shape: [data_length]
        waveform = waveform.squeeze(0)
        if self.transform:
            waveform = self.transform(waveform)
        return waveform, label

# Both splits share the label mapping derived from the training metadata and
# apply no extra waveform transform.
train_dataset = AudioDataset(
    metadata_csv=TRAIN_METADATA_CSV,
    audio_dir=TRAIN_AUDIO_DIR,
    class_to_idx=class_to_idx,
    transform=None,
)
test_dataset = AudioDataset(
    metadata_csv=TEST_METADATA_CSV,
    audio_dir=TEST_AUDIO_DIR,
    class_to_idx=class_to_idx,
    transform=None,
)

# ------------------------------------------------
# 6. Custom Collate Function for Raw Waveforms
# ------------------------------------------------
def collate_fn(batch, length_multiple=640):
    """Zero-pad the 1-D waveforms of a batch to one common length and stack them.

    The common length is the longest waveform in the batch rounded UP to the
    next multiple of ``length_multiple``. Padding only to the longest clip
    made Wavegram_Logmel_Cnn14 fail inside the model with
    "Sizes of tensors must match except in dimension 1. Expected size 196 but
    got size 195".
    NOTE(review): that mismatch is consistent with the model's waveform branch
    and its log-mel branch rounding the number of time frames differently
    when the input length is not a multiple of 2 * 320 samples (presumed from
    the upstream PANNs implementation -- confirm against models.py). The
    default of 640 is chosen for that reason.

    Args:
        batch: Iterable of ``(waveform, label)`` pairs, where ``waveform`` is
            a 1-D tensor and ``label`` is an int.
        length_multiple: Padded length is a multiple of this many samples.
            Pass ``1`` (or ``None``) to pad only up to the longest waveform.

    Returns:
        Tuple ``(waveforms, labels)``: a ``[batch_size, padded_length]``
        tensor and a ``torch.long`` tensor of shape ``[batch_size]``.
    """
    waveforms, labels = zip(*batch)
    longest = max(waveform.shape[0] for waveform in waveforms)

    if length_multiple and length_multiple > 1:
        # Ceil-divide so the target is the smallest multiple >= longest; an
        # all-empty batch still gets one full multiple of samples.
        target_length = -(-max(longest, 1) // length_multiple) * length_multiple
    else:
        target_length = longest

    # F.pad with (0, k) appends k zeros on the right of the last dimension.
    padded_waveforms = [
        F.pad(waveform, (0, target_length - waveform.shape[0]))
        for waveform in waveforms
    ]
    stacked_waveforms = torch.stack(padded_waveforms, dim=0)  # [batch_size, target_length]
    labels = torch.tensor(labels, dtype=torch.long)
    return stacked_waveforms, labels

# Full-training-set loader (reshuffled each epoch) and a fixed-order loader
# for the test set; both batch variable-length clips through collate_fn.
train_loader_full = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_fn,
)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_fn,
)

# ------------------------------------------------
# 7. Define Model Parameters and Initialize Model
# ------------------------------------------------
# Here we use Wavegram_Logmel_Cnn14, which internally computes both wavegram and log-mel features.
# Keyword arguments shared by every Wavegram_Logmel_Cnn14 instance built in
# this script (reference model, per-fold models, final model).
model_params = dict(
    sample_rate=SAMPLE_RATE,
    window_size=N_FFT,
    hop_size=HOP_LENGTH,
    mel_bins=N_MELS,
    fmin=FMIN,
    fmax=FMAX,
    classes_num=num_classes,  # one output per class found in the training metadata
)

model = Wavegram_Logmel_Cnn14(**model_params)
model.to(DEVICE)

# ------------------------------------------------
# 8. Load Pretrained Weights (Filtering Out Final Layer)
# ------------------------------------------------
# The checkpoint stores the network weights under its "model" key.
checkpoint = torch.load(PRETRAINED_MODEL_PATH, map_location=DEVICE)
pretrained_dict = checkpoint["model"]

# Drop every entry of the checkpoint's final layer ("fc_audioset"): it was
# trained for AudioSet's 527 classes and does not fit this dataset's head.
filtered_dict = {
    name: weights
    for name, weights in pretrained_dict.items()
    if not name.startswith("fc_audioset")
}

# Overlay the kept weights on the model's current state; keys that were
# filtered out keep the values the freshly built model already had.
model_dict = model.state_dict()
model_dict.update(filtered_dict)
model.load_state_dict(model_dict)
print("Loaded pretrained weights (except final classification layer) successfully.")

# ------------------------------------------------
# 9. Loss and Optimizer
# ------------------------------------------------
# Loss and optimizer bound to the reference `model`. The cross-validation and
# final-training sections further down create their own criterion/optimizer
# objects, so these two are not used by the code visible in this cell.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

# ------------------------------------------------
# 10. 3-Fold Cross-Validation on Training Set
# ------------------------------------------------
from sklearn.model_selection import KFold
from sklearn.metrics import f1_score, precision_score, recall_score

# Shuffled 3-way split with a fixed seed, so the folds are reproducible.
kf = KFold(n_splits=3, shuffle=True, random_state=42)
fold_f1, fold_precision, fold_recall = [], [], []

print("\nStarting 3-Fold Cross-Validation...")
for fold, (train_idx, val_idx) in enumerate(kf.split(train_dataset)):
    print(f"\n--- Fold {fold+1} ---")

    # Index-based views over the single training dataset for this split.
    train_subset = Subset(train_dataset, train_idx)
    val_subset = Subset(train_dataset, val_idx)

    train_loader = DataLoader(
        train_subset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn
    )
    val_loader = DataLoader(
        val_subset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn
    )

    # Each fold starts from the same weights (model_dict) with its own
    # optimizer and loss, so nothing learned in one fold leaks into the next.
    model_fold = Wavegram_Logmel_Cnn14(**model_params)
    model_fold.to(DEVICE)
    model_fold.load_state_dict(model_dict)
    optimizer_fold = optim.Adam(model_fold.parameters(), lr=LEARNING_RATE)
    criterion_fold = nn.CrossEntropyLoss()

    # ---- Training: EPOCHS passes over this fold's training subset ----
    for epoch in range(EPOCHS):
        model_fold.train()
        running_loss = 0.0
        for waveforms, labels in tqdm(train_loader, desc=f"Fold {fold+1} Epoch {epoch+1}"):
            waveforms = waveforms.to(DEVICE)
            labels = labels.to(DEVICE)

            optimizer_fold.zero_grad()
            outputs = model_fold(waveforms)
            # NOTE(review): in the upstream PANNs models "clipwise_output"
            # appears to be sigmoid-activated rather than raw logits, while
            # nn.CrossEntropyLoss expects unnormalized logits -- confirm
            # against models.py.
            logits = outputs["clipwise_output"]
            loss = criterion_fold(logits, labels)
            loss.backward()
            optimizer_fold.step()

            running_loss += loss.item()
        # Mean of the per-batch losses over the epoch.
        print(f"Fold {fold+1} Epoch {epoch+1} Loss: {running_loss/len(train_loader):.4f}")

    # ---- Validation: one pass over the held-out subset, no gradients ----
    model_fold.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for waveforms, labels in val_loader:
            waveforms = waveforms.to(DEVICE)
            labels = labels.to(DEVICE)
            outputs = model_fold(waveforms)
            logits = outputs["clipwise_output"]
            # Predicted class = index of the largest score along dim 1.
            preds = torch.max(logits, 1).indices
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Support-weighted metrics; zero_division=0 scores a class with no
    # predictions as 0 instead of warning.
    fold_f1.append(
        f1_score(all_labels, all_preds, average="weighted", zero_division=0)
    )
    fold_precision.append(
        precision_score(all_labels, all_preds, average="weighted", zero_division=0)
    )
    fold_recall.append(
        recall_score(all_labels, all_preds, average="weighted", zero_division=0)
    )
    print(f"Fold {fold+1} - F1: {fold_f1[-1]:.4f}, Precision: {fold_precision[-1]:.4f}, Recall: {fold_recall[-1]:.4f}")

# Unweighted mean of each metric across the three folds.
print("\n--- Average 3-Fold Cross-Validation Results ---")
print(f"F1 Score: {np.mean(fold_f1):.4f}")
print(f"Precision: {np.mean(fold_precision):.4f}")
print(f"Recall: {np.mean(fold_recall):.4f}")

# ------------------------------------------------
# 11. Train Final Model on Full Training Set and Evaluate on Test Set
# ------------------------------------------------
# Fresh network initialised from model_dict, i.e. the pretrained weights for
# everything except the final classification layer.
model_final = Wavegram_Logmel_Cnn14(**model_params)
model_final.to(DEVICE)
model_final.load_state_dict(model_dict)
optimizer_final = optim.Adam(model_final.parameters(), lr=LEARNING_RATE)
criterion_final = nn.CrossEntropyLoss()

print("\nTraining final model on full training set...")
for epoch in range(EPOCHS):
    model_final.train()
    total_loss = 0.0
    for waveforms, labels in tqdm(train_loader_full, desc=f"Final Model Epoch {epoch+1}/{EPOCHS}"):
        waveforms = waveforms.to(DEVICE)
        labels = labels.to(DEVICE)

        optimizer_final.zero_grad()
        outputs = model_final(waveforms)
        # NOTE(review): in the upstream PANNs models "clipwise_output"
        # appears to be sigmoid-activated rather than raw logits, while
        # nn.CrossEntropyLoss expects unnormalized logits -- confirm against
        # models.py.
        logits = outputs["clipwise_output"]
        loss = criterion_final(logits, labels)
        loss.backward()
        optimizer_final.step()

        total_loss += loss.item()
    # Mean of the per-batch losses over the epoch.
    print(f"Epoch {epoch+1} - Training Loss: {total_loss/len(train_loader_full):.4f}")

# Only the weights (state_dict) are written, relative to the current working
# directory.
torch.save(model_final.state_dict(), "trained_model_2.pth")
print("Final training complete. Model saved as 'trained_model_2.pth'.")

# Top-1 accuracy of the final model over the whole test loader.
model_final.eval()
correct, total = 0, 0
with torch.no_grad():
    for waveforms, labels in test_loader:
        waveforms = waveforms.to(DEVICE)
        labels = labels.to(DEVICE)
        outputs = model_final(waveforms)
        logits = outputs["clipwise_output"]
        # Predicted class = index of the largest score along dim 1.
        predicted = torch.max(logits, 1).indices
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

# Fraction of test clips whose predicted class equals the label.
test_accuracy = correct / total
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")


Class mapping (Classname -> Index):
{'breath': 0, 'cough': 1, 'crying': 2, 'laugh': 3, 'screaming': 4, 'sneeze': 5, 'yawn': 6}
Loaded pretrained weights (except final classification layer) successfully.

Starting 3-Fold Cross-Validation...

--- Fold 1 ---


Fold 1 Epoch 1:  35%|███▌      | 185/524 [05:43<10:29,  1.86s/it]


RuntimeError: Sizes of tensors must match except in dimension 1. Expected size 196 but got size 195 for tensor number 1 in the list.