In [None]:
import os
import math
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
import urllib.request

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer

import sentencepiece as spm
from sklearn.model_selection import train_test_split
from nltk.translate.bleu_score import corpus_bleu, sentence_bleu, SmoothingFunction
from tqdm.notebook import tqdm

# Matplotlib에서 한글 폰트가 깨지지 않도록 설정합니다.
def setup_korean_font():
    """
    Matplotlib에서 한글을 지원하기 위한 폰트 설정을 수행합니다.
    나눔 폰트가 없으면 자동으로 다운로드하여 설정합니다.
    """
    font_name = 'NanumGothic'
    
    # 시스템에 설치된 폰트 확인
    if any(font_name in f.name for f in fm.fontManager.ttflist):
        print(f"'{font_name}' 폰트가 이미 설치되어 있습니다.")
        plt.rcParams['font.family'] = font_name
    else:
        print(f"'{font_name}' 폰트가 없어 다운로드를 시작합니다.")
        
        font_url = 'https://github.com/google/fonts/raw/main/ofl/nanumgothic/NanumGothic-Regular.ttf'
        font_path = 'NanumGothic-Regular.ttf'
        
        try:
            # 폰트 파일 다운로드
            if not os.path.exists(font_path):
                 urllib.request.urlretrieve(font_url, font_path)
            
            # Matplotlib의 폰트 매니저에 추가
            fm.fontManager.addfont(font_path)
            
            # Matplotlib의 rcParams에 폰트 설정
            plt.rcParams['font.family'] = 'NanumGothic'
            print(f"'{font_name}' 폰트 설정이 완료되었습니다.")
        except Exception as e:
            print(f"폰트 다운로드 또는 설정에 실패했습니다: {e}")
            print("기본 폰트로 그래프를 생성합니다. 한글이 깨질 수 있습니다.")
            plt.rcParams['font.family'] = 'sans-serif'

    plt.rcParams['axes.unicode_minus'] = False
    print(f"현재 Matplotlib 폰트: {plt.rcParams['font.family']}")


setup_korean_font()

# 1. 데이터 로드 및 전처리
print("1. 데이터 로드 및 전처리 시작")
try:
    df = pd.read_csv("ChatbotData.csv")
    print(f"초기 데이터 개수: {len(df)}")
    
    df = df.dropna(subset=["Q", "A"])
    df = df.drop_duplicates(subset=["Q", "A"])
    df = df.reset_index(drop=True)
    
    questions = df["Q"].values
    answers = df["A"].values
    
    print(f"전처리 후 데이터 개수: {len(df)}")
except FileNotFoundError:
    print("Error: ChatbotData.csv 파일을 찾을 수 없습니다.")
    exit()

# [수정] 데이터를 8:1:1 비율로 분할
# 1차 분할: 훈련+검증 데이터 (90%)와 테스트 데이터 (10%)
train_val_questions, test_questions, train_val_answers, test_answers = train_test_split(
    questions, answers, test_size=0.1, random_state=42)

# 2차 분할: 훈련 데이터 (80%)와 검증 데이터 (10%)
# train_val_questions의 1/9을 검증 데이터로 사용 (전체의 10%)
train_questions, val_questions, train_answers, val_answers = train_test_split(
    train_val_questions, train_val_answers, test_size=1/9, random_state=42)


print(f"\n훈련 데이터: {len(train_questions)}개, 검증 데이터: {len(val_questions)}개, 테스트 데이터: {len(test_questions)}개")
print("-" * 30)

# 2. SentencePiece 토크나이저 훈련 및 로드
print("\n2. SentencePiece 토크나이저 훈련 시작")
VOCAB_SIZE = 8000
SP_MODEL_PREFIX = 'chatbot_spm'
SP_MODEL_PATH = f'{SP_MODEL_PREFIX}.model'
SP_CORPUS_PATH = 'chatbot_corpus.txt'

# [수정] 훈련(Train) 데이터만으로 토크나이저 학습
with open(SP_CORPUS_PATH, 'w', encoding='utf-8') as f:
    for q, a in zip(train_questions, train_answers):
        f.write(f"{q}\n")
        f.write(f"{a}\n")

