<a href="https://colab.research.google.com/github/alexaK88/Q_jpeg_pennylane/blob/main/full_flow_different_qubits.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pennylane
!pip install pennylane pennylane-lightning[gpu]



### Import Libraries

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import pennylane as qml
from pennylane.templates import QFT
from sklearn.svm import SVC
from sklearn.datasets import fetch_openml, load_digits
from sklearn.preprocessing import MinMaxScaler, normalize
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix
from pennylane import numpy as pnp
from skimage.transform import resize
from keras.datasets import mnist

### Step 1:  Dataset Preparation


First, we load the MNIST dataset from openML.
- X is the pixel data
- y is the labels
- converting everything to `uint8` here to ensure all values are integers in [0, 255]

In [3]:
# loading mnist from openML
mnist = fetch_openml('mnist_784', version=1, cache=True)
X = mnist['data'].astype(np.uint8) # better to convert for binerization
y = mnist['target'].astype(np.uint8)
y = y.to_numpy()

Next, we focus on 2 classes, i.e. binary classification.
Here, I've been experimenting with different classes, and I stopped on 4 vs 9, cause they have more subtle difference in pixels, they are similar looking.

In [4]:
# focus on binary classification
mask = (y == 4) | (y == 9)
X, y = X[mask], y[mask]
X.shape

(13782, 784)

- I take only the first `n_samples`.
- I convert X to a NumPy array, and shuffling the data randomly

In [5]:
n_samples = 100 # restricting to 6000 samples for now

X = X.values if hasattr(X, "values") else X # safer conversion
perm = np.random.permutation(len(X))
X, y = X[perm], y[perm]

X = X[:n_samples]
y = y[:n_samples]

Now, I normalise pixel intensities.
- [0, 255] -> [0, 1]
- reshaping images back to 2D for resizing, i.e to 28x28 array with float values between 0 and 1.

In [6]:
X = X / 255.0
X = X.reshape(-1, 28, 28)

print(X.shape)
print("Pixel range:", X.min(), X.max())

(100, 28, 28)
Pixel range: 0.0 1.0


And now I reduce images to 8x8 + flattening to (, 64)
- resize -> flatten -> normalize

In [7]:
# convert each 28x28 binarised image to 8x8, then flatten to length 64
def to_8x8_vector(img_row):
    img_8x8 = resize(
        img_row,
        (8, 8),
        anti_aliasing=False,
        preserve_range=True,
        order=1 # controlling interpolation
    )
    img_8x8 = img_8x8.flatten()
    s = np.sum(img_8x8)

    if s > 0:
        img_8x8 = np.sqrt(img_8x8 / s)
    else:
        img_8x8 = np.zeros_like(img_8x8)
        img_8x8[0] = 1.0
      # should be shape (64,)
    return img_8x8

# apply transformation to all images
X_8x8 = np.array([to_8x8_vector(x) for x in X], dtype=float)
X_8x8.shape

(100, 64)

In [8]:
# sanity check, make sure no NaNs exist and all vectors are normalised, i.e. norm is around 1
print("Any NaNs?", np.isnan(X_8x8).any())
print("Norm check:", np.min(np.linalg.norm(X_8x8, axis=1)), np.max(np.linalg.norm(X_8x8, axis=1)))

Any NaNs? False
Norm check: 0.9999999999999999 1.0


I'm gonna do the splitting here, and carry both representations consistently.
- qek inputs: (64,) flattened and normalized vectors, for quantum kernel embedding
- qjpeg: 28x28 images

In [9]:
idx = np.arange(n_samples)

idx_train, idx_test, y_train, y_test = train_test_split(
    idx, y, test_size=0.2, random_state=42, stratify=y
)

# QEK inputs (8x8 -> 64 -> normed)
X_train_qek = X_8x8[idx_train]
X_test_qek  = X_8x8[idx_test]

