In [1]:
class EXP:
    """Experiment configuration shared by the cells of this notebook."""
    # Model identifier passed to SentenceTransformer and TokenizerAnalyzer.
    MODEL = 'sentence-transformers/sentence-t5-base'
    # CSV whose `sentence1` / `sentence2` columns provide the sentence pairs.
    DATA = '/root/StickyToken/data/sampled_df.csv'
    # CSV with the sentence pairs used for the verification step
    # (presumably non-paraphrase pairs, judging by the file name - TODO confirm).
    VERIFICATION_DATA = '/root/StickyToken/data/sampled_df_not-pair.csv'
    # Number of leading rows of DATA that are used as sentence pairs.
    SENT_PAIR_NUM = 5
    # Default number of token repetitions / insertions applied to each text.
    INSERT_NUM = 8

In [None]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '7'
import sys
sys.path.append('/root/StickyToken')
from tqdm import tqdm
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity,cosine_distances, euclidean_distances, manhattan_distances
from collections import Counter, namedtuple
import torch
import pandas as pd
import scanpy as sc
import matplotlib.pyplot as plt
import numpy as np
from sklearn.manifold import TSNE
import random
import anndata
import warnings
import pynvml
import json
from time import time
import jsonlines
random.seed(42)

In [None]:
from sentence_transformers import SentenceTransformer
# Load the sentence-embedding model named in the experiment configuration.
model = SentenceTransformer(EXP.MODEL)
# Underlying transformer of the first pipeline module; its token-embedding table is read in a later cell.
transformer_model = model._first_module().auto_model
# Tokenizer that belongs to the loaded model.
tokenizer = model.tokenizer

In [None]:
from stickytoken.tokenization import TokenizerAnalyzer
# Project helper that analyses the vocabulary of the same model.
toka = TokenizerAnalyzer(EXP.MODEL)

In [None]:
# Mapping whose values are per-token dicts; the cells below read the keys
# 'category', 'raw_vocab' and 'decoded' (keys are presumably token ids - TODO confirm).
token_info = toka.categorize_tokens()
token_info

In [None]:
# Count how many tokens fall into each category (first-seen order is kept).
category_counts = {}
for token in token_info.values():
    category = token['category']
    category_counts[category] = category_counts.get(category, 0) + 1

# Absolute counts per category.
print("Category统计结果:")
for category, count in category_counts.items():
    print(f"{category}: {count}")

# Share of the whole vocabulary per category, in percent.
total_tokens = len(token_info)
print("\nCategory百分比:")
for category, count in category_counts.items():
    percentage = (count / total_tokens) * 100
    print(f"{category}: {percentage:.2f}%")

In [None]:
# Collect every token whose category is 'OK_SPECIAL'.
ok_special_tokens = {}
for token_id, token_data in token_info.items():
    if token_data['category'] == 'OK_SPECIAL':
        ok_special_tokens[token_id] = token_data

print("OK_SPECIAL类别的元素:")
for token_id, token_data in ok_special_tokens.items():
    print(
        f"Token ID: {token_id}",
        f"Raw Vocab: {token_data['raw_vocab']}",
        f"Decoded: {token_data['decoded']}",
        "---",
        sep="\n",
    )

print(f"OK_SPECIAL类别的元素总数: {len(ok_special_tokens)}")

In [None]:
# Collect every token whose category is 'UNREACHABLE_MULTI_TOKEN'.
unreachable_multi_tokens = {}
for token_id, token_data in token_info.items():
    if token_data['category'] == 'UNREACHABLE_MULTI_TOKEN':
        unreachable_multi_tokens[token_id] = token_data

print("UNREACHABLE_MULTI_TOKEN类别的元素:")
for token_id, token_data in unreachable_multi_tokens.items():
    # The re-encoding fields are optional, so fall back to 'N/A' when absent.
    print(
        f"Token ID: {token_id}",
        f"Raw Vocab: {token_data['raw_vocab']}",
        f"Decoded: {token_data['decoded']}",
        f"Reencoded IDs: {token_data.get('reencoded_ids', 'N/A')}",
        f"Reencoded: {token_data.get('reencoded', 'N/A')}",
        "---",
        sep="\n",
    )

print(f"UNREACHABLE_MULTI_TOKEN类别的元素总数: {len(unreachable_multi_tokens)}")



In [None]:
# NOTE: the namedtuple's type name is "Metrics", so instances print as `Metrics(...)`.
DistanceMetrics = namedtuple("Metrics", ["cosine_distance", "euclidean_distance", "manhattan_distance"])

def distance_metrics(emb1: np.ndarray,emb2:np.ndarray ) -> DistanceMetrics:
    """
    Compute cosine, Euclidean and Manhattan distances between embeddings.

    Parameters:
    emb1 (np.ndarray): a single embedding (1-D) or a matrix of embeddings (2-D)
    emb2 (np.ndarray): a single embedding (1-D) or a matrix of embeddings (2-D)

    Returns:
    DistanceMetrics: named tuple with one array per metric.

    Notes:
    - emb1 1-D: distances from emb1 to every row of emb2
      (an array of length 1 when emb2 is 1-D as well).
    - emb1 2-D, emb2 1-D: distances from every row of emb1 to emb2.
    - both 2-D: row-wise distances, i.e. the diagonal of the pairwise matrix.

    The original implementation fell through and returned None whenever emb2
    was 1-D; those cases are now handled explicitly.
    """
    emb1_is_vector = emb1.ndim == 1
    emb2_is_vector = emb2.ndim == 1
    # scikit-learn's pairwise functions expect 2-D inputs.
    mat1 = emb1.reshape(1, -1) if emb1_is_vector else emb1
    mat2 = emb2.reshape(1, -1) if emb2_is_vector else emb2

    pairwise = (
        cosine_distances(mat1, mat2),
        euclidean_distances(mat1, mat2),
        manhattan_distances(mat1, mat2),
    )

    if emb1_is_vector:
        # One query vector: keep its row of distances.
        return DistanceMetrics(*(matrix[0] for matrix in pairwise))
    if emb2_is_vector:
        # One reference vector: keep the single column of distances.
        return DistanceMetrics(*(matrix[:, 0] for matrix in pairwise))
    # Two matrices: pair row i of emb1 with row i of emb2.
    return DistanceMetrics(*(matrix.diagonal() for matrix in pairwise))

In [None]:
from datasets import load_dataset

# Load the sentence-pair CSV and keep the first SENT_PAIR_NUM pairs.
dataset = load_dataset('csv', data_files=EXP.DATA,split='train')
# `sentence1` is the source text of a pair, `sentence2` the text that is compared against it.
gt_texts = dataset['sentence1'][:EXP.SENT_PAIR_NUM]
gt_embs = model.encode(gt_texts)
contract_texts = dataset['sentence2'][:EXP.SENT_PAIR_NUM]
contract_embs = model.encode(contract_texts)
# Baseline distances between the i-th source text and the i-th compared text.
gt_metrics = distance_metrics(gt_embs, contract_embs)
print(gt_embs.shape)
print(contract_embs.shape)
print(gt_metrics)

In [None]:
# Same preparation as for the main data, but on the full verification CSV (no row limit).
verification_dataset = load_dataset('csv', data_files=EXP.VERIFICATION_DATA,split='train')
verification_gt_texts = verification_dataset['sentence1']
verification_contract_texts = verification_dataset['sentence2']
verification_gt_embs = model.encode(verification_gt_texts)
print(verification_gt_embs.shape)
verification_contract_embs = model.encode(verification_contract_texts)
print(verification_contract_embs.shape)
# Baseline distances between the i-th verification source text and its compared text.
verification_gt_metrics = distance_metrics(verification_gt_embs, verification_contract_embs)
# print(verification_gt_metrics)

In [None]:
# Distances (not similarities) between one token and every ground-truth sentence.
def calculate_token_distances(token, gt_embs, model):
    """
    Compute cosine, Euclidean and Manhattan distances between a single token
    and every ground-truth sentence embedding.

    Parameters:
    token (str): the token to encode
    gt_embs (np.ndarray): embeddings of the ground-truth texts, one row per text
    model: encoder exposing `encode(list_of_str)`

    Returns:
    DistanceMetrics: three arrays, each holding the distance between the token
    and every row of `gt_embs`.
    """
    # Encode as a one-element batch so the result is 2-D, as scikit-learn requires.
    token_emb = model.encode([token])

    # Row 0 holds the distances from the single token to all ground-truth rows.
    return DistanceMetrics(
        cosine_distance=cosine_distances(token_emb, gt_embs)[0],
        euclidean_distance=euclidean_distances(token_emb, gt_embs)[0],
        manhattan_distance=manhattan_distances(token_emb, gt_embs)[0],
    )

