In [1]:
import torch.nn as nn  
import torch
from Korpora import korpus_nsmc


In [2]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [3]:
# Fetch the NSMC corpus under the current directory via Korpora.
# force_download=False is passed; the recorded output shows the already-installed
# files being reported rather than downloaded again.
korpus_nsmc.fetch_nsmc('./', force_download=False)

[Korpora] Corpus `nsmc` is already installed at /media/data/KMS/KDT5_NL_Practice/DAY04/nsmc/ratings_train.txt
[Korpora] Corpus `nsmc` is already installed at /media/data/KMS/KDT5_NL_Practice/DAY04/nsmc/ratings_test.txt


In [4]:
import pandas as pd

# Read the tab-separated NSMC train/test rating files into DataFrames.
traindf, testdf = (
    pd.read_csv(ratings_path, sep='\t')
    for ratings_path in ('./nsmc/ratings_train.txt', './nsmc/ratings_test.txt')
)

# Report how many rows each split contains.
print("Training Data Size :", len(traindf))
print("Test Data Size :", len(testdf))

Training Data Size : 150000
Test Data Size : 50000


In [5]:
# Drop rows that contain missing values.
# Rebinding the result (instead of a tuple of `inplace=True` calls) keeps the cell
# free of the meaningless `(None, None)` output and avoids in-place mutation.
traindf = traindf.dropna()
testdf = testdf.dropna()

(None, None)

In [6]:
# Display the column names of the training frame.
traindf.columns

Index(['id', 'document', 'label'], dtype='object')

In [7]:
# Display (rows, columns) for both frames.
traindf.shape, testdf.shape

((149995, 3), (49997, 3))

### [2] 단어 사전 생성
<hr>

* 토큰화 진행 ==> 형태소 분석기 선택
* 단어 사전

In [8]:
from konlpy.tag import Mecab
from collections import Counter


def build_vocab(corpus, n_vocab, special_tokens):
    """Build a vocabulary list from a tokenized corpus.

    Args:
        corpus: Iterable of token sequences, one per document.
        n_vocab: Maximum number of corpus tokens to keep, chosen by frequency.
        special_tokens: List of reserved tokens placed at the front of the vocabulary.

    Returns:
        A list made of ``special_tokens`` followed by up to ``n_vocab`` corpus
        tokens ordered from most to least frequent.
    """
    frequencies = Counter()
    for document_tokens in corpus:
        frequencies.update(document_tokens)

    # most_common() yields (token, count) pairs; only the token is kept.
    frequent_tokens = [pair[0] for pair in frequencies.most_common(n_vocab)]
    return special_tokens + frequent_tokens

# Split every review into morphemes with the Mecab analyzer.
tokenizer = Mecab()
train_tokens = list(map(tokenizer.morphs, traindf['document']))
test_tokens = list(map(tokenizer.morphs, testdf['document']))

# The vocabulary is built from the training tokens only; the two special tokens
# come first, so '<pad>' gets id 0 and '<unk>' gets id 1.
vocab = build_vocab(train_tokens, n_vocab=10000, special_tokens=['<pad>', '<unk>'])

# Lookup tables in both directions between tokens and their integer ids.
id_to_token = dict(enumerate(vocab))
token_to_id = {token: idx for idx, token in id_to_token.items()}

print(len(vocab))
print(vocab[:10])

10002
['<pad>', '<unk>', '.', '이', '는', '영화', '다', '고', '하', '도']


### [3] 데이터 가공
<hr>

- 토큰 데이터 정수 인코딩
- 데이터 길이 표준화 => 다른 길이의 데이터를 길이 맞추기 -> 1개 문장 구성하는 단어 수 맞추기 

#### [3-1] 토큰 정수화

#### [3-2] 데이터 구성 단어 수 맞추기 즉, 패딩(padding)
- 단어 수 선정 필요
- 선정된 단어 수에 맞게 데이터 조절 => 길면 잘라내기, 짧으면 채우기


In [9]:
import numpy as np

