# References

This code is based on the work of Di Sipio, Huang, Chen, et al., described in reference 1 below.

1. Di Sipio R, Huang J-H, Chen SY-C, et al (2022) The Dawn of Quantum Natural Language Processing. In: ICASSP 2022 - 2022 IEEE International Conference on Acoustics, Speech and Signal Processing (ICASSP). IEEE, Singapore, Singapore, pp 8612–8616
2. Chen SY-C, Yoo S, Fang Y-LL (2022) Quantum Long Short-Term Memory. In: ICASSP 2022 - 2022 IEEE International Conference on Acoustics, Speech and Signal Processing (ICASSP). IEEE, Singapore, Singapore, pp 8622–8626


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import pyplot

import pennylane as qml
from pennylane import numpy as np
from pennylane.templates.embeddings import AngleEmbedding, AmplitudeEmbedding
from pennylane.optimize import AdamOptimizer
from pennylane.optimize import NesterovMomentumOptimizer

import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.models import load_model
from tensorflow.keras.metrics import AUC

from sklearn.preprocessing import normalize
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score


import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


In [1]:
from google.colab import drive

# Mount Google Drive into the Colab runtime.
# NOTE(review): the mount point is '/MyDrive' while the CSV below is read from
# the relative './MyData' directory — confirm the file is reachable there.
drive.mount('/MyDrive')

SystemCallData = "./MyData/MySystemCallList.csv"

# Load the trace and keep only the system-call name column, which is the only
# column this notebook reads.
SysData = pd.read_csv(SystemCallData)[["SystemCallName"]]

# Plain Python list of the names, in file order.
SysDataList = SysData["SystemCallName"].tolist()


In [15]:
# Vocabulary lookup: system-call name -> integer index (filled in by the next cell).
word_to_ix = {}

In [16]:
# Give every distinct system-call name the next free index, in order of first
# appearance; names that already have an index are left untouched.
for systemcall in SysData['SystemCallName']:
    word_to_ix.setdefault(systemcall, len(word_to_ix))

In [18]:
def split_sequence(sequence, n_steps):
    """Slice ``sequence`` into overlapping next-step training windows.

    An input window holds the ``n_steps`` consecutive items starting at
    position ``i``; its target window is the same span shifted one item to
    the right.  Start positions whose target window would run past the end
    of ``sequence`` produce no sample.

    Returns:
        A pair ``(X, y)``: the input windows and the target windows, each
        converted with ``array``.
    """
    total = len(sequence)
    # A start position is usable only while the shifted target still fits.
    starts = [i for i in range(total) if i + n_steps < total]
    inputs = [sequence[i:i + n_steps] for i in starts]
    targets = [sequence[i + 1:i + n_steps + 1] for i in starts]
    return array(inputs), array(targets)

In [19]:
# `array` is the constructor split_sequence() uses for its return values.
from numpy import array

# Window lengths (time steps per sample) explored in this notebook.
n_steps_list = [3, 5, 10, 15, 20]
n_steps = 15  # 3, 5, 10, 15, 20

# Sequence that gets windowed: the full list of system-call names.
h = SysDataList
#h=[1,2,3,4,5,6,7,8,9,10]

# Split into (input, target) samples.
X, y = split_sequence(h, n_steps)

#trainingdata_list = [split_sequence(h, i) for i in n_steps_list]

In [20]:
# Lazily produces one (X, y) pair per window length in n_steps_list.
# NOTE(review): despite the name this is a generator expression, not a list —
# it can be iterated only once; a second pass over it yields nothing.
trainingdata_list = (split_sequence(h, i) for i in n_steps_list)

In [23]:
# Default dataset bound to train()'s `trainingdata` parameter:
# input windows at index 0, target windows at index 1.
trainingdata=[X,y]

In [30]:
def prepare_sequence(seq, to_ix):
    """Encode a token sequence as a 1-D ``torch.long`` tensor of indices.

    Each item of ``seq`` is looked up in ``to_ix``; an item that is not a
    key of ``to_ix`` makes the lookup raise.
    """
    indices = list(map(to_ix.__getitem__, seq))
    return torch.tensor(indices, dtype=torch.long)

