In [3]:
import json

# 读取原始数据
with open("mbti_sample_with_all_views.json", "r", encoding="utf-8") as f:
    data = json.load(f)

# 提取前8000条
subset = data[:8675]

# 保存为新文件
with open("YS.json", "w", encoding="utf-8") as f:
    json.dump(subset, f, ensure_ascii=False, indent=2)

print("前8000条数据已保存到 前8000条数据.json")


前8000条数据已保存到 前8000条数据.json


In [4]:
import json

# 读入原始数据
with open("YS.json", "r", encoding="utf-8") as f:
    original = json.load(f)

# 读入test数据
with open("test.json", "r", encoding="utf-8") as f:
    test = json.load(f)

# 建立一个字典，加快查找速度（用posts_cleaned作为key）
original_dict = {item["posts_cleaned"]: item for item in original}

# 在原始数据中找到test对应的记录
matched = []
for t in test:
    key = t["posts_cleaned"]
    if key in original_dict:
        matched.append(original_dict[key])

# 保存结果
with open("test对应的原始数据.json", "w", encoding="utf-8") as f:
    json.dump(matched, f, ensure_ascii=False, indent=2)

print(f"在原始数据中找到了 {len(matched)} 条与test匹配的记录")


在原始数据中找到了 853 条与test匹配的记录


In [5]:
# -*- coding: utf-8 -*-
"""
Evaluate LoRA adapter on TEST ONLY.
依赖：transformers==4.55, peft, scikit-learn, matplotlib, torch, bitsandbytes(如用4bit)
"""

import os, json
from typing import Dict, Any

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

import numpy as np
import torch
import torch.nn.functional as F
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc
from sklearn.preprocessing import label_binarize

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    BitsAndBytesConfig,
    DataCollatorWithPadding,
    Trainer, TrainingArguments,
    set_seed,
)

# ======== 需要确认的两处路径 ========
CKPT_DIR  = "mbti_lora_llama-1b_ckpt"   # 你保存LoRA适配器的目录
TEST_JSON = "picked_balanced_around30.json"                               # 仅评测测试集

# ======== 与训练保持一致的配置 ========
#MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"
#MODEL_NAME = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
MODEL_NAME = "meta-llama/Llama-3.2-1B"
MAX_LEN      = 400
BUDGET = {"posts_cleaned": 280, "semantic_view": 64, "sentiment_view": 32, "linguistic_view": 24}

MBTI_16 = [
    "INTJ","INTP","ENTJ","ENTP","INFJ","INFP","ENFJ","ENFP",
    "ISTJ","ISFJ","ESTJ","ESFJ","ISTP","ISFP","ESTP","ESFP"
]
MBTI2ID = {t: i for i, t in enumerate(MBTI_16)}

USE_4BIT = True
SEED = 42
HF_TOKEN = os.getenv("HF_TOKEN")
HF_KW = {"token": HF_TOKEN} if HF_TOKEN else {}
OUTPUT_DIR = os.path.join(CKPT_DIR, "eval_test_only_kaggle_final")

# ======== 工具函数 ========
def load_rows(path: str):
    with open(path, "r", encoding="utf-8") as f:
        rows = json.load(f)
    rows = [r for r in rows if isinstance(r, dict) and r.get("type") in MBTI2ID]
    if not rows:
        raise ValueError(f"{path} 中没有合法样本。")
    return rows

def mbti_to_4d(m: str):
    return (
        0 if m[0]=="I" else 1,
        0 if m[1]=="S" else 1,
        0 if m[2]=="F" else 1,
        0 if m[3]=="P" else 1,
    )

def truncate_to_budget(tok: AutoTokenizer, text: str, budget: int) -> str:
    enc = tok(text or "", add_special_tokens=False)
    ids = enc["input_ids"][: budget]
    return tok.decode(ids)

def build_input(item: Dict[str, Any], tok: AutoTokenizer) -> str:
    p   = truncate_to_budget(tok, item.get("posts_cleaned", item.get("posts","")) or "", BUDGET["posts_cleaned"])
    sem = truncate_to_budget(tok, item.get("semantic_view","")  or "", BUDGET["semantic_view"])
    sen = truncate_to_budget(tok, item.get("sentiment_view","") or "", BUDGET["sentiment_view"])
    lin = truncate_to_budget(tok, item.get("linguistic_view","") or "", BUDGET["linguistic_view"])
    return (
        f"[POSTS]\n{p}\n[SEMANTIC]\n{sem}\n[SENTIMENT]\n{sen}\n[LINGUISTIC]\n{lin}\n"
        f"[TASK] Predict MBTI type among {', '.join(MBTI_16)}."
    )

class MBTIDataset(torch.utils.data.Dataset):
    def __init__(self, rows, tokenizer, max_len=512):
        self.rows = rows
        self.tok  = tokenizer
        self.max_len = max_len
    def __len__(self): return len(self.rows)
    def __getitem__(self, idx):
        it  = self.rows[idx]
        text= build_input(it, self.tok)
        y   = MBTI2ID[it["type"]]
        enc = self.tok(text, truncation=True, max_length=self.max_len)
        return {"input_ids": enc["input_ids"], "attention_mask": enc["attention_mask"], "labels": y}

def plot_confusion_and_roc(y_true, y_prob, class_names, out_dir, suffix=""):
    os.makedirs(out_dir, exist_ok=True)
    y_pred = np.argmax(y_prob, axis=-1)

    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    fig_cm, ax_cm = plt.subplots(figsize=(8, 8), dpi=150)
    disp.plot(ax=ax_cm, xticks_rotation=45, cmap="Blues", colorbar=False)
    ax_cm.set_title(f"Confusion Matrix-V:kaggle T:kaggle")
    fig_cm.tight_layout()
    fig_cm.savefig(os.path.join(out_dir, f"confusion_matrix{suffix}.png"))
    plt.close(fig_cm)

    Y_true_bin = label_binarize(y_true, classes=list(range(len(class_names))))
    fpr = {}; tpr = {}; roc_auc = {}
    for i in range(len(class_names)):
        fpr[i], tpr[i], _ = roc_curve(Y_true_bin[:, i], y_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    fpr["micro"], tpr["micro"], _ = roc_curve(Y_true_bin.ravel(), y_prob.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(len(class_names))]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(len(class_names)):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= len(class_names)
    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    fig_roc, ax_roc = plt.subplots(figsize=(7, 7), dpi=150)
    ax_roc.plot(fpr["micro"], tpr["micro"], label=f"micro-average ROC (AUC = {roc_auc['micro']:.3f})", linewidth=2)
    ax_roc.plot(fpr["macro"], tpr["macro"], label=f"macro-average ROC (AUC = {roc_auc['macro']:.3f})", linewidth=2)
    ax_roc.plot([0, 1], [0, 1], "k--", linewidth=1)
    ax_roc.set_xlim([0.0, 1.0]); ax_roc.set_ylim([0.0, 1.05])
    ax_roc.set_xlabel("False Positive Rate"); ax_roc.set_ylabel("True Positive Rate")
    ax_roc.set_title(f"Multiclass ROC (micro & macro)-V:kaggle T:kaggle")
    ax_roc.legend(loc="lower right")
    fig_roc.tight_layout()
    fig_roc.savefig(os.path.join(out_dir, f"roc_micro_macro{suffix}.png"))
    plt.close(fig_roc)

# ======== 主流程（仅TEST） ========
def main():
    set_seed(SEED)
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    # 读取 test.json
    test_rows = load_rows(TEST_JSON)

    # tokenizer
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True, **HF_KW)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"

    # 基座模型 + 量化（与训练保持一致）
    quant_cfg = BitsAndBytesConfig(
        load_in_4bit=USE_4BIT,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    ) if USE_4BIT else None

    base_model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=16,
        quantization_config=quant_cfg,
        device_map={"": device},
        low_cpu_mem_usage=True,
        **HF_KW
    )
    base_model.config.pad_token_id = tokenizer.pad_token_id
    base_model.config.use_cache = False

    # 叠加 LoRA 适配器
    from peft import PeftModel
    model = PeftModel.from_pretrained(base_model, CKPT_DIR, is_trainable=False)
    model = model.to(device)
    model.eval()

    # 构建测试数据集
    test_ds = MBTIDataset(test_rows, tokenizer, max_len=MAX_LEN)
    collator = DataCollatorWithPadding(tokenizer, pad_to_multiple_of=8)

    # 仅推理配置（不训练）
    args = TrainingArguments(
        output_dir=os.path.join(OUTPUT_DIR, "tmp_test"),
        per_device_eval_batch_size=4,
        eval_accumulation_steps=12,
        report_to="none",
    )

    trainer = Trainer(model=model, args=args, eval_dataset=test_ds,
                      tokenizer=tokenizer, data_collator=collator)

    # 预测
    output = trainer.predict(test_ds)
    logits = output.predictions[0] if isinstance(output.predictions, (list, tuple)) else output.predictions
    probs  = F.softmax(torch.tensor(logits, dtype=torch.float32), dim=-1).cpu().numpy()
    y_true = output.label_ids

    # 指标（16类 + 4D）
    pred_ids = logits.argmax(-1)
    acc16 = float((pred_ids == y_true).mean())
    pred_types = [MBTI_16[i] for i in pred_ids]
    true_types = [MBTI_16[i] for i in y_true]
    c_ei=c_ns=c_tf=c_jp=c_all=0
    for pt, tt in zip(pred_types, true_types):
        pei,pns,ptf,pjp = mbti_to_4d(pt)
        tei,tns,ttf,tjp = mbti_to_4d(tt)
        c_ei += (pei==tei); c_ns += (pns==tns); c_tf += (ptf==ttf); c_jp += (pjp==tjp)
        c_all+= (pei==tei and pns==tns and ptf==ttf and pjp==tjp)
    n = len(y_true)

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    plot_confusion_and_roc(y_true, probs, MBTI_16, OUTPUT_DIR, suffix="_test")

    print("\n=== TEST Results ===")
    print(f"acc_16: {acc16:.4f}")
    print(f"acc_ei: {c_ei/n:.4f}  acc_ns: {c_ns/n:.4f}  acc_tf: {c_tf/n:.4f}  acc_jp: {c_jp/n:.4f}  acc_4D: {c_all/n:.4f}")
    print(f"Saved figs to: {OUTPUT_DIR}/confusion_matrix_test.png, {OUTPUT_DIR}/roc_micro_macro_test.png")

    # 推理示例
    sample = test_rows[0]
    text = build_input(sample, tokenizer)
    batch = tokenizer(text, return_tensors="pt", truncation=True, max_length=MAX_LEN)
    batch = {k: v.to(device) for k, v in batch.items()}
    with torch.no_grad():
        plogits = model(**batch).logits
        pred_id = int(torch.argmax(plogits, dim=-1))
        pred_mbti = MBTI_16[pred_id]
    print("\n[Inference on TEST sample]")
    print("原标签:", sample["type"], " | 预测:", pred_mbti)

