In [96]:
! pip list | grep "torch"

torch                   1.8.0
torchsummary            1.5.1
torchtext               0.3.1
torchvision             0.9.1


In [97]:
import torch
# from torchtext.legacy import data
# from torchtext.legacy import datasets
from torchtext import data
from torchtext import datasets

TEXT = data.Field(batch_first=True,
                  fix_length=500,
                  tokenize=str.split,
                  pad_first=True,
                  pad_token='[PAD]',
                  unk_token='[UNK]')
LABEL = data.LabelField(dtype=torch.float)
train_data, test_data = datasets.IMDB.splits(text_field=TEXT, label_field=LABEL)

# batch_first == batch size를 data shape axis의 가장 앞으로 설정
# fix_length == sentence의 길이를 미리 제한하는 옵션
# tokenize == tokenize를 설정하는 옵션, 여기서는 파이썬의 string.split (그냥 띄어쓰기로 짜름)
# pad_first == fix_lenght 대비 짧은 문장의 경우 padding을 앞에서 줄 것인지에 대한 옵션
# pad_token == padding을 할 때 사용할 token
# unk_token == token dict에 없는 token이 나올 경우 token을 표현하는 특수 token
# dtype == 가져올 데이터에 대한 type 설정 옵션

AttributeError: module 'torchtext.data' has no attribute 'Field'

In [None]:
print(f"Train Data Length : {len(train_data.examples)}")
print(f"Test Data Length : {len(test_data.examples)}")

print(train_data.fields)

print('--- Data Sample ---')
print('Input : ')
print(' '.join(vars(train_data.examples[1])['text']),'\n')
print(f'Label : {vars(train_data.examples[1])["label"]}')
print(vars(train_data.examples[1])['text'],'\n')

In [None]:
import re

def PreProcessingText(input_sentence) :
    input_sentence = input_sentence.lower()
    input_sentence = re.sub('<[^>]*>', repl= ' ', string = input_sentence) # "<br />" 처리
    input_sentence = re.sub('[!"#$%&\()*+,-./:;<=>?@[\\]^_`{|}~]', repl= ' ', string = input_sentence) # 특수문자 처리 ("'" 제외)
    input_sentence = re.sub('\s+', repl= ' ', string = input_sentence) # 연속된 띄어쓰기 처리
    if input_sentence:
        return input_sentence

for example in train_data.examples :
    vars(example)['text'] = PreProcessingText(' '.join(vars(example)['text'])).split()

for example in test_data.examples :
    vars(example)['text'] = PreProcessingText(' '.join(vars(example)['text'])).split()

In [None]:
model_config = {'emb_type' : 'glove', 'emb_dim' : 300}

# pre_trained
TEXT.build_vocab(train_data, min_freq=2, max_size=None, vectors = 'glove.6B.300d')
# min_freq == vocab에 해당하는 token에 최소한으로 등자하는 횟수에 제한을 둔다.
# max_size == vocab size 에 제한을 둔다
# vecotrs == pre-trained vector를 가져와 vocab을 세팅한다.
LABEL.build_vocab(train_data)
model_config['vocab_size'] = len(TEXT.vocab)
print(TEXT.vocab.stoi)

In [None]:
print(f"Vocab size : {len(TEXT.vocab)}")
print('Vocab Examples :')
for idx, (k, v) in enumerate(TEXT.vocab.stoi.items()):
    if idx >= 10 :
        break
    print('\t', k, v)
print('---------------------')

print(f'Label Size : {len(LABEL.vocab)}')
print('Label Examples : ')
for idx, (k, v) in enumerate(LABEL.vocab.stoi.items()):
    print('\t', k, v)

In [None]:
# check embedding vectors
print(TEXT.vocab.vectors.shape)

In [None]:
import random
train_data, valid_data = train_data.split(random_state=random.seed(0),
                                          split_ratio=0.8)
model_config['batch_size'] = 30
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    datasets=(train_data, valid_data, test_data), batch_size=model_config['batch_size'], device=device)
# Bucket iterator로 손쉽게 Batch를 만들 수 있다.

In [None]:

sample_for_check = next(iter(train_iterator))
print(sample_for_check)
print(sample_for_check.text)
print(sample_for_check.label)



In [None]:
import torch.nn as nn

