## 0. 安装依赖

In [None]:
!pip -q install pandas scikit-learn tqdm python-dotenv requests

## 1. 基本配置

In [None]:
import os
from pathlib import Path

# Data path configuration.
# NOTE(review): "xxx" looks like a placeholder for the real data directory — confirm
# and replace before running.
BASE_DIR = Path("xxx")
TRAIN_CSV = BASE_DIR / "train_data.csv"
TEST_CSV  = BASE_DIR / "test_data.csv"

# Output file names derived from the training file's stem.
# NOTE(review): these three paths are not referenced in the visible part of the file.
DATASET_NAME = TRAIN_CSV.stem
PER_FOLD_PREFIX         = BASE_DIR / f"{DATASET_NAME}_cv_fold_preds"
CV_PER_FOLD_METRICS_CSV = BASE_DIR / f"{DATASET_NAME}_cv_metrics_per_fold.csv"
CV_SUMMARY_CSV          = BASE_DIR / f"{DATASET_NAME}_cv_summary.csv"

# LLM selection.
# The triple-quoted block below is a bare string expression, so NONE of the names
# inside it are defined when this cell runs. call_deepseek (section 5) reads
# DEEPSEEK_API_URL, DEEPSEEK_MODEL and DEEPSEEK_API_KEY, so the chosen configuration
# must be copied out of the string into real assignments before that cell is executed;
# otherwise defining call_deepseek fails with a NameError on its default argument.
"""
QWEN_API_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions"
QWEN_MODEL   = "qwen-plus-latest"
QWEN_API_KEY = os.getenv("QWEN_API_KEY", "xxx")

or

DEEPSEEK_API_URL = "https://api.deepseek.com/chat/completions"
DEEPSEEK_MODEL   = "deepseek-reasoner"
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY", "xxx")
"""

# Seed constant.
# NOTE(review): not referenced in the visible part of the file.
RANDOM_STATE = 2025

## 2. 读取数据与清洗

In [None]:
import pandas as pd
import numpy as np

# Load the training set. "utf-8-sig" strips a leading BOM and also reads files
# that have none.
train_df = pd.read_csv(TRAIN_CSV, encoding="utf-8-sig")

# Fail early with an explicit exception when a required column is absent
# (an assert would be stripped when Python runs with -O).
need = {"orig_index", "abs", "label"}
missing = need - set(train_df.columns)
if missing:
    raise ValueError(f"CSV 缺少列: {missing}")

# Drop missing abstracts BEFORE casting to str: astype(str) turns NaN into the
# literal text "nan", after which dropna() has nothing left to remove.
train_df = train_df.dropna(subset=["abs"]).copy()
train_df["abs"] = train_df["abs"].astype(str).str.strip()
# Whitespace-only abstracts are empty after strip(); treat them as missing too.
train_df = train_df[train_df["abs"] != ""].copy()

# Coerce labels to the binary set {0, 1}: unparsable or missing values become 0,
# anything outside the range is clipped.
train_df["label"] = pd.to_numeric(train_df["label"], errors="coerce").fillna(0).astype(int)
train_df["label"] = train_df["label"].clip(0, 1)

print("Loaded rows:", len(train_df))
print("Label distribution:", train_df["label"].value_counts().to_dict())
print(train_df.head(3)[["orig_index", "label", "abs"]])


## 3. 基于 ChineseBERT 的相似样例检索
（在基于Few-Shot Prompting的策略中寻找相似判例）

In [None]:
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel
from sklearn.metrics.pairwise import cosine_similarity

# Checkpoint identifier passed to from_pretrained for both the tokenizer and the model.
MODEL_NAME = "shannonai/ChineseBERT-base" 
# NOTE(review): confirm this checkpoint loads through the generic Auto* classes
# in the installed transformers version.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME)

# Run on CUDA when torch reports it as available, otherwise on CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# The model is only used for inference here (encode_texts wraps the forward pass
# in torch.no_grad()), so switch it to evaluation mode.
model.eval()

