# Import Libraries

In [1]:
import torch
from torch import nn
from torch.utils.data import Dataset
import pennylane as qml

# Why LSTM?

Some studies have shown that the LSTM's efficiency and trainability can be improved by replacing some of the layers in the LSTM with variational quantum layers, thus making it a quantum-classical hybrid model of LSTM which we will call QLSTM for Quantum LSTM. In the study done by Samuel Yen-Chi Chen, Shinjae Yoo, and Yao-Lung L. Fang, they show that QLSTM offers better trainability compared to its classical counterpart as it proved to learn significantly more information after the first training epoch than its classical counterpart, learnt the local features better, all while having a comparable number of parameters. Inspired by these recent results, we wanted to test this variational quantum-classical hybrid neural network technique on stock price predictions. 

This study into QLSTM was motivated by a combination of a few separate studies:

Stock price prediction use BERT and GAN: https://arxiv.org/pdf/2107.09055.pdf, Priyank Sonkiya, Vikas Bajpai, Anukriti Bansal <br>
Quantum Long Short-Term Memory: https://arxiv.org/pdf/2009.01783.pdf, Samuel Yen-Chi Chen, Shinjae Yoo, and Yao-Lung L. Fang <br>

With code and ideas reused and repurposed from the following sources:

Example of a QLSTM: https://github.com/rdisipio/qlstm, Riccardo Di Sipio <br>
How to use PyTorch LSTMs for time series regression: https://www.crosstab.io/articles/time-series-pytorch-lstm, Brian Kent <br>
Using GANs to predict stock price movement: https://towardsdatascience.com/aifortrading-2edd6fac689d, Boris Banushev<br>

# How does a LSTM cell work?

It has 2 memory states: 
- Long Term Memory (cell state) 
- Short Term Memory (hidden state) 

And it has 3 gates to work through: 
- Input gate 
- Forget gate 
- Output gate

For the Forget gate:
- Intuition: Decides what information to forget from the cell state (i.e the long term memory) 
- Consists of a sigmoid activation layer (0, 1) 

$$ f_{t} = \sigma(Linear(x_t + h_t))  $$

For the Input gate:
- Intuition: Update Cell State, which adds new information to the long term memory
- Consists of an element-wise product of a sigmoid activation layer (0, 1) and a tanh activation layer (-1, 1). 
- The sigmoid layer judges the importance of the new information is, while the tanh layer processes the new information 

$$ i_{t} = \sigma(Linear(x_t + h_t))  $$
$$ g_{t} = tanh(Linear(x_t + h_t))  $$

For the Output gate:
- Intuition: Determines what information from the cell state would be used for the output
- Consists of a sigmoid activation layer (0, 1) 

$$ o_{t} = \sigma(Linear(x_t + h_t))  $$

To add them all up:

We first update the cell state (i.e the long term memory) using the forget gate (which acts on the previous cell state) and the input gate (which decides uses two layers to process the new information):

$$ c_{t} = f_{t} \dot c_{t-1} + i_{t} \dot g_{t} $$

Then, we use the output gate to determine what information from the long term memory we want to use to determine the information used in the short term memory (our output):

$$ h_{t} = o_{t} \dot tanh(c_{t})  $$

With this information, please build the LSTM cell:

In [2]:
class LSTM(nn.Module):
    def __init__(self, 
                input_size, 
                hidden_size, 
                batch_first=True,
                return_sequences=False, 
                return_state=False):
        super(LSTM, self).__init__()
        self.n_inputs = input_size
        self.hidden_size = hidden_size
        self.concat_size = self.n_inputs + self.hidden_size

        self.batch_first = batch_first
        self.return_sequences = return_sequences
        self.return_state = return_state
        
        ########### Fill in the blanks ###################
        self.clayer_forget = torch.nn.Linear(self.concat_size, self.hidden_size)
        self.clayer_input = # blank
        self.clayer_update = # blank
        self.clayer_output = # blank
        
    def forward(self, x, init_states=None):
        '''
        x.shape is (batch_size, seq_length, feature_size)
        recurrent_activation -> sigmoid
        activation -> tanh
        '''
        if self.batch_first is True:
            batch_size, seq_length, features_size = x.size()
        else:
            seq_length, batch_size, features_size = x.size()

        hidden_seq = []
        if init_states is None:
            h_t = torch.zeros(batch_size, self.hidden_size)  # hidden state (output)
            c_t = torch.zeros(batch_size, self.hidden_size)  # cell state
        else:
            # for now we ignore the fact that in PyTorch you can stack multiple RNNs
            # so we take only the first elements of the init_states tuple init_states[0][0], init_states[1][0]
            h_t, c_t = init_states
            h_t = h_t[0]
            c_t = c_t[0]

        for t in range(seq_length):
            # get features from the t-th element in seq, for all entries in the batch
            x_t = x[:, t, :]
            
            # Concatenate input and hidden state
            v_t = torch.cat((h_t, x_t), dim=1)

            ########## Fill in the blocks here ###########
            f_t = torch.sigmoid(self.clayer_forget(v_t))  # forget block
            i_t = # input block
            g_t = # update block
            o_t = # output block

            c_t = # update cell state/long term memory
            h_t = # update hidden state/short term memory

            hidden_seq.append(h_t.unsqueeze(0))
        hidden_seq = torch.cat(hidden_seq, dim=0)
        hidden_seq = hidden_seq.transpose(0, 1).contiguous()
        return hidden_seq, (h_t, c_t)

