<a href="https://colab.research.google.com/github/andrewdk1123/KoSentiment/blob/main/Simple_RNN_for_Sentiment_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction

이 Notebook에서는 PyTorch를 이용하여 주어진 문장의 감정 (label 0: neg, 1: pos)을 분류하는 간단한 RNN 모델을 만들어 보겠습니다.

RNN 모델은 입력 시퀀스에 대해 순차적으로 Hidden State를 계산하는 모델입니다. 즉, 현재 Timestep의 Hidden State를 계산할 때, 이전 Timestep에서 계산된 Hidden State를 함께 계산함으로써, 입력 시퀀스의 순서를 고려하여 학습할 수 있습니다.

모델은 다음과 같이 구성됩니다.

 * Embedding Layer: 입력 문장을 단어 임베딩으로 변환합니다.
 * RNN Layer: 입력 시퀀스에 대해 순차적으로 Hidden State를 계산합니다.
 * Linear Layer: 마지막 Hidden State를 입력으로 받아 감정을 분류합니다.

# Data Preparation

In [1]:
# Load packages
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import io

## Upload Train and Test Set

In [2]:
from google.colab import files

uploaded = files.upload()

Saving processed_training.csv to processed_training.csv


In [3]:
print(uploaded.keys())

dict_keys(['processed_training.csv'])


In [4]:
train_data = pd.read_csv(io.BytesIO(uploaded['processed_training.csv']), sep = ',')

cols_to_keep = ['label', 'sentence', 'tokenized_sentence', 'cleaned_tokens']
train_data = train_data.loc[:, cols_to_keep]
train_data.head()

Unnamed: 0,label,sentence,tokenized_sentence,cleaned_tokens
0,0,이제 슬슬 자리를 잡았다 싶었는데 아내가 세상을 떠났어.,"['이제', '슬슬', '자리를', '잡았다', '싶었는데', '아내가', '세상을...","['이제', '슬슬', '자리를', '잡았다', '싶었는데', '아내가', '세상을..."
1,0,내가 돈을 펑펑 쓰고 다닌다면서 남들이 뭐라고 하더라.,"['내가', '돈을', '펑', '##펑', '쓰고', '다닌', '##다면', '...","['내가', '돈을', '펑', '쓰고', '다닌', '남들이', '뭐라고', '하']"
2,0,아무 일도 아닌 것에 스트레스를 받는 나 자신이 너무 혐오스러워.,"['아무', '일도', '아닌', '것에', '스트레스를', '받는', '나', '...","['아무', '일도', '아닌', '것에', '스트레스를', '받는', '나', '..."
3,0,취업하기가 너무 힘들어.,"['취업', '##하기가', '너무', '힘들어', '.']","['취업', '너무', '힘들어']"
4,0,정년퇴직을 하고 나니 노후준비를 안 한 것이 후회가 돼.,"['정', '##년', '##퇴', '##직을', '하고', '나니', '노후', ...","['정', '하고', '나니', '노후', '안', '한', '것이', '후회', ..."


In [5]:
train_data.shape

(24504, 4)

In [6]:
uploaded = files.upload()

Saving processed_test.csv to processed_test.csv


In [7]:
print(uploaded.keys())

dict_keys(['processed_test.csv'])


In [8]:
test_data = pd.read_csv(io.BytesIO(uploaded['processed_test.csv']), sep = ',')

test_data = test_data.loc[:, cols_to_keep]
test_data.head()

Unnamed: 0,label,sentence,tokenized_sentence,cleaned_tokens
0,0,이번 프로젝트에서 발표를 하는데 내가 실수하는 바람에 우리 팀이 감점을 받았어. 너...,"['이번', '프로젝트', '##에서', '발표를', '하는데', '내가', '실수...","['이번', '프로젝트', '발표를', '하는데', '내가', '실수', '바람에'..."
1,0,회사에서 중요한 프로젝트를 혼자 하게 됐는데 솔직히 두렵고 무서워.,"['회사에서', '중요한', '프로젝트를', '혼자', '하게', '됐는데', '솔...","['회사에서', '중요한', '프로젝트를', '혼자', '하게', '됐는데', '솔..."
2,0,상사가 너무 무섭게 생겨서 친해지는 게 너무 두려워.,"['상', '##사가', '너무', '무섭게', '생겨서', '친', '##해지는'...","['상', '너무', '무섭게', '생겨서', '친', '게', '너무', '두려워']"
3,0,이번에 힘들게 들어간 첫 직장이거든. 첫 직장이라서 그런지 너무 긴장된다.,"['이번에', '힘들게', '들어간', '첫', '직장', '##이거', '##든'...","['이번에', '힘들게', '들어간', '첫', '직장', '첫', '직장', '그..."
4,0,직장에서 동료들이랑 관계가 안 좋아질까 봐 걱정돼.,"['직장', '##에서', '동료', '##들이랑', '관계가', '안', '좋아질...","['직장', '동료', '관계가', '안', '좋아질', '봐', '걱정']"


