# 7.4.1 RNN 셀 구현

### 라이브러리 호출

In [1]:
import torch
import torchtext
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import time

### 데이터 전처리

In [2]:
start=time.time()
TEXT = torchtext.legacy.data.Field(lower=True, fix_length=200, batch_first=False) # 데이터 전처리. 소문자, 200으로 길이 고정, 
LABEL = torchtext.legacy.data.Field(sequential=False) # 레이블 전처리. 데이터에 순서가 있는지 나타낸다

### 데이터셋 준비 (IMDB - 영화 리뷰 5만 건. 긍정은 2 부정은 1)

In [3]:
from torchtext.legacy import datasets
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)

### 훈련 데이터셋 내용 확인

In [4]:
print(vars(train_data.examples[0])) 
# 데이터셋의 내용을 보고자 할 때는 examples를 사용
# 데이터는 text와 label을 갖는 사전 형식으로 구성됨

{'text': ['bromwell', 'high', 'is', 'a', 'cartoon', 'comedy.', 'it', 'ran', 'at', 'the', 'same', 'time', 'as', 'some', 'other', 'programs', 'about', 'school', 'life,', 'such', 'as', '"teachers".', 'my', '35', 'years', 'in', 'the', 'teaching', 'profession', 'lead', 'me', 'to', 'believe', 'that', 'bromwell', "high's", 'satire', 'is', 'much', 'closer', 'to', 'reality', 'than', 'is', '"teachers".', 'the', 'scramble', 'to', 'survive', 'financially,', 'the', 'insightful', 'students', 'who', 'can', 'see', 'right', 'through', 'their', 'pathetic', "teachers'", 'pomp,', 'the', 'pettiness', 'of', 'the', 'whole', 'situation,', 'all', 'remind', 'me', 'of', 'the', 'schools', 'i', 'knew', 'and', 'their', 'students.', 'when', 'i', 'saw', 'the', 'episode', 'in', 'which', 'a', 'student', 'repeatedly', 'tried', 'to', 'burn', 'down', 'the', 'school,', 'i', 'immediately', 'recalled', '.........', 'at', '..........', 'high.', 'a', 'classic', 'line:', 'inspector:', "i'm", 'here', 'to', 'sack', 'one', 'of', '

### 데이터셋 전처리 적용

In [5]:
import string

for example in train_data.examples:
    text = [x.lower() for x in vars(example)['text']] # 소문자로 변경
    text = [x.replace("<br","") for x in text] # "<br"을 ""(공백)으로 변경
    text = [''.join(c for c in s if c not in string.punctuation) for s in text] # 구두점 제거
    text = [s for s in text if s] # 공백 제거
    vars(example)['text'] = text

### 훈련과 검증 데이터셋 분리

In [6]:
import random
train_data, valid_data = train_data.split(random_state=random.seed(0), split_ratio=0.8) # 데이터가 임의로 섞인 상태에서 분할, 훈련과 검증 데이터셋을 8:2로 분리

### 데이터셋 개수 확인

In [7]:
print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')

Number of training examples: 20000
Number of validation examples: 5000
Number of testing examples: 25000


### 단어 집합 만들기

In [8]:
TEXT.build_vocab(train_data, max_size=10000, min_freq=10, vectors=None) # 단어 집합 생성
LABEL.build_vocab(train_data)

print(f"Unique tokens in TEXT vocabulary: {len(TEXT.vocab)}")
print(f"Unique tokens in LABEL vocabulary: {len(LABEL.vocab)}")

Unique tokens in TEXT vocabulary: 10002
Unique tokens in LABEL vocabulary: 3


### 테스트 데이터셋의 단어 집합 확인

In [9]:
print(LABEL.vocab.stoi)

defaultdict(<bound method Vocab._default_unk_index of <torchtext.legacy.vocab.Vocab object at 0x000002C6C2EF0588>>, {'<unk>': 0, 'pos': 1, 'neg': 2})


### 데이터셋 메모리로 가져오기 (전처리 완료 후 데이터셋을 메모리로 가져온다)

In [10]:
BATCH_SIZE = 64
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

embeding_dim = 100 # 임베딩 계층을 통과한 후 각 벡터를 100차원으로 조정
hidden_size = 300 # 은닉층의 유닛 개수

train_iterator, valid_iterator, test_iterator = torchtext.legacy.data.BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = BATCH_SIZE,
    device = device)

### 워드 임베딩 및 RNN 셀 정의

In [11]:
class RNNCell_Encoder(nn.Module):
    def __init__(self, input_dim, hidden_size):
        super(RNNCell_Encoder, self).__init__()
        self.rnn = nn.RNNCell(input_dim, hidden_size)
        # input_dim : 훈련 데이터셋의 특성 개수. (batch, input_size) 형태
        # hidden_size : 은닉층의 유닛 개수. (batch, hidden_size) 형태

    def forward(self, inputs):
        # inputs : 입력 시퀀스. (시퀀스 길이, 배치, 임베딩) 형태
        bz = inputs.shape[1]  # 배치 크기를 가져옵니다.
        ht = torch.zeros((bz, hidden_size)).to(device)  # 배치와 은닉층 뉴런 크기를 0으로 초기화합니다.
        
        for word in inputs:
            ht = self.rnn(word, ht)  # 각 단어를 RNNCell에 입력하여 인코딩합니다.
        return ht

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.em = nn.Embedding(len(TEXT.vocab.stoi), embeding_dim)  # 임베딩 처리. (임베딩을 할 단어 수, 임베딩할 벡터의 차원)
        self.rnn = RNNCell_Encoder(embeding_dim, hidden_size)  # 입력 차원과 은닉 상태의 크기를 가진 RNNCell_Encoder를 정의합니다.
        self.fc1 = nn.Linear(hidden_size, 256)  # 은닉 상태에서 256차원의 특징으로 매핑하는 fully connected 레이어를 정의합니다.
        self.fc2 = nn.Linear(256, 3)  # 256차원의 특징을 3가지 클래스로 매핑하는 fully connected 레이어를 정의합니다.

    def forward(self, x):
        x = self.em(x)  # 입력을 임베딩 레이어에 통과시킵니다.
        x = self.rnn(x)  # 입력 시퀀스를 RNNCell_Encoder에 통과시켜 시퀀스를 인코딩합니다.
        x = F.relu(self.fc1(x))  # ReLU 활성화 함수를 적용합니다.
        x = self.fc2(x)  # 최종 출력을 계산합니다.
        return x

### 옵티마이저와 손실 함수 정의

In [12]:
model = Net() # model이라는 이름으로 모델을 객체화
model.to(device)

loss_fn = nn.CrossEntropyLoss() # 다중 분류에 사용
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

### 모델 학습을 위한 함수 정의

In [13]:
def training(epoch, model, trainloader, validloader):
    correct = 0
    total = 0
    running_loss = 0

    model.train()
    for b in trainloader:
        x, y = b.text, b.label # trainloader에서 text와 label을 꺼내 온다
        x, y = x.to(device), y.to(device) # 꺼내 온 데이터가 CPU를 사용할 수 있도록 장치 지정
                                          # 반드시 모델과 같은 장치를 사용하도록 지정해야 한다
        y_pred = model(x)
        loss = loss_fn(y_pred, y) # CrossEntropyLoss 손실 함수를 이용하여 오차 계산
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        with torch.no_grad():
            y_pred = torch.argmax(y_pred, dim=1)
            correct += (y_pred == y).sum().item()
            total += y.size(0)
            running_loss += loss.item()
    epoch_loss = running_loss / len(trainloader.dataset) # 누적된 오차를 전체 데이터셋으로 나누어서 epoch마다 오차를 구한다
    epoch_acc = correct / total

    valid_correct = 0
    valid_total = 0
    valid_running_loss = 0

    model.eval()
    with torch.no_grad():
        for b in validloader:
            x, y = b.text, b.label
            x, y = x.to(device), y.to(device)
            y_pred = model(x)
            loss = loss_fn(y_pred, y)
            y_pred = torch.argmax(y_pred, dim=1)
            valid_correct += (y_pred == y).sum().item()
            valid_total += y.size(0)
            valid_running_loss += loss.item()

    epoch_valid_loss = valid_running_loss / len(validloader.dataset)
    epoch_valid_acc = valid_correct / valid_total

    print('epoch: ', epoch,
          'loss： ', round(epoch_loss, 3),
          'accuracy:', round(epoch_acc, 3),
          'valid_loss： ', round(epoch_valid_loss, 3),
          'valid_accuracy:', round(epoch_valid_acc, 3)
          ) # 훈련이 진행될 때 epoch마다 정확도와 오차(loss) 출력
    return epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc

### 모델 학습

In [14]:
epochs = 5
train_loss = []
train_acc = []
valid_loss = []
valid_acc = []

for epoch in range(epochs):
    epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc = training(epoch,
                                                                 model,                                                                 train_iterator,                                                                 valid_iterator)
    train_loss.append(epoch_loss) # 훈련 데이터셋을 모델에 적용했을 때의 오차
    train_acc.append(epoch_acc) # 훈련 데이터셋을 모델에 적용했을 때의 정확도
    valid_loss.append(epoch_valid_loss) # 검증 데이터셋을 모델에 적용했을 때의 오차
    valid_acc.append(epoch_valid_acc) # 검증 데이터셋을 모델에 적용했을 때의 정확도

end = time.time()
print(end-start)

epoch:  0 loss：  0.011 accuracy: 0.493 valid_loss：  0.011 valid_accuracy: 0.505
epoch:  1 loss：  0.011 accuracy: 0.499 valid_loss：  0.011 valid_accuracy: 0.497
epoch:  2 loss：  0.011 accuracy: 0.515 valid_loss：  0.011 valid_accuracy: 0.496
epoch:  3 loss：  0.011 accuracy: 0.518 valid_loss：  0.011 valid_accuracy: 0.496
epoch:  4 loss：  0.011 accuracy: 0.527 valid_loss：  0.011 valid_accuracy: 0.513
6013.790748596191


### 모델 예측 함수 정의

In [15]:
def evaluate(epoch, model, testloader):    
    test_correct = 0
    test_total = 0
    test_running_loss = 0
    
    model.eval()
    with torch.no_grad():
        for b in testloader:
            x, y = b.text, b.label
            x, y = x.to(device), y.to(device)
            y_pred = model(x)
            loss = loss_fn(y_pred, y)
            y_pred = torch.argmax(y_pred, dim=1)
            test_correct += (y_pred == y).sum().item()
            test_total += y.size(0)
            test_running_loss += loss.item()

    epoch_test_loss = test_running_loss / len(testloader.dataset)
    epoch_test_acc = test_correct / test_total

    print('epoch: ', epoch,
          'test_loss： ', round(epoch_test_loss, 3),
          'test_accuracy:', round(epoch_test_acc, 3)
          )
    return epoch_test_loss, epoch_test_acc

### 모델 예측 결과 확인

In [16]:
epochs = 5
test_loss = []
test_acc = []

for epoch in range(epochs):
    epoch_test_loss, epoch_test_acc = evaluate(epoch,
                                               model,
                                               test_iterator)
    test_loss.append(epoch_test_loss)
    test_acc.append(epoch_test_acc)

end = time.time()
print(end-start)

epoch:  0 test_loss：  0.011 test_accuracy: 0.504
epoch:  1 test_loss：  0.011 test_accuracy: 0.504
epoch:  2 test_loss：  0.011 test_accuracy: 0.504
epoch:  3 test_loss：  0.011 test_accuracy: 0.504
epoch:  4 test_loss：  0.011 test_accuracy: 0.504
6150.496003866196


# 7.4.2 RNN 계층 구현

### 라이브러리 호출

In [17]:
import torch
import torchtext
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import time

### 데이터셋 내려받기 및 전처리

In [18]:
start=time.time()  # 시간 측정 시작

# torchtext를 사용하여 데이터 전처리
TEXT = torchtext.legacy.data.Field(sequential=True, batch_first=True, lower=True)  # 텍스트 필드 정의
LABEL = torchtext.legacy.data.Field(sequential=False, batch_first=True)  # 레이블 필드 정의

# IMDB 데이터셋을 텍스트와 레이블 필드로 나누어 로드
from torchtext.legacy import datasets
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)  # 훈련 데이터와 테스트 데이터를 로드합니다.
train_data, valid_data = train_data.split(split_ratio=0.8)  # 훈련 데이터를 훈련 세트와 검증 세트로 나눕니다.