In [None]:
class QLSTM(nn.Module):
    """LSTM layer whose four gates are variational quantum circuits.

    Per time step the previous hidden state and the current input are
    concatenated, projected to ``n_qubits`` values by ``clayer_in``, passed
    through one circuit per gate (forget, input, update, output) and
    projected back to ``hidden_size`` by ``clayer_out``.  Both linear layers
    are shared by all four gates.

    Every circuit is ``AngleEmbedding`` followed by ``BasicEntanglerLayers``
    and returns the Pauli-Z expectation value of each wire.

    Args:
        input_size: number of features per time step.
        hidden_size: size of the hidden and cell state.
        n_qubits: wires per gate circuit.
        n_qlayers: number of entangler layers per circuit.
        batch_first: if true, ``forward`` expects and returns
            ``(batch, seq, feature)`` tensors; otherwise ``(seq, batch, feature)``.
        return_sequences: stored on the instance; not consulted by ``forward``.
        return_state: stored on the instance; not consulted by ``forward``.
        backend: PennyLane device name handed to ``qml.device``.
    """

    def __init__(self,
                 input_size,
                 hidden_size,
                 n_qubits=4,
                 n_qlayers=1,
                 batch_first=True,
                 return_sequences=False,
                 return_state=False,
                 backend="default.qubit"):
        super().__init__()
        self.n_inputs = input_size
        self.hidden_size = hidden_size
        self.concat_size = self.n_inputs + self.hidden_size
        self.n_qubits = n_qubits
        self.n_qlayers = n_qlayers
        self.backend = backend  # "default.qubit", "qiskit.basicaer", "qiskit.ibm"

        self.batch_first = batch_first
        self.return_sequences = return_sequences
        self.return_state = return_state

        # One independent set of wires, device and QNode per gate.
        self.wires_forget, self.dev_forget, self.qlayer_forget = self._build_gate_circuit("forget")
        self.wires_input, self.dev_input, self.qlayer_input = self._build_gate_circuit("input")
        self.wires_update, self.dev_update, self.qlayer_update = self._build_gate_circuit("update")
        self.wires_output, self.dev_output, self.qlayer_output = self._build_gate_circuit("output")

        weight_shapes = {"weights": (n_qlayers, n_qubits)}
        print(f"weight_shapes = (n_qlayers, n_qubits) = ({n_qlayers}, {n_qubits})")

        self.clayer_in = torch.nn.Linear(self.concat_size, n_qubits)

        # Assign each TorchLayer as an attribute so nn.Module registers it as a
        # submodule.  Layers stored only inside a plain dict are invisible to
        # parameters()/state_dict()/to(), so an optimizer built from
        # model.parameters() would never update the circuit weights.
        # (nn.ModuleDict is not used because the key 'update' collides with
        # ModuleDict.update.)
        self.vqc_forget = qml.qnn.TorchLayer(self.qlayer_forget, weight_shapes)
        self.vqc_input = qml.qnn.TorchLayer(self.qlayer_input, weight_shapes)
        self.vqc_update = qml.qnn.TorchLayer(self.qlayer_update, weight_shapes)
        self.vqc_output = qml.qnn.TorchLayer(self.qlayer_output, weight_shapes)
        # Name-keyed view of the same layer objects, kept for existing callers.
        self.VQC = {
            'forget': self.vqc_forget,
            'input': self.vqc_input,
            'update': self.vqc_update,
            'output': self.vqc_output
        }
        self.clayer_out = torch.nn.Linear(self.n_qubits, self.hidden_size)

    def _build_gate_circuit(self, gate):
        """Create the wires, device and torch-interface QNode for one gate."""
        wires = [f"wire_{gate}_{i}" for i in range(self.n_qubits)]
        dev = qml.device(self.backend, wires=wires)

        def _circuit(inputs, weights):
            qml.templates.AngleEmbedding(inputs, wires=wires)
            qml.templates.BasicEntanglerLayers(weights, wires=wires)
            return [qml.expval(qml.PauliZ(wires=w)) for w in wires]

        return wires, dev, qml.QNode(_circuit, dev, interface="torch")

    def _gate_qnodes(self):
        """Return ``(label, qnode)`` pairs in forget/input/update/output order."""
        return (
            ("forget", self.qlayer_forget),
            ("input", self.qlayer_input),
            ("update", self.qlayer_update),
            ("output", self.qlayer_output),
        )

    def draw_mpl(self, style, inp, weights):
        """Draw the four gate circuits with matplotlib using drawer style ``style``."""
        qml.drawer.use_style(style)
        for _, qnode in self._gate_qnodes():
            qml.draw_mpl(qnode)(inp, weights)
        plt.show()

    def snapshots(self, inp, weights):
        """Print ``qml.snapshots`` of each gate circuit for the given arguments."""
        for label, qnode in self._gate_qnodes():
            print(f"{label}: ", qml.snapshots(qnode)(inp, weights))

    def print(self, inp, weights):
        """Print a text drawing of each gate circuit for the given arguments."""
        for _, qnode in self._gate_qnodes():
            print(qml.draw(qnode, expansion_strategy="device")(inp, weights))

    def forward(self, x, init_states=None):
        '''
        Run the recurrence over every time step of ``x``.

        x.shape is (batch_size, seq_length, feature_size) when
        ``batch_first`` is true, otherwise (seq_length, batch_size, feature_size).
        recurrent_activation -> sigmoid
        activation -> tanh

        ``init_states``, when given, is an ``(h_0, c_0)`` pair; element 0 along
        the first axis of each is used as the starting state.

        Returns ``(hidden_seq, (h_t, c_t))`` where ``hidden_seq`` holds the
        hidden state of every step in the same layout as ``x`` and
        ``(h_t, c_t)`` are the final hidden and cell states.
        '''
        if self.batch_first:
            batch_size, seq_length, _ = x.size()
        else:
            seq_length, batch_size, _ = x.size()

        if init_states is None:
            # Create the zero states on the input's device so they can be
            # concatenated with it.
            h_t = torch.zeros(batch_size, self.hidden_size, device=x.device)  # hidden state (output)
            c_t = torch.zeros(batch_size, self.hidden_size, device=x.device)  # cell state
        else:
            h_t, c_t = init_states
            h_t = h_t[0]
            c_t = c_t[0]

        hidden_seq = []
        for t in range(seq_length):
            # Features of the t-th step for all batch entries; which axis is
            # the time axis depends on the layout.
            x_t = x[:, t, :] if self.batch_first else x[t]

            # Concatenate hidden state and input
            v_t = torch.cat((h_t, x_t), dim=1)

            # match qubit dimension
            y_t = self.clayer_in(v_t)

            f_t = torch.sigmoid(self.clayer_out(self.VQC['forget'](y_t)))  # forget block
            i_t = torch.sigmoid(self.clayer_out(self.VQC['input'](y_t)))  # input block
            g_t = torch.tanh(self.clayer_out(self.VQC['update'](y_t)))  # update block
            o_t = torch.sigmoid(self.clayer_out(self.VQC['output'](y_t)))  # output block

            c_t = (f_t * c_t) + (i_t * g_t)
            h_t = o_t * torch.tanh(c_t)

            hidden_seq.append(h_t.unsqueeze(0))

        # Stacked along dim 0 this is (seq, batch, hidden).
        hidden_seq = torch.cat(hidden_seq, dim=0)
        if self.batch_first:
            hidden_seq = hidden_seq.transpose(0, 1).contiguous()
        return hidden_seq, (h_t, c_t)

