## **4-1. RNN 셀 구현**

In [None]:
# !pip install torch==1.10.0

In [None]:
# !pip install torchtext==0.11.0

### **라이브러리 호출**

In [3]:
import torch
import torchtext
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import time

### **데이터 전처리**

In [4]:
start = time.time()

TEXT = torchtext.legacy.data.Field(lower = True, fix_length = 200, batch_first = False)
LABEL = torchtext.legacy.data.Field(sequential = False)

In [5]:
from torchtext.legacy import datasets

train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)

downloading aclImdb_v1.tar.gz


100%|██████████████████████████████████████████████████████████████████████████████| 84.1M/84.1M [06:56<00:00, 202kB/s]


In [6]:
# 데이터셋 내용 확인

print(vars(train_data.examples[0]))

{'text': ['bromwell', 'high', 'is', 'a', 'cartoon', 'comedy.', 'it', 'ran', 'at', 'the', 'same', 'time', 'as', 'some', 'other', 'programs', 'about', 'school', 'life,', 'such', 'as', '"teachers".', 'my', '35', 'years', 'in', 'the', 'teaching', 'profession', 'lead', 'me', 'to', 'believe', 'that', 'bromwell', "high's", 'satire', 'is', 'much', 'closer', 'to', 'reality', 'than', 'is', '"teachers".', 'the', 'scramble', 'to', 'survive', 'financially,', 'the', 'insightful', 'students', 'who', 'can', 'see', 'right', 'through', 'their', 'pathetic', "teachers'", 'pomp,', 'the', 'pettiness', 'of', 'the', 'whole', 'situation,', 'all', 'remind', 'me', 'of', 'the', 'schools', 'i', 'knew', 'and', 'their', 'students.', 'when', 'i', 'saw', 'the', 'episode', 'in', 'which', 'a', 'student', 'repeatedly', 'tried', 'to', 'burn', 'down', 'the', 'school,', 'i', 'immediately', 'recalled', '.........', 'at', '..........', 'high.', 'a', 'classic', 'line:', 'inspector:', "i'm", 'here', 'to', 'sack', 'one', 'of', '

In [7]:
### 데이터 전처리

import string

for example in train_data.examples:
    text = [x.lower() for x in vars(example)['text']] # 소문자 변경
    text = [x.replace("<br","") for x in text] # <br/> 태그 제거
    text = [''.join(c for c in s if c not in string.punctuation) for s in text] # 구두점 제거
    text = [s for s in text if s] # 공백 제거
    vars(example)['text'] = text
    
for example in test_data.examples:
    text = [x.lower() for x in vars(example)['text']]
    text = [x.replace("<br","") for x in text]
    text = [''.join(c for c in s if c not in string.punctuation) for s in text]
    text = [s for s in text if s]
    vars(example)['text'] = text

In [8]:
### 데이터셋 분리

import random
train_data, valid_data = train_data.split(random_state = random.seed(0), split_ratio=0.8)

In [9]:
# 데이터셋 개수 확인

print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')

Number of training examples: 20000
Number of validation examples: 5000
Number of testing examples: 25000


### **단어 집합 만들기**
- IMDB 데이터셋에 포함된 단어들을 이용하여 하나의 딕셔너리와 같은 집합을 만드는 것
- 단어들의 중복은 제거된 상태에서 진행

In [10]:
TEXT.build_vocab(train_data, max_size = 10000, 
                 min_freq = 10, vectors = None)
LABEL.build_vocab(train_data)

print(f"Unique tokens in TEXT vocabulary: {len(TEXT.vocab)}")
print(f"Unique tokens in LABEL vocabulary: {len(LABEL.vocab)}")

Unique tokens in TEXT vocabulary: 10002
Unique tokens in LABEL vocabulary: 3


In [11]:
# 테스트 데이터셋의 단어 집합 확인하기

print(LABEL.vocab.stoi)

defaultdict(<bound method Vocab._default_unk_index of <torchtext.legacy.vocab.Vocab object at 0x00000217D3F26408>>, {'<unk>': 0, 'pos': 1, 'neg': 2})


- 확인 결과 pos(positive, 긍정), neg(negative, 부정) 외에 "unk"가 있음
- 일반적으로 "unk"는 사전에 없는 단어를 의미
    - 예제에서는 pos와 neg만 활용

### **데이터 load**
- 메모리로 데이터를 가져오는 작업

In [12]:
BATCH_SIZE = 64
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

embeding_dim = 100 # 각 단어를 100차원으로 조정
hidden_size = 300 # 은닉층의 유닛 개수 지정

train_iterator, valid_iterator, test_iterator = torchtext.legacy.data.BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = BATCH_SIZE,
    device = device)

### **모델링**

**워드 임베딩 및 RNN 셀 정의**

In [13]:
class RNNCell_Encoder(nn.Module):
    def __init__(self, input_dim, hidden_size):
        super(RNNCell_Encoder, self).__init__()
        self.rnn = nn.RNNCell(input_dim, hidden_size) # RNN 셀 구현

    def forward(self, inputs):
        bz = inputs.shape[1]
        ht = torch.zeros((bz, hidden_size)).to(device)

        for word in inputs:
            ht = self.rnn(word, ht) # 재귀적으로 발생하는 상태 값 처리
        return ht

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.em = nn.Embedding(len(TEXT.vocab.stoi), embeding_dim) # 임베딩 처리
        self.rnn = RNNCell_Encoder(embeding_dim, hidden_size)
        self.fc1 = nn.Linear(hidden_size, 256)
        self.fc2 = nn.Linear(256, 3)

    def forward(self, x):
        x = self.em(x)
        x = self.rnn(x)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

**옵티마이져, 손실함수 정의**

In [14]:
model = Net() # 모델 객체화(이름: model)
model.to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 0.0001)