In [9]:
test_data.shape

(6640, 4)

## Data Preparation

In [10]:
import ast
import torch
from torch.nn.utils.rnn import pad_sequence
from collections import Counter

CSV는 `list`를 Cell Value로 가질 수 없습니다. 앞서 불러온 `train_data`와 `test_data`의 `cleaned_tokens` Column은 `str`으로 저장되어 있습니다. 따라서 `ast.literal_eval`을 통해 해당 `str`값들을 실제 `list`로 변경하기로 하겠습니다.

In [11]:
# Convert the string representation of the list to an actual list
train_data['cleaned_tokens'] = train_data['cleaned_tokens'].apply(ast.literal_eval)
test_data['cleaned_tokens'] = test_data['cleaned_tokens'].apply(ast.literal_eval)

이어서 `cleaned_tokens`의 각 `list`를 평탄화 하고, `Counter`를 사용하여 Vocabulary를 구축하였습니다. 또한 Word-to-Index 및 Index-to-Word Mapping을 통해 각 토큰을 인덱싱하였습니다. 이 딕셔너리는 각 토큰을 인코딩하고 디코딩하는 데 사용됩니다.

이후, 특수 토큰 <PAD>와 <UNK>를 이용하여 각 리스트의 시퀀스를 모두 동일한 길이 (50)으로 패딩하고, 외부 어휘 단어를 추가하였습니다.

In [14]:
# Flatten the list of tokens
all_tokens = train_data['cleaned_tokens'].explode()

# Build a vocabulary using Counter
vocab = Counter(all_tokens)

# Create word-to-index and index-to-word dictionaries
word_to_index = {word: index + 1 for index, (word, _) in enumerate(vocab.most_common())}
index_to_word = {index: word for word, index in word_to_index.items()}

# Add a special token for padding with index 0
word_to_index['<PAD>'] = 0
index_to_word[0] = '<PAD>'

# Add a special token for out-of-vocabulary words with index len(vocab) + 1
word_to_index['<UNK>'] = len(vocab) + 1
index_to_word[len(vocab) + 1] = '<UNK>'

# Encode tokens using the word-to-index dictionary, replace out-of-vocabulary words with '<UNK>'
train_data['encoded_tokens'] = train_data['cleaned_tokens'].apply(lambda tokens: [word_to_index.get(token, word_to_index['<UNK>']) for token in tokens])

# Pad sequences to a specific length (adjust maxlen as needed)
maxlen = train_data['encoded_tokens'].apply(len).max()
padded_tokens = pad_sequence([torch.tensor(tokens) for tokens in train_data['encoded_tokens']], batch_first=True, padding_value=word_to_index['<PAD>']).tolist()
train_data['padded_tokens'] = [list(tokens) for tokens in padded_tokens]

In [16]:
train_data.iloc[:,-2:].head()

Unnamed: 0,encoded_tokens,padded_tokens
0,"[17, 2627, 1306, 1695, 1033, 108, 842, 1456]","[17, 2627, 1306, 1695, 1033, 108, 842, 1456, 0..."
1,"[1, 92, 3587, 1396, 1740, 3218, 444, 249]","[1, 92, 3587, 1396, 1740, 3218, 444, 249, 0, 0..."
2,"[316, 690, 565, 929, 1019, 622, 8, 227, 3, 1962]","[316, 690, 565, 929, 1019, 622, 8, 227, 3, 196..."
3,"[79, 3, 82]","[79, 3, 82, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0..."
4,"[433, 28, 735, 145, 12, 25, 121, 236, 102]","[433, 28, 735, 145, 12, 25, 121, 236, 102, 0, ..."


