In [21]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchtext.datasets import IMDB
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from tqdm import tqdm  # 학습 진행 상황을 보여주기 위한 라이브러리
from sklearn.model_selection import train_test_split
from collections import Counter

# 하이퍼파라미터 설정
BATCH_SIZE = 64
EMBEDDING_DIM = 128
HIDDEN_DIM = 256
NUM_CLASSES = 2
NUM_EPOCHS = 5
LEARNING_RATE = 1e-3  # 학습 안정성을 위해 학습률을 낮춤
MAX_VOCAB_SIZE = 20000  # 어휘 사전 크기 증가
MAX_SEQ_LEN = 512  # 시퀀스 길이 조정

# 디바이스 설정 (GPU 사용 가능 시 사용)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 데이터 준비
tokenizer = get_tokenizer('basic_english')

def yield_tokens(data_iter):
    for label, text in data_iter:
        yield tokenizer(text)

# 어휘 사전 구축 (훈련 데이터만 사용)
train_iter = IMDB(split='train')
vocab = build_vocab_from_iterator(yield_tokens(train_iter), max_tokens=MAX_VOCAB_SIZE, specials=["<pad>", "<unk>"])
vocab.set_default_index(vocab["<unk>"])

# 데이터 로드
train_data = list(IMDB(split='train'))
test_data = list(IMDB(split='test'))

# 레이블 분포 확인
train_labels = [label for label, _ in train_data]
test_labels = [label for label, _ in test_data]

print("훈련 데이터 레이블 분포:", Counter(train_labels))
print("테스트 데이터 레이블 분포:", Counter(test_labels))

# 훈련 데이터와 검증 데이터로 분할
train_data, val_data = train_test_split(train_data, test_size=0.2, random_state=42, stratify=train_labels)

def text_pipeline(text):
    tokens = tokenizer(text)
    token_ids = [vocab[token] for token in tokens]
    if len(token_ids) > MAX_SEQ_LEN:
        token_ids = token_ids[:MAX_SEQ_LEN]
    else:
        token_ids += [vocab["<pad>"]] * (MAX_SEQ_LEN - len(token_ids))
    return torch.tensor(token_ids, dtype=torch.long)

def label_pipeline(label):
    return torch.tensor(1 if label == 'pos' else 0, dtype=torch.long)

class IMDBDataset(torch.utils.data.Dataset):
    def __init__(self, data):
        self.data = []
        for label, text in data:
            try:
                text_tensor = text_pipeline(text)
                label_tensor = label_pipeline(label)
                self.data.append((text_tensor, label_tensor))
            except Exception as e:
                print(f"데이터 처리 오류: {e}")
    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        return self.data[idx]

train_dataset = IMDBDataset(train_data)
val_dataset = IMDBDataset(val_data)
test_dataset = IMDBDataset(test_data)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

# 모델 정의
class CNNTransformerModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_classes):
        super(CNNTransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=vocab["<pad>"])
        self.position_embedding = nn.Embedding(MAX_SEQ_LEN, embedding_dim)

        # CNN 인코더
        self.cnn_encoder = nn.Conv1d(in_channels=embedding_dim, out_channels=embedding_dim, kernel_size=3, padding=1, stride=2)
        self.cnn_encoder_residual = nn.Conv1d(embedding_dim, embedding_dim, kernel_size=1, stride=2)

        # 트랜스포머 인코더 레이어
        encoder_layer = nn.TransformerEncoderLayer(d_model=embedding_dim, nhead=8, dropout=0.1, activation='relu')
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=2, norm=nn.LayerNorm(embedding_dim))

        # CNN 디코더
        self.cnn_decoder = nn.ConvTranspose1d(in_channels=embedding_dim, out_channels=embedding_dim, kernel_size=3, padding=1, stride=2, output_padding=1)
        self.cnn_decoder_residual = nn.ConvTranspose1d(embedding_dim, embedding_dim, kernel_size=1, stride=2, output_padding=1)

        # 출력 레이어
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(embedding_dim, num_classes)

    def forward(self, x):
        batch_size, seq_len = x.size()
        # 임베딩 및 포지셔널 인코딩 추가
        x = self.embedding(x)  # [batch_size, seq_len, embedding_dim]
        positions = torch.arange(0, seq_len).unsqueeze(0).expand(batch_size, seq_len).to(device)
        x = x + self.position_embedding(positions)
        x = x.permute(0, 2, 1)  # [batch_size, embedding_dim, seq_len]

        # CNN 인코더와 잔차 연결
        residual = self.cnn_encoder_residual(x)
        x = self.cnn_encoder(x)
        x = nn.ReLU()(x + residual)

        x = x.permute(2, 0, 1)  # [seq_len', batch_size, embedding_dim]

        # 패딩 마스크 생성
        src_key_padding_mask = (x.abs().sum(dim=2) == 0).transpose(0, 1)

        # 트랜스포머 인코더
        x = self.transformer_encoder(x, src_key_padding_mask=src_key_padding_mask)

        x = x.permute(1, 2, 0)  # [batch_size, embedding_dim, seq_len']

        # CNN 디코더와 잔차 연결
        residual = self.cnn_decoder_residual(x)
        x = self.cnn_decoder(x)
        x = nn.ReLU()(x + residual)

        # 글로벌 평균 풀링
        x = x.mean(dim=2)  # [batch_size, embedding_dim]

        x = self.dropout(x)
        logits = self.fc(x)  # [batch_size, num_classes]
        return logits

