In [None]:
from FlagEmbedding.visual.modeling import Visualized_BGE
import torch
from PIL import Image
from tqdm import tqdm

from typing import List
from io import BytesIO
from PIL import Image
import requests
import pickle
import json
import pandas as pd

def load(f):
    """Load a dataset file, picking the parser from the file extension.

    Supported extensions are ``pkl``, ``json``, ``jsonl``, ``xlsx``, ``csv``
    and ``tsv``. The extension is whatever follows the last ``.`` in ``f``.

    Args:
        f: Path to the file, as a string.

    Returns:
        The unpickled object for ``pkl``; the decoded JSON value for ``json``;
        a list with one decoded value per non-blank line for ``jsonl``; a
        ``pandas.DataFrame`` for ``xlsx``, ``csv`` and ``tsv``.

    Raises:
        KeyError: If the extension is not one of the supported ones.
    """
    def load_pkl(pth):
        # NOTE: unpickling can execute arbitrary code; only load trusted files.
        with open(pth, 'rb') as fh:
            return pickle.load(fh)

    def load_json(pth):
        with open(pth, 'r', encoding='utf-8') as fh:
            return json.load(fh)

    def load_jsonl(pth):
        # Blank lines (trailing or interior) are skipped, so an empty file
        # yields an empty list instead of raising.
        with open(pth, 'r', encoding='utf-8') as fh:
            stripped_lines = (line.strip() for line in fh)
            return [json.loads(line) for line in stripped_lines if line]

    def load_xlsx(pth):
        return pd.read_excel(pth)

    def load_csv(pth):
        return pd.read_csv(pth)

    def load_tsv(pth):
        return pd.read_csv(pth, sep='\t')

    handlers = dict(pkl=load_pkl, json=load_json, jsonl=load_jsonl, xlsx=load_xlsx, csv=load_csv, tsv=load_tsv)
    suffix = f.split('.')[-1]
    return handlers[suffix](f)

In [None]:
def eval_single_choice(benchmark_data, benchmark_root_path, model):
    """Score a single-choice benchmark by image/text embedding similarity.

    For every row, each non-null column whose name starts with ``img`` is a
    candidate image (in column order). The row's ``above_content`` and
    ``below_content`` are concatenated into one text, and the candidate whose
    embedding has the highest dot product with the text embedding is chosen.
    The choice is reported as a letter: ``'A'`` for the first candidate,
    ``'B'`` for the second, and so on, and compared with the ``Answer`` column.

    Args:
        benchmark_data: ``pandas.DataFrame`` with ``above_content``,
            ``below_content``, ``Answer`` and one or more ``img*`` columns.
        benchmark_root_path: Directory prefix joined to each image value with ``/``.
        model: Object exposing ``encode(image=...)`` and ``encode(text=...)``;
            the results must support ``@``, ``.T`` and ``.item()``.

    Returns:
        dict with ``'accuracy'`` (fraction of rows answered correctly, ``0.0``
        for an empty benchmark) and ``'results'`` (one dict per row: the row's
        values with missing text replaced by ``''``, plus ``sim_answer_list``
        and ``model_answer``).
    """
    import warnings

    def _text_or_empty(value):
        # Missing text cells arrive as NaN; treat them as empty strings.
        return '' if pd.isna(value) else value

    # NOTE(review): the original additionally required
    # `not column_name.startswith('url')`, which is always true for a name that
    # starts with 'img'. If the intent was to skip columns such as `img_url`,
    # confirm and filter for that explicitly.
    image_columns = [name for name in benchmark_data.columns if name.startswith('img')]

    total_rows = len(benchmark_data)
    correct = 0
    results = []

    for row_index, row in tqdm(benchmark_data.iterrows(), total=total_rows, desc='Evaluating'):
        images_list = [
            benchmark_root_path + '/' + row[name]
            for name in image_columns
            if not pd.isna(row[name])
        ]
        above_content = _text_or_empty(row['above_content'])
        below_content = _text_or_empty(row['below_content'])

        sim_answer_list = []
        try:
            # The text is the same for every candidate, so embed it once.
            txt_emb = model.encode(text=above_content + below_content)
            for img in images_list:
                img_emb = model.encode(image=img)
                sim_answer_list.append((img_emb @ txt_emb.T).item())

            # With no candidates max() raises ValueError and the row is marked as an error.
            model_answer = chr(sim_answer_list.index(max(sim_answer_list)) + 65)
        except Exception as exc:
            # Best effort: keep evaluating the remaining rows, but say why this
            # one failed instead of hiding it (and let Ctrl-C propagate).
            warnings.warn(f"Row {row_index!r}: scoring failed ({exc!r}); recording 'ERROR!'")
            model_answer = 'ERROR!'

        if model_answer == row['Answer']:
            correct += 1

        result_row = row.to_dict()
        result_row['above_content'] = above_content
        result_row['below_content'] = below_content
        result_row['sim_answer_list'] = sim_answer_list
        result_row['model_answer'] = model_answer
        results.append(result_row)

    # Guard the empty benchmark instead of dividing by zero.
    accuracy = correct / total_rows if total_rows > 0 else 0.0

    # Round-trip through a DataFrame to get plain record dicts.
    results_json = pd.DataFrame(results).to_dict(orient='records')

    return {
        'accuracy': accuracy,
        'results': results_json
    }