# Example: distances between vocabulary entry 6182 and every ground-truth sentence.
token = tokenizer.convert_ids_to_tokens(6182)
token_distances = calculate_token_distances(token, gt_embs, model)

print(f"Token '{token}' 与所有gt_texts的距离:")
print(token_distances)
# Mean / max / min of the cosine distances, then the means of the other two metrics.
for summary_label, summary_fn in (("平均距离", np.mean), ("最大距离", np.max), ("最小距离", np.min)):
    print(f"{summary_label}: {summary_fn(token_distances.cosine_distance):.4f}")
print(f"欧氏距离平均值: {np.mean(token_distances.euclidean_distance):.4f}")
print(f"曼哈顿距离平均值: {np.mean(token_distances.manhattan_distance):.4f}")

In [None]:
# Show the token ids the tokenizer produces for the plain word 'egg' (no special tokens added).
tokenizer.encode('egg',add_special_tokens=False)

In [None]:
# Extract the encoder's input token-embedding matrix, limited to the tokenizer's vocabulary.
vocab_size = tokenizer.vocab_size
try:
    wte = transformer_model.encoder.embed_tokens.weight
except AttributeError:
    # The model does not expose `encoder.embed_tokens`. Stop here instead of
    # continuing with an undefined (or stale, from an earlier kernel run) `wte`.
    print('无法获取权重')
    raise
# The embedding table may have more rows than the tokenizer has entries; keep the first vocab_size.
wte = wte.detach().cpu().numpy()[0:vocab_size]
print(wte.shape)

In [None]:
# Encode every vocabulary entry as if it were a one-token sentence ("output-side" embeddings).
all_tokens = [tokenizer.convert_ids_to_tokens(i) for i in range(vocab_size)]
print(all_tokens[:10])
all_embeddings = model.encode(all_tokens)
print(all_embeddings.shape)

In [None]:
# Spot check: L2 norm of the sentence embedding of vocabulary entry 1.
np.linalg.norm(all_embeddings[1])

In [None]:
def check_vectors_on_unit_sphere(embeddings):
    """
    Check whether every row vector has (approximately) unit L2 norm.

    When some rows do not, the number of such rows and the mean and variance
    of their norms are printed.

    Parameters:
    embeddings (np.ndarray): matrix with one embedding per row

    Returns:
    bool: True if all row norms are close to 1 (np.allclose), otherwise False
    """
    norms = np.linalg.norm(embeddings, axis=1)
    all_unit = np.allclose(norms, 1)
    if all_unit:
        return all_unit

    # Report on the rows whose norm deviates from 1.
    off_sphere = ~np.isclose(norms, 1)
    print(f"不在单位超球体上的向量数量: {np.sum(off_sphere)}")

    off_sphere_norms = norms[off_sphere]
    print(f"不在单位超球体上的向量模长的平均值: {np.mean(off_sphere_norms)}")
    print(f"不在单位超球体上的向量模长的方差: {np.var(off_sphere_norms)}")
    return all_unit

# Run the unit-norm check on the output-side sentence embeddings and on the raw embedding table.
all_embeddings_is_on_unit_sphere = check_vectors_on_unit_sphere(all_embeddings)
print(f"输出侧的所有token的向量是否都在单位超球体上: {all_embeddings_is_on_unit_sphere}")
wte_is_on_unit_sphere = check_vectors_on_unit_sphere(wte)  
print(f"wte中的所有token的权重是否都在单位超球体上: {wte_is_on_unit_sphere}")

In [None]:
def check_embeddings_is_anisotropic(all_embeddings, batch_size=128):
    """
    Compute all pairwise cosine similarities between embeddings, plot their
    distribution and decide whether the embedding space looks anisotropic.

    Parameters:
    all_embeddings (np.ndarray): matrix with one embedding per row
    batch_size (int): number of rows whose similarities are computed per step

    Returns:
    tuple: (is_anisotropic, mean_similarity, median_similarity, std_deviation,
    min_similarity, max_similarity). `is_anisotropic` is True when the mean
    similarity is not within 0.01 of zero.

    Notes:
    - The full N x N similarity matrix (self-similarities included) is kept in
      host memory for the histogram and the statistics.
    - Rows with zero norm yield NaN similarities (division by zero).
    """
    # Local imports kept from the original; `tqdm.notebook` gives the widget progress bar.
    import torch
    import matplotlib.pyplot as plt
    from tqdm.notebook import tqdm
    import numpy as np

    # Do the heavy matrix products on the GPU when one is available.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    all_embeddings = torch.tensor(all_embeddings).to(device)

    # L2 norm of every embedding.
    vector_norms = torch.norm(all_embeddings, dim=1)
    print('vector_norms,向量模长：')
    print(vector_norms.shape)
    print(vector_norms)

    # One (batch, N) block of cosine similarities per step; a single batched
    # matmul replaces the former row-by-row loop.
    similarity_blocks = []
    for start in tqdm(range(0, all_embeddings.shape[0], batch_size), desc="计算余弦相似度"):
        batch_embeddings = all_embeddings[start:start + batch_size]
        batch_norms = vector_norms[start:start + batch_size]
        dot_products = torch.matmul(batch_embeddings, all_embeddings.T)
        block = dot_products / (batch_norms.unsqueeze(1) * vector_norms.unsqueeze(0))
        similarity_blocks.append(block.cpu().numpy())

    # Flatten to one long vector; ravel() avoids the extra copy flatten() would make.
    cosine_similarities_np = np.concatenate(similarity_blocks, axis=0).ravel()

    # Histogram of the similarity distribution.
    plt.figure(figsize=(10, 6))
    plt.hist(cosine_similarities_np, bins=50, alpha=0.75, color='blue', edgecolor='black', density=True)
    plt.title('Distribution of Cosine Similarity between Token Embeddings')
    plt.xlabel('Cosine Similarity')
    plt.ylabel('Density/Frequency')
    plt.grid(True)
    plt.show()

    # Summary statistics.
    mean_similarity = np.mean(cosine_similarities_np)
    median_similarity = np.median(cosine_similarities_np)
    std_deviation = np.std(cosine_similarities_np)
    min_similarity = np.min(cosine_similarities_np)
    max_similarity = np.max(cosine_similarities_np)

    print(f"平均余弦相似度: {mean_similarity}")
    print(f"中位数余弦相似度: {median_similarity}")
    print(f"余弦相似度标准差: {std_deviation}")
    print(f"最小余弦相似度: {min_similarity}")
    print(f"最大余弦相似度: {max_similarity}")

    # Quick heuristic instead of a distribution test: for isotropic embeddings
    # the mean cosine similarity should be close to 0.
    is_anisotropic = not np.allclose(mean_similarity, 0, atol=0.01)

    print(f"所有token的向量是否具有各向异性: {is_anisotropic}")

    return is_anisotropic,mean_similarity,median_similarity,std_deviation,min_similarity,max_similarity

In [None]:
# Anisotropy statistics of the output-side sentence embeddings of all vocabulary entries.
is_anisotropic_output, mean_similarity_output, median_similarity_output, std_deviation_output, min_similarity_output, max_similarity_output = check_embeddings_is_anisotropic(all_embeddings)

In [None]:
# Same statistics for the raw input embedding table.
is_anisotropic_wte, mean_similarity_wte, median_similarity_wte, std_deviation_wte, min_similarity_wte, max_similarity_wte = check_embeddings_is_anisotropic(wte)

In [None]:
# Shape of the output-side embedding matrix (rows = vocabulary entries).
all_embeddings.shape

