#1) 필요 install 및 import, 환경설정

In [None]:
# 1) 필요 install 및 import, 환경설정
!pip -q install "transformers>=4.40.0" "accelerate>=0.26.0" datasets sentencepiece evaluate sacrebleu torchmetrics rapidfuzz

# NeuSpell은 환경에 따라 설치가 까다로울 수 있어 '가능하면' 설치 시도
try:
    !pip -q install neuspell
    NEUSPELL_AVAILABLE = True
except Exception:
    NEUSPELL_AVAILABLE = False

import os, random, warnings, difflib
from typing import List, Dict, Tuple

import torch
import pandas as pd
from datasets import Dataset as HFDataset
from sklearn.model_selection import train_test_split

from transformers import (
    AutoTokenizer, AutoModelForSeq2SeqLM,
    DataCollatorForSeq2Seq,
    Seq2SeqTrainingArguments, Seq2SeqTrainer, TrainerCallback
)

import evaluate
from torchmetrics.text import CharErrorRate
from rapidfuzz.distance import Levenshtein as RLev

# 환경 설정
warnings.filterwarnings("ignore")
os.environ["WANDB_DISABLED"] = "true"   # wandb 팝업 방지
SEED = 42
random.seed(SEED); torch.manual_seed(SEED)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", DEVICE)


#2) Google Drive mount

In [None]:
# 2) google drive mount
from google.colab import drive
drive.mount('/content/drive')

CSV_PATH = "경로 입력"  # 주어진 경로
assert os.path.exists(CSV_PATH), f"CSV not found at {CSV_PATH}"
print("OK:", CSV_PATH)


#3) NeuSpell 진행시 필요한 model(+tokenizer가 필요할 경우만)

In [None]:
# 3) NeuSpell 진행시 필요한 model (tokenizer 불필요)
def build_neuspell_checker():
    if not NEUSPELL_AVAILABLE:
        return None
    try:
        # 가장 일반적인 예: Semi-Character LSTM
        from neuspell import SclSTMChecker
        checker = SclSTMChecker()
        checker.from_pretrained()  # 기본 영어 체크포인트
        return checker
    except Exception:
        try:
            # 백업: BERT 기반
            from neuspell import BertChecker
            checker = BertChecker()
            checker.from_pretrained("bert-base-cased")
            return checker
        except Exception:
            return None

NEUSPELL = build_neuspell_checker()
print("NeuSpell loaded:", NEUSPELL is not None)


#4) NeuSpell 모델 진행 시 필요한 적용점 적용

In [None]:
# 4) NeuSpell 적용점 (전처리 함수)
def neuspell_preclean(texts: List[str]) -> List[str]:
    """NeuSpell로 철자/띄어쓰기 교정. 사용 불가면 원문 통과."""
    if NEUSPELL is None:
        return texts
    try:
        return NEUSPELL.correct_strings(texts)
    except Exception:
        return texts

#5) ByT5 진행 시 필요한 model과 필요한 것 load

In [None]:
# 5) ByT5 로드
MODEL_NAME = "google/byt5-base"   # 메모리 작으면 'google/byt5-small' 권장
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)
model     = AutoModelForSeq2SeqLM.from_pretrained(MODEL_NAME).to(DEVICE)
print("ByT5 loaded:", MODEL_NAME)

#6) ByT5 모델 진행 시 필요한 적용점 적용

In [None]:
# 6) ByT5 적용점
GEN_BEAMS = 1
MAX_SOURCE_LEN = 96
MAX_TARGET_LEN = 96

# 학습에서 메모리 절약
model.config.use_cache = False                  # 학습 시 decoder cache 비활성화
# model.gradient_checkpointing_enable()         # 필요 시 메모리 추가 절약

model.config.update({
    "num_beams": GEN_BEAMS,
    "max_length": MAX_TARGET_LEN
})

#7) 데이터 로드 및 전처리 과정 (9:1 split, NeuSpell 전처리)

In [None]:
# 7) 데이터 로드 및 전처리
df = pd.read_csv(CSV_PATH)
assert {"noise","clean"} <= set(df.columns), "CSV must have columns: noise, clean"
df["noise"] = df["noise"].fillna("").astype(str)
df["clean"] = df["clean"].fillna("").astype(str)

# NeuSpell 전처리
df["noise_preclean"] = neuspell_preclean(df["noise"].tolist())

# 9:1 split
train_df, val_df = train_test_split(df, test_size=0.05, random_state=SEED, shuffle=True)

