In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics import (silhouette_score, calinski_harabasz_score, 
                             davies_bouldin_score, adjusted_rand_score)
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import librosa
from tqdm import tqdm
import pickle
import warnings
warnings.filterwarnings('ignore')

# Make NumPy- and PyTorch-driven randomness repeatable across runs.
np.random.seed(42)
torch.manual_seed(42)

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

In [None]:
# Audio loading / feature-extraction settings read by the functions below.
CONFIG = dict(
    sample_rate=22050,
    duration=30,
    n_mels=128,
    n_mfcc=40,
    n_fft=2048,
    hop_length=512,
    max_samples_per_class=50,
    fixed_time_steps=128,  # Fixed time dimension for CNN
)

# Dataset locations. NOTE(review): these are absolute, machine-specific paths;
# they must be edited before the notebook can run anywhere else.
BASE_PATH = r"f:\BRACU\Semester 12 Final\CSE425\FInal_project\Datasets"
BANGLA_PATH, ENGLISH_PATH, METADATA_PATH = (
    os.path.join(BASE_PATH, leaf)
    for leaf in ("Bangla_Datasets", "English_Datasets", "updated_metadata.csv")
)

# Output folders are created up front so later cells can write into them.
OUTPUT_PATH = r"f:\BRACU\Semester 12 Final\CSE425\FInal_project\processed_data"
RESULTS_PATH = r"f:\BRACU\Semester 12 Final\CSE425\FInal_project\results_advanced"
os.makedirs(OUTPUT_PATH, exist_ok=True)
os.makedirs(RESULTS_PATH, exist_ok=True)
print("Configuration loaded!")

# Metadata table: one row per track with an 'ID' and a 'lyrics' column.
print("Loading metadata with lyrics...")
metadata_df = pd.read_csv(METADATA_PATH)
print(f"Metadata shape: {metadata_df.shape}")
print(f"Columns: {metadata_df.columns.tolist()}")

# Map track ID -> lyrics text, with missing lyrics stored as ''.
# NOTE(review): lookups elsewhere use the file name stem (a str) as the key;
# presumably the 'ID' column holds matching strings -- confirm against the CSV.
lyrics_dict = {
    song_id: text
    for song_id, text in zip(metadata_df['ID'], metadata_df['lyrics'].fillna(''))
}
print(f"Loaded {len(lyrics_dict)} lyrics entries")

In [None]:
def load_audio(file_path):
    """Load an audio file as a waveform of at least the configured length.

    The file is decoded with ``librosa.load`` at ``CONFIG['sample_rate']`` and
    limited to ``CONFIG['duration']`` seconds. Clips shorter than
    ``sample_rate * duration`` samples are zero-padded at the end.

    Args:
        file_path: Path of the audio file to read.

    Returns:
        ``(audio, sr)`` on success, or ``(None, None)`` if the file could not
        be loaded. Failures are reported on stdout instead of being discarded
        silently, so unreadable files remain visible to the user.
    """
    try:
        audio, sr = librosa.load(file_path, sr=CONFIG['sample_rate'], duration=CONFIG['duration'])
        expected = CONFIG['sample_rate'] * CONFIG['duration']
        if len(audio) < expected:
            audio = np.pad(audio, (0, expected - len(audio)))
        return audio, sr
    except Exception as e:
        # Best-effort loading: keep going, but say which file failed and why.
        # print() is used because the notebook suppresses warnings globally.
        print(f"Could not load {file_path}: {type(e).__name__}: {e}")
        return None, None

def extract_mel_spectrogram(audio, sr):
    """Compute a dB-scaled mel spectrogram with a fixed number of time frames.

    Args:
        audio: Waveform samples, passed to librosa as ``y``.
        sr: Sampling rate of ``audio``.

    Returns:
        2-D array with exactly ``CONFIG['fixed_time_steps']`` columns.
        Longer spectrograms keep only their first frames; shorter ones are
        padded on the right with the quietest level found in the clip.
    """
    mel = librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=CONFIG['n_mels'],
                                          n_fft=CONFIG['n_fft'], hop_length=CONFIG['hop_length'])
    # With ref=np.max every bin is expressed relative to the clip's peak,
    # so 0 dB is the LOUDEST possible value and all other bins are <= 0.
    mel_db = librosa.power_to_db(mel, ref=np.max)

    n_steps = CONFIG['fixed_time_steps']
    n_frames = mel_db.shape[1]
    if n_frames > n_steps:
        return mel_db[:, :n_steps]

    # Padding with 0 would mark the padded frames as peak energy. Use the
    # clip's minimum level so padding reads as silence instead.
    floor_db = mel_db.min() if mel_db.size else 0.0
    return np.pad(mel_db, ((0, 0), (0, n_steps - n_frames)), constant_values=floor_db)

