In [1]:
import qiskit as qsk, numpy as np, matplotlib.pyplot as plt
import torch
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor, Resize
from qiskit.quantum_info import SparsePauliOp, Statevector, Clifford, PauliList
from typing import Callable
from functools import lru_cache
from timeit import default_timer as timer

def random_thetas(shape: tuple[int]):
    possible_theta_values = np.pi / 2 * np.arange(4)
    return np.random.choice(possible_theta_values, shape)

def evolveSPO(op: SparsePauliOp, other: Clifford) -> SparsePauliOp:
    return SparsePauliOp(
        op.paulis.evolve(other)
    )

def img_to_vec(image: torch.Tensor):
    n = int(np.log2(image.shape[0]))  # Image should be a 2^n x 2^n tensor
    # Convert tensor indices from y,x to y0,y1,...,yn-1,x0,x1,...,xn-1
    image = image.reshape([2 for i in range(2*n)])
    # Permute indices from y0,y1,...,yn-1,x0,x1,...,xn-1 to y0,x0,y1,x1,...,yn-1,xn-1
    image = image.permute([i+j for i in range(n) for j in [0, n]])
    vec = image.flatten()
    return np.array([x for x in vec])

def sumZ(n_qubits: int) -> SparsePauliOp:
    return SparsePauliOp.from_sparse_list(
        [("Z", [i], 1) for i in range(n_qubits)],
        num_qubits = n_qubits
    )

mnist_train = MNIST("./data",
                    train= True,
                    transform= ToTensor(),
                    download= True)

