In [None]:
import os
import platform
import tensorflow as tf

# Versions actually present in the running kernel.
tf_version = tf.__version__
keras_version = tf.keras.__version__
python_version = platform.python_version()

print(f"TensorFlow version installed: {tf_version}")
print(f"Keras version installed: {keras_version}")

# The notebook is pinned to one exact toolchain. Each entry is
# (found, required, error message); the checks run in this order
# (TensorFlow, Keras, Python) and stop at the first mismatch.
_pinned_versions = (
    (tf_version, "2.10.1", f"Incorrect TensorFlow version: Required 2.10.1, but found {tf_version}."),
    (keras_version, "2.10.0", f"Incorrect Keras version: Required version 2.10.0, but found {keras_version}."),
    (python_version, "3.10.11", f"Incorrect Python version: Required version 3.10.11, but found {python_version}."),
)
for _found, _required, _message in _pinned_versions:
    if _found != _required:
        raise ValueError(_message)

tf.data.experimental.enable_debug_mode()
import datetime
import tensorflow_addons as tfa
import pandas as pd
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import numpy as np
import random
import pennylane.numpy as qnp
import warnings
import pennylane as qml
import inspect

In [None]:
seed_value = 42


def set_random_seed(seed=42):
    """Seed every random number source used by this notebook with ``seed``.

    Seeds NumPy, PennyLane's NumPy wrapper, Python's ``random`` module and
    TensorFlow/Keras, then enables TensorFlow op determinism and exports
    ``PYTHONHASHSEED``.

    Args:
        seed: Integer seed applied to every generator (default 42).
    """
    seeders = (
        np.random.seed,
        qnp.random.seed,
        random.seed,
        tf.random.set_seed,
        tf.keras.utils.set_random_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    tf.config.experimental.enable_op_determinism()
    # NOTE(review): Python reads PYTHONHASHSEED at interpreter start-up, so this
    # presumably only influences child processes, not the running kernel - confirm.
    os.environ['PYTHONHASHSEED'] = str(seed)


set_random_seed(seed_value)

In [None]:
# 1. Load data
df = pd.read_csv("./abalone.csv")

# Continuous measurement columns: screened for outliers and standardised below
numeric_cols = ['Diameter', 'Length', 'Height',
                'Whole weight', 'Shucked weight',
                'Viscera weight', 'Shell weight']

# Per-column quartiles and interquartile range
Q1 = df[numeric_cols].quantile(0.25)
Q3 = df[numeric_cols].quantile(0.75)
IQR = Q3 - Q1

# A row is discarded when ANY numeric column falls outside
# [Q1 - 2*IQR, Q3 + 2*IQR] for that column.
lower_fence = Q1 - 2 * IQR
upper_fence = Q3 + 2 * IQR
numeric_values = df[numeric_cols]
is_outlier = (numeric_values < lower_fence) | (numeric_values > upper_fence)
mask = ~is_outlier.any(axis=1)
df = df[mask].reset_index(drop=True)

# Target as an (n, 1) column vector; features are all remaining columns with
# "Sex" one-hot encoded (first level dropped, integer dummies).
y_train = df['Rings'].values.reshape(-1, 1)
X_train = pd.get_dummies(df.drop(columns=['Rings']), columns=["Sex"], drop_first=True, dtype=int)

# Standardise the numeric features and the target.
# NOTE(review): both scalers are fitted on the full data set here, before the
# K-fold split performed in the training loop, so validation folds contribute
# to the scaling statistics - confirm this is acceptable for the study.
scaler_X = StandardScaler()
X_train[numeric_cols] = scaler_X.fit_transform(X_train[numeric_cols])

scaler_y = StandardScaler()
y_train = scaler_y.fit_transform(y_train)

In [None]:
class QuantumKerasLayerWithDropout(qml.qnn.KerasLayer):
    """PennyLane Keras layer whose QNode may take an extra ``training_flag`` argument.

    Differences from the parent layer that are visible in this class:
      * ``training_flag`` is not treated as a trainable weight during
        signature validation;
      * ``call`` forwards the Keras ``training`` argument to the QNode as a
        boolean tensor when the QNode declares ``training_flag``;
      * the QNode must return a non-empty list of measurements, which is
        turned into a single tensor and passed through an optional activation.
    """

    def __init__(self, qnode, weight_shapes, output_dim, activation=None, **kwargs):
        """Resolve the activation, then delegate everything else to the parent layer.

        Args:
            qnode: QNode executed by the layer.
            weight_shapes: Mapping from QNode weight argument name to its shape.
            output_dim: Output dimension forwarded to the parent constructor.
            activation: Any identifier accepted by ``tf.keras.activations.get``.
            **kwargs: Extra keyword arguments for the parent constructor.
        """
        # The activation is assigned before the parent constructor runs;
        # this ordering is kept exactly as is.
        self.activation_fn = tf.keras.activations.get(activation)
        super().__init__(qnode=qnode, weight_shapes=weight_shapes, output_dim=output_dim, **kwargs)

    def _signature_validation(self, qnode, weight_shapes):
        """Check that the QNode signature matches ``weight_shapes``.

        Raises:
            ValueError: if the input argument is missing from the signature, if
                the weight names (every parameter except the input argument and
                ``training_flag``) differ from the keys of ``weight_shapes``,
                or if the QNode declares ``*args``.
        """
        parameters = inspect.signature(qnode.func).parameters

        if self.input_arg not in parameters:
            raise ValueError(f"QNode must have an input parameter named {self.input_arg}")

        expected_weight_params = set(parameters.keys()) - {self.input_arg, 'training_flag'}
        provided_weight_keys = set(weight_shapes.keys())

        if expected_weight_params != provided_weight_keys:
            missing = expected_weight_params - provided_weight_keys
            extra = provided_weight_keys - expected_weight_params
            # Assemble the diagnostic line by line; optional lines only appear
            # when the corresponding set is non-empty.
            details = [
                f"Mismatch between QNode parameters and weight_shapes keys.\n",
                f" - QNode signature parameters (excluding input '{self.input_arg}' and 'training_flag'): {expected_weight_params}\n",
                f" - Weight shape keys provided: {provided_weight_keys}\n",
            ]
            if missing:
                details.append(f" - Missing weight shapes for: {missing}\n")
            if extra:
                details.append(f" - Extra weight shapes provided for: {extra}\n")
            err_msg = "".join(details)
            raise ValueError(f"Must specify a shape for every non-input parameter (excluding 'training_flag') in the QNode. {err_msg}")

        if any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in parameters.values()):
            raise ValueError("QNode cannot have variable positional arguments")

    def call(self, inputs, training=None):
        """Run the QNode on ``inputs`` and return the measurements as one tensor.

        Args:
            inputs: Tensor passed to the QNode under the layer's input argument name.
            training: Keras training flag; ``None`` is treated as ``False``.

        Returns:
            The measurements stacked along axis 1 (or, for a single
            measurement, expanded with a trailing axis), after the activation.

        Raises:
            ValueError: if the QNode does not return a non-empty list.
        """
        if training is not None:
            is_training_bool = tf.cast(training, dtype=tf.bool)
        else:
            is_training_bool = tf.constant(False, dtype=tf.bool)

        # Input first, then the trainable weights, as keyword arguments.
        qnode_kwargs = {self.input_arg: inputs}
        qnode_kwargs.update(self.qnode_weights)

        # Only pass the flag to QNodes that actually declare it.
        if 'training_flag' in inspect.signature(self.qnode.func).parameters:
            qnode_kwargs['training_flag'] = is_training_bool

        measurements = self.qnode(**qnode_kwargs)

        if not (isinstance(measurements, list) and measurements):
            raise ValueError("QNode must return a list of measurements.")

        if len(measurements) > 1:
            result = tf.stack(measurements, axis=1)
        else:
            result = tf.expand_dims(measurements[0], axis=-1)

        if self.activation_fn is None:
            return result
        return self.activation_fn(result)

    def get_config(self):
        """Return the parent config extended with the serialized activation."""
        config = super().get_config()
        config['activation'] = tf.keras.activations.serialize(self.activation_fn)
        return config

