<a href="https://colab.research.google.com/github/anjorisarabhai/OIBSIP/blob/main/Untitled2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
import os
import numpy as np
from sklearn.metrics import classification_report, f1_score
from google.colab import files
import zipfile
import glob
import sys
import torch.nn.functional as F

# --- CONFIGURATION ---
LOCAL_ZIP_NAME = 'project_spectrogram_data.zip'
SPECTROGRAM_FOLDER_NAME = 'spectrogram'
LOCAL_CSV_NAME = 'consolidated_genres.csv'  # File name user uploads
CLEANED_CSV_NAME = 'cleaned_final_metadata.csv'  # New standardized name for the output
LOCAL_BASE_DIR = './project_data/'
SPECTROGRAM_FILE_EXTENSION = '.jpg'

# Global variables will be set after extraction
SPECTROGRAM_DIR = None
METADATA_FILE = None
NUM_CLASSES = None
train_loader = None
val_loader = None
genre_names = None

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# --- FILE UPLOAD AND AUTO-CORRECTING EXTRACTION UTILITY ---
def upload_and_extract_data():
    """Prompts for file upload, extracts ZIP, and finds the true paths."""
    global SPECTROGRAM_DIR, METADATA_FILE
    print(f"Please upload your '{LOCAL_ZIP_NAME}' file now:")
    uploaded = files.upload()
    if not uploaded:
        print("❌ ERROR: No file uploaded. Aborting setup.")
        return False
    uploaded_filename = list(uploaded.keys())[0]
    os.makedirs(LOCAL_BASE_DIR, exist_ok=True)
    try:
        with zipfile.ZipFile(uploaded_filename, 'r') as zip_ref:
            zip_ref.extractall(LOCAL_BASE_DIR)
        os.remove(uploaded_filename)
        # Find the correct, deep paths for SPECTROGRAM_DIR and METADATA_FILE
        spectro_paths = glob.glob(os.path.join(LOCAL_BASE_DIR, '**', SPECTROGRAM_FOLDER_NAME), recursive=True)
        csv_paths = glob.glob(os.path.join(LOCAL_BASE_DIR, '**', LOCAL_CSV_NAME), recursive=True)
        if not spectro_paths or not csv_paths:
            print(f"❌ FATAL ERROR: Could not find required files inside the extracted data.")
            return False

        # Set the global paths
        SPECTROGRAM_DIR = spectro_paths[0]
        # We will point METADATA_FILE to the temporary output of the cleansing script
        METADATA_FILE = os.path.join(os.path.dirname(csv_paths[0]), CLEANED_CSV_NAME)
        print(f"✅ SUCCESS: Data extracted and paths located.")
        return True
    except Exception as e:
        print(f"❌ ERROR: Failed during extraction or path search: {e}")
        return False

# --- DATA CLEANSING UTILITY (Identify all label columns dynamically) ---
def cleanse_metadata_file():
    """Filters the metadata CSV to use all available labels and synchronizes with physical files."""
    original_csv_path = glob.glob(os.path.join(LOCAL_BASE_DIR, '**', LOCAL_CSV_NAME), recursive=True)[0]
    metadata_df = pd.read_csv(original_csv_path)
    print(f"Original metadata columns: {metadata_df.columns.tolist()}")
    print(f"Original metadata sample:\n{metadata_df.head()}")

    # Sync filenames/extensions
    metadata_df['filename'] = metadata_df['filename'].str.replace(r'\.(png|jpeg|jpg)$', SPECTROGRAM_FILE_EXTENSION, regex=True)
    image_files = set(os.listdir(SPECTROGRAM_DIR))
    print(f"Image files found: {len(image_files)}; Examples: {list(image_files)[:5]}")

    # Filter by filenames matching files
    filtered_df = metadata_df[metadata_df['filename'].isin(image_files)].copy()
    print(f"Rows after filename syncing: {len(filtered_df)}")

    # Detect label columns: assume all except 'filename' are labels
    filename_col = filtered_df.columns[0]
    genre_columns = list(filtered_df.columns[1:])  # treat all other columns as labels
    print(f"Label columns detected: {genre_columns}")

    # Prepare final DataFrame
    final_columns = [filename_col] + genre_columns
    final_df = filtered_df[final_columns].copy()

    # Keep only tracks with at least one label
    final_df['label_sum'] = final_df[genre_columns].sum(axis=1)
    final_df = final_df[final_df['label_sum'] > 0].drop(columns=['label_sum'])

    print(f"Rows after removing tracks with no labels: {len(final_df)}")
    print(f"Final labels count: {len(genre_columns)}")

    # Save cleaned data
    final_df.to_csv(METADATA_FILE, index=False)
    print(f"✅ DATA CLEANSING COMPLETE")
    return True

