In [None]:
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
from torchinfo import summary
import torch.optim as optim
from sklearn.cluster import KMeans
from sklearn.metrics import confusion_matrix, classification_report

In [None]:
from sklearn.preprocessing import LabelEncoder

In [None]:
import os
from sklearn.mixture import GaussianMixture

In [None]:
from collections import defaultdict

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

In [None]:
# Choose the compute device once; later cells pass `device` to torch.load and the model.
_gpu_available = torch.cuda.is_available()
device = 'cuda' if _gpu_available else 'cpu'
print("GPU is available. Using GPU." if _gpu_available else "GPU is not available. Using CPU.")

In [None]:
import math
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch-first sequence.

    The table is stored as a non-trainable buffer of shape
    ``(1, max_len, d_model)`` so it broadcasts over the batch axis of an
    input shaped ``(batch, seq_len, d_model)``.

    Args:
        d_model: Width of the feature axis (odd values are supported).
        max_len: Longest sequence length that can be encoded.
        dropout: Dropout probability applied after the encodings are added.
    """

    def __init__(self, d_model, max_len=500, dropout=0.1):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.pow(10000, torch.arange(0, d_model, 2).float() / d_model)
        angles = position / div_term  # (max_len, ceil(d_model / 2))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer cosine column than sine columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer("pe", pe.unsqueeze(0))

    def forward(self, x):
        """Return ``dropout(x + pe)`` for ``x`` of shape (batch, seq_len, d_model).

        Raises:
            ValueError: If ``seq_len`` exceeds the ``max_len`` of the table.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"Sequence length {seq_len} exceeds max_len {self.pe.size(1)}"
            )
        # Slice the sequence axis (dim 1); dim 0 of the buffer is the
        # singleton batch axis and must be kept for broadcasting.
        x = x + self.pe[:, :seq_len, :]
        return self.dropout(x)

In [None]:
scaler = torch.amp.GradScaler(device)

