In [2]:
import numpy as np

import pandas as pd

import torch

import torch.nn as nn

from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import train_test_split

from sklearn.preprocessing import LabelEncoder

from transformers import BertTokenizer, BertModel

import torch.nn.functional as F

import cv2

from tqdm import tqdm


In [3]:
# Single seed value shared by every random number generator seeded below.
SEED = 42

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device('cuda' if use_cuda else 'cpu')
print(f"Using device: {device}")

# Seed PyTorch, NumPy and (when CUDA is available) every CUDA device explicitly.
torch.manual_seed(SEED)
np.random.seed(SEED)
if use_cuda:
    torch.cuda.manual_seed_all(SEED)

Using device: cuda


In [4]:
def _clip_paths(df, split_dir):
    """Build ``{split_dir}/dia{Dialogue_ID}_utt{Utterance_ID}.mp4`` for every row of ``df``."""
    dialogue = df['Dialogue_ID'].astype(str)
    utterance = df['Utterance_ID'].astype(str)
    # Vectorized concatenation: no per-row Python call, and it also works on an empty frame.
    return split_dir + '/dia' + dialogue + '_utt' + utterance + '.mp4'


def load_and_preprocess_data(data_dir='/kaggle/input/ml-hackathon-ec-campus-set-1'):
    """Load the train/test CSVs, attach video paths and encode the sentiment labels.

    Args:
        data_dir: Directory holding ``train.csv``, ``test.csv`` and the ``train/``
            and ``test/`` video folders. Defaults to the Kaggle input location
            this notebook was written against.

    Returns:
        A ``(train_df, test_df, le)`` tuple. Both frames gain a ``video_path``
        column; ``train_df`` also gains ``sentiment_encoded``, produced by the
        returned ``LabelEncoder`` fitted on the ``Sentiment`` column.
    """
    root = str(data_dir).rstrip('/')

    # Load CSV files (the competition files are read as ISO-8859-1)
    train_df = pd.read_csv(f"{root}/train.csv", encoding='ISO-8859-1')
    test_df = pd.read_csv(f"{root}/test.csv", encoding='ISO-8859-1')

    # Add video paths
    train_df['video_path'] = _clip_paths(train_df, f"{root}/train")
    test_df['video_path'] = _clip_paths(test_df, f"{root}/test")

    # Encode labels
    le = LabelEncoder()
    train_df['sentiment_encoded'] = le.fit_transform(train_df['Sentiment'])

    return train_df, test_df, le



# Read both splits; the returned encoder is the one used for the sentiment labels.
train_df, test_df, label_encoder = load_and_preprocess_data()

# Sanity report: split sizes and the class order held by the encoder.
for caption, value in (
    ("Training samples:", len(train_df)),
    ("Test samples:", len(test_df)),
    ("Labels:", label_encoder.classes_),
):
    print(caption, value)

Training samples: 999
Test samples: 100
Labels: ['negative' 'neutral' 'positive']


In [5]:
class TextProcessor:
    """Tokenize utterances with a BERT tokenizer into fixed-length PyTorch tensors."""

    def __init__(self, max_length=128,
                 model_path='/kaggle/input/berttttttt/bert_base_uncased_model'):
        """
        Args:
            max_length: Sequence length every utterance is padded/truncated to.
            model_path: Directory (or identifier) handed to
                ``BertTokenizer.from_pretrained``. Defaults to the Kaggle dataset
                path this notebook was written against.
        """
        self.tokenizer = BertTokenizer.from_pretrained(model_path)
        self.max_length = max_length

    def process(self, text):
        """Tokenize ``text`` and return the tokenizer output as ``'pt'`` tensors.

        The text is padded to ``max_length`` and truncated beyond it.
        NOTE(review): for a single string each tensor is presumably
        ``(1, max_length)`` - the ``.squeeze(1)`` calls downstream rely on that;
        confirm against the tokenizer in use.
        """
        return self.tokenizer(
            text,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt',
        )


