In [2]:
import numpy as np
import copy
from nltk.tokenize import sent_tokenize
import torch
import torch.optim as optim
from torch import nn
import sys

# Prefer the GPU, but fall back to the CPU so that every later `.to(device)`
# call still works on a machine without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [4]:
# Read the training corpus (closing the file afterwards) and lower-case it.
with open('bai_van_mau.txt', 'r', encoding='utf-8') as corpus_file:
    data = corpus_file.read().lower()

# Sort the vocabulary so the char <-> index mapping is identical on every
# kernel start. Iterating a bare set of strings follows randomised hashes, so
# parameters saved in one session would not line up with char_to_ix in the
# next one.
# NOTE(review): parameters pickled under the previous, unordered mapping will
# not match this ordering and need to be retrained.
chars = sorted(set(data))

char_to_ix = {ch: i for i, ch in enumerate(chars)}
ix_to_char = {i: ch for i, ch in enumerate(chars)}
datasize, vocab_size = len(data), len(chars)

# Number of rows of every LSTM gate weight matrix (size of the hidden state).
hidden_size = 128
# Length, in characters, of each training window.
sequence_len = 40

In [5]:
def build_data(text, stride = 3, seq_len = None):
    """
    Create a training set by sliding a fixed-size window over `text`.

    Arguments:
    text -- string to scan (the argument is used as given; it is no longer
            replaced by the notebook-level corpus)
    stride -- number of characters the window advances between examples
    seq_len -- window length; when None the notebook-level `sequence_len` is used

    Returns:
    X -- list of windows, each `seq_len` characters long
    Y -- list with the character that immediately follows each window
    """
    Tx = sequence_len if seq_len is None else seq_len

    # The last start index must leave room for the label character text[i + Tx].
    starts = range(0, len(text) - Tx, stride)
    X = [text[i: i + Tx] for i in starts]
    Y = [text[i + Tx] for i in starts]

    return X, Y


# Build the (window, next-character) training pairs from the lower-cased corpus.
X, Y = build_data(data)

In [6]:
def vectorization(X, Y):
    """
    One-hot encode a batch of character windows and index-encode their labels.

    Arguments:
    X -- list of strings; each character is looked up in `char_to_ix`
    Y -- list of single characters, one label per entry of X

    Returns:
    x -- float tensor of shape (sequence_len, vocab_size, len(X)) on `device`,
         with x[t, k, i] == 1 when character t of X[i] has index k
    y -- long tensor of shape (len(X),) on `device` holding the label indices
    """
    n_examples = len(X)

    one_hot = np.zeros((sequence_len, vocab_size, n_examples))
    targets = np.zeros((n_examples))

    for col in range(n_examples):
        # Windows shorter than sequence_len leave their trailing steps all-zero.
        for step, symbol in enumerate(X[col]):
            one_hot[step, char_to_ix[symbol], col] = 1
        targets[col] = char_to_ix[Y[col]]

    return torch.Tensor(one_hot).to(device), torch.LongTensor(targets).to(device)

# Smoke test for vectorization: encode the first 10 training pairs and display
# the resulting shapes; the trailing string labels the axes of x.
x,y = vectorization(X[:10],Y[:10])
x.shape,y.shape, 'sequence_len, vocab_size, batch_size'


(torch.Size([40, 137, 10]),
 torch.Size([10]),
 'sequence_len, vocab_size, batch_size')