def pad_sequences(sequences, max_length, pad_val, trucateEnd = True):
    """Force every sequence to exactly ``max_length`` items.

    Sequences longer than ``max_length`` are truncated; shorter ones are padded
    at the end with ``pad_val``.

    Args:
        sequences: Iterable of sliceable sequences (e.g. lists of token ids).
        max_length: Target length of every output row.
        pad_val: Value appended to sequences shorter than ``max_length``.
        trucateEnd: When True (default), over-long sequences keep their first
            ``max_length`` items (the end is cut off). When False, they keep
            their last ``max_length`` items (the beginning is cut off).

    Returns:
        ``np.ndarray`` built from the padded rows, one row per input sequence.
    """
    result = list()

    for sequence in sequences:
        if trucateEnd:
            kept = sequence[:max_length]
        elif max_length > 0:
            kept = sequence[-max_length:]
        else:
            # sequence[-0:] would return the whole sequence, so handle 0 explicitly.
            kept = sequence[:0]

        # list() lets tuples and other sliceable sequences be padded as well.
        kept = list(kept)
        pad_length = max_length - len(kept)
        result.append(kept + [pad_val] * pad_length)
    return np.asarray(result)

# Ids of the special tokens and the fixed sequence length used for padding.
unk_id = token_to_id['<unk>']
pad_id = token_to_id['<pad>']
max_length = 32

# Integer-encode every token; tokens missing from the vocabulary fall back to
# the '<unk>' id through dict.get(token, unk_id). The encoded sequences are then
# padded/truncated to max_length for both the training and the test split.
train_id = pad_sequences(
    [[token_to_id.get(token, unk_id) for token in tokens] for tokens in train_tokens],
    max_length,
    pad_id,
)
test_id = pad_sequences(
    [[token_to_id.get(token, unk_id) for token in tokens] for tokens in test_tokens],
    max_length,
    pad_id,
)

print(train_id[0])


[ 41  86 958   2   2  48 245  28  42 765   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0]


### [4] 데이터 학습 준비
<hr>

- 데이터로더 준비
- 학습용/테스트용 함수
- 모델 클래스
- 학습 관련 변수 => DEVICE, Optimizer, Model, Loss Function, EPOCHS, BATCH SIZE

#### [4-1] 데이터 로더 준비


In [10]:
import torch
from torch.utils.data import TensorDataset, DataLoader


# Convert the padded id matrices to tensors.
train_id, test_id = torch.tensor(train_id), torch.tensor(test_id)

# Labels are stored as float32 tensors.
train_labels, test_labels = (
    torch.tensor(frame['label'].values, dtype=torch.float32)
    for frame in (traindf, testdf)
)

# Pair inputs with labels.
train_dataset = TensorDataset(train_id, train_labels)
test_dataset = TensorDataset(test_id, test_labels)

# Only the training loader shuffles its samples.
train_loader = DataLoader(dataset=train_dataset, batch_size=12812, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=12812, shuffle=False)



#### [4-2] 모델 클래스 정의

- 입력층 : Embedding Layer
- 은닉층 : LSTM Layer
- 은닉층 : Dropout Layer
- 출력층 : Linear Layer

