In [None]:
import os
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Embedding, LSTM, Concatenate, Dropout, Conv2D, MaxPooling2D, Flatten, TimeDistributed
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import cv2  # For video processing

# Kaggle input directories that hold the raw train / test video clips
TRAIN_VIDEO_PATH = '/kaggle/input/PES-ml-hack-link1/train_videos'
TEST_VIDEO_PATH = '/kaggle/input/PES-ml-hack-link1/test_videos'

# Read both CSV splits (train first, then test) with the same ISO-8859-1 decoding
train_df, test_df = (
    pd.read_csv(csv_path, encoding='ISO-8859-1')
    for csv_path in (
        '/kaggle/input/PES-ml-hack-link1/train.csv',
        '/kaggle/input/PES-ml-hack-link1/test.csv',
    )
)

# Function to read a fixed-length stack of resized frames from one video file
def extract_video_features(video_path, num_frames=10, frame_size=(64, 64)):
    """Read the first frames of a video as a fixed-size uint8 array.

    Args:
        video_path: Path of the video file to decode with OpenCV.
        num_frames: Number of frames in the returned clip (default 10).
        frame_size: ``(width, height)`` every frame is resized to (default 64x64).

    Returns:
        ``np.ndarray`` of shape ``(num_frames, height, width, 3)`` and dtype
        uint8. Clips shorter than ``num_frames`` are padded with black frames.
        If the video cannot be opened/decoded, an all-zero clip of the same
        shape and dtype is returned and a message is printed, so stacking many
        clips never silently upcasts the whole batch to float64.
    """
    # cv2.resize takes (width, height) while the array layout is (height, width)
    empty_clip = np.zeros((num_frames, frame_size[1], frame_size[0], 3), dtype=np.uint8)

    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        frames = []
        while len(frames) < num_frames and cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(cv2.resize(frame, frame_size))
    except Exception as e:
        # Best-effort: one bad file must not abort feature extraction for the rest
        print(f"Error processing video {video_path}: {e}")
        return empty_clip
    finally:
        # Always free the decoder handle, even when reading/resizing failed
        if cap is not None:
            cap.release()

    if not frames:
        # Missing or unreadable file: handled explicitly instead of relying on
        # an IndexError from frames[0]
        print(f"Error processing video {video_path}: no frames could be read")
        return empty_clip

    # Pad short clips with black frames so every clip has the same length
    if len(frames) < num_frames:
        frames.extend([np.zeros_like(frames[0])] * (num_frames - len(frames)))

    return np.array(frames[:num_frames])

# Stack the per-utterance clips for every row of a dataframe
def load_video_features(df, video_dir):
    """Load one clip per row of ``df`` and return them as a single array.

    Each row's clip is looked up in ``video_dir`` under the name
    ``dia<Dialogue_ID>_utt<Utterance_ID>.mp4`` and decoded with
    ``extract_video_features``; results are stacked in row order.
    """
    clips = [
        extract_video_features(
            os.path.join(video_dir, f"dia{row['Dialogue_ID']}_utt{row['Utterance_ID']}.mp4")
        )
        for _, row in df.iterrows()
    ]
    return np.array(clips)

# Integer-encode the emotion names, then one-hot them for the softmax head
label_encoder = LabelEncoder().fit(train_df['Emotion'])
train_df['Emotion'] = label_encoder.transform(train_df['Emotion'])
y = to_categorical(train_df['Emotion'])

# Fit the Keras tokenizer (10k-word vocabulary, explicit OOV token) on all utterances
tokenizer = Tokenizer(num_words=10000, oov_token="<OOV>")
tokenizer.fit_on_texts(train_df['Utterance'])


def _pad_utterances(utterances):
    """Convert utterances to token-id sequences post-padded to length 100."""
    return pad_sequences(tokenizer.texts_to_sequences(utterances), maxlen=100, padding='post')


