In [None]:
from qiskit import QuantumCircuit, QuantumRegister
from qiskit_algorithms.utils import algorithm_globals
from sklearn import datasets
import matplotlib.pyplot as plt
from sklearn.metrics import silhouette_score
import sklearn.metrics.cluster as cluster_metrics
import numpy as np
from qiskit.circuit import ParameterVector
from qiskit_machine_learning.circuit.library import RawFeatureVector
from sklearn.manifold import TSNE
from qiskit.circuit.library import RealAmplitudes, TwoLocal
from sklearn import metrics
from sklearn.model_selection import train_test_split
import pickle
from sklearn.cluster import AgglomerativeClustering, KMeans
import torch
from torch import nn
from qiskit_machine_learning.connectors import TorchConnector
from qiskit_machine_learning.neural_networks import SamplerQNN
from torchsummary import summary
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision import datasets
import math

# SEED

Configuration of the random seed for experimental reproducibility.

In [None]:
# Set all needed seeds
import random
import numpy as np
import torch
import os

def set_seed(seed=42):
    """
    Set seed for reproducibility across all random number generators.
    
    Args:
        seed (int): Seed value for reproducibility
    """
    # Python random module
    random.seed(seed)
    
    # NumPy random generator
    np.random.seed(seed)
    
    # PyTorch random generator
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    
    # CUDA deterministic operations
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    
    # Qiskit determinism
    algorithm_globals.random_seed = seed
    
    # Python hash seed
    os.environ['PYTHONHASHSEED'] = str(seed)

set_seed(42)

# UTILITIES

Utility functions for metrics evaluation and environment management.

In [None]:
class color:
   PURPLE = '\033[95m'
   CYAN = '\033[96m'
   DARKCYAN = '\033[36m'
   BLUE = '\033[94m'
   GREEN = '\033[92m'
   YELLOW = '\033[93m'
   RED = '\033[91m'
   BOLD = '\033[1m'
   UNDERLINE = '\033[4m'
   END = '\033[0m'

print(f"CUDA Available: {torch.cuda.is_available()}")
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using {color.BOLD}{str(device).upper()}{color.END} acceleration")

In [None]:
def purity_score(y_true, y_pred):
    """
    Compute the purity score for clustering evaluation.
    
    Args:
        y_true: Ground truth labels
        y_pred: Predicted cluster labels
    
    Returns:
        float: Purity score
    """
    contingency_matrix = cluster_metrics.contingency_matrix(y_true, y_pred)
    return np.sum(np.amax(contingency_matrix, axis=0)) / np.sum(contingency_matrix)

def evaluate_clustering(X, pred_y, true_y):
    """
    Evaluate clustering performance using silhouette and purity metrics.
    
    Args:
        X: Feature matrix
        pred_y: Predicted cluster labels
        true_y: Ground truth labels
    """
    print(f"Silhouette: {silhouette_score(X, pred_y):2.3f}")
    print(f"Purity: {purity_score(true_y, pred_y):2.3f}")

def evaluate_clustering_table(X, pred_y, true_y):
    """
    Evaluate clustering performance and return metrics as tuple.
    
    Args:
        X: Feature matrix
        pred_y: Predicted cluster labels
        true_y: Ground truth labels
    
    Returns:
        tuple: (silhouette_score, purity_score)
    """
    return silhouette_score(X, pred_y), purity_score(true_y, pred_y)

# DATASET IMPORT

Dataset importation and configuration for training and testing procedures.

In [None]:
# ===========================
# DATASET CONFIGURATION
# ===========================
# Select dataset: "MNIST" or "FashionMNIST"
DATASET_TYPE = "MNIST"  # Change to "FashionMNIST" to use Fashion-MNIST

INITIAL_TRAINING_SAMPLES = 400


MNIST_base_transform = transforms.Compose([
transforms.ToTensor()
])

# Dataset loading based on configuration
if DATASET_TYPE == "MNIST":
    MNIST_dataset_train = datasets.MNIST(root="quantum_metric_learning-master/data/dataset/MNIST", train=True, download=True, transform=MNIST_base_transform)
    MNIST_dataset_test = datasets.MNIST(root="quantum_metric_learning-master/data/dataset/MNIST", train=False, download=True, transform=MNIST_base_transform)
    print(f"{color.BOLD}Using MNIST dataset{color.END}")
elif DATASET_TYPE == "FashionMNIST":
    MNIST_dataset_train = datasets.FashionMNIST(root="quantum_metric_learning-master/data/dataset/FashionMNIST", train=True, download=True, transform=MNIST_base_transform)
    MNIST_dataset_test = datasets.FashionMNIST(root="quantum_metric_learning-master/data/dataset/FashionMNIST", train=False, download=True, transform=MNIST_base_transform)
    print(f"{color.BOLD}Using Fashion MNIST dataset{color.END}")
else:
    raise ValueError(f"Unknown dataset type: {DATASET_TYPE}. Choose 'MNIST' or 'FashionMNIST'")

if False:
    index01 = MNIST_dataset_train.train_labels <= 1
    MNIST_dataset_train.data = MNIST_dataset_train.data[index01]
    MNIST_dataset_train.targets = MNIST_dataset_train.targets[index01]

    index01_test = MNIST_dataset_test.train_labels <= 1
    MNIST_dataset_test.data = MNIST_dataset_test.data[index01_test]
    MNIST_dataset_test.targets = MNIST_dataset_test.targets[index01_test]

print(f"FULL DATASET INFORMATION")
print(f"Image shape: {MNIST_dataset_train.data.shape}")
print(f"Total training samples: {len(MNIST_dataset_train)}")
print(f"Total test samples: {len(MNIST_dataset_test)}")
print("")

init_train_data, _, init_train_target, _ = train_test_split(
    range(len(MNIST_dataset_train)), 
    MNIST_dataset_train.targets,
    random_state=42,
    stratify=MNIST_dataset_train.targets,
    test_size=len(MNIST_dataset_train)- INITIAL_TRAINING_SAMPLES)

X = MNIST_dataset_train.data[init_train_data].numpy().astype("float32") / 255
y = MNIST_dataset_train.targets[init_train_data].numpy().astype("float32") 

print("TRAINING DATA INFORMATION")
print(f"Image shape: {X.shape}")
print(f"Total training samples: {len(X)}")
print("")

'''
init_test_data, _, init_test_target, _ = train_test_split(
    range(len(MNIST_dataset_test)), 
    MNIST_dataset_test.targets,
    random_state=42,
    stratify=MNIST_dataset_test.targets,
    test_size=len(MNIST_dataset_test)- INITIAL_TESTING_SAMPLES)
'''

X_test = MNIST_dataset_test.data.numpy().astype("float32") / 255
y_test = MNIST_dataset_test.targets.numpy().astype("float32")

print("TESTING DATA INFORMATION")
print(f"Image shape: {X_test.shape}")
print(f"Total test samples: {len(X_test)}")
print("")

In [None]:
class MNIST_Distance_Dataset_Triplet_Loss(Dataset):
    """
    PyTorch Dataset for triplet loss learning on MNIST-like datasets.
    
    Generates triplets (anchor, positive, negative) for metric learning,
    where positive samples share the same class as anchor, while negative
    samples belong to different classes.
    
    Args:
        data: Image data array
        target: Label array
    """
    def __init__(self, data, target):
        self.data = data
        self.target = target
        self.kers = np.ones((self.data.shape[0], self.data.shape[0]))
        self.output_transform = transforms.ToTensor()
        self.len = len(self.target)
        self.classes = {i:(np.where(self.target == i)[0], np.where(self.target != i)[0] ) for i in range(10)}


    def __getitem__(self, idx):
        """
        Returns a triplet: (anchor, positive, negative).
        
        Args:
            idx: Index of the anchor sample
        
        Returns:
            tuple: (anchor, positive, negative) samples
        """
        return self.get_anchor(idx), self.get_positive(idx), self.get_negative(idx)
    
    def get_anchor(self, idx):
        """Returns the anchor sample."""
        return self.output_transform(self.data[idx])
    

    def get_positive(self, idx):
        """Returns a positive sample (same class as anchor)."""
        i = np.random.choice(self.classes[self.target[idx]][0])
        return self.output_transform(self.data[i])
    
    def get_negative(self, idx):
        """Returns a negative sample (different class from anchor)."""
        i = np.random.choice(self.classes[self.target[idx]][1])
        return self.output_transform(self.data[i])


    def get_order(self):
        """Returns the sorting order of samples by label."""
        return self.target.argsort()


    def ordered_pairwise(self):
        """Returns ordered pairwise kernel matrix."""
        return self.kers[:,self.get_order()][self.get_order(),:]


    def get_flatten(self):
        """Returns flattened image data."""
        return self.data.reshape((self.data.shape[0], self.data.shape[1]**2))


    def __len__(self):
        """Returns the total number of samples."""
        return self.len