# HF Datasets 변환
train_hf = HFDataset.from_pandas(train_df[["noise_preclean","clean"]], preserve_index=False)
val_hf   = HFDataset.from_pandas(val_df[["noise_preclean","clean"]],   preserve_index=False)

len(train_hf), len(val_hf)

#8) 전처리 함수 및 데이터 콜레이터

In [None]:
# 8) 전처리 함수 및 데이터 콜레이터
def preprocess_fn(batch: Dict[str, List[str]]) -> Dict[str, List[int]]:
    inputs  = batch["noise_preclean"]
    targets = batch["clean"]

    model_inputs = tokenizer(inputs, max_length=MAX_SOURCE_LEN, truncation=True, padding=False)
    with tokenizer.as_target_tokenizer():
        labels = tokenizer(targets, max_length=MAX_TARGET_LEN, truncation=True, padding=False)
    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

train_tok = train_hf.map(preprocess_fn, batched=True, remove_columns=train_hf.column_names)
val_tok   = val_hf.map(preprocess_fn,   batched=True, remove_columns=val_hf.column_names)

collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=model, padding="longest")

#9) 평가 지표 로드 (필요 시 추가 메트릭 가능)

In [None]:
!pip install jiwer

In [None]:
import evaluate
chrf = evaluate.load("chrf")
cer_metric = evaluate.load("cer")

from sklearn.metrics import f1_score

def _char_prf1_acc(pred_texts, label_texts):
    # 문자 단위 얼라인으로 TP/FP/FN 집계 + 정확도
    import difflib
    TP = FP = FN = TN = 0
    total_match = 0
    total_char = 0

    for p, g in zip(pred_texts, label_texts):
        sm = difflib.SequenceMatcher(None, list(g), list(p))
        for tag, i1, i2, j1, j2 in sm.get_opcodes():
            if tag == "equal":
                # 일치 문자는 TP로 처리 (교정과제의 맞춘 문자)
                length = (i2 - i1)
                TP += length
                total_match += length
                total_char += length
            elif tag == "replace":
                # 대체: g→p 길이를 기준으로 FP/FN 동시 발생으로 취급
                FP += (j2 - j1)
                FN += (i2 - i1)
                total_char += max(i2 - i1, j2 - j1)
            elif tag == "delete":
                # 정답에만 있고 예측에는 없음 → FN
                FN += (i2 - i1)
                total_char += (i2 - i1)
            elif tag == "insert":
                # 예측에만 있음 → FP
                FP += (j2 - j1)
                total_char += (j2 - j1)

    precision = TP / (TP + FP + 1e-8)
    recall    = TP / (TP + FN + 1e-8)
    f1        = 2 * precision * recall / (precision + recall + 1e-8)
    char_acc  = total_match / max(1, total_char)
    return precision, recall, f1, char_acc


def compute_metrics(eval_pred):
    preds, labels = eval_pred
    if isinstance(preds, tuple):
        preds = preds[0]

    import numpy as np
    # 3D 로짓이면 argmax
    if hasattr(preds, "ndim") and preds.ndim == 3:
        pred_ids = preds.argmax(-1)
    else:
        pred_ids = preds

    # -100 → pad id 복원
    labels_np = np.where(labels != -100, labels, tokenizer.pad_token_id)

    # [수정] 원래 labels 복원 대신 labels_np 사용
    pred_texts = tokenizer.batch_decode(np.array(pred_ids), skip_special_tokens=True)
    label_texts = tokenizer.batch_decode(labels_np, skip_special_tokens=True)
    # 기존 메트릭
    chrf_res = chrf.compute(predictions=pred_texts, references=label_texts, beta=1)
    chrf_f1  = chrf_res["score"] / 100.0
    cer_sum  = sum(cer_metric.compute(predictions=[p], references=[g]) for p, g in zip(pred_texts, label_texts))
    cer      = cer_sum / max(1, len(pred_texts))
    exact    = sum(p == g for p, g in zip(pred_texts, label_texts)) / max(1, len(pred_texts))

    # === 추가: 문자 단위 Precision/Recall/F1/Accuracy ===
    prec, rec, f1, char_acc = _char_prf1_acc(pred_texts, label_texts)

    return {
        "char_f1": f1,          # 요청하신 F1
        "char_accuracy": char_acc  # 요청하신 accuracy(문자 단위)
    }

