In [1]:
# Step 1: mount Google Drive at /content/drive.
# NOTE(review): the original heading said this step merges all CSV files into one file,
# but the cell only mounts the drive — confirm whether a merge cell was removed.

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


#Code (Colab/py) — Modules prêts à coller

Les chemins sont adaptés à tes fichiers ; la conversion chinois traditionnel → simplifié (繁→简) utilise opencc et reste optionnelle（可选）.

A. Préparation & normalisation

In [4]:
#A. Préparation & normalisation

from pathlib import Path
import pandas as pd
import numpy as np
import re, unicodedata
from urllib.parse import urlparse

# ====== Paths ======
# Reference list: a UTF-8 text file that is read line by line further down.
path_refs   = Path("/content/drive/MyDrive/Colab Notebooks/STAGE_CRLAO_CNRS/Corpus_data/liste de référence/all_ch.txt")
# Corpus CSV; the loading code requires the columns 'content' and 'date'.
path_corpus = Path("/content/drive/MyDrive/Colab Notebooks/STAGE_CRLAO_CNRS/Corpus_data/Token/ch2_all_data_2015_2025_tok_thulac.csv")
# NOTE(review): path_neo is not referenced by any other visible cell — confirm it is still needed.
path_neo = Path("/content/drive/MyDrive/Colab Notebooks/STAGE_CRLAO_CNRS/Corpus_data/identification/ch2_néologisme_new_only.csv")
# Output directory for the evaluation files; created with its parents when missing.
out_dir     = Path("/content/drive/MyDrive/Colab Notebooks/STAGE_CRLAO_CNRS/Corpus_data/Evaluation")
out_dir.mkdir(parents=True, exist_ok=True)

# ====== OpenCC (Traditional -> Simplified Chinese, optional) ======
# `cc` is the converter used by normalize_text. It stays None when OpenCC cannot
# be imported or initialised; in that case no script conversion is applied.
try:
    from opencc import OpenCC
    cc = OpenCC('t2s')
except Exception as exc:  # best effort: the rest of the notebook runs without OpenCC
    cc = None
    # Report the fallback instead of hiding it: without conversion, Traditional
    # forms in the corpus will not match Simplified entries of the reference list.
    print(f"[warn] OpenCC unavailable ({exc!r}); Traditional->Simplified conversion is disabled.")

# Zero-width characters (U+200B..U+200D and U+FEFF) removed during normalisation.
ZERO_WIDTH = re.compile(r"[\u200B-\u200D\uFEFF]")

# Characters accepted as Chinese numerals by the number filter.
# 十 / 百 / 千 are included next to their financial forms 拾 / 佰 / 仟 so that
# everyday numerals such as 二十 or 三百 are recognised as numbers as well.
NUM_CHI = "零一二三四五六七八九十百千〇壹贰叁肆伍陆柒捌玖拾佰仟万亿"


def PUNCT_CAT(ch):
    """Return True when `ch` has a Unicode general category starting with 'P' (punctuation)."""
    return unicodedata.category(ch).startswith("P")

def normalize_text(x: str) -> str:
    """Return a cleaned, canonical version of `x`.

    Non-string input is converted with ``str`` (missing values become ""),
    zero-width characters are removed, the text is NFKC-normalised, converted
    with the module-level OpenCC converter `cc` when one is available, and
    finally runs of whitespace are collapsed to a single space and trimmed.
    """
    if isinstance(x, str):
        text = x
    elif pd.notna(x):
        text = str(x)
    else:
        text = ""

    # Strip zero-width characters first, then apply compatibility normalisation.
    text = unicodedata.normalize("NFKC", ZERO_WIDTH.sub("", text))
    if cc:
        text = cc.convert(text)
    return re.sub(r"\s+", " ", text).strip()

def is_punct_token(tok: str) -> bool:
    """Return True when every character of `tok` is punctuation or whitespace.

    An empty token has no offending character and therefore also yields True.
    """
    for ch in tok:
        if not (PUNCT_CAT(ch) or ch.isspace()):
            return False
    return True

# One or more groups of ASCII digits / Chinese numerals, optionally joined by
# '.' or ',' so that decimals ("3.5") and thousands-separated figures ("1,000")
# are filtered as numbers too, not only plain digit strings.
num_pattern = re.compile(rf"^[0-9{NUM_CHI}]+(?:[.,][0-9{NUM_CHI}]+)*$")
def is_number_token(tok: str) -> bool:
    """Return True when `tok` consists only of a number as described by `num_pattern`."""
    return bool(num_pattern.match(tok))

# Either ASCII digits (with an optional decimal part) followed by '%', or the
# Chinese form 百分之 + numerals; 点 is accepted so that 百分之三点五 matches.
percent_pattern = re.compile(
    r"^([0-9]+(?:\.[0-9]+)?%|百分之[零一二三四五六七八九十百千万亿〇点]+)$"
)
def is_percentage_token(tok: str) -> bool:
    """Return True when the whole token is a percentage expression."""
    return bool(percent_pattern.match(tok))

def valid_token(tok: str) -> bool:
    """Return True when `tok` is non-empty and is neither punctuation, a number nor a percentage.

    Always returns a real bool (the empty token yields False rather than ""),
    matching the declared return type.
    """
    if not tok:
        return False
    return not (is_punct_token(tok) or is_number_token(tok) or is_percentage_token(tok))

# ====== Lecture référence ======
# Read the reference list: each line is stripped and normalised, and lines that
# end up empty are skipped. `ref_terms` keeps file order; `ref_set` is the
# de-duplicated lookup set.
with path_refs.open("r", encoding="utf-8") as f:
    ref_terms = [
        term
        for term in (normalize_text(raw.strip()) for raw in f)
        if term
    ]
ref_set = set(ref_terms)

# ====== Lecture corpus ======
# Load the corpus and fail early when a required column is missing.
df = pd.read_csv(path_corpus, encoding="utf-8")
if not {"content","date"}.issubset(df.columns):
    raise ValueError("Le CSV doit contenir les colonnes 'content' et 'date'.")

# Normalisation
# Missing content becomes "" before every text goes through normalize_text.
df["content"] = df["content"].fillna("").astype(str).map(normalize_text)
# Unparseable dates become NaT (errors="coerce"), so "year" is NaN for those rows.
df["date_parsed"] = pd.to_datetime(df["date"], errors="coerce")
df["year"] = df["date_parsed"].dt.year

# doc_id & source
def get_domain(u):
    """Return the lower-cased network location of URL `u`, or "unknown".

    "unknown" is returned when parsing fails and also when the parsed URL has
    no network location (e.g. a missing value rendered as "nan", or a URL
    without scheme), so that such rows do not end up with an empty source name.
    """
    try:
        netloc = urlparse(u).netloc.lower()
    except Exception:
        return "unknown"
    return netloc or "unknown"