In [None]:
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.25, random_state=42)
t = MNIST_Distance_Dataset_Triplet_Loss(X_train, y_train)
v = MNIST_Distance_Dataset_Triplet_Loss(X_val, y_val)

print(f"Training set size: {len(t)}")
print(f"Validation set size: {len(v)}")

anchor, pos, neg = t[0]

plt.matshow(anchor.squeeze(0))
plt.matshow(pos.squeeze(0))
plt.matshow(neg.squeeze(0))

In [None]:
reduction_model = TSNE(n_components=2, perplexity=30, random_state=42)

vis_x = reduction_model.fit_transform(t.get_flatten(), t.target)

fig, ax = plt.subplots()
fig.suptitle("Data visualization using TSNE", weight="bold")
ax.scatter(vis_x[:,0], vis_x[:,1], c=t.target, cmap='Dark2')

# ENCODERS

Definition of encoding strategies for data representation in quantum circuits.

### Amplitude encoding

In [None]:
def amplitude_encoding(n_features, param_name):
    """
    Amplitude encoding: requires n = 2^k features.
    
    Args:
        n_features: Number of features to encode
        param_name: Name for the parameter vector
    
    Returns:
        QuantumCircuit: Amplitude encoding circuit
    """
    qc = RawFeatureVector(n_features)
    qc = qc.assign_parameters(ParameterVector(param_name, n_features))
    qc.name = f"Amplitude Encoding {param_name}"
    return qc

amplitude_encoding(4, "a").draw(output="mpl")

### YZ angles encoding

In [None]:
def yz_angles_encoding(n_features, param_name="x", params=None):
    """
    YZ angles encoding using RY and RZ rotations.
    
    Args:
        n_features: Number of features to encode
        param_name: Parameter vector name (creates new ParameterVector)
        params: Existing ParameterVector to reuse
    
    Note: Specify only one of param_name or params
    
    Returns:
        QuantumCircuit: YZ encoding circuit
    """
    if params is None:
        params = ParameterVector(param_name, n_features)
    
    n_qubit = math.ceil(n_features / 2)
    qc = QuantumCircuit(n_qubit, name=f"Angles Encoding")
    gates = [qc.ry, qc.rz]

    for i in range(n_qubit):
        for gate_i in range(2):
            pindex = i*2 + gate_i
            if pindex < n_features:
                gates[gate_i](params[pindex], i)

    return qc

yz_angles_encoding(8, param_name="x").draw(output="mpl")

### Pooling layer

