In [1]:
import sys
sys.path.append('FinNLP')
from transformers import AutoModel, AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer
from peft import PeftModel
import os
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
import numpy as np
import pandas as pd
import faiss
import torch
from huggingface_hub import login
from finnlp.benchmarks.fpb import test_fpb
from finnlp.benchmarks.fiqa import test_fiqa , add_instructions
from finnlp.benchmarks.tfns import test_tfns
from finnlp.benchmarks.nwgi import test_nwgi

In [2]:


# Log in to the Hugging Face Hub.
# The access token is read from the HF_TOKEN environment variable so that the
# secret is never stored in the notebook itself. A token that has previously
# been saved in plain text in a notebook should be revoked and replaced.
hf_token = os.environ.get("HF_TOKEN")
if not hf_token:
    raise RuntimeError(
        "Set the HF_TOKEN environment variable to a Hugging Face access token "
        "before running this cell."
    )
login(token=hf_token)

# Model identifier on the Hub and the batch size used by this notebook.
model_name = "codex68/model"
batch_size = 8

# Prefer the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# 4-bit NF4 quantization with double quantization enabled.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
)

# Tokenizer that belongs to the model.
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Quantized causal LM; `device_map="auto"` lets the library place the weights.
# NOTE(review): `trust_remote_code=True` executes code shipped with the model
# repository -- only keep it for repositories you trust.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True,
)

# Load the combined dataset that serves as the retrieval knowledge base.
data = pd.read_csv("final_combined_dataset.csv")

# Partition the rows by whether a sentiment label is present.
has_label = data["label"].notna()
labeled_data = data[has_label]      # rows that carry a sentiment label
unlabeled_data = data[~has_label]   # rows without a sentiment label

# Plain Python lists for the labeled part (texts, labels, instructions).
labeled_texts, labeled_labels, labeled_instructions = (
    labeled_data[column].tolist() for column in ("context", "label", "instruction")
)

# Plain Python lists for the unlabeled part (texts, instructions).
unlabeled_texts, unlabeled_instructions = (
    unlabeled_data[column].tolist() for column in ("context", "instruction")
)

# Create the sentence embeddings, or reuse the copies cached on disk.
labeled_embedding_file = "labeled_embeddings.npy"
unlabeled_embedding_file = "unlabeled_embeddings.npy"


def _load_cached_embeddings(path, expected_rows):
    """Return the embedding matrix cached at ``path``, or ``None`` if unusable.

    The cache is rejected when the file does not exist or when its number of
    rows differs from ``expected_rows``. Row ``i`` of the matrix must belong to
    text ``i`` of the current dataset, so a cache written for a different
    version of the CSV must not be reused. (Equal row counts do not prove the
    texts are unchanged; delete the .npy files after editing the CSV in place.)
    """
    if not os.path.exists(path):
        return None
    cached = np.load(path)
    if cached.shape[0] != expected_rows:
        return None
    return cached


labeled_embeddings = _load_cached_embeddings(labeled_embedding_file, len(labeled_texts))
unlabeled_embeddings = _load_cached_embeddings(unlabeled_embedding_file, len(unlabeled_texts))

# As before, both matrices are regenerated together as soon as either one is
# unavailable, so the two files on disk always come from the same run.
if labeled_embeddings is None or unlabeled_embeddings is None:
    # Build the embeddings with SentenceTransformer on the selected device.
    embedding_model = SentenceTransformer('paraphrase-MiniLM-L3-v2').to(device)

    # Embeddings for the texts that carry a sentiment label.
    labeled_embeddings = embedding_model.encode(labeled_texts, convert_to_tensor=True).cpu().numpy()
    np.save(labeled_embedding_file, labeled_embeddings)

    # Embeddings for the texts without a sentiment label.
    unlabeled_embeddings = embedding_model.encode(unlabeled_texts, convert_to_tensor=True).cpu().numpy()
    np.save(unlabeled_embedding_file, unlabeled_embeddings)


The token has not been saved to the git credentials helper. Pass `add_to_git_credential=True` in this function directly or `--add-to-git-credential` if using via `huggingface-cli` if you want to set the git credential as well.
Token is valid (permission: fineGrained).
Your token has been saved to C:\Users\PC\.cache\huggingface\token
Login successful


Unused kwargs: ['_load_in_4bit', '_load_in_8bit', 'quant_method']. These kwargs are not used in <class 'transformers.utils.quantization_config.BitsAndBytesConfig'>.


In [3]:
# `labeled_embeddings` and `unlabeled_embeddings` are created (or loaded from the
# .npy cache) by the preceding cell. That step used to be repeated verbatim at
# the top of this cell; the copy only re-read the files that had just been
# written, so it has been removed.

# Build the FAISS indexes.
d = labeled_embeddings.shape[1]   # embedding dimensionality
nlist = 100                       # number of IVF clusters per index

# Each IVF index gets its OWN coarse quantizer. When one quantizer object is
# shared, the first index's train() fills it with centroids, and the second
# index's train() then finds an already-trained quantizer holding `nlist`
# centroids and leaves it as is -- the unlabeled index would be partitioned by
# centroids fitted on the labeled embeddings.

