In [1]:
import pickle
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import math
import os
import subprocess
from tqdm import tqdm
from collections import Counter
from sklearn.metrics import f1_score

In [7]:
def mf1(y_true, y_pred):
    """
    Compute the MF1 metric, i.e. the macro-averaged F1 score.

    :param y_true: True labels
    :param y_pred: Predicted labels
    :return: Value returned by ``f1_score`` with ``average='macro'`` and
             ``zero_division=0``
    """
    scoring_options = {'average': 'macro', 'zero_division': 0}
    macro_f1 = f1_score(y_true, y_pred, **scoring_options)
    return macro_f1

def get_video_info(input_path):
    """Gets video file duration in seconds and FPS using ffprobe.

    Args:
        input_path: Path of the media file to probe.

    Returns:
        dict: ``{'duration': <seconds as float>, 'fps': <frames per second as float>}``.
        If ffprobe cannot be started, exits with an error, or prints something
        that cannot be parsed, an error message is printed and
        ``{'duration': 0.0, 'fps': 0.0}`` is returned.
    """
    # The commands are argument lists executed WITHOUT a shell, so a path that
    # contains quotes, spaces or shell metacharacters is passed to ffprobe
    # verbatim instead of being interpreted (and possibly executed) by a shell.
    target = str(input_path)
    # Command to get duration
    duration_cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        target,
    ]
    # Command to get FPS
    fps_cmd = [
        'ffprobe', '-v', 'error',
        '-select_streams', 'v',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        '-show_entries', 'stream=r_frame_rate',
        target,
    ]

    try:
        duration = float(subprocess.check_output(duration_cmd).decode('utf-8').strip())

        fps_output = subprocess.check_output(fps_cmd).decode('utf-8').strip()
        # ffprobe may print one value per selected stream; only the first line is used.
        fps_lines = fps_output.splitlines()
        fps_text = fps_lines[0].strip() if fps_lines else ''

        # The rate might be in the format "30/1", so it needs calculation
        if '/' in fps_text:
            numerator, denominator = map(float, fps_text.split('/'))
            fps = numerator / denominator
        else:
            fps = float(fps_text)

        return {'duration': duration, 'fps': fps}
    except (subprocess.CalledProcessError, OSError, ValueError, ZeroDivisionError):
        # CalledProcessError: ffprobe returned a non-zero exit status.
        # OSError: the ffprobe executable could not be started (e.g. not installed).
        # ValueError / ZeroDivisionError: empty or malformed output such as "" or "0/0".
        print(f"Error getting file info for {input_path}")
        return {'duration': 0.0, 'fps': 0.0}

def segment_multimodal_file(input_path):
    """
    Describes how a multimodal file is split into 4-second windows with a 2-second step.

    No media is read or written here apart from the ffprobe query done by
    ``get_video_info``; the function only returns metadata. Each entry is
    ``[segment_file_name, start_frame, end_frame, fps]`` where the segment name
    is ``<base>___<index:04d><ext>`` and the frame numbers are the window
    boundaries in seconds multiplied by the FPS and truncated to int.
    """
    # Duration (seconds) and frame rate reported for the file
    info = get_video_info(input_path)
    total_seconds = info['duration']
    frame_rate = info['fps']

    # File name split into stem and extension, used to name the segments
    stem, suffix = os.path.splitext(os.path.basename(input_path))

    # Window boundaries in seconds
    windows = slice_audio(start_time=0, end_time=total_seconds,
                          win_max_length=4, win_shift=2, win_min_length=1)

    # One metadata row per window, boundaries converted from seconds to frames
    return [
        [
            f"{stem}___{position:04d}{suffix}",
            int(float(window['start'] * frame_rate)),
            int(float(window['end'] * frame_rate)),
            frame_rate,
        ]
        for position, window in enumerate(windows)
    ]

