In [2]:
import torch
import numpy as np
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from torch import nn
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
from math import sqrt
import matplotlib.pyplot as plt
from IPython.display import clear_output

# Load the dataset - the full train and test splits are used.
print("데이터셋 로드 중...")
train_ds, test_ds = (
    load_dataset("stanfordnlp/imdb", split=split_name)
    for split_name in ("train", "test")
)

# Load the tokenizer through torch.hub.
print("토크나이저 로드 중...")
tokenizer = torch.hub.load(
    'huggingface/pytorch-transformers',
    'tokenizer',
    'bert-base-uncased',
)

# 새로운 collate_fn 구현 - 마지막 단어 예측용
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    """Turn a list of dataset rows into a padded input batch and target ids.

    Each row's ``'text'`` is tokenized (truncated to ``max_len`` ids). The id
    at index -3 becomes the target and every id before it becomes the model
    input. Inputs are padded to a common length with the tokenizer's pad id.

    Args:
        batch: iterable of rows, each indexable by ``'text'``.

    Returns:
        ``(texts, labels)``: a padded ``LongTensor`` of inputs (batch first)
        and a 1-D ``LongTensor`` holding one target id per row.

    Raises:
        IndexError: if a row tokenizes to fewer than three ids.
    """
    max_len = 512  # increased maximum sequence length
    texts, labels = [], []
    for row in batch:
        # Tokenize once and reuse the ids for both the label and the input;
        # running the tokenizer twice per row doubled the preprocessing cost.
        input_ids = tokenizer(row['text'], truncation=True, max_length=max_len).input_ids
        # Target: the id at position -3.
        labels.append(input_ids[-3])
        # Input: everything that precedes the target.
        texts.append(torch.LongTensor(input_ids[:-3]))

    texts = pad_sequence(texts, batch_first=True, padding_value=tokenizer.pad_token_id)
    labels = torch.LongTensor(labels)

    return texts, labels

# Build the DataLoaders: the training split is shuffled, the test split is not.
print("DataLoader 설정 중...")
train_loader = DataLoader(dataset=train_ds, collate_fn=collate_fn, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_ds, collate_fn=collate_fn, batch_size=32, shuffle=False)

# Positional encoding helpers
def get_angles(pos, i, d_model):
    """Return ``pos`` scaled by the per-dimension sinusoid frequency.

    The frequency for dimension index ``i`` is
    ``1 / 10000 ** (2 * (i // 2) / d_model)``, so each even/odd pair of
    dimensions shares one frequency.

    Args:
        pos: position(s); scalar or array that broadcasts against ``i``.
        i: embedding dimension index/indices.
        d_model: embedding width used to normalise the exponent.
    """
    exponent = (2 * (i // 2)) / np.float32(d_model)
    frequency = 1 / np.power(10000, exponent)
    return pos * frequency

def positional_encoding(position, d_model):
    """Build a sinusoidal positional-encoding table.

    Even columns hold the sine and odd columns the cosine of the angles
    produced by ``get_angles``.

    Args:
        position: number of positions (rows) to generate.
        d_model: embedding width (columns).

    Returns:
        ``torch.FloatTensor`` of shape ``(1, position, d_model)``.
    """
    row_positions = np.arange(position)[:, None]
    column_dims = np.arange(d_model)[None, :]
    table = get_angles(row_positions, column_dims, d_model)

    even_cols = slice(0, None, 2)
    odd_cols = slice(1, None, 2)
    table[:, even_cols] = np.sin(table[:, even_cols])
    table[:, odd_cols] = np.cos(table[:, odd_cols])

    # Add a leading axis of size 1 so the table broadcasts over a batch.
    return torch.FloatTensor(table[None, ...])

# Self-attention module
class SelfAttention(nn.Module):
    """Single-head scaled dot-product self-attention with an output projection.

    Args:
        input_dim: size of the last dimension of the input.
        d_model: size of the query/key/value projections and of the output.
        dropout_rate: dropout probability applied to the attention weights.
    """

    def __init__(self, input_dim, d_model, dropout_rate=0.1):
        super().__init__()

        self.input_dim, self.d_model = input_dim, d_model

        # Query / key / value projections followed by the output projection.
        self.wq = nn.Linear(input_dim, d_model)
        self.wk = nn.Linear(input_dim, d_model)
        self.wv = nn.Linear(input_dim, d_model)
        self.dense = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout_rate)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x, mask):
        """Attend over ``x``.

        Args:
            x: input tensor; the original comment documents it as (B, S, D).
            mask: ``None`` or a tensor broadcastable to the (B, S, S) score
                matrix; non-zero / ``True`` entries get ``-1e9`` added to
                their score before the softmax.

        Returns:
            The attended and projected tensor.
        """
        queries = self.wq(x)
        keys = self.wk(x)
        values = self.wv(x)

        # (B, S, D) @ (B, D, S) -> (B, S, S), scaled by sqrt(d_model).
        logits = torch.matmul(queries, keys.transpose(-1, -2)) / sqrt(self.d_model)

        if mask is not None:
            logits = logits + (mask * -1e9)

        # Dropout on the attention weights.
        weights = self.dropout(self.softmax(logits))
        return self.dense(torch.matmul(weights, values))