#10) trainer 정의 (epoch 단위 로깅/평가/저장 + 콜백으로 예쁘게 출력)

In [None]:
!pip install -U "transformers>=4.40.0" "accelerate>=0.26.0"

In [None]:
import transformers, inspect
from transformers import Seq2SeqTrainingArguments, TrainingArguments

# 10) trainer 정의 (변경분만)
BATCH_SIZE  = 2
NUM_EPOCHS  = 2
LR          = 5e-6
OUTPUT_DIR = "출력 경로 입력"

# 수우정
if not os.path.exists(OUTPUT_DIR):
  os.makedirs(OUTPUT_DIR)
# 수정 끄읏

PRED_LOG = []              # [{'epoch': 1, 'input': ..., 'preclean': ..., 'edit_src_word': ..., 'edit_new_word': ..., 'output': ...}, ...]
MAX_VAL_SAVE = -1          # -1 이면 검증셋 전체 저장, 양이 많으면 200 등으로 제한
# 구버전 호환: evaluation_strategy 대신 do_train/do_eval 사용
args = Seq2SeqTrainingArguments(
    output_dir=OUTPUT_DIR,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    learning_rate=LR,
    num_train_epochs=NUM_EPOCHS,

    # evaluation_strategy="steps",
    # logging_strategy="steps",
    do_train=True,
    do_eval=True,          # <-- 구버전에서 평가 활성화
    logging_steps=500,
    save_steps=1000,
    eval_steps=1000,

    save_total_limit=2,

    predict_with_generate=True,   # 평가시 생성
    generation_max_length=MAX_TARGET_LEN,
    generation_num_beams=4,
    fp16=False,
    report_to="none",
    seed=SEED,
)