In [None]:
def pooling_layer(in_lane, param_prefix="pool"):
    """
    Pooling layer for quantum circuit dimensionality reduction.
    
    Args:
        in_lane: Number of input qubits (must be even)
        param_prefix: Prefix for parametrized gates
    
    Returns:
        QuantumCircuit: Pooling layer circuit
    """
    qc = QuantumCircuit(in_lane, name="Pooling Layer")
    params = ParameterVector(param_prefix, length=in_lane //2 *3)

    for i in range(in_lane//2):
        current = i
        aux = i+ in_lane//2

        base_param =  current*(in_lane//2 -1)

        qc.rz(-np.pi/2, aux)
        qc.cx(aux, current)
        qc.rz(params[base_param + 0], current)
        qc.ry(params[base_param + 1], aux)
        qc.cx(current, aux)
        qc.ry(params[base_param + 2], aux)

    return qc

pooling_layer(4, param_prefix="m").draw(output="mpl")

# ENCODING VARIATIONS

Alternative encoding strategies and variations for advanced experiments.

### HRyRx encoding

In [None]:
def hRyRx_encoding(n_features):
    """
    Hadamard-RY-RX encoding strategy.
    
    Applies Hadamard gate followed by RY and RX rotations on each qubit.
    
    Args:
        n_features: Number of features to encode (must be even)
    
    Returns:
        QuantumCircuit: HRyRx encoding circuit
    """
    # Number of qubits required
    n_qubits = math.floor(n_features /2) + (1 if (n_features % 2) != 0 else 0)
    # Feature vector from neural network
    n_feature = n_features
    feature_map = QuantumCircuit(n_qubits)
    input_params = ParameterVector(name='x', length=n_feature)
    idx = 0
    for i in range(n_qubits):
        feature_map.h(i)
        feature_map.ry(input_params[idx], i)
        feature_map.rx(input_params[idx+1], i)
        idx +=2

    return feature_map

hRyRx_encoding(8).draw(output="mpl")

### HRyRz encoding

In [None]:
def hRyRz_encoding(n_features):
    """
    Hadamard-RY-RZ encoding strategy.
    
    Applies Hadamard gate followed by RY and RZ rotations on each qubit.
    
    Args:
        n_features: Number of features to encode (must be even)
    
    Returns:
        QuantumCircuit: HRyRz encoding circuit
    """
    # Number of qubits required
    n_qubits = math.floor(n_features /2) + (1 if (n_features % 2) != 0 else 0)
    # Feature vector from neural network
    n_feature = n_features
    feature_map = QuantumCircuit(n_qubits)
    input_params = ParameterVector(name='x', length=n_feature)
    idx = 0
    for i in range(n_qubits):
        feature_map.h(i)
        feature_map.ry(input_params[idx], i)
        feature_map.rz(input_params[idx+1], i)
        idx +=2
    return feature_map

hRyRz_encoding(8).draw(output="mpl")

### X angles encoding

Feature encoding through RX rotations applied to each qubit.

In [None]:
def x_angles_encoding(n_features, param_name):
    """
    X angles encoding using RX rotations.
    
    Args:
        n_features: Number of features to encode
        param_name: Name for the parameter vector
    
    Returns:
        QuantumCircuit: RX encoding circuit
    """
    params = ParameterVector(param_name, n_features)
    n_qubit = n_features
    qc = QuantumCircuit(n_qubit, name=f"Angles Encoding {param_name}")
    

    for i in range(n_qubit):
        qc.rx(params[i], i)

    return qc

x_angles_encoding(4, "a").draw(output="mpl")

### RxRy encoding

Feature encoding through alternating RX and RY rotations.

In [None]:
def RxRy_encoding(n_features, params=None):
    """
    RxRy encoding using RX and RY rotations.
    
    Args:
        n_features: Number of features to encode
        params: ParameterVector to use (if None, creates new with name 'x')
    
    Returns:
        QuantumCircuit: RxRy encoding circuit
    """
    if params is None:
        params = ParameterVector(name='x', length=n_features)
    
    n_qubits = math.ceil(n_features / 2)
    feature_map = QuantumCircuit(n_qubits)
    idx = 0
    
    for i in range(n_qubits):
        if idx < n_features:
            feature_map.rx(params[idx], i)
        if idx + 1 < n_features:
            feature_map.ry(params[idx + 1], i)
        idx += 2
    
    return feature_map

RxRy_encoding(8).draw(output="mpl")

### RxRz encoding

Feature encoding through alternating RX and RZ rotations.

In [None]:
def RxRz_encoding(n_features, params=None):
    """
    RxRz encoding using RX and RZ rotations.
    
    Args:
        n_features: Number of features to encode
        params: ParameterVector to use (if None, creates new with name 'x')
    
    Returns:
        QuantumCircuit: RxRz encoding circuit
    """
    if params is None:
        params = ParameterVector(name='x', length=n_features)
    
    n_qubits = math.ceil(n_features / 2)
    feature_map = QuantumCircuit(n_qubits)
    idx = 0
    
    for i in range(n_qubits):
        if idx < n_features:
            feature_map.rx(params[idx], i)
        if idx + 1 < n_features:
            feature_map.rz(params[idx + 1], i)
        idx += 2
    
    return feature_map

RxRz_encoding(8).draw(output="mpl")

### RyRx encoding

Feature encoding through alternating RY and RX rotations.

In [None]:
def RyRx_encoding(n_features, params=None):
    """
    RyRx encoding using RY and RX rotations.
    
    Args:
        n_features: Number of features to encode
        params: ParameterVector to use (if None, creates new with name 'x')
    
    Returns:
        QuantumCircuit: RyRx encoding circuit
    """
    if params is None:
        params = ParameterVector(name='x', length=n_features)
    
    n_qubits = math.ceil(n_features / 2)
    feature_map = QuantumCircuit(n_qubits)
    idx = 0
    
    for i in range(n_qubits):
        if idx < n_features:
            feature_map.ry(params[idx], i)
        if idx + 1 < n_features:
            feature_map.rx(params[idx + 1], i)
        idx += 2
    
    return feature_map

RyRx_encoding(8).draw(output="mpl")

### RzRx encoding

Feature encoding through alternating RZ and RX rotations.

In [None]:
def RzRx_encoding(n_features, params=None):
    """
    RzRx encoding using RZ and RX rotations.
    
    Args:
        n_features: Number of features to encode
        params: ParameterVector to use (if None, creates new with name 'x')
    
    Returns:
        QuantumCircuit: RzRx encoding circuit
    """
    if params is None:
        params = ParameterVector(name='x', length=n_features)
    
    n_qubits = math.ceil(n_features / 2)
    feature_map = QuantumCircuit(n_qubits)
    idx = 0
    
    for i in range(n_qubits):
        if idx < n_features:
            feature_map.rz(params[idx], i)
        if idx + 1 < n_features:
            feature_map.rx(params[idx + 1], i)
        idx += 2
    
    return feature_map

RzRx_encoding(8).draw(output="mpl")

### Nonlinear full entanglement encoding

Feature map with nonlinearity and full entanglement structure among qubits.

In [None]:
def feature_map_nonlinear_full_entaglement(num_features, params):
    """
    Feature map with nonlinearity (quadratic) and full entanglement.
    
    Args:
        num_features: Number of features to encode
        params: Parameter values for encoding
    
    Returns:
        QuantumCircuit: Nonlinear fully entangled feature map
    """
    qc = QuantumCircuit(num_features)
    for i in range(num_features):
        qc.ry(params[i] ** 2, i)
    for i in range(num_features):
        for j in range(i + 1, num_features):
            qc.cz(i, j)
    return qc

feature_map_nonlinear_full_entaglement(4, [0.1, 0.2, 0.3, 0.4]).draw(output='mpl')

### Standard Reuploading Encoding

Standard encoding scheme with cyclic reuploading of features.

In [None]:
def standard_reuploading_encoding(n_features, param_name="x", layers=3):
    """
    Standard Data Reuploading: data is loaded multiple times into the circuit
    in successive layers with different encodings and entanglement.
    Only encoding and CNOT gates, no ansatz.
    
    Structure (repeating cycle):
    - Layer 0, 3, 6, ...: yz_angles_encoding (RY, RZ)
    - Layer 1, 4, 7, ...: RxRy_encoding (RX, RY)
    - Layer 2, 5, 8, ...: RzRx_encoding (RZ, RX)
    
    Args:
        n_features: Number of input features
        param_name: Parameter name
        layers: Number of reuploading layers (default=3)
    
    Returns:
        QuantumCircuit: Standard reuploading circuit
    """
    n_qubits = math.ceil(n_features / 2)
    qc = QuantumCircuit(n_qubits, name=f"Standard Reuploading")
    params = ParameterVector(param_name, n_features)
    
    for layer in range(layers):
        # Encoder sequence repeats cyclically
        encoder_choice = layer % 3
        
        if encoder_choice == 0:
            # YZ encoding (RY, RZ)
            encoder_block = yz_angles_encoding(n_features, params=params)
                
        elif encoder_choice == 1:
            # RX-RY encoding (RX, RY)
            encoder_block = RxRy_encoding(n_features, params=params)
                
        else:
            # RZ-RX encoding (RZ, RX)
            encoder_block = RzRx_encoding(n_features, params=params)

        qc.compose(encoder_block, inplace=True)
        qc.barrier()
        
        # Entanglement layer with CNOT gates
        for i in range(n_qubits - 1):
            qc.cx(i, i + 1)
        
        # Circular entanglement
        if n_qubits > 2:
            qc.cx(n_qubits - 1, 0)
        
        qc.barrier()
        qc.barrier()
    
    return qc

# Circuit test
standard_reuploading_encoding(8, param_name="x", layers=2).draw(output="mpl")

### Partial Reuploading Encoding

Encoding scheme with partial reuploading and sliding window mechanism.

In [None]:
def partial_reuploading_encoding(n_features, layers=3, features_per_layer=4):
    """
    Partial Data Reuploading with sequential sliding window and cyclic wrapping.
    Each layer loads DIFFERENT features on overlapping qubits sequentially.
    The feature cycle is determined by layers * features_per_layer and restarts.
    Only encoding (RY, RZ) and entanglement (CNOT), no ansatz.
    
    Example with 3 layers, 4 features/layer:
    - Layer 0: features 0-3  → qubits (0,1)
    - Layer 1: features 4-7  → qubits (1,2)  
    - Layer 2: features 0-3  → qubits (2,3)  [restarts from beginning]
    Total cycle: 8 features, independent of n_features
    
    Args:
        n_features: TOTAL number of available features (must be >= features_per_layer)
        layers: Number of encoding blocks
        features_per_layer: How many features to load in each layer (must be even)
    
    Returns:
        QuantumCircuit: Partial reuploading sequential with wrapping
    """
    # Ensure features_per_layer is even
    if features_per_layer % 2 != 0:
        features_per_layer += 1
        print(f"Adjusted features_per_layer to {features_per_layer} for pair encoding.")
    
    # Calculate total feature cycle (independent of n_features)
    total_cycle = layers * features_per_layer
    
    # Ensure n_features is sufficient for one cycle iteration
    if n_features < features_per_layer:
        raise ValueError(f"n_features ({n_features}) must be >= features_per_layer ({features_per_layer})")
    
    # Calculate number of qubits needed for sliding window
    n_qubits_per_encoding = math.ceil(features_per_layer / 2)
    n_qubits = n_qubits_per_encoding + (layers - 1)
    
    # Initialize circuit with cycle independent of n_features
    qc = QuantumCircuit(n_qubits, name="Partial Reuploading Sequential")
    input_params = ParameterVector("x", total_cycle)
    
    for layer in range(layers):
        # Calculate qubit offset for this layer (sliding window)
        qubit_offset = layer
        
        # Calculate starting feature index for this layer (cycles over total_cycle)
        start_feature_idx = (layer * features_per_layer) % total_cycle
        
        # Load feature block onto current window qubits with cyclic wrapping
        feature_idx = 0
        for i in range(n_qubits_per_encoding):
            qubit_target = qubit_offset + i
            
            # Calculate feature indices with cyclic wrapping over total_cycle
            p_idx_1 = (start_feature_idx + feature_idx) % total_cycle
            p_idx_2 = (start_feature_idx + feature_idx + 1) % total_cycle
            
            # Apply encoding gates
            qc.ry(input_params[p_idx_1], qubit_target)
            qc.rz(input_params[p_idx_2], qubit_target)
            
            feature_idx += 2

        qc.barrier()
        
        # Entanglement layer on qubits in this layer
        qubits_in_layer = list(range(qubit_offset, qubit_offset + n_qubits_per_encoding))
        for i in range(len(qubits_in_layer) - 1):
            qc.cx(qubits_in_layer[i], qubits_in_layer[i + 1])
        
        # Circular entanglement (optional, only if more than 2 qubits)
        if len(qubits_in_layer) > 2:
            qc.cx(qubits_in_layer[-1], qubits_in_layer[0])
        
        qc.barrier()
        qc.barrier()
    
    return qc

# Circuit test - 8 feature cycle (2 layers * 4 features/layer), same with n_features=24, 12, 8, etc.
partial_reuploading_encoding(n_features=12, layers=3, features_per_layer=4).draw(output="mpl")

# ANSATZ

### MPS


In [None]:
def MPS(num_qubits,parameter_prefix="x", **kwargs):
    """
    Constructs a Matrix Product State (MPS) quantum circuit.

    Args:
        num_qubits (int): The number of qubits in the circuit.
        **kwargs: Additional keyword arguments to be passed to the 
        RealAmplitudes.

    Returns:
        QuantumCircuit: The constructed MPS quantum circuit.
        
    """
    qc = QuantumCircuit(num_qubits)
    qubits = range(num_qubits)
    for i, j in zip(qubits[:-1], qubits[1:]):
        block = RealAmplitudes(2, parameter_prefix=f"{parameter_prefix}_{i},{j}", **kwargs)
        qc.compose(block, [i, j], inplace=True)
        # if i < num_qubits - 2:
        #     qc.barrier()
    return qc

MPS(4).draw('mpl')

In [None]:
def TwoLocalMPS(num_qubits,parameter_prefix="θ", **kwargs):
    """
    Constructs a Matrix Product State (MPS) quantum circuit using TwoLocal.

    Args:
        num_qubits (int): The number of qubits in the circuit.
        **kwargs: Additional keyword arguments to be passed to TwoLocal.

    Returns:
        QuantumCircuit: The constructed MPS quantum circuit.
        
    """
    qc = QuantumCircuit(num_qubits)
    qubits = range(num_qubits)
    for i, j in zip(qubits[:-1], qubits[1:]):
        block = TwoLocal(
            2,
            rotation_blocks='rx',
            entanglement='reverse_linear',
            entanglement_blocks='cx',
            parameter_prefix=f'{parameter_prefix}_{i},{j}',
            insert_barriers=True,
            **kwargs

        )
        qc.compose(block, [i, j], inplace=True)
    return qc

TwoLocalMPS(4).draw('mpl')

In [None]:
def create_ansatz(num_qubits, parameter_prefix="θ", ansatz_type="MPS", **kwargs):
    """
    Crea un ansatz del tipo specificato.
    
    Args:
        num_qubits (int): Numero di qubit.
        parameter_prefix (str): Prefisso per i parametri.
        ansatz_type (str): Tipo di ansatz - "MPS" (RealAmplitudes) o "TwoLocalMPS" (TwoLocal).
        **kwargs: Argomenti aggiuntivi passati alla funzione ansatz.
    
    Returns:
        QuantumCircuit: Il circuito ansatz.
    """
    if ansatz_type == "MPS":
        return MPS(num_qubits, parameter_prefix=parameter_prefix, **kwargs)
    elif ansatz_type == "TwoLocalMPS":
        return TwoLocalMPS(num_qubits, parameter_prefix=parameter_prefix, **kwargs)
    else:
        raise ValueError(f"Ansatz type '{ansatz_type}' non supportato. Usa 'MPS' o 'TwoLocalMPS'.")

# CIRCUITS

Definizione e composizione dei circuiti quantistici per l'apprendimento.

### Original circuit definition (with RealAmplitudes and Pooling)

Uncomment the necessary encoding to perform specific experiments.

In [None]:
encoding = yz_angles_encoding(8, param_name="e")
#encoding = hRyRx_encoding(8)
#encoding= hRyRz_encoding(8)
#encoding = RyRx_encoding(8)





ansatz = QuantumCircuit(4)
ansatz.barrier()
############################2#######################
ansatz = ansatz.compose(RealAmplitudes(num_qubits=4, reps=1, name="Layer1", parameter_prefix="l1"))
ansatz.barrier()
############################3#######################
ansatz = ansatz.compose(pooling_layer(4, "pool2"))
ansatz.barrier()
############################4#######################
ansatz = ansatz.compose(RealAmplitudes(num_qubits=2, reps=1, name="Layer2",parameter_prefix="l2"), qubits=[2,3])
ansatz.barrier()

ansatz.decompose().draw(output="mpl")


qnn = QuantumCircuit(4).compose(encoding).compose(ansatz)

display(qnn.decompose().draw("mpl"))



### Tensor Network circuit (MPS only)

Example of quantum circuit with pure tensor network (MPS) structure.

In [None]:
encoding = yz_angles_encoding(8, param_name="e")


qnn = QuantumCircuit(4).compose(encoding)

# Adding MPS to the circuit
ansatz = MPS(num_qubits=4, parameter_prefix="mps")
qnn = qnn.compose(ansatz)

# Circuit visualization
display(qnn.draw("mpl"))

In [None]:
# Display parameter counts for encoding, ansatz, and total circuit
print("Encoding parameters:", len(encoding.parameters))
print("Ansatz parameters:", len(ansatz.parameters))
print("Total circuit parameters:", len(qnn.parameters))

### Standard reuploading circuit

Quantum circuit with standard data reuploading for metric learning.

In [None]:
from qiskit import QuantumCircuit
from qiskit.circuit import ParameterVector
from qiskit.circuit.library import RealAmplitudes
import math

def build_mps_reuploading_model(n_features, layers=2, fixed_encoder=None, ansatz_type="MPS"):
    """
    Constructs a reuploading classifier using combinations of encoders and ansatz.
    
    Args:
        n_features (int): Number of input features.
        layers (int): Number of [Encoder -> Ansatz] sequence repetitions (default=2).
        ansatz_type (str): Type of ansatz - "MPS" or "TwoLocalMPS" (default="MPS").
        fixed_encoder (str): If None, cycles between three encoders (yz, RxRy, RzRx).
                            If specified, uses the same encoder for all layers.
                            Options: "yz", "RxRy", "RzRx", "RxRz", "RyRx" (default=None).
    
    Returns:
        QuantumCircuit: Complete reuploading quantum circuit
    """
    
    # Calculate required number of qubits
    n_qubits = math.ceil(n_features / 2)
    
    # Initialize final circuit
    full_qc = QuantumCircuit(n_qubits, name=f"{ansatz_type} Re-uploading Classifier")
    
    # Create input parameters ONCE (shared across all layers)
    input_params = ParameterVector("x", n_features)
    
    # Reuploading cycle (Sandwich architecture)
    for layer in range(layers):
        
        # --- A. Add Encoder ---
        if fixed_encoder is None:
            # Cyclic behavior: cycles between three encoders
            encoder_choice = layer % 3
            
            if encoder_choice == 0:
                # YZ encoding (RY, RZ)
                encoder_block = yz_angles_encoding(n_features, params=input_params)
            elif encoder_choice == 1:
                # RX-RY encoding (RX, RY)
                encoder_block = RxRy_encoding(n_features, params=input_params)
            else:
                # RZ-RX encoding (RZ, RX)
                encoder_block = RzRx_encoding(n_features, params=input_params)
        else:
            # Use specified fixed encoder for all layers
            if fixed_encoder == "yz":
                encoder_block = yz_angles_encoding(n_features, params=input_params)
            elif fixed_encoder == "RxRy":
                encoder_block = RxRy_encoding(n_features, params=input_params)
            elif fixed_encoder == "RxRz":
                encoder_block = RxRz_encoding(n_features, params=input_params)
            elif fixed_encoder == "RyRx":
                encoder_block = RyRx_encoding(n_features, params=input_params)
            elif fixed_encoder == "RzRx":
                encoder_block = RzRx_encoding(n_features, params=input_params)
            else:
                raise ValueError(f"Encoder '{fixed_encoder}' not supported. "
                               "Use: 'yz', 'RxRy', 'RxRz', 'RyRx', 'RzRx' or None for cyclic behavior.")

        full_qc.compose(encoder_block, inplace=True)
        full_qc.barrier()
        
        # --- B. Add Ansatz (MPS or TwoLocalMPS) ---
        ansatz_block = create_ansatz(n_qubits, parameter_prefix=f"w_L{layer}", ansatz_type=ansatz_type, reps=1)
        full_qc.compose(ansatz_block, inplace=True)
        
        full_qc.barrier()
        full_qc.barrier()

    return full_qc

# --- TEST ---
# Example with 8 features and 2 depth layers

ansatz_type = "MPS"  # Choose between "MPS" and "TwoLocalMPS"
fixed_enc = None  # Choose between None (cyclic), "yz", "RxRy", "RxRz", "RyRx", "RzRx"
qnn_std_reupload = build_mps_reuploading_model(n_features=8, layers=2, fixed_encoder=fixed_enc, ansatz_type=ansatz_type)

# Display the result
print(f"Circuit with encoder: {fixed_enc}, ansatz: {ansatz_type}")
print("Encoding parameters:", len([p for p in qnn_std_reupload.parameters if p.name.startswith('x')]))
print("Ansatz parameters:", len([p for p in qnn_std_reupload.parameters if p.name.startswith('w')]))
print("Total parameters:", len(qnn_std_reupload.parameters))
qnn_std_reupload.draw(output="mpl")

### Partial reuploading circuit

Quantum circuit with partial data reuploading and sliding window mechanism.

In [None]:
def build_partial_mps_model(n_features, layers=3, features_per_layer=4, fixed_encoder="yz", ansatz_type="MPS"):
    """
    Constructs a Sequential Partial classifier with sliding window architecture.
    Each layer loads DIFFERENT features on sequentially overlapping qubits.
    
    Example with 4 qubits, 3 layers, 4 features/layer:
    - Layer 0: features 0-3  → qubits (0,1)
    - Layer 1: features 4-7  → qubits (1,2)  
    - Layer 2: features 8-11 → qubits (2,3)
    Total: 12 features mapped
    
    Args:
        n_features (int): TOTAL number of features to map (increased from standard case).
        layers (int): Number of [Encoding -> Ansatz] blocks.
        features_per_layer (int): How many features to load in each layer (must be even).
        ansatz_type (str): Type of ansatz - "MPS" or "TwoLocalMPS" (default="MPS").
        fixed_encoder (str): Encoder to use for all layers.
                            Options: "yz", "RxRy", "RxRz", "RyRx", "RzRx" (default="yz").
    
    Returns:
        QuantumCircuit: Partial reuploading quantum circuit
    """
    
    # Ensure features_per_layer is even
    if features_per_layer % 2 != 0:
        features_per_layer += 1
    
    # Encoder validation
    valid_encoders = {"yz", "RxRy", "RxRz", "RyRx", "RzRx"}
    if fixed_encoder not in valid_encoders:
        raise ValueError(f"Encoder '{fixed_encoder}' not supported. "
                        f"Use: {valid_encoders}")
    
    # Map encoder to gate pairs (gate1, gate2)
    encoder_gates = {
        "yz": ("ry", "rz"),
        "RxRy": ("rx", "ry"),
        "RxRz": ("rx", "rz"),
        "RyRx": ("ry", "rx"),
        "RzRx": ("rz", "rx"),
    }
    gate1_name, gate2_name = encoder_gates[fixed_encoder]
    
    # Calculate number of qubits needed for sliding window
    # First layer uses features_per_layer/2 qubits, then each layer adds 1 qubit
    n_qubits_per_encoding = math.ceil(features_per_layer / 2)
    n_qubits = n_qubits_per_encoding + (layers - 1)
    
    # Calculate total number of features that will be mapped
    total_features_mapped = layers * features_per_layer
    
    # Initialize circuit
    full_qc = QuantumCircuit(n_qubits, name=f"Sequential Sliding Window ({fixed_encoder})")
    input_params = ParameterVector("x", total_features_mapped)
    
    for layer in range(layers):
        
        # --- A. SEQUENTIAL SLIDING WINDOW ENCODER ---
        # Calculate qubit offset for this layer (sliding window)
        qubit_offset = layer
        
        # Calculate starting feature index for this layer (sequential, not cyclic)
        start_feature_idx = layer * features_per_layer
        
        # Load feature block onto current window qubits
        feature_idx = 0
        for i in range(n_qubits_per_encoding):
            qubit_target = qubit_offset + i
            
            p_idx_1 = start_feature_idx + feature_idx
            p_idx_2 = start_feature_idx + feature_idx + 1
            
            # Apply gates only if index is valid
            if p_idx_1 < total_features_mapped:
                getattr(full_qc, gate1_name)(input_params[p_idx_1], qubit_target)
            if p_idx_2 < total_features_mapped:
                getattr(full_qc, gate2_name)(input_params[p_idx_2], qubit_target)
            
            feature_idx += 2
        
        full_qc.barrier()
        
        # --- B. ANSATZ (MPS or TwoLocalMPS) only on populated qubits in this layer ---
        # Calculate qubits used in this layer
        qubits_in_layer = list(range(qubit_offset, qubit_offset + n_qubits_per_encoding))
        ansatz_block = create_ansatz(n_qubits_per_encoding, parameter_prefix=f"w_L{layer}", ansatz_type=ansatz_type, reps=1)
        full_qc.compose(ansatz_block, qubits_in_layer, inplace=True)
        
        full_qc.barrier()
        full_qc.barrier()
    
    return full_qc

# --- TEST ---
# Example with 12 features, 3 layers, and 4 features per layer

ansatz_type = "MPS"  # Choose between "MPS" and "TwoLocalMPS"
fixed_enc = "yz"  # Choose encoder: "yz", "RxRy", "RxRz", "RyRx", "RzRx"
qnn_partial_reupload = build_partial_mps_model(n_features=12, layers=3, features_per_layer=4, 
                                               fixed_encoder=fixed_enc, ansatz_type=ansatz_type)

# Display the result
print(f"Circuit with encoder: {fixed_enc}, ansatz: {ansatz_type}")
print("Encoding parameters:", len([p for p in qnn_partial_reupload.parameters if p.name.startswith('x')]))
print("Ansatz parameters:", len([p for p in qnn_partial_reupload.parameters if p.name.startswith('w')]))
print("Total parameters:", len(qnn_partial_reupload.parameters))
qnn_partial_reupload.draw(output="mpl")

# HYBRID NETWORKS

Hybrid architectures combining classical neural networks and quantum models.

### 8-features per image

In [None]:
# ===============================
# QUANTUM MODEL: 8 FEATURES
# ===============================
"""
This block defines the hybrid model for images with 8 extracted features,
combining a classical convolutional neural network and a quantum circuit.
The circuit is constructed using the build_mps_reuploading_model function.
"""

from qiskit.primitives import Sampler
def parity(x):
    """Returns the parity (number of 1-bits) of an integer x."""
    return f"{bin(x)}".count("1")

# Quantum circuit construction for 8 features
qnn_circuit_8 = build_mps_reuploading_model(n_features=8, layers=2)

# Parameter separation: first 8 are inputs, remaining are ansatz weights
input_ps_8 = list(qnn_circuit_8.parameters)[:8]
weight_ps_8 = list(qnn_circuit_8.parameters)[8:]

qmodel_8 = SamplerQNN(
    circuit=qnn_circuit_8,
    input_params=input_ps_8,
    weight_params=weight_ps_8,
    input_gradients=True
)

class HybridRegressorConvNet8Features(nn.Module):
    """
    Hybrid neural network combining a classical CNN with a quantum circuit.
    
    Args:
        qm1: Quantum model (SamplerQNN) to connect as a layer.
    """
    def __init__(self, qm1):
        super().__init__()

        # Convolutional blocks for feature extraction
        self.conv_1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=(5,5)),
            nn.ReLU()
        )

        self.conv_2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(5,5)),
            nn.MaxPool2d(kernel_size=(2,2)),
            nn.ReLU()
        )

        self.conv_3 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(5,5)),
            nn.MaxPool2d(kernel_size=(2,2)),
            nn.ReLU()
        )
        self.flatten = nn.Flatten()

        # Reduction to 8 features for quantum layer
        self.reduction = nn.Sequential(
            nn.Linear(in_features=576, out_features=120),
            nn.ReLU(),
            nn.Linear(in_features=120, out_features=8)
        )

        # Output layer (from 16 to 16 qubit outputs)
        self.final = nn.Sequential(
            nn.Linear(in_features=16, out_features=16),
        )

        # Quantum layer connected via TorchConnector
        self.q1 = TorchConnector(qm1)

    def forward(self, x):
        """
        Executes forward pass: feature extraction, reduction, quantum layer, and final output.
        
        Args:
            x: Input tensor (batch of images)
        
        Returns:
            torch.Tensor: Output embeddings
        """
        x = self.conv_1(x)
        x = self.conv_2(x)
        x = self.conv_3(x)

        x = self.flatten(x)
        x = self.reduction(x)

        # Pass through quantum layer and normalization
        x = self.q1(x) * 100
        x = self.final(x)

        return x

