In [5]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

with open('anna.txt', 'r') as f:
    text = f.read()

# text에 등장하는 모든 문자들에 순차적으로 정수 인덱스를 부여해서 숫자로 표현된 인코딩된 텍스트를 생성 
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}
encoded = np.array([char2int[ch] for ch in text])

print(text[:100])
print(encoded[:100])

Chapter 1


Happy families are all alike; every unhappy family is unhappy in its own
way.

Everythin
[47 73 25 53 63 38 32 23 72 26 26 26 69 25 53 53 44 23 77 25  0 48  2 48
 38 54 23 25 32 38 23 25  2  2 23 25  2 48 75 38 16 23 38 45 38 32 44 23
 35 40 73 25 53 53 44 23 77 25  0 48  2 44 23 48 54 23 35 40 73 25 53 53
 44 23 48 40 23 48 63 54 23 13 30 40 26 30 25 44 37 26 26 74 45 38 32 44
 63 73 48 40]


In [10]:
def one_hot_encode(arr, n_labels):
    one_hot = np.zeros((np.multiply(*arr.shape), n_labels), dtype=np.float32)
    # arr이 요소들에 대해 인덱스를 만들고, 열에 해당하는 인덱스에 1을 넣는다.
    # arr.flatten()하여 1을 할당할 정수 열들을 지정한다
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    one_hot = one_hot.reshape((*arr.shape, n_labels))
    
    return one_hot


def get_batches(arr, n_seqs, n_steps):
    """
    arr: 인코딩된 텍스트
    n_seqs: 시퀀스 갯수
    n_steps: 시퀀스 길이

    n_seqs(갯수) x n_steps(길이) 크기의 배치를 생성하여 반환
    """
    batch_size = n_seqs * n_steps
    n_batches = len(arr) // batch_size
    
    # 배치 사이즈만큼 딱 떨어지게 슬라이싱하여 가지고 온 뒤, 시퀀스 갯수 x row로 만들기
    arr = arr[:n_batches * batch_size]
    arr = arr.reshape((n_seqs, -1))
    
    for n in range(0, arr.shape[1], n_steps):
        # 인코딩된 텍스트에서 모든 배치에 대해 시퀀스 길이만큼 슬라이싱해서 온다
        # 시퀀스 길이가 3이면 [1, 2, 3], [4, 5, 6] 이런식으로 모든 배치에 대해서 가져온다
        x = arr[:, n:n + n_steps]
        # 우리는 다음에 올 문자를 예측해야하므로 y에는 x에 담긴 시퀀스를 한칸씩 옮겨서 가져온다
        y = np.zeros_like(x)

        # x를 1칸 시프트 한 값부터 끝까지 복사한 후, 마지막 값 n + n_steps값이 비므로 따로 넣어준다.
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n + n_steps]
        # 맨 마지막에서는 값이 없을 수 있으므로 예외처리한다
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y

n_seqs = 10
n_steps = 50

batches = get_batches(encoded, n_seqs, n_steps)
x, y = next(batches)

print(x[0, :n_steps])
print(y[0, :n_steps])

[47 73 25 53 63 38 32 23 72 26 26 26 69 25 53 53 44 23 77 25  0 48  2 48
 38 54 23 25 32 38 23 25  2  2 23 25  2 48 75 38 16 23 38 45 38 32 44 23
 35 40]
[73 25 53 63 38 32 23 72 26 26 26 69 25 53 53 44 23 77 25  0 48  2 48 38
 54 23 25 32 38 23 25  2  2 23 25  2 48 75 38 16 23 38 45 38 32 44 23 35
 40 73]


