다대다 RNN은 모든 시점의 입력에 대해 모든 시점에 대한 출력을 하는 RNN입니다. 대표적으로 품사 태깅, 개체명 인식 등에 사용됩니다.

## **1. 문자 단위 RNN(Char RNN)**

RNN의 입출력의 단위가 단어 레벨이 아니라 문자 레벨로 바꿔 RNN을 구현한다면, 이를 문자 단위 RNN이라고 합니다. RNN 구조 자체가 달라진 것은 아니고, 입출력 단위가 문자로 바뀌었을 뿐입니다. 문자 단위 RNN을 다대다 구조로 구현해보겠습니다.

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

### **1) 훈련 데이터 전처리하기**

여기서는 문자 시퀀스 apple을 입력받으면 pple!를 출력하는 RNN을 구현할겁니다. 이는 어떤 의미를 가지진 않지만 RNN의 동작을 이해할 수 있습니다.

입력 데이터와 레이블 데이터에 대해서 vocabulary를 만듭니다. 여기서는 문자 집합은 중복을 제거한 문자들의 집합입니다.

In [2]:
input_str = "apple"
label_str = "pple!"
char_vocab = sorted(list(set(input_str + label_str)))
vocab_size = len(char_vocab)
print("문자 집합의 크기: {}".format(vocab_size))

문자 집합의 크기: 5


현재 문자 집합에는 !, a, e, l, p 총 5개의 문자가 있습니다. 이제 tanh를 정의하겠습니다. 이때 입력은 원-핫 벡터를 사용하며 입력의 크기는 문자 집합의 크기여야만 합니다.

In [3]:
input_size = vocab_size # 입력의 크기는 문자 집합의 크기
hidden_size = 5
output_size = 5
learning_rate = 0.1

이제 문자 집합에 고유한 정수를 부여합니다.

In [4]:
char_to_index = dict((c, i) for i, c in enumerate(char_vocab))
char_to_index

{'!': 0, 'a': 1, 'e': 2, 'l': 3, 'p': 4}

나중에 예측 결과를 문자로 보기 위해 정수에서 문자를 얻을 수 있는 사전도 만듭니다.

In [5]:
index_to_char = {}
for key, value in char_to_index.items():
    index_to_char[value] = key
print(index_to_char)

{0: '!', 1: 'a', 2: 'e', 3: 'l', 4: 'p'}


이제 입력 데이터와 레이블 데이터의 각 문자들을 정수로 맵핑합니다. 그런데 파이토치의 nn.RNN()은 기본적으로 3차원 텐서를 입력받습니다. 그렇기 때문에 배치 차원을 추가해줍니다. 방법은 둘 중 하나를 사용하면 됩니다.

In [6]:
x_data = [char_to_index[c] for c in input_str]
y_data = [char_to_index[c] for c in label_str]
print(x_data)

# 배치 차원 추가
x_data = [x_data]
y_data = [y_data]
print(x_data)

[1, 4, 4, 3, 2]
[[1, 4, 4, 3, 2]]


In [7]:
x_data = [char_to_index[c] for c in input_str]
y_data = [char_to_index[c] for c in label_str]
print(x_data)

# 텐서 연산인 unsqueeze(0)을 통해 해결할 수도 있음
x_data = torch.IntTensor(x_data).unsqueeze(0)
y_data = torch.Tensor(y_data).unsqueeze(0)
y_data = torch.as_tensor(y_data, dtype=torch.int64) # int64로 변환 / LongTensor를 위해

print(x_data)
print(y_data)

[1, 4, 4, 3, 2]
tensor([[1, 4, 4, 3, 2]], dtype=torch.int32)
tensor([[4, 4, 3, 2, 0]])


입력 시퀀스의 각 문자들을 원-핫 벡터로 바꿔줍니다.

In [8]:
x_one_hot = [np.eye(vocab_size)[x] for x in x_data]
x_one_hot

[array([[0., 1., 0., 0., 0.],
        [0., 0., 0., 0., 1.],
        [0., 0., 0., 0., 1.],
        [0., 0., 0., 1., 0.],
        [0., 0., 1., 0., 0.]])]

입력 데이터와 레이블 데이터를 텐서로 바꿔줍니다.

In [9]:
X = torch.FloatTensor(x_one_hot)
Y = torch.LongTensor(y_data)

  X = torch.FloatTensor(x_one_hot)


In [10]:
print("훈련 데이터의 크기: {}".format(X.shape))
print("레이블의 크기: {}".format(Y.shape))