In [None]:
def calculate_neighbor_distances(embeddings, batch_size=128, mode='nearest'):
    """
    Compute, for every embedding, its distance to the other embeddings,
    batched with PyTorch (on the GPU when one is available).

    Parameters:
    embeddings (array-like): matrix with one embedding per row
    batch_size (int): number of query rows processed per step; lower it when
        memory is tight (the cosine term broadcasts the batch against all rows)
    mode (str): 'nearest' - distance to the closest *other* embedding;
        'mean' - mean distance to all embeddings (the row itself included)

    Returns:
    dict: keys 'cosine', 'euclidean', 'manhattan', each a list with one value
    per embedding.

    Raises:
    ValueError: if `mode` is neither 'nearest' nor 'mean'.
    """
    if mode not in ('nearest', 'mean'):
        # Previously an unknown mode silently produced empty result lists.
        raise ValueError(f"mode must be 'nearest' or 'mean', got {mode!r}")

    distances = {
        'cosine': [],
        'euclidean': [],
        'manhattan': []
    }

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    all_embeddings = torch.tensor(embeddings).to(device)

    # The caller's batch_size is honoured here (it used to be overwritten with 128).
    for i in tqdm(range(0, all_embeddings.shape[0], batch_size), desc="计算最近邻距离"):
        batch = all_embeddings[i:i+batch_size]

        # Distances between the rows of this batch and all embeddings: shape (batch, N).
        cosine_dist = 1 - torch.nn.functional.cosine_similarity(batch.unsqueeze(1), all_embeddings.unsqueeze(0), dim=2)
        euclidean_dist = torch.cdist(batch, all_embeddings, p=2)
        manhattan_dist = torch.cdist(batch, all_embeddings, p=1)
        per_metric = (
            ('cosine', cosine_dist),
            ('euclidean', euclidean_dist),
            ('manhattan', manhattan_dist),
        )

        if mode == 'nearest':
            # Row j of the batch is embedding i + j; mask its distance to itself
            # so that the minimum is taken over the other embeddings only.
            rows = torch.arange(batch.shape[0], device=device)
            cols = rows + i
            for name, dist in per_metric:
                dist[rows, cols] = float('inf')
                distances[name].extend(dist.min(dim=1)[0].cpu().numpy())
        else:
            for name, dist in per_metric:
                distances[name].extend(list(dist.cpu().numpy().mean(axis=1)))

    # Release cached GPU blocks once the loop is done.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return distances

# Nearest-neighbour distance of every vocabulary entry, computed on the output-side embeddings.
# (An earlier version of the function took `model, tokenizer` instead of an embedding matrix.)
nearest_neighbor_distances = calculate_neighbor_distances(all_embeddings)
print(nearest_neighbor_distances)

In [None]:
# Mean distance of every vocabulary entry to all entries, on the same embeddings.
mean_neighbor_distances = calculate_neighbor_distances(all_embeddings,mode='mean')
print(mean_neighbor_distances)

In [None]:
# NOTE: this cell used to hold a disabled CPU reference implementation
# (`calculate_nearest_neighbor_distances_cpu`) wrapped in a string literal.
# It never ran and only echoed its own source as cell output, so it was removed.
# `calculate_neighbor_distances` is the maintained implementation.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

def plot_neighbor_distances(nearest_neighbor_distances,mode = 'nearest'):
    """
    Plot the distribution of neighbour distances: one histogram with a kernel
    density estimate per distance type.

    Parameters:
    nearest_neighbor_distances (dict): distance type -> sequence of values
    mode (str): 'nearest' or 'mean'; only selects the figure title
    """
    # Use matplotlib's default style.
    plt.style.use('default')

    # One row with three panels.
    fig, axs = plt.subplots(1, 3, figsize=(24, 8))
    if mode == 'nearest':
        fig.suptitle('Nearest Neighbor Distance Distribution', fontsize=16)
    elif mode == 'mean':
        fig.suptitle('Mean Neighbor Distance Distribution', fontsize=16)
    axs = axs.flatten()

    for i, (distance_type, values) in enumerate(nearest_neighbor_distances.items()):
        ax = axs[i]

        # The 0th percentile is the minimum, so only values equal to the
        # minimum are dropped here.
        threshold = np.percentile(values, 0)
        filtered_values = [v for v in values if v > threshold]

        # Normalised histogram with a KDE curve on top.
        ax.hist(filtered_values, bins=50, density=True, alpha=0.7)
        ax.set_title(f'{distance_type.capitalize()} Distance Distribution')
        ax.set_xlabel('Distance')
        ax.set_ylabel('Frequency')
        sns.kdeplot(filtered_values, ax=ax, color='r')

        # Start the x axis at the threshold.
        ax.set_xlim(left=threshold)

    plt.tight_layout()
    plt.show()

# Plot the mean-distance distributions first, then the nearest-neighbour ones.
plot_neighbor_distances(mean_neighbor_distances,mode='mean')
plot_neighbor_distances(nearest_neighbor_distances)


In [None]:
# Summarise the nearest-neighbour distances and derive one threshold per distance type.
# Each threshold is published as a notebook-global named `<distance_type>_threshold`
# (cosine_threshold, euclidean_threshold, manhattan_threshold), which later cells read.
for distance_type, values in nearest_neighbor_distances.items():
    mean = np.mean(values)
    std = np.std(values)
    max_value = np.max(values)
    print(f"{distance_type} 距离:")
    print(f"  均值: {mean:.4f}")
    print(f"  标准差: {std:.4f}")
    # The threshold is the mean itself; the label below used to claim "mean + std".
    threshold = mean
    threshold_name = f"{distance_type}_threshold"
    # globals() is the supported way to bind a module-level name dynamically;
    # writes through locals() are not guaranteed to take effect.
    globals()[threshold_name] = threshold
    print(threshold_name)
    print(f"  阈值 (均值): {threshold:.4f}")
    print(f"  最大值: {max_value:.4f}")
    print()

In [None]:
# Display the cosine threshold that the previous cell created dynamically.
cosine_threshold

In [None]:
# NOTE: this cell used to hold a disabled experiment (`compare_token_embeddings`),
# commented out and additionally wrapped in a string literal. It compared each
# token's row in `wte` with `model.encode([token])`. It never ran and only echoed
# its own source as cell output, so it was removed.

In [None]:
def random_insert(text, insert_string, times):
    """
    Insert `insert_string` at `times` randomly chosen word boundaries of `text`.

    The text is split on whitespace, each insertion position is drawn with
    `random.randint` (both ends of the word list are possible), and the words
    are re-joined with single spaces.
    """
    pieces = text.split()
    for _ in range(times):
        slot = random.randint(0, len(pieces))
        # Slice assignment places the string before index `slot`.
        pieces[slot:slot] = [insert_string]
    return " ".join(pieces)

def calculate_token_distances(token, gt_embs, model):
    """
    Compute cosine, Euclidean and Manhattan distances between a single token
    and every ground-truth sentence embedding.

    Parameters:
    token (str): the token to encode
    gt_embs (np.ndarray): embeddings of the ground-truth texts, one row per text
    model: encoder exposing `encode(list_of_str)`

    Returns:
    DistanceMetrics: three arrays, each holding the distance between the token
    and every row of `gt_embs`.
    """
    # A one-element batch keeps the encoded token 2-D for scikit-learn.
    query = model.encode([token])

    # Row 0 of each pairwise matrix belongs to the single query token.
    return DistanceMetrics(
        cosine_distance=cosine_distances(query, gt_embs)[0],
        euclidean_distance=euclidean_distances(query, gt_embs)[0],
        manhattan_distance=manhattan_distances(query, gt_embs)[0],
    )

def calculate_score(metrics_list, alpha=0.1, beta=0.05, gamma=0.1, smaller_is_better =True):
    """
    Score how consistently a metric sequence moves in the desired direction.

    The score is (favourable amplitude + alpha * favourable steps) divided by
    (adverse amplitude + beta * adverse steps + gamma). "Favourable" means
    falling when `smaller_is_better`, rising otherwise.

    :param metrics_list: sequence of metric values
    :param alpha: weight of the number of favourable steps
    :param beta: weight of the number of adverse steps
    :param gamma: constant that keeps the denominator away from zero
    :param smaller_is_better: whether decreases count as favourable
    :return: the combined score
    """
    deltas = np.diff(metrics_list)
    rising = deltas > 0
    falling = deltas < 0

    up_total, up_steps = deltas[rising].sum(), rising.sum()
    down_total, down_steps = (-deltas[falling]).sum(), falling.sum()

    if smaller_is_better:
        return (down_total + alpha * down_steps) / (up_total + beta * up_steps + gamma)
    return (up_total + alpha * up_steps) / (down_total + beta * down_steps + gamma)

def calculate_add_score(similarities, w1=0.5, w2=0.3, w3=0.2):
    """
    Combine three summary statistics of a value sequence into one score:
    w1 * (mean step) - w2 * (1 - variance) - w3 * (share of decreasing steps).

    NOTE(review): the original comment called the last term "proportion of
    increases", but the code counts steps that are below zero - confirm which
    one is intended.
    """
    steps = np.diff(similarities)

    # Mean rate of change between consecutive values.
    mean_rate_of_change = np.mean(steps)
    # Variance of the values themselves.
    spread = np.var(similarities)
    # Fraction of steps that go down.
    falling_share = np.sum(steps < 0) / len(steps)

    return w1 * mean_rate_of_change - w2 * (1 - spread) - w3 * falling_share