# Model instantiation and summary
distance_model_8 = HybridRegressorConvNet8Features(qmodel_8)
distance_model_8 = distance_model_8.to(device)
summary(distance_model_8, input_size=(1,28,28))

### 12-features per image

Architecture and pipeline for images with 12 extracted features.

In [None]:
# ===============================
# QUANTUM MODEL: 12 FEATURES
# ===============================
"""
This block defines the hybrid model for images with 12 extracted features,
combining a classical convolutional neural network and a quantum circuit.
The circuit is constructed using the build_partial_mps_model function.
"""

from qiskit.primitives import Sampler

# Quantum circuit construction for 12 features
qnn_circuit_12 = build_partial_mps_model(n_features=12, layers=3, features_per_layer=4)

# Parameter separation: first 12 are inputs, remaining are ansatz weights
input_ps_12 = list(qnn_circuit_12.parameters)[:12]
weight_ps_12 = list(qnn_circuit_12.parameters)[12:]

qmodel_12 = SamplerQNN(
    circuit=qnn_circuit_12,
    input_params=input_ps_12,
    weight_params=weight_ps_12,
    input_gradients=True
)

class HybridRegressorConvNet12Features(nn.Module):
    """
    Hybrid neural network combining a classical CNN with a quantum circuit.
    
    Args:
        qm1: Quantum model (SamplerQNN) to connect as a layer.
    """
    def __init__(self, qm1):
        super().__init__()

        # Convolutional blocks for feature extraction
        self.conv_1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=(5,5)),
            nn.ReLU()
        )

        self.conv_2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(5,5)),
            nn.MaxPool2d(kernel_size=(2,2)),
            nn.ReLU()
        )

        self.conv_3 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(5,5)),
            nn.MaxPool2d(kernel_size=(2,2)),
            nn.ReLU()
        )
        self.flatten = nn.Flatten()

        # Reduction to 12 features for quantum layer
        self.reduction = nn.Sequential(
            nn.Linear(in_features=576, out_features=120),
            nn.ReLU(),
            nn.Linear(in_features=120, out_features=12)
        )

        # Output layer (from 16 to 16 qubit outputs)
        self.final = nn.Sequential(
            nn.Linear(in_features=16, out_features=16),
        )

        # Quantum layer connected via TorchConnector
        self.q1 = TorchConnector(qm1)

    def forward(self, x):
        """
        Executes forward pass: feature extraction, reduction, quantum layer, and final output.
        
        Args:
            x: Input tensor (batch of images)
        
        Returns:
            torch.Tensor: Output embeddings
        """
        x = self.conv_1(x)
        x = self.conv_2(x)
        x = self.conv_3(x)

        x = self.flatten(x)
        x = self.reduction(x)

        # Pass through quantum layer and normalization
        x = self.q1(x) * 100
        x = self.final(x)

        return x

