In [9]:
import json
from joblib import Parallel, delayed
from tqdm_joblib import tqdm_joblib
import time
from tqdm.auto import tqdm

In [10]:
import os

# Expose only GPU index 0 to this process. This is set before the cell that
# imports torch so the restriction is already in place when CUDA is first used.
os.environ.update(CUDA_VISIBLE_DEVICES='0')

In [11]:
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from sentence_transformers import SentenceTransformer, util

# Load the NLI model and its tokenizer. Inference runs on the first visible GPU
# when CUDA is available and falls back to the CPU otherwise.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# Alternative checkpoint: "../../models/xlm-roberta-large-xnli"
model_name = "../../models/mDeBERTa-v3-base-mnli-xnli"

nli_tokenizer = AutoTokenizer.from_pretrained(model_name)
nli_model = AutoModelForSequenceClassification.from_pretrained(model_name)
nli_model = nli_model.to(device)

In [12]:
def _resolve_nli_labels():
    """Return the NLI class names ordered by logit index.

    The order is read from ``nli_model.config.id2label`` so it follows the
    loaded checkpoint. If that mapping does not name both "entailment" and
    "contradiction" (e.g. generic LABEL_0/LABEL_1 names), fall back to the
    model-path heuristic this notebook used originally.
    """
    if 'mDeBERTa' in model_name:
        fallback = ["entailment", "neutral", "contradiction"]
    else:
        fallback = ["contradiction", "neutral", "entailment"]

    id2label = getattr(nli_model.config, "id2label", None) or {}
    # Sort by numeric id so position i in the list corresponds to logit i.
    labels = [str(id2label[key]).lower() for key in sorted(id2label, key=int)]
    if "entailment" in labels and "contradiction" in labels:
        return labels
    return fallback


def nli_judge(sentence_list, candidate_lists, batch_size=32):
    """
    Run NLI inference for every (base sentence, candidate) pair.

    Each base sentence is passed to the tokenizer as the first text segment and
    each of its candidates as the second segment.

    Args:
        sentence_list: List[str], the base sentences.
        candidate_lists: List[List[str]], the candidates of each base sentence,
            aligned by position with ``sentence_list``. Pairing uses ``zip``,
            so surplus entries of the longer list are ignored.
        batch_size: int, number of pairs per forward pass.

    Returns:
        A tuple ``(hard, soft)``; both lists have the same nesting as
        ``candidate_lists``:
        - hard: List[List[str]], arg-max label of each pair
          ("entailment", "neutral" or "contradiction").
        - soft: List[List[float]], P(entailment) - P(contradiction) of each pair.
    """
    # Flatten the nested input into two parallel lists of pairs and remember
    # how many pairs belong to each base sentence.
    flattened_sentences = []
    flattened_candidates = []
    sentence_lengths = []
    for sentence, candidates in zip(sentence_list, candidate_lists):
        flattened_sentences.extend([sentence] * len(candidates))
        flattened_candidates.extend(candidates)
        sentence_lengths.append(len(candidates))

    all_results = []
    all_soft_results = []

    # Only touch the model when there is at least one pair to score.
    if flattened_sentences:
        labels = _resolve_nli_labels()
        # Look the class positions up once instead of once per prediction.
        entail_idx = labels.index("entailment")
        contra_idx = labels.index("contradiction")

        for start in range(0, len(flattened_sentences), batch_size):
            batch_sentences = flattened_sentences[start:start + batch_size]
            batch_candidates = flattened_candidates[start:start + batch_size]

            encoded_input = nli_tokenizer(batch_sentences, batch_candidates,
                                          return_tensors="pt", padding=True, truncation=True).to(device)
            with torch.no_grad():
                logits = nli_model(**encoded_input).logits
                probabilities = torch.softmax(logits, dim=-1).cpu().tolist()
                predictions = torch.argmax(logits, dim=-1).cpu().tolist()

            all_results.extend(labels[pred] for pred in predictions)
            all_soft_results.extend(row[entail_idx] - row[contra_idx] for row in probabilities)

    # Map the flat results back onto the original nested structure.
    mapped_results, mapped_soft_results = [], []
    idx = 0
    for length in sentence_lengths:
        mapped_results.append(all_results[idx:idx + length])
        mapped_soft_results.append(all_soft_results[idx:idx + length])
        idx += length

    return mapped_results, mapped_soft_results