if __name__ == "__main__":
    main()


Some weights of LlamaForSequenceClassification were not initialized from the model checkpoint at meta-llama/Llama-3.2-1B and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  trainer = Trainer(model=model, args=args, eval_dataset=test_ds,



=== TEST Results ===
acc_16: 0.5969
acc_ei: 0.8246  acc_ns: 0.8308  acc_tf: 0.8338  acc_jp: 0.7692  acc_4D: 0.5969
Saved figs to: mbti_lora_llama-1b_ckpt/eval_test_only_kaggle_final/confusion_matrix_test.png, mbti_lora_llama-1b_ckpt/eval_test_only_kaggle_final/roc_micro_macro_test.png

[Inference on TEST sample]
原标签: INTJ  | 预测: INTJ


In [7]:
# -*- coding: utf-8 -*-
"""
Evaluate LoRA adapter on TEST ONLY.
依赖：transformers==4.55, peft, scikit-learn, matplotlib, torch, bitsandbytes(如用4bit)
"""

import os, json
from typing import Dict, Any

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

import numpy as np
import torch
import torch.nn.functional as F
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc
from sklearn.preprocessing import label_binarize

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    BitsAndBytesConfig,
    DataCollatorWithPadding,
    Trainer, TrainingArguments,
    set_seed,
)

# ======== 需要确认的两处路径 ========
CKPT_DIR  = "qwen-test-on-pandora_new/lora_adapter"   # 你保存LoRA适配器的目录
TEST_JSON = "picked_balanced_around30.json"                               # 仅评测测试集

# ======== 与训练保持一致的配置 ========
MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"
MAX_LEN      = 320
BUDGET = {"posts_cleaned": 192, "semantic_view": 64, "sentiment_view": 32, "linguistic_view": 24}

MBTI_16 = [
    "INTJ","INTP","ENTJ","ENTP","INFJ","INFP","ENFJ","ENFP",
    "ISTJ","ISFJ","ESTJ","ESFJ","ISTP","ISFP","ESTP","ESFP"
]
MBTI2ID = {t: i for i, t in enumerate(MBTI_16)}

USE_4BIT = True
SEED = 42
HF_TOKEN = os.getenv("HF_TOKEN")
HF_KW = {"token": HF_TOKEN} if HF_TOKEN else {}
OUTPUT_DIR = os.path.join(CKPT_DIR, "kaggle测kaggle")

# ======== 工具函数 ========
def load_rows(path: str):
    with open(path, "r", encoding="utf-8") as f:
        rows = json.load(f)
    rows = [r for r in rows if isinstance(r, dict) and r.get("type") in MBTI2ID]
    if not rows:
        raise ValueError(f"{path} 中没有合法样本。")
    return rows

def mbti_to_4d(m: str):
    return (
        0 if m[0]=="I" else 1,
        0 if m[1]=="S" else 1,
        0 if m[2]=="F" else 1,
        0 if m[3]=="P" else 1,
    )

def truncate_to_budget(tok: AutoTokenizer, text: str, budget: int) -> str:
    enc = tok(text or "", add_special_tokens=False)
    ids = enc["input_ids"][: budget]
    return tok.decode(ids)

def build_input(item: Dict[str, Any], tok: AutoTokenizer) -> str:
    p   = truncate_to_budget(tok, item.get("posts_cleaned", item.get("posts","")) or "", BUDGET["posts_cleaned"])
    sem = truncate_to_budget(tok, item.get("semantic_view","")  or "", BUDGET["semantic_view"])
    sen = truncate_to_budget(tok, item.get("sentiment_view","") or "", BUDGET["sentiment_view"])
    lin = truncate_to_budget(tok, item.get("linguistic_view","") or "", BUDGET["linguistic_view"])
    return (
        f"[POSTS]\n{p}\n[SEMANTIC]\n{sem}\n[SENTIMENT]\n{sen}\n[LINGUISTIC]\n{lin}\n"
        f"[TASK] Predict MBTI type among {', '.join(MBTI_16)}."
    )

class MBTIDataset(torch.utils.data.Dataset):
    def __init__(self, rows, tokenizer, max_len=512):
        self.rows = rows
        self.tok  = tokenizer
        self.max_len = max_len
    def __len__(self): return len(self.rows)
    def __getitem__(self, idx):
        it  = self.rows[idx]
        text= build_input(it, self.tok)
        y   = MBTI2ID[it["type"]]
        enc = self.tok(text, truncation=True, max_length=self.max_len)
        return {"input_ids": enc["input_ids"], "attention_mask": enc["attention_mask"], "labels": y}

def plot_confusion_and_roc(y_true, y_prob, class_names, out_dir, suffix=""):
    os.makedirs(out_dir, exist_ok=True)
    y_pred = np.argmax(y_prob, axis=-1)

    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    fig_cm, ax_cm = plt.subplots(figsize=(8, 8), dpi=150)
    disp.plot(ax=ax_cm, xticks_rotation=45, cmap="Blues", colorbar=False)
    ax_cm.set_title(f"Confusion Matrix-V:Kaggle,T:kaggle")
    fig_cm.tight_layout()
    fig_cm.savefig(os.path.join(out_dir, f"confusion_matrix{suffix}.png"))
    plt.close(fig_cm)

    Y_true_bin = label_binarize(y_true, classes=list(range(len(class_names))))
    fpr = {}; tpr = {}; roc_auc = {}
    for i in range(len(class_names)):
        fpr[i], tpr[i], _ = roc_curve(Y_true_bin[:, i], y_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    fpr["micro"], tpr["micro"], _ = roc_curve(Y_true_bin.ravel(), y_prob.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(len(class_names))]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(len(class_names)):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= len(class_names)
    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    fig_roc, ax_roc = plt.subplots(figsize=(7, 7), dpi=150)
    ax_roc.plot(fpr["micro"], tpr["micro"], label=f"micro-average ROC (AUC = {roc_auc['micro']:.3f})", linewidth=2)
    ax_roc.plot(fpr["macro"], tpr["macro"], label=f"macro-average ROC (AUC = {roc_auc['macro']:.3f})", linewidth=2)
    ax_roc.plot([0, 1], [0, 1], "k--", linewidth=1)
    ax_roc.set_xlim([0.0, 1.0]); ax_roc.set_ylim([0.0, 1.05])
    ax_roc.set_xlabel("False Positive Rate"); ax_roc.set_ylabel("True Positive Rate")
    ax_roc.set_title(f"Multiclass ROC (micro & macro)-V:Kaggle,T:kaggle")
    ax_roc.legend(loc="lower right")
    fig_roc.tight_layout()
    fig_roc.savefig(os.path.join(out_dir, f"roc_micro_macro{suffix}.png"))
    plt.close(fig_roc)

# ======== 主流程（仅TEST） ========
def main():
    set_seed(SEED)
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    # 读取 test.json
    test_rows = load_rows(TEST_JSON)

    # tokenizer
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True, **HF_KW)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"

    # 基座模型 + 量化（与训练保持一致）
    quant_cfg = BitsAndBytesConfig(
        load_in_4bit=USE_4BIT,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    ) if USE_4BIT else None

    base_model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=16,
        quantization_config=quant_cfg,
        device_map={"": device},
        low_cpu_mem_usage=True,
        **HF_KW
    )
    base_model.config.pad_token_id = tokenizer.pad_token_id
    base_model.config.use_cache = False

    # 叠加 LoRA 适配器
    from peft import PeftModel
    model = PeftModel.from_pretrained(base_model, CKPT_DIR, is_trainable=False)
    model = model.to(device)
    model.eval()

    # 构建测试数据集
    test_ds = MBTIDataset(test_rows, tokenizer, max_len=MAX_LEN)
    collator = DataCollatorWithPadding(tokenizer, pad_to_multiple_of=8)

    # 仅推理配置（不训练）
    args = TrainingArguments(
        output_dir=os.path.join(OUTPUT_DIR, "tmp_test"),
        per_device_eval_batch_size=4,
        eval_accumulation_steps=12,
        report_to="none",
    )

    trainer = Trainer(model=model, args=args, eval_dataset=test_ds,
                      tokenizer=tokenizer, data_collator=collator)

    # 预测
    output = trainer.predict(test_ds)
    logits = output.predictions[0] if isinstance(output.predictions, (list, tuple)) else output.predictions
    probs  = F.softmax(torch.tensor(logits, dtype=torch.float32), dim=-1).cpu().numpy()
    y_true = output.label_ids

    # 指标（16类 + 4D）
    pred_ids = logits.argmax(-1)
    acc16 = float((pred_ids == y_true).mean())
    pred_types = [MBTI_16[i] for i in pred_ids]
    true_types = [MBTI_16[i] for i in y_true]
    c_ei=c_ns=c_tf=c_jp=c_all=0
    for pt, tt in zip(pred_types, true_types):
        pei,pns,ptf,pjp = mbti_to_4d(pt)
        tei,tns,ttf,tjp = mbti_to_4d(tt)
        c_ei += (pei==tei); c_ns += (pns==tns); c_tf += (ptf==ttf); c_jp += (pjp==tjp)
        c_all+= (pei==tei and pns==tns and ptf==ttf and pjp==tjp)
    n = len(y_true)

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    plot_confusion_and_roc(y_true, probs, MBTI_16, OUTPUT_DIR, suffix="_test")

    print("\n=== TEST Results ===")
    print(f"acc_16: {acc16:.4f}")
    print(f"acc_ei: {c_ei/n:.4f}  acc_ns: {c_ns/n:.4f}  acc_tf: {c_tf/n:.4f}  acc_jp: {c_jp/n:.4f}  acc_4D: {c_all/n:.4f}")
    print(f"Saved figs to: {OUTPUT_DIR}/confusion_matrix_test.png, {OUTPUT_DIR}/roc_micro_macro_test.png")

    # 推理示例
    sample = test_rows[0]
    text = build_input(sample, tokenizer)
    batch = tokenizer(text, return_tensors="pt", truncation=True, max_length=MAX_LEN)
    batch = {k: v.to(device) for k, v in batch.items()}
    with torch.no_grad():
        plogits = model(**batch).logits
        pred_id = int(torch.argmax(plogits, dim=-1))
        pred_mbti = MBTI_16[pred_id]
    print("\n[Inference on TEST sample]")
    print("原标签:", sample["type"], " | 预测:", pred_mbti)