In [7]:
def initialize_lstm_parameters():
    """
    Create the trainable tensors of the LSTM and of its output layer.

    Weight matrices are drawn from a standard normal and scaled by 0.01;
    biases start at zero. Every tensor is a leaf on `device` with
    requires_grad=True.

    Returns:
    parameters -- python list, in this order:
                        Wf, bf   -- weights (hidden_size, vocab_size + hidden_size) and bias (hidden_size, 1) of gate f
                        Wu, bu   -- same shapes, gate u
                        Wcc, bcc -- same shapes, candidate cell value cc
                        Wo, bo   -- same shapes, gate o
                        Wy, by   -- output layer, shapes (vocab_size, hidden_size) and (vocab_size, 1)
    """
    def small_random(rows, cols):
        # Scale first, then flag the result, so the returned tensor is a leaf.
        weight = torch.randn(rows, cols, device=device)*0.01
        weight.requires_grad = True
        return weight

    def zero_bias(rows):
        return torch.zeros((rows, 1), requires_grad=True, device=device)

    gate_inputs = vocab_size + hidden_size

    parameters = []
    # Four (weight, bias) pairs: f, u, cc, o -- drawn in that order.
    for _ in range(4):
        parameters.append(small_random(hidden_size, gate_inputs))
        parameters.append(zero_bias(hidden_size))

    # Output projection from the hidden state to vocabulary scores.
    parameters.append(small_random(vocab_size, hidden_size))
    parameters.append(zero_bias(vocab_size))

    return parameters

# Smoke test for parameter initialisation: inspect the first bias tensor (bf)
# to confirm it is on `device`, is a leaf, and requires gradients.
parameters = initialize_lstm_parameters()
parameters[1].device, parameters[1].is_leaf, parameters[1].requires_grad

(device(type='cuda', index=0), True, True)

In [8]:
def lstm_step_forward(x, a_prev, c_prev, parameters):
    """
    Advance the LSTM by one time step.

    Arguments:
    x -- input at this step, one column per example
    a_prev -- previous hidden state, one column per example
    c_prev -- previous cell state, same shape as a_prev
    parameters -- list of 10 tensors [Wf, bf, Wu, bu, Wcc, bcc, Wo, bo, Wy, by];
                  the last two (output layer) are not used here

    Returns:
    a -- new hidden state
    c -- new cell state
    """
    Wf, bf, Wu, bu, Wcc, bcc, Wo, bo, _Wy, _by = parameters

    # Hidden state rows first, then input rows: the gate matrices expect this order.
    stacked = torch.cat((a_prev, x), dim=0)

    def gate(weight, bias, squash):
        return squash(weight @ stacked + bias)

    keep = gate(Wf, bf, torch.sigmoid)
    write = gate(Wu, bu, torch.sigmoid)
    candidate = gate(Wcc, bcc, torch.tanh)
    expose = gate(Wo, bo, torch.sigmoid)

    c_next = keep*c_prev + write*candidate
    a_next = expose*torch.tanh(c_next)
    return a_next, c_next

# Smoke test for a single LSTM step: fresh parameters, random previous states
# and an all-zero input batch of 100 columns; display the output shapes.
# NOTE: this rebinds the notebook-level names `parameters`, `a_prev`, `c_prev`
# and `x`.
parameters = initialize_lstm_parameters()
batch_len=100
a_prev = torch.randn((hidden_size,batch_len)).to(device)
c_prev = torch.randn((hidden_size,batch_len)).to(device)
x = torch.zeros((vocab_size, batch_len)).to(device)
a, c = lstm_step_forward(x, a_prev, c_prev, parameters)
a.shape, c.shape

(torch.Size([128, 100]), torch.Size([128, 100]))

In [9]:
def lstm_forward(batch_X, a_prev, c_prev, parameters):
    """
    Run the LSTM over every time step of a batch and score the next character.

    Arguments:
    batch_X -- inputs indexed by time step first: batch_X[t] is the
               (vocab_size, batch_size) input at step t, as built by vectorization
    a_prev -- initial hidden state, (hidden_size, batch_size); a single column
              is broadcast across the batch
    c_prev -- initial cell state, same shape rules as a_prev
    parameters -- list of 10 tensors [Wf, bf, Wu, bu, Wcc, bcc, Wo, bo, Wy, by]

    Returns:
    y_hat -- unnormalised scores Wy @ a + by computed from the last hidden
             state, shape (vocab_size, batch_size); still attached to the graph
    a -- final hidden state, detached so it can seed the next call
    c -- final cell state, detached
    """
    Wy, by = parameters[-2], parameters[-1]
    batch_size = batch_X.shape[-1]

    # Carry the running state in plain variables rather than writing each step
    # into a pre-allocated buffer: no per-step clone()/copy, no in-place
    # autograd bookkeeping, and the returned state does not pin a whole
    # (steps + 1, hidden, batch) history in memory.
    # The state is moved next to the inputs and broadcast to the batch width,
    # matching what the former buffer assignment accepted.
    a_t = a_prev.to(batch_X.device).expand(-1, batch_size)
    c_t = c_prev.to(batch_X.device).expand(-1, batch_size)

    # The number of steps comes from the input itself, so there is no
    # dependency on a notebook-level sequence length and an empty sequence
    # simply scores the initial state.
    for x_t in batch_X:
        a_t, c_t = lstm_step_forward(x_t, a_t, c_t, parameters)
    y_hat = Wy @ a_t + by

    return y_hat, a_t.detach(), c_t.detach()

