In [None]:
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable
from sklearn.preprocessing import MinMaxScaler

In [None]:
class LSTMNP(object):

    def __init__(
            self,
            units:int,
            return_sequences:bool = False,
            return_states:bool = False,
            time_major:bool = False
    ):
        self.units = units
        self.return_sequences = return_sequences
        self.return_states = return_states
        self.time_major = time_major

        self.kernel = k_vals.numpy()
        self.rec_kernel = rec_vals.numpy()
        self.bias = b_vals.numpy()


    def __call__(self, inputs, initial_state=None):

        if not self.time_major:
            inputs = np.moveaxis(inputs, [0, 1], [1, 0])

        lookback_steps, bs, ins = inputs.shape

        if initial_state is None:
            h_state = np.zeros((bs, self.units))
            c_state = np.zeros((bs, self.units))
        else:
            assert len(initial_state) == 2
            h_state, c_state = initial_state

        h_states = []
        c_states = []

        for step in range(lookback_steps):

            h_state, c_state = self.cell(inputs[step], h_state, c_state)

            h_states.append(h_state)
            c_states.append(c_state)

        h_states = np.stack(h_states)
        c_states = np.stack(c_states)

        if not self.time_major:
            h_states = np.moveaxis(h_states, [0, 1], [1, 0])
            c_states = np.moveaxis(c_states, [0, 1], [1, 0])

        o = h_states[:, -1]
        if self.return_sequences:
            o = h_states

        if self.return_states:
            return o, c_states
        return o

    def cell(self, xt, ht, ct):
        """implements logic of LSTM"""

        # input gate
        k_i = self.kernel[:, :self.units]
        rk_i = self.rec_kernel[:, :self.units]
        b_i = self.bias[:self.units]
        i_t = self.sigmoid(np.dot(xt, k_i) + np.dot(ht, rk_i) + b_i)

        # forget gate
        k_f = self.kernel[:, self.units:self.units * 2]
        rk_f = self.rec_kernel[:, self.units:self.units * 2]
        b_f = self.bias[self.units:self.units * 2]
        ft = self.sigmoid(np.dot(xt, k_f) + np.dot(ht, rk_f) + b_f)

        # candidate cell state
        k_c = self.kernel[:, self.units * 2:self.units * 3]
        rk_c = self.rec_kernel[:, self.units * 2:self.units * 3]
        b_c = self.bias[self.units * 2:self.units * 3]
        c_t = self.tanh(np.dot(xt, k_c) + np.dot(ht, rk_c) + b_c)

        # cell state
        ct = ft * ct + i_t * c_t

        # output gate
        k_o = self.kernel[:, self.units * 3:]
        rk_o = self.rec_kernel[:, self.units * 3:]
        b_o = self.bias[self.units * 3:]
        ot = self.sigmoid(np.dot(xt, k_o) + np.dot(ht, rk_o) + b_o)
        ht = ot * self.tanh(ct)

        return ht, ct

    @staticmethod
    def tanh(x):
        return np.tanh(x)

    @staticmethod
    def sigmoid(x):
        return 1. / (1 + np.exp(-x))

In [None]:
def sliding_windows(data, seq_length):
    x = []
    y = []

    for i in range(len(data)-seq_length-1):
        _x = data[i:(i+seq_length)]
        _y = data[i+seq_length]
        x.append(_x)
        y.append(_y)

    return np.array(x),np.array(y)

In [None]:
class LSTM(nn.Module):

    def __init__(self, num_classes, input_size, hidden_size, num_layers):
        super(LSTM, self).__init__()
        
        self.num_classes = num_classes
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.seq_length = seq_length
        
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True)
        
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h_0 = Variable(torch.zeros(
            self.num_layers, x.size(0), self.hidden_size))
        
        c_0 = Variable(torch.zeros(
            self.num_layers, x.size(0), self.hidden_size))
        
        # Propagate input through LSTM
        ula, (h_out, _) = self.lstm(x, (h_0, c_0))
        
        h_out = h_out.view(-1, self.hidden_size)
        
        out = self.fc(h_out)
        
        return out