In [None]:
import pandas as pd, numpy as np, pennylane as qml, tensorflow as tf
from tensorflow.keras.layers import Lambda, Dropout
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import roc_curve, auc

from pennylane.templates import AmplitudeEmbedding, StronglyEntanglingLayers

#-----------------------------------------------------------------------------------------------------
#1. Data Collection and Preparation
#-----------------------------------------------------------------------------------------------------

# Set TensorFlow to use float32 and eager execution for better quantum debugging
tf.keras.backend.set_floatx("float32")
tf.config.run_functions_eagerly(True)

# Hyperparameters for model setup
CSV_PATH   = "C:\\Users\\fenlei\\Downloads\\machinelearning_data_EEG.csv"
N_QUBITS   = 3                # 2**4 = 16 amplitudes
AMP_DIM    = 2 ** N_QUBITS
N_LAYERS   = 3
BATCH_SIZE = 32
EPOCHS     = 20
VAL_SPLIT  = 0.1

# Load and preprocess the dataset
df = pd.read_csv(CSV_PATH).dropna(axis=1, how="all")
target = df.select_dtypes(include="object").columns[0]
y = LabelEncoder().fit_transform(df[target]).astype("float32")

# Dimensionality reduction to match quantum circuit input
X = df.drop(columns=[target]).select_dtypes("number")
X = SimpleImputer(strategy="mean").fit_transform(X)
X = StandardScaler().fit_transform(X).astype("float32")
X = PCA(n_components=AMP_DIM).fit_transform(X).astype("float32")   # 16 feats

# Split data into train/test sets
X_tr, X_test, y_tr, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_tr, y_tr, test_size=VAL_SPLIT, stratify=y_tr, random_state=42
)

# Compute class weights for imbalanced datasets
class_weight = dict(enumerate(
    compute_class_weight("balanced", classes=np.unique(y_train), y=y_train)
))

#-----------------------------------------------------------------------------------------------------
#2. Quantum Layers
#-----------------------------------------------------------------------------------------------------

# Define a custom quantum layer
class QNNLayer(tf.keras.layers.Layer):
    def __init__(self, n_qubits, n_layers):
        super().__init__(dtype=tf.float32)
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        
        # Initialize weights
        init_weights = tf.random.uniform(
            shape=(n_layers, n_qubits, 3),
            minval=0,
            maxval=2*np.pi,
            dtype=tf.float32
        )
        self.qweights = tf.Variable(init_weights, dtype=tf.float32, trainable=True)
        
        # Define quantum circuit
        dev = qml.device("default.qubit", wires=n_qubits)
        
        @qml.qnode(dev, interface="tf", diff_method="parameter-shift")
        def circuit(inputs, weights):
            # Let the AmplitudeEmbedding handle normalization automatically.
            AmplitudeEmbedding(inputs, wires=range(n_qubits), normalize=True)
            StronglyEntanglingLayers(weights, wires=range(n_qubits))
            return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]
        
        self.circuit = circuit
    
    def call(self, inputs):
        # We'll process one sample at a time
        batch_size = tf.shape(inputs)[0]
        output_tensor = tf.TensorArray(tf.float32, size=batch_size)
        
        for i in range(batch_size):
            # Process single input
            single_input = tf.cast(inputs[i], tf.float32)
            # Manual normalization before quantum circuit
            single_result = self.circuit(single_input, self.qweights)
            single_result = tf.cast(single_result, tf.float32)
            output_tensor = output_tensor.write(i, single_result)
        
        return output_tensor.stack()
    
    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.n_qubits)
    
#-----------------------------------------------------------------------------------------------------
#3. Building a Hybrid Classical-Quantum Model
#   a. Classical layers preprocess EEG features.
#   b. Quantum layer (QNNLayer) transforms features with quantum operations.
#   c. Final classical layer makes the binary prediction.
#-----------------------------------------------------------------------------------------------------

# Hybrid classical-quantum model
inp = tf.keras.Input(shape=(AMP_DIM,), dtype=tf.float32)
x   = tf.keras.layers.Dense(32, activation="relu", dtype=tf.float32)(inp)
x   = Dropout(0.2)(x)
x   = tf.keras.layers.Dense(AMP_DIM, dtype=tf.float32)(x)

# Use the simplified QNN layer
x   = QNNLayer(N_QUBITS, N_LAYERS)(x)
out = tf.keras.layers.Dense(1, activation="sigmoid", dtype=tf.float32)(x)

model = tf.keras.Model(inp, out)

# Use a standard optimizer with a small learning rate
optimizer = tf.keras.optimizers.Adam(learning_rate=5e-4, epsilon=1e-8)

model.compile(optimizer=optimizer,
              loss="binary_crossentropy",
              metrics=["accuracy", tf.keras.metrics.AUC(name="auc")])

#-----------------------------------------------------------------------------------------------------
#4. Training Models
#-----------------------------------------------------------------------------------------------------

# Convert all tensors explicitly to float32 to be ready for training
x_train_tensor = tf.convert_to_tensor(X_train, dtype=tf.float32)
y_train_tensor = tf.convert_to_tensor(y_train, dtype=tf.float32)
x_val_tensor = tf.convert_to_tensor(X_val, dtype=tf.float32)
y_val_tensor = tf.convert_to_tensor(y_val, dtype=tf.float32)

# Print model summary before training
model.summary()