trainer = Seq2SeqTrainer(
    model=model,
    args=args,
    train_dataset=train_tok,
    eval_dataset=val_tok,
    data_collator=collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

# word-level 첫 변경 단어쌍 추정 (기존 그대로)
def first_changed_word_pair(src: str, tgt: str) -> Tuple[str, str]:
    import difflib
    src_words = src.split()
    tgt_words = tgt.split()
    sm = difflib.SequenceMatcher(None, src_words, tgt_words)
    for tag, i1, i2, j1, j2 in sm.get_opcodes():
        if tag == "replace":
            if i1 < len(src_words) and j1 < len(tgt_words):
                return (src_words[i1], tgt_words[j1])
        elif tag == "delete":
            if i1 < len(src_words):
                return (src_words[i1], "")
        elif tag == "insert":
            if j1 < len(tgt_words):
                return ("", tgt_words[j1])
    return ("-", "-")

# ▼ 변경: 콘솔 출력 + DataFrame 로그 누적
class EpochEndReporter(TrainerCallback):
    def on_epoch_end(self, args, state, control, **kwargs):
        # train/eval loss 요약
        train_subset = train_tok.select(range(min(1024, len(train_tok))))
        train_metrics = trainer.evaluate(eval_dataset=train_subset, metric_key_prefix="train")
        eval_metrics  = trainer.evaluate(metric_key_prefix="eval")
        print(f"\n[Epoch {int(state.epoch)}/{args.num_train_epochs}] "
              f"train_loss={train_metrics.get('train_loss'):.4f} | "
              f"eval_loss={eval_metrics.get('eval_loss'):.4f}")

        # ▼ DataFrame 저장용 로그 누적 (검증셋 전체 또는 샘플)
        if MAX_VAL_SAVE == -1:
            iter_df = val_df
        else:
            iter_df = val_df.sample(n=min(MAX_VAL_SAVE, len(val_df)), random_state=SEED)

        for _, row in iter_df.iterrows():
            src = str(row["noise"])
            pre = neuspell_preclean([src])[0]

            # 생성 (predict_with_generate=False 이므로 수동 generate)
            inputs = tokenizer(pre, return_tensors="pt", truncation=True, max_length=MAX_SOURCE_LEN).to(model.device)
            with torch.no_grad():
                gen_ids = model.generate(**inputs, num_beams=GEN_BEAMS, max_length=MAX_TARGET_LEN)
            out = tokenizer.decode(gen_ids[0], skip_special_tokens=True)

            ori_w, new_w = first_changed_word_pair(src, out)

            PRED_LOG.append({
                "epoch": int(state.epoch),
                "input": src,
                "preclean": pre,
                "edit_src_word": ori_w,
                "edit_new_word": new_w,
                "output": out,
            })

STEP_LOG = []  # [{'step': int, 'loss': float}, {'step': int, 'eval_loss': float}, ...]

class StepLogger(TrainerCallback):
    def on_log(self, args, state, control, logs=None, **kwargs):
        if not logs:
            return
        row = {"step": state.global_step}
        if "loss" in logs:
            row["loss"] = float(logs["loss"])
        if "eval_loss" in logs:
            row["eval_loss"] = float(logs["eval_loss"])
        STEP_LOG.append(row)

    def on_train_end(self, args, state, control, **kwargs):
      # CSV 저장
      import pandas as pd, os
      df = pd.DataFrame(STEP_LOG)
      csv_path = os.path.join(OUTPUT_DIR, "step_logs.csv")
      df.to_csv(csv_path, index=False, encoding="utf-8")
      print(f"[Saved] {csv_path}")

      # 그래프 저장
      import matplotlib.pyplot as plt
      plt.figure()

        # [수정 시작] train_loss와 eval_loss를 그리는 방식을 분리합니다.

      # 1. Train Loss: loss 값이 있는 모든 행 사용
      if "loss" in df.columns:
        train_df = df.dropna(subset=['loss'])
        plt.plot(train_df["step"], train_df["loss"], label="train_loss")

      # 2. Eval Loss: eval_loss 값이 있는 행만 필터링하여 사용
      if "eval_loss" in df.columns:
        # NaN이 아닌 eval_loss 행만 필터링합니다.
          eval_df = df.dropna(subset=['eval_loss'])
          # 이제 eval_df["step"]과 eval_df["eval_loss"]의 길이는 동일합니다.
          plt.plot(eval_df["step"], eval_df["eval_loss"], label="eval_loss", marker='o')

        # [수정 끝]

      plt.xlabel("step"); plt.ylabel("loss"); plt.legend(); plt.title("Step-wise Loss")
      png_path = os.path.join(OUTPUT_DIR, "step_loss.png")
      plt.savefig(png_path, bbox_inches="tight")
      print(f"[Saved] {png_path}")

# ==== 스텝 단위 수동 평가 콜백 (구버전 호환) ====
EVAL_EVERY = 10  # 원하는 주기(스텝)로 조절

class ManualStepEvaluator(TrainerCallback):
    def __init__(self, eval_every=EVAL_EVERY):
        self.eval_every = eval_every

    def on_step_end(self, args, state, control, **kwargs):
        # global_step이 eval 주기일 때 수동 평가
        if state.global_step > 0 and state.global_step % self.eval_every == 0:
            # 구버전 호환: 전역 trainer 참조로 직접 평가 실행
            metrics = trainer.evaluate(metric_key_prefix="stepeval")
            # 스텝 정보와 함께 수동 로그
            metrics = {f"stepeval_{k}": v for k, v in metrics.items()}
            metrics["stepeval_step"] = state.global_step
            trainer.log(metrics)

# 등록
#trainer.add_callback(ManualStepEvaluator(EVAL_EVERY))
trainer.add_callback(EpochEndReporter())
trainer.add_callback(StepLogger())

#11) 모델 학습 및 검증 (epoch=3, epoch마다 train/eval loss 출력)

In [None]:
# 11) 모델 학습 및 검증 + CSV 저장
train_result = trainer.train()
trainer.save_model(os.path.join(OUTPUT_DIR, "stage1_silver"))
tokenizer.save_pretrained(os.path.join(OUTPUT_DIR, "stage1_silver"))

print("\n[Training Finished]")
print(train_result)

# === (NEW) Save .pt (state_dict) ===
import os, torch

pt_dir = os.path.join(OUTPUT_DIR, "stage1_silver")
os.makedirs(pt_dir, exist_ok=True)

# 메모리·호환성을 위해 CPU 텐서로 변환해 저장
state_dict_cpu = {k: v.to("cpu") for k, v in model.state_dict().items()}
pt_path = os.path.join(pt_dir, "NeuSpell_ByT5_Weight.pt")
torch.save(state_dict_cpu, pt_path)

print(f"[Saved .pt] {pt_path}")


# ▼ 추가: 에폭별 검증 예측 로그를 CSV로 저장
import pandas as pd
pred_df = pd.DataFrame(PRED_LOG, columns=["epoch","input","preclean","edit_src_word","edit_new_word","output"])
save_path = os.path.join(OUTPUT_DIR, "val_predictions_by_epoch2.csv")
pred_df.to_csv(save_path, index=False, encoding="utf-8")
print(f"[Saved] {save_path}  (rows={len(pred_df)})")

In [None]:
# 12) 추론 함수 정의 및 Gradio 실행 (조건 3, 4)
import torch, os
# Gradio 사용을 위한 설치 및 임포트
!pip -q install gradio
import gradio as gr
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# 이전 셀에서 정의된 변수들을 재참조
MODEL_NAME = "google/byt5-base"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
MAX_SOURCE_LEN = 96
MAX_TARGET_LEN = 96
GEN_BEAMS = 4
OUTPUT_DIR = "위와 동일한 경로" # 조건 1의 경로

# 1. 학습된 모델 및 토크나이저 로드 (11번 셀에서 저장된 경로 사용)
CHECKPOINT_DIR = os.path.join(OUTPUT_DIR, "stage1_silver")
PT_PATH = os.path.join(CHECKPOINT_DIR, "NeuSpell_ByT5_Weight.pt")

try:
    # 토크나이저 로드
    tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT_DIR)
    # 기본 모델 구조 로드
    model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_NAME)
    # .pt 파일(state_dict) 로드 및 적용
    state_dict = torch.load(PT_PATH, map_location=DEVICE)
    model.load_state_dict(state_dict)
    model.to(DEVICE).eval()
    print(f"[Model Loaded] Model state loaded from {PT_PATH} and moved to {DEVICE}.")