# --- CUSTOM MULTI-LABEL DATASET CLASS ---
class MultiLabelSpectrogramDataset(Dataset):
    def __init__(self, metadata_path, img_dir, transform=None):
        self.metadata_frame = pd.read_csv(metadata_path)
        self.img_dir = img_dir
        self.transform = transform
        # rename first column to 'filename' (for safety)
        self.metadata_frame.rename(columns={self.metadata_frame.columns[0]: 'filename'}, inplace=True)
        self.label_columns = self.metadata_frame.columns[1:].tolist()
        self.num_classes = len(self.label_columns)

    def __len__(self):
        return len(self.metadata_frame)

    def __getitem__(self, idx):
        filename = self.metadata_frame.iloc[idx, 0]
        img_path = os.path.join(self.img_dir, filename)
        image = Image.open(img_path).convert('RGB')
        label_vector = self.metadata_frame.iloc[idx, 1:].values.astype(np.float32)
        label_tensor = torch.tensor(label_vector, dtype=torch.float32)
        if self.transform:
            image = self.transform(image)
        return image, label_tensor

# --- RUN INITIALIZATION ---
if upload_and_extract_data():
    if cleanse_metadata_file():
        # Data transforms
        train_transform = transforms.Compose([
            transforms.RandomRotation(15),
            transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
            transforms.ColorJitter(brightness=0.2, contrast=0.2),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
            transforms.Lambda(lambda x: x.repeat(3, 1, 1) if x.size(0) == 1 else x)
        ])
        val_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
            transforms.Lambda(lambda x: x.repeat(3, 1, 1) if x.size(0) == 1 else x)
        ])

        full_dataset = MultiLabelSpectrogramDataset(
            metadata_path=METADATA_FILE,
            img_dir=SPECTROGRAM_DIR
        )

        global NUM_CLASSES, train_loader, val_loader, genre_names
        NUM_CLASSES = full_dataset.num_classes
        train_size = int(0.8 * len(full_dataset))
        val_size = len(full_dataset) - train_size
        train_dataset, val_dataset = torch.utils.data.random_split(full_dataset, [train_size, val_size])

        train_dataset.dataset.transform = train_transform
        val_dataset.dataset.transform = val_transform

        batch_size = 16
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

        genre_names = full_dataset.label_columns

Using device: cuda
Please upload your 'project_spectrogram_data.zip' file now:


Saving project_spectrogram_data.zip to project_spectrogram_data.zip
✅ SUCCESS: Data extracted and paths located.
Original metadata columns: ['filename', 'Hip hop', 'Rap', 'R&B / Soul', 'Pop', 'Rock', 'Jazz', 'Blues', 'Country', 'Folk', 'Classical', 'Electronic', 'Funk', 'Reggae / Ska / Dub', 'Metal', 'Gospel / Religious', 'Latin', 'Disco / Dance', 'Ambient / Chill / New Age', 'Experimental / Avant-Garde', 'Opera / Musical Theater / Soundtrack', 'Vocal / A cappella', 'Dancehall / Hip House / Club', 'Psychedelic', 'Other / Niche']
Original metadata sample:
                                   filename  Hip hop  Rap  R&B / Soul  Pop  \
