https://gist.github.com/spro/ef26915065225df65c1187562eca7ec4

In [282]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.autograd import Variable
from torch import optim
import numpy as np
import math, random
from tqdm import tqdm_notebook

from bokeh.plotting import figure, show, output_notebook

# Pick the compute device once for the whole notebook: the GPU when CUDA is
# available, otherwise the CPU. The chosen device is echoed as cell output.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cuda


In [283]:
# Generating a noisy multi-sin wave 
class noisymultisin():
    """Streaming generator of a noisy two-component sine wave.

    Each call to `sample` returns the next `sample_size` consecutive time
    steps, continuing where the previous call stopped (tracked in
    `self.offset`).
    """

    def __init__(self):
        # Index of the first time step the next `sample` call will return.
        self.offset = 0

    def sine_2(self, X, signal_freq=60.):
        """Return the mean of a sine and its second harmonic evaluated at X.

        `signal_freq` is used as a period expressed in time steps: the base
        component is sin(2*pi*X / signal_freq).
        """
        return (np.sin(2 * np.pi * (X) / signal_freq) + np.sin(4 * np.pi * (X) / signal_freq)) / 2.0

    def noisy(self, Y, noise_range=(-0.05, 0.05)):
        """Return Y plus uniform noise drawn from `noise_range` (low, high)."""
        noise = np.random.uniform(noise_range[0], noise_range[1], size=Y.shape)
        return Y + noise

    def sample(self, sample_size):
        """Return the next `sample_size` points of the wave as (X, Y).

        X holds the absolute time steps and Y the noisy signal evaluated at
        exactly those steps.
        """
        X = np.arange(sample_size)
        X += self.offset
        # X is already shifted by the offset; adding the offset a second time
        # (as the previous version did) made Y disagree with the returned X
        # on every call after the first.
        Y = self.noisy(self.sine_2(X))
        self.offset += sample_size
        return X, Y

In [284]:
# generating a simple sin wave
class simplesin():
    """Streaming generator of a plain sine wave.

    Successive `sample` calls return consecutive stretches of the wave; the
    position reached so far is kept in `self.offset`.
    """

    def __init__(self):
        # First time step of the next sample.
        self.offset = 0

    def sin(self, X, signal_freq=60.):
        """Return sin(2*pi*X / signal_freq) for the time steps X."""
        phase = 2 * np.pi * (X) / signal_freq
        return np.sin(phase)

    def sample(self, sample_size):
        """Return (X, Y): the next `sample_size` time steps and their sine values."""
        steps = np.arange(sample_size)
        steps += self.offset
        values = self.sin(steps)
        # Advance the cursor so the next call continues the wave.
        self.offset += sample_size
        return steps, values

## The model with different modes: LSTM, LSTMCell and LSTMCustom (the code below only accepts "LSTM" and "LSTMCell")

In [613]:
class Hidden():
    """Container for a pair of recurrent state tensors, `h_0` and `c_0`.

    The positional arguments are stored as the shape of both tensors, which
    start out (and are reset to) all zeros on the notebook-level `device`.
    """

    def __init__(self, *args):
        # Shape shared by both state tensors.
        self.args = args
        self.h_0 = self.__default_state()
        self.c_0 = self.__default_state()

    def __default_state(self):
        """Return a fresh zero tensor of shape `self.args` that does not track gradients."""
        zeros = torch.zeros(self.args, device=device, requires_grad=False)
        return zeros

    def reset_state(self):
        """Replace both states with fresh zero tensors."""
        self.h_0 = self.__default_state()
        self.c_0 = self.__default_state()

    def get_h_c(self):
        """Return the current states as the tuple (h_0, c_0)."""
        return self.h_0, self.c_0

    def set_h_c(self, tup):
        """Store tup[0] as h_0 and tup[1] as c_0."""
        self.h_0 = tup[0]
        self.c_0 = tup[1]
        