def slice_audio(start_time: float, end_time: float, 
                win_max_length: float, win_shift: float, win_min_length: float) -> list[dict]:
    """Slices the interval [start_time, end_time] into overlapping windows.

    Args:
        start_time (float): Start time of audio
        end_time (float): End time of audio
        win_max_length (float): Window max length
        win_shift (float): Window shift; must be positive whenever the interval
            is longer than `win_max_length`
        win_min_length (float): Window min length; a trailing window shorter
            than this is dropped

    Returns:
        list[dict]: List of dict with timings, f.e.: {'start': 0, 'end': 12}.
        Empty if `end_time < start_time`. An interval not longer than
        `win_max_length` is returned as a single window, whatever its length.

    Raises:
        ValueError: If windowing is required and `win_shift` is not positive
            (the window start would never advance, so the loop would not end).
    """
    if end_time < start_time:
        return []

    needs_windowing = (end_time - start_time) > win_max_length
    if not needs_windowing:
        return [{'start': start_time, 'end': end_time}]

    if win_shift <= 0:
        raise ValueError(f"win_shift must be positive, got {win_shift!r}")

    timings = []
    window_start = start_time
    while window_start < end_time:
        window_end = window_start + win_max_length

        if window_end < end_time:
            # Full window that fits strictly inside the interval: keep sliding.
            timings.append({'start': window_start, 'end': window_end})
            window_start += win_shift
            continue

        if window_end == end_time:
            # Tail is exactly `win_max_length` long.
            timings.append({'start': window_start, 'end': window_end})
        elif not (end_time - window_start < win_min_length):
            # Tail is shorter than `win_max_length` but long enough to keep.
            timings.append({'start': window_start, 'end': end_time})
        # Either way the end of the interval has been reached.
        break

    return timings


def save_txt(column_names, file_names, labels, save_name):
    """Write a header line followed by one ``file_name,label`` line per pair.

    The header is ``column_names`` joined with commas. ``file_names`` and
    ``labels`` are paired positionally (stopping at the shorter one). Every
    line, including the last, ends with a newline. ``save_name`` is opened in
    write mode, replacing existing content.
    """
    header = ','.join(column_names)
    rows = [f"{name},{value}" for name, value in zip(file_names, labels)]

    with open(save_name, "w") as out:
        out.writelines(f"{text}\n" for text in [header, *rows])

def get_df(pred, video_names, save_results,
           videos_dir='E:/Databases/9th_ABAW/C-EXPR-DB/videos',
           template_path='ICCV_9th_ABAW_CE_test_set_example.txt'):
    """Turn segment-level compound-emotion scores into frame-level labels and save them.

    For every source video the segment layout is recomputed with
    ``segment_multimodal_file``; each segment's scores are assigned to all of
    its frames, scores of frames covered by several segments are averaged, and
    the arg-max class index is taken per frame. The labels are then aligned to
    the ``image_location`` column of the template file, gaps are forward-filled
    and the result is written with ``save_txt`` to ``f'{save_results}.txt'``.

    Args:
        pred: Indexable by segment position; ``pred[i]`` holds at least seven
            scores ordered like the compound-emotion list below.
        video_names: List of segment file names (``<base>___<index><ext>``),
            positionally aligned with ``pred``.
        save_results: Output path without the ``.txt`` extension.
        videos_dir: Directory containing the source videos (defaults to the
            previously hard-coded location).
        template_path: CSV file with an ``image_location`` column that defines
            the rows and order of the output (defaults to the previously
            hard-coded file name).

    Returns:
        pandas.DataFrame: ``image_location`` plus the seven compound-emotion
        columns; the first of them (``Fearfully_Surprised``) carries the
        predicted class index as int, the remaining ones are left empty.

    Raises:
        ValueError: If a segment produced by ``segment_multimodal_file`` is not
            present in ``video_names``.
    """
    compound_emotions = ["Fearfully_Surprised","Happily_Surprised","Sadly_Surprised",
                        "Disgustedly_Surprised","Angrily_Surprised","Sadly_Fearful","Sadly_Angry"]

    # Extract core video names by removing segment indices and keeping extensions
    # (the extension is taken as the last four characters, e.g. ".mp4").
    core_video_names = sorted({name.split('___')[0] + name[-4:] for name in video_names})

    # Segment name -> position of its FIRST occurrence, built once so the lookup
    # inside the loops below does not rescan the whole list for every segment.
    segment_position = {}
    for position, name in enumerate(video_names):
        segment_position.setdefault(name, position)

    data = []

    # Process each video file
    for video_name in tqdm(core_video_names):
        # Get segment metadata for current video
        curr_metadata = segment_multimodal_file(f'{videos_dir}/{video_name}')

        # Process each segment of the video (the fps entry is not needed here)
        for curr_name, start_frame, end_frame, _fps in curr_metadata:
            if curr_name not in segment_position:
                raise ValueError(f"segment {curr_name!r} is not in video_names")
            predictions = pred[segment_position[curr_name]]

            # Extract base filename (without ___0000.mp4 suffix)
            base_name = curr_name.split('___')[0]

            # Scores are identical for every frame of the segment
            scores = {emotion_name: predictions[k]
                      for k, emotion_name in enumerate(compound_emotions)}

            # Frame files are numbered from 1, and both window boundaries are included
            for frame_num in range(start_frame + 1, end_frame + 1 + 1):
                data.append({
                    'file_name': base_name,
                    'segment': curr_name,
                    'frame': f"{frame_num:05d}.jpg",  # Format as 00001.jpg
                    **scores,
                })

    # Average the scores of frames that belong to several overlapping segments
    frame_scores = pd.DataFrame(data)
    final_df = frame_scores.groupby(['file_name', 'frame']).agg(
        {emotion_name: 'mean' for emotion_name in compound_emotions}
    ).reset_index()

    final_df['image_location'] = [f'{x}/{y}' for x, y in zip(final_df.file_name, final_df.frame)]
    # The predicted class index is stored in the first emotion column; the others are blanked
    final_df[compound_emotions[0]] = np.argmax(final_df[compound_emotions].values, axis=1).tolist()
    final_df[compound_emotions[1:]] = None
    final_df = final_df[['image_location'] + compound_emotions]

    # Align to the rows of the template; rows without a prediction inherit the previous one
    template = pd.read_csv(template_path)
    result = template[['image_location']].merge(final_df, on='image_location', how='left')
    result = result.ffill()
    result = result.astype({'Fearfully_Surprised': 'int'})

    column_names = ['image_location'] + compound_emotions
    save_txt(column_names, result.image_location.tolist(), result.Fearfully_Surprised.tolist(), f'{save_results}.txt')
    return result