In [None]:
from tqdm import tqdm
import re
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def eval_flow_insert(benchmark_data, benchmark_root_path, model, similarity_threshold=0.43):
    """Greedily assign images to paragraphs and score the assignment.

    For each news item the paragraphs are visited in order. The text compared
    against the images is the concatenation of all paragraphs seen so far. The
    not-yet-used image with the highest dot-product similarity is inserted
    after the current paragraph if that similarity exceeds
    ``similarity_threshold``; otherwise the paragraph gets ``""``. An image is
    used at most once per news item.

    Args:
        benchmark_data: Iterable of dicts with ``'imagedatabase'`` (sequence of
            entries whose first element is an image path relative to
            ``benchmark_root_path``) and ``'news_text'`` (sequence of dicts
            with ``'id'``, ``'content'`` and ``'groundtruth'``).
        benchmark_root_path: Directory prefix joined to each image path with ``/``.
        model: Object exposing ``encode(image=...)`` and ``encode(text=...)``;
            the results must support ``@``, ``.T`` and ``.item()``.
        similarity_threshold: Minimum similarity (exclusive) for inserting an image.

    Returns:
        dict with ``'performance_metrics'`` (``overall_accuracy``,
        ``image_only_accuracy``, ``blank_only_accuracy``; each is 0 when it has
        no samples) and ``'news_item_results'`` (one dict per news item with
        ``id``, ``true_label`` and ``model_answer``).
    """
    def cached_image_embedding(cache, image_database, index_img):
        # Encode each image of the current database once; later paragraphs and
        # news items sharing the database reuse the stored embedding.
        if index_img not in cache:
            image_path = benchmark_root_path + '/' + image_database[index_img][0]
            cache[index_img] = model.encode(image=image_path)
        return cache[index_img]

    # Counters for each evaluation condition.
    correct_matches = 0
    total_matches = 0
    correct_image_matches = 0
    total_image_matches = 0
    correct_blank_matches = 0
    total_blank_matches = 0

    news_item_results = []

    for content in tqdm(benchmark_data):
        image_database = content['imagedatabase']
        image_embeddings = {}

        for news_item in tqdm(content['news_text'], desc="News Items"):
            selected_images = set()
            paragraph_content = news_item['content']
            model_answer = ["" for _ in range(len(paragraph_content))]
            true_label = news_item['groundtruth']
            inputs_paragraphs = ''
            news_id = news_item['id']

            # Visit every paragraph.
            for index_paragraph, paragraph in enumerate(paragraph_content):
                inputs_paragraphs += paragraph
                highest_similarity = -float('inf')
                best_image_idx = None
                # Embedded lazily: only needed if an unused image remains, and
                # it is identical for every image of this paragraph.
                txt_emb = None

                # Visit every image that has not been placed yet.
                for index_img in range(len(image_database)):
                    if index_img in selected_images:
                        continue
                    img_emb = cached_image_embedding(image_embeddings, image_database, index_img)
                    if txt_emb is None:
                        txt_emb = model.encode(text=inputs_paragraphs)

                    similarity = (img_emb @ txt_emb.T).item()

                    if similarity > highest_similarity:
                        highest_similarity = similarity
                        best_image_idx = index_img

                if best_image_idx is not None and highest_similarity > similarity_threshold:
                    selected_images.add(best_image_idx)
                    model_answer[index_paragraph] = image_database[best_image_idx][0]

            for model_answer_item, true_label_item in zip(model_answer, true_label):
                is_correct = model_answer_item == true_label_item

                # All positions.
                total_matches += 1
                if is_correct:
                    correct_matches += 1

                if true_label_item != "":
                    # Positions where the ground truth names an image.
                    total_image_matches += 1
                    if is_correct:
                        correct_image_matches += 1
                else:
                    # Positions where the ground truth is blank.
                    total_blank_matches += 1
                    if is_correct:
                        correct_blank_matches += 1

            # Keep the per-item result.
            news_item_results.append({
                'id': news_id,
                'true_label': true_label,
                'model_answer': model_answer
            })

    # Compute the performance metrics.
    performance_metrics = {
        'overall_accuracy': correct_matches / total_matches if total_matches > 0 else 0,
        'image_only_accuracy': correct_image_matches / total_image_matches if total_image_matches > 0 else 0,
        'blank_only_accuracy': correct_blank_matches / total_blank_matches if total_blank_matches > 0 else 0
    }

    # Return the per-item results together with the metrics.
    return {
        'performance_metrics': performance_metrics,
        'news_item_results': news_item_results,
    }


