# Data Analysis

Approach:

    Preprocess Images: Convert them to grayscale, normalize, and subtract the baseline from the active image.
    Segment Active Regions: Use OpenCV or a deep learning model to detect the highlighted brain regions.
    Identify the Region: Use a pre-trained model (e.g., a convolutional neural network trained on brain MRI scans) or a simple region-based lookup to determine which part of the brain is active.
    Overlay Annotations: Draw text labels or bounding boxes on the image.

## Imports, Settings, & Function Definitions

In [1]:
from moviepy import VideoFileClip
import librosa
from IPython.display import Audio as IPDAudio
from IPython.display import display, Image, Video
from transformers import pipeline
from datasets import Audio as Datasets_Audio
from datasets import Dataset
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import pandas as pd
from sklearn.metrics import accuracy_score
import cv2
import time
from glob import glob
import os

In [2]:
def extract_audio_from_video(video_file_path, audio_output_path):
    """
    Extract the audio track of a video and write it to an audio file.

    Parameters
    ----------
    video_file_path : string
        This is the path to the mp4 video.
    audio_output_path : string
        This is the path to the output audio wav file.

    Raises
    ------
    ValueError
        If the video has no audio track.
    """
    video = VideoFileClip(video_file_path)
    try:
        audio = video.audio
        if audio is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError(f"No audio track found in {video_file_path!r}")
        audio.write_audiofile(audio_output_path)
    finally:
        # Always release the resources held by the clip, even on failure.
        video.close()

## Data Collection

In [3]:
# Total Video
# Full recording (mp4) and the matching wav path; the wav is presumably
# produced from the mp4 with extract_audio_from_video -- TODO confirm.
video_file_path = '../data/kernel-brain-data-jokes-lex-only-training-set.mp4'
audio_file_path = '../data/kernel-brain-data-jokes-lex-only-training-set.wav'

# Laughter Training Samples
# Clip and audio stored under ../data/laughter-only/.
video_file_path_laughter_train = '../data/laughter-only/kernel-brain-data-jokes-lex-only-train-laughter-only.mp4'
audio_file_path_laughter_train = '../data/laughter-only/kernel-brain-data-jokes-lex-only-train-laughter-only.wav'

# Non-Laughter Training Samples
# Clip and audio stored under ../data/non-laughter/.
video_file_path_non_laughter_train = '../data/non-laughter/kernel-brain-data-jokes-lex-only-training-set-non-laughter.mp4'
audio_file_path_non_laughter_train = '../data/non-laughter/kernel-brain-data-jokes-lex-only-training-set-non-laughter.wav'

In [None]:
# Load the full recording's waveform and sampling rate, then show an
# IPython audio widget for it as the cell output.
audio_data, sample_rate = librosa.load(audio_file_path)
IPDAudio(data=audio_data, rate=sample_rate)

In [None]:
# Display the full video as the cell output.
Video(video_file_path)

### Video Preprocessing

In [None]:
# Open the full video; the following cells read its properties and frames
# through this capture object.
cap = cv2.VideoCapture(video_file_path)

In [None]:
# Get video properties
fps_exact = cap.get(cv2.CAP_PROP_FPS)  # Frames per second as reported (may be fractional)
fps = int(fps_exact)  # Whole-number frame rate, kept for existing uses of `fps`
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Divide by the untruncated frame rate so the duration is not overestimated
# (e.g. 29.97 fps truncated to 29), and fall back to 0 when no frame rate is
# reported instead of raising ZeroDivisionError.
duration = int(total_frames / fps_exact) if fps_exact > 0 else 0  # Duration in seconds

In [None]:
# Extract one frame per second
frames = []
seconds = tqdm(range(duration), desc="Extracting one frame per second...", ascii="░▒▓█")
for sec in seconds:
    # Seek to the start of this second (the position is given in milliseconds).
    cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000)
    ret, frame = cap.read()
    if not ret:
        # No frame could be read at this timestamp; stop sampling.
        break
    # Convert BGR to RGB for display
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frames.append(frame)

# Release the video file
cap.release()


##### Insufficient memory to extract all frames

In [None]:
# # Insufficient memory to extract all frames
# all_frames = []
# progress_bar = tqdm(total=cap.get(cv2.CAP_PROP_FRAME_COUNT), desc="Processing Frames", unit="frame")
# while True:
#     ret, frame = cap.read()
#     if not ret:
#         break
#     # all_frames.append(frame)
#     progress_bar.update(1)
# cap.release()
# cv2.destroyAllWindows()
# progress_bar.close()