# TEXT 필드의 단어 집합 생성
TEXT.build_vocab(train_data, max_size=10000, min_freq=10, vectors=None)  # 최대 크기 10000의 단어 집합을 만들고, 최소 빈도수가 10인 단어들을 포함합니다.
LABEL.build_vocab(train_data)  # LABEL 필드의 단어 집합 생성

BATCH_SIZE = 100  # 배치 크기 설정
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')  # GPU 사용 가능 여부에 따라 디바이스 설정

### 데이터셋 분리

In [19]:
train_iterator, valid_iterator, test_iterator = torchtext.legacy.data.BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = BATCH_SIZE,
    device = device)

### 변수 값 지정

In [20]:
vocab_size = len(TEXT.vocab) # 영화 리뷰에 대한 텍스트 길이
n_classes = 2 # 레이블 값(긍정, 부정) 지정

### RNN 계층 네트워크

In [22]:
class BasicRNN(nn.Module):
    def __init__(self, n_layers, hidden_dim, n_vocab, embed_dim, n_classes, dropout_p = 0.2):
        super(BasicRNN, self).__init__()
        self.n_layers = n_layers # RNN 계층 개수
        self.embed = nn.Embedding(n_vocab, embed_dim) # 워드 임베딩 적용
        self.hidden_dim = hidden_dim
        self.dropout = nn.Dropout(dropout_p) # 드롭아웃 적용
        self.rnn = nn.RNN(embed_dim, self.hidden_dim, num_layers = self.n_layers, batch_first = True) # RNN 계층
        self.out = nn.Linear(self.hidden_dim, n_classes) 

    def forward(self, x):
        x = self.embed(x) # 문자를 숫자/벡터로 변환
        h_0 = self._init_state(batch_size = x.size(0)) # 최초 은닉 상태의 값을 0으로 초기화
        x, _ = self.rnn(x, h_0) # RNN 계층을 의미. 파라미터로 입력과 이전 은닉 상태의 값을 받는다
        h_t = x[:, -1, :] # 모든 네트워크를 거쳐서 가장 마지막에 나온 단어의 임베딩 값(마지막 은닉 상태의 값)
        self.dropout(h_t)
        logit = torch.sigmoid(self.out(h_t))
        return logit

    # RNN의 첫 번째 은닉 상태를 초기화
    def _init_state(self, batch_size = 1):
        weight = next(self.parameters()).data # 모델의 파라미터 값을 가져와서 weight 변수에 저장
        return weight.new(self.n_layers, batch_size, self.hidden_dim).zero_()
        # 크기가 (계층의 개수, 배치 크기, 은닉층의 유닛 개수)인 텐서를 생성하여 0으로 초기화한 후 반환

