In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory


# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
import numpy as np
import pandas as pd
import os
import librosa
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from sklearn.metrics import confusion_matrix
import json

# Set random seeds
torch.manual_seed(42)
np.random.seed(42)

In [3]:
def create_ravdess_df(data_path):
    file_emotion = []
    file_path = []
    valid_emotions = ['happy', 'sad', 'fear', 'angry', 'neutral', 'disgust']
    
    for dir in os.listdir(data_path):
        actor_path = os.path.join(data_path, dir)
        if os.path.isdir(actor_path):
            for file in os.listdir(actor_path):
                part = file.split('.')[0].split('-')
                emotion = int(part[2])
                emotion_map = {
                    1: 'neutral', 2: 'calm', 3: 'happy', 4: 'sad',
                    5: 'angry', 6: 'fear', 7: 'disgust', 8: 'surprise'
                }
                if emotion_map[emotion] in valid_emotions:
                    file_emotion.append(emotion_map[emotion])
                    file_path.append(os.path.join(actor_path, file))
    
    return pd.DataFrame({'path': file_path, 'emotion': file_emotion})

def create_crema_df(data_path):
    file_emotion = []
    file_path = []
    valid_emotions = ['happy', 'sad', 'fear', 'angry', 'neutral', 'disgust']
    
    for file in os.listdir(data_path):
        if file.endswith(".wav"):
            part = file.split('_')
            emotion_map = {
                'SAD': 'sad', 'ANG': 'angry', 'DIS': 'disgust',
                'FEA': 'fear', 'HAP': 'happy', 'NEU': 'neutral'
            }
            emotion = emotion_map.get(part[2])
            if emotion in valid_emotions:
                file_path.append(os.path.join(data_path, file))
                file_emotion.append(emotion)
    
    return pd.DataFrame({'path': file_path, 'emotion': file_emotion})

def create_savee_df(data_path):
    file_emotion = []
    file_path = []
    valid_emotions = ['happy', 'sad', 'fear', 'angry', 'neutral', 'disgust']
    emotion_map = {
        'a': 'angry', 'd': 'disgust', 'f': 'fear', 'h': 'happy',
        'n': 'neutral', 'sa': 'sad'
    }
    
    for file in os.listdir(data_path):
        if file.endswith(".wav"):
            part = file.split('_')[1]
            ele = part[:-6]
            emotion = emotion_map.get(ele)
            if emotion in valid_emotions:
                file_path.append(os.path.join(data_path, file))
                file_emotion.append(emotion)
    
    return pd.DataFrame({'path': file_path, 'emotion': file_emotion})

def create_tess_df(data_path):
    file_emotion = []
    file_path = []
    valid_emotions = ['happy', 'sad', 'fear', 'angry', 'neutral', 'disgust']
    
    for dir in os.listdir(data_path):
        dir_path = os.path.join(data_path, dir)
        if os.path.isdir(dir_path):
            for file in os.listdir(dir_path):
                if file.endswith(".wav"):
                    emotion = file.split('.')[0].split('_')[2]
                    if emotion.lower() in valid_emotions:
                        file_emotion.append(emotion.lower())
                        file_path.append(os.path.join(dir_path, file))
    
    return pd.DataFrame({'path': file_path, 'emotion': file_emotion})

def create_telugu_df(data_path):
    file_emotion = []
    file_path = []
    valid_emotions = ['happy', 'sad', 'fear', 'angry', 'neutral', 'disgust']
    
    for emotion in os.listdir(data_path):
        if emotion.lower() in valid_emotions:
            emotion_path = os.path.join(data_path, emotion)
            if os.path.isdir(emotion_path):
                for file in os.listdir(emotion_path):
                    if file.endswith(".wav"):
                        file_path.append(os.path.join(emotion_path, file))
                        file_emotion.append(emotion.lower())
    
    df = pd.DataFrame({
        'path': file_path,
        'emotion': file_emotion
    })
    print(f"Telugu dataset loaded: {len(df)} files")
    print(f"Emotion distribution:\n{df['emotion'].value_counts()}")
    return df