In [17]:
print("Length of the Vocabulary is: ", len(vocab))

Length of the Vocabulary is:  11372


In [18]:
train_data['padded_tokens'].apply(len).unique()

array([38])

In [19]:
# Example: Reconstruct tokens for the first row
example_row_index = 0
reconstructed_tokens = [index_to_word[index] for index in train_data.loc[example_row_index, 'padded_tokens'] if index != 0]

# Display the reconstructed tokens
print(reconstructed_tokens)

['이제', '슬슬', '자리를', '잡았다', '싶었는데', '아내가', '세상을', '떠났']


# Build and Train RNN Sentiment Classifier

In [20]:
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

아래의 `BucketIterator` 클래스는 자연어 처리 작업에서 시퀀스의 길이가 다른 데이터를 효율적으로 배치하기 위한 클래스입니다. `BucketIterator`는 다음과 같은 Attribute를 갖습니다:

 * `data`: 처리할 데이터 세트
 * `batch_size`: 훈련에 사용할 배치 크기
 * `shuffle`: 데이터를 버킷으로 생성하기 전에 데이터를 섞을지 여부
 * `seed`: 셔플링에 사용할 난수 seed
 * `buckets`: 시퀀스 길이에 따라 시퀀스의 목록을 매핑하는 사전

클래스에 포함된 Method는 아래와 같습니다:

 * `create_buckets()`: 시퀀스 길이에 따라 시퀀스의 목록을 매핑하는 딕셔너리를 생성하여 유사한 길이의 시퀀스를 효율적으로 배치할 수 있도록 합니다.
 * `shuffle_buckets()`: 각 버킷 내의 시퀀스를 섞고, 과적합을 방지합니다.
 * `__iter__()`: 메서드는 BucketIterator 클래스의 `iterator`를 정의합니다. 이를 통해 각 버킷에 대한 시퀀스 Batch를 생성을 반복합니다. 생성된 Batch는 텐서로 변환되고 생성됩니다.

In [21]:
class BucketIterator:
    def __init__(self, data, batch_size, shuffle=True, seed=1123):
        self.data = data
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.seed = seed
        self.buckets = self.create_buckets()

    def create_buckets(self):
        buckets = {}
        for X, y in self.data:
            length = len(X)
            if length not in buckets:
                buckets[length] = []
            buckets[length].append((X, y))
        return buckets

    def shuffle_buckets(self):
        random.seed(self.seed)
        for key in self.buckets:
            random.shuffle(self.buckets[key])

    def __iter__(self):
        if self.shuffle:
            self.shuffle_buckets()

        for key in self.buckets:
            bucket = self.buckets[key]
            if not bucket:  # Check if the bucket is empty
                continue
            for i in range(0, len(bucket), self.batch_size):
                batch = bucket[i:i + self.batch_size]
                X_batch, y_batch = zip(*batch)

                X_batch = [torch.LongTensor(seq) for seq in X_batch]
                y_batch = torch.Tensor(y_batch).view(-1, 1)  # Ensure y_batch has shape (batch_size, 1)

                yield torch.stack(X_batch, dim=0), y_batch

다음으로 모델 학습을 위한 Train 및 Validation 데이터를 준비합니다. 분할된 데이터셋들은 `torch.LongTensor(seq)`를 이용하여 `int`값으로 구성된 `list`를 PyTorch 텐서의 목록으로 변환합니다. 이 때, `torch.stack()`을 통해 각 줄의 PyTorch 텐서를 단일 텐서로 스택합니다.

In [22]:
train_X, val_X, train_y, val_y = train_test_split(train_data['padded_tokens'], train_data['label'], test_size=0.2, random_state=1123)

# Convert data and labels to numpy arrays and then PyTorch tensors
train_X = [torch.LongTensor(seq) for seq in train_X.values.tolist()]
train_y = torch.tensor(train_y.values.tolist())
val_X = [torch.LongTensor(seq) for seq in val_X.values.tolist()]
val_y = torch.tensor(val_y.values.tolist())

train_dataset = TensorDataset(torch.stack(train_X), train_y)
val_dataset = TensorDataset(torch.stack(val_X), val_y)

