Chapter 05. Quantum Machine Learning


Listing 5-1. HHL Implementation

In [1]:
import cirq
from hamiltonian_simulator import HamiltonianSimulation
from QuantumPhaseEstimation import QuantumPhaseEstimation
from EigenValueInversion import EigenValueInversion
import numpy as np
import sympy


class HHL:

    def __init__(self, hamiltonian, initial_state=None, initial_state_transforms=None, qpe_register_size=4, C=None, t=1):
        """
        :param hamiltonian: Hamiltonian to Simulate
        :param C: hyper parameter to Eigen Value Inversion
        :param t: Time for which Hamiltonian is simulated
        :param initial_state: |b>
        """
        self.hamiltonian = hamiltonian
        self.initial_state = initial_state
        self.initial_state_transforms = initial_state_transforms
        self.qpe_register_size = qpe_register_size
        self.C = C
        self.t = t

        const = self.t/np.pi
        self.t = const*np.pi
        if self.C is None:
            self.C = 2*np.pi / (2**self.qpe_register_size * t)


    def build_hhl_circuit(self):
        self.circuit = cirq.Circuit()
        self.ancilla_qubit = cirq.LineQubit(0)
        self.qpe_register = [cirq.LineQubit(i) for i in range(1, self.qpe_register_size+1)]
        if self.initial_state is None:
            self.initial_state_size = int(np.log2(self.hamiltonian.shape[0]))
            if self.initial_state_size == 1:
                self.initial_state = [cirq.LineQubit(self.qpe_register_size + 1)]
            else:
                self.initial_state = [cirq.LineQubit(i) for i in range(self.qpe_register_size + 1,
                                               self.qpe_register_size + 1 + self.initial_state_size)]

        for op in list(self.initial_state_transforms):
            print(op)
            self.circuit.append(op(self.initial_state[0]))

        # Define Unitary Operator simulating the Hamiltonian
        self.U = HamiltonianSimulation(_H_=self.hamiltonian, t=self.t)
        # Perform Quantum Phase Estimation
        _qpe_ = QuantumPhaseEstimation(input_qubits=self.initial_state,
                                       output_qubits=self.qpe_register, U=self.U)
        _qpe_.circuit()
        print(dir(_qpe_))
        print('CIRCUIT',_qpe_.circuit)
        self.circuit += _qpe_.circuit
        # Perform EigenValue Inversion
        _eig_val_inv_ = EigenValueInversion(num_qubits=self.qpe_register_size + 1, C=self.C, t=self.t)
        self.circuit.append(_eig_val_inv_(*(self.qpe_register + [self.ancilla_qubit])))
        #Uncompute the qpe_register to |0..0> state
        print(self.circuit)
        #print(_qpe_.circuit**(-1))
        self.circuit.append(_qpe_.circuit**(-1))
        self.circuit.append(cirq.measure(self.ancilla_qubit,key='a'))
        self.circuit.append([
            cirq.PhasedXPowGate(
                exponent=sympy.Symbol('exponent'),
                phase_exponent=sympy.Symbol('phase_exponent'))(*self.initial_state),
            cirq.measure(*self.initial_state, key='m')
        ])

        #sim = cirq.Simulator()
        #results = sim.simulate(self.circuit)
        #print(results)

    def simulate(self):
        simulator = cirq.Simulator()

        # Cases for measuring X, Y, and Z (respectively) on the memory qubit.
        params = [{
            'exponent': 0.5,
            'phase_exponent': -0.5
        }, {
            'exponent': 0.5,
            'phase_exponent': 0
        }, {
            'exponent': 0,
            'phase_exponent': 0
        }]

        results = simulator.run_sweep(self.circuit, params, repetitions=5000)

        for label, result in zip(('X', 'Y', 'Z'), list(results)):
            # Only select cases where the ancilla is 1.
            # TODO: optimize using amplitude amplification algorithm.
            # Github issue: https://github.com/quantumlib/Cirq/issues/2216
            expectation = 1 - 2 * np.mean(
                result.measurements['m'][result.measurements['a'] == 1])
            print('{} = {}'.format(label, expectation))

    def main():
        """
        Simulates HHL with matrix input, and outputs Pauli observables of the
        resulting qubit state |x>.
        Expected observables are calculated from the expected solution |x>.
        """

        # Eigendecomposition:
        #   (4.537, [-0.971555, -0.0578339+0.229643j])
        #   (0.349, [-0.236813, 0.237270-0.942137j])
        # |b> = (0.64510-0.47848j, 0.35490-0.47848j)
        # |x> = (-0.0662724-0.214548j, 0.784392-0.578192j)
        A = np.array([[4.30213466 - 6.01593490e-08j,
                       0.23531802 + 9.34386156e-01j],
                      [0.23531882 - 9.34388383e-01j,
                       0.58386534 + 6.01593489e-08j]])
        t = 0.358166 * math.pi
        register_size = 4
        input_prep_gates = [cirq.rx(1.276359), cirq.rz(1.276359)]
        expected = (0.144130, 0.413217, -0.899154)

        # Set C to be the smallest eigenvalue that can be represented by the
        # circuit.
        C = 2 * math.pi / (2 ** register_size * t)

        # Simulate circuit
        print("Expected observable outputs:")
        print("X =", expected[0])
        print("Y =", expected[1])
        print("Z =", expected[2])
        print("Actual: ")
        simulate(hhl_circuit(A, C, t, register_size, *input_prep_gates))