# Smoke test for the full forward pass: fresh parameters, the first 10
# training pairs, random initial states. The pass is run 10 times, feeding the
# returned states back in, and the final state shapes are displayed.
# NOTE: this rebinds the notebook-level names `parameters`, `m`, `x`, `y`,
# `a_prev` and `c_prev`.
parameters = initialize_lstm_parameters()
m=10
x,y = vectorization(X[:m],Y[:m])
a_prev = torch.randn((hidden_size,m)).to(device)
c_prev = torch.randn((hidden_size,m)).to(device)
for i in range(10):
    y_hat, a_prev, c_prev = lstm_forward(x, a_prev, c_prev, parameters)
a_prev.shape, c_prev.shape

(torch.Size([128, 10]), torch.Size([128, 10]))

In [10]:
def pickle_(file_path, data=None, operation='load'):
    """
    Load or save a Python object as a bz2-compressed pickle.

    Arguments:
    file_path -- path of the compressed pickle file
    data -- object to save; ignored when loading
    operation -- 'load' (default) reads the file; any other value writes
                 `data` to it, overwriting an existing file

    Returns:
    The unpickled object when loading, None when writing.
    """
    import bz2
    # The public module already uses the C accelerator when it is available,
    # so the private `_pickle` module is not needed.
    import pickle

    if operation == 'load':
        # SECURITY: unpickling can execute arbitrary code. Only load files
        # that were produced by this notebook or another trusted source.
        with bz2.BZ2File(file_path, 'rb') as f:
            return pickle.load(f)

    with bz2.BZ2File(file_path, 'wb') as f:
        pickle.dump(data, f)
    return None


def sample(preds, temperature=1.0):
    """
    Draw one index from a probability array after temperature re-weighting.

    Arguments:
    preds -- array-like of probabilities; any shape, it is flattened
    temperature -- values below 1 sharpen the distribution, values above 1
                   flatten it

    Returns:
    Integer index into the flattened `preds`, drawn with np.random.choice.
    """
    probs = np.asarray(preds).astype('float64').reshape(-1)

    # log(0) is -inf, which is the intended "never pick this" weight; silence
    # the divide-by-zero warning it would otherwise emit.
    with np.errstate(divide='ignore'):
        logits = np.log(probs) / temperature

    # Shift by the maximum before exponentiating so that small temperatures do
    # not underflow every weight to zero (which would give NaN probabilities).
    logits -= np.max(logits)
    weights = np.exp(logits)
    weights /= np.sum(weights)

    # Sample over the actual number of entries instead of a notebook-level
    # vocabulary size.
    return np.random.choice(len(weights), p=weights)

def softmax1d(x):
    """
    Softmax along dim 0, so every column of `x` becomes a distribution.

    The per-column maximum is subtracted before exponentiating for numerical
    stability; the output has the same shape as `x`.
    """
    column_peak = torch.max(x, dim=0, keepdim=True).values
    shifted_exp = torch.exp(x - column_peak)
    return shifted_exp / torch.sum(shifted_exp, dim=0, keepdim=True)