# SentencePiece 모델 훈련
spm_command = (f'--input={SP_CORPUS_PATH} --model_prefix={SP_MODEL_PREFIX} '
               f'--vocab_size={VOCAB_SIZE} --model_type=bpe '
               f'--user_defined_symbols=<pad>,<sos>,<eos>')
spm.SentencePieceTrainer.train(spm_command)
print("SentencePiece 모델 훈련 완료.")

# 훈련된 토크나이저 로드
tokenizer = spm.SentencePieceProcessor()
tokenizer.load(SP_MODEL_PATH)

PAD_IDX = tokenizer.piece_to_id('<pad>')
SOS_IDX = tokenizer.piece_to_id('<sos>')
EOS_IDX = tokenizer.piece_to_id('<eos>')
UNK_IDX = tokenizer.unk_id()

print(f"단어장 크기: {tokenizer.get_piece_size()}")
print(f"PAD: {PAD_IDX}, SOS: {SOS_IDX}, EOS: {EOS_IDX}, UNK: {UNK_IDX}")
print("-" * 30)

# 3. 트랜스포머 모델 정의 (어텐션 가중치 반환을 위해 수정)
print("\n3. 트랜스포머 모델 정의")

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

# 어텐션 가중치를 반환하도록 Decoder Layer 수정
class CustomTransformerDecoderLayer(nn.TransformerDecoderLayer):
    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None,
                tgt_key_padding_mask=None, memory_key_padding_mask=None,
                tgt_is_causal=None, memory_is_causal=None):
        # Self-Attention
        tgt2 = self.self_attn(tgt, tgt, tgt, attn_mask=tgt_mask,
                              key_padding_mask=tgt_key_padding_mask)[0]
        tgt = tgt + self.dropout1(tgt2)
        tgt = self.norm1(tgt)
        
        # Cross-Attention (Encoder-Decoder Attention)
        # 여기서 헤드별 attention weight를 추출하기 위해 average_attn_weights=False 설정
        tgt2, attn_weights = self.multihead_attn(tgt, memory, memory, attn_mask=memory_mask,
                                                 key_padding_mask=memory_key_padding_mask,
                                                 average_attn_weights=False)
        tgt = tgt + self.dropout2(tgt2)
        tgt = self.norm2(tgt)
        
        # Feed Forward
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = tgt + self.dropout3(tgt2)
        tgt = self.norm3(tgt)
        return tgt, attn_weights

