In [4]:
import copy
import time

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import fetch_openml
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset

from qiskit import QuantumCircuit
from qiskit.circuit.library import ZFeatureMap, EfficientSU2
from qiskit.primitives import Estimator
from qiskit.quantum_info import SparsePauliOp
from qiskit_machine_learning.connectors import TorchConnector
from qiskit_machine_learning.neural_networks import EstimatorQNN

In [None]:

# --- Reproducibility -------------------------------------------------------
# Seed both NumPy and PyTorch so sampling and weight initialisation repeat.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# --- Quantum model ---------------------------------------------------------
N_QUBITS = 4   # circuit width; also the number of PCA components kept
N_LAYERS = 3   # `reps` passed to the EfficientSU2 ansatz

# --- Task and stopping criterion -------------------------------------------
DIGITS_TO_CLASSIFY = [0, 1]   # MNIST digits kept for the classification task
TARGET_ACCURACY = 0.80        # a simulation stops once test accuracy reaches this

# --- Active-learning schedule ----------------------------------------------
N_INITIAL_SAMPLES = 20        # labeled samples used for the shared initial training
TOTAL_QUERY_BATCH_SIZE = 10   # labels requested per round (passive + active)
ACTIVE_BATCH_SIZE = 200       # unlabeled candidates scored per active query round

# --- Optimisation ----------------------------------------------------------
BATCH_SIZE_TRAIN = 10         # DataLoader batch size (used for training and evaluation)
LEARNING_RATE = 0.01          # Adam learning rate
N_INITIAL_EPOCHS = 50         # epochs for the shared initial training
N_FINETUNE_EPOCHS = 2         # epochs of fine-tuning after each query round

## Data pre-processing
Here we load the MNIST dataset, keep only the digits we want to classify, and use PCA to reduce each image to one feature per qubit. The digits to classify are:

$$[0, 1]$$

In [9]:
def load_and_preprocess_data():
    """Load MNIST, keep the target digits, and reduce them to N_QUBITS PCA features.

    Steps:
      1. Download ``mnist_784`` from OpenML and cast the labels to ``int``.
      2. Keep only the samples whose label is in ``DIGITS_TO_CLASSIFY``.
         The first listed digit becomes class 0; every other kept digit
         becomes class 1.
      3. Divide the pixel values by 255.
      4. Split into a pool (70%) and a test set (30%), stratified by class.
      5. Fit PCA on the pool only and apply the same projection to the test
         set, so no test-set statistics leak into the features.

    Returns:
        tuple: ``(X_pool, y_pool, X_test, y_test)`` as NumPy arrays, where the
        feature arrays have ``N_QUBITS`` columns.
    """
    print("Loading and preprocessing data...")
    X, y = fetch_openml("mnist_784", version=1, return_X_y=True, as_frame=False)
    y = y.astype(int)

    mask = np.isin(y, DIGITS_TO_CLASSIFY)
    X_scaled = X[mask] / 255.0
    y_filtered = y[mask]
    y_mapped = np.where(y_filtered == DIGITS_TO_CLASSIFY[0], 0, 1)

    # Split BEFORE fitting PCA: fitting on the full data set would let the
    # held-out test samples influence the learned projection.
    X_pool_raw, X_test_raw, y_pool, y_test = train_test_split(
        X_scaled, y_mapped,
        test_size=0.3, random_state=RANDOM_SEED, stratify=y_mapped
    )

    # An explicit random_state keeps the projection reproducible when
    # scikit-learn selects a randomized SVD solver.
    pca = PCA(n_components=N_QUBITS, random_state=RANDOM_SEED)
    X_pool = pca.fit_transform(X_pool_raw)
    X_test = pca.transform(X_test_raw)

    print(f"Data processed succecfully. Pool: {len(y_pool)} samples. Test: {len(y_test)} samples.")
    return X_pool, y_pool, X_test, y_test


## Hybrid Model (QNN)