# Trying implicit normalization in amplitude embedding
try:
    print("\nAttempting standard training...")
    model.fit(
        x_train_tensor, y_train_tensor,
        validation_data=(x_val_tensor, y_val_tensor),
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        class_weight=class_weight
    )
    
    # Evaluate on test set
    test_metrics = model.evaluate(X_test, y_test, batch_size=BATCH_SIZE)
    print(f"Test loss: {test_metrics[0]:.4f}")
    print(f"Test accuracy: {test_metrics[1]:.4f}")
    print(f"Test AUC: {test_metrics[2]:.4f}")
    
    # Get ROC curve
    y_prob = model.predict(X_test, batch_size=BATCH_SIZE).flatten()
    fpr, tpr, _ = roc_curve(y_test, y_prob)
    test_auc = auc(fpr, tpr)
    print(f"Test AUC (from ROC): {test_auc:.3f}")
# Manual normalization
except Exception as e:
    print(f"\nStandard training failed with error: {str(e)}")
    print("\nFalling back to custom training...")
    
    # If standard training fails, try a custom training loop
    optimizer = tf.keras.optimizers.Adam(learning_rate=5e-4, epsilon=1e-8)
    
    # Custom training loop with manual batch processing
    for epoch in range(EPOCHS):
        print(f"Epoch {epoch+1}/{EPOCHS}")
        
        # Shuffle training data
        indices = tf.range(start=0, limit=tf.shape(x_train_tensor)[0], dtype=tf.int32)
        shuffled_indices = tf.random.shuffle(indices)
        x_shuffled = tf.gather(x_train_tensor, shuffled_indices)
        y_shuffled = tf.gather(y_train_tensor, shuffled_indices)
        
        # Process in batches
        total_loss = 0
        num_batches = 0
        
        for i in range(0, len(x_shuffled), BATCH_SIZE):
            end_idx = min(i + BATCH_SIZE, len(x_shuffled))
            x_batch = x_shuffled[i:end_idx]
            y_batch = y_shuffled[i:end_idx]
            
            with tf.GradientTape() as tape:
                # Forward pass
                y_pred = model(x_batch, training=True)
                
                # Calculate loss with class weights
                loss_values = tf.keras.losses.binary_crossentropy(y_batch, y_pred)
                sample_weights = tf.ones_like(y_batch)
                
                for cls, weight in class_weight.items():
                    cls_mask = tf.cast(tf.equal(y_batch, cls), tf.float32)
                    sample_weights = sample_weights * cls_mask * weight + (1 - cls_mask) * sample_weights
                
                weighted_loss = loss_values * sample_weights
                batch_loss = tf.reduce_mean(weighted_loss)
            
            # Compute and apply gradients
            grads = tape.gradient(batch_loss, model.trainable_variables)
            # Clip gradients to prevent explosion
            grads, _ = tf.clip_by_global_norm(grads, 5.0)
            # Force to float32
            grads = [tf.cast(g, tf.float32) if g is not None else None for g in grads]
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            
            total_loss += batch_loss
            num_batches += 1
            
        # Calculate average loss for epoch
        avg_loss = total_loss / num_batches
        print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {avg_loss:.4f}")
        
        # Validation
        val_pred = model.predict(x_val_tensor, verbose=0)
        val_loss = tf.reduce_mean(tf.keras.losses.binary_crossentropy(y_val_tensor, val_pred))
        val_acc = tf.reduce_mean(tf.cast(tf.equal(tf.round(val_pred), y_val_tensor), tf.float32))
        print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.4f}")
    
#-----------------------------------------------------------------------------------------------------
#5. Evaluate Results
#-----------------------------------------------------------------------------------------------------

    # Evaluate on test set
    test_pred = model.predict(X_test, verbose=0)
    test_acc = tf.reduce_mean(tf.cast(tf.equal(tf.round(test_pred), y_test), tf.float32))
    print(f"Test Accuracy: {test_acc:.4f}")
    
    # ROC curve
    y_prob = test_pred.flatten()
    fpr, tpr, _ = roc_curve(y_test, y_prob)
    test_auc = auc(fpr, tpr)
    print(f"Test AUC: {test_auc:.3f}")
    # Save the Model
    model.save("QML1.h5")


Attempting standard training...
Epoch 1/20




[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m107s[0m 16s/step - accuracy: 0.5225 - auc: 0.5157 - loss: 0.6928 - val_accuracy: 0.6250 - val_auc: 0.6445 - val_loss: 0.6740
Epoch 2/20
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m91s[0m 13s/step - accuracy: 0.5882 - auc: 0.6164 - loss: 0.6849 - val_accuracy: 0.6667 - val_auc: 0.6523 - val_loss: 0.6728
Epoch 3/20
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m91s[0m 14s/step - accuracy: 0.5398 - auc: 0.5515 - loss: 0.7131 - val_accuracy: 0.6667 - val_auc: 0.6680 - val_loss: 0.6703
Epoch 4/20
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m191s[0m 30s/step - accuracy: 0.4920 - auc: 0.4873 - loss: 0.7050 - val_accuracy: 0.6667 - val_auc: 0.6758 - val_loss: 0.6697
Epoch 5/20
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m187s[0m 25s/step - accuracy: 0.5541 - auc: 0.6039 - loss: 0.6889 - val_accuracy: 0.6667 - val_auc: 0.6914 - val_loss: 0.6679
Epoch 6/20
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m

In [19]:
confusion_matrix([int(y>0.5) for y in y_prob], y_test)

array([[26,  4],
       [14, 15]], dtype=int64)