In [None]:
import joblib
import click
import json
import time
import os
import itertools
import collections.abc
import sys
from tqdm import tqdm
# !{sys.executable} -m pip install optimparallel
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pennylane as qml
from sklearn.metrics import mean_squared_error,r2_score
os.environ["OMP_NUM_THREADS"] = "12"
from scipy.optimize import minimize
# Qiskit
from qiskit import QuantumCircuit
from qiskit.quantum_info import Pauli, SparsePauliOp, Operator
from qiskit.primitives import StatevectorEstimator
from qiskit.circuit import Parameter, ParameterVector
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
from qiskit.quantum_info import SparsePauliOp
from qiskit_ibm_runtime import EstimatorV2 as Estimator
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_aer.noise import NoiseModel
from qiskit_ibm_runtime.fake_provider import FakeQuebec
from qiskit_ibm_runtime import Session

from optimparallel import minimize_parallel


In [None]:
def mitarai(quantumcircuit,num_wires,paramname='x'):
    # encoding as proposed by Mitarai et al.
    num_features = num_wires
    features = ParameterVector(paramname,num_features*2)
    for i in range(num_wires):
        feature_idx = i % num_features  # Calculate the feature index using modulo
        quantumcircuit.ry(np.arcsin(features[feature_idx * 2]), i)
        quantumcircuit.rz(np.arccos(features[feature_idx * 2 + 1] ** 2), i)


def double_angle(quantumcircuit, num_wires,paramname='x'):
    #  creates a circuit that encodes features into wires via angle encoding with an RY then RZ gate
    #  the features are encoded 1-1 onto the qubits
    #  if more wires are passed then features the remaining wires will be filled from the beginning of the feature list
    num_features = num_wires
    features = ParameterVector(paramname,num_features*2)
    for i in range(num_wires):
        feature_index = i % num_features
        quantumcircuit.ry(features[feature_index], i)
        quantumcircuit.rz(features[feature_index], i)

def entangle_cnot(quantumcircuit,num_wires):
    #  entangles all of the wires in a circular fashion using cnot gates
    for i in range(num_wires):
        
        if i == num_wires - 1:
            quantumcircuit.cx(i, 0)
        else:
            quantumcircuit.cx(i, i+1)


def entangle_cz(quantumcircuit,num_wires):
    #  entangles all of the wires in a circular fashion using cz gates
    for i in range(num_wires):
        
        if i == num_wires - 1:
            quantumcircuit.cz(i, 0)
        else:
            quantumcircuit.cz(i, i+1)


def HardwareEfficient(quantumcircuit,num_wires,paramname='theta'):
    parameters = ParameterVector(paramname,num_wires*3)
    for qubit in range(num_wires):
        quantumcircuit.rx(parameters[qubit * 3], qubit)  
        quantumcircuit.rz(parameters[qubit * 3 + 1], qubit)  
        quantumcircuit.rx(parameters[qubit * 3 + 2], qubit)  
    entangle_cnot(quantumcircuit,num_wires)



In [None]:
# def circuit(nqubits):
#     qc = QuantumCircuit(nqubits)
#     mitarai(qc,nqubits)
#     entangle_cz(qc,nqubits)
#     qc.barrier()
#     mitarai(qc,nqubits,paramname='x1')
#     entangle_cz(qc,nqubits)
#     qc.barrier()
#     HardwareEfficient(qc,nqubits)
#     qc.barrier()
#     return qc


def circuit(nqubits,RUD=1):
    qc = QuantumCircuit(nqubits)
    for i in range(RUD):
        double_angle(qc,nqubits,paramname=f'x{i}')
        qc.barrier()
        HardwareEfficient(qc,nqubits,paramname=f'theta{i}')
        qc.barrier()
    return qc

In [None]:
with open('linear_train.bin','rb') as f:
    train = joblib.load(f)

with open('linear_test.bin','rb') as f:
    test = joblib.load(f)

with open('linear_scaler.bin','rb') as f:
    scaler = joblib.load(f)
X_train, y_train = train['X'],train['y']
X_test, y_test = test['X'],test['y']


with open('PCA5_0.8_Morgan_train.bin','rb') as f:
    bse_train = joblib.load(f)

with open('PCA5_0.8_Morgan_test.bin','rb') as f:
    bse_test = joblib.load(f)

with open('PCA5_0.8_Morgan_scaler.bin','rb') as f:
    bse_scaler = joblib.load(f)

X_bse_train, y_bse_train = bse_train['X'],bse_train['y']
X_bse_test, y_bse_test = bse_test['X'],bse_test['y']


X_bse_train[np.isclose(X_bse_train,1)]=1
X_bse_train[np.isclose(X_bse_train,-1)]=-1








with open('5_DDCC_train.bin','rb') as f:
    ddcc_train = joblib.load(f)

with open('5_DDCC_test.bin','rb') as f:
    ddcc_test = joblib.load(f)

with open('5_DDCC_scaler.bin','rb') as f:
    ddcc_scaler = joblib.load(f)