train_sequences = tokenizer.texts_to_sequences(train_df['Utterance'])
train_padded = pad_sequences(train_sequences, maxlen=100, padding='post')

# Hold out 20% of the rows (and their one-hot labels) for validation
train_text, val_text, y_train, y_val = train_test_split(
    train_df, y, test_size=0.2, random_state=42
)

# Text inputs first, then the (slow) video decoding for each split
X_train_text = _pad_utterances(train_text['Utterance'])
X_val_text = _pad_utterances(val_text['Utterance'])
X_train_video = load_video_features(train_text, TRAIN_VIDEO_PATH)
X_val_video = load_video_features(val_text, TRAIN_VIDEO_PATH)

# Text branch: token ids -> 128-d embeddings -> two stacked LSTMs -> 64-d vector
input_text = Input(shape=(100,), name='text_input')
x = Embedding(input_dim=10000, output_dim=128)(input_text)
x = LSTM(128, return_sequences=True)(x)
x = LSTM(64)(x)
x = Dropout(0.5)(x)

# Video branch: the same small CNN is applied to each of the 10 frames, and an
# LSTM summarises the resulting per-frame feature sequence into a 64-d vector
input_video = Input(shape=(10, 64, 64, 3), name='video_input')
v = TimeDistributed(Conv2D(32, (3, 3), activation='relu'))(input_video)
v = TimeDistributed(MaxPooling2D((2, 2)))(v)
v = TimeDistributed(Flatten())(v)
v = LSTM(64)(v)
v = Dropout(0.5)(v)

# Late fusion: concatenate both modality vectors and classify
combined = Concatenate()([x, v])
# Size the softmax from the one-hot labels instead of assuming a class count,
# so the head always matches the targets passed to fit()
num_classes = y.shape[1]
output = Dense(num_classes, activation='softmax')(combined)

# Create and compile the model
model = Model(inputs=[input_text, input_video], outputs=output)
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])

# Summary of the model
model.summary()

# Fit on the paired (text, video) inputs; the held-out split is scored every epoch
history = model.fit(
    x=[X_train_text, X_train_video],
    y=y_train,
    batch_size=32,
    epochs=15,
    validation_data=([X_val_text, X_val_video], y_val),
)

# Optional test-set evaluation (requires an 'Emotion' column in test_df).
# Uncomment and adjust the following lines if you have test data
# test_features_video = load_video_features(test_df, TEST_VIDEO_PATH)
# X_test_text = pad_sequences(tokenizer.texts_to_sequences(test_df['Utterance']), maxlen=100, padding='post')
# y_test = to_categorical(label_encoder.transform(test_df['Emotion']))
# test_loss, test_accuracy = model.evaluate([X_test_text, test_features_video], y_test)
# print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")

# Persist the trained network in HDF5 format
model.save('multimodal_emotion_model.h5')

# Optional: turn test-set probabilities back into emotion names
# predictions = model.predict([X_test_text, test_features_video])
# predicted_classes = np.argmax(predictions, axis=1)
# predicted_emotions = label_encoder.inverse_transform(predicted_classes)

# Optional: write the predicted emotions to a CSV file
# results_df = pd.DataFrame({'Utterance_ID': test_df['Utterance_ID'], 'Predicted_Emotion': predicted_emotions})
# results_df.to_csv('emotion_predictions.csv', index=False)
# print("Predictions saved to 'emotion_predictions.csv'.")

In [None]:
# Unpack the training-video archive located at /content/train_videos.zip.
# NOTE(review): the captured output of this cell shows unzip rejecting the file
# (no end-of-central-directory signature) — confirm the archive is complete before re-running.
!unzip /content/train_videos.zip

Archive:  /content/train_videos.zip
  End-of-central-directory signature not found.  Either this file is not
  a zipfile, or it constitutes one disk of a multi-part archive.  In the
  latter case the central directory and zipfile comment will be found on
  the last disk(s) of this archive.
