# 提取四个数据集的test样本

In [None]:
import pandas as pd

# Paths of the per-dataset "*_reason.csv" label files that the test samples are extracted from.
mosi_label_csv_path = '../datasets/mosi_reason.csv'
mosei_label_csv_path = '../datasets/mosei_reason.csv'
sims_label_csv_path = '../datasets/sims_reason.csv'
sims2_label_csv_path = '../datasets/sims2_reason.csv'


def extract_test_samples(label_csv_path, dataset_name='mosi'):
    """Copy the rows whose ``mode`` column equals ``'test'`` into a new CSV.

    Args:
        label_csv_path: Path of the label CSV to read; it must contain a
            ``mode`` column.
        dataset_name: Prefix of the output file, which is written to
            ``./{dataset_name}_test.csv`` in the current working directory.
    """
    labels = pd.read_csv(label_csv_path)
    is_test_row = labels['mode'] == 'test'
    test_samples = labels.loc[is_test_row]
    output_path = f"./{dataset_name}_test.csv"
    print(f"Extracted {len(test_samples)} test samples from {label_csv_path}")
    test_samples.to_csv(output_path, index=False)


# Write ./<dataset>_test.csv for each of the four datasets.
extract_test_samples(mosi_label_csv_path, 'mosi')
extract_test_samples(mosei_label_csv_path, 'mosei')
extract_test_samples(sims_label_csv_path, 'sims')
extract_test_samples(sims2_label_csv_path, 'sims2')

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score, f1_score