In [4]:
def load_all_datasets(ravdess_path, crema_path, savee_path, tess_path, telugu_path):
    print("Loading datasets...")
    
    dfs = []
    try:
        ravdess_df = create_ravdess_df(ravdess_path)
        dfs.append(ravdess_df)
    except Exception as e:
        print(f"Error loading RAVDESS dataset: {str(e)}")
    
    try:
        crema_df = create_crema_df(crema_path)
        dfs.append(crema_df)
    except Exception as e:
        print(f"Error loading CREMA dataset: {str(e)}")
    
    try:
        savee_df = create_savee_df(savee_path)
        dfs.append(savee_df)
    except Exception as e:
        print(f"Error loading SAVEE dataset: {str(e)}")
    
    try:
        tess_df = create_tess_df(tess_path)
        dfs.append(tess_df)
    except Exception as e:
        print(f"Error loading TESS dataset: {str(e)}")
    
    try:
        telugu_df = create_telugu_df(telugu_path)
        dfs.append(telugu_df)
    except Exception as e:
        print(f"Error loading Telugu dataset: {str(e)}")
    
    combined_df = pd.concat(dfs, axis=0, ignore_index=True)
    print("\nDataset Statistics:")
    print(f"Total files: {len(combined_df)}")
    print("\nEmotion distribution:")
    emotion_counts = combined_df['emotion'].value_counts()
    print(emotion_counts)
    
    return combined_df

In [5]:
def segment_audio(y, sr, segment_length=3, overlap=0.5):
    """
    Segment audio into fixed-length chunks with overlap
    segment_length: in seconds
    overlap: percentage of overlap between segments (0-1)
    """
    # Calculate segment size in samples
    segment_samples = int(segment_length * sr)
    hop_samples = int(segment_samples * (1 - overlap))
    
    # Create segments
    segments = []
    for i in range(0, len(y) - segment_samples + 1, hop_samples):
        segment = y[i:i + segment_samples]
        if len(segment) == segment_samples:  # Only keep complete segments
            segments.append(segment)
    
    # Handle last segment if needed
    if len(segments) == 0:
        # If audio is shorter than segment length, pad with zeros
        segment = np.zeros(segment_samples)
        segment[:len(y)] = y
        segments.append(segment)
        
    return segments

In [6]:
def extract_features(file_path, target_sr=44100, segment_length=3, overlap=0.5):
    try:
        # Load audio
        y, sr = librosa.load(file_path, sr=None)
        if sr != target_sr:
            y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
        
        # Segment audio
        segments = segment_audio(y, target_sr, segment_length, overlap)
        
        all_features = []
        for segment in segments:
            # Extract features for each segment
            window_size = int(0.025 * target_sr)
            hop_length = int(0.010 * target_sr)
            
            # Calculate mel spectrogram
            mel_spec = librosa.feature.melspectrogram(
                y=segment, sr=target_sr,
                n_fft=window_size,
                hop_length=hop_length,
                n_mels=128
            )
            mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
            
            # Calculate additional features for different channels
            chroma = librosa.feature.chroma_stft(y=segment, sr=target_sr, n_fft=window_size, hop_length=hop_length)
            spectral_contrast = librosa.feature.spectral_contrast(y=segment, sr=target_sr, n_fft=window_size, hop_length=hop_length)
            
            # Normalize each feature independently
            scaler = StandardScaler()
            mel_spec_db = scaler.fit_transform(mel_spec_db)
            chroma = scaler.fit_transform(chroma)
            spectral_contrast = scaler.fit_transform(spectral_contrast)
            
            # Resize additional features to match mel spectrogram dimensions
            chroma_resized = np.resize(chroma, mel_spec_db.shape)
            spectral_contrast_resized = np.resize(spectral_contrast, mel_spec_db.shape)
            
            # Stack features as RGB channels
            features = np.stack([
                mel_spec_db,          # Red channel: Mel spectrogram
                chroma_resized,       # Green channel: Chromagram
                spectral_contrast_resized  # Blue channel: Spectral contrast
            ], axis=0)
            
            all_features.append(features)
        
        # Average features across segments
        averaged_features = np.mean(all_features, axis=0) if len(all_features) > 1 else all_features[0]
        return averaged_features

    except Exception as e:
        print(f"Error processing {file_path}: {str(e)}")
        return None

