1. Video Pre-processing:
    - Extract Frames:
        - Use a video processing tool like OpenCV to extract frames from the .avi video files at a consistent frame rate (e.g., 30 FPS).
        - Synchronize frame extraction with the timestamps provided in the time labels (the per-dialog .lab files, or the utterance summary in the evaluation folder). This ensures each utterance is processed with the correct time alignment.
    - Tools: OpenCV (Python), FFmpeg

In [6]:
import os

def list_files_in_folder(folder_path):
    """Return the names of the regular files directly inside ``folder_path``.

    Sub-directories are left out.  Names are returned without the folder
    prefix and in the order ``os.listdir`` reports them.
    """
    regular_files = []
    for entry in os.listdir(folder_path):
        # isfile() needs the full path, not just the bare entry name
        if os.path.isfile(os.path.join(folder_path, entry)):
            regular_files.append(entry)
    return regular_files

# Example usage: list the Session1 DivX dialog videos.
# The path is relative, so it resolves against the current working directory.
# The listing is not filtered by extension (the captured output below also
# shows '.DS_Store' and 'Thumbs.db'), so callers must skip non-.avi entries.
folder_path = '../data/raw/iemocap/IEMOCAP_full_release/Session1/dialog/avi/DivX/'
files = list_files_in_folder(folder_path)
print(files)

['Ses01F_impro02.avi', 'Ses01M_impro06.avi', '.DS_Store', 'Ses01F_script01_2.avi', 'Ses01M_impro03.avi', 'Ses01M_script02_1.avi', 'Ses01F_impro01.avi', 'Ses01F_impro03.avi', 'Ses01F_script02_1.avi', 'Ses01M_script01_1.avi', 'Thumbs.db', 'Ses01M_script03_1.avi', 'Ses01F_script01_3.avi', 'Ses01F_impro07.avi', 'Ses01F_script01_1.avi', 'Ses01F_impro05.avi', 'Ses01M_impro04.avi', 'Ses01F_impro04.avi', 'Ses01M_script01_3.avi', 'Ses01M_script03_2.avi', 'Ses01M_script02_2.avi', 'Ses01M_impro05.avi', 'Ses01M_impro07.avi', 'Ses01F_script03_1.avi', 'Ses01F_script02_2.avi', 'Ses01F_script03_2.avi', 'Ses01M_impro02.avi', 'Ses01M_script01_2.avi', 'Ses01F_impro06.avi', 'Ses01M_impro01.avi']


In [12]:
import cv2
from tqdm import tqdm

