# Hybrid Classical Quantum AutoEncoder for anomaly detection

## VQC
The VQC implemented is circuit number 10 of the reference paper: each layer applies Pauli $Y$ and $X$ rotation gates ($R_Y$, $R_X$) on every qubit followed by a circular chain of $CX$ gates, and these layers are stacked, alternating with an encoding layer based on Pauli $X$ rotations ($R_X$).

In [None]:
from qiskit.circuit import QuantumCircuit, Gate, ParameterVector
from qiskit.opflow.expectations import PauliExpectation
from qiskit_machine_learning.connectors import TorchConnector

def get_encoding_block(nqubits: int, features: ParameterVector) -> Gate:
    """
    Build the data-encoding layer and return it as a gate.

    Qubit ``q`` receives a single RX rotation whose angle is ``features[q]``,
    so exactly ``nqubits`` feature parameters are required.
    """
    assert len(features) == nqubits
    encoder = QuantumCircuit(nqubits, name="Encoding Block")
    for qubit in range(nqubits):
        angle = features[qubit]
        encoder.rx(angle, qubit)
    return encoder.to_gate()

def get_ansatz_block(nqubits: int, parameters: ParameterVector) -> Gate:
    """
    Build one trainable layer of the ansatz and return it as a gate.

    Requires ``2 * nqubits`` parameters: ``parameters[q]`` is the RY angle and
    ``parameters[nqubits + q]`` the RX angle of qubit ``q``. With more than
    one qubit the rotations are followed by a CX from the last qubit to
    qubit 0 and then a chain of CX gates ``q -> q + 1``.
    """
    assert nqubits * 2 == len(parameters)
    layer = QuantumCircuit(nqubits, name="Ansatz Block")
    for qubit in range(nqubits):
        layer.ry(parameters[qubit], qubit)
        layer.rx(parameters[nqubits + qubit], qubit)
    # A single qubit has nothing to entangle with.
    if nqubits <= 1:
        return layer.to_gate()
    layer.cx(nqubits - 1, 0)
    for control in range(nqubits - 1):
        layer.cx(control, control + 1)
    return layer.to_gate()

def get_ansatz(nqubits: int, parameters: ParameterVector, features: ParameterVector, reps: int=3) -> QuantumCircuit:
    """
    Assemble the full circuit with independent weights per repetition.

    The circuit is: ansatz block, then ``reps - 1`` times
    (barrier, encoding block, barrier, ansatz block). Every ansatz block
    consumes its own slice of ``2 * nqubits`` entries of ``parameters``, hence
    ``reps * 2 * nqubits`` parameters are required in total. The same
    ``features`` are re-encoded before every block after the first.
    """
    block_width = 2 * nqubits
    assert len(parameters) == reps * block_width
    wires = range(nqubits)
    ansatz = QuantumCircuit(nqubits)

    first_layer = get_ansatz_block(nqubits, parameters[:block_width])
    ansatz.compose(first_layer, wires, inplace=True)

    for layer in range(1, reps):
        start = block_width * layer
        ansatz.barrier()
        ansatz.compose(get_encoding_block(nqubits, features), wires, inplace=True)
        ansatz.barrier()
        trainable = get_ansatz_block(nqubits, parameters[start:start + block_width])
        ansatz.compose(trainable, wires, inplace=True)
    return ansatz

def get_ansatz_ws(nqubits: int, parameters: ParameterVector, features: ParameterVector, reps: int=3) -> QuantumCircuit:
    """
    Assemble the full circuit with weight sharing across repetitions.

    Same layout as ``get_ansatz`` (ansatz block, then ``reps - 1`` times
    barrier / encoding block / barrier / ansatz block), but every ansatz block
    is built from the same ``2 * nqubits`` parameters.
    """
    assert len(parameters) == 2 * nqubits
    wires = range(nqubits)
    ansatz = QuantumCircuit(nqubits)

    ansatz.compose(get_ansatz_block(nqubits, parameters), wires, inplace=True)
    for _ in range(1, reps):
        ansatz.barrier()
        encoding = get_encoding_block(nqubits, features)
        ansatz.compose(encoding, wires, inplace=True)
        ansatz.barrier()
        shared_layer = get_ansatz_block(nqubits, parameters)
        ansatz.compose(shared_layer, wires, inplace=True)
    return ansatz