In [None]:
# Position of the frame currently shown by the viewer cell below.
index = 0

In [None]:
# Step to the next sampled frame (wrapping around after the last one) and
# display it. Re-run this cell to page through the frames.
index = (index + 1) % len(frames)

fig, ax = plt.subplots(figsize=(12, 8))
ax.imshow(frames[index])
ax.axis('off')
fig.tight_layout()
plt.show()
index

## Laughter Samples

In [4]:
# Open the laughter-only training clip.
cap = cv2.VideoCapture(video_file_path_laughter_train)

# Get video properties
fps_exact = cap.get(cv2.CAP_PROP_FPS)  # Frames per second as reported (may be fractional)
fps = int(fps_exact)  # Whole-number frame rate, kept for existing uses of `fps`
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Divide by the untruncated frame rate so the duration is not overestimated,
# and fall back to 0 when no frame rate is reported instead of raising
# ZeroDivisionError.
duration = int(total_frames / fps_exact) if fps_exact > 0 else 0  # Duration in seconds

In [5]:
# Show the frame count reported for the laughter clip.
total_frames

279

In [6]:
# Read every frame of the laughter clip into memory (frames stay in BGR order
# as returned by cap.read()).
laughter_all_frames = []
try:
    # The frame count comes back as a float; cast it so the bar reads "N/N"
    # rather than "N/N.0". The `with` block closes the bar even on error.
    with tqdm(total=int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), desc="Processing Frames", unit="frame") as progress_bar:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            laughter_all_frames.append(frame)
            progress_bar.update(1)
finally:
    # Release the capture even if reading fails part-way through.
    cap.release()
    cv2.destroyAllWindows()

Processing Frames: 100%|██████████| 279/279.0 [00:01<00:00, 190.24frame/s]


In [None]:
# Load the laughter-only audio and show an IPython audio widget for it.
audio_data, sample_rate = librosa.load(audio_file_path_laughter_train)
IPDAudio(data=audio_data, rate=sample_rate)

## Non-Laughter Samples

In [7]:
# Show the path of the non-laughter training clip.
video_file_path_non_laughter_train

'../data/non-laughter/kernel-brain-data-jokes-lex-only-training-set-non-laughter.mp4'

In [None]:
# Load the non-laughter audio and show an IPython audio widget for it.
audio_data, sample_rate = librosa.load(audio_file_path_non_laughter_train)
IPDAudio(data=audio_data, rate=sample_rate)

In [8]:
# Open the non-laughter training clip.
cap = cv2.VideoCapture(video_file_path_non_laughter_train)

# Get video properties
fps_exact = cap.get(cv2.CAP_PROP_FPS)  # Frames per second as reported (may be fractional)
fps = int(fps_exact)  # Whole-number frame rate, kept for existing uses of `fps`
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Divide by the untruncated frame rate so the duration is not overestimated,
# and fall back to 0 when no frame rate is reported instead of raising
# ZeroDivisionError.
duration = int(total_frames / fps_exact) if fps_exact > 0 else 0  # Duration in seconds
total_frames

339

In [9]:
# Read every frame of the non-laughter clip into memory (frames stay in BGR
# order as returned by cap.read()).
non_laughter_all_frames = []
try:
    # The frame count comes back as a float; cast it so the bar reads "N/N"
    # rather than "N/N.0". The `with` block closes the bar even on error.
    with tqdm(total=int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), desc="Processing Frames", unit="frame") as progress_bar:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            non_laughter_all_frames.append(frame)
            progress_bar.update(1)
finally:
    # Release the capture even if reading fails part-way through.
    cap.release()
    cv2.destroyAllWindows()

Processing Frames: 100%|██████████| 339/339.0 [00:02<00:00, 127.94frame/s]


In [None]:
# Number of laughter frames held in memory.
# NOTE(review): the recorded output for this cell (339) does not match the
# 279 frames reported for the laughter clip above -- it looks stale; re-run
# to confirm.
len(laughter_all_frames)

339

In [15]:
# Number of non-laughter frames held in memory.
len(non_laughter_all_frames)

339

## Training a Mixture of Experts Model Example

In [25]:
import torch
import torch.nn as nn
import torch.nn.functional as F