def generate_output(parameters, get_input=False, n_chars=1000, temperature=0.5):
    '''
    Generate text one character at a time from a seed window.

    Arguments:
    parameters -- list of 10 tensors as returned by initialize_lstm_parameters
    get_input -- False: seed with a random training window from X;
                 True: prompt the user and seed with what they type
    n_chars -- number of characters to generate
    temperature -- sampling temperature passed to sample()

    Returns:
    The generated string; when get_input is True it starts with the typed
    prompt (plus a trailing space).
    '''
    generated = ''
    a_prev = torch.randn((hidden_size,1)).to(device)
    c_prev = torch.randn((hidden_size,1)).to(device)
    if not get_input:
        idx = int(np.random.choice(len(X)))
        sentence = [X[idx]]
        Y_sample = [Y[idx]]
    else:
        usr_input = input("Viết câu đầu tiên, ít hơn 40 kí tự: ") + ' '
        generated += usr_input
        # Seed with the tail of exactly what is shown to the user (including
        # the last typed character and the appended space), capped at
        # sequence_len so the one-hot buffer cannot overflow, and left-padded
        # with '0' up to the window length.
        # NOTE: every seed character, including the '0' padding, must exist in
        # char_to_ix or vectorization raises KeyError.
        seed = usr_input.lower()[-sequence_len:]
        sentence = [seed.rjust(sequence_len, '0')]
        # vectorization requires a label; its encoded value is not used below.
        Y_sample = [usr_input[-1]]

    # Sampling needs no gradients; skip building autograd graphs.
    with torch.no_grad():
        for _ in range(n_chars):
            x, _unused_y = vectorization(sentence, Y_sample)
            preds, a_prev, c_prev = lstm_forward(x, a_prev, c_prev, parameters)
            probs = softmax1d(preds)
            next_index = sample(probs.detach().cpu(), temperature = temperature)

            next_char = ix_to_char[next_index]

            generated += next_char
            # Slide the window: drop the oldest character, append the new one.
            sentence = [sentence[0][1:] + next_char]

    return generated

# pickle_('params.pickle', parameters, 'write')
# Load parameters from 'params.pickle' and generate text from a typed prompt
# (get_input=True makes generate_output call input()).
# NOTE(review): the loaded tensors are only meaningful if char_to_ix /
# ix_to_char hold the same mapping that was in effect when they were trained;
# verify this before trusting the generated text.
parameters = pickle_('params.pickle')
generated = generate_output(parameters, True)
print(generated)