In [None]:
class TransformerEncoder(nn.Module):
    """Transformer encoder with a learned positional embedding.

    Projects each time step from ``input_dim`` to ``d_model``, adds a learned
    positional parameter, runs a stack of encoder layers and maps the first
    time step of the result to a 64-dimensional vector.

    Args:
        input_dim: Feature width of the incoming sequence.
        d_model: Width used inside the transformer (must be divisible by
            ``num_heads``, as required by ``nn.TransformerEncoderLayer``).
        max_len: Longest sequence length the positional parameter covers.
        num_heads: Number of attention heads per layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, input_dim, d_model, max_len, num_heads, num_layers):
        super(TransformerEncoder, self).__init__()
        self.d_model = d_model
        self.max_len = max_len
        # batch_first=True: forward() indexes its input as (batch, seq, feature),
        # so attention must run along dim 1. The default (False) would treat the
        # batch axis as the sequence and mix unrelated samples.
        # NOTE(review): parameter names/shapes are unchanged, so older
        # checkpoints still load, but they were optimized with the previous
        # axis convention -- consider retraining.
        self.encoder_layer = nn.TransformerEncoderLayer(self.d_model,
                                                        num_heads,
                                                        batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer,
                                                         num_layers)
        # Sized by d_model because it is added after linear_in; identical to the
        # previous shape whenever input_dim == d_model.
        self.pos_encoder = nn.Parameter(torch.randn(1, self.max_len,
                                                    self.d_model))
        self.linear_in = nn.Linear(input_dim, self.d_model)
        self.linear_out = nn.Linear(self.d_model, 64)

    def forward(self, src):
        """Encode ``src`` of shape (batch, seq_len, input_dim) into (batch, 64).

        Raises:
            ValueError: If ``seq_len`` exceeds ``max_len``.
        """
        seq_len = src.size(1)
        if seq_len > self.max_len:
            raise ValueError(
                f"Sequence length {seq_len} exceeds max_len {self.max_len}"
            )
        src = self.linear_in(src) * math.sqrt(self.d_model)
        src = src + self.pos_encoder[:, :seq_len, :]
        output = self.transformer_encoder(src)
        # Summarize the sequence with the representation of its first step.
        output = self.linear_out(output[:, 0, :])
        return output



In [None]:
class TransformerEncoder2(nn.Module):
    """Transformer encoder variant that uses ``PositionalEncoding``.

    Projects each time step from ``input_dim`` to ``d_model``, applies the
    ``PositionalEncoding`` module, runs a stack of encoder layers and maps the
    first time step of the result to a 64-dimensional vector.

    Args:
        input_dim: Feature width of the incoming sequence.
        d_model: Width used inside the transformer.
        num_heads: Number of attention heads per layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, input_dim, d_model, num_heads, num_layers):
        super(TransformerEncoder2, self).__init__()
        # forward() scales by sqrt(d_model), so the width has to be stored.
        self.d_model = d_model
        # batch_first=True because forward() indexes (batch, seq, feature).
        encoder_layer = nn.TransformerEncoderLayer(d_model, num_heads,
                                                   batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.pos_encoder = PositionalEncoding(d_model)
        self.linear_in = nn.Linear(input_dim, d_model)
        self.linear_out = nn.Linear(d_model, 64)

    def forward(self, src):
        """Encode ``src`` of shape (batch, seq_len, input_dim) into (batch, 64)."""
        src = self.linear_in(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src)
        # Summarize the sequence with the representation of its first step.
        output = self.linear_out(output[:, 0, :])
        return output

In [None]:
class Trainer:
    """Minimal training loop that pulls each output toward the batch-mean output.

    Args:
        model: Module to optimize; moved to ``device`` on construction.
        optimizer: Optimizer already bound to ``model``'s parameters.
        criterion: Loss called as ``criterion(output, target)``.
        device: Device (string or ``torch.device``) used for model and batches.
        max_grad_norm: Gradient-norm clipping threshold.
    """

    def __init__(self, model, optimizer, criterion, device, max_grad_norm=1.0):
        self.model = model.to(device)
        self.optimizer = optimizer
        self.criterion = criterion
        self.device = device
        self.max_grad_norm = max_grad_norm

    def train_step(self, x):
        """Run one optimization step on batch ``x`` and return the loss as a float.

        Returns ``float('nan')`` without updating the weights when the loss is
        not finite.
        """
        # Make sure dropout is active even if the model was put in eval mode
        # by an earlier inference cell.
        self.model.train()
        x = x.to(self.device)

        # Standardize across the batch axis. A batch with a single sample has
        # an undefined (NaN) std, which the finiteness check below skips.
        x = (x - x.mean(dim=0)) / (x.std(dim=0) + 1e-6)

        self.optimizer.zero_grad()
        output = self.model(x)
        # Expand the batch mean explicitly so the criterion sees matching
        # shapes instead of relying on (and warning about) broadcasting.
        target = output.mean(dim=0, keepdim=True).expand_as(output)
        loss = self.criterion(output, target)

        if not torch.isfinite(loss):
            print("Warning: Loss is not finite. Skipping this step.")
            return float('nan')

        loss.backward()

        torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)
        self.optimizer.step()

        return loss.item()

    def train(self, dataloader, epochs):
        """Train for ``epochs`` passes over ``dataloader``, printing the mean loss.

        Each batch is expected to be indexable, with the input tensor at
        position 0. Steps whose loss is not finite are left out of the mean.
        """
        # Only touch the CUDA allocator when this trainer really runs on a GPU.
        on_cuda = (torch.device(self.device).type == "cuda"
                   and torch.cuda.is_available())
        for epoch in range(epochs):
            if on_cuda:
                torch.cuda.empty_cache()
                initial_memory = torch.cuda.memory_allocated(self.device)
                print(f"Epoch {epoch + 1} - Initial GPU memory: {initial_memory / 1e6} MB")

            print(f"Epoch [{epoch + 1}/{epochs}]")
            total_loss = 0
            valid_steps = 0
            for batch in dataloader:
                loss = self.train_step(batch[0])
                if math.isfinite(loss):
                    total_loss += loss
                    valid_steps += 1
            avg_loss = total_loss / valid_steps if valid_steps > 0 else float('nan')
            print(f"Loss: {avg_loss:.4f}")
            if on_cuda:
                torch.cuda.empty_cache()