def get_emo_df(feature_path, model1, model2, name_1, name_2, device='cuda'):
    """Run two models on every pickled sample and tabulate their softmax scores.

    Args:
        feature_path: Path of a pickle file holding an iterable of samples; each
            sample is indexed with ``'video'`` (a tensor) and ``'audio_path'``.
        model1: Callable invoked as ``model1(batch)``.
        model2: Callable invoked as ``model2(batch, features=False)``.
        name_1: Column-name prefix for the ``model1`` scores.
        name_2: Column-name prefix for the ``model2`` scores.
        device: Device the input batch is moved to. Defaults to ``'cuda'``, the
            previously hard-coded value. Presumably both models must already
            live on this device - confirm with the caller.

    Returns:
        pandas.DataFrame: One row per sample with ``video_name`` (basename of
        ``audio_path``) followed by ``f'{name_1}_{e}'`` and ``f'{name_2}_{e}'``
        columns for every ``e`` in the module-level ``emotion`` list.
    """
    # NOTE(review): `emotion` is not defined in this function and is expected to
    # exist at module level (its definition is not visible in this part of the
    # notebook) - confirm it is defined before this function is called.

    # SECURITY: pickle.load can execute arbitrary code; only open trusted files.
    # The context manager closes the file handle as soon as loading is done.
    with open(feature_path, 'rb') as file:
        object_file = pickle.load(file)

    features_full = []
    for data in tqdm(object_file):
        with torch.no_grad():
            outputs1 = model1(torch.unsqueeze(data['video'], 0).to(device))
            outputs2 = model2(torch.unsqueeze(data['video'], 0).to(device), features=False)
        video_name = os.path.basename(data['audio_path'])
        # Softmax over the first (only) batch element of each output
        outputs1 = torch.softmax(outputs1[0], dim=0).cpu().numpy().tolist()
        outputs2 = torch.softmax(outputs2[0], dim=0).cpu().numpy().tolist()
        features_full.append([video_name] + outputs1 + outputs2)

    columns = (["video_name"]
               + [f'{name_1}_{i}' for i in emotion]
               + [f'{name_2}_{i}' for i in emotion])
    df_res = pd.DataFrame(features_full, columns=columns)
    return df_res
    
def get_compound_prob(curr_prob):
    """Combine per-class scores into seven pairwise sums.

    ``curr_prob`` is indexed as ``curr_prob[:, column]``. For each column pair
    below, the two columns are added; the result has one row per input row and
    one column per pair, in the listed order.
    """
    column_pairs = ((3, 6), (4, 6), (5, 6), (2, 6), (1, 6), (5, 3), (5, 1))
    summed_columns = [curr_prob[:, first] + curr_prob[:, second]
                      for first, second in column_pairs]
    # Stacked as (pairs, rows), then transposed to (rows, pairs)
    return np.array(summed_columns).T