class MetricsTop():
    """Evaluation metrics for sentiment-regression predictions.

    Use :meth:`getMetics` (or the correctly spelled alias :meth:`getMetrics`)
    to obtain the evaluation function registered for a dataset name. Each
    evaluation function takes ``(y_pred, y_true)`` and returns a dict of
    metric values rounded to four decimals. The inputs must support
    ``.view(-1).cpu().detach().numpy()`` (torch tensors in this notebook).
    """

    def __init__(self):
        # Upper-case dataset name -> evaluation method for that dataset.
        self.metrics_dict = {
            'MOSI': self.__eval_mosi_regression,
            'MOSEI': self.__eval_mosei_regression,
            'SIMS': self.__eval_sims_regression,
            'SIMS2': self.__eval_sims2_regression
        }

    def __multiclass_acc(self, y_pred, y_true):
        """Return the fraction of samples whose rounded prediction equals the rounded label."""
        return np.sum(np.round(y_pred) == np.round(y_true)) / float(len(y_true))

    def __discretize(self, values, edges):
        """Replace each value by the index ``i`` of the bin ``(edges[i], edges[i+1]]`` it falls in.

        Values that fall in no bin (for example NaN) are left unchanged. The
        result is a copy; ``values`` itself is not modified.
        """
        binned = values.copy()
        for i in range(len(edges) - 1):
            binned[np.logical_and(values > edges[i], values <= edges[i + 1])] = i
        return binned

    def __eval_mosei_regression(self, y_pred, y_true, exclude_zero=False):
        """Compute the MOSI/MOSEI metric set.

        Args:
            y_pred: Predicted scores.
            y_true: Ground-truth scores.
            exclude_zero: Unused; kept so existing callers keep working.

        Returns:
            dict with binary accuracy/F1 including zero labels ("Has0_*",
            threshold ``>= 0``) and excluding zero labels ("Non0_*",
            threshold ``> 0``), 3/5/7-class accuracy on clipped and rounded
            scores, MAE and Pearson correlation.
        """
        test_preds = y_pred.view(-1).cpu().detach().numpy()
        test_truth = y_true.view(-1).cpu().detach().numpy()

        test_preds_a7 = np.clip(test_preds, a_min=-3., a_max=3.)
        test_truth_a7 = np.clip(test_truth, a_min=-3., a_max=3.)
        test_preds_a5 = np.clip(test_preds, a_min=-2., a_max=2.)
        test_truth_a5 = np.clip(test_truth, a_min=-2., a_max=2.)
        test_preds_a3 = np.clip(test_preds, a_min=-1., a_max=1.)
        test_truth_a3 = np.clip(test_truth, a_min=-1., a_max=1.)

        mae = np.mean(np.absolute(test_preds - test_truth))
        corr = np.corrcoef(test_preds, test_truth)[0][1]
        mult_a7 = self.__multiclass_acc(test_preds_a7, test_truth_a7)
        mult_a5 = self.__multiclass_acc(test_preds_a5, test_truth_a5)
        mult_a3 = self.__multiclass_acc(test_preds_a3, test_truth_a3)

        # Boolean mask of samples whose label is not exactly zero.
        non_zeros = test_truth != 0
        non_zeros_binary_truth = (test_truth[non_zeros] > 0)
        non_zeros_binary_preds = (test_preds[non_zeros] > 0)

        # sklearn metrics take (y_true, y_pred). The order matters for the
        # weighted F1: classes are weighted by their support in y_true.
        non_zeros_acc2 = accuracy_score(non_zeros_binary_truth, non_zeros_binary_preds)
        non_zeros_f1_score = f1_score(non_zeros_binary_truth, non_zeros_binary_preds, average='weighted')

        binary_truth = (test_truth >= 0)
        binary_preds = (test_preds >= 0)
        acc2 = accuracy_score(binary_truth, binary_preds)
        f_score = f1_score(binary_truth, binary_preds, average='weighted')

        eval_results = {
            "Has0_acc_2":  round(acc2, 4),  # keep four decimal places
            "Has0_F1_score": round(f_score, 4),
            "Non0_acc_2":  round(non_zeros_acc2, 4),
            "Non0_F1_score": round(non_zeros_f1_score, 4),
            "Mult_acc_3": round(mult_a3, 4),
            "Mult_acc_5": round(mult_a5, 4),
            "Mult_acc_7": round(mult_a7, 4),
            "MAE": round(mae, 4),
            "Corr": round(corr, 4)
        }
        return eval_results

    def __eval_mosi_regression(self, y_pred, y_true):
        """MOSI uses the same metric set as MOSEI."""
        return self.__eval_mosei_regression(y_pred, y_true)

    def __eval_sims_regression(self, y_pred, y_true):
        """Compute the SIMS metric set.

        Predictions and labels are clipped to [-1, 1] first; MAE and
        correlation are computed on the clipped values.

        Returns:
            dict with 2/3/5-class accuracy, weighted F1 on the two-class
            split, MAE and Pearson correlation.
        """
        test_preds = y_pred.view(-1).cpu().detach().numpy()
        test_truth = y_true.view(-1).cpu().detach().numpy()
        test_preds = np.clip(test_preds, a_min=-1., a_max=1.)
        test_truth = np.clip(test_truth, a_min=-1., a_max=1.)

        # two classes {[-1.0, 0.0], (0.0, 1.0]}
        ms_2 = [-1.01, 0.0, 1.01]
        test_preds_a2 = self.__discretize(test_preds, ms_2)
        test_truth_a2 = self.__discretize(test_truth, ms_2)

        # three classes {[-1.0, -0.1], (-0.1, 0.1], (0.1, 1.0]}
        ms_3 = [-1.01, -0.1, 0.1, 1.01]
        test_preds_a3 = self.__discretize(test_preds, ms_3)
        test_truth_a3 = self.__discretize(test_truth, ms_3)

        # five classes {[-1.0, -0.7], (-0.7, -0.1], (-0.1, 0.1], (0.1, 0.7], (0.7, 1.0]}
        ms_5 = [-1.01, -0.7, -0.1, 0.1, 0.7, 1.01]
        test_preds_a5 = self.__discretize(test_preds, ms_5)
        test_truth_a5 = self.__discretize(test_truth, ms_5)

        mae = np.mean(np.absolute(test_preds - test_truth))   # Average L1 distance between preds and truths
        corr = np.corrcoef(test_preds, test_truth)[0][1]
        mult_a2 = self.__multiclass_acc(test_preds_a2, test_truth_a2)
        mult_a3 = self.__multiclass_acc(test_preds_a3, test_truth_a3)
        mult_a5 = self.__multiclass_acc(test_preds_a5, test_truth_a5)
        # (y_true, y_pred) order so the weighted average uses true-class support.
        f_score = f1_score(test_truth_a2, test_preds_a2, average='weighted')

        eval_results = {
            "Mult_acc_2": round(mult_a2, 4),
            "Mult_acc_3": round(mult_a3, 4),
            "Mult_acc_5": round(mult_a5, 4),
            "F1_score": round(f_score, 4),
            "MAE": round(mae, 4),
            "Corr": round(corr, 4)
        }

        return eval_results

    def __eval_sims2_regression(self, y_pred, y_true):
        """SIMS2 uses the same metric set as SIMS."""
        return self.__eval_sims_regression(y_pred, y_true)

    def getMetics(self, datasetName):
        """Return the evaluation method registered for ``datasetName``.

        The lookup is case-insensitive. Raises ``KeyError`` for a name that
        is not one of MOSI, MOSEI, SIMS or SIMS2.
        """
        return self.metrics_dict[datasetName.upper()]

    # Correctly spelled alias; ``getMetics`` is kept for existing callers.
    getMetrics = getMetics