if __name__ == '__main__':
    A = np.array([[4.30213466 - 6.01593490e-08j,
                   0.23531802 + 9.34386156e-01j],
                  [0.23531882 - 9.34388383e-01j,
                   0.58386534 + 6.01593489e-08j]])
    t = 0.358166 * np.pi
    C = None
    qpe_register_size = 4
    initial_state_transforms = [cirq.rx(1.276359), cirq.rz(1.276359)]
    _hhl_ = HHL(hamiltonian=A,initial_state_transforms=initial_state_transforms,qpe_register_size=4)
    _hhl_.build_hhl_circuit()
    _hhl_.simulate()

cirq version 1.6.1
numpy version 2.3.0
fire 0.7.1
elapsedtimer 1.0.0
Rx(0.4062776880196569π)
Rz(0.4062776880196569π)


TypeError: Can't instantiate abstract class HamiltonianSimulation without an implementation for abstract methods '_num_qubits_', '_qid_shape_', 'num_qubits'

In [2]:
import cirq
import numpy as np
print('cirq version',cirq.__version__)
print('numpy version',np.__version__)


class SwapTest:
    def __init__(self,prepare_input_states=None,input_state_dim=None,nq=0,measure=False,copies=1000):
        self.nq = nq
        self.prepare_input_states = prepare_input_states
        self.input_state_dim = input_state_dim
        if input_state_dim is not None:
            self.num_qubits_input_states = int(np.log2(self.input_state_dim))
            print(self.num_qubits_input_states)
        
        
        self.measure = measure
        self.copies = copies
        self.ancilla_qubit = cirq.LineQubit(self.nq)
        self.nq += 1
        
        
        if self.prepare_input_states is not None:
            if input_state_dim is None:
                raise ValueError("Please enter a valid dimension for input states to compare")
            else:
                self.num_qubits_input_states = int(np.log2(self.input_state_dim))
                self.input_1 = [cirq.LineQubit(i) for i in range(self.nq, self.nq +self.num_qubits_input_states)]
                self.nq += self.num_qubits_input_states
                self.input_2 = [cirq.LineQubit(i) for i in range(self.nq,
                                                                 self.nq + self.num_qubits_input_states)]
                self.nq += self.num_qubits_input_states
    
    def build_circuit(self,input_1=None,input_2=None,input_1_transforms=None,input_2_transforms=None):
        
        self.circuit = cirq.Circuit()
        if input_1 is not None:
            self.input_1 = input_1
        if input_2 is not None:
            self.input_2 = input_2
        if input_1_transforms is not None:
            for op in input_1_transforms:
                print(op)
                print(self.input_1)
                self.circuit.append(op.on_each(self.input_1))
        if input_2_transforms is not None:
            for op in input_2_transforms:
                self.circuit.append(op.on_each(self.input_2))
        

        # Ancilla in + state         
        self.circuit.append(cirq.H(self.ancilla_qubit))
        # Swap states conditoned on the ancilla
        for i in range(len(self.input_1)):
            self.circuit.append(cirq.CSWAP(self.ancilla_qubit, self.input_1[i], 
                                           self.input_2[i]))
        # Hadamard Transform on Ancilla 
        self.circuit.append(cirq.H(self.ancilla_qubit))
        if self.measure:
            self.circuit.append(cirq.measure(self.ancilla_qubit,key='m'))
        print(self.circuit)
        
    def simulate(self):
        sim = cirq.Simulator()
        results = sim.run(self.circuit,repetitions=self.copies)
        results = results.histogram(key='m')
        prob_0 = results[0]/self.copies
        dot_product_sq = 2*(max(prob_0 - .5,0))
        return prob_0,dot_product_sq