# Model instantiation and summary
distance_model_12 = HybridRegressorConvNet12Features(qmodel_12)
distance_model_12 = distance_model_12.to(device)
summary(distance_model_12, input_size=(1,28,28))

# TRAINING - Model Training Procedures

Section dedicated to model training and epoch management with standardized output formatting.

### Training configuration

Initial configuration for model training procedure.

In [None]:
n_features = 12
if n_features == 8:
    model_to_train = qmodel_8
    distance_model = HybridRegressorConvNet8Features(model_to_train)
else:
    model_to_train = qmodel_12
    distance_model = HybridRegressorConvNet12Features(model_to_train)
training_dataloader = DataLoader(t, batch_size=8, shuffle=True)
val_data_loader = DataLoader(v, batch_size=8, shuffle=True)
distance_model = distance_model.to(device)
optimizer = torch.optim.SGD(distance_model.parameters(), lr=1e-2)
loss = torch.nn.TripletMarginLoss(margin=2)
epochs = 100

In [None]:
model_to_train.circuit.draw(output="mpl")

### Training loop

Main model training loop with standardized output formatting.

In [None]:
def train(model, epochs, optimizer, criterion, train_data_loader, val_data_loader, device, validation_step=False, print_at=1, seed=42):
    """
    Executes model training with standardized output formatting.
    
    Args:
        model: Neural network model to train
        epochs: Number of training epochs
        optimizer: Optimization algorithm
        criterion: Loss function
        train_data_loader: Training data loader
        val_data_loader: Validation data loader (optional)
        device: Computation device (CPU/GPU)
        validation_step: Whether to perform validation (default=False)
        print_at: Print frequency (every N epochs, default=1)
        seed: Random seed for reproducibility (default=42)
    
    Returns:
        model: Trained model
    """
    
    set_seed(seed)
    for epoch in range(epochs):
        set_seed(seed + epoch)
        
        prnt = (epoch % print_at) != 0 if epoch != (epochs - 1) else False
        
        if not prnt:
            print("\033[1mEpoch {}/{}\033[0m".format(epoch + 1, epochs))
        
        # --- Training Phase ---
        model.train()
        train_loss = 0.0
        train_samples = 0
        
        for anchor, positive, negative in tqdm(train_data_loader, disable=prnt):
            optimizer.zero_grad()
            anchor = anchor.to(device)
            positive = positive.to(device)
            negative = negative.to(device)
            e_A = model(anchor)
            e_P = model(positive)
            e_N = model(negative)
            loss = criterion(e_A, e_P, e_N)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_samples += positive.size(0)
        train_loss /= len(train_data_loader)
        
        if not prnt:
            print(f"TRAINING | Loss: {train_loss:2.6f}")
            print("")

