In [None]:
# 测试prompt-bert
from transformers import BertTokenizer, BertModel
import torch
import torch.nn.functional as F

# Load tokenizer and encoder from the local directory 'result/28/'
# (note: this is a local path, not the stock 'bert-base-uncased' hub model).
model_name = 'result/28/'
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertModel.from_pretrained(model_name)

# Example sentence and the prompt template that wraps it around a [MASK] slot.
sentence1 = "I love machine learning and natural language processing."
template = 'This sentence : "{sentence}" means [MASK].'
mask_token_id = tokenizer.mask_token_id

sentence2 = template.format(sentence=sentence1)

# Convert both texts to token ids; the same tokenizer options are used for each.
tokenize_kwargs = dict(return_tensors='pt', padding=True, truncation=True)
inputs1 = tokenizer(sentence1, **tokenize_kwargs)
inputs2 = tokenizer(sentence2, **tokenize_kwargs)

# Pure inference: no gradients are needed.
with torch.no_grad():
    outputs1 = model(**inputs1)
    outputs2 = model(**inputs2)

# Only the last layer's hidden states are used.
last_hidden_state1 = outputs1.last_hidden_state
last_hidden_state2 = outputs2.last_hidden_state

# Raw sentence -> vector at position 0 ([CLS]).
sentence_embedding1 = last_hidden_state1[:, 0, :]
# Prompted sentence -> vector(s) at the position(s) whose id equals the mask token id.
is_mask_position = inputs2['input_ids'] == mask_token_id
sentence2_embedding = last_hidden_state2[is_mask_position]

cos_sim = F.cosine_similarity(sentence_embedding1, sentence2_embedding)

print(cos_sim.item())


In [None]:
from transformers import BertTokenizer, BertModel
import torch
import torch.nn.functional as F

model_name = 'princeton-nlp/unsup-simcse-bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertModel.from_pretrained(model_name)

# Example sentence
sentence = "I love machine learning and natural language processing."

# Compare several ways of computing a sentence representation.
# 1) Baseline: feed the raw sentence and take the vector at position 0 ([CLS]).
inputs = tokenizer(sentence, return_tensors='pt', padding=True, truncation=True)
with torch.no_grad():
    outputs1 = model(**inputs)
emb1 = outputs1.last_hidden_state[:, 0, :]
# Sanity check: an embedding compared with itself.
cos_sim = F.cosine_similarity(emb1, emb1)
print("余弦相似度 (直接使用[CLS] token 向量):", cos_sim.item())

# 2) Feed the whole prompt-wrapped sentence.
template = 'This sentence : "{sentence}" means [MASK].'
filled_prompt = template.format(sentence=sentence)
prompt_sentenct = filled_prompt.replace('[MASK]', tokenizer.mask_token)
inputs2 = tokenizer(prompt_sentenct, return_tensors='pt', padding=True, truncation=True)
with torch.no_grad():
    outputs2 = model(**inputs2)
prompt_hidden = outputs2.last_hidden_state
emb2 = prompt_hidden[:, 0, :]  # vector at position 0 ([CLS])
emb3 = prompt_hidden[inputs2['input_ids'] == tokenizer.mask_token_id]  # vector at the mask position
cos_sim = F.cosine_similarity(emb1, emb2)
cos_sim2 = F.cosine_similarity(emb1, emb3)
print("余弦相似度 (使用Prompt-BERT的整个句子向量):", cos_sim.item())
print("余弦相似度 (使用Prompt-BERT的[MASK] token 向量):", cos_sim2.item())



In [None]:
# 3) Same prompt, but with a hand-built attention mask that weights the
#    template tokens by `prompt_weight` and the sentence tokens by 1.
# NOTE: this cell relies on `tokenizer`, `model`, `sentence` and `emb1`
#       defined by the previous cell.
template = 'This sentence : "{sentence}" means [MASK].'.replace('[MASK]', tokenizer.mask_token)
prefix, suffix = template.split("{sentence}")

prefix_input_ids = tokenizer(prefix, return_tensors='pt', padding=True, truncation=True)['input_ids'].view(-1)
suffix_inputs_ids = tokenizer(suffix, return_tensors='pt', padding=True, truncation=True)['input_ids'].view(-1)
sentence_inputs_ids = tokenizer(sentence, return_tensors='pt', padding=True, truncation=True)['input_ids'].view(-1)

