In [None]:
import torch
import torch.nn as nn
import pennylane as qml
import torch.nn.functional as F
import numpy as np

In [None]:
class QLSTM(nn.Module):
    def __init__(self, 
                input_size, 
                hidden_size, 
                n_qubits,
                n_qlayers=1,
                batch_first=True,
                return_sequences=False, 
                return_state=False,
                backend="default.qubit.torch"):
        super(QLSTM, self).__init__()
        self.n_inputs = input_size #features
        self.hidden_size = hidden_size
        self.concat_size = self.n_inputs + self.hidden_size
        self.n_qubits = n_qubits
        self.n_qlayers = n_qlayers
        self.backend = backend  # "default.qubit", "qiskit.basicaer", "qiskit.ibm"

        self.batch_first = batch_first
        self.return_sequences = return_sequences
        self.return_state = return_state

        #self.dev = qml.device("default.qubit", wires=self.n_qubits)
        #self.dev = qml.device('qiskit.basicaer', wires=self.n_qubits)
        #self.dev = qml.device('qiskit.ibm', wires=self.n_qubits)
        # use 'qiskit.ibmq' instead to run on hardware

        self.wires_forget = [f"wire_forget_{i}" for i in range(self.n_qubits)]
        self.wires_input = [f"wire_input_{i}" for i in range(self.n_qubits)]
        self.wires_update = [f"wire_update_{i}" for i in range(self.n_qubits)]
        self.wires_output = [f"wire_output_{i}" for i in range(self.n_qubits)]

        self.dev_forget = qml.device(self.backend, wires=self.wires_forget,torch_device='cuda')
        self.dev_input = qml.device(self.backend, wires=self.wires_input,torch_device='cuda')
        self.dev_update = qml.device(self.backend, wires=self.wires_update,torch_device='cuda')
        self.dev_output = qml.device(self.backend, wires=self.wires_output,torch_device='cuda')

        def _circuit_forget(inputs, weights):
            qml.templates.AngleEmbedding(inputs, wires=self.wires_forget)
            qml.templates.BasicEntanglerLayers(weights, wires=self.wires_forget)
            return [qml.expval(qml.PauliZ(wires=w)) for w in self.wires_forget]
        self.qlayer_forget = qml.QNode(_circuit_forget, self.dev_forget, interface="torch",diff_method='backprop')

        def _circuit_input(inputs, weights):
            qml.templates.AngleEmbedding(inputs, wires=self.wires_input)
            qml.templates.BasicEntanglerLayers(weights, wires=self.wires_input)
            return [qml.expval(qml.PauliZ(wires=w)) for w in self.wires_input]
        self.qlayer_input = qml.QNode(_circuit_input, self.dev_input, interface="torch",diff_method='backprop')

        def _circuit_update(inputs, weights):
            qml.templates.AngleEmbedding(inputs, wires=self.wires_update)
            qml.templates.BasicEntanglerLayers(weights, wires=self.wires_update)
            return [qml.expval(qml.PauliZ(wires=w)) for w in self.wires_update]
        self.qlayer_update = qml.QNode(_circuit_update, self.dev_update, interface="torch",diff_method='backprop')

        def _circuit_output(inputs, weights):
            qml.templates.AngleEmbedding(inputs, wires=self.wires_output)
            qml.templates.BasicEntanglerLayers(weights, wires=self.wires_output)
            return [qml.expval(qml.PauliZ(wires=w)) for w in self.wires_output]
        self.qlayer_output = qml.QNode(_circuit_output, self.dev_output, interface="torch",diff_method='backprop')

        weight_shapesf = {"weights": (n_qlayers, n_qubits)}
        weight_shapesi = {"weights": (n_qlayers, n_qubits)}
        weight_shapesu = {"weights": (n_qlayers, n_qubits)}
        weight_shapeso = {"weights": (n_qlayers, n_qubits)}
        device = torch.device('cuda')
        
        print(f"weight_shapes = (n_qlayers, n_qubits) = ({n_qlayers}, {n_qubits})")

        self.clayer_in = torch.nn.Linear(self.concat_size, n_qubits)
        self.VQC = {
            'forget': qml.qnn.TorchLayer(self.qlayer_forget, weight_shapesf),
            'input': qml.qnn.TorchLayer(self.qlayer_input, weight_shapesi),
            'update': qml.qnn.TorchLayer(self.qlayer_update, weight_shapesu),
            'output': qml.qnn.TorchLayer(self.qlayer_output, weight_shapeso)
        }
        self.clayer_out = torch.nn.Linear(self.n_qubits, self.hidden_size)
        #self.clayer_out = [torch.nn.Linear(n_qubits, self.hidden_size) for _ in range(4)]

    def forward(self, x, init_states=None):
        '''
        x.shape is (batch_size, seq_length, feature_size)
        recurrent_activation -> sigmoid
        activation -> tanh
        '''
        device = torch.device('cuda')
        if self.batch_first is True:
            batch_size, seq_length, features_size = x.size()
        else:
            seq_length, batch_size, features_size = x.size()

        hidden_seq = []
        if init_states is None:
            h_t = torch.zeros(batch_size, self.hidden_size).to(device)  # hidden state (output)
            c_t = torch.zeros(batch_size, self.hidden_size).to(device)  # cell state
        else:
            # for now we ignore the fact that in PyTorch you can stack multiple RNNs
            # so we take only the first elements of the init_states tuple init_states[0][0], init_states[1][0]
            h_t, c_t = init_states
            h_t = h_t[0]
            c_t = c_t[0]

        for t in range(seq_length):
            # get features from the t-th element in seq, for all entries in the batch
            x_t = x[:, t, :] #x has shape (batch,seq_len,features)
            
            # Concatenate input and hidden state
            v_t = torch.cat((h_t, x_t), dim=1)

            # match qubit dimension
            y_t = self.clayer_in(v_t)

            f_t = torch.sigmoid(self.clayer_out(self.VQC['forget'](y_t)))  # forget block
            i_t = torch.sigmoid(self.clayer_out(self.VQC['input'](y_t)))  # input block
            g_t = torch.tanh(self.clayer_out(self.VQC['update'](y_t)))  # update block
            o_t = torch.sigmoid(self.clayer_out(self.VQC['output'](y_t))) # output block

            c_t = (f_t * c_t) + (i_t * g_t)
            h_t = o_t * torch.tanh(c_t) #it has size (batch_size, hidden)
            hidden_seq.append(h_t.unsqueeze(0)) #we will end with a number of sequences of the size of the window of time 
        hidden_seq = torch.cat(hidden_seq, dim=0) #(window, batch_size,hidden)
        hidden_seq = hidden_seq.transpose(0, 1).contiguous() #(batch_size,window,hidden)
        return hidden_seq, (h_t, c_t)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

