In [None]:
from sentence_transformers import SentenceTransformer, util
from bertopic import BERTopic
from sklearn.datasets import fetch_20newsgroups
import pandas as pd
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords
import nltk
import re
import spacy
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import rcParams
import json
from FlagEmbedding import BGEM3FlagModel


# Notebook-wide matplotlib defaults, applied through one mapping.
# Formula (mathtext) font set
config = {
    "mathtext.fontset": 'stix',
}
rcParams.update({
    # Figure resolution
    'figure.dpi': 900,
    # Draw the X/Y tick marks pointing inwards
    'xtick.direction': 'in',
    'ytick.direction': 'in',
    # Font settings
    'axes.unicode_minus': False,
    'font.family': 'Microsoft Yahei',
})
rcParams.update(config)

# Extract the replaced content and the replaced sentence with a regular expression
def get_match_replaced(text):
    """Parse an LLM answer written in the v2.0 prompt format.

    Returns ``(flag, (replaced_content, replaced_sentence))``. ``flag`` is
    True only when the answer pattern occurs exactly once in ``text``;
    otherwise it is False and both strings are empty.
    """
    # v2.0 prompt
    pattern = r'A: Replaced content is <(.*?)>, Replaced sentence is <(.*?)>'
    # 2.1 prompt
    # pattern = r'A: Replaced content is <<<(.*?)>>>, Replaced sentence is <<<(.*?)>>>'
    # re.DOTALL lets '.' match newlines too, so multi-line answers still match
    found = re.findall(pattern, text, flags=re.DOTALL)
    if len(found) != 1:
        # Zero matches or an ambiguous answer with several matches
        return False, ('', '')
    replaced_content, replaced_sentence = found[0]
    return True, (replaced_content, replaced_sentence)


def get_similarity_scores_BGEM2(s1, s2, model):
    """Cosine similarity between the dense embeddings of two sentences.

    Each sentence is passed to ``model.encode`` as a one-element list and the
    ``'dense_vecs'`` entry of the result is used. The two embeddings are
    compared with ``util.cos_sim`` and the single value is returned via
    ``.item()``.
    """
    dense_first, dense_second = (
        model.encode([sentence])['dense_vecs'] for sentence in (s1, s2)
    )
    # score = embedding_1 @ embedding_2.T
    return util.cos_sim(dense_first, dense_second).item()

def get_similarity_scores_AllMpnetBaseV2(s1, s2, model):
    """Cosine similarity between two sentences encoded as tensors.

    Both sentences are encoded with ``model.encode(..., convert_to_tensor=True)``,
    compared with ``util.cos_sim``, and the single value is returned via
    ``.item()``.
    """
    tensor_first, tensor_second = (
        model.encode(sentence, convert_to_tensor=True) for sentence in (s1, s2)
    )
    return util.cos_sim(tensor_first, tensor_second).item()

In [None]:
# Load both embedding models from local directories (paths are relative to
# the notebook's working directory).
# NOTE(review): the two paths use different prefixes ('data/...' vs './data/...').
model_bgem3 = BGEM3FlagModel('data/llm/sbert/model/bge-m3', use_fp16=True)
model_allmpnet = SentenceTransformer('./data/llm/sbert/model/all-mpnet-base-v2')

In [None]:
# Near-identical pair: the sentences differ only in "philosophers" vs "philosopher's".
# NOTE: this pair is overwritten by the short pair below before anything is scored.
s1 = "In a word, Photography deals with what the philosopher Descartes called ‘extension’. ‘Extension’ is an attribute of things (substances) made of matter: in the philosophers own words, res extensa. Primarily, Photography, as a visual art, explores the physical side of the World, the surface of what can be caught by our eyes."
s2 = "In a word, Photography deals with what the philosopher Descartes called ‘extension’. ‘Extension’ is an attribute of things (substances) made of matter: in the philosopher's own words, res extensa. Primarily, Photography, as a visual art, explores the physical side of the World, the surface of what can be caught by our eyes."

# Short pair that is actually scored by the two prints below.
s1 = 'I love you'
s2 = 'I like you'