# Drop the last id of the prefix and the first id of the suffix (the special
# tokens the tokenizer adds at those ends). The sentence ids are kept as-is.
# NOTE(review): the sentence keeps the special tokens added by the tokenizer,
# so they end up in the middle of the sequence — confirm this is intended.
prefix_input_ids = prefix_input_ids[:-1]
suffix_inputs_ids = suffix_inputs_ids[1:]
input_ids = torch.cat([prefix_input_ids, sentence_inputs_ids, suffix_inputs_ids])

prompt_weight = 0
# Build every segment with a float fill value so all three tensors share one
# dtype; mixing an int fill (0) with a float fill (1.0) would make torch.cat
# depend on implicit type promotion.
attention_mask = torch.cat([
    torch.full(prefix_input_ids.size(), float(prompt_weight)),
    torch.full(sentence_inputs_ids.size(), 1.0),
    torch.full(suffix_inputs_ids.size(), float(prompt_weight)),
])

inputs3 = {'input_ids': input_ids.view(1, -1), 'attention_mask': attention_mask.view(1, -1)}
with torch.no_grad():
    outputs3 = model(**inputs3)
    # Hidden state at the position whose id equals the mask token id.
    emb3 = outputs3.last_hidden_state[inputs3['input_ids'] == tokenizer.mask_token_id]
cos_sim3 = F.cosine_similarity(emb1, emb3)
print("引入attention mask的的相似度", cos_sim3.item())

In [None]:
# 采样数据集
import json
import random
import os
import numpy as np

dataset_path = 'data/wiki1m_for_simcse_ner.json'

# Read the whole JSON dataset into memory.
with open(dataset_path, mode='r', encoding='utf-8') as dataset_file:
    dataset = json.loads(dataset_file.read())

print("数据集大小:", len(dataset))
print("数据集示例:", dataset[0])

# Draw a random subset without replacement.
# NOTE(review): the original note said 1000 samples, but the code draws 10 —
# confirm which size is intended.
sample_size = 10
dataset_sample = random.sample(dataset, k=sample_size)


In [None]:
# Load the entity dictionary (a JSON file) into `entity_dict`.
dict_path = 'data/wiki1m_for_simcse_ner_entity_dict.json'
with open(dict_path, mode='r', encoding='utf-8') as dict_file:
    entity_dict = json.loads(dict_file.read())


In [None]:
import hashlib
def hash(text):
    """Return the hexadecimal MD5 digest of ``text``.

    ``text`` is encoded with ``str.encode()``'s default encoding before hashing.

    NOTE(review): this name shadows the built-in ``hash`` for the rest of the
    kernel session.
    """
    digest = hashlib.md5()
    digest.update(text.encode())
    return digest.hexdigest()

# NOTE(review): `output_path` is assigned but nothing is written to it in this cell.
output_path = 'data/wiki1k_with_entity_knowledge.json'
for item in dataset_sample:
    entities = item['entities']
    print(item)
    for entity in entities:
        # Dictionary keys have the form '<md5 of entity text>.json'.
        lookup_key = hash(entity['text']) + '.json'
        if lookup_key not in entity_dict:
            continue
        entity['knowledge'] = entity_dict[lookup_key]
        print(entity)
    # Only the first sampled item is processed.
    break

In [None]:
# Check whether apex is available, as reported by transformers.
from transformers.utils import is_apex_available
print(is_apex_available())

In [None]:
# Try out the project's evaluation helper on a saved model directory.
# NOTE(review): this import shadows the built-in `eval` for the rest of the
# kernel session.
from utils.auto_eval import eval

# Evaluate the checkpoint with pooler='avg'.
eval('model/unsup-PromptBERT-baseline',pooler='avg')

In [None]:
# 测试bert attention mask
import torch
from transformers import BertTokenizer, BertModel
import torch.nn.functional as F

model_name = 'bert-base-uncased'

tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertModel.from_pretrained(model_name)

sentence = "I love machine learning and natural language processing."

inputs = tokenizer(sentence, return_tensors='pt', padding=True, truncation=True)

# Overwrite the attention mask with all ones (same shape/dtype as the ids).
token_ids = inputs['input_ids']
inputs['attention_mask'] = torch.ones_like(token_ids)

# A (batch, 5) tensor filled with id 0, using the same dtype as the real ids.
padding = torch.zeros((token_ids.shape[0], 5), dtype=token_ids.dtype)