In [None]:
def create_qnode(n_qubits, ansatz_type, embedding_type, quantum_dropout_rate, dropoutScaling):
    """Build a TensorFlow-interface QNode with parameter dropout on its rotation angles.

    The circuit embeds ``inputs``, applies one ansatz block per entry along the
    first axis of ``weights``, and measures 2 * n_qubits expectation values.

    Args:
        n_qubits: Number of wires of the ``default.qubit`` device.
        ansatz_type: 'BasicEntangler' (RY per qubit, then a CNOT chain) or
            'StronglyEntangling' (Rot per qubit, then ranged CNOTs). Any other
            value results in no ansatz gates being applied.
        embedding_type: 'AmplitudeEmbedding' or 'AngleEmbedding'. Any other
            value results in no embedding being applied.
        quantum_dropout_rate: Probability threshold used to zero individual
            rotation parameters while training; 0.0 bypasses masking entirely.
        dropoutScaling: When true, kept parameters are divided by
            (1 - quantum_dropout_rate) while training.

    Returns:
        A QNode with signature ``(inputs, weights, training_flag)`` returning a
        list of expectation values: PauliZ on every wire, followed by
        PauliZ(i) @ PauliZ((i + 1) % n_qubits) for every wire when
        n_qubits > 1, or PauliX(0) when n_qubits == 1.
    """
    # device_type is hard-coded, so the warning below cannot currently fire;
    # it only matters if the device name above is edited.
    device_type = "default.qubit"
    if(device_type != "default.qubit"):
        warnings.warn(f"Device type '{device_type}' is not the expected 'default.qubit'. Potential compatibility issues.", UserWarning)
        
    # The device is seeded with the notebook-level seed_value.
    dev = qml.device(device_type, wires=n_qubits, seed=seed_value)

    @qml.qnode(dev, interface='tf')
    def qnode(inputs, weights, training_flag):
        #tf.print("\ntraining_flag:", training_flag, summarize=-1)

        # Embedding
        if embedding_type == "AmplitudeEmbedding":
            qml.AmplitudeEmbedding(inputs, wires=range(n_qubits), normalize=True, pad_with=0.0)
        elif embedding_type == "AngleEmbedding":
            qml.AngleEmbedding(inputs, wires=range(n_qubits))

        # Number of ansatz layers is read from the first axis of the weights.
        n_layers = tf.shape(weights)[0]

        # Generate random values for dropout mask
        # (drawn on every call, whether or not training_flag is set)
        rand_vals = tf.random.uniform(tf.shape(weights), 0.0, 1.0, dtype=tf.float32)

        # Determine the mask based on the training_flag argument
        # True = keep the parameter; outside training every parameter is kept.
        keep_mask = tf.cond(training_flag, lambda: rand_vals >= quantum_dropout_rate, lambda: tf.ones_like(rand_vals, dtype=tf.bool))
        #tf.print(f"\nkeep_mask:{keep_mask}")

        for layer_i in range(n_layers):
            if ansatz_type == 'BasicEntangler':
                # One RY rotation per qubit; weights are indexed [layer, qubit].
                for qubit_i in range(n_qubits):
                    angle = weights[layer_i, qubit_i]
                    current_mask_value = keep_mask[layer_i, qubit_i]
                    
                    if quantum_dropout_rate == 0.0:
                        # No dropout configured: use the raw angle.
                        angle_eff = angle
                    else:
                        # Dropped parameters become a zero-angle rotation.
                        angle_eff = tf.where(current_mask_value,angle,tf.constant(0.0, dtype=weights.dtype))
                        
                        # Rescale only while training and only if scaling is enabled.
                        angle_eff = tf.cond(
                            tf.logical_and(training_flag, dropoutScaling),
                            lambda: angle_eff / (1.0 - quantum_dropout_rate),
                            lambda: angle_eff
                        )
                    qml.RY(angle_eff, wires=qubit_i)
                # Open chain of CNOTs between neighbouring wires (no wrap-around).
                for qubit_i in range(n_qubits - 1):
                    qml.CNOT(wires=[qubit_i, qubit_i + 1])

            elif ansatz_type == 'StronglyEntangling':
                # Weights are indexed [layer, qubit, 3]: three Rot angles per qubit.
                current_weights_layer = weights[layer_i]
                current_mask_layer = keep_mask[layer_i]
                for qubit_i in range(n_qubits):
                    params = current_weights_layer[qubit_i]
                    mask_params = current_mask_layer[qubit_i]
                    
                    if quantum_dropout_rate == 0.0:
                        # No dropout configured: use the raw angles.
                        masked_params = params
                    else:
                        # Each of the three angles is masked independently.
                        masked_params = tf.where(mask_params,params,tf.zeros_like(params, dtype=weights.dtype))
                        
                        # Rescale only while training and only if scaling is enabled.
                        masked_params = tf.cond(
                            tf.logical_and(training_flag, dropoutScaling),
                            lambda: masked_params / (1.0 - quantum_dropout_rate),
                            lambda: masked_params
                        )
                    #tf.print(f"params: {masked_params}")
                    qml.Rot(masked_params[0], masked_params[1], masked_params[2], wires=qubit_i)

                # CNOT range cycles through 1..n_qubits with the layer index.
                # When it equals n_qubits every target coincides with its control,
                # so the guard below skips all CNOTs and that layer has no entanglers.
                # NOTE(review): confirm an entangler-free layer is intended.
                current_range = (layer_i % n_qubits) + 1

                for qubit_i in range(n_qubits):
                    target_qubit = (qubit_i + current_range) % n_qubits
                    if qubit_i != target_qubit:
                        qml.CNOT(wires=[qubit_i, target_qubit])
                    
        measurements = []

        # Single-qubit PauliZ expectation on every wire.
        measurements.extend([qml.expval(qml.PauliZ(i)) for i in range(n_qubits)])

        if n_qubits > 1:
            def next_qubit(i, n):
                return (i + 1) % n
            
            # Two-qubit ZZ correlators between each wire and its cyclic successor
            # (for n_qubits == 2 both iterations pair wires 0 and 1).
            for i in range(n_qubits):
                j = next_qubit(i, n_qubits)
                measurements.append(qml.expval(qml.PauliZ(i) @ qml.PauliZ(j)))
        else:
            # Single wire: add PauliX so the output still has 2 * n_qubits entries.
            measurements.append(qml.expval(qml.PauliX(0)))

        return measurements

    return qnode

