In [1]:
# 모듈 불러오기
import dill
import time
import random
import numpy as np
from sklearn.metrics import roc_curve, auc

import nltk

nltk.download("punkt")
from nltk.tokenize import word_tokenize

import torch
import torch.nn as nn

from torchtext.legacy.data import Field
from torchtext.legacy.data import TabularDataset
from torchtext.legacy.data import BucketIterator
from torchtext.legacy.data import Iterator

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\amole\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [2]:
# 환경 세팅
RANDOM_SEED = 2021
torch.manual_seed(RANDOM_SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

DATA_PATH = "data/processed/"

In [3]:
#데이터 불러오기

# torchtext.Field를 이용해 필드를 정의
TEXT = Field(
    sequential=True,
    use_vocab=True,
    tokenize=word_tokenize,
    lower=True,
    batch_first=True,
)
LABEL = Field(
    sequential=False,
    use_vocab=False,
    batch_first=True,
)

In [4]:
# 전처리를 끝낸 수능 데이터(tsv) 불러오기
sat_train_data, sat_valid_data, sat_test_data = TabularDataset.splits(
    path=DATA_PATH,
    train="sat_train.tsv",
    validation="sat_valid.tsv",
    test="sat_test.tsv",
    format="tsv",
    fields=[("text", TEXT), ("label", LABEL)],
    skip_header=1,
)

sat_train_iterator, sat_valid_iterator, sat_test_iterator = BucketIterator.splits(
    (sat_train_data, sat_valid_data, sat_test_data),
    batch_size=8,
    device=None,
    sort=False,
)

TEXT.build_vocab(sat_train_data, min_freq=2)

In [25]:
# 모델 클래스 정의
class LSTMClassifier(nn.Module):
    def __init__(self, num_embeddings, embedding_dim, hidden_size, num_layers, pad_idx):
        super().__init__()
        self.embed_layer = nn.Embedding(
            # 생성할 Embedding Layer의 크기
            num_embeddings = num_embeddings,
            embedding_dim = embedding_dim,
            #배치별로 문장의 크기를 맞추기 위해 짧은 문장에 Padding을 붙여서 길이를 맞춘다.
            padding_idx=pad_idx
        )
        self.lstm_layer = nn.LSTM(
            input_size = embedding_dim,
            hidden_size = hidden_size,
            num_layers = num_layers,
            bidirectional = True,
            dropout = 0.5
        )
        self.last_layer = nn.Sequential(
            nn.Linear(hidden_size * 2, hidden_size),
            nn.Dropout(0.5),
            nn.LeakyReLU(),                 
            nn.Linear(hidden_size, 1), # (1)
            nn.Sigmoid(),
        )
        
        """
        (1) output의 크기를 1로 두어 각각의 문장이 문법적으로 맞을 점수를 계산
        만일 output의 크기가 2라면 각각의 문장이 문법적으로 맞는지 틀렸는지에 대한
        점수 2개를 구할 수 있다.
        우리는 각각의 문장이 맞았는지 점수를 확인하여 가장 점수가 낮은 문장을 정답으로 고르기 위해
        output의 값을 1로 둔다.
         
        (2) 우리가 구한 점수를 확률로 변환하기 위해 sigmoid를 마지막 층으로 둔다. 
        """  
            
    #모델의 파이프라인 정의    
    def forward(self, x):
        # 숫자로 이루어진 토큰을 input으로 받아 이 값들을 Embedding값으로 변환
        embed_x = self.embed_layer(x)
        ''' 
        LSTM은 output, (Hidden State, Celss State)을 반환한다.
        이 중 state 값들은 사용하지 않으므로 반환하지 않는다. 
        '''
        output, (_, _) = self.lstm_layer(embed_x)
        '''
        LSTM의 output은 batch size, 문장 길이, Output size라는
        size를 가지고 있다. 우리는 가장 마지막 단어의 결과값을 사용한다.
        '''
        last_output = output[:, -1, :]
        ''' 
        문장의 마지막 단어의 output을 Fully Connected Layer에 통과시켜 확률값을 계산한다.
        '''
        last_output = self.last_layer(last_output)
        return last_output

# 데이터셋 불러오기
* 이번 프로젝트를 진행하면서 데이터를 처리하는 부분에서는 모두 torchtext를 사용한다.
* torchtext를 사용하는 순서를 개략적으로 표현하면 다음과 같다
  1. TEXT = Field(...)
  2. LABEL = Field(...)
  3. dataset = TabularDataset(...) 
  4. TEXT.build_vocab(dataset)
  5. data_loader = BucketIterator(dataset, ...)

In [10]:
# train, evaluate, test 정의
def train(model: nn.Module, iterator: Iterator, optimizer: torch.optim.Optimizer, criterion: nn.Module, device: str):
    model.train()
    epoch_loss = 0

    for _, batch in enumerate(iterator):
        optimizer.zero_grad()

        text = batch.text
        if text.shape[0] > 1:
            label = batch.label.type(torch.FloatTensor)
            text = text.to(device)
            label = label.to(device)
            output = model(text).flatten()
            loss = criterion(output, label)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

    return epoch_loss / len(iterator)


def evaluate(model: nn.Module, iterator: Iterator, criterion: nn.Module, device: str):
    model.eval()
    epoch_loss = 0

    with torch.no_grad():
        for _, batch in enumerate(iterator):
            text = batch.text
            label = batch.label.type(torch.FloatTensor)
            text = text.to(device)
            label = label.to(device)
            output = model(text).flatten()
            loss = criterion(output, label)
            epoch_loss += loss.item()

    return epoch_loss / len(iterator)


def test(model: nn.Module, iterator: Iterator, device: str):
    model.eval()
    with torch.no_grad():
        y_real = []
        y_pred = []
        for batch in iterator:
            text = batch.text
            label = batch.label.type(torch.FloatTensor)
            text = text.to(device)
            output = model(text).flatten().cpu()
            y_real += [label]
            y_pred += [output]
        y_real = torch.cat(y_real)
        y_pred = torch.cat(y_pred)

    fpr, tpr, _ = roc_curve(y_real, y_pred)
    auroc = auc(fpr, tpr)

    return auroc


def epoch_time(start_time: int, end_time: int):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [26]:
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]
N_EPOCHS = 20