if __name__ == "__main__":
    main()


Some weights of Qwen2ForSequenceClassification were not initialized from the model checkpoint at Qwen/Qwen2.5-1.5B-Instruct and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  trainer = Trainer(model=model, args=args, eval_dataset=test_ds,



=== TEST Results ===
acc_16: 0.8554
acc_ei: 0.9354  acc_ns: 0.9354  acc_tf: 0.9569  acc_jp: 0.9323  acc_4D: 0.8554
Saved figs to: qwen-test-on-pandora_new/lora_adapter/kaggle测kaggle/confusion_matrix_test.png, qwen-test-on-pandora_new/lora_adapter/kaggle测kaggle/roc_micro_macro_test.png

[Inference on TEST sample]
原标签: INTJ  | 预测: INTJ


In [12]:
# -*- coding: utf-8 -*- 
"""
Evaluate LoRA adapter on TEST ONLY.
依赖：transformers==4.55, peft, scikit-learn, matplotlib, torch, bitsandbytes(如用4bit)
"""

import os, json
from typing import Dict, Any

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

import numpy as np
import torch
import torch.nn.functional as F
from sklearn.metrics import (
    confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc,
    f1_score, recall_score
)
from sklearn.preprocessing import label_binarize

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    BitsAndBytesConfig,
    DataCollatorWithPadding,
    Trainer, TrainingArguments,
    set_seed,
)

# ======== 需要确认的两处路径 ========
CKPT_DIR  = "qwen-test-on-pandora_new/lora_adapter"   # 你保存LoRA适配器的目录
TEST_JSON = "picked_balanced_around30.json"           # 仅评测测试集

# ======== 与训练保持一致的配置 ========
MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"
MAX_LEN      = 320
BUDGET = {"posts_cleaned": 192, "semantic_view": 64, "sentiment_view": 32, "linguistic_view": 24}

MBTI_16 = [
    "INTJ","INTP","ENTJ","ENTP","INFJ","INFP","ENFJ","ENFP",
    "ISTJ","ISFJ","ESTJ","ESFJ","ISTP","ISFP","ESTP","ESFP"
]
MBTI2ID = {t: i for i, t in enumerate(MBTI_16)}

USE_4BIT = True
SEED = 42
HF_TOKEN = os.getenv("HF_TOKEN")
HF_KW = {"token": HF_TOKEN} if HF_TOKEN else {}
OUTPUT_DIR = os.path.join(CKPT_DIR, "kaggle测kaggle")

# ======== 工具函数 ========
def load_rows(path: str):
    with open(path, "r", encoding="utf-8") as f:
        rows = json.load(f)
    rows = [r for r in rows if isinstance(r, dict) and r.get("type") in MBTI2ID]
    if not rows:
        raise ValueError(f"{path} 中没有合法样本。")
    return rows

def mbti_to_4d(m: str):
    # E=1/I=0, N=1/S=0, F=1/T=0, P=1/J=0
    return (
        0 if m[0]=="I" else 1,
        0 if m[1]=="S" else 1,
        0 if m[2]=="F" else 1,
        0 if m[3]=="P" else 1,
    )

def truncate_to_budget(tok: AutoTokenizer, text: str, budget: int) -> str:
    enc = tok(text or "", add_special_tokens=False)
    ids = enc["input_ids"][: budget]
    return tok.decode(ids)

def build_input(item: Dict[str, Any], tok: AutoTokenizer) -> str:
    p   = truncate_to_budget(tok, item.get("posts_cleaned", item.get("posts","")) or "", BUDGET["posts_cleaned"])
    sem = truncate_to_budget(tok, item.get("semantic_view","")  or "", BUDGET["semantic_view"])
    sen = truncate_to_budget(tok, item.get("sentiment_view","") or "", BUDGET["sentiment_view"])
    lin = truncate_to_budget(tok, item.get("linguistic_view","") or "", BUDGET["linguistic_view"])
    return (
        f"[POSTS]\n{p}\n[SEMANTIC]\n{sem}\n[SENTIMENT]\n{sen}\n[LINGUISTIC]\n{lin}\n"
        f"[TASK] Predict MBTI type among {', '.join(MBTI_16)}."
    )

class MBTIDataset(torch.utils.data.Dataset):
    def __init__(self, rows, tokenizer, max_len=512):
        self.rows = rows
        self.tok  = tokenizer
        self.max_len = max_len
    def __len__(self): return len(self.rows)
    def __getitem__(self, idx):
        it  = self.rows[idx]
        text= build_input(it, self.tok)
        y   = MBTI2ID[it["type"]]
        enc = self.tok(text, truncation=True, max_length=self.max_len)
        return {"input_ids": enc["input_ids"], "attention_mask": enc["attention_mask"], "labels": y}