In [None]:
def create_model(weight_shapes, n_qubits, ansatz_type, num_classical_layers, num_classical_neurons, quantum_dropout_rate, embedding_type, input_dim, dropout_scaling):
    """Validate the configuration and assemble the hybrid Keras model.

    Layer order: Input -> Dense sized for the embedding -> quantum layer ->
    (Dense + ReLU) per classical layer -> Dense(1).

    Args:
        weight_shapes: Mapping with a "weights" entry; its length must be 3 for
            'StronglyEntangling' and 2 for 'BasicEntangler', and its first
            element is used as the depth in the model name.
        n_qubits: Number of qubits, 1..15 (at most 8 with AmplitudeEmbedding).
        ansatz_type: 'BasicEntangler' or 'StronglyEntangling'.
        num_classical_layers: Number of classical hidden layers.
        num_classical_neurons: Units per classical hidden layer; its length
            must equal ``num_classical_layers``.
        quantum_dropout_rate: Value in [0, 1).
        embedding_type: 'AmplitudeEmbedding' or 'AngleEmbedding'.
        input_dim: Number of input features (>= 1).
        dropout_scaling: Forwarded to ``create_qnode``.

    Returns:
        An uncompiled ``tf.keras.Sequential`` model.

    Raises:
        ValueError: on the first failed validation, checked in the order below.
    """
    # --- Validation: the order decides which error is reported first ---
    if input_dim < 1:
        raise ValueError("input_dim must be bigger than 0")

    if not 0.0 <= quantum_dropout_rate < 1.0:
        raise ValueError("quantum_dropout_rate must be between 0 (inclusive) and 1 (exclusive)")

    if not (1 <= n_qubits < 16):
        raise ValueError("n_qubits must be between 1 and 15")

    uses_amplitude_embedding = embedding_type == "AmplitudeEmbedding"
    if not (uses_amplitude_embedding or embedding_type == "AngleEmbedding"):
        raise ValueError("Please use a valid embedding_type ('AmplitudeEmbedding' or 'AngleEmbedding')")

    if uses_amplitude_embedding and n_qubits >= 9:
        raise ValueError(f"AmplitudeEmbedding requires 2^{n_qubits} features. For n_qubits={n_qubits}, this is {2**n_qubits}, which might be too large.")

    if ansatz_type not in ("BasicEntangler", "StronglyEntangling"):
        raise ValueError("Please use a valid ansatz_type ('BasicEntangler' or 'StronglyEntangling')")

    if "weights" not in weight_shapes:
        raise ValueError(f"Missing 'weights' in weight_shapes for {ansatz_type}")

    if len(num_classical_neurons) != num_classical_layers:
        raise ValueError("Length of num_classical_neurons must match num_classical_layers")

    # ansatz_type is one of the two valid names at this point.
    required_rank = 3 if ansatz_type == 'StronglyEntangling' else 2
    if len(weight_shapes["weights"]) != required_rank:
        raise ValueError(f"Invalid weight_shapes for {ansatz_type}")

    # --- Fresh Keras state and re-seeded generators for every model ---
    tf.keras.backend.clear_session()
    set_random_seed(seed_value)

    circuit = create_qnode(n_qubits, ansatz_type, embedding_type, quantum_dropout_rate, dropout_scaling)
    # The circuit returns two measurements per qubit, hence 2 * n_qubits outputs.
    quantum_layer = QuantumKerasLayerWithDropout(circuit, weight_shapes, output_dim=2*n_qubits)

    model_name = f"QuantumModel_{ansatz_type}_{embedding_type}_dropout{quantum_dropout_rate}_qubits{n_qubits}_depth{weight_shapes['weights'][0]}"
    model = tf.keras.Sequential(name=model_name)

    model.add(tf.keras.layers.Input(shape=(input_dim,), name="Input"))

    # Project the features onto the size the embedding consumes:
    # 2**n_qubits amplitudes or n_qubits angles.
    embedding_units = 2**n_qubits if uses_amplitude_embedding else n_qubits
    model.add(tf.keras.layers.Dense(embedding_units, name="Embedding_Dense"))

    model.add(quantum_layer)

    for layer_number, units in enumerate(num_classical_neurons, start=1):
        model.add(tf.keras.layers.Dense(units, kernel_initializer=tf.keras.initializers.HeNormal(), name=f"Classical_Dense_{layer_number}"))
        model.add(tf.keras.layers.ReLU())

    model.add(tf.keras.layers.Dense(1, name="Output_Dense"))
    return model