if "url" in df.columns:
    df["source"] = df["url"].astype(str).map(get_domain)
else:
    # No URL column: every document gets the same placeholder source.
    df["source"] = "unknown"

# Fall back to the row position as document identifier.
if "doc_id" not in df.columns:
    df["doc_id"] = np.arange(len(df))


In [5]:
#B. Méthode 1 — fréquence、首现年份、文档/来源数（+ 阈值）

from collections import Counter, defaultdict

# Per-term statistics for method 1.
freq1 = Counter()               # term -> total number of occurrences
first_year1 = {}                # term -> earliest year in which the term occurs
docset1 = defaultdict(set)      # term -> set of doc_id values containing it
sourceset1 = defaultdict(set)   # term -> set of sources containing it

for _, row in df.iterrows():
    y = row["year"]
    did = row["doc_id"]
    src = row["source"]
    tokens = [t for t in row["content"].split() if valid_token(t)]
    freq1.update(tokens)
    distinct = set(tokens)
    if pd.notna(y):
        y = int(y)
        # Keep the minimum year: rows are not guaranteed to be in chronological
        # order, so the first document encountered is not necessarily the oldest.
        for t in distinct:
            prev = first_year1.get(t)
            if prev is None or y < prev:
                first_year1[t] = y
    for t in distinct:
        docset1[t].add(did)
        sourceset1[t].add(src)

# One record per term; "status" is "new" for terms absent from the reference set.
rows1 = [
    {
        "term": term,
        "frequency": int(count),
        "first_year": first_year1.get(term),
        "doc_count": len(docset1[term]),
        "source_count": len(sourceset1[term]),
        "in_reference": term in ref_set,
        "status": "in_reference" if term in ref_set else "new",
        "method1": 1,
    }
    for term, count in freq1.items()
]
m1 = pd.DataFrame(rows1)

# Recommended thresholds (to be tuned)
MIN_FREQ   = 3
MIN_DOC    = 2
MIN_SOURCE = 2

# Candidates: new terms that reach all three thresholds.
m1_candidates = m1.query(
    "status == 'new' and frequency >= @MIN_FREQ and doc_count >= @MIN_DOC and source_count >= @MIN_SOURCE"
).copy()
m1_candidates.head(3)


Unnamed: 0,term,frequency,first_year,doc_count,source_count,in_reference,status,method1


In [6]:
#C. Méthode 2 — n-grammes 1–5（含 PMI 简版）

from math import log2
from itertools import islice

# Largest n-gram order that is counted.
MAX_N = 5

# Unigram statistics first (used for PMI)
uni = Counter()
# NOTE(review): docset_uni is filled here but not read by any other visible cell.
docset_uni = defaultdict(set)

for _, row in df.iterrows():
    did = row["doc_id"]
    toks = [t for t in row["content"].split() if valid_token(t)]
    uni.update(toks)
    for t in set(toks):
        docset_uni[t].add(did)

# Total number of valid tokens; also used as denominator for n-gram probabilities.
# Raises ZeroDivisionError below if the corpus yields tokens summing to zero only in theory;
# with no tokens at all P_uni is simply empty.
TOTAL_TOK = sum(uni.values())
P_uni = {t: uni[t] / TOTAL_TOK for t in uni}

# 句子边界（避免跨句拼接，简化：用句号/问号/叹号/分号等粗分）
# Sentence-final punctuation used as a coarse sentence boundary.
SPLIT_RE = re.compile(r"[。！？；;?!]")

def tokens_by_sentence(text):
    """Yield, for each sentence-like segment of `text`, its list of valid tokens.

    `text` is cut at the characters of SPLIT_RE, each segment is split on
    whitespace, tokens rejected by valid_token are dropped, and segments left
    without any token are skipped.
    """
    for segment in SPLIT_RE.split(text):
        kept = list(filter(valid_token, segment.split()))
        if kept:
            yield kept

def sliding_ngrams(seq, n):
    """Yield every window of `n` consecutive items of `seq` as a tuple, in order.

    Nothing is yielded when `seq` holds fewer than `n` items.
    """
    stream = iter(seq)
    window = list(islice(stream, n))
    if len(window) != n:
        # The input is already exhausted: there is no complete window.
        return
    yield tuple(window)
    for item in stream:
        # Slide by one: drop the oldest item, append the newest.
        window = [*window[1:], item]
        yield tuple(window)

# 统计 n-gram
# n-gram tuple -> number of occurrences, and n-gram tuple -> set of doc_id values.
ng_counts = Counter()
ng_docset = defaultdict(set)

for _, row in df.iterrows():
    did = row["doc_id"]
    # n-grams are built inside each segment only, never across segment boundaries.
    for toks in tokens_by_sentence(row["content"]):
        L = len(toks)
        for n in range(1, MAX_N+1):
            # Segment shorter than n: no window of that size exists.
            if L < n:
                continue
            for ng in sliding_ngrams(toks, n):
                ng_counts[ng] += 1
                ng_docset[ng].add(did)

# PMI（对 n>=2 用相邻 PMI 的平均；n==1 时设为NaN）
def pmi_adjacent(ng):
    """Return the mean pointwise mutual information of the adjacent pairs of `ng`.

    `ng` is a tuple of tokens. For a single token the result is NaN. For each
    adjacent pair (a, b) the PMI is log2(p_ab / (p_a * p_b)), where p_a and p_b
    come from P_uni (1e-12 when the token is unknown) and p_ab is the count of
    the bigram (a, b) in ng_counts divided by TOTAL_TOK. A pair with a zero
    count contributes -inf, which makes the mean -inf.
    """
    if len(ng) == 1:
        return np.nan
    pmis = []
    for a, b in zip(ng, ng[1:]):
        p_a, p_b = P_uni.get(a, 1e-12), P_uni.get(b, 1e-12)
        # The bigram (a, b) is itself a key of ng_counts; TOTAL_TOK is used as
        # an approximation of the number of windows.
        p_ab = ng_counts.get((a, b), 0) / TOTAL_TOK
        if p_ab <= 0:
            pmis.append(-np.inf)
        else:
            pmis.append(log2(p_ab / (p_a * p_b)))
    return float(np.mean(pmis)) if pmis else np.nan

# One record per n-gram. "ngram_joined" is the concatenation without spaces
# (used to compare with the reference list); "ngram_spaced" keeps the token
# boundaries for debugging.
rows2 = [
    {
        "ngram_joined": "".join(gram),
        "ngram_spaced": " ".join(gram),
        "n": len(gram),
        "frequency": int(count),
        "doc_count": len(ng_docset[gram]),
        "pmi_adj": pmi_adjacent(gram),
    }
    for gram, count in ng_counts.items()
]
m2_all = pd.DataFrame(rows2)