**모델 학습을 위한 함수 생성**

In [15]:
def training(epoch, model, trainloader, validloader):
    ### 훈련(Training)
    correct = 0
    total = 0
    running_loss = 0

    model.train()
    
    for b in trainloader:
        x, y = b.text, b.label
        x, y = x.to(device), y.to(device)
        y_pred = model(x)
        loss = loss_fn(y_pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        with torch.no_grad():
            y_pred = torch.argmax(y_pred, dim = 1)
            correct += (y_pred == y).sum().item()
            total += y.size(0)
            running_loss += loss.item()
    epoch_loss = running_loss / len(trainloader.dataset)
    epoch_acc = correct / total
    
    ### 검증(Validation)
    valid_correct = 0
    valid_total = 0
    valid_running_loss = 0

    model.eval()
    
    with torch.no_grad():
        for b in validloader:
            x, y = b.text, b.label
            x, y = x.to(device), y.to(device)
            y_pred = model(x)
            loss = loss_fn(y_pred, y)
            y_pred = torch.argmax(y_pred, dim=1)
            valid_correct += (y_pred == y).sum().item()
            valid_total += y.size(0)
            valid_running_loss += loss.item()

    epoch_valid_loss = valid_running_loss / len(validloader.dataset)
    epoch_valid_acc = valid_correct / valid_total
    
    ### 결과 출력
    print('epoch: ', epoch,
          'loss： ', round(epoch_loss, 3),
          'accuracy:', round(epoch_acc, 3),
          'valid_loss： ', round(epoch_valid_loss, 3),
          'valid_accuracy:', round(epoch_valid_acc, 3)
          )
    return epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc

**예측 결과 확인**

In [16]:
epochs = 5

train_loss = []
train_acc = []
valid_loss = []
valid_acc = []

for epoch in range(epochs):
    epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc = training(epoch, model, train_iterator, valid_iterator)
    train_loss.append(epoch_loss)
    train_acc.append(epoch_acc)
    valid_loss.append(epoch_valid_loss)
    valid_acc.append(epoch_valid_acc)

end = time.time()

print(end-start)

epoch:  0 loss：  0.011 accuracy: 0.495 valid_loss：  0.011 valid_accuracy: 0.508
epoch:  1 loss：  0.011 accuracy: 0.5 valid_loss：  0.011 valid_accuracy: 0.495
epoch:  2 loss：  0.011 accuracy: 0.511 valid_loss：  0.011 valid_accuracy: 0.494
epoch:  3 loss：  0.011 accuracy: 0.514 valid_loss：  0.011 valid_accuracy: 0.494
epoch:  4 loss：  0.011 accuracy: 0.523 valid_loss：  0.011 valid_accuracy: 0.508
5166.980179548264
