<a href="https://colab.research.google.com/github/XTMay/AI_OCR/blob/main/layoutlmv3_ner/Lec_10_LayoutLMv3_Invoice_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# LayoutLMv3 发票信息抽取

本 Notebook 演示：

- 合成最小发票数据集（含图像、OCR token、bbox、BIO 标签）；
- 使用 **LayoutLMv3**（HuggingFace Transformers）进行 **微调**；
- 评估（实体级 F1、报告）；
- 可视化误差（预测 vs 真实，bbox 叠加）；
- 简单 **主动学习**（基于不确定性采样的增量标注与再训练）。



## 0. 准备环境

In [None]:

%%bash
# Upgrade pip, then install the libraries this notebook imports
# (transformers, datasets, seqeval, pillow, matplotlib, ...).
pip -q install -U pip
pip -q install "transformers>=4.43.0" "datasets>=2.20.0" "accelerate>=0.31.0"                "seqeval>=1.2.2" "evaluate>=0.4.2" pillow matplotlib                "huggingface_hub>=0.23.0"
# Sanity check: print the torch version, CUDA availability and Python version.
python - << 'PY'
import torch, platform
print("Torch:", torch.__version__, "| CUDA available:", torch.cuda.is_available())
print("Python:", platform.python_version())
PY


## 1. 导入与全局配置

In [None]:

import os, json, random, math, shutil
from pathlib import Path
from typing import List, Dict, Any

import numpy as np
from PIL import Image, ImageDraw, ImageFont

import matplotlib.pyplot as plt

import datasets
from datasets import load_dataset, Dataset, DatasetDict

from transformers import (
    LayoutLMv3Processor,
    LayoutLMv3ForTokenClassification,
    TrainingArguments,
    Trainer,
    set_seed
)

from seqeval.metrics import f1_score, precision_score, recall_score, classification_report

# Seed through transformers.set_seed so repeated runs are comparable.
set_seed(42)

# Project directories: everything lives under ./invoice_demo.
ROOT = Path.cwd() / "invoice_demo"
IMG_DIR, DATA_DIR, OUT_DIR = (ROOT / sub for sub in ("images", "data", "outputs"))
for d in (IMG_DIR, DATA_DIR, OUT_DIR):
    d.mkdir(parents=True, exist_ok=True)

# Label set (BIO scheme): "O" plus a B-/I- pair per entity type.
LABELS = [
    "O",
    "B-INV_NO", "I-INV_NO",
    "B-AMT_TOTAL", "I-AMT_TOTAL",
    "B-AMT_NET", "I-AMT_NET",
    "B-CURRENCY", "I-CURRENCY",
]
# Integer id <-> label string lookups, in LABELS order.
id2label = {idx: tag for idx, tag in enumerate(LABELS)}
label2id = {tag: idx for idx, tag in enumerate(LABELS)}

# Checkpoint name used for both the processor and the model.
MODEL_NAME = "microsoft/layoutlmv3-base"



## 2. 生成最小合成发票数据集（含 OCR tokens + bbox + BIO 标签）

说明：
- 使用 `PIL` 合成简单发票图像（中英混合）；
- 同时产出 token 序列、位置框（归一化到 [0,1000]）、标签；
- 拆分为 `train/validation/test/unlabeled`。


In [None]:

def draw_text_with_box(draw, xy, text, font, fill=(0,0,0)):
    """Draw ``text`` at ``xy`` and return the box reported for it.

    Args:
        draw: Drawing object exposing ``text`` and ``textbbox``.
        xy: Anchor position passed to both calls.
        text: String to render.
        font: Font passed to both calls.
        fill: Text colour, black by default.

    Returns:
        Whatever ``draw.textbbox`` returns: (left, top, right, bottom).
    """
    draw.text(xy, text, fill=fill, font=font)
    return draw.textbbox(xy, text, font=font)

