In [None]:
import torch
# Report the PyTorch build and the CUDA runtime it was compiled against.
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"CUDA version: {torch.version.cuda}")
# get_device_name() raises on a CPU-only runtime, so only query the GPU when
# CUDA is actually usable; the rest of the notebook falls back to the CPU.
if torch.cuda.is_available():
    print(f"GPU name: {torch.cuda.get_device_name(0)}")
else:
    print("GPU name: n/a (CUDA not available)")


In [None]:
# Compute capability only exists for CUDA devices; querying it on a CPU-only
# runtime raises, so guard the call.
if torch.cuda.is_available():
    print(torch.cuda.get_device_capability())  # Should show (7,5) for T4
else:
    print("CUDA not available - no compute capability to report")


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModel, AutoTokenizer, AutoFeatureExtractor
import torchaudio
import torchvision
import numpy as np
import os
import cv2
from PIL import Image
import librosa
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")

# Check for CUDA availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# ==================== TEXT PROCESSING MODULE ====================
class TextEncoder(nn.Module):
    """Encode raw strings into 256-dim vectors with a frozen pretrained transformer.

    The pretrained backbone is used as a fixed feature extractor: its parameters
    do not receive gradients and it is kept in eval mode, so only the linear
    projection ``fc`` is trained.

    Args:
        pretrained_model (str): Hugging Face model id passed to
            ``AutoTokenizer`` / ``AutoModel``.
        hidden_size (int): Width of the backbone's hidden states, i.e. the
            input size of the projection layer.
    """

    def __init__(self, pretrained_model="distilbert-base-uncased", hidden_size=768):
        super().__init__()
        self.tokenizer = AutoTokenizer.from_pretrained(pretrained_model)
        self.model = AutoModel.from_pretrained(pretrained_model).to(device)

        # Freeze the backbone and disable its dropout so the extracted
        # features are deterministic.
        for param in self.model.parameters():
            param.requires_grad = False
        self.model.eval()

        self.fc = nn.Linear(hidden_size, 256).to(device)

    def train(self, mode=True):
        """Switch train/eval mode while keeping the frozen backbone in eval mode.

        Without this override ``model.train()`` on a parent module would
        re-enable dropout inside the frozen backbone.
        """
        super().train(mode)
        self.model.eval()
        return self

    def forward(self, texts):
        """Tokenize ``texts`` and return one projected embedding per string.

        Args:
            texts: A string or a sequence of strings accepted by the tokenizer.

        Returns:
            Tensor of shape [batch, 256] taken from the first token's hidden
            state and passed through ``fc``.
        """
        # Follow the module's current device rather than the notebook-level
        # global, so the encoder keeps working after `.to(...)` / `.cpu()`.
        target_device = self.fc.weight.device

        inputs = self.tokenizer(texts, return_tensors="pt", padding=True, truncation=True, max_length=128)
        inputs = {key: val.to(target_device) for key, val in inputs.items()}

        outputs = self.model(**inputs)
        text_features = outputs.last_hidden_state[:, 0, :]  # first-token ([CLS]) embedding
        return self.fc(text_features)



In [None]:
# ==================== AUDIO ENCODER USING HUBERT ====================
import torchaudio
from torchaudio.pipelines import HUBERT_BASE
from torchaudio.transforms import Resample