def encode_texts(texts, batch_size=32, max_length=512):
    """Embed one or more texts with the module-level tokenizer and model.

    Args:
        texts: A single string or an iterable of strings (list, tuple,
            pandas Series, ...).
        batch_size: Number of texts sent through the model per forward pass.
        max_length: Token limit handed to the tokenizer; longer inputs are
            truncated.

    Returns:
        A 2-D numpy array with one L2-normalised row per input text. Each row
        is the hidden state at sequence position 0 of the model's last layer.
        Empty input yields an array with zero rows instead of raising.
    """
    if isinstance(texts, str):
        texts = [texts]
    else:
        # Materialise Series / arrays / generators so positional slicing and
        # the tokenizer both see a plain list.
        texts = list(texts)

    if not texts:
        # np.vstack([]) raises ValueError; return an empty matrix whose width
        # matches the model's hidden size when that is available.
        hidden_size = getattr(getattr(model, "config", None), "hidden_size", 0)
        return np.empty((0, hidden_size), dtype=np.float32)

    all_embeddings = []

    for i in range(0, len(texts), batch_size):
        batch_texts = texts[i : i + batch_size]

        encoded_input = tokenizer(
            batch_texts,
            padding=True,
            truncation=True,
            max_length=max_length,
            return_tensors='pt'
        ).to(device)

        with torch.no_grad():
            model_output = model(**encoded_input)
            # Position 0 of the last hidden layer serves as the sentence vector.
            sentence_embeddings = model_output.last_hidden_state[:, 0, :]
            # Unit-length rows so cosine similarity reduces to a dot product.
            sentence_embeddings = torch.nn.functional.normalize(sentence_embeddings, p=2, dim=1)

        all_embeddings.append(sentence_embeddings.cpu().numpy())

    return np.vstack(all_embeddings)

def make_fold_encoder_and_index(train_texts, train_labels, batch_size=32):
    """Embed a training fold and split it into per-class retrieval indexes.

    Args:
        train_texts: Abstracts of the fold.
        train_labels: Labels aligned with ``train_texts`` (1 = positive, 0 = negative).
        batch_size: Batch size forwarded to ``encode_texts``.

    Returns:
        A 6-tuple ``(encode_texts, X_train_vec, X_pos, X_neg, pos_rows, neg_rows)``:
        the encoder function, the embeddings of the whole fold, the embedding
        sub-matrices of the positive / negative rows (``None`` when the class
        is absent) and the matching ``abs``/``label`` rows re-indexed from 0
        (an empty frame when the class is absent).
    """
    embeddings = encode_texts(train_texts, batch_size=batch_size)
    fold_frame = pd.DataFrame({"abs": train_texts, "label": train_labels})

    def _class_index(class_value):
        # Embeddings and rows of one class, or the "absent" placeholders.
        mask = fold_frame["label"] == class_value
        if mask.sum() > 0:
            return embeddings[mask.values], fold_frame[mask].reset_index(drop=True)
        return None, pd.DataFrame(columns=["abs", "label"])

    pos_vectors, pos_frame = _class_index(1)
    neg_vectors, neg_frame = _class_index(0)

    return encode_texts, embeddings, pos_vectors, neg_vectors, pos_frame, neg_frame

def topk_from_block_local(q_vec, X_block, rows, k, redundancy_threshold=0.90):
    """Pick up to ``k`` rows most similar to the query, skipping near-duplicates.

    Candidates are visited in order of decreasing cosine similarity to
    ``q_vec``. The best candidate is always taken; each later one is taken only
    if its highest cosine similarity to the already selected candidates is
    below ``redundancy_threshold``.

    Args:
        q_vec: Query embedding, 1-D or a single-row 2-D array.
        X_block: Candidate embedding matrix, or ``None``.
        rows: DataFrame whose positional rows line up with ``X_block`` and
            which has ``abs`` and ``label`` columns.
        k: Maximum number of examples to return.
        redundancy_threshold: Similarity at or above which a candidate counts
            as redundant.

    Returns:
        A list of ``{"abs": ..., "label": ...}`` dicts, best match first; empty
        when there are no candidates or ``k`` is not positive.
    """
    no_candidates = X_block is None or getattr(X_block, "shape", (0, 0))[0] == 0
    if no_candidates or k <= 0:
        return []

    query = q_vec.reshape(1, -1) if q_vec.ndim == 1 else q_vec

    # Rank every candidate by similarity to the query, most similar first.
    scores = cosine_similarity(query, X_block).ravel()
    ranking = scores.argsort()[::-1]

    chosen = []
    for candidate in ranking:
        if chosen:
            # Compare the candidate against everything already picked.
            overlap = cosine_similarity(
                X_block[candidate].reshape(1, -1), X_block[chosen]
            ).ravel()
            if not (np.max(overlap) < redundancy_threshold):
                continue  # too close to an example we already have
        chosen.append(candidate)
        if len(chosen) >= k:
            break

    return rows.iloc[chosen][["abs", "label"]].to_dict("records")

