In [None]:
# Install the libraries imported by the cells below (-q keeps pip output short).
!pip install -q transformers peft accelerate datasets

# Mount Google Drive at /content/drive; the dataset directory configured in this
# cell (DATA_DIR) lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

import os, json
from typing import List, Dict, Any

# Folder on the mounted Drive that holds the FinQA JSON splits.
DATA_DIR = "/content/drive/MyDrive/FinQA/dataset"

# One path per split, each resolved against DATA_DIR.
TRAIN_PATH, DEV_PATH, TEST_PATH, PRIVATE_TEST_PATH = (
    os.path.join(DATA_DIR, split_file)
    for split_file in ("train.json", "dev.json", "test.json", "private_test.json")
)

# Print every expected path together with whether it currently exists on disk.
print("Check files:")
for p in (TRAIN_PATH, DEV_PATH, TEST_PATH, PRIVATE_TEST_PATH):
    print(p, os.path.exists(p))

In [None]:
def load_json(path: str):
    """Read a JSON file and return the decoded object.

    The file is opened explicitly as UTF-8 so that decoding does not depend on
    the platform's default locale encoding.

    Args:
        path: Location of the JSON file.

    Returns:
        Whatever ``json.load`` produces for the file (the callers in this
        notebook expect a sized collection of samples).

    Side effects:
        Prints how many top-level items were loaded and from where.
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    print(f"Loaded {len(data)} samples from {path}")
    return data

# Read the train, dev and test splits into memory. Each load_json call also
# prints the number of samples it found. PRIVATE_TEST_PATH is not loaded here.
train_raw = load_json(TRAIN_PATH)
dev_raw = load_json(DEV_PATH)
test_raw = load_json(TEST_PATH)

In [None]:
from datasets import Dataset

def build_sft_record(sample: Dict[str, Any]) -> Dict[str, str]:
    """Turn one raw sample into a system/user/assistant supervised record.

    The sample's ``"qa"`` entry supplies the question, the ``model_input``
    list of (id, text) pairs and the gold ``exe_ans`` value.

    Args:
        sample: One dataset entry containing a ``"qa"`` mapping.

    Returns:
        A dict with the keys ``"system"``, ``"user"`` and ``"assistant"``:
        the fixed system prompt, the question plus numbered evidence, and the
        target reply ``"Final answer: <gold>"``.
    """
    qa = sample["qa"]
    question = qa["question"]

    # Only the text of each (id, text) pair is shown; runs of whitespace are
    # collapsed and every line is tagged with its position in the list.
    evidence_block = "\n".join(
        f"[{pos}] {' '.join(passage.split())}"
        for pos, (_unused_id, passage) in enumerate(qa["model_input"])
    )

    gold = qa["exe_ans"]

    user_prompt = "\n".join(
        [
            "Question:",
            question,
            "",
            "Evidence:",
            evidence_block,
            "",
            "Please compute the final numeric answer.",
            "Always end your reply with:",
            "Final answer: <number>",
        ]
    )

    return dict(
        system="You are a financial numerical reasoning expert.",
        user=user_prompt,
        assistant=f"Final answer: {gold}",
    )

# Convert every raw training sample into a system/user/assistant record.
train_records = [build_sft_record(s) for s in train_raw]
# Cell output: the number of records and the first record, as a quick sanity check.
len(train_records), train_records[0]

In [None]:
from transformers import AutoTokenizer

# Hub identifier of the checkpoint that is fine-tuned below and reused at inference.
MODEL_NAME_MATH = "Qwen/Qwen2.5-7B-Instruct"

# Tokenizer of that checkpoint; add_text and data_collator both rely on it.
tokenizer_math = AutoTokenizer.from_pretrained(
    MODEL_NAME_MATH,
    trust_remote_code=True,
)

# Wrap the list of system/user/assistant records in a `datasets.Dataset`.
train_ds = Dataset.from_list(train_records)

def add_text(example):
    """Render one record as a single chat-formatted training string.

    Args:
        example: Mapping with ``"system"``, ``"user"`` and ``"assistant"`` text.

    Returns:
        ``{"text": rendered}`` where ``rendered`` is the untokenized output of
        the tokenizer's chat template for the three-turn conversation. No
        generation prompt is appended because the assistant turn is already
        part of the conversation.
    """
    conversation = [
        {"role": role, "content": example[role]}
        for role in ("system", "user", "assistant")
    ]
    rendered = tokenizer_math.apply_chat_template(
        conversation,
        tokenize=False,
        add_generation_prompt=False,
    )
    return dict(text=rendered)

# Attach a "text" value (the chat-formatted string from add_text) to every record.
train_ds = train_ds.map(add_text)
# Preview the first 600 characters of the first rendered example.
print(train_ds[0]["text"][:600])

In [None]:
import torch
from transformers import AutoModelForCausalLM
from peft import LoraConfig, get_peft_model

# Report whether CUDA is available. `device` is only printed in this cell; the
# model itself is placed through device_map="auto" below.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

# Load the base causal LM with bfloat16 weights.
model_math_base = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME_MATH,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    trust_remote_code=True,
)

# Gradient checkpointing is switched on for the base model *before* it is
# wrapped by get_peft_model below; keep this ordering.
# NOTE(review): presumably enabled to reduce activation memory during training — confirm.
model_math_base.gradient_checkpointing_enable()

# LoRA adapter settings: r=32 and lora_alpha=32, applied to the modules named
# "q_proj" and "v_proj", adapter dropout 0.05, bias="none", causal-LM task type.
lora_config = LoraConfig(
    r=32,
    lora_alpha=32,
    target_modules=["q_proj", "v_proj"],
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM",
)

# Wrap the base model with the adapter and print the trainable-parameter summary.
model_math = get_peft_model(model_math_base, lora_config)
model_math.print_trainable_parameters()

In [None]:
from transformers import TrainingArguments, Trainer

# Maximum number of tokens kept per training example; longer texts are truncated.
MAX_LENGTH = 768

def data_collator(batch):
    """Tokenize a batch of chat-formatted texts and build causal-LM labels.

    Args:
        batch: List of dataset rows, each carrying a ``"text"`` string.

    Returns:
        The tokenizer's encoding (``input_ids``, ``attention_mask``) with an
        added ``labels`` tensor. Labels equal ``input_ids`` except at padding
        positions, which are set to -100 so the loss ignores them.

    Note:
        Truncation uses the tokenizer's configured truncation side, so an
        over-long example may lose its assistant answer.
    """
    texts = [b["text"] for b in batch]
    enc = tokenizer_math(
        texts,
        padding=True,
        truncation=True,
        max_length=MAX_LENGTH,
        return_tensors="pt",
    )
    labels = enc["input_ids"].clone()
    # Padding tokens carry no training signal. -100 is the ignore index of the
    # cross-entropy loss, so masked positions do not contribute to the loss
    # once a batch contains sequences of different lengths.
    labels[enc["attention_mask"] == 0] = -100
    enc["labels"] = labels
    return enc

# Train on the complete chat-formatted dataset (no subsampling).
train_ds_full = train_ds
print("Train size:", len(train_ds_full))

# Directory where the Trainer writes its checkpoints.
OUTPUT_DIR = "/content/qwen2p5_7b_finqa_lora_v2"

training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,
    per_device_train_batch_size=1,
    gradient_accumulation_steps=8,  # 1 x 8 = 8 sequences per optimizer step per device
    num_train_epochs=2,
    learning_rate=2e-4,
    # The base model is loaded with torch_dtype=torch.bfloat16, so mixed precision
    # has to be bf16 as well. fp16=True would enable float16 autocast plus a
    # gradient scaler, which does not match bf16 weights and can fail when the
    # scaler tries to unscale bf16 gradients.
    bf16=True,
    logging_steps=20,
    save_steps=1000,
    save_total_limit=1,
    report_to="none",
    # Keep the "text" column: data_collator reads it from every row.
    remove_unused_columns=False,
)

trainer = Trainer(
    model=model_math,
    args=training_args,
    train_dataset=train_ds_full,
    data_collator=data_collator,
)

# Run the fine-tuning loop.
trainer.train()

In [None]:
# Persist the trained adapter together with the tokenizer; the inference cells
# reload both from this directory.
# NOTE(review): the path is outside the mounted Drive folder (/content/drive);
# presumably it does not survive a Colab runtime reset — confirm.
LORA_SAVE_DIR = "/content/qwen2p5_7b_finqa_lora_v2"
model_math.save_pretrained(LORA_SAVE_DIR)
tokenizer_math.save_pretrained(LORA_SAVE_DIR)
print("LoRA adapter saved to:", LORA_SAVE_DIR)

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel

# Report whether CUDA is available. `device` is only printed here; both models
# below are placed through device_map="auto".
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

# Evidence "retriever": the plain instruct checkpoint, prompted to pick evidence indices.
# NOTE(review): RETRIEVER_NAME is the same checkpoint string as MODEL_NAME_MATH, so
# this cell loads two separate copies of the same base weights (in addition to the
# model left over from training) — presumably memory-heavy; confirm it fits.
RETRIEVER_NAME = "Qwen/Qwen2.5-7B-Instruct"
retriever_tokenizer = AutoTokenizer.from_pretrained(
    RETRIEVER_NAME,
    trust_remote_code=True,
)
retriever_model = AutoModelForCausalLM.from_pretrained(
    RETRIEVER_NAME,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    trust_remote_code=True,
)
retriever_model.eval()

# Answer generator: a fresh copy of the base checkpoint ...
gen_base_model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME_MATH,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    trust_remote_code=True,
)

# ... with the LoRA adapter saved in LORA_SAVE_DIR applied on top.
gen_model = PeftModel.from_pretrained(gen_base_model, LORA_SAVE_DIR)
gen_model.eval()

# Tokenizer reloaded from the adapter directory, where it was saved after training.
gen_tokenizer = AutoTokenizer.from_pretrained(
    LORA_SAVE_DIR,
    trust_remote_code=True,
)

In [None]:
import re
import textwrap

@torch.no_grad()
def retriever_generate(prompt: str, max_new_tokens: int = 64) -> str:
    """Greedy-decode a continuation of ``prompt`` with the retriever model.

    Args:
        prompt: Raw text fed to the retriever tokenizer as-is.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        Only the newly generated text, stripped of surrounding whitespace.
    """
    inputs = retriever_tokenizer(prompt, return_tensors="pt").to(retriever_model.device)
    prompt_len = inputs["input_ids"].shape[1]
    out_ids = retriever_model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        pad_token_id=retriever_tokenizer.eos_token_id,
    )
    # Drop the prompt by token position rather than by comparing decoded text:
    # decoding does not always reproduce the prompt character-for-character,
    # and a leftover prompt would leak its "[i]" evidence labels into the
    # index parser downstream.
    new_ids = out_ids[0, prompt_len:]
    return retriever_tokenizer.decode(new_ids, skip_special_tokens=True).strip()

def build_retriever_prompt(question: str, candidates: List[str]) -> str:
    """Compose the evidence-selection prompt for the retriever model.

    Args:
        question: The question text, inserted verbatim.
        candidates: Candidate evidence texts; each is whitespace-collapsed and
            listed on its own line as ``[i] text``.

    Returns:
        The instruction text, the question, the numbered candidates and the
        trailing ``Answer (indices only):`` cue, dedented and stripped.
    """
    numbered = "\n".join(
        f"[{pos}] {' '.join(cand.split())}" for pos, cand in enumerate(candidates)
    )
    pieces = [
        "You help solve financial numerical reasoning questions.\n\n",
        "Given a question and a list of candidate evidence texts, select ALL evidence\n",
        "sentences that are directly needed to compute the numeric answer.\n\n",
        "Only output a comma-separated list of indices.\n",
        "Do NOT output anything else.\n\n",
        "Question:\n",
        question,
        "\n\nCandidate evidence:\n",
        numbered,
        "\n\nAnswer (indices only):",
    ]
    return textwrap.dedent("".join(pieces)).strip()

def parse_indices(s: str, max_idx: int) -> List[int]:
    """Pull valid candidate indices out of free-form model output.

    Args:
        s: Text to scan; every run of digits is read as one integer.
        max_idx: Exclusive upper bound for an acceptable index.

    Returns:
        The distinct integers ``i`` with ``0 <= i < max_idx`` found in ``s``,
        in ascending order.
    """
    in_range = {
        value
        for value in map(int, re.findall(r"\d+", s))
        if 0 <= value < max_idx
    }
    return sorted(in_range)

def select_evidence(sample: Dict[str, Any]) -> List[str]:
    """Ask the retriever model which candidate texts are needed for a sample.

    Args:
        sample: Dataset entry whose ``"qa"`` mapping holds the question and
            the ``model_input`` list of (id, text) pairs.

    Returns:
        The candidate texts at the indices named by the retriever. When no
        usable index is parsed from its reply, the first five candidates (or
        fewer, if there are not that many) are returned instead.
    """
    qa = sample["qa"]
    question = qa["question"]
    candidates = [text for _, text in qa["model_input"]]

    reply = retriever_generate(build_retriever_prompt(question, candidates))

    # Fall back to the leading candidates when the reply yields no valid index.
    chosen = parse_indices(reply, len(candidates)) or list(range(min(5, len(candidates))))
    return [candidates[i] for i in chosen]

In [None]:
@torch.no_grad()
def qwen7b_answer_generate(
    question: str,
    evidence_texts: List[str],
    tokenizer,
    model,
    max_new_tokens: int = 128,
) -> str:
    """Generate the model's reply for one question and its evidence.

    Args:
        question: Question text, inserted verbatim into the user turn.
        evidence_texts: Evidence strings; each is whitespace-collapsed and
            listed on its own line prefixed with ``"Evidence: "``.
        tokenizer: Tokenizer providing ``apply_chat_template``, ``decode`` and
            ``eos_token_id``.
        model: Causal LM providing ``generate`` and ``device``.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded, stripped text of the newly generated tokens only. The
        prompt tokens are sliced off by position.
    """
    ev_block = "\n".join("Evidence: " + " ".join(e.split()) for e in evidence_texts)
    system_prompt = (
        "You are a financial numerical reasoning expert. "
        "Given a question and evidence sentences, you must:\n"
        "1) Identify exactly which numbers are needed.\n"
        "2) Write out the calculation step by step.\n"
        "3) Pay very careful attention to UNITS and SCALE.\n"
        "- If the question asks for a percentage, decide clearly whether the answer\n"
        "  should be a raw ratio (e.g., 0.12) or a percent (e.g., 12.0), and DO NOT\n"
        "  arbitrarily multiply or divide by 100.\n"
        "- Do NOT just copy a number from the evidence. Always perform the required\n"
        "  addition/subtraction/multiplication/division.\n"
        "You may show reasoning steps, but MUST end your response with exactly:\n"
        "Final answer: <number>\n"
        "Only output one final numeric value in the 'Final answer' line."
    )
    user_content = (
        "Question:\n"
        + question
        + "\n\nEvidence:\n"
        + ev_block
        + "\n\nPlease compute the final numeric answer.\n"
        + "Always end with:\n"
        + "Final answer: <number>\n"
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_content},
    ]
    # Ask explicitly for a dict-style encoding. This avoids depending on
    # whether apply_chat_template returns a bare tensor or an encoding object
    # by default, and lets the tokenizer supply the attention mask instead of
    # fabricating an all-ones one here.
    inputs = tokenizer.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt",
        return_dict=True,
    ).to(model.device)
    input_len = inputs["input_ids"].shape[1]
    out_ids = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,  # greedy decoding
        pad_token_id=tokenizer.eos_token_id,
    )
    # generate() returns prompt + continuation; keep only the continuation.
    gen_ids = out_ids[0, input_len:]
    out = tokenizer.decode(gen_ids, skip_special_tokens=True)
    return out.strip()

# One number: optional sign, then either comma-grouped thousands with optional
# decimals ("1,234.5"), a decimal with optional integer part (".5", "12.5"),
# or a plain integer.
_NUMBER_PATTERN = r"[-+]?(?:\d{1,3}(?:,\d{3})+(?:\.\d+)?|\d*\.\d+|\d+)"

# "Final answer:" followed by optional markdown/currency decoration and a number.
_FINAL_ANSWER_RE = re.compile(
    r"Final answer\s*:\s*[\s*_`$]*(" + _NUMBER_PATTERN + r")",
    flags=re.IGNORECASE,
)
_ANY_NUMBER_RE = re.compile(_NUMBER_PATTERN)


def extract_final_number(output: str):
    """Extract the numeric answer from a model reply.

    The number following the first parsable ``Final answer:`` label (matched
    case-insensitively) is preferred. Thousands separators are accepted
    ("1,234.5" -> 1234.5), and markdown emphasis or a dollar sign between the
    label and the number is skipped ("**Final answer:** $7" -> 7.0). If no
    labelled number exists, the last number anywhere in the text is used.

    Args:
        output: Raw text produced by the model.

    Returns:
        The parsed value as a float, or ``None`` when the text contains no number.
    """
    labelled = _FINAL_ANSWER_RE.search(output)
    if labelled:
        return float(labelled.group(1).replace(",", ""))
    numbers = _ANY_NUMBER_RE.findall(output)
    if numbers:
        return float(numbers[-1].replace(",", ""))
    return None

def generate_answer(sample: Dict[str, Any], selected_texts: List[str]):
    """Answer one sample with the fine-tuned generator.

    Args:
        sample: Dataset entry; only ``sample["qa"]["question"]`` is read.
        selected_texts: Evidence strings to show to the model.

    Returns:
        ``(pred, raw)`` where ``raw`` is the model's full reply and ``pred``
        is the number parsed from it (``None`` if nothing could be parsed).
    """
    llm_reply = qwen7b_answer_generate(
        question=sample["qa"]["question"],
        evidence_texts=selected_texts,
        tokenizer=gen_tokenizer,
        model=gen_model,
        max_new_tokens=128,
    )
    return extract_final_number(llm_reply), llm_reply

In [None]:
def evaluate_subset(data: List[Dict[str, Any]], n_samples: int = 20):
    """Run retrieval + fine-tuned generation on the first ``n_samples`` entries.

    For every sample the selected evidence, the raw model reply and the
    verdict are printed. A prediction is correct when it was parsed and lies
    within 1e-2 of the gold ``exe_ans``.

    Args:
        data: Dataset entries, each with ``sample["qa"]["exe_ans"]``.
        n_samples: How many leading entries to evaluate (capped at ``len(data)``).

    Returns:
        ``(acc, info)``: the fraction of correct samples (0.0 when nothing was
        evaluated) and one dict per sample with the keys ``idx``, ``pred``,
        ``gold``, ``selected``, ``raw`` and ``correct``.
    """
    limit = min(n_samples, len(data))
    records = []
    hits = 0
    for idx in range(limit):
        sample = data[idx]
        gold = sample["qa"]["exe_ans"]
        selected = select_evidence(sample)
        pred, raw_output = generate_answer(sample, selected)

        # A gold value that cannot be subtracted from the prediction raises
        # TypeError and is scored as a miss.
        try:
            is_ok = pred is not None and bool(abs(pred - gold) <= 1e-2)
        except TypeError:
            is_ok = False
        hits += int(is_ok)

        print(f"[{idx}] pred={pred}, gold={gold}, correct={is_ok}")
        print("Selected evidence:")
        for passage in selected:
            print("  -", " ".join(passage.split()))
        print("RAW LLM:", raw_output[:400], "\n")

        records.append(
            dict(
                idx=idx,
                pred=pred,
                gold=gold,
                selected=selected,
                raw=raw_output,
                correct=is_ok,
            )
        )

    total = len(records)
    acc = hits / total if total > 0 else 0.0
    print(f"\nExecution Accuracy: {hits}/{total} = {acc:.4f}")
    return acc, records

# Evaluate the fine-tuned (LoRA) pipeline on the first 20 dev samples.
acc_lora_20, info_lora_20 = evaluate_subset(dev_raw, n_samples=20)

In [None]:
# Zero-shot baseline: the same base checkpoint (MODEL_NAME_MATH) without the adapter.
zs_tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME_MATH,
    trust_remote_code=True,
)

# NOTE(review): this loads one more full copy of the base weights next to the
# models created in earlier cells — presumably memory-heavy; confirm it fits.
zs_model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME_MATH,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    trust_remote_code=True,
)
zs_model.eval()

@torch.no_grad()
def generate_answer_zeroshot(sample: Dict[str, Any], selected_texts: List[str]):
    """Answer one sample with the base (non-fine-tuned) model.

    Args:
        sample: Dataset entry; only ``sample["qa"]["question"]`` is read.
        selected_texts: Evidence strings to show to the model.

    Returns:
        ``(pred, raw)`` where ``raw`` is the model's full reply and ``pred``
        is the number parsed from it (``None`` if nothing could be parsed).
    """
    llm_reply = qwen7b_answer_generate(
        question=sample["qa"]["question"],
        evidence_texts=selected_texts,
        tokenizer=zs_tokenizer,
        model=zs_model,
        max_new_tokens=128,
    )
    return extract_final_number(llm_reply), llm_reply

def evaluate_subset_zeroshot(data: List[Dict[str, Any]], n_samples: int = 20):
    """Run retrieval + zero-shot generation on the first ``n_samples`` entries.

    Mirrors the fine-tuned evaluation but answers with the base model. For
    every sample the selected evidence, the raw reply and the verdict are
    printed. A prediction is correct when it was parsed and lies within 1e-2
    of the gold ``exe_ans``.

    Args:
        data: Dataset entries, each with ``sample["qa"]["exe_ans"]``.
        n_samples: How many leading entries to evaluate (capped at ``len(data)``).

    Returns:
        ``(acc, info)``: the fraction of correct samples (0.0 when nothing was
        evaluated) and one dict per sample with the keys ``idx``, ``pred``,
        ``gold``, ``selected``, ``raw`` and ``correct``.
    """
    limit = min(n_samples, len(data))
    records = []
    hits = 0
    for idx in range(limit):
        sample = data[idx]
        gold = sample["qa"]["exe_ans"]
        selected = select_evidence(sample)
        pred, raw_output = generate_answer_zeroshot(sample, selected)

        # A gold value that cannot be subtracted from the prediction raises
        # TypeError and is scored as a miss.
        try:
            is_ok = pred is not None and bool(abs(pred - gold) <= 1e-2)
        except TypeError:
            is_ok = False
        hits += int(is_ok)

        print(f"[ZS {idx}] pred={pred}, gold={gold}, correct={is_ok}")
        print("Selected evidence:")
        for passage in selected:
            print("  -", " ".join(passage.split()))
        print("RAW LLM (zero-shot):", raw_output[:400], "\n")

        records.append(
            dict(
                idx=idx,
                pred=pred,
                gold=gold,
                selected=selected,
                raw=raw_output,
                correct=is_ok,
            )
        )

    total = len(records)
    acc = hits / total if total > 0 else 0.0
    print(f"\n[Zero-shot] Execution Accuracy: {hits}/{total} = {acc:.4f}")
    return acc, records

# Evaluate the zero-shot baseline on the same first 20 dev samples and print it
# next to the fine-tuned accuracy computed in the earlier evaluation cell.
acc_zs_20, info_zs_20 = evaluate_subset_zeroshot(dev_raw, n_samples=20)
print("Zero-shot acc_20 =", acc_zs_20, "Finetuned acc_20 =", acc_lora_20)

In [None]:
# Evaluate the fine-tuned pipeline on the first 50 dev samples. evaluate_subset
# always starts at index 0, so this run covers the 20 samples scored earlier again.
acc_lora_50, info_lora_50 = evaluate_subset(dev_raw, n_samples=50)
print("Finetuned LoRA on 50 dev examples, accuracy =", acc_lora_50)