<a href="https://colab.research.google.com/github/eunzzae/Study_NLP/blob/main/231202_%EC%8B%A4%EC%8A%B5_charseq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# RNN 학습

In [1]:
import torch
import torch.optim as optim
import numpy as np

In [2]:
# Random seed to make results deterministic and reproducible
torch.manual_seed(0)

<torch._C.Generator at 0x7893293613b0>

## 샘플데이터 준비

In [3]:
sample = "i want to go to America."

* 주어진 데이터의 vocab 사전을 만든다.

In [6]:
# make dictionary
char_set = sorted(list(set(sample))) # set()를 사용해서 중복제거, list()를 사용해서 리스트로 변환, sorted()를 사용해서 오름차순 정렬한다.
char_dict = {c:i for i, c in enumerate(char_set)} # 딕셔너리(dict)를 사용하여 키-값 쌍을 생성한다.
print(char_dict, len(char_dict))

{' ': 0, '.': 1, 'A': 2, 'a': 3, 'c': 4, 'e': 5, 'g': 6, 'i': 7, 'm': 8, 'n': 9, 'o': 10, 'r': 11, 't': 12, 'w': 13} 14


## 하이퍼파라미터 설정

In [7]:
# hyper parameters
dict_size=len(char_dict)
hidden_size = 20
learning_rate = 0.1

* 제작한 사전을 이용해서 알파벳들을 index로 변경해보자.

In [13]:
# data setting
## 문자열 생성
sample_idx = [char_dict[c] for c in sample] # sample 리스트에 있는 각 문자를 char_dict 딕셔너리에서 해당 문자의 값으로 매핑하는 코드이다.
print(sample_idx, len(sample_idx))

## x 데이터 생성
x_data = [sample_idx[:-1], sample_idx[:-1]] # 리스트 슬라이싱을 사용하여 sample_idx 리스트에서 일부분을 추출하는 코드이다.
print(x_data, len(x_data))

## x 데이터 원핫 인코딩
x_one_hot = [np.eye(dict_size)[x] for x in x_data] # x_data는 정수로 이루어진 리스트이며 dict_size는 원핫 인코딩을 위한 배열의 크기를 나타내는 변수임. np.yey(dict_size)[x]는 크기가 (dict_size, dict_size)인 단위행렬에서 x에 해당하는 행을 선택해서 반환한다.
print(x_one_hot)

## y 데이터 생성
y_data = [sample_idx[1:], sample_idx[1:]]
print(y_data, len(y_data))