# Print the similarity of the pair under each model.
print(get_similarity_scores_AllMpnetBaseV2(s1, s2, model_allmpnet))
print(get_similarity_scores_BGEM2(s1, s2, model_bgem3))

In [None]:
from tqdm import tqdm


model_name = 'llama2'
# filepath = 'data/llm/'+ model_name +'-output/LE-dataset v2.0-150-output.json'
filepath = 'data/llm2.1.2/'+ model_name +'-output/LE-dataset v2.1.2-150.json'

print(filepath)
with open(filepath, 'r', encoding='utf-8') as file:
    llmed_list = json.load(file)
print(len(llmed_list))

# Records that carry a similarity score
results = []
# Format counters for the raw responses:
#   N - already starts with 'A: Replaced'
#   n - starts with 'Replaced' (the 'A: ' prefix is missing and gets restored)
#   m - anything else
N, n, m = 0, 0, 0
# Iterating over the tqdm wrapper advances the bar by one per record.
progress_bar = tqdm(llmed_list, desc='Progress', unit='task')
for llmed_ in progress_bar:
    phrase_ = llmed_['Latin phrase']
    local_sentence = llmed_['local sentence']
    llmed_response = llmed_['llmed response']

    # Classify the raw response first and only then restore the prefix;
    # restoring it before classifying would leave the 'Replaced' counter at 0.
    if llmed_response[:11] == 'A: Replaced':
        N += 1
    elif llmed_response[:8] == 'Replaced':
        n += 1
        llmed_response = 'A: ' + llmed_response
    else:
        m += 1

    # Keep only responses that match the expected answer format exactly once
    flag, (llmed_content, llmed_sentence) = get_match_replaced(llmed_response)
    if not flag:
        continue

    # Score the original sentence against the sentence extracted from the response
    score = get_similarity_scores_BGEM2(local_sentence, llmed_sentence, model_bgem3)
    results.append({'Latin phrase': phrase_, 'local sentence': local_sentence, 'llmed response':llmed_sentence, 'similarity score': score})

print(N, n, m, n+N+m)
# Close the progress bar
progress_bar.close()

print(len(results))
# Save the similarity data
# v1.1 refers to the data produced after the matching method was changed
# out_file_path = 'data/llm/sbert/similarity-out/bge-m3/LE-dataset v2.0-150-'+ model_name +'-output(similarity) v1.1.json'
out_file_path = 'data/llm2.1.2/sbert/similarity-out/bge-m3/LE-dataset v2.0-150-'+ model_name +'-output(similarity) v2.1.2.json'

# The with-statement closes the file; no explicit close() is needed.
with open(out_file_path, 'w', encoding='utf-8') as file:
    json.dump(results, file, indent=2)

#### 合并图例

In [None]:
import json
import random
import numpy as np


# Load the precomputed similarity scores
def get_similarity_scores(model_name):
    """Read the similarity scores stored for ``model_name``.

    Opens the v1.1 similarity JSON file for the given model and returns the
    ``'similarity score'`` value of every record, in file order.
    """
    # filepath = './data/llm/'+ model_name +'-output/LE-dataset v2.0-150-output.json'
    filepath = 'data/llm/sbert/similarity-out/bge-m3/LE-dataset v2.0-150-'+ model_name +'-output(similarity) v1.1.json'

    with open(filepath, 'r', encoding='utf-8') as handle:
        records = json.load(handle)
    # One score per stored sentence pair
    return [record['similarity score'] for record in records]