class AudioEncoder(nn.Module):
    """Encode raw waveforms with a frozen torchaudio HuBERT model plus a trainable head.

    HuBERT is used purely as a feature extractor (no gradients, eval mode); the
    time-averaged features are mapped to ``hidden_dim`` by ``project``.

    Args:
        input_dim (int): Unused; kept only so existing callers keep working.
        hidden_dim (int): Size of the returned embedding.
    """

    def __init__(self, input_dim=128, hidden_dim=256):  # input_dim kept for compatibility
        super().__init__()

        # Load the HuBERT base model from torchaudio.
        self.hubert_bundle = HUBERT_BASE
        self.hubert = self.hubert_bundle.get_model().to(device)

        # Freeze HuBERT and keep it in eval mode so its dropout does not
        # randomise the extracted features.
        for param in self.hubert.parameters():
            param.requires_grad = False
        self.hubert.eval()

        # Projection head: HuBERT embedding -> 512 -> hidden_dim.
        self.project = nn.Sequential(
            nn.Linear(self.hubert_bundle._params['encoder_embed_dim'], 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, hidden_dim)
        )

        self.to(device)

    def train(self, mode=True):
        """Switch train/eval mode while keeping the frozen HuBERT in eval mode."""
        super().train(mode)
        self.hubert.eval()
        return self

    def forward(self, waveforms):
        """
        Args:
            waveforms: Tensor [B, T], [B, 1, T] (mono) or a single waveform [T]
        Returns:
            Tensor: [B, hidden_dim]
        """
        # Normalise the input to [B, T].
        if waveforms.dim() == 3 and waveforms.shape[1] == 1:
            waveforms = waveforms.squeeze(1)
        elif waveforms.dim() == 1:
            waveforms = waveforms.unsqueeze(0)

        # Run on whatever device the module currently lives on.
        waveforms = waveforms.to(next(self.project.parameters()).device)

        # HuBERT is frozen, so no autograd graph is needed for it.
        with torch.no_grad():
            features, _ = self.hubert(waveforms)  # [B, T', embed_dim]
            pooled = features.mean(dim=1)         # [B, embed_dim]

        return self.project(pooled)               # [B, hidden_dim]

    @staticmethod
    def extract_spectrogram(audio_path, target_sr=16000, fixed_len=16000):
        """
        Load an audio file as a fixed-length mono waveform.

        Despite its name this returns the raw waveform, not a spectrogram.

        Args:
            audio_path (str): Path to an audio file readable by torchaudio
            target_sr (int): Target sampling rate
            fixed_len (int): Desired number of samples (default: 16000 = 1 second
                at the default sampling rate)
        Returns:
            waveform (Tensor): [1, fixed_len], placed on the notebook-level ``device``
        """
        waveform, sr = torchaudio.load(audio_path)

        # Down-mix multi-channel audio to mono.
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Resample only when the file's rate differs from the target.
        if sr != target_sr:
            waveform = Resample(orig_freq=sr, new_freq=target_sr)(waveform)

        # Zero-pad on the right or truncate to exactly fixed_len samples.
        num_samples = waveform.shape[1]
        if num_samples < fixed_len:
            waveform = F.pad(waveform, (0, fixed_len - num_samples))
        else:
            waveform = waveform[:, :fixed_len]

        return waveform.to(device)


In [None]:
print(torch.cuda.is_available())  # Should return True on a GPU runtime
# get_device_name() raises without CUDA, so only query it when a GPU exists.
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))  # Should show 'Tesla T4'
else:
    print("No CUDA device detected")


In [None]:
# Show the torch device selected in the imports cell ("cuda" or "cpu").
print(device)

In [None]:
import timm
import torch.nn as nn
import torch.nn.functional as F

import timm
import torch.nn as nn