# 모델 초기화
model = CNNTransformerModel(len(vocab), EMBEDDING_DIM, NUM_CLASSES).to(device)

# 손실 함수와 옵티마이저 설정
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# 학습 루프
for epoch in range(NUM_EPOCHS):
    model.train()
    total_loss = 0
    progress_bar = tqdm(train_loader, desc=f"에포크 {epoch+1}/{NUM_EPOCHS}")
    for texts, labels in progress_bar:
        texts, labels = texts.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(texts)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        progress_bar.set_postfix(loss=loss.item())
    avg_loss = total_loss / len(train_loader)
    print(f"에포크 [{epoch+1}/{NUM_EPOCHS}], 평균 손실: {avg_loss:.4f}")

    # 검증 데이터로 평가
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for texts, labels in val_loader:
            texts, labels = texts.to(device), labels.to(device)
            outputs = model(texts)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    val_accuracy = correct / total
    print(f"에포크 {epoch+1} 후 검증 정확도: {val_accuracy * 100:.2f}%\n")

# 테스트 데이터로 최종 평가
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for texts, labels in test_loader:
        texts, labels = texts.to(device), labels.to(device)
        outputs = model(texts)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
test_accuracy = correct / total
print(f"테스트 정확도: {test_accuracy * 100:.2f}%")


# 모델 저장
torch.save(model.state_dict(), 'cnn_transformer_model.pth')


훈련 데이터 레이블 분포: Counter({1: 12500, 2: 12500})
테스트 데이터 레이블 분포: Counter({1: 12500, 2: 12500})


에포크 1/5: 100%|██████████| 313/313 [07:07<00:00,  1.37s/it, loss=9.78e-6]


에포크 [1/5], 평균 손실: 0.0038
에포크 1 후 검증 정확도: 100.00%



에포크 2/5: 100%|██████████| 313/313 [07:12<00:00,  1.38s/it, loss=1.06e-5]


에포크 [2/5], 평균 손실: 0.0000
에포크 2 후 검증 정확도: 100.00%



에포크 3/5: 100%|██████████| 313/313 [07:04<00:00,  1.36s/it, loss=3.8e-7] 


에포크 [3/5], 평균 손실: 0.0000
에포크 3 후 검증 정확도: 100.00%



에포크 4/5: 100%|██████████| 313/313 [07:15<00:00,  1.39s/it, loss=2.15e-6]


에포크 [4/5], 평균 손실: 0.0000
에포크 4 후 검증 정확도: 100.00%



에포크 5/5: 100%|██████████| 313/313 [07:11<00:00,  1.38s/it, loss=2.05e-7]


에포크 [5/5], 평균 손실: 0.0000
에포크 5 후 검증 정확도: 100.00%

테스트 정확도: 100.00%


In [22]:
# 모델 초기화 및 가중치 로드
model = CNNTransformerModel(len(vocab), EMBEDDING_DIM, NUM_CLASSES).to(device)
model.load_state_dict(torch.load('cnn_transformer_model.pth', map_location=device))
model.eval()

# 예측 함수
def predict(text):
    with torch.no_grad():
        text_tensor = text_pipeline(text).unsqueeze(0).to(device)
        outputs = model(text_tensor)
        probabilities = nn.functional.softmax(outputs, dim=1)
        predicted_class = torch.argmax(probabilities, dim=1).item()
        class_names = ['Negative', 'Positive']
        return class_names[predicted_class], probabilities.squeeze().cpu().numpy()


In [23]:
user_input = "the movie was good"

In [24]:
label, probs = predict(user_input)
print(f"예측 결과: {label}")
print(f"확률 분포: Negative {probs[0]*100:.2f}%, Positive {probs[1]*100:.2f}%\n")

예측 결과: Negative
확률 분포: Negative 100.00%, Positive 0.00%

