In [52]:
# 전체 라이브러리 모아두기
import pandas as pd
import os
import sentencepiece as spm
import json
import re
import glob

In [53]:
def train_sentencepiece(input_file, model_dir="../model/llm_model", model_name="chatbot_spm", vocab_size=16000):
    """Train a BPE SentencePiece model and write ``<model_name>.model`` / ``.vocab``.

    Args:
        input_file: Path to a plain-text corpus, one sentence per line.
        model_dir: Directory that receives the trained model files. It is
            created if it does not exist yet.
        model_name: File name prefix (without extension) for the outputs.
        vocab_size: Target vocabulary size passed to the trainer.

    The special token ids are fixed to pad=0, unk=1, bos=2, eos=3.
    """
    # The trainer does not create missing directories itself, so make sure the
    # output location exists before training starts.
    os.makedirs(model_dir, exist_ok=True)

    model_prefix = os.path.join(model_dir, model_name)

    spm.SentencePieceTrainer.Train(
        f"--input={input_file} --model_prefix={model_prefix} --vocab_size={vocab_size} "
        "--model_type=bpe --character_coverage=1.0 --pad_id=0 --unk_id=1 --bos_id=2 --eos_id=3"
    )

    print(f"✅ 모델 저장 완료: {model_prefix}.model")
    print(f"✅ 단어 사전 저장 완료: {model_prefix}.vocab")

In [54]:
def save_for_spm_training(pairs, out_path):
    """Dump dialogue pairs to a UTF-8 text file, one utterance per line.

    For every element of ``pairs`` the stripped ``"input"`` value is written
    first, followed by the stripped ``"target"`` value, each terminated by a
    newline. ``out_path`` is overwritten if it already exists.
    """
    with open(out_path, "w", encoding="utf-8") as sink:
        for record in pairs:
            for field in ("input", "target"):
                sink.write(record[field].strip() + "\n")

# Example usage: load the JSON dataset and dump its "input"/"target" texts
# into a plain-text file that SentencePiece can train on.
with open("../data/converted_dataset/train_chatbot_data.json", "r", encoding="utf-8") as f:
    data = json.load(f)

save_for_spm_training(data, "../data/text_dataset/text_for_txt/train.txt")

In [58]:
# Example run: train the tokenizer on the text file written by the previous cell.
train_sentencepiece("../data/text_dataset/text_for_txt/train.txt", model_dir="../model/llm_model", model_name="chatbot_spm")

✅ 모델 저장 완료: ../model/llm_model/chatbot_spm.model
✅ 단어 사전 저장 완료: ../model/llm_model/chatbot_spm.vocab


In [59]:
class SentencePieceTokenizer:
    """Thin wrapper around a trained ``SentencePieceProcessor``.

    Every accessor is a method (call it with parentheses) and returns a plain
    Python value: ``encode`` returns a list of int ids, ``decode`` returns a
    string, and the ``*_id`` / ``vocab_size`` methods return ints.
    """

    def __init__(self, model_path):
        """Load the SentencePiece model stored at ``model_path``."""
        self.sp = spm.SentencePieceProcessor()
        self.sp.load(model_path)

    def encode(self, text):
        """Tokenize ``text`` into a list of integer token ids."""
        return self.sp.encode(text, out_type=int)

    def decode(self, ids):
        """Convert a list of token ids back into text."""
        return self.sp.decode(ids)

    def pad_id(self):
        """Return the integer id of the padding token."""
        return self.sp.pad_id()

    def bos_id(self):
        """Return the integer id of the beginning-of-sentence token."""
        # Must be *called*: returning ``self.sp.bos_id`` would hand back the
        # bound method object instead of the integer id.
        return self.sp.bos_id()

    def eos_id(self):
        """Return the integer id of the end-of-sentence token."""
        # Same as bos_id: call the processor method to get the integer.
        return self.sp.eos_id()

    def vocab_size(self):
        """Return the number of pieces in the vocabulary."""
        return self.sp.get_piece_size()

In [60]:
# Smoke test: encode a sample sentence, decode it back, and print the padding id.
tokenizer = SentencePieceTokenizer("../model/llm_model/chatbot_spm.model")