class VideoEncoder(nn.Module):
    """Encode a face image with a frozen timm Xception backbone plus a trainable head.

    The backbone is a fixed feature extractor: its parameters receive no
    gradients and it stays in eval mode, so its normalisation layers keep
    their pretrained statistics instead of being re-estimated from batches.

    Args:
        hidden_dim (int): Size of the returned embedding.
    """

    def __init__(self, hidden_dim=256):
        super().__init__()

        # Pretrained Xception without its classifier head.
        self.backbone = timm.create_model('xception', pretrained=True, num_classes=0).to(device)

        # Freeze the backbone and pin it to eval mode.
        for param in self.backbone.parameters():
            param.requires_grad = False
        self.backbone.eval()

        # Projection layer: 2048-dim backbone features -> hidden_dim.
        self.fc = nn.Linear(2048, hidden_dim)
        self.to(device)

    def train(self, mode=True):
        """Switch train/eval mode while keeping the frozen backbone in eval mode."""
        super().train(mode)
        self.backbone.eval()
        return self

    def forward(self, x):
        """Map images [B, 3, H, W] (or [B, 1, 3, H, W]) to embeddings [B, hidden_dim]."""
        if x.dim() == 5:
            x = x.squeeze(1)  # [B, 1, 3, H, W] -> [B, 3, H, W]
        # Run on whatever device the module currently lives on.
        x = x.to(self.fc.weight.device)
        features = self.backbone(x)           # [B, 2048]
        return self.fc(features)              # [B, hidden_dim]

    @staticmethod
    def extract_face_features(video_file, num_frames=16):
        """Return the average normalised face crop of a video.

        Up to ``num_frames`` evenly spaced frames are sampled. In each one the
        largest Haar-cascade face detection is cropped (with a small margin),
        resized to 224x224, converted to RGB and normalised. The crops are
        averaged over the frames in which a face was found; frames that
        cannot be read or contain no face are ignored rather than counted as
        all-zero images.

        Args:
            video_file (str): Path to the video.
            num_frames (int): Number of frames to sample.

        Returns:
            Tensor [1, 3, 224, 224] on the notebook-level ``device``; all zeros
            when the video cannot be opened or no face is detected.
        """
        face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

        # Build the transforms once instead of once per frame.
        to_tensor = torchvision.transforms.ToTensor()
        normalize = torchvision.transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )

        face_tensors = []
        cap = cv2.VideoCapture(video_file)
        try:
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if cap.isOpened() and frame_count > 0 and num_frames > 0:
                indices = np.linspace(0, frame_count - 1, num_frames, dtype=int)
            else:
                indices = []

            for idx in indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
                ret, frame = cap.read()
                if not ret:
                    continue

                # The cascade detector works on grayscale images.
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                faces = face_cascade.detectMultiScale(gray, 1.1, 4)
                if len(faces) == 0:
                    continue

                # Keep the largest detection.
                x, y, w, h = max(faces, key=lambda rect: rect[2] * rect[3])

                # Grow the box by ~10% per side, clamped to the frame.
                x = max(0, x - int(0.1 * w))
                y = max(0, y - int(0.1 * h))
                w = min(frame.shape[1] - x, int(1.2 * w))
                h = min(frame.shape[0] - y, int(1.2 * h))

                face = frame[y:y+h, x:x+w]
                face = cv2.resize(face, (224, 224))
                face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)

                face_tensors.append(normalize(to_tensor(face)))
        finally:
            # Always free the decoder, even if a frame fails to process.
            cap.release()

        if not face_tensors:
            return torch.zeros((1, 3, 224, 224), device=device)

        # Average over detected faces only -> [1, 3, 224, 224].
        return torch.stack(face_tensors).mean(dim=0, keepdim=True).to(device)


In [None]:

# ==================== ATTENTION FUSION MODULE ====================
class MultiHeadAttention(nn.Module):
    """Thin wrapper around ``nn.MultiheadAttention`` that accepts batch-first inputs.

    Each of query/key/value may be either
      * 2-D ``[batch, embed_dim]``  (treated as a length-1 sequence), or
      * 3-D ``[batch, seq_len, embed_dim]``.

    Args:
        d_model (int): Embedding dimension.
        num_heads (int): Number of attention heads; must divide ``d_model``.
    """

    def __init__(self, d_model=256, num_heads=4):
        super().__init__()
        self.attention = nn.MultiheadAttention(embed_dim=d_model, num_heads=num_heads)

    @staticmethod
    def _to_seq_first(tensor):
        """Convert a batch-first 2-D/3-D tensor to [seq_len, batch, embed_dim]."""
        if tensor.dim() == 2:
            return tensor.unsqueeze(0)      # [1, batch, embed_dim]
        if tensor.dim() == 3:
            return tensor.transpose(0, 1)   # [seq_len, batch, embed_dim]
        raise ValueError(
            f"expected a 2-D or 3-D tensor, got {tensor.dim()} dimensions"
        )

    def forward(self, query, key, value):
        """Attend ``query`` over ``key``/``value``.

        Returns:
            ``[batch, embed_dim]`` when the query is a single vector per batch
            item, otherwise ``[batch, query_len, embed_dim]``.
        """
        # The module never stored a `device` attribute; take the device from
        # the attention weights so inputs always follow the module.
        param_device = next(self.attention.parameters()).device

        query = self._to_seq_first(query).to(param_device)
        key = self._to_seq_first(key).to(param_device)
        value = self._to_seq_first(value).to(param_device)

        attn_output, _ = self.attention(query, key, value)  # [query_len, batch, embed_dim]

        # Back to batch-first; drop the sequence axis (dim 1, not the batch
        # axis) when the query had length 1.
        attn_output = attn_output.transpose(0, 1)
        if attn_output.size(1) == 1:
            attn_output = attn_output.squeeze(1)
        return attn_output