In [None]:
class HybridQNN(nn.Module):
    """Hybrid classifier: a parameterised quantum circuit followed by a linear head.

    The quantum part composes a ``ZFeatureMap`` (whose parameters are the
    network inputs) with an ``EfficientSU2`` ansatz (whose parameters are the
    trainable weights) and is wrapped in an ``EstimatorQNN`` with a single
    observable. The classical part is ``nn.Linear(1, 2)``, which maps the
    quantum layer's output to two class scores.

    Args:
        n_qubits: Width of the circuit; also the number of input features.
        n_layers: ``reps`` argument passed to ``EfficientSU2``.
    """

    def __init__(self, n_qubits, n_layers):
        super().__init__()

        encoding = ZFeatureMap(n_qubits)
        variational = EfficientSU2(n_qubits, reps=n_layers)

        # Data encoding first, trainable ansatz second.
        circuit = QuantumCircuit(n_qubits)
        for stage in (encoding, variational):
            circuit.compose(stage, inplace=True)

        # Pauli label with identities everywhere except a trailing "Z".
        z_label = "I" * (n_qubits - 1) + "Z"

        quantum_network = EstimatorQNN(
            circuit=circuit,
            observables=SparsePauliOp(z_label),
            input_params=encoding.parameters,
            weight_params=variational.parameters,
            estimator=Estimator(),
        )

        # Initial ansatz weights are drawn uniformly from [0, 1) via NumPy's
        # global RNG.
        start_weights = np.random.rand(variational.num_parameters)
        self.q_layer = TorchConnector(quantum_network, initial_weights=start_weights)

        self.c_layer = nn.Linear(1, 2)

    def forward(self, x):
        """Run ``x`` through the quantum layer, then the linear head."""
        return self.c_layer(self.q_layer(x))


In [10]:
def _evaluate_accuracy(model, data_loader):
    """Return the fraction of samples in ``data_loader`` that ``model`` classifies correctly.

    Puts the model in eval mode as a side effect.
    """
    model.eval()
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            predicted = model(inputs).argmax(dim=1)
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()
    return n_correct / n_seen


