In [1]:
import os
import math
import json
import numpy as np
import torch
import torchaudio
import torchaudio.transforms as T
from torch.utils.data import Dataset
from pathlib import Path
from collections import defaultdict
from glob import glob
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, RandomSampler
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from torch.optim import AdamW
from sklearn.metrics import f1_score
import argparse

In [2]:

# ========== Dataset Class ==========
class SummaryDataset(Dataset):
    """Dataset of fixed-length audio chunks paired with merged annotator labels.

    Every video named in ``list_file`` is split into chunks of ``max_seq_len``
    label steps. One label step corresponds to ``resample_sr`` samples of the
    resampled audio (see the slicing in ``__getitem__``). Each dataset index
    addresses one ``(video_name, chunk_index)`` pair.
    """

    def __init__(self, list_file, wav_dir, label_dir, max_seq_len=250, resample_sr=2000):
        """Read the video list, locate the label files and build the chunk index.

        Args:
            list_file: Text file with one video name per line.
            wav_dir: Directory holding ``<video_name>.wav`` files.
            label_dir: Directory searched recursively for ``*.json`` label files.
            max_seq_len: Number of label steps per chunk.
            resample_sr: Sample rate the audio is resampled to.
        """
        self.video_names = self.read_video_list(list_file)
        self.wav_dir = Path(wav_dir)
        self.label_dir = Path(label_dir)
        self.max_seq_len = max_seq_len
        self.resample_sr = resample_sr

        self.label_files = self.prefetch_label_files()
        self.data_size, self.index_to_chunk, self.labels = self.prefetch_and_index()

    def read_video_list(self, list_path):
        """Return the video names listed in ``list_path`` (one per line)."""
        with open(list_path, 'r') as f:
            # Blank lines (e.g. a trailing newline) would otherwise become an
            # empty video name that can never match a label file.
            return [name for name in (line.strip() for line in f) if name]

    def prefetch_label_files(self):
        """Map each listed video name to the label files found for it.

        Label files are named ``<video_name>_<annotator_id>.json``; the video
        name is everything before the LAST underscore of the file stem. A stem
        without any underscore is used as the video name unchanged.
        """
        name_set = set(self.video_names)
        label_files = defaultdict(list)

        for label_file in self.label_dir.glob("**/*.json"):
            file_name = label_file.stem
            separator = file_name.rfind("_")
            video_name = file_name if separator == -1 else file_name[:separator]

            if video_name in name_set:
                label_files[video_name].append(label_file)

        return label_files

    def extract_label(self, video_name):
        """Build one 0/1 label row per annotator for ``video_name``.

        Returns:
            ``np.ndarray`` of shape ``(num_annotators, video_length)``; an empty
            array when the video has no label files.
        """
        label_files = self.label_files.get(video_name, [])
        labels = []

        for label_file in label_files:
            with open(label_file, "r") as rf:
                data = json.load(rf)
            video_length = math.ceil(data["metadata"]["length"])
            annotator_label = np.zeros(video_length)

            # NOTE(review): assumes "start"/"end" are integers (range() rejects
            # floats) — confirm against the label schema.
            for timeline in data["timelines"]:
                # "end" is inclusive; steps past the video length are ignored.
                for t in range(timeline["start"], timeline["end"] + 1):
                    if t < video_length:
                        annotator_label[t] = 1

            labels.append(annotator_label)

        # NOTE(review): assumes every annotator file of a video reports the same
        # length; rows of different lengths cannot form a 2-D array — confirm.
        return np.array(labels)

    def prefetch_and_index(self):
        """Load all labels and enumerate the chunks of every labelled video.

        Returns:
            Tuple ``(data_size, index_to_chunk, all_labels)`` where
            ``index_to_chunk`` maps a dataset index to ``(video_name,
            chunk_index)`` and ``all_labels`` maps a video name to its
            per-annotator label array.
        """
        index = 0
        index_to_chunk = {}
        all_labels = {}

        for video_name in self.video_names:
            labels = self.extract_label(video_name)
            if len(labels) == 0 or len(labels[0]) == 0:
                print(f"⚠️ Skipping {video_name}: no valid labels")
                continue

            all_labels[video_name] = labels
            chunk_count = math.ceil(len(labels[0]) / self.max_seq_len)

            for chunk_index in range(chunk_count):
                index_to_chunk[index + chunk_index] = (video_name, chunk_index)

            index += chunk_count

        return index, index_to_chunk, all_labels

    def __len__(self):
        """Total number of chunks across all labelled videos."""
        return self.data_size

    @staticmethod
    def _combine_labels(labels):
        """Merge per-annotator rows ``(annotators, T)`` into one ``(T,)`` tensor.

        A step is 1 when at least one annotator marked it. Summing over the
        annotator axis directly also works for a single annotator, which the
        previous ``squeeze(0)`` + ``sum(dim=0)`` sequence reduced to a scalar.
        """
        merged = torch.from_numpy(np.asarray(labels)).sum(dim=0)
        return torch.clamp(merged, max=1)

    def _split_into_segments(self, audio_data):
        """Cut a 1-D waveform into rows that are each ``resample_sr`` samples wide.

        The waveform is divided into pieces of ``len(audio_data) // max_seq_len``
        samples; every piece is zero-padded or truncated to ``resample_sr``.

        Returns:
            2-D ``np.ndarray`` of segments, or ``None`` when the waveform has
            fewer than ``max_seq_len`` samples (piece length would be zero).
        """
        frames_per_segment = len(audio_data) // self.max_seq_len
        if frames_per_segment == 0:
            # A zero step would make range() raise ValueError.
            return None

        segments = []
        last_start = len(audio_data) - frames_per_segment
        for offset in range(0, last_start + 1, frames_per_segment):
            segment = audio_data[offset : offset + frames_per_segment]

            if len(segment) < self.resample_sr:
                pad = self.resample_sr - len(segment)
                segment = np.pad(segment, (0, pad), mode="constant")
            elif len(segment) > self.resample_sr:
                segment = segment[: self.resample_sr]

            segments.append(segment)

        return np.vstack(segments)

    def __getitem__(self, index):
        """Return ``(video_name, audio_array, labels)`` for one chunk.

        ``audio_array`` is the segmented mono waveform of the chunk and
        ``labels`` the merged annotator labels for the same label steps.
        Returns ``None`` when the wav file cannot be loaded or the chunk holds
        too little audio to segment, so a collate function can drop the item.
        """
        video_name, chunk_index = self.index_to_chunk[index]
        start = chunk_index * self.max_seq_len
        end = start + self.max_seq_len

        labels = self.labels[video_name][:, start:end]

        wav_path = self.wav_dir / f"{video_name}.wav"
        try:
            audio_data, sr = torchaudio.load(str(wav_path))
        except Exception as err:
            # Best-effort: report the cause and skip this item. Unlike a bare
            # `except:`, this lets KeyboardInterrupt/SystemExit propagate.
            print(f"🚫 Error loading: {wav_path} ({err})")
            return None

        resampler = T.Resample(sr, self.resample_sr, dtype=audio_data.dtype)
        audio_data = resampler(audio_data)
        # Average the channels down to a single mono waveform.
        audio_data = torch.mean(audio_data, dim=0).numpy()

        # Keep only the samples that belong to this chunk.
        audio_data = audio_data[start * self.resample_sr : end * self.resample_sr]

        audio_array = self._split_into_segments(audio_data)
        if audio_array is None:
            print(f"🚫 Audio too short to segment: {wav_path} (chunk {chunk_index})")
            return None

        return video_name, audio_array, self._combine_labels(labels)


