In [2]:

import os
import re
import pickle

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torchmetrics.classification import F1Score, MulticlassF1Score
import torchmetrics

from konlpy.tag import Okt


from collections import Counter

from sklearn.model_selection import train_test_split

In [4]:
def _load_pickle(path):
    """Return the object deserialized from the pickle file at `path`."""
    # pickle can run arbitrary code while loading: only open files you produced yourself.
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# Restore the datasets, data loaders and vocabulary saved by an earlier run.
train_dataset = _load_pickle('train_dataset.pkl')
test_dataset = _load_pickle('test_dataset.pkl')
train_loader = _load_pickle('train_loader.pkl')
test_loader = _load_pickle('test_loader.pkl')
token_to_id = _load_pickle('token_to_id.pkl')


In [190]:
# Map each distinct value of train_labels to an integer class id, in the order
# returned by value_counts(); at most the first 12 values receive an id.
# NOTE(review): train_labels is not defined in the visible cells; presumably a
# pandas Series of genre names created earlier - confirm.
label = {
    genre: class_id
    for class_id, genre in zip(range(12), train_labels.value_counts().index)
}
label

{'나머지': 0,
 '성인물(에로)': 1,
 '공포(호러)': 2,
 '스릴러': 3,
 'SF': 4,
 '범죄': 5,
 '미스터리': 6,
 '사극': 7,
 '어드벤처': 8,
 '기타': 9,
 '전쟁': 10,
 '서부극(웨스턴)': 11}

In [23]:
# It seems the word2vec vector_size and embedding_dim have to be the same size.

# Model hyper-parameters; the vocabulary size comes from the token -> id mapping.
n_vocab = len(token_to_id)
hidden_dim, embedding_dim, n_layers = 64, 1024, 2

# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'

In [29]:
# Sentence classification model

class SentenceClassifier(nn.Module):
    """Sentence classifier: token embedding -> RNN/LSTM -> dropout -> linear layer.

    Args:
        n_vocab: number of rows of the embedding table (not used when
            `pretrained_embedding` is given).
        hidden_dim: hidden size of each direction of the recurrent layer.
        embedding_dim: width of the embedding vectors. When
            `pretrained_embedding` is given, the width of that matrix is used
            instead, so the recurrent layer always matches the embeddings.
        n_layers: number of stacked recurrent layers.
        dropout: dropout probability, used between recurrent layers and
            before the final linear layer.
        bidirectional: run the recurrent layer in both directions. Useful when
            both the preceding and the following context matter, e.g. when
            choosing the word that fills a blank.
        model_type: 'rnn' or 'lstm'.
        pretrained_embedding: optional 2-D array-like or tensor of embedding
            weights. It is loaded with `nn.Embedding.from_pretrained`, which
            keeps the weights frozen by default.
        n_classes: number of output classes.

    Raises:
        ValueError: if `model_type` is not 'rnn' or 'lstm', or if
            `pretrained_embedding` is not 2-D.
    """

    def __init__(self,
                 n_vocab = n_vocab,
                 hidden_dim = hidden_dim,
                 embedding_dim = embedding_dim,
                 n_layers = n_layers,
                 dropout = 0.7,
                 bidirectional = True,
                 model_type = 'lstm',
                 pretrained_embedding = None,
                 n_classes = 12
                 ):
        super().__init__()

        # Validate up front: an unknown model_type used to leave self.model
        # undefined and only failed later inside forward().
        recurrent_layers = {'rnn': nn.RNN, 'lstm': nn.LSTM}
        if model_type not in recurrent_layers:
            raise ValueError(
                f"model_type must be 'rnn' or 'lstm', got {model_type!r}"
            )

        if pretrained_embedding is not None:
            if isinstance(pretrained_embedding, torch.Tensor):
                # torch.tensor(tensor) emits a copy-construct warning; clone instead.
                weights = pretrained_embedding.detach().clone().to(torch.float32)
            else:
                weights = torch.tensor(pretrained_embedding, dtype = torch.float32)
            if weights.dim() != 2:
                raise ValueError(
                    f"pretrained_embedding must be 2-D, got {weights.dim()} dimension(s)"
                )
            # The recurrent layer must consume vectors of the pretrained width,
            # whatever embedding_dim was passed.
            embedding_dim = weights.shape[1]
            self.embedding = nn.Embedding.from_pretrained(weights)
        else:
            self.embedding = nn.Embedding(
                num_embeddings = n_vocab,
                embedding_dim = embedding_dim,
                padding_idx = 0
            )

        self.model = recurrent_layers[model_type](
            input_size = embedding_dim,
            hidden_size = hidden_dim,
            num_layers = n_layers,
            bidirectional = bidirectional,
            # Inter-layer dropout has no effect with a single layer and
            # PyTorch warns when it is requested anyway.
            dropout = dropout if n_layers > 1 else 0.0,
            batch_first = True
        )

        # A bidirectional layer concatenates both directions in its output.
        n_directions = 2 if bidirectional else 1
        self.classifier = nn.Linear(hidden_dim * n_directions, n_classes)

        self.dropout = nn.Dropout(dropout)

    def forward(self, inputs):
        """Return the class logits for a batch of token-id sequences.

        `inputs` is indexed as (batch, time) because the recurrent layer is
        built with batch_first=True; the result has one row of `n_classes`
        logits per sequence.
        """
        embeddings = self.embedding(inputs)
        output, _ = self.model(embeddings)
        # Keep only the features of the last time step.
        # NOTE(review): with padded batches this step may be a padding token,
        # and in the bidirectional case the backward half at the last step has
        # only seen that single token - confirm this is the intended pooling.
        last_output = output[:, -1, :]
        last_output = self.dropout(last_output)
        logits = self.classifier(last_output)
        return logits

