# Aux

In [1]:
import torch
import torch.optim as optim
#%pip install torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset, TensorDataset
import numpy as np

from src.utils.data_loader import load_mnist_data


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import pennylane as qml
from pennylane import numpy as np

from src.circuits.convolution.default import default_circuit, full_entanglement_circuit
from src.circuits.convolution.NQ_circuit import NQ_circuit
from src.circuits.convolution.no_entanglement_circuit import no_entanglement_random_circuit
from src.circuits.embedding.default import angle_embedding, amplitude_embedding, QAOA_embedding
from src.circuits.embedding.IQP_embedding import custom_iqp_embedding




def custom_angle_embedding(features, wires, rotation='X'):
    """Custom angle embedding that supports batched inputs."""
    n_qubits = len(wires)
    # features shape: (batch_size, n_qubits)
    if rotation == 'X':
        for idx, wire in enumerate(wires):
            qml.RX(features[:, idx], wires=wire)
    elif rotation == 'Y':
        for idx, wire in enumerate(wires):
            qml.RY(features[:, idx], wires=wire)
    elif rotation == 'Z':
        for idx, wire in enumerate(wires):
            qml.RZ(features[:, idx], wires=wire)
    else:
        raise ValueError(f"Invalid rotation axis: {rotation}")

def custom_displacement_embedding(features, wires, method):  # pylint: disable=arguments-differ
        """Custom Displacement embedding that supports bathed inputs"""

        ''''
        constants = np.random.rand(len(features[1]))*np.pi
        if method == "amplitude":
            pars = qml.math.stack([features[:], constants], axis=1)

        elif method == "phase":
            pars = qml.math.stack([constants, features[:]], axis=1)
        '''    
        n_qubits = len(wires)
        for i in range(n_qubits):
            qml.Displacement(features[i, 0], features[i, 1], wires=wires[i : i + 1]) 


# Model

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
import pennylane as qml
from pennylane import numpy as np
#from torchsummary import summary