In [None]:
from qiskit.utils import algorithm_globals
# Set the seed of qiskit's global random generator before drawing the toy data.
algorithm_globals.random_seed = 528491

# 10 random samples with 4 features each; the number of qubits is taken from
# the number of features (one qubit per feature).
size = (10, 4)
data = algorithm_globals.random.random(size)
nqubits = data.shape[1]
print(data, nqubits)

In [None]:
# Weight-sharing circuit: a single set of 2 * nqubits weights reused by every
# ansatz block.
# NOTE: `input` shadows the Python builtin of the same name; the QNN cells
# below reference it, and a later cell rebinds it to a tensor.
input = ParameterVector("x", data.shape[1])
weights = ParameterVector("theta", nqubits * 2)
circuit_ws = get_ansatz_ws(nqubits, weights, input)
circuit_ws.draw()


In [None]:
# Random values for the 2 * nqubits shared weights of `circuit_ws`.
random_weights_ws = algorithm_globals.random.random(2 * nqubits)
print(random_weights_ws)

In [None]:
from qiskit_machine_learning.neural_networks import SamplerQNN

# Sampler-based QNN on the weight-sharing circuit; each sampled integer is
# mapped to one of 2 outputs by the parity of its binary representation.
sqnn_ws = SamplerQNN(
    circuit=circuit_ws,
    input_params=input,
    weight_params=weights,
    interpret=lambda x: "{:b}".format(x).count('1') % 2 == 0, # parity check: True when the number of 1 bits is even
    output_shape=2
)
print(sqnn_ws)

In [None]:
# Forward pass on the first sample with the random shared weights.
sampler_qnn_forward = sqnn_ws.forward(data[0], random_weights_ws) # require encoding + ansatz parameters, result is a ndarray
print(f"Forward pass result for SamplerQNN: {sampler_qnn_forward}. \nShape: {sampler_qnn_forward.shape}")


In [None]:
# Circuit without weight sharing: 3 repetitions (the default `reps` of
# get_ansatz), each with its own 2 * nqubits weights.
input1 = ParameterVector("x1", data.shape[1])
weights1 = ParameterVector("theta1", nqubits * 2 * 3)
circuit = get_ansatz(nqubits, weights1, input1)
circuit.draw()


In [None]:
def parity(x):
    """Return the parity (0 = even, 1 = odd) of the number of 1 bits in ``x``.

    Also prints ``x``, ``str(x)``, its binary representation and the parity,
    as a debugging aid.
    """
    bits = format(x, "b")
    result = bits.count("1") % 2
    print(x, str(x), bits, result)
    return result

def custom_interpret(x):
    """Return the number of 1 bits in the binary form of ``x``, modulo 4."""
    ones = format(x, "b").count("1")
    return ones % 4


# Sampler-based QNN on the circuit without weight sharing; each sampled
# integer is mapped to one of 4 outputs by custom_interpret.
sqnn = SamplerQNN(
    circuit=circuit,
    input_params=input1,
    weight_params=weights1,
    interpret=lambda x: custom_interpret(x), # number of 1 bits modulo 4 (not a plain parity check)
    output_shape=4
)
print(sqnn)

In [None]:
# One random value per weight of the non-shared circuit, then a forward pass
# on the first sample.
random_weights = algorithm_globals.random.random(len(weights1))
sampler_qnn_forward = sqnn.forward(data[0], random_weights) # require encoding + ansatz parameters, result is a ndarray
print(f"Forward pass result for SamplerQNN: {sampler_qnn_forward}. \nShape: {sampler_qnn_forward.shape}")

In [None]:
from qiskit_machine_learning.neural_networks import EstimatorQNN
from qiskit.quantum_info import SparsePauliOp
from qiskit.opflow.expectations import PauliExpectation

def get_Z_expectation_qubitwise(nqubits: int) -> list:
    """
        :nqubits
        the number of qubits to evaluate
        :return
        a list of ``nqubits`` single-qubit Z observables (coefficient 1); the
        i-th one has the label with "Z" at string position i and "I"
        everywhere else. A Z expectation value relates to the probability of
        measuring 1 through P(1) = (1 - Exp) / 2.

        NOTE(review): Qiskit reads Pauli labels right-to-left, so string
        position i presumably addresses qubit nqubits - 1 - i -- confirm if
        the output order matters.
    """
    labels = ["I" * pos + "Z" + "I" * (nqubits - pos - 1) for pos in range(nqubits)]
    return [SparsePauliOp.from_list([(label, 1)]) for label in labels]