# Run the encoder on the all-zero ids only and keep the vector at position 0.
# NOTE(review): this forward pass is not wrapped in torch.no_grad().
output = model(input_ids=padding)
last_hidden_state1 = output.last_hidden_state[:, 0, :]


In [None]:
import nltk
# Download the 'punkt_tab' resource; a later cell imports
# nltk.tokenize.sent_tokenize.
nltk.download('punkt_tab')

In [None]:
# Retrieval: for every corpus sentence, fetch its knowledge summary, rank the
# summary's sentences by cosine similarity to the corpus sentence and cache the
# top-k (similarity, sentence) pairs in Redis.
from knowledge.retrieval import retrieve_knowledge
from transformers import AutoTokenizer, AutoModel
import torch.nn.functional as F
import torch
import redis
import base64
import os
from tqdm import tqdm
from nltk.tokenize import sent_tokenize
import json

def text_encode(text):
    """Return the Base64 encoding of ``text`` as a ``str``."""
    return base64.b64encode(text.encode()).decode()

input_file = 'data/wiki1m_for_simcse.txt'
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model_name = 'princeton-nlp/unsup-simcse-bert-base-uncased'
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Move the model to the detected device instead of unconditionally calling
# .cuda(), which fails on machines without a GPU.
model = AutoModel.from_pretrained(model_name).to(device)

# The password is read from the environment so that no credential is stored in
# the notebook. Host/port may be overridden the same way.
r = redis.Redis(
    host=os.environ.get('REDIS_HOST', '59.77.134.205'),
    port=int(os.environ.get('REDIS_PORT', '6379')),
    db=2,
    password=os.environ.get('REDIS_PASSWORD'),
)
prefix = 'similarity_sent_'
topk = 10

with open(input_file, 'r', encoding='utf-8') as f:
    sent_list = f.read().splitlines()

for sent in tqdm(sent_list):

    key = prefix + text_encode(sent)
    # Skip sentences that were already processed in an earlier run.
    if r.exists(key):
        continue
    summary = retrieve_knowledge(sent, retrieve_type="summary")
    if not summary:
        continue
    sentences = sent_tokenize(summary)
    # Cleaning: drop everything from the first sentence containing "References".
    for i, sentence in enumerate(sentences):
        if "References" in sentence:
            sentences = sentences[:i]
            break

    # Nothing left to rank: store an empty result so the key counts as processed.
    if not sentences:
        r.set(key, json.dumps([]))
        continue

    _topk = min(topk, len(sentences))

    with torch.no_grad():
        # Row 0 is the query sentence; rows 1.. are the candidate knowledge sentences.
        inputs = tokenizer([sent] + sentences, return_tensors='pt', padding=True, truncation=True).to(device)
        outputs = model(**inputs)

        embeddings = outputs.last_hidden_state[:, 0, :]
        # cos_sim[j] is the similarity between the query and sentences[j].
        cos_sim = F.cosine_similarity(embeddings[0:1], embeddings[1:], dim=1)

        # Pick the k most similar candidates. Both the score and the sentence
        # are looked up through the same top-k index, so they stay aligned and
        # the query sentence itself can never be selected.
        top = cos_sim.topk(_topk)

    info = [
        (score.item(), sentences[index.item()])
        for score, index in zip(top.values, top.indices)
    ]
    r.set(key, json.dumps(info))

In [None]:
# Retrieval check: print the (similarity, sentence) pairs returned for the
# first few corpus sentences.
from knowledge.retrieval import retrieve_knowledge
import redis
import base64
import os
from tqdm import tqdm
import json

def text_encode(text):
    """Return the Base64 encoding of ``text`` as a ``str``."""
    return base64.b64encode(text.encode()).decode()

input_file = 'data/wiki1m_for_simcse.txt'

prefix = 'similarity_sent_'
topk = 10

with open(input_file, 'r', encoding='utf-8') as f:
    sent_list = f.read().splitlines()

# Only inspect the first 10 sentences.
sent_list = sent_list[:10]

# The password is read from the environment so that no credential is stored in
# the notebook.
r = redis.Redis(host='localhost', port=6379, db=2, password=os.environ.get('REDIS_PASSWORD'))

for sent in tqdm(sent_list):
    value = retrieve_knowledge(sent, retrieve_type="sentence")
    # Nothing retrieved for this sentence: nothing to print.
    if not value:
        continue
    # Use a distinct name so the outer loop variable `sent` is not overwritten.
    for sim, knowledge_sent in value:
        print(sim)
        print(knowledge_sent)

