In [2]:
import numpy as np
import pandas as pd
import re
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.model_selection import train_test_split

In [3]:
def read_data(filename):
    data = np.load(filename, allow_pickle=True)
    df = pd.DataFrame(data.tolist() if data.dtype == 'O' and isinstance(data[0], dict) else data)
    return df
def npy_preprocessor(filename):
    df = read_data(filename)
    return df['index'].values, df['inchi'].values, df['xyz'].values, df['chiral_centers'].values, df['rotation'].values

In [4]:
def generate_label(rotation_item):
    return 1 if rotation_item > 0 else 0

index_array, inchi_array, xyz_array, chiral_centers_array, rotation_array = npy_preprocessor('qm9_filtered.npy')

xyz_array = np.array([np.array(xyz, dtype=np.float32) for xyz in xyz_array], dtype=np.float32)

# Generate labels (binary classification based on rotation)
label_array = [generate_label(rotation_item[0]) for rotation_item in rotation_array]






In [5]:
import torch
from torch_geometric.data import Data
import numpy as np

# Bond lengths between various atoms (Å)
bond_length_thresholds = {
    ('H', 'H'): 0.74,
    ('H', 'C'): 1.10,
    ('H', 'N'): 1.03,
    ('H', 'O'): 0.97,
    ('H', 'F'): 0.93,
    ('C', 'C'): 1.54,
    ('C', 'N'): 1.46,
    ('C', 'O'): 1.43,
    ('C', 'F'): 1.38,
    ('N', 'N'): 1.45,
    ('N', 'O'): 1.43,
    ('N', 'F'): 1.44,
    ('O', 'O'): 1.43,
    ('O', 'F'): 1.41,
}


def euclidean_distance(coord1, coord2):
    return np.linalg.norm(coord1 - coord2)
def get_atom_type(one_hot_vector):
    atom_types = ['C', 'H', 'O', 'N', 'F']
    
    # Check if the one-hot vector is all zeros (i.e., padding row)
    if np.all(one_hot_vector == 0):
        return None
    
    # Find the index of the atom type based on the one-hot vector
    index = np.argmax(one_hot_vector)
    
    if index < len(atom_types):
        return atom_types[index]
    else:
        raise ValueError(f"Invalid one-hot encoding: {one_hot_vector}")

def create_graph_with_bonds(ohecc_matrix):
    node_features = ohecc_matrix[:, :8]  # Assuming 8 features (3 coordinates + 5 one-hot encoding)
    coords = ohecc_matrix[:, :3]         # Cartesian coordinates
    atom_types = ohecc_matrix[:, 3:]     # One-hot encoded atom types (correct slice for one-hot vectors)

    edge_index = []
    
    num_atoms = node_features.shape[0]
    for i in range(num_atoms):
        # Directly use atom_types[i] since atom_types already starts from the 3rd column
        atom_type_i = get_atom_type(atom_types[i])  # No need to slice further
        
        if atom_type_i is None:  # Skip rows that are padding
            continue
        
        for j in range(i + 1, num_atoms):
            atom_type_j = get_atom_type(atom_types[j])
            
            if atom_type_j is None:  # Skip padding rows
                continue
            
            distance = euclidean_distance(coords[i], coords[j])
            
            atom_pair = (atom_type_i, atom_type_j)
            atom_pair_reversed = (atom_type_j, atom_type_i)
            
            # Check if atoms are bonded based on distance thresholds
            if atom_pair in bond_length_thresholds or atom_pair_reversed in bond_length_thresholds:
                threshold = bond_length_thresholds.get(atom_pair, bond_length_thresholds.get(atom_pair_reversed))
                if distance <= threshold + 0.1:  # Allow for small variations
                    edge_index.append([i, j])
                    edge_index.append([j, i])  # Add bidirectional edge

    # Convert edge list to PyTorch tensor
    edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()

    # Node features are the OECC matrix (already converted to float32)
    x = torch.tensor(ohecc_matrix, dtype=torch.float32)
    
    return Data(x=x, edge_index=edge_index)