0  00b1397d-7f3e-4c59-bb42-ccd7fa17ee10.jpg        0    0           0    1   
1  00c9dcab-4abf-47f5-9755-c5c805b779c7.jpg        1    1           1    0   
2  012e3459-b54d-49e9-b48d-d0922d295c5a.jpg        0    0           0    1   
3  013a7fe3-0113-4604-a295-f74a0b88bf05.jpg        0    0           0    0   
4  0172efb9-b353-4e55-82cd-80136d98069f.jpg      

In [4]:
# --- OPTIMIZED TRAINING FUNCTION (VGG TRANSFER LEARNING) ---

def train_and_evaluate_transfer_learning(model_name, model, train_loader, val_loader, NUM_CLASSES, genre_names, device):
    """
    Two-phase fine-tuning (VGG Transfer Learning) for Multi-Label Classification.
    Fixed device mismatch error and added epoch-wise progress.
    """
    # --- Phase 0: Adapt final layer & move to device
    if model_name.startswith('VGG'):
        num_ftrs = model.classifier[6].in_features
        model.classifier[6] = nn.Linear(num_ftrs, NUM_CLASSES).to(device)  # <-- FIXED DEVICE

    model.to(device)
    criterion = nn.BCEWithLogitsLoss()

    # --- Phase 1: Train classifier head only ---
    for param in model.features.parameters():
        param.requires_grad = False

    optimizer = optim.Adam(model.classifier.parameters(), lr=0.001)
    num_epochs_phase1 = 5

    print(f"\n--- Phase 1: Training {model_name} Classifier Head (5 Epochs) ---")
    for epoch in range(num_epochs_phase1):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"Phase 1 - Epoch {epoch+1}/{num_epochs_phase1}, Loss: {running_loss/len(train_loader):.4f}")

    # --- Phase 2: Fine-tune last layers ---
    for param in model.features[24:].parameters():
        param.requires_grad = True
    optimizer = optim.Adam(model.parameters(), lr=1e-5)

    num_epochs_phase2 = 40
    print("\nPhase 2: Fine-tuning last layers (40 Epochs)...")
    for epoch in range(num_epochs_phase2):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"Phase 2 - Epoch {epoch+1}/{num_epochs_phase2}, Loss: {running_loss/len(train_loader):.4f}")

    # --- FINAL EVALUATION ---
    model.eval()
    y_true_list, y_pred_list = [], []
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            predicted_probs = torch.sigmoid(outputs).cpu().numpy()
            predicted_labels = (predicted_probs > 0.5).astype(int)
            y_true_list.extend(labels.cpu().numpy())
            y_pred_list.extend(predicted_labels)

    y_true = np.array(y_true_list)
    y_pred = np.array(y_pred_list)

    exact_match_accuracy = (y_pred == y_true).all(axis=1).mean()
    from sklearn.metrics import f1_score, classification_report
    weighted_f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

    print(f"\n--- Final Evaluation for {model_name} ---")
    print(f"Overall Weighted F1-score: {weighted_f1:.4f}")
    print(f"Exact Match Accuracy: {exact_match_accuracy:.4f}")
    print(classification_report(y_true, y_pred, target_names=genre_names, zero_division=0))


# --- EXECUTION: VGG-16 TRANSFER LEARNING ---

# Define VGG-16 model with pre-trained weights
vgg16_model = models.vgg16(weights='IMAGENET1K_V1')

# Run VGG-16 (Optimized for 50%+ F1-score)
train_and_evaluate_transfer_learning(
    model_name="VGG-16",
    model=vgg16_model,
    train_loader=train_loader,
    val_loader=val_loader,
    NUM_CLASSES=NUM_CLASSES,
    genre_names=genre_names,
    device=device
)

Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth


100%|██████████| 528M/528M [00:05<00:00, 97.8MB/s]



--- Phase 1: Training VGG-16 Classifier Head (5 Epochs) ---
Phase 1 - Epoch 1/5, Loss: 0.2862
Phase 1 - Epoch 2/5, Loss: 0.2420
Phase 1 - Epoch 3/5, Loss: 0.2259
Phase 1 - Epoch 4/5, Loss: 0.2097
Phase 1 - Epoch 5/5, Loss: 0.2065