# 只保留不在参考表中的候选（用无空格形式对比参考）
# Keep only n-grams whose joined form is absent from the reference set.
# Unigrams pass without any threshold; longer n-grams must reach MIN_FREQ,
# MIN_DOC and a mean adjacent PMI of at least 2.0.
m2_candidates = m2_all[
    (~m2_all["ngram_joined"].isin(ref_set)) &
    (
        (m2_all["n"] == 1) |  # single tokens may be useful (disable if not wanted)
        ((m2_all["n"] >= 2) &
         (m2_all["frequency"] >= MIN_FREQ) &
         (m2_all["doc_count"] >= MIN_DOC) &
         (m2_all["pmi_adj"] >= 2.0))  # PMI threshold is tunable: 1.5~3.0
    )
].copy()

# 为 n-gram 估计 first_year（首次出现在某文档的年份）
# Earliest year of occurrence for each candidate n-gram (keyed by joined form).
first_year2 = {}
# Only candidate keys are looked up afterwards, so years are stored for those
# alone instead of for every n-gram of the corpus.
wanted_keys = set(m2_candidates["ngram_joined"])
for _, row in df.iterrows():
    y = row["year"]
    if pd.isna(y):
        continue
    y = int(y)
    for toks in tokens_by_sentence(row["content"]):
        for n in range(1, MAX_N+1):
            for ng in sliding_ngrams(toks, n):
                key = "".join(ng)
                if key not in wanted_keys:
                    continue
                # Keep the minimum year: rows are not guaranteed to be sorted
                # chronologically, so the first hit may not be the oldest one.
                prev = first_year2.get(key)
                if prev is None or y < prev:
                    first_year2[key] = y

m2_candidates["first_year"] = m2_candidates["ngram_joined"].map(first_year2.get)
m2_candidates["method2"] = 1


In [7]:
#D. Méthode 3 — audit de la référence（谁不在语料里）


# 参考表每个词在 2015–2025 的频次/文档数
# For every reference term: in how many documents of the corpus does it occur?
freq_ref_in_corpus = Counter()
docset_ref_in_corpus = defaultdict(set)

for _, row in df.iterrows():
    did = row["doc_id"]
    # A set of the document's valid tokens: each term counts once per row.
    toks = set([t for t in row["content"].split() if valid_token(t)])
    inter = toks.intersection(ref_set)
    for t in inter:
        freq_ref_in_corpus[t] += 1  # simplification: number of rows containing the term, not number of occurrences
        docset_ref_in_corpus[t].add(did)

rows3 = []
for t in ref_set:
    rows3.append({
        "term": t,
        "doc_count": len(docset_ref_in_corpus[t]),
        "frequency_proxy": int(freq_ref_in_corpus[t])
    })
# Ascending sort: reference terms absent from the corpus come first.
m3_audit = pd.DataFrame(rows3).sort_values(["doc_count","frequency_proxy"], ascending=True).reset_index(drop=True)
# 1 when the term occurs in at least one document, else 0.
m3_audit["method3_present"] = (m3_audit["doc_count"] > 0).astype(int)


In [None]:
#E. Fusion & échantillonnage pour annotation（生成 label_me.csv）

# 统一字段并合并
# Bring both candidate tables to a common schema keyed by "unit".
m1_small = m1_candidates.rename(columns={"term":"unit"})[["unit","frequency","doc_count","source_count","first_year"]].copy()
m1_small["method1"] = 1

m2_small = m2_candidates.rename(columns={"ngram_joined":"unit"})[["unit","frequency","doc_count","first_year","pmi_adj","n"]].copy()
# Method 2 does not track sources.
m2_small["source_count"] = np.nan
m2_small["method2"] = 1

# Merge: one row per unit. Columns that a method does not provide are NaN after
# the concat, so method1 / method2 are NaN (not 0) for units the method missed.
pool = pd.concat([m1_small, m2_small], ignore_index=True)
pool = (pool.groupby("unit", as_index=False)
            .agg({
                "frequency":"max",
                "doc_count":"max",
                "source_count":"max",
                "first_year":"min",
                "pmi_adj":"max",
                "n":"max",
                "method1":"max",
                "method2":"max"
            })
        )

# 加上一列示例上下文（从首个命中的文档中取左右各10个token）
def get_first_context(term, window=10):
    """Return an example context for `term` from the first matching document, or "".

    Documents of the global `df` are scanned in order. Within a document an
    exact token match is tried first and returns up to `window` tokens on each
    side, joined without spaces. Otherwise `term` is searched as a substring of
    the document's valid tokens joined without spaces (this covers n-grams) and
    up to 30 characters on each side are returned.
    """
    # Iterate over the text column directly: building a row Series per document
    # with iterrows() is needless work when only "content" is read.
    for text in df["content"]:
        toks = text.split()
        if term in toks:
            i = toks.index(term)  # first exact occurrence
            left = max(0, i - window)
            right = min(len(toks), i + window + 1)
            return "… " + "".join(toks[left:right]) + " …"
        # Fallback for n-grams, whose joined form spans several tokens.
        joined = "".join([t for t in toks if valid_token(t)])
        pos = joined.find(term)
        if pos != -1:
            start = max(0, pos - 30)
            end = min(len(joined), pos + len(term) + 30)
            return "… " + joined[start:end] + " …"
    return ""

# Stratified sampling: M1_only / M2_only / both / low_freq
def stratified_sample(df_in, query, k=50, random_state=42):
    """Return at most `k` rows of `df_in` that satisfy `query`.

    All matching rows are returned when there are `k` or fewer; otherwise `k`
    rows are drawn with the given `random_state`.
    """
    sub = df_in.query(query)
    if len(sub) <= k:
        return sub
    return sub.sample(k, random_state=random_state)

s1 = stratified_sample(pool, "method1==1 and method2!=1", k=50)
s2 = stratified_sample(pool, "method2==1 and method1!=1", k=50)
s3 = stratified_sample(pool, "method1==1 and method2==1", k=50)
s4 = stratified_sample(pool, "frequency<=2", k=50)  # low-frequency boundary cases

label_df = pd.concat([s1,s2,s3,s4], ignore_index=True).drop_duplicates(subset=["unit"]).copy()
# Example contexts are looked up for the sampled units only. Each lookup scans
# the corpus, so doing it for the whole pool before sampling is wasted work;
# the sampled rows and the exported columns are the same either way.
label_df["example"] = label_df["unit"].map(get_first_context)
label_df["label"] = ""  # to be filled by hand: 1 = genuine new word / 0 = not
label_path = out_dir / "label_me.csv"
label_df.to_csv(label_path, index=False, encoding="utf-8-sig")
label_path


In [None]:
#F. 计算指标（精度 / pooling 召回 / κ）