def normalize_bbox(bbox, width, height, scale=1000):
    """Rescale a pixel box to integer coordinates in ``[0, scale]``.

    Args:
        bbox: (x0, y0, x1, y1) in pixels.
        width: Page width in pixels; divides the x coordinates.
        height: Page height in pixels; divides the y coordinates.
        scale: Upper bound of the target coordinate range.

    Returns:
        ``[x0, y0, x1, y1]`` as ints, each clamped to ``[0, scale]``.
    """
    def _to_grid(value, extent):
        # Truncate to int first, then clamp into the valid range.
        return max(0, min(scale, int(scale * value / extent)))

    left, top, right, bottom = bbox
    return [
        _to_grid(left, width),
        _to_grid(top, height),
        _to_grid(right, width),
        _to_grid(bottom, height),
    ]

# Try to load a system TrueType font (Colab may ship it); otherwise fall back
# to Pillow's built-in default font.
def get_font(size=20):
    """Return a font of the requested size for drawing invoice text.

    Args:
        size: Point size requested from ``DejaVuSans.ttf``.

    Returns:
        The TrueType font when it can be loaded, else ``ImageFont.load_default()``.
    """
    try:
        return ImageFont.truetype("DejaVuSans.ttf", size)
    except (OSError, ImportError):
        # Only font-loading failures trigger the fallback (missing/unreadable
        # font file, or a Pillow build without FreeType). A bare ``except``
        # would also swallow KeyboardInterrupt and SystemExit.
        return ImageFont.load_default()

# Seed the stdlib RNG used by the synthetic-data helpers below (amounts,
# noise words, language choice) so the generated dataset is repeatable.
random.seed(7)

def synth_invoice(sample_id:int, lang="en") -> tuple:
    """Render one synthetic invoice page together with its word annotations.

    Args:
        sample_id: Used to derive the printed invoice number (10000 + sample_id).
        lang: ``"en"`` for English text; any other value produces Chinese text.

    Returns:
        A 4-tuple ``(img, tokens, bboxes, labels)``: the PIL image, the word
        strings, one ``[x0, y0, x1, y1]`` box per word normalised to 0-1000,
        and one BIO label per word. The three lists have equal length.
    """
    W, H = 1000, 1400
    img = Image.new("RGB", (W, H), (255, 255, 255))
    draw = ImageDraw.Draw(img)
    font_h1 = get_font(28)
    font = get_font(22)
    y = 50

    tokens, bboxes, labels = [], [], []

    def add_token(xy, text, label):
        # Draw one word and record its text, normalised box and label.
        pixel_box = draw_text_with_box(draw, xy, text, font)
        tokens.append(text)
        bboxes.append(normalize_bbox(pixel_box, W, H))
        labels.append(label)
        return pixel_box

    # Page title: drawn on the image only, never emitted as a token.
    title = "INVOICE" if lang == "en" else "发票"
    draw_text_with_box(draw, (50, y), title, font_h1)
    y += 60

    # Invoice number line.
    if lang == "en":
        pieces = [("Invoice","B-INV_NO"), ("No.","I-INV_NO"), (str(10000+sample_id),"I-INV_NO")]
    else:
        pieces = [("发票号码","B-INV_NO"), (str(10000+sample_id),"I-INV_NO")]
    x = 50
    for t, l in pieces:
        pixel_box = add_token((x, y), t, l)
        # Keep the original per-character advance, but never start the next
        # word before the rendered right edge of this one: wide glyphs (CJK)
        # can exceed the 12 px/char estimate and would otherwise overlap.
        x = max(x + len(t) * 12 + 25, math.ceil(pixel_box[2]) + 25)
    y += 50

    # Column headers (amount / currency).
    if lang == "en":
        headers = [("Amount (USD)","B-AMT_TOTAL"), ("Net Amount","B-AMT_NET")]
        cur_token = ("USD","B-CURRENCY")
    else:
        headers = [("金额(人民币)","B-AMT_TOTAL"), ("不含税金额","B-AMT_NET")]
        cur_token = ("人民币","B-CURRENCY")

    x = 50
    for text, l in headers:
        add_token((x, y), text, l)
        x += 350
    y += 40

    # Amount row. The two random draws stay in this order so the seeded
    # sequence of generated values is unchanged.
    amt_total = round(random.uniform(100, 2000), 2)
    amt_net   = round(amt_total * random.uniform(0.7, 0.95), 2)

    # Currency token gets its own position at the start of the row.
    add_token((50, y), cur_token[0], cur_token[1])

    # Amount values continue the entities opened by the header tokens.
    add_token((250, y), f"{amt_total:,.2f}", "I-AMT_TOTAL")
    add_token((600, y), f"{amt_net:,.2f}", "I-AMT_NET")

    # Randomly placed distractor words, all labelled "O".
    y += 60
    noise = ["Vendor", "Address", "Date", "备注", "编号", "税率", "Thank you!", "合计"]
    x = 50
    for _ in range(8):
        t = random.choice(noise)
        add_token((x, y), t, "O")
        x += random.randint(120, 220)
        if x > 800:
            # Wrap to the next line once the row is full.
            x = 50
            y += 35

    return img, tokens, bboxes, labels