In [None]:
# Every OpenL3 artifact lives in one Kaggle input folder and shares a date stamp.
_OPENL3_DIR = '/kaggle/input/openl3-embeddings'
_OPENL3_STAMP = '11-12-24'

openl3_train_embeddings_path = f'{_OPENL3_DIR}/openl3_audio_embeddings_final_train_{_OPENL3_STAMP}.pt'
openl3_train_labels_path = f'{_OPENL3_DIR}/openl3_labels_final_train_{_OPENL3_STAMP}.pt'
openl3_val_embeddings_path = f'{_OPENL3_DIR}/openl3_audio_embeddings_final_validation_{_OPENL3_STAMP}.pt'
openl3_val_labels_path = f'{_OPENL3_DIR}/openl3_labels_final_validation_{_OPENL3_STAMP}.pt'
openl3_test_embeddings_path = f'{_OPENL3_DIR}/openl3_audio_embeddings_final_test_{_OPENL3_STAMP}.pt'
openl3_test_labels_path = f'{_OPENL3_DIR}/openl3_labels_final_test_{_OPENL3_STAMP}.pt'

In [None]:
# Load the three splits with identical settings, then stack them along the
# sample axis in train -> test -> validation order.
train_embeddings, test_embeddings, val_embeddings = (
    torch.load(split_path, map_location=device, weights_only=True)
    for split_path in (
        openl3_train_embeddings_path,
        openl3_test_embeddings_path,
        openl3_val_embeddings_path,
    )
)

all_embeddings = torch.cat([train_embeddings, test_embeddings, val_embeddings], dim=0)
print(all_embeddings.shape)

In [None]:

embeddings = all_embeddings
num_samples, num_channels, num_seq_lengths, audio_size = all_embeddings.shape
print(num_samples, num_channels, num_seq_lengths, audio_size)

In [None]:

embeddings = embeddings.reshape(num_samples, num_seq_lengths, audio_size)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
embeddings.shape

In [None]:

model = TransformerEncoder(input_dim=audio_size, d_model=audio_size, max_len = num_seq_lengths, num_heads=4, num_layers=2)
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay = 1e-5)
criterion = nn.MSELoss()

In [None]:
summary(model)

In [None]:

dataset = torch.utils.data.TensorDataset(embeddings)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=8, shuffle=True)

In [None]:
print(dataset[0][0].shape)
print(len(dataset))

### Train Model

In [None]:

trainer = Trainer(model, optimizer, criterion, device)
trainer.train(dataloader, epochs=5)

# Encode the full embedding tensor in a single forward pass, with dropout
# disabled and gradient tracking off, then move the result to NumPy.
model.eval()
with torch.no_grad():
    encoded_all = model(embeddings.to(device))
final_output = encoded_all.cpu().numpy()
del encoded_all

In [None]:
torch.save(model.state_dict(), "transformer_model.pth")

In [None]:
final_output

### Re-load saved model

In [None]:
model_path = '/kaggle/input/moodtheme-1/transformer_model.pth'
# map_location remaps the stored tensors onto the active device, so a
# checkpoint saved on a GPU can still be restored on a CPU-only kernel.
model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))

In [None]:

# Re-encode the full embedding tensor with the reloaded weights.
model.eval()
with torch.no_grad():
    reloaded_encoding = model(embeddings.to(device))
saved_final_output = reloaded_encoding.cpu().numpy()
del reloaded_encoding

In [None]:
print(saved_final_output)