In [31]:
class LSTMTagger(nn.Module):
    """Sequence tagger: embedding -> (classical or quantum) LSTM -> linear -> log-softmax.

    Args:
        embedding_dim: size of each token embedding.
        hidden_dim: size of the LSTM hidden state.
        vocab_size: number of rows in the embedding table.
        tagset_size: number of output classes per token.
        n_qubits: when greater than 0 a ``QLSTM`` with that many qubits is
            used; otherwise a classical ``nn.LSTM``.
    """

    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size, n_qubits=0):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.n_qubits = n_qubits

        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)

        # The LSTM takes word embeddings as inputs, and outputs hidden states
        # with dimensionality hidden_dim.
        if n_qubits > 0:
            print("Quantum LSTM")
            self.lstm = QLSTM(embedding_dim, hidden_dim, n_qubits=n_qubits)
        else:
            print("Classical LSTM")
            self.lstm = nn.LSTM(embedding_dim, hidden_dim)

        # The linear layer that maps from hidden state space to tag space
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)

    def forward(self, sentence):
        """Return per-token log-probabilities, shape ``(len(sentence), tagset_size)``.

        ``sentence`` is a 1-D tensor of token indices.
        """
        seq_len = len(sentence)
        embeds = self.word_embeddings(sentence)
        if self.n_qubits > 0:
            # QLSTM is built with its default batch_first=True, so it expects
            # (batch, seq, feature).  Passing (seq_len, 1, dim) would make it
            # treat every token as a separate length-1 sequence and never carry
            # the hidden/cell state from one token to the next.
            lstm_in = embeds.view(1, seq_len, -1)
        else:
            # nn.LSTM defaults to (seq, batch, feature).
            lstm_in = embeds.view(seq_len, 1, -1)
        lstm_out, _ = self.lstm(lstm_in)
        # The batch size is 1 in both layouts, so flattening gives (seq_len, hidden_dim).
        tag_logits = self.hidden2tag(lstm_out.view(seq_len, -1))
        tag_scores = F.log_softmax(tag_logits, dim=1)
        return tag_scores