### 손실 함수와 옵티마이저 설정

In [23]:
model = BasicRNN(n_layers = 1, hidden_dim = 256, n_vocab = vocab_size, embed_dim = 128, n_classes = n_classes, dropout_p = 0.5)
model.to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

### 모델 학습 함수

In [25]:
def train(model, optimizer, train_iter):
    model.train()
    for b, batch in enumerate(train_iter):
        x, y = batch.text.to(device), batch.label.to(device)
        y.data.sub_(1) # 레이블을 0과 1로 변경
        optimizer.zero_grad()

        logit = model(x)
        loss = F.cross_entropy(logit, y)
        loss.backward()
        optimizer.step()

        if b % 50 == 0: # 훈련 데이터셋의 개수를 50으로 나누어서 나머지가 0이면 출력
            print("Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(e,
                                                                           b * len(x),
                                                                           len(train_iter.dataset),
                                                                           100. * b / len(train_iter),
                                                                           loss.item()))

### 모델 평가 함수

In [26]:
def evaluate(model, val_iter):
    model.eval()
    corrects, total, total_loss = 0, 0, 0

    for batch in val_iter:
        x, y = batch.text.to(device), batch.label.to(device)
        y.data.sub_(1) 
        logit = model(x)
        loss = F.cross_entropy(logit, y, reduction = "sum")
        total += y.size(0)
        total_loss += loss.item()
        corrects += (logit.max(1)[1].view(y.size()).data == y.data).sum() # 모델의 정확도
        
    avg_loss = total_loss / len(val_iter.dataset)
    avg_accuracy = corrects / total
    return avg_loss, avg_accuracy

### 모델 학습 및 평가

In [27]:
BATCH_SIZE = 100
LR = 0.001
EPOCHS = 5
for e in range(1, EPOCHS + 1):
    train(model, optimizer, train_iterator)
    val_loss, val_accuracy = evaluate(model, valid_iterator)
    print("[EPOCH: %d], Validation Loss: %5.2f | Validation Accuracy: %5.2f" % (e, val_loss, val_accuracy))



RuntimeError: CUDA out of memory. Tried to allocate 182.00 MiB (GPU 0; 2.00 GiB total capacity; 1.01 GiB already allocated; 0 bytes free; 1.06 GiB reserved in total by PyTorch)

### 테스트 데이터셋을 이용한 모델 예측

In [None]:
test_loss, test_acc = evaluate(model,test_iterator)
print("Test Loss: %5.2f | Test Accuracy: %5.2f" % (test_loss, test_acc))