In [None]:
import openai
import json
import pandas as pd
from tqdm import tqdm
import re
import time
from datetime import datetime
from sklearn.metrics import cohen_kappa_score
import os
from dotenv import load_dotenv

In [None]:
# Load variables from a local .env file (if present) into the process environment.
load_dotenv()

# Configure the module-level openai client. Both variables are mandatory:
# os.environ[...] raises KeyError right here if either one is missing.
openai.api_key = os.environ["OPENAI_API_KEY"]
openai.api_base = os.environ["OPENAI_API_BASE"]
# NOTE(review): presumably OPENAI_API_BASE points at an OpenAI-compatible gateway
# serving the models listed in MODEL_CONFIG — confirm.
openai.api_type = "openai"
openai.api_version = None

In [None]:
# Short model alias -> settings for that model.
# "model_id" is the identifier passed as `model=` in the chat-completion request.
MODEL_CONFIG = {
    "qwen2.5-72b": {"model_id": "qwen2.5-72b-instruct"},
    "qwen2.5-32b": {"model_id": "qwen2.5-32b-instruct"},
    "qwen2.5-14b": {"model_id": "qwen2.5-14b-instruct"},
    "qwen2.5-1.5b": {"model_id": "qwen2.5-1.5b-instruct"},
    "deepseekV3": {"model_id": "deepseek-v3"},
}

def get_model_config(model_name: str):
    """Return the MODEL_CONFIG entry registered under ``model_name``.

    Args:
        model_name: Short alias of the model (a key of MODEL_CONFIG).

    Returns:
        The settings dict stored for that alias.

    Raises:
        ValueError: If ``model_name`` is not a key of MODEL_CONFIG.
    """
    if model_name in MODEL_CONFIG:
        return MODEL_CONFIG[model_name]
    raise ValueError(f"Unknown Model: {model_name}")

# Aliases available for a run; each one must be a key of MODEL_CONFIG.
model_list = [
    "qwen2.5-72b",   # index 0
    "qwen2.5-32b",   # index 1
    "qwen2.5-14b",   # index 2
    "qwen2.5-1.5b",  # index 3
    "deepseekV3",    # index 4
]

# Model used for this run, chosen by its position in model_list.
model_name = model_list[3]
model = get_model_config(model_name)["model_id"]

# Prompting strategy for this run.
prompt_type = "zeroshot"

# Quality criterion being evaluated. One of:
#   well-formed
#   atomic
#   minimal
criteria = "well-formed"

In [None]:
def load_prompt_from_file(prompt_criteria: str) -> str:
    """Read the prompt template ``template/<prompt_criteria>.txt``.

    Args:
        prompt_criteria: Base name (without extension) of the template file.

    Returns:
        The file content with surrounding whitespace stripped, or an empty
        string when the file is missing or cannot be read (the problem is
        reported on stdout instead of being raised).
    """
    template_text = ""
    try:
        # Templates are read as UTF-8 text.
        with open(f"template/{prompt_criteria}.txt", "r", encoding="utf-8") as handle:
            template_text = handle.read().strip()
    except FileNotFoundError:
        print(f"❌ 错误：文件 template/{prompt_criteria}.txt 未找到")
    except Exception as e:
        print(f"❌ 错误读取模板文件：{e}")
    return template_text
    
def clean_json_text(text: str) -> str:
    """Strip Markdown code-fence markup from a model reply so it can be parsed as JSON.

    Handles three shapes of reply:
      * bare JSON                      -> returned unchanged (whitespace-trimmed);
      * a reply that is only a fence   -> the fenced payload;
      * prose around a fenced block    -> the payload of the first fenced block.

    Args:
        text: Raw text returned by the model.

    Returns:
        The candidate JSON text with fence markers and surrounding whitespace removed.
    """
    stripped = text.strip()

    # Look for a complete ```...``` (optionally ```json) block anywhere in the reply.
    # Skip this when the reply already starts like JSON, so backticks that merely
    # appear inside a JSON string value are never mistaken for a fence.
    if not stripped.startswith(("{", "[")):
        fenced = re.search(
            r"```(?:json)?\s*(.*?)```", stripped, flags=re.IGNORECASE | re.DOTALL
        )
        if fenced:
            return fenced.group(1).strip()

    # Fallback: drop a stray opening/closing fence marker (e.g. an unterminated block).
    return re.sub(r"^```(json)?|```$", "", stripped, flags=re.IGNORECASE).strip()