In [6]:
class VideoProcessor:
    """Sample a fixed number of evenly spaced frames from a video file."""

    def __init__(self, target_frames=16, target_size=(112, 112)):
        """
        Args:
            target_frames: Number of frames every clip is reduced/padded to.
            target_size: Size handed to ``cv2.resize`` (OpenCV reads it as
                ``(width, height)``).
        """
        self.target_frames = target_frames
        self.target_size = target_size

    def process(self, video_path):
        """Return the clip as a float tensor laid out ``(C, T, H, W)`` in ``[0, 1]``.

        Frames that cannot be decoded are skipped and the clip is padded with
        all-zero frames up to ``target_frames``. A missing or unreadable video
        therefore yields an all-zero clip instead of raising.
        """
        width, height = self.target_size
        frames = []
        cap = cv2.VideoCapture(video_path)
        try:
            # Get total frames (0 or negative when the file could not be opened/probed)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Calculate frame indices to sample; clamp so we never seek to index -1
            last_index = max(total_frames - 1, 0)
            indices = np.linspace(0, last_index, self.target_frames, dtype=int)

            for idx in indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
                ret, frame = cap.read()
                if not ret:
                    continue
                frame = cv2.resize(frame, self.target_size)
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(frame)
        finally:
            # Always hand the decoder back, even if resizing/conversion raised.
            cap.release()

        missing = self.target_frames - len(frames)
        if missing > 0:
            # Pad with zeros if not enough frames. The blank frame is built from
            # target_size rather than frames[0], so an empty list is fine too.
            blank = np.zeros((height, width, 3), dtype=np.uint8)
            frames.extend(blank for _ in range(missing))

        # Normalize and convert to tensor
        frames = np.array(frames) / 255.0
        return torch.FloatTensor(frames).permute(3, 0, 1, 2)  # (C, T, H, W)


In [7]:
class MultimodalDataset(Dataset):
    """Dataset pairing each utterance's processed text with its processed video.

    Rows are read from ``df`` by position. The columns used are ``Utterance``
    and ``video_path``, plus ``Sr No.`` in test mode or ``sentiment_encoded``
    otherwise.
    """

    def __init__(self, df, text_processor, video_processor, is_test=False):
        self.df = df
        self.text_processor = text_processor
        self.video_processor = video_processor
        self.is_test = is_test

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        record = self.df.iloc[idx]

        # Text first, then video - both branches share these two entries.
        sample = {
            'text': self.text_processor.process(record['Utterance']),
            'video': self.video_processor.process(record['video_path']),
        }

        if self.is_test:
            # Test rows carry no label; expose the row identifier instead.
            sample['id'] = record['Sr No.']
        else:
            sample['label'] = torch.tensor(record['sentiment_encoded'], dtype=torch.long)

        return sample