We can use it to create a simple LSTM model here:

In [3]:
class ShallowRegressionLSTM(nn.Module):
    def __init__(self, num_sensors, hidden_units):
        super().__init__()
        self.num_sensors = num_sensors  # this is the number of features
        self.hidden_units = hidden_units
        self.num_layers = 1

        self.lstm = LSTM(
            input_size=num_sensors,
            hidden_size=hidden_units,
            batch_first=True,
            num_layers=self.num_layers
        )

        self.linear = nn.Linear(in_features=self.hidden_units, out_features=1)

    def forward(self, x):
        batch_size = x.shape[0]
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_units).requires_grad_()
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_units).requires_grad_()
        
        _, (hn, _) = self.lstm(x, (h0, c0))
        out = self.linear(hn[0]).flatten()  # First dim of Hn is num_layers, which is set to 1 above.

        return out

# How do we add Quantum to it?

We replace the Linear Layers with Variational Quantum Layers! Therefore, the equations now look like:

$$ f_{t} = \sigma(VQC(x_t + h_t))  $$
$$ i_{t} = \sigma(VQC(x_t + h_t))  $$
$$ g_{t} = tanh(VQC(x_t + h_t))  $$
$$ o_{t} = \sigma(VQC(x_t + h_t))  $$
$$ c_{t} = f_{t} \dot c_{t-1} + i_{t} \dot g_{t} $$
$$ h_{t} = o_{t} \dot tanh(c_{t})  $$

To do that, we use Pennylane to create a dressed quantum circuit for each VQC layer:

Linear_out(Variational Quantum Circuit(Linear_in(Input)))

Here is how you do it:

In [4]:
class QLSTM(nn.Module):
    def __init__(self, 
                input_size, 
                hidden_size, 
                n_qubits=4,
                n_qlayers=1,
                n_vrotations=3,
                batch_first=True,
                return_sequences=False, 
                return_state=False,
                backend="lightning.qubit"):
        super(QLSTM, self).__init__()
        self.n_inputs = input_size
        self.hidden_size = hidden_size
        self.concat_size = self.n_inputs + self.hidden_size
        self.n_qubits = n_qubits
        self.n_qlayers = n_qlayers
        self.n_vrotations = n_vrotations
        self.backend = backend  # "default.qubit", "qiskit.basicaer", "qiskit.ibm"

        self.batch_first = batch_first
        self.return_sequences = return_sequences
        self.return_state = return_state
        
        self.wires_forget = [f"wire_forget_{i}" for i in range(self.n_qubits)]
        self.wires_input = [f"wire_input_{i}" for i in range(self.n_qubits)]
        self.wires_update = [f"wire_update_{i}" for i in range(self.n_qubits)]
        self.wires_output = [f"wire_output_{i}" for i in range(self.n_qubits)]

        self.dev_forget = qml.device(self.backend, wires=self.wires_forget)
        self.dev_input = qml.device(self.backend, wires=self.wires_input)
        self.dev_update = qml.device(self.backend, wires=self.wires_update)
        self.dev_output = qml.device(self.backend, wires=self.wires_output)

        
        def ansatz(params, wires_type):
            # Entangling layer.
            ############## Insert Code ##############
            
            ############## Insert Code ##############
            # Question: Should you being doing your work here?
                
        def VQC(features, weights, wires_type):
            # Preproccess input data to encode the initial state.
            ############## Insert Code ##############
            
            ############## Insert Code ##############

        def _circuit_forget(inputs, weights):
            VQC(inputs, weights, self.wires_forget)
            return [qml.expval(qml.PauliZ(wires=i)) for i in self.wires_forget]
        self.qlayer_forget = qml.QNode(_circuit_forget, self.dev_forget, interface="torch")

        def _circuit_input(inputs, weights):
            VQC(inputs, weights, self.wires_input)
            return [qml.expval(qml.PauliZ(wires=i)) for i in self.wires_input]
        self.qlayer_input = qml.QNode(_circuit_input, self.dev_input, interface="torch")

        def _circuit_update(inputs, weights):
            VQC(inputs, weights, self.wires_update)
            return [qml.expval(qml.PauliZ(wires=i)) for i in self.wires_update]
        self.qlayer_update = qml.QNode(_circuit_update, self.dev_update, interface="torch")

        def _circuit_output(inputs, weights):
            VQC(inputs, weights, self.wires_output)
            return [qml.expval(qml.PauliZ(wires=i)) for i in self.wires_output]
        self.qlayer_output = qml.QNode(_circuit_output, self.dev_output, interface="torch")

        weight_shapes = {"weights": (self.n_qlayers, self.n_vrotations, self.n_qubits)}
        print(f"weight_shapes = (n_qlayers, n_vrotations, n_qubits) = ({self.n_qlayers}, {self.n_vrotations}, {self.n_qubits})")

        self.clayer_in = torch.nn.Linear(self.concat_size, self.n_qubits)
        self.VQC = {
            'forget': qml.qnn.TorchLayer(self.qlayer_forget, weight_shapes),
            'input': qml.qnn.TorchLayer(self.qlayer_input, weight_shapes),
            'update': qml.qnn.TorchLayer(self.qlayer_update, weight_shapes),
            'output': qml.qnn.TorchLayer(self.qlayer_output, weight_shapes)
        }
        self.clayer_out = torch.nn.Linear(self.n_qubits, self.hidden_size)

    def forward(self, x, init_states=None):
        '''
        x.shape is (batch_size, seq_length, feature_size)
        recurrent_activation -> sigmoid
        activation -> tanh
        '''
        if self.batch_first is True:
            batch_size, seq_length, features_size = x.size()
        else:
            seq_length, batch_size, features_size = x.size()

        hidden_seq = []
        if init_states is None:
            h_t = torch.zeros(batch_size, self.hidden_size)  # hidden state (output)
            c_t = torch.zeros(batch_size, self.hidden_size)  # cell state
        else:
            # for now we ignore the fact that in PyTorch you can stack multiple RNNs
            # so we take only the first elements of the init_states tuple init_states[0][0], init_states[1][0]
            h_t, c_t = init_states
            h_t = h_t[0]
            c_t = c_t[0]

        for t in range(seq_length):
            # get features from the t-th element in seq, for all entries in the batch
            x_t = x[:, t, :]
            
            # Concatenate input and hidden state
            v_t = torch.cat((h_t, x_t), dim=1)

            # match qubit dimension
            y_t = self.clayer_in(v_t)
    
            ############## Fill in the blocks here ##############
            f_t = torch.sigmoid(self.clayer_out(self.VQC['forget'](y_t)))  # forget block
            i_t = # input block
            g_t = # update block
            o_t = # output block

            c_t = # update cell state/long term memory
            h_t = # update hidden state/short term memory

            hidden_seq.append(h_t.unsqueeze(0))
        hidden_seq = torch.cat(hidden_seq, dim=0)
        hidden_seq = hidden_seq.transpose(0, 1).contiguous()
        return hidden_seq, (h_t, c_t)

And with this, we can use it to create a simple QLSTM model here:

In [5]:
class QShallowRegressionLSTM(nn.Module):
    def __init__(self, num_sensors, hidden_units, n_qubits=0, n_qlayers=1):
        super().__init__()
        self.num_sensors = num_sensors  # this is the number of features
        self.hidden_units = hidden_units
        self.num_layers = 1

        self.lstm = QLSTM(
            input_size=num_sensors,
            hidden_size=hidden_units,
            batch_first=True,
            n_qubits = n_qubits,
            n_qlayers= n_qlayers
        )

        self.linear = nn.Linear(in_features=self.hidden_units, out_features=1)

    def forward(self, x):
        batch_size = x.shape[0]
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_units).requires_grad_()
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_units).requires_grad_()
        
        _, (hn, _) = self.lstm(x, (h0, c0))
        out = self.linear(hn).flatten()  # First dim of Hn is num_layers, which is set to 1 above.

        return out