In [30]:
# Build the model with its default arguments, then move it to the selected device.
classifier = SentenceClassifier()
classifier = classifier.to(DEVICE)

In [31]:
# Optimisation settings.
# NOTE(review): EPOCH is not referenced by the training loop visible in this
# notebook, which defines its own `epochs` - confirm whether it is still needed.
EPOCH, LR = 1000, 0.001

if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Adam optimiser whose learning rate is multiplied by 0.1 every 2 scheduler steps.
# NOTE(review): with one scheduler step per epoch the rate shrinks very quickly
# over a long run - confirm step_size/gamma are intended.
optimizer = optim.Adam(params = classifier.parameters(), lr = LR)
scheduler = optim.lr_scheduler.StepLR(optimizer = optimizer, step_size = 2, gamma = 0.1)

criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)

In [32]:
# Checkpoint location
SAVE_PATH = './model/movie/'
SAVE_FILE = os.path.join(SAVE_PATH, 'model_movie_train_wbs.pth')
# exist_ok=True creates the directory only when it is missing and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(SAVE_PATH, exist_ok = True)

In [34]:
def train(model, datasets, criterion, optimizer, device, interval):
    """Run one optimisation pass over `datasets`.

    Args:
        model: module called as `model(input_ids)` to produce logits.
        datasets: iterable yielding `(input_ids, labels)` batches.
        criterion: loss called as `criterion(logits, labels)`.
        optimizer: optimizer stepped once per batch.
        device: device the batches and the accuracy metric are moved to.
        interval: the running mean loss is printed whenever
            `step % interval == 0`.

    Returns:
        Tuple of (mean of the per-batch losses, multiclass accuracy over all
        batches as computed by torchmetrics).
    """
    model.train()
    metric = torchmetrics.Accuracy(task="multiclass", num_classes = 12).to(device)
    losses = []

    for step, (token_ids, targets) in enumerate(datasets):
        token_ids = token_ids.to(device)
        targets = targets.to(device).long()

        # Standard update: clear old gradients, forward, backward, step.
        optimizer.zero_grad()
        logits = model(token_ids)
        loss = criterion(logits, targets)
        loss.backward()
        optimizer.step()

        losses.append(loss.item())
        # The predicted class is the index of the largest logit.
        metric.update(logits.argmax(dim = 1), targets)

        if step % interval == 0:
            print(f'Train Loss {step} : {np.mean(losses)}')

    return np.mean(losses), metric.compute()


