In [1]:
import os
import numpy as np
import pandas as pd
from scipy.spatial.distance import pdist, squareform
from collections import Counter
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import pennylane as qml
import matplotlib.pyplot as plt


In [2]:
def extract_target_property(metadata):
    """
    Tries to extract a numerical target property from the last few lines of metadata.
    Assumes the target is the last valid float number.
    """
    target_property = np.nan  # Default to NaN in case no value is found
    
    # Traverse the metadata to find the last valid numerical value
    for line in reversed(metadata):
        for value in reversed(line.strip().split()):
            try:
                target_property = float(value)
                return target_property  # Return the first valid float found
            except ValueError:
                continue  # Skip non-numeric values
                
    return target_property  # Return NaN if no numeric value is found

In [3]:
# Directory containing the XYZ files
xyz_dir = "QM9/dsgdb9nsd.xyz"

def parse_xyz(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
        
        # Step 1: Get the number of atoms
        num_atoms = int(lines[0].strip())
        
        # Step 2: Extract atomic types and coordinates
        atom_types = []
        coordinates = []
        
        for i in range(2, 2 + num_atoms):
            parts = lines[i].strip().split()
            atom_types.append(parts[0])  # Atom symbol (C, H, O, etc.)
            coords = [float(part.replace('*^', 'e')) for part in parts[1:4]]
            coordinates.append(coords)
        
        # Step 3: Extract the metadata and target property
        metadata_start = 2 + num_atoms
        metadata = lines[metadata_start:]
        target_property = extract_target_property(metadata)
        
        return atom_types, np.array(coordinates), target_property


In [4]:
from scipy.spatial.distance import pdist, squareform
from collections import Counter

def compute_features(atom_types, coordinates):
    # Step 1: Count the number of atoms of each type (C, H, O)
    atom_counts = Counter(atom_types)
    num_c = atom_counts.get('C', 0)
    num_h = atom_counts.get('H', 0)
    num_o = atom_counts.get('O', 0)
    
    # Step 2: Compute pairwise distances between all atoms
    pairwise_distances = squareform(pdist(coordinates))
    
    # Step 3: Compute the mean and max pairwise distances
    mean_distance = np.mean(pairwise_distances)
    max_distance = np.max(pairwise_distances)
    
    # Step 4: Return the feature vector (you can extend this as needed)
    return np.array([num_c, num_h, num_o, mean_distance, max_distance])


In [5]:
def generate_synthetic_target(features):
    # Use a linear combination of features as a temporary target
    return 0.5 * features[3] + 0.3 * features[4]  # Mean distance + max distance (adjust as needed)


In [6]:
# Store parsed data
molecule_data = []

for file_name in os.listdir(xyz_dir):
    if file_name.endswith(".xyz"):
        file_path = os.path.join(xyz_dir, file_name)
        atom_types, coordinates, target_property = parse_xyz(file_path)
        features = compute_features(atom_types, coordinates)
        
        # If target property is NaN, generate a synthetic target
        if np.isnan(target_property):
            target_property = generate_synthetic_target(features)
        
        molecule_data.append({
            "file_name": file_name,
            "features": features,
            "target_property": target_property
        })

# Convert to DataFrame
df = pd.DataFrame(molecule_data)
print(df.head())


              file_name                                           features  \
0  dsgdb9nsd_000001.xyz  [1.0, 4.0, 0.0, 1.2053341170795622, 1.78315766...   
1  dsgdb9nsd_000002.xyz  [0.0, 3.0, 0.0, 0.9884404226522757, 1.61870992...   
2  dsgdb9nsd_000003.xyz  [0.0, 2.0, 1.0, 0.7639047756163965, 1.51335786...   
3  dsgdb9nsd_000004.xyz      [2.0, 2.0, 0.0, 1.396113812525, 3.3232771722]   
4  dsgdb9nsd_000005.xyz  [1.0, 1.0, 0.0, 0.9859315432225048, 2.21834597...   

   target_property  
0        3151.7078  
1        3575.3343  
2        3907.6980  
3        3557.8599  
4        3490.3686  


In [7]:
# Check the number of NaNs in the target property
nan_count = df['target_property'].isna().sum()
total_count = len(df)

print(f"Number of NaN values in target property: {nan_count}")
print(f"Percentage of NaNs: {(nan_count / total_count) * 100:.2f}%")


Number of NaN values in target property: 0
Percentage of NaNs: 0.00%


In [8]:
# Print the last 10 lines of the first XYZ file
file_path = os.path.join(xyz_dir, "dsgdb9nsd_000001.xyz")
with open(file_path, 'r') as f:
    lines = f.readlines()
    print("".join(lines[-10:]))  # Print the last 10 lines for inspection


5
gdb 1	157.7118	157.70997	157.70699	0.	13.21	-0.3877	0.1171	0.5048	35.3641	0.044749	-40.47893	-40.476062	-40.475117	-40.498597	6.469	
C	-0.0126981359	 1.0858041578	 0.0080009958	-0.535689
H	 0.002150416	-0.0060313176	 0.0019761204	 0.133921
H	 1.0117308433	 1.4637511618	 0.0002765748	 0.133922
H	-0.540815069	 1.4475266138	-0.8766437152	 0.133923
H	-0.5238136345	 1.4379326443	 0.9063972942	 0.133923
1341.307	1341.3284	1341.365	1562.6731	1562.7453	3038.3205	3151.6034	3151.6788	3151.7078
C	C	
InChI=1S/CH4/h1H4	InChI=1S/CH4/h1H4



In [9]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Expand the features column into a NumPy feature matrix
X = np.array([np.array(f) for f in df['features']])

# Target vector (target_property)
y = df['target_property'].values

# Split the dataset into training (80%) and test (20%) sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Normalize the features and target using StandardScaler
scaler_X = StandardScaler()
scaler_y = StandardScaler()

X_train = scaler_X.fit_transform(X_train)
X_test = scaler_X.transform(X_test)
y_train = scaler_y.fit_transform(y_train.reshape(-1, 1)).flatten()
y_test = scaler_y.transform(y_test.reshape(-1, 1)).flatten()

print("Training set shape:", X_train.shape)
print("Test set shape:", X_test.shape)


Training set shape: (107108, 5)
Test set shape: (26777, 5)


More intricate circuit, with added layer of RY, RZ gates, and CNOT gates to improve the circuit’s expressiveness. also updated the weights.

In [20]:
import pennylane as qml
import numpy as np

# Number of qubits = number of features
n_qubits = X_train.shape[1]
dev = qml.device("lightning.qubit", wires=n_qubits)

@qml.qnode(dev)
def qnn_circuit(inputs, weights):
    # =========================
    # Step 1: Input Encoding
    # =========================
    # Encode the classical data using a combination of RX, RY, and RZ rotations
    for i in range(n_qubits):
        qml.RX(inputs[i], wires=i)
        qml.RY(inputs[i] * 1.2, wires=i)  # Add scaling for diversity
        qml.RZ(inputs[i] * 0.8, wires=i)

    # =========================
    # Step 2: Layered Circuit
    # =========================
    # Add multiple layers of parameterized rotations and entanglement
    num_layers = 5  # You can experiment with more layers if your setup allows
    for layer in range(num_layers):
        # Parameterized rotations (diversify using RX, RY, RZ)
        for i in range(n_qubits):
            qml.RX(weights[layer * 3 * n_qubits + i], wires=i)
            qml.RY(weights[layer * 3 * n_qubits + n_qubits + i], wires=i)
            qml.RZ(weights[layer * 3 * n_qubits + 2 * n_qubits + i], wires=i)
        
        # Entanglement using both CNOT and CZ gates
        for i in range(n_qubits - 1):
            qml.CNOT(wires=[i, i + 1])  # Local entanglement
        for i in range(n_qubits):
            target = (i + 2) % n_qubits  # Entangle distant qubits
            qml.CZ(wires=[i, target])

    # =========================
    # Step 3: Measurement
    # =========================
    return qml.expval(qml.PauliZ(0))





In [21]:
# Define the Huber loss
def huber_loss(weights, X, y, delta=1.0):
    predictions = np.array([qnn_circuit(x, weights) for x in X])
    predictions = predictions.flatten()
    y = y.flatten()
    residuals = np.abs(predictions - y)
    loss = np.where(residuals <= delta, 0.5 * residuals**2, delta * residuals - 0.5 * delta**2)
    return np.mean(loss)


In [23]:
import time

# Initialize random weights
weights = np.random.randn(3 * n_qubits * 5)

# Time a single circuit evaluation
start = time.time()
single_prediction = qnn_circuit(X_train[0], weights)
end = time.time()

print(f"Time taken for a single circuit evaluation: {end - start:.4f} seconds")


Time taken for a single circuit evaluation: 0.0030 seconds


In [25]:
import time

# Select a single batch of data
batch_size = 50
X_batch = X_train[:batch_size]
y_batch = y_train[:batch_size]

# Time the forward pass and weight update
start_time = time.time()

weights = np.random.randn(3 * n_qubits * 5)
batch_loss = huber_loss(weights, X_batch, y_batch)

end_time = time.time()
print(f"Time taken for a single batch of size {batch_size}: {end_time - start_time:.4f} seconds")



Time taken for a single batch of size 50: 0.1326 seconds


In [26]:
# Test the quantum circuit with a sample input
test_output = qnn_circuit(X_train[0], weights)
print(f"Sample circuit output: {test_output}")


Sample circuit output: -0.008125680782872985


In [None]:
import pennylane as qml
from pennylane import numpy as np
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
import time
import pickle  # To save the best model weights

# Initialize weights and optimizer
weights = np.random.randn(3 * n_qubits * 5)  # Match the 5-layer advanced circuit
opt = qml.AdamOptimizer(stepsize=0.01)  # Start with a lower stepsize for stable updates

# Training settings
batch_size = 50
n_epochs = 20  # Increase epochs to give the deeper circuit time to learn
decay_rate = 0.1
convergence_limit = 3  # Stop if no improvement after 4 epochs
best_loss = float('inf')  # Track the lowest loss
epochs_without_improvement = 0  # Counter for early stopping
best_weights = None  # Store the best weights

# Training loop
loss_history = []

for epoch in range(n_epochs):
    start_time = time.time()

    # Update learning rate dynamically
    current_stepsize = 0.01 / (1 + decay_rate * epoch)
    opt.stepsize = current_stepsize
    
    # Shuffle data and create mini-batches
    X_train_shuffled, y_train_shuffled = shuffle(X_train, y_train)
    X_train_batches = [X_train_shuffled[i:i + batch_size] for i in range(0, len(X_train_shuffled), batch_size)]
    y_train_batches = [y_train_shuffled[i:i + batch_size] for i in range(0, len(y_train_shuffled), batch_size)]
    
    epoch_losses = []

    # Iterate through mini-batches
    for X_batch, y_batch in zip(X_train_batches, y_train_batches):
        weights = opt.step(lambda w: huber_loss(w, X_batch, y_batch), weights)
        batch_loss = huber_loss(weights, X_batch, y_batch)
        epoch_losses.append(batch_loss)
    
    # Compute average loss for this epoch
    avg_epoch_loss = np.mean(epoch_losses)
    loss_history.append(avg_epoch_loss)
    print(f"Epoch {epoch + 1}/{n_epochs}: Average Training Loss = {avg_epoch_loss:.4f}, Time = {time.time() - start_time:.2f} s")

    # Check for improvement
    if avg_epoch_loss < best_loss:
        print(f"New best loss found: {avg_epoch_loss:.4f} at epoch {epoch + 1}. Saving model.")
        best_loss = avg_epoch_loss
        best_weights = weights.copy()  # Save the best weights
        epochs_without_improvement = 0  # Reset counter
    else:
        epochs_without_improvement += 1  # Increment the counter if no improvement

    # Early stopping condition
    if epochs_without_improvement >= convergence_limit:
        print(f"Stopping early at epoch {epoch + 1}. No improvement for {convergence_limit} consecutive epochs.")
        break

# Save the best model weights to a file
with open("best_qnn_weights.pkl", "wb") as f:
    pickle.dump(best_weights, f)
    print("Best model weights saved to 'best_qnn_weights.pkl'.")

# Plot the loss curve after training
plt.plot(range(1, len(loss_history) + 1), loss_history, 'b-o', label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Training Loss')
plt.title('Training Loss Curve (Advanced Circuit)')
plt.legend()
plt.show()


KeyboardInterrupt: 

In [19]:
# Calculate the Huber loss on the test set
def huber_loss_test(predictions, y, delta=1.0):
    predictions = predictions.flatten()
    y = y.flatten()

    # Calculate residuals
    residuals = np.abs(predictions - y)
    
    # Apply Huber loss formula
    loss = np.where(residuals <= delta,
                    0.5 * residuals**2,
                    delta * residuals - 0.5 * delta**2)
    
    return np.mean(loss)

# Make predictions on the test set
predictions = np.array([qnn_circuit(x, weights) for x in X_test])
predictions_original = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test_original = scaler_y.inverse_transform(y_test.reshape(-1, 1))

# Calculate Huber loss on the test set
test_huber_loss = huber_loss_test(predictions_original, y_test_original)
print(f"Test Huber Loss: {test_huber_loss:.4f}")



Test Huber Loss: 213.0002