# prompt

In [None]:
from IPython.display import Markdown, display
import re

def printmd(string):
    """Display *string* wrapped in an IPython ``Markdown`` object."""
    rendered = Markdown(string)
    display(rendered)


def extract_score_from_text(text):
    """Return the first number found in *text* as a float.

    Args:
        text: Model reply to parse. ``None`` is accepted because a chat
            completion may carry no text content; other non-string values
            are converted with ``str()``.

    Returns:
        The first (optionally negative, optionally decimal) number in the
        text, or ``None`` when there is no number or *text* is ``None``.
    """
    if text is None:
        return None

    # Some replies use the typographic minus sign (U+2212); without this
    # normalisation the sign would be dropped and the score flipped.
    normalized = str(text).replace("\u2212", "-")

    match = re.search(r"-?\d+(?:\.\d+)?", normalized)
    if match is None:
        return None
    return float(match.group())

        

def prompt_template(subtitle, min_score=-3.0, max_score=3.0):
    """Build the multimodal sentiment-scoring prompt for one subtitle.

    Args:
        subtitle: Spoken text of the clip; it is quoted at the end of the prompt.
        min_score: Lower bound of the score range named in the prompt.
        max_score: Upper bound of the score range named in the prompt.

    Returns:
        The prompt string, which asks for a single polarity score only.
    """
    # Round the example value: with max_score=1.0 the raw expression
    # 1.0 - 0.8 would be rendered as 0.19999999999999996 in the prompt.
    example_score = round(max_score - 0.8, 1)

    prompt = (
        "You are an expert in emotional analysis. Your task is to evaluate the emotional polarity intensity of a person in a video clip.\n\n"
        "Based on the video frames, please consider the following aspects:\n"
        "- Facial expressions and body language\n"
        "- Speech rate, tone, and volume\n"
        "- The content of the spoken subtitle\n\n"

        f"Then, integrate these modalities to assign a single emotional polarity score between {min_score} and {max_score}:\n"
        f"- Positive values (e.g., 0.1 to {max_score}) indicate increasingly positive emotions\n"
        f"- Negative values (e.g., -0.1 to {min_score}) indicate increasingly negative emotions\n"
        "- Values near 0 (e.g., -0.1, 0.0, 0.1) reflect neutral or weak emotional tone\n\n"

        f"**Important requirement: Only output the final sentiment polarity score (e.g., {example_score}), do not output any reasoning process or explanation.**\n\n"

        "Now evaluate the following subtitle:\n"
        f'"{subtitle}"\n\n'
    )

    return prompt