In [None]:
# Build a new training file: each batch consists of retrieved knowledge
# sentences followed by the original sentences they were retrieved for.
from knowledge.retrieval import retrieve_knowledge
import base64
import json
import random
from tqdm import tqdm

def text_encode(text):
    """Return the Base64 encoding of ``text`` as a ``str``."""
    return base64.b64encode(text.encode()).decode()

input_file = 'data/wiki1m_for_simcse.txt'
with open(input_file, 'r', encoding='utf-8') as f:
    sent_list = f.read().splitlines()

# Keep only the sentences for which retrieval returned something.
new_list = []

for sent in tqdm(sent_list):
    value = retrieve_knowledge(sent, retrieve_type="sentence")
    if value:
        new_list.append((sent, value))

bs = 128
total = 1e6 # 1,000,000
batch_num = int(total / bs)
print("新数据集大小:", len(new_list))

# NOTE(review): despite its name and printed label, this ratio is the share of
# sentences that DID get knowledge (len(new_list) / len(sent_list)) — confirm
# the intended meaning.
empty_rate = len(new_list) / len(sent_list)
print("空值率:", empty_rate)
# Number of original sentences taken from new_list per batch ...
empty_in_batch = int(bs * empty_rate)
# ... and number of knowledge sentences used to fill the batch up to `bs`.
add_num = int(bs - empty_in_batch)
print("每个batch增加{}个样本".format(add_num))

output_file = 'data/wiki1m_for_simcse_test0.txt'

with open(output_file, 'w', encoding='utf-8') as f:
    for i in range (batch_num):
        orgin_batch = new_list[i * empty_in_batch: (i + 1) * empty_in_batch]
        # Slices only move forward, so once one is empty all later ones are too.
        if not orgin_batch:
            break
        # Collect candidate knowledge sentences for this batch, keeping only
        # mid-range similarities and skipping lines containing "==".
        value_list = []
        for _, value in orgin_batch:
            for sim, knowledge_sent in value:
                if 0.3 < sim < 0.9 and "==" not in knowledge_sent:
                    value_list.append(knowledge_sent)
        # De-duplicate while preserving first-seen order, so the shuffle below
        # is the only source of randomness.
        value_list = list(dict.fromkeys(value_list))
        random.shuffle(value_list)
        value_list = value_list[:add_num]
        # Assemble the batch: knowledge sentences first, then the originals.
        batch = value_list + [orig_sent for orig_sent, _ in orgin_batch]
        print("batch大小:", len(batch))
        # Terminate every batch with a newline; without it the last line of one
        # batch is fused with the first line of the next.
        f.write('\n'.join(batch) + '\n')

In [None]:
from transformers import BertTokenizer, BertModel

model_name = 'princeton-nlp/unsup-simcse-bert-base-uncased'

model = BertModel.from_pretrained(model_name)
tokenizer = BertTokenizer.from_pretrained(model_name)

sentence = "I love machine learning and natural language processing."

# Token ids of the sentence with no special tokens added; `a` is an alias of
# the same list.
inputs = tokenizer.encode(sentence, add_special_tokens=False)
a = inputs

In [None]:
# 检查知识库命中率
from knowledge.retrieval import retrieval_knowledge_batch
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np

input_file = 'data/wiki1m_for_simcse.txt'
with open(input_file, 'r', encoding='utf-8') as f:
    sent_list = f.read().splitlines()

bs = 64
# One entry per batch: number of retrieved titles divided by the batch size.
hit_rate_list = []

# Number of sentences with at least one retrieved title.
count = 0

for i in tqdm(range(0, len(sent_list), bs)):
    sent_batch = sent_list[i:i+bs]
    knowledge = retrieval_knowledge_batch(sent_batch, retrieve_type='title', title_num=-1)

    title_list = knowledge
    title_num_list = [len(t) for t in title_list]
    count += sum(1 for num in title_num_list if num >= 1)
    hit_count = sum(title_num_list)

    # Divide by the real batch length so a shorter final batch is not
    # under-reported.
    hit_rate_list.append(hit_count / len(sent_batch))

print(f"多知识:{count}")

mean = np.mean(hit_rate_list)
median = np.median(hit_rate_list)
std_dev = np.std(hit_rate_list)
min_val = np.min(hit_rate_list)
max_val = np.max(hit_rate_list)
# Fixed typo: the function is np.percentile (np.perxcentile does not exist).
percentiles = np.percentile(hit_rate_list, [25, 50, 75])