In [41]:
class Model(nn.Module):
    
    def __init__(self, tokens, n_hidden=256, n_layers=2,
                               drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr
        
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers, 
                            dropout=drop_prob, batch_first=True)
        
        self.dropout = nn.Dropout(drop_prob)
        
        self.fc = nn.Linear(n_hidden, len(self.chars))
        
        self.init_weights()
      
    
    def forward(self, x, hc):
        x, (h, c) = self.lstm(x, hc)
        x = self.dropout(x)
        x = x.contiguous().view(-1, self.n_hidden)
        x = self.fc(x)
        
        return x, (h, c)
    
    
    def predict(self, char, hc=None, device=None, top_k=None):
        ''' 
        char가 주어지면 다음에 올 char를 추론한다
        '''
        if torch.cuda.is_available():
            self.cuda()
        elif torch.backends.mps.is_available():
            self.to("mps")
        else:
            self.cpu()
        
        if hc is None:
            hc = self.init_hidden(1)
        
        x = np.array([[self.char2int[char]]])
        x = one_hot_encode(x, len(self.chars))
        inputs = torch.from_numpy(x)
        if torch.cuda.is_available():
            inputs = inputs.cuda()
        elif torch.backends.mps.is_available():
            inputs = inputs.to("mps")
        
        hc = tuple([each.data for each in hc])
        out, hc = self.forward(inputs, hc)

        # 확률 분포로 바꾼다
        p = F.softmax(out, dim=1).data
        if device is not None:
            p = p.cpu()
        
        # top_k 또는 top_ch에서 가장 확률이 높은 문자를 찾는다
        if top_k is None:
            top_ch = np.arange(len(self.chars))
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.cpu().numpy().squeeze()
        
        p = p.cpu().numpy().squeeze()

        # 확률 값이 들어있는 p값을 정규화화 한 뒤 top_ch에서 p에 지정한 확률로 샘플링을 하여 하나를 고른다.
        char = np.random.choice(top_ch, p=p / p.sum())
            
        return self.int2char[char], hc
    
    def init_weights(self):
        """
        RNN/LSTM은 기울기 소실/폭발 문제가 자주 일어나므로 초기화를 해준다.
        """
        self.fc.bias.data.fill_(0)
        # 가장 무난하게 쓰이는 균등분포로 초기화를 했으나 특별한 이유가 있진 않다.
        self.fc.weight.data.uniform_(-1, 1)
        
    def init_hidden(self, n_seqs):
        """
        predict의 경우 LSTM의 초기 hidden state 값이 없으므로
        n_layers x n_seqs x n_hidden 크기의 0으로 초기화한다.
        """
        lstm_parmas = self.lstm.parameters()
        h = next(lstm_parmas).data.new(self.n_layers, n_seqs, self.n_hidden).zero_()
        c = next(lstm_parmas).data.new(self.n_layers, n_seqs, self.n_hidden).zero_()
        return (h, c)
    
    

In [27]:
def train(net, data, epochs=10, n_seqs=10, n_steps=50, lr=0.001, clip=5, val_frac=0.1, device="cpu", print_every=10):
    net.train()
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    
    # 데이터를 val_frac 비율로 나눈다.
    val_idx = int(len(data) * (1 - val_frac))
    data, val_data = data[:val_idx], data[val_idx:]
    
    if device is not None:
        net.to(device)
    
    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        h = net.init_hidden(n_seqs)
        for x, y in get_batches(data, n_seqs, n_steps):
            counter += 1
            
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            
            if device is not None:
                inputs, targets = inputs.to(device), targets.to(device)

            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.data for each in h])

            net.zero_grad()
            
            output, h = net(inputs, h)
            loss = criterion(output, targets.view(n_seqs * n_steps))

            loss.backward()
            
            # 과적합 방치를 위해 그래디언트 클리핑을 한다.
            # L2 정규화 (벡터 길이 구하는 것)
            # 보통 5 - 10 사이의 값이 좋다는 경험적 데이터에 근거한다.
            nn.utils.clip_grad_norm_(net.parameters(), clip)

            opt.step()
            
            if counter % print_every == 0:
                
                # Get validation loss
                val_h = net.init_hidden(n_seqs)
                val_losses = []
                for x, y in get_batches(val_data, n_seqs, n_steps):
                    # One-hot encode our data and make them Torch tensors
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)
                    
                    # Creating new variables for the hidden state, otherwise
                    # we'd backprop through the entire training history
                    val_h = tuple([each.data for each in val_h])
                    
                    inputs, targets = x, y
                    if device is not None:
                        inputs, targets = inputs.to(device), targets.to(device)

                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(output, targets.view(n_seqs*n_steps))
                
                    val_losses.append(val_loss.item())
                
                print(f"Epoch: {e + 1}/{epochs} ",
                      f"Step: {counter} ",
                      f"Loss: {loss.item():.4f} ",
                      f"Val Loss: {np.mean(val_losses):.4f}")


net = Model(chars, n_hidden=512, n_layers=2)
print(net)

n_seqs, n_steps = 128, 100
train(net, encoded, epochs=25, n_seqs=n_seqs, n_steps=n_steps, lr=0.001, device="mps", print_every=100)