아래의 `SentimentRNN` 클래스는 감정 분석을 위한 간단한 순환 신경망 모델입니다. 모델은 다음과 같이 구성됩니다.

 1. 임베딩 레이어: 각 단어를 embedding_dim 차원의 벡터로 변환합니다.
 2. RNN 레이어: 입력 시퀀스를 처리하고 최종 숨겨진 상태를 생성합니다.
 3. FC 레이어: 최종 숨겨진 상태를 단일 출력 값으로 변환합니다.

`SentimentRNN`의 Model 객체는 입력 시퀀스를 임베딩 레이어에 통과하여 각 단어에 대한 임베딩을 생성하고, 임베딩된 시퀀스를 RNN 레이어에 통과하여 Hidden State를 생성합니다. 마지막 Hidden State를 FC레이어에 통과하여 0과 1 사이의 확률값을 생성합니다. 이 확률값은 입력 시퀀스가 긍정적인 감정을 나타내는지 부정적인 감정을 나타내는지 나타냅니다.

In [23]:
class SentimentRNN(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim):
        super(SentimentRNN, self).__init__()

        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, text):
        embedded = self.embedding(text)
        output, hidden = self.rnn(embedded)

        # output shape: (batch_size, sequence_length, hidden_dim)
        # hidden shape: (num_layers * num_directions, batch_size, hidden_dim)
        # We are using the hidden state from the last time step as the representation of the sequence.

        assert torch.equal(output[:, -1, :], hidden.squeeze(0))

        # Take the last time step's hidden state
        last_hidden = hidden[-1, :, :]  # (batch_size, hidden_dim)
        output = self.fc(last_hidden)
        output = self.sigmoid(output).squeeze(1)

        return output

하이퍼파라메터 세팅을 통해 생성한 Model 객체는 아래와 같습니다.

In [24]:
INPUT_DIM = len(vocab)
EMBEDDING_DIM = 50
HIDDEN_DIM = 64
OUTPUT_DIM = 1

model = SentimentRNN(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM)

In [25]:
print(model)