def calculate_score_with_token_distance(metrics_list, token_distance ,alpha=0.1, beta=0.05, gamma=0.01, smaller_is_better =True, token_weight=0.1):
    """
    Score how consistently a metric sequence moves in the desired direction,
    with a bonus proportional to the token-to-sentence distance.

    The score is (favourable amplitude + alpha * favourable steps
    + token_weight * token_distance) divided by (adverse amplitude
    + beta * adverse steps + gamma). "Favourable" means falling when
    `smaller_is_better`, rising otherwise.

    :param metrics_list: sequence of metric values
    :param token_distance: distance between the token and the compared sentence
    :param alpha: weight of the number of favourable steps
    :param beta: weight of the number of adverse steps
    :param gamma: constant that keeps the denominator away from zero
    :param smaller_is_better: whether decreases count as favourable
    :param token_weight: weight of `token_distance` in the numerator
        (previously hard-coded to 0.1, which is still the default)
    :return: the combined score
    """
    changes = np.diff(metrics_list)
    rising = changes > 0
    falling = changes < 0

    rise_amplitude, rise_count = changes[rising].sum(), rising.sum()
    fall_amplitude, fall_count = (-changes[falling]).sum(), falling.sum()
    token_bonus = token_weight * token_distance

    if smaller_is_better:
        return (fall_amplitude + alpha * fall_count + token_bonus) / (rise_amplitude + beta * rise_count + gamma)
    return (rise_amplitude + alpha * rise_count + token_bonus) / (fall_amplitude + beta * fall_count + gamma)

def magic_token_test_metric(token,
                            gt_texts,
                            contract_texts,
                            gt_embs,
                            contract_embs,
                            gt_metrics, 
                            num = EXP.INSERT_NUM,
                            ):
    """
    Measure how adding `token` to each compared text changes its distance to
    the paired source text.

    For every text in `contract_texts` three families of `num` perturbed texts
    are built: 'Prefix' (token repeated 1..num times directly before the text),
    'Suffix' (directly after it) and 'Insert' (token inserted cumulatively at
    random word positions). Each family is encoded with the notebook-global
    `model`, compared with the source embedding and scored with
    `calculate_score_with_token_distance`.

    Parameters:
    token (str): token under test
    gt_texts: source texts, indexed by pair id
    contract_texts: compared texts, one per pair
    gt_embs: embeddings of `gt_texts`, indexed by pair id
    contract_embs: not used in this function
    gt_metrics (DistanceMetrics): baseline distances per pair
    num (int): number of perturbation steps per family

    Returns:
    tuple: (results, token_score_aggregation). `results` maps each family name
    to a list of per-pair dicts; `token_score_aggregation` holds, per metric,
    the weighted sum of the scores (Prefix 0.35, Suffix 0.35, Insert 0.3)
    divided by the number of pairs and rounded to 6 decimals.
    """
    results = {'Prefix': [], 'Suffix': [], 'Insert': []}
    total = len(contract_texts)
    token_distances = calculate_token_distances(token, gt_embs, model)  # per-metric distances between the token and each source text


    def process_texts(text_list, gt_emb, gt_metric, text_type, id):
        # Encode the perturbed texts and measure their distance to the source embedding.
        pred_emb = model.encode(text_list)
        
        ret = distance_metrics(gt_emb, pred_emb)

        # NOTE(review): `contract_text` is not a parameter; it is read from the
        # enclosing loop below, so this helper only works when called from that loop.
        result = {
            'Pair_id': id,  
            'Source text': gt_texts[id],
            'Texts to be contrasted': [contract_text] + text_list,
            # Each distance list starts with the unperturbed baseline.
            'cosine_distance': [gt_metric.cosine_distance] + list(ret.cosine_distance),
            'cosine_distance_contrast': [None],
            'euclidean_distance': [gt_metric.euclidean_distance] + list(ret.euclidean_distance),
            'euclidean_distance_contrast': [None],
            'manhattan_distance': [gt_metric.manhattan_distance] + list(ret.manhattan_distance),
            'manhattan_distance_contrast': [None],
        }
        # The Manhattan score uses larger alpha/beta/gamma than the other two metrics.
        result['cosine_distance_score'] = calculate_score_with_token_distance(result['cosine_distance'],token_distances.cosine_distance[id],smaller_is_better=True)
        result['euclidean_distance_score'] = calculate_score_with_token_distance(result['euclidean_distance'],token_distances.euclidean_distance[id],smaller_is_better=True)
        result['manhattan_distance_score'] = calculate_score_with_token_distance(result['manhattan_distance'],token_distances.manhattan_distance[id],alpha=1,beta=0.5,gamma=0.1,smaller_is_better=True)

        # Step-wise flags: True when the distance dropped relative to the previous step.
        for i in range(num):
            cosine_distance_contrast = (result['cosine_distance'][i] > result['cosine_distance'][i + 1])
            result['cosine_distance_contrast'].append(cosine_distance_contrast)
            euclidean_distance_contrast = (result['euclidean_distance'][i] > result['euclidean_distance'][i + 1])
            result['euclidean_distance_contrast'].append(euclidean_distance_contrast)
            manhattan_distance_contrast = (result['manhattan_distance'][i] > result['manhattan_distance'][i + 1])
            result['manhattan_distance_contrast'].append(manhattan_distance_contrast)

        results[text_type].append(result)

    for id, contract_text in enumerate(contract_texts):
        # The token is concatenated without any separator.
        text_list_prefix = [token * i + contract_text for i in range(1, num + 1)]
        text_list_suffix = [contract_text + token * i for i in range(1, num + 1)]
        # Insertions are cumulative: step i contains i copies of the token.
        text_list_insert = []
        tem_text = contract_text
        for i in range(1, num + 1):
            new_text = random_insert(tem_text, token, 1)
            tem_text = new_text
            text_list_insert.append(new_text)
        
        for text_list, text_type in [(text_list_prefix, 'Prefix'), (text_list_suffix, 'Suffix'), (text_list_insert, 'Insert')]:
            process_texts(text_list, gt_embs[id],
                           DistanceMetrics(
                                gt_metrics.cosine_distance[id],
                                gt_metrics.euclidean_distance[id],
                                gt_metrics.manhattan_distance[id]
                            )
                            , text_type, id)

    # Weighted aggregation over the three families (an earlier unweighted variant was removed).
    weights = {'Prefix': 0.35, 'Suffix': 0.35, 'Insert': 0.3}
    token_score_aggregation = {
        'cosine_distance_score': 0,
        'euclidean_distance_score': 0,
        'manhattan_distance_score': 0
    }
    for text_type, weight in weights.items():
        for result in results[text_type]:
            token_score_aggregation['cosine_distance_score'] += result['cosine_distance_score'] * weight
            token_score_aggregation['euclidean_distance_score'] += result['euclidean_distance_score'] * weight
            token_score_aggregation['manhattan_distance_score'] += result['manhattan_distance_score'] * weight
    
    # Average over the pairs.
    token_score_aggregation = {k: round(v / total, 6) for k, v in token_score_aggregation.items()}

    return results, token_score_aggregation