### Training loop with early stopping

Training loop with early stopping logic and automatic checkpointing mechanism.

In [None]:
def train(model, epochs, optimizer, criterion, train_data_loader, val_data_loader, device, 
          validation_step=False, print_at=1, early_stopping_patience=3, 
          min_delta=0.001, restore_best_weights=True, seed=42, 
          checkpoint_interval=100, checkpoint_path="model_results/triplet_loss"):
    """
    Training function with automatic checkpointing every N epochs and standardized output.
    
    Args:
        model: Neural network model to train
        epochs: Number of training epochs
        optimizer: Optimization algorithm
        criterion: Loss function
        train_data_loader: Training data loader
        val_data_loader: Validation data loader
        device: Computation device (CPU/GPU)
        validation_step: Whether to perform validation (default=False)
        print_at: Print frequency (default=1)
        early_stopping_patience: Patience for early stopping (default=3)
        min_delta: Minimum improvement threshold (default=0.001)
        restore_best_weights: Whether to restore best weights (default=True)
        seed: Random seed (default=42)
        checkpoint_interval: Checkpoint save frequency (default=100)
        checkpoint_path: Directory for saving checkpoints
    
    Returns:
        tuple: (trained_model, checkpoint_epochs, checkpoint_logs)
    """
    import os
    import time
    os.makedirs(checkpoint_path, exist_ok=True)

    # Initialize time tracking
    training_start_time = time.time()

    set_seed(seed)
    best_loss = float('inf')
    no_improvement_count = 0
    best_model_state = None
    checkpoint_epochs = []
    checkpoint_logs = {}  # Dictionary to save time and loss for each checkpoint

    for epoch in range(epochs):
        
        set_seed(seed + epoch)
        prnt = (epoch % print_at) != 0 if epoch != (epochs-1) else False

        # Initialize early_stopping_triggered at the beginning of each epoch
        early_stopping_triggered = False

        if not prnt:
            print(f"\033[1mEpoch {epoch+1}/{epochs}\033[0m")

        ### --> Training Phase
        model.train()
        train_loss = 0.0
        train_samples = 0

        for anchor, positive, negative in tqdm(train_data_loader, disable=prnt):
            optimizer.zero_grad()
            anchor = anchor.to(device)
            positive = positive.to(device)
            negative = negative.to(device)

            e_A = model(anchor)
            e_P = model(positive)
            e_N = model(negative)

            loss = criterion(e_A, e_P, e_N)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            train_samples += positive.size(0)

        train_loss /= len(train_data_loader)
        
        ### --> Validation Phase (if requested)
        val_loss = 0.0
        if validation_step and not prnt:
            model.eval()
            with torch.no_grad():
                for anchor, positive, negative in val_data_loader:
                    anchor = anchor.to(device)
                    positive = positive.to(device)
                    negative = negative.to(device)

                    e_A = model(anchor)
                    e_P = model(positive)
                    e_N = model(negative)

                    loss = criterion(e_A, e_P, e_N)
                    val_loss += loss.item()

                val_loss /= len(val_data_loader)

        ### --> Early Stopping Logic
        if validation_step and not prnt:
            # Check for improvement
            if (best_loss - val_loss) > min_delta:
                best_epoch = epoch
                best_loss = val_loss
                no_improvement_count = 0
                # Save best model state
                if restore_best_weights:
                    print(f"Best epoch: {best_epoch+1}")
                    best_model_state = model.state_dict().copy()
            else:
                no_improvement_count += 1
                if no_improvement_count >= early_stopping_patience:
                    print(f"\n{color.RED}Early stopping at epoch {epoch+1}{color.END}")
                    early_stopping_triggered = True

        ### --> Print results
        if not prnt:
            log_str = f"TRAINING | Loss: {train_loss:2.6f}"
            if validation_step:
                log_str += f" | VALIDATION -> Loss: {val_loss:2.6f}"
            print(log_str)
            print("")

        ### --> Save checkpoint every checkpoint_interval epochs
        if (epoch + 1) % checkpoint_interval == 0:
            # Calculate total training time in minutes
            elapsed_time_minutes = (time.time() - training_start_time) / 60
            
            checkpoint_filename = f"{checkpoint_path}/checkpoint_epoch_{epoch+1}.pt"
            torch.save(model.state_dict(), checkpoint_filename)
            checkpoint_epochs.append(epoch + 1)
            
            # Save time and train loss for this checkpoint
            checkpoint_logs[epoch + 1] = {
                'training_time_minutes': elapsed_time_minutes,
                'train_loss': train_loss
            }
            
            print(f"{color.GREEN}Checkpoint saved: {checkpoint_filename}{color.END}")
            print(f"Training time: {elapsed_time_minutes:.2f} minutes")
            print(f"Train Loss: {train_loss:.6f}")
            print("")

        ### --> Stop training if early stopping triggered
        if early_stopping_triggered:
            break

    ### --> Restore best weights if requested
    if restore_best_weights and best_model_state is not None:
        model.load_state_dict(best_model_state)
        print(f"{color.GREEN}Restored best model weights{color.END}")

    return model, checkpoint_epochs, checkpoint_logs