def build_dataset(n_train=40, n_val=10, n_test=10, n_unlabeled=20):
    """Generate the synthetic invoice splits and write them to disk.

    For every example an image is saved under ``IMG_DIR`` and a record with
    id, words, bboxes, labels, image path and language is collected. Each
    split is then written to ``DATA_DIR/<split>.jsonl``, one JSON object per
    line.

    Args:
        n_train: Number of training examples (sample ids start at 0).
        n_val: Number of validation examples (sample ids start at 1000).
        n_test: Number of test examples (sample ids start at 2000).
        n_unlabeled: Number of pool examples (sample ids start at 3000).
    """
    # Language is drawn at random per example.
    langs = ["en","zh"]
    entries = {"train":[], "validation":[], "test":[], "unlabeled":[]}
    counters = {"train":0, "validation":0, "test":0, "unlabeled":0}

    def add_split(split, idx):
        # Create one example, save its image and queue its record.
        lang = random.choice(langs)
        img, tokens, bboxes, labels = synth_invoice(idx, lang)
        img_path = IMG_DIR / f"{split}_{idx:04d}.png"
        img.save(img_path)
        entries[split].append({
            "id": f"{split}_{idx:04d}",
            "words": tokens,
            "bboxes": bboxes,
            "labels": labels,
            "image_path": str(img_path),
            "lang": lang
        })
        counters[split] += 1

    for i in range(n_train):
        add_split("train", i)
    for i in range(n_val):
        add_split("validation", 1000+i)
    for i in range(n_test):
        add_split("test", 2000+i)
    for i in range(n_unlabeled):
        add_split("unlabeled", 3000+i)

    # Write JSONL. The line terminator must be the "\n" escape; a raw line
    # break inside the quotes is an unterminated string literal.
    for split, examples in entries.items():
        with open(DATA_DIR / f"{split}.jsonl", "w", encoding="utf-8") as f:
            for ex in examples:
                f.write(json.dumps(ex, ensure_ascii=False) + "\n")

    print("Data stats:", counters)

build_dataset()


## 3. 加载数据集并预处理（Processor：图像+token+bbox → 模型输入）

In [None]:

# This notebook supplies its own words and boxes, so the processor's built-in
# OCR must be off: with OCR enabled the processor rejects user-provided
# `boxes` / `word_labels` (and needs Tesseract installed).
processor = LayoutLMv3Processor.from_pretrained(MODEL_NAME, apply_ocr=False)