graph = create_graph_with_bonds(xyz_array[index_array == 1])
#print adjacency matrix
print(graph.edge_index)

graph_array = [create_graph_with_bonds(xyz) for xyz in xyz_array]

tensor([], dtype=torch.int64)


In [85]:
from torch_geometric.nn import GCNConv, global_mean_pool

class GCN_OECC(torch.nn.Module):
    def __init__(self):
        super(GCN_OECC, self).__init__()
        self.conv1 = GCNConv(8, 64)  # Input features are 8 (from OECC matrix)
        self.conv2 = GCNConv(64, 128)
        self.fc1 = nn.Linear(128, 64)
        self.fc2 = nn.Linear(64, 1)  # Output layer (binary classification)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index)
        x = torch.relu(x)
        x = self.conv2(x, edge_index)
        x = global_mean_pool(x, data.batch)  # Pool the graph features
        x = torch.relu(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        return x

# Create the model
model = GCN_OECC()

# Define the loss function and optimizer
criterion = nn.BCELoss()  # Binary Cross Entropy loss for binary classification
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Move model to GPU if available
device = torch.device('mps')
model = model.to(device)


In [86]:
from torch_geometric.loader import DataLoader
from sklearn.model_selection import train_test_split

# Split the data into train and test sets
train_graphs, test_graphs, train_labels, test_labels = train_test_split(graph_array, label_array, test_size=0.2)

# Create PyTorch Geometric DataLoader for batching
train_dataset = [Data(x=g.x, edge_index=g.edge_index, y=torch.tensor([y], dtype=torch.float)) 
                 for g, y in zip(train_graphs, train_labels)]
test_dataset = [Data(x=g.x, edge_index=g.edge_index, y=torch.tensor([y], dtype=torch.float)) 
                for g, y in zip(test_graphs, test_labels)]

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [87]:
device = torch.device('mps')
model = GCN_OECC().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = torch.nn.BCELoss()  # Binary Cross Entropy Loss for binary classification

def train(model, loader):
    model.train()
    total_loss = 0
    for data in loader:
        data = data.to(device)
        optimizer.zero_grad()
        output = model(data)
        
        # Reshape data.y to match the output size
        target = data.y.view_as(output)  # Ensures both have the same shape [32, 1]
        
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)

# Training
for epoch in range(10):  # Number of epochs
    loss = train(model, train_loader)
    print(f'Epoch {epoch+1}, Loss: {loss:.4f}')


Epoch 1, Loss: 0.6919
Epoch 2, Loss: 0.6908
Epoch 3, Loss: 0.6890
Epoch 4, Loss: 0.6869
Epoch 5, Loss: 0.6850
Epoch 6, Loss: 0.6836
Epoch 7, Loss: 0.6820
Epoch 8, Loss: 0.6807
Epoch 9, Loss: 0.6800
Epoch 10, Loss: 0.6790


In [88]:
def test(model, loader):
    model.eval()
    correct = 0
    with torch.no_grad():
        for data in loader:
            data = data.to(device)
            output = model(data)
            pred = (output > 0.5).float()  # Apply threshold for binary classification
            correct += pred.eq(data.y.view_as(pred)).sum().item()
    return correct / len(loader.dataset)

test_accuracy = test(model, test_loader)
print(f'Test Accuracy: {test_accuracy:.4f}')
#print confusion matrix
from sklearn.metrics import confusion_matrix
y_true = []
y_pred = []
model.eval()
with torch.no_grad():
    for data in test_loader:
        data = data.to(device)
        output = model(data)
        pred = (output > 0.5).float()  # Apply threshold for binary classification
        y_true.extend(data.y.cpu().numpy())
        y_pred.extend(pred.cpu().numpy())
cm = confusion_matrix(y_true, y_pred)
print(cm)



Test Accuracy: 0.5739
[[9066 3427]
 [6921 4870]]