def plot_radar_map(ax, model, similarity_scores, N, seed=None):
    """Draw a random sample of similarity scores as a polar scatter plot.

    Two annotations are written on ``ax``: the share of scores in the band
    ``0.9 <= score < 0.999`` (labelled ACC) and the mean absolute deviation
    of all scores from 1 (labelled MAD). Both use every score; only the
    scattered points are sampled.

    Args:
        ax: axes object to draw on; it must provide a ``'polar'`` spine.
        model: model name shown (capitalised) in the ACC annotation.
        similarity_scores: non-empty collection of similarity scores.
        N: number of scores to draw; clamped to ``len(similarity_scores)``
            so a small result set does not make the sampling fail.
        seed: optional seed for a private random generator so that the drawn
            sample is reproducible. With ``None`` the module-level ``random``
            state is used.

    Raises:
        ValueError: if ``similarity_scores`` is empty, or if ``N`` is negative
            (raised by the sampler).
    """
    if len(similarity_scores) == 0:
        raise ValueError('similarity_scores must not be empty')

    print('Plot radar map ' + model +'...')

    accuracy = sum(1 for ele in similarity_scores if 0.999>ele>=0.9) / len(similarity_scores)
    ax.text(-1.55, 0.55, 'ACC ('+ model.capitalize() +') = ' + "{:.1f}%".format(accuracy * 100), ha='center', va='center', color='black', fontsize=14)
    print(accuracy)
    # Deviation from 1 (mean absolute deviation)
    mean_abs_deviation = np.mean(np.abs(np.array(similarity_scores) - 1))
    print(mean_abs_deviation)
    ax.text(-1.55, 0.48, 'MAD = ' + "{:.3f}".format(mean_abs_deviation), ha='center', va='center', color='black', fontsize=14)

    # Randomly draw the scores to display, never more than are available
    sample_size = min(N, len(similarity_scores))
    sampler = random if seed is None else random.Random(seed)
    values = sampler.sample(list(similarity_scores), sample_size)

    # Evenly spaced angles, one per sampled score
    angles = np.linspace(0, 2 * np.pi, len(values), endpoint=False).tolist()
    # Repeat the first point at the end so both sequences have matching length
    theta = angles + angles[:1]
    r = values + values[:1]
    colors = ['#6671b5', '#f79779']
    # ax.scatter(theta, r, s=50, color=colors[1], alpha=0.6)
    ax.scatter(theta, r, marker='x', s=15, linewidth=1, color=colors[1], alpha=0.9)

    # No angular ticks or labels
    ax.set_xticks([])
    ax.set_xticklabels([])
    # ax.set_ylim(np.min(values), np.max(values))
    ax.set_ylim(0.6, 1)
    # Invert the radial axis
    ax.invert_yaxis()
    # Radial tick positions (the circles)
    ax.set_yticks([0.6, 0.7, 0.8, 0.9])
    # Size and colour of the radial tick labels
    ax.tick_params(axis='y', labelsize=12, colors='black')
    # Width and colour of the grid circles
    ax.grid(True, color=colors[0], linewidth=1, alpha=0.6)
    # ax.spines['polar'].set_visible(False)
    # Outer ring: line width 1.5, drawn in the grid colour
    ax.spines['polar'].set_linewidth(1.5)
    ax.spines['polar'].set_color(colors[0])

#### 计算Score gap

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import rcParams
import seaborn as sns
from scipy import stats
import statistics
from matplotlib.patches import FancyArrow
import json
import textstat
import re


# Matplotlib defaults for the score-gap section, applied through one mapping.
# Formula (mathtext) font set
config = {
    "mathtext.fontset": 'stix',
}
rcParams.update({
    # Figure resolution
    'figure.dpi': 900,
    # Draw the X/Y tick marks pointing inwards
    'xtick.direction': 'in',
    'ytick.direction': 'in',
    # Font settings
    'axes.unicode_minus': False,
    'font.family': 'Microsoft Yahei',
})
rcParams.update(config)

    
def appear_times_for_single(data, phrase):
    """Sum the counts stored for ``phrase`` over all records.

    Every element of ``data`` holds a ``'phrase'`` mapping from phrase to
    count; records that do not contain ``phrase`` contribute nothing.
    """
    return sum(
        record['phrase'][phrase]
        for record in data
        if phrase in record['phrase']
    )


def count_bins(number):
    """Return one more than the number of leading zeros after the decimal point.

    ``number`` is formatted with 15 decimal places; the zeros directly after
    the decimal point are counted up to the first non-zero digit.
    """
    fraction_digits = '{:.15f}'.format(number).split('.')[1]
    # Stripping the leading zeros reveals how many there were
    leading_zeros = len(fraction_digits) - len(fraction_digits.lstrip('0'))
    return leading_zeros + 1