## 4. 提示工程设计

In [None]:
# Baseline definition of "AI-related" technology/patents. The text is interpolated
# verbatim into every prompt built by the make_prompt_* functions, so any edit to it
# changes what the model is asked.
AI_DEFINITION_CORE = """凡技术属于以下三条路径之一，即视为 AI 相关：
1. 机器人学（Robotics）：具备感知–决策–执行闭环的自主/半自主系统与其核心算法与系统（如SLAM、路径规划、运动控制、协同控制、机器人感知与融合、机器人操作策略、类人/移动/工业/服务/医疗机器人等）。
2. 学习系统（Learning Systems）：以机器学习/深度学习/强化学习等为核心方法的技术与任务（如监督/无监督/自监督/生成式模型、卷积/循环/Transformer、表示学习、图学习、概率生成模型、SVM/Boosting/随机森林等；AI专用硬件若核心目的在于训练/推理加速亦归此类）。
3. 符号系统（Symbolic Systems）：基于符号/逻辑推理与知识表示的AI（如专家系统、知识库/本体、规则引擎+推理机、规划/搜索、约束满足问题CSP、自动定理证明等）。

请注意，这次任务并非是一次“关键词检索”，很多专利的摘要中并不会直接体现上述关键词或者相关联的词汇，但是你需要根据语义推理出其内涵，从而对其进行判断。
判定应关注技术实质是否以以上方法为核心创新，或者设计而非只在背景介绍中泛泛提及。"""

def _sanitize(text: str, max_len=1500) -> str:
    if text is None:
        return ""
    s = str(text)
    s = s.replace("\\", "\\\\")
    s = s.replace("\n", " ").replace("\r", " ")
    s = s.replace('"', '\\"')
    s = s.strip()[:max_len]
    return s

### 4.1. Zero-Shot Prompting

In [None]:
def make_prompt_zero_shot(abstract):
    """Build the chat messages for the zero-shot strategy.

    The abstract is passed through ``_sanitize`` (capped at 1500 characters)
    and placed after the shared AI definition and the output instructions.

    Returns:
        A list holding a single ``user`` message dict.
    """
    cleaned_abstract = _sanitize(abstract, max_len=1500)

    user_message = {
        "role": "user",
        "content": f"""请阅读下方给出的中文专利摘要，并判断它是否属于“AI相关专利”。在本任务中，请严格依据以下定义与判断标准：
【AI相关专利定义】
{AI_DEFINITION_CORE}

请只输出一个数字：
1：属于AI相关专利；
0：不属于AI相关专利。
不要输出任何解释或其他文字。

以下是需要判断的专利摘要：
【待判断专利摘要】
{cleaned_abstract}""",
    }
    return [user_message]

### 4.2. Few-Shot Prompting

In [None]:
def make_prompt_few_shot(abstract, shots):
    """Build the chat messages for the few-shot strategy.

    Args:
        abstract: The abstract to classify; sanitised and capped at 1500 characters.
        shots: Iterable of example mappings with ``abs`` and ``label`` keys.
            Each example abstract is sanitised and capped at 220 characters and
            rendered under a numbered heading that states its class.

    Returns:
        A list holding a single ``user`` message dict.
    """
    rendered_examples = []
    for number, example in enumerate(shots, start=1):
        if int(example["label"]) == 1:
            heading = "正类（标签 = 1）"
        else:
            heading = "负类（标签 = 0）"
        example_text = _sanitize(example["abs"], max_len=220)
        rendered_examples.append(f"【示例{number}：{heading}】\n摘要内容：{example_text}\n")

    demo_section = "\n".join(rendered_examples)
    cleaned_abstract = _sanitize(abstract, max_len=1500)

    user_message = {
        "role": "user",
        "content": f"""你将阅读下方给出的中文专利摘要，并判断它们是否属于“AI相关专利”。请首先理解以下定义与判断标准：
【AI相关专利定义】
{AI_DEFINITION_CORE}

请学习以下样例，该样例展示了专利的摘要以及对应其是否属于AI相关专利：
{demo_section}
现在，请在理解上述定义与示例的基础上，对下方“待判断摘要”进行分类。请只输出一个数字：
1：属于AI相关专利；
0：不属于AI相关专利。
不要输出任何其他内容。

以下是需要判断的专利摘要：
【待判断专利摘要】
{cleaned_abstract}""",
    }
    return [user_message]