# Expert Networks (one for each modality)
class AudioExpert(nn.Module):
    """
    Expert network for the audio modality: a single linear projection.

    Parameters
    ----------
    input_dim : int, optional
        Size of the incoming audio feature vector (default 256).
    output_dim : int, optional
        Size of the vector produced by the expert (default 256).
    """

    def __init__(self, input_dim=256, output_dim=256):
        super().__init__()
        self.fc = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        """Project `x` (last dimension `input_dim`) to `output_dim` features."""
        return self.fc(x)


class FacialEmotionExpert(nn.Module):
    """
    Expert network for the facial-emotion modality: a single linear projection.

    Parameters
    ----------
    input_dim : int, optional
        Size of the incoming facial-emotion feature vector (default 256).
    output_dim : int, optional
        Size of the vector produced by the expert (default 256).
    """

    def __init__(self, input_dim=256, output_dim=256):
        super().__init__()
        self.fc = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        """Project `x` (last dimension `input_dim`) to `output_dim` features."""
        return self.fc(x)


class BodyPoseExpert(nn.Module):
    """
    Expert network for the body-pose modality: a single linear projection.

    Parameters
    ----------
    input_dim : int, optional
        Size of the incoming body-pose feature vector (default 256).
    output_dim : int, optional
        Size of the vector produced by the expert (default 256).
    """

    def __init__(self, input_dim=256, output_dim=256):
        super().__init__()
        self.fc = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        """Project `x` (last dimension `input_dim`) to `output_dim` features."""
        return self.fc(x)


class NeuralDataExpert(nn.Module):
    """
    Expert network for the neural-data modality: a single linear projection.

    Parameters
    ----------
    input_dim : int, optional
        Size of the incoming neural-data feature vector (default 256).
    output_dim : int, optional
        Size of the vector produced by the expert (default 256).
    """

    def __init__(self, input_dim=256, output_dim=256):
        super().__init__()
        self.fc = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        """Project `x` (last dimension `input_dim`) to `output_dim` features."""
        return self.fc(x)


# Gating Network (Decides how much weight each expert gets)
class GatingNetwork(nn.Module):
    """
    Gating network that decides how much weight each expert gets.

    Parameters
    ----------
    input_dim : int, optional
        Size of the concatenated input vector (default 1024, i.e. four
        256-dimensional modalities).
    num_experts : int, optional
        Number of experts to weight (default 4, one per modality).
    """

    def __init__(self, input_dim=1024, num_experts=4):
        super().__init__()
        self.fc = nn.Linear(input_dim, num_experts)

    def forward(self, x):
        """
        Return one non-negative weight per expert; the weights sum to 1.

        The softmax runs over the last dimension (the expert axis). For the
        usual (batch, input_dim) input this is the same as `dim=1`, and it
        also accepts an unbatched 1-D input.
        """
        return F.softmax(self.fc(x), dim=-1)


# MoE Model that uses Gating Network and Expert Networks
class MixtureOfExperts(nn.Module):
    """
    Mixture-of-experts binary classifier over four modalities.

    Each modality (audio, facial emotion, body pose, neural data) has its own
    expert. A gating network looks at all four inputs concatenated and
    produces one weight per expert; the gate-weighted sum of the expert
    outputs is mapped to a single laughter probability.
    """

    def __init__(self):
        super().__init__()
        # One expert per modality.
        self.audio_expert = AudioExpert()
        self.facial_emotion_expert = FacialEmotionExpert()
        self.body_pose_expert = BodyPoseExpert()
        self.neural_data_expert = NeuralDataExpert()
        # Gate that weights the experts from the concatenated inputs.
        self.gating_network = GatingNetwork()
        # Final prediction layer for binary output (laughter: 1, not laughter: 0)
        self.final_fc = nn.Linear(256, 1)

    def forward(self, audio, facial_emotion, body_pose, neural_data):
        """
        Return the laughter probability for each sample in the batch.

        Each argument is a (batch, 256) tensor of features for one modality;
        the result is a (batch, 1) tensor of sigmoid probabilities.
        """
        modality_inputs = (audio, facial_emotion, body_pose, neural_data)
        experts = (
            self.audio_expert,
            self.facial_emotion_expert,
            self.body_pose_expert,
            self.neural_data_expert,
        )

        # Gate weights (importance of each modality) from the joined inputs.
        gate_weights = self.gating_network(torch.cat(modality_inputs, dim=1))

        # Accumulate the expert outputs, each scaled by its gate weight.
        blended = None
        for slot, (expert, features) in enumerate(zip(experts, modality_inputs)):
            contribution = gate_weights[:, slot].unsqueeze(1) * expert(features)
            blended = contribution if blended is None else blended + contribution

        # Sigmoid turns the single logit into a probability.
        return torch.sigmoid(self.final_fc(blended))