SentimentRNN(
  (embedding): Embedding(11372, 50)
  (rnn): RNN(50, 64, batch_first=True)
  (fc): Linear(in_features=64, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)


In [26]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 576,089 trainable parameters


다음으로 모델 학습을 수행하겠습니다.

In [27]:
# Loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [36]:
def binary_accuracy(preds, y, threshold=0.5):
    # Apply threshold to the rounded predictions
    rounded_preds = (torch.sigmoid(preds) >= threshold).float()

    # Compare the rounded predictions to the ground truth
    correct = (rounded_preds == y).float()

    # Calculate accuracy
    acc = correct.sum() / len(correct)
    return acc

In [28]:
# Define a BucketIterator for training and validation
batch_size = 64
train_iterator = BucketIterator(train_dataset, batch_size=batch_size, shuffle=True)
val_iterator = BucketIterator(val_dataset, batch_size=batch_size, shuffle=False)

In [37]:
epochs = 10
for epoch in range(epochs):
    model.train()
    train_loss = 0.0
    train_acc = 0.0

    for batch in train_iterator:
        text, labels = batch
        optimizer.zero_grad()

        # Ensure indices are within the vocabulary size
        text = text.clamp(max=INPUT_DIM - 1)

        predictions = model(text).squeeze()  # Remove the dimension

        labels = labels.squeeze(1)

        loss = criterion(predictions, labels.float())
        acc = binary_accuracy(torch.round(torch.sigmoid(predictions)), labels, 0.8)

        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        train_acc += acc.item()

    # Calculate average training loss and accuracy per epoch
    num_train_batches = sum(1 for _ in train_iterator)
    avg_train_loss = train_loss / num_train_batches
    avg_train_acc = train_acc / num_train_batches

    # Validation loop
    model.eval()
    val_loss = 0.0
    val_acc = 0.0

    with torch.no_grad():
        for batch in val_iterator:
            text, labels = batch

            # Ensure indices are within the vocabulary size
            text = text.clamp(max=INPUT_DIM - 1)

            predictions = model(text).squeeze()  # Remove the dimension

            labels = labels.squeeze(1)

            loss = criterion(predictions, labels.float())
            acc = binary_accuracy(torch.round(torch.sigmoid(predictions)), labels, 0.8)

            val_loss += loss.item()
            val_acc += acc.item()

    # Calculate average validation loss and accuracy per epoch
    num_val_batches = sum(1 for _ in val_iterator)
    avg_val_loss = val_loss / num_val_batches
    avg_val_acc = val_acc / num_val_batches

    # Print results for each epoch
    print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Train Acc: {avg_train_acc:.4f}, Val Loss: {avg_val_loss:.4f}, Val Acc: {avg_val_acc:.4f}')

Epoch 1/10, Train Loss: 0.6931, Train Acc: 0.5000, Val Loss: 0.6931, Val Acc: 0.5005
Epoch 2/10, Train Loss: 0.6931, Train Acc: 0.4996, Val Loss: 0.6931, Val Acc: 0.5005
Epoch 3/10, Train Loss: 0.6931, Train Acc: 0.4997, Val Loss: 0.6931, Val Acc: 0.5005
Epoch 4/10, Train Loss: 0.6931, Train Acc: 0.4998, Val Loss: 0.6931, Val Acc: 0.5005
Epoch 5/10, Train Loss: 0.6933, Train Acc: 0.4998, Val Loss: 0.6931, Val Acc: 0.5005
Epoch 6/10, Train Loss: 0.6931, Train Acc: 0.4997, Val Loss: 0.6931, Val Acc: 0.5005
Epoch 7/10, Train Loss: 0.6931, Train Acc: 0.4997, Val Loss: 0.6931, Val Acc: 0.5005
Epoch 8/10, Train Loss: 0.6931, Train Acc: 0.4998, Val Loss: 0.6931, Val Acc: 0.5005
Epoch 9/10, Train Loss: 0.6929, Train Acc: 0.4991, Val Loss: 0.6949, Val Acc: 0.5005
Epoch 10/10, Train Loss: 0.6933, Train Acc: 0.4995, Val Loss: 0.6931, Val Acc: 0.5005


# Model Evaluation

In [38]:
test_data['encoded_tokens'] = test_data['cleaned_tokens'].apply(lambda tokens: [word_to_index.get(token, word_to_index['<UNK>']) for token in tokens])

# Pad sequences to a specific length (adjust maxlen as needed)
maxlen = 50
padded_tokens = pad_sequence([torch.tensor(tokens) for tokens in test_data['encoded_tokens']], batch_first=True, padding_value=word_to_index['<PAD>']).tolist()
test_data['padded_tokens'] = [list(tokens) for tokens in padded_tokens]

In [39]:
test_iterator = BucketIterator(train_dataset, batch_size=batch_size, shuffle=True)
test_X = test_data['padded_tokens']
test_y = test_data['label']

# Convert data and labels to numpy arrays and then PyTorch tensors
test_X = [torch.LongTensor(seq) for seq in test_X.values.tolist()]
test_y = torch.tensor(test_y.values.tolist())

test_dataset = TensorDataset(torch.stack(test_X), test_y)

In [40]:
from sklearn.metrics import precision_score, recall_score, f1_score

model.eval()
test_loss = 0.0
test_acc = 0.0
all_predictions = []
all_labels = []

with torch.no_grad():
    for batch in test_iterator:
        text, labels = batch

        # Preprocess the input (similar to what you did for the validation set)
        text = text.clamp(max=INPUT_DIM - 1)
        text[text >= len(vocab) + 1] = word_to_index['<UNK>']

        predictions = model(text).squeeze()
        labels = labels.squeeze(1)

        loss = criterion(predictions, labels.float())
        acc = binary_accuracy(predictions, labels)

        # Accumulate predictions and labels for calculating metrics
        all_predictions.extend(torch.round(torch.sigmoid(predictions)).cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

        test_loss += loss.item()
        test_acc += acc.item()

# Calculate average test loss and accuracy
num_test_batches = sum(1 for _ in test_iterator)
avg_test_loss = test_loss / num_test_batches
avg_test_acc = test_acc / num_test_batches

# Calculate precision, recall, and F1 score
precision = precision_score(all_labels, all_predictions)
recall = recall_score(all_labels, all_predictions)
f1 = f1_score(all_labels, all_predictions)

print(f'Test Loss: {avg_test_loss:.4f}, Test Acc: {avg_test_acc:.4f}')
print(f'Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}')


Test Loss: 0.6931, Test Acc: 0.5000
Precision: 0.5003, Recall: 1.0000, F1 Score: 0.6670