unzip:  cannot find zipfile directory in one of /content/train_videos.zip or
        /content/train_videos.zip.zip, and cannot find /content/train_videos.zip.ZIP, period.


In [None]:
# Training metadata, decoded with the '1252' (Windows-1252) codec
train_df = pd.read_csv(
    filepath_or_buffer='/kaggle/input/PES-ml-hack-link1/train.csv',
    encoding='1252',
)
# Directory that holds the per-utterance training clips
video_dir = '/kaggle/input/PES-ml-hack-link2/train_videos'


# Function to get video file path from IDs
def get_video_clip_path(row, base_dir=None):
    """Return the clip path for one dataframe row.

    Args:
        row: Mapping-like row (e.g. a pandas Series) providing 'Dialogue_ID'
            and 'Utterance_ID'.
        base_dir: Directory containing the clips. When omitted, the
            module-level ``video_dir`` is used, so existing
            ``df.apply(get_video_clip_path, axis=1)`` calls keep working.
            Passing it explicitly avoids depending on whichever cell last
            reassigned ``video_dir``.

    Returns:
        ``<base_dir>/dia<Dialogue_ID>_utt<Utterance_ID>.mp4`` as a string.
    """
    if base_dir is None:
        base_dir = video_dir  # fall back to the notebook-wide setting
    filename = f"dia{row['Dialogue_ID']}_utt{row['Utterance_ID']}.mp4"
    return os.path.join(base_dir, filename)

# Resolve the clip path for every training row (function is called once per row)
train_df['video_clip_path'] = train_df.apply(get_video_clip_path, axis='columns')

# Preview the ID columns next to the generated paths for the first rows
print(train_df.head()[['Dialogue_ID', 'Utterance_ID', 'video_clip_path']])

FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/input/PES-ml-hack-link1/train.csv'

Test code (PyTorch prototype combining BERT text features with ResNet-18 video features):

import os
import pandas as pd
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModel
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import cv2

# Training metadata (default pandas decoding) and the directory of training clips
train_df = pd.read_csv(filepath_or_buffer='/mnt/data/train (1).csv')
video_dir = '/kaggle/input/PES-ml-hack-link2/train_videos'

# Get video clip paths
def get_video_clip_path(row, base_dir=None):
    """Return the clip path for one dataframe row.

    Args:
        row: Mapping-like row (e.g. a pandas Series) providing 'Dialogue_ID'
            and 'Utterance_ID'.
        base_dir: Directory containing the clips. When omitted, the
            module-level ``video_dir`` is used, so existing
            ``df.apply(get_video_clip_path, axis=1)`` calls keep working.

    Returns:
        ``<base_dir>/dia<Dialogue_ID>_utt<Utterance_ID>.mp4`` as a string.
    """
    if base_dir is None:
        base_dir = video_dir  # fall back to the script-wide setting
    filename = f"dia{row['Dialogue_ID']}_utt{row['Utterance_ID']}.mp4"
    return os.path.join(base_dir, filename)

# Resolve the clip path for every training row (function is called once per row)
train_df['video_clip_path'] = train_df.apply(get_video_clip_path, axis='columns')

# Text Feature Extraction with BERT
class TextFeatureExtractor(nn.Module):
    """Encode text into one fixed-size vector per input with a pretrained transformer.

    The tokenizer and encoder are both loaded from ``model_name`` via the
    Hugging Face ``Auto*`` classes.
    """

    def __init__(self, model_name='bert-base-uncased'):
        super().__init__()
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)

    def forward(self, text):
        """Return the mean token embedding of ``text``.

        Args:
            text: A string or a list of strings; inputs are truncated to 50
                tokens and padded to the longest item in the batch.

        Returns:
            Tensor of shape ``(batch, hidden)`` holding, for each input, the
            average of the encoder's last hidden states over its real tokens.
            Padding positions are excluded through the attention mask, so a
            text's vector does not depend on what it is batched with.
        """
        inputs = self.tokenizer(text, return_tensors='pt', truncation=True, padding=True, max_length=50)
        outputs = self.model(**inputs)
        hidden = outputs.last_hidden_state

        # (batch, seq, 1) mask: 1 for real tokens, 0 for padding
        mask = inputs['attention_mask'].unsqueeze(-1).to(hidden.dtype)
        # clamp guards against a division by zero for a fully masked row
        token_counts = mask.sum(dim=1).clamp(min=1)
        return (hidden * mask).sum(dim=1) / token_counts