# Index over the labeled data.
labeled_quantizer = faiss.IndexFlatL2(d)
labeled_index = faiss.IndexIVFFlat(labeled_quantizer, d, nlist)
labeled_index.train(labeled_embeddings)
labeled_index.add(labeled_embeddings)
labeled_index.nprobe = 10

# Index over the unlabeled data.
unlabeled_quantizer = faiss.IndexFlatL2(d)
unlabeled_index = faiss.IndexIVFFlat(unlabeled_quantizer, d, nlist)
unlabeled_index.train(unlabeled_embeddings)
unlabeled_index.add(unlabeled_embeddings)
unlabeled_index.nprobe = 10

# Keep the original module-level name available for any later cell that uses it.
quantizer = labeled_quantizer

# Decide whether an instruction asks for a sentiment label.
def is_sentiment_question(instruction):
    """Return True when ``instruction`` looks like a sentiment question.

    The check is case-insensitive and succeeds when the instruction contains
    either "sentiment" or "choose an answer from".
    """
    lowered = instruction.lower()
    return any(marker in lowered for marker in ("sentiment", "choose an answer from"))

# RAG retrieval and generation pipeline.

# Sentence-embedding model used for queries; loaded lazily and then reused so
# that the weights are not re-read on every single call.
_RAG_EMBEDDING_MODEL = None


def _get_rag_embedding_model():
    """Return the shared query-embedding model, loading it on first use."""
    global _RAG_EMBEDDING_MODEL
    if _RAG_EMBEDDING_MODEL is None:
        _RAG_EMBEDDING_MODEL = SentenceTransformer('paraphrase-MiniLM-L3-v2').to(device)
    return _RAG_EMBEDDING_MODEL


def _retrieve_positions(index, query_embedding, top_k):
    """Search ``index`` and return the valid hit positions, nearest first."""
    _, indices = index.search(query_embedding, top_k)
    # FAISS pads the result with -1 when it finds fewer than top_k neighbours.
    # Used as a Python list index, -1 would silently select the LAST element,
    # so the padding is dropped here.
    return [int(position) for position in indices[0] if position >= 0]


def rag_generate(instruction, context, top_k=3, max_new_tokens=50):
    """Answer a query by retrieval (sentiment) or retrieval + generation.

    Args:
        instruction: Instruction text. Only this argument is inspected by
            ``is_sentiment_question`` to choose the branch.
        context: Text that accompanies the instruction.
        top_k: Number of nearest neighbours to retrieve.
        max_new_tokens: Generation budget for the non-sentiment branch.

    Returns:
        Sentiment branch: the most frequent label among the retrieved labeled
        neighbours; ties go to the label of the nearest neighbour.
        Otherwise: the decoded model output for the prompt made of instruction,
        context and the retrieved texts (the prompt is part of the output).

    Raises:
        ValueError: in the sentiment branch, if the labeled index returns no
            neighbour at all.
    """
    from collections import Counter

    # The query embedding is built the same way for both branches.
    query_text = instruction + " " + context
    query_embedding = (
        _get_rag_embedding_model()
        .encode([query_text], convert_to_tensor=True)
        .cpu()
        .numpy()
    )

    if is_sentiment_question(instruction):
        # Sentiment question: vote among the labels of the nearest labeled rows.
        positions = _retrieve_positions(labeled_index, query_embedding, top_k)
        if not positions:
            raise ValueError("The labeled index returned no neighbours for this query.")
        retrieved_labels = [labeled_labels[position] for position in positions]

        # Counter keeps first-seen order among equal counts, so a tie is decided
        # by the nearest neighbour instead of by arbitrary set iteration order.
        return Counter(retrieved_labels).most_common(1)[0][0]

    # Any other question: retrieve unlabeled passages and let the model answer.
    positions = _retrieve_positions(unlabeled_index, query_embedding, top_k)
    retrieved_texts = [unlabeled_texts[position] for position in positions]

    # Prompt = instruction + context + retrieved passages.
    input_text = instruction + " " + context + " " + " ".join(retrieved_texts)
    # The model is placed with device_map="auto"; send the inputs to the device
    # the model reports rather than to a separately computed device string.
    inputs = tokenizer(input_text, return_tensors="pt").to(model.device)

    # Generate and decode the answer.
    outputs = model.generate(**inputs, max_new_tokens=max_new_tokens)
    return tokenizer.decode(outputs[0], skip_special_tokens=True)



In [12]:
# Test RAG generation.
# Note: here the headline is passed as `instruction` and the answer-choice prompt
# as `context`. is_sentiment_question() inspects only `instruction`, which
# contains neither "sentiment" nor "choose an answer from", so this call takes
# the generation branch and returns generated text rather than a label.
instruction = "Twitter and Facebook parent Meta previously moved to restrict Russian accounts from profiting on their platforms following Russia's invasion of Ukraine."
context = "Please choose an answer from {strong negative/moderately negative/mildly negative/neutral/mildly positive/moderately positive/strong positive}."
output = rag_generate(instruction, context, top_k=3, max_new_tokens=200)
print("生成的情绪标签或回答:", output)