def test(model, datasets, criterion, device, show_predictions = True):
    """Evaluate `model` on every batch of `datasets` and print the results.

    Args:
        model: module called as `model(input_ids)` to produce logits.
        datasets: iterable yielding `(input_ids, labels)` batches.
        criterion: loss called as `criterion(logits, labels)`.
        device: device the batches and the accuracy metric are moved to.
        show_predictions: when True (the default) every predicted and actual
            label is printed; pass False to print only the summary line.

    Returns:
        Tuple of (mean of the per-batch losses, multiclass accuracy over all
        batches as computed by torchmetrics).
    """
    # eval() already turns every nn.Dropout into a no-op. Do NOT replace
    # model.dropout here: assigning nn.Dropout(0) permanently removed the
    # classifier dropout from all training epochs that followed.
    model.eval()
    losses = list()
    accuracy_metric = torchmetrics.Accuracy(task="multiclass", num_classes = 12).to(device)

    all_predictions = []  # predicted labels
    all_labels = []  # actual labels

    # No gradients are needed for evaluation; skipping autograd saves memory.
    with torch.no_grad():
        for input_ids, labels in datasets:
            input_ids = input_ids.to(device)
            labels = labels.to(device).long()

            logits = model(input_ids)
            loss = criterion(logits, labels)
            losses.append(loss.item())

            yhat = torch.argmax(logits, dim = 1)
            accuracy_metric.update(yhat, labels)

            all_predictions.extend(yhat.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_metric.compute()

    print(f'Val Loss: {np.mean(losses)}, Val Accuracy: {accuracy:.4f}')
    if show_predictions:
        print(f'Predictions: {all_predictions}')
        print(f'Actuals: {all_labels}')
    return np.mean(losses), accuracy


# Training and evaluation loop
epochs = 50
interval = 500
best_accuracy = 0.0
# NOTE(review): this overrides the SAVE_FILE built under SAVE_PATH in an
# earlier cell, so the checkpoint is written to the working directory -
# confirm which location is intended.
SAVE_FILE = 'best_model.pth'

# Per-epoch history, stored as plain Python numbers.
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")

    train_loss, train_accuracy = train(classifier, train_loader, criterion, optimizer, device, interval)
    val_loss, current_accuracy = test(classifier, test_loader, criterion, device)

    # Convert the metric results to floats: the history lists stay consistent
    # and usable without moving tensors off the GPU, and the checkpoint does
    # not embed a device tensor.
    train_accuracy = float(train_accuracy)
    current_accuracy = float(current_accuracy)

    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)
    val_losses.append(val_loss)
    val_accuracies.append(current_accuracy)

    # One scheduler step per epoch, after the optimizer updates of that epoch.
    scheduler.step()

    print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')

    # Keep only the checkpoint with the best validation accuracy so far.
    if current_accuracy > best_accuracy:
        best_accuracy = current_accuracy
        torch.save({
            'epoch': epoch,
            'model_state_dict': classifier.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'best_accuracy': best_accuracy,
        }, SAVE_FILE)
        print(f"best model accuracy : {best_accuracy:.4f} epoch : {epoch}")
    else:
        print(f"향상 없음: {best_accuracy:.4f}")

print(f'학습 완료 - best model accuracy : {best_accuracy:.4f}')






Epoch 1/50
Train Loss 0 : 1.0409696102142334
Train Loss 500 : 0.6925834674916106
Val Loss: 0.8591523754714739, Val Accuracy: 0.7787
Predictions: [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 2, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 1, 0,