# Transformer encoder layer
class TransformerLayer(nn.Module):
    """Self-attention followed by a feed-forward network, each wrapped in
    dropout, a residual connection and layer normalisation (post-norm).

    Args:
        input_dim: width of the incoming features, passed to the attention.
        d_model: attention output width; also the feed-forward in/out width.
        dff: hidden width of the feed-forward network.
        dropout_rate: dropout probability used throughout the layer.
    """

    def __init__(self, input_dim, d_model, dff, dropout_rate=0.1):
        super().__init__()

        self.input_dim, self.d_model, self.dff = input_dim, d_model, dff

        self.sa = SelfAttention(input_dim, d_model, dropout_rate)

        feed_forward_stages = [
            nn.Linear(d_model, dff),
            nn.ReLU(),
            nn.Dropout(dropout_rate),  # dropout inside the feed-forward network
            nn.Linear(dff, d_model),
        ]
        self.ffn = nn.Sequential(*feed_forward_stages)

        # Layer normalisation for the two residual branches.
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, mask):
        """Apply the layer to ``x`` with attention mask ``mask``."""
        # Attention block: Add & Norm around the attended features.
        out1 = self.norm1(x + self.dropout(self.sa(x, mask)))

        # Feed-forward block: Add & Norm around the feed-forward output.
        return self.norm2(out1 + self.dropout(self.ffn(out1)))