In [3]:
# ========== Dataset ==========
# One SummaryDataset per split. All three read their annotations from the same
# label directory; max_seq_len and resample_sr are left at the class defaults.

# Audio-visual training split
sd_train_av = SummaryDataset(
    label_dir="/home/jovyan/EmotionDetection/video_data/label",
    wav_dir="/home/jovyan/EmotionDetection/audio_data/av_train",
    list_file="/home/jovyan/EmotionDetection/video_data/av_train.txt",
)

# Audio-visual test split
sd_test_av = SummaryDataset(
    label_dir="/home/jovyan/EmotionDetection/video_data/label",
    wav_dir="/home/jovyan/EmotionDetection/audio_data/av_test",
    list_file="/home/jovyan/EmotionDetection/video_data/av_test.txt",
)

# "mul" test split
sd_test_mul = SummaryDataset(
    label_dir="/home/jovyan/EmotionDetection/video_data/label",
    wav_dir="/home/jovyan/EmotionDetection/audio_data/mul_test",
    list_file="/home/jovyan/EmotionDetection/video_data/mul_test.txt",
)

# ========== DataLoader ==========
def safe_collate(batch):
    """Collate a batch while dropping items that are ``None``.

    Returns ``None`` when no item survives; otherwise transposes the remaining
    item tuples so each field becomes one tuple across the batch, i.e.
    ``(video_names, inputs, labels)``.
    """
    usable_items = list(filter(lambda item: item is not None, batch))
    if not usable_items:
        return None
    per_field = zip(*usable_items)
    return tuple(per_field)

# One chunk per batch for every split; only the training loader shuffles.
# safe_collate drops items the dataset returned as None.
dl_train_av = DataLoader(sd_train_av, batch_size=1, shuffle=True,
                         num_workers=2, collate_fn=safe_collate)

dl_test_av = DataLoader(sd_test_av, batch_size=1, shuffle=False,
                        num_workers=2, collate_fn=safe_collate)

dl_test_mul = DataLoader(sd_test_mul, batch_size=1, shuffle=False,
                         num_workers=2, collate_fn=safe_collate)

# ========== Info ==========
# Report how many chunks (dataset items) each split contains, one per line.
print(
    f"📦 Train dataset size: {len(sd_train_av)}",
    f"🎬 AV Test dataset size: {len(sd_test_av)}",
    f"🎬 MUL Test dataset size: {len(sd_test_mul)}",
    sep="\n",
)