In [32]:
# Hyperparameters shared by the classical and quantum taggers built below.
embedding_dim = 8  # size of each token embedding
hidden_dim = 6    # hidden-state size of the single `model_quantum`; values explored: 2,4,6,8,10,12,14
hidden_dim_list = list(range(2,16,2))  # 2, 4, ..., 14: one model per value
n_epochs = 300  # epochs passed to every train() call

In [33]:
# One classical (n_qubits=0) tagger per hidden size, in hidden_dim_list order.
# Input vocabulary and tag set are both the system-call vocabulary.
models_classical = [
    LSTMTagger(embedding_dim,
               dim,
               vocab_size=len(word_to_ix),
               tagset_size=len(word_to_ix),
               n_qubits=0)
    for dim in hidden_dim_list
]


In [36]:
def train(model, n_epochs, trainingdata=trainingdata, name="", file=""):
    """Train ``model`` with SGD (lr=0.1) and NLL loss, recording per-epoch metrics.

    Args:
        model: module called as ``model(sentence_in)``; its output is passed
            to ``nn.NLLLoss`` together with the encoded target window.
        n_epochs: number of passes over the training data.
        trainingdata: pair ``(inputs, targets)``; ``trainingdata[0][k]`` is the
            k-th input window and ``trainingdata[1][k]`` its target window.
            Tokens are encoded with the module-level ``word_to_ix``.
        name: optional prefix for the per-epoch log line.
        file: optional path; when non-empty, every epoch's log line is
            appended to it (missing parent directories are created).

    Returns:
        dict with keys ``'loss'`` and ``'acc'``, each a list of floats holding
        the mean sample loss and the token accuracy of every epoch.

    Raises:
        ValueError: if ``trainingdata`` contains no samples.
    """
    import os  # local import: only needed to prepare the log directory

    inputs, expected = trainingdata[0], trainingdata[1]
    # Iterate over the samples, not over the 2-element (inputs, targets)
    # container: range(len(trainingdata)) only ever visited samples 0 and 1.
    n_samples = min(len(inputs), len(expected))
    if n_samples == 0:
        raise ValueError("trainingdata contains no samples")

    if file:
        out_dir = os.path.dirname(file)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

    loss_function = nn.NLLLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)
    history = {
        'loss': [],
        'acc': []
    }
    for epoch in range(n_epochs):
        losses = []
        preds = []
        targets = []
        for k in range(n_samples):
            # Step 1. Remember that Pytorch accumulates gradients.
            # We need to clear them out before each instance
            model.zero_grad()

            # Step 2. Get our inputs ready for the network, that is, turn them into
            # Tensors of word indices.
            sentence_in = prepare_sequence(inputs[k], word_to_ix)
            labels = prepare_sequence(expected[k], word_to_ix)

            # Step 3. Run our forward pass.
            tag_scores = model(sentence_in)

            # Step 4. Compute the loss, gradients, and update the parameters by
            #  calling optimizer.step()
            loss = loss_function(tag_scores, labels)
            loss.backward()
            optimizer.step()
            losses.append(float(loss))

            # Softmax is monotonic, so the arg-max of the raw scores is the
            # predicted class; detach so no autograd graph is kept alive.
            preds.append(tag_scores.detach().argmax(dim=-1))
            targets.append(labels)

        avg_loss = sum(losses) / len(losses)
        history['loss'].append(avg_loss)

        preds = torch.cat(preds)
        targets = torch.cat(targets)
        # Store a plain float so the history can be plotted or serialised
        # without carrying tensors around.
        accuracy = (preds == targets).sum().item() / targets.size(0)
        history['acc'].append(accuracy)

        string = f"Epoch {epoch+1} / {n_epochs}: Loss = {avg_loss:.3f} Acc = {accuracy:.2f}"

        string = (name + " " + string) if name else string
        print(string)

        if file:
            with open(file, 'a', newline='', encoding="utf-8") as f:
                f.write(string+',\n')

    return history