# One Z observable per qubit of the 4-qubit circuit.
ob = get_Z_expectation_qubitwise(4)
print(ob)

# observable1 = SparsePauliOp.from_list([("Z", 1), ("Z", 1), ("Z", 1), ("I", 1)])
# Estimator-based QNN on the weight-sharing circuit: one expectation value
# per observable in `ob`.
eqnn_ws = EstimatorQNN(
    circuit=circuit_ws,
    input_params=input,
    weight_params=weights,
    observables=ob
)

print(eqnn_ws)
circuit_ws.draw()

In [None]:
# Forward pass of the estimator QNN on the first sample with the random
# shared weights.
result = eqnn_ws.forward(data[0], random_weights_ws)
print(result)

# Classic AutoEncoder Wrapper

In [None]:
import torch.nn as nn
import torch

class HAE(nn.Module):
    """
        Hybrid classical-quantum autoencoder.

        general structure is:
        - encoder, FC input_size -> 54 -> nqubits
        - qnn (wrapped in a TorchConnector)
        - decoder, FC nqubits -> 54 -> input_size
        - tanh activations (none after the last decoder layer)

        :qnn
        the quantum neural network used as latent layer; its output size is
        assumed to equal nqubits, because it is fed straight to the decoder
        :input_size
        number of features of one sample
        :nqubits
        size of the latent space, i.e. of the encoder output
    """

    def __init__(self, qnn, input_size: int, nqubits: int = 4) -> None:
        super().__init__()

        self.encoder = nn.Sequential(
            nn.Linear(input_size, 54),
            nn.Tanh(),
            nn.Linear(54, nqubits),
            nn.Tanh()
        )
        self.vqc = TorchConnector(qnn)
        self.decoder = nn.Sequential(
            nn.Linear(nqubits, 54),
            nn.Tanh(),
            nn.Linear(54, input_size)
        )

    def forward(self, X):
        """Reconstruct X: encoder -> qnn -> rescale -> decoder."""
        X = self.encoder(X)
        X = self.vqc(X)
        # Rescale the QNN output x to (1 - x) / 2; assuming the QNN returns
        # Z expectation values, this is the probability of measuring 1.
        X = (torch.ones_like(X) - X) / 2
        X = self.decoder(X)
        return X

    def encode(self, X):
        """Return the latent representation of X: the raw QNN output, without
        the (1 - x) / 2 rescaling applied in forward()."""
        # Must call the `encoder` sub-module; calling `self.encode` here
        # would recurse forever.
        return self.vqc(self.encoder(X))

In [None]:
import torch
# Random sample with 160 features for a smoke test of the autoencoder.
# NOTE: this rebinds `input`, which earlier cells used for a ParameterVector.
ins = 160
input = torch.rand(ins)
print(input)

In [None]:
# Autoencoder built around the weight-sharing estimator QNN (default 4 qubits).
test = HAE(eqnn_ws, input_size=ins)

In [None]:
# Single forward pass (untrained model) on the random sample.
forward = test(input)
print(forward)

# Some Training & Evaluation functions

In [None]:
from tqdm import tqdm
from torch.optim import Adam
# Use the GPU when available; `device` is read by evaluate() and training().
device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)

def evaluate(model, data_loader):
    """
    Return the average reconstruction error (MSE) of ``model`` over a loader.

    The model is switched to eval mode and left in it; gradients are disabled
    while iterating. Each batch MSE is weighted by the batch size, so the
    result is the per-sample mean over the whole loader even when the last
    batch is smaller.

    :model
    autoencoder mapping a batch X to its reconstruction
    :data_loader
    iterable of (X, y) batches; y is ignored
    :return
    the mean reconstruction error as a float, 0.0 for an empty loader
    """
    mse = nn.MSELoss()
    model.eval()
    weighted_error = 0.0
    n_samples = 0
    with torch.no_grad():
        for X, _ in tqdm(data_loader, desc="Validating", leave=False):
            X = X.to(device)
            reconstruction = model(X)
            batch_size = len(X)
            # MSELoss already averages over the batch; undo that with the
            # batch size so every sample carries the same weight overall.
            weighted_error += mse(reconstruction, X).item() * batch_size
            n_samples += batch_size
    if n_samples == 0:
        return 0.0
    return weighted_error / n_samples