def extract_video_segments(video_path, lab_file, output_folder):
    """Cut one dialog video into one clip per utterance listed in a lab file.

    Every non-blank line of ``lab_file`` must start with three
    whitespace-separated fields: start time, end time (both in seconds) and
    the utterance name.  Times are converted to frame indices using the
    video's own frame rate, and frames ``start..end`` (inclusive) are written
    to ``<output_folder>/<utterance_name>.avi`` with the XVID codec.

    Frames are streamed straight into the writer, so only one frame is held
    in memory at a time.

    Args:
        video_path: Path of the source video.
        lab_file: Path of the utterance timing file.
        output_folder: Directory that receives the clips (must already exist).

    Raises:
        FileNotFoundError: If ``video_path`` or ``lab_file`` does not exist.
        ValueError: If the video cannot be opened, reports no usable frame
            rate, a lab line has fewer than three fields or non-numeric
            times, or no frame could be read for an utterance.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    # Read the timings before opening the capture, so a missing or unreadable
    # lab file cannot leave an open capture handle behind.
    with open(lab_file, 'r') as f:
        lines = [line for line in f if line.strip()]  # ignore blank lines

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            # Without a frame rate the second -> frame conversion is meaningless
            raise ValueError(f"Video reports no usable frame rate: {video_path}")
        fourcc = cv2.VideoWriter_fourcc(*'XVID')

        for line in lines:
            # Parse start and end time, and utterance name
            fields = line.split()
            if len(fields) < 3:
                raise ValueError(f"Malformed line in {lab_file}: {line.strip()!r}")
            start_time, end_time, utterance_name = fields[:3]
            start_frame = int(float(start_time) * fps)
            end_frame = int(float(end_time) * fps)
            frames_wanted = end_frame - start_frame + 1  # end frame is inclusive

            # Set video to start frame
            # NOTE(review): frame-accurate seeking depends on the codec/backend — confirm for DivX input.
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

            # The first frame supplies the output size; it must exist
            ret, frame = cap.read() if frames_wanted > 0 else (False, None)
            if not ret:
                raise ValueError(f"No frames extracted for utterance {utterance_name}")

            height, width = frame.shape[:2]
            out = cv2.VideoWriter(
                os.path.join(output_folder, f'{utterance_name}.avi'),
                fourcc, fps, (width, height),
            )
            try:
                out.write(frame)
                for _ in range(frames_wanted - 1):
                    ret, frame = cap.read()
                    if not ret:
                        break  # video ended before the labelled end time
                    out.write(frame)
            finally:
                out.release()
    finally:
        cap.release()

def main():
    """Split the Session2-Session5 dialog videos into per-utterance clips.

    For every ``.avi`` file in a session's ``dialog/avi/DivX/`` folder, the
    ``.lab`` file with the same base name is taken from that session's
    ``dialog/lab/Ses0<n>_F/`` folder and both are handed to
    ``extract_video_segments``; the clips go to
    ``../data/interim/iemocap/Session<n>``.  The first failing file in a
    session is reported and the rest of that session is skipped.

    NOTE(review): only the ``_F`` lab folder is read — confirm that this
    covers all the utterances you need.
    """
    raw_root = "../data/raw/iemocap/IEMOCAP_full_release"

    for session in range(2, 6):
        video_dir = f"{raw_root}/Session{session}/dialog/avi/DivX/"
        lab_dir = f"{raw_root}/Session{session}/dialog/lab/Ses0{session}_F/"
        output_folder = f"../data/interim/iemocap/Session{session}"

        for file in tqdm(list_files_in_folder(video_dir)):
            # Skip anything that is not a video (e.g. OS metadata files)
            if file.rsplit('.', 1)[-1] != 'avi':
                continue
            video_path = f"{video_dir}{file}"
            lab_file = f"{lab_dir}{file.partition('.')[0]}.lab"
            os.makedirs(output_folder, exist_ok=True)
            try:
                extract_video_segments(video_path, lab_file, output_folder)
            except Exception as e:
                print(f"Error processing {file}: {e}")
                print(f"Error occured at {video_path}, {lab_file}")
                break


if __name__ == "__main__":
    main()

100%|██████████| 31/31 [06:24<00:00, 12.41s/it]
100%|██████████| 32/32 [07:56<00:00, 14.89s/it]
100%|██████████| 31/31 [07:03<00:00, 13.65s/it]
100%|██████████| 32/32 [07:04<00:00, 13.26s/it]


2. Face Detection and Alignment
    - Detect Face and Facial Landmarks:
        - For each frame, apply a face detection model (e.g., Dlib or OpenCV's face detector) to locate the face.
        - Use facial landmark detectors to identify key facial features (e.g., eyes, mouth, nose, jawline).
        - Align the face to a canonical position (rotated or scaled so that the eyes are horizontally aligned) for consistent feature extraction.
    - Tools: Dlib, OpenCV

In [None]:
import dlib
from imutils import face_utils

# Load pre-trained face detector and shape predictor models.
# Both objects are module-level and are used by detect_landmarks() below.
# The landmark model path is relative, so the .dat file must be in the
# current working directory.
# NOTE(review): the file name suggests dlib's 68-point landmark model — confirm it has been downloaded.
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor('shape_predictor_68_face_landmarks.dat')

def detect_landmarks(image):
    """Return one landmark array per face detected in a BGR image.

    The image is converted to grayscale, passed to the module-level
    ``detector``, and every detection is run through ``predictor``.  The
    result is a list (empty when no face is found) of arrays of (x, y)
    coordinates as produced by ``face_utils.shape_to_np``.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return [
        face_utils.shape_to_np(predictor(gray, face))  # shape -> (x, y) coordinates
        for face in detector(gray)
    ]

# Process all frames in the utterance
def process_utterance_frames(frames):
    """Detect facial landmarks on every frame and print them.

    ``frames`` is an iterable of images accepted by ``detect_landmarks``.
    Nothing is returned; printing is for demonstration only, store the
    landmarks as needed.
    """
    for current_frame in frames:
        # Each result holds the (x, y) landmark coordinates of the detected faces
        print(detect_landmarks(current_frame))