In [26]:
# Example usage
model = MixtureOfExperts()

# One random 256-dimensional example per modality, drawn in the order
# audio, facial emotion, body pose, neural data.
audio_data, facial_emotion_data, body_pose_data, neural_data = (
    torch.randn(1, 256) for _ in range(4)
)

# Make a prediction
prediction = model(audio_data, facial_emotion_data, body_pose_data, neural_data)
print(f"Predicted output: {prediction}")


Predicted output: tensor([[0.5455]], grad_fn=<SigmoidBackward0>)


In [None]:
# Defining the dataset class
from torch.utils.data import Dataset, DataLoader

class LaughterDataset(Dataset):
    """
    Dataset of aligned per-modality feature vectors and laughter labels.

    Parameters
    ----------
    audio_features, facial_features, body_features, neural_features : sequence
        Indexable collections (e.g. lists or NumPy arrays) holding one
        feature vector per sample.
    labels : sequence
        One label per sample (1 for laughter, 0 for no laughter).

    Raises
    ------
    ValueError
        If any feature collection has a different length than `labels`.
    """

    def __init__(self, audio_features, facial_features, body_features, neural_features, labels):
        # Check alignment up front: a mismatch would otherwise only surface
        # as an IndexError (or silently unused samples) in the middle of an epoch.
        expected = len(labels)
        modalities = {
            "audio_features": audio_features,
            "facial_features": facial_features,
            "body_features": body_features,
            "neural_features": neural_features,
        }
        for name, values in modalities.items():
            if len(values) != expected:
                raise ValueError(
                    f"{name} has {len(values)} samples but labels has {expected}"
                )

        self.audio_features = audio_features
        self.facial_features = facial_features
        self.body_features = body_features
        self.neural_features = neural_features
        self.labels = labels

    def __len__(self):
        """Return the number of samples (the number of labels)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """
        Return sample `idx` as float32 tensors in the order
        (audio, facial, body, neural, label).
        """
        return (
            torch.tensor(self.audio_features[idx], dtype=torch.float32),
            torch.tensor(self.facial_features[idx], dtype=torch.float32),
            torch.tensor(self.body_features[idx], dtype=torch.float32),
            torch.tensor(self.neural_features[idx], dtype=torch.float32),
            torch.tensor(self.labels[idx], dtype=torch.float32),
        )


In [None]:
# Preparing training and validation data
import numpy as np

# Load or generate random data (replace with actual data loading)
num_samples = 1000  # Adjust based on dataset size

# One (num_samples, 256) random matrix per modality, drawn in the order
# audio, facial, body, neural.
audio_features, facial_features, body_features, neural_features = (
    np.random.randn(num_samples, 256) for _ in range(4)
)
labels = np.random.randint(0, 2, size=(num_samples, 1))  # Binary labels

# 80/20 split into training and validation sets
train_size = int(0.8 * num_samples)
val_size = num_samples - train_size

dataset = LaughterDataset(audio_features, facial_features, body_features, neural_features, labels)
split_lengths = [train_size, val_size]
train_dataset, val_dataset = torch.utils.data.random_split(dataset, split_lengths)

# Create DataLoaders: shuffle only the training batches.
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)


In [None]:
# Defining the training loop
import torch.optim as optim

# Initialize model, loss function, and optimizer
# Use the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MixtureOfExperts().to(device)
# BCELoss takes probabilities; MixtureOfExperts.forward already applies a sigmoid.
criterion = nn.BCELoss()  # Binary Cross-Entropy Loss for classification
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training function
def _mean_or_nan(total, count):
    """Return total / count, or NaN when count is zero (e.g. an empty loader)."""
    return total / count if count else float("nan")


def train_model(model, train_loader, val_loader, epochs=10, loss_fn=None, opt=None, run_device=None):
    """
    Train `model` and report loss and accuracy on the validation set each epoch.

    Parameters
    ----------
    model : nn.Module
        Model called as ``model(audio, facial, body, neural)`` that returns
        probabilities comparable against the labels.
    train_loader, val_loader : iterable
        Yield batches of ``(audio, facial, body, neural, label)`` tensors.
    epochs : int, optional
        Number of passes over `train_loader` (default 10).
    loss_fn : callable, optional
        Loss function; defaults to the module-level `criterion`.
    opt : torch.optim.Optimizer, optional
        Optimizer; defaults to the module-level `optimizer`.
    run_device : torch.device, optional
        Device batches are moved to; defaults to the module-level `device`.

    Returns
    -------
    list of dict
        One entry per epoch with keys ``"train_loss"``, ``"val_loss"`` and
        ``"accuracy"``. Losses are averaged per batch. A metric is NaN when
        its loader yielded no batches.
    """
    # Fall back to the notebook-level objects so existing calls keep working.
    loss_fn = criterion if loss_fn is None else loss_fn
    opt = optimizer if opt is None else opt
    run_device = device if run_device is None else run_device

    history = []
    for epoch in range(epochs):
        # Training phase
        model.train()
        total_loss = 0.0
        train_batches = 0

        for batch in train_loader:
            audio, facial, body, neural, label = (t.to(run_device) for t in batch)

            opt.zero_grad()
            output = model(audio, facial, body, neural)
            loss = loss_fn(output, label)  # Compute loss
            loss.backward()
            opt.step()

            total_loss += loss.item()
            train_batches += 1

        avg_train_loss = _mean_or_nan(total_loss, train_batches)

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_batches = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for batch in val_loader:
                audio, facial, body, neural, label = (t.to(run_device) for t in batch)

                output = model(audio, facial, body, neural)
                val_loss += loss_fn(output, label).item()
                val_batches += 1
                predicted = (output > 0.5).float()  # Convert probability to class (0 or 1)
                correct += (predicted == label).sum().item()
                total += label.size(0)

        # Guarded division: an empty loader reports NaN instead of raising
        # ZeroDivisionError.
        avg_val_loss = _mean_or_nan(val_loss, val_batches)
        accuracy = _mean_or_nan(correct, total)

        print(
            f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}, "
            f"Val Loss: {avg_val_loss:.4f}, Accuracy: {accuracy:.4f}"
        )
        history.append(
            {
                "train_loss": avg_train_loss,
                "val_loss": avg_val_loss,
                "accuracy": accuracy,
            }
        )

    return history

# Train for 20 epochs
# Prints train loss, validation loss, and validation accuracy after each epoch.
train_model(model, train_loader, val_loader, epochs=20)


In [None]:
# Saving the model
# torch.save(model.state_dict(), "moe_laughter_model.pth")

# Loading the model
# model = MixtureOfExperts()
# model.load_state_dict(torch.load("moe_laughter_model.pth"))
# model.to(device)
# model.eval()


In [None]:
# Making Predictions
# Draw one random 256-dimensional input per modality as a stand-in example,
# in the order audio, facial, body, neural.
new_audio, new_facial, new_body, new_neural = (
    torch.randn(1, 256).to(device) for _ in range(4)
)

# Get model prediction without tracking gradients.
with torch.no_grad():
    prediction = model(new_audio, new_facial, new_body, new_neural)

# Threshold the probability at 0.5 to get the class label.
is_laughter = prediction > 0.5
predicted_label = is_laughter.float().item()

print(f"Predicted label: {predicted_label}")  # 1 for laughter, 0 for no laughter


## Data Representation

    Audio: Convert audio signals into spectrograms or MFCCs (Mel-Frequency Cepstral Coefficients).
    Facial Emotion: Use a face-analysis library (like OpenCV or Dlib) to detect faces and extract facial landmarks, then classify emotional states from those features.
    Body Pose: Use a pose estimation model (like OpenPose or MediaPipe) to extract keypoints of the body.
    Neural Data: Normalize brain activity (e.g., fMRI, EEG) to a standard scale.

## Model Architecture

The core idea is to create a multi-encoder architecture with modality-specific subnetworks and a fusion layer. Here's how to structure it:
Step 1: Modality-Specific Encoders

Create separate neural networks (like CNNs, RNNs, or Transformers) for each modality. Here's how:

    Audio Encoder: Use a 1D CNN or RNN to process audio features like spectrograms.
    Facial Emotion Encoder: Use a CNN (ResNet, VGG) to process facial emotion features extracted from images.
    Body Pose Encoder: Use a CNN or RNN to process body pose keypoints.
    Neural Data Encoder: Use a simple MLP (Multi-layer Perceptron) or RNN to process neural data like EEG or fMRI.