def sts_judge(sentence_list, candidate_lists, batch_size=32):
    """
    Compute the cosine similarity between each base sentence and its candidates.

    NOTE(review): ``sts_model`` is not defined in the visible cells of this
    notebook; it has to be created elsewhere before this function is called
    with non-empty candidates. Judging by the ``encode(..., convert_to_tensor=True)``
    calls it is presumably a SentenceTransformer - confirm.

    Args:
        sentence_list: List[str], the base sentences.
        candidate_lists: List[List[str]], the candidates of each base sentence,
            aligned by position with ``sentence_list``. Pairing uses ``zip``,
            so surplus entries of the longer list are ignored.
        batch_size: int, number of pairs encoded per step.

    Returns:
        List[List[float]], one similarity per candidate, nested like
        ``candidate_lists``.
    """
    # Flatten the nested input into two parallel lists of pairs and remember
    # how many pairs belong to each base sentence.
    flattened_sentences = []
    flattened_candidates = []
    sentence_lengths = []
    for sentence, candidates in zip(sentence_list, candidate_lists):
        flattened_sentences.extend([sentence] * len(candidates))
        flattened_candidates.extend(candidates)
        sentence_lengths.append(len(candidates))

    # Score the pairs batch by batch.
    all_similarities = []
    for start in range(0, len(flattened_sentences), batch_size):
        batch_sentences = flattened_sentences[start:start + batch_size]
        batch_candidates = flattened_candidates[start:start + batch_size]

        # Encode both sides of every pair.
        sentence_embeddings = sts_model.encode(batch_sentences, convert_to_tensor=True).to(device)
        candidate_embeddings = sts_model.encode(batch_candidates, convert_to_tensor=True).to(device)

        # Only pair i-with-i is needed, so compute the row-wise cosine
        # similarity directly instead of building the full batch x batch
        # matrix and keeping just its diagonal.
        similarities = torch.nn.functional.cosine_similarity(
            sentence_embeddings, candidate_embeddings, dim=1)
        all_similarities.extend(similarities.cpu().tolist())

    # Map the flat results back onto the original nested structure.
    mapped_results = []
    idx = 0
    for length in sentence_lengths:
        mapped_results.append(all_similarities[idx:idx + length])
        idx += length

    return mapped_results

import numpy as np

def sigmoid(x, t=1.0):
    """Logistic function of ``x`` with temperature ``t``.

    Args:
        x: scalar or array-like of numbers.
        t: temperature; ``x`` is divided by it before the logistic is applied.

    Returns:
        ``1 / (1 + exp(-x / t))``, evaluated element-wise with NumPy.
    """
    scaled = np.array(x) / t
    denominator = 1 + np.exp(-scaled)
    return 1 / denominator


In [13]:
def calculate_nli_points_score(response_dict, points):
    """
    Score every model response against a list of reference points via NLI.

    Args:
        response_dict: dict mapping model name -> response text.
        points: List[str], statements judged against every response.

    Returns:
        A tuple ``(scores, soft_scores)`` of dicts keyed by model name:
        - scores: (#entailment - #contradiction) / len(points)
        - soft_scores: sum of the soft values returned by ``nli_judge`` for
          that response, divided by len(points)
        When ``points`` is empty there is nothing to judge and every model
        gets 0.0 in both dicts (instead of a ZeroDivisionError).
    """
    model_names = list(response_dict.keys())

    # Guard the division below: no points means no evidence either way.
    if not points:
        return ({name: 0.0 for name in model_names},
                {name: 0.0 for name in model_names})

    responses = list(response_dict.values())

    # Judge all responses against the same points in one batched call.
    nli_results, nli_soft_results = nli_judge(responses, [points] * len(responses))

    num_points = len(points)
    scores = {}
    soft_scores = {}
    for name, hard_labels, soft_values in zip(model_names, nli_results, nli_soft_results):
        # +1 for each entailed point, -1 for each contradicted one, 0 for neutral.
        net_entailed = sum((label == "entailment") - (label == "contradiction")
                           for label in hard_labels)
        scores[name] = net_entailed / num_points
        soft_scores[name] = sum(soft_values) / num_points

    return scores, soft_scores