# Text model adapted for last-word prediction
class LastWordPredictor(nn.Module):
    """Transformer encoder that predicts one vocabulary id per input sequence.

    Args:
        vocab_size: number of embedding rows and of output logits.
        d_model: embedding / hidden width.
        n_layers: number of stacked ``TransformerLayer`` blocks.
        dff: feed-forward hidden width inside each layer.
        dropout_rate: dropout probability.
    """

    def __init__(self, vocab_size, d_model, n_layers, dff, dropout_rate=0.1):
        super().__init__()

        self.vocab_size = vocab_size
        self.d_model = d_model
        self.n_layers = n_layers
        self.dff = dff

        self.embedding = nn.Embedding(vocab_size, d_model)
        # Fixed (non-trainable) sinusoidal table covering 512 positions.
        self.pos_encoding = nn.parameter.Parameter(positional_encoding(512, d_model), requires_grad=False)
        self.dropout = nn.Dropout(dropout_rate)

        self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, dff, dropout_rate) for _ in range(n_layers)])

        # Output layer: logits over the whole vocabulary instead of one value.
        self.classifier = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Return vocabulary logits for each sequence in the batch.

        Args:
            x: integer token ids, padded with ``tokenizer.pad_token_id``;
                indexed here as (batch, seq_len).

        Returns:
            Tensor with one row of ``vocab_size`` logits per input row.
        """
        pad_mask = (x == tokenizer.pad_token_id)
        seq_len = x.shape[1]

        # Index of the last non-padding position in every row. Rows shorter
        # than the batch maximum end in padding, so reading the final column
        # would classify from a [PAD] position. Rows that are entirely
        # padding fall back to index 0.
        positions = torch.arange(seq_len, device=x.device)
        last_idx = (positions * (~pad_mask)).max(dim=1).values

        attn_mask = pad_mask[:, None, :]

        x = self.embedding(x)
        x = x * sqrt(self.d_model)
        x = x + self.pos_encoding[:, :seq_len]
        x = self.dropout(x)  # dropout after the embedding

        for layer in self.layers:
            x = layer(x, attn_mask)

        # Use the representation of each row's last real token for prediction.
        batch_idx = torch.arange(x.size(0), device=x.device)
        x = x[batch_idx, last_idx]
        x = self.classifier(x)  # project to vocabulary size

        return x

# Perplexity computation
def calculate_perplexity(model, dataloader, device):
    """Return ``exp(mean negative log-likelihood)`` of the labels.

    Puts ``model`` in eval mode and iterates ``dataloader`` once without
    gradients.

    Args:
        model: callable module mapping inputs to ``[batch_size, vocab_size]``
            logits.
        dataloader: iterable of ``(inputs, labels)`` tensor pairs.
        device: device the tensors are moved to before the forward pass.
    """
    model.eval()
    # Summed (not averaged) cross entropy, so batches of different size
    # contribute proportionally to the final mean.
    summed_nll = nn.CrossEntropyLoss(reduction='sum')
    nll_sum = 0
    n_targets = 0

    with torch.no_grad():
        for batch_inputs, batch_labels in dataloader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            logits = model(batch_inputs)

            nll_sum += summed_nll(logits, batch_labels).item()
            n_targets += batch_labels.size(0)

    # Perplexity = exp(mean NLL)
    return np.exp(nll_sum / n_targets)

# Top-k accuracy
def top_k_accuracy(model, dataloader, k=5, device=None):
    """Return the fraction of samples whose label is among the top-k logits.

    Puts ``model`` in eval mode and iterates ``dataloader`` once without
    gradients.

    Args:
        model: callable module mapping inputs to ``[batch, classes]`` logits.
        dataloader: iterable of ``(inputs, labels)`` tensor pairs.
        k: number of highest-scoring classes to consider; clamped to the
            number of classes so ``torch.topk`` cannot raise.
        device: device the tensors are moved to before the forward pass.
    """
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.topk(outputs, min(k, outputs.size(1)), dim=1)

            # Vectorized membership test: compare each label against its own
            # row of top-k indices instead of looping over samples in Python.
            hits = (predicted == labels.unsqueeze(1)).any(dim=1)
            correct += hits.sum().item()
            total += labels.size(0)

    return correct / total

# Plain (top-1) accuracy
def accuracy(model, dataloader, device=None):
    """Return the fraction of samples whose highest logit matches the label.

    Puts ``model`` in eval mode and iterates ``dataloader`` once without
    gradients.

    Args:
        model: callable module mapping inputs to per-class logits.
        dataloader: iterable of ``(inputs, labels)`` tensor pairs.
        device: device the tensors are moved to before the forward pass.
    """
    model.eval()
    n_correct, n_seen = 0, 0

    with torch.no_grad():
        for batch_inputs, batch_labels in dataloader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            best_class = torch.max(logits.data, 1).indices

            n_seen += batch_labels.size(0)
            n_correct += (best_class == batch_labels).sum().item()

    return n_correct / n_seen

# Model initialisation - a larger model than the baseline
print("모델 초기화 중...")
d_model = 256  # originally 64
n_layers = 4    # originally 2
dff = 512       # originally 128
dropout_rate = 0.1

model = LastWordPredictor(len(tokenizer), d_model, n_layers, dff, dropout_rate)

# Training configuration
lr = 0.001
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"사용 디바이스: {device}")
model = model.to(device)

# Loss: CrossEntropyLoss, because the target is one class out of the vocabulary
loss_fn = nn.CrossEntropyLoss()

optimizer = Adam(model.parameters(), lr=lr)
# Halve the learning rate when the monitored metric has not improved for more
# than one epoch. The `verbose` argument is omitted: it is deprecated since
# PyTorch 2.2. The current rate can be read from
# optimizer.param_groups[0]['lr'] when needed.
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=1)

# Per-epoch metric history
history = {
    'train_loss': [],
    'train_acc': [],
    'test_acc': [],
    'train_top5': [],
    'test_top5': [],
    'train_ppl': [],
    'test_ppl': []
}

# Plotting helper
def plot_metrics(history):
    """Draw loss, accuracy and perplexity curves as three stacked panels.

    Args:
        history: dict with the keys ``train_loss``, ``train_acc``,
            ``test_acc``, ``train_top5``, ``test_top5``, ``train_ppl`` and
            ``test_ppl``, each mapping to a sequence of per-epoch values.
    """
    # (title, y-axis label, ((history key, legend label or None), ...))
    panels = (
        ('Training Loss', 'Loss', (
            ('train_loss', None),
        )),
        ('Accuracy', 'Accuracy', (
            ('train_acc', 'Train Accuracy'),
            ('test_acc', 'Test Accuracy'),
            ('train_top5', 'Train Top-5 Accuracy'),
            ('test_top5', 'Test Top-5 Accuracy'),
        )),
        ('Perplexity', 'Perplexity', (
            ('train_ppl', 'Train Perplexity'),
            ('test_ppl', 'Test Perplexity'),
        )),
    )

    plt.figure(figsize=(15, 15))

    for row, (title, y_label, series) in enumerate(panels, start=1):
        plt.subplot(3, 1, row)

        has_legend = False
        for key, legend_label in series:
            if legend_label is None:
                plt.plot(history[key])
            else:
                plt.plot(history[key], label=legend_label)
                has_legend = True

        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        # Only panels with labelled curves get a legend.
        if has_legend:
            plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.show()

# Prediction helper
def predict_next_word(model, text, tokenizer, device):
    """Return the five most probable continuations of ``text``.

    Args:
        model: module mapping a ``(1, seq_len)`` id tensor to vocabulary
            logits of shape ``(1, vocab_size)``.
        text: prompt string.
        tokenizer: callable returning an object with ``input_ids`` and
            providing ``decode``.
        device: device on which the model is run.

    Returns:
        List of ``(decoded_token, probability)`` pairs, most probable first.
    """
    # Tokenize the prompt.
    tokens = tokenizer(text, truncation=True, max_length=512).input_ids
    # Drop only the final id so the whole prompt is fed to the model
    # (presumably the tokenizer's trailing special token, e.g. [SEP] -
    # confirm for other tokenizers). collate_fn trains on "ids before the
    # target"; slicing off three ids here discarded the prompt's last two
    # real tokens.
    input_tensor = torch.LongTensor([tokens[:-1]]).to(device)

    # Predict in eval mode so dropout cannot perturb the output, then restore
    # whatever mode the caller had.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            output = model(input_tensor)
    finally:
        model.train(was_training)

    # Top five candidate next tokens.
    probs, indices = torch.topk(torch.softmax(output[0], dim=0), 5)
    next_words = [tokenizer.decode([idx.item()]) for idx in indices]
    probs = probs.cpu().numpy()

    return list(zip(next_words, probs))

# Checkpoint writer
def save_model(model, path):
    """Write ``model``'s ``state_dict`` to ``path`` and report it on stdout."""
    weights = model.state_dict()
    torch.save(weights, path)
    print(f"모델이 {path}에 저장되었습니다.")

