In [None]:
import numpy as np
import matplotlib.pyplot as plt
from pyqubo import Array, Binary , Placeholder,Constraint, solve_qubo
from itertools import combinations
from sklearn.metrics import accuracy_score
import math
import pandas as pd
from sklearn.model_selection import train_test_split
import neal
import networkx as nx
from dwave.system import DWaveSampler, FixedEmbeddingComposite, EmbeddingComposite
from PQA import *

class OneVsRestClassifier:
    def __init__(self, class_num, classifier, params=None):
        """binary classifierを受け取り、クラスの数分インスタンスを作成する."""
        self.class_num = class_num
        if params is None:
            self.classifiers = [classifier() for _ in range(class_num)]
        else:
            self.classifiers = [classifier(**params) for _ in range(class_num)]

    def solve(self, x, y):
        for i in range(self.class_num):
            print(f"Training classifier {i}...")
            self.classifiers[i].solve(x, self.re_labaling(y, i))
        return self

    def re_labaling(self, y, pos_label):
        """labelを受け取り、pos_labelに指定したカテゴリを+1、それ以外を-1にラベリングしなおしたデータを返す."""
        return np.where(y == pos_label, 1, -1)

    def argmax_by_E(self, result):

        for i in range(result.shape[0]):
            for j in range(result.shape[1]):
                if np.sum(result[:, j], axis=0) == -1: #case (1,-1,-1)
                    pass
                elif np.sum(result[:, j], axis=0) == 1: #case (1,1,-1)
                    a = np.array(np.where(result[:, j] == 1))[0][0]
                    b = np.array(np.where(result[:, j] == 1))[0][1]
                    if self.classifiers[a].energy > self.classifiers[b].energy:
                        result[a, j] = -1
                    else:
                        result[b, j] = -1
                elif np.sum(result[:, j], axis=0) == 3: #case (1,1,1)
                    min_e = np.argmin(np.array([self.classifiers[0].energy, self.classifiers[1].energy, self.classifiers[2].energy]))
                    result[0:min_e, j] = -1
                    result[min_e:i, j] = -1
                elif np.sum(result[:, j], axis=0) == -3: #case (-1,-1,-1)
                    min_e = np.argmin(
                        np.array([self.classifiers[0].energy, self.classifiers[1].energy, self.classifiers[2].energy]))
                    result[min_e, j] = 1

        print("result", result)
        return np.argmax(result, axis=0)

    def predict(self, X):

        result = np.array([model.predict(X) for model in self.classifiers])
        #print("result", result)
        #print("result type and shape", type(result), result.shape)
        #result type and shape <class 'numpy.ndarray'> (3, 150)
        return self.argmax_by_E(result)

    def evaluate(self, X, y):
        pred = self.predict(X)
        print("pred result",pred)
        return accuracy_score(y, pred)
    