def get_df_compound_pred(pickle_path = '', df_annotation = None, flag_get_compound_prob = True, save_df_name='', modality='audio'):
    """Load pickled segment predictions and convert them to frame-level compound labels.

    Args:
        pickle_path: Path of the pickle file with per-segment predictions.
        df_annotation: DataFrame with a ``video_name`` column listing the
            segments; it defines the row order of the predictions.
        flag_get_compound_prob: If True, the stored scores are treated as the
            eight basic-emotion scores and are combined into compound scores
            with ``get_compound_prob``; if False, they are treated as the seven
            compound-emotion scores directly.
        save_df_name: Output path (without ``.txt``) forwarded to ``get_df``.
        modality: Layout of the pickled records: ``'multimodal_feature_fusion'``,
            ``'audio'`` or ``'scene'``.

    Returns:
        pandas.DataFrame: The frame-level result returned by ``get_df``.

    Raises:
        ValueError: If ``modality`` is not one of the supported values
            (previously this silently produced an all-empty prediction table).
    """
    emotion_names = ["Neutral","Anger","Disgust","Fear","Happiness","Sadness","Surprise","Other"]
    compound_names = ["Fearfully_Surprised","Happily_Surprised","Sadly_Surprised","Disgustedly_Surprised","Angrily_Surprised","Sadly_Fearful","Sadly_Angry"]

    # How to read (video_name, scores) from one pickled record, per modality
    extractors = {
        'multimodal_feature_fusion': lambda item: (item['metas']['file_name'] + '.mp4',
                                                   item['predictions']['emo']),
        'audio': lambda item: (item['metas']['audio_name'][:-4] + '.mp4',
                               item['predictions']['emo']),
        'scene': lambda item: (item['video_name'], item['probs']),
    }
    if modality not in extractors:
        raise ValueError(f"Unknown modality {modality!r}; expected one of {sorted(extractors)}")
    extract = extractors[modality]

    df_annotation = df_annotation[['video_name']]

    # SECURITY: pickle.load can execute arbitrary code; only open trusted files.
    # The context manager closes the file handle as soon as loading is done.
    with open(pickle_path, 'rb') as file:
        object_file = pickle.load(file)

    features_full = []
    for data in tqdm(object_file):
        video_name, scores = extract(data)
        features_full.append([video_name, *scores])

    # The locally defined class list is used for the column names. The original
    # code referenced a global `emotion` that is not defined in this function.
    # NOTE(review): confirm the stored scores follow the `emotion_names` order.
    class_names = emotion_names if flag_get_compound_prob else compound_names

    df_res = pd.DataFrame(features_full, columns=["video_name"] + class_names)
    # Align to the annotation order; segments without a prediction inherit the previous row
    result = df_annotation.merge(df_res, on='video_name', how='left')
    result = result.ffill()
    MM_prob = result[class_names].values
    if flag_get_compound_prob:
        MM_prob = get_compound_prob(MM_prob)

    df_compound_pred = get_df(MM_prob, df_annotation.video_name.tolist(), save_df_name)

    return df_compound_pred

# Split and corpus whose segment-level annotation table is loaded below
subset = 'test'
corpus = 'c-expr-db'

# The table is read from <path_annotation><CORPUS>/Chunk/<subset>_segment.csv
path_annotation = 'E:/Databases/9th_ABAW/'
full_path_annotation = path_annotation + corpus.upper() + '/Chunk/' + subset + '_segment.csv'
df_annotation = pd.read_csv(full_path_annotation)

In [8]:
# Pickled predictions of the feature-fusion run. flag_get_compound_prob=False means
# the stored scores are used as compound-class scores without further combination.
path = 'multimodal_predictions/run_20250702_191542_epoch_20_predictions/run_20250702_191542_epoch_20_predictions_compound.pkl'
curr_df = get_df_compound_pred(
    pickle_path=path,
    df_annotation=df_annotation,
    flag_get_compound_prob=False,
    save_df_name='test1',
    modality='multimodal_feature_fusion',
)

100%|████████████████████████████████████████████████████████████████████████████████████████| 449/449 [00:00<?, ?it/s]
100%|██████████████████████████████████████████████████████████████████████████████████| 56/56 [00:05<00:00, 10.13it/s]