from sklearn.metrics import precision_score, recall_score, f1_score, cohen_kappa_score

# 读回你人工打标后的文件
# Read the annotation sheet back; rows without a label are dropped.
labelled = pd.read_csv(out_dir / "label_me.csv", encoding="utf-8")
labelled = labelled.dropna(subset=["label"])
labelled["label"] = labelled["label"].astype(int)

# Per-method prediction: a hit by the method counts as "predicted new word".
# Units a method did not produce have NaN in its column and therefore map to 0.
pred_m1 = (labelled["method1"]==1).astype(int)
pred_m2 = (labelled["method2"]==1).astype(int)
y_true  = labelled["label"]

def report_prec(name, y_pred, y_true):
    """Print precision, recall and F1 of `y_pred` against `y_true` on one line.

    The recall is computed on the labelled sample only, not on the whole
    corpus. Undefined scores are reported as 0 (zero_division=0).
    """
    precision, recall, f1 = (
        scorer(y_true, y_pred, zero_division=0)
        for scorer in (precision_score, recall_score, f1_score)
    )
    print(f"{name:10s}  Precision={precision:.3f}  Recall(sample)={recall:.3f}  F1={f1:.3f}")

# Scores for each method alone, for their union and for their intersection.
report_prec("M1", pred_m1, y_true)
report_prec("M2", pred_m2, y_true)
report_prec("M1∪M2", ((pred_m1+pred_m2)>0).astype(int), y_true)
report_prec("M1∩M2", ((pred_m1+pred_m2)==2).astype(int), y_true)

# Pooled (relative) recall: the true items found by M1 ∪ M2 stand in for the full set of true items.
pool_true = ((pred_m1+pred_m2)>0) & (y_true==1)
# Guard against division by zero when the pool contains no true item.
denom = pool_true.sum() if pool_true.sum()>0 else 1
recall_m1_pool = ((pred_m1==1) & (y_true==1)).sum() / denom
recall_m2_pool = ((pred_m2==1) & (y_true==1)).sum() / denom
print(f"Recall_pool  M1={recall_m1_pool:.3f}  M2={recall_m2_pool:.3f}")

# 若有两份标注（A/B）可算 Cohen's κ
# labA = pd.read_csv(out_dir / "label_me_A.csv")["label"].astype(int)
# labB = pd.read_csv(out_dir / "label_me_B.csv")["label"].astype(int)
# print("Cohen κ =", cohen_kappa_score(labA, labB))


In [None]:
#G.（可选）阈值扫描（灵敏度）

def eval_with_thresholds(m1df, m2df, min_freq, min_doc, min_src, pmi_thres):
    """Return precision figures on the labelled sample for one threshold setting.

    `m1df` and `m2df` must have a "unit" column plus the columns used in the
    queries below (frequency, doc_count, source_count for method 1; n,
    frequency, doc_count, pmi_adj for method 2). The global `labelled` frame
    provides the "unit" / "label" pairs. The result is a dict with the four
    thresholds and the precision of method 1, method 2 and their union.
    """
    # Method 1 hits under the given thresholds.
    a = m1df.query("frequency>=@min_freq and doc_count>=@min_doc and source_count>=@min_src").copy()
    a["unit"] = a["unit"].astype(str); a["method1"]=1
    # Method 2 hits: only n-grams with n >= 2 are considered here.
    b = m2df.query("(n>=2) and frequency>=@min_freq and doc_count>=@min_doc and pmi_adj>=@pmi_thres").copy()
    b["unit"] = b["unit"].astype(str); b["method2"]=1
    # Left join on the labelled units: a unit that neither method selects keeps
    # NaN flags, which fillna(0) turns into "not predicted".
    merged = pd.merge(labelled[["unit","label"]],
                      pd.concat([a[["unit","method1"]], b[["unit","method2"]]], ignore_index=True)
                      .groupby("unit",as_index=False).max(),
                      on="unit", how="left").fillna(0)
    y_true = merged["label"].astype(int)
    pred_m1 = (merged.get("method1",0)==1).astype(int)
    pred_m2 = (merged.get("method2",0)==1).astype(int)
    return {
        "min_freq":min_freq,"min_doc":min_doc,"min_src":min_src,"pmi":pmi_thres,
        "prec_m1": precision_score(y_true, pred_m1, zero_division=0),
        "prec_m2": precision_score(y_true, pred_m2, zero_division=0),
        "prec_union": precision_score(y_true, ((pred_m1+pred_m2)>0).astype(int), zero_division=0)
    }

# The sweep must start from candidates that have NOT been thresholded yet.
# m1_small / m2_small were already cut at MIN_FREQ, MIN_DOC, MIN_SOURCE and
# PMI >= 2.0, so grid points below those values could not change anything.
# Only labelled units matter for the scores (the evaluation left-joins on
# them), so both pools are restricted to these units to keep the loop cheap.
labelled_units = set(labelled["unit"].astype(str))

m1_sweep = m1.query("status == 'new'").rename(columns={"term": "unit"})
m1_sweep = m1_sweep[m1_sweep["unit"].astype(str).isin(labelled_units)]
m1_sweep = m1_sweep[["unit", "frequency", "doc_count", "source_count", "first_year"]]

m2_sweep = m2_all[~m2_all["ngram_joined"].isin(ref_set)].rename(columns={"ngram_joined": "unit"})
m2_sweep = m2_sweep[m2_sweep["unit"].astype(str).isin(labelled_units)]
m2_sweep = m2_sweep[["unit", "frequency", "doc_count", "pmi_adj", "n"]]

grid = []
for mf in [1,2,3,5]:
    for md in [1,2,3]:
        for ms in [1,2]:
            for pmi in [1.5,2.0,2.5,3.0]:
                grid.append(eval_with_thresholds(m1_sweep, m2_sweep, mf, md, ms, pmi))
# Ten best settings by precision of the union.
pd.DataFrame(grid).sort_values("prec_union", ascending=False).head(10)


Comment interpréter & décider ?

Précision cible（按任务需求）：

如果要提供**高质量“新词清单”**供词典录入，倾向提高 Precision（提高阈值：min_doc/min_source/PMI）。

做探索/召回时，放宽阈值，接受更多候选，后续人工筛。

看分层表现：

M1∩M2 往往精度最高；M2_only 中可能含大量“新复合词/专名”，要靠 PMI+doc_count 控噪。

低频（freq≤2） 区域里误报多，建议单独复核。

对齐社会时间线：抽查高频“真新词”的 first_year 是否与社会事件年份相符（增强可信度）。

#2） 人工抽样 échantillonnage manuel

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path