class QuanvLayer(nn.Module):
    def __init__(self, qkernel_shape, embedding_type="angle", entanglement_type="random", 
                 custom_embedding=None, custom_entanglement=None, qdevice_kwargs=None,
                 iqp_repeats=1, iqp_pattern=None, amplitude_pad_with=None, amplitude_normalize=False,
                 amplitude_validate_norm=True, learning_rate = None, method = "amplitude"):
        super(QuanvLayer, self).__init__()

        self.qkernel_shape = qkernel_shape
        self.embedding_type = embedding_type
        self.entanglement_type = entanglement_type
        self.custom_embedding = custom_embedding
        self.custom_entanglement = custom_entanglement

        self.qdevice_kwargs = qdevice_kwargs or {}
        self.torch_device = device  # Use the global device
        self.qml_device = None      # Initialize PennyLane device during forward pass
        self.qnode = None           # Initialize quantum node during forward pass

        self.iqp_repeats = iqp_repeats
        self.iqp_pattern = iqp_pattern
        self.amplitude_pad_with = amplitude_pad_with
        self.amplitude_normalize = amplitude_normalize
        self.amplitude_validate_norm = amplitude_validate_norm
        self.learning_rate = learning_rate
        self.method = method


    def quantum_circuit(self, inputs):
        wires = range(self.qkernel_shape**2)
        batch_size = inputs.shape[0] if len(inputs.shape) == 2 else None
    
        if self.custom_embedding:
            self.custom_embedding(inputs, wires)
        else:
            if self.embedding_type == "angle":
                #qml.AngleEmbedding(inputs, wires=wires)
                custom_angle_embedding(
                    inputs,
                    wires = wires,
                    rotation = 'Y'
                )
            elif self.embedding_type == "amplitude":
                qml.AmplitudeEmbedding(inputs, wires=wires, pad_with=0.0, normalize=True, id=None)
            elif self.embedding_type == "IQP":
                custom_iqp_embedding(inputs, wires=wires, n_repeats=self.iqp_repeats)
            elif self.embedding_type == "Displacement":
                #qml.DisplacementEmbedding(inputs, wires = wires, method = self.method) # method = phase
                custom_displacement_embedding(inputs, wires = wires, method = self.method)
            elif self.embedding_type == "QAOA":
                if self.qkernel_shape == 2:
                    n = 8
                elif self.qkernel_shape == 3:
                    n = 18
                qml.QAOAEmbedding(inputs, weights = torch.rand(2, n)*np.pi, wires = wires)
            elif self.embedding_type == "Squeezing":
                qml.SqueezingEmbedding(inputs, wires = wires, method = self.method) # method = phase
            else:
                raise ValueError(f"Unsupported embedding type: {self.embedding_type}")
    
        if self.custom_entanglement:
            self.custom_entanglement(self.qkernel_shape**2)
        else:
            if self.entanglement_type == "proof":
                proof_circuit(self.qkernel_shape**2)
            elif self.entanglement_type == "random":
                random_circuit(self.qkernel_shape**2)
            elif self.entanglement_type == "full":
                full_entanglement_circuit(self.qkernel_shape**2)
            elif self.entanglement_type == "none":
                no_entanglement_random_circuit(self.qkernel_shape**2)
            else:
                raise ValueError(f"Unsupported entanglement type: {self.entanglement_type}")
    
        return [qml.expval(qml.PauliZ(w)) for w in wires]


    def forward(self, x):
        batch_size, _, height, width = x.size()
        x = x.to(self.torch_device)
        patch_size = self.qkernel_shape ** 2
    
        if self.qnode is None:
            qml_device_name = self.qdevice_kwargs.pop('qml_device_name', 'default.qubit')
            self.qml_device = qml.device(
                qml_device_name, wires=patch_size, **self.qdevice_kwargs, shots=None
            )
            self.qnode = qml.QNode(
                self.quantum_circuit,
                self.qml_device,
                interface='torch',
                diff_method='backprop',
                vectorized=True
            )
    
        out_height = max(1, height - self.qkernel_shape + 1)
        out_width = max(1, width - self.qkernel_shape + 1)
    
        # Collect all patches into a single tensor
        patches = x.unfold(2, self.qkernel_shape, 1).unfold(3, self.qkernel_shape, 1)
        patches = patches.contiguous().view(batch_size, -1, self.qkernel_shape ** 2)
        patches = patches.view(-1, self.qkernel_shape ** 2)
    
        # Ensure patches are on the correct device
        patches = patches.to(self.torch_device)
    
        # Process all patches at once
        outputs = self.qnode(patches)
        outputs = torch.stack(outputs, dim=1)  # Stack outputs into a tensor
        outputs = outputs.float()
    
        # Reshape outputs back to (batch_size, n_wires, out_height, out_width)
        outputs = outputs.view(
            batch_size,
            out_height,
            out_width,
            -1
        ).permute(0, 3, 1, 2)
        return outputs