def training(model, train_dl, val_dl, epochs: int = 100):
    """
    Train ``model`` as an autoencoder with Adam (lr=0.001) and an MSE
    reconstruction loss.

    After every epoch the reconstruction error on ``val_dl`` is computed with
    evaluate() and printed, and the model is put back into train mode.
    Labels yielded by the loaders are ignored.
    """
    criterion = nn.MSELoss()
    optimizer = Adam(model.parameters(), lr=0.001)
    model.train()
    for epoch in range(1, epochs + 1):
        progress = tqdm(train_dl, "Epoch #{}".format(epoch), leave=True)
        for batch, _ in progress:
            batch = batch.to(device)
            loss = criterion(model(batch), batch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        # Validate after every epoch; evaluate() leaves the model in eval mode.
        rec_error = evaluate(model, val_dl)
        print("Validation average reconstruction error: {}".format(rec_error))
        model.train()

# Data
Define a random dataset and the arrhythmia dataset loader.

In [None]:
from torch.utils.data import DataLoader, Dataset, Subset
import numpy as np
class RandomDataset(Dataset):
    """
    Synthetic dataset of Gaussian samples with random binary labels.

    :size
    number of features per sample
    :length
    number of samples
    :mean, std_dev
    parameters of the normal distribution the features are drawn from

    Labels are 0/1 integers drawn independently of the features (sign of a
    standard normal draw).
    """

    def __init__(self, size, length, mean = 0, std_dev = 1) -> None:
        super().__init__()
        self.values = torch.normal(mean, std_dev, size=(length, size))
        # 1 where the standard-normal draw is positive, else 0 (int64 tensor).
        self.labels = (torch.normal(0, 1, size=(length,)) > 0).long()

    def __len__(self):
        return len(self.values)

    def __getitem__(self, idx):
        # Accept tensor indices as well as ints / lists.
        if torch.is_tensor(idx):
            idx = idx.tolist()
        return self.values[idx], self.labels[idx]
    
# 1000 samples with 160 features each, drawn with mean 100 and std dev 5.
random_data = RandomDataset(160, 1000, 100, 5)

In [None]:
import pandas as pd
class ArrythmiaDS(Dataset):
    """
    Arrhythmia dataset split into normal samples and anomalies.

    The CSV at ``path`` is read with '?' as missing-value marker; the last
    column is the class label, all other columns are features. Missing
    feature values are imputed over the whole file before filtering.

    :path
    location of the comma-separated data file
    :get_anomalies
    False keeps only the samples whose label is 1 (normal data); True keeps
    all the other samples and relabels them 0 (generic anomaly)

    NOTE(review): read_csv infers the header from the first line; if the file
    has no header row, the first sample is consumed as column names -- confirm
    whether header=None is needed.
    """

    def __init__(self, path: str = "./datasets/arrhythmia.data", get_anomalies: bool = False) -> None:

        def _is_nominal(column: pd.Series) -> bool:
            # Binary feature: every observed (non-missing) value is 0 or 1.
            return bool(column.dropna().isin([0, 1]).all())

        def _fix_missing(df: pd.DataFrame) -> pd.DataFrame:
            # Work on a copy so the caller's frame (a slice) is never written
            # through, and assign every filled column back explicitly: an
            # in-place fill on `df.iloc[:, i]` may act on a temporary.
            df = df.copy()
            for i in range(df.shape[1]):
                column = df.iloc[:, i]
                missing = column.isna()
                if not missing.any():
                    continue
                mean_value = column.mean(skipna=True)
                if pd.isna(mean_value):
                    # Column without any observed value: nothing to impute from.
                    continue
                if _is_nominal(column):
                    # the mean is the bernoulli probability: draw 1 with
                    # probability mean_value for every missing entry
                    draws = np.random.random(int(missing.sum())) < mean_value
                    filled = column.copy()
                    filled[missing] = draws.astype(column.dtype)
                else:
                    filled = column.fillna(mean_value)
                df.iloc[:, i] = filled
            return df

        super().__init__()
        self.data = pd.read_csv(path, sep=',', na_values='?', dtype=np.float32)
        self.labels = self.data.iloc[:, -1]
        self.data = self.data.iloc[:, :-1]
        self.data = _fix_missing(self.data)

        # get normal data or anomalies
        if not get_anomalies:
            keep = self.labels == 1
            self.data = self.data[keep]
            self.labels = self.labels[keep]
        else:
            keep = self.labels != 1
            self.data = self.data[keep]
            self.labels = self.labels[keep] * 0 # use label = 0 as a generic indicator of anomaly

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Convert through numpy so the row is handed to torch as a plain array.
        features = torch.tensor(self.data.iloc[idx, :].to_numpy())
        label = torch.tensor(self.labels.iloc[idx])
        return features, label
    

def get_splits(dataset, dataset_anomalies, test_split: float = 0.3, validation_split: float = 0.3, batch_size: int = 64) -> tuple[DataLoader, DataLoader, DataLoader]:
    """
    Randomly split normal data and anomalies into train / validation / test
    loaders.

    ``test_split`` is the fraction of each dataset reserved for testing;
    ``validation_split`` is the fraction of the remainder reserved for
    validation. Test and validation sets mix normal samples and anomalies,
    while the training set contains only the remaining normal samples (the
    remaining anomalies are not used). All loaders shuffle.

    :return
    (train loader, validation loader, test loader)
    """
    normal_order = np.random.permutation(len(dataset))
    anomaly_order = np.random.permutation(len(dataset_anomalies))

    n_test = int(len(dataset) * test_split)
    n_test_anom = int(len(dataset_anomalies) * test_split)
    n_val = int((len(dataset) - n_test) * validation_split)
    n_val_anom = int((len(dataset_anomalies) - n_test_anom) * validation_split)
    val_end = n_test + n_val
    val_end_anom = n_test_anom + n_val_anom

    # Dataset `+` concatenates the normal and the anomalous subsets.
    test_set = Subset(dataset, normal_order[:n_test]) + Subset(dataset_anomalies, anomaly_order[:n_test_anom])
    val_set = Subset(dataset, normal_order[n_test:val_end]) + Subset(dataset_anomalies, anomaly_order[n_test_anom:val_end_anom])
    train_set = Subset(dataset, normal_order[val_end:])

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=True)
    return train_loader, val_loader, test_loader