In [None]:
import os
import random
import pandas as pd
import json

def _write_results_json(results_dict, results_dir):
    """Write ``results_dict`` to ``<results_dir>/results.json`` as UTF-8 JSON."""
    json_path = os.path.join(results_dir, 'results.json')
    # ensure_ascii=False writes non-ASCII characters verbatim, so the file
    # encoding is pinned rather than left to the platform default.
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(results_dict, f, ensure_ascii=False, indent=4)


def main(
    model_name: str,
    model_path: str,
    dataset_path: str,
    results_dir: str,
    img_path = None,
    eval_mode: str='single_choice',
    seed = 42,
    similarity_threshold=0.43
):
    """Run one benchmark evaluation and save its results under ``results_dir``.

    Args:
        model_name: Passed to ``Visualized_BGE`` as ``model_name_bge``. The
            special value ``"random"`` skips model loading (``model`` is ``None``).
        model_path: Passed to ``Visualized_BGE`` as ``model_weight``.
        dataset_path: Benchmark file, read with ``load``.
        results_dir: Output directory; created if missing.
        img_path: Root directory of the images. Defaults to the directory
            containing ``dataset_path``.
        eval_mode: Selects the evaluation by prefix: ``single_choice``,
            ``flow_insert`` or ``fake_news``.
        seed: Seed for Python's ``random`` module.
        similarity_threshold: Forwarded to ``eval_flow_insert``.

    Returns:
        The results dict for ``single_choice``/``flow_insert`` (also written to
        ``results.json``), or the value returned by ``eval_fake_news`` for
        ``fake_news`` (also written to ``results.csv``).

    Raises:
        NotImplementedError: If ``eval_mode`` matches none of the known prefixes.
    """
    # Reject unknown modes before paying for data and model loading.
    if not eval_mode.startswith(('single_choice', 'flow_insert', 'fake_news')):
        raise NotImplementedError(f'Unsupported eval_mode: {eval_mode!r}')

    random.seed(seed)
    # Load the benchmark data.
    benchmark_data = load(dataset_path)
    benchmark_root_path = os.path.dirname(os.path.abspath(dataset_path)) if img_path is None else img_path

    # Initialise the model.
    if model_name == "random":
        model = None
    else:
        model = Visualized_BGE(model_name_bge=model_name, model_weight=model_path).cuda()
        # Inference only: switch off training-time behaviour such as dropout.
        model.eval()

    # Make sure the results directory exists.
    os.makedirs(results_dir, exist_ok=True)

    try:
        if eval_mode.startswith('single_choice'):
            with torch.no_grad():
                results_dict = eval_single_choice(benchmark_data, benchmark_root_path, model)
            _write_results_json(results_dict, results_dir)
            return results_dict

        if eval_mode.startswith('flow_insert'):
            with torch.no_grad():
                results_dict = eval_flow_insert(benchmark_data, benchmark_root_path, model, similarity_threshold=similarity_threshold)
            _write_results_json(results_dict, results_dir)
            return results_dict

        # Remaining supported prefix: 'fake_news'.
        # NOTE(review): eval_fake_news is not defined in this part of the
        # notebook; confirm it exists before using this mode. Only the first
        # 500 entries of the benchmark are evaluated.
        results_df = eval_fake_news(benchmark_data[:500], benchmark_root_path, model)

        # Save the results as a CSV file.
        csv_path = os.path.join(results_dir, 'results.csv')
        results_df.to_csv(csv_path, index=False)
        return results_df
    finally:
        # Drop the model and release cached GPU memory even if evaluation
        # failed, so the next run in the same kernel starts clean.
        del model
        torch.cuda.empty_cache()

In [None]:
# Run the single-choice evaluation with the Visualized BGE-M3 weights on every
# `single_choice_*` dataset listed below.
eval_mode_list = [
    'single_choice_1_cn', 'single_choice_2_cn', 'single_choice_3_cn', 'single_choice_4_cn',
    'single_choice_1_en', 'single_choice_2_en', 'single_choice_3_en', 'single_choice_4_en',
]

for eval_mode in eval_mode_list:
    model_name = "BAAI/bge-m3"
    model_path = "/PATH_TO_YOURS/models/BAAI/bge-visualized/Visualized_m3.pth"
    dataset_path = f'/PATH_TO_YOURS_ftii_data/newsinsertbench_{eval_mode}.tsv'

    # Modes whose last `_`-separated token is `cn` use their own image folder.
    img_path = (
        '/PATH_TO_YOURS/images/NewsImages_cn_jpg'
        if eval_mode.split('_')[-1] == 'cn'
        else '/PATH_TO_YOURS/images'
    )

    # One output folder per (mode, weights file) pair.
    results_dir = f"/PATH_TO_YOURS/results/{eval_mode}/{model_path.split('/')[-1]}"

    results_dict = main(
        model_name=model_name,
        model_path=model_path,
        dataset_path=dataset_path,
        results_dir=results_dir,
        img_path=img_path,
        eval_mode=eval_mode,
    )