# QJPEG inputs (28x28 binary images)
X_train_img = X[idx_train]
X_test_img  = X[idx_test]

print("QEK train/test:", X_train_qek.shape, X_test_qek.shape)
print("IMG train/test:", X_train_img.shape, X_test_img.shape)
print("Labels train/test:", y_train.shape, y_test.shape)

QEK train/test: (80, 64) (20, 64)
IMG train/test: (80, 28, 28) (20, 28, 28)
Labels train/test: (80,) (20,)


Data preparation is done.

### Step 2: Quantum Embedding & Kernel Training

Define number of qubits and device.

In [10]:
device = "lightning.qubit"
n_qubits = 6
n_layers = 2          # number of trainable layers
batch_size = 8       # bigger batch for stability
n_steps = 50          # training steps
stepsize = 2e-4       # smaller learning rate
eps = 1e-8      # small epsilon to prevent division by zero
wires = range(n_qubits)
qubit_list = [4, 5, 6, 7, 8]

In [11]:
dev = qml.device(device, wires=n_qubits, shots=None)

In [12]:
def feature_map(x, theta, wires):
    n_qubits = len(wires)
    qml.AmplitudeEmbedding(x, wires=wires, pad_with=0.0, normalize=True)
    for l in range(theta.shape[0]):
        for i in range(n_qubits):
            qml.RX(theta[l, i, 0], wires=wires[i])
            qml.RY(theta[l, i, 1], wires=wires[i])
            qml.RZ(theta[l, i, 2], wires=wires[i])
        for i in range(n_qubits - 1):
            qml.CNOT(wires=[wires[i], wires[i+1]])
        qml.CNOT(wires=[wires[-1], wires[0]])


def train_quantum_kernel(X, y, n_qubits,
                         n_layers=2,
                         n_steps=50,
                         batch_size=8,
                         stepsize=2e-4,
                         ema_alpha=0.9,
                         verbose=True
                         ):
    wires = range(n_qubits)
    dev = qml.device("lightning.qubit", wires=n_qubits, shots=None)

    @qml.qnode(dev, interface="autograd")
    def kernel_qnode(x1, x2, theta):
        feature_map(x1, theta, wires)
        qml.adjoint(feature_map)(x2, theta, wires)
        return qml.expval(qml.Projector([0]*n_qubits, wires=wires))

    def kernel_matrix(X1, X2, theta):
        return qml.math.stack([
            qml.math.stack([
                kernel_qnode(x1, x2, theta)
                for x2 in X2
            ])
            for x1 in X1
        ])

    def center_kernel(K):
        n = K.shape[0]
        H = qml.numpy.eye(n) - qml.numpy.ones((n, n)) / n
        return H @ K @ H

    def kernel_alignment_loss(theta):
        idx = np.random.choice(len(X), batch_size, replace=False)
        Xb = X[idx]
        yb = y[idx]

        K = center_kernel(kernel_matrix(Xb, Xb, theta))
        y_pm = (2 * yb - 1).astype(float)
        yy = qml.math.outer(y_pm, y_pm)

        K /= qml.math.linalg.norm(K)
        yy /= qml.math.linalg.norm(yy)

        return -qml.math.sum(K * yy)

    theta = 0.01 * qml.numpy.random.randn(n_layers, n_qubits, 3)
    opt = qml.AdamOptimizer(stepsize)

    ema = None

    for step in range(n_steps):
        theta, loss = opt.step_and_cost(kernel_alignment_loss, theta)

        loss_val = float(loss)
        ema = loss_val if ema is None else ema_alpha * ema + (1 - ema_alpha) * loss_val

        if verbose:
            print(
                f"Step {step:02d} | "
                f"loss = {loss_val:.4f} | "
                f"ema = {ema:.4f}"
            )

    return theta, kernel_qnode