# Debug: render the prompt for one sample subtitle on the [-3, 3] scale and print it for inspection.
prompt_drbug = prompt_template("BUT I CAN SAFELY ASSURE YOU THAT EVEN IF THEY DIDN’T IT WOULD STILL BE A HIT", -3.0, 3.0)
print(prompt_drbug)

# qwen2.5-vl-32b-instruct

In [None]:
import re
import base64
import pandas as pd
from tqdm import tqdm
import pandas as pd
from openai import OpenAI
import os
tqdm.pandas()


# OpenAI-compatible client for the DashScope endpoint.
# The key is read from the DASHSCOPE_API_KEY environment variable so that no
# real credential has to be stored in the notebook; "sk-xxx" stays as the
# placeholder fallback.
client = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY", "sk-xxx"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)


def encode_video(video_path):
    """Read the file at *video_path* and return its bytes as a base64 text string."""
    with open(video_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")


def QWen2_5_vl_32b_instruct_pridictor(video_path, question):
    """Ask qwen2.5-vl-32b-instruct a question about a video file.

    The file at *video_path* is base64-encoded and sent inline as a
    ``data:video/mp4`` URL together with the text *question*.

    Returns:
        The message content of the first completion choice.
    """
    video_data_url = f"data:video/mp4;base64,{encode_video(video_path)}"

    system_message = {
        "role": "system",
        "content": [{"type": "text", "text": "You are a helpful assistant."}],
    }
    user_message = {
        "role": "user",
        "content": [
            {"type": "video_url", "video_url": {"url": video_data_url}},
            {"type": "text", "text": question},
        ],
    }

    completion = client.chat.completions.create(
        model="qwen2.5-vl-32b-instruct",
        messages=[system_message, user_message],
        extra_body={"enable_thinking": False},
    )

    first_choice = completion.choices[0]
    return first_choice.message.content

In [None]:
def QWen2_5_vl_32b_instruct_pridictor_with_subtitle(subtitle, min_score=-3.0, max_score=3.0):
    """
    Score sentiment with qwen2.5-vl-32b-instruct from the subtitle text alone.

    No video is sent; the prompt is built here from *subtitle* and the score
    range.

    Args:
        subtitle: Spoken text to evaluate.
        min_score: Lower bound of the score range named in the prompt.
        max_score: Upper bound of the score range named in the prompt.

    Returns:
        The message content of the first completion choice.
    """
    # Round the example value: with max_score=1.0 the raw expression
    # 1.0 - 0.8 would be rendered as 0.19999999999999996 in the prompt.
    example_score = round(max_score - 0.8, 1)

    question = (
        "You are an expert in emotional analysis. Your task is to evaluate the emotional polarity intensity of a person in a video clip.\n\n"
        f"Based on the subtitle, please integrate these information to assign a single emotional polarity score between {min_score} and {max_score}:\n"
        f"- Positive values (e.g., 0.1 to {max_score}) indicate increasingly positive emotions\n"
        f"- Negative values (e.g., -0.1 to {min_score}) indicate increasingly negative emotions\n"
        "- Values near 0 (e.g., -0.1, 0.0, 0.1) reflect neutral or weak emotional tone\n\n"

        f"**Important requirement: Only output the final sentiment polarity score (e.g., {example_score}), do not output any reasoning process or explanation.**\n\n"

        "Now evaluate the following subtitle:\n"
        f'"{subtitle}"\n\n'
    )

    completion = client.chat.completions.create(
        # Model list: https://help.aliyun.com/zh/model-studio/getting-started/models
        model="qwen2.5-vl-32b-instruct",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": question},
        ],
        # Qwen3 models control the thinking process through enable_thinking
        # (open-source default True, commercial default False). When using an
        # open-source Qwen3 model without streaming output, pass
        # extra_body={"enable_thinking": False}; otherwise the request fails.
    )

    return completion.choices[0].message.content


### Predict for four datasets