ids = tokenizer.encode("알아서 하던가")
print("tokenized:", ids)

text = tokenizer.decode(ids)
print("decode:", text)

print("PAD ID:", tokenizer.pad_id())

tokenized: [1583, 24, 1560]
decode: 알아서 하던가
PAD ID: 0


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
class Encoder(nn.Module):
    """Embedding + unidirectional multi-layer LSTM over a batch of token ids.

    Index 0 is treated as the padding index of the embedding table. The LSTM
    runs with ``batch_first=True``.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers=2, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embed_size,
            padding_idx=0,
        )
        self.lstm = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=False,
        )

    def forward(self, x):
        """Return ``(outputs, (hidden, cell))`` of the LSTM for token ids ``x``."""
        token_vectors = self.embedding(x)
        outputs, final_state = self.lstm(token_vectors)
        hidden, cell = final_state
        return outputs, (hidden, cell)

class Attention(nn.Module):
    """Additive attention scoring a decoder state against encoder outputs.

    ``forward`` returns ``(context, attn_weight)``: the softmax weights over
    dimension 1 of ``encoder_outputs`` and the weighted sum produced with
    ``torch.bmm``.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.attn = nn.Linear(hidden_size * 2, hidden_size)
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        # A 2-D state gets a length-1 middle axis so it can be tiled below.
        query = hidden.unsqueeze(1) if hidden.dim() == 2 else hidden

        steps = encoder_outputs.size(1)
        tiled_query = query.repeat(1, steps, 1)

        # Score every encoder position against the (tiled) decoder state.
        features = torch.cat((tiled_query, encoder_outputs), dim=2)
        scores = self.v(torch.tanh(self.attn(features))).squeeze(2)

        attn_weight = F.softmax(scores, dim=1)
        context = torch.bmm(attn_weight.unsqueeze(1), encoder_outputs)
        return context, attn_weight