def get_image(idx: int, dataset, n_qubits: int):
    image, label = dataset[idx]
    size = 2**(n_qubits//2)
    padded_image = torch.zeros([size, size])
    padded_image[:28, :28] = image.squeeze()[:, :]
    return padded_image

def get_index_from_label(
        qc: qsk.QuantumCircuit,
        label,
        starting_index: int = 0,
        reverse_search_order: bool = False
) -> int:
    for i, instruction in enumerate(qc.data[starting_index:]):
        if instruction.label == label: return i + starting_index
    raise ValueError(f"Label {label} not found in circuit")

class ParametricCircuit:
    def __init__(self):
        self.n_parameters: int = None
        self.params_shape: tuple[int] = None

        self.cache_heads: dict[str, Clifford] = {}
        self.cache_tails: dict[str, Clifford] = {}
    
    def __call__(self, thetas: np.ndarray, *args, **kwds) -> qsk.QuantumCircuit:
        pass

    def single_gate_clifford(self, label: str, theta_shift: float) -> Clifford:
        pass

    def cache_circuit_segments(self):
        pass

class Model:
    def __init__(self,
                 parametric_circuit: ParametricCircuit,
                 encoding: Callable[..., Statevector],
                 observable: SparsePauliOp
                 ):
        self.param_circ = parametric_circuit
        self.enc = encoding
        self.obs = observable
        self.use_inefficient_method: bool = False
        self.cache_plus: dict = {}
        self.cache_minus: dict = {}
        self.cache_obs_plus: dict = {}
        self.cache_obs_minus: dict = {}


    # def inefficient_call(self, thetas: np.ndarray, input) -> float:
    #     psi = self.enc(input)
    #     qc = self.param_circ(thetas)
    #     evolved_psi = psi.evolve(qc)
    #     return evolved_psi.expectation_value(self.obs)

    def __call__(self, thetas: np.ndarray, input) -> float:
        psi = self.enc(input)
        qc = self.param_circ(thetas)
        if self.use_inefficient_method == False:
            evolved_ops = evolveSPO(self.obs, Clifford(qc))
            return psi.expectation_value(evolved_ops)
        else:
            evolved_psi = psi.evolve(qc)
            return evolved_psi.expectation_value(self.obs)
    
    def param_shift_rule_grad(self, thetas: np.ndarray, input, index: tuple[int]) -> float:
        # print(f"Relevant theta = {thetas[index]}")
        thetas[index] += np.pi / 2
        # print(f"Relevant theta = {thetas[index]}")
        value_plus = self.__call__(thetas, input)
        # print(f"Value plus = {value_plus}")
        thetas[index] -= np.pi
        # print(f"Relevant theta = {thetas[index]}")
        value_minus = self.__call__(thetas, input)
        thetas[index] += np.pi/2
        # print(f"Value minus = {value_minus}")
        return (value_plus - value_minus) / 2
    
    def empirical_NTK(self, thetas: np.ndarray, input1, input2) -> float:
        grad = self.param_shift_rule_grad
        # print(
        #     np.array([
        #         self.param_shift_rule_grad(thetas, input1, index) #* grad(thetas, input2, index)
        #         for index in np.ndindex(thetas.shape)
        #     ])
        # )
        return np.sum([
            grad(thetas, input1, index) * grad(thetas, input2, index)
            for index in np.ndindex(thetas.shape)
        ])
    
    def analytic_NTK(self, input1, input2, n_shots):
        return np.mean([
            self.empirical_NTK(random_thetas(self.param_circ.params_shape),
                               input1, input2)
            for _ in range(n_shots)
        ])
    
    def cache_ops(self):
        for i in range(self.param_circ.n_parameters):
            head = self.param_circ.cache_heads[str(i)]
            tail = self.param_circ.cache_tails[str(i)]
            self.cache_minus[str(i)] = head.compose(
                self.param_circ.single_gate_clifford(str(i), -np.pi/2)
            ).compose(head)
            self.cache_plus[str(i)] = head.compose(
                self.param_circ.single_gate_clifford(str(i), np.pi/2)
            ).compose(tail)
    
    def cache_ops_obs(self):
        for k, v in self.cache_plus.items():
            self.cache_obs_plus[k] = evolveSPO(self.obs, v)
        for k, v in self.cache_minus.items():
            self.cache_obs_minus[k] = evolveSPO(self.obs, v)
    
    def new_grad(self, thetas: np.ndarray, psi):
        out = []
        for i in range(self.param_circ.n_parameters):
            # op_plus = evolveSPO(self.obs, self.cache_plus[str(i)])
            # op_minus = evolveSPO(self.obs, self.cache_minus[str(i)])
            op_plus = self.cache_obs_plus[str(i)]
            op_minus = self.cache_obs_minus[str(i)]
            out.append(1/2 * (psi.expectation_value(op_plus) - psi.expectation_value(op_minus)))
        return np.array(out)
    
    def new_emp_NTK(self, thetas: np.ndarray, input1, input2, use_caches: bool = False,
                    use_caches_states: bool = False):
        time0 = timer()
        if not use_caches: self.param_circ(thetas)
        time1 = timer()

        print(time1 - time0)

        if not use_caches: self.param_circ.cache_circuit_segments()
        time2 = timer()
        print(time2 - time1)
        if not use_caches:
            self.cache_ops()
            self.cache_ops_obs()
        time3 = timer()

        if not use_caches_states:
            self.psi1 = self.enc(input1)
            self.psi2 = self.enc(input2)

        print(time3 - time2)
        return np.dot(
            self.new_grad(thetas, self.psi1),
            self.new_grad(thetas, self.psi2)
        )


    # This function is all wrong, I have to structure it better. The single_gate_cliff should
    # have a shifted theta, not the same theta.
    # def new_emp_NTK(self, thetas: np.ndarray, input1, input2) -> float:
    #     cliff_cache: dict = {}
    #     for i in range(self.param_circ.n_parameters):
    #         cliff_cache[str(i)] = self.param_circ.cache_heads[str(i)].compose(
    #             self.param_circ.single_gate_clifford(str(i))
    #         ).compose(
    #             self.param_circ.cache_tails[str(i)]
    #         )

    


class SimpleTestCircuit(ParametricCircuit):
    def __init__(self, n_qubits: int):
        self.n_parameters: int = n_qubits
        self.params_shape: tuple[int] = (n_qubits)

        self.cache_heads: dict[str, Clifford] = {}
        self.cache_tails: dict[str, Clifford] = {}
    
    def __call__(self, thetas: np.ndarray) -> qsk.QuantumCircuit:
        qc = qsk.QuantumCircuit(self.n_parameters)
        for i in range(self.n_parameters): qc.rx(thetas[i], i)
        return qc

class ConvolutionalQNN(ParametricCircuit):
    def __init__(self, n_qubits: int, n_layers: int):
        self.qc: qsk.QuantumCircuit = None
        self.n_qubits: int = n_qubits
        self.n_layers: int = n_layers
        self.params_shape: tuple[int] = (4, 5*n_qubits//2 - 4, n_layers)
        self.n_parameters: int = np.prod(self.params_shape)
        self.gate_label: int = 0

        self.cache_heads: dict[str, Clifford] = {}
        self.cache_tails: dict[str, Clifford] = {}

    def elementary_2_qubit_gate(self, thetas: np.ndarray, qubit1: int, qubit2: int):
        self.qc.rx(thetas[0], qubit1, label= str(self.gate_label))
        self.gate_label +=1

        self.qc.rx(thetas[1], qubit2, label= str(self.gate_label))
        self.gate_label +=1
        
        self.qc.cx(qubit1, qubit2)
        
        self.qc.rz(thetas[2], qubit1, label= str(self.gate_label))
        self.gate_label +=1
        
        self.qc.rz(thetas[3], qubit2, label= str(self.gate_label))
        self.gate_label +=1
    
    def layer(self, thetas: np.ndarray):
        i = 0
        def apply_gate(qubit1: int, qubit2: int):
            nonlocal i
            self.elementary_2_qubit_gate(thetas[:, i], qubit1, qubit2)
            i += 1
        
        def xiyi_gates(central_qubit: int):
            if central_qubit>0: apply_gate(central_qubit-2, central_qubit-1)
            apply_gate(central_qubit, central_qubit+1)
            if central_qubit<self.n_qubits-2: apply_gate(central_qubit+2, central_qubit+3)
        
        def xxyy_gates(lower_qubit: int):
            apply_gate(lower_qubit, lower_qubit+2)
            apply_gate(lower_qubit+1, lower_qubit+3)
        
        for qubit in np.arange(0, self.n_qubits-2, 2):
            xiyi_gates(qubit)
            xxyy_gates(qubit)
        xiyi_gates(self.n_qubits-2)
    
    def __call__(self, thetas):
        self.gate_label = 0
        self.qc = qsk.QuantumCircuit(self.n_qubits)
        for i in range(self.n_layers):
            self.layer(thetas[:,:,i])
            self.qc.barrier()
        return self.qc
    
    def cache_circuit_segments(self):
        for i in range(self.n_parameters):
            index = get_index_from_label(self.qc, str(i), i)

            head = qsk.QuantumCircuit(self.n_qubits)
            tail = qsk.QuantumCircuit(self.n_qubits)
            
            head.data = self.qc.data[:index]
            tail.data = self.qc.data[index+1:]
            
            self.cache_heads[str(i)] = Clifford(head)
            self.cache_tails[str(i)] = Clifford(tail)
    
    def single_gate_clifford(self, label: str, theta_shift: float) -> Clifford:
        index = get_index_from_label(self.qc, label)
        instruction = self.qc.data[index : index + 1]
        instruction[0].operation.params[0] += theta_shift
        qc_single_gate = qsk.QuantumCircuit(self.n_qubits)
        qc_single_gate.data = instruction
        return Clifford(qc_single_gate)




In [2]:
np.random.seed(45)
n_qubits = 10
circ = ConvolutionalQNN(n_qubits, 2)
thetas = random_thetas(circ.params_shape)
# thetas = np.zeros(n_qubits); thetas[0] = np.pi / 2
pauli_op = sumZ(n_qubits)
#display(circ(thetas).draw("mpl"))
evolved_pauli_op = evolveSPO(pauli_op, Clifford(circ(thetas)))
print(evolved_pauli_op)


SparsePauliOp(['IIIIIIXXXZ', 'IIIIZYZIXX', 'IIZZZYIXII', 'IIZZXIXZYZ', 'IIZZYZXIXZ', 'YYZYZYXYZI', 'XXXZZYIZXX', 'YYZXXIYXIX', 'ZYYXYYYYXI', 'YYYYYYYYXI'],
              coeffs=[ 1.+0.j,  1.+0.j,  1.+0.j,  1.+0.j, -1.+0.j,  1.+0.j, -1.+0.j,  1.+0.j,
  1.+0.j,  1.+0.j])


In [3]:
test_pauli_str = evolved_pauli_op.to_list()[8][0]
print(test_pauli_str)
def get_masks(pauli_str: str) -> tuple[int, int]:
    z_mask = np.sum([
        2**i if pauli_str[-(i+1)] in ['Z', 'Y'] else 0
        for i in range(len(pauli_str))
    ])
    x_mask = np.sum([
        2**i if pauli_str[-(i+1)] in ['X', 'Y'] else 0
        for i in range(len(pauli_str))
    ])
    return z_mask, x_mask
z_mask_test, x_mask_test = get_masks(test_pauli_str)
print(z_mask_test, x_mask_test)
print(x_mask_test)

ZYYXYYYYXI
956 510
510


In [9]:
img = get_image(49, mnist_train, n_qubits)
start_time = timer()
vec = img_to_vec(img)
end_time = timer()
vec_indices = np.where(vec > 0.0)[0]
vec_amps = vec[vec_indices]
psi = Statevector(vec)

print(end_time - start_time)

0.0020201120005367557


In [None]:
import sys
n_images = 71_000
M = np.zeros((n_images, n_images))
# M[:] = 1.0
M2 = np.zeros((n_images, n_images))
M3 = np.zeros((n_images, n_images))
print(sys.getsizeof(
    M
    ) / 10**9)
M.dtype

In [5]:
len(vec_amps)

184

In [6]:
start_time = timer()
print(
    psi.expectation_value(evolved_pauli_op[8])
)
end_time = timer()
print(f"Native Qiskit time for final scalar prod: {end_time - start_time}")

(2.0312803032990825+0j)
Native Qiskit time for final scalar prod: 0.003868162000344455


In [7]:
delta = x_mask_test
z_mask = z_mask_test
x_mask = x_mask_test

In [8]:
import numba

def pauli_expectation_old(indices, amps,
                      delta, z_mask, y_mask, n_qubits):
    acc = 0.0 + 0.0j
    for k in range(indices.shape[0]):
        i = indices[k]
        amp_i = amps[k]

        # partner index after bit flips
        j = i ^ delta

        # lookup psi[j] (sparse search: linear here, but could hash/dict)
        amp_j = 0.0 + 0.0j
        # for kk in range(indices.shape[0]):
        #     if indices[kk] == j:
        #         amp_j = amps[kk]
        #         break
        kk = np.searchsorted(indices, i)
        if indices[kk] == j:
            amp_j = amps[kk]


        if amp_j != 0.0:
            # phase factor
            # count parity of (i & z_mask) bits
            parity = bin(i & z_mask).count("1") & 1
            phase = -1.0 if parity else 1.0

            # Y gives an extra factor of i^bit
            y_parity = bin(i & y_mask).count("1")
            if y_parity % 4 == 1:
                phase *= 1j
            elif y_parity % 4 == 2:
                phase *= -1
            elif y_parity % 4 == 3:
                phase *= -1j

            acc += np.conjugate(amp_i) * phase * amp_j

    return acc



@numba.njit
def pauli_expectation(indices, amps,
                      delta, z_mask, y_mask, n_qubits):
    acc = 0.0 + 0.0j
    for k in range(indices.shape[0]):
        i = indices[k]
        amp_i = amps[k]

        # partner index after bit flips
        j = i ^ delta

        # lookup psi[j] (assuming indices sorted)
        amp_j = 0.0 + 0.0j
        kk = np.searchsorted(indices, j)
        if kk < indices.shape[0] and indices[kk] == j:
            amp_j = amps[kk]

        if amp_j != 0.0:
            # phase factor
            parity = (i & z_mask).bit_count() & 1
            phase = -1.0 if parity else 1.0

            # Y gives an extra factor
            y_parity = (i & y_mask).bit_count()
            if y_parity % 4 == 1:
                phase *= 1j
            elif y_parity % 4 == 2:
                phase *= -1
            elif y_parity % 4 == 3:
                phase *= -1j

            acc += np.conjugate(amp_i) * phase * amp_j

    return acc


In [9]:
import numpy as np
import numba
from numba import njit

@njit(inline='always')
def popcount(x):
    # Kernighan’s algorithm
    c = 0
    while x:
        x &= x - 1
        c += 1
    return c

@njit
def pauli_expectation(indices, amps, delta, z_mask, y_mask, n_qubits):
    acc = 0.0 + 0.0j
    n = indices.shape[0]

    for k in range(n):
        i = indices[k]
        amp_i = amps[k]

        # partner index after bit flips
        j = i ^ delta

        # binary search (requires indices sorted ascending!)
        amp_j = 0.0 + 0.0j
        kk = np.searchsorted(indices, j)   # <-- was 'i' in your snippet; should be 'j'
        if kk < n and indices[kk] == j:
            amp_j = amps[kk]

        if amp_j != 0.0j:
            # Z phase: (-1)^{popcount(i & z_mask)}
            parity = popcount(i & z_mask) & 1
            phase = (-1.0 + 0.0j) if parity else (1.0 + 0.0j)

            # Y phase: i^{popcount(i & y_mask)}  (mod 4)
            y_par = popcount(i & y_mask) % 4
            if y_par == 1:
                phase *= 1j
            elif y_par == 2:
                phase *= -1.0
            elif y_par == 3:
                phase *= -1j

            acc += amp_i.conjugate() * phase * amp_j

    return acc

@njit
def pauli_expectation_2(
    indices, full_vector,
    delta, z_mask, y_mask, n_qubits
):
    acc = 0.0 + 0.0j
    n = indices.shape[0]

    for k in range(n):
        i = indices[k]
        amp_i = full_vector[i]

        j = i ^ delta
        amp_j = full_vector[j]
        # Z phase: (-1)^{popcount(i & z_mask)}
        parity = popcount(i & z_mask) & 1
        phase = (-1.0 + 0.0j) if parity else (1.0 + 0.0j)

        # Y phase: i^{popcount(i & y_mask)}  (mod 4)
        y_par = popcount(i & y_mask) % 4
        if y_par == 1:
            phase *= 1j
        elif y_par == 2:
            phase *= -1.0
        elif y_par == 3:
            phase *= -1j

        acc += amp_i.conjugate() * phase * amp_j
    return acc

#@njit
def empty_function():
    return 0

@njit
def pauli_expectation_3(
    indices, data,
    z_mask, x_mask, phase
):
    out = 0.0 + 0.0j
    n_nonzeros = indices.shape[0]

    y_par = popcount(z_mask & x_mask) % 4
    if y_par == 1:
        phase_due_to_y = 1j
    elif y_par == 2:
        phase_due_to_y = -1.0
    elif y_par == 3:
        phase_due_to_y = -1j

    for k in range(n_nonzeros):
        i = indices[k]
        amp_i = data[i]
        
        j = i ^ x_mask
        amp_j = data[j]

        #phase_i = (-1) ** popcount(i & z_mask) * 1j ** popcount(z_mask & x_mask)
        parity = popcount(i & z_mask) & 1
        phase_i = (-1.0 + 0.0j) if parity else (1.0 + 0.0j)
        
        # if y_par == 1:
        #     phase_i *= 1j
        # elif y_par == 2:
        #     phase_i *= -1.0
        # elif y_par == 3:
        #     phase_i *= -1j

        out += amp_j.conjugate() * phase_i * amp_i

    return out * phase * phase_due_to_y

In [10]:
start_time = timer()
print(pauli_expectation(vec_indices, vec_amps,
                  delta, z_mask, x_mask, n_qubits))
end_time = timer()
print(end_time - start_time)

(-8.208319922552022+0j)
0.6955289329998777


In [11]:
start_time = timer()
pauli_expectation_2(vec_indices, vec,
                  delta, z_mask, x_mask, n_qubits)
end_time = timer()
print(end_time - start_time)

0.10894941800006563


In [13]:
start_time = timer()
result = pauli_expectation_3(vec_indices, vec,
                   z_mask, x_mask, 1.0)
end_time = timer()
print(end_time - start_time)
print(result)

4.3401999846537365e-05
(2.0312803032990825-0j)


In [18]:
def save_input_file(filename, indices, data, z_mask, x_mask, phase):
    n_nonzeros = len(indices)
    with open(filename, "w") as f:
        # number of indices
        f.write(f"{n_nonzeros}\n")
        
        # indices line
        f.write(" ".join(str(int(x)) for x in indices) + "\n")
        
        # data (flattened real/imag pairs)
        for amp in data:
            f.write(f"{amp.real} {amp.imag} ")
        f.write("\n")
        
        # masks
        f.write(f"{int(z_mask)}\n")
        f.write(f"{int(x_mask)}\n")
        
        # phase
        f.write(f"{phase.real} {phase.imag}\n")

save_input_file("test.txt", vec_indices, vec, z_mask, x_mask, 1.0 + 0.0j)

In [15]:
start_time = timer()
empty_function()
end_time = timer()
print(end_time - start_time)

2.248700002382975e-05


In [18]:
N = 2 ** (n_qubits)
test_vec = np.random.random(N)
test_mat = np.random.random((N, N))

start_time = timer()
print(
    test_vec.dot(
        test_mat.dot(
            test_vec
        )
    )
)
end_time = timer()
print(f"Numpy dense matrix time: {end_time - start_time}")

129367.58313426099
Numpy dense matrix time: 0.00389756499998839


In [22]:
N = 252
test_vec1 = np.random.random(N)
test_vec2 = np.random.random(N)

start_time = timer()
test_vec1.dot(test_vec2)
end_time = timer()
print(f"Np time to product cached {N}-long vectors: {end_time - start_time}")

Np time to product cached 252-long vectors: 3.835600000456907e-05


In [None]:
0.00038 * 500