# Supervised Learning with Kernel Methods

Here we use a variational classifier to learn a nonlinear boundary using kernel methods.

The variational circuit rchitecture is specified by [Farhi and Neven (2018)](https://arxiv.org/abs/1802.06002). 
The kernel map is specified by [Havlicek et al (2018)](https://arxiv.org/abs/1804.11326)

In [1]:
import pennylane as qml
from pennylane import numpy as np
from pennylane.optimize import NesterovMomentumOptimizer

from scipy.stats import unitary_group
import matplotlib.pyplot as plt

In [2]:
dev = qml.device('default.qubit', wires=2)

In [3]:
def U_phi(x):
    #print(x, np.shape(x))
    # x3 := (pi - x1)(pi - x2)
    x_0, x_1, x_2 = x[0], x[1], x[2]
    #print(x_0, x_1, x_2)
        
    qml.RZ( x_0 , wires=0)
    qml.RZ( x_1 , wires=1)
    
    qml.CNOT(wires=[0,1])
    qml.RZ(x_2,wires=1)
    qml.CNOT(wires=[0,1])

In [4]:
def layer(W): # 6 weights are specified at each layer
    
    # euler angles
    qml.Rot(W[0, 0], W[0, 1], W[0, 2], wires=0)
    qml.Rot(W[1, 0], W[1, 1], W[1, 2], wires=1)

    qml.CNOT(wires=[0, 1])

In [5]:
def featuremap(x):
    for i in range(2):
        qml.Hadamard(wires=0)
        qml.Hadamard(wires=1)
        U_phi(x)

In [6]:
@qml.qnode(dev)
def circuit1(weights, x):

    featuremap(x)

    for W in weights:
        layer(W)

    return qml.expval.PauliZ(wires=0)

@qml.qnode(dev)
def circuit2(weights, x):

    featuremap(x)

    for W in weights:
        layer(W)

    return qml.expval.PauliZ(wires=1)

In [7]:
def variational_classifier(var, x): # x is a keyword argument -> fixed (not trained)
    weights = var[0]
    bias = var[1]

    return circuit1(weights, x) * circuit2(weights, x) + bias

In [8]:
def square_loss(labels, predictions):

    loss = 0
    for l, p in zip(labels, predictions):
        loss = loss + (l - p) ** 2
    loss = loss / len(labels)

    return loss

In [9]:
def accuracy(labels, predictions):
    #print(labels, predictions)
    loss = 0
    for l, p in zip(labels, predictions):
        if abs(l - p) < 1e-5:
            loss = loss + 1
    loss = loss / len(labels)

    return loss

In [10]:
def cost(var, X, Y):

    predictions = [variational_classifier(var, x) for x in X]
    #if (len(Y) == num_data):
    #    print("[(pred, label), ...]: ", list(zip(predictions, Y)))
    return square_loss(Y, predictions) 

In [11]:
random_U = unitary_group.rvs(4)
random_U = random_U / (np.linalg.det(random_U) ** (1/4)) # so that det = 1


print("random unitary: ", random_U)
print("det: ", np.linalg.det(random_U))

random unitary:  [[ 0.56480469+0.02828828j  0.00919685-0.54666223j  0.17628252-0.49642827j
   0.2758558 -0.16630779j]
 [ 0.76026959-0.00843403j  0.14776823+0.17506096j -0.10832053+0.47307077j
  -0.35248134-0.09830992j]
 [ 0.19169964+0.02404574j -0.6268324 +0.26471267j  0.31711591+0.29431852j
   0.54700278+0.11525388j]
 [-0.25430813+0.0107177 j -0.06982527-0.42511976j  0.28338993+0.46846389j
  -0.06125777-0.6678992 j]]
det:  (1.0000000000000002-8.326672684688678e-17j)


In [None]:
@qml.qnode(dev)
def data_label_1(x):
    #print(u)
    #print("label the following:", x)
    featuremap(x)
    qml.QubitUnitary(random_U, wires=[0,1])
    
    return qml.expval.PauliZ(wires=0)

@qml.qnode(dev)
def data_label_2(x):
    #print("label the following:", x)
    featuremap(x)
    qml.QubitUnitary(random_U, wires=[0,1])
    
    return qml.expval.PauliZ(wires=1)

In [None]:
def gen_data(thresh):
    #thresh = 0.3

    X = np.array([])
    Y = np.array([])
    ctr = 0 # num valid data pts
    maxval = 0.0
    minval = 0.0

    np.random.seed(0)

    while ctr < 40:
        x = np.random.rand(2) * 2 * np.pi
        x = np.append(x, (np.pi - x[0]) * (np.pi - x[1]))
        y_1 = data_label_1(x)
        y_2 = data_label_2(x)
        #print(y_1, y_2, y_1 * y_2)
        if (np.abs(y_1 * y_2) > maxval):
            maxval = y_1 * y_2
            #print("new max separation: ", maxval)
        elif (y_1 * y_2 < minval):
            minval = y_1 * y_2
            #print("new min separation: ", minval)

        if y_1 * y_2 > thresh:
            Y = np.append(Y, +1)
            X = np.append(X, x)
            ctr += 1
            #print("+1")
        elif y_1 * y_2 < -1 * thresh:
            Y = np.append(Y, -1)
            X = np.append(X, x)
            ctr += 1
            #print("-1")
            
    X = X.reshape(-1, 3)
    print("Data: ", list(zip(X, Y)))
    return X, Y

In [None]:
def divide_train_test(X, Y):
    global num_data
    num_data = len(Y)
    global num_train
    num_train = int(0.5 * num_data)

    print("size data, size train: ", num_data, num_train)

    index = np.random.permutation(range(num_data))
    X_train = X[index[:num_train]]
    Y_train = Y[index[:num_train]]

    X_test = X[index[num_train:]]
    Y_test = Y[index[num_train:]]
    
    return X_train, Y_train, X_test, Y_test

In [None]:
num_qubits = 2
num_layers = 6
var_init = (0.01 * np.random.randn(num_layers, num_qubits, 3), 0.0)

In [None]:
def train_and_test(thresh):
    X, Y = gen_data(thresh)
    X_train, Y_train, X_test, Y_test = divide_train_test(X, Y)

    opt = NesterovMomentumOptimizer(0.01)
    batch_size = 5

    # train the variational classifier
    var = var_init
    
    test_accuracies = []
    train_accuracies = []
    costs = []
    for it in range(100):

        # Update the weights by one optimizer step
        batch_index = np.random.randint(0, num_train, (batch_size, ))
        X_train_batch = X_train[batch_index]
        Y_train_batch = Y_train[batch_index]
        var = opt.step(lambda v: cost(v, X_train_batch, Y_train_batch), var)

        # Compute predictions on train and validation set
        predictions_train = [np.sign(variational_classifier(var, f)) for f in X_train]
        predictions_test = [np.sign(variational_classifier(var, f)) for f in X_test]

        # Compute accuracy on train and validation set
        acc_train = accuracy(Y_train, predictions_train)
        acc_test = accuracy(Y_test, predictions_test)
        
        # Compute cost on all samples
        c = cost(var, X, Y)
        
        costs.append(c)
        test_accuracies.append(acc_train)
        train_accuracies.append(acc_test)
        
        print("Iter: {:5d} | Cost: {:0.7f} | Acc train: {:0.7f} | Acc validation: {:0.7f} "
              "".format(it+1, c, acc_train, acc_test))
        
    return train_accuracies, test_accuracies, costs

In [None]:
thresholds = [0.0, 0.1, 0.2, 0.3]
thresh_test_accuracies = []
thresh_train_accuracies = []
thresh_costs = []
for thresh in thresholds:
        print("New threshold: ", thresh)
        trn_ac, tst_ac, costs = train_and_test(thresh)
        thresh_train_accuracies.append(trn_ac)
        thresh_test_accuracies.append(tst_ac)
        thresh_costs.append(costs)
        
print(thresh_test_accuracies)
print(thresh_train_accuracies)
print(thresh_costs)

In [None]:
%matplotlib inline

plt.figure(figsize=(20,20)) 

num_thresh = len(thresholds)
for i in range(num_thresh):
    plt.subplot(num_thresh, 1, i + 1)
    plt.plot(thresh_train_accuracies[i], '-', label='Training accuracy')
    plt.plot(thresh_test_accuracies[i], '-', label='Test accuracy')

    plt.title('Data Separation Threshold %0.2f' % thresholds[i])
    plt.xlabel('# of iterations')
    plt.ylabel('Classification accuracy')
    plt.legend(loc='best')

#plt.tight_layout()
plt.show()

In [None]:
%matplotlib inline

plt.figure(figsize=(20,20)) 

num_thresh = len(thresholds)
for i in range(num_thresh):
    plt.plot(thresh_costs[i], '-', label='Data Separation Threshold %0.2f' % thresholds[i])

plt.xlabel('# of iterations')
plt.ylabel('L2 Loss')
plt.legend(loc='best')

#plt.tight_layout()
plt.show()