In [41]:
import torch
import torch.nn as nn
from torch.optim import SGD 
import numpy as np

# Упражнение, для реализации "Ванильной" RNN
* Попробуем обучить сеть восстанавливать слово hello по первой букве. т.е. построим charecter-level модель

In [42]:
a = torch.ones((3,3))*3
b = torch.ones((3,3))*5

In [43]:
a @ b

tensor([[45., 45., 45.],
        [45., 45., 45.],
        [45., 45., 45.]])

In [44]:
a * b

tensor([[15., 15., 15.],
        [15., 15., 15.],
        [15., 15., 15.]])

In [45]:
word = 'ololoasdasddqweqw123456789'
# word = 'hello'

## Датасет. 
Позволяет:
* Закодировать символ при помощи one-hot
* Делать итератор по слову, которыей возвращает текущий символ и следующий как таргет

In [46]:
class WordDataSet:
    
    def __init__(self, word):
        self.chars2idx = {}
        self.indexs  = []
        for c in word: 
            if c not in self.chars2idx:
                self.chars2idx[c] = len(self.chars2idx)
                
            self.indexs.append(self.chars2idx[c])
            
        self.vec_size = len(self.chars2idx)
        self.seq_len  = len(word)
        
    def get_one_hot(self, idx):
        x = torch.zeros(self.vec_size)
        x[idx] = 1
        return x
    
    def __iter__(self):
        return zip(self.indexs[:-1], self.indexs[1:])
    
    def __len__(self):
        return self.seq_len
    
    def get_char_by_id(self, id):
        for c, i in self.chars2idx.items():
            if id == i: return c
        return None

## Реализация базовой RNN
<br/>
Скрытый элемент
$$ h_t= tanh⁡ (W_{ℎℎ} h_{t−1}+W_{xh} x_t) $$
Выход сети

$$ y_t = W_{hy} h_t $$

In [47]:
class VanillaRNN(nn.Module):
    
    def __init__(self, in_size=5, hidden_size=3, out_size=5):
        super(VanillaRNN, self).__init__()        
        self.x2hidden    = nn.Linear(in_features=in_size, out_features=hidden_size)
        self.hidden      = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        self.activation  = nn.Tanh()
        self.outweight   = nn.Linear(in_features=hidden_size, out_features=out_size)
    
    def forward(self, x, prev_hidden):
        hidden = self.activation(self.x2hidden(x) + self.hidden(prev_hidden))
#         Версия без активации - может происходить gradient exploding
#         hidden = self.x2hidden(x) + self.hidden(prev_hidden)
        output = self.outweight(hidden)
        return output, hidden

## Инициализация переменных 

In [48]:
ds = WordDataSet(word=word)
rnn = VanillaRNN(in_size=ds.vec_size, hidden_size=3, out_size=ds.vec_size)
criterion = nn.CrossEntropyLoss()
e_cnt     = 100
optim     = SGD(rnn.parameters(), lr = 0.1, momentum=0.9)

# Обучение

In [49]:
CLIP_GRAD = True

for epoch in range(e_cnt):
    hh = torch.zeros(rnn.hidden.in_features)
    loss = 0
    optim.zero_grad()
    for sample, next_sample in ds:
        x = ds.get_one_hot(sample).unsqueeze(0)
        target =  torch.LongTensor([next_sample])

        y, hh = rnn(x, hh)
        
        loss += criterion(y, target)
     

    loss.backward()
    
    if epoch % 10 == 0:
        print (loss.data.item())
        if CLIP_GRAD: print("Clip gradient : ", torch.nn.utils.clip_grad_norm_(rnn.parameters(), max_norm=5))
    else: 
        if CLIP_GRAD: torch.nn.utils.clip_grad_norm_(rnn.parameters(), max_norm=1)
            
#     print("Params : ")
#     num_params = 0
#     for item in rnn.parameters():
#         num_params += 1
#         print(item.grad)
#     print("NumParams :", num_params)
#     print("Optimize")
    
    optim.step()

73.41444396972656
Clip gradient :  6.128253571447335
57.18257522583008
Clip gradient :  8.708175942105852
49.658294677734375
Clip gradient :  10.375445992498053
37.342552185058594
Clip gradient :  10.933765324504696
29.617595672607422
Clip gradient :  4.913331218988951
23.910120010375977
Clip gradient :  3.67594654777344
22.12882423400879
Clip gradient :  2.7630521407530537
22.689374923706055
Clip gradient :  23.492668841825783
21.24439811706543
Clip gradient :  4.46421624044511
27.220043182373047
Clip gradient :  12.977829952388593