def load_jsonl(path:Path):
    """Read a JSON Lines file into a list of parsed objects.

    Args:
        path: File to read (UTF-8).

    Returns:
        One parsed object per non-blank line, in file order. Blank or
        whitespace-only lines (e.g. a trailing newline) are skipped rather
        than passed to ``json.loads``.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

# Load each split's JSONL file into a Dataset, keyed by split name.
raw_datasets = DatasetDict({
    split: Dataset.from_list(load_jsonl(DATA_DIR / f"{split}.jsonl"))
    for split in ("train", "validation", "test", "unlabeled")
})

from PIL import Image

def preprocess(batch):
    """Turn a batch of raw examples into processor outputs.

    Args:
        batch: Mapping with ``image_path``, ``words``, ``bboxes`` and
            ``labels`` columns, one entry per example.

    Returns:
        The processor's encoding for the batch, padded/truncated to 512.
    """
    pages = [Image.open(path).convert("RGB") for path in batch["image_path"]]
    # Map label strings to integer ids, one list per example.
    label_ids = [[label2id[name] for name in names] for names in batch["labels"]]

    return processor(
        pages,
        batch["words"],
        boxes=batch["bboxes"],
        word_labels=label_ids,
        truncation=True,
        max_length=512,
        padding="max_length",
    )

# Encode every split with `preprocess`; the raw columns are removed so only
# the processor's outputs remain.
encoded = raw_datasets.map(preprocess, batched=True, remove_columns=raw_datasets["train"].column_names)
# Return torch tensors when the encoded datasets are indexed.
encoded.set_format(type="torch")
# Bare expression: displayed as the cell output when run in a notebook.
encoded


## 4. 加载模型并设置 Trainer

In [None]:

# Token-classification model initialised from MODEL_NAME, with one output
# class per entry in LABELS and the id<->label maps stored on the model.
model = LayoutLMv3ForTokenClassification.from_pretrained(
    MODEL_NAME, num_labels=len(LABELS), id2label=id2label, label2id=label2id
)

def seqeval_metrics(eval_pred):
    """Compute seqeval precision, recall and F1 for the Trainer.

    Args:
        eval_pred: Pair ``(predictions, labels)``; predictions are per-class
            scores and labels are integer ids, with -100 marking positions
            to ignore.

    Returns:
        Dict with ``precision``, ``recall`` and ``f1``.
    """
    scores, gold_ids = eval_pred
    pred_ids = scores.argmax(-1)

    # Convert ids back to label strings, dropping ignored (-100) positions.
    gold_seqs, pred_seqs = [], []
    for pred_row, gold_row in zip(pred_ids, gold_ids):
        kept = [
            (id2label[int(g)], id2label[int(p)])
            for p, g in zip(pred_row, gold_row)
            if g != -100
        ]
        gold_seqs.append([gold for gold, _ in kept])
        pred_seqs.append([pred for _, pred in kept])

    return {
        "precision": precision_score(gold_seqs, pred_seqs),
        "recall": recall_score(gold_seqs, pred_seqs),
        "f1": f1_score(gold_seqs, pred_seqs),
    }

import torch  # needed here only to check for CUDA before enabling fp16

# Training configuration for the base fine-tuning run.
args = TrainingArguments(
    output_dir=str(OUT_DIR/"run_base"),
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    learning_rate=5e-5,
    num_train_epochs=6,
    weight_decay=0.01,
    warmup_ratio=0.1,
    logging_steps=20,
    # `evaluation_strategy` was renamed to `eval_strategy`; the old keyword is
    # rejected by recent transformers releases (the install cell uses -U).
    eval_strategy="steps",
    eval_steps=100,
    # Save on the same schedule as evaluation, as load_best_model_at_end needs.
    save_steps=100,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,
    # Mixed precision needs a CUDA device; fall back to fp32 on CPU runtimes.
    fp16=torch.cuda.is_available()
)

# Trainer for the base run: trains on the encoded train split, evaluates on
# the validation split and scores with `seqeval_metrics`.
# NOTE(review): newer transformers releases prefer `processing_class` over
# `tokenizer` here — confirm against the installed version.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=encoded["train"],
    eval_dataset=encoded["validation"],
    tokenizer=processor.tokenizer,
    compute_metrics=seqeval_metrics
)

print("Ready to train.")


## 5. 训练

In [None]:

# Fine-tune on the training split, then score the held-out test split.
trainer.train()
trainer.evaluate(encoded["test"])


## 6. 误差可视化（预测 vs 真实）

In [None]:

import torch

def visualize_example(idx=0, split="test", k=50):
    """Overlay predicted vs. gold labels on one invoice image and show it.

    Each word's box is drawn green when the predicted label equals the gold
    label and red otherwise, with a caption ``word | pred | true``.

    Args:
        idx: Index of the example inside ``raw_datasets[split]``.
        split: Name of the split to read from.
        k: Maximum number of words to draw.
    """
    # Use the raw example so the words and boxes are still available.
    ex = raw_datasets[split][idx]
    img = Image.open(ex["image_path"]).convert("RGB")
    draw = ImageDraw.Draw(img)
    W, H = img.size

    # Encode the page. The words are passed unbatched (a flat list), so
    # word_labels must be a flat list as well, not a list of lists.
    enc = processor(
        img, ex["words"], boxes=ex["bboxes"],
        word_labels=[label2id[l] for l in ex["labels"]],
        truncation=True, padding="max_length", max_length=512, return_tensors="pt"
    )
    enc = {name: tensor.to(model.device) for name, tensor in enc.items()}

    model.eval()  # make sure dropout is off for inference
    with torch.no_grad():
        logits = model(**enc).logits
    pred_ids = logits.argmax(-1)[0].cpu().numpy()
    label_ids = enc["labels"][0].cpu().numpy()

    # The model predicts per sub-token (plus special and padding positions),
    # so slicing the first len(words) predictions does not line up with the
    # words. Positions that carry a word label are the ones not marked -100
    # (the same sentinel `seqeval_metrics` skips); keep only those.
    pred_labels = [id2label[int(pid)] for pid in pred_ids[label_ids != -100]]

    words, bboxes, true_labels = ex["words"], ex["bboxes"], ex["labels"]

    # Pass an explicit font for the captions; NOTE(review): older Pillow
    # default fonts may not handle non-Latin caption text — confirm.
    caption_font = get_font(12)

    # Draw at most k words; zip stops early if truncation dropped some words.
    for n, (w, b, pl, tl) in enumerate(zip(words, bboxes, pred_labels, true_labels)):
        if n >= k:
            break
        # Boxes are stored on a 0-1000 grid; scale x by width and y by height.
        x0 = int(b[0] / 1000 * W)
        y0 = int(b[1] / 1000 * H)
        x1 = int(b[2] / 1000 * W)
        y1 = int(b[3] / 1000 * H)
        color = (0, 200, 0) if pl == tl else (255, 0, 0)
        draw.rectangle([x0, y0, x1, y1], outline=color, width=2)
        draw.text((x0, max(0, y0 - 14)), f"{w} | pred:{pl} | true:{tl}", fill=color, font=caption_font)

    plt.figure(figsize=(8, 11))
    plt.imshow(img)
    plt.axis("off")
    plt.show()

visualize_example(idx=0, split="test", k=60)


## 7. 主动学习示例（不确定性采样 → 回标 → 再训练）

In [None]:

import torch

def entropy(p: np.ndarray, axis=-1, eps=1e-12):
    """Return the Shannon entropy (natural log) of ``p`` along ``axis``.

    Values are clipped to ``[eps, 1.0]`` first so ``log`` never sees zero.
    """
    clipped = np.clip(p, eps, 1.0)
    return -(clipped * np.log(clipped)).sum(axis=axis)

def select_uncertain_samples(dataset, top_k=5):
    """Rank examples by mean per-word prediction entropy and return the top ones.

    Args:
        dataset: Indexable collection of raw examples with ``image_path``,
            ``words`` and ``bboxes`` fields.
        top_k: Number of example indices to return.

    Returns:
        Indices into ``dataset`` of the ``top_k`` most uncertain examples,
        most uncertain first.
    """
    scores = []
    model.eval()  # make sure dropout is off so scores are deterministic
    for i in range(len(dataset)):
        ex = dataset[i]
        img = Image.open(ex["image_path"]).convert("RGB")
        enc = processor(img, ex["words"], boxes=ex["bboxes"], truncation=True,
                        padding="max_length", max_length=512, return_tensors="pt")

        # Predictions are per sub-token, with special and padding positions
        # mixed in, so the first len(words) rows are not the words. Use the
        # tokenizer's word alignment and keep the first sub-token of each
        # word. (word_ids needs a fast tokenizer.)
        first_positions = []
        seen = set()
        for pos, wid in enumerate(enc.word_ids(batch_index=0)):
            if wid is not None and wid not in seen:
                seen.add(wid)
                first_positions.append(pos)

        inputs = {name: tensor.to(model.device) for name, tensor in enc.items()}
        with torch.no_grad():
            logits = model(**inputs).logits    # [1, seq, C]
            probs = torch.softmax(logits.float(), dim=-1)[0].cpu().numpy()

        if first_positions:
            ent = float(entropy(probs[first_positions], axis=-1).mean())
        else:
            # No word positions: avoid the NaN a mean over nothing produces.
            ent = 0.0
        scores.append((ent, i))
    scores.sort(reverse=True, key=lambda x: x[0])
    return [idx for _, idx in scores[:top_k]]

# Pick the most uncertain examples from the unlabeled pool.
top_indices = select_uncertain_samples(raw_datasets["unlabeled"], top_k=5)
print("Selected indices from unlabeled:", top_indices)

# Simulated annotation: the synthetic pool already carries labels, so the
# selected examples are appended to the training set directly.
selected = [raw_datasets["unlabeled"][i] for i in top_indices]
new_train_list = list(raw_datasets["train"]) + selected

# Save the enlarged training set and re-encode it.
with open(DATA_DIR/"train_al.jsonl","w",encoding="utf-8") as f:
    for ex in new_train_list:
        f.write(json.dumps(ex, ensure_ascii=False)+"\n")

train_al = Dataset.from_list(new_train_list)
encoded_al = train_al.map(preprocess, batched=True, remove_columns=train_al.column_names)
encoded_al.set_format(type="torch")

# Continue training (a short run, for demonstration).
args_al = TrainingArguments(
    output_dir=str(OUT_DIR/"run_active_learning"),
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    learning_rate=3e-5,
    num_train_epochs=3,
    weight_decay=0.01,
    warmup_ratio=0.06,
    logging_steps=20,
    # `evaluation_strategy` was renamed to `eval_strategy`; the old keyword is
    # rejected by recent transformers releases (the install cell uses -U).
    eval_strategy="steps",
    eval_steps=100,
    # Save on the same schedule as evaluation, as load_best_model_at_end needs.
    save_steps=100,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,
    # Mixed precision needs a CUDA device; fall back to fp32 on CPU runtimes.
    fp16=torch.cuda.is_available()
)

# Second Trainer: keeps training the same `model` object on the enlarged
# training set, still validating on the original validation split.
trainer_al = Trainer(
    model=model,
    args=args_al,
    train_dataset=encoded_al,
    eval_dataset=encoded["validation"],
    tokenizer=processor.tokenizer,
    compute_metrics=seqeval_metrics
)

# Retrain, then report metrics on the untouched test split.
trainer_al.train()
res = trainer_al.evaluate(encoded["test"])
print("After Active Learning - Test metrics:", res)


### 1. 不确定性采样（Uncertainty Sampling）

```python
top_indices = select_uncertain_samples(raw_datasets["unlabeled"], top_k=5)
```

- 目的：从未标注的数据中挑选模型最“不确定”的样本，让人工标注后最大化提升模型性能。
- 实现方式：
  - 先用当前模型 `model` 对每个未标注样本做推理。
  - 计算每个 token 的预测概率 `probs`。
  - 通过熵（entropy）衡量不确定性：

    ```python
    ent = entropy(probs[:seq_len], axis=-1).mean()
    ```

  - 熵越大 → 模型越不确定。
  - 挑选熵最高的 top-k 样本加入待标注列表。


### 2. 回标（Labeling / Pseudo-Labeling）

```python
selected = [raw_datasets["unlabeled"][i] for i in top_indices]
new_train_list = list(raw_datasets["train"]) + selected
```

- 目的：给挑选出的样本加上标签（本例是合成数据，已有 label，所以直接加入训练集）。
- 实际应用：
  - 在真实场景中，这一步通常需要人工标注。
  - 也可以用模型自身的预测做伪标注（pseudo-label），尤其在半监督场景。

### 3. 再训练（Model Retraining / Fine-tuning）

```python
trainer_al.train()
res = trainer_al.evaluate(encoded["test"])
```

- 目的：用新增标注数据微调模型，提高模型在关键样本上的性能。
- 流程：
  1. 将新的训练集编码（`encoded_al`）。
  2. 使用 `Trainer` 再训练模型若干 epoch。
  3. 评估更新后的模型在测试集的效果。


### 4. 总结：主动学习流程

原始训练集 → 训练模型 → 用模型预测未标注数据 → 选出最不确定的样本 → 人工标注/伪标注 → 加入训练集 → 再训练模型 → 重复

- 优点：
  - 避免随机标注大量数据，节省标注成本。
  - 重点标注模型不确定的样本，可以更快提升模型性能。
- 关键概念：
  - 不确定性采样：基于模型预测的不确定性挑选样本（本例用熵）。
  - 回标 / 标注：给挑选样本加上正确 label。
  - 再训练：微调模型，将新标注数据的知识吸收进模型。


## 8. 超参数小网格实验（演示版）

In [None]:

def small_sweep(lrs=(5e-5, 3e-5), epochs=(4, 6)):
    """Train one fresh model per (learning rate, epoch count) pair.

    Args:
        lrs: Learning rates to try.
        epochs: Epoch counts to try.

    Returns:
        One dict per configuration with ``lr``, ``epochs`` and the metrics
        returned by evaluating on the validation split.
    """
    # Defaults are tuples rather than lists so they cannot be mutated
    # between calls; any iterable still works for callers.
    results = []
    for lr in lrs:
        for ep in epochs:
            run_dir = OUT_DIR / f"sweep_lr{lr}_ep{ep}"
            args_s = TrainingArguments(
                output_dir=str(run_dir),
                per_device_train_batch_size=2,
                per_device_eval_batch_size=2,
                learning_rate=lr,
                num_train_epochs=ep,
                weight_decay=0.01,
                warmup_ratio=0.1,
                logging_steps=20,
                # `evaluation_strategy` was renamed to `eval_strategy`; the
                # old keyword is rejected by recent transformers releases.
                eval_strategy="epoch",
                save_strategy="epoch",
                load_best_model_at_end=True,
                metric_for_best_model="f1",
                # Mixed precision needs a CUDA device; use fp32 otherwise.
                fp16=torch.cuda.is_available()
            )
            trainer_s = Trainer(
                # Start every configuration from the pretrained checkpoint.
                model=LayoutLMv3ForTokenClassification.from_pretrained(
                    MODEL_NAME, num_labels=len(LABELS), id2label=id2label, label2id=label2id
                ),
                args=args_s,
                train_dataset=encoded["train"],
                eval_dataset=encoded["validation"],
                tokenizer=processor.tokenizer,
                compute_metrics=seqeval_metrics
            )
            trainer_s.train()
            eval_res = trainer_s.evaluate(encoded["validation"])
            results.append({"lr": lr, "epochs": ep, **eval_res})
            print("Config:", lr, ep, "=>", eval_res)
    return results

# 仅演示，实际可按需运行（注释掉默认不跑）
# sweep_results = small_sweep()
# sweep_results


## 9. 推理与字段组装（示例）

In [None]:

def normalize_amount(s: str):
    """Parse a printed amount into a float.

    Thousands separators and spaces are removed, and an amount wrapped in
    parentheses (ASCII or full-width) is treated as negative.

    Args:
        s: Text of the amount, e.g. ``"1,234.56"`` or ``"(100.00)"``.

    Returns:
        The parsed value, or ``None`` when the text is not a finite number.
    """
    s = s.replace(",", "").replace(" ", "")
    s = s.replace("（","(").replace("）",")")
    neg = s.startswith("(") and s.endswith(")")
    s = s.strip("()")
    try:
        v = float(s)
    except ValueError:
        # Only "not a number" is expected here; anything else should surface.
        return None
    if not math.isfinite(v):
        # float() also accepts "nan" / "inf", which are not amounts.
        return None
    return -v if neg else v

def _amount_from_tokens(tokens):
    """Return the amount carried by the tokens of one entity, or None."""
    if not tokens:
        return None
    # First try all tokens glued together (an amount split across tokens).
    value = normalize_amount("".join(tokens))
    if value is not None:
        return value
    # Otherwise take the first token that is a number on its own: the entity
    # usually contains a header word such as "Amount (USD)" next to the value,
    # which makes the glued string unparseable.
    for tok in tokens:
        value = normalize_amount(tok)
        if value is not None:
            return value
    return None

def infer_fields(words:List[str], labels:List[str]):
    """Assemble structured invoice fields from word-level BIO labels.

    Args:
        words: Word strings of one page.
        labels: One BIO label per word (e.g. ``"B-INV_NO"``, ``"O"``).

    Returns:
        Dict with ``invoice_number`` (labelled words joined by spaces),
        ``amount_with_tax`` and ``amount_without_tax`` (floats or ``None``)
        and ``currency`` (``"UNKNOWN"`` when nothing is found).
    """
    buckets = {"INV_NO": [], "AMT_TOTAL": [], "AMT_NET": [], "CURRENCY": []}
    for w, l in zip(words, labels):
        if l[:1] not in ("B", "I"):
            continue
        for entity, bucket in buckets.items():
            if l.endswith(entity):
                bucket.append(w)

    inv_no = " ".join(buckets["INV_NO"]).strip()
    amt_total = _amount_from_tokens(buckets["AMT_TOTAL"])
    amt_net = _amount_from_tokens(buckets["AMT_NET"])
    currency_tokens = buckets["CURRENCY"]
    currency = " ".join(currency_tokens).upper().replace("人民币","CNY").strip() if currency_tokens else ""

    # Simple context fallback when no currency token was labelled; CNY wins
    # if both currencies are mentioned.
    if not currency:
        ctx = " ".join(words).upper()
        if "USD" in ctx or "$" in ctx:
            currency = "USD"
        if "人民币" in ctx or "CNY" in ctx or "RMB" in ctx or "¥" in ctx:
            currency = "CNY"

    return {
        "invoice_number": inv_no,
        "amount_with_tax": amt_total,
        "amount_without_tax": amt_net,
        "currency": currency if currency else "UNKNOWN"
    }

def predict_example(idx=0, split="test"):
    """Predict labels for one example and assemble its invoice fields.

    Args:
        idx: Index of the example inside ``raw_datasets[split]``.
        split: Name of the split to read from.

    Returns:
        The dict produced by ``infer_fields`` for the predicted labels.
    """
    ex = raw_datasets[split][idx]
    img = Image.open(ex["image_path"]).convert("RGB")
    enc = processor(img, ex["words"], boxes=ex["bboxes"], truncation=True,
                    padding="max_length", max_length=512, return_tensors="pt")
    # Token-to-word alignment (needs a fast tokenizer); None marks special
    # and padding positions.
    word_ids = enc.word_ids(batch_index=0)

    model.eval()  # make sure dropout is off for inference
    with torch.no_grad():
        logits = model(**{name: tensor.to(model.device) for name, tensor in enc.items()}).logits
    pred_ids = logits.argmax(-1)[0].cpu().numpy()

    # Predictions are per sub-token, so the first len(words) of them do not
    # correspond to the words. Take the prediction at each word's first
    # sub-token instead.
    word_pred = {}
    for pos, wid in enumerate(word_ids):
        if wid is not None and wid not in word_pred:
            word_pred[wid] = id2label[int(pred_ids[pos])]
    # Words cut off by truncation have no prediction; treat them as "O".
    pred_labels = [word_pred.get(i, "O") for i in range(len(ex["words"]))]

    fields = infer_fields(ex["words"], pred_labels)
    print("Pred fields:", fields)
    return fields

_ = predict_example(idx=0, split="test")
