# Hybrid Quantum-Classical Classification using a 1D Convolutional Autoencoder

This notebook implements an advanced feature extraction pipeline for a hybrid quantum-classical classifier. It tests the hypothesis that a 1D Convolutional Autoencoder can effectively learn salient features from a flattened representation of medical images.

**Key Features**:
- **Direct Image Flattening**: Images are transformed into 1D vectors.
- **1D Convolutional Autoencoder**: A custom autoencoder architecture with `Conv1D` layers is trained to learn a compressed, non-linear representation of the flattened vectors. This replaces PCA.
- **Two-Stage Training**: The workflow first trains the autoencoder to learn features, then uses those features to train the final QNN classifier.
- **Structured & Modular**: The code is organized into classes for data management, model training, and experiment execution.

## 1. Imports

In [8]:
import os
import sys
import csv
import time
import random
from datetime import datetime
from typing import Tuple, Dict, Any

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import pennylane as qml
import joblib

from PIL import Image
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import confusion_matrix, recall_score, f1_score

# Import custom utility libraries
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "../src")))
from plot import PlotUtils

## 2. Configuration

In [9]:
# <<< MODIFIED: Configuration for the autoencoder experiment (without Hilbert) >>>
CONFIG = {
    "data": {
        "base_url": "../data/dataset_binary/",
        "training_url": "../data/dataset_binary/Training/",
        "testing_url": "../data/dataset_binary/Testing/",
        "class_map": {"tumor": 0, "no_tumor": 1},
        "image_size": 128,
    },
    "results": {
        "graphics_url": '../results/graphics/',
        "csv_url": '../results/csv/',
        "models_url": '../models/',
        "csv_name": 'NO_HILBERT_AUTOENCODER_BC_results_log.csv',
        "loss_plot_name": "NO_HILBERT_AUTOENCODER_BC_loss_plot.png",
        "confusion_matrix_name": "NO_HILBERT_AUTOENCODER_BC_confusion_matrix.png",
        "loss_vs_accuracy_name": "NO_HILBERT_AUTOENCODER_BC_loss_vs_accuracy.png",
    },
    "autoencoder": {
        "latent_dim": 12, # New feature dimension
        "epochs": 100,     # Training the AE may require fewer epochs
        "lr": 0.001,
        "batch_size": 32
    },
    "classifier": {
        "epochs": 200,
        "lr": 0.05,
        "layers": 3,
        "batch_size": 64,
    },
    "model": {
        "n_qubits": 12, # Must match latent_dim
    },
    "seed": 42
}

# Ensure result directories exist
os.makedirs(CONFIG['results']['graphics_url'], exist_ok=True)
os.makedirs(CONFIG['results']['csv_url'], exist_ok=True)
os.makedirs(CONFIG['results']['models_url'], exist_ok=True)

## 3. Autoencoder Architecture
This cell defines the 1D Convolutional Autoencoder architecture. The encoder compresses the high-dimensional flattened vector into a low-dimensional latent space, and the decoder reconstructs it.
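
The layer shapes in this architecture can be checked by hand with the standard output-length formulas for 1D convolutions and transposed convolutions (dilation 1). The sketch below is a minimal, framework-free check, assuming the kernel sizes, strides, and paddings used in the cell that follows; the helper names `conv1d_out_len` and `conv_transpose1d_out_len` are illustrative and not part of the notebook.

```python
def conv1d_out_len(length, kernel_size, stride, padding):
    """Output length of a 1D convolution (dilation = 1)."""
    return (length + 2 * padding - kernel_size) // stride + 1


def conv_transpose1d_out_len(length, kernel_size, stride, padding, output_padding):
    """Output length of a 1D transposed convolution (dilation = 1)."""
    return (length - 1) * stride - 2 * padding + (kernel_size - 1) + output_padding + 1


# Encoder: (kernel_size, stride, padding) per Conv1d; each is followed by MaxPool1d(2, stride=2).
encoder_convs = [(7, 2, 3), (5, 2, 2), (3, 2, 1)]
# Decoder: (kernel_size, stride, padding, output_padding) per ConvTranspose1d.
decoder_convs = [(3, 2, 1, 1), (5, 2, 2, 1), (5, 4, 2, 3), (7, 4, 2, 1)]

length = 128 * 128  # flattened 128x128 image
for k, s, p in encoder_convs:
    length = conv1d_out_len(length, k, s, p)
    length = length // 2  # MaxPool1d(2, stride=2) halves an even length
encoded_len = length

for k, s, p, op in decoder_convs:
    length = conv_transpose1d_out_len(length, k, s, p, op)
decoded_len = length

print(encoded_len, decoded_len)  # 256 16384
```

The encoder therefore hands 64 channels of length 256 to the linear bottleneck, and the decoder restores the original length of 16384.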

In [10]:
# <<< 1D Convolutional Autoencoder definition >>>
class Conv1DAutoencoder(nn.Module):
    def __init__(self, latent_dim: int):
        super().__init__()
        self.latent_dim = latent_dim
        
        # Encoder: 16384 -> latent_dim
        self.encoder = nn.Sequential(
            # Input: (batch, 1, 16384)
            nn.Conv1d(1, 16, kernel_size=7, stride=2, padding=3), # -> (batch, 16, 8192)
            nn.ReLU(),
            nn.MaxPool1d(2, stride=2), # -> (batch, 16, 4096)
            nn.Conv1d(16, 32, kernel_size=5, stride=2, padding=2), # -> (batch, 32, 2048)
            nn.ReLU(),
            nn.MaxPool1d(2, stride=2), # -> (batch, 32, 1024)
            nn.Conv1d(32, 64, kernel_size=3, stride=2, padding=1), # -> (batch, 64, 512)
            nn.ReLU(),
            nn.MaxPool1d(2, stride=2), # -> (batch, 64, 256)
            nn.Flatten(), # -> (batch, 64 * 256)
            nn.Linear(64 * 256, latent_dim)
        )
        
        # Decoder: latent_dim -> 16384
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64 * 256),
            nn.Unflatten(1, (64, 256)), # -> (batch, 64, 256)
            nn.ConvTranspose1d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1), # -> (batch, 32, 512)
            nn.ReLU(),
            nn.ConvTranspose1d(32, 16, kernel_size=5, stride=2, padding=2, output_padding=1), # -> (batch, 16, 1024)
            nn.ReLU(),
            nn.ConvTranspose1d(16, 8, kernel_size=5, stride=4, padding=2, output_padding=3), # -> (batch, 8, 4096)
            nn.ReLU(),
            nn.ConvTranspose1d(8, 1, kernel_size=7, stride=4, padding=2, output_padding=1), # -> (batch, 1, 16384)
            nn.Sigmoid() # Output in [0, 1], matching the normalized data
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

    def encode(self, x):
        return self.encoder(x)

## 4. DataManager Class
This class handles all data loading and preprocessing tasks, including flattening the images.
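
As a small illustration of the flattening step, the sketch below uses NumPy on a random stand-in batch (hypothetical data, not the notebook's dataset). It shows the normalization to [0, 1], the row-major reshape to vectors of length 16384, and the extra channel axis that `Conv1d` layers expect.

```python
import numpy as np

rng = np.random.default_rng(0)
# Hypothetical stand-in for a batch of 4 grayscale 128x128 images (uint8 pixels).
images = rng.integers(0, 256, size=(4, 128, 128), dtype=np.uint8)

normalized = images.astype(np.float32) / 255.0      # scale pixels to [0, 1]
flat = normalized.reshape(normalized.shape[0], -1)  # (4, 128, 128) -> (4, 16384)
conv_input = flat[:, np.newaxis, :]                 # add a channel axis: (4, 1, 16384)

print(flat.shape, conv_input.shape)  # (4, 16384) (4, 1, 16384)
```

Because the reshape is row-major, horizontally adjacent pixels stay adjacent in the vector, while vertically adjacent pixels end up 128 positions apart.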

In [11]:
# <<< MODIFIED: DataManager flattens images directly >>>
class DataManager:
    """Handles loading and flattening of the image dataset."""
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.seed = config['seed']
        random.seed(self.seed)
        np.random.seed(self.seed)
        torch.manual_seed(self.seed)  # also seed PyTorch (weight init, DataLoader shuffling)

    def _load_images_from_dir(self, data_dir: str) -> Tuple[np.ndarray, np.ndarray]:
        X, y = [], []
        class_map = self.config['data']['class_map']
        image_size = (self.config['data']['image_size'], self.config['data']['image_size'])
        for class_name, label in class_map.items():
            class_dir = os.path.join(data_dir, class_name)
            if not os.path.isdir(class_dir): continue
            for f in sorted(os.listdir(class_dir)):  # sorted for a deterministic sample order
                if f.lower().endswith('.jpg'):
                    img = Image.open(os.path.join(class_dir, f)).convert('L').resize(image_size)
                    X.append(np.array(img))
                    y.append(label)
        return np.stack(X), np.array(y)

    def get_flattened_data(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Loads, normalizes, and flattens data."""
        X_train_raw, y_train = self._load_images_from_dir(self.config['data']['training_url'])
        X_test_raw, y_test = self._load_images_from_dir(self.config['data']['testing_url'])
        
        X_train_normalized = X_train_raw.astype(np.float32) / 255.0
        X_test_normalized = X_test_raw.astype(np.float32) / 255.0

        print("Flattening images...")
        # Flatten the images into 1D vectors
        X_train_flat = X_train_normalized.reshape(X_train_normalized.shape[0], -1)
        X_test_flat = X_test_normalized.reshape(X_test_normalized.shape[0], -1)
        
        return X_train_flat, y_train, X_test_flat, y_test

## 5. QuantumTrainer Class
This class defines the hybrid quantum-classical model architecture, the training loop, and the evaluation logic. Its input is the autoencoder's latent features after min-max scaling to the range [0, π/2].
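
One detail of the evaluation logic is worth spelling out. scikit-learn's `recall_score` and `f1_score`, called with default arguments, treat label 1 as the positive class, which under this notebook's `class_map` is `no_tumor`. The pure-Python sketch below uses hypothetical labels and an illustrative helper, `binary_metrics`, to show how the reported numbers change with the choice of positive class.

```python
def binary_metrics(y_true, y_pred, pos_label=1):
    """Accuracy, recall, and F1 for binary labels, treating pos_label as positive."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(t == pos_label and p == pos_label for t, p in pairs)
    fp = sum(t != pos_label and p == pos_label for t, p in pairs)
    fn = sum(t == pos_label and p != pos_label for t, p in pairs)
    accuracy = sum(t == p for t, p in pairs) / len(pairs)
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return accuracy, recall, f1


# Hypothetical labels following class_map: tumor = 0, no_tumor = 1.
y_true = [0, 0, 0, 0, 1, 1, 1, 1]
y_pred = [0, 0, 1, 1, 1, 1, 1, 0]

acc, recall_no_tumor, f1_no_tumor = binary_metrics(y_true, y_pred, pos_label=1)
_, recall_tumor, f1_tumor = binary_metrics(y_true, y_pred, pos_label=0)

print(acc, recall_no_tumor, recall_tumor)  # 0.625 0.75 0.5
```

In this toy example, recall is 0.75 for `no_tumor` but only 0.5 for `tumor`. If tumor sensitivity is the quantity of interest, passing `pos_label=0` to the scikit-learn functions would report it directly.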

In [12]:
class QuantumTrainer:
    """Defines and trains the hybrid quantum-classical model."""
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.hp = config['classifier']
        self.model_config = config['model']
        self.model = self._build_model()

    def _build_model(self) -> nn.Module:
        n_qubits = self.model_config['n_qubits']
        layers = self.hp['layers']
        n_features = self.config['autoencoder']['latent_dim']

        dev = qml.device("lightning.qubit", wires=n_qubits)

        @qml.qnode(dev)
        def qnode(inputs, weights):
            # Each quantum layer processes half of the features, so the circuit acts on half of the wires
            num_circuit_qubits = n_qubits // 2
            qml.AngleEmbedding(inputs, wires=range(num_circuit_qubits))
            qml.BasicEntanglerLayers(weights, wires=range(num_circuit_qubits))
            return [qml.expval(qml.PauliZ(wires=i)) for i in range(num_circuit_qubits)]
        
        weight_shapes = {"weights": (layers, n_qubits // 2)}

        class HybridModel(nn.Module):
            def __init__(self, input_features):
                super().__init__()
                self.input_features = input_features
                self.clayer_1 = nn.Linear(input_features, input_features)
                self.qlayer_1 = qml.qnn.TorchLayer(qnode, weight_shapes)
                self.qlayer_2 = qml.qnn.TorchLayer(qnode, weight_shapes)
                self.clayer_2 = nn.Linear(input_features, input_features)
                self.final_layer = nn.Linear(input_features, 1)

            def forward(self, x):
                x = self.clayer_1(x)
                x_1, x_2 = torch.split(x, self.input_features // 2, dim=1)
                x_1 = self.qlayer_1(x_1)
                x_2 = self.qlayer_2(x_2)
                x = torch.cat([x_1, x_2], dim=1)
                x = self.clayer_2(x)
                x = self.final_layer(x)
                return x

        return HybridModel(input_features=n_features)

    def train_and_evaluate(self, data: Dict[str, np.ndarray]) -> Dict[str, Any]:
        x_train_t = torch.tensor(data['x_train'], dtype=torch.float32)
        y_train_t = torch.tensor(data['y_train'], dtype=torch.float32).unsqueeze(1)
        x_test_t = torch.tensor(data['x_test'], dtype=torch.float32)
        y_test_t = torch.tensor(data['y_test'], dtype=torch.float32).unsqueeze(1)

        train_loader = DataLoader(TensorDataset(x_train_t, y_train_t), batch_size=self.hp['batch_size'], shuffle=True)
        
        optimizer = torch.optim.SGD(self.model.parameters(), lr=self.hp['lr'])
        loss_fn = nn.BCEWithLogitsLoss()

        epoch_results = []
        loss_history, accuracy_history = [], []
        print("--------------- CLASSIFIER EPOCHS --------------------")

        for epoch in range(self.hp['epochs']):
            self.model.train()
            running_loss = 0.0
            for xb, yb in train_loader:
                optimizer.zero_grad()
                loss = loss_fn(self.model(xb), yb)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
            
            avg_loss = running_loss / len(train_loader)
            loss_history.append(avg_loss)
            print(f"Epoch {epoch + 1}/{self.hp['epochs']} - Loss: {avg_loss:.4f}")

            self.model.eval()
            with torch.no_grad():
                outputs = self.model(x_test_t)
                probs = torch.sigmoid(outputs)
                preds = (probs >= 0.5).int()
                
                acc = (preds == y_test_t.int()).sum().item() / len(y_test_t)
                accuracy_history.append(acc)
                y_true_np, preds_np = y_test_t.numpy(), preds.numpy()
                
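                # scikit-learn's default pos_label=1 means F1 and recall refer to class 1 ('no_tumor' in class_map)
                f1 = f1_score(y_true_np, preds_np)
                recall = recall_score(y_true_np, preds_np)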
                cm = confusion_matrix(y_true_np, preds_np)

                print(f"Accuracy: {acc:.4f} - F1: {f1:.4f} - Recall: {recall:.4f}")
                print("---------------------------------------------------")
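                # Snapshot the weights: state_dict() returns references to the live tensors,
                # so without cloning, every stored entry would end up holding the final epoch's weights.
                state_snapshot = {k: v.detach().clone() for k, v in self.model.state_dict().items()}
                epoch_results.append((acc, f1, recall, epoch + 1, state_snapshot, avg_loss, cm))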

        epoch_results.sort(key=lambda x: x[0], reverse=True)
        best_acc, best_f1, best_recall, best_ep, best_state_dict, best_loss, best_cm = epoch_results[0]
        
        best_model_filename = self._save_model(best_state_dict, best_ep, best_acc)

        return {
            'best_accuracy': best_acc, 'best_f1': best_f1, 'best_recall': best_recall,
            'best_loss': best_loss, 'best_epoch': best_ep, 'confusion_matrix': best_cm,
            'model_filename': best_model_filename, 'loss_history': loss_history,
            'accuracy_history': accuracy_history
        }
    
    def _save_model(self, model_state_dict, epoch, accuracy) -> str:
        save_dir = self.config['results']['models_url']
        filename = f"NO_HILBERT_AUTOENCODER_BC_model_epoch_{epoch}_acc_{accuracy:.4f}.pt"
        filepath = os.path.join(save_dir, filename)
        torch.save(model_state_dict, filepath)
        print(f"Best classifier model saved to: {filepath}")
        return filename

    def save_preprocessing_artifacts(self, artifacts: Dict[str, Any], model_filename: str):
        save_dir = self.config['results']['models_url']
        base_filename = os.path.splitext(model_filename)[0]
        
        # Save the trained encoder and the scaler
        encoder_path = os.path.join(save_dir, f"{base_filename}_encoder.pt")
        torch.save(artifacts['autoencoder_encoder'].state_dict(), encoder_path)
        print(f"Encoder artifact saved to: {encoder_path}")
        
        scaler_path = os.path.join(save_dir, f"{base_filename}_scaler_angle.pkl")
        joblib.dump(artifacts['scaler_angle'], scaler_path)
        print(f"Scaler_angle artifact saved to: {scaler_path}")

## 6. ExperimentRunner Class
This class orchestrates the two-stage experiment: first training the autoencoder, then using its encoder to train the final classifier.
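
Between the two stages, the latent features are min-max scaled to [0, π/2] with statistics fitted on the training set only. The NumPy sketch below reproduces that arithmetic on hypothetical values. `fit_min_max` is an illustrative helper, not scikit-learn's implementation.

```python
import numpy as np


def fit_min_max(x_train, low=0.0, high=np.pi / 2):
    """Return a function mapping each feature's training range onto [low, high]."""
    x_min = x_train.min(axis=0)
    x_range = x_train.max(axis=0) - x_min
    x_range[x_range == 0] = 1.0  # avoid division by zero for constant features
    return lambda x: (x - x_min) / x_range * (high - low) + low


# Hypothetical latent features: 5 training and 2 test samples, 3 dimensions.
x_train = np.array([[0.0, -2.0, 10.0],
                    [1.0, 0.0, 20.0],
                    [2.0, 2.0, 30.0],
                    [3.0, 4.0, 40.0],
                    [4.0, 6.0, 50.0]])
x_test = np.array([[2.0, 2.0, 30.0],
                   [5.0, -4.0, 60.0]])

scale = fit_min_max(x_train)
train_scaled = scale(x_train)  # spans exactly [0, pi/2] per feature
test_scaled = scale(x_test)    # may leave [0, pi/2] for unseen extremes

print(train_scaled.min(), train_scaled.max())
print(test_scaled)
```

Fitting on the training data alone avoids leaking test statistics into the features, at the cost that test samples outside the training range map to angles below 0 or above π/2, as the second test row shows.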

In [13]:
# <<< MODIFIED: ExperimentRunner for the workflow without Hilbert >>>
class ExperimentRunner:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

    def _train_autoencoder(self, X_train_flat: np.ndarray) -> Conv1DAutoencoder:
        print("\n--- STAGE 1: Training Autoencoder ---")
        ae_config = self.config['autoencoder']
        model = Conv1DAutoencoder(latent_dim=ae_config['latent_dim']).to(self.device)
        optimizer = torch.optim.Adam(model.parameters(), lr=ae_config['lr'])
        loss_fn = nn.MSELoss()
        
        X_train_tensor = torch.tensor(X_train_flat, dtype=torch.float32).unsqueeze(1)
        train_loader = DataLoader(X_train_tensor, batch_size=ae_config['batch_size'], shuffle=True)
        
        for epoch in range(ae_config['epochs']):
            running_loss = 0.0
            for data in train_loader:
                batch = data.to(self.device)
                optimizer.zero_grad()
                reconstructed = model(batch)
                loss = loss_fn(reconstructed, batch)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
            
            avg_loss = running_loss / len(train_loader)
            print(f"Autoencoder Epoch {epoch + 1}/{ae_config['epochs']} - Reconstruction Loss: {avg_loss:.6f}")
        
        print("--- Autoencoder training finished ---")
        return model

    def run(self):
        start_time = time.time()
        
        # 1. Prepare data (direct flattening)
        data_manager = DataManager(self.config)
        X_train_flat, y_train, X_test_flat, y_test = data_manager.get_flattened_data()
        self._log_class_distribution(y_train, "training data")
        self._log_class_distribution(y_test, "test data")

        # 2. Train the autoencoder
        autoencoder = self._train_autoencoder(X_train_flat)
        
        # 3. Extract features (encoder)
        print("\n--- Extracting features using the trained encoder ---")
        scaler_angle = MinMaxScaler(feature_range=(0, np.pi / 2))
        with torch.no_grad():
            autoencoder.eval()
            X_train_t = torch.tensor(X_train_flat, dtype=torch.float32).unsqueeze(1).to(self.device)
            X_test_t = torch.tensor(X_test_flat, dtype=torch.float32).unsqueeze(1).to(self.device)
            
            x_train_encoded = autoencoder.encode(X_train_t).cpu().numpy()
            x_test_encoded = autoencoder.encode(X_test_t).cpu().numpy()

        x_train = scaler_angle.fit_transform(x_train_encoded)
        x_test = scaler_angle.transform(x_test_encoded)
        
        data = {"x_train": x_train, "y_train": y_train, "x_test": x_test, "y_test": y_test}
        artifacts = {"autoencoder_encoder": autoencoder.encoder, "scaler_angle": scaler_angle}

        # 4. Train the QNN classifier
        print("\n--- STAGE 2: Training QNN Classifier ---")
        trainer = QuantumTrainer(self.config)
        results = trainer.train_and_evaluate(data)
        
        # 5. Save artifacts and log results
        print("\n3. Saving artifacts and logging results...")
        trainer.save_preprocessing_artifacts(artifacts, results['model_filename'])
        
        end_time = time.time()
        duration = end_time - start_time
        results['execution_time'] = duration
        
        self._generate_plots(results)
        self._log_to_csv(results)
        
        print(f"\n--- Experiment finished in {duration:.2f} seconds ---")

    def _log_class_distribution(self, y_data: np.ndarray, data_type: str):
        unique_classes, counts = np.unique(y_data, return_counts=True)
        print(f"Distribution of classes in {data_type}:")
        for cls, count in zip(unique_classes, counts):
            class_name = [k for k, v in self.config['data']['class_map'].items() if v == cls][0]
            print(f"  Class '{class_name}' ({cls}): {count} samples")

    def _log_to_csv(self, results: Dict[str, Any]):
        csv_path = os.path.join(self.config['results']['csv_url'], self.config['results']['csv_name'])
        file_exists = os.path.isfile(csv_path)
        
        header = [
            'date', 'execution_time_seconds', 'epochs', 'learning_rate', 
            'features', 'layers', 'batch_size', 'loss', 'accuracy', 
            'recall', 'f1_score', 'best_epoch', 'model_filename', 'image_size'
        ]
        
        row = [
            datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            f"{results.get('execution_time', 0):.2f}",
            self.config['classifier']['epochs'], self.config['classifier']['lr'], self.config['autoencoder']['latent_dim'], 
            self.config['classifier']['layers'], self.config['classifier']['batch_size'],
            f"{results.get('best_loss', 0):.4f}", f"{results.get('best_accuracy', 0):.4f}",
            f"{results.get('best_recall', 0):.4f}", f"{results.get('best_f1', 0):.4f}",
            results.get('best_epoch', 0), results.get('model_filename', ''),
            self.config['data']['image_size']
        ]

        with open(csv_path, 'a', newline='') as f:
            writer = csv.writer(f)
            if not file_exists:
                writer.writerow(header)
            writer.writerow(row)

        print(f"\nResults logged to {csv_path}")
        print("Run summary:")
        print(dict(zip(header, row)))
        
    def _generate_plots(self, results: Dict[str, Any]):
        graphics_dir = self.config['results']['graphics_url']
        class_names = list(self.config['data']['class_map'].keys())
    
        PlotUtils.plot_loss(results['loss_history'], save_path=os.path.join(graphics_dir, self.config['results']['loss_plot_name']))
        PlotUtils.plot_confusion_matrix(results['confusion_matrix'], class_names=class_names, save_path=os.path.join(graphics_dir, self.config['results']['confusion_matrix_name']))
        PlotUtils.plot_loss_vs_accuracy(results['loss_history'], results['accuracy_history'], save_path=os.path.join(graphics_dir, self.config['results']['loss_vs_accuracy_name']))
        print("\nAll plots have been generated and saved.")

## 7. Run Experiment

In [14]:
if __name__ == "__main__":
    runner = ExperimentRunner(CONFIG)
    runner.run()

Using device: cpu
Flattening images...
Distribution of classes in training data:
  Class 'tumor' (0): 1998 samples
  Class 'no_tumor' (1): 2000 samples
Distribution of classes in test data:
  Class 'tumor' (0): 249 samples
  Class 'no_tumor' (1): 250 samples

--- STAGE 1: Training Autoencoder ---
Autoencoder Epoch 1/100 - Reconstruction Loss: 0.052809
Autoencoder Epoch 2/100 - Reconstruction Loss: 0.029682
Autoencoder Epoch 3/100 - Reconstruction Loss: 0.026336
Autoencoder Epoch 4/100 - Reconstruction Loss: 0.023500
Autoencoder Epoch 5/100 - Reconstruction Loss: 0.021941
Autoencoder Epoch 6/100 - Reconstruction Loss: 0.021130
Autoencoder Epoch 7/100 - Reconstruction Loss: 0.020585
Autoencoder Epoch 8/100 - Reconstruction Loss: 0.020128
Autoencoder Epoch 9/100 - Reconstruction Loss: 0.019726
Autoencoder Epoch 10/100 - Reconstruction Loss: 0.019408
Autoencoder Epoch 11/100 - Reconstruction Loss: 0.019135
Autoencoder Epoch 12/100 - Reconstruction Loss: 0.018905
Autoencoder Epoch 13/100 - 