In [3]:
!pip list

Package                   Version
------------------------- --------------
anyio                     4.4.0
argon2-cffi               23.1.0
argon2-cffi-bindings      21.2.0
arrow                     1.3.0
asttokens                 2.4.1
async-lru                 2.0.4
attrs                     23.2.0
Babel                     2.15.0
beautifulsoup4            4.12.3
bleach                    6.1.0
certifi                   2024.6.2
cffi                      1.16.0
charset-normalizer        3.3.2
colorama                  0.4.6
comm                      0.2.2
contourpy                 1.2.1
cycler                    0.12.1
debugpy                   1.8.1
decorator                 5.1.1
defusedxml                0.7.1
dill                      0.3.8
executing                 2.0.1
fastdtw                   0.3.4
fastjsonschema            2.20.0
fonttools                 4.53.0
fqdn                      1.5.1
h11                       0.14.0
httpcore                  1.0.5
httpx           

In [2]:
import csv
import logging
import os
import time

import pandas as pd
from IPython.core.display_functions import clear_output
from qiskit import QuantumCircuit, transpile, assemble
from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes
from qiskit.primitives import Sampler
from qiskit_aer import AerSimulator
import numpy as np
import matplotlib.pyplot as plt
from qiskit_machine_learning.algorithms import QSVC