Viết câu đầu tiên, ít hơn 40 kí tự: cháu lên ba
cháu lên ba ́ễåậ́ặỷ1́ặ1ẃwèấ;ậ́aấ;wè́ẽ'́ặỷỡwỷ́ễớ́ặ5ậ́wỷậưủ́ồớ́7ỷạ́wèỷð́7ỷủ.7́ặỷóõắ́ộớậ́ồớ́791wè́wèớṹ?ắậ́wỷấ́wèấ;ậ́ặỷ/ậ́7.̃́7ỷậð7”́791wè́795ậ́ồớ́aåậ́wèỷậð?́èậ̀̃́7ắ́ỷắậ́79ấ;wè́ặ)ắýẽớậ́7ỷ/́ồậð7́wắ?́79ẵ́ồ ́7ỷắ́7ỷậ●7́aấ*ặ́èậớủ́ặỷ8wè́hỷđwè́ặắ7ỷḿẽậmẃộậ ủ́ồớ́7ỷ●́wỷấwè́wèấ;ậ́aớẃẽớ́ồụẃhỷđwè́ặắễọwè́ễớ́ặỷậ●ặ́7ỷủũ ẃwè1ớậ́õắã́ặỷĩ́ồ.7́ẽĩ́ặỷủũðẃẽĩ́?ủ4ẃẽắṹặắ1́7ỷạ́ặỷỹwè́ặỷủũðẃ?i7́wèấ;ậ́?ợ́7ỷầṹwèấ;ậ́aớẃẽớ́ễåậ́ẽầ7́ỷåwỷ́hỷậ́ồ ́a●ẃ7ỷầṹaấ*ặ́wèấ;ậ́7ắ́aấ*ặ́ỷ1ớẃặẩwỷ́ồớ́ặ1ẃwèấ;ậ́7ỷấ;wè́ặỷủũðẃồớ́79ớwè́aḿhỷđwè́ặỷ"́ặắ7ỷḿ7åậ́7ỷẩ1́ễớ́wèấ;ậ́aắwè́ặỷ1́ặỷ̀wè́p?́hỷđwè́75?́ỷậðẃ7ỷáặ́ếá́79ấ;wè́7ậwỷ́79ắwè́a“́èậ̀̃́7ắ́wỷầ7́79ẵ́7ỷớwỷ́ỷắậ́?ợ́ặ1ẃộâủ́ặ)ắ́7ạwỷ́ũưủ́aấ*ặ́hỷđwè́ặỷ"́ặắwỷốwè́wèấ;ậ́aọặ́ẽậmẃhỷđwè́ặắ?i7́wèấ;ậ́aớẃẽớ́ỷớwè́wèớṹwỷâẃồ.7́79ấ;wè́79ưẃ791wè́7ỷ/́wè.̃́wỷậ ủ́wỷốwè́èậắ́aạwỷ́p?́ặỷủũðẃhḿồ ́wỷốwè́wèấ;ậ́7ỷắ́7ỷậ●7́ồqậ́?i7́ặđwè́ồậðặ́ặỷ1́ặỷ̀wè́7ắ́ặ̣wè́7ỷầṹ?i7́7ỷ●́7ỷạ́ễớ?́ẽåwỷ́7ỷâẃễâủ́ễớ́?i7́ặỷủũðẃẽú7́7ỷđậ́wèấ;ậ́ặỷ8wèã́?i7́wèớṹ7ỷĩ7́wè)́?ậưủ́7ẩ

# Training the network

In [13]:
# Training configuration and fresh model state.
m = len(X)
batch_size = 1000
# Number of full mini-batches per pass over the data (np.floor returns a float).
num_batch = np.floor(m/batch_size)
num_iterations = 20

# Start from newly initialised parameters; this replaces any `parameters`
# bound earlier in the notebook.
parameters = initialize_lstm_parameters()
# Initial hidden and cell state, one column per example of a mini-batch.
a_prev = torch.randn((hidden_size,batch_size)).to(device)
c_prev = torch.randn((hidden_size,batch_size)).to(device)

optimizer = optim.Adam(parameters, lr=0.01)
criterion = nn.CrossEntropyLoss().to(device)

In [None]:
# One pass over the shuffled training pairs per iteration; after each pass the
# mean mini-batch loss and a generated sample are printed.
for i in range(num_iterations):
    loss_total = 0.0
    permutation = np.random.permutation(m)
    count = 0
    for j in range(0,m,batch_size):
        indices = permutation[j:j+batch_size]
        if len(indices) != batch_size:
            # Skip the ragged last batch: a_prev / c_prev have a fixed batch width.
            continue

        # Mini-batch; the comprehension variable is named differently from the
        # outer loop index to avoid confusion.
        batch_X, batch_Y = vectorization([X[k] for k in indices], [Y[k] for k in indices])

        # The detached final states are carried over into the next batch.
        y_hat, a_prev, c_prev =  lstm_forward(batch_X, a_prev, c_prev, parameters)
        # CrossEntropyLoss expects (batch, classes), hence the transpose.
        loss = criterion(y_hat.T, batch_Y)

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_value_(parameters, 1)
        optimizer.step()    # Does the update

        count += 1
        loss_total += loss.item()

    # Average over the batches actually processed in this pass rather than a
    # value computed in another cell, and stay defined when there were none.
    mean_loss = loss_total/count if count else float('nan')
    print('\nIteration: %d, Loss: %f' % (i, mean_loss) + '\n')
    print(generate_output(parameters))


In [None]:
# Save the current parameters to 'params.pickle', overwriting any existing file.
pickle_('params.pickle', parameters, 'write')