훈련 데이터의 크기: torch.Size([1, 5, 5])
레이블의 크기: torch.Size([1, 5])


### **2) 모델 구현하기**

이제 RNN 모델을 구현해보겠습니다. 아래에서 fc는 완전 연결층(fully-connected layer)을 의미하며 출력층으로 사용됩니다.

In [11]:
class Net(torch.nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Net, self).__init__()
        self.rnn = torch.nn.RNN(input_size, hidden_size, batch_first=True) # RNN셀 구현
        self.fc = torch.nn.Linear(hidden_size, output_size, bias=True) # 출력층 구현
    
    def forward(self, x): # 구현한 RNN셀과 출력층을 연결
        x, _status = self.rnn(x)
        x = self.fc(x)
        return x

In [12]:
net = Net(input_size, hidden_size, output_size)

이제 모델에 입력을 넣어 출력의 크기를 확인해보겠습니다.

In [13]:
outputs = net(X)
print(outputs.shape) # 3차원 텐서

torch.Size([1, 5, 5])


(1, 5, 5)의 크기를 가지는데 각각 배치 차원, 시점, 출력의 크기입니다. 나중에 정확도를 측정할 때는 이를 모두 펼쳐 계산하게 되는데 이때는 view를 사용하여 배치 차원과 시점 차원을 하나로 만듭니다.

In [14]:
print(outputs.view(-1, input_size).shape) # 2차원 텐서로 변환

torch.Size([5, 5])


(5, 5)로 줄어든 것을 확인할 수 있습니다. 이제 레이블 데이터의 크기를 다시 확인해보겠습니다.

In [15]:
print(Y.shape)
print(Y.view(-1).shape)

torch.Size([1, 5])
torch.Size([5])


레이블 데이터는 (1, 5)의 크기를 가지는데, 마찬가지로 나중에 정확도를 측정할 때는 이걸 펼쳐서 계산할 예정입니다. 이 경우 (5)의 크기를 가지게 됩니다. 

이제 옵티마이저와 손실 함수를 정의합니다.

In [16]:
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), learning_rate)

In [17]:
for i in range(100):
    optimizer.zero_grad()
    outputs = net(X)
    loss = criterion(outputs.view(-1, input_size), Y.view(-1)) # view를 통해 배치 차원 제거
    loss.backward() # 기울기 계산
    optimizer.step() # optimizer에 넣었던 파라미터 업데이트
    
    # 모델이 실제로 어떻게 예측했는지 확인
    result = outputs.data.numpy().argmax(axis=2) # 최종 예측값인 각 시점 별 5차원 벡터에 대해 가장 큰 인덱스를 선택
    result_str = ''.join([index_to_char[c] for c in np.squeeze(result)])
    print(i, "loss: ", loss.item(), "prediction: ", result, "true Y: ", y_data, "prediction str: ", result_str)

0 loss:  1.5310280323028564 prediction:  [[4 4 4 4 2]] true Y:  tensor([[4, 4, 3, 2, 0]]) prediction str:  ppppe
1 loss:  1.2918808460235596 prediction:  [[4 4 4 4 4]] true Y:  tensor([[4, 4, 3, 2, 0]]) prediction str:  ppppp
2 loss:  1.0967464447021484 prediction:  [[4 4 2 2 2]] true Y:  tensor([[4, 4, 3, 2, 0]]) prediction str:  ppeee
3 loss:  0.8909416198730469 prediction:  [[4 4 2 2 2]] true Y:  tensor([[4, 4, 3, 2, 0]]) prediction str:  ppeee
4 loss:  0.6655368804931641 prediction:  [[4 4 3 2 0]] true Y:  tensor([[4, 4, 3, 2, 0]]) prediction str:  pple!
5 loss:  0.4822908937931061 prediction:  [[4 4 3 2 0]] true Y:  tensor([[4, 4, 3, 2, 0]]) prediction str:  pple!
6 loss:  0.33621734380722046 prediction:  [[4 4 3 2 0]] true Y:  tensor([[4, 4, 3, 2, 0]]) prediction str:  pple!
7 loss:  0.22394892573356628 prediction:  [[4 4 3 2 0]] true Y:  tensor([[4, 4, 3, 2, 0]]) prediction str:  pple!
8 loss:  0.14825893938541412 prediction:  [[4 4 3 2 0]] true Y:  tensor([[4, 4, 3, 2, 0]]) pre