# TAPEX-based Table-aware Fact Verification Evaluation Notebook for ToTTo

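This notebook runs the following steps **end-to-end**.

1. Load `example_id -> table` from the **original ToTTo data (unlabeled test/dev/train)**
2. **Rebuild each ToTTo table as a 2D grid, honoring row_span/column_span**, then convert it to a `pandas.DataFrame`
3. Load `example_id -> generated_text` from the **prediction file (totto_test_predictions.jsonl)**
4. Decompose generated_text into **sentence-level claims**
5. Use `microsoft/tapex-base-finetuned-tabfact` as the verifier to label each claim **entailed/refuted**
6. Compute **TSS (Table Support Score), HR (Hallucination Rate), and TSS_num (numeric-weighted)** for each prediction
7. Save the results to CSV

> Note: the TabFact fine-tuned TAPEX is optimized for *short, single-sentence claims*. This notebook works at the sentence level by default (the current implementation passes each generated text to the verifier as one claim); decomposition into numeric/comparison/conditional claims can be added if needed.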


In [25]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [26]:
!pip -q install -U transformers accelerate sentencepiece safetensors pandas tqdm

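## 1. Path configuration

- **Original table data**: to evaluate on test, this is usually `unlabeled_totto_test_data.jsonl` (or a test source file prepared by your team)
- **Prediction data**: `/content/drive/MyDrive/nlp_project_02/data/totto_test_predictions.jsonl`

If you have no original table file for test and only dev/train files, **point to the original file of the same split that the predictions' example_ids come from**.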

In [27]:
# ====== Edit these paths before running ======
DATA_DIR = '/content/drive/MyDrive/nlp_project_02/data'

# (A) Original ToTTo file that contains the tables
ORIGINAL_TABLE_JSONL = f'{DATA_DIR}/totto_dev_data.jsonl'
# ORIGINAL_TABLE_JSONL = f'{DATA_DIR}/totto_preprocessed_dev.json'  # <- a .json file is handled automatically by the loader below

# (B) Model prediction file
PREDICTIONS_JSONL = f'{DATA_DIR}/totto_test_predictions.jsonl'

# Output paths
OUT_PER_CLAIM_CSV = f'{DATA_DIR}/tapex_eval_per_claim.csv'
OUT_PER_EXAMPLE_CSV = f'{DATA_DIR}/tapex_eval_per_example.csv'


## 2. Loaders + table conversion (row_span/col_span aware)

In [28]:
import json, re, os
from typing import Dict, Any, List, Iterator, Optional, Tuple, Union
import pandas as pd
from tqdm import tqdm

# ---------- JSON / JSONL loaders ----------

def iter_jsonl(path: str) -> Iterator[Dict[str, Any]]:
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            s = line.strip()
            if not s:
                continue
            yield json.loads(s)

def iter_json_auto(path: str) -> Iterator[Dict[str, Any]]:
    """
    .jsonl: line-delimited dict
    .json : dict or list[dict]
    """
    if path.endswith('.jsonl'):
        yield from iter_jsonl(path)
        return

    with open(path, 'r', encoding='utf-8') as f:
        obj = json.load(f)

    if isinstance(obj, list):
        for ex in obj:
            if not isinstance(ex, dict):
                raise ValueError(f'JSON list item is not dict: {type(ex)}')
            yield ex
    elif isinstance(obj, dict):
        yield obj
    else:
        raise ValueError(f'Unsupported JSON root type: {type(obj)}')


# ---------- ToTTo table -> DataFrame (span-aware) ----------

def _make_unique_columns(cols: List[str]) -> List[str]:
    seen = {}
    out = []
    for c in cols:
        c = str(c).strip() if c is not None else ''
        if c == '':
            c = 'col'
        if c not in seen:
            seen[c] = 0
            out.append(c)
        else:
            seen[c] += 1
            out.append(f'{c}_{seen[c]}')
    return out

def totto_expand_to_grid(table: List[List[Dict[str, Any]]]) -> List[List[str]]:
    """
    Rebuild a ToTTo table (List[List[Cell]]) as a rectangular 2D grid
    (List[List[str]]), honoring row_span/column_span.
    """
    grid: List[List[Optional[str]]] = []

    def ensure_size(r: int, c: int):
        while len(grid) <= r:
            grid.append([])
        while len(grid[r]) <= c:
            grid[r].append(None)

    for r, row in enumerate(table):
        c = 0
        for cell in row:
            val = str(cell.get('value', '')).strip()
            rs = int(cell.get('row_span', 1) or 1)
            cs = int(cell.get('column_span', 1) or 1)

            # Skip positions already filled by an earlier span
            while True:
                ensure_size(r, c)
                if grid[r][c] is None:
                    break
                c += 1

            # Fill the area covered by this cell's span
            for dr in range(rs):
                rr = r + dr
                for dc in range(cs):
                    cc = c + dc
                    ensure_size(rr, cc)
                    if grid[rr][cc] is None:
                        grid[rr][cc] = val

            c += cs

    if not grid:
        return []

    max_len = max(len(r) for r in grid)
    out = []
    for r in grid:
        if len(r) < max_len:
            r = r + [None] * (max_len - len(r))
        out.append([('' if x is None else str(x)) for x in r])
    return out

def totto_table_to_df(table: List[List[Dict[str, Any]]]) -> pd.DataFrame:
    grid = totto_expand_to_grid(table)
    if not grid:
        return pd.DataFrame()

    # Header detection: use the first original row as the header only if every cell has is_header=True
    is_header_row = False
    if table and table[0]:
        flags = [bool(cell.get('is_header', False)) for cell in table[0]]
        is_header_row = all(flags)

    if is_header_row:
        cols = _make_unique_columns(grid[0])
        data = grid[1:] if len(grid) > 1 else []
        df = pd.DataFrame(data, columns=cols)
    else:
        cols = _make_unique_columns([f'col_{i}' for i in range(len(grid[0]))])
        df = pd.DataFrame(grid, columns=cols)

    return df.astype(str)

# ---------- example_id -> table DataFrame index ----------

def build_table_index(original_path: str) -> Dict[str, pd.DataFrame]:
    idx: Dict[str, pd.DataFrame] = {}
    for ex in tqdm(iter_json_auto(original_path), desc='Loading tables'):
        eid = str(ex.get('example_id', ex.get('id', ''))).strip()
        if not eid:
            continue
        if 'table' not in ex:
            # The preprocessed format may differ from the original ToTTo schema
            raise KeyError(f"No 'table' field in original example (keys={list(ex.keys())[:20]})")
        idx[eid] = totto_table_to_df(ex['table'])
    return idx
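
To make the span handling concrete, here is a minimal, self-contained sketch of the same fill-and-skip idea on a toy table. `expand_spans` is a hypothetical, simplified stand-in written for illustration (it is not the notebook's `totto_expand_to_grid`), and the toy cells are made up.

```python
from typing import Any, Dict, List, Tuple

def expand_spans(table: List[List[Dict[str, Any]]]) -> List[List[str]]:
    """Hypothetical simplified span expansion: copy each cell value into
    every grid position covered by its row_span/column_span."""
    filled: Dict[Tuple[int, int], str] = {}
    for r, row in enumerate(table):
        c = 0
        for cell in row:
            # Skip positions already occupied by a span from an earlier cell.
            while (r, c) in filled:
                c += 1
            rs = int(cell.get("row_span", 1) or 1)
            cs = int(cell.get("column_span", 1) or 1)
            for dr in range(rs):
                for dc in range(cs):
                    # setdefault keeps the first value written to a position.
                    filled.setdefault((r + dr, c + dc), str(cell.get("value", "")))
            c += cs
    if not filled:
        return []
    n_rows = max(r for r, _ in filled) + 1
    n_cols = max(c for _, c in filled) + 1
    # Positions never covered by any cell become empty strings.
    return [[filled.get((r, c), "") for c in range(n_cols)] for r in range(n_rows)]

# Made-up table: the "2010" cell spans two rows.
toy_table = [
    [{"value": "Year", "is_header": True}, {"value": "Team", "is_header": True}],
    [{"value": "2010", "row_span": 2}, {"value": "A"}],
    [{"value": "B"}],
]
toy_grid = expand_spans(toy_table)
print(toy_grid)  # [['Year', 'Team'], ['2010', 'A'], ['2010', 'B']]
```

The third source row contains only one cell, yet the grid stays rectangular because the row-spanning `2010` cell is copied down into it before `B` is placed.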


## 3. Load predictions (auto-detect the example_id and text keys)

In [29]:
TEXT_KEYS_CANDIDATES = [
    'prediction', 'pred', 'output', 'generated', 'generation',
    'decoded', 'text', 'hypothesis', 'model_output'
]

def pick_example_id(ex: Dict[str, Any]) -> str:
    for k in ['example_id', 'id', 'guid', 'eid']:
        if k in ex and ex[k] is not None:
            return str(ex[k])
    raise KeyError(f'example_id field not found (keys={list(ex.keys())})')

def pick_text_field(ex: Dict[str, Any]) -> str:
    # 1) Try the candidate keys first
    for k in TEXT_KEYS_CANDIDATES:
        if k in ex and isinstance(ex[k], str) and ex[k].strip():
            return ex[k].strip()
    # 2) Fallback: the longest string value in the record
    best = ''
    for _, v in ex.items():
        if isinstance(v, str) and len(v) > len(best):
            best = v
    return best.strip()

def load_predictions(pred_path: str) -> Dict[str, str]:
    preds: Dict[str, str] = {}
    # iter_json_auto handles both .jsonl and .json inputs
    for ex in tqdm(iter_json_auto(pred_path), desc="Loading predictions"):
        eid = pick_example_id(ex)  # raises KeyError if no id field is present
        txt = pick_text_field(ex)
        # If an example_id appears more than once, the last record wins
        preds[eid] = txt
    return preds



## 4. Claim decomposition (generated text → sentence-level claims)

In [30]:
def get_claims_from_generated_text(text: str):
    text = (text or "").strip()
    if not text:
        return []
    return [text]  # always treat the whole text as a single claim

def is_numeric_claim(claim: str) -> bool:
    # Simple rule: does the claim contain a digit? Extend for %, $, thousands separators, etc. if needed
    return bool(re.search(r'\d', claim))
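
The function above keeps the whole generated text as one claim. If predictions contain several sentences, a sentence-level split could look like the sketch below. `split_into_claims` and its regex are assumptions for illustration and are not part of the notebook. The rule is deliberately naive: it splits after `.`, `!` or `?` followed by whitespace, and only guards against single-letter initials such as `J.`.

```python
import re
from typing import List

# Split on whitespace that follows ., ! or ?, except right after a
# single-letter initial like "A." or "J." (naive heuristic, an assumption).
_SENT_BOUNDARY = re.compile(r"(?<!\b[A-Z]\.)(?<=[.!?])\s+")

def split_into_claims(text: str) -> List[str]:
    """Hypothetical sentence splitter returning one claim per sentence."""
    text = (text or "").strip()
    if not text:
        return []
    return [s.strip() for s in _SENT_BOUNDARY.split(text) if s.strip()]

# Made-up example text with an initial that must not trigger a split.
example_claims = split_into_claims("J. Doe scored 12 points. The team won 3 games.")
print(example_claims)  # ['J. Doe scored 12 points.', 'The team won 3 games.']
```

Other abbreviations (for example `Dr.` or `U.S.`) would still be split incorrectly, so a dedicated sentence tokenizer may be a better choice for longer outputs.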


## 5. Load the TAPEX (TabFact) verifier

In [31]:
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

MODEL_NAME = "microsoft/tapex-base-finetuned-tabfact"

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)

device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device).eval()

print("device:", device)
print("num_labels:", model.config.num_labels)
print("id2label:", model.config.id2label)
print("model class:", model.__class__)


You passed `num_labels=3` which is incompatible to the `id2label` map of length `2`.


device: cuda
num_labels: 2
id2label: {0: 'Refused', 1: 'Entailed'}
model class: <class 'transformers.models.bart.modeling_bart.BartForSequenceClassification'>
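
The printed `id2label` shows that index 1 is `Entailed` for this checkpoint. Code that needs the index of the entailment class can look it up from this mapping instead of assuming a fixed position. The helper `find_entail_id` below is hypothetical and only a sketch; it assumes the entailment label starts with "entail" regardless of case.

```python
from typing import Dict

def find_entail_id(id2label: Dict[int, str]) -> int:
    """Return the class index whose label starts with 'entail' (case-insensitive)."""
    for idx, label in id2label.items():
        if str(label).lower().startswith("entail"):
            return int(idx)
    raise ValueError(f"No entailment label found in id2label={id2label}")

# Mapping copied from the output printed above.
entail_id = find_entail_id({0: "Refused", 1: "Entailed"})
print(entail_id)  # 1
```

In the notebook this would be called as `find_entail_id(model.config.id2label)`, which fails loudly if a different checkpoint uses unexpected label names.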


## 6. Run the verifier + compute metrics

### Outputs
- per-claim results: entail/refute decision and probability for each claim
- per-example results: TSS, HR, TSS_num
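
As a worked example of the three per-example metrics, the sketch below computes them from a list of 0/1 verdicts. `score_example` is a hypothetical helper, not the notebook's `evaluate`; the weight of 2 for claims containing digits matches the weighting used in this notebook, and the verdicts are made up.

```python
from typing import List, Optional, Tuple

def score_example(
    verdicts: List[int],
    numeric_flags: List[bool],
    numeric_weight: float = 2.0,
) -> Tuple[Optional[float], Optional[float], Optional[float]]:
    """Return (TSS, HR, TSS_num) for one example, or Nones if it has no claims."""
    if not verdicts:
        return None, None, None
    weights = [numeric_weight if is_num else 1.0 for is_num in numeric_flags]
    tss = sum(verdicts) / len(verdicts)   # fraction of entailed claims
    hr = 1.0 - tss                        # fraction of non-entailed claims
    # Weighted support: numeric claims count numeric_weight times.
    tss_num = sum(v * w for v, w in zip(verdicts, weights)) / sum(weights)
    return tss, hr, tss_num

# Three made-up claims: entailed numeric, refuted non-numeric, entailed non-numeric.
tss, hr, tss_num = score_example([1, 0, 1], [True, False, False])
print(tss, hr, tss_num)  # TSS = 2/3, HR = 1/3, TSS_num = (2 + 0 + 1) / (2 + 1 + 1) = 0.75
```

With exactly one claim per example the weight cancels out of the ratio, so TSS_num equals TSS for that example. The numeric weighting only changes the per-example score when an example has several claims, or when claims are pooled across examples.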


In [32]:
import torch
from typing import Dict, Optional, Tuple
import pandas as pd
from tqdm import tqdm

# This should also be done in the model/tokenizer loading cell:
# model.resize_token_embeddings(len(tokenizer))

@torch.no_grad()
def verify_claim(table_df: pd.DataFrame, claim: str) -> Tuple[int, float, str]:
    """
    return (entailed(1)/not(0), entail_prob, pred_label)
    id2label: 0=Refused, 1=Entailed
    """
    if table_df is None or table_df.empty or table_df.shape[1] == 0:
        return 0, 0.0, "EmptyTable"

    table_df = table_df.fillna("").astype(str)

    enc = tokenizer(
        table=table_df,
        query=str(claim),
        return_tensors="pt",
        truncation=True,
        max_length=1024
    )
    enc = {k: v.to(device) for k, v in enc.items()}

    out = model(**enc)                  # logits: [1,2]
    probs = torch.softmax(out.logits[0], dim=-1)

    entail_id = 1                       # fixed: index 1 is 'Entailed' in id2label
    entail_prob = float(probs[entail_id].item())

    pred_id = int(torch.argmax(probs).item())
    entailed = 1 if pred_id == entail_id else 0

    pred_label = model.config.id2label.get(pred_id, str(pred_id))
    return entailed, entail_prob, pred_label



def evaluate(preds: Dict[str, str], tables: Dict[str, pd.DataFrame],
             max_examples: Optional[int] = None,
             max_claims_per_example: Optional[int] = None) -> Tuple[pd.DataFrame, pd.DataFrame]:

    per_claim_rows = []
    per_example_rows = []

    eids = list(preds.keys())
    if max_examples is not None:
        eids = eids[:max_examples]

    missing = 0

    for eid in tqdm(eids, desc='Evaluating'):
        if eid not in tables:
            missing += 1
            continue

        table_df = tables[eid]
        gen = preds[eid]

        # If each prediction is a single sentence, get_claims_from_generated_text should simply return [gen]
        claims = get_claims_from_generated_text(gen)

        if not claims:
            per_example_rows.append({
                'example_id': eid,
                'num_claims': 0,
                'tss': None,
                'hr': None,
                'tss_num': None,
                'pred_label': None,          # no claim was verified, so there is no label
                'entail_prob_mean': None,    # (optional) mean probability is None as well
                'generated_text': gen
            })
            continue

        if max_claims_per_example is not None:
            claims = claims[:max_claims_per_example]

        v = []
        w = []
        entail_probs = []
        pred_labels = []   # per-claim pred_label values, used for the example-level summary

        for ci, c in enumerate(claims):
            entailed, entail_prob, pred_label = verify_claim(table_df, c)  # verify_claim returns three values
            numeric = is_numeric_claim(c)
            weight = 2.0 if numeric else 1.0

            v.append(int(entailed))
            w.append(float(weight))
            entail_probs.append(float(entail_prob))
            pred_labels.append(pred_label)

            per_claim_rows.append({
                'example_id': eid,
                'claim_idx': ci,
                'claim': c,
                'pred_label': pred_label,
                'entailed': int(entailed),
                'entail_prob': float(entail_prob),
                'is_numeric_claim': int(numeric),
                'weight': float(weight)
            })

        n = len(v)
        tss = sum(v) / n
        hr = 1.0 - tss
        tss_num = (sum(vi * wi for vi, wi in zip(v, w)) / sum(w)) if sum(w) > 0 else None

        # Per-example pred_label: with a single claim, use its label as-is;
        # with several claims, store the most frequent label
        if len(pred_labels) == 1:
            pred_label_summary = pred_labels[0]
        else:
            # Simple summary: the most common label
            from collections import Counter
            pred_label_summary = Counter(pred_labels).most_common(1)[0][0]

        per_example_rows.append({
            'example_id': eid,
            'num_claims': n,
            'tss': float(tss),
            'hr': float(hr),
            'tss_num': None if tss_num is None else float(tss_num),
            'pred_label': pred_label_summary,
            'entail_prob_mean': float(sum(entail_probs)/len(entail_probs)),  # optional: mean entailment probability
            'generated_text': gen
        })

    if missing:
        print(f'[WARN] {missing} predictions had no matching table in original data. (example_id mismatch or wrong split)')

    per_claim_df = pd.DataFrame(per_claim_rows)
    per_example_df = pd.DataFrame(per_example_rows)
    return per_claim_df, per_example_df


## 7. Run

- First build the table index,
- then load the predictions,
- then run the evaluation.

> It is recommended to test with about `max_examples=20` first and then scale up to the full set.

In [33]:
# 1) Load the original tables
assert os.path.exists(ORIGINAL_TABLE_JSONL), f'Not found: {ORIGINAL_TABLE_JSONL}'
assert os.path.exists(PREDICTIONS_JSONL), f'Not found: {PREDICTIONS_JSONL}'

tables = build_table_index(ORIGINAL_TABLE_JSONL)
print('num tables:', len(tables))

# 2) Load the predictions
preds = load_predictions(PREDICTIONS_JSONL)
print('num predictions:', len(preds))

# 3) Evaluate (a small test run is recommended first)
per_claim_df, per_example_df = evaluate(preds, tables, max_examples=20, max_claims_per_example=None)

per_claim_df.head(5), per_example_df.head(5)


Loading tables: 7700it [00:06, 1106.88it/s]


num tables: 7700


Loading predictions: 22293it [00:00, 31803.67it/s]


num predictions: 7700


Evaluating: 100%|██████████| 20/20 [00:01<00:00, 12.97it/s]


(             example_id  claim_idx  \
 0   7391450717765563190          0   
 1   9012083751335522596          0   
 2  -8764917516249435941          0   
 3  -6915287003153277224          0   
 4  -3004901021745997743          0   
 
                                                claim pred_label  entailed  \
 0  Daniel Henry Chamberlain was the 76th Governor...   Entailed         1   
 1  In 2016, Alma Jodorowsky played Evelyn in Kids...   Entailed         1   
 2                        A. J. Hawk had 119 tackles.   Entailed         1   
 3  Peter II the Simple (Pêr II) and Arthur III th...    Refused         0   
 4  Ralph J. Parker was the Speaker of the Minneso...   Entailed         1   
 
    entail_prob  is_numeric_claim  weight  
 0     0.999975                 1     2.0  
 1     0.999974                 1     2.0  
 2     0.999974                 1     2.0  
 3     0.001682                 1     2.0  
 4     0.999462                 0     1.0  ,
              example_id  num

In [34]:
def summarize_model_scores(per_claim_df, per_example_df):

    macro_tss = per_example_df["tss"].mean()
    macro_hr  = 1.0 - macro_tss

    micro_tss = per_claim_df["entailed"].mean()
    micro_hr  = 1.0 - micro_tss

    tss_num = (per_claim_df["entailed"] * per_claim_df["weight"]).sum() / per_claim_df["weight"].sum()
    hr_num  = 1.0 - tss_num

    entail_prob_mean = per_claim_df["entail_prob"].mean()

    label_dist = per_claim_df["pred_label"].value_counts(normalize=True).to_dict()

    summary = {
        "N_examples": int(len(per_example_df)),        # total number of ToTTo examples (table + generated text) used in the evaluation
        "N_claims": int(len(per_claim_df)),            # total number of verified claims (currently one sentence per example, so nearly equal to N_examples)

        "Model_TSS_macro": float(macro_tss),            # (Macro) Table Support Score averaged over examples
                                                        # = on average, the fraction of an example's claims supported by the table

        "Model_HR_macro": float(macro_hr),              # (Macro) Hallucination Rate averaged over examples
                                                        # = 1 - Model_TSS_macro

        "Model_TSS_micro": float(micro_tss),            # (Micro) Table Support Score averaged over all claims
                                                        # = fraction of all claims supported by the table

        "Model_HR_micro": float(micro_hr),              # (Micro) Hallucination Rate over all claims
                                                        # = 1 - Model_TSS_micro

        "Model_TSS_num": float(tss_num),                # weighted Table Support Score with weight 2 on claims containing digits
                                                        # = a factuality metric more sensitive to numeric hallucination

        "Model_HR_num": float(hr_num),                  # numeric-weighted hallucination rate
                                                        # = 1 - Model_TSS_num

        "EntailProb_mean": float(entail_prob_mean),     # mean probability the verifier assigns to the Entailed class
                                                        # = the verifier's average confidence in entailment

        "LabelDist": label_dist,                        # distribution of verifier labels (Entailed / Refused shares)
                                                        # = share of claims judged supported vs. not supported by the table
    }
    return summary

summary = summarize_model_scores(per_claim_df, per_example_df)
summary

{'N_examples': 20,
 'N_claims': 20,
 'Model_TSS_macro': 0.75,
 'Model_HR_macro': 0.25,
 'Model_TSS_micro': 0.75,
 'Model_HR_micro': 0.25,
 'Model_TSS_num': 0.7631578947368421,
 'Model_HR_num': 0.23684210526315785,
 'EntailProb_mean': 0.7577358731463392,
 'LabelDist': {'Entailed': 0.75, 'Refused': 0.25}}
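
The macro and micro scores coincide here because every example contributes exactly one claim. They diverge once the number of claims per example varies, as the following sketch with made-up verdicts shows (the example ids and values are hypothetical).

```python
from typing import Dict, List

# Hypothetical verdicts (1 = entailed, 0 = not entailed) keyed by example id.
toy_verdicts: Dict[str, List[int]] = {
    "ex1": [1],           # one claim, entailed
    "ex2": [1, 0, 0, 0],  # four claims, one entailed
}

# Macro: compute TSS per example, then average over examples.
per_example_tss = [sum(v) / len(v) for v in toy_verdicts.values()]
macro_tss = sum(per_example_tss) / len(per_example_tss)

# Micro: pool every claim from every example, then average once.
all_claims = [x for v in toy_verdicts.values() for x in v]
micro_tss = sum(all_claims) / len(all_claims)

print(macro_tss, micro_tss)  # 0.625 0.4
```

Macro weights each example equally, so the short, fully supported example pulls the score up. Micro weights each claim equally, so the long example dominates.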

## 8. Full evaluation + CSV export

Running on a GPU is much faster.

In [35]:
# Full evaluation
per_claim_df, per_example_df = evaluate(preds, tables, max_examples=None, max_claims_per_example=None)

# Save
per_claim_df.to_csv(OUT_PER_CLAIM_CSV, index=False)
per_example_df.to_csv(OUT_PER_EXAMPLE_CSV, index=False)

print('saved:', OUT_PER_CLAIM_CSV)
print('saved:', OUT_PER_EXAMPLE_CSV)

# Summary
summary = {
    'num_examples_scored': int(per_example_df['tss'].notna().sum()),
    'avg_tss': float(per_example_df['tss'].dropna().mean()) if per_example_df['tss'].notna().any() else None,
    'avg_hr': float(per_example_df['hr'].dropna().mean()) if per_example_df['hr'].notna().any() else None,
    'avg_tss_num': float(per_example_df['tss_num'].dropna().mean()) if per_example_df['tss_num'].notna().any() else None,
}
summary


Evaluating: 100%|██████████| 7700/7700 [04:30<00:00, 28.50it/s]


saved: /content/drive/MyDrive/nlp_project_02/data/tapex_eval_per_claim.csv
saved: /content/drive/MyDrive/nlp_project_02/data/tapex_eval_per_example.csv


{'num_examples_scored': 7700,
 'avg_tss': 0.8020779220779221,
 'avg_hr': 0.19792207792207792,
 'avg_tss_num': 0.8020779220779221}

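## 9. (Optional) Sanity check / debugging

- If many IDs are mismatched, **the wrong split was most likely chosen for the original tables**.
- Print a few example_ids from the predictions and check whether they exist in the original tables.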

In [36]:
# Check for mismatches
some_ids = list(preds.keys())[:10]
for eid in some_ids:
    print(eid, 'IN_TABLES' if eid in tables else 'MISSING')

7391450717765563190 IN_TABLES
9012083751335522596 IN_TABLES
-8764917516249435941 IN_TABLES
-6915287003153277224 IN_TABLES
-3004901021745997743 IN_TABLES
9095314032876340546 IN_TABLES
6803794595179672650 IN_TABLES
-2706081572458524575 IN_TABLES
3877051823034175640 IN_TABLES
6364030237891034315 IN_TABLES
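
Spot-checking ten IDs can miss a partial mismatch. A coverage summary over all IDs is a possible extension; the sketch below uses a hypothetical helper `id_coverage` and made-up IDs, and assumes IDs are compared as stripped strings.

```python
from typing import Dict, Iterable, Set

def id_coverage(pred_ids: Iterable[str], table_ids: Iterable[str]) -> Dict[str, object]:
    """Summarize how many prediction ids have a matching table id."""
    pred_set: Set[str] = {str(i).strip() for i in pred_ids}
    table_set: Set[str] = {str(i).strip() for i in table_ids}
    missing = sorted(pred_set - table_set)
    matched = len(pred_set) - len(missing)
    return {
        "num_pred_ids": len(pred_set),
        "num_matched": matched,
        "coverage": matched / len(pred_set) if pred_set else 0.0,
        "missing_sample": missing[:5],  # a few unmatched ids for inspection
    }

# Made-up ids: three of the four prediction ids have a table.
report = id_coverage(["1", "2", "3", "4"], ["1", "2", "3", "9"])
print(report)
```

In this notebook the call would be `id_coverage(preds.keys(), tables.keys())`; a coverage well below 1.0 suggests the original table file comes from a different split than the predictions.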