In [618]:
class PPLSTM(nn.Module):
    """Next-value predictor for a scalar series, built on LSTM layers.

    Two interchangeable implementations are selected with `mode`:

    * "LSTM":     a single `nn.LSTM` with `num_layers` stacked layers.
    * "LSTMCell": `num_layers` chained `nn.LSTMCell` modules stepped manually.

    Both read one scalar per time step (input_size=1), use a batch size of 1
    and map the top layer's hidden state to one scalar with a linear layer.
    Recurrent state is kept in `Hidden` objects stored in `self.hiddens`.

    Args:
        hidden_size: width of every LSTM layer.
        dropout: dropout passed to `nn.LSTM` ("LSTM" mode only).
        num_layers: number of stacked layers, at least 1.
        mode: "LSTM" or "LSTMCell".

    Raises:
        AssertionError: if `num_layers` < 1.
        ValueError: if `mode` is not one of the supported values.
    """

    def __init__(self, hidden_size, dropout=0.05, num_layers=2, mode="LSTM"):
        super(PPLSTM, self).__init__()

        self.hidden_size = hidden_size
        self.dropout = dropout
        self.num_layers = num_layers
        self.batch_size = 1
        self.lstm = nn.ModuleList()
        self.hiddens = []
        self.mode = mode

        assert num_layers >= 1

        # Modes are compared with `==`: `is` tests object identity, which is
        # not guaranteed for equal strings.
        if self.mode == "LSTM":
            self.lstm.append(nn.LSTM(input_size=1,
                                     hidden_size=hidden_size,
                                     num_layers=self.num_layers,
                                     dropout=self.dropout))
            self.hiddens.append(Hidden(self.num_layers, self.batch_size, self.hidden_size))

        elif self.mode == "LSTMCell":
            # First cell reads the scalar input, the others read the hidden
            # state of the cell below.
            self.lstm.append(nn.LSTMCell(input_size=1, hidden_size=hidden_size))
            for _ in range(self.num_layers - 1):
                self.lstm.append(nn.LSTMCell(input_size=hidden_size, hidden_size=hidden_size))

            # One (h, c) pair per cell.
            for _ in range(self.num_layers):
                self.hiddens.append(Hidden(self.batch_size, self.hidden_size))

        else:
            raise ValueError(f"Unsuported mode {mode}")

        self.out = nn.Linear(hidden_size, 1)

        # Start in non-training mode, as before, but through `train()` so the
        # flag also reaches the child modules.
        self.train(False)

    def reset_state(self):
        """Reset every stored hidden/cell state to zeros."""
        for hidden in self.hiddens:
            hidden.reset_state()

    def printParams(self, index=None):
        """Print all parameters, or only the one at position `index`.

        Nothing is printed when `index` is beyond the last parameter.
        """
        if index is None:
            print("\nPrinting parameters")
            for parameter in self.parameters():
                print(parameter)
        else:
            for i, value in enumerate(self.parameters()):
                if i == index:
                    print(f"\nPrinting parameters at index {i}")
                    print(value)
                    break

    def evaluate(self, inputs):
        """Run `inputs` through the network, continuing from the stored state.

        The stored hidden/cell states are updated and NOT reset, so successive
        calls continue the same sequence.

        Args:
            inputs: (seq_len, 1, 1) in "LSTM" mode, (seq_len, 1) in
                "LSTMCell" mode.

        Returns:
            One prediction per time step: (seq_len, 1, 1) in "LSTM" mode,
            (seq_len, 1) in "LSTMCell" mode.
        """
        if self.mode == "LSTM":
            o, hidden = self.lstm[0](inputs, self.hiddens[0].get_h_c())
            self.hiddens[0].set_h_c(hidden)
            return self.out(o)

        nbr_inputs = inputs.size()[0]
        o = torch.zeros(nbr_inputs, 1, device=inputs.device)
        for i, step in enumerate(inputs):
            # Layer 0 consumes the scalar input as a (batch, 1) tensor.
            hidden = self.lstm[0](step.reshape(self.batch_size, 1), self.hiddens[0].get_h_c())
            self.hiddens[0].set_h_c(hidden)

            # Every deeper layer consumes the fresh hidden state of the layer
            # below. That state is already (batch, hidden_size), which is the
            # 2-D input LSTMCell expects, so it is passed as is.
            for layer in range(1, self.num_layers):
                hidden = self.lstm[layer](self.hiddens[layer - 1].h_0, self.hiddens[layer].get_h_c())
                self.hiddens[layer].set_h_c(hidden)

            # Linear read-out of the top layer's hidden state.
            o[i] = self.out(self.hiddens[self.num_layers - 1].h_0).view(-1)

        return o

    def forward(self, inputs, training=False):
        """Train-time pass or free-running generation, depending on `training`.

        * `training is True`: the whole `inputs` sequence is fed to the
          network and the per-step predictions are returned (gradients are
          tracked).
        * otherwise: only the first element of `inputs` is used as a seed;
          the network is then fed its own previous output for as many steps
          as `inputs` has elements. No gradients are tracked in this mode.

        The recurrent state is reset before either pass, and again after
        generation. `inputs` is never modified.

        Args:
            inputs: 1-D tensor of values, or a tensor already shaped as
                `evaluate` expects for the current mode.
            training: `True` selects the train-time pass.

        Returns:
            (len(inputs), 1, 1) in "LSTM" mode, (len(inputs), 1) in
            "LSTMCell" mode.
        """
        is_training = training is True
        # `train()` propagates the flag to the child modules, so dropout in
        # nn.LSTM is active only during training.
        self.train(is_training)

        # Use the instance's own mode, not a notebook-level global.
        lstm_mode = self.mode == "LSTM"

        # Bring a flat series to the layout the selected mode expects. The
        # out-of-place unsqueeze leaves the caller's tensor untouched.
        if inputs.dim() == 1:
            inputs = inputs.unsqueeze(1)
            if lstm_mode:
                inputs = inputs.unsqueeze(2)

        # The prediction results are way better if the state is not reset
        # before predicting, but it is unclear how to do that for LSTMCell;
        # to keep both modes equivalent the state is always reset here.
        self.reset_state()

        if is_training:
            return self.evaluate(inputs)

        # Prediction: seed with the first value, then feed each output back in.
        n_steps = inputs.size()[0]
        out_shape = (n_steps, 1, 1) if lstm_mode else (n_steps, 1)
        outputs = torch.zeros(out_shape, device=inputs.device)
        with torch.no_grad():
            step_input = inputs[:1].float()
            for i in range(n_steps):
                step_output = self.evaluate(step_input)
                outputs[i] = step_output[0]
                step_input = step_output
        self.reset_state()
        return outputs