Phase 2: Fine-tuning last layers (40 Epochs)...
Phase 2 - Epoch 1/40, Loss: 0.1916
Phase 2 - Epoch 2/40, Loss: 0.1795
Phase 2 - Epoch 3/40, Loss: 0.1758
Phase 2 - Epoch 4/40, Loss: 0.1689
Phase 2 - Epoch 5/40, Loss: 0.1650
Phase 2 - Epoch 6/40, Loss: 0.1619
Phase 2 - Epoch 7/40, Loss: 0.1556
Phase 2 - Epoch 8/40, Loss: 0.1580
Phase 2 - Epoch 9/40, Loss: 0.1523
Phase 2 - Epoch 10/40, Loss: 0.1453
Phase 2 - Epoch 11/40, Loss: 0.1443
Phase 2 - Epoch 12/40, Loss: 0.1384
Phase 2 - Epoch 13/40, Loss: 0.1383
Phase 2 - Epoch 14/40, Loss: 0.1363
Phase 2 - Epoch 15/40, Loss: 0.1344
Phase 2 - Epoch 16/40, Loss: 0.1306
Phase 2 - Epoch 17/40, Loss: 0.1283
Phase 2 - Epoch 18/40, Loss: 0.1270
Phase 2 - Epoch 19/40, Loss: 0.1204
Phase 2 - Epoch 20/40, Loss: 0.1208
Phase 2 -

In [5]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
import os
import numpy as np
from sklearn.metrics import f1_score
import glob

# --- CONFIGURATION ---
LOCAL_ZIP_NAME = 'project_spectrogram_data.zip'  # change if needed
LOCAL_BASE_DIR = './project_data/'
SPECTROGRAM_FOLDER_NAME = 'spectrogram'
LOCAL_CSV_NAME = 'consolidated_genres.csv'
CLEANED_CSV_NAME = 'cleaned_final_metadata.csv'
SPECTROGRAM_FILE_EXTENSION = '.jpg'
BATCH_SIZE = 32
NUM_EPOCHS_PHASE1 = 5
NUM_EPOCHS_PHASE2 = 50
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

# --- DATA CLEANSING FUNCTION ---
def cleanse_metadata_file():
    original_csv_path = glob.glob(os.path.join(LOCAL_BASE_DIR, '**', LOCAL_CSV_NAME), recursive=True)[0]
    metadata_df = pd.read_csv(original_csv_path)
    metadata_df['filename'] = metadata_df['filename'].str.replace(r'\.(png|jpeg|jpg)$', SPECTROGRAM_FILE_EXTENSION, regex=True)
    image_files = set(os.listdir(os.path.join(LOCAL_BASE_DIR, SPECTROGRAM_FOLDER_NAME)))
    metadata_df = metadata_df[metadata_df['filename'].isin(image_files)].copy()
    filename_col = metadata_df.columns[0]
    genre_columns = list(metadata_df.columns[1:])
    final_columns = [filename_col] + genre_columns
    final_df = metadata_df[final_columns].copy()
    final_df['label_sum'] = final_df[genre_columns].sum(axis=1)
    final_df = final_df[final_df['label_sum'] > 0].drop(columns=['label_sum'])
    cleaned_csv_path = os.path.join(LOCAL_BASE_DIR, CLEANED_CSV_NAME)
    final_df.to_csv(cleaned_csv_path, index=False)
    print(f"✅ DATA CLEANSING COMPLETE: {len(final_df)} samples with {len(genre_columns)} labels.")
    return cleaned_csv_path, genre_columns

# --- CUSTOM DATASET ---
class MultiLabelSpectrogramDataset(Dataset):
    def __init__(self, metadata_path, img_dir, transform=None):
        self.metadata_frame = pd.read_csv(metadata_path)
        self.img_dir = img_dir
        self.transform = transform
        self.label_columns = self.metadata_frame.columns[1:].tolist()

    def __len__(self):
        return len(self.metadata_frame)

    def __getitem__(self, idx):
        filename = self.metadata_frame.iloc[idx, 0]
        img_path = os.path.join(self.img_dir, filename)
        image = Image.open(img_path).convert('RGB')
        label_vec = self.metadata_frame.iloc[idx, 1:].values.astype(np.float32)
        label_tensor = torch.tensor(label_vec, dtype=torch.float32)
        if self.transform:
            image = self.transform(image)
        return image, label_tensor