# Checkpoint reader
def load_model(model, path):
    """Load a ``state_dict`` from ``path`` into ``model`` and set eval mode.

    Args:
        model: module whose parameters are overwritten in place.
        path: file previously written with ``torch.save(state_dict, path)``.

    Returns:
        The same ``model`` instance, in eval mode.
    """
    # Deserialize onto the CPU first so a checkpoint saved from a CUDA model
    # can be opened on a machine without a GPU; load_state_dict then copies
    # the values into the model's existing parameters on their own device.
    state_dict = torch.load(path, map_location="cpu")
    model.load_state_dict(state_dict)
    model.eval()
    print(f"모델이 {path}에서 로드되었습니다.")
    return model

# Train the model
print("학습 시작...")
n_epochs = 10  # raised from 5
best_acc = 0.0

for epoch in range(n_epochs):
    model.train()
    total_loss = 0.0
    batch_count = 0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass, backward pass, parameter update.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_count += 1
        total_loss += loss.item()

        # Progress report every 100 batches.
        if not batch_count % 100:
            print(f"Epoch {epoch+1}, Batch {batch_count}, Loss: {loss.item():.4f}")

    # Mean training loss per batch for this epoch.
    avg_loss = total_loss / len(train_loader)

    # End-of-epoch evaluation; each metric is computed on train, then test.
    train_acc, test_acc = (
        accuracy(model, loader, device) for loader in (train_loader, test_loader)
    )
    top5_train, top5_test = (
        top_k_accuracy(model, loader, 5, device) for loader in (train_loader, test_loader)
    )
    train_ppl, test_ppl = (
        calculate_perplexity(model, loader, device) for loader in (train_loader, test_loader)
    )

    # Record this epoch's metrics.
    epoch_metrics = {
        'train_loss': avg_loss,
        'train_acc': train_acc,
        'test_acc': test_acc,
        'train_top5': top5_train,
        'test_top5': top5_test,
        'train_ppl': train_ppl,
        'test_ppl': test_ppl,
    }
    for metric_name, metric_value in epoch_metrics.items():
        history[metric_name].append(metric_value)

    # Report this epoch's metrics.
    print(f"Epoch {epoch+1} | Train Loss: {avg_loss:.4f}")
    print(f"Train Acc: {train_acc:.4f} | Test Acc: {test_acc:.4f}")
    print(f"Train Top-5: {top5_train:.4f} | Test Top-5: {top5_test:.4f}")
    print(f"Train PPL: {train_ppl:.2f} | Test PPL: {test_ppl:.2f}")

    # Let the scheduler react to the test perplexity.
    scheduler.step(test_ppl)

    # Keep a checkpoint of the model with the best test accuracy so far.
    if test_acc > best_acc:
        best_acc = test_acc
        save_model(model, '/Users/semyungpark/Documents/homework/week2/best_model.pt')

    # Refresh the plots in place.
    clear_output(wait=True)
    plot_metrics(history)

# Save the final model.
save_model(model, '/Users/semyungpark/Documents/homework/week2/final_model.pt')

# Try the trained model on a few hand-written prompts
test_sentences = [
    "I really enjoyed the movie because it was",
    "The main character was portrayed as a",
    "The plot of the story revolves around",
    "In conclusion, this film is definitely",
]

print("\n예측 테스트:")
for sentence in test_sentences:
    # Run the prediction before echoing the prompt, as the original does.
    predictions = predict_next_word(model, sentence, tokenizer, device)
    print(f"\n입력: {sentence}")
    for candidate in predictions:
        word, prob = candidate
        print(f"  - {word} ({prob:.4f})")

데이터셋 로드 중...
토크나이저 로드 중...


Using cache found in /Users/semyungpark/.cache/torch/hub/huggingface_pytorch-transformers_main


DataLoader 설정 중...
모델 초기화 중...
사용 디바이스: cpu
학습 시작...


KeyboardInterrupt: 

![image.png][def]

[def]: attachment:image.png