### Training execution

Training loop execution and checkpoint saving with standardized output.

In [None]:
# CHECKPOINT CONFIGURATION
CHECKPOINT_PATH = "model_results/triplet_loss/Standard_TwoLocalMPS"
CHECKPOINT_INTERVAL = 1  # Save every epoch

# Training without Early Stopping
distance_model, checkpoint_epochs, checkpoint_logs = train(
    model=distance_model,
    epochs=2,
    optimizer=optimizer,
    criterion=loss,
    train_data_loader=training_dataloader,
    val_data_loader=None,
    validation_step=False,
    device=device,
    print_at=1,
    seed=42,
    checkpoint_interval=CHECKPOINT_INTERVAL,
    checkpoint_path=CHECKPOINT_PATH
)

# Save checkpoint logs for evaluation
CHECKPOINT_EVAL_LOGS = checkpoint_logs

print(f"\n{color.BOLD}Training completed{color.END}")
print(f"Checkpoints saved at epochs: {checkpoint_epochs}")


# Training with Early Stopping (commented out)
# distance_model, checkpoint_epochs, checkpoint_logs = train(
#     distance_model,
#     epochs=200,
#     optimizer=optimizer,
#     criterion=loss,
#     train_data_loader=training_dataloader,
#     val_data_loader=val_data_loader,
#     device=device,
#     validation_step=True,
#     early_stopping_patience=4,
#     min_delta=0.005,
#     seed=42,
#     checkpoint_interval=CHECKPOINT_INTERVAL,
#     checkpoint_path=CHECKPOINT_PATH
# )

# AUTOMATIC CHECKPOINT EVALUATION ON TEST SET

Automatic performance evaluation of saved checkpoints on the test set with standardized output.

In [None]:
import os
import glob

def evaluate_checkpoint_on_test(checkpoint_path, model_template, test_dataset, device, batch_size=128):
    """
    Loads a checkpoint and evaluates performance on the test set with standardized output.
    
    Args:
        checkpoint_path: Path to checkpoint file
        model_template: Model template for loading weights
        test_dataset: Test dataset
        device: Computation device
        batch_size: Batch size for evaluation (default=128)
    
    Returns:
        dict: Evaluation results dictionary
    """
    # Load checkpoint
    model_template.load_state_dict(torch.load(checkpoint_path))
    model_template.eval()
    
    # Generate embeddings for test set
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    test_embedding_data = np.empty((0, 16))
    
    with torch.no_grad():
        for anchor, _, _ in tqdm(test_loader, desc=f"Generating embeddings"):
            anchor = anchor.to(device)
            embeddings = model_template(anchor).to("cpu").numpy()
            test_embedding_data = np.concatenate((test_embedding_data, embeddings), axis=0)
    
    # Agglomerative Clustering
    agg_clustering = AgglomerativeClustering(n_clusters=10, linkage="average")
    agg_prediction = agg_clustering.fit_predict(test_embedding_data)
    agg_silhouette, agg_purity = evaluate_clustering_table(test_embedding_data, agg_prediction, test_dataset.target)
    
    # KMeans Clustering
    kmeans_clustering = KMeans(n_clusters=10, init="k-means++", n_init=10, random_state=42)
    kmeans_prediction = kmeans_clustering.fit_predict(test_embedding_data)
    kmeans_silhouette, kmeans_purity = evaluate_clustering_table(test_embedding_data, kmeans_prediction, test_dataset.target)
    
    return {
        'checkpoint': checkpoint_path,
        'test_embedding': test_embedding_data,
        'agg_silhouette': agg_silhouette,
        'agg_purity': agg_purity,
        'agg_prediction': agg_prediction,
        'kmeans_silhouette': kmeans_silhouette,
        'kmeans_purity': kmeans_purity,
        'kmeans_prediction': kmeans_prediction
    }

def evaluate_all_checkpoints(checkpoint_path, model_template, test_dataset, device):
    """
    Evaluates all saved checkpoints from training with standardized output.
    
    Args:
        checkpoint_path: Directory containing checkpoints
        model_template: Model template for loading weights
        test_dataset: Test dataset
        device: Computation device
    
    Returns:
        list: List of evaluation results for all checkpoints
    """
    # Find all checkpoints
    checkpoint_files = sorted(glob.glob(f"{checkpoint_path}/checkpoint_epoch_*.pt"))
    
    if not checkpoint_files:
        print(f"{color.RED}No checkpoints found in {checkpoint_path}{color.END}")
        return []
    
    print(f"\n{color.BOLD}=== CHECKPOINT EVALUATION ON TEST SET ==={color.END}")
    print(f"Found {len(checkpoint_files)} checkpoints to evaluate\n")
    
    results = []
    for checkpoint_file in checkpoint_files:
        # Extract epoch number from filename
        epoch_num = int(checkpoint_file.split('_')[-1].replace('.pt', ''))
        print(f"\n{color.CYAN}{'='*60}{color.END}")
        print(f"{color.BOLD}Checkpoint Epoch {epoch_num}{color.END}")
        print(f"{color.CYAN}{'='*60}{color.END}\n")
        
        result = evaluate_checkpoint_on_test(checkpoint_file, model_template, test_dataset, device)
        result['epoch'] = epoch_num
        results.append(result)
        
        # Print results with standardized formatting
        print(f"\n{color.GREEN}Test Set Results (10,000 samples):{color.END}")
        print(f"  Agglomerative Clustering:")
        print(f"    - Silhouette: {result['agg_silhouette']:.4f}")
        print(f"    - Purity:     {result['agg_purity']:.4f}")
        print(f"  KMeans Clustering:")
        print(f"    - Silhouette: {result['kmeans_silhouette']:.4f}")
        print(f"    - Purity:     {result['kmeans_purity']:.4f}")
    
    return results

# Prepare test dataset
t_test = MNIST_Distance_Dataset_Triplet_Loss(X_test, y_test)

# Evaluate all checkpoints
checkpoint_results = evaluate_all_checkpoints(
    checkpoint_path=CHECKPOINT_PATH,
    model_template=distance_model,
    test_dataset=t_test,
    device=device
)

In [None]:
# Summary table of results with training time and train loss
print(f"\n{color.BOLD}{'='*100}{color.END}")
print(f"{color.BOLD}SUMMARY TABLE - CHECKPOINT COMPARISON {color.END}")
print(f"{color.BOLD}{'='*100}{color.END}\n")

# Header with training information
print(f"{'Epoch':<10} {'Train':<20} {'Agglomerative':<30} {'KMeans':<30}")
print(f"{'':10} {'Loss':<10} {'Time(min)':<10} {'Silhouette':<15} {'Purity':<15} {'Silhouette':<15} {'Purity':<15}")
print(f"{'-'*100}")