def plot_confusion_and_roc(y_true, y_prob, class_names, out_dir, suffix=""):
    os.makedirs(out_dir, exist_ok=True)
    y_pred = np.argmax(y_prob, axis=-1)

    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    fig_cm, ax_cm = plt.subplots(figsize=(8, 8), dpi=150)
    disp.plot(ax=ax_cm, xticks_rotation=45, cmap="Blues", colorbar=False)
    ax_cm.set_title(f"Confusion Matrix-V:Kaggle,T:Kaggle")
    fig_cm.tight_layout()
    fig_cm.savefig(os.path.join(out_dir, f"confusion_matrix{suffix}.png"))
    plt.close(fig_cm)

    Y_true_bin = label_binarize(y_true, classes=list(range(len(class_names))))
    fpr = {}; tpr = {}; roc_auc = {}
    for i in range(len(class_names)):
        fpr[i], tpr[i], _ = roc_curve(Y_true_bin[:, i], y_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    fpr["micro"], tpr["micro"], _ = roc_curve(Y_true_bin.ravel(), y_prob.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(len(class_names))]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(len(class_names)):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= len(class_names)
    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    fig_roc, ax_roc = plt.subplots(figsize=(7, 7), dpi=150)
    ax_roc.plot(fpr["micro"], tpr["micro"], label=f"micro-average ROC (AUC = {roc_auc['micro']:.3f})", linewidth=2)
    ax_roc.plot(fpr["macro"], tpr["macro"], label=f"macro-average ROC (AUC = {roc_auc['macro']:.3f})", linewidth=2)
    ax_roc.plot([0, 1], [0, 1], "k--", linewidth=1)
    ax_roc.set_xlim([0.0, 1.0]); ax_roc.set_ylim([0.0, 1.05])
    ax_roc.set_xlabel("False Positive Rate"); ax_roc.set_ylabel("True Positive Rate")
    ax_roc.set_title(f"Multiclass ROC (micro & macro)-V:Kaggle,T:Kaggle")
    ax_roc.legend(loc="lower right")
    fig_roc.tight_layout()
    fig_roc.savefig(os.path.join(out_dir, f"roc_micro_macro{suffix}.png"))
    plt.close(fig_roc)

# ======== 主流程（仅TEST） ========
def main():
    set_seed(SEED)
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    # 读取 test.json
    test_rows = load_rows(TEST_JSON)

    # tokenizer
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True, **HF_KW)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"

    # 基座模型 + 量化（与训练保持一致）
    quant_cfg = BitsAndBytesConfig(
        load_in_4bit=USE_4BIT,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    ) if USE_4BIT else None

    base_model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=16,
        quantization_config=quant_cfg,
        device_map={"": device},
        low_cpu_mem_usage=True,
        **HF_KW
    )
    base_model.config.pad_token_id = tokenizer.pad_token_id
    base_model.config.use_cache = False

    # 叠加 LoRA 适配器
    from peft import PeftModel
    model = PeftModel.from_pretrained(base_model, CKPT_DIR, is_trainable=False)
    model = model.to(device)
    model.eval()

    # 构建测试数据集
    test_ds = MBTIDataset(test_rows, tokenizer, max_len=MAX_LEN)
    collator = DataCollatorWithPadding(tokenizer, pad_to_multiple_of=8)

    # 仅推理配置（不训练）
    args = TrainingArguments(
        output_dir=os.path.join(OUTPUT_DIR, "tmp_test"),
        per_device_eval_batch_size=4,
        eval_accumulation_steps=12,
        report_to="none",
    )

    trainer = Trainer(model=model, args=args, eval_dataset=test_ds,
                      tokenizer=tokenizer, data_collator=collator)

    # 预测
    output = trainer.predict(test_ds)
    logits = output.predictions[0] if isinstance(output.predictions, (list, tuple)) else output.predictions
    probs  = F.softmax(torch.tensor(logits, dtype=torch.float32), dim=-1).cpu().numpy()
    y_true = output.label_ids

    # 指标（16类 + 4D）
    pred_ids = logits.argmax(-1)
    acc16 = float((pred_ids == y_true).mean())

    pred_types = [MBTI_16[i] for i in pred_ids]
    true_types = [MBTI_16[i] for i in y_true]

    # 4D 准确率（与你原来一致）
    c_ei=c_ns=c_tf=c_jp=c_all=0
    # 4D 的二分类标签收集（0/1）
    ei_t, ns_t, tf_t, jp_t = [], [], [], []
    ei_p, ns_p, tf_p, jp_p = [], [], [], []

    for pt, tt in zip(pred_types, true_types):
        pei,pns,ptf,pjp = mbti_to_4d(pt)
        tei,tns,ttf,tjp = mbti_to_4d(tt)
        # 准确率计数
        c_ei += (pei==tei); c_ns += (pns==tns); c_tf += (ptf==ttf); c_jp += (pjp==tjp)
        c_all+= (pei==tei and pns==tns and ptf==ttf and pjp==tjp)
        # 记录二分类标签
        ei_t.append(tei); ns_t.append(tns); tf_t.append(ttf); jp_t.append(tjp)
        ei_p.append(pei); ns_p.append(pns); tf_p.append(ptf); jp_p.append(pjp)

    n = len(y_true)

    # ===== 新增：16类整体 F1 / Recall =====
    f1_micro_16     = f1_score(y_true, pred_ids, average="micro")
    f1_macro_16     = f1_score(y_true, pred_ids, average="macro")
    f1_weighted_16  = f1_score(y_true, pred_ids, average="weighted")

    rec_micro_16    = recall_score(y_true, pred_ids, average="micro")
    rec_macro_16    = recall_score(y_true, pred_ids, average="macro")
    rec_weighted_16 = recall_score(y_true, pred_ids, average="weighted")

    # ===== 新增：四个维度的二分类 F1 / Recall（正类统一取 1，对应 E/N/F/P）=====
    ei_f1  = f1_score(ei_t, ei_p, average="binary", pos_label=1)
    ns_f1  = f1_score(ns_t, ns_p, average="binary", pos_label=1)
    tf_f1  = f1_score(tf_t, tf_p, average="binary", pos_label=1)
    jp_f1  = f1_score(jp_t, jp_p, average="binary", pos_label=1)

    ei_rec = recall_score(ei_t, ei_p, average="binary", pos_label=1)
    ns_rec = recall_score(ns_t, ns_p, average="binary", pos_label=1)
    tf_rec = recall_score(tf_t, tf_p, average="binary", pos_label=1)
    jp_rec = recall_score(jp_t, jp_p, average="binary", pos_label=1)

    # ===== 新增：4D 的总体分数 =====
    # micro：把四个维度的标签都拼接在一起计算
    y4_true = np.concatenate([ei_t, ns_t, tf_t, jp_t])
    y4_pred = np.concatenate([ei_p, ns_p, tf_p, jp_p])
    f1_micro_4d  = f1_score(y4_true, y4_pred, average="binary", pos_label=1)
    rec_micro_4d = recall_score(y4_true, y4_pred, average="binary", pos_label=1)

    # macro：四个维度分数的平均
    f1_macro_4d  = float(np.mean([ei_f1, ns_f1, tf_f1, jp_f1]))
    rec_macro_4d = float(np.mean([ei_rec, ns_rec, tf_rec, jp_rec]))

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    plot_confusion_and_roc(y_true, probs, MBTI_16, OUTPUT_DIR, suffix="_test")

    print("\n=== TEST Results ===")
    print(f"acc_16: {acc16:.4f}")
    print(f"acc_ei: {c_ei/n:.4f}  acc_ns: {c_ns/n:.4f}  acc_tf: {c_tf/n:.4f}  acc_jp: {c_jp/n:.4f}  acc_4D: {c_all/n:.4f}")

    # 16类总体
    print(f"F1-16(micro/macro/weighted): {f1_micro_16:.4f} / {f1_macro_16:.4f} / {f1_weighted_16:.4f}")
    print(f"Recall-16(micro/macro/weighted): {rec_micro_16:.4f} / {rec_macro_16:.4f} / {rec_weighted_16:.4f}")

    # 4D 总体（把四个二分类合在一起的 micro，以及四维平均的 macro）
    print(f"F1-4D(overall micro/macro): {f1_micro_4d:.4f} / {f1_macro_4d:.4f}")
    print(f"Recall-4D(overall micro/macro): {rec_micro_4d:.4f} / {rec_macro_4d:.4f}")

    # 4D 各维度
    print(f"[EI]  F1: {ei_f1:.4f}  Recall: {ei_rec:.4f}")
    print(f"[NS]  F1: {ns_f1:.4f}  Recall: {ns_rec:.4f}")
    print(f"[TF]  F1: {tf_f1:.4f}  Recall: {tf_rec:.4f}")
    print(f"[JP]  F1: {jp_f1:.4f}  Recall: {jp_rec:.4f}")

    print(f"Saved figs to: {OUTPUT_DIR}/confusion_matrix_test.png, {OUTPUT_DIR}/roc_micro_macro_test.png")

    # 推理示例
    sample = test_rows[0]
    text = build_input(sample, tokenizer)
    batch = tokenizer(text, return_tensors="pt", truncation=True, max_length=MAX_LEN)
    batch = {k: v.to(device) for k, v in batch.items()}
    with torch.no_grad():
        plogits = model(**batch).logits
        pred_id = int(torch.argmax(plogits, dim=-1))
        pred_mbti = MBTI_16[pred_id]
    print("\n[Inference on TEST sample]")
    print("原标签:", sample["type"], " | 预测:", pred_mbti)

if __name__ == "__main__":
    main()


Some weights of Qwen2ForSequenceClassification were not initialized from the model checkpoint at Qwen/Qwen2.5-1.5B-Instruct and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  trainer = Trainer(model=model, args=args, eval_dataset=test_ds,



=== TEST Results ===
acc_16: 0.8554
acc_ei: 0.9354  acc_ns: 0.9354  acc_tf: 0.9569  acc_jp: 0.9323  acc_4D: 0.8554
F1-16(micro/macro/weighted): 0.8554 / 0.8524 / 0.8559
Recall-16(micro/macro/weighted): 0.8554 / 0.8437 / 0.8554
F1-4D(overall micro/macro): 0.9408 / 0.9375
Recall-4D(overall micro/macro): 0.9466 / 0.9402
[EI]  F1: 0.9170  Recall: 0.8855
[NS]  F1: 0.9526  Recall: 0.9814
[TF]  F1: 0.9591  Recall: 0.9591
[JP]  F1: 0.9214  Recall: 0.9348
Saved figs to: qwen-test-on-pandora_new/lora_adapter/kaggle测kaggle/confusion_matrix_test.png, qwen-test-on-pandora_new/lora_adapter/kaggle测kaggle/roc_micro_macro_test.png

[Inference on TEST sample]
原标签: INTJ  | 预测: INTJ


In [8]:
# -*- coding: utf-8 -*-
"""
Evaluate LoRA adapter on TEST ONLY.
依赖：transformers==4.55, peft, scikit-learn, matplotlib, torch, bitsandbytes(如用4bit)
"""

import os, json
from typing import Dict, Any

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

import numpy as np
import torch
import torch.nn.functional as F
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc
from sklearn.preprocessing import label_binarize

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    BitsAndBytesConfig,
    DataCollatorWithPadding,
    Trainer, TrainingArguments,
    set_seed,
)

# ======== 需要确认的两处路径 ========
CKPT_DIR  = "qwen-test-on-pandora_new/lora_adapter"   # 你保存LoRA适配器的目录
TEST_JSON = "pandora_testdataset.json"                               # 仅评测测试集