In [37]:
#train(models_classical[0], n_epochs, data,name=f"Classical hidden_dim={models_classical[0].hidden_dim}_n_steps={n_steps_list[0]}", file="/QUANTUM/save.csv")

In [38]:
#history_classical = train(model_classical, n_epochs)

# his_model = train(models_classical[0], n_epochs, list(next(trainingdata_list)), 
#                                      name=f"Classical hidden_dim={hidden_dim_list[0]}:n_steps={n_steps_list[0]}",
#                                      file="/content/epochs.csv")

# histories_classical = []

# for k,i in enumerate(models_classical):
#   count = 0
#   for train_data in trainingdata_list:
#     histories_classical.append([train(i, n_epochs, list(train_data), 
#                                      name=f"Classical hidden_dim={hidden_dim_list[k]}:n_steps={n_steps_list[count]}",
#                                      file="/content/epochs.csv"), 
#                                 hidden_dim_list[k]])
#     count += 1


## Training

In [39]:
n_qubits = 1  # 2,3,4,5,6,7,8,9,10
n_qubits_list = list(range(1, 11))

# Single quantum tagger built from the scalar hyperparameters above.
model_quantum = LSTMTagger(
    embedding_dim,
    hidden_dim,
    vocab_size=len(word_to_ix),
    tagset_size=len(word_to_ix),
    n_qubits=n_qubits,
)

# Full grid: one quantum tagger per (hidden size, qubit count) combination,
# hidden size varying slowest.
models_quantum = [
    LSTMTagger(embedding_dim,
               dim,
               vocab_size=len(word_to_ix),
               tagset_size=len(word_to_ix),
               n_qubits=qubits)
    for dim in hidden_dim_list
    for qubits in n_qubits_list
]


### Plot the training history

In [41]:
from matplotlib import pyplot as plt