def extract_frames(video_path):
    """Read every frame of a video into memory.

    Args:
        video_path: Path of the video file.

    Returns:
        A list with the decoded frames in playback order (empty if nothing
        could be decoded).

    Raises:
        FileNotFoundError: If ``video_path`` does not exist.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        # Read until the decoder reports end-of-stream rather than looping
        # CAP_PROP_FRAME_COUNT times: that property comes from container
        # metadata and can be missing or inexact, which would silently
        # truncate the result.
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        # Release the handle even if decoding raises
        cap.release()

    return frames

def main():
    """Run landmark detection over every extracted utterance clip.

    Walks ``../data/interim/iemocap/Session1`` .. ``Session5``, loads every
    ``.avi`` file with ``extract_frames`` and passes the frames to
    ``process_utterance_frames``.  The first failing file in a session is
    reported and the rest of that session is skipped.
    """
    # "Session" is capitalised to match the folders written by the
    # segmentation step; a lowercase name would not be found on a
    # case-sensitive filesystem.
    interim_avi_list = [
        f"../data/interim/iemocap/Session{i}/" for i in range(1, 6)
    ]

    for video_dir in interim_avi_list:
        files = list_files_in_folder(video_dir)
        for file in tqdm(files):
            if file.split('.')[-1] != 'avi':
                continue
            video_path = video_dir + file
            try:
                # Loading is inside the try as well, so an unreadable clip is
                # reported the same way as a processing failure.
                frames = extract_frames(video_path)
                process_utterance_frames(frames)
            except Exception as e:
                print(f"Error processing {file}: {e}")
                # Only the video path is reported: no lab file is involved here
                print(f"Error occured at {video_path}")
                break


if __name__ == "__main__":
    main()

3. High-Level Feature Extraction (Facial Action Units)

    - Facial Action Unit (AU) Extraction:
        - Use a pre-trained model like OpenFace or Py-Feat to extract Facial Action Units (AUs) for each frame. AUs correspond to specific facial muscle movements and represent high-level features.
        - Extract AU intensity values, which can help interpret emotions and map them to higher-level categories like "angry" or "happy."

In [None]:
import openface

# Initialize OpenFace feature extractor (module-level, used by extract_action_units below).
# NOTE(review): this cell has not been executed (In [None]); confirm that the
# installed `openface` package really exposes `Face()` before relying on it.
face_model = openface.Face()

def extract_action_units(image):
    """Run the module-level ``face_model`` on one BGR frame and return its output.

    The frame is converted from BGR to RGB before being passed to
    ``face_model.detectFace(..., with_landmarks=True)``.
    NOTE(review): the result is presumably the FAU intensities and presence
    values — confirm against the library that provides ``face_model``.
    """
    # Detect FAUs on the RGB version of the frame
    return face_model.detectFace(
        cv2.cvtColor(image, cv2.COLOR_BGR2RGB),
        with_landmarks=True,
    )

# Process frames to extract FAUs
def extract_faus_for_frames(frames):
    """Return a list with the ``extract_action_units`` result of every frame, in order."""
    return [extract_action_units(single_frame) for single_frame in frames]

# Usage
# NOTE: `[...]` is a placeholder (a list holding the Ellipsis object); replace
# it with real BGR frames before running this cell, otherwise
# extract_action_units is called on Ellipsis.
frames = [...]  # Assume frames are already extracted
faus_per_frame = extract_faus_for_frames(frames)
print(faus_per_frame)


4. Low-Level Feature Extraction (Deep Learning Features)

    - Low-Level Feature Extraction:
        - Use deep learning-based models like CNNs (Convolutional Neural Networks) or ResNet to extract low-level pixel-based features directly from the raw face images.
        - These features capture textures, gradients, and other fine-grained details useful for deep learning-based emotion recognition models.

    - Tools: Pre-trained CNNs (ResNet, VGG), custom CNN models

In [None]:
import torch
import torch.nn as nn
from torchvision import models, transforms
from PIL import Image

# Load the pretrained VGG16 model from PyTorch (module-level, shared by extract_features below)
model = models.vgg16(pretrained=True)
# Replace the whole `classifier` attribute with an identity module, so the
# forward pass returns whatever the network computes before that attribute.
# NOTE(review): for torchvision's VGG this is presumably the flattened pooled
# convolutional output, not a fully connected embedding — confirm the size.
model.classifier = nn.Identity()  # Remove the classification layer to get features

# Define image transformation: resize, centre-crop to 224, convert to tensor, normalise per channel
# NOTE(review): the mean/std values are presumably the ImageNet statistics the pretrained weights expect — confirm.
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def extract_features(image):
    """Return the flattened output of the module-level ``model`` for one BGR frame.

    The frame is converted to an RGB PIL image, passed through ``preprocess``
    and given a leading batch dimension; inference runs in eval mode with
    gradients disabled.  The result is a 1-D NumPy array.
    """
    rgb_pixels = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    batch = preprocess(Image.fromarray(rgb_pixels)).unsqueeze(0)  # add batch dimension

    model.eval()
    with torch.no_grad():
        output = model(batch)
    return output.numpy().flatten()

# Process frames to extract low-level features
def extract_cnn_features_for_frames(frames):
    """Return a list with the ``extract_features`` vector of every frame, in order."""
    return [extract_features(single_frame) for single_frame in frames]

# Usage
# NOTE: `[...]` is a placeholder (a list holding the Ellipsis object); replace
# it with real BGR frames before running this cell.
frames = [...]  # Assume frames are already extracted
low_level_features = extract_cnn_features_for_frames(frames)
print(low_level_features)


5. Synchronization with Time Labels

    - Map Features to Time Labels:
        - For each extracted feature (both AU and CNN features), map them back to the time intervals provided in the dataset’s utterance summaries.
        - This ensures that the features correspond to the correct utterance or time segment within the video.

    - Tools: Pandas, Numpy

In [None]:
def load_emotion_labels(evaluation_file):
    """Parse per-utterance emotion annotations from an evaluation file.

    Two line layouts are recognised:

    * whitespace-separated
      ``start end utterance label valence activation dominance``
    * bracketed
      ``[start - end] utterance label [valence, activation, dominance]``
      (NOTE(review): believed to be the layout of the IEMOCAP per-dialog
      summary lines — confirm against your copy of the dataset)

    Blank lines and lines matching neither layout (headers, per-annotator
    detail lines, ...) are skipped instead of raising.

    Args:
        evaluation_file: Path of the text file to read.

    Returns:
        Dict mapping utterance name to a dict with the keys
        ``'categorical'``, ``'valence'``, ``'activation'`` and
        ``'dominance'``.  All values are kept as the strings found in the
        file.
    """
    import re  # local import: this cell has no import section of its own

    # "[...]  <name>  <label>  [v, a, d]" — the time span itself is not needed
    bracketed = re.compile(r'^\[[^\]]*\]\s+(\S+)\s+(\S+)\s+\[([^\]]*)\]')

    emotion_labels = {}
    with open(evaluation_file, 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue

            match = bracketed.match(line)
            if match:
                utterance_name, ground_truth = match.group(1), match.group(2)
                dimensional = [value.strip() for value in match.group(3).split(',')]
            else:
                fields = line.split()
                if len(fields) < 7:
                    continue
                try:
                    # A real record starts with two numeric time stamps
                    float(fields[0])
                    float(fields[1])
                except ValueError:
                    continue
                utterance_name, ground_truth = fields[2], fields[3]
                dimensional = fields[4:]

            if len(dimensional) < 3:
                continue
            emotion_labels[utterance_name] = {
                'categorical': ground_truth,
                'valence': dimensional[0],
                'activation': dimensional[1],
                'dominance': dimensional[2]
            }
    return emotion_labels

# Usage
# NOTE: 'SessionX' is a placeholder path; point it at a real evaluation file before running.
evaluation_file = 'SessionX/dialog/Evaluation/Ses01F_impro01_eval.txt'
emotion_labels = load_emotion_labels(evaluation_file)
print(emotion_labels)


6. Aggregation and Representation

    - Aggregate Features:
        - For high-level features (AUs), you can either average the AU intensity values over each time interval or select key frames.
        - For low-level features (CNN-extracted), consider methods like temporal pooling or sequence models (LSTM, GRU) to capture temporal dynamics.

    - Tools: Scikit-learn, TensorFlow

In [None]:
from sklearn.preprocessing import StandardScaler

def normalize_features(features):
    """Fit a fresh ``StandardScaler`` on ``features`` and return the transformed data.

    A new scaler is created on every call, so the scaling is derived only
    from the data passed in.
    """
    return StandardScaler().fit_transform(features)

def aggregate_features_per_utterance(features_per_frame):
    """Collapse per-frame features into one vector by averaging over frames.

    Args:
        features_per_frame: Sequence of equally sized per-frame feature
            vectors (anything ``np.mean`` accepts); axis 0 is the frame axis.

    Returns:
        The element-wise mean across axis 0.
    """
    # Imported here because numpy is not imported anywhere before this cell
    import numpy as np

    # Example: Taking the mean of features across all frames for each utterance
    return np.mean(features_per_frame, axis=0)

# Usage
# NOTE: `[...]` is a placeholder; replace it with a real 2-D (frames x features) collection.
# NOTE(review): normalize_features fits its scaler on this one utterance's
# frames, and the result is then averaged over those same frames. Standardising
# presumably centres each column, so the averaged vector would be ~0 for every
# feature — confirm, and consider fitting the scaler across utterances instead.
features_per_frame = [...]  # Low-level or FAU features per frame
normalized_features = normalize_features(features_per_frame)
utterance_features = aggregate_features_per_utterance(normalized_features)
print(utterance_features)

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Load features and labels
# NOTE: both `[...]` values are placeholders and must be replaced with real data.
X = [...]  # Features (low-level and/or FAUs)
# NOTE(review): a classifier is trained below, so `y` presumably has to hold
# categorical labels; dimensional targets would need a regressor — confirm.
y = [...]  # Emotion labels (categorical or dimensional)

# Train/test split: 20% held out, fixed seed so the split is repeatable
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train a classifier (example: RandomForest)
# No random_state is passed to the forest itself.
clf = RandomForestClassifier()
clf.fit(X_train, y_train)

# Test the model on the held-out split and report plain accuracy
y_pred = clf.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f'Accuracy: {accuracy}')



7. Output and Storage

    - Save Features:
        - Store both high-level (AUs) and low-level (CNN) features for each time segment in a structured format (e.g., CSV, HDF5). Ensure that these features are labeled with the corresponding utterance or time interval for easy retrieval.

    - Tools: Pandas, HDF5

In [None]:
import h5py
import numpy as np  # used below; numpy is otherwise only imported in a later cell

# Placeholder: replace with a sequence of sessions. Each session is iterated as
# a sequence of utterance dicts providing 'high_level_features' and
# 'low_level_features'.
sessions = [...]
# Write the features to an HDF5 file (opened in 'w' mode), laid out as
# session_<n>/utterance_<m>/{high_level_features, low_level_features}
# with 1-based indices.
with h5py.File('iemocap_features.h5', 'w') as hdf:
    for session_idx, session_data in enumerate(sessions):
        # Create a group for each session
        session_grp = hdf.create_group(f'session_{session_idx+1}')
        for utterance_idx, utterance_data in enumerate(session_data):
            # Create a subgroup for each utterance
            utterance_grp = session_grp.create_group(f'utterance_{utterance_idx+1}')

            # Store high-level (AU) features
            au_data = np.array(utterance_data['high_level_features'])
            utterance_grp.create_dataset('high_level_features', data=au_data, compression="gzip")

            # Store low-level (CNN) features
            cnn_data = np.array(utterance_data['low_level_features'])
            utterance_grp.create_dataset('low_level_features', data=cnn_data, compression="gzip")

# Archive

The code below is currently not in use.

In [None]:
### Feature extraction code
### Currently, this code is just a placeholder and does not actually extract features

import pandas as pd
import numpy as np
from pydub import AudioSegment
from transformers import BertTokenizer

# Load MELD CSV files
# These reads run at cell level and use relative paths, so the three CSV files
# must be in the current working directory.
train_df = pd.read_csv('train_sent_emo.csv')
dev_df = pd.read_csv('dev_sent_emo.csv')
test_df = pd.read_csv('test_sent_emo.csv')

# Video processing function
def extract_frames_from_video(video_path):
    """Decode ``video_path`` with OpenCV and return all of its frames as a list.

    Reading stops at the first unsuccessful ``read()``; the capture is
    released before returning.
    """
    capture = cv2.VideoCapture(video_path)
    collected = []
    ok, image = capture.read()
    while ok:
        collected.append(image)
        ok, image = capture.read()
    capture.release()
    return collected

# Audio processing function
def extract_audio_features(audio_path):
    """Placeholder audio feature extractor.

    The file is loaded with ``AudioSegment.from_file``, but no features are
    computed yet: an empty NumPy array is always returned.
    """
    # Loaded but unused until the real feature extraction (e.g., MFCC) is written
    _segment = AudioSegment.from_file(audio_path)
    placeholder = np.array([])  # Replace with actual feature extraction
    return placeholder

# Text processing function
_BERT_TOKENIZER = None  # created on first use and then shared by all calls


def preprocess_text(text):
    """Tokenize ``text`` with the 'bert-base-uncased' tokenizer.

    The tokenizer is loaded once and cached at module level rather than
    being re-created on every call.

    Args:
        text: The text to tokenize; it is passed to the tokenizer unchanged.

    Returns:
        The tokenizer output, requested with padding, truncation and
        ``return_tensors="pt"``.
    """
    global _BERT_TOKENIZER
    if _BERT_TOKENIZER is None:
        _BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
    return _BERT_TOKENIZER(text, padding=True, truncation=True, return_tensors="pt")

def main():
    """Call the three placeholder extractors once on hard-coded example inputs.

    The results are only bound to local names and then discarded.
    """
    # Example usage
    video_frames = extract_frames_from_video('example.mp4')
    audio_vector = extract_audio_features('example.wav')
    encoded_text = preprocess_text("example utterance text")


if __name__ == "__main__":
    main()