# ======== 与训练保持一致的配置 ========
MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"
MAX_LEN      = 320
BUDGET = {"posts_cleaned": 192, "semantic_view": 64, "sentiment_view": 32, "linguistic_view": 24}

MBTI_16 = [
    "INTJ","INTP","ENTJ","ENTP","INFJ","INFP","ENFJ","ENFP",
    "ISTJ","ISFJ","ESTJ","ESFJ","ISTP","ISFP","ESTP","ESFP"
]
MBTI2ID = {t: i for i, t in enumerate(MBTI_16)}

USE_4BIT = True
SEED = 42
HF_TOKEN = os.getenv("HF_TOKEN")
HF_KW = {"token": HF_TOKEN} if HF_TOKEN else {}
OUTPUT_DIR = os.path.join(CKPT_DIR, "kaggle测pandora")

# ======== 工具函数 ========
def load_rows(path: str):
    with open(path, "r", encoding="utf-8") as f:
        rows = json.load(f)
    rows = [r for r in rows if isinstance(r, dict) and r.get("type") in MBTI2ID]
    if not rows:
        raise ValueError(f"{path} 中没有合法样本。")
    return rows

def mbti_to_4d(m: str):
    return (
        0 if m[0]=="I" else 1,
        0 if m[1]=="S" else 1,
        0 if m[2]=="F" else 1,
        0 if m[3]=="P" else 1,
    )

def truncate_to_budget(tok: AutoTokenizer, text: str, budget: int) -> str:
    enc = tok(text or "", add_special_tokens=False)
    ids = enc["input_ids"][: budget]
    return tok.decode(ids)

def build_input(item: Dict[str, Any], tok: AutoTokenizer) -> str:
    p   = truncate_to_budget(tok, item.get("posts_cleaned", item.get("posts","")) or "", BUDGET["posts_cleaned"])
    sem = truncate_to_budget(tok, item.get("semantic_view","")  or "", BUDGET["semantic_view"])
    sen = truncate_to_budget(tok, item.get("sentiment_view","") or "", BUDGET["sentiment_view"])
    lin = truncate_to_budget(tok, item.get("linguistic_view","") or "", BUDGET["linguistic_view"])
    return (
        f"[POSTS]\n{p}\n[SEMANTIC]\n{sem}\n[SENTIMENT]\n{sen}\n[LINGUISTIC]\n{lin}\n"
        f"[TASK] Predict MBTI type among {', '.join(MBTI_16)}."
    )

class MBTIDataset(torch.utils.data.Dataset):
    def __init__(self, rows, tokenizer, max_len=512):
        self.rows = rows
        self.tok  = tokenizer
        self.max_len = max_len
    def __len__(self): return len(self.rows)
    def __getitem__(self, idx):
        it  = self.rows[idx]
        text= build_input(it, self.tok)
        y   = MBTI2ID[it["type"]]
        enc = self.tok(text, truncation=True, max_length=self.max_len)
        return {"input_ids": enc["input_ids"], "attention_mask": enc["attention_mask"], "labels": y}

def plot_confusion_and_roc(y_true, y_prob, class_names, out_dir, suffix=""):
    os.makedirs(out_dir, exist_ok=True)
    y_pred = np.argmax(y_prob, axis=-1)

    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    fig_cm, ax_cm = plt.subplots(figsize=(8, 8), dpi=150)
    disp.plot(ax=ax_cm, xticks_rotation=45, cmap="Blues", colorbar=False)
    ax_cm.set_title(f"Confusion Matrix-V:Kaggle,T:Pandora")
    fig_cm.tight_layout()
    fig_cm.savefig(os.path.join(out_dir, f"confusion_matrix{suffix}.png"))
    plt.close(fig_cm)

    Y_true_bin = label_binarize(y_true, classes=list(range(len(class_names))))
    fpr = {}; tpr = {}; roc_auc = {}
    for i in range(len(class_names)):
        fpr[i], tpr[i], _ = roc_curve(Y_true_bin[:, i], y_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    fpr["micro"], tpr["micro"], _ = roc_curve(Y_true_bin.ravel(), y_prob.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(len(class_names))]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(len(class_names)):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= len(class_names)
    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    fig_roc, ax_roc = plt.subplots(figsize=(7, 7), dpi=150)
    ax_roc.plot(fpr["micro"], tpr["micro"], label=f"micro-average ROC (AUC = {roc_auc['micro']:.3f})", linewidth=2)
    ax_roc.plot(fpr["macro"], tpr["macro"], label=f"macro-average ROC (AUC = {roc_auc['macro']:.3f})", linewidth=2)
    ax_roc.plot([0, 1], [0, 1], "k--", linewidth=1)
    ax_roc.set_xlim([0.0, 1.0]); ax_roc.set_ylim([0.0, 1.05])
    ax_roc.set_xlabel("False Positive Rate"); ax_roc.set_ylabel("True Positive Rate")
    ax_roc.set_title(f"Multiclass ROC (micro & macro)-V:Kaggle,T:Pandora")
    ax_roc.legend(loc="lower right")
    fig_roc.tight_layout()
    fig_roc.savefig(os.path.join(out_dir, f"roc_micro_macro{suffix}.png"))
    plt.close(fig_roc)

# ======== 主流程（仅TEST） ========
def main():
    set_seed(SEED)
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    # 读取 test.json
    test_rows = load_rows(TEST_JSON)

    # tokenizer
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True, **HF_KW)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"

    # 基座模型 + 量化（与训练保持一致）
    quant_cfg = BitsAndBytesConfig(
        load_in_4bit=USE_4BIT,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    ) if USE_4BIT else None

    base_model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=16,
        quantization_config=quant_cfg,
        device_map={"": device},
        low_cpu_mem_usage=True,
        **HF_KW
    )
    base_model.config.pad_token_id = tokenizer.pad_token_id
    base_model.config.use_cache = False

    # 叠加 LoRA 适配器
    from peft import PeftModel
    model = PeftModel.from_pretrained(base_model, CKPT_DIR, is_trainable=False)
    model = model.to(device)
    model.eval()

    # 构建测试数据集
    test_ds = MBTIDataset(test_rows, tokenizer, max_len=MAX_LEN)
    collator = DataCollatorWithPadding(tokenizer, pad_to_multiple_of=8)

    # 仅推理配置（不训练）
    args = TrainingArguments(
        output_dir=os.path.join(OUTPUT_DIR, "tmp_test"),
        per_device_eval_batch_size=4,
        eval_accumulation_steps=12,
        report_to="none",
    )

    trainer = Trainer(model=model, args=args, eval_dataset=test_ds,
                      tokenizer=tokenizer, data_collator=collator)

    # 预测
    output = trainer.predict(test_ds)
    logits = output.predictions[0] if isinstance(output.predictions, (list, tuple)) else output.predictions
    probs  = F.softmax(torch.tensor(logits, dtype=torch.float32), dim=-1).cpu().numpy()
    y_true = output.label_ids

    # 指标（16类 + 4D）
    pred_ids = logits.argmax(-1)
    acc16 = float((pred_ids == y_true).mean())
    pred_types = [MBTI_16[i] for i in pred_ids]
    true_types = [MBTI_16[i] for i in y_true]
    c_ei=c_ns=c_tf=c_jp=c_all=0
    for pt, tt in zip(pred_types, true_types):
        pei,pns,ptf,pjp = mbti_to_4d(pt)
        tei,tns,ttf,tjp = mbti_to_4d(tt)
        c_ei += (pei==tei); c_ns += (pns==tns); c_tf += (ptf==ttf); c_jp += (pjp==tjp)
        c_all+= (pei==tei and pns==tns and ptf==ttf and pjp==tjp)
    n = len(y_true)

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    plot_confusion_and_roc(y_true, probs, MBTI_16, OUTPUT_DIR, suffix="_test")

    print("\n=== TEST Results ===")
    print(f"acc_16: {acc16:.4f}")
    print(f"acc_ei: {c_ei/n:.4f}  acc_ns: {c_ns/n:.4f}  acc_tf: {c_tf/n:.4f}  acc_jp: {c_jp/n:.4f}  acc_4D: {c_all/n:.4f}")
    print(f"Saved figs to: {OUTPUT_DIR}/confusion_matrix_test.png, {OUTPUT_DIR}/roc_micro_macro_test.png")

    # 推理示例
    sample = test_rows[0]
    text = build_input(sample, tokenizer)
    batch = tokenizer(text, return_tensors="pt", truncation=True, max_length=MAX_LEN)
    batch = {k: v.to(device) for k, v in batch.items()}
    with torch.no_grad():
        plogits = model(**batch).logits
        pred_id = int(torch.argmax(plogits, dim=-1))
        pred_mbti = MBTI_16[pred_id]
    print("\n[Inference on TEST sample]")
    print("原标签:", sample["type"], " | 预测:", pred_mbti)

if __name__ == "__main__":
    main()