# Video Feature Extraction with ResNet
class VideoFeatureExtractor(nn.Module):
    """Turn a video file into one feature vector with a frozen, pretrained ResNet-18.

    Up to 16 leading frames are decoded, passed through the network (with its
    classification layer removed) and averaged.
    """

    def __init__(self):
        super().__init__()
        self.model = models.resnet18(pretrained=True)
        # Remember the feature width before dropping the classifier head
        self.feature_dim = self.model.fc.in_features
        self.model.fc = nn.Identity()  # Remove final layer to get feature vector
        # Inference mode: BatchNorm must use its stored statistics, not batch stats
        self.model.eval()
        # Built once instead of per frame; resize + ImageNet normalisation is the
        # input format the pretrained torchvision weights were trained with
        self.preprocess = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def forward(self, video_path):
        """Return the frame-averaged feature vector for ``video_path``.

        Args:
            video_path: Path of the video file to decode with OpenCV.

        Returns:
            1-D tensor of length ``self.feature_dim``. If no frame can be read
            (missing or corrupt file) an all-zero vector is returned instead of
            failing on an empty ``torch.stack``.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            # Sample the first 16 frames of the video
            while cap.isOpened() and len(frames) < 16:
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # OpenCV decodes as BGR
                frames.append(self.preprocess(Image.fromarray(frame)))
        finally:
            # Release the decoder even if conversion of a frame raised
            cap.release()

        if not frames:
            return torch.zeros(self.feature_dim)

        # Re-assert eval mode in case a parent module called .train()
        self.model.eval()
        with torch.no_grad():
            features = self.model(torch.stack(frames))
        return features.mean(dim=0)  # Average features across frames

# Custom Dataset
class MultimodalDataset(Dataset):
    """Dataset yielding one fused (text + video) feature vector and label per row.

    Args:
        df: DataFrame with 'Text', 'video_clip_path' and 'Emotion_Label' columns.
        text_extractor: Callable mapping a text string to a feature tensor.
        video_extractor: Callable mapping a video path to a feature tensor.
    """

    def __init__(self, df, text_extractor, video_extractor):
        self.df = df
        self.text_extractor = text_extractor
        self.video_extractor = video_extractor

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for row ``idx``.

        ``features`` is a 1-D tensor: the flattened text features followed by
        the flattened video features. ``label`` is a 0-d int64 tensor.
        """
        row = self.df.iloc[idx]

        # The extractors act as fixed feature generators here, so no autograd
        # graph is kept for the samples handed to the DataLoader
        with torch.no_grad():
            text_features = self.text_extractor(row['Text'])
            video_features = self.video_extractor(row['video_clip_path'])

        # Flatten first: the extractors may return different ranks
        # (e.g. (1, D_text) vs (D_video,)), which cannot be concatenated on dim=1
        features = torch.cat((text_features.reshape(-1), video_features.reshape(-1)), dim=0)
        # Explicit int64 class index  # Assuming 'Emotion_Label' is in the CSV
        label = torch.tensor(int(row['Emotion_Label']), dtype=torch.long)

        return features, label

