In [None]:
# =========================================
# 1. 라이브러리 불러오기
# =========================================
import pandas as pd
import re
import torch
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from tqdm import tqdm
from transformers import BertTokenizer, BertForSequenceClassification

# =========================================
# 2. 텍스트 전처리 함수 정의
# =========================================
def clean_korean_text(text):
    """
    한국어 텍스트 전처리 함수
    - 한글, 숫자, 공백, 문장부호(.,?! 제외)는 모두 제거
    - 반복되는 'ㅋ', 'ㅎ'은 각각 'ㅋㅋ', 'ㅎㅎ'로 통일
    - 같은 글자가 3번 이상 반복되면 2번으로 축소
    - 다중 공백 제거
    """
    if pd.isnull(text):  # NaN 값 방지
        return ""
    text = re.sub(r"[^가-힣0-9\s.,?!]", " ", text)   # 허용되지 않는 문자 제거
    text = re.sub(r"(ㅋ)\1+", "ㅋㅋ", text)          # ㅋㅋㅋ -> ㅋㅋ
    text = re.sub(r"(ㅎ)\1+", "ㅎㅎ", text)          # ㅎㅎㅎ -> ㅎㅎ
    text = re.sub(r"(.)\1{2,}", r"\1\1", text)      # aaa -> aa
    text = re.sub(r"\s+", " ", text).strip()        # 다중 공백 제거
    return text


# =========================================
# 3. 데이터 불러오기 및 전처리
# =========================================
# Train 데이터 로드
train = pd.read_csv("merged_train.csv")
train["conversation"] = train["conversation"].astype(str).apply(clean_korean_text)

# Test 데이터 로드
test = pd.read_csv("test.csv")
test["text"] = test["text"].astype(str).apply(clean_korean_text)


# =========================================
# 4. 학습/검증 데이터 분리 (Stratified Split)
# =========================================
train_df, valid_df = train_test_split(
    train,
    test_size=0.2,             # 80:20 비율
    random_state=42,           # 재현성 고정
    stratify=train["class"]    # 클래스 비율 유지
)


# =========================================
# 5. 토크나이저 정의
# =========================================
tokenizer = BertTokenizer.from_pretrained("bert-base-multilingual-cased")


# =========================================
# 6. 데이터셋 인코딩 함수
# =========================================
def encode_dataset(texts, labels, tokenizer, max_len=256):
    """
    텍스트와 라벨을 토크나이저로 변환
    - padding: max_length까지 패딩
    - truncation: 길이 초과 시 자르기
    - return_tensors: PyTorch 텐서 반환
    """
    encodings = tokenizer(
        texts.tolist(),
        padding="max_length",
        truncation=True,
        max_length=max_len,
        return_tensors="pt"
    )
    encodings["labels"] = torch.tensor(labels.tolist())
    return encodings


# Train/Valid 데이터 인코딩
train_encodings = encode_dataset(train_df["conversation"], train_df["class"], tokenizer)
valid_encodings = encode_dataset(valid_df["conversation"], valid_df["class"], tokenizer)


# =========================================
# 7. Dataset 클래스 정의
# =========================================
class ConversationDataset(Dataset):
    def __init__(self, encodings):
        self.encodings = encodings

    def __len__(self):
        return len(self.encodings["input_ids"])

    def __getitem__(self, idx):
        return {key: val[idx] for key, val in self.encodings.items()}


# =========================================
# 8. DataLoader 정의
# =========================================
train_dataset = ConversationDataset(train_encodings)
valid_dataset = ConversationDataset(valid_encodings)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)


# =========================================
# 9. 모델 정의 (BERT 분류기)
# =========================================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = BertForSequenceClassification.from_pretrained(
    "bert-base-multilingual-cased",
    num_labels=5   # 클래스 개수
).to(device)


# =========================================
# 10. Optimizer & Loss 정의
# =========================================
optimizer = AdamW(
    model.parameters(),
    lr=2e-5,              # 학습률
    betas=(0.9, 0.999),   # 1차, 2차 모멘트
    eps=1e-8,             # 수치 안정성
    weight_decay=0.01     # 가중치 감쇠 (정규화 효과)
)

criterion = torch.nn.CrossEntropyLoss()


# =========================================
# 11. 학습 루프
# =========================================
epochs = 3

for epoch in range(epochs):
    # ----------- Train Loop -----------
    model.train()
    total_loss, total_correct = 0, 0
    all_preds, all_labels = [], []   # F1 점수 계산용

    for batch in tqdm(train_loader, desc=f"Epoch {epoch+1} Training"):
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        logits = outputs.logits

        loss.backward()
        optimizer.step()

        # 통계 계산
        total_loss += loss.item()
        preds = logits.argmax(dim=1)
        total_correct += (preds == labels).sum().item()

        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    avg_loss = total_loss / len(train_loader)
    avg_acc = total_correct / len(train_dataset)
    avg_f1 = f1_score(all_labels, all_preds, average="macro")

    print(f"Epoch {epoch+1} | Train Loss: {avg_loss:.4f}, Train Acc: {avg_acc:.4f}, Train F1: {avg_f1:.4f}")

    # ----------- Validation Loop -----------
    model.eval()
    val_loss, val_correct = 0, 0
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch in tqdm(valid_loader, desc=f"Epoch {epoch+1} Validation"):
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            val_loss += outputs.loss.item()

            preds = outputs.logits.argmax(dim=1)
            val_correct += (preds == labels).sum().item()

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    avg_val_loss = val_loss / len(valid_loader)
    avg_val_acc = val_correct / len(valid_dataset)
    val_f1 = f1_score(all_labels, all_preds, average="macro")

    print(f"Epoch {epoch+1} | Val Loss: {avg_val_loss:.4f}, Val Acc: {avg_val_acc:.4f}, Val F1: {val_f1:.4f}")


# =========================================
# 12. 학습 완료 후 모델 저장
# =========================================
save_path = "./saved_mbert_model"

model.save_pretrained(save_path)
tokenizer.save_pretrained(save_path)


# =========================================
# 13. 저장된 모델 불러오기
# =========================================
save_path = "./saved_mbert_model"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = BertForSequenceClassification.from_pretrained(save_path).to(device)
tokenizer = BertTokenizer.from_pretrained(save_path)
model.eval()


# =========================================
# 14. 예측 함수 정의
# =========================================
def predict(texts, model, tokenizer, max_len=256):
    """
    입력 텍스트 리스트에 대해 예측 수행
    - texts: pandas Series or list
    - 반환값: 예측된 클래스 번호 배열
    """
    encodings = tokenizer(
        texts.tolist(),
        padding="max_length",
        truncation=True,
        max_length=max_len,
        return_tensors="pt"
    ).to(device)

    with torch.no_grad():
        outputs = model(**encodings)
        preds = outputs.logits.argmax(dim=1).cpu().numpy()
    return preds


# =========================================
# 15. Test 데이터 예측 후 제출 파일 생성
# =========================================
preds = predict(test["text"], model, tokenizer)

# 샘플 제출 파일 불러오기
submission = pd.read_csv("submission.csv")

# 예측 라벨 덮어쓰기
submission["class"] = preds

# 저장
submission.to_csv("mbert.csv", index=False, encoding="utf-8-sig")
print("✅ mbert.csv 저장 완료")