Some weights of Qwen2ForSequenceClassification were not initialized from the model checkpoint at Qwen/Qwen2.5-1.5B-Instruct and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  trainer = Trainer(model=model, args=args, eval_dataset=test_ds,



=== TEST Results ===
acc_16: 0.8115
acc_ei: 0.9044  acc_ns: 0.9125  acc_tf: 0.9135  acc_jp: 0.9044  acc_4D: 0.8115
Saved figs to: qwen-test-on-pandora_new/lora_adapter/kaggle测pandora/confusion_matrix_test.png, qwen-test-on-pandora_new/lora_adapter/kaggle测pandora/roc_micro_macro_test.png

[Inference on TEST sample]
原标签: INTJ  | 预测: INTJ


In [13]:
# -*- coding: utf-8 -*- 
"""
Evaluate LoRA adapter on TEST ONLY.
依赖：transformers==4.55, peft, scikit-learn, matplotlib, torch, bitsandbytes(如用4bit)
"""

import os, json
from typing import Dict, Any

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

import numpy as np
import torch
import torch.nn.functional as F
from sklearn.metrics import (
    confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc,
    f1_score, recall_score
)
from sklearn.preprocessing import label_binarize

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    BitsAndBytesConfig,
    DataCollatorWithPadding,
    Trainer, TrainingArguments,
    set_seed,
)

# ======== 需要确认的两处路径 ========
CKPT_DIR  = "qwen-test-on-pandora_new/lora_adapter"   # 你保存LoRA适配器的目录
TEST_JSON = "pandora_testdataset.json"           # 仅评测测试集

# ======== 与训练保持一致的配置 ========
MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"
MAX_LEN      = 320
BUDGET = {"posts_cleaned": 192, "semantic_view": 64, "sentiment_view": 32, "linguistic_view": 24}

MBTI_16 = [
    "INTJ","INTP","ENTJ","ENTP","INFJ","INFP","ENFJ","ENFP",
    "ISTJ","ISFJ","ESTJ","ESFJ","ISTP","ISFP","ESTP","ESFP"
]
MBTI2ID = {t: i for i, t in enumerate(MBTI_16)}

USE_4BIT = True
SEED = 42
HF_TOKEN = os.getenv("HF_TOKEN")
HF_KW = {"token": HF_TOKEN} if HF_TOKEN else {}
OUTPUT_DIR = os.path.join(CKPT_DIR, "kaggle测pandora")

# ======== 工具函数 ========
def load_rows(path: str):
    with open(path, "r", encoding="utf-8") as f:
        rows = json.load(f)
    rows = [r for r in rows if isinstance(r, dict) and r.get("type") in MBTI2ID]
    if not rows:
        raise ValueError(f"{path} 中没有合法样本。")
    return rows

def mbti_to_4d(m: str):
    # E=1/I=0, N=1/S=0, F=1/T=0, P=1/J=0
    return (
        0 if m[0]=="I" else 1,
        0 if m[1]=="S" else 1,
        0 if m[2]=="F" else 1,
        0 if m[3]=="P" else 1,
    )

def truncate_to_budget(tok: AutoTokenizer, text: str, budget: int) -> str:
    enc = tok(text or "", add_special_tokens=False)
    ids = enc["input_ids"][: budget]
    return tok.decode(ids)

def build_input(item: Dict[str, Any], tok: AutoTokenizer) -> str:
    p   = truncate_to_budget(tok, item.get("posts_cleaned", item.get("posts","")) or "", BUDGET["posts_cleaned"])
    sem = truncate_to_budget(tok, item.get("semantic_view","")  or "", BUDGET["semantic_view"])
    sen = truncate_to_budget(tok, item.get("sentiment_view","") or "", BUDGET["sentiment_view"])
    lin = truncate_to_budget(tok, item.get("linguistic_view","") or "", BUDGET["linguistic_view"])
    return (
        f"[POSTS]\n{p}\n[SEMANTIC]\n{sem}\n[SENTIMENT]\n{sen}\n[LINGUISTIC]\n{lin}\n"
        f"[TASK] Predict MBTI type among {', '.join(MBTI_16)}."
    )

class MBTIDataset(torch.utils.data.Dataset):
    def __init__(self, rows, tokenizer, max_len=512):
        self.rows = rows
        self.tok  = tokenizer
        self.max_len = max_len
    def __len__(self): return len(self.rows)
    def __getitem__(self, idx):
        it  = self.rows[idx]
        text= build_input(it, self.tok)
        y   = MBTI2ID[it["type"]]
        enc = self.tok(text, truncation=True, max_length=self.max_len)
        return {"input_ids": enc["input_ids"], "attention_mask": enc["attention_mask"], "labels": y}

def plot_confusion_and_roc(y_true, y_prob, class_names, out_dir, suffix=""):
    os.makedirs(out_dir, exist_ok=True)
    y_pred = np.argmax(y_prob, axis=-1)

    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    fig_cm, ax_cm = plt.subplots(figsize=(8, 8), dpi=150)
    disp.plot(ax=ax_cm, xticks_rotation=45, cmap="Blues", colorbar=False)
    ax_cm.set_title(f"Confusion Matrix-V:Kaggle,T:Pandora")
    fig_cm.tight_layout()
    fig_cm.savefig(os.path.join(out_dir, f"confusion_matrix{suffix}.png"))
    plt.close(fig_cm)

    Y_true_bin = label_binarize(y_true, classes=list(range(len(class_names))))
    fpr = {}; tpr = {}; roc_auc = {}
    for i in range(len(class_names)):
        fpr[i], tpr[i], _ = roc_curve(Y_true_bin[:, i], y_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    fpr["micro"], tpr["micro"], _ = roc_curve(Y_true_bin.ravel(), y_prob.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(len(class_names))]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(len(class_names)):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= len(class_names)
    fpr["macro"] = all_fpr
    tpr["macro"] = mean_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    fig_roc, ax_roc = plt.subplots(figsize=(7, 7), dpi=150)
    ax_roc.plot(fpr["micro"], tpr["micro"], label=f"micro-average ROC (AUC = {roc_auc['micro']:.3f})", linewidth=2)
    ax_roc.plot(fpr["macro"], tpr["macro"], label=f"macro-average ROC (AUC = {roc_auc['macro']:.3f})", linewidth=2)
    ax_roc.plot([0, 1], [0, 1], "k--", linewidth=1)
    ax_roc.set_xlim([0.0, 1.0]); ax_roc.set_ylim([0.0, 1.05])
    ax_roc.set_xlabel("False Positive Rate"); ax_roc.set_ylabel("True Positive Rate")
    ax_roc.set_title(f"Multiclass ROC (micro & macro)-V:Kaggle,T:Pandora")
    ax_roc.legend(loc="lower right")
    fig_roc.tight_layout()
    fig_roc.savefig(os.path.join(out_dir, f"roc_micro_macro{suffix}.png"))
    plt.close(fig_roc)

# ======== 主流程（仅TEST） ========
def main():
    set_seed(SEED)
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    # 读取 test.json
    test_rows = load_rows(TEST_JSON)

    # tokenizer
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True, **HF_KW)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"

    # 基座模型 + 量化（与训练保持一致）
    quant_cfg = BitsAndBytesConfig(
        load_in_4bit=USE_4BIT,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    ) if USE_4BIT else None

    base_model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=16,
        quantization_config=quant_cfg,
        device_map={"": device},
        low_cpu_mem_usage=True,
        **HF_KW
    )
    base_model.config.pad_token_id = tokenizer.pad_token_id
    base_model.config.use_cache = False

    # 叠加 LoRA 适配器
    from peft import PeftModel
    model = PeftModel.from_pretrained(base_model, CKPT_DIR, is_trainable=False)
    model = model.to(device)
    model.eval()

    # 构建测试数据集
    test_ds = MBTIDataset(test_rows, tokenizer, max_len=MAX_LEN)
    collator = DataCollatorWithPadding(tokenizer, pad_to_multiple_of=8)

    # 仅推理配置（不训练）
    args = TrainingArguments(
        output_dir=os.path.join(OUTPUT_DIR, "tmp_test"),
        per_device_eval_batch_size=4,
        eval_accumulation_steps=12,
        report_to="none",
    )

    trainer = Trainer(model=model, args=args, eval_dataset=test_ds,
                      tokenizer=tokenizer, data_collator=collator)

    # 预测
    output = trainer.predict(test_ds)
    logits = output.predictions[0] if isinstance(output.predictions, (list, tuple)) else output.predictions
    probs  = F.softmax(torch.tensor(logits, dtype=torch.float32), dim=-1).cpu().numpy()
    y_true = output.label_ids

    # 指标（16类 + 4D）
    pred_ids = logits.argmax(-1)
    acc16 = float((pred_ids == y_true).mean())

    pred_types = [MBTI_16[i] for i in pred_ids]
    true_types = [MBTI_16[i] for i in y_true]

    # 4D 准确率（与你原来一致）
    c_ei=c_ns=c_tf=c_jp=c_all=0
    # 4D 的二分类标签收集（0/1）
    ei_t, ns_t, tf_t, jp_t = [], [], [], []
    ei_p, ns_p, tf_p, jp_p = [], [], [], []

    for pt, tt in zip(pred_types, true_types):
        pei,pns,ptf,pjp = mbti_to_4d(pt)
        tei,tns,ttf,tjp = mbti_to_4d(tt)
        # 准确率计数
        c_ei += (pei==tei); c_ns += (pns==tns); c_tf += (ptf==ttf); c_jp += (pjp==tjp)
        c_all+= (pei==tei and pns==tns and ptf==ttf and pjp==tjp)
        # 记录二分类标签
        ei_t.append(tei); ns_t.append(tns); tf_t.append(ttf); jp_t.append(tjp)
        ei_p.append(pei); ns_p.append(pns); tf_p.append(ptf); jp_p.append(pjp)

    n = len(y_true)

    # ===== 新增：16类整体 F1 / Recall =====
    f1_micro_16     = f1_score(y_true, pred_ids, average="micro")
    f1_macro_16     = f1_score(y_true, pred_ids, average="macro")
    f1_weighted_16  = f1_score(y_true, pred_ids, average="weighted")

    rec_micro_16    = recall_score(y_true, pred_ids, average="micro")
    rec_macro_16    = recall_score(y_true, pred_ids, average="macro")
    rec_weighted_16 = recall_score(y_true, pred_ids, average="weighted")

    # ===== 新增：四个维度的二分类 F1 / Recall（正类统一取 1，对应 E/N/F/P）=====
    ei_f1  = f1_score(ei_t, ei_p, average="binary", pos_label=1)
    ns_f1  = f1_score(ns_t, ns_p, average="binary", pos_label=1)
    tf_f1  = f1_score(tf_t, tf_p, average="binary", pos_label=1)
    jp_f1  = f1_score(jp_t, jp_p, average="binary", pos_label=1)

    ei_rec = recall_score(ei_t, ei_p, average="binary", pos_label=1)
    ns_rec = recall_score(ns_t, ns_p, average="binary", pos_label=1)
    tf_rec = recall_score(tf_t, tf_p, average="binary", pos_label=1)
    jp_rec = recall_score(jp_t, jp_p, average="binary", pos_label=1)

    # ===== 新增：4D 的总体分数 =====
    # micro：把四个维度的标签都拼接在一起计算
    y4_true = np.concatenate([ei_t, ns_t, tf_t, jp_t])
    y4_pred = np.concatenate([ei_p, ns_p, tf_p, jp_p])
    f1_micro_4d  = f1_score(y4_true, y4_pred, average="binary", pos_label=1)
    rec_micro_4d = recall_score(y4_true, y4_pred, average="binary", pos_label=1)

    # macro：四个维度分数的平均
    f1_macro_4d  = float(np.mean([ei_f1, ns_f1, tf_f1, jp_f1]))
    rec_macro_4d = float(np.mean([ei_rec, ns_rec, tf_rec, jp_rec]))

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    plot_confusion_and_roc(y_true, probs, MBTI_16, OUTPUT_DIR, suffix="_test")

    print("\n=== TEST Results ===")
    print(f"acc_16: {acc16:.4f}")
    print(f"acc_ei: {c_ei/n:.4f}  acc_ns: {c_ns/n:.4f}  acc_tf: {c_tf/n:.4f}  acc_jp: {c_jp/n:.4f}  acc_4D: {c_all/n:.4f}")

    # 16类总体
    print(f"F1-16(micro/macro/weighted): {f1_micro_16:.4f} / {f1_macro_16:.4f} / {f1_weighted_16:.4f}")
    print(f"Recall-16(micro/macro/weighted): {rec_micro_16:.4f} / {rec_macro_16:.4f} / {rec_weighted_16:.4f}")

    # 4D 总体（把四个二分类合在一起的 micro，以及四维平均的 macro）
    print(f"F1-4D(overall micro/macro): {f1_micro_4d:.4f} / {f1_macro_4d:.4f}")
    print(f"Recall-4D(overall micro/macro): {rec_micro_4d:.4f} / {rec_macro_4d:.4f}")

    # 4D 各维度
    print(f"[EI]  F1: {ei_f1:.4f}  Recall: {ei_rec:.4f}")
    print(f"[NS]  F1: {ns_f1:.4f}  Recall: {ns_rec:.4f}")
    print(f"[TF]  F1: {tf_f1:.4f}  Recall: {tf_rec:.4f}")
    print(f"[JP]  F1: {jp_f1:.4f}  Recall: {jp_rec:.4f}")

    print(f"Saved figs to: {OUTPUT_DIR}/confusion_matrix_test.png, {OUTPUT_DIR}/roc_micro_macro_test.png")

    # 推理示例
    sample = test_rows[0]
    text = build_input(sample, tokenizer)
    batch = tokenizer(text, return_tensors="pt", truncation=True, max_length=MAX_LEN)
    batch = {k: v.to(device) for k, v in batch.items()}
    with torch.no_grad():
        plogits = model(**batch).logits
        pred_id = int(torch.argmax(plogits, dim=-1))
        pred_mbti = MBTI_16[pred_id]
    print("\n[Inference on TEST sample]")
    print("原标签:", sample["type"], " | 预测:", pred_mbti)