# ========= 参数区 =========
SEED = 20250821
N_TARGET = 400  # target number of sampled terms
PATH_CAND = "/content/drive/MyDrive/Colab Notebooks/STAGE_CRLAO_CNRS/Corpus_data/identification/ch2_néologisme_new_only.csv"
# Optional: occurrence records (used for frequencies, first year and example contexts)
PATH_OCC = "/content/drive/MyDrive/Colab Notebooks/STAGE_CRLAO_CNRS/Corpus_data/Token/ch2_all_data_2015_2025_tok_thulac.csv"

# Seeds NumPy's global generator; the sampling calls below also pass random_state=SEED explicitly.
np.random.seed(SEED)

# ========= Load =========
cand = pd.read_csv(PATH_CAND)
# Harmonise the column name: either '词' or 'token' becomes 'term'.
cand = cand.rename(columns={ '词': 'term', 'token': 'term' })
assert 'term' in cand.columns, "候选文件需要包含一列 term"

# 若有出现记录，用它计算 freq/first_year/context
# `occ` holds the occurrence records, or None when they cannot be used.
occ = None
try:
    occ_loaded = pd.read_csv(PATH_OCC)
    # Adapt this mapping to the actual column names of the file.
    occ_loaded = occ_loaded.rename(columns={'token':'term', 'docid':'doc_id'})
    missing_cols = {'term', 'year'} - set(occ_loaded.columns)
    if missing_cols:
        raise ValueError(f"missing columns: {sorted(missing_cols)}")
    # Publish the frame only once it has passed validation.
    occ = occ_loaded
except Exception as e:
    # Reset explicitly so that a file that loads but lacks the required columns
    # is treated exactly like a missing file by the cells that test `occ is None`.
    occ = None
    print("未加载出现记录文件，仅基于候选 term 抽样；建议提供出现记录以改进分层与上下文。", e)

# ========= 统计特征 =========
if occ is not None:
    # Total number of occurrence rows per term.
    freq = occ.groupby('term').size().rename('freq_total')
    # Earliest year per term.
    first_year = occ.groupby('term')['year'].min().rename('first_year')
    # Wide table of yearly frequencies. Rows are counted per (term, year), which
    # matches how freq_total is computed and does not require a 'doc_id' column
    # (only 'term' and 'year' are guaranteed to be present).
    by_year = occ.groupby(['term', 'year']).size().unstack(fill_value=0)
    by_year.columns = [f'freq_{int(c)}' for c in by_year.columns]
    # Combine on the term index.
    stat = pd.concat([freq, first_year, by_year], axis=1).reset_index()
else:
    # No occurrence records: placeholder statistics.
    stat = cand[['term']].drop_duplicates().copy()
    stat['freq_total'] = 1
    stat['first_year'] = 2015

# One row per candidate term; terms without statistics keep NaN values.
base = cand[['term']].drop_duplicates().merge(stat, on='term', how='left')

# ========= 形态/启发式特征（仅用于分层覆盖）=========
def is_abbrev_like(t):
    """Heuristic flag for abbreviation-like or mixed-script terms.

    Returns False for missing values. Otherwise the term (as a string) is
    flagged when it is exactly two characters long or contains a Latin letter
    (a-z, case-insensitive) or a digit.
    """
    if pd.isna(t):
        return False
    text = str(t)
    if len(text) == 2:
        return True
    for ch in text:
        if ch.isdigit() or 'a' <= ch.lower() <= 'z':
            return True
    return False

def has_name_suffix(t):
    """Return True when the string form of `t` ends with one of the listed name suffixes."""
    suffixes = (
        '市','省','县','区','镇','乡','校','大学','学院','医院','公司','集团','科技',
        '控股','新区','局','厅','办','委','银行','证券','保险','药业','中心','研究院',
    )
    # str.endswith accepts a tuple and tests every suffix in one call.
    return str(t).endswith(suffixes)

# Heuristic flags; the sampling step uses them to diversify the sample.
base['is_abbrev'] = base['term'].apply(is_abbrev_like)
base['has_name_suffix'] = base['term'].apply(has_name_suffix)
# True when the term contains a digit or a Latin letter.
base['latin_or_digit'] = base['term'].apply(lambda x: any(c.isdigit() or ('a'<=c.lower()<='z') for c in str(x)))

# ========= 频次层 =========
# 95th percentile of the total frequency (1 when no frequency is available).
q95 = base['freq_total'].quantile(0.95) if base['freq_total'].notna().any() else 1
low_mask = base['freq_total'] <= 3
high_mask = base['freq_total'] >= q95
mid_mask = ~(low_mask | high_mask)

# np.select takes the first matching condition, so a term that is both low and
# high (possible when q95 <= 3) is labelled 'high'. Terms with a NaN frequency
# fail both comparisons and therefore fall into 'mid'.
base['freq_layer'] = np.select(
    [high_mask, mid_mask, low_mask],
    ['high','mid','low'],
    default='mid'
)

# ========= 年份层（首现年）=========
def year_bucket(y):
    """Map a first-occurrence year to its period label.

    Values that cannot be converted to an integer (None, NaN, infinities,
    non-numeric strings) are treated as 2015. Years outside 2015-2025 yield
    'unknown'.
    """
    try:
        y = int(y)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures fall back to 2015; a bare `except` would
        # also swallow KeyboardInterrupt / SystemExit.
        y = 2015
    if 2015 <= y <= 2017: return '2015-2017'
    if 2018 <= y <= 2019: return '2018-2019'
    if y == 2020:         return '2020'
    if 2021 <= y <= 2023: return '2021-2023'
    if 2024 <= y <= 2025: return '2024-2025'
    return 'unknown'

base['year_layer'] = base['first_year'].apply(year_bucket)

# ========= Quotas per stratum (frequency layer first, then year layer) =========
N_total = min(N_TARGET, len(base))
# List of (frequency layer, year layer, quota) triples.
alloc = []

for f_layer, df_f in base.groupby('freq_layer'):
    # Quota of the frequency layer, proportional to its share of all terms.
    n_f = int(round(N_total * len(df_f) / len(base)))
    if n_f == 0: n_f = min(1, len(df_f))

    # Split the layer's quota across its year layers.
    year_groups = list(df_f.groupby('year_layer'))
    # The weights are proportional to the size of each year layer.
    weights = np.array([len(g) for _, g in year_groups], dtype=float)
    weights = weights / weights.sum()
    for (y_layer, g), w in zip(year_groups, weights):
        n_y = int(round(n_f * w))
        # Every non-empty cell gets at least one slot, so the quotas of a layer
        # can add up to more than n_f.
        if n_y == 0 and len(g) > 0:
            n_y = 1
        alloc.append((f_layer, y_layer, n_y))