# Score responses by the rating of their most similar example (STS).
def calculate_sts_example_score(response_dict, examples, verbose=False):
    """
    Give each response the normalised rating of the example it is most similar to.

    Args:
        response_dict: dict mapping model name -> response text.
        examples: dict mapping example text -> numeric rating.
        verbose: when True, print the response, the example texts and the
            similarity scores for every model (debugging aid, off by default).

    Returns:
        dict mapping model name -> rating of the most similar example divided
        by the highest rating in ``examples``. Empty when ``response_dict`` is
        empty.

    Raises:
        ValueError: if there are responses to score but ``examples`` is empty.
        ZeroDivisionError: if the highest rating in ``examples`` is 0.
    """
    if not response_dict:
        return {}
    if not examples:
        raise ValueError("examples must contain at least one rated example")

    example_texts = list(examples.keys())
    # Loop-invariant normaliser: the best rating any example carries.
    highest_rating = max(examples.values())

    model_names = list(response_dict.keys())
    responses = list(response_dict.values())

    # Compare every response with every example in a single batched call.
    all_sts_scores = sts_judge(responses, [example_texts] * len(responses))

    scores = {}
    for name, response, sts_scores in zip(model_names, responses, all_sts_scores):
        if verbose:
            print(f"Model: {name}")
            print("Response:", response)
            print("Examples:", example_texts)
            print("STS Scores:", [sts_scores])

        # Pick the example with the highest similarity to this response ...
        max_score_index = sts_scores.index(max(sts_scores))
        # ... and use its rating, scaled by the highest available rating.
        scores[name] = examples[example_texts[max_score_index]] / highest_rating

    return scores

def calculate_critic_similarity_weight(critic, answer_critic):
    """
    Score each model's critic items by how close they are to the reference critic.

    For every critic item the highest similarity to any entry of
    ``answer_critic`` is taken, turned into a distance ``1 - similarity`` and
    clipped to [0, 2]. A model's score is the mean of the negated distances,
    so it lies in [-2, 0] and is 0 when every item matches a reference entry
    with similarity 1.

    Args:
        critic: dict mapping model name -> list of critic strings.
        answer_critic: List[str], the reference critic entries.

    Returns:
        dict mapping model name -> float score. A model with an empty critic
        list gets 0.0 (no items, hence no penalty) instead of raising
        ZeroDivisionError.

    Raises:
        ValueError: if a model has critic items but ``answer_critic`` is empty.
    """
    scores = {}

    for name, model_critic in critic.items():
        # Nothing to compare: avoid dividing by zero below.
        if not model_critic:
            scores[name] = 0.0
            continue
        if not answer_critic:
            raise ValueError("answer_critic must contain at least one entry")

        # One batched call per model: every critic item against every
        # reference entry.
        model_critic = list(model_critic)
        sts_results = sts_judge(model_critic, [answer_critic] * len(model_critic))

        # Distance of each item to its closest reference entry.
        distances = [1.0 - max(similarities) for similarities in sts_results]
        weights = np.clip(distances, 0., 2.)

        # Each item contributes a penalty equal to its clipped distance.
        total_weighted_score = sum(-1 * weight for weight in weights)

        # Mean penalty, stored as a plain Python float.
        scores[name] = float(total_weighted_score / len(model_critic))

    return scores

In [14]:
import json
from tqdm.auto import tqdm
# Load the alignment samples. The "utf-8-sig" codec also accepts a file that
# starts with a byte-order mark.
samples = list()
align_points_path = "../../data/align/fact/align_points.json"
with open(align_points_path, mode='r', encoding="utf-8-sig") as file:
    data = json.load(file)
samples = data

In [None]:
# Compute the NLI points score of every sample.
for item in tqdm(samples):
    points_score, points_soft_score = calculate_nli_points_score(item['response'], item['points'])
    # Only the soft score is stored on the sample; the hard score is computed
    # but not saved.
    item['points_score'] = points_soft_score

# The output file is named after the last path component of the model directory.
output_path = "../../data/align/fact/align_points_" + model_name.split('/')[-1] + ".json"
with open(output_path, mode="w", encoding="utf-8") as f:
    json.dump(samples, f, ensure_ascii=False, indent=4)