class qSVM():
    def __init__(self, data, label, B=2, K=2, Xi=0, gamma = 10.0, C=3, kernel="rbf", optimizer="PQA"):
        """
        :param B:
        :param K:
        :param Xi:
        :param gamma:
        :param C: #still not used now
        :param kernel: default;rbf only rbf for now,
        :param optimizer:SA, QA, PQA
        """
        #self.data = data
        self.label1 = label[0]
        self.label2 = label[1]
        self.B = B
        self.K = K
        self.N1 = data.shape[0][0]
        self.N2 = data.shape[0][1]
        self.Xi = Xi
        self.gamma = gamma
        self.C = C
        self.kernel = kernel

        self.options = {
            'SA': {},
            'PQA': {}
        }
        self.optimizer = optimizer
        self.alpha1 = Array.create('alpha', shape=self.K * self.N1, vartype='BINARY') #number of spins : K*N
        self.alpha2 = Array.create('alpha', shape=self.K * self.N2, vartype='BINARY') #number of spins : K*N

        self.alpha_result = None
        self.alpha_result_array = None
        self.alpha_real1 = np.zeros((self.N1,))
        self.alpha_real2 = np.zeros((self.N2,))

        self._support_vectors = None
        self._n_support = None
        self._alphas = None
        self._support_labels = None
        self._indices = None
        self.intercept = None
        self.energy = None


    def rbf(self, x, y, g):
        return np.exp(-1.0 * g * np.dot(np.subtract(x, y).T, np.subtract(x, y)))

    def transform(self, X, g):
        K = np.zeros([X.shape[0], X.shape[0]])
        for i in range(X.shape[0]):
            for j in range(X.shape[0]):
                K[i, j] = self.rbf(X[i], X[j], g)
        return K

    def makeQUBO(self, data, label, g, n, alpha):
        
        x = data
        t = label
        #t = self.label.astype(np.double)


        #energy = Sum(0, self.N, lambda n: Sum(0, self.N, lambda m: Sum(0,self.K, lambda k: Sum(0, self.K, lambda j:
        #            alpha[self.K * n +k] * alpha[self.K * m +j] * t[n] * t[m] * self.rbf(x[n],x[m] * self.B ** (k+j))))))
        #const_1 = Sum(0, self.N, lambda n: Sum(0, self.K, lambda k: alpha[self.K * n +k] * self.B ** k))
        #const_2 = Constraint((Sum(0,self.N, lambda n: Sum(0, self.K, lambda k: alpha[self.K * n +k] * t[n] * self.B ** k)))
        #                     ** 2, label="alpha * t = 0")
        #const_3 = Sum(0, self.N, lambda n: Sum(0, self.K, lambda k: alpha[self.K * n + k] * self.B ** k) - self.C)
        
        energy = 0
        for n in range(n):
            for m in range(n):
                for k in range(self.K):
                    for j in range(self.K):
                        alpha[self.K * n +k] * alpha[self.K * m +j] * t[n] * t[m] * self.rbf(x[n],x[m], g) * self.B ** (k+j)

        const_1=0
        for n in range(n):
            for k in range(self.K):
                const_1 += alpha[self.K * n +k] * self.B ** k

        const_2=0
        for n in range(n):
            for k in range(self.K):
                const_2 += alpha[self.K * n +k] * t[n] * self.B ** k

        const_2= const_2 ** 2

        h = 0.5 * energy - const_1 + self.Xi * const_2

        model = h.compile()
        qubo, offset = model.to_qubo()

        return qubo

    def PyquboToNparray(self, qubo_dict, n):
        # qubo generate by pyqubo is tuple type data, this func for changing the qubo from dict to np.array
        # qubometasolver need a (spins_num,spins_num) size np.array type value
        # pyqubo generate a dict type qubo and only has the triangle of qubo like this
        # 2.9, 1.4, 2.6
        # null, 1.7, 5.2
        # null, null, 3.7
        # so dict length is spins*spins/2

        #print("Pyqubo before sort",qubo_dict)

        qubo = np.zeros((self.K * n, self.K * n))
        for i in range (self.K * n):
            for j in range (i, self.K * n):
                qubo[i][j] = qubo_dict.get(('alpha[%s]' % i, 'alpha[%s]' % j))
                if math.isnan(qubo[i][j]):
                    qubo[i][j] = qubo_dict.get(('alpha[%s]' % j, 'alpha[%s]' % i))

        return qubo

    def solve(self, data, label):
        print("solving...")
        qubo1 = self.makeQUBO(data, label, self.gamma, self.N1, alpha = self.alpha1) #type(qubo) = dict
        qubo2 = self.makeQUBO(data, label, self.gamma, self.N2, alpha = self.alpha2) #type(qubo) = dict

        # print("Active optimizer: ", self.optimizer)
        # npqubo = self.PyquboToNparray(qubo)
        # print(np.shape(npqubo))

        if self.optimizer == "PQA":
            #optimizer = self.optimizer(**self.options['neal'])
            # sampleset = neal.SimulatedAnnealingSampler().sample_qubo(qubo)
            
            # self.energy = sampleset.record['energy']
            # self.alpha_result = sampleset.record['sample']
            # self.alpha_result = np.reshape(self.alpha_result,(self.K * self.N))

            # # # ------- Set up D-Wave parameters -------
            token = 'DEV-e6f6f6bb339c85b8cd1357653152e575cd618efb' #saihnaa_0810
            endpoint = 'https://cloud.dwavesys.com/sapi/'
            dw_sampler = DWaveSampler(solver='Advantage_system6.4', token=token, endpoint=endpoint)

            sampler = EmbeddingComposite(dw_sampler)
            
            hardware = nx.Graph(dw_sampler.edgelist)

            embedding, order, TotalQubo, Qubo_list, offset_list, Qubo_comb_time = embedding_search(hardware.copy(), [qubo1, qubo2], level = "easy")

            emb_visualize(embedding, order, offset_list, path='results/embedding.png')

            sampler = FixedEmbeddingComposite(dw_sampler, embedding)
            response = sampler.sample_qubo(TotalQubo, num_reads=100, annealing_time = 20, label='PQA')

            # response = sampler.sample_qubo(qubo, num_reads=1, annealing_time = 20, label='SVM').first

            response_list = response_decoder(response.record, order, offset_list, [qubo1, qubo2])

            e1, s1 = energy_decoder(qubo1, response_list[0])
            e2, s2 = energy_decoder(qubo2, response_list[1])

            self.energy = response[1]
            self.alpha_result = list(response[0].values())
            self.alpha_result = np.reshape(self.alpha_result,(self.K * self.N))
            
        else:
            print("This optimizer is not available")

        # print("alpha_result = ", self.alpha_result)
        # print("energy = {}".format(self.energy))

        K = self.transform(data)

        print("K,N",self.K, self.N)
        for i in range(self.N):
            for j in range(self.K):
                self.alpha_real[i] += self.alpha_result[self.K*i+j] * self.B ** j

        is_sv = self.alpha_real > 1e-5
        # print("(self.alpha_real)", self.alpha_real)
        self._support_vectors = data[is_sv]
        self._n_support = np.sum(is_sv)
        self._alphas = self.alpha_real[is_sv]
        self._support_labels = label[is_sv]
        self._indices = np.arange(data.shape[0])[is_sv]  # the index of supported vector
        self.intercept = 0

        for i in range(self._alphas.shape[0]):
            self.intercept += self._support_labels[i]
            self.intercept -= np.sum(self._alphas * self._support_labels * K[self._indices[i], is_sv])
        self.intercept /= self._alphas.shape[0]
        print("self.intercept", self.intercept)

        return self.alpha_result

    def signum(self, X):
        return np.where(X > 0, 1, -1)

    def predict(self, X):
        score = np.zeros(X.shape[0])
        for i in range(X.shape[0]):
            s = 0
            for alpha, label, sv in zip(self.alpha_result, self._support_labels, self._support_vectors):
                s += alpha * label * self.rbf(X[i], sv) * self.B ** self.K
            score[i] = s
        score = score + self.intercept
        return self.signum(score)

    def evaluate(self, X, y):
        pred = self.predict(X)
        return accuracy_score(y, pred)
    
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from sklearn import datasets
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Load dataset
X1, y1 = datasets.make_moons(n_samples=30, noise=0.10, random_state=420)
X2, y2 = datasets.make_circles(n_samples=30, noise=0.01, factor = 0.5, random_state=420)