class AttentionFusion(nn.Module):
    """Fuse text/audio/video embeddings with shared multi-head attention.

    Each modality embedding is used in turn as the query over the set of all
    three embeddings; the three attended vectors are then averaged.

    Args:
        hidden_dim (int): Embedding size of every modality.
        device: Device the attention module and the inputs are moved to.
    """

    def __init__(self, hidden_dim=256, device='cuda'):
        super().__init__()
        self.device = device
        self.mha = MultiHeadAttention(d_model=hidden_dim).to(self.device)

    def forward(self, text_features, audio_features, video_features):
        """Return the fused embedding for inputs of shape [batch_size, hidden_dim] each."""
        # Move every input once so queries and keys share a device.
        text_features = text_features.to(self.device)
        audio_features = audio_features.to(self.device)
        video_features = video_features.to(self.device)

        # Key/value sequence of the three modalities, batch-first:
        # 3 x [batch_size, hidden_dim] -> [batch_size, 3, hidden_dim].
        # MultiHeadAttention treats 3-D inputs as batch-first and transposes
        # them itself, so stacking on dim 0 would swap batch and sequence.
        features = torch.stack([text_features, audio_features, video_features], dim=1)

        # Let each modality attend over all three.
        text_attn = self.mha(text_features, features, features)
        audio_attn = self.mha(audio_features, features, features)
        video_attn = self.mha(video_features, features, features)

        # Average the attended representations.
        fused_features = (text_attn + audio_attn + video_attn) / 3

        return fused_features

In [None]:
class SimplerAttentionFusion(nn.Module):
    """Fuse three modality embeddings with a learned softmax weighting.

    A single linear layer maps the mean of the three embeddings to three
    scores; their softmax gives per-sample weights used to sum the modalities.
    """

    def __init__(self, hidden_dim=256):
        super().__init__()
        self.attention_weights = nn.Linear(hidden_dim, 3)

    def forward(self, text_features, audio_features, video_features):
        """Return the weighted sum of the inputs, shape [batch_size, hidden_dim]."""
        # [batch_size, 3, hidden_dim]
        modalities = torch.stack((text_features, audio_features, video_features), dim=1)

        # The average embedding acts as the query: [batch_size, hidden_dim]
        pooled_query = modalities.mean(dim=1)

        # One weight per modality, normalised across the three: [batch_size, 3]
        modality_weights = F.softmax(self.attention_weights(pooled_query), dim=1)

        # Broadcast the weights over the feature axis and collapse the modalities.
        return (modalities * modality_weights.unsqueeze(2)).sum(dim=1)

In [None]:
class TransformerFusion(nn.Module):
    """Fuse three modality embeddings with a small Transformer encoder.

    The embeddings are treated as a 3-token sequence, tagged with a learned
    per-modality embedding, passed through the encoder and mean-pooled.
    """

    def __init__(self, hidden_dim=256, num_layers=2, num_heads=4):
        super().__init__()

        layer_settings = dict(
            d_model=hidden_dim,
            nhead=num_heads,
            dim_feedforward=hidden_dim*2,
            dropout=0.1,
            activation='relu',
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(**layer_settings),
            num_layers=num_layers,
        )

        # One learned vector per modality (text, audio, video).
        self.modality_embeddings = nn.Parameter(torch.randn(3, hidden_dim))

    def forward(self, text_features, audio_features, video_features):
        """Return the fused embedding [B, D] for three inputs of shape [B, D]."""
        # Sequence of modality tokens: [B, 3, D]
        tokens = torch.stack((text_features, audio_features, video_features), dim=1)

        # Tell the encoder which token belongs to which modality.
        tokens = tokens + self.modality_embeddings.unsqueeze(0)

        # Encode, then average over the three tokens: [B, 3, D] -> [B, D]
        return self.transformer_encoder(tokens).mean(dim=1)

