## Import

In [1]:
import pandas as pd
import re
import numpy as np
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import GPT2Tokenizer

## 데이터 불러오기

In [2]:
# STEP 1: 데이터 로드
file_path = "/Users/jian_lee/Desktop/aiffel/data/Main_project/ChatbotData.csv"
df = pd.read_csv(file_path)

In [3]:
# 데이터 확인
print("데이터 샘플 확인:")
print(df.head())

데이터 샘플 확인:
                 Q            A  label
0           12시 땡!   하루가 또 가네요.      0
1      1지망 학교 떨어졌어    위로해 드립니다.      0
2     3박4일 놀러가고 싶다  여행은 언제나 좋죠.      0
3  3박4일 정도 놀러가고 싶다  여행은 언제나 좋죠.      0
4          PPL 심하네   눈살이 찌푸려지죠.      0


## 데이터 전처리

In [4]:
# CSV 파일의 실제 컬럼 확인
actual_columns = list(df.columns)
expected_columns = ['Q', 'A']  # 우리가 기대하는 컬럼명

# 'label' 컬럼 자동 제거 및 'Q', 'A' 컬럼 유지 (KeyError 방지)
if set(expected_columns).issubset(actual_columns):  # 'Q'와 'A'가 포함된 경우
    print(f"⚠️ 경고: 불필요한 'label' 컬럼 제거. 실제 컬럼명: {actual_columns} → ['Q', 'A']만 사용")
    df = df[expected_columns]  # 'Q'와 'A' 컬럼만 유지
else:
    raise ValueError(f"❌ 오류: 데이터셋에 필요한 컬럼이 없습니다! {actual_columns}")

# STEP 2: 데이터 전처리 함수 정의
def clean_text(text):
    text = str(text).lower()  # 소문자 변환
    text = re.sub(r"[^a-zA-Z0-9가-힣?.!,]+", " ", text)  # 특수문자 제거
    text = re.sub(r'\s+', ' ', text).strip()  # 공백 정리
    return text

# 텍스트 클리닝 적용
df['Q'] = df['Q'].apply(clean_text)
df['A'] = df['A'].apply(clean_text)

# STEP 3: GPT 모델 입력 형식 변환
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

def format_for_gpt(row):
    return f"<s> {row['Q']} </s> {row['A']} <e>"

df['text'] = df.apply(format_for_gpt, axis=1)  # 'formatted' 대신 'text' 컬럼을 직접 추가
df = df[['text']]  # 'text' 컬럼만 유지하여 저장하도록 변경

# STEP 4: 훈련 데이터와 검증 데이터 분리 (80:20 비율)
train_data, val_data = train_test_split(df, test_size=0.2, random_state=42)

# STEP 5: 데이터 저장 (라벨 저장 오류 방지)
train_data.to_csv("./processed_train.csv", index=False, encoding='utf-8', header=True)
val_data.to_csv("./processed_val.csv", index=False, encoding='utf-8', header=True)

print("✅ 데이터 전처리 완료! 'label' 컬럼 제거 후 파일 저장됨: processed_train.csv, processed_val.csv")

⚠️ 경고: 불필요한 'label' 컬럼 제거. 실제 컬럼명: ['Q', 'A', 'label'] → ['Q', 'A']만 사용
✅ 데이터 전처리 완료! 'label' 컬럼 제거 후 파일 저장됨: processed_train.csv, processed_val.csv


## 모델 정의 및 학습

In [5]:
from transformers import GPT2Tokenizer, GPT2LMHeadModel

# 디바이스 설정 (CUDA > MPS > CPU)
device = torch.device("cuda" if torch.cuda.is_available() 
                      else ("mps" if torch.backends.mps.is_available() else "cpu"))
print(f"✅ Using device: {device}")

# Pretrained GPT-2 모델 및 토크나이저 로드
model_name = "gpt2"
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
model = GPT2LMHeadModel.from_pretrained(model_name).to(device)

# 패딩 토큰 설정 (GPT-2는 기본적으로 padding token이 없음)
tokenizer.pad_token = tokenizer.eos_token

# ChatbotDataset 정의 (기존과 동일)
class ChatbotDataset(Dataset):
    def __init__(self, data, tokenizer, max_len=512):
        if 'text' not in data.columns:
            raise ValueError(f"❌ 오류: 데이터셋에 'text' 컬럼이 없습니다! 실제 컬럼명: {data.columns}")

        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data.iloc[idx]['text']
        encoding = self.tokenizer(
            text, truncation=True, padding="max_length", max_length=self.max_len, return_tensors="pt"
        )

        input_ids = encoding["input_ids"].squeeze()
        labels = input_ids.clone()
        return {"input_ids": input_ids, "labels": labels}

# 데이터 로드
train_df = pd.read_csv("processed_train.csv")  # CSV 파일 로드
train_dataset = ChatbotDataset(train_df, tokenizer)

# DataLoader 최적화 (Mac 환경에서는 num_workers=0 설정)
num_workers = 4 if device.type == "cuda" else 0
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=num_workers, pin_memory=True)

# 옵티마이저 및 손실 함수 설정
optimizer = optim.AdamW(model.parameters(), lr=5e-5)
criterion = nn.CrossEntropyLoss()

# Fine-tuning (사전학습 모델 활용)
EPOCHS = 3
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0

    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)

        optimizer.zero_grad()
        
        # Hugging Face의 GPT-2는 labels=labels을 직접 입력 가능
        outputs = model(input_ids, labels=labels)
        loss = outputs.loss  # loss를 직접 가져옴
        
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    
    print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader)}")