def main(prepare_input_states=True,input_state_dim=4,
                  input_1_transforms=[cirq.H],
                  input_2_transforms=[cirq.I],measure=True,copies=1000):
    st = SwapTest(prepare_input_states=prepare_input_states,input_state_dim=input_state_dim,measure=measure,copies=copies)
    st.build_circuit(input_1_transforms=input_1_transforms,
                     input_2_transforms=input_2_transforms)
    prob_0, dot_product_sq = st.simulate()
    print(f"Probability of zero state {prob_0}")
    print(f"Sq of Dot product  {dot_product_sq}")
    print(f"Dot product  {dot_product_sq**0.5}")

if __name__ == '__main__':
    main()
    

cirq version 1.6.1
numpy version 2.3.0
2
H
[cirq.LineQubit(1), cirq.LineQubit(2)]
0: ───H───@───@───H───M('m')───
          │   │
1: ───H───×───┼────────────────
          │   │
2: ───H───┼───×────────────────
          │   │
3: ───I───×───┼────────────────
              │
4: ───I───────×────────────────
Probability of zero state 0.606
Sq of Dot product  0.21199999999999997
Dot product  0.4604345773288535


Listing 5-3. Implementation of Quantum Euclidean Distance Computation

In [3]:
import cirq
import numpy as np
import math
from swap_test import SwapTest
print('cirq version', cirq.__version__)
print('numpy version', np.__version__)