def run_simulation(config_name, n_passive_per_batch, n_active_per_batch, X_pool, y_pool, X_test_tensor, y_test_tensor, initial_indices, initial_model_state):
    """Run one active-learning simulation that mixes passive and active queries.

    A fresh ``HybridQNN`` is loaded with ``initial_model_state`` and starts
    with the pool samples at ``initial_indices`` as its labeled set. Each
    round then:

      1. evaluates accuracy on the test set and records it;
      2. stops if ``TARGET_ACCURACY`` has been reached;
      3. labels up to ``n_passive_per_batch`` randomly chosen pool samples;
      4. scores up to ``ACTIVE_BATCH_SIZE`` random remaining candidates and
         labels the ``n_active_per_batch`` with the smallest gap between the
         two class probabilities;
      5. fine-tunes on all labeled samples for ``N_FINETUNE_EPOCHS`` epochs.

    Args:
        config_name: Label used in log messages.
        n_passive_per_batch: Random (passive) queries per round.
        n_active_per_batch: Uncertainty-based (active) queries per round.
        X_pool: Pool features, indexable by integer arrays (NumPy array).
        y_pool: Pool labels aligned with ``X_pool``.
        X_test_tensor: Test features as a float tensor.
        y_test_tensor: Test labels as a long tensor.
        initial_indices: Pool indices that are labeled from the start.
        initial_model_state: State dict loaded into the model before the loop.

    Returns:
        list[tuple[int, float]]: One ``(number of labeled samples, test
        accuracy)`` pair per evaluation, in chronological order.
    """
    print(f"\n--- Iniciando Simulação: {config_name} ({n_passive_per_batch}P + {n_active_per_batch}A) ---")

    # Work on private copies of the pool.
    local_X_pool = np.copy(X_pool)
    local_y_pool = np.copy(y_pool)

    test_loader = DataLoader(
        TensorDataset(X_test_tensor, y_test_tensor), batch_size=BATCH_SIZE_TRAIN
    )

    labeled_X_pca = [torch.tensor(local_X_pool[i], dtype=torch.float32) for i in initial_indices]
    labeled_y = [torch.tensor(local_y_pool[i], dtype=torch.long) for i in initial_indices]

    # setdiff1d returns a sorted array, so the order of the unlabeled pool
    # (and therefore every np.random.choice draw from it) is deterministic.
    unlabeled_indices = np.setdiff1d(np.arange(len(local_y_pool)), initial_indices)

    model = HybridQNN(N_QUBITS, N_LAYERS)
    model.load_state_dict(initial_model_state)

    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    loss_fn = nn.CrossEntropyLoss()

    current_accuracy = 0.0
    history = []

    def _record_accuracy():
        """Evaluate on the test set, append to ``history``, log, and return the accuracy."""
        accuracy = _evaluate_accuracy(model, test_loader)
        n_labeled = len(labeled_y)
        history.append((n_labeled, accuracy))
        print(f"[{config_name}] Rótulos: {n_labeled:4d} | Precisão: {accuracy:.4f}")
        return accuracy

    while current_accuracy < TARGET_ACCURACY and len(unlabeled_indices) > 0:

        # 1. Evaluate (this also leaves the model in eval mode for step 3).
        current_accuracy = _record_accuracy()
        if current_accuracy >= TARGET_ACCURACY:
            break

        queried_indices = []
        candidates = unlabeled_indices

        # 2. Passive queries: uniform random draw from the unlabeled pool.
        n_to_query_passive = min(n_passive_per_batch, len(candidates))
        if n_to_query_passive > 0:
            passive_indices = np.random.choice(candidates, n_to_query_passive, replace=False)
            queried_indices.extend(passive_indices)
            # Exclude them so the active step cannot pick the same samples.
            candidates = candidates[~np.isin(candidates, passive_indices)]

        # 3. Active queries: score a random subset of the remaining candidates
        #    and keep those the model is least sure about.
        n_to_query_active = min(n_active_per_batch, len(candidates))
        if n_to_query_active > 0:
            n_to_check = min(ACTIVE_BATCH_SIZE, len(candidates))
            batch_indices_absolute = np.random.choice(candidates, n_to_check, replace=False)

            X_U_batch_pca = torch.tensor(local_X_pool[batch_indices_absolute], dtype=torch.float32)
            with torch.no_grad():
                probs = torch.softmax(model(X_U_batch_pca), dim=1)

            # Small margin between the two class probabilities = high uncertainty.
            margin = torch.abs(probs[:, 0] - probs[:, 1])
            # Convert to a NumPy index array explicitly: indexing a NumPy array
            # with a torch tensor is fragile, notably for single-element results.
            most_uncertain = torch.argsort(margin)[:n_to_query_active].cpu().numpy()
            queried_indices.extend(batch_indices_absolute[most_uncertain])

        if not queried_indices:
            # Nothing was queried (both batch sizes are 0): the labeled set
            # cannot grow, so looping again would never terminate.
            break

        # 4. Move the queried samples from the unlabeled pool to the labeled set.
        for idx in queried_indices:
            labeled_X_pca.append(torch.tensor(local_X_pool[idx], dtype=torch.float32))
            labeled_y.append(torch.tensor(local_y_pool[idx], dtype=torch.long))
        unlabeled_indices = unlabeled_indices[~np.isin(unlabeled_indices, queried_indices)]

        # 5. Fine-tune on everything labeled so far.
        train_loader = DataLoader(
            TensorDataset(torch.stack(labeled_X_pca), torch.stack(labeled_y)),
            batch_size=BATCH_SIZE_TRAIN,
            shuffle=True,
        )

        model.train()
        for _ in range(N_FINETUNE_EPOCHS):
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                loss = loss_fn(model(inputs), labels)
                loss.backward()
                optimizer.step()

    if current_accuracy < TARGET_ACCURACY and len(unlabeled_indices) == 0:
        # The pool ran out before the target was reached: record the accuracy
        # of the final model, which the loop above never evaluated.
        current_accuracy = _record_accuracy()

    print(f"--- Simulação {config_name} Concluída ---")
    return history

# Main Script