print(f"平均值: {mean}, 中位数: {median}, 标准差: {std_dev}, 最小值: {min_val}, 最大值: {max_val}, 四分位数: {percentiles}")


In [None]:

# 检查知识库命中率
from knowledge.retrieval import retrieval_knowledge_batch
from tqdm import tqdm
input_file = 'data/wiki1m_for_simcse.txt'
with open(input_file, mode='r', encoding='utf-8') as f:
    sent_list = f.read().splitlines()

knowledge_list = retrieval_knowledge_batch(sent_list, retrieve_type='sentence')

# Total number of retrieved entries over all input sentences.
cnt = sum(len(knowledge) for knowledge in tqdm(knowledge_list))
print(cnt)

In [None]:


def get_delta(template_token, length=50):
    """Encode ``length`` copies of a template and return the hidden states at its mask positions.

    NOTE(review): this function reads ``cls``, ``input_ids`` and ``encoder``
    as free variables; none of them is defined in this notebook, so it looks
    like a snippet lifted from a model method — confirm before calling it.

    Args:
        template_token: token ids of the template; presumably a flat sequence
            of ints (it is passed to ``torch.Tensor`` and repeated) — TODO confirm.
        length: number of copies of the template to encode. Copy ``r`` has its
            position ids shifted by ``r`` from index ``len(cls.bs) + 1`` onward.

    Returns:
        ``(delta, template_len)``: ``delta`` is the last hidden state selected
        at the positions equal to ``cls.mask_token_id`` (passed through
        ``cls.mlp`` when ``mask_embedding_sentence_org_mlp`` is set), and
        ``template_len`` is the second dimension of the id tensor.
    """
    # Gradients are tracked unless the "delta freeze" option is set.
    with torch.set_grad_enabled(not cls.model_args.mask_embedding_sentence_delta_freeze):
        device = input_ids.device
        # Shape: (length, number of template tokens)
        d_input_ids = torch.Tensor(template_token).repeat(length, 1).to(device).long()
        d_inputs_embeds = None

        # Start from positions 0..template_len-1 for every row.
        d_position_ids = torch.arange(d_input_ids.shape[1]).to(device).unsqueeze(0).repeat(length, 1).long()

        # Shift the positions after index len(cls.bs) by the row number (0..length-1).
        # Presumably this mimics a sentence of that many tokens sitting inside
        # the template — TODO confirm.
        d_position_ids[:, len(cls.bs)+1:] += torch.arange(length).to(device).unsqueeze(-1)

        # Boolean mask of the positions holding the mask token.
        m_mask = d_input_ids == cls.mask_token_id
        # d_inputs_embeds is always None here, so the ids are what gets passed.
        outputs = encoder(input_ids=d_input_ids if d_inputs_embeds is None else None ,
                            inputs_embeds=d_inputs_embeds,
                            position_ids=d_position_ids,  output_hidden_states=True, return_dict=True)
        last_hidden = outputs.last_hidden_state
        delta = last_hidden[m_mask]
        template_len = d_input_ids.shape[1]
        # Optionally pass the selected vectors through cls.mlp.
        if cls.model_args.mask_embedding_sentence_org_mlp:
            delta = cls.mlp(delta)
        return delta, template_len

In [None]:
# T5 test
from transformers import T5Tokenizer, T5ForConditionalGeneration

# Load the T5 tokenizer and conditional-generation model.
model_name = "t5-base"
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)

# Build the input text in "question: ... context: ..." form.
context = "Machine learning is a subset of artificial intelligence that focuses on building systems that can learn from and make decisions based on data. Unlike traditional programming, machine learning models improve over time with more data."
question = "What is the focus of machine learning?"
input_text = "question: {} context: {}".format(question, context)

# Tokenize the input into PyTorch tensors.
inputs = tokenizer(input_text, return_tensors="pt")

# Generate with beam search.
generation_kwargs = dict(max_length=50, num_beams=5, early_stopping=True)
output_ids = model.generate(inputs["input_ids"], **generation_kwargs)

# Decode the first returned sequence, dropping special tokens.
output_text = tokenizer.decode(output_ids[0], skip_special_tokens=True)

# Show the result.
print("Generated Output:", output_text)

In [None]:
from knowledge.retrieval import retrieval_knowledge