class euclidean_distance:
    def __init__(self, input_state_dim, prepare_input_states=False,copies=10000):
        self.prepare_input_states = prepare_input_states
        self.input_state_dim = input_state_dim
        self.copies = copies
        self.nq = 0
        self.control_qubit = cirq.LineQubit(0)
        self.nq += 1 

        self.num_qubits_per_state = int(np.log2(self.input_state_dim))
        self.state_store_qubits = [cirq.LineQubit(i) for i 
                                   in range(self.nq, self.nq +self.num_qubits_per_state)]
        self.nq += self.num_qubits_per_state



        if self.prepare_input_states:

            self.input_1 = [cirq.LineQubit(i) for i in range(self.nq, self.nq + self.num_qubits_per_state)]
            self.nq += self.num_qubits_per_state

            self.input_2 = [cirq.LineQubit(i) for i in range(self.nq, self.nq + self.num_qubits_per_state)]
            self.nq += self.num_qubits_per_state


        self.other_state_qubits = [cirq.LineQubit(i) for i in range(self.nq, self.nq + 1 + self.num_qubits_per_state)]
        self.nq += 1 + self.num_qubits_per_state
        self.circuit = cirq.Circuit()

    def dist_circuit(self, input_1_norm=1, input_2_norm=1, input_1=None,
                input_2=None, input_1_transforms=None, input_2_transforms=None,
                input_1_circuit=None,
                input_2_circuit=None):

        self.input_1_norm = input_1_norm
        self.input_2_norm = input_2_norm
        self.input_1_circuit = input_1_circuit
        self.input_2_circuit = input_2_circuit

        if input_1 is not None:
            self.input_1 = input_1

        if input_2 is not None:
            self.input_2 = input_2
        
            
        if input_1_transforms is not None:
            
            self.input_1_circuit = []
            
            for op in input_1_transforms:
                #print(op)
                #print(self.input_1)
                self.circuit.append(op.on_each(self.input_1))
                self.input_1_circuit.append(op.on_each(self.input_1))
        if input_2_transforms is not None:
            self.input_2_circuit = []
            for op in input_2_transforms:
                self.circuit.append(op.on_each(self.input_2))
                self.input_2_circuit.append(op.on_each(self.input_2))

        self.input_1_uncompute = cirq.inverse(self.input_1_circuit)
        self.input_2_uncompute = cirq.inverse(self.input_2_circuit)

        # Create the required state 1

        self.circuit.append(cirq.H(self.control_qubit))
        print("length",len(self.input_2))
        for i in range(len(self.input_2)):
            self.circuit.append(cirq.CSWAP(self.control_qubit, self.state_store_qubits[i], self.input_2[i]))
        #for c in self.input_1_uncompute:
        #    self.circuit.append(c[0].controlled_by(self.control_qubit))
        #self.circuit.append(cirq.X(self.input_1[0]).controlled_by(self.control_qubit))
        self.circuit.append(cirq.X(self.control_qubit))

        for i in range(len(self.input_1)):
            self.circuit.append(cirq.CSWAP(self.control_qubit, self.state_store_qubits[i], self.input_1[i]))
        #self.circuit.append(cirq.ControlledGate(self.input_2_uncompute)(self.control_qubit, *self.input_1))
        for c in self.input_2_uncompute:
            self.circuit.append(c[0].controlled_by(self.control_qubit))
        #self.circuit.append(cirq.X(self.input_1[0]).controlled_by(self.control_qubit))
        self.circuit.append(cirq.X(self.control_qubit))
        for c in self.input_1_uncompute:
            self.circuit.append(c[0].controlled_by(self.control_qubit))

        # Prepare the other state qubit 
        self.Z = self.input_1_norm**2 + self.input_2_norm**2
        print(self.Z) 
        theta = 2*math.acos(self.input_1_norm/np.sqrt(self.Z))
        print(theta)
        self.circuit.append(cirq.ry(theta)(self.other_state_qubits[0]))
        self.circuit.append(cirq.Z(self.other_state_qubits[0]))

        self.st = SwapTest(prepare_input_states=False, input_state_dim=4,nq=self.nq,measure=False)

        print(self.other_state_qubits)
        self.state = [self.control_qubit] + self.state_store_qubits
        self.st.build_circuit(input_1=self.state,input_2=self.other_state_qubits)
        self.circuit += self.st.circuit
        #print(self.circuit)
        #self.circuit.append(cirq.measure(*(self.input_1 + self.input_2), key='k'))
        self.circuit.append(cirq.measure(self.st.ancilla_qubit, key='k'))
        
        print(self.circuit)
        
    def compute_distance(self):
        sim = cirq.Simulator()
        results = sim.run(self.circuit, repetitions=self.copies).histogram(key='k')
        results = dict(results)
        #results = sim.simulate(self.circuit)
        print(results)
        results = dict(results)
        prob_0 = results[0]/self.copies
        print(prob_0)
        euclidean_distance = 4*self.Z*max((prob_0 - 0.5),0)
        print("Euclidean distance",euclidean_distance)



if __name__ == '__main__':

    dist_obj = euclidean_distance(input_state_dim=2, prepare_input_states=True,copies=100000)
    theta = math.acos(1/math.sqrt(3))
    dist_obj.dist_circuit(input_1_transforms=[cirq.H], input_2_transforms=[cirq.H])
    dist_obj.compute_distance()