In [None]:
def predict(data_csv_path, video_root, dataSet_name):
    """Score every sample of a test CSV with Qwen2.5-VL and save the results.

    For each row the video (``mp4_path`` column, relative to *video_root*)
    and the subtitle (``text_en`` for SIMS datasets, ``text`` otherwise) are
    sent to the model. If the video request fails, a subtitle-only request
    is tried; if that fails too, the reply falls back to ``"0.0"``. Both
    failures are reported instead of being swallowed silently.

    Args:
        data_csv_path: CSV with the samples to score.
        video_root: Directory that the ``mp4_path`` values are relative to.
        dataSet_name: Dataset name; names containing "sims" (any case) use
            the [-1, 1] score range, all others use [-3, 3].

    Returns:
        The DataFrame with the added ``score-Qwen2.5-vl-32B`` column. It is
        also written to ``./{dataSet_name}_test_with_scores.csv``.
    """
    output_csv_path = f"./{dataSet_name}_test_with_scores.csv"

    # Case-insensitive so that e.g. 'SIMS' selects the SIMS settings as well.
    is_sims = 'sims' in dataSet_name.lower()
    min_score, max_score = (-1.0, 1.0) if is_sims else (-3.0, 3.0)
    subtitle_column = 'text_en' if is_sims else 'text'

    data = pd.read_csv(data_csv_path)

    sample_pbar = tqdm(data.iterrows(), total=len(data), desc=f"Processing {dataSet_name} samples")
    for index, row in sample_pbar:
        video_path = os.path.join(video_root, row['mp4_path'])
        subtitle = row[subtitle_column]

        question = prompt_template(subtitle, min_score, max_score)

        try:
            response = QWen2_5_vl_32b_instruct_pridictor(video_path, question)
        except Exception as video_error:
            # Best-effort fallback, but leave a trace of what went wrong.
            tqdm.write(f"[{dataSet_name}] index {index}: video request failed ({video_error!r}); retrying with subtitle only")
            try:
                response = QWen2_5_vl_32b_instruct_pridictor_with_subtitle(subtitle, min_score, max_score)
            except Exception as text_error:
                tqdm.write(f"[{dataSet_name}] index {index}: subtitle request failed ({text_error!r}); using fallback score 0.0")
                response = "0.0"

        # A completion may carry no text content; treat that as "no score".
        score = extract_score_from_text(response) if response is not None else None

        sample_pbar.set_postfix({"index": index, "score": score})

        data.at[index, 'score-Qwen2.5-vl-32B'] = score

        # Periodic checkpoint so that an interrupted run keeps partial results.
        if index % 100 == 0:
            data.to_csv(output_csv_path, index=False)

    data.to_csv(output_csv_path, index=False)
    print(f"Results saved to {output_csv_path}")

    return data



In [None]:
# Score the SIMS test split with Qwen2.5-VL and show the resulting DataFrame.
sims_data = predict('./sims_test.csv', '../MSA_Datasets/SIMS/Raw', 'sims')
sims_data

In [None]:
import torch
# Reload the saved SIMS predictions and evaluate the Qwen2.5-VL scores against the 'label' column.
# NOTE(review): replies without a parsable number are stored as missing values;
# presumably they reach the metrics as NaN — confirm before reporting results.
sims_data = pd.read_csv('./sims_test_with_scores.csv')
print(sims_data.columns)
print(len(sims_data))
metrics = MetricsTop().getMetics('SIMS')
pred = torch.tensor(sims_data['score-Qwen2.5-vl-32B'].values)
true = torch.tensor(sims_data['label'].values)

test_results = metrics(pred, true)
print(test_results)

In [None]:
# Score the MOSI test split with Qwen2.5-VL and show the resulting DataFrame.
mosi_data = predict('./mosi_test.csv', '../MSA_Datasets/MOSI/Raw', 'mosi')
mosi_data

