<a href="https://colab.research.google.com/github/MehrdadJalali-KIT/BlackHole/blob/main/BlackHole_Evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount drive
from google.colab import drive
import os

drive.mount('/content/drive')
# Change working path
os.chdir('/content/drive/MyDrive/Research/MOF/Black_Hole')

Mounted at /content/drive


In [2]:
!pip install rdkit

Collecting rdkit
  Downloading rdkit-2024.3.5-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (3.9 kB)
Downloading rdkit-2024.3.5-cp310-cp310-manylinux_2_28_x86_64.whl (33.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.1/33.1 MB[0m [31m33.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rdkit
Successfully installed rdkit-2024.3.5


In [5]:
import pandas as pd
import numpy as np
import networkx as nx
from rdkit import Chem
from rdkit.Chem import AllChem
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import LabelBinarizer
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import Input, Dense, Dropout, concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from tensorflow.keras.losses import CategoricalCrossentropy
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.stats.contingency_tables import mcnemar
from sklearn.metrics import confusion_matrix, cohen_kappa_score, matthews_corrcoef, roc_auc_score
import seaborn as sns
from sklearn.metrics import confusion_matrix
import random
import tensorflow as tf
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from tensorflow.keras.models import load_model
import warnings
from rdkit import RDLogger
from tensorflow.keras import models, layers, regularizers
from sklearn.metrics import accuracy_score
import time
from tensorflow.keras.callbacks import EarlyStopping

# Suppress specific deprecation warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Additionally, suppress RDKit warnings globally
RDLogger.DisableLog('rdApp.*')

def generate_fingerprint(smiles):
    """Generates a molecular fingerprint given a SMILES string."""
    try:
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            return np.zeros((1024,), dtype=float)  # Return an array of zeros if molecule can't be parsed
        return np.array(AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=1024), dtype=float)
    except Exception as e:
        print(f"SMILES Parse Error: {e}")
        return np.zeros((1024,), dtype=float)  # Return an array of zeros in case of an error

def plot_confusion_matrix(y_true, y_pred, classes):
    # Compute confusion matrix
    cm = confusion_matrix(y_true, y_pred)

    # Calculate percentage accuracy for each element in the confusion matrix
    cm_percentage = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100

    # Combine counts and percentages for display
    annot = np.empty_like(cm).astype(str)
    nrows, ncols = cm.shape
    for i in range(nrows):
        for j in range(ncols):
            c = cm[i, j]
            p = cm_percentage[i, j]
            annot[i, j] = f'{c}\n({p:.1f}%)'  # Count and percentage

    # Plot the confusion matrix with annotations
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=annot, fmt='', cmap='Blues', xticklabels=classes, yticklabels=classes, cbar=False)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.show()

def label_encode_metal_names(metal_names):
    """Encodes metal names as integers."""
    metal_dict = {metal: idx for idx, metal in enumerate(np.unique(metal_names))}
    return np.array([metal_dict[metal] for metal in metal_names])

def preprocess_graph(graph, features):
    # Determine the dimensionality of the feature vectors
    feature_dimension = features.shape[1]

    # Convert the graph to an adjacency matrix
    adjacency_matrix = nx.adjacency_matrix(graph).toarray()

    # Initialize an empty list to store feature vectors
    feature_vectors = []

    # Create a mapping from node labels to integer indices
    node_to_index = {node: index for index, node in enumerate(graph.nodes())}

    # Iterate over nodes in the graph
    for node in graph.nodes():
        # Get the integer index corresponding to the node label
        node_index = node_to_index[node]
        # Check if the node index is valid
        if node_index < len(features):
            # Append the feature vector corresponding to the node index
            feature_vectors.append(features[node_index])
        else:
            # If the node index is out of range, assign a default feature vector
            feature_vectors.append(np.zeros((feature_dimension,)))

    # Convert the list of feature vectors to a numpy array
    feature_matrix = np.array(feature_vectors)

    return adjacency_matrix, feature_matrix

def build_gcn_model(input_shape_feature, input_shape_adjacency, num_classes):
    # Define input layers
    x_inp_feature = Input(shape=(input_shape_feature,))
    x_inp_adjacency = Input(shape=(input_shape_adjacency,))

    # Feature processing with multiple layers
    x_feature = Dense(128, activation='relu', kernel_regularizer=l2(0.01))(x_inp_feature)
    x_feature = Dropout(0.5)(x_feature)
    x_feature = Dense(64, activation='relu', kernel_regularizer=l2(0.01))(x_feature)
    x_feature = Dropout(0.3)(x_feature)

    # Adjacency processing with multiple layers
    x_adjacency = Dense(128, activation='relu', kernel_regularizer=l2(0.01))(x_inp_adjacency)
    x_adjacency = Dropout(0.5)(x_adjacency)
    x_adjacency = Dense(64, activation='relu', kernel_regularizer=l2(0.01))(x_adjacency)
    x_adjacency = Dropout(0.3)(x_adjacency)

    # Concatenate feature and adjacency outputs
    x = concatenate([x_feature, x_adjacency])

    # Output layer
    output = Dense(num_classes, activation='softmax')(x)

    # Create model
    model = Model(inputs=[x_inp_feature, x_inp_adjacency], outputs=output)

    # Using a smaller learning rate
    optimizer = Adam(learning_rate=0.0009)

    # Compile model
    model.compile(optimizer=optimizer, loss=CategoricalCrossentropy(), metrics=['accuracy'])

    return model

def build_feedforward_model(input_shape, num_classes):
    model = models.Sequential([
        layers.Input(shape=(input_shape,)),
        layers.Dense(64, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dropout(0.5),
        layers.Dense(32, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dense(num_classes, activation='softmax')
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

def train_gcn_model(model, adjacency_matrix, feature_matrix, labels, epochs, batch_size):
    if model is not None and adjacency_matrix is not None and feature_matrix is not None and labels is not None:
        # Early stopping to prevent overfitting
        early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

        # ModelCheckpoint to save the best model
        model_checkpoint = ModelCheckpoint('best_gcn_model.keras', monitor='val_loss', save_best_only=True, verbose=1)

        start_time = time.time()
        # Train the model
        history = model.fit([feature_matrix, adjacency_matrix], labels,
                            epochs=epochs, batch_size=batch_size,
                            validation_split=0.2, callbacks=[early_stopping, model_checkpoint])
        end_time = time.time()

        # Calculate total training time
        total_training_time = end_time - start_time
        print(f"Total training time: {total_training_time:.2f} seconds")

        return history
    else:
        print("Error: One or more input arguments to train_gcn_model is None.")


from sklearn.model_selection import train_test_split

if __name__ == "__main__":
    edge_list_filenames = [
        'sparsified_graph_edges_blackhole_0.1.csv',
        'sparsified_graph_edges_blackhole_0.2.csv',
        'sparsified_graph_edges_blackhole_0.3.csv',
        'sparsified_graph_edges_blackhole_0.4.csv',
        'sparsified_graph_edges_blackhole_0.5.csv',
        'sparsified_graph_edges_blackhole_0.6.csv',
        'sparsified_graph_edges_blackhole_0.7.csv',
        'sparsified_graph_edges_blackhole_0.8.csv',
        'sparsified_graph_edges_blackhole_0.9.csv'
    ]

    summary_data_filename = '1M1L3D_summary.csv'

    # Initialize lists to track accuracies and thresholds
    accuracies = []
    thresholds = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]

    # Loop through all edge list files
    for edges_list_filename, threshold in zip(edge_list_filenames, thresholds):
        print(f"Processing edge list file: {edges_list_filename}")

        # Load data
        edges_list = pd.read_csv(edges_list_filename, header=None, names=['source', 'target', 'weight'], delimiter=' ')
        summary_data = pd.read_csv(summary_data_filename)

        node_labels_source = edges_list['source'].astype(str).unique()
        node_labels_target = edges_list['target'].astype(str).unique()
        node_labels = np.unique(np.concatenate((node_labels_source, node_labels_target)))
        node_labels = list(set(node_labels))

        print("Unique node labels:", len(node_labels))

        summary_data_filtered = summary_data[summary_data['refcode'].isin(node_labels)]
        print("Filtered summary data:\n", len(summary_data_filtered))

        if not summary_data_filtered.empty:
            linker_smiles = summary_data_filtered['linker SMILES']
            if not linker_smiles.empty:
                # Generate features
                linker_features = np.stack(linker_smiles.dropna().apply(generate_fingerprint).values)
                metal_names = summary_data_filtered['metal']
                metal_features = label_encode_metal_names(metal_names).reshape(-1, 1)

                other_features = summary_data_filtered[['Largest Cavity Diameter', 'Largest Free Sphere']].values.astype('float32')
                features = np.concatenate((linker_features, metal_features, other_features), axis=1)

                # Generate labels
                summary_data_filtered['PLD_category'] = pd.cut(
                    summary_data_filtered['Pore Limiting Diameter'],
                    bins=[-np.inf, 2.4, 4.4, 5.9, np.inf],
                    labels=['nonporous', 'small pore', 'medium pore', 'large pore']
                )
                labels = pd.get_dummies(summary_data_filtered['PLD_category']).values

                # Split the data into training and testing sets
                X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=56)

                # Load the sparsified graph
                graph = nx.read_weighted_edgelist(edges_list_filename)

                # Preprocess the graph data
                adjacency_matrix, feature_matrix = preprocess_graph(graph, features)

                # Split the adjacency and feature matrices accordingly
                adj_train, adj_test, feat_train, feat_test = train_test_split(adjacency_matrix, feature_matrix, test_size=0.2, random_state=56)

                # Provide the number of classes
                num_classes = labels.shape[1]

                # Build the GCN model
                gcn_model = build_gcn_model(feat_train.shape[1], adj_train.shape[1], num_classes)

                # Train the GCN model
                history = train_gcn_model(gcn_model, adj_train, feat_train, y_train, epochs=40, batch_size=32)

                # Evaluate the model on the test set
                test_loss, test_accuracy = gcn_model.evaluate([feat_test, adj_test], y_test, verbose=0)
                print(f'Test Accuracy for threshold {threshold}: {test_accuracy}')

                # Track the accuracy
                accuracies.append(test_accuracy)

                # Continue with your evaluation metrics and comparison logic
                # ...
            else:
                print("Error: linker_smiles column is empty.")
        else:
            print("Error: summary_data_filtered DataFrame is empty.")

    # Plot the accuracy comparison
    plt.figure(figsize=(10, 6))
    plt.plot(thresholds, accuracies, marker='o', color='b', label='Test Accuracy')
    plt.xlabel('Threshold')
    plt.ylabel('Accuracy')
    plt.title('GCN Test Accuracy Across Different Sparsification Thresholds')
    plt.grid(True)
    plt.legend()
    plt.show()


Processing edge list file: sparsified_graph_edges_blackhole_0.1.csv


FileNotFoundError: [Errno 2] No such file or directory: 'sparsified_graph_edges_blackhole_0.1.csv'