cirq version 1.6.1
numpy version 2.3.0
cirq version 1.6.1
numpy version 2.3.0
length 1
2
1.5707963267948968
2
[cirq.LineQubit(4), cirq.LineQubit(5)]
0: ───────×───────────
          │
1: ───────┼───×───────
          │   │
4: ───────×───┼───────
          │   │
5: ───────┼───×───────
          │   │
6: ───H───@───@───H───
0: ───H──────────@───X───@───@───X───@───────×────────────────────
                 │       │   │       │       │
1: ──────────────×───────×───┼───────┼───────┼───×────────────────
                 │       │   │       │       │   │
2: ───H──────────┼───────×───┼───────H───────┼───┼────────────────
                 │           │               │   │
3: ───H──────────×───────────H───────────────┼───┼────────────────
                                             │   │
4: ───Ry(0.5π)───Z───────────────────────────×───┼────────────────
                                             │   │
5: ──────────────────────────────────────────┼───×────────────────
                       

Listing 5-4. Quantum K-Means Clustering


In [4]:
import cirq
from swap_test import SwapTest
import pandas as pd
import math
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

class QuantumKMeans:
    def __init__(self,data_csv,num_clusters,features,copies=1000,iters=100):
        self.data_csv = data_csv
        self.num_clusters = num_clusters
        self.features =features
        self.copies =copies
        self.iters =iters
    def data_preprocess(self):
        df = pd.read_csv(self.data_csv)
        print(df.columns)
        df['theta'] = df.apply(lambda x: math.atan(x[self.features[1]]/x[self.features[0]]), axis=1)
        self.X =df.values[:,:2]
        self.row_norms = np.sqrt((self.X**2).sum(axis=1))
        self.X =self.X/self.row_norms[:, np.newaxis]
        self.X_q_theta = df.values[:,2]
        self.num_datapoints= self.X.shape[0]
    def distance(self,x,y):
        st =SwapTest(prepare_inputstates=True, input_state_dim=2, measure=True, copies=self.copies)
        st.build_circuit(input_1_transforms=[cirq.ry(x)], input_2_transforms=[cirq.ry(y)])
        prob_0, _ = st.simulate()
        _distance_ = 1-prob_0
        del st
        return _distance_
    def init_clusters(self):
        self.cluster_points = np.random.randint(self.num_datapoints, size=self.num_clusters)
        self.cluster_datapoints =self.X[self.cluster_points,:]
        self.cluster_theta =self.X_q_theta[self.cluster_points]
        self.clusters =np.zeros(len(self.X_q_theta))
    def assign_clusters(self):
        self.distance_matrix = np.zeros((self.num_datapoints, self.num_clusters))
        for i, x in enumerate(list(self.X_q_theta)):
            for j, y in enumerate(list(self.cluster_theta)):
                self.distance_matrixp[i, j] = self.distance(x,y)
        self.clusters = np.argmin(self.distance_matrix, axis=1)
    def update_cluster(self):
        updated_cluster_datapoints =[]
        updated_cluster_theta = []
        for k in range(self.num_clusters):
            centroid = np.mean(self.X[self.clusters ==k], axis=0)
            centroid_theta = math.atan(centroid[1]/centroid[0])
            updated_cluster_datapoints.append(centroid)
            updated_cluster_theta.append(centroid_theta)
        self.cluster_datapoints = np.array(updated_cluster_datapoints)
        self.cluster_theta = np.array(updated_cluster_theta)
    def plot(self):
        fig = plt.figure(figsize=(8,8))
        colors =['red', 'green', 'blue', 'purple', 'yellow', 'black']
        plt.scatter(self.X[:,0],self.X[:,1],c=self.clusters,cmap=matplotlib.colors.ListedColormap(colors[:self.num_clusters]))
        plt.savefig('Cluster.png')
    def run(self):
        self.data_preprocess()
        self.init_clusters()
        for i in range(self.iters):
            self.assign_clusters()
            self.update_clusters()
        self.plot()