In [None]:
import torch
# Reload the saved MOSI predictions and evaluate the Qwen2.5-VL scores against the 'label' column.
# NOTE(review): the variable is still named `sims_data` although it holds the MOSI rows.
sims_data = pd.read_csv('./mosi_test_with_scores.csv')
print(sims_data.columns)
print(len(sims_data))
metrics = MetricsTop().getMetics('MOSI')
pred = torch.tensor(sims_data['score-Qwen2.5-vl-32B'].values)
true = torch.tensor(sims_data['label'].values)

test_results = metrics(pred, true)
print(test_results)

In [None]:
# Score the SIMS2 test split with Qwen2.5-VL and show the resulting DataFrame.
sims2_data = predict('./sims2_test.csv', '../MSA_Datasets/SIMS2/Raw', 'sims2')
sims2_data

In [None]:
import torch
# Reload the saved SIMS2 predictions and evaluate the Qwen2.5-VL scores against the 'label' column.
# NOTE(review): the variable is named `sims_data` although it holds the SIMS2 rows.
sims_data = pd.read_csv('./sims2_test_with_scores.csv')
print(sims_data.columns)
print(len(sims_data))
metrics = MetricsTop().getMetics('SIMS2')
pred = torch.tensor(sims_data['score-Qwen2.5-vl-32B'].values)
true = torch.tensor(sims_data['label'].values)

test_results = metrics(pred, true)
print(test_results)

In [None]:
# Score the MOSEI test split with Qwen2.5-VL and show the resulting DataFrame.
mosei_data = predict('./mosei_test.csv', '../MSA_Datasets/MOSEI/Raw', 'mosei')
mosei_data

In [None]:
import torch
import pandas as pd
# Reload the saved MOSEI predictions and evaluate the Qwen2.5-VL scores against the 'label' column.
# NOTE(review): the variable is named `sims_data` although it holds the MOSEI rows.
sims_data = pd.read_csv('./mosei_test_with_scores.csv')
print(len(sims_data))
metrics = MetricsTop().getMetics('MOSEI')
# Count scores that are exactly 0.0; this includes the "0.0" fallback used when both requests failed.
zero_count = (sims_data['score-Qwen2.5-vl-32B'] == 0.0).sum()
print(f"Number of samples with score 0.0: {zero_count}")
pred = torch.tensor(sims_data['score-Qwen2.5-vl-32B'].values)
true = torch.tensor(sims_data['label'].values)

test_results = metrics(pred, true)
print(test_results)

# GPT-4o for mosi, sims, mosei, sims2

In [None]:
import cv2
import base64

def encode_frames(video_path, frames_num=3):
    """Sample evenly spaced frames from a video as base64-encoded JPEG strings.

    The sampled frame indices are ``int(total_frames * i / (frames_num + 1))``
    for ``i = 1..frames_num``. Each sampled frame is resized to 320x210
    before JPEG encoding.

    Args:
        video_path: Path of the video file.
        frames_num: Number of frames to sample.

    Returns:
        List of base64 strings. It is empty when the video cannot be opened
        and may hold fewer than *frames_num* entries (duplicate indices on
        very short videos, frames that cannot be read or encoded).
    """
    base64Frames = []
    video = cv2.VideoCapture(video_path)

    # try/finally so the capture is released even if decoding or resizing raises.
    try:
        if not video.isOpened():
            print(f"无法打开视频文件: {video_path}")
            return base64Frames

        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

        frame_indices = {int(total_frames * i / (frames_num + 1)) for i in range(1, frames_num + 1)}
        if not frame_indices:
            return base64Frames
        last_index = max(frame_indices)

        target_size = (320, 210)  # (width, height)

        current_frame = 0
        # Stop after the last sampled index instead of decoding the rest of the video.
        while current_frame <= last_index:
            success, frame = video.read()
            if not success:
                break
            if current_frame in frame_indices:
                resized_frame = cv2.resize(frame, target_size)
                encoded_ok, buffer = cv2.imencode(".jpg", resized_frame)
                # Skip frames that could not be encoded rather than sending an invalid image.
                if encoded_ok:
                    base64Frames.append(base64.b64encode(buffer).decode("utf-8"))
            current_frame += 1

        return base64Frames
    finally:
        video.release()