def plot_history(history_classical, history_quantum, hidden_dim_classical=0, hidden_dim_quantum=0, n_qubits=0, file="pos_training"):
    """Plot loss and accuracy curves of a classical and a quantum run together.

    Losses are drawn on the left axis and accuracies on a twin axis.  The
    figure is saved as ``file + ".png"``, shown, and then closed.

    Args:
        history_classical: dict with ``'loss'`` and ``'acc'`` sequences.
        history_quantum: dict with ``'loss'`` and ``'acc'`` sequences.
        hidden_dim_classical: value shown in the title as ``hidden_dim_c``.
        hidden_dim_quantum: value shown in the title as ``hidden_dim_q``.
        n_qubits: value shown in the title as ``n_qubits``.
        file: output path without extension; missing parent directories are
            created.
    """
    import os  # local import: only needed to prepare the output directory

    loss_c = history_classical['loss']
    acc_c = history_classical['acc']
    loss_q = history_quantum['loss']
    acc_q = history_quantum['acc']

    fig, ax1 = plt.subplots()

    ax1.set_xlabel("Epoch")
    ax1.set_ylabel("Loss")

    ax1.plot(loss_c, label="Classical LSTM loss", color='orange', linestyle='dashed')
    ax1.plot(loss_q, label="Quantum LSTM loss", color='red', linestyle='solid')

    ax2 = ax1.twinx()

    #ax2.set_ylabel("Accuracy")
    ax2.plot(acc_c, label="Classical LSTM accuracy", color='steelblue', linestyle='dashed')
    ax2.plot(acc_q, label="Quantum LSTM accuracy", color='blue', linestyle='solid')
    ax1.tick_params(right=False)
    ax2.tick_params(right=False)
    # plt.title / plt.ylim act on pyplot's current axes.
    # NOTE(review): presumably that is ax2 (the twin created last) — confirm.
    plt.title(f"hidden_dim_c={hidden_dim_classical}    n_qubits={n_qubits}   hidden_dim_q={hidden_dim_quantum}")
    plt.ylim(0., 1.1)
    #plt.legend(loc="upper right")
    ax2.yaxis.set_visible(False)
    fig.legend(loc="upper right", bbox_to_anchor=(1.55,1), bbox_transform=ax1.transAxes)

    # Make sure the target directory exists so savefig does not fail after a
    # long training run.
    out_dir = os.path.dirname(file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    #plt.savefig(file+".pdf", bbox_inches='tight')
    plt.savefig(file+".png", bbox_inches='tight')
    plt.show()
    # Release the figure: this function is called once per model in a loop and
    # unclosed figures would otherwise accumulate in memory.
    plt.close(fig)

In [42]:
#data = next(trainingdata_list)
# Training windows of length 3 — the same value as n_steps_list[0], which the
# next cell uses (via count_steps = 0) to label its runs.
data = split_sequence(h, 3)

In [43]:
# history_quantum = train(model_quantum, n_epochs)

# histories_quantum = []

# Which classical model and which window length this run is labelled with.
count = 6
count_steps = 0

classical_model = models_classical[count]
classical_dim = classical_model.hidden_dim
steps = n_steps_list[count_steps]

# Train the chosen classical baseline once.
his_c_model = train(
    classical_model,
    n_epochs,
    data,
    name=f"Classical hidden_dim={classical_dim}_n_steps={steps}",
    file=f"/QUANTUM/classical_hidden_dim_{classical_dim}_n_steps={steps}_epochs.csv",
)

# Train every quantum model and plot each one against that baseline.
for i in models_quantum:
    quantum_dim = i.hidden_dim
    quantum_bits = i.n_qubits

    his_q_model = train(
        i,
        n_epochs,
        data,
        name=f"Quantum hidden_dim={quantum_dim}_n_steps={steps}_n_qubits={quantum_bits}",
        file=f"/QUANTUM/quantum_hidden_dim_{quantum_dim}_n_qubits_{quantum_bits}_n_steps={steps}_epochs.csv",
    )

    plot_history(
        his_c_model,
        his_q_model,
        classical_dim,
        quantum_dim,
        quantum_bits,
        f"/PHOTOS/hidden_dim_c_{classical_dim}_n_qubits_{quantum_bits}_hidden_dim_q_{quantum_dim}",
    )


In [44]:
# IPython shell escape: archive /QUANTUM/, where the training cell writes its per-epoch CSV logs.
!zip -r /QUANTUM.zip /QUANTUM/

In [45]:
# IPython shell escape: archive /PHOTOS/, where the training cell saves its comparison plots.
!zip -r /PHOTOS.zip /PHOTOS/

In [46]:
from google.colab import files
# Download the two archives created by the zip cells above through Colab's `files` helper.
files.download('/QUANTUM.zip')
files.download('/PHOTOS.zip')

In [None]:
# NOTE(review): `his_model` is assigned only in commented-out code earlier in
# this notebook, so this cell raises NameError unless that code is re-enabled.
# `his_q_model` is whatever the last iteration of the training loop left behind.
plot_history(his_model, his_q_model, hidden_dim_list[0], hidden_dim_list[0], 1, f"/hidden_dim_c_{hidden_dim_list[0]}_n_qubits_{1}_hidden_dim_quantum_{hidden_dim_list[0]}")

In [None]:
# Plot every classical history against every quantum history.
# NOTE(review): `histories_classical` and `histories_quantum` appear only in
# commented-out code in this notebook; they must be populated before this cell can run.
for history, hidden_dim in histories_classical:
  for q_history, q_hidden_dim, n_qubits in histories_quantum:
    plot_history(history, q_history, hidden_dim, q_hidden_dim, n_qubits, f"/content/drive/quantum_photos/hidden_dim_c_{hidden_dim}_n_qubits_{n_qubits}_hidden_dim_quantum_{q_hidden_dim}")

In [None]:
# Positional arguments: hidden_dim_classical=3, hidden_dim_quantum=2; output goes to the default "pos_training.png".
# NOTE(review): `history_classical` and `history_quantum` are assigned only in
# commented-out code in this notebook — define them before running this cell.
plot_history(history_classical, history_quantum, 3, 2)