print("✅ GPT-2 모델 Fine-tuning 완료! 🚀")

✅ Using device: mps


model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

## 샘플링 알고리즘 구현

In [None]:
import torch
import torch.nn.functional as F

def generate_text(model, tokenizer, prompt="<s>", max_length=50, method="greedy", top_k=50, top_p=0.9):
    """
    GPT 모델을 사용하여 텍스트를 생성하는 함수 (Greedy, Top-k, Top-p 지원)
    
    Args:
    model (GPT): 학습된 GPT 모델
    tokenizer (GPT2Tokenizer): GPT 토크나이저
    prompt (str): 초기 입력 문장
    max_length (int): 최대 생성 길이
    method (str): 샘플링 방법 ("greedy", "top_k", "top_p")
    top_k (int): 상위 k개 단어만 선택 (top-k 샘플링)
    top_p (float): 누적 확률이 p 이상이 되는 단어만 선택 (top-p 샘플링)
    
    Returns:
    generated_text (str): 생성된 텍스트
    """
    model.eval()  # 평가 모드
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)  # 입력 인코딩
    
    with torch.no_grad():  # 그래디언트 계산 비활성화
        for _ in range(max_length):
            outputs = model(input_ids)  # 모델 예측
            logits = outputs[:, -1, :]  # 마지막 단어의 예측 확률
            
            if method == "greedy":
                next_token = torch.argmax(logits, dim=-1).unsqueeze(-1)  # 확률이 가장 높은 단어 선택
            elif method == "top_k":
                filtered_logits = top_k_top_p_filtering(logits, top_k=top_k)
                probs = F.softmax(filtered_logits, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)  # 확률적으로 샘플링
            elif method == "top_p":
                filtered_logits = top_k_top_p_filtering(logits, top_p=top_p)
                probs = F.softmax(filtered_logits, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)
            else:
                raise ValueError("지원되지 않는 샘플링 방식입니다. ('greedy', 'top_k', 'top_p') 중 선택하세요.")

            input_ids = torch.cat([input_ids, next_token], dim=-1)  # 생성된 단어 추가

            if next_token == tokenizer.eos_token_id:  # 종료 토큰 나오면 중단
                break

    generated_text = tokenizer.decode(input_ids.squeeze(), skip_special_tokens=True)
    return generated_text


def top_k_top_p_filtering(logits, top_k=0, top_p=0.0):
    """
    Top-k 및 Top-p 샘플링을 적용하여 확률 분포를 필터링하는 함수
    
    Args:
    logits (Tensor): 모델이 예측한 확률 분포 (logits)
    top_k (int): 상위 k개 단어만 유지
    top_p (float): 확률 누적 합이 p 이상이 되는 단어만 유지
    
    Returns:
    filtered_logits (Tensor): 필터링된 logits
    """
    if top_k > 0:
        top_k = min(top_k, logits.size(-1))  # k가 단어 수보다 크지 않도록 제한
        indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None]
        logits[indices_to_remove] = float('-inf')

    if top_p > 0.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)

        sorted_indices_to_remove = cumulative_probs > top_p
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = False

        indices_to_remove = sorted_indices[sorted_indices_to_remove]
        logits[indices_to_remove] = float('-inf')

    return logits

## 모델 학습 후 저장 및 불러오기 기능 추가

In [None]:
# 모델 저장 함수
def save_model(model, tokenizer, save_path="gpt_model.pth"):
    """
    모델과 토크나이저를 저장하는 함수
    
    Args:
    model (GPT): 학습된 GPT 모델
    tokenizer (GPT2Tokenizer): GPT2 토크나이저
    save_path (str): 저장 경로 (기본값: "gpt_model.pth")
    """
    torch.save({
        'model_state_dict': model.state_dict(),
        'tokenizer': tokenizer
    }, save_path)
    print(f"✅ 모델이 저장되었습니다: {save_path}")

# 모델 불러오기 함수
def load_model(model, tokenizer, load_path="gpt_model.pth"):
    """
    저장된 모델과 토크나이저를 불러오는 함수
    
    Args:
    model (GPT): GPT 모델 객체 (구조가 동일해야 함)
    tokenizer (GPT2Tokenizer): GPT 토크나이저
    load_path (str): 불러올 모델 경로 (기본값: "gpt_model.pth")
    """
    checkpoint = torch.load(load_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    tokenizer = checkpoint['tokenizer']
    model.to(device)
    print("✅ 저장된 모델을 성공적으로 불러왔습니다.")
    return model, tokenizer

# 모델 저장
save_model(model, tokenizer, "gpt_model.pth")

# 모델 불러오기
model, tokenizer = load_model(model, tokenizer, "gpt_model.pth")

## 생성된 텍스트 평가 (BLEU Score)

In [None]:
from nltk.translate.bleu_score import sentence_bleu

def calculate_bleu(reference, candidate):
    """
    BLEU 점수를 계산하는 함수
    
    Args:
    reference (list of str): 정답 문장 리스트
    candidate (str): 모델이 생성한 문장
    
    Returns:
    BLEU 점수 (float)
    """
    reference = [ref.split() for ref in reference]  # 단어 단위 분리
    candidate = candidate.split()
    return sentence_bleu(reference, candidate)

# BLEU Score 테스트
reference_sentences = ["안녕하세요, 저는 AI입니다.", "반갑습니다. GPT 모델을 학습하고 있어요."]
generated_text = generate_text(model, tokenizer, prompt="안녕하세요,", max_length=20, method="greedy")

bleu_score = calculate_bleu(reference_sentences, generated_text)
print(f" BLEU Score: {bleu_score:.4f}")