if __name__ == "__main__":
    main()


Some weights of Qwen2ForSequenceClassification were not initialized from the model checkpoint at Qwen/Qwen2.5-1.5B-Instruct and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  trainer = Trainer(model=model, args=args, eval_dataset=test_ds,



=== TEST Results ===
acc_16: 0.8115
acc_ei: 0.9044  acc_ns: 0.9125  acc_tf: 0.9135  acc_jp: 0.9044  acc_4D: 0.8115
F1-16(micro/macro/weighted): 0.8115 / 0.8118 / 0.8118
Recall-16(micro/macro/weighted): 0.8115 / 0.8115 / 0.8115
F1-4D(overall micro/macro): 0.9088 / 0.9088
Recall-4D(overall micro/macro): 0.9097 / 0.9097
[EI]  F1: 0.9044  Recall: 0.9046
[NS]  F1: 0.9133  Recall: 0.9217
[TF]  F1: 0.9133  Recall: 0.9108
[JP]  F1: 0.9041  Recall: 0.9017
Saved figs to: qwen-test-on-pandora_new/lora_adapter/kaggle测pandora/confusion_matrix_test.png, qwen-test-on-pandora_new/lora_adapter/kaggle测pandora/roc_micro_macro_test.png

[Inference on TEST sample]
原标签: INTJ  | 预测: INTJ


In [7]:
# 改这里：输入/输出文件名 & 每类上限
INPUT_JSON  = "test对应的原始数据.json"          # e.g., "test对应的原始数据.json"
OUTPUT_JSON = "picked_30_per_type.json"
PER_CLASS   = 30

import json, hashlib
from collections import defaultdict, Counter

MBTI_16 = [
    "INTJ","INTP","ENTJ","ENTP","INFJ","INFP","ENFJ","ENFP",
    "ISTJ","ISFJ","ESTJ","ESFJ","ISTP","ISFP","ESTP","ESFP"
]
MBTI2ID = {t:i for i,t in enumerate(MBTI_16)}

def mbti_to_4d(m: str):
    m = (m or "").upper()
    # I/S/F/P 记为0, E/N/T/J 记为1
    return (
        0 if m[0]=="I" else 1,  # EI
        0 if m[1]=="S" else 1,  # NS
        0 if m[2]=="F" else 1,  # TF
        0 if m[3]=="P" else 1,  # JP
    )

def stable_key(ex):
    """type + 文本哈希，保证选择确定性；也用于去重"""
    t = (ex.get("type") or "").upper()
    txt = ex.get("query_text") or ex.get("posts_cleaned") or ex.get("posts") or ""
    h = hashlib.sha1(txt.strip().lower().encode("utf-8")).hexdigest()
    return f"{t}::{h}"

# 读取
with open(INPUT_JSON, "r", encoding="utf-8") as f:
    data = json.load(f)

# 过滤非法 & 去重（按 stable_key）
items, seen = [], set()
for ex in data:
    t = (ex.get("type") or "").upper()
    if t not in MBTI2ID:
        continue
    ex["type"] = t
    key = stable_key(ex)
    if key in seen:
        continue
    seen.add(key)
    ex["_key"] = key
    items.append(ex)

# 按类型分桶并确定性排序（键字典序）
buckets = defaultdict(list)
for ex in items:
    buckets[ex["type"]].append(ex)
for t in buckets:
    buckets[t].sort(key=lambda x: x["_key"])

# 每类最多取 PER_CLASS 条（不够就不够）
picked = []
for t in MBTI_16:
    picked.extend(buckets.get(t, [])[:PER_CLASS])

# 报告：16类分布
type_dist = Counter(ex["type"] for ex in picked)
print("== 每类数量（最多30，不够就全保）==")
for t in MBTI_16:
    print(f"{t}: {type_dist.get(t,0)}")
total = len(picked)
print(f"Total picked: {total}")

# 报告：4维分布与偏差
four = [[0,0],[0,0],[0,0],[0,0]]  # EI, NS, TF, JP
for ex in picked:
    b = mbti_to_4d(ex["type"])
    for d in range(4):
        four[d][b[d]] += 1

names = ["E/I (I=0,E=1)", "N/S (S=0,N=1)", "T/F (F=0,T=1)", "J/P (P=0,J=1)"]
ideal = total / 2.0
print("\n== 四维统计（左,右） & 与理想差距(各侧理想=总数/2) ==")
for d, nm in enumerate(names):
    left, right = four[d]
    gap = abs(left - ideal) + abs(right - ideal)
    print(f"{nm}: {four[d]}    偏差和={gap:.1f}")

# 清理临时键并保存
for ex in picked:
    ex.pop("_key", None)

with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
    json.dump(picked, f, ensure_ascii=False, indent=2)
print(f"\n✅ 已保存：{OUTPUT_JSON}")


== 每类数量（最多30，不够就全保）==
INTJ: 30
INTP: 30
ENTJ: 30
ENTP: 30
INFJ: 30
INFP: 30
ENFJ: 10
ENFP: 30
ISTJ: 11
ISFJ: 15
ESTJ: 5
ESFJ: 6
ISTP: 30
ISFP: 23
ESTP: 10
ESFP: 5
Total picked: 325

== 四维统计（左,右） & 与理想差距(各侧理想=总数/2) ==
E/I (I=0,E=1): [199, 126]    偏差和=73.0
N/S (S=0,N=1): [105, 220]    偏差和=115.0
T/F (F=0,T=1): [149, 176]    偏差和=27.0
J/P (P=0,J=1): [188, 137]    偏差和=51.0

✅ 已保存：picked_30_per_type.json


In [8]:
# ======= 配置（改这里） =======
INPUT_JSON  = "test对应的原始数据.json"   # 你的输入数据
OUTPUT_JSON = "picked_balanced_around30.json"
PER_CLASS   = 30      # 目标每类数量（中心值）
WIGGLE      = 5       # 允许上下浮动范围：即每类 ∈ [PER_CLASS-WIGGLE, PER_CLASS+WIGGLE]
# ============================

import json, hashlib, copy
from collections import defaultdict, Counter

MBTI_16 = [
    "INTJ","INTP","ENTJ","ENTP","INFJ","INFP","ENFJ","ENFP",
    "ISTJ","ISFJ","ESTJ","ESFJ","ISTP","ISFP","ESTP","ESFP"
]
MBTI2ID = {t:i for i,t in enumerate(MBTI_16)}

def mbti_bits(m: str):
    m = m.upper()
    # I/S/F/P 记为0, E/N/T/J 记为1
    return (
        0 if m[0]=="I" else 1,  # EI
        0 if m[1]=="S" else 1,  # NS
        0 if m[2]=="F" else 1,  # TF
        0 if m[3]=="P" else 1,  # JP
    )

# 确定性键：type + 文本哈希（用于排序与去重）
def stable_key(ex):
    t = (ex.get("type") or "").upper()
    txt = ex.get("query_text") or ex.get("posts_cleaned") or ex.get("posts") or ""
    h = hashlib.sha1(txt.strip().lower().encode("utf-8")).hexdigest()
    return f"{t}::{h}"

# 读取并去重
with open(INPUT_JSON, "r", encoding="utf-8") as f:
    raw = json.load(f)

items, seen = [], set()
for ex in raw:
    t = (ex.get("type") or "").upper()
    if t not in MBTI2ID:
        continue
    ex["type"] = t
    key = stable_key(ex)
    if key in seen:
        continue
    seen.add(key)
    ex["_key"] = key
    items.append(ex)

# 分桶并确定性排序
buckets = defaultdict(list)
for ex in items:
    buckets[ex["type"]].append(ex)
for t in buckets:
    buckets[t].sort(key=lambda x: x["_key"])  # 稳定顺序

# 统计每类可用数 & 位向量
avail = {t: len(buckets.get(t, [])) for t in MBTI_16}
bits  = {t: mbti_bits(t) for t in MBTI_16}

# 初始配额：先取 min(PER_CLASS, avail)（不足的全保留）
k = {t: min(PER_CLASS, avail[t]) for t in MBTI_16}

# 每类上下限：不足的类下限=其可用数（不减少）；其余允许在 [PER_CLASS-WIGGLE, PER_CLASS+WIGGLE]
min_k = {}
max_k = {}
for t in MBTI_16:
    if avail[t] < PER_CLASS:   # 稀有/不足类：全保留，不减少
        min_k[t] = avail[t]
        max_k[t] = avail[t]    # 也不增加（没有更多）
        k[t]     = avail[t]
    else:
        min_k[t] = max(0, PER_CLASS - WIGGLE)
        max_k[t] = min(avail[t], PER_CLASS + WIGGLE)
        k[t]     = max(min_k[t], min(k[t], max_k[t]))

# 目标函数：四维平方偏差（越小越均衡）
def objective(kdict):
    total = sum(kdict.values())
    if total == 0:
        return 0.0
    # side1 是 E/N/T/J 的数量； side0 = total - side1
    obj = 0.0
    for d in range(4):
        side1 = sum(kdict[t] for t in MBTI_16 if bits[t][d]==1)
        obj += (side1 - total/2.0)**2
    return obj

# 贪心交换：每次尝试把一个“过多侧”的类型 -1，和一个“过少侧”的类型 +1（均在上下限内）
def rebalance(k):
    k = k.copy()
    best = objective(k)
    improved = True
    iters = 0
    while improved:
        improved = False
        iters += 1
        total = sum(k.values())
        if total == 0: break

        # 当前四维的 side1 数量 & 失衡度
        side1 = [sum(k[t] for t in MBTI_16 if bits[t][d]==1) for d in range(4)]
        # 对每个维度，确定“过多侧”与“过少侧” (1侧与0侧)
        over_under = []
        for d in range(4):
            over_side = 1 if side1[d] > total/2.0 else 0
            under_side = 1 - over_side
            gap = abs(side1[d] - total/2.0)
            over_under.append((gap, d, over_side, under_side))
        # 按 gap 从大到小尝试修正
        over_under.sort(reverse=True)

        for gap, d, over_side, under_side in over_under:
            if gap <= 0:  # 已经均衡
                continue
            # 候选可以 -1 的类型：在过多侧、且 k[t] > min_k[t]
            cands_down = [t for t in MBTI_16 if bits[t][d]==over_side and k[t] > min_k[t]]
            # 候选可以 +1 的类型：在过少侧、且 k[t] < max_k[t]
            cands_up   = [t for t in MBTI_16 if bits[t][d]==under_side and k[t] < max_k[t]]
            if not cands_down or not cands_up:
                continue

            # 穷举所有 (down, up) 组合（16*16 最多 256 种），找最优改进
            local_best_impr = 0.0
            local_best_pair = None
            for t_down in cands_down:
                for t_up in cands_up:
                    if t_down == t_up: 
                        continue
                    k_try = k.copy()
                    k_try[t_down] -= 1
                    k_try[t_up]   += 1
                    new_obj = objective(k_try)
                    impr = best - new_obj
                    # 二级偏好：尽量贴近 PER_CLASS（让各类“30左右”）
                    # 若改进相同，则优先让 |k-30| 更小
                    if impr > local_best_impr + 1e-9:
                        local_best_impr = impr
                        local_best_pair = (t_down, t_up, new_obj)
                    elif abs(impr - local_best_impr) <= 1e-9 and local_best_pair is not None:
                        old_dev = abs(k[local_best_pair[0]]-PER_CLASS)+abs(k[local_best_pair[1]]-PER_CLASS)
                        new_dev = abs((k[t_down]-1)-PER_CLASS)+abs((k[t_up]+1)-PER_CLASS)
                        if new_dev < old_dev:
                            local_best_pair = (t_down, t_up, new_obj)

            if local_best_pair is not None and local_best_impr > 1e-9:
                t_down, t_up, new_obj = local_best_pair
                k[t_down] -= 1
                k[t_up]   += 1
                best = new_obj
                improved = True
                break  # 先应用一次改进，再重新评估四维
        if iters > 2000:  # 安全退出
            break
    return k

k_bal = rebalance(k)

# 依据最终配额，按确定性顺序取样
picked = []
for t in MBTI_16:
    bucket = buckets.get(t, [])
    picked.extend(bucket[:k_bal.get(t, 0)])

# 报告
dist = Counter(ex["type"] for ex in picked)
print("== 最终每类数量（≈30，允许±%d）==" % WIGGLE)
for t in MBTI_16:
    print(f"{t}: {dist.get(t,0)}")
total = len(picked)
print(f"Total picked: {total}")

# 四维统计
def four_stats(ex_list):
    four = [[0,0],[0,0],[0,0],[0,0]]  # EI, NS, TF, JP
    for ex in ex_list:
        b = mbti_bits(ex["type"])
        for d in range(4):
            four[d][b[d]] += 1
    return four

four = four_stats(picked)
names = ["E/I (I=0,E=1)", "N/S (S=0,N=1)", "T/F (F=0,T=1)", "J/P (P=0,J=1)"]
ideal = total / 2.0
print("\n== 四维统计（左,右） & 与理想差距(各侧理想=总数/2) ==")
for d, nm in enumerate(names):
    left, right = four[d]
    gap = abs(left - ideal) + abs(right - ideal)
    print(f"{nm}: {four[d]}    偏差和={gap:.1f}")

# 保存
for ex in picked:
    ex.pop("_key", None)
with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
    json.dump(picked, f, ensure_ascii=False, indent=2)
print(f"\n✅ 已保存：{OUTPUT_JSON}")


== 最终每类数量（≈30，允许±5）==
INTJ: 25
INTP: 25
ENTJ: 31
ENTP: 29
INFJ: 35
INFP: 25
ENFJ: 10
ENFP: 35
ISTJ: 11
ISFJ: 15
ESTJ: 5
ESFJ: 6
ISTP: 35
ISFP: 23
ESTP: 10
ESFP: 5
Total picked: 325

== 四维统计（左,右） & 与理想差距(各侧理想=总数/2) ==
E/I (I=0,E=1): [194, 131]    偏差和=63.0
N/S (S=0,N=1): [110, 215]    偏差和=105.0
T/F (F=0,T=1): [154, 171]    偏差和=17.0
J/P (P=0,J=1): [187, 138]    偏差和=49.0

✅ 已保存：picked_balanced_around30.json
