In [None]:
from modules.util import save_json, now
import matplotlib.pyplot as plt
from tqdm import tqdm
import pandas as pd
IMDB=pd.read_csv("./database/IMDB Dataset.csv")
IMDB

In [None]:
IMDB.info()

In [None]:
IMDB.describe()

EDA 진행 결과 IMDB 데이터 셋의 열은 review와 sentiment로 구성,
각 열에 해당되는 행의 개수는 총 50000개이며 고유값들은 각각 49582개, 2개임.
이 리뷰를 lstm모델을 활용하여 긍정 혹은 부정으로 예측하는 모델을 구축할것임.

In [None]:
# 데이터 정제
duplicates = IMDB[IMDB.duplicated()]  
print("Duplicated Rows:\n", duplicates)

In [None]:
IMDB = IMDB.drop_duplicates(subset='review') # 중복되는 행 제거
IMDB.describe()

In [None]:
IMDB.loc[:, 'sentiment']

In [None]:
IMDB.loc[:, 'sentiment'] = IMDB['sentiment'].map({'positive': 1, 'negative': 0})
IMDB.head()

In [None]:
import nltk
nltk.download('stopwords')

In [None]:
# NOTE: 텍스트 데이터 전처리
# 모델의 input을 효율적으로하고 정확한 성능을 내기 위해서 텍스트데이터를 일관된 형식으로 변환하는 과정이 필요함.
# 1. 텍스트 정규화 -> "특수문자", "대/소문자", "두 칸 이상의 공백" 제거 
# 2. 자연어 처리에서 큰 의미를 가지지 않는다고 알려져 있는 불용어(stopword) 제거 
from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))

import re
def clean_text(text):
    text = text.lower() # 소문자화
    text = re.sub(r"<.*?>", "", text) # HTML태그 제거 
    text = re.sub(r"[^.a-z\s!?']", "", text) # 특수 문자 및 숫자 제거
    text = " ".join(word for word in text.split() if word not in stop_words) # 불용어 제거
    text = re.sub(r'([!?\'"])\1+', r'\1', text) # !?'이 2개 이상이면 한 개로 만들어줌.
    text = re.sub(r'\s+', ' ', text).strip() # 불필요한 공백 제거
    return text
IMDB['review_cleaned'] = IMDB['review'].apply(clean_text)
# pd.Series([len(review) for review in IMDB["review_cleaned"].to_list()]).plot.hist(bins=50)

In [None]:
# Tokenization
from nltk.tokenize import TreebankWordTokenizer
tokenizer = TreebankWordTokenizer()
IMDB.loc[:,'review_tokenized'] = IMDB.loc[:,'review_cleaned'].apply(tokenizer.tokenize)

In [None]:
# 단어집합 생성
import numpy as np
from nltk import FreqDist

all_tokens = np.hstack(IMDB["review_tokenized"])

# 단어 집합 생성 및 빈도 계산
vocab = FreqDist(all_tokens)
len(vocab)

In [None]:
FREQ_THRESHOLD = 3
vocab = {key: value for key, value in vocab.items() if value >= FREQ_THRESHOLD}

# 단어와 인덱스 할당
# 단어 인덱스를 2부터 시작하여 word2idx 생성 -> 레이블 데이터를 1과 0으로 설정 했기 때문
word2idx = {word: idx + 2 for idx, (word, _) in enumerate(vocab.items())}
word2idx["<pad>"] = 0  # 패딩을 위한 인덱스 0 예약
word2idx["<unk>"] = 1  # 알 수 없는 단어를 위한 인덱스 1 예약
VOCAB_SIZE = len(word2idx)

VOCAB_SIZE, word2idx

In [None]:
from modules.util import save_pkl
save_pkl("./word2index.pkl",word2idx)

In [None]:
IMDB['review_tokenized']

In [None]:
# 4. 맵핑(단어 집합을 데이터에 적용)
def word_to_num(word):
    try:
        return word2idx[word]  # 글자를 해당되는 정수로 변환
    except KeyError:  # 단어 집합에 없는 단어일 경우 unk로 대체된다.
        return word2idx["<unk>"]  # unk의 인덱스로 변환


IMDB["review_numbered"] = IMDB["review_tokenized"].apply(lambda _X: [word_to_num(word) for word in _X])
IMDB["token_length"] = IMDB["review_numbered"].apply(lambda _X: len(_X))

In [None]:
IMDB["token_length"].plot.hist(bins=30)
max(IMDB["token_length"])

In [None]:
MAX_TOKEN = 256
IMDB_256 = IMDB[IMDB["token_length"]<MAX_TOKEN]
IMDB_256["token_length"].plot.hist(bins=30)
len(IMDB_256["token_length"])/len(IMDB["token_length"])

In [None]:
import torch
torch.manual_seed(42)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# Train Valid Test Split
from sklearn.model_selection import train_test_split

x_raw, y_raw = IMDB_256["review_numbered"], IMDB_256["sentiment"]

X_train, X_test, y_train, y_test = train_test_split(
    x_raw, y_raw, test_size=0.8, random_state=0, stratify=y_raw
)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train, y_train, test_size=0.2, random_state=0, stratify=y_train
)

X_train

In [None]:
# 5. 패딩 작업
from torch.nn.utils.rnn import pad_sequence


def seq_padding(sequence: pd.Series):
    # 정수 인코딩된 시퀀스를 PyTorch 텐서로 변환
    encoded_tensors = [torch.tensor(seq) for seq in sequence.to_list()]
    # 패딩 적용 (최대 길이에 맞춰 0으로 패딩)
    return pad_sequence(encoded_tensors, batch_first=True, padding_value=0, padding_side="left")