def evaluate_user_story(system_prompt, user_story: str) -> dict:
    """Ask the configured chat model to judge one user story and parse its JSON reply.

    Args:
        system_prompt: Instruction text sent as the system message.
        user_story: User-story text sent as the user message.

    Returns:
        The value decoded from the model's JSON reply on success. On failure a
        dict describing the problem: ``{"error": "Invalid JSON", "raw": <reply>}``
        when the reply is not valid JSON, or ``{"error": <message>}`` for any
        other exception (request failure, unexpected response shape, ...).
    """
    # Bound before the try block so the JSONDecodeError handler can always
    # reference it, even if the error is raised before the reply is extracted.
    content = ""
    try:
        response = openai.ChatCompletion.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_story}
            ],
            temperature=0
        )

        content = response.choices[0].message.content.strip()
        cleaned = clean_json_text(content)

        # Try to decode the cleaned reply as JSON.
        parsed = json.loads(cleaned)
        return parsed

    except json.JSONDecodeError as e:
        print("❌ 模型返回内容不是有效的 JSON：")
        print(content)
        print("错误信息：", e)
        return {"error": "Invalid JSON", "raw": content}
    except Exception as e:
        # Best effort: report the failure and let the caller record it for this row.
        print(f"❌ 评估出错：{e}")
        return {"error": str(e)}

In [None]:
# Template name is "<criteria>_<prompt_type>", resolved by load_prompt_from_file.
prompt_file = f"{criteria}_{prompt_type}"
prompt = load_prompt_from_file(prompt_file)
if not prompt:
    print("❌ Prompt 加载失败，退出程序")
    # Raise instead of calling exit(): exit() is the interactive-shell helper and,
    # inside a notebook, shuts the kernel down rather than failing this cell.
    raise RuntimeError(f"Prompt template could not be loaded: template/{prompt_file}.txt")

In [None]:
# === Step 1: load the full user-story table ===
# The expert-label column is the criterion name capitalized (e.g. "Well-formed").
criteria_col = criteria.capitalize()
us_df = pd.read_excel(
    "data/us/stories.xlsx",
    usecols=["Issue key", "story", "ac", "bg", criteria_col],
)

# === Step 2: load the list of issue keys to evaluate ===
# Despite its name, atomic_df holds the keys for whichever criterion is selected.
atomic_df = pd.read_excel(f"data/us1/{criteria}.xlsx", usecols=["Issue key"])

# Attach story data to every requested key; a left join keeps unmatched keys.
df_eval = pd.merge(atomic_df, us_df, on="Issue key", how="left")

# Report requested keys that have no story in stories.xlsx.
missing = df_eval.loc[df_eval["story"].isna()]
if len(missing) > 0:
    print("⚠️ 以下 Issue key 未在 stories.xlsx 中找到：")
    print(missing["Issue key"].tolist())

In [None]:
# One record per evaluated story: expert label, model verdict and the raw model result.
results = []

for _, row in tqdm(df_eval.iterrows(), total=len(df_eval), desc="Evaluating user stories"):
    issue_key = row["Issue key"]
    description = row["story"]
    expert = row[criteria_col]

    # Reset for every row. -1 means "no usable model verdict" (missing story,
    # request/parse failure, or a reply without a "violation" field). Without this
    # reset a row with no story would reuse the previous row's verdict (or raise
    # NameError on the very first row).
    agent = -1

    if pd.isna(description):
        result = {"error": "No description found"}
    else:
        result = evaluate_user_story(prompt, description)
        time.sleep(1)  # pause 1 second after each API call
        if isinstance(result, dict):
            v = result.get("violation")
            if v is not None:
                # A violation maps to 0, no violation to 1.
                # NOTE(review): any truthy value counts as a violation, so a JSON
                # string such as "false" would be read as a violation — confirm the
                # prompt template forces a real boolean.
                agent = 0 if v else 1

    results.append({
        "Issue key": issue_key,
        "Expert": expert,
        "Agent": agent,
        "Result": json.dumps(result, indent=2, ensure_ascii=False)
    })