### 4.3. Role-Based Prompting

In [None]:
def make_prompt_role_based(abstract):
    """Build the chat messages for the role-based strategy.

    A ``system`` message assigns the model a persona, and a ``user`` message
    carries the shared AI definition, the output instructions and the
    sanitised abstract (capped at 1500 characters).

    Returns:
        A two-element list: the ``system`` message followed by the ``user`` message.
    """
    cleaned_abstract = _sanitize(abstract, max_len=1500)

    persona_message = {
        "role": "system",
        "content": "你现在的身份是一名长期从事技术情报分析的专利审查员，负责筛选“AI相关专利”。",
    }
    task_message = {
        "role": "user",
        "content": f"""请你根据以下定义与判断标准，阅读下方的中文专利摘要，并给出是否属于“AI相关专利”的判断。AI相关专利的定义与判断标准如下：
【AI相关专利的定义】
{AI_DEFINITION_CORE}

请站在“专利分析员”的立场上，先根据上述标准完成判断，但在最终输出中只给出一个数字：
1：属于AI相关专利；
0：不属于AI相关专利。
不要输出任何解释或附加文字。

以下是需要判断的专利摘要：
【待判断专利摘要】
{cleaned_abstract}""",
    }
    return [persona_message, task_message]

## 5. API封装

In [None]:
import json, time, random, requests

# Adjust for the specific LLM in use.
def _extract_binary_label(content, default=0):
    """Map raw model output to 0 or 1, accepting a JSON object or a bare digit."""
    text = "" if content is None else str(content).strip()

    # Preferred form: a JSON object carrying a "label" field.
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        try:
            obj = json.loads(text[start:end + 1])
        except ValueError:
            obj = None
        if isinstance(obj, dict) and "label" in obj:
            try:
                return 1 if int(obj["label"]) == 1 else 0
            except (TypeError, ValueError):
                return default

    # Fallback: the prompts ask for a bare digit, so take the first digit found.
    for ch in text:
        if ch in "0123456789":
            return int(ch) if ch in "01" else default
    return default


def call_deepseek(messages, model=DEEPSEEK_MODEL, temperature=0.0, max_retries=5, timeout=60,
                  response_format=None):
    """Send a chat request and return the predicted label.

    The prompts in this file instruct the model to answer with a single digit,
    so JSON mode is no longer forced by default. Previously a bare-digit reply
    parsed to an int, the subsequent ``.get`` call raised, and the request was
    retried until it fell through to label 0.

    Args:
        messages: Chat messages in ``[{"role": ..., "content": ...}]`` form.
        model: Model name sent in the payload.
        temperature: Sampling temperature sent in the payload.
        max_retries: Number of attempts before giving up.
        timeout: Per-request timeout in seconds, passed to ``requests.post``.
        response_format: Optional value for the payload's ``response_format``
            field (e.g. ``{"type": "json_object"}``); omitted when ``None``.

    Returns:
        ``{"label": 0}`` or ``{"label": 1}``. Label 0 is also returned when the
        reply cannot be interpreted or every attempt fails; the latter case
        emits a warning so it is not mistaken for a real prediction.
    """
    import warnings

    headers = {
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model,
        "messages": messages,
        "temperature": float(temperature),
    }
    if response_format is not None:
        payload["response_format"] = response_format
    backoff = 1.5

    for attempt in range(max_retries):
        try:
            resp = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload, timeout=timeout)
            if resp.status_code == 200:
                data = resp.json()
                # Chat-style replies carry the text under message.content;
                # fall back to the completion-style "text" field.
                try:
                    choice = data["choices"][0]
                    content = (choice.get("message") or {}).get("content")
                    if content is None:
                        content = choice.get("text")
                except (AttributeError, IndexError, KeyError, TypeError):
                    content = None
                return {"label": _extract_binary_label(content)}
        except (requests.RequestException, ValueError):
            # Network failure or a body that is not valid JSON: retry below.
            pass

        # Exponential backoff with jitter; no point sleeping after the last attempt.
        if attempt < max_retries - 1:
            time.sleep((backoff ** attempt) + random.random())

    warnings.warn(
        f"call_deepseek: no usable response after {max_retries} attempts; defaulting to label 0",
        RuntimeWarning,
    )
    return {"label": 0}