# Michel 2007: centre value of a frequency bin
def get_center_value(freqs):
    """Return 10 raised to the midpoint of the log10 extremes of ``freqs``.

    Only the largest and the smallest frequency are used.
    """
    log_high = np.log10(max(freqs))
    log_low = np.log10(min(freqs))
    # Logarithmic mean of the two extremes, mapped back to the linear scale
    return 10 ** ((log_high + log_low) / 2)


# Confidence interval of the sample mean
def get_confidence_interval(data, confidence_level):
    """Return ``(lower, upper)`` bounds around the mean of ``data``.

    The half-width is the standard error of the mean multiplied by the
    t-distribution quantile at ``(1 + confidence_level) / 2`` with
    ``len(data) - 1`` degrees of freedom.
    """
    sample_mean = np.mean(data)
    # Critical value taken from the t distribution
    critical_t = stats.t.ppf((1 + confidence_level) / 2, df=len(data) - 1)
    # Margin of error
    half_width = critical_t * stats.sem(data)
    return (sample_mean - half_width, sample_mean + half_width)


def search(data, phrase):
    """Return every record of ``data`` whose ``'Latin phrase'`` equals ``phrase``.

    The records keep their original order.
    """
    return [record for record in data if phrase == record['Latin phrase']]
            
            
# Count the syllables of a Latin text
def get_latin_syllable(latin_):
    """Return the number of syllables in ``latin_``.

    The count is the number of vowel groups: each of the diphthongs
    ae/au/ei/eu/oe/ui counts once, and so does every remaining single
    vowel a/e/i/o/u/y. Matching ignores case, so capitalised phrases such as
    "Ad hoc" are counted the same as their lowercase form.
    """
    # Syllables of a Latin word = vowels + diphthongs (diphthongs are tried first)
    pattern = re.compile('(ae|au|ei|eu|oe|ui|[aeiouy])', re.IGNORECASE)
    return len(pattern.findall(latin_))


# Flesch-Kincaid grade of the mixed sentence and of the LLM rewrite
def get_flesch_kincaid_grade(latin_phrase, mixed_, llmed_):
    """Return ``(mixed_score, llmed_score, rate)`` for the Flesch-Kincaid grade.

    For ``mixed_`` the syllables of the Latin phrase are counted with
    ``get_latin_syllable`` and the remaining text with textstat. ``llmed_``
    is scored by ``textstat.flesch_kincaid_grade``. ``rate`` is the score
    difference divided by the mean of the two scores.
    """
    remainder = mixed_.replace(latin_phrase, '')
    syllables = textstat.syllable_count(remainder) + get_latin_syllable(latin_phrase)
    words = textstat.lexicon_count(mixed_, removepunct=True)
    sentences = textstat.sentence_count(mixed_)
    mixed_score = 0.39 * (words / sentences) + 11.8 * (syllables / words) - 15.59
    # print(sentences, words, syllables)
    llmed_score = textstat.flesch_kincaid_grade(llmed_)

    gap = mixed_score - llmed_score
    midpoint = (mixed_score + llmed_score) / 2
    return mixed_score, llmed_score, gap / midpoint


def get_flesch_reading_ease(latin_phrase, mixed_, llmed_):
    """Return ``(mixed_score, llmed_score, rate)`` for the Flesch reading ease.

    For ``mixed_`` the syllables of the Latin phrase are counted with
    ``get_latin_syllable`` and the remaining text with textstat. ``llmed_``
    is scored by ``textstat.flesch_reading_ease``. ``rate`` is the score
    difference divided by the mean of the two scores.
    """
    sentences = textstat.sentence_count(mixed_)
    words = textstat.lexicon_count(mixed_)
    # ASL: average sentence length
    avg_sentence_length = words / sentences
    # ASW: average number of syllables per word
    remainder = mixed_.replace(latin_phrase, '')
    syllables = textstat.syllable_count(remainder) + get_latin_syllable(latin_phrase)
    avg_syllables_per_word = syllables / words

    mixed_score = 206.835 - (1.015 * avg_sentence_length) - (84.6 * avg_syllables_per_word)
    llmed_score = textstat.flesch_reading_ease(llmed_)

    gap = mixed_score - llmed_score
    midpoint = (mixed_score + llmed_score) / 2
    return mixed_score, llmed_score, gap / midpoint