Model(
  (lstm): LSTM(83, 512, num_layers=2, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=512, out_features=83, bias=True)
)
Epoch: 1/25  Step: 100  Loss: 2.4442  Val Loss: 2.4632
Epoch: 2/25  Step: 200  Loss: 2.1756  Val Loss: 2.2828
Epoch: 3/25  Step: 300  Loss: 2.0262  Val Loss: 2.0947
Epoch: 3/25  Step: 400  Loss: 1.8900  Val Loss: 2.0064
Epoch: 4/25  Step: 500  Loss: 1.8512  Val Loss: 1.9264
Epoch: 5/25  Step: 600  Loss: 1.7573  Val Loss: 1.8773
Epoch: 6/25  Step: 700  Loss: 1.7211  Val Loss: 1.8337
Epoch: 6/25  Step: 800  Loss: 1.6780  Val Loss: 1.8087
Epoch: 7/25  Step: 900  Loss: 1.6326  Val Loss: 1.7716
Epoch: 8/25  Step: 1000  Loss: 1.5865  Val Loss: 1.7397
Epoch: 8/25  Step: 1100  Loss: 1.5615  Val Loss: 1.7304
Epoch: 9/25  Step: 1200  Loss: 1.5335  Val Loss: 1.7057
Epoch: 10/25  Step: 1300  Loss: 1.5219  Val Loss: 1.7008
Epoch: 11/25  Step: 1400  Loss: 1.5514  Val Loss: 1.6819
Epoch: 11/25  Step: 1500  Loss: 1.4686  Va

In [21]:
# change the name, for saving multiple files
model_name = 'rnn_1_epoch.net'

checkpoint = {'n_hidden': net.n_hidden,
              'n_layers': net.n_layers,
              'state_dict': net.state_dict(),
              'tokens': net.chars}

with open(model_name, 'wb') as f:
    torch.save(checkpoint, f)

In [42]:
def sample(net, size, prime='The', top_k=None):
    if torch.cuda.is_available():
        net.cuda()
    elif torch.backends.mps.is_available():
        net.to("mps")
    else:
        net.cpu()

    net.eval()
    
    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = net.predict(ch, h, top_k=top_k)

    chars.append(char)
    
    # Now pass in the previous character and get a new one
    for _ in range(size):
        char, h = net.predict(chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)


state_dict = torch.load(model_name)
net = Model(state_dict['tokens'], n_hidden=state_dict['n_hidden'], n_layers=state_dict['n_layers'])
net.load_state_dict(state_dict['state_dict'])
print(sample(net, 2000, prime='Anna', top_k=5))


  state_dict = torch.load(model_name)


Anna was streaming. Seryozha
wanted on seeing his friends of the thing where the days there she
could not have for a change for a minute, and the pord would
so true, they went out into his chasinges were cringed to him them. "You won't all shall at the moment, but
however they showed and the still? His should be anyway,"
said Stepan Arkadyevitch, with a back of the charmed the princess of his wife.

"I can't here he have no sertimes to be a single way as the stop to her at hastes," he
said, stepping and the doctor was as
a considerable she filled his steps, and his shoots was he had been thought, how
so if her heart and strong water, he said the country with him as he had to tone to herself to struck the simple hand of side of
a seese she had bures, and his still said with those for mild of
that her face, when she fluen to the partion and when she had
always answered to the promistable are they had thinking of the door the table, the sheet and the people
with his face what all all as s

# 단순 LSTM을 활용한 텍스트 생성 모델의 한계
그럴싸하게 지어는 냈지만 말이 굉장히 어색한 것을 볼 수가 있는데 이는 작은 파라미터로 긴 문맥을 이해하지 않고 학습하기 때문이다. 단순히 다음에 올 확률이 높은 문자를 이어 붙이기만 하고
LSTM의 파라미터(메모리)가 부족한 경우 장기적인 기억을 하는데 한계가 있기 떄문이다. 또한 글자 하나가 이어붙여질 때마다 오차는 누적되어 계속 초반 문장이 어색한데 어색한 것을 이어붙이는 식으로 
만들어지기 때문에 매끄럽지 않을 수 있다. Top-K 샘플링으로 자연스러움을 조금 더 개선해줄 수 있으나 사실 이는 모델 자체의 한계가 크다.

이 때문에 중요한 부분에 가중치를 부여하는 attention 기법을 쓰는 Transformer 모델이 등장하게 되지 않았나 싶다. Attention 자체가 사람이 정보를 찾는 방식을 흉내내는 것이라서 
텍스트 생성과 비전 문제에서도 뛰어난 성능을 보여준 것 같다.