[7, 0, 13, 3, 9, 12, 0, 12, 10, 0, 6, 10, 0, 12, 10, 0, 2, 8, 5, 11, 7, 4, 3, 1] 24
[[7, 0, 13, 3, 9, 12, 0, 12, 10, 0, 6, 10, 0, 12, 10, 0, 2, 8, 5, 11, 7, 4, 3], [7, 0, 13, 3, 9, 12, 0, 12, 10, 0, 6, 10, 0, 12, 10, 0, 2, 8, 5, 11, 7, 4, 3]] 2
[array([[0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.],
       [0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0.,

* pytorch 학습을 위해 데이터의 타입을 torch로 변경한다.

In [20]:
# transform as torch tensor variable
X = torch.FloatTensor(x_one_hot)
X = X.transpose(0,1) #
Y = torch.LongTensor(y_data)
print(X.size(), Y.size())

torch.Size([23, 2, 14]) torch.Size([2, 23])


## 모델 구성

In [21]:
# declare RNN
rnn = torch.nn.RNN(input_size=dict_size, hidden_size=hidden_size, batch_first=False)
dnn = torch.nn.Linear(hidden_size, dict_size)

## 모델 학습
* 다음 텍스트를 예측하는 것은 다중분류이다.

In [22]:
# loss & optimizer settin
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(rnn.parameters(), learning_rate)

In [29]:
# start training
for i in range(50):
    optimizer.zero_grad() # 옵티마이저 그래디언트 초기화한다.
    outputs, _status = rnn(X) # 입력데이터 X를 RNN 모델에 전달하여 출력한다.
    outputs = dnn(outputs) # RNN의 출력을 DNN에 전달하여 최종 출력을 얻는다.
    print(outputs.size(), Y.size())
    print(outputs.view(-1, dict_size).shape, Y.view(-1).shape) # view 함수는 pytorch tensor의 형태를 변환하거나 조작하기 위해 사용되는 함수이다. 'output' Tensor를 (-1, dict_size) 모양으로 변환한다.
    loss = criterion(outputs.view(-1, dict_size), Y.view(-1)) # 출력값과 실제 정답 간의 손실을 계산한다.
    loss.backward() # 역전파를 수행하여 그래디언트를 계산한다.
    optimizer.step() # 계산된 그래디언트를 사용하여 옵티마이저가 모델의 파라미터를 업데이트 한다.

    result = outputs.data.numpy().argmax(axis=1) # axis를 1로 변경한다. 모델의 출력에서 각 샘플에 대해 가장 높은 확률을 가진 클래스를 선택한다.
    result_str = ''.join([char_set[c[0]] for c in result]) # 선택된 클래스를 문자로 변환하여 결과 문자열을 생성한다.
    print(i, "loss: ", loss.item(), "prediction: ", result[0], "true Y: ", y_data, "prediction str: ", result_str) # 현재 에포크, 손실값, 모델의 예측, 실제 레이블, 예측된 문자열을 출력한다.

torch.Size([23, 2, 14]) torch.Size([2, 23])
torch.Size([46, 14]) torch.Size([46])
0 loss:  2.4812076091766357 prediction:  [0 0 0 0 0 0 0 0 0 0 0 0 0 0] true Y:  [[0, 13, 3, 9, 12, 0, 12, 10, 0, 6, 10, 0, 12, 10, 0, 2, 8, 5, 11, 7, 4, 3, 1], [0, 13, 3, 9, 12, 0, 12, 10, 0, 6, 10, 0, 12, 10, 0, 2, 8, 5, 11, 7, 4, 3, 1]] prediction str:                         
torch.Size([23, 2, 14]) torch.Size([2, 23])
torch.Size([46, 14]) torch.Size([46])
1 loss:  2.3526153564453125 prediction:  [0 0 0 0 0 0 0 0 0 0 0 0 0 0] true Y:  [[0, 13, 3, 9, 12, 0, 12, 10, 0, 6, 10, 0, 12, 10, 0, 2, 8, 5, 11, 7, 4, 3, 1], [0, 13, 3, 9, 12, 0, 12, 10, 0, 6, 10, 0, 12, 10, 0, 2, 8, 5, 11, 7, 4, 3, 1]] prediction str:                         
torch.Size([23, 2, 14]) torch.Size([2, 23])
torch.Size([46, 14]) torch.Size([46])
2 loss:  2.263589382171631 prediction:  [0 0 0 0 0 0 0 0 0 0 0 0 0 0] true Y:  [[0, 13, 3, 9, 12, 0, 12, 10, 0, 6, 10, 0, 12, 10, 0, 2, 8, 5, 11, 7, 4, 3, 1], [0, 13, 3, 9, 12, 0, 12, 10, 0, 6, 

In [37]:
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# 문자열 샘플을 입력으로 받아 각 문자를 정수형 인덱스로 변환하여 데이터셋을 구성하는 클래스이다.
class CharDataset(Dataset):
    def __init__(self, samples, char_to_index): # __init__ 매서드는 샘플들과 문자를 정수 인덱스로 매핑한 'char_to_index'사전을 받아들인다.
        self.samples = samples
        self.char_to_index = char_to_index

    def __len__(self): ## len 매서드는 데이터셋의 전체 길이를 반환한다. 입력된 샘플의 개수가 반환된다.
        return len(self.samples)

    def __getitem__(self, idx): # getitem 매서드는 주어진 인덱스(idx)에 해당하는 샘플을 처리한다.
        sample = self.samples[idx]
        indexed = [self.char_to_index[ch] for ch in sample] # 입력된 샘플을 하나씩 가져와서 각 문자를 'char_to_index'사전을 이용하여 해당 문자의 정수 인덱스로 변환한다.
        X = torch.tensor(indexed[:-1], dtype=torch.long) # 변환된 인덱스를 이용하여 입력'X'와 출력'Y'를 생성한다.
        Y = torch.tensor(indexed[1:], dtype=torch.long)
        return X, Y

samples = [" if you want you", " if you want you"]
chars = sorted(set("".join(samples)))
char_to_index = {ch: i for i, ch in enumerate(chars)}
index_to_char = {i: ch for i, ch in enumerate(chars)}

dataset = CharDataset(samples, char_to_index)

In [38]:
batch_size = 2
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [39]:
class CharRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1): # init 매서드는 모델의 구조를 정의한다.
        super(CharRNN, self).__init__()
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=False)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden): # forward 매서드는 순전파 연산을 정의한다.
        out, hidden = self.rnn(x, hidden) # 입력 x와 hidden을 받아 RNN 레이어를 통과한 결과 새로운 hidden을 반환한다.
        out = self.fc(out) # 출력 out을 fc레이어를 통과시켜 최종 출력을 만든다.
        return out, hidden

    def init_hidden(self, batch_size): # init_hidden 매서드는 초기 hidden 상태를 반환하는 함수이다.
        return torch.zeros(num_layers, batch_size, hidden_size)

hidden_size = 128
num_layers = 1
input_size = 10
model = CharRNN(input_size, hidden_size, input_size)

In [41]:
model = CharRNN(len(chars), hidden_size, len(chars), num_layers=1)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.005)

num_epochs = 200
for epoch in range(num_epochs):
    total_loss = 0
    for X_batch, Y_batch in dataloader: # 데이터로더에서 미니배치(X_batch, Y_batch)를 순회한다.
        batch_size = X_batch.size(0)
        hidden = model.init_hidden(batch_size) # 새로운 미니배치에 대해 초기 hidden 상태를 초기화한다.
        X_batch_onehot = nn.functional.one_hot(X_batch, num_classes=len(chars)).float() # 입력 데이터를 one-hot인코딩으로 변환한다.

        optimizer.zero_grad()
        output, hidden = model(X_batch_onehot.view(-1, batch_size, len(chars)), hidden) # 모델에 입력을 전달하여 출력을 생성한다. input => (sequence_length, batch_size, input_size)
        # sequence_length가 -1인 경우 해당 차원의 크기를 자동으로 조정하고 나머지 차원들은 주어진 값에 따라 조정된다.
        # 모델은 시퀀스 길이가 sequence_length인 미니배치 데이터를 받아들이기 위해 -1의 형태로 입력을 받음
        print(output.view(-1, len(chars)).shape, Y_batch.view(-1).shape)
        loss = criterion(output.view(-1, len(chars)), Y_batch.view(-1))
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    if epoch % 20 == 0:
        print(f'Epoch {epoch}/{num_epochs}, Loss: {total_loss / len(dataloader)}')


torch.Size([30, 10]) torch.Size([30])
Epoch 0/200, Loss: 2.3289644718170166
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
Epoch 20/200, Loss: 0.13980229198932648
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size([30, 10]) torch.Size([30])
torch.Size