def get_smog_index(latin_phrase, mixed_, llmed_):
    """Return ``(mixed_score, llmed_score, rate)`` for the SMOG index.

    Words with three or more syllables are counted for ``mixed_``: Latin
    words via ``get_latin_syllable``, all other words via textstat.
    ``llmed_`` is scored by ``textstat.smog_index``. ``rate`` is the score
    difference divided by the mean of the two scores.
    """
    # Polysyllabic words inside the Latin phrase
    latin_polysyllables = sum(
        1 for word in latin_phrase.split() if get_latin_syllable(word) >= 3
    )
    # Polysyllabic words in the rest of the sentence
    other_polysyllables = sum(
        1
        for word in mixed_.replace(latin_phrase, '').split()
        if textstat.syllable_count(word) >= 3
    )
    polysyllable_count = latin_polysyllables + other_polysyllables

    mixed_score = 1.043 * np.sqrt(polysyllable_count * (30 / textstat.sentence_count(mixed_))) + 3.1291
    llmed_score = textstat.smog_index(llmed_)

    gap = mixed_score - llmed_score
    midpoint = (mixed_score + llmed_score) / 2
    return mixed_score, llmed_score, gap / midpoint


def get_coleman_liau_index(latin_phrase, mixed_, llmed_):
    """Return ``(mixed_score, llmed_score, rate)`` for the Coleman-Liau index.

    Both texts are scored by ``textstat.coleman_liau_index``;
    ``latin_phrase`` is accepted for signature parity with the other
    readability helpers but is not used. ``rate`` is the score difference
    divided by the mean of the two scores.

    Special cases:
        * both scores are 0: returns ``(0, 0, 0)``.
        * the scores cancel out (mean of 0 with different scores): the
          relative rate is undefined, so ``rate`` is NaN rather than a
          ZeroDivisionError.
    """
    mixed_score = textstat.coleman_liau_index(mixed_)
    llmed_score = textstat.coleman_liau_index(llmed_)
    if mixed_score == 0 and llmed_score == 0:
        return 0, 0, 0

    midpoint = (mixed_score + llmed_score) / 2
    if midpoint == 0:
        # Opposite scores: the score gap itself is still meaningful, the rate is not
        return mixed_score, llmed_score, float('nan')

    rate = (mixed_score - llmed_score) / midpoint
    return mixed_score, llmed_score, rate


def get_gunning_fog_index(latin_phrase, mixed_, llmed_):
    """Return ``(mixed_score, llmed_score, rate)`` for the Gunning fog index.

    Words with three or more syllables count as complex words for
    ``mixed_``: Latin words via ``get_latin_syllable``, all other words via
    textstat. ``llmed_`` is scored by ``textstat.gunning_fog``. ``rate`` is
    the score difference divided by the mean of the two scores.
    """
    # Polysyllabic words inside the Latin phrase
    latin_polysyllables = sum(
        1 for word in latin_phrase.split() if get_latin_syllable(word) >= 3
    )
    # Polysyllabic words in the rest of the sentence
    other_polysyllables = sum(
        1
        for word in mixed_.replace(latin_phrase, '').split()
        if textstat.syllable_count(word) >= 3
    )
    complex_words = latin_polysyllables + other_polysyllables

    total_words = textstat.lexicon_count(mixed_, removepunct=True)
    total_sentences = textstat.sentence_count(mixed_)
    mixed_score = 0.4 * ( (total_words / total_sentences) + 100 * (complex_words / total_words) )
    llmed_score = textstat.gunning_fog(llmed_)

    gap = mixed_score - llmed_score
    midpoint = (mixed_score + llmed_score) / 2
    return mixed_score, llmed_score, gap / midpoint