In [13]:
def prepare_qek_inputs(X, n_qubits, eps=1e-12):
    dim = 2 ** n_qubits
    Xq = X[:, :dim].copy()

    # Renormalize each sample
    norms = np.linalg.norm(Xq, axis=1, keepdims=True)
    # If a vector is all zeros, replace its norm with 1.0 to avoid division by zero
    # and then set the vector to a valid normalized state (e.g., |0...0>)
    zero_norm_indices = np.where(norms.flatten() < eps)[0]
    if len(zero_norm_indices) > 0:
        Xq[zero_norm_indices, 0] = 1.0  # Set first element to 1 for |0...0>
        Xq[zero_norm_indices, 1:] = 0.0 # Set rest to 0
        norms[zero_norm_indices] = 1.0 # Ensure norm is 1.0

    # Ensure norms are at least eps to prevent division by near-zero values
    norms = np.maximum(norms, eps)
    Xq = Xq / norms

    return Xq

In [14]:
results = []

for nq in qubit_list:
    print(f"\nTraining kernel with {nq} qubits")
    Xq = prepare_qek_inputs(X_train_qek, nq)
    # Pass the prepared Xq to train_quantum_kernel, not the original X_train_qek
    theta, kernel = train_quantum_kernel(Xq, y_train, nq)

    # Initialize results as a dictionary first if it's meant to store non-numeric keys
    # or ensure nq is used as an index if results is a list
    if isinstance(results, list): # if results is still an empty list, convert to dict
        if not results: # only if empty
            results = {}

    results[nq] = {
        "theta": theta,
        "kernel": kernel
    }


Training kernel with 4 qubits
Step 00 | loss = -0.0458 | ema = -0.0458
Step 01 | loss = -0.0598 | ema = -0.0472
Step 02 | loss = -0.0185 | ema = -0.0443
Step 03 | loss = -0.0795 | ema = -0.0478
Step 04 | loss = -0.0280 | ema = -0.0458
Step 05 | loss = -0.0422 | ema = -0.0455
Step 06 | loss = -0.1289 | ema = -0.0538
Step 07 | loss = -0.0879 | ema = -0.0572
Step 08 | loss = -0.0211 | ema = -0.0536
Step 09 | loss = -0.0280 | ema = -0.0511
Step 10 | loss = -0.0615 | ema = -0.0521
Step 11 | loss = -0.0626 | ema = -0.0532
Step 12 | loss = -0.0493 | ema = -0.0528
Step 13 | loss = -0.1401 | ema = -0.0615
Step 14 | loss = -0.0804 | ema = -0.0634
Step 15 | loss = -0.0850 | ema = -0.0655
Step 16 | loss = -0.0545 | ema = -0.0644
Step 17 | loss = -0.0535 | ema = -0.0633
Step 18 | loss = -0.1241 | ema = -0.0694
Step 19 | loss = -0.0455 | ema = -0.0670
Step 20 | loss = -0.0528 | ema = -0.0656
Step 21 | loss = -0.1089 | ema = -0.0699
Step 22 | loss = -0.0123 | ema = -0.0642
Step 23 | loss = -0.0792 |

  res = super().__array_ufunc__(ufunc, method, *args, **kwargs)
  lambda ans, x, y: unbroadcast_f(x, lambda g: g / y),


Step 28 | loss = nan | ema = nan
Step 29 | loss = nan | ema = nan
Step 30 | loss = nan | ema = nan
Step 31 | loss = nan | ema = nan
Step 32 | loss = nan | ema = nan
Step 33 | loss = nan | ema = nan
Step 34 | loss = nan | ema = nan
Step 35 | loss = nan | ema = nan
Step 36 | loss = nan | ema = nan
Step 37 | loss = nan | ema = nan
Step 38 | loss = nan | ema = nan
Step 39 | loss = nan | ema = nan
Step 40 | loss = nan | ema = nan
Step 41 | loss = nan | ema = nan
Step 42 | loss = nan | ema = nan
Step 43 | loss = nan | ema = nan
Step 44 | loss = nan | ema = nan
Step 45 | loss = nan | ema = nan
Step 46 | loss = nan | ema = nan
Step 47 | loss = nan | ema = nan
Step 48 | loss = nan | ema = nan
Step 49 | loss = nan | ema = nan