In [None]:
# ==================== FULL MODEL ====================
class MultimodalEmotionRecognition(nn.Module):
    """End-to-end surprise detector over text, audio and video.

    Each modality is embedded by its own encoder, the three embeddings are
    fused by ``TransformerFusion``, and a linear layer followed by a sigmoid
    yields one probability per sample.
    """

    def __init__(self, hidden_dim=256):
        super().__init__()

        # Per-modality encoders.
        self.text_encoder = TextEncoder(hidden_size=768, pretrained_model="distilbert-base-uncased")
        self.audio_encoder = AudioEncoder(hidden_dim=hidden_dim)
        self.video_encoder = VideoEncoder(hidden_dim=hidden_dim)

        # Fusion of the three embeddings (SimplerAttentionFusion is a
        # drop-in alternative with the same call signature).
        self.fusion = TransformerFusion(hidden_dim=hidden_dim)

        # Single-logit head: binary surprise / not-surprise.
        self.fc = nn.Linear(hidden_dim, 1)

    def forward(self, texts, audio_specs, video_frames):
        """Return the surprise probability for each sample, shape [batch_size]."""
        # Collated video batches arrive as [batch_size, 1, 3, H, W]; drop the extra axis.
        if video_frames.dim() == 5:
            video_frames = video_frames.squeeze(1)

        text_emb = self.text_encoder(texts)
        audio_emb = self.audio_encoder(audio_specs)
        video_emb = self.video_encoder(video_frames)

        fused = self.fusion(text_emb, audio_emb, video_emb)

        # Logit -> probability of the surprise class.
        probabilities = torch.sigmoid(self.fc(fused))

        # Flatten [batch_size, 1] to [batch_size]; works for batch size 1 too.
        return probabilities.view(text_emb.size(0))

In [None]:
# ==================== DATASET CLASS ====================
class MultimodalEmotionDataset(Dataset):
    """Dataset of (text, audio, video, label) samples stored one per sub-directory.

    Expected layout::

        data_dir/
            sample1/
                text.txt    transcript
                audio.wav   audio track
                video.mp4   video clip
                label.txt   integer label (1 = surprise, 0 = no surprise)
            sample2/
                ...

    Sub-directories missing any of the four files are skipped.

    Args:
        data_dir (str): Root directory containing the sample folders.
        transform: Stored on the instance; not applied by this class.
    """

    REQUIRED_FILES = ("text.txt", "audio.wav", "video.mp4", "label.txt")

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.samples = self._load_samples()

    def _load_samples(self):
        """Scan ``data_dir`` and return a list of sample dicts.

        Each dict has the keys ``text``, ``audio_path``, ``video_path`` and
        ``label``. Folders are visited in sorted order so that sample indices
        are reproducible across runs and file systems.
        """
        samples = []

        # os.listdir() returns entries in arbitrary order; sort for determinism.
        for sample_dir in sorted(os.listdir(self.data_dir)):
            sample_path = os.path.join(self.data_dir, sample_dir)
            if not os.path.isdir(sample_path):
                continue

            text_path, audio_path, video_path, label_path = (
                os.path.join(sample_path, name) for name in self.REQUIRED_FILES
            )
            if not all(os.path.exists(p) for p in (text_path, audio_path, video_path, label_path)):
                continue

            # Read with an explicit encoding so results do not depend on the
            # platform's locale; undecodable bytes in the transcript are replaced.
            with open(text_path, 'r', encoding='utf-8', errors='replace') as f:
                text = f.read().strip()

            with open(label_path, 'r', encoding='utf-8') as f:
                # The label file holds 1 for surprise, 0 for no surprise.
                label = int(f.read().strip())

            samples.append({
                "text": text,
                "audio_path": audio_path,
                "video_path": video_path,
                "label": label
            })

        return samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(text, audio_waveform, face_tensor, label)`` for sample ``idx``."""
        sample = self.samples[idx]

        text = sample["text"]

        # Fixed-length waveform for the audio encoder.
        audio_spec = AudioEncoder.extract_spectrogram(sample["audio_path"])

        # Averaged face crop for the video encoder.
        video_features = VideoEncoder.extract_face_features(sample["video_path"])

        # Keep the face tensor at most 4-D ([1, 3, 224, 224]).
        if video_features.dim() > 4:
            video_features = video_features.squeeze(1)

        label = torch.tensor(sample["label"], dtype=torch.float32)

        return text, audio_spec, video_features, label

In [None]:
# Show the torch device selected in the imports cell ("cuda" or "cpu").
print(device)

In [None]:
import time  # Added import at the top

def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``; train when ``optimizer`` is given, else evaluate.

    Returns:
        (loss_sum, correct, total): summed per-batch loss, plus the number of
        correct predictions at threshold 0.5 and the number of samples
        (both only counted in evaluation mode).
    """
    training = optimizer is not None
    loss_sum, correct, total = 0.0, 0, 0

    with torch.set_grad_enabled(training):
        for texts, audio_specs, video_frames, labels in loader:
            # Texts stay as Python strings; the text encoder tokenizes them.
            audio_specs = audio_specs.to(device)
            video_frames = video_frames.to(device)
            labels = labels.to(device)

            if training:
                optimizer.zero_grad()

            outputs = model(texts, audio_specs, video_frames)
            # Align predictions with the label shape (covers scalar outputs
            # and trailing singleton dimensions).
            outputs = outputs.reshape(labels.shape)

            loss = criterion(outputs, labels)

            if training:
                loss.backward()
                optimizer.step()
            else:
                predictions = (outputs > 0.5).float()
                total += labels.size(0)
                correct += (predictions == labels).sum().item()

            loss_sum += loss.item()

    return loss_sum, correct, total