In [619]:
# Hyper-parameters of the experiment.
n_epochs = 200
hidden_size = 20
dropout = 0.00
num_layers = 2
learning_size = 1400
mode = "LSTM"

# Training data: every value of the sampled wave is the input for predicting
# the value that follows it, so the targets are the inputs shifted by one step.
wave = simplesin()
timesteps, _inputs = wave.sample(sample_size=learning_size)
inputs = torch.from_numpy(_inputs[:-1]).float().to(device)
targets = torch.from_numpy(_inputs[1:]).float().to(device)

# Model, loss and optimizer.
model = PPLSTM(hidden_size=hidden_size, dropout=dropout, num_layers=num_layers, mode=mode)
model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters())

# Loss recorded per epoch, for plotting.
losses = np.zeros(n_epochs)

for epoch in tqdm_notebook(range(n_epochs)):
    optimizer.zero_grad()

    # One pass over the complete series per epoch.
    outputs = model(inputs, training=True).squeeze()

    loss = criterion(outputs.view(-1), targets)
    loss.backward()
    optimizer.step()
    losses[epoch] += loss.item()

HBox(children=(IntProgress(value=0, max=200), HTML(value='')))

In [620]:
output_notebook()

# Training loss recorded for each epoch.
l = figure(plot_height=400, plot_width=900)
l.xaxis.axis_label = "Epoch"
l.yaxis.axis_label = "Loss"
l.yaxis.major_label_orientation = "vertical"
l.line(
    x=range(n_epochs),
    y=losses,
    line_color="blue",
    line_width=2,
)
show(l)

In [621]:
output_notebook()

# Sampled wave (blue) against the model outputs of the last training epoch
# (orange). The outputs start one step later because each one predicts the
# value following its input.
p = figure(plot_height=400, plot_width=900)
p.line(
    x=timesteps,
    y=_inputs,
    line_color="blue",
    line_width=2,
    legend="True",
)
p.line(
    x=timesteps[1:],
    y=outputs.tolist(),
    line_color="orange",
    line_width=2,
    legend="Prediction",
)
show(p)


Try some prediction: sample further points from the sine wave and let the model generate the wave over the same time steps.


In [622]:
# Draw the next 500 points from the same wave generator, which continues
# where the training sample stopped.
timesteps2, _inputs2 = wave.sample(sample_size=500)
inputs2 = torch.from_numpy(_inputs2[:-1]).float().to(device)
targets2 = torch.from_numpy(_inputs2[1:]).float()

# Called without `training=True`, the model runs in its prediction mode.
outputs2 = model(inputs2).squeeze()

In [623]:
output_notebook()

p = figure(title="Wave generation by model", plot_height=400, plot_width=900)

# Sampled wave in blue, model output in orange (shifted by one time step).
p.line(
    x=timesteps2,
    y=_inputs2,
    line_color="blue",
    line_width=2,
)
p.line(
    x=timesteps2[1:],
    y=outputs2.tolist(),
    line_color="orange",
    line_width=2,
)
show(p)