Training kernel with 5 qubits
Step 00 | loss = -0.0617 | ema = -0.0617
Step 01 | loss = -0.0334 | ema = -0.0588
Step 02 | loss = -0.0382 | ema = -0.0568
Step 03 | loss = -0.0468 | ema = -0.0558
Step 04 | loss = -0.0794 | ema = -0.0581
Step 05 | loss = -0.0272 | ema = -0.05

### Step 3: QJPEG Compression

In [15]:
def vectorization(img, Cr, Cc, renorm=False):
    "Vectorize the image into amplitude-encoding patches suitable for quantum circuits"
    # splitting the original image (Mr, Mc) into S equal-size patches of shape (Cr, Cc)
    Mr, Mc = img.shape
    assert Mr % Cr == 0 and Mc % Cc == 0
    patches = (img.reshape(Mc//Cr, Cr, -1, Cc).swapaxes(1, 2).reshape(-1, Cr, Cc))
    # 64 patches, (64, 64, 64) shape; S=64

    # vectorize each patch and collect all in a (N, Cr*Cc) array
    vect_patches = np.reshape(patches,  (patches.shape[0], Cr*Cc)) # (64, 4096)

    # normalize each (Cr*Cc) vector to the intensity of the corresponding (Cr, Cc) patch
    states = np.zeros((patches.shape[0], Cr*Cc)) # (64, 4096)
    norm = np.zeros(patches.shape[0])

    for idx in range(patches.shape[0]): # for each patch
        # compute the sum of pixels intensities
        norm[idx] = vect_patches[idx].sum()
        if norm[idx] == 0:
            # empty patch -> encode |0...0>
            states[idx, 0] = 1.0
            norm[idx] = 1.0
            continue

        # normalize the patch vector so that its entries sum is 1
        tmp = vect_patches[idx] / norm[idx]
        # take the element-wise square root of the normalized vector
        states[idx] = np.sqrt(tmp)
    if renorm == False:
        norm = np.ones(patches.shape[0])
    print(states[:10])

    return states, norm # amplitudes, pixel intensities' sums

In [16]:
def qft_swaps(wires):
    n = len(wires)
    # apply QFT to all qubits
    qml.QFT(wires=wires)
    # add swaps to reverse qubit order!
    for i in range(n // 2):
        qml.SWAP(wires=[wires[i], wires[n - i - 1]])


def iqft_swaps(wires):
    n = len(wires)
    # swaps again - BEFORE iqft
    for i in reversed(range(n // 2)):
        qml.SWAP(wires=[wires[i], wires[n-i-1]])
    qml.adjoint(QFT)(wires=wires)

In [17]:
def circuit_builder(states, n0, n2, shots):
    ntilde = (n0 - n2) // 2
    n1 = n0 - ntilde

    qnodes = []

    # define device with n0 qubits
    dev = qml.device(device, wires=n0, shots=shots)

    for idx in range(states.shape[0]):
        # qnode to capture current input state
        @qml.qnode(dev)
        def circuit():
            # print("State norm:", np.linalg.norm(states[idx]))
            # initializing the state (using AmplitudeEmbedding here, but I'm wondering if something else could work faster)
            qml.AmplitudeEmbedding(states[idx], wires=range(n0), pad_with=0.0, normalize=True)

            # Hadamard on all n0 qubits
            for w in range(n0):
                qml.Hadamard(wires=w)

            # apply QFT on all qubits
            qft_swaps(wires=range(n0))

            # apply IQFT on first n1 qubits
            iqft_swaps(wires=range(n1))

            # setting boundaries - Rule 2
            discard_start = n0 // 2 - ntilde
            discard_end = n0 // 2 - 1
            discarded_qubits = set(range(discard_start, discard_end + 1))

            # keep exactly n2 qubits for output
            measured_qubits = list(range(n2))


            # Hadamard on remaining qubits
            for q in measured_qubits:
                qml.Hadamard(wires=q)

            # print(f'Measured qubits: {measured_qubits}')

            return qml.probs(wires=measured_qubits)
        qnodes.append(circuit)

    return qnodes

In [18]:
def reconstruction(qnodes, n2, norm):
    out_freq = np.zeros((len(qnodes), 2**n2))
    for idx, qnode in enumerate(qnodes):
        probs = qnode()
        out_freq[idx] = qnode() * norm[idx]

    return out_freq

In [19]:
def devectorization(out_freq):
    S = out_freq.shape[0]
    nrow = int(np.sqrt(out_freq.shape[1])) # rows per patch
    ncol = nrow

    decoded_patches = np.reshape(out_freq,\
                      (out_freq.shape[0], nrow, ncol)) # (S, nrow, ncol)

    im_h, im_w = nrow*int(np.sqrt(S)), ncol*int(np.sqrt(S)) # final shape

    # initialization
    decoded_img = np.zeros((im_w, im_h))

    idx = 0
    for row in np.arange(im_h - nrow + 1, step=nrow):
        for col in np.arange(im_w - ncol + 1, step=ncol):
            decoded_img[row:row+nrow, col:col+ncol] = decoded_patches[idx]
            idx += 1

    return decoded_img

In [20]:
def qjpeg_feature_map_quantum(img_28x28):
    """
    True QJPEG-inspired feature map:
    - probabilities sum to 1
    - amplitudes = sqrt(probabilities)
    - output dimension = 64 (6 qubits)
    """

    img = img_28x28.astype(float)
    img = img / img.sum()              # probabilities
    amps = np.sqrt(img.flatten())      # amplitudes

    # reduce to 64 amplitudes (simple truncation for now)
    amps = amps[:64]

    # safety
    if np.linalg.norm(amps) == 0:
        amps[0] = 1.0
    else:
        amps /= np.linalg.norm(amps)

    return amps


### Step 4: Inference without retraining

In [25]:
C = 100.0  # SVM regularization - the higher, the better
n_qubits_list = [4, 5, 6, 7, 8]
n_layers = 2
alpha = 0.8 # weight for combining QEK + QJPEG

In [32]:
def truncate_and_normalize(x, n_qubits, eps=1e-12):
    """Truncate vector to 2**n_qubits and normalize for QEK input."""
    dim = 2 ** n_qubits
    v = x[:dim].copy()
    norm = np.linalg.norm(v)
    if norm < eps:
        v[0] = 1.0
        v[1:] = 0.0
        norm = 1.0
    return v / norm

def qjpeg_to_32(img_28x28):
    """Return 32-dimensional QJPEG vector from 28x28 image."""
    amps = qjpeg_feature_map_quantum(img_28x28)  # get 64 amplitudes
    amps_32 = amps[:32]  # truncate to 32
    norm = np.linalg.norm(amps_32)
    if norm == 0:
        amps_32[0] = 1.0
        norm = 1.0
    return amps_32 / norm

def prepare_qek_features(X, n_qubits):
    return np.array([truncate_and_normalize(x, n_qubits) for x in X])

def compute_kernel(states_a, states_b=None):
    """Compute kernel matrix from quantum states."""
    if states_b is None:
        states_b = states_a
    K = np.zeros((len(states_a), len(states_b)))
    for i, a in enumerate(states_a):
        for j, b in enumerate(states_b):
            K[i, j] = np.abs(np.vdot(a, b))**2
    return K
# Inference loop
results = []

for n_qubits in n_qubits_list:
    wires = range(n_qubits)
    dev = qml.device(device, wires=wires, shots=None)

    # Prepare features
    X_train_qek_n = prepare_qek_features(X_train_qek, n_qubits)
    X_test_qek_n  = prepare_qek_features(X_test_qek, n_qubits)

    # Expressive QNode
    @qml.qnode(dev, interface="autograd")
    def expressive_state(x, theta):
        feature_map(x, theta, wires=wires)
        return qml.state()

    # Truncate theta if needed
    n_layers_trained, n_qubits_trained, _ = theta.shape
    theta_n = theta[:, :n_qubits, :].copy() if n_qubits <= n_qubits_trained else \
              np.concatenate([theta, 0.01*np.random.randn(n_layers_trained, n_qubits - n_qubits_trained, 3)], axis=1)

    # Compute QEK states
    states_train = np.array([expressive_state(x, theta_n) for x in X_train_qek_n])
    states_test  = np.array([expressive_state(x, theta_n) for x in X_test_qek_n])

    # Kernel
    K_train = compute_kernel(states_train)
    K_test  = compute_kernel(states_test, states_train)
    K_train /= np.linalg.norm(K_train)
    K_test  /= np.linalg.norm(K_train)

    # SVM
    clf = SVC(kernel="precomputed", C=C)
    clf.fit(K_train, y_train)
    y_pred = clf.predict(K_test)
    acc_qek = accuracy_score(y_test, y_pred)

    # Store everything
    results.append({
        "n_qubits": n_qubits,
        "acc_qek": acc_qek,
        "states_train": states_train,
        "states_test": states_test
    })

def combine_qek_qjpeg(qek_states, qjpeg_features, alpha=0.8):
    combined = [np.concatenate([alpha * q, (1-alpha) * c]) for q, c in zip(qek_states, qjpeg_features)]
    combined = [v / np.linalg.norm(v) for v in combined]
    return np.array(combined)

X_train_qjpeg_32 = np.array([qjpeg_to_32(img.reshape(28,28)) for img in X_train_img])
X_test_qjpeg_32  = np.array([qjpeg_to_32(img.reshape(28,28)) for img in X_test_img])

for r in results:
    n_qubits = r["n_qubits"]
    states_train_n = r["states_train"]
    states_test_n  = r["states_test"]

    X_train_comb = combine_qek_qjpeg(states_train_n, X_train_qjpeg_32, alpha)
    X_test_comb  = combine_qek_qjpeg(states_test_n, X_test_qjpeg_32, alpha)

    K_train_comb = compute_kernel(X_train_comb)
    K_test_comb  = compute_kernel(X_test_comb, X_train_comb)
    K_train_comb /= np.linalg.norm(K_train_comb)
    K_test_comb  /= np.linalg.norm(K_train_comb)

    clf_comb = SVC(kernel="precomputed", C=C)
    clf_comb.fit(K_train_comb, y_train)
    y_pred_comb = clf_comb.predict(K_test_comb)
    acc_combined = accuracy_score(y_test, y_pred_comb)
    r["acc_combined"] = acc_combined

for r in results:
    print(f"\n{'='*50}\nInference with {r["n_qubits"]} qubits\n{'='*50}")
    print(f"QEK accuracy: {r["acc_qek"]:.4f}")
    print(f"QEK + QJPEG accuracy: {r["acc_combined"]:.4f}")


Inference with 4 qubits
QEK accuracy: 0.9500
QEK + QJPEG accuracy: 0.9500

Inference with 5 qubits
QEK accuracy: 0.5500
QEK + QJPEG accuracy: 0.5500

Inference with 6 qubits
QEK accuracy: 0.8500
QEK + QJPEG accuracy: 0.8000

Inference with 7 qubits
QEK accuracy: 0.8500
QEK + QJPEG accuracy: 0.8000

Inference with 8 qubits
QEK accuracy: 0.8500
QEK + QJPEG accuracy: 0.8000