X_train, X_valid, X_test = seq_padding(X_train), seq_padding(X_valid), seq_padding(X_test)
y_train, y_valid, y_test = y_train.apply(int), y_valid.apply(int), y_test.apply(int)

In [None]:
# 6. 패딩이 적용된 시퀀스와 레이블 합치기
from torch.utils.data import TensorDataset, DataLoader

BATCH_SIZE = 256
def dataloader_gen(x, y, batch_size=BATCH_SIZE):
    x = torch.tensor(x, dtype=torch.int32)
    y = torch.tensor(y, dtype=torch.long)
    return DataLoader(TensorDataset(x, y), batch_size=BATCH_SIZE, shuffle=True)


dataloader = dataloader_gen(X_train, y_train.to_numpy())
valid_dataloader = dataloader_gen(X_valid, y_valid.to_numpy())
test_dataloader = dataloader_gen(X_test, y_test.to_numpy())


# 배치를 확인하며 첫 번째 배치 출력
for batch in dataloader:
    inputs, targets = batch
    print("Padded Sequences:\n", inputs)
    print("Labels:\n", targets)
    break  # 첫 번째 배치만 출력

---

In [None]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Param
LEARNING_RATE = 0.00001
N_EPOCHS = 100


EMBED_SIZE = 256
HIDDEN_SIZE = 1024
OUTPUT_DIM = 2

In [None]:
class SentimentAnalysisRNN(nn.Module):
    def __init__(
        self,
        vocab_dim=VOCAB_SIZE,
        embedding_dim=EMBED_SIZE,
        hidden_dim=HIDDEN_SIZE,
        output_dim=OUTPUT_DIM,
        device=device,
    ) -> None:
        super().__init__()

        self.embed = nn.Embedding(vocab_dim, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)
        self.fc1 = nn.Linear(hidden_dim, 128)
        self.fc2 = nn.Linear(128, output_dim)
        self.softmax = nn.Softmax(dim=1)

        self.hidden_dim = hidden_dim
        self.device = device

    def forward(self, x):
        embed = self.embed(x)
        y_t_list, h_t_list = self.rnn(embed)
        h_t = h_t_list.squeeze(0)

        feature = self.fc1(h_t)
        feature = F.relu(feature)
        output = self.fc2(feature)
        return self.softmax(output)


# Training setup
model = SentimentAnalysisRNN(vocab_dim=VOCAB_SIZE, device=device).to(device)
loss_fn = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

In [None]:
def calculate_accuracy(logits, labels):
    # _, predicted = torch.max(logits, 1)
    predicted = torch.argmax(logits, dim=1)
    correct = (predicted == labels).sum().item()
    total = labels.size(0)
    accuracy = correct / total
    return accuracy

def evaluate(model, valid_dataloader, criterion, device):
    val_loss = 0
    val_correct = 0
    val_total = 0

    model.eval()
    with torch.no_grad():
        # 데이터로더로부터 배치 크기만큼의 데이터를 연속으로 로드
        for batch_X, batch_y in valid_dataloader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            # 모델의 예측값
            logits = model(batch_X)

            # 손실을 계산
            loss = criterion(logits, batch_y)

            # 정확도와 손실을 계산함
            val_loss += loss.item()
            val_correct += calculate_accuracy(logits, batch_y) * batch_y.size(0)
            val_total += batch_y.size(0)

    val_accuracy = val_correct / val_total
    val_loss /= len(valid_dataloader)

    return val_loss, val_accuracy

In [None]:
trainin_at = now()
best_val_loss = float('inf')  # 검증 손실의 최저 값을 추적하기 위한 변수로, 초기값은 매우 큰 값으로 설정합니다.
for epoch in range(N_EPOCHS):
    train_loss = 0
    train_correct = 0
    train_total = 0
    model.train()
    
    for batch_idx, samples in enumerate(dataloader):
        x_batch, y_batch = samples
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)

        predicted = model(x_batch)
        loss = loss_fn(predicted, y_batch)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        train_correct += calculate_accuracy(predicted, y_batch) * y_batch.size(0)
        train_total += y_batch.size(0)
        
    train_accuracy = train_correct / train_total
    train_loss /= len(dataloader)
    
      # Validation
    val_loss, val_accuracy = evaluate(model, valid_dataloader, loss_fn, device)

    print(f'Epoch {epoch+1}/{N_EPOCHS}:')
    print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
    print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

    # 검증 손실이 최소일 때 체크포인트 저장
    if val_loss < best_val_loss:
        print(f'Validation loss improved from {best_val_loss:.4f} to {val_loss:.4f}. 체크포인트를 저장합니다.')
        best_val_loss = val_loss
        torch.save(model.state_dict(), f'best_model_checkpoint-{trainin_at}.pth')

In [None]:

save_json(
    f"./training_param_{trainin_at}.json",
    {
        "voca_frequency_thresold": FREQ_THRESHOLD,
        "token_truncation": MAX_TOKEN,
        "lr": LEARNING_RATE,
        "voca_size":VOCAB_SIZE,
        "batch_size": BATCH_SIZE,
        "embed_dim": EMBED_SIZE,
        "rnn_hidden_dim": HIDDEN_SIZE,
        "output_dim": OUTPUT_DIM,
        "loss":best_val_loss
    },
)

In [None]:
evaluate(model,test_dataloader,loss_fn,device)