def extract_mfcc(audio, sr):
    """Compute MFCCs and force the time axis to a fixed length.

    Args:
        audio: Waveform samples, passed to librosa as ``y``.
        sr: Sampling rate of ``audio``.

    Returns:
        2-D array with exactly ``CONFIG['fixed_time_steps']`` columns:
        truncated to the first frames when longer, zero-padded on the right
        when shorter.
    """
    target = CONFIG['fixed_time_steps']
    coeffs = librosa.feature.mfcc(
        y=audio,
        sr=sr,
        n_mfcc=CONFIG['n_mfcc'],
        n_fft=CONFIG['n_fft'],
        hop_length=CONFIG['hop_length'],
    )
    n_frames = coeffs.shape[1]
    if n_frames <= target:
        # Too short (or exact): extend the time axis with zeros.
        return np.pad(coeffs, ((0, 0), (0, target - n_frames)))
    # Too long: keep only the leading frames.
    return coeffs[:, :target]

print("Audio extraction functions defined!")

# ============================================================================
# CELL 5: Lyrics Embedding Function
# ============================================================================
def create_lyrics_embeddings(lyrics_list, max_features=100):
    """Create TF-IDF embeddings for lyrics.

    Args:
        lyrics_list: Iterable of lyrics, one entry per track. Entries may be
            strings, ``None``, NaN or other scalars; missing/empty entries
            are treated as blank text and other non-strings are converted
            with ``str``.
        max_features: Maximum vocabulary size given to ``TfidfVectorizer``.

    Returns:
        ``(embeddings, vectorizer)`` where ``embeddings`` is a dense 2-D array
        with one row per entry. If no usable vocabulary can be built (e.g.
        every entry is blank), ``embeddings`` is an all-zero array of shape
        ``(n_entries, max_features)`` and ``vectorizer`` is left unfitted.
    """
    def _as_text(entry):
        # A float NaN is truthy, so it would otherwise reach the vectorizer
        # unchanged and crash it; `entry != entry` is true only for NaN.
        if entry is None or (isinstance(entry, float) and entry != entry):
            return ' '
        if not entry:
            return ' '
        text = entry if isinstance(entry, str) else str(entry)
        return text if text else ' '

    vectorizer = TfidfVectorizer(max_features=max_features, stop_words='english')
    # Handle empty lyrics
    lyrics_cleaned = [_as_text(l) for l in lyrics_list]
    try:
        embeddings = vectorizer.fit_transform(lyrics_cleaned).toarray()
    except ValueError as err:
        # scikit-learn raises "empty vocabulary" when nothing but blanks or
        # stop words remain; any other ValueError is a genuine problem.
        if 'empty vocabulary' not in str(err):
            raise
        print("Warning: no usable lyrics found; using all-zero lyrics embeddings.")
        embeddings = np.zeros((len(lyrics_cleaned), max_features or 0))
    return embeddings, vectorizer

print("Lyrics embedding function defined!")