class QuanvolutionalNet(nn.Module):
    def __init__(self, qkernel_shape=2, classical_kernel_shape=3, embedding_type="angle", 
                 entanglement_type="random", n_classes=10, use_quantum=True, 
                 custom_embedding=None, custom_entanglement=None, qdevice_kwargs=None,
                 iqp_repeats=1, iqp_pattern=None, amplitude_pad_with=None, amplitude_normalize=False,
                 amplitude_validate_norm=True, epochs=50, batch_size=32, learning_rate = None, method = "amplitude"):
        super(QuanvolutionalNet, self).__init__()
        self.use_quantum = use_quantum
        self.qkernel_shape = qkernel_shape
        self.device = device  # Use the global device

        self.method = method
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.batch_size = batch_size

        if use_quantum:
            self.quanv = QuanvLayer(
                qkernel_shape=qkernel_shape,
                embedding_type=embedding_type,
                entanglement_type=entanglement_type,
                custom_embedding=custom_embedding,
                custom_entanglement=custom_entanglement,
                qdevice_kwargs=qdevice_kwargs,
                iqp_repeats=iqp_repeats,
                iqp_pattern=iqp_pattern,
                amplitude_pad_with=amplitude_pad_with,
                amplitude_normalize=amplitude_normalize,
                amplitude_validate_norm=amplitude_validate_norm
            ).to(self.device)
            in_channels = self.qkernel_shape**2
        else:
            self.conv1_classical = nn.Conv2d(1, qkernel_shape**2, kernel_size=qkernel_shape).to(self.device)
            in_channels = qkernel_shape**2

        self.conv1 = nn.Conv2d(in_channels, 16, kernel_size=classical_kernel_shape).to(self.device)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=2, padding=1).to(self.device)
        self.fc1 = None
        self.fc2 = nn.Linear(128, n_classes).to(self.device)

        #print("Neural Network Structure:")
        #summary(self, (1, 8, 8))



    def forward(self, x):
        x = x.to(self.device)
        if self.use_quantum:
            x = self.quanv(x)
        else:
            x = self.conv1_classical(x)
        
        x = self.conv1(x)
        x = torch.relu(x)
        x = self.conv2(x)
        x = torch.relu(x)
        x = x.reshape(x.size(0), -1)
    
        if self.fc1 is None:
            self.fc1 = nn.Linear(x.size(1), 128).to(self.device)
    
        x = self.fc1(x)
        x = torch.relu(x)
        x = self.fc2(x)
        return x


    def fit(self, X_train=None, y_train=None, train_loader=None, criterion=nn.CrossEntropyLoss(), optimizer=None, epochs=None, batch_size=None):

        if epochs is not None:
            self.epochs = epochs

        if batch_size is not None:
            self.batch_size = batch_size

        
        if train_loader is None and X_train is not None and y_train is not None:
            if isinstance(X_train, np.ndarray):
                X_train = torch.tensor(X_train, dtype=torch.float32)
            if isinstance(y_train, np.ndarray):
                y_train = torch.tensor(y_train, dtype=torch.long)  # Assuming y_train are class labels
    
            X_train = X_train.to(self.device)
            y_train = y_train.to(self.device)
    
            dataset = TensorDataset(X_train, y_train)
            train_loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)

        
        if optimizer is None:
            optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
    
        self.train()
        self.to(self.device)
        
        for epoch in range(self.epochs):
            running_loss = 0.0
            for inputs, labels in train_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)
                optimizer.zero_grad()
                outputs = self(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
    
            average_loss = running_loss / len(train_loader)

    def predict(self, X=None, batch_size=32):
        self.eval()  # Set the model to evaluation mode
        self.to(self.device)
    
        if isinstance(X, np.ndarray):
            X = torch.tensor(X, dtype=torch.float32)
    
        X = X.to(self.device)
    
        dataset = TensorDataset(X)
        data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
    
        all_predictions = []
        with torch.no_grad():
            for inputs in data_loader:
                inputs = inputs[0].to(self.device)
                outputs = self(inputs)
                _, predicted = torch.max(outputs.data, 1)
                all_predictions.append(predicted.cpu())  # Move to CPU for concatenation
    
        all_predictions = torch.cat(all_predictions)
    
        return all_predictions
    


    def __name__(self):
        return self.__class__.__name__

# Train

In [16]:
#%pip install qcml
#%pip install scikit-learn
#%pip install dask
#%pip install pynvml
from qcml.bench.grid_search import GridSearch
from qcml.utils.log import log_setup
import os

log_settings = {
    "output": "both",
    "terminal_level": "INFO",
    "file_level": "DEBUG",
    "hide_logs": ["jax", "pennylane", "bokeh", "distributed"],
    "slack_notify": False
}

log_setup(**log_settings)

X_train, y_train, X_val, y_val = load_mnist_data(batch_size=4, output='np', limit=300)

os.environ["JAX_PLATFORM_NAME"] = "cpu"
os.environ["XLA_PYTHON_CLIENT_PREALLOCATE"] = "false"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
torch.backends.cudnn.benchmark = True

2024-09-30 10:34:38,876 - root - [94mINFO[0m - Logging is set up. (log.py:391)
2024-09-30 10:34:38,877 - root - [94mINFO[0m - jax logs are set to ERROR level. (log.py:421)
2024-09-30 10:34:38,877 - root - [94mINFO[0m - pennylane logs are set to ERROR level. (log.py:421)
2024-09-30 10:34:38,878 - root - [94mINFO[0m - bokeh logs are set to ERROR level. (log.py:421)
2024-09-30 10:34:38,878 - root - [94mINFO[0m - distributed logs are set to ERROR level. (log.py:421)


Using device: cuda


In [17]:
embedding_type = "QAOA"

#lrs = linspace_manual(0.008, 0.01, 6)
lrs = torch.linspace(0.0001, 0.01, 7, requires_grad=False)

learning_rates = [lr.item() for lr in lrs]

if embedding_type == "all":

    # Angle, amplitude and IQP embedding
    
    model_grid_quantum = {
        "qkernel_shape": [2, 3, 4],
        "classical_kernel_shape": [2, 3, 5],
        "embedding_type": ["angle", "amplitude", "IQP"],
        "entanglement_type": ["random", "full", "none", "proof"],
        "n_classes": [10],
        "use_quantum": [True],
        "iqp_repeats": [1, 2, 3],
        "amplitude_pad_with": [None, 0.0],
        "amplitude_normalize": [True, False],
        "amplitude_validate_norm": [True, False],
        "learning_rate": learning_rates
    }

elif embedding_type == "angle":

    # Angle embedding
    
    model_grid_quantum = {
        "qkernel_shape": [2, 3],
        "classical_kernel_shape": [2, 3, 5],
        "embedding_type": [embedding_type],
        "entanglement_type": [ "none"],
        "n_classes": [10],
        "learning_rate": learning_rates,
        "use_quantum": [True]
    }

elif embedding_type == "QAOA":

    # QAOA embedding
    
    model_grid_quantum = {
        "qkernel_shape": [2],
        "classical_kernel_shape": [2, 3, 5],
        "embedding_type": ["QAOA"],
        "entanglement_type": [ "none"],
        "n_classes": [10],
        "learning_rate": learning_rates,
        "use_quantum": [True]
    }

elif embedding_type == "amplitude":

    # Amplitude embedding

    model_grid_quantum = {
        "qkernel_shape": [2, 3],
        "classical_kernel_shape": [2, 3, 5],
        "embedding_type": ["amplitude"],
        "entanglement_type": ["none"],
        "n_classes": [10],
        "learning_rate": learning_rates,
        "use_quantum": [True],
        "amplitude_pad_with": [None, 0.0],
        "amplitude_normalize": [False],
        "amplitude_validate_norm": [True]
    }

elif embedding_type == "IQP":

    # IQP embedding

    model_grid_quantum = {
        "qkernel_shape": [2, 3],
        "classical_kernel_shape": [2, 3, 5],
        "embedding_type": ["IQP"],
        "entanglement_type": ["proof"],
        "n_classes": [10],
        "learning_rate": learning_rates,
        "use_quantum": [True],
        "iqp_repeats": [1, 2]
    }

elif embedding_type == "Displacement" or embedding_type == "Squeezing":

    # Displacement or Squeezing embedding

        model_grid_quantum = {
        "qkernel_shape": [2, 3],
        "classical_kernel_shape": [2, 3, 5],
        "embedding_type": [embedding_type],
        "entanglement_type": [ "none"],
        "n_classes": [10],
        "learning_rate": learning_rates,
        "method": ["amplitude"],
        "use_quantum": [True]
    }

    


experiment_name = "quantum_mnist_" + embedding_type + ","

gs = GridSearch(
    classifiers=[QuanvolutionalNet],
    param_grid=model_grid_quantum,
    #checkpoint_interval=1,
    batch_size=50,
    #info_eval_criteria="single",
    experiment_name=experiment_name
).run(X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val)

2024-09-30 10:34:39,542 - qcml.bench.grid_search - [94mINFO[0m - Starting grid search with QuanvolutionalNet, 21 combinations, 50 parallel jobs (grid_search.py:165)
2024-09-30 10:34:48,734 - qcml.bench.grid_search - [94mINFO[0m - Evaluated params: {'qkernel_shape': 2, 'classical_kernel_shape': 2, 'embedding_type': 'QAOA', 'entanglement_type': 'none', 'n_classes': 10, 'learning_rate': 0.0001, 'use_quantum': True}, accuracy: 0.1367 (grid_search.py:241)
2024-09-30 10:34:57,896 - qcml.bench.grid_search - [94mINFO[0m - Evaluated params: {'qkernel_shape': 2, 'classical_kernel_shape': 2, 'embedding_type': 'QAOA', 'entanglement_type': 'none', 'n_classes': 10, 'learning_rate': 0.00175, 'use_quantum': True}, accuracy: 0.2167 (grid_search.py:241)
2024-09-30 10:35:07,053 - qcml.bench.grid_search - [94mINFO[0m - Evaluated params: {'qkernel_shape': 2, 'classical_kernel_shape': 2, 'embedding_type': 'QAOA', 'entanglement_type': 'none', 'n_classes': 10, 'learning_rate': 0.0034, 'use_quantum': T

In [7]:
model_grid_classical = {
    "classical_kernel_shape": [2, 3, 5],
    "n_classes": [10],
    "use_quantum": [False],
}

gs = GridSearch(
    classifiers=[QuanvolutionalNet],
    param_grid=model_grid_classical,
    #checkpoint_interval=1,
    batch_size=50,
    experiment_name="classical_mnist,"
).run(X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val)


2024-09-30 09:15:21,877 - qcml.bench.grid_search - [94mINFO[0m - Starting grid search with QuanvolutionalNet, 3 combinations, 50 parallel jobs (grid_search.py:165)
2024-09-30 09:15:21,880 - qcml.bench.grid_search - [91mERROR[0m - Error with parameters {'classical_kernel_shape': 2, 'n_classes': 10, 'use_quantum': False}: '<=' not supported between instances of 'float' and 'NoneType' (grid_search.py:246)


TypeError: '<=' not supported between instances of 'float' and 'NoneType'

# Sin bench

In [None]:
# Example usage with limit
train_loader, test_loader = load_mnist_data(batch_size=4, img_size=8, limit=10)


model = QuanvolutionalNet(
    qkernel_shape=2, 
    classical_kernel_shape=3, 
    embedding_type="angle",  # Can be "amplitude", "angle", or "IQP"
    entanglement_type="random", 
    n_classes=10, 
    use_quantum=True,  # Change to False for classical variant
    qdevice_kwargs={"shots": None},
    iqp_repeats=2, 
    iqp_pattern=[[0, 1], [1, 2]],  
    amplitude_pad_with=0.0, 
    amplitude_normalize=True, 
    amplitude_validate_norm=True
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

model.fit(train_loader, criterion, optimizer, epochs=10)

model.eval()  # Set the model to evaluation mode
correct = 0
total = 0
with torch.no_grad():  # Disable gradient computation for evaluation
    for inputs, labels in test_loader:
        outputs = model.predict(inputs)
        total += labels.size(0)
        correct += (outputs == labels).sum().item()

accuracy = 100 * correct / total

In [18]:
# Paso 1: Definimos el dispositivo
dev = qml.device("default.qubit", wires=3)

# Paso 2: Definimos el circuito (QNode) usando la función manual_IQPEmbedding
@qml.qnode(dev)

def custom_iqp_embedding(features, wires, n_repeats=1, pattern=None):
    """
    Custom IQP embedding that supports batched inputs.

    Args:
        features (torch.Tensor): Input features with shape (batch_size, n_qubits).
        wires (Iterable): Wires to apply the embedding on.
        n_repeats (int): Number of times to repeat the embedding pattern.
        pattern (List[Tuple[int, int]]): Custom entanglement pattern.
    """
    

def circuit(n_qubits):
    wires = range(n_qubits)
    for wire in range(n_qubits-1):
        qml.RY(np.random.rand()*np.pi, wires = wire)
        qml.RZ(np.random.rand()*np.pi, wires = wire)

    qml.Barrier()
    
    for wire in range(n_qubits-2):
        qml.CNOT(wires = [wires[-wire-3], wires[-wire -2]])

    qml.Barrier()

    for wire in range(n_qubits-1):
        qml.RY(np.random.rand()*np.pi, wires = wire)
        qml.RZ(np.random.rand()*np.pi, wires = wire)

    qml.Barrier()
    
    for wire in range(n_qubits-2):
        qml.CNOT(wires = [wires[-wire-3], wires[-wire -2]])

    qml.Barrier()

    for wire in range(n_qubits-1):
        qml.RY(np.random.rand()*np.pi, wires = wire)
        qml.RZ(np.random.rand()*np.pi, wires = wire)

        
    return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits-1)]  # Retornamos expectativas (no afecta la visualización)

# Paso 3: Creamos los datos de entrada


# Paso 4: Dibujamos el circuito
print(qml.draw(circuit)(5))

0: ──RY(0.49)──RZ(2.19)──||───────╭●──||──RY(0.85)──RZ(1.15)──||───────╭●──||──RY(2.30)──RZ(0.37)─┤
1: ──RY(1.05)──RZ(0.60)──||────╭●─╰X──||──RY(0.46)──RZ(0.70)──||────╭●─╰X──||──RY(2.92)──RZ(0.19)─┤
2: ──RY(0.48)──RZ(2.10)──||─╭●─╰X─────||──RY(0.93)──RZ(3.14)──||─╭●─╰X─────||──RY(2.19)──RZ(2.39)─┤
3: ──RY(0.10)──RZ(0.31)──||─╰X────────||──RY(2.66)──RZ(1.46)──||─╰X────────||──RY(1.33)──RZ(0.96)─┤

   <Z>
   <Z>
   <Z>
   <Z>