class TransformerChatbot(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
        super(TransformerChatbot, self).__init__()
        self.d_model = d_model
        
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)

        encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout, batch_first=True)
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_encoder_layers)
        
        # 커스텀 디코더 레이어 사용
        decoder_layer = CustomTransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout, batch_first=True)
        self.transformer_decoder = TransformerDecoder(decoder_layer, num_decoder_layers)
        
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, src, tgt, src_padding_mask, tgt_padding_mask, memory_key_padding_mask, tgt_mask):
        src_emb = self.pos_encoder(self.embedding(src) * math.sqrt(self.d_model))
        tgt_emb = self.pos_encoder(self.embedding(tgt) * math.sqrt(self.d_model))

        memory = self.transformer_encoder(src_emb, src_key_padding_mask=src_padding_mask)
        
        # self.transformer_decoder를 직접 호출하는 대신, 내부 레이어를 수동으로 반복합니다.
        # 훈련 시에는 어텐션 가중치가 필요 없으므로, 출력 텐서만 사용합니다.
        output = tgt_emb
        for mod in self.transformer_decoder.layers:
            output, _ = mod(output, memory, 
                            tgt_mask=tgt_mask,
                            tgt_key_padding_mask=tgt_padding_mask,
                            memory_key_padding_mask=memory_key_padding_mask)
        
        return self.fc_out(output)

    def generate_square_subsequent_mask(self, sz, device):
        mask = (torch.triu(torch.ones(sz, sz, device=device)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

print("모델 클래스 정의 완료.")
print("-" * 30)

# 4. 데이터셋 및 데이터로더
print("\n4. 데이터셋 및 데이터로더 생성")
MAX_LEN = 50

def text_transform(text, tokenizer):
    return torch.tensor([SOS_IDX] + tokenizer.encode_as_ids(text) + [EOS_IDX])

class ChatbotDataset(Dataset):
    def __init__(self, questions, answers, tokenizer):
        self.questions = questions
        self.answers = answers
        self.tokenizer = tokenizer
    def __len__(self): return len(self.questions)
    def __getitem__(self, idx):
        q_tensor = text_transform(self.questions[idx], self.tokenizer)
        a_tensor = text_transform(self.answers[idx], self.tokenizer)
        return q_tensor, a_tensor

def collate_fn(batch):
    src_batch, tgt_batch = [], []
    for src_sample, tgt_sample in batch:
        src_batch.append(src_sample[:MAX_LEN])
        tgt_batch.append(tgt_sample[:MAX_LEN])
    src_batch = nn.utils.rnn.pad_sequence(src_batch, padding_value=PAD_IDX, batch_first=True)
    tgt_batch = nn.utils.rnn.pad_sequence(tgt_batch, padding_value=PAD_IDX, batch_first=True)
    return src_batch, tgt_batch

train_dataset = ChatbotDataset(train_questions, train_answers, tokenizer)
val_dataset = ChatbotDataset(val_questions, val_answers, tokenizer)
test_dataset = ChatbotDataset(test_questions, test_answers, tokenizer)

BATCH_SIZE = 128
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
print("데이터로더 생성 완료.")
print("-" * 30)

# 5. 훈련 설정
print("\n5. 훈련 설정")
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"사용 디바이스: {DEVICE}")

D_MODEL = 256
N_HEAD = 8
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3
D_FF = 512
DROPOUT = 0.2
LEARNING_RATE = 0.0001
EPOCHS = 50

model = TransformerChatbot(
    VOCAB_SIZE, D_MODEL, N_HEAD, NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, D_FF, DROPOUT
).to(DEVICE)

def count_parameters(model): return sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'모델 파라미터 수: {count_parameters(model):,}')

criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

def train_epoch(model, dataloader, optimizer, criterion, device, epoch):
    model.train()
    total_loss = 0
    loop = tqdm(dataloader, desc=f'Epoch {epoch:02} Train')
    for src, tgt in loop:
        src, tgt = src.to(device), tgt.to(device)
        tgt_input = tgt[:, :-1]
        tgt_out = tgt[:, 1:]
        src_padding_mask = (src == PAD_IDX)
        tgt_padding_mask = (tgt_input == PAD_IDX)
        tgt_mask = model.generate_square_subsequent_mask(tgt_input.size(1), device)
        optimizer.zero_grad()
        output = model(src, tgt_input, src_padding_mask, tgt_padding_mask, src_padding_mask, tgt_mask)
        loss = criterion(output.reshape(-1, output.shape[-1]), tgt_out.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        loop.set_postfix(loss=loss.item())
    return total_loss / len(dataloader)

def evaluate(model, dataloader, criterion, device, epoch):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        loop = tqdm(dataloader, desc=f'Epoch {epoch:02} Val  ')
        for src, tgt in loop:
            src, tgt = src.to(device), tgt.to(device)
            tgt_input = tgt[:, :-1]
            tgt_out = tgt[:, 1:]
            src_padding_mask = (src == PAD_IDX)
            tgt_padding_mask = (tgt_input == PAD_IDX)
            tgt_mask = model.generate_square_subsequent_mask(tgt_input.size(1), device)
            output = model(src, tgt_input, src_padding_mask, tgt_padding_mask, src_padding_mask, tgt_mask)
            loss = criterion(output.reshape(-1, output.shape[-1]), tgt_out.reshape(-1))
            total_loss += loss.item()
            loop.set_postfix(loss=loss.item())
    return total_loss / len(dataloader)

print("훈련/평가 함수 정의 완료.")
print("-" * 30)


# 6. 훈련 루프
print("\n6. 훈련 시작")
best_val_loss = float('inf')
early_stop_counter = 0
patience = 2
model_save_path = 'best_transformer_chatbot_sp.pt'
train_losses, val_losses = [], []

for epoch in range(1, EPOCHS + 1):
    start_time = time.time()
    train_loss = train_epoch(model, train_dataloader, optimizer, criterion, DEVICE, epoch)
    val_loss = evaluate(model, val_dataloader, criterion, DEVICE, epoch)
    end_time = time.time()
    epoch_mins, epoch_secs = divmod(end_time - start_time, 60)
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    
    print(f'Epoch: {epoch:02} | Time: {int(epoch_mins)}m {int(epoch_secs)}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Val. Loss: {val_loss:.3f}')
    
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), model_save_path)
        early_stop_counter = 0
        print("\t-> 검증 손실 감소, 모델 저장 완료.")
    else:
        early_stop_counter += 1
        print(f"\t-> 검증 손실 증가. ({early_stop_counter}/{patience})")
    if early_stop_counter >= patience:
        print("Early stopping. 훈련을 중단합니다.")
        break