In [None]:
def audio_filepath_labels(input_tsv_file, audio_root=None):
    """Pair every labelled ``.mp3`` found under ``audio_root`` with its label.

    The TSV is read as two tab-separated columns (file path, label); a leading
    row whose first cell is ``"PATH"`` is treated as a header and dropped.
    Audio files are matched on their base name, after turning a trailing
    ``.low.mp3`` into ``.mp3``.

    Args:
        input_tsv_file: Path to the tab-separated label file.
        audio_root: Directory that is walked recursively for ``.mp3`` files.
            Defaults to the notebook-level ``input_folder`` variable.

    Returns:
        DataFrame with columns ``audio_file_path`` and ``label``, one row per
        matched file, in ``os.walk`` order.
    """
    if audio_root is None:
        # Backward-compatible default: callers that pass only the TSV keep
        # using the global configured in the notebook.
        audio_root = input_folder

    df = pd.read_csv(input_tsv_file, sep='\t', header=None, names=['file', 'label'])
    if not df.empty and df.iloc[0, 0] == "PATH":
        df = df.drop(index=0).reset_index(drop=True)

    # Base name of each listed path; if a name repeats, the last label wins.
    base_names = df['file'].str.split('/').str[-1]
    name_to_label = dict(zip(base_names, df['label']))

    low_suffix = '.low.mp3'
    final_data = []
    # NOTE(review): os.walk order is filesystem dependent; downstream cells
    # pair these rows positionally with the embeddings -- confirm the order
    # matches the one used when the embeddings were generated.
    for root, _dirs, files in os.walk(audio_root):
        for file in files:
            if not file.endswith('.mp3'):
                continue
            # Strip only the trailing ".low" marker; replacing every "low."
            # would also mangle names that merely contain it ("yellow.low.mp3").
            if file.endswith(low_suffix):
                name = file[:-len(low_suffix)] + '.mp3'
            else:
                name = file
            if name in name_to_label:
                final_data.append([os.path.join(root, file), name_to_label[name]])

    final_df_ = pd.DataFrame(final_data, columns=['audio_file_path', 'label'])

    # Nothing is written to disk here, so report the match count instead of
    # announcing an output file.
    print(f"Matched {len(final_df_)} audio files to labels from {input_tsv_file}")
    return final_df_

In [None]:
tsv_file = "/kaggle/input/moodtheme-1/final_mood_labels.tsv"
input_folder = "/kaggle/input/raw-00-01-02-filtered-files"
multi_tsv_file = "/kaggle/input/moodtheme-1/final_multi_mood_labels.tsv"

In [None]:
final_df = audio_filepath_labels(tsv_file)
multi_mood_final_df = audio_filepath_labels(multi_tsv_file)
print("Broader 4-labeled dataset shape:", final_df.shape)
print("Multi-labeled dataset shape:", multi_mood_final_df.shape)

In [None]:
final_df

In [None]:
multi_mood_final_df

### Multi-labeled dataset

In [None]:
multi_label_encoder = LabelEncoder()

In [None]:
multi_en_df = multi_mood_final_df.copy()  


multi_en_df['label_encoded'] = multi_label_encoder.fit_transform(multi_en_df['label'])
multi_en_df

In [None]:
multi_en_df['label_encoded'].value_counts()

In [None]:
def display_top_class_counts(df, top_k = None):
    """Bar-plot how often each label occurs and return the full count table.

    Args:
        df: Frame with ``label_encoded`` and ``label`` columns.
        top_k: When truthy, only the first ``top_k`` rows of the count table
            are plotted; the returned table is never truncated.

    Returns:
        DataFrame with columns ``label_encoded``, ``count`` and ``label``.
    """
    value_counts = df['label_encoded'].value_counts().reset_index()
    value_counts.columns = ['label_encoded', 'count']

    # One representative text label per encoded id, indexed by that id.
    label_lookup = (
        df.drop_duplicates(subset=['label_encoded'])
        .set_index('label_encoded')['label']
    )
    value_counts['label'] = value_counts['label_encoded'].map(label_lookup)

    plotted = value_counts.iloc[:top_k] if top_k else value_counts

    plt.figure(figsize=(10, 6))
    plt.bar(plotted['label'], plotted['count'], color='skyblue')
    plt.xlabel('Label')
    plt.ylabel('Count')
    plt.title('Label Encoding vs Actual Label Counts')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    return value_counts

In [None]:
multi_value_counts = display_top_class_counts(multi_en_df, 10)

In [None]:
multi_value_counts.head(10)

In [None]:
multi_clusters = len(list(np.unique(multi_en_df['label_encoded'])))
print(multi_clusters)

