In [1]:
# Add Lincoln to system path
import sys
sys.path.append("/Users/seth/development/lincoln/")

import numpy as np
np.set_printoptions(precision=4)

import torch

from typing import List, Callable, Dict, Tuple

from lincoln.autograd.tensor import Tensor
from lincoln.autograd.layer import Layer
from lincoln.autograd.model import Model
from lincoln.autograd.param import Parameter
from lincoln.autograd.optim import Optim, SGD
from lincoln.autograd.train import Trainer
from lincoln.autograd.activations import sigmoid, tanh, linear

from lincoln.utils import permute_data

### LSTM Autograd

Differences:

* `Layer`: the actual forward function will be different.
* `Trainer`: batch generator now will do all the transforming of text into the data.

In [4]:
class LSTMLayer(Layer):

    def __init__(self,
                 neurons: int) -> None:
        self.state_size = neurons
        self.first = True
        self.params: Dict[['str'], Tensor] = {}
        self.h_init = Tensor(np.random.randn(1, self.state_size))
        self.c_init = Tensor(np.random.randn(1, self.state_size))
            
    def _init_params(self, input_: Tensor) -> None:
        np.random.seed(self.seed)
        
        self.params['Wf'] = Parameter(self.state_size + self.vocab_size, 
                                 self.state_size)
        self.params['Wi'] = Parameter(self.state_size + self.vocab_size, 
                                 self.state_size)
        self.params['Wo'] = Parameter(self.state_size + self.vocab_size, 
                                 self.state_size)
        self.params['Wc'] = Parameter(self.state_size + self.vocab_size, 
                                 self.state_size)
        self.params['Wv'] = Parameter(self.state_size, self.vocab_size)

        self.params['Bf'] = Parameter(self.state_size)
        self.params['Bi'] = Parameter(self.state_size)
        self.params['Bo'] = Parameter(self.state_size)
        self.params['Bc'] = Parameter(self.state_size)
        self.params['Bv'] = Parameter(self.vocab_size)

        hiddens = self.h_init.repeat(input_.shape[0])
        cells = self.c_init.repeat(input_.shape[0])
        
        return hiddens, cells
            
    def forward(self, input_: Tensor) -> Tensor:
        if self.first:
            self.hiddens, self.cells = self._init_params(input_)
            self.first = False

        for i in range(input_.shape[1]): # sequence length
            if i == 0:
                outputs_single = self._lstm_node(input_.select_index_axis_1(i))
                outputs = outputs_single.expand_dims_axis_1()

            else:
                output_single = self._lstm_node(input_.select_index_axis_1(i))
                output = output_single.expand_dims_axis_1()
                outputs = outputs.append_axis_1(output)
        
        return outputs

    def _lstm_node(self,
                   inputs: Tensor):

        assert inputs.shape[0] == self.hiddens.shape[0] == self.cells.shape[0]

        Z = inputs.concat(self.hiddens)

        forget = sigmoid(Z @ self.params['Wf'] + self.params['Bf'])

        ingate = sigmoid(Z @ self.params['Wi'] + self.params['Bi'])

        outgate = sigmoid(Z @ self.params['Wo'] + self.params['Bo'])

        change = tanh(Z @ self.params['Wc'] + self.params['Bc'])

        self.cells = self.cells * forget + ingate * change

        self.hiddens = outgate * tanh(self.cells)

        outputs = self.hiddens @ self.params['Wv'] + self.params['Bv']

        return outputs

    def _params(self) -> Tensor:

        return list(self.params.values())

In [5]:
class LSTMModel(Model):
    def __init__(self, 
                 layers: List[Layer],
                 vocab_size: int,
                 sequence_length: int = 15,
                 seed: int = 1) -> None:
        super().__init__(layers, seed)
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        
        for layer in self.layers:
            setattr(layer, "seed", self.seed)
            setattr(layer, "vocab_size", self.vocab_size)
        
    def zero_grad(self):
        for parameter in self.parameters():
            parameter.zero_grad()

    def predict(self, inputs: Tensor) -> Tensor:

        output = Tensor(inputs.data, no_grad=True)

        for layer in self.layers:
            output = layer.forward(output)

        return output

    def parameters(self) -> List[Parameter]:

        params = []
        for layer in self.layers:
            for param in layer.params.values():
                params.append(param)

        return params



#### Load and preprocess data

In [6]:
# text data
data = open("../exploratory/data/input.txt", 'r').read()