In [7]:
class EmotionDataset(Dataset):
    def __init__(self, df, transform=None, features_cache=None):
        self.df = df
        self.transform = transform
        self.features_cache = features_cache if features_cache is not None else {}
        self.emotion_to_idx = {
            emotion: idx for idx, emotion in 
            enumerate(['happy', 'sad', 'fear', 'angry', 'neutral', 'disgust'])
        }
        
        # Print shape of first item in dataset
        if len(self.features_cache) > 0:
            first_key = list(self.features_cache.keys())[0]
            print(f"\nDataset feature shape: {self.features_cache[first_key].shape}")
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        features = self.features_cache[idx]
        emotion = self.df.iloc[idx]['emotion']
        emotion_idx = self.emotion_to_idx[emotion]
        return torch.FloatTensor(features), emotion_idx

In [8]:
def extract_and_save_features(df, output_path, target_sr=22050, segment_length=3, overlap=0.5):
    """
    Extract features from audio files and save them as CSV and dictionary
    """
    features_cache = {}
    features_list = []
    labels = []

    for idx in tqdm(range(len(df))):
        features = extract_features(df.iloc[idx]['path'], target_sr=target_sr, segment_length=segment_length, overlap=overlap)
        if features is not None:
            features_cache[idx] = features
            features_flat = features.reshape(-1)
            features_list.append(features_flat)
            labels.append(df.iloc[idx]['emotion'])
    
    # Create feature column names
    feature_cols = [f'feature_{i}' for i in range(features_list[0].shape[0])]
    
    # Create DataFrame
    features_df = pd.DataFrame(features_list, columns=feature_cols)
    features_df['emotion'] = labels
    
    # Save to CSV
    print(f"Features saved to {output_path}")

    print(f"Features extracted.")
    return features_cache

In [9]:
def create_data_loaders(train_df, val_df, test_df, train_features_cache, val_features_cache, test_features_cache, batch_size=32):
    """
    Create DataLoader objects for training, validation, and test sets
    """
    train_dataset = EmotionDataset(train_df, features_cache = train_features_cache)
    val_dataset = EmotionDataset(val_df, features_cache = val_features_cache)
    test_dataset = EmotionDataset(test_df, features_cache = test_features_cache)
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
    
    return train_loader, val_loader, test_loader

In [10]:
def evaluate_model(model, test_loader, criterion, device, emotion_labels):
    """
    Evaluate model performance on test set
    """
    model.eval()
    test_loss = 0.0
    test_correct = 0
    test_total = 0
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in tqdm(test_loader, desc='Testing'):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            test_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            test_total += labels.size(0)
            test_correct += (predicted == labels).sum().item()
            
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    test_loss = test_loss / len(test_loader)
    test_acc = 100 * test_correct / test_total
    
    return test_loss, test_acc, all_preds, all_labels