In [None]:
def run_GMM(model_output, num_clusters):
    """Fit a Gaussian mixture on ``model_output`` and hard-assign every row.

    Args:
        model_output: Feature matrix passed to ``GaussianMixture.fit``.
        num_clusters: Number of mixture components.

    Returns:
        Tuple ``(gmm, probabilities, predicted_labels)``: the fitted model,
        the per-row component probabilities from ``predict_proba`` and the
        index of the most probable component for each row.
    """
    mixture = GaussianMixture(n_components=num_clusters, random_state=0).fit(model_output)
    responsibilities = mixture.predict_proba(model_output)
    hard_assignments = responsibilities.argmax(axis=1)
    return mixture, responsibilities, hard_assignments

In [None]:
multi_gmm, multi_probs, multi_preds = run_GMM(saved_final_output, multi_clusters)

In [None]:
multi_gmm_counts = np.unique(multi_preds, return_counts=True)


for cluster, count in zip(multi_gmm_counts[0], multi_gmm_counts[1]):
    print(f"Cluster {cluster}: {count} samples")

In [None]:
def cluster_to_label(gmm_model, gmm_probs, actual_labels):
    """Group samples by their most probable mixture component.

    Args:
        gmm_model: Object exposing ``n_components``.
        gmm_probs: Array of shape (n_samples, n_components) with the
            per-sample component probabilities.
        actual_labels: Sequence of true labels, positionally aligned with the
            rows of ``gmm_probs``.

    Returns:
        Dict mapping every component index in ``range(n_components)`` to a
        list of ``(true_label, probability_row)`` tuples for the samples
        assigned to it (an empty list for components with no samples).

    Raises:
        ValueError: If ``actual_labels`` and ``gmm_probs`` differ in length,
            since rows could then not be paired with labels reliably.
    """
    if len(actual_labels) != len(gmm_probs):
        raise ValueError(
            f"Got {len(actual_labels)} labels for {len(gmm_probs)} samples; "
            "labels and probabilities must be aligned row by row."
        )

    # The hard assignment does not depend on the cluster, so compute it once.
    assignments = np.argmax(gmm_probs, axis=1)

    cluster_to_true_label_ = {}
    for cluster in range(gmm_model.n_components):
        cluster_indices = np.where(assignments == cluster)[0]
        cluster_to_true_label_[cluster] = [
            (actual_labels[i], gmm_probs[i]) for i in cluster_indices
        ]

    return cluster_to_true_label_

In [None]:
multi_true_labels = multi_en_df['label_encoded'].values
print(len(multi_true_labels))

In [None]:
multi_cluster_to_label = cluster_to_label(multi_gmm, multi_probs, multi_true_labels)

In [None]:
multi_cluster_to_label.keys()

In [None]:
def final_label_mapping(cluster_to_true_label_):
    """Choose one true label per cluster by probability-weighted voting.

    For every cluster, each member sample votes for its true label with the
    probability it assigns to *that cluster* (``probs[cluster]``). Votes are
    summed per label and the label with the highest total wins; on an exact
    tie the label seen first is kept.

    Averaging the whole probability row instead would give roughly
    ``1 / n_components`` for every sample (a row sums to one), so it cannot
    separate the labels.

    Args:
        cluster_to_true_label_: Dict mapping an integer cluster index to a
            list of ``(true_label, probability_row)`` tuples.

    Returns:
        Tuple ``(final_cluster_to_true_label_, cluster_to_tie_breaker_)``:
        a dict cluster -> winning label (``None`` for a cluster without
        samples) and a ``defaultdict(list)`` cluster -> list of
        ``(label, score)`` pairs sharing the top score, winner first.
    """
    final_cluster_to_true_label_ = {}
    cluster_to_tie_breaker_ = defaultdict(list)

    for cluster, label_probabilities in cluster_to_true_label_.items():
        # Accumulate each label's total responsibility for this cluster.
        label_scores = defaultdict(float)
        for true_label, probs in label_probabilities:
            label_scores[true_label] += float(probs[cluster])

        most_probable_label = None
        max_probability = -1
        for true_label, score in label_scores.items():
            if score > max_probability:
                most_probable_label = true_label
                max_probability = score
                cluster_to_tie_breaker_[cluster] = [(true_label, score)]
            elif score == max_probability:
                cluster_to_tie_breaker_[cluster].append((true_label, score))

        final_cluster_to_true_label_[cluster] = most_probable_label
    return final_cluster_to_true_label_, cluster_to_tie_breaker_