# ========= 在每个（频次×年份）子层内抽样，并尽量覆盖形态特征 =========
# Draw the sample cell by cell (frequency layer x year layer).
# NOTE: `pool` is rebound inside this block; the DataFrame of the same name
# built in the fusion cell is no longer available afterwards.
samples = []
for (f_layer, y_layer, n_y) in alloc:
    sub = base[(base['freq_layer']==f_layer) & (base['year_layer']==y_layer)]
    if len(sub) == 0 or n_y == 0:
        continue

    # First try to cover is_abbrev / has_name_suffix / latin_or_digit (one row each, when available).
    # The same row can be drawn for several flags; such repeats count towards
    # the quota here and are only removed by the drop_duplicates further down.
    picks = []
    for col in ['is_abbrev','has_name_suffix','latin_or_digit']:
        cand_sub = sub[sub[col] == True]
        if len(cand_sub) > 0 and len(picks) < n_y:
            picks.append(cand_sub.sample(n=1, random_state=SEED))

    picked = pd.concat(picks) if picks else pd.DataFrame(columns=sub.columns)
    # Fill the remaining slots at random from the rows not picked yet.
    remain = n_y - len(picked)
    if remain > 0:
        pool = sub[~sub['term'].isin(picked['term'])] if len(picked)>0 else sub
        if len(pool) > 0:
            picked2 = pool.sample(n=min(remain, len(pool)), random_state=SEED)
            picked = pd.concat([picked, picked2])
    samples.append(picked)

sample_df = pd.concat(samples).drop_duplicates(subset=['term'])

# Adjust to the target size: top up from the unsampled terms, or trim at random.
if len(sample_df) < N_total:
    pool = base[~base['term'].isin(sample_df['term'])]
    add = pool.sample(n=min(N_total-len(sample_df), len(pool)), random_state=SEED)
    sample_df = pd.concat([sample_df, add]).drop_duplicates(subset=['term'])
elif len(sample_df) > N_total:
    sample_df = sample_df.sample(n=N_total, random_state=SEED)

# ========= 填充上下文例句（最多3条）=========
def collect_context(term, occ_df, k=3):
    """Return up to `k` example contexts of `term`, joined with "||".

    An empty string is returned when `occ_df` is None, has no 'context'
    column, or holds no non-null context for `term`. Repeated contexts are
    removed per document when a 'doc_id' column exists, and globally otherwise.
    """
    if occ_df is None or 'context' not in occ_df.columns:
        return ""
    ex = occ_df[occ_df['term']==term].dropna(subset=['context'])
    if len(ex)==0:
        return ""
    # 'doc_id' is optional in the occurrence file: de-duplicate on the columns
    # that are actually present instead of failing with a KeyError.
    dedup_cols = [c for c in ('doc_id', 'context') if c in ex.columns]
    ex = ex.drop_duplicates(subset=dedup_cols)
    return "||".join(ex['context'].astype(str).head(k).tolist())

if occ is not None:
    # Up to three example contexts per sampled term.
    ctx_map = {t: collect_context(t, occ, 3) for t in sample_df['term']}
    sample_df['context_examples'] = sample_df['term'].map(ctx_map)
    # Up to five document ids per term. 'doc_id' is optional in the occurrence
    # file, so the column stays empty when it is absent.
    if 'doc_id' in occ.columns:
        doc_map = occ.groupby('term')['doc_id'].apply(lambda s: ",".join(map(str, s.dropna().astype(str).head(5)))).to_dict()
        sample_df['doc_ids_samples'] = sample_df['term'].map(doc_map)
    else:
        sample_df['doc_ids_samples'] = ""
else:
    sample_df['context_examples'] = ""
    sample_df['doc_ids_samples'] = ""

# ========= 生成盲审标注表（A / B）=========
# Yearly frequency columns only (freq_2015, freq_2016, ...). A plain
# startswith('freq_') test would also match 'freq_total' and 'freq_layer',
# which are listed explicitly below and would then appear twice in the sheets.
yearly_freq_cols = [
    c for c in sample_df.columns
    if c.startswith('freq_') and c[len('freq_'):].isdigit()
]
common_cols = ['term','first_year','freq_total'] + \
              yearly_freq_cols + \
              ['doc_ids_samples','context_examples','freq_layer','year_layer','is_abbrev','has_name_suffix','latin_or_digit']

# Columns that do not exist (e.g. no yearly frequencies) are simply left out.
common_cols = [c for c in common_cols if c in sample_df.columns]

# Columns to be filled in by the annotators.
blank_cols = ['detector_source','candidate_rank','label_primary','label_secondary','rationale_short','annotator_id','annotation_date']
annot_cols = common_cols + blank_cols

for col in blank_cols:
    if col not in sample_df.columns:
        sample_df[col] = ""

# Two identical sheets, one per annotator (blind double annotation).
annot_A = sample_df[annot_cols].copy()
annot_B = sample_df[annot_cols].copy()

# NOTE(review): the sheets are written under /content, not under the mounted
# Drive folder — confirm that this location is intended.
outdir = Path("/content")
annot_A.to_csv(outdir/"gold_sample_annot_A.csv", index=False)
annot_B.to_csv(outdir/"gold_sample_annot_B.csv", index=False)

print("完成！已输出：")
print(outdir/"gold_sample_annot_A.csv")
print(outdir/"gold_sample_annot_B.csv")


#3）黄金标准（Gold Standard）
#Annotation du jeu de référence (gold standard)
#Constitution d’un gold standard par annotation manuelle.

In [None]:
# === Step 2. 统计候选 ===
import pandas as pd
from pathlib import Path

# NOTE: the "..." segments are placeholders; replace them with the real folders before running.
PATH_CAND = "/content/drive/.../identification/ch2_néologisme_new_only.csv"
PATH_OCC  = "/content/drive/.../Token/ch2_all_data_2015_2025_tok_thulac.csv"  # expected to contain term/year/doc_id/context
OUT_DIR   = "/content/drive/.../gold_v1"  # folder where the gold-standard files are stored

Path(OUT_DIR).mkdir(parents=True, exist_ok=True)

# Candidate terms: harmonise the column name and keep one row per term.
cand = pd.read_csv(PATH_CAND)
cand = cand.rename(columns={'token':'term','词':'term'}).drop_duplicates(subset=['term'])
assert 'term' in cand.columns

# Occurrence records: all four columns are mandatory in this step.
occ = pd.read_csv(PATH_OCC)
occ = occ.rename(columns={'token':'term','docid':'doc_id'})
need_cols = {'term','year','doc_id','context'}
assert need_cols.issubset(occ.columns), f"缺列：{need_cols - set(occ.columns)}"

# 频次与首现年
# Total number of occurrence rows and earliest year per term.
freq = occ.groupby('term').size().rename('freq_total')
first_year = occ.groupby('term')['year'].min().rename('first_year')