In [11]:
def plot_training_history(history):
    """
    Plot training and validation metrics
    """
    plt.figure(figsize=(12, 4))
    
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Val Loss')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(history['train_acc'], label='Train Accuracy')
    plt.plot(history['val_acc'], label='Val Accuracy')
    plt.title('Model Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()
    
    plt.tight_layout()
    plt.show()

def plot_confusion_matrix(true_labels, pred_labels, class_names):
    """
    Plot confusion matrix
    """
    cm = confusion_matrix(true_labels, pred_labels)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()

def plot_predicted_vs_actual(true_labels, pred_labels, class_names):
    """
    Scatter plot of actual labels vs predicted labels.
    
    Parameters:
        true_labels (array-like): True labels.
        pred_labels (array-like): Predicted labels.
        class_names (list): List of class names.
    """
    plt.figure(figsize=(12, 6))
    plt.scatter(range(len(true_labels)), true_labels, label='Actual Labels', marker='o', color='blue', alpha=0.7)
    plt.scatter(range(len(pred_labels)), pred_labels, label='Predicted Labels', marker='x', color='red', alpha=0.7)

    plt.xlabel('Sample Index')
    plt.ylabel('Emotion Index')
    plt.title('Actual vs. Predicted Emotion Labels')
    plt.yticks(range(len(class_names)), class_names)
    plt.legend()
    plt.grid(axis='y')
    plt.tight_layout()
    plt.show()


In [12]:
class DeepEmotionCNN(nn.Module):
    def __init__(self, num_classes=6):
        super(DeepEmotionCNN, self).__init__()
        
        # Modified first conv layer to accept 3 channels
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),  # Changed from 1 to 3 channels
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.2)
        )
        
        # Rest of the model remains the same
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.2)
        )
        
        self.conv_block3 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.3)
        )
        
        self.conv_block4 = nn.Sequential(
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.4)
        )
        
        self.attention = nn.Sequential(
            nn.Conv2d(512, 1, kernel_size=1),
            nn.Sigmoid()
        )
        
        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))
        
        self.fc1 = nn.Linear(512, 1024)
        self.fc1_bn = nn.BatchNorm1d(1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc2_bn = nn.BatchNorm1d(512)
        self.fc3 = nn.Linear(512, 256)
        self.fc3_bn = nn.BatchNorm1d(256)
        self.fc4 = nn.Linear(256, num_classes)
        
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()

    def forward(self, x):
        # Forward pass remains the same
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.conv_block4(x)
        
        attention_weights = self.attention(x)
        x = x * attention_weights
        
        x = self.global_pool(x)
        x = x.view(x.size(0), -1)
        
        identity = x
        x = self.fc1(x)
        x = self.fc1_bn(x)
        x = self.relu(x)
        x = self.dropout(x)
        
        x = self.fc2(x)
        x = self.fc2_bn(x)
        x = self.relu(x + identity)
        x = self.dropout(x)
        
        x = self.fc3(x)
        x = self.fc3_bn(x)
        x = self.relu(x)
        x = self.dropout(x)
        
        x = self.fc4(x)
        
        return x

In [13]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device):
    """
    Train the model
    """
    # Print shape of first batch
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        print(f"\nInput shape before feeding to model: {inputs.shape}")
        print(f"Number of classes in batch: {len(torch.unique(labels))}")
        break
        
    history = {
        'train_loss': [],
        'val_loss': [],
        'train_acc': [],
        'val_acc': []
    }
    
    best_val_acc = 0
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5)

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        correct = 0
        total = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
        
        train_loss = train_loss/len(train_loader)
        train_acc = 100. * correct / total

        model.eval()
        val_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()
        
        val_acc = 100. * correct / total
        val_loss = val_loss/len(val_loader)
        print(f'Epoch {epoch+1}: Train Loss: {train_loss:.3f}, Val Loss: {val_loss:.3f}, Train Acc: {train_acc:.2f}%, Val Acc: {val_acc:.2f}%')

        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['train_acc'].append(train_acc)
        history['val_acc'].append(val_acc)

        scheduler.step(val_acc)

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_acc': val_acc,
            }, 'best_model.pth')
    
    return history

In [14]:
def load_cached_features(path):
    features_df = pd.read_csv(path)
    features_cache = {}
    
    # Determine expected feature dimension by looking at the first row
    first_row = features_df.iloc[0].drop('emotion')
    feature_size = len(first_row)
    
    # Calculate the original shape
    original_shape = (256, 13)  # From the code we know the original shape is 256x13. 
    
    if feature_size != np.prod(original_shape):
      print(f"Error! The size of the extracted feature is not compatible. Expecting {np.prod(original_shape)} features but found {feature_size}")
      return None # Or return empty dict or raise exception if needed

    
    for idx, row in features_df.iterrows():
      features = row.drop('emotion').values.astype(float)
      features = features.reshape(original_shape) # Reshape to original shape here.
      features_cache[idx] = features

    return features_cache