print("\n훈련 종료.")
print("-" * 30)

# 7. 손실 시각화
print("\n7. 훈련/검증 손실 시각화")
plt.figure(figsize=(10, 6))
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.title('Epoch별 훈련 및 검증 손실')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(); plt.grid(True)
plt.savefig('loss_curve_sp.png')
print("손실 그래프를 'loss_curve_sp.png' 파일로 저장했습니다.")
plt.show()
print("-" * 30)


# 8. 모델 로드 및 추론 함수
print("\n8. 최적 모델 로드 및 테스트 시작")
model.load_state_dict(torch.load(model_save_path, map_location=DEVICE))

def translate_and_get_attention(model, sentence, tokenizer, device):
    model.eval()
    src_tensor = text_transform(sentence, tokenizer).unsqueeze(0).to(device)
    src_padding_mask = (src_tensor == PAD_IDX)

    with torch.no_grad():
        memory = model.transformer_encoder(model.pos_encoder(model.embedding(src_tensor) * math.sqrt(D_MODEL)), 
                                           src_key_padding_mask=src_padding_mask)
    
    ys = torch.ones(1, 1).fill_(SOS_IDX).type(torch.long).to(device)
    attentions = []

    for i in range(MAX_LEN - 1):
        tgt_padding_mask = (ys == PAD_IDX)
        tgt_mask = model.generate_square_subsequent_mask(ys.size(1), device)
        
        tgt_emb = model.pos_encoder(model.embedding(ys) * math.sqrt(D_MODEL))
        
        output = tgt_emb
        temp_attention = None
        for layer in model.transformer_decoder.layers:
            output, temp_attention = layer(output, memory, tgt_mask=tgt_mask, 
                                           tgt_key_padding_mask=tgt_padding_mask,
                                           memory_key_padding_mask=src_padding_mask)
        
        # 마지막 레이어, 마지막 토큰의 어텐션 가중치 저장
        # temp_attention shape: (batch=1, heads, target_len, source_len)
        # 마지막 토큰에 대한 어텐션만 슬라이싱: (heads, source_len)
        attentions.append(temp_attention[0, :, -1, :])
        
        prob = model.fc_out(output[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()

        ys = torch.cat([ys, torch.ones(1, 1).type_as(src_tensor.data).fill_(next_word)], dim=1)
        if next_word == EOS_IDX:
            break
            
    generated_tokens = tokenizer.decode(ys.squeeze(0).tolist())
    
    # 수집된 어텐션들을 target_len 차원을 따라 합침
    # attentions: 리스트[ (heads, source_len), (heads, source_len), ... ]
    attentions = torch.stack(attentions, dim=1) # -> (heads, target_len, source_len)
    
    return generated_tokens, attentions


# [수정] 예측 결과에서 특수 토큰 제거 후 출력
print("\n테스트 데이터 예측 결과 샘플:")
for i in range(5):
    q = test_questions[i]
    true_a = test_answers[i]
    pred_a, _ = translate_and_get_attention(model, q, tokenizer, DEVICE)
    
    # 특수 토큰 제거
    clean_pred_a = pred_a.replace('<sos>', '').replace('<eos>', '').strip()
    
    print(f"Q: {q}")
    print(f"실제 A: {true_a}")
    print(f"예측 A: {clean_pred_a}\n")

print("-" * 30)

# 9. 실제 어텐션 맵 시각화
print("\n9. 실제 어텐션 맵 시각화")
def display_attention(sentence, translation, attention_heads, tokenizer):
    sentence_tokens = ['<sos>'] + tokenizer.encode_as_pieces(sentence) + ['<eos>']
    
    # 예측 결과(translation)는 이미 클리닝되었으므로 그대로 토큰화
    translation_pieces = tokenizer.encode_as_pieces(translation)
    translation_tokens = ['<sos>'] + translation_pieces + ['<eos>']


    # attention: (heads, target_len, source_len)
    attention = attention_heads[:, :len(translation_tokens), :len(sentence_tokens)].cpu().detach().numpy()
    
    fig = plt.figure(figsize=(16, 8))
    for i in range(N_HEAD):
        ax = fig.add_subplot(2, 4, i + 1)
        cax = ax.matshow(attention[i], cmap='viridis')
        ax.set_xticks(range(len(sentence_tokens)))
        ax.set_yticks(range(len(translation_tokens)))
        ax.set_xticklabels(sentence_tokens, rotation=90, fontsize=8)
        ax.set_yticklabels(translation_tokens, fontsize=8)
        ax.set_xlabel('Source (Question)'); ax.set_ylabel('Target (Answer)')
        ax.set_title(f'Head {i+1}')
        
    plt.tight_layout(pad=3.0)
    plt.savefig('attention_map_sp.png')
    print("실제 어텐션 맵을 'attention_map_sp.png' 파일로 저장했습니다.")
    plt.show()

q_example = test_questions[0]
pred_a_example, attention = translate_and_get_attention(model, q_example, tokenizer, DEVICE)

# [수정] 시각화할 답변도 특수 토큰 제거
clean_pred_a_example = pred_a_example.replace('<sos>', '').replace('<eos>', '').strip()

print(f"시각화할 질문: {q_example}")
print(f"생성된 답변: {clean_pred_a_example}")
display_attention(q_example, clean_pred_a_example, attention, tokenizer)
print("-" * 30)


# 10. BLEU 스코어 측정
print("\n10. BLEU 스코어 측정")
def calculate_bleu_score(model, dataset, tokenizer, device):
    targets = []
    predictions = []
    smoothing_function = SmoothingFunction().method1

    loop = tqdm(zip(dataset.questions, dataset.answers), total=len(dataset.questions), desc="Calculating BLEU Score")
    for question, answer in loop:
        true_answer_tokens = tokenizer.encode_as_pieces(answer)
        predicted_answer, _ = translate_and_get_attention(model, question, tokenizer, device)
        
        # BLEU 스코어 계산 시에는 특수 토큰을 포함한 원래 토큰 리스트를 사용하는 것이 더 정확할 수 있음
        # 여기서는 일관성을 위해 클리닝된 텍스트를 다시 토큰화
        clean_predicted_answer = predicted_answer.replace('<sos>', '').replace('<eos>', '').strip()
        predicted_answer_tokens = tokenizer.encode_as_pieces(clean_predicted_answer)

        targets.append([true_answer_tokens])
        predictions.append(predicted_answer_tokens)
    
    # [수정] sentence_bleu를 corpus_bleu로 변경
    corpus_bleu_4 = corpus_bleu(targets, predictions, weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=smoothing_function)
    corpus_bleu_1 = corpus_bleu(targets, predictions, weights=(1, 0, 0, 0), smoothing_function=smoothing_function)
    
    return corpus_bleu_1 * 100, corpus_bleu_4 * 100

# 작은 샘플로 BLEU 스코어 계산 (전체는 시간이 오래 걸릴 수 있음)
sample_test_dataset = ChatbotDataset(test_questions[:100], test_answers[:100], tokenizer)
# [수정] 함수 호출 시 빠진 model 인자 추가
bleu1, bleu4 = calculate_bleu_score(model, sample_test_dataset, tokenizer, DEVICE)

print(f"테스트 데이터 샘플 100개에 대한 BLEU 스코어:")
print(f"BLEU-1: {bleu1:.2f}")
print(f"BLEU-4: {bleu4:.2f}")
print("-" * 30)
print("\n모든 작업이 완료되었습니다.")