def train_model(model, train_loader, val_loader, num_epochs=10):
    """Train ``model`` with BCE loss and Adam, validating after every epoch.

    Side effects: writes the weights with the lowest validation loss to
    ``best_model.pth`` and the loss curves to ``training_curve.png``.

    Args:
        model: Module called as ``model(texts, audio_specs, video_frames)``
            and returning probabilities.
        train_loader: Loader yielding ``(texts, audio_specs, video_frames, labels)``.
        val_loader: Loader with the same batch structure, used for validation.
        num_epochs (int): Number of epochs to run.

    Returns:
        The model after the final epoch (not necessarily the best checkpoint).

    Raises:
        ValueError: If either loader yields no batches. This is raised up front
            instead of hitting a division by zero after an epoch of work.
    """
    if len(train_loader) == 0:
        raise ValueError("train_loader is empty - nothing to train on")
    if len(val_loader) == 0:
        raise ValueError("val_loader is empty - cannot compute validation metrics")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    total_start = time.time()

    # Loss, optimizer and LR schedule. Only parameters that require gradients
    # are optimised; the frozen backbones are left out.
    criterion = nn.BCELoss()
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.Adam(trainable_params, lr=0.001)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3)

    best_val_loss = float('inf')
    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        epoch_start = time.time()

        # Training phase
        model.train()
        train_loss_sum, _, _ = _run_epoch(model, train_loader, criterion, device, optimizer)
        train_loss = train_loss_sum / len(train_loader)
        train_losses.append(train_loss)

        # Validation phase
        model.eval()
        val_loss_sum, correct, total = _run_epoch(model, val_loader, criterion, device)
        val_loss = val_loss_sum / len(val_loader)
        val_accuracy = correct / total if total else 0.0
        val_losses.append(val_loss)

        # Reduce the learning rate when validation loss plateaus.
        scheduler.step(val_loss)

        # Checkpoint the best model seen so far.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pth')

        epoch_time = time.time() - epoch_start
        print(f'Epoch {epoch+1}/{num_epochs} completed in {time.strftime("%H:%M:%S", time.gmtime(epoch_time))}\n'
              f'  Train Loss: {train_loss:.4f}, '
              f'Val Loss: {val_loss:.4f}, '
              f'Val Accuracy: {val_accuracy:.4f}\n')

    total_time = time.time() - total_start
    print(f'\n🔥 Total training completed in {time.strftime("%H:%M:%S", time.gmtime(total_time))}')

    # Plot and save the loss curves.
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Training and Validation Loss')
    plt.savefig('training_curve.png')

    return model


In [None]:
# Show the torch device selected in the imports cell ("cuda" or "cpu").
print(device)