生成的情绪标签或回答: Twitter and Facebook parent Meta previously moved to restrict Russian accounts from profiting on their platforms following Russia's invasion of Ukraine. Please choose an answer from {strong negative/moderately negative/mildly negative/neutral/mildly positive/moderately positive/strong positive}. The Russia-based cybercriminals who  company in February did not walk away from the endeavor empty-handed. "A ransom was paid as part of the company's commitment to do all it could to protect patient data from disclosure," a UnitedHealth Group spokesperson confirmed with CBS News late Monday. The spokesperson did not disclose how much the health giant paid after the cyberattack,  at hospitals and pharmacies for more than a week. Multiple media sources have reported that UnitedHealth paid  in the form of bitcoin. "We know this attack has caused concern and been disruptive for consumers and providers and we are committed to doing everything possible to help and provide support to anyo

In [13]:
# Test a sentiment-question input.
# The instruction contains "sentiment", so rag_generate() takes the
# label-retrieval branch.
instruction = "What is the sentiment of this headline?"
context = "Twitter and Facebook parent Meta previously moved to restrict Russian accounts from profiting on their platforms following Russia's invasion of Ukraine.{strong negative/moderately negative/mildly negative/neutral/mildly positive/moderately positive/strong positive}."

# Call rag_generate and show the returned label or answer.
output = rag_generate(instruction, context, top_k=3, max_new_tokens=200)
print("生成的情绪标签或回答:", output)


生成的情绪标签或回答: mildly negative


Test with RAG

In [5]:
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score
from datasets import load_dataset
from tqdm import tqdm

# Load the validation split of the `dohonba/tfns` dataset as a DataFrame.
dataset = load_dataset("dohonba/tfns")["validation"].to_pandas()

# Canonical sentiment labels (the `answer` column is assumed to hold the label).
dic = {name: name for name in ("negative", "positive", "neutral")}


def _to_label(answer):
    """Map a raw answer to its canonical label; unknown answers become "neutral"."""
    return dic.get(answer.lower(), "neutral")


# Derive the label column from the `answer` column.
dataset['label'] = dataset['answer'].map(_to_label)

# The same instruction is used for every row.
dataset["instruction"] = 'What is the sentiment of this news? Please choose an answer from {negative/neutral/positive}.'

# Build the prompt-style `context` column and the `target` column.
dataset["context"] = "Instruction: " + dataset["instruction"] + "\nInput: " + dataset["context"] + "\nAnswer: "
dataset["target"] = dataset["label"]

# `rag_generate` must already be defined at this point: given an instruction
# and a context it returns the predicted sentiment label, e.g.
# def rag_generate(instruction, context):
#     # RAG retrieval and generation logic
#     pass

# Run RAG once per row and collect the predictions.
predictions = []
for idx, row in tqdm(dataset.iterrows(), total=len(dataset), desc="Generating predictions"):
    instruction, context = row["instruction"], row["context"]
    # Prediction for this row.
    prediction = rag_generate(instruction, context)
    predictions.append(prediction)

# Attach the predictions to the dataset.
dataset["predicted_label"] = predictions

# Label normalisation.
def normalize_label(x):
    """Collapse a label into 'positive', 'negative' or 'neutral'.

    For a string, 'positive' is returned when it contains "positive"
    (case-insensitive); otherwise 'negative' when it contains "negative";
    otherwise 'neutral'. Any non-string value yields 'neutral'.
    """
    if not isinstance(x, str):
        return 'neutral'
    lowered = x.lower()
    # "positive" is tested first, so it wins when both words are present.
    for polarity in ('positive', 'negative'):
        if polarity in lowered:
            return polarity
    return 'neutral'

# Normalise both the ground truth and the predictions to the three classes.
dataset["normalized_target"] = dataset["target"].map(normalize_label)
dataset["normalized_prediction"] = dataset["predicted_label"].map(normalize_label)

# Accuracy and F1 (macro, micro, weighted) on the normalised labels.
y_true = dataset["normalized_target"]
y_pred = dataset["normalized_prediction"]
accuracy = accuracy_score(y_true, y_pred)
f1_macro, f1_micro, f1_weighted = (
    f1_score(y_true, y_pred, average=averaging)
    for averaging in ("macro", "micro", "weighted")
)

# Report the results.
print(f"Accuracy: {accuracy:.2%}")
print(f"F1 Macro: {f1_macro:.2%}")
print(f"F1 Micro: {f1_micro:.2%}")
print(f"F1 Weighted: {f1_weighted:.2%}")


Generating predictions:   1%|▌                                                                         | 17/2388 [00:42<1:39:35,  2.52s/it]


KeyboardInterrupt: 

In [22]:
# Inspect the column names of the dataset.
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score
from datasets import load_dataset
from tqdm import tqdm

# Load the `dohonba/tfns` dataset and print the column names of each split.
dataset = load_dataset("dohonba/tfns")
print(dataset.column_names)


{'train': ['context', 'answer', 'question'], 'validation': ['context', 'answer', 'question']}