for result in checkpoint_results:
    epoch_num = result['epoch']
    
    # Get train loss and time if available
    train_loss_str = "N/A"
    train_time_str = "N/A"
    if 'CHECKPOINT_EVAL_LOGS' in globals() and epoch_num in CHECKPOINT_EVAL_LOGS:
        train_loss_str = f"{CHECKPOINT_EVAL_LOGS[epoch_num]['train_loss']:.4f}"
        train_time_str = f"{CHECKPOINT_EVAL_LOGS[epoch_num]['training_time_minutes']:.2f}"
    
    print(f"{epoch_num:<10} "
          f"{train_loss_str:<10} {train_time_str:<10} "
          f"{result['agg_silhouette']:<15.4f} {result['agg_purity']:<15.4f} "
          f"{result['kmeans_silhouette']:<15.4f} {result['kmeans_purity']:<15.4f}")

print(f"{'-'*100}\n")

# Identify best checkpoint for each metric
best_agg_sil = max(checkpoint_results, key=lambda x: x['agg_silhouette'])
best_agg_pur = max(checkpoint_results, key=lambda x: x['agg_purity'])
best_km_sil = max(checkpoint_results, key=lambda x: x['kmeans_silhouette'])
best_km_pur = max(checkpoint_results, key=lambda x: x['kmeans_purity'])

print(f"{color.GREEN}Best performance:{color.END}")
print(f"  Agglomerative Silhouette: Epoch {best_agg_sil['epoch']} ({best_agg_sil['agg_silhouette']:.4f})")
print(f"  Agglomerative Purity:     Epoch {best_agg_pur['epoch']} ({best_agg_pur['agg_purity']:.4f})")
print(f"  KMeans Silhouette:        Epoch {best_km_sil['epoch']} ({best_km_sil['kmeans_silhouette']:.4f})")
print(f"  KMeans Purity:            Epoch {best_km_pur['epoch']} ({best_km_pur['kmeans_purity']:.4f})")

In [None]:
# Grafici comparativi delle metriche
fig, axes = plt.subplots(3, 2, figsize=(16, 18))
fig.suptitle('Confronto Performance Checkpoint sul Test Set', fontsize=16, weight='bold')

epochs = [r['epoch'] for r in checkpoint_results]

# Train Loss (se disponibile)
if 'CHECKPOINT_EVAL_LOGS' in globals():
    train_losses = [CHECKPOINT_EVAL_LOGS.get(e, {}).get('train_loss', None) for e in epochs]
    if any(tl is not None for tl in train_losses):
        axes[0, 0].plot(epochs, train_losses, 
                       marker='*', linewidth=2, markersize=10, color='darkred')
        axes[0, 0].set_xlabel('Epoca', fontsize=12)
        axes[0, 0].set_ylabel('Train Loss', fontsize=12)
        axes[0, 0].set_title('Train Loss per Checkpoint', fontsize=14, weight='bold')
        axes[0, 0].grid(True, alpha=0.3)
        axes[0, 0].set_xticks(epochs)

# Training Time (se disponibile)
if 'CHECKPOINT_EVAL_LOGS' in globals():
    train_times = [CHECKPOINT_EVAL_LOGS.get(e, {}).get('training_time_minutes', None) for e in epochs]
    if any(tt is not None for tt in train_times):
        axes[0, 1].plot(epochs, train_times, 
                       marker='o', linewidth=2, markersize=10, color='darkblue')
        axes[0, 1].set_xlabel('Epoca', fontsize=12)
        axes[0, 1].set_ylabel('Tempo (minuti)', fontsize=12)
        axes[0, 1].set_title('Tempo di Training Cumulativo', fontsize=14, weight='bold')
        axes[0, 1].grid(True, alpha=0.3)
        axes[0, 1].set_xticks(epochs)

# Agglomerative Silhouette
axes[1, 0].plot(epochs, [r['agg_silhouette'] for r in checkpoint_results], 
                marker='o', linewidth=2, markersize=8, color='steelblue')
axes[1, 0].set_xlabel('Epoca', fontsize=12)
axes[1, 0].set_ylabel('Silhouette Score', fontsize=12)
axes[1, 0].set_title('Agglomerative Clustering - Silhouette', fontsize=14, weight='bold')
axes[1, 0].grid(True, alpha=0.3)
axes[1, 0].set_xticks(epochs)

# Agglomerative Purity
axes[1, 1].plot(epochs, [r['agg_purity'] for r in checkpoint_results], 
                marker='s', linewidth=2, markersize=8, color='seagreen')
axes[1, 1].set_xlabel('Epoca', fontsize=12)
axes[1, 1].set_ylabel('Purity Score', fontsize=12)
axes[1, 1].set_title('Agglomerative Clustering - Purity', fontsize=14, weight='bold')
axes[1, 1].grid(True, alpha=0.3)
axes[1, 1].set_xticks(epochs)

# KMeans Silhouette
axes[2, 0].plot(epochs, [r['kmeans_silhouette'] for r in checkpoint_results], 
                marker='^', linewidth=2, markersize=8, color='coral')
axes[2, 0].set_xlabel('Epoca', fontsize=12)
axes[2, 0].set_ylabel('Silhouette Score', fontsize=12)
axes[2, 0].set_title('KMeans Clustering - Silhouette', fontsize=14, weight='bold')
axes[2, 0].grid(True, alpha=0.3)
axes[2, 0].set_xticks(epochs)

# KMeans Purity
axes[2, 1].plot(epochs, [r['kmeans_purity'] for r in checkpoint_results], 
                marker='D', linewidth=2, markersize=8, color='mediumpurple')
axes[2, 1].set_xlabel('Epoca', fontsize=12)
axes[2, 1].set_ylabel('Purity Score', fontsize=12)
axes[2, 1].set_title('KMeans Clustering - Purity', fontsize=14, weight='bold')
axes[2, 1].grid(True, alpha=0.3)
axes[2, 1].set_xticks(epochs)

plt.tight_layout()
plt.show()

In [None]:
# Visualizzazione t-SNE per tutti i checkpoint confrontati con KMeans
num_checkpoints = len(checkpoint_results)
fig, axes = plt.subplots(1, num_checkpoints, figsize=(10 * num_checkpoints, 8))

if num_checkpoints == 1:
    axes = [axes]

fig.suptitle('Confronto Clustering KMeans tra Checkpoint (t-SNE Visualization)', 
             fontsize=16, weight='bold')

for idx, result in enumerate(checkpoint_results):
    # Applica t-SNE agli embedding
    reduction_model = TSNE(n_components=2, perplexity=30, random_state=42)
    vis_embed = reduction_model.fit_transform(result['test_embedding'])
    
    # Visualizza il clustering
    axes[idx].scatter(vis_embed[:, 0], vis_embed[:, 1], 
                     c=result['kmeans_prediction'], cmap='Dark2', alpha=0.6, s=10)
    axes[idx].set_title(f"Epoca {result['epoch']}\n"
                       f"Silhouette: {result['kmeans_silhouette']:.4f} | "
                       f"Purity: {result['kmeans_purity']:.4f}",
                       fontsize=12, weight='bold')
    axes[idx].set_xlabel('t-SNE Dim 1')
    axes[idx].set_ylabel('t-SNE Dim 2')
    axes[idx].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Save results to file for reference
import json
from datetime import datetime

results_summary = {
    'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'test_samples': len(t_test),
    'checkpoints': []
}

for result in checkpoint_results:
    epoch_num = result['epoch']
    checkpoint_entry = {
        'epoch': epoch_num,
        'agglomerative': {
            'silhouette': float(result['agg_silhouette']),
            'purity': float(result['agg_purity'])
        },
        'kmeans': {
            'silhouette': float(result['kmeans_silhouette']),
            'purity': float(result['kmeans_purity'])
        }
    }
    
    # Add training time and train loss if available
    if 'CHECKPOINT_EVAL_LOGS' in globals() and epoch_num in CHECKPOINT_EVAL_LOGS:
        checkpoint_entry['training_time_minutes'] = CHECKPOINT_EVAL_LOGS[epoch_num]['training_time_minutes']
        checkpoint_entry['train_loss'] = CHECKPOINT_EVAL_LOGS[epoch_num]['train_loss']
    
    results_summary['checkpoints'].append(checkpoint_entry)

# Save to JSON
results_file = f"{CHECKPOINT_PATH}/evaluation_results.json"
with open(results_file, 'w') as f:
    json.dump(results_summary, f, indent=2)

print(f"\n{color.GREEN}Results saved to: {results_file}{color.END}")
print(f"\n{color.BOLD}Information saved for each checkpoint:{color.END}")
print(f"  - Clustering metrics (Silhouette and Purity)")
print(f"  - Training time (minutes)")
print(f"  - Train Loss")