def token_verification(token,
                       verification_gt_texts,
                       verification_gt_embs, 
                       verification_contract_texts,
                       verification_gt_metrics,
                        add_num = EXP.INSERT_NUM):
    """
    Check on the verification pairs whether adding `token` pulls the compared
    text towards its source text.

    For every compared text the same 'Prefix' / 'Suffix' / 'Insert'
    perturbations as in `magic_token_test_metric` are built and encoded with
    the notebook-global `model`. Per family and pair, `<metric>_contrast` is
    the mean distance of the perturbed texts to the source embedding minus the
    baseline distance. These changes are combined with weights
    (Prefix 0.35, Suffix 0.35, Insert 0.3), divided by the number of pairs and
    compared with the negated notebook-global thresholds `cosine_threshold`,
    `euclidean_threshold` and `manhattan_threshold`.

    Parameters:
    token (str): token under test
    verification_gt_texts: source texts, indexed by pair id
    verification_gt_embs: embeddings of the source texts, indexed by pair id
    verification_contract_texts: compared texts, one per pair
    verification_gt_metrics (DistanceMetrics): baseline distances per pair
    add_num (int): number of perturbation steps per family

    Returns:
    tuple: (results, True). NOTE(review): the second element is the constant
    True; the computed flags are only printed, not returned - confirm whether
    callers expect the flag dict instead.
    """
    results = {'Prefix': [], 'Suffix': [], 'Insert': []}
    total = len(verification_contract_texts)

    def process_texts(text_list, gt_emb, gt_metric, text_type, id):
        # Encode the perturbed texts and measure their distance to the source embedding.
        pred_emb = model.encode(text_list)
        
        ret = distance_metrics(gt_emb, pred_emb)

        # NOTE(review): `verification_contract_text` is read from the enclosing
        # loop below, not passed in as a parameter.
        result = {
            'Pair_id': id,  
            'Source text': verification_gt_texts[id],
            'Texts to be contrasted': [verification_contract_text] + text_list,
            'cosine_distance': [gt_metric.cosine_distance] + list(ret.cosine_distance),
            # Mean change against the baseline; negative means the texts moved closer.
            'cosine_distance_contrast': np.mean(ret.cosine_distance) - gt_metric.cosine_distance,
            'euclidean_distance': [gt_metric.euclidean_distance] + list(ret.euclidean_distance),
            'euclidean_distance_contrast': np.mean(ret.euclidean_distance) - gt_metric.euclidean_distance,
            'manhattan_distance': [gt_metric.manhattan_distance] + list(ret.manhattan_distance),
            'manhattan_distance_contrast': np.mean(ret.manhattan_distance) - gt_metric.manhattan_distance,
        }

        results[text_type].append(result)
    
    for id, verification_contract_text in enumerate(verification_contract_texts):
        # The token is concatenated without any separator.
        text_list_prefix = [token * i + verification_contract_text for i in range(1, add_num + 1)]
        text_list_suffix = [verification_contract_text + token * i for i in range(1, add_num + 1)]
        # Insertions are cumulative: step i contains i copies of the token.
        text_list_insert = []
        tem_text = verification_contract_text
        for i in range(1, add_num + 1):
            new_text = random_insert(tem_text, token, 1)
            tem_text = new_text
            text_list_insert.append(new_text)
        
        for text_list, text_type in [(text_list_prefix, 'Prefix'), (text_list_suffix, 'Suffix'), (text_list_insert, 'Insert')]:
            process_texts(text_list, verification_gt_embs[id],
                           DistanceMetrics(
                                verification_gt_metrics.cosine_distance[id],
                                verification_gt_metrics.euclidean_distance[id],
                                verification_gt_metrics.manhattan_distance[id]
                            ),
                            text_type, id)
            
    # (An earlier count-based flagging scheme with a 2/3 threshold was removed from here.)

    token_flag_aggregation = {
        'cosine_distance_flag': 0,
        'euclidean_distance_flag': 0,
        'manhattan_distance_flag': 0
    }
    # Thresholds: the negated module-level thresholds; these globals must have
    # been created by an earlier cell before this function is called.
    threshold =  {
        'cosine_distance': -1*cosine_threshold,
        'euclidean_distance': -1*euclidean_threshold,
        'manhattan_distance': -1*manhattan_threshold
    } # adjust as needed

    cosine_distance_sum = 0
    euclidean_distance_sum = 0
    manhattan_distance_sum = 0

    # Weighted sum of the distance changes over all families and pairs.
    weights = {'Prefix': 0.35, 'Suffix': 0.35, 'Insert': 0.3}
    for text_type, weight in weights.items():
        for result in results[text_type]:
            cosine_distance_sum += result['cosine_distance_contrast']*weight
            euclidean_distance_sum += result['euclidean_distance_contrast']*weight
            manhattan_distance_sum += result['manhattan_distance_contrast']*weight
    
    # Average over the pairs.
    mean_cosine_distance = cosine_distance_sum / total
    mean_euclidean_distance = euclidean_distance_sum / total
    mean_manhattan_distance = manhattan_distance_sum / total

    # A flag is raised when the mean change is below the (negative) threshold.
    if mean_cosine_distance < threshold['cosine_distance']:
        token_flag_aggregation['cosine_distance_flag'] = 1
    if mean_euclidean_distance < threshold['euclidean_distance']:
        token_flag_aggregation['euclidean_distance_flag'] = 1
    if mean_manhattan_distance < threshold['manhattan_distance']:
        token_flag_aggregation['manhattan_distance_flag'] = 1

    print(f"平均余弦距离变化量: {mean_cosine_distance}")
    print(f"平均欧几里得距离变化量: {mean_euclidean_distance}")
    print(f"平均曼哈顿距离变化量: {mean_manhattan_distance}")
    print(f"标志聚合结果: {token_flag_aggregation}")
    return results,True

def print_results(results):
    """Print one markdown table per compared sentence, grouped by location.

    Args:
        results: Mapping of location name -> mapping of sentence id -> data
            accepted by ``pd.DataFrame``. Every table must provide the columns
            ``positive_flag``, ``negative_flag`` (both dropped before printing)
            and ``cs_contrast`` (rendered as an arrow).

    The text a sentence id refers to is looked up in the notebook-level
    ``gt_texts``. Ids that cannot be used directly (for example ids stored as
    strings) are retried after conversion with ``int``.
    """
    def _arrow(value):
        # Truthy -> up, None -> no change, any other falsy value -> down.
        return '↑' if value else '-' if value is None else '↓'

    for loc, per_sentence in results.items():
        print(loc)
        for sent_id, result in per_sentence.items():
            try:
                print(f"ID_{sent_id + 1} Compared text: {gt_texts[sent_id]}")
            except (TypeError, KeyError, IndexError):
                # Only lookup/arithmetic failures trigger the integer retry;
                # anything else (e.g. KeyboardInterrupt) now propagates.
                print(f"ID_{int(sent_id) + 1} Compared text: {gt_texts[int(sent_id)]}")
            result_df = pd.DataFrame(result).drop(columns=['positive_flag', 'negative_flag'])
            result_df['cs_contrast'] = result_df['cs_contrast'].apply(_arrow)
            print(result_df.to_markdown())
            print('')
        print('')

def record_experiment_time(model_name, experiment_time, json_file='experiment_times.json'):
    """Append one timing record for the current run to a JSON list on disk.

    Args:
        model_name: Name stored under ``"model_name"``.
        experiment_time: Duration stored under ``"duration_seconds"``.
        json_file: Path of the JSON file holding the list of records. It is
            created when missing and rewritten in full on every call.

    Besides the arguments, the record captures the notebook-level
    ``vocab_size`` and the ``EXP`` settings ``SENT_PAIR_NUM``, ``INSERT_NUM``
    and ``DATA``.
    """
    entry = {
        "model_name": model_name,
        "vocab_size": vocab_size,
        "duration_seconds": experiment_time,
        "sentence pair number": EXP.SENT_PAIR_NUM,
        "insert number": EXP.INSERT_NUM,
        "data_file": EXP.DATA,
    }

    # Load the previous records; a missing file simply means an empty history.
    try:
        source = open(json_file, 'r', encoding='utf-8')
    except FileNotFoundError:
        history = []
    else:
        with source:
            history = json.load(source)

    history.append(entry)

    # Write the complete, updated history back.
    with open(json_file, 'w', encoding='utf-8') as sink:
        json.dump(history, sink, ensure_ascii=False, indent=4)

In [None]:
# tokenizer.decode([378])
# tokenizer.convert_ids_to_tokens(378)

In [None]:
# Score the literal '</s>' token with magic_token_test_metric. Its results are
# kept as the "magic" example for the comparison cells below.
magic_results, magic_score = magic_token_test_metric('</s>', gt_texts, contract_texts, gt_embs, contract_embs, gt_metrics)
print(magic_score)
# results

In [None]:
def calculate_magic_token_cs_changes(results):
    """Collect the 'cosine_distance' curves of the first sentence pair.

    Args:
        results: Mapping with the keys 'Prefix', 'Suffix' and 'Insert', each
            holding a list of dicts with at least 'Pair_id' and
            'cosine_distance'.

    Returns:
        A tuple ``(mean_curve, curves)``: ``curves`` lists the
        'cosine_distance' value of every entry whose 'Pair_id' is 0, in
        Prefix/Suffix/Insert order, and ``mean_curve`` is their element-wise
        mean (``np.array(curves).mean(axis=0)``).
    """
    first_pair_curves = [
        entry['cosine_distance']
        for placement in ('Prefix', 'Suffix', 'Insert')
        for entry in results[placement]
        if entry['Pair_id'] == 0
    ]
    mean_curve = np.array(first_pair_curves).mean(axis=0)
    return mean_curve, first_pair_curves