# Тестирование

In [50]:
rnn.eval()
hh = torch.zeros(rnn.hidden.in_features)
id = 0
softmax  = nn.Softmax(dim=1)
predword = ds.get_char_by_id(id)
for c in enumerate(word[:-1]):
    x = ds.get_one_hot(id).unsqueeze(0)
    y, hh = rnn(x, hh)
    y = softmax(y)
    m, id = torch.max(y, 1)
    id = id.data[0]
    predword += ds.get_char_by_id(id)
print ('Prediction:\t' , predword)
print("Original:\t", word)
assert(predword == word)

Prediction:	 oasdasdasdasdasdasdasdasda
Original:	 ololoasdasddqweqw123456789


AssertionError: 

# ДЗ
Реализовать LSTM и GRU модули, обучить их предсказывать тестовое слово

In [10]:
#тестовое слово
word = 'ololoasdasddqweqw123456789'

## Реализовать LSTM

In [11]:
#Написать реализацию LSTM и обучить предсказывать слово

class myLSTM(nn.Module):
    
    def __init__(self, in_size=5, hidden_size=3, out_size=5):
        super(myLSTM, self).__init__()    
        self.tanh = nn.Tanh()
        self.sig = nn.Sigmoid()
        
        self.cand_x2hid = nn.Linear(in_features=in_size, out_features=hidden_size)
        self.cand_hid2hid = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        
        self.ix2hid = nn.Linear(in_features=in_size, out_features=hidden_size)
        self.ihid2hid = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        
        self.fx2hid = nn.Linear(in_features=in_size, out_features=hidden_size)
        self.fhid2hid = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        
        self.ox2hid = nn.Linear(in_features=in_size, out_features=hidden_size)
        self.ohid2hid = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        
        self.cand_x2hid = nn.Linear(in_features=in_size, out_features=hidden_size)
        self.cand_hid2hid = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        
        self.outweight = nn.Linear(in_features=hidden_size, out_features=out_size, bias=False)
    
    def forward(self, x, prev_hidden, prev_state):
        candidate_cell_state = self.tanh(self.cand_x2hid(x) + self.cand_hid2hid(prev_hidden))
        
        input_gate = self.sig(self.ix2hid(x) + self.ihid2hid(prev_hidden))
        forget_gate = self.sig(self.fx2hid(x) + self.fhid2hid(prev_hidden))
        output_gate = self.sig(self.ox2hid(x) + self.ohid2hid(prev_hidden))
        
        state = forget_gate * prev_state + input_gate * candidate_cell_state
        hidden = output_gate * self.tanh(state)
        
        output = self.outweight(hidden)
        
        return output, hidden, state

In [34]:
ds = WordDataSet(word=word)
hidden_size=20
rnn = myLSTM(in_size=ds.vec_size, hidden_size=hidden_size, out_size=ds.vec_size)
criterion = nn.CrossEntropyLoss()
e_cnt     = 100
optim     = SGD(rnn.parameters(), lr = 0.2, momentum=0.9)

In [35]:
CLIP_GRAD = True

for epoch in range(e_cnt):
    hh = torch.zeros(hidden_size)
    state = torch.zeros(hidden_size)
    loss = 0
    optim.zero_grad()
    for sample, next_sample in ds:
        x = ds.get_one_hot(sample).unsqueeze(0)
        target =  torch.LongTensor([next_sample])

        y, hh, state = rnn(x, hh, state)
        
        loss += criterion(y, target)
     

    loss.backward()
    
    if epoch % 10 == 0:
        print (loss.data.item())
        if CLIP_GRAD: print("Clip gradient : ", torch.nn.utils.clip_grad_norm_(rnn.parameters(), max_norm=5))
    else: 
        if CLIP_GRAD: torch.nn.utils.clip_grad_norm_(rnn.parameters(), max_norm=1)
    
    optim.step()

71.00183868408203
Clip gradient :  1.8934135753720187
50.348941802978516
Clip gradient :  8.280644278677135
24.195974349975586
Clip gradient :  18.488357973906677
35.21298599243164
Clip gradient :  37.739387362514904
19.048065185546875
Clip gradient :  9.959193190069682
7.041546821594238
Clip gradient :  6.237555687994236
2.2635979652404785
Clip gradient :  2.1892817734471004
0.25054073333740234
Clip gradient :  0.27804219792901924
0.05487251281738281
Clip gradient :  0.056126192059905725
0.024082183837890625
Clip gradient :  0.02374052989068462