In [15]:
def main(re_extract_features=True):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Dataset paths
    RAVDESS_PATH = "/kaggle/input/ravdess-emotional-speech-audio/audio_speech_actors_01-24/"
    CREMA_PATH = "/kaggle/input/cremad/AudioWAV/"
    SAVEE_PATH = "/kaggle/input/surrey-audiovisual-expressed-emotion-savee/ALL"
    TESS_PATH = "/kaggle/input/toronto-emotional-speech-set-tess/TESS Toronto emotional speech set data"
    TELUGU_PATH = "/kaggle/input/speech-dataset/drive-download-20241227T163053Z-001"
    
    OUTPUT_PATH = "output_folder" # Directory where to save outputs
    os.makedirs(OUTPUT_PATH, exist_ok=True)
    
    # Load datasets
    combined_df = load_all_datasets(RAVDESS_PATH, CREMA_PATH, SAVEE_PATH, TESS_PATH, TELUGU_PATH)
    
    # Split data
    train_df, temp_df = train_test_split(combined_df, test_size=0.3, stratify=combined_df['emotion'], random_state=42)
    val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['emotion'], random_state=42)
    
    # Extract features and cache them, also save them as csv files
    if re_extract_features: #Re-extract features when `re_extract_features` parameter is True
        train_features_cache = extract_and_save_features(train_df, os.path.join(OUTPUT_PATH, "train_features.csv"))
        val_features_cache = extract_and_save_features(val_df, os.path.join(OUTPUT_PATH, "val_features.csv"))
        test_features_cache = extract_and_save_features(test_df, os.path.join(OUTPUT_PATH, "test_features.csv"))
    else:
        train_features_cache = load_cached_features(os.path.join(OUTPUT_PATH, "train_features.csv"))
        val_features_cache = load_cached_features(os.path.join(OUTPUT_PATH, "val_features.csv"))
        test_features_cache = load_cached_features(os.path.join(OUTPUT_PATH, "test_features.csv"))
   
    if train_features_cache is None or val_features_cache is None or test_features_cache is None:
      print("Error loading cached features. Please re-extract features.")
      return
    
    # Create datasets and loaders
    train_loader, val_loader, test_loader = create_data_loaders(
        train_df, val_df, test_df, train_features_cache, val_features_cache, test_features_cache, batch_size=32
    )
    
   # Initialize model
    model = DeepEmotionCNN().to(device)
    
    #Calculate and print class weights
    class_counts = combined_df['emotion'].value_counts()
    total_samples = len(combined_df)
    
    # Calculate class weights based on frequencies
    class_weights = {emotion: total_samples / (len(class_counts) * count) 
                    for emotion, count in class_counts.items()}
    print("\nClass weights:")
    for emotion, weight in class_weights.items():
        print(f"{emotion}: {weight:.3f}")
        
    # Sort class weights by label names
    class_weights_sorted = [class_weights[emotion] for emotion in sorted(class_weights.keys())]

    # Convert class weights to tensor
    class_weights_tensor = torch.FloatTensor(class_weights_sorted).to(device)
    

    # Define Loss and Optimizer
    criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Training parameters
    num_epochs = 50
    
    #Train model
    history = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device)
    
    #Plot training history
    plot_training_history(history)
    
    # Load best model for evaluation
    checkpoint = torch.load('best_model.pth')
    model.load_state_dict(checkpoint['model_state_dict'])
    
    # Evaluate on test set
    emotions = ['happy', 'sad', 'fear', 'angry', 'neutral', 'disgust']
    test_loss, test_acc, all_preds, all_labels = evaluate_model(
        model, test_loader, criterion, device, emotions
    )
    print(f'\nTest accuracy: {test_acc:.2f}%')
    
    # Plot confusion matrix
    plot_confusion_matrix(all_labels, all_preds, emotions)

    # Plot predicted vs. actual labels
    plot_predicted_vs_actual(all_labels, all_preds, emotions)
    
    # Save final results
    results = {
        'test_accuracy': test_acc,
        'confusion_matrix': confusion_matrix(all_labels, all_preds).tolist(),
        'best_val_accuracy': checkpoint['val_acc']
    }
    
    with open(os.path.join(OUTPUT_PATH, 'results.json'), 'w') as f:
        json.dump(results, f)
    
    # Save final model
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'history': history,
        'test_accuracy': test_acc
    }, os.path.join(OUTPUT_PATH, 'final_model.pth'))