# Setup logging
logging.basicConfig(filename='quantum_embedding.log', level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')

In [None]:
def quantum_encoder(theta, n, m):
    qc = QuantumCircuit(n + m)
    for i in range(n):
        qc.ry(theta[i % len(theta)], i)
    return qc

def matrix_inverse_evolution(qc, beta, adj_matrix, n, t=1):
    identity = np.eye(2**n)
    I_minus_betaA = identity - beta * adj_matrix
    
    for _ in range(t):
        for i in range(2**n):
            for j in range(2**n):
                if I_minus_betaA[i][j] != 0:
                    angle = -2 * I_minus_betaA[i][j]
                    qc.rz(angle, j % n)
    return qc

def hadamard_test(theta, beta, adj_matrix, n, m, t=1):
    qc = QuantumCircuit(1 + n + m, 1)  # 1 ancilla qubit + address and embedding registers, 1 classical bit
    
    # Prepare the initial state
    qc.h(0)  # Hadamard on the ancilla qubit
    
    # Quantum encoder circuit
    encoder_qc = quantum_encoder(theta, n, m)
    qc = qc.compose(encoder_qc, qubits=range(1, n + m + 1))
    
    # Apply controlled-U (matrix-inverse evolution)
    controlled_U = QuantumCircuit(n + m)
    controlled_U = matrix_inverse_evolution(controlled_U, beta, adj_matrix, n, t)
    qc = qc.compose(controlled_U.control(1), qubits=[0] + list(range(1, n + m + 1)))
    
    # Apply the second Hadamard gate
    qc.h(0)
    
    # Measure only the ancilla qubit
    qc.measure(0, 0)
    
    return qc

def estimate_L(theta, adj_matrix, beta, n, m, t):
    qc = hadamard_test(theta, beta, adj_matrix, n, m, t)
    logging.info(f"Quantum Circuit:\n{qc}")
    simulator = AerSimulator()
    transpiled_qc = transpile(qc, simulator)
    # display(transpiled_qc.draw())
    qobj = assemble(transpiled_qc)
    result = simulator.run(qobj).result()
    counts = result.get_counts()
    
    # Calculate expectation value <psi|U|psi>
    logging.info(f"Counts: {counts}")
    expectation_value = (counts.get('0', 0) - counts.get('1', 0)) / sum(counts.values())
    return expectation_value

In [None]:
def optimize_parameters(theta, adj_matrix, beta, alpha, delta_theta, n, m, t, max_iter=100):
    objective_values = []
    for iteration in range(max_iter):
        grad = np.zeros_like(theta)
        for i in range(len(theta)):
            theta_plus = theta.copy()
            theta_minus = theta.copy()
            theta_plus[i] += delta_theta
            theta_minus[i] -= delta_theta
            L_plus = estimate_L(theta_plus, adj_matrix, beta, n, m, t)
            # print("L_plus", L_plus)
            L_minus = estimate_L(theta_minus, adj_matrix, beta, n, m, t)
            # print("L_minus", L_minus)
            grad[i] = (L_plus - L_minus) / (2 * delta_theta)
        
        theta += alpha * grad
        L_theta = estimate_L(theta, adj_matrix, beta, n, m, t)
        objective_values.append(L_theta)
        
        # Logging information
        logging.info(f"Iteration {iteration + 1}: Objective Value = {L_theta}")
        logging.info(f"Iteration {iteration + 1}: Gradients = {grad}")
        logging.info(f"Iteration {iteration + 1}: Parameters (theta) = {theta}")

        # print(np.linalg.norm(grad)) # this is 0.0 :(
        grad_norm = np.linalg.norm(grad)
        if grad_norm < 1e-6:
            logging.info(f"Convergence reached at iteration {iteration + 1}")
            print(f"Gradient Norm: {grad_norm:.6f} < {1e-6:.6f} !!!")
            break
        print(f"Gradient Norm: {grad_norm:.6f} > {1e-6:.6f}")
    return theta, objective_values

In [None]:
def check_convergence_criterion(adj_matrix, beta):
    eigenvalues = np.linalg.eigvals(beta * adj_matrix)
    if np.max(np.abs(eigenvalues)) >= 1:
        raise ValueError("Convergence criterion is not satisfied.")
    
def check_adjacency_matrix_dimension(adj_matrix):
    if adj_matrix.shape[0] != adj_matrix.shape[1]:
        raise ValueError("Adjacency matrix must be square.")
    if adj_matrix.shape[0] < 2:
        raise ValueError("Adjacency matrix must be at least 2x2.")
    
def extend_adjacency_matrix(adj_matrix):
    N = adj_matrix.shape[0]
    n = int(np.ceil(np.log2(N)))

    if 2**n == N:
        return adj_matrix

    print(f"Extending the matrix dimensionality to the nearest power of 2.")
    extended_adj_matrix = np.zeros((2**n, 2**n))
    extended_adj_matrix[:N, :N] = adj_matrix
    return extended_adj_matrix

In [None]:
def get_adj_matrix(path, beta):
    df = pd.read_csv(path)
    print(df.head())
    
    adj_matrix = np.array([
        [0, 1, 1, 1],
        [1, 0, 1, 0],
        [1, 1, 0, 0],
        [1, 0, 0, 0]
    ])
    
    # Check some stuff
    check_adjacency_matrix_dimension(adj_matrix)
    check_convergence_criterion(adj_matrix, beta)
    return extend_adjacency_matrix(adj_matrix), labels

In [None]:
beta = 0.1
path = f"{os.getcwd()}/flat_kg.csv"

adj_matrix, labels = get_adj_matrix(path, beta)

N = adj_matrix.shape[0]
n = int(np.ceil(np.log2(N)))  # Number of qubits in the address register
m = 1  # Number of qubits in the embedding register (adjust as needed)

gamma = 0.5
alpha = 0.1
delta_theta = 0.01
t = 1  # Time parameter for evolution

theta = np.random.rand(N) * 2 * np.pi  # Initialize theta randomly

optimized_theta, objective_values = optimize_parameters(theta, adj_matrix, beta, alpha, delta_theta, n, m, t)

# Plotting the objective function
plt.plot(objective_values)
plt.xlabel('Iteration')
plt.ylabel('Objective Function Value')
plt.title('Optimization of Objective Function')
plt.show()

print("Optimized Parameters:", optimized_theta)

# TODO: Turn this into our encoding layer
encoder = ...

## Evaluation

Now we need to know how well this embedding performs. This is measured using a QML algorithm with a simple encoder as a benchmark

### Simple encoder

In [None]:
num_features = adj_matrix.shape[1]

feature_map = ZZFeatureMap(feature_dimension=num_features, reps=1)
feature_map.decompose().draw(output="mpl", style="clifford", fold=20)

### Declaring some universally used stuff

In [None]:
ansatz = RealAmplitudes(num_qubits=num_features, reps=3)
ansatz.decompose().draw(output="mpl", style="clifford", fold=20)

sampler = Sampler()

### Visualising Evaluation

In [None]:
objective_func_vals = []
plt.rcParams["figure.figsize"] = (12, 6)

def callback_graph(weights, obj_func_eval):
    clear_output(wait=True)
    objective_func_vals.append(obj_func_eval)
    plt.title("Objective function value against iteration")
    plt.xlabel("Iteration")
    plt.ylabel("Objective function value")
    plt.plot(range(len(objective_func_vals)), objective_func_vals)
    plt.show()

### Training QML Algorithm

In [None]:
def train_qml(sampler, encoder, ansatz, callback_graph, train_features, train_labels):
    classifier = QSVC(
        sampler=sampler,
        feature_map=encoder,
        ansatz=ansatz,
        callback=callback_graph,
    )
    
    start = time.time()
    classifier.fit(train_features, train_labels)
    elapsed = time.time() - start
    
    print(f"Training time: {round(elapsed)} seconds")
    
    return classifier

#### Simple encoder

In [None]:
train_qml(sampler, feature_map, ansatz, callback_graph, adj_matrix, labels)

#### Our encoder

In [None]:
train_qml(sampler, encoder, ansatz, callback_graph, adj_matrix, labels)