In [None]:
def read_llmed(model):
    """Load the v2.1.2 similarity records stored for ``model``.

    Returns the parsed content of the model's similarity JSON file.
    """
    filepath = 'data/llm2.1.2/sbert/similarity-out/bge-m3/LE-dataset v2.0-150-'+ model +'-output(similarity) v2.1.2.json'
    with open(filepath, 'r', encoding='utf-8') as handle:
        return json.load(handle)


def get_score_dict(llmed_list, calculate_index, min_score=0.9, max_score=0.999):
    """Collect the readability score gaps of every Latin phrase.

    Only records whose similarity score lies in ``min_score <= score < max_score``
    are used. For each of them the chosen readability index is computed for
    the original sentence and for the LLM sentence, and the difference
    ``original - llm`` is appended to the list of the record's Latin phrase.
    The relative rate returned by the index helpers is not used.

    Args:
        llmed_list: records with the keys 'Latin phrase', 'local sentence',
            'llmed response' and 'similarity score'.
        calculate_index: one of 'FKGL', 'FRE', 'GFI', 'SMOG', 'CLI'.
        min_score: inclusive lower bound of the accepted similarity band.
        max_score: exclusive upper bound of the accepted similarity band.

    Returns:
        dict mapping Latin phrase to a list of score gaps,
        e.g. ``{'de facto': [0.23, 0.25]}``.

    Raises:
        ValueError: if ``calculate_index`` is not a supported index name.
            Without this check an unknown name would record a gap of 0 for
            every accepted record.
    """
    supported = ('FKGL', 'FRE', 'GFI', 'SMOG', 'CLI')
    if calculate_index not in supported:
        raise ValueError(
            'Unknown readability index ' + repr(calculate_index)
            + '; expected one of ' + ', '.join(supported)
        )

    score_dict = {}
    for ele in llmed_list:
        # Records come from the data set that already carries similarity scores
        latin_phrase = ele['Latin phrase']
        local_sentence = ele['local sentence']
        llmed_sentence = ele['llmed response']
        score = ele['similarity score']

        # Skip records outside the accepted similarity band
        if not (max_score > score >= min_score):
            continue

        # Select the readability index
        if calculate_index == 'FKGL':
            s1, s2, _rate = get_flesch_kincaid_grade(latin_phrase, local_sentence, llmed_sentence)
        elif calculate_index == 'FRE':
            s1, s2, _rate = get_flesch_reading_ease(latin_phrase, local_sentence, llmed_sentence)
        elif calculate_index == 'GFI':
            s1, s2, _rate = get_gunning_fog_index(latin_phrase, local_sentence, llmed_sentence)
        elif calculate_index == 'SMOG':
            s1, s2, _rate = get_smog_index(latin_phrase, local_sentence, llmed_sentence)
        else:  # 'CLI'
            s1, s2, _rate = get_coleman_liau_index(latin_phrase, local_sentence, llmed_sentence)

        # Build the phrase -> score-gap lists
        score_dict.setdefault(latin_phrase, []).append(s1 - s2)
    return score_dict

In [None]:
dict_rate_all = {}

model = 'llama2'
# Readability indices to evaluate: FKGL, FRE, GFI, SMOG, CLI
indexs = ['FKGL', 'FRE', 'GFI', 'SMOG', 'CLI']
# The records are the same for every index, so the file is read only once.
llmed_rows = read_llmed(model)
for index in indexs:
    dict_temp = get_score_dict(llmed_rows, index)
    # Median score gap of every Latin phrase
    # (np.average is the alternative aggregate)
    dict_rate_all[index] = {key: np.median(val) for key, val in dict_temp.items()}
    print(dict_rate_all.keys())

In [None]:
# Save the per-index median score gaps as JSON under the 'data/llm2.1.2' tree.
out_file_path = './data/llm2.1.2/sbert/score-gap/LE-dataset v2.0-150-'+ model +'-score-gap v2.1.2.json'
# The with-statement closes the file; no explicit close() is needed.
with open(out_file_path, 'w', encoding='utf-8') as file:
    json.dump(dict_rate_all, file, indent=2)