In [11]:
class SentenceClassifier(nn.Module):
    """Recurrent binary sentence classifier: Embedding -> RNN/LSTM/GRU -> Dropout -> Linear.

    Args:
        n_vocab: Number of rows in the embedding table (vocabulary size).
        hidden_dim: Hidden size of the recurrent layer.
        embedding_dim: Size of each token embedding.
        n_layers: Number of stacked recurrent layers.
        dropout: Dropout probability used both between recurrent layers and
            before the final linear layer.
        bidirectional: Whether the recurrent layer runs in both directions.
        model_type: Recurrent layer kind: ``'rnn'``, ``'lstm'`` or ``'gru'``.

    Raises:
        ValueError: If ``model_type`` is not one of the supported kinds.
    """

    def __init__(
            self,
            n_vocab,
            hidden_dim,
            embedding_dim,
            n_layers,
            dropout = 0,
            bidirectional = True,
            model_type = 'lstm',
    ):
        super().__init__()

        # Fail fast on an unknown model_type instead of leaving self.model
        # undefined and crashing later inside forward().
        recurrent_layers = {'rnn': nn.RNN, 'lstm': nn.LSTM, 'gru': nn.GRU}
        if model_type not in recurrent_layers:
            raise ValueError(
                f"Unsupported model_type {model_type!r}; "
                f"expected one of {sorted(recurrent_layers)}"
            )

        # Index 0 is treated as the padding token by the embedding.
        self.embedding = nn.Embedding(
            num_embeddings=n_vocab,
            embedding_dim=embedding_dim,
            padding_idx=0,
        )

        self.model = recurrent_layers[model_type](
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout,
            batch_first=True,
        )

        # A bidirectional layer concatenates both directions, doubling the feature size.
        classifier_in = hidden_dim * 2 if bidirectional else hidden_dim
        self.classifier = nn.Linear(classifier_in, 1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return one raw logit per input sequence.

        Args:
            x: Integer token-id tensor laid out batch-first.

        Returns:
            Tensor with a single logit per batch item (no sigmoid applied).
        """
        embedded = self.embedding(x)
        output, _ = self.model(embedded)
        # NOTE(review): the features of the final time step are used; when inputs
        # are padded at the end this step may correspond to padding -- consider
        # packed sequences or the final hidden state if that matters.
        last_output = output[:, -1, :]
        last_output = self.dropout(last_output)
        return self.classifier(last_output)

In [12]:
from torch import optim

# Model hyperparameters.
n_vocab = len(token_to_id)
hidden_dim = 64
embedding_dim = 128
n_layers = 2

# Build the classifier with explicit keyword arguments and move it to DEVICE.
classifier = SentenceClassifier(
    n_vocab=n_vocab,
    hidden_dim=hidden_dim,
    embedding_dim=embedding_dim,
    n_layers=n_layers,
)
classifier = classifier.to(DEVICE)

# The model emits raw logits, so the loss applies the sigmoid internally.
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(params=classifier.parameters(), lr=1e-3)


In [13]:
def train(model, datasets, criterion, optimizer, device, interval):
    """Run one training pass over ``datasets`` and return the mean batch loss.

    Args:
        model: Module mapping a batch of inputs to one logit per item.
        datasets: Iterable yielding ``(input_ids, labels)`` batches.
        criterion: Loss function called as ``criterion(logits, labels)``.
        optimizer: Optimizer holding ``model``'s parameters.
        device: Device the batches are moved to.
        interval: Accepted for backward compatibility; currently unused
            (periodic loss logging is disabled).

    Returns:
        Mean of the per-batch losses as a float, or ``nan`` when ``datasets``
        yields no batches.
    """
    model.train()
    losses = list()

    for input_ids, labels in datasets:
        input_ids = input_ids.to(device)
        # Labels get a trailing axis so they line up with the (batch, 1) logits.
        labels = labels.to(device).unsqueeze(1)

        logits = model(input_ids)
        loss = criterion(logits, labels)
        losses.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if not losses:
        return float('nan')
    return sum(losses) / len(losses)

def test(model, datasets, criterion, device):
    model.eval()
    losses = []
    corrects = []

    for step, (input_ids, labels) in enumerate(datasets):
        input_ids = input_ids.to(device)
        labels = labels.to(device).unsqueeze(1)

        logits = model(input_ids)
        loss = criterion(logits, labels)
        losses.append(loss.item())
        yhat = torch.sigmoid(logits) >.5
        corrects.extend(
            torch.eq(yhat, labels).tolist()
        )

    print(f'Test loss : {np.mean(losses)}, Test Accuracy : {np.mean(corrects)}')


# Number of passes over the training data, and the logging interval handed to train().
epoches, interval = 20, 100

# Each epoch runs one training pass followed by one evaluation on the test loader.
for epoch in range(1, epoches + 1):
    train(
        model=classifier,
        datasets=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        device=DEVICE,
        interval=interval,
    )
    test(
        model=classifier,
        datasets=test_loader,
        criterion=criterion,
        device=DEVICE,
    )

Test loss : 0.6920441836118698, Test Accuracy : 0.5202912174730484
Test loss : 0.6793728321790695, Test Accuracy : 0.6080964857891473
Test loss : 0.595001757144928, Test Accuracy : 0.7051023061383683
Test loss : 0.535707414150238, Test Accuracy : 0.7433245994759685
Test loss : 0.48560822755098343, Test Accuracy : 0.7708462507750465
Test loss : 0.4475458338856697, Test Accuracy : 0.7942276536592195
Test loss : 0.41683295369148254, Test Accuracy : 0.8096285777146629
Test loss : 0.3950471132993698, Test Accuracy : 0.8207492449546973
Test loss : 0.3831527754664421, Test Accuracy : 0.8277496649798988
Test loss : 0.3730240985751152, Test Accuracy : 0.833109986599196
Test loss : 0.3689783066511154, Test Accuracy : 0.8373702422145328
Test loss : 0.36283884197473526, Test Accuracy : 0.837750265015901
Test loss : 0.3596683219075203, Test Accuracy : 0.8428905734344061
Test loss : 0.35472220182418823, Test Accuracy : 0.8449306958417505
Test loss : 0.35504648834466934, Test Accuracy : 0.84609076544