# Yearly frequencies: number of non-null doc_id values per (term, year).
by_year = occ.pivot_table(index='term', columns='year', values='doc_id', aggfunc='count', fill_value=0)
by_year.columns = [f'freq_{int(c)}' for c in by_year.columns]

# Combine. Candidates without any occurrence get 0 everywhere because of
# fillna(0), including a first_year of 0.
stat = pd.concat([freq, first_year, by_year], axis=1).reset_index()
base = cand[['term']].merge(stat, on='term', how='left').fillna(0)

# 上下文与 doc 样本
def contexts_for(t, k=3):
    """Return up to `k` distinct (doc_id, context) example contexts of term `t`, joined with "||"."""
    rows = occ.loc[occ['term'] == t]
    rows = rows.dropna(subset=['context']).drop_duplicates(subset=['doc_id', 'context'])
    snippets = rows['context'].astype(str).head(k)
    return "||".join(snippets.tolist())

def docids_for(t, k=5):
    """Return the first `k` non-null document ids of term `t` as a comma-separated string."""
    ids = occ.loc[occ['term'] == t, 'doc_id'].dropna().astype(str).head(k)
    return ",".join(str(value) for value in ids.tolist())

# Each call filters the whole occurrence table for one term.
base['context_examples'] = base['term'].apply(lambda t: contexts_for(t, 3))
base['doc_ids_samples']  = base['term'].apply(lambda t: docids_for(t, 5))

# Candidate table with statistics; it is read back by the sampling step.
base.to_csv(f"{OUT_DIR}/candidates_with_stats.csv", index=False)
print("OK:", f"{OUT_DIR}/candidates_with_stats.csv")


In [None]:
# === Step 3. 分层抽样 ===
import numpy as np

SEED = 20250821
N_TARGET = 400

# NOTE: this rebinds `df`, which earlier cells use for the corpus DataFrame;
# from here on `df` is the candidate table.
df = pd.read_csv(f"{OUT_DIR}/candidates_with_stats.csv")

# Frequency layers: 'low' up to 3 occurrences, 'high' from the 95th percentile, 'mid' otherwise.
q95 = df['freq_total'].quantile(0.95)
low  = df['freq_total'] <= 3
high = df['freq_total'] >= q95
mid  = ~(low | high)
# np.select takes the first matching condition, so 'high' wins over 'low' when both hold.
df['freq_layer'] = np.select([high, mid, low], ['high','mid','low'], default='mid')

# 年份层
def y_bucket(y):
    """Map a first-occurrence year to its period label.

    Missing values are treated as 2015; years outside 2015-2025 yield 'unknown'.
    """
    year = 2015 if pd.isna(y) else int(y)
    periods = (
        (2015, 2017, '2015-2017'),
        (2018, 2019, '2018-2019'),
        (2020, 2020, '2020'),
        (2021, 2023, '2021-2023'),
        (2024, 2025, '2024-2025'),
    )
    for first, last, label in periods:
        if first <= year <= last:
            return label
    return 'unknown'

# Period label of each candidate, derived from its first-occurrence year.
df['year_layer'] = df['first_year'].apply(y_bucket)

# 形态覆盖用（仅用于抽样多样性，不是最终标签）
def abbrev_like(t):
    """Heuristic flag: the string form of `t` has length 2 or contains a Latin letter or a digit."""
    text = str(t)
    if len(text) == 2:
        return True
    return any(c.isdigit() or 'a' <= c.lower() <= 'z' for c in text)

def has_name_suffix(t):
    """Return True when the string form of `t` ends with one of the listed name suffixes.

    This redefines the function of the same name from the manual-sampling cell,
    with the same suffix list.
    """
    name_suffixes = (
        '市','省','县','区','镇','乡','校','大学','学院','医院','公司','集团','科技',
        '控股','新区','局','厅','办','委','银行','证券','保险','药业','中心','研究院',
    )
    text = str(t)
    for suffix in name_suffixes:
        if text.endswith(suffix):
            return True
    return False

# Heuristic flags; the sampling step uses them to diversify the sample.
df['is_abbrev'] = df['term'].apply(abbrev_like)
df['has_name_suffix'] = df['term'].apply(has_name_suffix)
# True when the term contains a digit or a Latin letter.
df['latin_or_digit'] = df['term'].apply(lambda t: any(c.isdigit() or ('a'<=c.lower()<='z') for c in str(t)))

# 分层配额
# Seeds NumPy's global generator; the sampling calls below also pass random_state=SEED explicitly.
np.random.seed(SEED)
N_total = min(N_TARGET, len(df))
# List of (frequency layer, year layer, quota) triples.
alloc = []
for f, g1 in df.groupby('freq_layer'):
    # Quota of the frequency layer, proportional to its share of all candidates.
    n_f = int(round(N_total * len(g1)/len(df)))
    if n_f == 0: n_f = min(1, len(g1))
    for y, g2 in g1.groupby('year_layer'):
        # Share of the year layer inside the frequency layer.
        w = len(g2) / len(g1)
        # A non-empty cell gets at least one slot.
        n_y = int(round(n_f * w)) or (1 if len(g2)>0 else 0)
        alloc.append((f,y,n_y))

# 逐层抽样并尽量覆盖形态
# Draw the sample cell by cell (frequency layer x year layer).
# NOTE: `cand` and `pool` are rebound inside this block; the candidate table
# loaded in Step 2 under the name `cand` is no longer available afterwards.
picked = []
for f,y,n in alloc:
    sub = df[(df['freq_layer']==f)&(df['year_layer']==y)]
    if n==0 or len(sub)==0:
        continue
    # Try to include one row per heuristic flag; the same row can be drawn for
    # several flags and is only de-duplicated after the loop.
    sel = []
    for col in ['is_abbrev','has_name_suffix','latin_or_digit']:
        cand = sub[sub[col]==True]
        if len(cand)>0 and len(sel)<n:
            sel.append(cand.sample(1, random_state=SEED))
    chosen = pd.concat(sel) if sel else pd.DataFrame(columns=sub.columns)
    # Fill the remaining slots at random from the rows not chosen yet.
    remain = n - len(chosen)
    if remain>0:
        pool = sub[~sub['term'].isin(chosen['term'])] if len(chosen)>0 else sub
        if len(pool)>0:
            chosen = pd.concat([chosen, pool.sample(min(remain, len(pool)), random_state=SEED)])
    picked.append(chosen)

sample_df = pd.concat(picked).drop_duplicates(subset=['term'])

# Adjust to the target size: top up from the unsampled candidates, or trim at random.
if len(sample_df) < N_total:
    pool = df[~df['term'].isin(sample_df['term'])]
    add = pool.sample(min(N_total-len(sample_df), len(pool)), random_state=SEED)
    sample_df = pd.concat([sample_df, add]).drop_duplicates(subset=['term'])
elif len(sample_df) > N_total:
    sample_df = sample_df.sample(N_total, random_state=SEED)