except Exception as e:
    print(f"[ERROR] 모델 로드 실패: {e}")
    # 모델 로드 실패 시에는 Gradio 실행에 문제가 생길 수 있습니다.


# [조건 3] Inference 함수 정의
def predict_correction(text_input: str) -> str:
    """
    주어진 문장에 대해 철자 교정 예측 결과를 반환합니다.
    """
    if not text_input:
        return "입력 문장이 비어 있습니다."

    # 1. NeuSpell 전처리 (4번 셀의 함수 neuspell_preclean 사용)
    # 이 셀은 새로운 환경(커널)에서도 실행될 수 있으므로,
    # NEUSPELL 변수 및 neuspell_preclean 함수가 정의되어 있다는 가정이 필요합니다.
    try:
        precleaned_text = neuspell_preclean([text_input])[0]
    except NameError:
        precleaned_text = text_input # NeuSpell이 정의되지 않은 경우 원문 사용

    # 2. 토크나이징 및 모델 입력
    inputs = tokenizer(
        precleaned_text,
        return_tensors="pt",
        truncation=True,
        max_length=MAX_SOURCE_LEN
    ).to(DEVICE)

    # 3. 모델 생성 (Inference)
    with torch.no_grad():
        gen_ids = model.generate(
            **inputs,
            num_beams=GEN_BEAMS,
            max_length=MAX_TARGET_LEN
        )

    # 4. 결과 디코딩
    output_text = tokenizer.decode(gen_ids[0], skip_special_tokens=True)

    return output_text


# [조건 4] Gradio UI 통합 및 실행
try:
    iface = gr.Interface(
        fn=predict_correction, # 위에 정의한 추론 함수
        inputs=gr.Textbox(
            label="오류가 있는 문장 입력 (Noise Input)",
            placeholder="예: This is a exammple senntence."
        ),
        outputs=gr.Textbox(
            label="교정된 결과 (Corrected Output)"
        ),
        title="NeuT5 기반 철자 교정 모델 (ByT5)",
        description="학습된 NeuT5 모델을 사용하여 입력 문장의 철자 및 문법을 교정합니다.",
    )

    # Gradio 실행 (Colab 환경에서 public_share=True 권장)
    iface.launch(share=True)

except Exception as e:
    print(f"Gradio 인터페이스 실행에 실패했습니다: {e}")

#빠른 확인용

In [None]:
# ========= Quick Demo =========
if __name__ == "__main__":
    demo_inputs = [
        "He go to school every day.",
        "She has two child.",
        "She is teacher.",
        "He arrived to the airport on time.",
        "I every day go to school.",
        "He told that he was tired.",
        "This is very importent information.",
    ]
    tok_inf, mdl_inf = load_trained_model_for_inference()
    results = correct_sentences(demo_inputs, tok_inf, mdl_inf)
    for r in results:
        print(f"IN : {r['input']}")
        print(f"PRE: {r['preclean']}")
        print(f"OUT: {r['output']}")
        print("-"*40)