if __name__ == '__main__':
    data_Cvs = 'DataForQComparison.csv'
    num_clusters =4
    qkmeans = QuantumKMeans(data_cvs=data_csv, num_clusters=num_clusters, iters=10, features=['Annual Income_k$',0])
    qkmeans.run()

NameError: name 'data_csv' is not defined

In [5]:
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.decomposition import PCA
from qiskit import Aer
from qiskit.aqua.components.feature_maps import SecondOrderExpansion,FirstOrderExpansion
from qiskit.aqua.algorithms import QSVM
from qiskit.aqua import QuantumInstance
import numpy as np
import matplotlib.pyplot as plt


class QSVM_routine:

    def __init__(self,
                 feature_dim=2,
                 feature_depth=2,
                 train_test_split=0.3,
                 train_samples=5,
                 test_samples=2,
                 seed=0,
                 copies=5):
        self.feature_dim = feature_dim
        self.feature_depth = feature_depth
        self.train_test_split = train_test_split
        self.train_samples = train_samples
        self.test_samples = test_samples
        self.seed = seed
        self.copies = copies

    # Create train test datasets

    def train_test_datasets(self):
        self.class_labels = [r'A', r'B']
        data, target = datasets.load_breast_cancer(True)
        train_X, test_X, train_y, test_y = train_test_split(data, target,
                                           test_size=self.train_test_split,
                                           random_state=self.seed)
        # Mean std normalization 
        self.z_scale = StandardScaler().fit(train_X)
        self.train_X_norm = self.z_scale.transform(train_X)
        self.test_X_norm = self.z_scale.transform(test_X)

        # Project the data into dimensions equal to the 
        # number of qubits
        self.pca = PCA(n_components=self.feature_dim).fit(self.train_X_norm)
        self.train_X_norm = self.pca.transform(self.train_X_norm)
        self.test_X_norm = self.pca.transform(self.test_X_norm)

        # Scale to the range (-1,+1)
        X_all = np.append(self.train_X_norm, self.test_X_norm, axis=0)
        minmax_scale = MinMaxScaler((-1, 1)).fit(X_all)
        self.train_X_norm = minmax_scale.transform(self.train_X_norm)
        self.test_X_norm = minmax_scale.transform(self.test_X_norm)

        # Pick training and test number of datapoint 
        self.train = {key: (self.train_X_norm[train_y == k, :])[:self.train_samples] for k, key in
                      enumerate(self.class_labels)}
        self.test = {key: (self.test_X_norm[test_y == k, :])[:self.test_samples] for k, key in
                     enumerate(self.class_labels)}

        
    # Train the QSVM Model
    def train_model(self):
        backend = Aer.get_backend('qasm_simulator')
        feature_expansion = SecondOrderExpansion(feature_dimension=self.feature_dim,
                                                 depth=self.feature_depth,
                                                 entangler_map=[[0, 1]])
        #feature_expansion = FirstOrderExpansion(feature_dimension=self.feature_dim)

        # Model definition
        svm = QSVM(feature_expansion, self.train, self.test)
        #svm.random_seed = self.seed
        q_inst = QuantumInstance(backend, shots=self.copies)

        # Train the SVM
        result = svm.run(q_inst)
        return svm, result

    # Analyze the training and test results

    def analyze_training_and_inference(self, result, svm):
        data_kernel_matrix = result['kernel_matrix_training']
        image = plt.imshow(np.asmatrix(data_kernel_matrix),
                           interpolation='nearest',
                           origin='upper',
                           cmap='bone_r')
        plt.show()
        print(f"Test Accuracy: {result['testing_accuracy']}")

    def main(self):
        self.train_test_datasets()
        svm, result = self.train_model()
        self.analyze_training_and_inference(svm, result)


if __name__ == '__main__':
    qsvm = QSVM_routine()
    qsvm.main()

ImportError: cannot import name 'Aer' from 'qiskit' (c:\Users\user\AppData\Local\Programs\Python\Python313\Lib\site-packages\qiskit\__init__.py)