### Bayesian LSTM

Import libraries

In [1]:
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from blitz.modules import BayesianLSTM
from blitz.utils import variational_estimator

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import matplotlib.pyplot as plt

from collections import deque

In [3]:
import yfinance as yf

# Request parameters for the price-history download.
ticker = 'AAPL'
start_date = '1980-01-01'
end_date = '2024-03-23'

# Fetch the price history for `ticker` between the two dates via yfinance.
data = yf.download(ticker, start=start_date, end=end_date)

[*********************100%%**********************]  1 of 1 completed


In [7]:
# Display the 'Close' column of the downloaded price data.
data['Close']

Date
1980-12-12      0.128348
1980-12-15      0.121652
1980-12-16      0.112723
1980-12-17      0.115513
1980-12-18      0.118862
                 ...    
2024-03-18    173.720001
2024-03-19    176.080002
2024-03-20    178.669998
2024-03-21    171.369995
2024-03-22    172.509995
Name: Close, Length: 10910, dtype: float64

In [10]:
# Period-over-period percentage change of the closing price; the first row
# has no predecessor and is dropped together with any other NaN entries.
ret = (
    data['Close']
    .pct_change()
    .dropna()
)

In [6]:
def create_timestamps_ds(series, timestep_size):
    """Build sliding-window features and next-step labels from a 1-D series.

    Sample ``j`` pairs the window
    ``series[j + 1 : j + 1 + timestep_size]`` with the label
    ``series[j + 1 + timestep_size]``, i.e. the value that immediately
    follows the window.

    Values are read by position from a NumPy copy of the input, so the
    result does not depend on the index of a pandas Series (integer
    ``series[i]`` lookups on a non-integer index are deprecated in pandas).

    NOTE(review): the window that starts at position 0 is never emitted.
    This reproduces the samples of the previous zero-padded deque
    implementation; confirm whether skipping that first sample is intended.

    Args:
        series: 1-D sequence of numbers (list, NumPy array, pandas Series).
        timestep_size: Number of consecutive observations in each window.

    Returns:
        A ``(features, labels)`` tuple of float32 tensors with shapes
        ``(n_samples, timestep_size)`` and ``(n_samples,)``, where
        ``n_samples = max(len(series) - timestep_size - 1, 0)``.

    Raises:
        ValueError: If ``timestep_size`` is negative.
    """
    if timestep_size < 0:
        raise ValueError("timestep_size must be non-negative")

    values = np.asarray(series, dtype=np.float64).reshape(-1)
    n_samples = max(len(values) - timestep_size - 1, 0)

    # Row j of the index matrix holds positions j+1 .. j+timestep_size.
    window_starts = np.arange(1, n_samples + 1)[:, None]
    window_offsets = np.arange(timestep_size)[None, :]
    window_index = window_starts + window_offsets

    # Each label is the observation right after its window.
    first_label = timestep_size + 1
    label_values = values[first_label:first_label + n_samples]

    features = torch.as_tensor(values[window_index], dtype=torch.float32)
    labels = torch.as_tensor(label_values, dtype=torch.float32)

    return features, labels