Encoder Decoder

In [None]:
class Encoder(nn.Module):

  def __init__(self, input_size, embedding_dim, n_qubits,  backend='default.qubit.torch'):
    super(Encoder, self).__init__()

    self.input_size=input_size #number of features
    self.embedding_dim, self.hidden_dim = embedding_dim, 2 * embedding_dim
    self.n_qubits=n_qubits
    self.rnn1 = QLSTM(self.input_size, self.hidden_dim, self.n_qubits, backend=backend)
    
    self.rnn2 = QLSTM(self.hidden_dim,embedding_dim,self.n_qubits, backend=backend)

  def forward(self, x):
    #x = x.reshape((1, self.seq_len, self.n_features))
    #print(x.shape)
    x, (_, _) = self.rnn1(x)
    
    
    x, (hidden_n, _) = self.rnn2(x)
    

    #return hidden_n.reshape((self.n_features, self.embedding_dim))
    return x
  
class Decoder(nn.Module):

  def __init__(self, seq_len,n_features,n_qubits, input_dim=64,backend='default.qubit.torch'):
    super(Decoder, self).__init__()

    self.seq_len, self.input_dim = seq_len, input_dim
    self.hidden_dim, self.n_features = 2 * input_dim, n_features
    self.n_qubits=n_qubits

    self.rnn1 = QLSTM(
      self.input_dim,
      self.input_dim,
      self.n_qubits,
      backend=backend
    )

    self.rnn2 = QLSTM(
      self.input_dim,
      self.hidden_dim,
      self.n_qubits,
      backend=backend
    )

    self.output_layer = nn.Linear(self.hidden_dim, n_features)

  def forward(self, x):
    
    #x = x.repeat(self.seq_len, self.n_features)
    #x = x.reshape((self.n_features, self.seq_len, self.input_dim))

    x, (hidden_n, cell_n) = self.rnn1(x)
    x, (hidden_n, cell_n) = self.rnn2(x)
    x = x.reshape((self.seq_len, self.hidden_dim))

    return self.output_layer(x)
  

class RecurrentAutoencoder(nn.Module):
    def __init__(self, seq_len, n_features, n_qubits,embedding_dim=64):
        super(RecurrentAutoencoder, self).__init__()

        self.encoder = Encoder(seq_len, n_features, embedding_dim).to(device)
        self.decoder = Decoder(seq_len,  n_features,n_qubits).to(device)

    def forward(self, x):
        
        x = self.encoder(x)
        x = self.decoder(x)

        return x