In [8]:
class TextModel(nn.Module):
    """BERT encoder followed by a single linear classification layer."""

    def __init__(self, num_classes):
        super().__init__()
        self.bert = BertModel.from_pretrained('/kaggle/input/berttttttt/bert_base_uncased_model')
        # The classifier consumes the encoder's pooled output, so match its width.
        hidden_dim = self.bert.config.hidden_size
        self.classifier = nn.Linear(hidden_dim, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return one row of class logits per input sequence."""
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled = encoded.pooler_output
        logits = self.classifier(pooled)
        return logits

class VideoModel(nn.Module):
    """Two-stage 3D-convolutional classifier over ``(N, 3, T, H, W)`` clips."""

    def __init__(self, num_classes, num_frames=16, frame_size=(112, 112)):
        """
        Args:
            num_classes: Number of output logits.
            num_frames: Temporal length ``T`` of the clips fed to ``forward``.
            frame_size: The two spatial dimensions of each frame. Only their
                pooled product is used, so the order does not matter here.

        Raises:
            ValueError: If a dimension is too small to survive both pooling stages.

        With the defaults the first linear layer has ``128 * 4 * 28 * 28`` inputs,
        so layer shapes and ``state_dict`` keys match the earlier hard-coded model.
        """
        super().__init__()

        # Each MaxPool3d(kernel_size=2) halves (floor) every dimension; there are two.
        pooled_t = num_frames // 2 // 2
        pooled_a = frame_size[0] // 2 // 2
        pooled_b = frame_size[1] // 2 // 2
        flat_features = 128 * pooled_t * pooled_a * pooled_b
        if flat_features <= 0:
            raise ValueError(
                f"Input of {num_frames} frames at {tuple(frame_size)} is too small "
                "for two 2x pooling stages (every dimension must be at least 4)."
            )

        self.conv3d = nn.Sequential(
            nn.Conv3d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=2),
            nn.Conv3d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=2),
        )
        self.fc = nn.Sequential(
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        """Return class logits of shape ``(N, num_classes)``."""
        x = self.conv3d(x)
        # Flatten everything except the batch dimension for the linear head.
        x = x.flatten(start_dim=1)
        return self.fc(x)


In [9]:
class LateFusionModel:
    """Blend the class probabilities of a text model and a video model.

    The fused score is ``text_weight * p_text + (1 - text_weight) * p_video``.
    """

    def __init__(self, text_model, video_model, device, text_weight=0.6):
        self.device = device
        self.text_model = text_model.to(device)
        self.video_model = video_model.to(device)
        self.text_weight = text_weight
        self.video_weight = 1 - text_weight

    def predict(self, batch):
        """Return the index of the highest fused probability for each sample.

        Both models are switched to eval mode and no gradients are recorded.
        """
        for branch in (self.text_model, self.video_model):
            branch.eval()

        tokens = batch['text']
        with torch.no_grad():
            # Text branch: drop the singleton dim at position 1 before the model call.
            input_ids = tokens['input_ids'].squeeze(1).to(self.device)
            attention_mask = tokens['attention_mask'].squeeze(1).to(self.device)
            text_probs = F.softmax(self.text_model(input_ids, attention_mask), dim=1)

            # Video branch.
            clips = batch['video'].to(self.device)
            video_probs = F.softmax(self.video_model(clips), dim=1)

            # Weighted average of the two probability distributions.
            fused = self.text_weight * text_probs + self.video_weight * video_probs
            return torch.argmax(fused, dim=1)


In [10]:
# Preprocessing objects shared by all three datasets.
text_processor = TextProcessor()
video_processor = VideoProcessor()

# Split data: hold out 20% for validation.
# The result is bound to a new name instead of overwriting `train_df`, so
# re-running this cell always splits the full training frame rather than
# re-splitting an already reduced one.
train_split_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)

# Create datasets
train_dataset = MultimodalDataset(train_split_df, text_processor, video_processor)
val_dataset = MultimodalDataset(val_df, text_processor, video_processor)
test_dataset = MultimodalDataset(test_df, text_processor, video_processor, is_test=True)

# Create dataloaders (only the training loader is shuffled)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)
test_loader = DataLoader(test_dataset, batch_size=8)

# Initialize models: one output logit per sentiment class known to the encoder
num_classes = len(label_encoder.classes_)
text_model = TextModel(num_classes).to(device)
video_model = VideoModel(num_classes).to(device)


In [11]:
def train_models(train_loader, val_loader, text_model, video_model, num_epochs=2):
    """Train the text and video models independently on the same batches.

    Each batch updates the text model first and the video model second, each
    with its own AdamW optimizer and a shared cross-entropy criterion. After
    every epoch ``validate_models`` is called on ``val_loader``; whenever its
    accuracy improves, both state dicts are saved to ``best_text_model.pth``
    and ``best_video_model.pth`` in the working directory.

    Args:
        train_loader: Iterable of batches with ``'text'`` (holding
            ``'input_ids'`` and ``'attention_mask'``), ``'video'`` and ``'label'``.
        val_loader: Batches passed on to ``validate_models``.
        text_model: Model called as ``text_model(input_ids, attention_mask)``.
        video_model: Model called as ``video_model(video)``.
        num_epochs: Number of passes over ``train_loader``.
    """
    text_optimizer = torch.optim.AdamW(text_model.parameters(), lr=2e-5)
    video_optimizer = torch.optim.AdamW(video_model.parameters(), lr=1e-4)
    criterion = nn.CrossEntropyLoss()

    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint, even if validation accuracy is exactly 0. Otherwise a later
    # load would fail or silently pick up files from a previous run.
    best_accuracy = -1.0

    for epoch in range(num_epochs):
        # Validation switches the models to eval mode; switch back every epoch.
        text_model.train()
        video_model.train()

        # Training loop
        for batch in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}'):
            # Move the labels once; both branches use the same targets.
            labels = batch['label'].to(device)

            # Train text model
            text_optimizer.zero_grad()
            text_out = text_model(
                batch['text']['input_ids'].squeeze(1).to(device),
                batch['text']['attention_mask'].squeeze(1).to(device)
            )
            text_loss = criterion(text_out, labels)
            text_loss.backward()
            text_optimizer.step()

            # Train video model
            video_optimizer.zero_grad()
            video_out = video_model(batch['video'].to(device))
            video_loss = criterion(video_out, labels)
            video_loss.backward()
            video_optimizer.step()

        # Validation
        accuracy = validate_models(val_loader, text_model, video_model)
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            # Save best models
            torch.save(text_model.state_dict(), 'best_text_model.pth')
            torch.save(video_model.state_dict(), 'best_video_model.pth')

        print(f'Epoch {epoch+1}/{num_epochs}, Validation Accuracy: {accuracy:.4f}, Best: {best_accuracy:.4f}')


In [12]:
def validate_models(val_loader, text_model, video_model):
    """Return the late-fusion accuracy of the two models over ``val_loader``.

    Predictions come from ``LateFusionModel`` with its default weighting.

    Args:
        val_loader: Iterable of batches that include a ``'label'`` tensor.
        text_model: Text branch handed to ``LateFusionModel``.
        video_model: Video branch handed to ``LateFusionModel``.

    Returns:
        Fraction of correctly classified samples, or ``0.0`` when the loader
        yields no samples (instead of dividing by zero).
    """
    fusion_model = LateFusionModel(text_model, video_model, device)
    correct = 0
    total = 0

    with torch.no_grad():
        for batch in val_loader:
            labels = batch['label'].to(device)
            predictions = fusion_model.predict(batch)
            correct += (predictions == labels).sum().item()
            total += labels.size(0)

    if total == 0:
        return 0.0
    return correct / total

In [13]:
# Train both branches with the default epoch count; train_models prints the
# validation accuracy after every epoch and saves the best checkpoints.
print("Starting training...")

train_models(train_loader, val_loader, text_model, video_model)


Starting training...


Epoch 1/2: 100%|██████████| 100/100 [16:02<00:00,  9.62s/it]


Epoch 1/2, Validation Accuracy: 0.5100, Best: 0.5100


Epoch 2/2: 100%|██████████| 100/100 [15:49<00:00,  9.49s/it]


Epoch 2/2, Validation Accuracy: 0.7050, Best: 0.7050


In [14]:
def generate_predictions():
    """Predict test-set sentiments with the best checkpoints and write ``submission.csv``.

    Uses the notebook-level ``text_model``, ``video_model``, ``test_loader``,
    ``label_encoder`` and ``device``. The submission has an ``ID`` column
    numbered from 1 in loader order and a ``Sentiment`` column holding the
    decoded class names.
    """
    # Load best models. map_location keeps this working when the checkpoint was
    # written on another device; weights_only restricts unpickling to tensor data.
    text_model.load_state_dict(
        torch.load('best_text_model.pth', map_location=device, weights_only=True)
    )
    video_model.load_state_dict(
        torch.load('best_video_model.pth', map_location=device, weights_only=True)
    )

    fusion_model = LateFusionModel(text_model, video_model, device)
    predictions = []

    with torch.no_grad():
        for batch in tqdm(test_loader, desc="Generating predictions"):
            batch_preds = fusion_model.predict(batch)
            predictions.extend(batch_preds.cpu().numpy())

    # One sequential ID per prediction, so the two columns always have the same
    # length however many test rows there are.
    ids = list(range(1, len(predictions) + 1))

    # Create submission file
    submission_df = pd.DataFrame({
        'ID': ids,
        'Sentiment': label_encoder.inverse_transform(predictions)
    })
    submission_df.to_csv('submission.csv', index=False)
    print("Submission file created: submission.csv")



# Generate final predictions: reload the saved checkpoints, run the fused model
# over the test loader and write submission.csv to the working directory.

generate_predictions()


  text_model.load_state_dict(torch.load('best_text_model.pth'))
  video_model.load_state_dict(torch.load('best_video_model.pth'))
Generating predictions: 100%|██████████| 13/13 [02:06<00:00,  9.77s/it]

Submission file created: submission.csv