In [None]:
# Timestamp with minute resolution, used to keep output files from different runs apart.
current_time = datetime.now().strftime("%Y%m%d%H%M")
output_file = f"output/{criteria}-{model_name}-{prompt_type}-{current_time}.xlsx"

# Make sure the target folder exists so the collected results are not lost
# to a missing-directory error at write time.
os.makedirs(os.path.dirname(output_file), exist_ok=True)
pd.DataFrame(results).to_excel(output_file, index=False)

print(f"\n✅ 结果已保存至：{output_file}")

In [None]:
# === Metric computation ===
def compute_metrics(df):
    """Print confusion-matrix counts and accuracy / precision / recall / F1.

    Only rows whose ``Expert`` and ``Agent`` values are both 0 or 1 are counted;
    label 1 is treated as the positive class. Each ratio falls back to 0 when
    its denominator is zero.

    Args:
        df: DataFrame with ``Expert`` and ``Agent`` columns.

    Returns:
        None. The metrics are written to stdout.
    """
    binary_rows = df.loc[df["Expert"].isin([0, 1]) & df["Agent"].isin([0, 1])]

    # After the filter above every label is 0 or 1, so "not positive" means 0.
    expert_pos = binary_rows["Expert"] == 1
    agent_pos = binary_rows["Agent"] == 1

    TP = (expert_pos & agent_pos).sum()
    TN = (~expert_pos & ~agent_pos).sum()
    FP = (~expert_pos & agent_pos).sum()
    FN = (expert_pos & ~agent_pos).sum()

    def _ratio(numerator, denominator):
        # Guarded division: 0 when there is nothing to divide by.
        return numerator / denominator if denominator else 0

    accuracy = _ratio(TP + TN, TP + TN + FP + FN)
    precision = _ratio(TP, TP + FP)
    recall = _ratio(TP, TP + FN)
    f1 = _ratio(2 * precision * recall, precision + recall)

    report_lines = [
        "\n📊 评估指标：",
        f"TP (True Positive): {TP}",
        f"FP (False Positive): {FP}",
        f"TN (True Negative): {TN}",
        f"FN (False Negative): {FN}",
        f"Accuracy 准确率: {accuracy:.4f}",
        f"Precision 精确率: {precision:.4f}",
        f"Recall 召回率: {recall:.4f}",
        f"F1 Score: {f1:.4f}",
    ]
    for line in report_lines:
        print(line)

# === Cohen's Kappa computation ===
def compute_cohen_kappa(results_df):
    """Print Cohen's Kappa between the ``Expert`` and ``Agent`` columns.

    Only rows where both columns hold a binary label (0 or 1) are used. This is
    the same row filter compute_metrics applies, so both reports describe the
    same set of stories, and rows with a missing expert label or a failed model
    verdict (-1) cannot break the integer conversion.

    Args:
        results_df: DataFrame expected to contain ``Expert`` and ``Agent`` columns.

    Returns:
        None. The coefficient, or the reason it was skipped, is written to stdout.
    """
    if "Expert" not in results_df.columns or "Agent" not in results_df.columns:
        print("❌ 缺少 Expert 或 Agent 列，无法计算 Kappa")
        return

    # Drop rows with an invalid label on either side (NaN expert, agent == -1, ...).
    both_binary = results_df["Expert"].isin([0, 1]) & results_df["Agent"].isin([0, 1])
    filtered = results_df[both_binary]

    if filtered.empty:
        print("⚠️ 无有效模型评估结果，跳过 Kappa 计算")
        return

    y_true = filtered["Expert"].astype(int)
    y_pred = filtered["Agent"].astype(int)

    kappa = cohen_kappa_score(y_true, y_pred)
    print(f"📊 Cohen's Kappa 系数：{kappa:.4f}")

In [None]:
# Build a DataFrame from the per-story records in `results` and report
# expert/model agreement (Cohen's Kappa).
df_result = pd.DataFrame(results)
compute_cohen_kappa(df_result)

In [None]:
# Print confusion-matrix counts and accuracy / precision / recall / F1 for df_result.
compute_metrics(df_result)