# Extract the Pair_id == 0 curves (and their mean) from the '</s>' results.
mean_magic_token_cs_changes,magic_token_cs_changes = calculate_magic_token_cs_changes(magic_results)

print(mean_magic_token_cs_changes)
magic_token_cs_changes

In [None]:
# Score the literal 'x' token the same way; it serves as the "normal"
# counterpart in the comparison plot below.
normal_results, normal_score = magic_token_test_metric('x', gt_texts, contract_texts, gt_embs, contract_embs, gt_metrics)
print(normal_score)
# results

In [None]:
# Extract the Pair_id == 0 curves (and their mean) from the 'x' results.
mean_normal_token_cs_changes,normal_token_cs_changes = calculate_magic_token_cs_changes(normal_results)
print(mean_normal_token_cs_changes)
normal_token_cs_changes

In [None]:
import matplotlib.pyplot as plt

# Build the x axis: one position per entry of the mean curve
x = range(len(mean_magic_token_cs_changes))

# Create the line chart
plt.figure(figsize=(8, 5))

# Draw the mean curves
plt.plot(x, mean_magic_token_cs_changes, label='Mean Magic Token CS Changes', marker='o', color='darkblue')
plt.plot(x, mean_normal_token_cs_changes, label='Mean Normal Token CS Changes', marker='x', color='darkgreen')

# Draw every individual curve
for changes in magic_token_cs_changes:
    plt.plot(x, changes, color='lightblue', alpha=0.5)
for changes in normal_token_cs_changes:
    plt.plot(x, changes, color='lightgreen', alpha=0.5)

# Add the title and axis labels
# NOTE(review): the plotted curves come from the 'cosine_distance' field,
# while the labels say "Cosine Similarity" — confirm which one is intended.
plt.title('Magic Token CS Changes vs Normal Token CS Changes')
plt.xlabel('Number of Additions')
plt.ylabel('Cosine Similarity Changes')

# Show the legend
plt.legend()

# Show the figure
plt.show()

In [None]:
# Spot check: score the decoded text of token id 30332.
results, score = magic_token_test_metric(tokenizer.decode([30332]), gt_texts, contract_texts, gt_embs, contract_embs, gt_metrics)
print(score)
# results

In [None]:
# Spot check: score the decoded text of token id 15970.
results, score = magic_token_test_metric(tokenizer.decode([15970]), gt_texts, contract_texts, gt_embs, contract_embs, gt_metrics)
print(score)
# results

In [None]:
cosine_threshold,euclidean_threshold,manhattan_threshold

In [None]:
# Run token_verification on the decoded text of token id 1 against the
# verification sentence set.
verification_results ,flag = token_verification(tokenizer.decode([1]),verification_gt_texts, verification_gt_embs, verification_contract_texts, verification_gt_metrics)  
# verification_results['Prefix']

In [None]:
# Run token_verification on the decoded text of token id 30332 against the
# verification sentence set.
verification_results ,flag = token_verification(tokenizer.decode([30332]),verification_gt_texts, verification_gt_embs, verification_contract_texts, verification_gt_metrics)  
# verification_results['Prefix']

In [None]:
# Run token_verification on the decoded text of token id 25941 against the
# verification sentence set.
verification_results ,flag = token_verification(tokenizer.decode([25941]),verification_gt_texts, verification_gt_embs, verification_contract_texts, verification_gt_metrics)  
# verification_results['Prefix']

In [None]:
# pos_results, pos_flag, neg_results, neg_flag,score = magic_token_test_metric('lucrarea', gt_texts, contract_texts, gt_embs, contract_embs, gt_cs)
# print_results(pos_results, pos_flag)
# print_results(neg_results, neg_flag)
# pos_results['Prefix'][0]['cs']
# magic_token_test_fast('lucrarea', gt_texts, contract_texts, gt_embs, contract_embs, gt_cs)
# pos_flag
# score

In [None]:
# fast_found_token_ids=identify_magic_token(ad, vocab_size, threshold = 0, k = 50, gamma = 75)
# wte.X
# wte.obs_names = [f"Cell_{i:d}" for i in range(wte.n_obs)]
# wte.var_names = [f"Gene_{i:d}" for i in range(wte.n_vars)]
# print(wte.obs_names)
# wte.to_df()
# vocab_dict = model.tokenizer.get_vocab()
# vocab_dict
# vocab_lis = [v[0] for v in vocab_dict.items()]
# vocab_lis[:10]
# vocab_lis[0]
# tokenizer.convert_ids_to_tokens(0)

In [None]:
# # pos_results, pos_flag, neg_results, neg_flag = magic_token_test(tokenizer.convert_ids_to_tokens(1), gt_texts, contract_texts, gt_embs, contract_embs, gt_cs)
# pos_results, pos_flag, neg_results, neg_flag, score = magic_token_test_metric(tokenizer.decode([24166]), gt_texts, contract_texts, gt_embs, contract_embs, gt_cs)
# print_results(pos_results, pos_flag)
# print_results(neg_results, neg_flag)
# score

In [None]:
# pos_results, pos_flag, neg_results, neg_flag = magic_token_test(tokenizer.convert_ids_to_tokens(1), gt_texts, contract_texts, gt_embs, contract_embs, gt_cs)
# pos_results, pos_flag, neg_results, neg_flag = magic_token_test(tokenizer.decode([1]), gt_texts, contract_texts, gt_embs, contract_embs, gt_cs, num=10)
# print_results(pos_results, pos_flag)
# print_results(neg_results, neg_flag)

In [None]:
# for token in tqdm(vocab_lis, desc='procec',total=len(vocab_lis)):
#     pos_results, pos_flag, neg_results, neg_flag = magic_token_test(token, gt_texts, contract_texts, gt_embs, contract_embs, gt_cs, num=10)

#     if pos_flag:
#         found_pos_examples.append((token,pos_results))
#     if neg_flag:
#         found_neg_examples.append((token,neg_results))

In [None]:
# Score every token id of the vocabulary with magic_token_test_metric and
# record how long the whole sweep took.
start = time()

all_results = []   # all_results[token_id] -> detailed results of that token
token_scores = {}  # token_id -> score returned by magic_token_test_metric

# For a quick smoke test, iterate over range(1000) instead of range(vocab_size).
for token_id in tqdm(range(vocab_size), desc='procec', total=vocab_size):
    results, score = magic_token_test_metric(
        tokenizer.convert_ids_to_tokens(token_id),
        gt_texts, contract_texts, gt_embs, contract_embs, gt_metrics,
    )
    all_results.append(results)
    token_scores[token_id] = score

# Measure once so the printed and the recorded durations are the same number.
elapsed_seconds = time() - start
print('Time:', elapsed_seconds)
model_name = os.path.basename(EXP.MODEL)
record_experiment_time(model_name, elapsed_seconds)

In [None]:
all_results[0]['Prefix'][0]

In [None]:
# mean_token_distances = {}
# for id, results in enumerate(all_results):
#     mean_token_distance = {'mean_cosine_distance':0,
#                                'mean_euclidean_distance':0,
#                                'mean_manhattan_distance':0}
#     for result_list in results.values():
#         for result in result_list:
#             mean_token_distance['mean_cosine_distance'] += (np.array(result['cosine_distance']).mean()-result['cosine_distance'][0])
#             mean_token_distance['mean_euclidean_distance'] += (np.array(result['euclidean_distance']).mean()-result['euclidean_distance'][0])
#             mean_token_distance['mean_manhattan_distance'] += (np.array(result['manhattan_distance']).mean()-result['manhattan_distance'][0])
#     mean_token_distance = {k: round(v / 9,6) for k, v in mean_token_distance.items()}
#     mean_token_distances[id] = mean_token_distance


In [None]:
# mean_token_distances[1]

In [None]:
# add_token_scores = {}
# for id, results in enumerate(all_results):
#     add_token_score = {'cosine_distance_add_score':0,
#                                'euclidean_distance_add_score':0,
#                                'manhattan_distance_add_score':0}
#     for result_list in results.values():
#         for result in result_list:
#             add_token_score['cosine_distance_add_score'] += calculate_add_score(result['cosine_distance'])
#             add_token_score['euclidean_distance_add_score'] += calculate_add_score(result['euclidean_distance'])
#             add_token_score['manhattan_distance_add_score'] += calculate_add_score(result['manhattan_distance'])
#     add_token_scores[id] = add_token_score