# Define the multimodal model
class MultimodalFusionModel(nn.Module):
    """Two-layer MLP classifier over concatenated text and video features.

    Args:
        text_dim: Width of the text feature vector.
        video_dim: Width of the video feature vector.
        num_classes: Number of output logits.
    """

    def __init__(self, text_dim, video_dim, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(text_dim + video_dim, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, text_features, video_features=None):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            text_features: ``(batch, text_dim)`` tensor, or — when
                ``video_features`` is omitted — an already fused tensor whose
                trailing dimensions flatten to ``text_dim + video_dim``.
            video_features: Optional ``(batch, video_dim)`` tensor.
        """
        if video_features is None:
            # Pre-fused input, e.g. batches coming straight from the dataset
            features = text_features.flatten(start_dim=1)
        else:
            features = torch.cat((text_features, video_features), dim=1)
        x = torch.relu(self.fc1(features))
        return self.fc2(x)

# Initialize components
text_extractor = TextFeatureExtractor()
video_extractor = VideoFeatureExtractor()
dataset = MultimodalDataset(train_df, text_extractor, video_extractor)

# Training setup
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)
# Widths of the two feature vectors inside each fused sample; they must match
# what the extractors produce  # Adjust dimensions as needed
TEXT_DIM, VIDEO_DIM = 768, 512
model = MultimodalFusionModel(text_dim=TEXT_DIM, video_dim=VIDEO_DIM, num_classes=4)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

for epoch in range(10):  # Example epochs
    running_loss, num_batches = 0.0, 0
    for features, labels in dataloader:
        # The dataset yields text and video features fused into one tensor;
        # split them back because the model's forward takes them separately
        features = features.flatten(start_dim=1)
        text_feats, video_feats = features.split([TEXT_DIM, VIDEO_DIM], dim=1)

        optimizer.zero_grad()
        outputs = model(text_feats, video_feats)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        # Nothing to average (empty dataset); avoids referencing an undefined loss
        print(f"Epoch {epoch + 1}: dataloader yielded no batches")
        continue
    # Report the mean loss over the epoch rather than only the last batch
    print(f"Epoch {epoch + 1}, Loss: {running_loss / num_batches}")



In [None]:
# Show the (rows, columns) of train_df as a quick sanity check
train_df.shape

In [None]:
# Load the test metadata into `df`, the name the rest of this cell and the
# submission cell read from. Binding it to `train_df` would leave `df`
# undefined (NameError below) and overwrite the training frame with test rows.
df = pd.read_csv('/kaggle/input/PES-ml-hack-link1/test.csv', encoding='1252')
# Define path to video clips
video_dir = '/kaggle/input/PES-ml-hack-link2/test_videos'


# Function to get video file path from IDs
def get_video_clip_path(row, base_dir=None):
    """Return the clip path for one dataframe row.

    Args:
        row: Mapping-like row (e.g. a pandas Series) providing 'Dialogue_ID'
            and 'Utterance_ID'.
        base_dir: Directory containing the clips. When omitted, the
            module-level ``video_dir`` is used, so existing
            ``df.apply(get_video_clip_path, axis=1)`` calls keep working.
            Passing it explicitly avoids depending on whichever cell last
            reassigned ``video_dir``.

    Returns:
        ``<base_dir>/dia<Dialogue_ID>_utt<Utterance_ID>.mp4`` as a string.
    """
    if base_dir is None:
        base_dir = video_dir  # fall back to the notebook-wide setting
    filename = f"dia{row['Dialogue_ID']}_utt{row['Utterance_ID']}.mp4"
    return os.path.join(base_dir, filename)

# Resolve the clip path for every row of df (function is called once per row)
df['video_clip_path'] = df.apply(get_video_clip_path, axis='columns')

# Preview the ID columns next to the generated paths for the first rows
print(df.head()[['Dialogue_ID', 'Utterance_ID', 'video_clip_path']])

In [None]:
# Placeholder predictions: one "your_prediction" string per utterance in df
all_preds = ["your_prediction"] * len(df['Utterance_ID'])
all_ids = df["Sr No."]
submission_df = pd.DataFrame(data={'Sr No.': all_ids, 'Emotion': all_preds})

# Write the submission file without the dataframe index
submission_df.to_csv("submission.csv", index=False)