## 6. 对测试集逐条判读并保存预测结果

In [None]:
import numpy as np
import pandas as pd
import re
from tqdm import tqdm


# Prompting strategy; one of: "zero_shot", "few_shot", "role_based".
PROMPT_STRATEGY = "role_based"
# Number of votes (LLM calls) per abstract.
VOTE_N = 3
# Number of retrieved examples per class for few-shot prompting.
K_PER_CLASS = 3
OUTPUT_DIR = BASE_DIR
OUTPUT_FILE = OUTPUT_DIR / f"test_preds_{PROMPT_STRATEGY}.csv"

print("正在加载数据集...")
# Read with the same BOM-tolerant encoding as the cleaning step in section 2;
# with the default encoding a BOM ends up glued to the first column's name.
train_df = pd.read_csv(TRAIN_CSV, encoding="utf-8-sig")
test_data = pd.read_csv(TEST_CSV, encoding="utf-8-sig")

# Ground-truth labels are required for the evaluation; stop before any
# further work when they are missing.
if "label" not in test_data.columns:
    raise ValueError("ERROR：测试集中未找到 'label' 列！请检查测试集文件是否包含真实的标签数据。")

X_train = train_df["abs"].fillna("").astype(str).tolist()
# Same label coercion as section 2 (missing/unparsable -> 0, clipped to {0, 1}),
# so a stray NaN no longer crashes the bare astype(int).
y_train = (
    pd.to_numeric(train_df["label"], errors="coerce")
    .fillna(0)
    .astype(int)
    .clip(0, 1)
    .tolist()
)

X_test = test_data["abs"].fillna("").astype(str).tolist()
# Test labels are deliberately NOT coerced: a bad ground-truth value should
# fail loudly rather than silently distort the metrics.
y_test = test_data["label"].astype(int).tolist()

def parse_numeric_label(response_text, default=0):
    """Extract a binary label from a model response.

    Args:
        response_text: Either raw response text or a mapping with a ``label``
            key (the shape returned by ``call_deepseek``). Mappings are read
            through their ``label`` value instead of their string
            representation, so unrelated digits in other fields cannot be
            picked up by accident.
        default: Value returned when no usable label is found.

    Returns:
        0 or 1 when the first digit of the response is 0 or 1, otherwise
        ``default``.
    """
    if isinstance(response_text, dict):
        response_text = response_text.get("label")
    if response_text is None:
        return default

    match = re.search(r'\d', str(response_text).strip())
    if match is None:
        return default

    val = int(match.group())
    return val if val in (0, 1) else default

if PROMPT_STRATEGY == "few_shot":
    print("正在构建全量训练集的 ChineseBERT 检索库，请稍候...")
    # Embed the whole training set once and keep one retrieval index per class.
    encoder, X_train_vec, X_pos, X_neg, pos_rows, neg_rows = make_fold_encoder_and_index(X_train, y_train)

    def pick_few_shots_local(abstract, k_per_class=K_PER_CLASS):
        """Retrieve up to ``k_per_class`` examples per class for one abstract.

        Positive examples come first, then negative ones; a class without an
        index contributes nothing.
        """
        query_vec = encoder(abstract)
        class_indexes = ((X_pos, pos_rows), (X_neg, neg_rows))

        examples = []
        for block, block_rows in class_indexes:
            if block is not None:
                examples.extend(topk_from_block_local(query_vec, block, block_rows, k_per_class))
        return examples

# One 0/1 prediction per test abstract, in X_test order.
preds = []