In [36]:
rnn.eval()
hh = torch.zeros(hidden_size)
state = torch.zeros(hidden_size)
id = 0
softmax  = nn.Softmax(dim=1)
predword = ds.get_char_by_id(id)
for c in enumerate(word[:-1]):
    x = ds.get_one_hot(id).unsqueeze(0)
    y, hh, state = rnn(x, hh, state)
    y = softmax(y)
    m, id = torch.max(y, 1)
    id = id.data[0]
    predword += ds.get_char_by_id(id)
print ('Prediction:\t' , predword)
print("Original:\t", word)
assert(predword == word)

Prediction:	 ololoasdasddqweqw123456789
Original:	 ololoasdasddqweqw123456789


## Реализовать GRU

In [37]:
#Написать реализацию GRU и обучить предсказывать слово

class myGRU(nn.Module):
    
    def __init__(self, in_size=5, hidden_size=3, out_size=5):
        super(myGRU, self).__init__()    
        self.tanh = nn.Tanh()
        self.sig = nn.Sigmoid()
        
        self.ux2hid = nn.Linear(in_features=in_size, out_features=hidden_size)
        self.uhid2hid = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        
        self.rx2hid = nn.Linear(in_features=in_size, out_features=hidden_size)
        self.rhid2hid = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        
        self.hid_cand_x2hid = nn.Linear(in_features=in_size, out_features=hidden_size)
        self.hid_cand_hid2hid = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        
        self.outweight = nn.Linear(in_features=hidden_size, out_features=out_size, bias=False)
    
    def forward(self, x, prev_hidden):
        update_gate = self.sig(self.ux2hid(x) + self.uhid2hid(prev_hidden))
        reset_gate = self.sig(self.rx2hid(x) + self.rhid2hid(prev_hidden))
        hidden_candidate = self.tanh(self.hid_cand_x2hid(x) + self.hid_cand_hid2hid(reset_gate * prev_hidden))
        
        hidden = (1 - update_gate) * hidden_candidate + update_gate * prev_hidden
        
        output = self.outweight(hidden)
        
        return output, hidden

In [38]:
ds = WordDataSet(word=word)
hidden_size=20
rnn = myGRU(in_size=ds.vec_size, hidden_size=hidden_size, out_size=ds.vec_size)
criterion = nn.CrossEntropyLoss()
e_cnt     = 100
optim     = SGD(rnn.parameters(), lr = 0.2, momentum=0.9)

In [39]:
CLIP_GRAD = True

for epoch in range(e_cnt):
    hh = torch.zeros(hidden_size)
    loss = 0
    optim.zero_grad()
    for sample, next_sample in ds:
        x = ds.get_one_hot(sample).unsqueeze(0)
        target =  torch.LongTensor([next_sample])

        y, hh = rnn(x, hh)
        
        loss += criterion(y, target)
     

    loss.backward()
    
    if epoch % 10 == 0:
        print (loss.data.item())
        if CLIP_GRAD: print("Clip gradient : ", torch.nn.utils.clip_grad_norm_(rnn.parameters(), max_norm=5))
    else: 
        if CLIP_GRAD: torch.nn.utils.clip_grad_norm_(rnn.parameters(), max_norm=1)
    
    optim.step()

70.51126098632812
Clip gradient :  4.077464295536491
36.13667297363281
Clip gradient :  11.29450096116869
13.686416625976562
Clip gradient :  9.713353079763419
4.613590240478516
Clip gradient :  7.528328063262426
3.789278984069824
Clip gradient :  7.043000781782226
2.5595197677612305
Clip gradient :  5.9222902054146935
0.18115806579589844
Clip gradient :  0.38413490844312564
0.04615211486816406
Clip gradient :  0.07454732159372253
0.018060684204101562
Clip gradient :  0.02947075543827954
0.011266708374023438
Clip gradient :  0.016750137330578


In [40]:
rnn.eval()
hh = torch.zeros(hidden_size)
id = 0
softmax  = nn.Softmax(dim=1)
predword = ds.get_char_by_id(id)
for c in enumerate(word[:-1]):
    x = ds.get_one_hot(id).unsqueeze(0)
    y, hh = rnn(x, hh)
    y = softmax(y)
    m, id = torch.max(y, 1)
    id = id.data[0]
    predword += ds.get_char_by_id(id)
print ('Prediction:\t' , predword)
print("Original:\t", word)
assert(predword == word)

Prediction:	 ololoasdasddqweqw123456789
Original:	 ololoasdasddqweqw123456789