In [None]:
def count_lines(filename):
    """Return the number of lines in ``filename``, or 0 if the file does not exist.

    Args:
        filename: Path of the text file to count.

    Returns:
        The line count as an int; 0 when the file is missing.
    """
    try:
        with open(filename, 'r') as handle:
            total = 0
            for _ in handle:
                total += 1
    except FileNotFoundError:
        # A missing results file simply means nothing has been recorded yet.
        return 0
    return total

In [None]:
# Number of folds passed to KFold in the cross-validation loop
CV = 8
# Learning rate given to the Adam optimizer
learning_rate = 0.0015
# Units of each classical hidden Dense layer (one entry per layer)
num_classical_neurons = [8]
num_classical_layers = len(num_classical_neurons)

# Results log: the search loop appends one line per evaluated configuration
filename = "dataAbalone.txt"

# Resume bookkeeping: model_num counts configurations visited by the search loop,
# num_models_tested is how many result lines already exist on disk. The loop
# skips configurations until model_num reaches num_models_tested.
model_num = 0
num_models_tested = count_lines(filename)

print(f"Number of models tested: {num_models_tested}")

In [None]:
val_r2_scores = []
train_r2_scores = []
param_count = 0

# Re-derive the resume position inside this cell so that re-running it on its
# own (without re-running the configuration cell) skips the results already on
# disk instead of retraining everything and appending duplicate lines.
model_num = 0
num_models_tested = count_lines(filename)