# Example call.
# NOTE: `video_path` is an empty placeholder here; set it to a real video file before running.
video_path = ""
frames_num = 3
base64Frames = encode_frames(video_path, frames_num)

In [None]:
import cv2 
import base64
import time
from openai import OpenAI
from IPython.display import display, Image
from tqdm import tqdm
tqdm.pandas()

# OpenAI-compatible client for the GPT-4o endpoint.
# NOTE: this rebinds the module-level `client` that was created earlier for
# DashScope, so every function reading the global `client` uses this endpoint
# once this cell has run.
# The key is read from the XIAOAI_API_KEY environment variable so that no real
# credential has to be stored in the notebook; 'sk-xxx' stays as the placeholder fallback.
client = OpenAI(
    base_url='https://xiaoai.plus/v1',
    api_key=os.getenv('XIAOAI_API_KEY', 'sk-xxx')  # replace with your API Key
)

def GPT_4o_predictor(frames, prompt):
    """Send *prompt* together with base64 JPEG *frames* to gpt-4o.

    The user message holds the text first, followed by one ``image_url``
    entry per frame. The reply is limited to 100 tokens.

    Returns:
        The message content of the first completion choice.
    """
    text_part = {"type": "text", "text": prompt}
    image_parts = [
        {
            "type": "image_url",
            "image_url": {"url": 'data:image/jpeg;base64,' + frame},
        }
        for frame in frames
    ]
    messages = [{"role": "user", "content": [text_part, *image_parts]}]

    result = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        max_tokens=100,
    )
    first_choice = result.choices[0]
    return first_choice.message.content

# Smoke test: build a prompt, sample frames from `video_path` and query GPT-4o once.
# NOTE(review): `score` holds the raw model reply here; extract_score_from_text is not applied.
prompt = prompt_template("IN FACT I HAVE TO SAY THAT THIS WAS ONE OF THOSE OBNOXIOUS MAIN CHARACTERS IVE SEEN A LONG TIME", -3.0, 3.0)
print(prompt)
base64Frames = encode_frames(video_path, frames_num=3)
score = GPT_4o_predictor(base64Frames, prompt)
print(f"Extracted score: {score}")

In [None]:
def predict_GPT4o(data_csv_path, video_root, dataSet_name):
    """Score every sample of a CSV with GPT-4o and save the results.

    For each row three frames are sampled from the video (``mp4_path``
    column, relative to *video_root*) and sent with a prompt built from the
    subtitle (``text_en`` for SIMS datasets, ``text`` otherwise). If the
    request fails, the reply falls back to ``"0.0"`` and the failure is
    reported instead of being swallowed silently.

    Args:
        data_csv_path: CSV with the samples to score.
        video_root: Directory that the ``mp4_path`` values are relative to.
        dataSet_name: Dataset name; names containing "sims" (any case) use
            the [-1, 1] score range, all others use [-3, 3].

    Returns:
        The DataFrame with the added ``GPT-4o`` column. It is also written
        to ``./{dataSet_name}_test_with_scores.csv``.
    """
    output_csv_path = f"./{dataSet_name}_test_with_scores.csv"

    # Case-insensitive so that e.g. 'SIMS' selects the SIMS settings as well.
    is_sims = 'sims' in dataSet_name.lower()
    min_score, max_score = (-1.0, 1.0) if is_sims else (-3.0, 3.0)
    subtitle_column = 'text_en' if is_sims else 'text'

    data = pd.read_csv(data_csv_path)

    sample_pbar = tqdm(data.iterrows(), total=len(data), desc=f"Processing {dataSet_name} samples")
    for index, row in sample_pbar:
        video_path = os.path.join(video_root, row['mp4_path'])
        subtitle = row[subtitle_column]
        base64Frames = encode_frames(video_path, frames_num=3)

        question = prompt_template(subtitle, min_score, max_score)

        try:
            response = GPT_4o_predictor(base64Frames, question)
        except Exception as request_error:
            # Best-effort fallback, but leave a trace of what went wrong.
            tqdm.write(f"[{dataSet_name}] index {index}: GPT-4o request failed ({request_error!r}); using fallback score 0.0")
            response = "0.0"

        # A completion may carry no text content; treat that as "no score".
        score = extract_score_from_text(response) if response is not None else None

        sample_pbar.set_postfix({"index": index, "score": score})

        data.at[index, 'GPT-4o'] = score

        # Periodic checkpoint so that an interrupted run keeps partial results.
        if index % 100 == 0:
            data.to_csv(output_csv_path, index=False)

    data.to_csv(output_csv_path, index=False)
    print(f"Results saved to {output_csv_path}")

    return data