# ============================================================================
# CELL 6: Collect and Process Audio Files
# ============================================================================
def collect_audio_files(sources=None, max_samples_per_class=None, lyrics=None):
    """Build the list of audio files to process, grouped by language and genre.

    Each dataset root is expected to contain one sub-directory per genre with
    ``.wav`` files inside. Directory listings are sorted so that the subset
    kept by ``max_samples_per_class`` is the same on every run and platform
    (``os.listdir`` returns entries in arbitrary order).

    Args:
        sources: Iterable of ``(root_path, language_code)`` pairs. Defaults to
            the Bangla and English dataset roots.
        max_samples_per_class: Maximum number of files kept per genre folder.
            Defaults to ``CONFIG['max_samples_per_class']``.
        lyrics: Mapping from file ID (file name without extension) to lyrics
            text. Defaults to the module-level ``lyrics_dict``.

    Returns:
        List of dicts with keys ``path``, ``language``, ``genre``,
        ``filename``, ``id`` and ``lyrics`` (``''`` when no lyrics are known).
    """
    if sources is None:
        sources = [(BANGLA_PATH, 'bn'), (ENGLISH_PATH, 'en')]
    if max_samples_per_class is None:
        max_samples_per_class = CONFIG['max_samples_per_class']
    if lyrics is None:
        lyrics = lyrics_dict

    audio_files = []
    for path, lang in sources:
        if not os.path.isdir(path):
            # A missing root used to be skipped silently, which made an
            # empty dataset hard to diagnose.
            print(f"Warning: dataset directory not found, skipping: {path}")
            continue
        for genre in sorted(os.listdir(path)):
            genre_path = os.path.join(path, genre)
            if not os.path.isdir(genre_path):
                continue
            # Case-insensitive match so files named '*.WAV' are not dropped.
            wav_names = sorted(
                f for f in os.listdir(genre_path) if f.lower().endswith('.wav')
            )
            for f in wav_names[:max_samples_per_class]:
                file_id = os.path.splitext(f)[0]
                audio_files.append({
                    'path': os.path.join(genre_path, f),
                    'language': lang, 'genre': genre,
                    'filename': f, 'id': file_id,
                    'lyrics': lyrics.get(file_id, '')
                })
    return audio_files

# Build the file inventory once; the feature-extraction loop iterates over it.
audio_files = collect_audio_files()
print(f"Collected {len(audio_files)} audio files")


In [None]:
print("\nExtracting features...")
# Parallel lists: index i of each list describes the same audio file.
mel_spectrograms, mfccs, labels, lyrics_list, file_metadata = [], [], [], [], []
failed_files = []  # (path, reason) for every file that had to be skipped

for file_info in tqdm(audio_files, desc="Processing"):
    audio, sr = load_audio(file_info['path'])
    if audio is None:
        failed_files.append((file_info['path'], 'could not be loaded'))
        continue
    try:
        # Compute every feature BEFORE appending anything. Appending the mel
        # spectrogram first and then failing on the MFCC would leave the
        # parallel lists with different lengths and misalign all later rows.
        mel = extract_mel_spectrogram(audio, sr)
        mfcc = extract_mfcc(audio, sr)
    except Exception as err:
        failed_files.append((file_info['path'], f"{type(err).__name__}: {err}"))
        continue
    mel_spectrograms.append(mel)
    mfccs.append(mfcc)
    labels.append(file_info['genre'])
    lyrics_list.append(file_info['lyrics'])
    file_metadata.append(file_info)

# Report skipped files after the loop so the progress bar is not interrupted.
if failed_files:
    print(f"\nSkipped {len(failed_files)} file(s):")
    for failed_path, reason in failed_files:
        print(f"  {failed_path}: {reason}")

if not labels:
    raise RuntimeError(
        "No audio features were extracted; check the dataset paths and audio files."
    )

mel_spectrograms = np.array(mel_spectrograms)
mfccs = np.array(mfccs)
labels = np.array(labels)
print(f"\nMel spectrograms shape: {mel_spectrograms.shape}")
print(f"MFCCs shape: {mfccs.shape}")

# Create lyrics embeddings
lyrics_embeddings, tfidf_vectorizer = create_lyrics_embeddings(lyrics_list)
print(f"Lyrics embeddings shape: {lyrics_embeddings.shape}")

# Encode labels
label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(labels)
print(f"Classes: {label_encoder.classes_}")