In [11]:
if __name__ == "__main__":

    # --- Data ---------------------------------------------------------------
    X_pool, y_pool, X_test_np, y_test_np = load_and_preprocess_data()
    X_test_tensor = torch.tensor(X_test_np, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test_np, dtype=torch.long)

    # --- Shared starting point: one model trained on a small labeled set ----
    print("\n--- A criar o ponto de partida comum (Treino Inicial) ---")

    initial_indices = np.random.choice(range(len(y_pool)), N_INITIAL_SAMPLES, replace=False)

    labeled_X_pca = [torch.tensor(X_pool[i], dtype=torch.float32) for i in initial_indices]
    labeled_y = [torch.tensor(y_pool[i], dtype=torch.long) for i in initial_indices]

    train_dataset = TensorDataset(torch.stack(labeled_X_pca), torch.stack(labeled_y))
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE_TRAIN, shuffle=True)

    initial_model = HybridQNN(N_QUBITS, N_LAYERS)
    optimizer = optim.Adam(initial_model.parameters(), lr=LEARNING_RATE)
    loss_fn = nn.CrossEntropyLoss()

    initial_model.train()
    for epoch in range(N_INITIAL_EPOCHS):
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = initial_model(inputs)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

    # state_dict() hands back references to the model's live tensors; take a
    # deep copy so the shared starting point cannot change if initial_model is
    # ever touched again.
    initial_model_state = copy.deepcopy(initial_model.state_dict())

    # --- Configurations: every passive/active split of the query budget -----
    configurations = []
    for n_active in range(TOTAL_QUERY_BATCH_SIZE + 1):  # 0 .. TOTAL_QUERY_BATCH_SIZE
        n_passive = TOTAL_QUERY_BATCH_SIZE - n_active
        config_name = f"({n_passive}P + {n_active}A)"
        configurations.append((config_name, n_passive, n_active))

    # The count is derived from the configuration list so the messages stay
    # correct if TOTAL_QUERY_BATCH_SIZE changes.
    print(f"--- Ponto de partida criado. A iniciar as {len(configurations)} simulações. ---")

    results = {}

    print(f"\nAVISO: As {len(configurations)} simulações vão começar. Isto pode demorar várias horas.")
    start_time_total = time.time()

    for name, n_p, n_a in configurations:
        start_time_sim = time.time()

        # Reseed before every simulation: otherwise each run's random draws
        # depend on how many draws the previous configurations consumed, and
        # a configuration cannot be reproduced on its own.
        np.random.seed(RANDOM_SEED)
        torch.manual_seed(RANDOM_SEED)

        history = run_simulation(
            name, n_p, n_a,
            X_pool, y_pool, X_test_tensor, y_test_tensor,
            initial_indices, initial_model_state
        )
        results[name] = history
        print(f"Tempo da simulação {name}: {(time.time() - start_time_sim)/60:.2f} minutos")

    print(f"\n--- EXPERIÊNCIA COMPLETA CONCLUÍDA ---")
    print(f"Tempo total: {(time.time() - start_time_total)/60:.2f} minutos")

    # --- Plot: accuracy versus number of labels, one curve per strategy -----
    plt.figure(figsize=(14, 9))

    for name, history in results.items():
        if history:
            x_data, y_data = zip(*history)
            plt.plot(x_data, y_data, 'o-', label=name, markersize=4, alpha=0.8)

    plt.axhline(y=TARGET_ACCURACY, color='gray', linestyle=':', label=f"{TARGET_ACCURACY * 100}% Target")
    plt.xlabel("Número Total de Rótulos")
    plt.ylabel("Precisão no Conjunto de Teste")
    plt.title("Comparação de Estratégias de Active Learning (Qiskit+PyTorch)")
    plt.legend(title="Estratégia (Passivo + Ativo)")
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.show()

Loading and preprocessing data...
Data processed succecfully. Pool: 10346 samples. Test: 4434 samples.

--- A criar o ponto de partida comum (Treino Inicial) ---


  estimator = Estimator()
  qnn = EstimatorQNN(


KeyboardInterrupt: 