In [None]:
# add_token_scores[1]

In [None]:
# Keep the top `threshold` fraction of tokens (at least one), ranked by
# descending 'cosine_distance_score'.
# NOTE(review): `threshold` is bound to a float here, while the verification
# code earlier in the notebook indexes a `threshold` by key
# (threshold['cosine_distance']). If that one is a notebook global rather than
# a function parameter, this rebinding breaks it — confirm.
threshold = 0.01
sorted_token_cosine_distance_scores = sorted(token_scores.items(), key=lambda item: item[1]['cosine_distance_score'], reverse=True)
cosine_distance_top_percent_count = max(1, int(len(sorted_token_cosine_distance_scores) * threshold))
cosine_distance_top_percent_tokens = sorted_token_cosine_distance_scores[:cosine_distance_top_percent_count]
print("前1%的 token:", cosine_distance_top_percent_tokens)

In [None]:
# Ids of the top-ranked tokens selected in the previous cell (scores dropped).
magic_token_ids = [token_id for token_id, score in cosine_distance_top_percent_tokens]
magic_token_ids

In [None]:
def _first_pair_curves(token_results):
    """Return the 'cosine_distance' curve of every ``Pair_id == 0`` entry of one token."""
    return [
        entry['cosine_distance']
        for method in ('Prefix', 'Suffix', 'Insert')
        for entry in token_results[method]
        if entry['Pair_id'] == 0
    ]


def _curves_closest_to_mean(curves, mean_curve, count):
    """Return the ``count`` curves with the smallest Euclidean distance to ``mean_curve``."""
    distances = [np.linalg.norm(np.array(curve) - mean_curve) for curve in curves]
    # Rank indices by distance only: sorting (distance, curve) pairs would fall
    # back to comparing the curves on ties, which raises for NumPy arrays.
    order = sorted(range(len(curves)), key=distances.__getitem__)
    return [curves[i] for i in order[:count]]


def calculate_magic_token_cs_changes_plot(all_results, magic_token_ids):
    """Plot first-pair 'cosine_distance' curves of magic tokens against all other tokens.

    Args:
        all_results: Sequence indexed by token id. Each item maps 'Prefix',
            'Suffix' and 'Insert' to a list of dicts with 'Pair_id' and
            'cosine_distance'.
        magic_token_ids: Token ids treated as magic tokens; every other index
            of ``all_results`` counts as a normal token.

    Returns:
        ``(mean_magic_curve, magic_curves, mean_normal_curve, normal_curves)``
        where the curve lists hold every ``Pair_id == 0`` curve of the
        respective group and the means are taken element-wise.

    Side effects:
        Shows a matplotlib figure with both mean curves and, per group, the
        ``len(magic_token_ids)`` curves closest to that group's mean.
    """
    magic_id_set = set(magic_token_ids)  # constant-time membership test below

    magic_token_cs_changes = []
    for token_id in magic_token_ids:
        magic_token_cs_changes.extend(_first_pair_curves(all_results[token_id]))

    normal_token_cs_changes = []
    for token_id in range(len(all_results)):
        if token_id not in magic_id_set:
            normal_token_cs_changes.extend(_first_pair_curves(all_results[token_id]))

    mean_magic_token_cs_changes = np.array(magic_token_cs_changes).mean(axis=0)
    mean_normal_token_cs_changes = np.array(normal_token_cs_changes).mean(axis=0)

    # Per group, draw only as many curves as there are magic token ids,
    # choosing those nearest to the group's mean curve.
    num = len(magic_token_ids)
    closest_magic_token_cs_changes = _curves_closest_to_mean(
        magic_token_cs_changes, mean_magic_token_cs_changes, num)
    closest_normal_token_cs_changes = _curves_closest_to_mean(
        normal_token_cs_changes, mean_normal_token_cs_changes, num)

    # One x position per entry of the mean curve.
    steps = range(len(mean_magic_token_cs_changes))

    plt.figure(figsize=(8, 5))

    # Individual curves first, so the mean curves are drawn on top of them.
    for changes in closest_magic_token_cs_changes:
        plt.plot(steps, changes, color='lightblue', alpha=0.5)
    for changes in closest_normal_token_cs_changes:
        plt.plot(steps, changes, color='lightgreen', alpha=0.5)

    plt.plot(steps, mean_magic_token_cs_changes, label='Mean Magic Token CS Changes', marker='o', color='darkblue')
    plt.plot(steps, mean_normal_token_cs_changes, label='Mean Normal Token CS Changes', marker='x', color='darkgreen')

    plt.title('Magic Token CS Changes vs Normal Token CS Changes')
    plt.xlabel('Number of Additions')
    plt.ylabel('Cosine Similarity Changes')
    plt.legend()
    plt.show()

    return (mean_magic_token_cs_changes, magic_token_cs_changes,
            mean_normal_token_cs_changes, normal_token_cs_changes)

# Plot the selected magic tokens against all remaining tokens and keep the
# returned curves; the last line shows how many curves each group has.
mean_magic_token_cs_changes, magic_token_cs_changes, mean_normal_token_cs_changes, normal_token_cs_changes = calculate_magic_token_cs_changes_plot(all_results, magic_token_ids)
print(mean_magic_token_cs_changes)
print(mean_normal_token_cs_changes)
len(magic_token_cs_changes), len(normal_token_cs_changes)

In [None]:
import gzip
import json
import os
import re
import csv

def output_name(model_id, tag, extension, results_dir="/root/StickyToken/results"):
    """Build the output path for a model/tag pair and create its directory.

    Args:
        model_id: Model identifier; every character outside ``[a-zA-Z0-9]`` is
            replaced by ``_`` to form the file name.
        tag: Name of the sub-directory below ``results_dir``.
        extension: File extension without the leading dot.
        results_dir: Root directory for all results. The default keeps the
            location this notebook has always written to.

    Returns:
        ``"{results_dir}/{tag}/{sanitized_model_id}.{extension}"``. The parent
        directory is created if it does not exist yet.
    """
    model_id_alphanum = re.sub(r"[^a-zA-Z0-9]", "_", model_id)
    filename = f"{results_dir}/{tag}/{model_id_alphanum}.{extension}"
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    return filename