In [7]:
# setup params
sequence_length = 15
vocab_size = len(set(data))

In [8]:
class LSTMTrainer(object):
    '''
    Just a list of layers that runs forwards and backwards
    '''
    def __init__(self,
                 net: LSTMModel,
                 optim: Optim,
                 data: str,
                 sequence_length: int = 15) -> None:
        self.net = net
        self.optim = optim
        self.data = data
        self.train_data, self.test_data = self._train_test_split_text()
        
        self.max_len = self.net.sequence_length
        
        self.chars = list(set(self.data))
        self.char_to_idx = {ch: i for i, ch in enumerate(self.chars)}
        self.idx_to_char = {i: ch for i, ch in enumerate(self.chars)}
        
    def update_params(self) -> None:
        self.optim.step(self.net)

    def fit(self,
            epochs: int=100,
            eval_every: int=10,
            batch_size: int=32,
            seed: int = 1,
            single_output: bool = False,
            restart: bool = True)-> None:

        if restart:
            self.optim.first = True
            
        for e in range(epochs):

            batch_generator = self._generate_batches(batch_size)

            for ii, (X_batch, y_batch) in enumerate(batch_generator):

                self.net.zero_grad()
                
                prediction = self.net.predict(X_batch)
                loss = self._loss_prediction(prediction, y_batch)
                print(loss)
                loss.backward()

                self.update_params()

            if (e+1) % eval_every == 0:
                predicted = self.net.predict(X_test)
                loss = self._loss_prediction(predicted, y_test)
                print(f"Validation loss after {e+1} epochs is {loss}")

    def _loss_prediction(self,
                         prediction: Tensor,
                         actual: Tensor,
                         kind: str = "mse") -> None:
        if kind == "mse":
            errors = prediction - actual
            loss = (errors * errors).sum()
            return loss

    def _generate_batches(self,
                          batch_size: int) -> Tuple[Tensor]:
        N = len(self.train_data)

        for ii in range(0, N, batch_size):

            features_tensors = []
            target_tensors = []

            for char in range(batch_size):

                features_str, target_str =\
                 self.train_data[ii+char:ii+char+self.max_len],\
                 self.train_data[ii+char+1:ii+char+self.max_len+1]

                features_array, target_array =\
                    self._string_to_one_hot_array(features_str),\
                    self._string_to_one_hot_array(target_str)

                features_tensors.append(features_array)
                target_tensors.append(target_array)

#             import pdb; pdb.set_trace()
            yield Tensor(np.stack(features_tensors), no_grad=True),\
            Tensor(np.stack(target_tensors), no_grad=True)

    def _string_to_one_hot_array(self, input_string: str) -> Tensor:

        ind = [self.char_to_idx[ch] for ch in input_string]

        array = self._one_hot_text_data(ind)

        return array

    def _one_hot_text_data(self,
                           sequence: List) -> Tensor:

        sequence_length = len(sequence)
        batch = np.zeros((sequence_length, self.net.vocab_size))
        for i in range(sequence_length):
            batch[i][sequence[i]] = 1.0

        return batch

    def _train_test_split_text(self, pct=0.8) -> Tuple[str]:

        n = len(self.data)
        return self.data[:int(n * pct)], self.data[int(n * pct):]
    
    def generate_test_data(self) -> Tuple[Tensor]:

        features_str, target_str = self.test_data[:-1], self.test_data[1:]

        X_tensors = []
        y_tensors = []

        N = len(self.test_data)

        for start in range(0, N, self.max_len):

            features_str, target_str =\
             self.test_data[start:start+self.max_len],\
             self.test_data[start+1:start+self.max_len+1]

            features_array, target_array =\
                self._string_to_one_hot_array(features_str),\
                self._string_to_one_hot_array(target_str)

            X_tensors.append(features_array)
            y_tensors.append(target_array)

        return Tensor(np.stack(X_tensors), no_grad=True),\
    Tensor(np.stack(y_tensors), no_grad=True)


In [9]:
optimizer = SGD(lr=0.001)
batch_size = 100
lstm_model = LSTMModel([LSTMLayer(128)],
                       vocab_size,
                       sequence_length=5,
                       seed=112818)

In [10]:
trainer = LSTMTrainer(lstm_model, optimizer, data)

In [None]:
trainer.fit(epochs = 10,
       eval_every = 1,
       batch_size=1,
       seed=102618,
       single_output=True);

Tensor(5674.0475)
Tensor(6516.8157)
Tensor(6907.8247)