# --- DATA AUGMENTATIONS ---
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224, scale=(0.6, 1.0), ratio=(0.9, 1.1)),
    transforms.RandomRotation(30),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
    transforms.GaussianBlur(kernel_size=5, sigma=(0.1, 2.0)),
    transforms.ToTensor(),
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.1), ratio=(0.3, 3.3)),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
    transforms.Lambda(lambda x: x.repeat(3, 1, 1) if x.size(0) == 1 else x)
])

val_transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
    transforms.Lambda(lambda x: x.repeat(3, 1, 1) if x.size(0) == 1 else x)
])

# --- MAIN TRAINING FUNCTION ---
def train_model():
    # Cleanse data and get classes
    cleaned_csv_path, genre_names = cleanse_metadata_file()
    img_dir = os.path.join(LOCAL_BASE_DIR, SPECTROGRAM_FOLDER_NAME)

    # Dataset and dataloaders
    full_dataset = MultiLabelSpectrogramDataset(cleaned_csv_path, img_dir, transform=None)
    num_classes = len(full_dataset.label_columns)
    train_size = int(0.8 * len(full_dataset))
    val_size = len(full_dataset) - train_size
    train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])
    train_dataset.dataset.transform = train_transform
    val_dataset.dataset.transform = val_transform

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4, pin_memory=True)

    # Load pretrained VGG16
    model = models.vgg16(pretrained=True)
    # Replace classifier for multi-label output
    model.classifier[6] = nn.Linear(model.classifier[6].in_features, num_classes)
    model = model.to(DEVICE)

    # Loss function and optimizer with differential LR
    criterion = nn.BCEWithLogitsLoss()

    base_params = [param for name, param in model.named_parameters() if "classifier" not in name]
    head_params = [param for name, param in model.named_parameters() if "classifier" in name]

    optimizer = optim.AdamW([
        {'params': base_params, 'lr': 1e-5},
        {'params': head_params, 'lr': 1e-3}
    ], weight_decay=1e-4)

    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS_PHASE2)

    # Training phases
    model.train()
    print("--- Phase 1: Training classifier head (5 epochs) ---")
    for epoch in range(NUM_EPOCHS_PHASE1):
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
        epoch_loss = running_loss / len(train_loader.dataset)
        print(f"Phase 1 - Epoch {epoch+1}/{NUM_EPOCHS_PHASE1}, Loss: {epoch_loss:.4f}")

    print("\n--- Phase 2: Fine-tuning last layers (50 epochs) ---")
    for epoch in range(NUM_EPOCHS_PHASE2):
        running_loss = 0.0
        model.train()
        for inputs, labels in train_loader:
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
        scheduler.step()
        epoch_loss = running_loss / len(train_loader.dataset)
        print(f"Phase 2 - Epoch {epoch+1}/{NUM_EPOCHS_PHASE2}, Loss: {epoch_loss:.4f}")
        # Optionally add validation per epoch for monitoring

    # Evaluation on validation set
    model.eval()
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)
            outputs = model(inputs)
            preds = torch.sigmoid(outputs).cpu().numpy()
            labels = labels.cpu().numpy()
            all_labels.append(labels)
            all_preds.append(preds)

    all_labels = np.vstack(all_labels)
    all_preds = np.vstack(all_preds)
    # Thresholding at 0.5 for multi-label
    all_pred_labels = (all_preds >= 0.5).astype(int)

    # Calculate weighted F1 score
    f1 = f1_score(all_labels, all_pred_labels, average='weighted', zero_division=0)
    print(f"\nFinal Weighted F1 Score on validation set: {f1:.4f}")

if __name__ == "__main__":
    train_model()

Using device: cuda


FileNotFoundError: [Errno 2] No such file or directory: './project_data/spectrogram'