def write_ground_truth_magic_tokens(token_scores, model_name, score_name, threshold=0.01,reverse=True):
    """Write the top-ranked tokens for one score to a CSV file.

    Args:
        token_scores: Mapping of token id -> mapping of score name -> value.
        model_name: Model name used as the start of the output file name.
        score_name: Key of the score to rank by; also the third CSV column.
        threshold: Fraction of tokens to keep; at least one token is kept.
        reverse: If True (default) the highest scores rank first.

    The file path comes from ``output_name`` (tag
    ``"ground_truth_magic_tokens"``, extension ``"csv"``); each row holds the
    token id, the token string from the notebook-level ``tokenizer`` and the
    score. The selected tokens are also printed.
    """
    score_key = f'{score_name}'
    ranked = sorted(token_scores.items(), key=lambda item: item[1][score_key], reverse=reverse)
    keep = max(1, int(len(ranked) * threshold))
    top_percent_tokens = ranked[:keep]
    # Report the percentage actually used instead of a hard-coded "1%".
    print(f"根据指标{score_name}前{threshold * 100:g}%的 token:", top_percent_tokens)
    output_path = output_name(model_name+f'_{score_name}+'f'{threshold}', "ground_truth_magic_tokens", "csv")
    # Token strings are not ASCII-only, so do not rely on the locale's default
    # encoding; newline='' is what the csv module expects.
    with open(output_path, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(["Token ID", 'Token', score_key])
        for token_id, score in top_percent_tokens:
            writer.writerow([token_id, tokenizer.convert_ids_to_tokens(token_id), score[score_key]])

def write_verification_results(token_infos, model_name, compress=True) -> str:
    """Dump the per-token records as JSON lines, ordered by key.

    Args:
        token_infos: Mapping of sortable key -> JSON-serialisable record.
        model_name: Model name passed to ``output_name`` (tag
            ``"verifications"``, extension ``"jsonl"``).
        compress: When True, a gzip copy with the extra suffix ``.gz`` is
            written in addition to the plain file.

    Returns:
        Path of the uncompressed file.
    """
    output_file = output_name(model_name, "verifications", "jsonl")

    # The plain file is always produced; the gzip twin is optional. (The
    # original note says the uncompressed version is never committed.)
    targets = [(open, output_file)]
    if compress:
        targets.append((gzip.open, output_file + ".gz"))

    for opener, path in targets:
        with opener(path, "wt") as handle:
            for key in sorted(token_infos):
                handle.write(json.dumps(token_infos[key]) + "\n")
    return output_file


In [None]:
# Write one top-token CSV per score type, using the default threshold (0.01).
write_ground_truth_magic_tokens(token_scores, model_name, 'cosine_distance_score')
write_ground_truth_magic_tokens(token_scores, model_name, 'euclidean_distance_score')
write_ground_truth_magic_tokens(token_scores, model_name, 'manhattan_distance_score')

In [None]:
# write_ground_truth_magic_tokens(mean_token_distances, model_name, 'mean_cosine_distance',reverse=False)
# write_ground_truth_magic_tokens(mean_token_distances, model_name, 'mean_euclidean_distance',reverse=False)
# write_ground_truth_magic_tokens(mean_token_distances, model_name, 'mean_manhattan_distance',reverse=False)

In [None]:
# write_ground_truth_magic_tokens(add_token_scores, model_name, 'cosine_distance_add_score')
# write_ground_truth_magic_tokens(add_token_scores, model_name, 'euclidean_distance_add_score')
# write_ground_truth_magic_tokens(add_token_scores, model_name, 'manhattan_distance_add_score')

In [None]:
# import csv
# token_output_path = f'/root/magic_embed/ground truth of magic tokens/{model_name}_1%.csv'

# with open(token_output_path, mode='w', newline='') as file:
#     writer = csv.writer(file)
#     writer.writerow(["Token ID",'Token', "Score"])  # 写入表头
#     for token_id, score in top_percent_tokens:
#         writer.writerow([token_id, tokenizer.convert_ids_to_tokens(token_id),score])

In [None]:
# 获取 special tokens map
special_tokens_map = tokenizer.special_tokens_map

# Collect all special tokens: eos, unk, pad plus the additional special tokens
all_special_tokens = [special_tokens_map['eos_token'], 
                      special_tokens_map['unk_token'], 
                      special_tokens_map['pad_token']] + special_tokens_map['additional_special_tokens']

# Convert these tokens to their ids
special_token_ids = tokenizer.convert_tokens_to_ids(all_special_tokens)

print(special_token_ids)
print(len(special_token_ids))

In [None]:
from magikarp.utils import oov_distance_metrics

# Distance metrics of the embedding matrix `wte` relative to the special token
# ids, as computed by magikarp's oov_distance_metrics, plus the per-row L2 norm.
# NOTE(review): `wte` is defined outside this part of the notebook; presumably
# it is the (vocab_size, dim) input-embedding matrix — confirm.
metrics = oov_distance_metrics(wte,special_token_ids)
l2_norm = np.linalg.norm(wte, axis=1)
l2_norm.shape

In [None]:
metrics

In [None]:
# Build one record per scored token: id, raw vocabulary string, embedding
# metrics (rounded to 6 decimals) and the scores from the vocabulary sweep.
# NOTE: the loop rebinds the notebook-level name `token_info`, which an earlier
# cell assigned from toka.categorize_tokens().
token_infos = {}
for token_id, token_score in token_scores.items():
    metric =  {'l2_norm': l2_norm[token_id],
              'l2_distance':  metrics.l2_distance[token_id],
              'cosine_distance': metrics.cosine_distance[token_id],
                'cosine_distance_without_first_pc': metrics.cosine_distance_without_first_pc[token_id],
                
      }  
    token_info = dict(i=token_id,
                       raw_vocab=tokenizer.convert_ids_to_tokens(token_id) ,
                      metrics = {k: round(float(v),6) for k, v in metric.items()},
                      token_scores=token_score,
                      # add_token_scores=add_token_scores[token_id],
                      # mean_token_distances = mean_token_distances[token_id]
                      )
    token_infos[token_id] = token_info
token_infos

In [None]:
write_verification_results(token_infos, model_name, compress=True)

In [None]:
def save_results(all_results, model_name,copress=True):
    """Write ``all_results`` as JSON lines, one top-level item per line.

    Args:
        all_results: Iterable of records (nested dicts/lists/tuples that may
            contain NumPy scalars or arrays).
        model_name: Model name passed to ``output_name`` (tag
            ``"all_results"``).
        copress: (sic — the misspelt name is kept so existing keyword calls
            keep working.) True writes a gzip-compressed ``.jsonl.gz`` file,
            False writes a plain ``.jsonl`` file through ``jsonlines``.
    """
    def convert_np_types(data):
        """Recursively replace NumPy values with plain Python equivalents."""
        if isinstance(data, dict):
            return {key: convert_np_types(value) for key, value in data.items()}
        if isinstance(data, (list, tuple)):
            # Tuples become lists, which is how JSON stores them anyway.
            return [convert_np_types(item) for item in data]
        if isinstance(data, np.ndarray):
            # Arrays are not np.generic; without this json.dumps raises TypeError.
            return data.tolist()
        if isinstance(data, np.generic):
            return data.item()
        return data

    if copress:
        compressed_output_path = output_name(model_name, "all_results", "jsonl.gz")
        with gzip.open(compressed_output_path, 'wt', encoding='utf-8') as f:
            for item in convert_np_types(all_results):
                f.write(json.dumps(item, ensure_ascii=False) + '\n')
        print(f"结果已保存到压缩文件: {compressed_output_path}")
    else:
        results_output_path = output_name(model_name, "all_results", "jsonl")
        with jsonlines.open(results_output_path, mode='w') as writer:
            for item in convert_np_types(all_results):
                writer.write(item)
        print(f"结果已保存到文件: {results_output_path}")


# found_pos_token_df = pd.DataFrame({'token':found_pos_token,'token_id':found_pos_token_id})
# found_pos_token_df.to_csv(found_token_output_path,index=False)

In [None]:
# Load a previously written top-token CSV. The file name matches what
# write_ground_truth_magic_tokens produces for model 'sentence-t5-base',
# score 'euclidean_distance_score' and threshold 0.01.
# NOTE(review): `load_dataset` is not imported in the visible part of the
# notebook — confirm it is imported in an earlier cell.
verification_results_dataset_path = '/root/StickyToken/results/ground_truth_magic_tokens/sentence_t5_base_euclidean_distance_score_0_01.csv'
verification_results_dataset = load_dataset('csv', data_files=verification_results_dataset_path,split='train')
verification_results_dataset

In [None]:
# 初始化一个列表来存储验证结果
# One record per verified token is collected here
verification_results = []

# Iterate over every token of the dataset
from tqdm import tqdm

for row in tqdm(verification_results_dataset, desc="验证进度"):
    token_id = row['Token ID']
    token = row['Token']
    
    # Verify the token with token_verification; only its second return value is kept
    _, flag = token_verification(token, verification_gt_texts, verification_gt_embs, verification_contract_texts, verification_gt_metrics)
    
    # Append the outcome to the list
    verification_results.append({
        'Token ID': token_id,
        'Token': token,
        'Verification Result': flag
    })
# Summarise the outcomes
# NOTE(review): value_counts() needs hashable values; if `flag` is a dict of
# flags (as a later cell assumes) this raises TypeError — confirm what
# token_verification returns as its second value.
verification_results_df = pd.DataFrame(verification_results)
print('验证结果统计：',verification_results_df['Verification Result'].value_counts())

# Save the results as a CSV file
output_path = '/root/StickyToken/results/verification_results.csv'
verification_results_df.to_csv(output_path, index=False, encoding='utf-8')

print(f"验证结果已保存到: {output_path}")


In [None]:
verification_results_df.head()

In [None]:
verification_results_df['Verification Result'].value_counts()

In [None]:
# 筛选出全为1的验证结果
# Keep only the rows whose verification result has all three flags set to 1.
# NOTE(review): this only matches if 'Verification Result' holds the flag dict.
# The function that ends just before print_results returns a literal True as
# its second value; if that function is token_verification, no row can ever
# match here — confirm and return the flag dict instead.
all_ones_results = verification_results_df[verification_results_df['Verification Result'].apply(lambda x: x == {'cosine_distance_flag': 1, 'euclidean_distance_flag': 1, 'manhattan_distance_flag': 1})]

print("全为1的验证结果数量:", len(all_ones_results))
print("\n前5行结果:")
print(all_ones_results.head())

# Save the filtered results
output_path = '/root/StickyToken/results/all_ones_verification_results.csv'
all_ones_results.to_csv(output_path, index=False, encoding='utf-8')
print(f"\n全为1的验证结果已保存到: {output_path}")