from tqdm import tqdm
input_file = 'data/wiki1m_for_simcse.txt'
with open(input_file, mode='r', encoding='utf-8') as f:
    # Only the first 1000 sentences are used.
    sent_list = f.read().splitlines()[:1000]

# NOTE(review): other cells import `retrieve_knowledge` / `retrieval_knowledge_batch`
# from the same module; verify that `retrieval_knowledge` exists and accepts a list.
knowledge_list = retrieval_knowledge(sent_list, retrieve_type='sentence')

# cnt: number of retrieved entries with similarity above 0.6.
# sent_cnt: number of input sentences having at least one such entry.
cnt = 0
sent_cnt = 0
for knowledge in tqdm(knowledge_list):
    strong_hits = sum(1 for sim, _ in knowledge if sim > 0.6)
    cnt += strong_hits
    if strong_hits > 0:
        sent_cnt += 1
print(cnt)
print(sent_cnt)

In [38]:
# 测试prompt的相似度
from transformers import AutoTokenizer, AutoModel
import torch.nn.functional as F
import torch

model_name = "princeton-nlp/unsup-simcse-bert-base-uncased"

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

s1 = "Deep learning models have transformed image recognition."
s2 = "Convolutional neural networks are commonly used for image classification."

# External knowledge for sentence 1
knowledge_sent1 = [
    "Deep learning is a subset of machine learning that uses neural networks with many layers.",
    "Image recognition involves identifying objects, people, or features in an image.",
    "Convolutional neural networks (CNNs) are a type of deep learning model particularly effective for image data."
]

mask_token_id = tokenizer.mask_token_id

# prompt1 wraps a sentence only; prompt2 additionally injects a knowledge sentence.
prompt1 = "This sentence of '{sentence}' means [MASK].".replace('[MASK]', tokenizer.mask_token)
prompt2 = "The phrase '{sentence}' may relate to '{knowledge}', thus [MASK] is implied.".replace('[MASK]', tokenizer.mask_token)

sent1 = prompt1.format(sentence=s1)
sent2 = prompt1.format(sentence=s2)
# Only the second knowledge sentence (index 1) is used.
k_sent1 = prompt2.format(sentence=s1, knowledge=knowledge_sent1[1])

tokenize_kwargs = dict(return_tensors='pt', padding=True, truncation=True)
inputs1 = tokenizer(sent1, **tokenize_kwargs)
inputs2 = tokenizer(sent2, **tokenize_kwargs)
k_inputs1 = tokenizer(k_sent1, **tokenize_kwargs)

with torch.no_grad():
    output1 = model(**inputs1)
    output2 = model(**inputs2)
    k_output1 = model(**k_inputs1)

# Sentence representation = hidden state at the mask-token position.
emb1 = output1.last_hidden_state[inputs1["input_ids"] == mask_token_id]
emb2 = output2.last_hidden_state[inputs2["input_ids"] == mask_token_id]
k_emb1 = k_output1.last_hidden_state[k_inputs1["input_ids"] == mask_token_id]

comparisons = (
    ("sent1 and sent2", emb1, emb2),
    ("sent1 and k_sent1", emb1, k_emb1),
    ("sent2 and k_sent1", emb2, k_emb1),
)
for label, left, right in comparisons:
    print(f"{label}: {F.cosine_similarity(left, right).item()}")

sent1 and sent2: 0.7866969704627991
sent1 and k_sent1: 0.8518177270889282
sent2 and k_sent1: 0.6989714503288269


In [None]:
# NOTE: relies on `tokenizer` from an earlier cell.
template = "*cls*_This_sentence_of_\"*sent_0*\"_means*mask*.*sep+*"

# Apply the placeholder substitutions one after another, in this order.
substitutions = (
    ('*mask*', tokenizer.mask_token),
    ('*sep+*', ''),
    ('*cls*', ''),
    ('*sent_0*', ' '),
)
for placeholder, replacement in substitutions:
    template = template.replace(placeholder, replacement)
# The sentence slot is now a single space, so splitting on it yields the
# parts before and after the slot.
template = template.split(' ')

print(template)

# The first assignment is immediately overridden by the second one.
template = "The phrase \"{sentence}\" may relate to \"{knowledge}\", thus [MASK] is implied."
template = "This sentence of \"{sentence}\" means [MASK]."
template = template.split("{sentence}")
print(template)


SyntaxError: invalid syntax (958152846.py, line 11)