class Decoder(nn.Module):
    """Single-step LSTM decoder that attends over the encoder outputs.

    Each call consumes one token per batch element and returns
    ``(logits, hidden, cell, attn_weights)``.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers=2, dropout=0.1):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        # The LSTM input is the token embedding concatenated with the context vector.
        self.lstm = nn.LSTM(
            embed_size + hidden_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout,
        )
        # The projection sees the LSTM output concatenated with the context vector.
        self.fc_out = nn.Linear(hidden_size * 2, vocab_size)
        self.attention = Attention(hidden_size)

    def forward(self, input_token, hidden, cell, encoder_outputs):
        # Accept a 1-D batch of ids as well as a column of ids.
        step_tokens = input_token.unsqueeze(1) if input_token.dim() == 1 else input_token

        embedded = self.embedding(step_tokens)
        # The attention query is the last layer's hidden state.
        context, attn_weights = self.attention(hidden[-1], encoder_outputs)

        step_output, (hidden, cell) = self.lstm(
            torch.cat((embedded, context), dim=2), (hidden, cell)
        )
        logits = self.fc_out(torch.cat((step_output, context), dim=2)).squeeze(1)

        return logits, hidden, cell, attn_weights

class Seq2Seq(nn.Module):
    """Encoder/decoder wrapper that decodes step by step with teacher forcing.

    Args:
        encoder: Module returning ``(encoder_outputs, (hidden, cell))``.
        decoder: Module called as ``decoder(token, hidden, cell, encoder_outputs)``
            and exposing ``embedding.num_embeddings`` (used as the vocab size).
        device: Kept as ``self.device`` for callers that read it. The forward
            pass itself follows the device of the incoming batch.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Return logits of shape ``(batch, trg_len, vocab)``.

        Position 0 of the output is left as zeros; decoding starts from
        ``trg[:, 0]`` and fills positions ``1 .. trg_len - 1``. At every step
        the next input is the ground-truth token with probability
        ``teacher_forcing_ratio``, otherwise the model's own argmax.
        """
        batch_size, trg_len = trg.shape
        vocab_size = self.decoder.embedding.num_embeddings

        # Allocate on the batch's device rather than the device captured at
        # construction time: the latter goes stale as soon as the model or the
        # data is moved, which would leave the logits on a different device
        # than the targets they are compared against.
        outputs = torch.zeros(batch_size, trg_len, vocab_size, device=trg.device)

        encoder_outputs, (hidden, cell) = self.encoder(src)
        input_token = trg[:, 0]

        for t in range(1, trg_len):
            output, hidden, cell, _ = self.decoder(input_token, hidden, cell, encoder_outputs)
            outputs[:, t] = output
            top1 = output.argmax(1)
            use_teacher = torch.rand(1).item() < teacher_forcing_ratio
            input_token = trg[:, t] if use_teacher else top1

        return outputs

In [87]:
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import pandas as pd

class ChatDataset(Dataset):
    """Dataset of (input, response) text pairs read from a CSV file.

    The CSV must contain the columns ``input`` and ``response``. Each item is
    a pair of ``torch.long`` tensors of length ``max_len``: the encoded input
    and the encoded response, right-padded with the tokenizer's pad id or
    truncated to ``max_len``.

    NOTE(review): no BOS/EOS ids are added to either sequence here, while
    ``generate_reply`` starts decoding from BOS and stops on EOS — confirm
    whether that mismatch is intended.
    """

    def __init__(self, csv_path, tokenizer, max_len=64):
        df = pd.read_csv(csv_path)
        self.inputs = df["input"].astype(str).tolist()
        self.responses = df["response"].astype(str).tolist()
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.inputs)

    def _pad_id(self):
        """Return the pad id as an int, whether the tokenizer exposes it as a method or a value."""
        pad = self.tokenizer.pad_id
        # ``tokenizer.pad_id`` is a method on SentencePieceTokenizer; using it
        # without calling it would fill the sequence with method objects.
        return pad() if callable(pad) else pad

    def _fit(self, ids):
        """Right-pad or truncate ``ids`` to exactly ``self.max_len`` entries."""
        ids = list(ids)[:self.max_len]
        return ids + [self._pad_id()] * (self.max_len - len(ids))

    def __getitem__(self, idx):
        src = self._fit(self.tokenizer.encode(self.inputs[idx]))
        trg = self._fit(self.tokenizer.encode(self.responses[idx]))

        # Explicit dtype keeps the tensors integer even for empty sequences.
        return torch.tensor(src, dtype=torch.long), torch.tensor(trg, dtype=torch.long)

# Load batches in the main process (no worker subprocesses).
num_workers = 0
# cuDNN autotuner flag; NOTE(review): the rest of this notebook selects
# mps/cpu devices, so this presumably has no effect here — confirm.
torch.backends.cudnn.benchmark = True

# Data loaders: the training set is shuffled, the validation set is not.
train_dataset = ChatDataset("../data/text_dataset/save_path/train_pairs.csv", tokenizer)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=num_workers, pin_memory=False)
valid_dataset = ChatDataset("../data/text_dataset/save_path/valid_pairs.csv", tokenizer)
valid_loader = DataLoader(valid_dataset, batch_size=64)

### LSTM + Attention 챗봇 학습
1. 입력
 + model : Seq2Seq 모델(Encoder + Attention + Decoder)
 + dataloader : 학습 데이터 로더
 + tokenizer : 패딩 ID 확인용
 + num_epochs : 학습 epoch 수
 + lr : 학습률
2. 동작
 + 모델 forward
 + output, target -> reshape
 + CrossEntropyLoss 계산
 + 역전파 + optimizer 업데이트
 + tqdm 진행 표시 및 평균 loss 출력

In [88]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

In [None]:
def train_model(model, dataloader, tokenizer, num_epochs=5, lr=1e-3, device=None, checkpoint_path="../model/checkpoint_epoch6.pt"):
    """Train ``model`` with teacher forcing, optionally resuming from a checkpoint.

    Args:
        model: Seq2Seq-style module called as ``model(src, trg)`` and returning
            logits of shape ``(batch, trg_len, vocab)``.
        dataloader: Iterable of ``(src, trg)`` integer tensor batches.
        tokenizer: Object exposing ``pad_id`` (method or plain value); padding
            positions are excluded from the loss.
        num_epochs: Total number of epochs (an upper bound when resuming).
        lr: Adam learning rate.
        device: Device to train on. When ``None``, "mps" is used if available,
            otherwise "cpu". Pass e.g. ``"cuda"`` to train on a CUDA GPU.
        checkpoint_path: Checkpoint to resume from, or ``None`` to start from
            scratch. A missing file is reported and training starts fresh.

    Two checkpoint layouts are accepted:
      * the dict written by this function (``model_state_dict``,
        ``optimizer_state_dict``, ``epoch``, ``loss``) — training resumes at
        ``epoch + 1`` with the optimizer state restored;
      * a bare ``state_dict`` — the resume epoch is taken from an
        ``epoch<N>`` pattern in the file name (0 if there is none).

    Returns:
        The trained model (on ``device``). After every epoch a checkpoint is
        written to ``checkpoint_epoch<epoch>.pt`` in the working directory.
    """
    # Honor an explicitly requested device; only fall back to auto-detection.
    if device is None:
        device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
    else:
        device = torch.device(device)
    model = model.to(device)

    # Ignore PAD positions in the loss. ``pad_id`` is a method on the
    # tokenizer wrapper, so resolve it to the integer id first.
    pad_id = tokenizer.pad_id
    if callable(pad_id):
        pad_id = pad_id()
    criterion = nn.CrossEntropyLoss(ignore_index=pad_id)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    start_epoch = 0
    if checkpoint_path is not None:
        if os.path.isfile(checkpoint_path):
            checkpoint = torch.load(checkpoint_path, map_location=device)
            if isinstance(checkpoint, dict) and "model_state_dict" in checkpoint:
                # Layout written by the save step at the end of each epoch.
                model.load_state_dict(checkpoint["model_state_dict"])
                if "optimizer_state_dict" in checkpoint:
                    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
                start_epoch = int(checkpoint.get("epoch", -1)) + 1
            else:
                # Bare state_dict: the only epoch hint is the file name.
                model.load_state_dict(checkpoint)
                match = re.search(r"epoch(\d+)", os.path.basename(checkpoint_path))
                start_epoch = int(match.group(1)) if match else 0
            print(f"Resuming from epoch {start_epoch}")
        else:
            print(f"Checkpoint not found at {checkpoint_path}; training from scratch.")

    if start_epoch >= num_epochs:
        print(f"Nothing to train: start epoch {start_epoch} >= num_epochs {num_epochs}.")

    model.train()

    for epoch in range(start_epoch, num_epochs):
        epoch_loss = 0
        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}")

        for src, trg in progress_bar:
            src, trg = src.to(device), trg.to(device)
            optimizer.zero_grad()

            output = model(src, trg)
            output_dim = output.shape[-1]

            # Position 0 holds no prediction (decoding starts from trg[:, 0]),
            # so drop it from both the logits and the targets.
            output = output[:, 1:].reshape(-1, output_dim)
            trg = trg[:, 1:].reshape(-1)

            loss = criterion(output, trg)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            progress_bar.set_postfix(loss=loss.item())

        # Guard against an empty dataloader.
        avg_loss = epoch_loss / max(len(dataloader), 1)
        print(f"\n[Epoch {epoch+1}] 평균 Loss: {avg_loss:.4f}")

        # Save a resumable checkpoint for this epoch.
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': avg_loss,
        }, f"checkpoint_epoch{epoch}.pt")

    return model

### 모델 저장 및 평가

In [None]:
def save_model(model, path='../model/llm_model/chatbot_model_v2.pt'):
    """Save ``model.state_dict()`` to ``path``, creating the parent directory if needed."""
    # torch.save does not create missing directories.
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    torch.save(model.state_dict(), path)
    print(f"✅ 모델 저장 완료: {path}")

def load_model(model, path='../model/llm_model/chatbot_model_v2.pt'):
    """Load weights from ``path`` into ``model`` and return it in eval mode.

    The target device is "mps" when available, otherwise "cpu"; the weights
    are mapped onto that device and the model is moved there.
    """
    backend = "mps" if torch.backends.mps.is_available() else "cpu"
    device = torch.device(backend)

    weights = torch.load(path, map_location=device)
    model.load_state_dict(weights)

    model.to(device)
    model.eval()
    return model

In [91]:
from torch.nn.functional import softmax

@torch.no_grad()
def evalute_model(model, dataloader, tokenizer,
                    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")):
    """Print input / target / prediction for the first sample of the first five batches.

    The model is put in eval mode and moved to ``device``. Predictions are
    produced with ``teacher_forcing_ratio=0.0``, i.e. the decoder is always
    fed its own previous argmax.
    """
    model.eval()
    model.to(device)

    separator = "-"*60
    for batch_idx, (src, trg) in enumerate(dataloader):
        if batch_idx >= 5:
            break

        src = src.to(device)
        trg = trg.to(device)

        # Inference mode: never feed ground-truth tokens back to the decoder.
        logits = model(src, trg, teacher_forcing_ratio=0.0)
        pred = logits.argmax(dim=-1)

        first_input = src[0].tolist()
        first_target = trg[0].tolist()
        first_pred = pred[0].tolist()

        print("🟢 Input :", tokenizer.decode(first_input))
        print("✅ Target :", tokenizer.decode(first_target))
        print("🤖 Output :", tokenizer.decode(first_pred))
        print(separator)

In [116]:
def _resolve_token_id(value):
    """Return an integer token id from a plain value or a (possibly nested) zero-arg callable."""
    # The tokenizer wrapper exposes pad/bos/eos ids as methods; accept both
    # ``tokenizer.pad_id`` given as a value and given as a callable.
    while callable(value):
        value = value()
    return int(value)


def generate_reply(model, tokenizer, input_text, max_len=64,
                device=torch.device("mps" if torch.backends.mps.is_available() else "cpu"),
                min_len=5):
    """Greedily decode a reply to ``input_text``.

    Args:
        model: Module exposing ``encoder`` and ``decoder`` sub-modules.
        tokenizer: Object with ``encode``/``decode`` and ``pad_id``, ``bos_id``,
            ``eos_id`` (each either a method or a plain int).
        input_text: The user utterance.
        max_len: Length the encoded input is padded/truncated to, and the
            maximum number of generated tokens.
        device: Device used for the model and all tensors.
        min_len: Number of initial decoding steps during which EOS is
            suppressed, so the reply has at least this many tokens.

    Returns:
        The decoded reply text (without the EOS token).
    """
    model.eval()
    model.to(device)

    pad_id = _resolve_token_id(tokenizer.pad_id)
    bos_id = _resolve_token_id(tokenizer.bos_id)
    eos_id = _resolve_token_id(tokenizer.eos_id)

    # Encode the input, then truncate/pad it to exactly max_len ids.
    input_ids = list(tokenizer.encode(input_text))[:max_len]
    input_ids = input_ids + [pad_id] * (max_len - len(input_ids))
    src = torch.tensor(input_ids, dtype=torch.long, device=device).unsqueeze(0)  # [1, seq_len]

    input_token = torch.tensor([bos_id], dtype=torch.long, device=device)  # start token

    generated_ids = []

    with torch.no_grad():
        encoder_outputs, (hidden, cell) = model.encoder(src)

        for t in range(max_len):
            # The decoder attends over encoder_outputs at every step.
            output, hidden, cell, _ = model.decoder(input_token.view(1, 1), hidden, cell, encoder_outputs)

            # Forbid EOS during the first min_len steps.
            if t < min_len:
                output[0, eos_id] = -float('inf')

            next_token = output.argmax(1)
            token_id = next_token.item()

            if token_id == eos_id:
                break

            generated_ids.append(token_id)
            input_token = next_token

    return tokenizer.decode(generated_ids)

### 테스트

In [117]:
def interactive_chat(model, tokenizer, device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")):
    """Run a console chat loop until the user types an exit keyword.

    Every query is read with ``input``, echoed back, and answered with
    ``generate_reply``. The loop ends when the stripped, lower-cased query is
    ``exit``, ``quit`` or ``종료``.
    """
    exit_words = ['exit', 'quit', '종료']
    print("🤖 챗봇 테스트 시작. 종료하려면 exit 입력.")

    while True:
        query = input("👤 사용자: ")
        print("👤 사용자: ", query)

        wants_exit = query.strip().lower() in exit_words
        if wants_exit:
            print("🔴 테스트 종료.")
            break

        print("🤖 챗봇: ", generate_reply(model, tokenizer, query, device=device))

### 전체 파이프라인

In [None]:
def full_pipeline(train_loader, valid_loader, tokenizer, model, num_epochs=5):
    """Train, save, evaluate and then open the interactive chat.

    Steps, in order: ``train_model`` on ``train_loader``, ``save_model``,
    ``evalute_model`` on ``valid_loader``, ``interactive_chat``.
    """
    trained_model = train_model(model, train_loader, tokenizer, num_epochs=num_epochs)

    # Save to save_model's default path so that load_model() — which defaults
    # to the same location — finds the weights. The previous hard-coded
    # "model/llm_model/..." pointed at a different directory than the
    # "../model/llm_model/..." default that load_model reads.
    save_model(trained_model)

    evalute_model(trained_model, valid_loader, tokenizer)

    interactive_chat(trained_model, tokenizer)

In [119]:
def test_pipeline(valid_loader, tokenizer, model):
    """Load saved weights into ``model``, print sample predictions, then start the chat loop."""
    restored = load_model(model)
    evalute_model(restored, valid_loader, tokenizer)
    interactive_chat(restored, tokenizer)

### 학습 시작 및 모델 저장

In [120]:
# Model hyper-parameters. The vocabulary size comes from the trained tokenizer
# (the method is ``vocab_size``; ``vacab_size`` raised AttributeError).
vocab_size = tokenizer.vocab_size()
embed_size = 128
hidden_size = 256

encoder = Encoder(vocab_size, embed_size, hidden_size)
decoder = Decoder(vocab_size, embed_size, hidden_size)
model = Seq2Seq(encoder, decoder, device = torch.device("mps" if torch.backends.mps.is_available() else "cpu"))
# CUDA alternative:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Full pipeline (train -> save -> evaluate -> chat):
# full_pipeline(train_loader, valid_loader, tokenizer, model, num_epochs=5)

# Evaluation and chat only, using previously saved weights.
test_pipeline(valid_loader, tokenizer, model)

🟢 Input : 보고싶더랴.. 오라고는 못하겠고
✅ Target : 아이고..담주에 가야겠네
🤖 Output : 나도
------------------------------------------------------------
🟢 Input : 꼭사봐야지
✅ Target : ᄏᄏᄏᄏᄏᄏ 나두 시댁가는데도 ᄏᄏ좋다ᄏᄏᄀᄀ
🤖 Output : 응
------------------------------------------------------------
🟢 Input : 옹..왜 롤이 안돼?
✅ Target : 클라이언트오류인듯
🤖 Output : 넴..
------------------------------------------------------------
🟢 Input : 분위기 계속 살펴봐
✅ Target : 글고 호칭도 이란말야
🤖 Output : ᄏᄏᄏᄏᄏᄏᄏᄏ
------------------------------------------------------------
🟢 Input : 그르게ᅮᅮ 한시간동안 압박면접이라니ᅮᅮ
✅ Target : ᄏᄏᄏᄏᄏᄏᄏᄏᄏ꼬기 먹어야겟업
🤖 Output : ᅲᅲ
------------------------------------------------------------
🤖 챗봇 테스트 시작. 종료하려면 exit 입력.
👤 사용자:  안녕?
🤖 챗봇:  응#ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬
👤 사용자:  너는 누구야?
🤖 챗봇:  나#는 나?
👤 사용자:  우응 너 말야
🤖 챗봇:  나?ᄏᄏᄏ 나 지금
👤 사용자:  ㅇ
🤖 챗봇:  근데#이ᅵᄋ
👤 사용자:  맞아
🤖 챗봇:  근데#도 안챙겼
👤 사용자:  뭐?
🤖 챗봇:  나#ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ⫬ᄏ
👤 사용자:  exit
🔴 테스트 종료.