X_ddcc_train, y_ddcc_train = ddcc_train['X'],ddcc_train['y']
X_ddcc_test, y_ddcc_test = ddcc_test['X'],ddcc_test['y']



In [None]:
def predict(params, ansatz, hamiltonian, estimator, num_qubits, X):
    if len(X)==1:
        featparams = dict([(i,X.item()) for idx,i in enumerate(ansatz.parameters) if 'x' in i.name])
    else:
        featparams = dict([(i,X[idx % num_qubits]) for idx,i in enumerate(ansatz.parameters) if 'x' in i.name])
    
    ansatz = ansatz.assign_parameters(featparams)    
    pub = (ansatz, [hamiltonian], [params])
    result = estimator.run(pubs=[pub]).result()
    energy = result[0].data.evs[0]
    return energy

In [None]:
def cost_func(params, ansatz, hamiltonian, estimator, num_qubits, X, y):
    """Return estimate of energy from estimator

    Parameters:

        params (ndarray): Array of ansatz parameters
        ansatz (QuantumCircuit): Parameterized ansatz circuit
        hamiltonian (SparsePauliOp): Operator representation of Hamiltonian
        estimator (EstimatorV2): Estimator primitive instance
        cost_history_dict: Dictionary for storing intermediate results

    Returns:
        float: Energy estimate
    """
    t0=time.perf_counter()
    y_pred = np.array([predict(params, ansatz, hamiltonian, estimator,num_qubits,x) for x in X]).reshape(*y.shape)
    # print(y,y_pred)
    loss = mean_squared_error(y,y_pred)
    cost_history_dict["iters"] += 1
    cost_history_dict["prev_vector"] = params
    cost_history_dict["cost_history"].append(loss)
    print(f"Iters. done: {cost_history_dict['iters']} Current cost: {loss} Accuracy: {r2_score(y,y_pred)} Time: {time.perf_counter()-t0}")

    return loss

In [None]:
cost_history_dict = {
    "prev_vector": None,
    "iters": 0,
    "cost_history": [],
}

In [None]:
num_qubits = 5
RUD = 3
# X = X_bse_train[0:1].flatten()
# Y = y_bse_test[0:1]

# X,Y = X_bse_train, y_bse_train
# X,Y = X_ddcc_train, y_ddcc_train
X,Y = X_train, y_train

In [None]:
# 
qc = circuit(num_qubits,RUD)

num_params = len([i for i in list(qc.parameters) if 'theta' in i.name]) // RUD
# x0 = 2 * np.pi * np.random.random(num_params)
generator = np.random.default_rng(12958234)
x0 = np.tile(generator.uniform(-np.pi, np.pi, num_params),RUD)

print(num_params,x0.shape)
# service = QiskitRuntimeService(channel="ibm_quantum", instance='pinq-quebec-hub/univ-toronto/default')
# _backend = service.least_busy(operational=True, simulator=False, min_num_qubits=127)
_backend = FakeQuebec()
target = _backend.target
pm = generate_preset_pass_manager(target=target, optimization_level=3)

qc = pm.run(qc)


estimator = Estimator(mode=_backend)
estimator.options.default_shots = 1024.0
estimator.options.resilience_level = 1

# estimator = StatevectorEstimator()

print(qc.num_qubits)

observables_labels = ''.join(['I']*(num_qubits-1))+"Z"
observables = [SparsePauliOp(observables_labels)]

mapped_observables = [observable.apply_layout(qc.layout) for observable in observables]


In [None]:
# job = estimator.run([(qc, mapped_observables)])
# y_pred = job.result()[0].data.evs
scores = []
with Session(backend=_backend) as session:
    for i in range(1):
        cost_history_dict = {
            "prev_vector": None,
            "iters": 0,
            "cost_history": [],
        }    
        # res = minimize_parallel(
        #     cost_func,
        #     x0,
        #     args=(qc, mapped_observables, estimator,X,Y),options={'maxiter':50})    
        res = minimize(
            cost_func,
            x0,
            args=(qc, mapped_observables, estimator, num_qubits, X, Y),
            method="cobyla", options={'maxiter':1000}
    )
        x0 = res.x
    
        y_pred = np.array([predict(x0,qc, mapped_observables, estimator, num_qubits, x) for x in X])
        r2 = r2_score(Y,y_pred)
        loss = mean_squared_error(Y,y_pred)
        print(f"Iteration: {i} R2: {r2} MSE: {loss}")
        scores.append((r2,loss))
        # if i % 10 ==0:
        plt.scatter(Y,Y,label='true')
        plt.scatter(Y,y_pred,label='pred')
        plt.legend()
        plt.show()


In [None]:
y_pred = np.array([predict(x0,qc, mapped_observables, estimator,x) for x in X])
r2_score(Y,y_pred)

In [None]:
plt.plot(np.array(scores)[:,1])

In [None]:
plt.plot(np.array(scores)[:,0])