lstm_classifier = LSTMClassifier(
    num_embeddings=len(TEXT.vocab),
    embedding_dim=100,
    hidden_size=200,
    num_layers=4,
    pad_idx=PAD_IDX,
)
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"
_ = lstm_classifier.to(device)

optimizer = torch.optim.Adam(lstm_classifier.parameters())
bce_loss_fn = nn.BCELoss()

for epoch in range(N_EPOCHS):

    start_time = time.time()

    train_loss = train(lstm_classifier, sat_train_iterator, optimizer, bce_loss_fn, device)
    valid_loss = evaluate(lstm_classifier, sat_valid_iterator, bce_loss_fn, device)

    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    print(f"Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s")
    print(f"\tTrain Loss: {train_loss:.5f}")
    print(f"\t Val. Loss: {valid_loss:.5f}")

Epoch: 01 | Time: 0m 1s
	Train Loss: 0.53307
	 Val. Loss: 0.54121
Epoch: 02 | Time: 0m 1s
	Train Loss: 0.46817
	 Val. Loss: 0.51483
Epoch: 03 | Time: 0m 1s
	Train Loss: 0.44441
	 Val. Loss: 0.52126
Epoch: 04 | Time: 0m 1s
	Train Loss: 0.44559
	 Val. Loss: 0.51875
Epoch: 05 | Time: 0m 1s
	Train Loss: 0.43964
	 Val. Loss: 0.51787
Epoch: 06 | Time: 0m 1s
	Train Loss: 0.44878
	 Val. Loss: 0.51335
Epoch: 07 | Time: 0m 1s
	Train Loss: 0.42592
	 Val. Loss: 0.51371
Epoch: 08 | Time: 0m 1s
	Train Loss: 0.42068
	 Val. Loss: 0.51575
Epoch: 09 | Time: 0m 1s
	Train Loss: 0.42974
	 Val. Loss: 0.52408
Epoch: 10 | Time: 0m 1s
	Train Loss: 0.42826
	 Val. Loss: 0.51546
Epoch: 11 | Time: 0m 1s
	Train Loss: 0.41950
	 Val. Loss: 0.51844
Epoch: 12 | Time: 0m 1s
	Train Loss: 0.41668
	 Val. Loss: 0.52236
Epoch: 13 | Time: 0m 1s
	Train Loss: 0.45130
	 Val. Loss: 0.51730
Epoch: 14 | Time: 0m 1s
	Train Loss: 0.41939
	 Val. Loss: 0.51189
Epoch: 15 | Time: 0m 1s
	Train Loss: 0.43199
	 Val. Loss: 0.51776
Epoch: 16 

In [27]:
_ = lstm_classifier.cpu()
test_auroc = test(lstm_classifier, sat_test_iterator, "cpu")

print(f"SAT Dataset Test AUROC: {test_auroc:.5f}")

SAT Dataset Test AUROC: 0.73077


In [28]:
# 다른 곳에 쓰기위해 dill 적용
with open("baseline_model.dill", "wb") as f:
    model = {
        "TEXT": TEXT,
        "LABEL": LABEL,
        "classifier": lstm_classifier
    }
    dill.dump(model, f)