sample_df.to_csv(f"{OUT_DIR}/gold_sample_pool.csv", index=False)
print("OK:", f"{OUT_DIR}/gold_sample_pool.csv", len(sample_df))


In [None]:
# === Step 4. 生成 A/B 标注表 ===
# Yearly frequency columns only (freq_2015, freq_2016, ...). A plain
# startswith('freq_') test would also match 'freq_total' and 'freq_layer',
# which are listed explicitly below and would then appear twice in the sheets.
yearly_freq_cols = [
    c for c in sample_df.columns
    if c.startswith('freq_') and c[len('freq_'):].isdigit()
]
common_cols = ['term','first_year','freq_total'] + \
              yearly_freq_cols + \
              ['doc_ids_samples','context_examples','freq_layer','year_layer','is_abbrev','has_name_suffix','latin_or_digit']

# Drop the columns that do not exist in the sample.
common_cols = [c for c in common_cols if c in sample_df.columns]

# Columns to be filled in by the annotators.
blank_cols = ['detector_source','candidate_rank','label_primary','label_secondary','rationale_short','annotator_id','annotation_date']
annot_cols = common_cols + blank_cols

for col in blank_cols:
    if col not in sample_df.columns:
        sample_df[col] = ""

# Two identical sheets, one per annotator.
annot_A = sample_df[annot_cols].copy()
annot_B = sample_df[annot_cols].copy()

annot_A.to_csv(f"{OUT_DIR}/gold_sample_annot_A.csv", index=False)
annot_B.to_csv(f"{OUT_DIR}/gold_sample_annot_B.csv", index=False)
print("OK: 生成 A/B 标注文件")


In [None]:
# === Step 5. 一致性与冲突清单 ===
import pandas as pd
from sklearn.metrics import cohen_kappa_score

A_path = f"{OUT_DIR}/gold_sample_annot_A_filled.csv"  # sheet returned by annotator A
B_path = f"{OUT_DIR}/gold_sample_annot_B_filled.csv"  # sheet returned by annotator B

A = pd.read_csv(A_path)
B = pd.read_csv(B_path)

# Keep the term and the label columns only.
A_ = A[['term','label_primary','label_secondary']].rename(columns={'label_primary':'label_A','label_secondary':'sec_A'})
B_ = B[['term','label_primary','label_secondary']].rename(columns={'label_primary':'label_B','label_secondary':'sec_B'})

# Terms present in both sheets.
merged = A_.merge(B_, on='term', how='inner')

# Agreement is measured on the terms that BOTH annotators labelled. An empty
# cell is read back as NaN; counting it as a label would mix NaN with strings
# in the primary kappa and would score two empty cells as an agreement on
# 'NON' in the binary kappa.
both_labelled = merged.dropna(subset=['label_A', 'label_B'])
n_skipped = len(merged) - len(both_labelled)
if n_skipped:
    print(f"[warn] {n_skipped} term(s) without a label from A and/or B are excluded from kappa.")

if len(both_labelled) > 0:
    # Kappa on the primary label.
    kappa = cohen_kappa_score(both_labelled['label_A'], both_labelled['label_B'])
    # Optional: binarised (NEO vs. everything else) agreement on "is it a new word?".
    binA = both_labelled['label_A'].apply(lambda x: 'NEO' if x=='NEO' else 'NON')
    binB = both_labelled['label_B'].apply(lambda x: 'NEO' if x=='NEO' else 'NON')
    kappa_bin = cohen_kappa_score(binA, binB)
else:
    # Nothing to compare: report NaN instead of calling the scorer on empty input.
    binA = both_labelled['label_A']
    binB = both_labelled['label_B']
    kappa = kappa_bin = float('nan')
print("Cohen's kappa (primary):", round(kappa, 3))
print("Cohen's kappa (NEO vs NON):", round(kappa_bin, 3))

# Conflict list for adjudication. It is built from all merged terms, so a term
# that one or both annotators left empty still reaches the adjudicator.
conflicts = merged[merged['label_A'] != merged['label_B']].copy()
conflicts = conflicts.merge(A[['term','rationale_short']], on='term', how='left').rename(columns={'rationale_short':'rationale_A'})
conflicts = conflicts.merge(B[['term','rationale_short']], on='term', how='left').rename(columns={'rationale_short':'rationale_B'})
conflicts.to_csv(f"{OUT_DIR}/conflicts_for_adjudication.csv", index=False)
print("OK: 冲突清单", f"{OUT_DIR}/conflicts_for_adjudication.csv")


In [None]:
# === Step 6. 生成最终金标 ===
# Conflict list after the adjudicator has filled in the 'adjudicated_label' column.
conf = pd.read_csv(f"{OUT_DIR}/conflicts_for_adjudication_filled.csv")

# Terms on which A and B agree keep their common label.
# Relies on `merged` from the agreement step still being in memory.
agree = merged[merged['label_A'] == merged['label_B']].copy()
agree['adjudicated_label'] = agree['label_A']  # the shared label is used as is

# Append the adjudicated conflicts.
gold = pd.concat([
    agree[['term','adjudicated_label']],
    conf[['term','adjudicated_label']]
], axis=0, ignore_index=True)

# Attach statistics and contexts to obtain the full gold table.
# `df` is the candidate table loaded in the stratified-sampling step.
gold_full = gold.merge(df, on='term', how='left')
gold_full.to_csv(f"{OUT_DIR}/gold_standard_v1.csv", index=False)
print("OK: 金标 v1", f"{OUT_DIR}/gold_standard_v1.csv", len(gold_full))


In [None]:
# === Step 7. 评测 ===
from sklearn.metrics import classification_report

# 假设你的系统输出：term + pred_label（NEO/NOT_NEO/...）
# System output; the "..." segment is a placeholder path to be replaced.
SYS = pd.read_csv("/content/drive/.../my_system_preds.csv")  # your own system output
GOLD = pd.read_csv(f"{OUT_DIR}/gold_standard_v1.csv").rename(columns={'adjudicated_label':'gold'})

# Gold terms without a system prediction are dropped before scoring.
# NOTE(review): this removes them from the evaluation altogether — confirm
# that they should not count as missed items instead.
eval_df = GOLD.merge(SYS[['term','pred_label']], on='term', how='left').dropna(subset=['pred_label'])

print(classification_report(eval_df['gold'], eval_df['pred_label'], digits=3))

# Optional: binarised view, "new word or not" only.
g_bin = eval_df['gold'].apply(lambda x: 'NEO' if x=='NEO' else 'NON')
p_bin = eval_df['pred_label'].apply(lambda x: 'NEO' if x=='NEO' else 'NON')
print("=== NEO vs NON ===")
print(classification_report(g_bin, p_bin, digits=3))