In [None]:
if __name__ == "__main__":
    main()

Loading datasets...
Telugu dataset loaded: 168 files
Emotion distribution:
emotion
neutral    42
sad        31
disgust    29
angry      27
happy      23
fear       16
Name: count, dtype: int64

Dataset Statistics:
Total files: 11486

Emotion distribution:
emotion
sad        1954
disgust    1952
angry      1950
happy      1946
fear       1939
neutral    1745
Name: count, dtype: int64


100%|██████████| 8040/8040 [08:08<00:00, 16.45it/s]


In [17]:
def predict_emotion(audio_path, model_path, device, target_sr=22050):
    """
    Predict emotion from a given audio file using the trained model.

    Args:
        audio_path (str): Path to the input audio file.
        model_path (str): Path to the trained model's checkpoint file (.pth).
        device (torch.device): Device to use (CPU or GPU).
        target_sr (int, optional): Target sample rate for audio resampling. Defaults to 22050.

    Returns:
        dict: Predicted emotion label and emotion probabilities.
    """
    
    # Load the trained model
    checkpoint = torch.load(model_path, map_location=device)
    model = DeepEmotionCNN().to(device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()  # Set the model to evaluation mode

    # Extract features from the new audio file
    features = extract_features(audio_path, target_sr=target_sr)
    
    if features is None:
      print("Error in feature extraction. Returning None")
      return None

    # Convert to tensor and move to the device
    features = torch.FloatTensor(features).to(device)
    
    # Add a batch dimension (as your model expects batches of data)
    features = features.unsqueeze(0)  # Now shape [1, 1, height, width]

    # Perform prediction
    with torch.no_grad():
        output = model(features)
        probabilities = torch.softmax(output, dim=1).cpu().numpy() # Get the probabilities of each class
        _, predicted_idx = torch.max(output, 1)
        
    # Convert predicted class index to emotion label
    emotion_labels = ['happy', 'sad', 'fear', 'angry', 'neutral', 'disgust']
    predicted_emotion = emotion_labels[predicted_idx.item()]

    return {"predicted_emotion": predicted_emotion,
            "emotion_probabilities": dict(zip(emotion_labels, probabilities[0]))}

In [19]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_path = "/kaggle/working/best_model.pth"  # Replace with the path to your trained model
audio_path = "/kaggle/input/custom-dataset/18balayyapowerfuldialoguesimhamovienbkshor.mp3" # Replace with the path to the audio you want to classify

prediction_result = predict_emotion(audio_path, model_path, device)

if prediction_result:
   print(f"Predicted emotion: {prediction_result['predicted_emotion']}")
   print("Emotion probabilities:")
   for emotion, prob in prediction_result["emotion_probabilities"].items():
      print(f"   {emotion}: {prob:.4f}")
else:
    print("Prediction failed.")

  checkpoint = torch.load(model_path, map_location=device)



Feature extraction output shape: (1, 141, 301)
Predicted emotion: disgust
Emotion probabilities:
   happy: 0.0001
   sad: 0.0166
   fear: 0.0004
   angry: 0.2296
   neutral: 0.0614
   disgust: 0.6919