In [None]:
def predict_emotion(model, text, audio_path, video_path):
    """Classify a single (text, audio, video) sample as surprise / not surprise.

    Args:
        model: Trained model called as ``model(texts, audio, video)``.
        text (str): Transcript of the sample.
        audio_path (str): Path to the audio file.
        video_path (str): Path to the video file.

    Returns:
        (prediction, probability): ``"Surprise"`` when the probability exceeds
        0.5, otherwise ``"Not Surprise"``, together with the probability.

    Raises:
        Re-raises any exception from preprocessing or the forward pass after
        printing diagnostic information.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    model.eval()

    # Pre-bind so the error handler can report them even when preprocessing
    # itself is what failed.
    audio_spec = None
    video_features = None

    with torch.no_grad():
        try:
            audio_spec = AudioEncoder.extract_spectrogram(audio_path).to(device)
            video_features = VideoEncoder.extract_face_features(video_path).to(device)

            # Add a batch axis only where it is missing: a bare waveform [T]
            # becomes [1, T]; an image [3, H, W] becomes [1, 3, H, W].
            if audio_spec.dim() == 1:
                audio_spec = audio_spec.unsqueeze(0)
            if video_features.dim() == 3:
                video_features = video_features.unsqueeze(0)

            output = model([text], audio_spec, video_features)

            # The batch holds exactly one sample, so the output has one element.
            probability = output.item()

            prediction = "Surprise" if probability > 0.5 else "Not Surprise"

            return prediction, probability

        except Exception as e:
            print(f"Error during prediction: {e}")
            if audio_spec is not None:
                print(f"Audio spec shape: {audio_spec.shape}")
            if video_features is not None:
                print(f"Video features shape: {video_features.shape}")
            raise

In [None]:
import torch
print(f"CUDA available: {torch.cuda.is_available()}")  # True on a GPU runtime
# get_device_name() raises without CUDA, so only query it when a GPU exists.
if torch.cuda.is_available():
    print(f"Detected GPU: {torch.cuda.get_device_name(0)}")  # e.g. "Tesla T4"
else:
    print("Detected GPU: none (CUDA not available)")


In [None]:
# ==================== MAIN CODE ====================
# Fix the NumPy and torch random generators so runs are repeatable
np.random.seed(42)
torch.manual_seed(42)

# Build the full multimodal model and place it on the selected device
model = MultimodalEmotionRecognition().to(device)

# Example usage:


def collate_fn(batch):
    """Collate a batch and move every tensor in it to the notebook-level ``device``.

    Args:
        batch: Either an already-collated dict, or the list of samples a
            ``DataLoader`` passes to its ``collate_fn``.

    Returns:
        The collated batch with the same structure, where tensors live on
        ``device`` and non-tensor values (e.g. text strings) are unchanged.
    """
    def _move(item):
        # Recurse through containers; only tensors have a device to change.
        if torch.is_tensor(item):
            return item.to(device)
        if isinstance(item, dict):
            return {key: _move(value) for key, value in item.items()}
        if isinstance(item, list):
            return [_move(value) for value in item]
        if isinstance(item, tuple):
            return tuple(_move(value) for value in item)
        return item

    if isinstance(batch, dict):
        return _move(batch)

    # A DataLoader hands over a list of samples: stack them first.
    from torch.utils.data.dataloader import default_collate
    return _move(default_collate(batch))


# Confirm where the freshly created model lives
print(f"Model on GPU: {next(model.parameters()).is_cuda}")

# Per-encoder placement report
print("Model device check:")
print(f"Text encoder: {next(model.text_encoder.parameters()).device}")
print(f"Audio encoder: {next(model.audio_encoder.parameters()).device}")
print(f"Video encoder: {next(model.video_encoder.parameters()).device}")

# 1. Training
# Datasets read from the Kaggle input directories
train_dataset = MultimodalEmotionDataset(
    '/kaggle/input/d/rishikant24/bbtsurprise-train-small/bbtSurprise_train_small'
)
val_dataset = MultimodalEmotionDataset(
    '/kaggle/input/d/rishikant24/bbtsurprise-val-small/bbtSurprise_val_small'
)

# Shuffle only the training split; validation keeps its stored order
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=256)

# Run the training loop and keep the returned model
model = train_model(
    model,
    train_loader,
    val_loader,
    num_epochs=20,
)