In [None]:
class ConvVAE(nn.Module):
    """Convolutional variational autoencoder for 2-D feature maps.

    The encoder applies four stride-2 convolutions, then two linear heads
    produce the mean and log-variance of the latent distribution. The decoder
    mirrors the encoder with four stride-2 transposed convolutions.

    Args:
        input_channels: Number of channels of the input (and reconstruction).
        input_height: Height of the input feature map.
        input_width: Width of the input feature map.
        latent_dim: Size of the latent vector.

    Any positive height/width is supported. Sizes that are not multiples of
    16 are handled by computing the exact encoder output size and cropping
    the reconstruction back to the input size.
    """

    # Number of stride-2 stages in the encoder (mirrored by the decoder).
    _NUM_STAGES = 4

    def __init__(self, input_channels=1, input_height=128, input_width=128, latent_dim=64):
        super(ConvVAE, self).__init__()
        self.latent_dim = latent_dim
        self.input_height = input_height
        self.input_width = input_width

        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 32, 3, stride=2, padding=1),
            nn.BatchNorm2d(32), nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2, padding=1),
            nn.BatchNorm2d(64), nn.ReLU(),
            nn.Conv2d(64, 128, 3, stride=2, padding=1),
            nn.BatchNorm2d(128), nn.ReLU(),
            nn.Conv2d(128, 256, 3, stride=2, padding=1),
            nn.BatchNorm2d(256), nn.ReLU(),
        )

        # Spatial size after the encoder. Each conv yields ceil(n / 2), so
        # `n // 16` is only correct when n is a multiple of 16.
        self.h_out = self._encoded_size(input_height)
        self.w_out = self._encoded_size(input_width)
        self.flat_size = 256 * self.h_out * self.w_out
        self.fc_mu = nn.Linear(self.flat_size, latent_dim)
        self.fc_logvar = nn.Linear(self.flat_size, latent_dim)

        # Decoder
        self.fc_decode = nn.Linear(latent_dim, self.flat_size)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, 3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(128), nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(64), nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(32), nn.ReLU(),
            nn.ConvTranspose2d(32, input_channels, 3, stride=2, padding=1, output_padding=1),
        )

    @classmethod
    def _encoded_size(cls, size):
        """Return the length of one spatial axis after all encoder stages."""
        for _ in range(cls._NUM_STAGES):
            # kernel=3, stride=2, padding=1  ->  floor((n - 1) / 2) + 1
            size = (size + 1) // 2
        return size

    def encode(self, x):
        """Return ``(mu, logvar)`` of the latent distribution for batch ``x``."""
        # flatten(start_dim=1) keeps the batch axis fixed, so an input of the
        # wrong spatial size raises in the linear layer instead of being
        # silently reshaped into a different batch size by view(-1, ...).
        h = torch.flatten(self.encoder(x), start_dim=1)
        return self.fc_mu(h), self.fc_logvar(h)

    def reparameterize(self, mu, logvar):
        """Sample ``z = mu + eps * std`` with ``eps`` drawn from N(0, I)."""
        std = torch.exp(0.5 * logvar)
        return mu + torch.randn_like(std) * std

    def decode(self, z):
        """Map latent vectors ``z`` back to input-sized feature maps."""
        h = self.fc_decode(z).view(-1, 256, self.h_out, self.w_out)
        recon = self.decoder(h)
        # Each decoder stage doubles the size, giving 16 * h_out >= input
        # height (same for width); crop the surplus. No-op for multiples of 16.
        return recon[..., :self.input_height, :self.input_width]

    def forward(self, x):
        """Return ``(reconstruction, mu, logvar, z)`` for batch ``x``."""
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar, z

    def get_latent(self, x):
        """Return the latent mean ``mu`` for ``x`` (no sampling)."""
        mu, _ = self.encode(x)
        return mu

print("Convolutional VAE defined!")