# Normal samples (label 1) and anomalies (every other label) from the default
# data file, then the train / validation / test loaders.
ds = ArrythmiaDS()
dsa = ArrythmiaDS(get_anomalies=True)
# Loader over all normal samples with the DataLoader defaults.
dl = DataLoader(ds)
train_dl, val_dl, test_dl = get_splits(ds, dsa)

# A quick test
We will use four qubits as the latent space.

In [None]:
# Peek at one training batch to check the tensor shape and the labels.
train_features, train_labels = next(iter(train_dl))
print(train_features.shape, train_labels)

In [None]:
from qiskit.primitives import Estimator
# 4-qubit circuit without weight sharing: 2 * 4 weights per repetition.
reps = 3
weights = ParameterVector("w", 8 * reps) # 2n * reps
inputs = ParameterVector("x", 4) # n
qnn_circuit = get_ansatz(4, parameters=weights, features=inputs, reps=reps)
# Estimator QNN with one Z observable per qubit; input gradients are enabled
# (presumably so the loss can back-propagate into the classical encoder that
# feeds the circuit -- confirm), and the estimator is given 20 shots.
qnn = EstimatorQNN(
    circuit=qnn_circuit,
    input_params=inputs,
    weight_params=weights,
    input_gradients=True,
    observables=get_Z_expectation_qubitwise(4),
    estimator=Estimator(options={'shots': 20})
)

In [None]:
# Hybrid autoencoder for samples with 279 features and a 4-qubit latent space.
hae = HAE(qnn, 279, nqubits=4).to(device)

In [None]:
# Train for 30 epochs; the validation error is printed after every epoch.
training(hae, train_dl, val_dl, epochs=30)

In [None]:
# Persist the trained parameters.
# NOTE(review): presumably requires the "weights" directory to exist -- confirm.
torch.save(hae.state_dict(), "weights/hae_3reps.pt")

In [None]:
# Read the saved state dict back and show its type.
w = torch.load("weights/hae_3reps.pt")
print(type(w))

In [None]:
# Restore the model parameters from the loaded state dict.
hae.load_state_dict(w)

In [None]:
# Reconstruction error on the test loader (normal samples and anomalies mixed).
evaluate(hae, test_dl)

In [None]:
# Print the shape of every test batch.
for i, (x, y) in enumerate(test_dl):
    print(x.shape)