for abstract in tqdm(X_test, desc=f"Testing on test_data ({PROMPT_STRATEGY})"):
    # Optional shortcut: when a heuristic_gate function exists in the notebook
    # and returns a decision, use it and skip the LLM. Any failure inside the
    # gate is treated as "no decision".
    gate_decision = None
    gate_fn = globals().get("heuristic_gate")
    if gate_fn is not None:
        try:
            gate_decision = gate_fn(abstract)
        except Exception:
            gate_decision = None

    if gate_decision is not None:
        preds.append(int(gate_decision))
        continue

    # Build the prompt for the configured strategy.
    if PROMPT_STRATEGY == "few_shot":
        retrieved = pick_few_shots_local(abstract, K_PER_CLASS)
        chat = make_prompt_few_shot(abstract, retrieved)
    elif PROMPT_STRATEGY == "zero_shot":
        chat = make_prompt_zero_shot(abstract)
    elif PROMPT_STRATEGY == "role_based":
        chat = make_prompt_role_based(abstract)
    else:
        raise ValueError(f"未知的策略类型: {PROMPT_STRATEGY}")

    if VOTE_N == 1:
        # Single call at temperature 0.0.
        verdict = parse_numeric_label(call_deepseek(chat, temperature=0.0))
    else:
        # Several calls at temperature 0.3; label 1 needs a strict majority.
        ballots = [
            parse_numeric_label(call_deepseek(chat, temperature=0.3))
            for _ in range(VOTE_N)
        ]
        verdict = 1 if sum(ballots) > (VOTE_N / 2) else 0

    preds.append(verdict)


# Persist abstracts, ground truth and predictions side by side.
preds = np.array(preds, dtype=int)
output_columns = {
    "abs": X_test,
    "true_label": y_test,
    "pred_label": preds,
}
test_df_out = pd.DataFrame(output_columns)
test_df_out.to_csv(OUTPUT_FILE, index=False, encoding="utf-8-sig")

# Short run summary for the notebook output.
print("\n 测试集预测完成！")
print(f"当前策略: [{PROMPT_STRATEGY}]")
print(f"结果已保存至: {OUTPUT_FILE} (共 {len(X_test)} 条数据)")

## 7. 计算并导出指标

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from pathlib import Path
import os

# Prediction file written by the inference step for the current strategy.
OUTPUT_FILE = BASE_DIR / f"test_preds_{PROMPT_STRATEGY}.csv"

prediction_file_present = os.path.exists(OUTPUT_FILE)
if not prediction_file_present:
    raise FileNotFoundError(f"未找到预测文件：{OUTPUT_FILE}\n请先运行前面的推理代码。")

print("找到的预测文件：")
print(" -", OUTPUT_FILE)

df = pd.read_csv(OUTPUT_FILE, encoding="utf-8-sig")
assert {"true_label","pred_label"} <= set(df.columns), f"{OUTPUT_FILE} 缺少 true_label 或 pred_label 列"

y_true = df["true_label"].astype(int).values
y_pred = df["pred_label"].astype(int).values

# Confusion-matrix cells, counted from boolean masks.
true_is_pos, true_is_neg = (y_true == 1), (y_true == 0)
pred_is_pos, pred_is_neg = (y_pred == 1), (y_pred == 0)
TP = int((true_is_pos & pred_is_pos).sum())
FN = int((true_is_pos & pred_is_neg).sum())
FP = int((true_is_neg & pred_is_pos).sum())
TN = int((true_is_neg & pred_is_neg).sum())

# zero_division=0 makes an undefined precision/recall/F1 come out as 0.
acc = float(accuracy_score(y_true, y_pred))
prec = float(precision_score(y_true, y_pred, zero_division=0))
rec = float(recall_score(y_true, y_pred, zero_division=0))
f1 = float(f1_score(y_true, y_pred, zero_division=0))

# Single-row summary; key order defines the CSV column order.
summary_row = {
    "Strategy": PROMPT_STRATEGY,
    "TP": TP,
    "FN": FN,
    "FP": FP,
    "TN": TN,
    "Accuracy": acc,
    "Precision": prec,
    "Recall": rec,
    "F1": f1,
    "n_val": int(len(y_true)),
}
metrics_data = [summary_row]

metrics_df = pd.DataFrame(metrics_data)
metrics_out = BASE_DIR / f"test_metrics_{PROMPT_STRATEGY}.csv"
metrics_df.to_csv(metrics_out, index=False, encoding="utf-8-sig")

print("\nMetrics saved to:", metrics_out)

# Rich table inside a notebook, plain print otherwise.
try:
    from IPython.display import display
except ImportError:
    print(metrics_df)
else:
    display(metrics_df)