In [8]:
@variational_estimator
class NN(nn.Module):
    """Bayesian LSTM followed by a linear read-out of the last time step.

    Args:
        input_size: Number of features per time step fed to the LSTM.
        hidden_size: Width of the LSTM hidden state (and of the linear input).
        output_size: Number of values predicted per sequence.

    The defaults (1, 10, 1) reproduce the previously hard-coded layer sizes.
    """

    def __init__(self, input_size=1, hidden_size=10, output_size=1):
        super().__init__()
        self.lstm_1 = BayesianLSTM(input_size, hidden_size)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return one prediction per sequence in ``x``.

        ``x`` is indexed as a 3-D tensor after the LSTM, so it is assumed to
        be laid out as (batch, sequence, features) -- TODO confirm against
        the BayesianLSTM documentation.
        """
        hidden_seq, _ = self.lstm_1(x)

        # Only the hidden state of the final time step feeds the linear layer.
        last_hidden = hidden_seq[:, -1, :]
        return self.linear(last_hidden)

In [56]:
# Turn the return series into (window, next value) pairs using 21-step windows.
Xs, ys = create_timestamps_ds(ret, timestep_size=21)

# Ordered split (no shuffling): the final 5% of the samples form the test set.
X_train, X_test, y_train, y_test = train_test_split(
    Xs,
    ys,
    test_size=.05,
    shuffle=False,
)

# Training samples are served one at a time, in their original order.
ds = torch.utils.data.TensorDataset(X_train, y_train)
dataloader_train = torch.utils.data.DataLoader(
    ds,
    batch_size=1,
    shuffle=False,
)

# Model, data-fit criterion and optimizer used by the training loop.
net = NN()
criterion = nn.MSELoss()
optimizer = optim.Adam(net.parameters(), lr=0.001)

In [57]:
from tqdm import tqdm
import warnings

# NOTE(review): this blanket filter is kept so that later cells behave as
# before, but it hides every warning. The size-mismatch warning from the loss
# no longer occurs now that labels are shaped like the predictions below;
# consider narrowing or removing the filter.
warnings.filterwarnings("ignore")

iteration = 0
for epoch in tqdm(range(10)):
    for datapoints, labels in dataloader_train:
        optimizer.zero_grad()

        # Add a trailing feature axis: (batch, seq) -> (batch, seq, 1).
        # Unlike a hard-coded reshape(1, 21, 1) this works for any batch size
        # and window length.
        datapoints = datapoints.unsqueeze(-1)

        # Shape labels as (batch, 1) to match the network output, so the
        # criterion compares element-wise instead of relying on broadcasting.
        labels = labels.reshape(-1, 1)

        # Loss from blitz's sample_elbo, using 3 samples per step.
        loss = net.sample_elbo(
            inputs=datapoints,
            labels=labels,
            criterion=criterion,
            sample_nbr=3,
        )
        loss.backward()
        optimizer.step()

        iteration += 1

100%|██████████| 10/10 [47:19<00:00, 283.99s/it]


In [ ]:
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from tensorflow_probability.python.distributions import Normal

def compute_KL_univariate_prior(univariateprior, theta, sample):
    """Estimate KL(q || p) from one sample drawn from the posterior.

    The value returned is ``sum(log q(sample)) - sum(log p(sample))``, where
    ``q`` is the Normal posterior described by ``theta`` and ``p`` is the
    Normal prior described by ``univariateprior``.

    :param univariateprior: ``(mean, std)`` of the univariate Normal prior.
    :param theta: ``(mean, std)`` tensors of the Normal posterior q(w|theta);
        both are flattened before use.
    :param sample: tensor of sampled weights at which both log-densities are
        evaluated; it is flattened before use.
    :return: scalar tensor holding the summed log-density difference.
    """
    # Everything is flattened so the log-probabilities line up element-wise.
    flat_sample = tf.reshape(sample, [-1])

    post_mean, post_std = theta
    posterior = Normal(
        tf.reshape(post_mean, [-1]),
        tf.reshape(post_std, [-1]),
    )

    prior_mean, prior_std = univariateprior
    prior = Normal(prior_mean, prior_std)

    log_q = tf.reduce_sum(posterior.log_prob(flat_sample))
    log_p = tf.reduce_sum(prior.log_prob(flat_sample))

    return tf.subtract(log_q, log_p)

def variationalPosterior(shape, name, prior, istraining):
    """Create a Normal variational posterior q(w|theta) for one weight tensor.

    Two variables are created, ``<name>_mean`` (mu) and ``<name>_rho``; the
    standard deviation is ``sigma = softplus(rho) = log(1 + exp(rho))``. In
    training mode the weights are drawn with the reparameterization trick,
    ``mu + sigma * epsilon`` with ``epsilon ~ Normal(0, 1)``.

    The KL term between this posterior and ``prior`` is computed for the
    returned sample and appended to the ``"KL_layers"`` collection, and
    histogram summaries are recorded for rho, mu and sigma.

    :param shape: shape of the weight variable.
    :param name: base name used for the variables and the summaries.
    :param prior: ``(mean, std)`` of the Normal prior, forwarded to
        ``compute_KL_univariate_prior``.
    :param istraining: truthy to sample noisy weights, falsy for the
        deterministic branch.
    :return: ``(sample, mu, sigma)``.
    """
    mu = tf.get_variable(f"{name}_mean", shape=shape, dtype=tf.float32)
    rho = tf.get_variable(f"{name}_rho", shape=shape, dtype=tf.float32)
    sigma = tf.math.softplus(rho)

    if not istraining:
        # NOTE(review): the deterministic branch returns mu + sigma rather
        # than the posterior mean mu -- confirm this is intended.
        sample = mu + sigma
    else:
        # Reparameterization trick: noise is drawn independently of mu/sigma.
        epsilon = Normal(0, 1.0).sample(shape)
        sample = mu + sigma * epsilon

    kl_loss = compute_KL_univariate_prior(prior, (mu, sigma), sample)

    for suffix, tensor in (
        ('_rho_hist', rho),
        ('_mu_hist', mu),
        ('_sigma_hist', sigma),
    ):
        tf.summary.histogram(name + suffix, tensor)

    # Collected here so the training code can read back every layer's KL term.
    tf.add_to_collection("KL_layers", kl_loss)

    return sample, mu, sigma

In [ ]:
import tensorflow as tf
class BayesianLSTMCell(tf.keras.layers.Layer):
    """LSTM cell skeleton intended to host variational (Bayesian) weights.

    The gates are computed from one fused kernel applied to the
    concatenation ``[inputs, previous_hidden]``. ``prior_fn`` and
    ``is_training`` are stored but not yet used: the weights are currently
    ordinary deterministic Keras weights (see the TODO in ``build``).

    Args:
        num_units: Size of the hidden and cell states.
        prior_fn: Prior specification, kept for the variational weights.
        is_training: Training/inference flag, kept for the variational
            weights.
        forget_bias: Constant added to the forget-gate pre-activation.
            NOTE(review): the default of 1.0 follows the common LSTM
            convention; the original code referenced this value without
            ever defining it -- confirm the intended default.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, num_units, prior_fn, is_training, forget_bias=1.0,
                 **kwargs):
        super().__init__(**kwargs)
        self.num_units = num_units
        self.prior_fn = prior_fn
        self.is_training = is_training
        # Read by call(); previously never assigned, which made call() fail
        # with AttributeError.
        self.recurrent_activation_bias = forget_bias
        # call() consumes and returns two states (hidden, cell), so two
        # state sizes must be advertised.
        self.state_size = [self.num_units, self.num_units]
        self.output_size = self.num_units

    def build(self, input_shape):
        """Create the fused gate kernel and bias for the given input shape."""
        input_dim = input_shape[-1]
        # One kernel covers both the input and the recurrent connection
        # because call() multiplies it with concat([inputs, hidden]). The
        # separate recurrent kernel created previously was never used and
        # has been dropped.
        self.kernel = self.add_weight(
            shape=(input_dim + self.num_units, 4 * self.num_units),
            initializer='glorot_uniform',
            name='kernel',
        )
        self.bias = self.add_weight(
            shape=(4 * self.num_units,),
            initializer='zeros',
            name='bias',
        )
        # TODO: replace the deterministic kernel/bias with samples (plus
        # mean and std) obtained from variationalPosterior(...), using
        # self.prior_fn and self.is_training.
        self.built = True

    def call(self, inputs, states):
        """Run one LSTM step.

        Args:
            inputs: Input tensor for the current time step.
            states: Sequence ``[previous_hidden, previous_cell]``.

        Returns:
            ``(new_hidden, [new_hidden, new_cell])``.
        """
        prev_hidden, prev_cell = states[0], states[1]
        concat_inputs = tf.concat([inputs, prev_hidden], axis=-1)

        gate_inputs = tf.matmul(concat_inputs, self.kernel)
        gate_inputs = tf.nn.bias_add(gate_inputs, self.bias)

        # i: input gate, j: candidate values, f: forget gate, o: output gate.
        i, j, f, o = tf.split(gate_inputs, num_or_size_splits=4, axis=-1)

        forget = tf.sigmoid(f + self.recurrent_activation_bias)
        new_cell = forget * prev_cell + tf.sigmoid(i) * tf.tanh(j)
        new_hidden = tf.sigmoid(o) * tf.tanh(new_cell)

        return new_hidden, [new_hidden, new_cell]