# Preprocess dataset
y1 = LabelEncoder().fit_transform(y1)
y2 = LabelEncoder().fit_transform(y2)

# Split into train/test sets
X_train1, X_test1, y_train1, y_test1 = train_test_split(X1, y1, test_size=0.4, random_state=240)
X_train2, X_test2, y_train2, y_test2 = train_test_split(X2, y2, test_size=0.4, random_state=240)

# Set the parameters for the qSVM
params = {
    "data": [X_train1, X_train2],
    "label": [y_train1, y_train2],
    "B": 2,
    "K": 2,
    "Xi": 1,
    "gamma": [10.0],
    "C": 1,
    "kernel": "rbf",
    "optimizer": "PQA"
}

# Train OneVsRestClassifier
one_vs_rest = OneVsRestClassifier(class_num=2, classifier=qSVM, params=params)

# Re-train the OneVsRestClassifier on the projected data
one_vs_rest.solve(X_train, y_train)

# Evaluate OneVsRestClassifier
accuracy = one_vs_rest.evaluate(X_test, y_test)

# Create a mesh to plot in
x_min, x_max = X_test[:, 0].min() - 1, X_test[:, 0].max() + 1
y_min, y_max = X_test[:, 1].min() - 1, X_test[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, 0.02), np.arange(y_min, y_max, 0.02))

# Define colors and markers for each class
colors = ['red', 'blue']
markers = ['s', 'o', "v", "^"]

# Plot the decision boundary. For that, we will assign a color to each
# point in the mesh [x_min, x_max]x[y_min, y_max].
Z = one_vs_rest.predict(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.contourf(xx, yy, Z, alpha=0.8, cmap=ListedColormap(colors))

# Plot training samples
for idx, cl in enumerate(np.unique(y_train)):
    plt.scatter(x=X_train[y_train == cl, 0], 
                y=X_train[y_train == cl, 1],
                alpha=0.8, 
                color=colors[idx],
                marker=markers[idx], 
                label=f'Training class {cl}', 
                edgecolor='black')

# Plot test samples
for idx, cl in enumerate(np.unique(y_test)):
    plt.scatter(x=X_test[y_test == cl, 0], 
                y=X_test[y_test == cl, 1],
                alpha=0.8, 
                color=colors[idx],
                marker=markers[idx+2], 
                label=f'Test class {cl}', 
                edgecolor='black')

# Add a title with the accuracy
plt.title(f'qSVM Decision Boundary, Test Accuracy: {accuracy*100:.2f}%')

# plt.xlabel('Principal Component 1')
# plt.ylabel('Principal Component 2')
plt.legend(loc='best')

plt.show()