In [None]:
# Run the single-choice evaluation with the Visualized bge-base-en-v1.5 weights
# on every `single_choice_*` dataset listed below.
eval_mode_list = [
    'single_choice_1_cn', 'single_choice_2_cn', 'single_choice_3_cn', 'single_choice_4_cn',
    'single_choice_1_en', 'single_choice_2_en', 'single_choice_3_en', 'single_choice_4_en',
]

for eval_mode in eval_mode_list:
    model_name = "BAAI/bge-base-en-v1.5"
    model_path = "/PATH_TO_YOURS/models/BAAI/bge-visualized/Visualized_base_en_v1.5.pth"
    dataset_path = f'/PATH_TO_YOURS_ftii_data/newsinsertbench_{eval_mode}.tsv'

    # Modes whose last `_`-separated token is `cn` use their own image folder.
    img_path = (
        '/PATH_TO_YOURS/images/NewsImages_cn_jpg'
        if eval_mode.split('_')[-1] == 'cn'
        else '/PATH_TO_YOURS/images'
    )

    # One output folder per (mode, weights file) pair.
    results_dir = f"/PATH_TO_YOURS/results/{eval_mode}/{model_path.split('/')[-1]}"

    results_dict = main(
        model_name=model_name,
        model_path=model_path,
        dataset_path=dataset_path,
        results_dir=results_dir,
        img_path=img_path,
        eval_mode=eval_mode,
    )

In [None]:
# Run the flow-insert evaluation with the Visualized BGE-M3 weights on every
# `flow_insert_*` dataset listed below.
eval_mode_list = [
    'flow_insert_1_cn', 'flow_insert_2_cn', 'flow_insert_3_cn',
    'flow_insert_1_en', 'flow_insert_2_en', 'flow_insert_3_en',
]

for eval_mode in eval_mode_list:
    model_name = "BAAI/bge-m3"
    model_path = "/PATH_TO_YOURS/models/BAAI/bge-visualized/Visualized_m3.pth"
    dataset_path = f'/PATH_TO_YOURS_ftii_data/newsinsertbench_{eval_mode}.json'

    # Modes whose last `_`-separated token is `cn` use their own image folder.
    img_path = (
        '/PATH_TO_YOURS/images/NewsImages_cn_jpg'
        if eval_mode.split('_')[-1] == 'cn'
        else '/PATH_TO_YOURS/images'
    )

    # One output folder per (mode, weights file) pair.
    results_dir = f"/PATH_TO_YOURS/results/{eval_mode}/{model_path.split('/')[-1]}"

    results_dict = main(
        model_name=model_name,
        model_path=model_path,
        dataset_path=dataset_path,
        results_dir=results_dir,
        img_path=img_path,
        eval_mode=eval_mode,
    )

In [None]:
# Run the flow-insert evaluation with the Visualized bge-base-en-v1.5 weights
# on every `flow_insert_*` dataset listed below.
eval_mode_list = [
    'flow_insert_1_cn',
    'flow_insert_2_cn',
    'flow_insert_3_cn',
    'flow_insert_1_en',
    'flow_insert_2_en',
    'flow_insert_3_en',
]

for eval_mode in eval_mode_list:
    model_name = "BAAI/bge-base-en-v1.5"
    model_path = "/PATH_TO_YOURS/models/BAAI/bge-visualized/Visualized_base_en_v1.5.pth"
    # The flow-insert data must be the JSON file: eval_flow_insert reads each
    # record by key (`content['imagedatabase']`, `content['news_text']`), which
    # fails on the DataFrame that `load` returns for a `.tsv` path.
    dataset_path = f'/PATH_TO_YOURS_ftii_data/newsinsertbench_{eval_mode}.json'
    # Modes whose last `_`-separated token is `cn` use their own image folder.
    if eval_mode.split('_')[-1] == 'cn':
        img_path = '/PATH_TO_YOURS/images/NewsImages_cn_jpg'
    else:
        img_path = '/PATH_TO_YOURS/images'

    # One output folder per (mode, weights file) pair.
    results_dir = '/PATH_TO_YOURS/results/' + eval_mode + '/' + model_path.split('/')[-1]
    results_dict = main(model_name = model_name,
                        model_path = model_path,
                        dataset_path = dataset_path,
                        results_dir = results_dir,
                        img_path = img_path,
                        eval_mode = eval_mode)