def _log_result(emb, ansatz, scaling, qubits, depth, dropout, quantum_params, total_params, train_scores, val_scores):
    """Append one result line (mean train/val R2 over folds) to the results file and print it."""
    print_info = f'{emb}, {ansatz}, Dropout Scaling: {scaling}, Qubits: {qubits}, Quantum Depth: {depth}, Dropout Rate: {dropout}, Train R2: {np.mean(train_scores)}, Val R2: {np.mean(val_scores)}, Quantum Params: {quantum_params}, Total Params: {total_params}'
    with open(filename, "a") as file:
        file.write(f"{print_info}\n")
    print(f"{print_info}")


# Grid search: every configuration is evaluated with CV-fold cross-validation and
# produces exactly one line in the results file, in loop order. The line count of
# that file is therefore the index of the next configuration to evaluate.
for qubits in range(3,7):
    for depth in range(3,7):
        for dropout in [0.0, 0.1, 0.25]:
            for ansatz in ['StronglyEntangling']:
                for emb in ["AngleEmbedding", "AmplitudeEmbedding"]:
                    for scaling in [True, False]:
                        # Skip configurations that already have a line on disk.
                        if model_num < num_models_tested:
                            model_num += 1
                            continue

                        weight_shapes = {
                            "weights": (depth, qubits) if ansatz=="BasicEntangler" else (depth, qubits, 3), #3 is the default number of parameters per qubit for StronglyEntangling
                        }
                        # Derived from the weight shape so the count stays correct for
                        # either ansatz (equals 3*qubits*depth for StronglyEntangling).
                        quantum_params = int(np.prod(weight_shapes["weights"]))

                        # With dropout 0.0 the scaling flag does not change the model, so the
                        # scaling=False entry reuses the scores of the scaling=True run that
                        # immediately precedes it. This is only valid when that run happened in
                        # this session: after a resume the score lists are still empty, and
                        # copying them would log "nan" scores and a parameter count of 0.
                        # In that case fall through and train the configuration normally.
                        if not scaling and dropout == 0.0 and train_r2_scores:
                            _log_result(emb, ansatz, scaling, qubits, depth, dropout, quantum_params, param_count, train_r2_scores, val_r2_scores)
                            model_num += 1
                            continue

                        kf = KFold(n_splits=CV, shuffle=True, random_state=seed_value)

                        train_r2_scores.clear()
                        val_r2_scores.clear()

                        print(f'\n{emb}, {ansatz}, dropout scaling: {scaling}, qubits: {qubits}, quantum depth: {depth}, dropout rate: {dropout}\n')
                        param_count = 0

                        for fold_number, (train_index, val_index) in enumerate(kf.split(X_train, y_train)):
                            X_train_cv, X_val_cv = X_train.iloc[train_index], X_train.iloc[val_index]
                            y_train_cv, y_val_cv = y_train[train_index], y_train[val_index]

                            # create_model clears the Keras session and re-seeds all
                            # generators, so every fold starts from the same initial state.
                            model = create_model(
                                quantum_dropout_rate=dropout,
                                n_qubits=qubits,
                                weight_shapes=weight_shapes,
                                ansatz_type=ansatz,
                                num_classical_layers=num_classical_layers,
                                num_classical_neurons=num_classical_neurons,
                                embedding_type=emb,
                                input_dim=len(X_train_cv.columns),
                                dropout_scaling=scaling
                            )

                            # Print the architecture once per configuration.
                            if fold_number == 0:
                                model.summary()

                            model.compile(
                                optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                                loss="mse",
                                metrics=[tfa.metrics.RSquare(name='r2_score')]
                            )

                            # Stops on the validation fold's loss and restores the best weights.
                            early_stopping = tf.keras.callbacks.EarlyStopping(
                                monitor='val_loss',
                                patience=6,
                                mode='min',
                                restore_best_weights=True,
                                verbose=0
                            )

                            with tf.device('/CPU:0'):
                                model.fit(
                                    X_train_cv, y_train_cv,
                                    validation_data=(X_val_cv, y_val_cv),
                                    epochs=12,
                                    batch_size=48,
                                    verbose=1,
                                    callbacks=[early_stopping],
                                    shuffle=True
                                )

                            param_count = model.count_params()

                            y_train_pred = model.predict(X_train_cv)
                            y_val_pred = model.predict(X_val_cv)

                            train_r2 = r2_score(y_train_cv, y_train_pred)
                            val_r2 = r2_score(y_val_cv, y_val_pred)

                            train_r2_scores.append(train_r2)
                            val_r2_scores.append(val_r2)

                            print(f"{datetime.datetime.now()} (fold: {fold_number + 1}/{CV}) Train R2: {train_r2:.4f}, Val R2: {val_r2:.4f}")

                        _log_result(emb, ansatz, scaling, qubits, depth, dropout, quantum_params, param_count, train_r2_scores, val_r2_scores)
                        model_num += 1