In [None]:
multi_final_cluster_to_true, multi_cluster_to_tie = final_label_mapping(multi_cluster_to_label)

In [None]:
print("Cluster to True Label Mapping (Most Probable):", multi_final_cluster_to_true)

In [None]:

# Translate each sample's GMM cluster id into the label chosen for that cluster.
multi_final_predicted_labels = [
    multi_final_cluster_to_true[cluster_id] for cluster_id in multi_preds
]

In [None]:
top_k = 10
multi_unique, multi_counts = np.unique(multi_true_labels, return_counts=True)
# Keep the top_k most frequent encoded labels (sorted by descending count);
# slice with top_k so changing the setting above actually takes effect.
multi_top_k_labels = multi_unique[np.argsort(-multi_counts)[:top_k]]

In [None]:
multi_top_k_labels

In [None]:

multi_cm = confusion_matrix(multi_true_labels, multi_final_predicted_labels, labels=multi_top_k_labels)


plt.figure(figsize=(8, 6))
sns.heatmap(multi_cm, annot=True, fmt='g', cmap='Blues', xticklabels=multi_top_k_labels, yticklabels=multi_top_k_labels)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()

In [None]:
# zero_division=0 scores classes that were never predicted as 0 without
# emitting a warning per class, matching the 4-label report in this notebook.
multi_label_report = classification_report(
    multi_true_labels, multi_final_predicted_labels, zero_division=0
)


print(multi_label_report)

### 4-labeled dataset

In [None]:

en_df = final_df.copy()  


label_encoder = LabelEncoder()
en_df['label_encoded'] = label_encoder.fit_transform(en_df['label'])
en_df

In [None]:
broad_value_counts = display_top_class_counts(en_df)

In [None]:
en_df['label_encoded'].value_counts()

In [None]:
from sklearn.mixture import GaussianMixture


# Reuse the shared helper (same random_state=0) with one component per broad
# mood class instead of repeating the fit / predict_proba / argmax steps.
# NOTE(review): this section clusters `final_output` (freshly trained model)
# while the multi-label section clusters `saved_final_output` (reloaded
# checkpoint) -- confirm that difference is intended.
gmm, probabilities, predicted_labels = run_GMM(final_output, 4)




In [None]:
gmm_counts = np.unique(predicted_labels, return_counts=True)


for cluster, count in zip(gmm_counts[0], gmm_counts[1]):
    print(f"Cluster {cluster}: {count} samples")

In [None]:
import joblib


joblib.dump(gmm, 'gmm_model.pkl')

In [None]:
true_labels = en_df['label_encoded'].values

In [None]:


# Group samples by their most probable GMM component using the shared helper
# rather than an inline copy of the same loop.
cluster_to_true_label = cluster_to_label(gmm, probabilities, true_labels)


print("Cluster to True Label Mapping (without majority voting):")


In [None]:
cluster_to_true_label[0][:5]

In [None]:
from collections import defaultdict



# Resolve one label per cluster with the shared helper so the 4-label and
# multi-label sections always apply the same rule.
final_cluster_to_true_label, cluster_to_tie_breaker = final_label_mapping(cluster_to_true_label)


print("Cluster to True Label Mapping (Most Probable):", final_cluster_to_true_label)
print("Cluster to True Label Tie Breakers:", cluster_to_tie_breaker)


In [None]:

# Translate each sample's GMM cluster id into the label chosen for that cluster.
final_predicted_labels = [
    final_cluster_to_true_label[cluster_id] for cluster_id in predicted_labels
]

cm = confusion_matrix(true_labels, final_predicted_labels)

# Tick labels are the distinct encoded classes present in the ground truth.
class_ids = np.unique(true_labels)
plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='g',
    cmap='Blues',
    xticklabels=class_ids,
    yticklabels=class_ids,
)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()


In [None]:
from sklearn.metrics import classification_report
report = classification_report(true_labels, final_predicted_labels, zero_division=0)


print(report)