In [None]:
# Add GPT-4o scores to the SIMS results file (the input CSV is also the output path) and show the DataFrame.
sims_data = predict_GPT4o('./sims_test_with_scores.csv', '../MSA_Datasets/SIMS/Raw', 'sims')
sims_data

In [None]:
import torch
# Reload the saved SIMS results and evaluate the GPT-4o scores against the 'label' column.
sims_data = pd.read_csv('./sims_test_with_scores.csv')
print("Number of samples: ", len(sims_data))
# Count scores that are exactly 0.0; this includes the "0.0" fallback used when a request failed.
zero_count = (sims_data['GPT-4o'] == 0.0).sum()
print(f"Number of samples with GPT-4o score 0.0: {zero_count}")

metrics = MetricsTop().getMetics('SIMS')
pred = torch.tensor(sims_data['GPT-4o'].values)
true = torch.tensor(sims_data['label'].values)

test_results = metrics(pred, true)
print(test_results)

In [None]:
# Add GPT-4o scores to the MOSI results file (the input CSV is also the output path) and show the DataFrame.
mosi_data = predict_GPT4o('./mosi_test_with_scores.csv', '../MSA_Datasets/MOSI/Raw', 'mosi')
mosi_data

In [None]:
import torch
# Reload the saved MOSI results and evaluate the GPT-4o scores against the 'label' column.
mosi_data = pd.read_csv('./mosi_test_with_scores.csv')
print("Number of samples: ", len(mosi_data))
# Count scores that are exactly 0.0; this includes the "0.0" fallback used when a request failed.
zero_count = (mosi_data['GPT-4o'] == 0.0).sum()
print(f"Number of samples with GPT-4o score 0.0: {zero_count}")
metrics = MetricsTop().getMetics('MOSI')
pred = torch.tensor(mosi_data['GPT-4o'].values)
true = torch.tensor(mosi_data['label'].values)
test_results = metrics(pred, true)
print(test_results)

In [None]:
# Add GPT-4o scores to the SIMS2 results file (the input CSV is also the output path) and show the DataFrame.
sims2_data = predict_GPT4o('./sims2_test_with_scores.csv', '../MSA_Datasets/SIMS2/Raw', 'sims2')
sims2_data

In [None]:
import torch
# Reload the saved SIMS2 results and evaluate the GPT-4o scores against the 'label' column.
sims2_data = pd.read_csv('./sims2_test_with_scores.csv')
print("Number of samples: ", len(sims2_data))
# Count scores that are exactly 0.0; this includes the "0.0" fallback used when a request failed.
zero_count = (sims2_data['GPT-4o'] == 0.0).sum()
print(f"Number of samples with GPT-4o score 0.0: {zero_count}")
metrics = MetricsTop().getMetics('SIMS2')
pred = torch.tensor(sims2_data['GPT-4o'].values)
true = torch.tensor(sims2_data['label'].values)
test_results = metrics(pred, true)
print(test_results)

In [None]:
# Add GPT-4o scores to the MOSEI results file (the input CSV is also the output path) and show the DataFrame.
mosei_data = predict_GPT4o('./mosei_test_with_scores.csv', '../MSA_Datasets/MOSEI/Raw', 'mosei')
mosei_data