class SentenceClassification(nn.Module) :
    def __init__(self, **model_config):
        super(SentenceClassification, self).__init__()

        if model_config['emb_type'] == 'glove' or 'fasttext' :
            self.emb = nn.Embedding(model_config['vocab_size'],
                                    model_config['emb_dim'],
                                    _weight=TEXT.vocab.vectors)
            # 순서대로 num_embeddings, embedding_dim, _wieght
            # num_embeddings == vocab_size
            # embedding_dim == 원하는 embedding dimension (pre-trained vector 사용시 차원 일치시켜야  한다.)
            # _weight == pre-trained 된 vector를 initial value로 설정
                # 이렇게 하면 해당 임베딩 벡터도 같이 학습된다. freeze 시키고 싶으면 nn.Embedding.from_pretrained(TEXT.vocab.vectors) 사용
        else :
            self.emb = nn.Embedding(model_config['vocab_size'],
                                    model_config['emb_dim'])

        self.bidirectional = model_config['bidirectional']
        self.num_direction = 2 if model_config['bidirectional'] else 1
        self.model_type = model_config['model_type']

        self.RNN = nn.RNN(input_size=model_config['emb_dim'],
                          hidden_size=model_config['hidden_dim'],
                          dropout=model_config['dropout'],
                          bidirectional=model_config['bidirectional'],
                          batch_first=model_config['batch_first'])
        self.LSTM = nn.LSTM(input_size=model_config['emb_dim'],
                          hidden_size=model_config['hidden_dim'],
                          dropout=model_config['dropout'],
                          bidirectional=model_config['bidirectional'],
                          batch_first=model_config['batch_first'])
        self.GRU = nn.GRU(input_size=model_config['emb_dim'],
                          hidden_size=model_config['hidden_dim'],
                          dropout=model_config['dropout'],
                          bidirectional=model_config['bidirectional'],
                          batch_first=model_config['batch_first'])
        self.fc = nn.Linear(model_config['hidden_dim']*self.num_direction,
                            model_config['output_dim'])
        self.drop = nn.Dropout(model_config['dropout'])

    def forward(self,x):
        # x : (Batch_size, max_seq_length)
        # emb : (Batch_size, max_seq_length, emb_dim)
        # output : (Batch_size, max_seq_length, hidden_dim * num_direction)
        # hidden : (num_direction, batch_size, hidden_dim)
        # last_output : (batch_size, hidden_dim * num_direction)

        emb = self.emb(x)
        if self.model_type == 'RNN':
            output, hidden = self.RNN(emb)
        elif self.model_type == 'LSTM':
            output, (hidden, cell) = self.LSTM(emb)
        elif self.model_type == 'GRU':
            output, hidden = self.GRU(emb)
        else :
            raise NameError('Select model_type in [RNN, LSTM, GRU]')
        last_output = output[:,-1,:] # token의 위치를 설명하는 두 번째 차원에서 마지막 값을 가져와 사용

        return self.fc(self.drop(last_output))

In [None]:
model_config.update(dict(batch_first = True,
                         model_type = 'RNN',
                         bidirectional = True,
                         hidden_dim = 128,
                         output_dim = 1,
                         dropout = 0))
model = SentenceClassification(**model_config).to(device)
loss_fn = nn.BCEWithLogitsLoss().to(device)
# 내부에 sigmoid layer를 통과시키기 때문에 model 구성시 sigmoid를 따로 사용하지 않는다.


In [None]:
def binary_accuracy(preds, y):
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds==y).float()
    acc =correct.sum()/len(correct)
    return acc

In [None]:
predictions = model.forward(sample_for_check.text).squeeze()
loss = loss_fn(predictions, sample_for_check.label)
acc = binary_accuracy(predictions, sample_for_check.label)

print(predictions)
print(loss, acc)

In [None]:

def train(model, iterator, optimizer, loss_fn, idx_epoch, **model_params):

    epoch_loss = 0
    epoch_acc = 0

    model.train()
    batch_size = model_params['batch_size']

    for idx, batch in enumerate(iterator):

        # Initializing
        optimizer.zero_grad()

        # Forward
        predictions = model(batch.text).squeeze()
        loss = loss_fn(predictions, batch.label)
        acc = binary_accuracy(predictions, batch.label)

        sys.stdout.write(
                    "\r" + f"[Train] Epoch : {idx_epoch:^3}"\
                    f"[{(idx + 1) * batch_size} / {len(iterator) * batch_size} ({100. * (idx + 1) / len(iterator) :.4}%)]"\
                    f"  Loss: {loss.item():.4}"\
                    f"  Acc : {acc.item():.4}"\
                    )

        # Backward
        loss.backward()
        optimizer.step()

        # Update Epoch Performance
        epoch_loss += loss.item()
        epoch_acc += acc.item()

    return epoch_loss/len(iterator) , epoch_acc/len(iterator)

In [None]:

def evaluate(model, iterator, loss_fn):

    epoch_loss = 0
    epoch_acc = 0

    # evaluation mode
    model.eval()
    with torch.no_grad():
        for batch in iterator:
            predictions = model(batch.text).squeeze(1)
            loss = loss_fn(predictions, batch.label)
            acc = binary_accuracy(predictions, batch.label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
model_config['model_type'] = 'RNN'
model = SentenceClassification(**model_config).to(device)
optimizer = torch.optim.Adam(model.parameters())
loss_fn = nn.BCEWithLogitsLoss().to(device)

In [None]:

N_EPOCH = 5

best_valid_loss = float('inf')
model_name = f"{'bi-' if model_config['bidirectional'] else ''}{model_config['model_type']}_{model_config['emb_type']}"

print('---------------------------------')
print(f'Model name : {model_name}')
print('---------------------------------')

for epoch in range(N_EPOCH):
    train_loss, train_acc = train(model, train_iterator, optimizer, loss_fn, epoch, **model_config)
    valid_loss, valid_acc = evaluate(model, valid_iterator, loss_fn)
    print('')
    if valid_loss < best_valid_loss: # early stopping 기능임
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), f'./{model_name}.pt')
        print(f'\t Saved at {epoch}-epoch')

    print(f'\t Epoch : {epoch} | Train Loss : {train_loss:.4} | Train Acc : {train_acc:.4}')
    print(f'\t Epoch : {epoch} | Valid Loss : {valid_loss:.4} | Valid Acc : {valid_acc:.4}')

In [None]:

# Test set
model.load_state_dict(torch.load(f'./{model_name}.pt'))
test_loss, test_acc = evaluate(model, test_iterator, loss_fn)
print(f'Test Loss : {test_loss:.4} | Test Acc : {test_acc:.4}')