In [None]:
class HybridVAE(nn.Module):
    """Variational autoencoder that fuses audio feature maps with lyrics vectors.

    A three-stage convolutional encoder summarizes the audio input and a
    small MLP summarizes the lyrics vector. Both summaries are concatenated
    and mapped to the mean / log-variance of a joint latent space. Only the
    audio input is reconstructed.

    Args:
        audio_latent_dim: Size of each per-modality summary vector.
        lyrics_dim: Length of the lyrics feature vector.
        combined_latent_dim: Size of the joint latent vector.
        input_height: Height of the audio feature map (default 128).
        input_width: Width of the audio feature map (default 128).

    The lyrics encoder contains dropout, so call ``eval()`` before using
    ``get_latent`` when deterministic embeddings are required.
    """

    # Number of stride-2 stages in the audio encoder (mirrored by the decoder).
    _NUM_STAGES = 3

    def __init__(self, audio_latent_dim=32, lyrics_dim=100, combined_latent_dim=48,
                 input_height=128, input_width=128):
        super(HybridVAE, self).__init__()
        self.combined_latent_dim = combined_latent_dim
        self.input_height = input_height
        self.input_width = input_width

        # Audio encoder: three stride-2 convolution stages.
        self.audio_encoder = nn.Sequential(
            nn.Conv2d(1, 32, 3, stride=2, padding=1), nn.BatchNorm2d(32), nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2, padding=1), nn.BatchNorm2d(64), nn.ReLU(),
            nn.Conv2d(64, 128, 3, stride=2, padding=1), nn.BatchNorm2d(128), nn.ReLU(),
        )
        # Spatial size after the encoder, derived from the input size instead
        # of the previous hard-coded 16 x 16 (which assumed 128 x 128 input).
        self.h_out = self._encoded_size(input_height)
        self.w_out = self._encoded_size(input_width)
        self.audio_flat = 128 * self.h_out * self.w_out
        self.audio_fc = nn.Linear(self.audio_flat, audio_latent_dim)

        # Lyrics encoder
        self.lyrics_encoder = nn.Sequential(
            nn.Linear(lyrics_dim, 64), nn.ReLU(), nn.Dropout(0.2),
            nn.Linear(64, audio_latent_dim), nn.ReLU()
        )

        # Combined latent space
        combined_input = audio_latent_dim * 2
        self.fc_mu = nn.Linear(combined_input, combined_latent_dim)
        self.fc_logvar = nn.Linear(combined_input, combined_latent_dim)

        # Decoder (audio only)
        self.fc_decode = nn.Linear(combined_latent_dim, self.audio_flat)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(64), nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(32), nn.ReLU(),
            nn.ConvTranspose2d(32, 1, 3, stride=2, padding=1, output_padding=1),
        )

    @classmethod
    def _encoded_size(cls, size):
        """Return the length of one spatial axis after all encoder stages."""
        for _ in range(cls._NUM_STAGES):
            # kernel=3, stride=2, padding=1  ->  floor((n - 1) / 2) + 1
            size = (size + 1) // 2
        return size

    def encode(self, audio, lyrics):
        """Return ``(mu, logvar)`` of the joint latent for an audio/lyrics batch."""
        # flatten(start_dim=1) keeps the batch axis fixed, so audio of the
        # wrong spatial size raises in the linear layer instead of being
        # silently reshaped into a different batch size by view(-1, ...).
        audio_h = torch.flatten(self.audio_encoder(audio), start_dim=1)
        audio_feat = self.audio_fc(audio_h)
        lyrics_feat = self.lyrics_encoder(lyrics)
        combined = torch.cat([audio_feat, lyrics_feat], dim=1)
        return self.fc_mu(combined), self.fc_logvar(combined)

    def reparameterize(self, mu, logvar):
        """Sample ``z = mu + eps * std`` with ``eps`` drawn from N(0, I)."""
        std = torch.exp(0.5 * logvar)
        return mu + torch.randn_like(std) * std

    def decode(self, z):
        """Map joint latent vectors ``z`` back to audio-sized feature maps."""
        h = self.fc_decode(z).view(-1, 128, self.h_out, self.w_out)
        recon = self.decoder(h)
        # Each decoder stage doubles the size, giving 8 * h_out >= input
        # height (same for width); crop the surplus. No-op for multiples of 8.
        return recon[..., :self.input_height, :self.input_width]

    def forward(self, audio, lyrics):
        """Return ``(audio_reconstruction, mu, logvar, z)``."""
        mu, logvar = self.encode(audio, lyrics)
        z = self.reparameterize(mu, logvar)
        recon = self.decode(z)
        return recon, mu, logvar, z

    def get_latent(self, audio, lyrics):
        """Return the joint latent mean ``mu`` (no sampling)."""
        mu, _ = self.encode(audio, lyrics)
        return mu

print("Hybrid VAE defined!")