📦 Train dataset size: 401
🎬 AV Test dataset size: 140
🎬 MUL Test dataset size: 163


# Model Load
# Extract Emotional Feature


In [4]:
import numpy as np
from tqdm.notebook import tqdm


all_waveforms = []

# NOTE(review): dl_train_av is created with shuffle=True, so the row order of
# the saved array differs between runs and the video names are not stored with
# it — confirm nothing downstream relies on a fixed order.
for batch in tqdm(dl_train_av, desc="Extracting raw waveforms"):
    if batch is None:  # every item of the batch was dropped by the collate function
        continue

    video_names, inputs, labels = batch  # each field is a tuple with one entry (batch_size=1)
    audio_array = inputs[0]  # 2-D array of segments, each resample_sr samples wide
    all_waveforms.append(audio_array)

# np.array(..., dtype=object) on equally-shaped arrays builds an N-D object array
# that boxes every single sample as a Python object. Stack into a dense numeric
# array when shapes agree, and fall back to a 1-D object array (one waveform per
# element) only when they differ. Indexing waveform_np[i] is the same either way.
if len({np.shape(waveform) for waveform in all_waveforms}) == 1:
    waveform_np = np.stack(all_waveforms)
else:
    waveform_np = np.empty(len(all_waveforms), dtype=object)
    for i, waveform in enumerate(all_waveforms):
        waveform_np[i] = waveform

# save
save_path = "Features/waveform_without_emotion_av_train.npy"
# np.save does not create missing directories.
os.makedirs(os.path.dirname(save_path), exist_ok=True)
np.save(save_path, waveform_np)
print(f"✅ Saved {len(all_waveforms)} waveforms to {save_path}")


Extracting raw waveforms:   0%|          | 0/401 [00:00<?, ?it/s]

✅ Saved 401 waveforms to waveform_without_emotion_av_train.npy


In [5]:
import numpy as np
from tqdm.notebook import tqdm


all_waveforms = []

for batch in tqdm(dl_test_av, desc="Extracting raw waveforms"):
    if batch is None:  # every item of the batch was dropped by the collate function
        continue

    video_names, inputs, labels = batch  # each field is a tuple with one entry (batch_size=1)
    audio_array = inputs[0]  # 2-D array of segments, each resample_sr samples wide
    all_waveforms.append(audio_array)

# np.array(..., dtype=object) on equally-shaped arrays builds an N-D object array
# that boxes every single sample as a Python object. Stack into a dense numeric
# array when shapes agree, and fall back to a 1-D object array (one waveform per
# element) only when they differ. Indexing waveform_np[i] is the same either way.
if len({np.shape(waveform) for waveform in all_waveforms}) == 1:
    waveform_np = np.stack(all_waveforms)
else:
    waveform_np = np.empty(len(all_waveforms), dtype=object)
    for i, waveform in enumerate(all_waveforms):
        waveform_np[i] = waveform

# save
save_path = "Features/waveform_without_emotion_av_val.npy"
# np.save does not create missing directories.
os.makedirs(os.path.dirname(save_path), exist_ok=True)
np.save(save_path, waveform_np)
print(f"✅ Saved {len(all_waveforms)} waveforms to {save_path}")


Extracting raw waveforms:   0%|          | 0/140 [00:00<?, ?it/s]

✅ Saved 140 waveforms to waveform_without_emotion_av_val.npy


In [6]:
import numpy as np
from tqdm.notebook import tqdm


all_waveforms = []

for batch in tqdm(dl_test_mul, desc="Extracting raw waveforms"):
    if batch is None:  # every item of the batch was dropped by the collate function
        continue

    video_names, inputs, labels = batch  # each field is a tuple with one entry (batch_size=1)
    audio_array = inputs[0]  # 2-D array of segments, each resample_sr samples wide
    all_waveforms.append(audio_array)

# np.array(..., dtype=object) on equally-shaped arrays builds an N-D object array
# that boxes every single sample as a Python object. Stack into a dense numeric
# array when shapes agree, and fall back to a 1-D object array (one waveform per
# element) only when they differ. Indexing waveform_np[i] is the same either way.
if len({np.shape(waveform) for waveform in all_waveforms}) == 1:
    waveform_np = np.stack(all_waveforms)
else:
    waveform_np = np.empty(len(all_waveforms), dtype=object)
    for i, waveform in enumerate(all_waveforms):
        waveform_np[i] = waveform

# save
save_path = "Features/waveform_without_emotion_mul_val.npy"
# np.save does not create missing directories.
os.makedirs(os.path.dirname(save_path), exist_ok=True)
np.save(save_path, waveform_np)
print(f"✅ Saved {len(all_waveforms)} waveforms to {save_path}")


Extracting raw waveforms:   0%|          | 0/163 [00:00<?, ?it/s]

✅ Saved 163 waveforms to waveform_without_emotion_mul_val.npy
