In [12]:
!pip install numpy mlflow streamlit pandas torch torch_geometric tqdm deepchem rdkit matplotlib seaborn
import numpy as np
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, TensorSpec


# Hyperparameter search space: every key maps to the list of candidate
# values for that setting.
HYPERPARAMETERS = dict(
    # --- optimisation / training loop ---
    batch_size=[32, 128, 64],
    learning_rate=[0.1, 0.05, 0.01, 0.001],
    weight_decay=[0.0001, 0.00001, 0.001],
    sgd_momentum=[0.9, 0.8, 0.5],
    scheduler_gamma=[0.995, 0.9, 0.8, 0.5, 1],
    pos_weight=[1.0],
    # --- model architecture (same "model_*" keys that GNN reads) ---
    model_embedding_size=[8, 16, 32, 64, 128],
    model_attention_heads=[1, 2, 3, 4],
    model_layers=[3],
    model_dropout_rate=[0.2, 0.5, 0.9],
    model_top_k_ratio=[0.2, 0.5, 0.8, 0.9],
    # NOTE(review): GNN evaluates `i % model_top_k_every_n`, so 0 is not a
    # usable value there -- confirm the intended setting.
    model_top_k_every_n=[0],
    model_dense_neurons=[16, 128, 64, 256, 32],
)

# A single fixed configuration in the same format as HYPERPARAMETERS:
# every key maps to a one-element list.
BEST_PARAMETERS = dict(
    # --- optimisation / training loop ---
    batch_size=[128],
    learning_rate=[0.01],
    weight_decay=[0.0001],
    sgd_momentum=[0.8],
    scheduler_gamma=[0.8],
    pos_weight=[1.3],
    # --- model architecture (same "model_*" keys that GNN reads) ---
    model_embedding_size=[64],
    model_attention_heads=[3],
    model_layers=[4],
    model_dropout_rate=[0.2],
    model_top_k_ratio=[0.5],
    model_top_k_every_n=[1],
    model_dense_neurons=[256],
)

# MLflow model signature: four named input tensors and one unnamed output
# tensor. The input names match the arguments of GNN.forward.
# NOTE(review): 30 and 11 are presumably the node / edge feature widths
# produced by the featurizer -- confirm against the training data.
input_schema = Schema(
    [
        TensorSpec(type=np.dtype(np.float32), shape=(-1, 30), name="x"),
        TensorSpec(type=np.dtype(np.float32), shape=(-1, 11), name="edge_attr"),
        TensorSpec(type=np.dtype(np.int32), shape=(2, -1), name="edge_index"),
        TensorSpec(type=np.dtype(np.int32), shape=(-1, 1), name="batch_index"),
    ]
)

output_schema = Schema(
    [
        TensorSpec(type=np.dtype(np.float32), shape=(-1, 1)),
    ]
)

SIGNATURE = ModelSignature(inputs=input_schema, outputs=output_schema)






#utils -
from rdkit import Chem
from rdkit.Chem.Draw import MolToImage
import mlflow
import deepchem as dc
import requests
import torch
import random
import numpy as np
import json
import time

# Point the MLflow client at a tracking server expected on localhost:5000.
# This runs as a module-level side effect.
mlflow.set_tracking_uri("http://localhost:5000")


def smiles_to_mol(smiles_string):
    """
    Parse a SMILES string into an RDKit molecule object.

    The result of Chem.MolFromSmiles is returned unchanged, i.e. None
    when the SMILES string cannot be parsed.
    """
    molecule = Chem.MolFromSmiles(smiles_string)
    return molecule

def mol_file_to_mol(mol_file):
    """
    Load an RDKit molecule object from the mol file at the given path.

    The result of Chem.MolFromMolFile is returned unchanged; callers in
    this file treat a None result as an invalid molecule.
    """
    molecule = Chem.MolFromMolFile(mol_file)
    return molecule

def draw_molecule(mol):
    """
    Render a molecule as an image via RDKit's MolToImage.

    The returned image object is what the dashboard hands to st.image.
    """
    image = MolToImage(mol)
    return image

def mol_to_tensor_graph(mol):
    """
    Convert molecule to a graph representation that
    can be fed to the model.

    The molecule is featurized with deepchem's MolGraphConvFeaturizer
    (edge features enabled) and converted to a PyG graph. A "batch_index"
    entry is added so the single graph can be passed to a model that
    expects a batch vector.
    """
    featurizer = dc.feat.MolGraphConvFeaturizer(use_edges=True)
    features = featurizer.featurize([Chem.MolToSmiles(mol)])
    data = features[0].to_pyg_graph()
    # The batch vector holds, per node, the zero-based index of the graph
    # the node belongs to. A single graph is therefore all zeros (integer
    # dtype); all ones would describe an empty graph 0 plus this graph as
    # graph 1 and make the global pooling return two rows.
    node_feats = data["x"]
    data["batch_index"] = torch.zeros(
        node_feats.shape[0], dtype=torch.long, device=node_feats.device
    )
    return data


def _model_inputs(payload):
    """Extract the four named model inputs from a graph as numpy arrays."""
    return {
        "x": payload["x"].numpy(),
        "edge_attr": payload["edge_attr"].numpy(),
        "edge_index": payload["edge_index"].numpy().astype(np.int32),
        # batch_index is sent as a column vector (one entry per node).
        "batch_index": np.expand_dims(
            payload["batch_index"].numpy().astype(np.int32), axis=1
        ),
    }


def get_model_predictions(payload, option="MOCKED"):
    """
    Get model predictions for a featurized molecule graph.

    Parameters
    ----------
    payload : graph object exposing "x", "edge_attr", "edge_index" and
        "batch_index" tensors (as produced by mol_to_tensor_graph).
        Not read at all in MOCKED mode.
    option : how the prediction is obtained (default "MOCKED"):
        ENDPOINT = Calls an endpoint to get the predictions
        REGISTRY = Loads model from registry and predicts
        MOCKED = Randomly generated prediction

    Returns
    -------
    MOCKED: 0 or 1. REGISTRY: whatever the loaded model's predict()
    returns. ENDPOINT: the decoded JSON response body.

    Raises
    ------
    ValueError
        If option is not one of the three supported modes.
    """
    if option == "ENDPOINT":
        # Currently not supported for multi-input models
        DEPLOYED_ENDPOINT = "http://127.0.0.1:5001/invocations"
        headers = {"Content-Type": "application/json"}
        # The body has to be real JSON: numpy arrays are turned into nested
        # lists and the dict is serialised explicitly (passing a dict via
        # `data=` would form-encode it despite the JSON content type).
        body = json.dumps(
            {"inputs": {name: array.tolist()
                        for name, array in _model_inputs(payload).items()}}
        )
        response = requests.post(url=DEPLOYED_ENDPOINT, data=body, headers=headers)
        return json.loads(response.content.decode("utf-8"))

    if option == "REGISTRY":
        # Currently not supported for multi-input models
        model_name = "GraphTransformer"
        model_version = "2"
        model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")
        return model.predict(_model_inputs(payload))

    if option == "MOCKED":
        # Fake API call
        time.sleep(2)
        return random.choice([0, 1])

    raise ValueError(
        f"Unknown prediction option {option!r}; "
        "expected 'ENDPOINT', 'REGISTRY' or 'MOCKED'"
    )






#dashboard
import streamlit as st
# ----------- General things
# Streamlit dashboard script: lets the user provide a molecule (mol file or
# SMILES string), validates and draws it, and shows a prediction on request.
st.title('HIV Inhibitor Dashboard')
# Defaults for the state that the "Predictor" page fills in below.
valid_molecule = True
loaded_molecule = None
selection = None
submit = None

# ----------- Sidebar
page = st.sidebar.selectbox('Page Navigation', ["Predictor", "Model analysis"])

st.sidebar.markdown("""---""")
st.sidebar.write("Created by [DeepFindr](https://www.youtube.com/channel/UCScjF2g0_ZNy0Yv3KbsbR7Q)")

if page == "Predictor":
    # ----------- Inputs
    st.markdown("Select input molecule.")
    # Two columns: inputs on the left (index 0), preview/actions on the right (index 1)
    upload_columns = st.columns([2, 1])

    # File upload
    file_upload = upload_columns[0].expander(label="Upload a mol file")
    uploaded_file = file_upload.file_uploader("Choose a mol file", type=['mol'])

    # Smiles input
    smiles_select = upload_columns[0].expander(label="Specify SMILES string")
    smiles_string = smiles_select.text_input('Enter a valid SMILES string.')

    # If both are selected, give the option to swap between them
    if uploaded_file and smiles_string:
        selection = upload_columns[1].radio("Select input option", ["File", "SMILES"])

    if selection:
        # Both inputs were given: load from the source chosen via the radio button
        if selection == "File":
            # Save it as temp file (mol_file_to_mol is called with a file path)
            # NOTE(review): fixed file name in the working directory; parallel
            # sessions would overwrite each other's file -- confirm acceptable.
            temp_filename = "temp.mol"
            with open(temp_filename, "wb") as f:
                f.write(uploaded_file.getbuffer())
            loaded_molecule = mol_file_to_mol(temp_filename)
        elif selection== "SMILES":
            loaded_molecule = smiles_to_mol(smiles_string)
    else:
        # No radio choice was made: use whichever single input is present
        if uploaded_file:
            # Save it as temp file (mol_file_to_mol is called with a file path)
            temp_filename = "temp.mol"
            with open(temp_filename, "wb") as f:
                f.write(uploaded_file.getbuffer())
            loaded_molecule = mol_file_to_mol(temp_filename)
        elif smiles_string:
            loaded_molecule = smiles_to_mol(smiles_string)

    # Set validity flag (also False when nothing has been entered yet)
    if loaded_molecule is None:
            valid_molecule = False
    else:
        valid_molecule = True

    # Draw if valid
    # The error is only shown once the user has actually provided some input.
    if not valid_molecule and (smiles_string != "" or uploaded_file is not None):
        st.error("This molecule appears to be invalid :no_entry_sign:")
    if valid_molecule and loaded_molecule is not None:
        st.info("This molecule appears to be valid :ballot_box_with_check:")
        pil_img = draw_molecule(loaded_molecule)
        upload_columns[1].image(pil_img)
        # The button only exists for a valid molecule; otherwise submit stays None
        submit = upload_columns[1].button("Get predictions")

    # ----------- Submission
    st.markdown("""---""")
    if submit:
        with st.spinner(text="Fetching model prediction..."):
            # Convert molecule to graph features
            graph = mol_to_tensor_graph(loaded_molecule)
            # Call model endpoint
            prediction = get_model_predictions(graph)

        # ----------- Outputs
        outputs = st.columns([2, 1])
        outputs[0].markdown("HIV Inhibitor Prediction: ")

        # NOTE(review): get_model_predictions returns a plain 0/1 only in its
        # MOCKED mode; confirm this comparison still holds for the other modes.
        if prediction == 1:
            outputs[1].success("Yes")
        else:
            outputs[1].error("No")

        prediction_details = st.expander(label="Model details")
        details = prediction_details.columns([2, 1])

        # All of this is mocked: labels on the left, hard-coded values on the right
        details[0].markdown("Confidence: ")
        details[0].markdown("Model Version: ")
        details[0].markdown("Model Name: ")
        details[0].markdown("Test ROC: ")
        details[1].markdown("81%")
        details[1].markdown("1.0.1")
        details[1].markdown("Graph Transformer Network")
        details[1].markdown("0.84")
else:
    # Every page other than "Predictor" is a placeholder
    st.markdown("This page is not implemented yet :no_entry_sign:")






#dataset
import pandas as pd
from rdkit import Chem
import torch
import torch_geometric
from torch_geometric.data import Dataset, Data
import numpy as np
import os
from tqdm import tqdm

# Report the installed torch / torch_geometric versions and whether CUDA is
# available; this runs at import time.
print(f"Torch version: {torch.__version__}")
print(f"Cuda available: {torch.cuda.is_available()}")
print(f"Torch geometric version: {torch_geometric.__version__}")

"""
!!!
NOTE: This file was replaced by dataset_featurizer.py
but is kept to illustrate how to build a custom dataset in PyG.
!!!
"""


class MoleculeDataset(Dataset):
    """
    PyG dataset that featurizes molecules by hand with RDKit.

    The raw CSV must provide a "smiles" and an "HIV_active" column. Every
    row becomes one Data object that is stored as its own file in
    processed_dir: data_{i}.pt, or data_test_{i}.pt when test=True.
    """

    def __init__(self, root, filename, test=False, transform=None, pre_transform=None):
        """
        root = Where the dataset should be stored. This folder is split
        into raw_dir (downloaded dataset) and processed_dir (processed data).
        filename = Name of the CSV file inside raw_dir.
        test = Selects the "data_test_" file prefix instead of "data_".
        """
        self.test = test
        self.filename = filename
        # Zero-argument super(): does not depend on the module-level name
        # "MoleculeDataset", which is rebound when another class with the
        # same name is defined later.
        super().__init__(root, transform, pre_transform)

    @property
    def raw_file_names(self):
        """ If this file exists in raw_dir, the download is not triggered.
            (The download func. is not implemented here)
        """
        return self.filename

    @property
    def processed_file_names(self):
        """ If these files are found in processed_dir, processing is skipped.

            Also (re)loads the CSV into self.data, which len() relies on.
        """
        self.data = pd.read_csv(self.raw_paths[0]).reset_index()
        prefix = 'data_test_' if self.test else 'data_'
        return [f'{prefix}{i}.pt' for i in self.data.index]

    def download(self):
        """Nothing to download; the raw CSV has to be placed in raw_dir."""
        pass

    def process(self):
        """Featurize every CSV row and save it as one file in processed_dir."""
        self.data = pd.read_csv(self.raw_paths[0])
        prefix = 'data_test_' if self.test else 'data_'
        for index, mol in tqdm(self.data.iterrows(), total=self.data.shape[0]):
            mol_obj = Chem.MolFromSmiles(mol["smiles"])
            if mol_obj is None:
                # Fail with the offending row instead of an AttributeError
                # deep inside the feature extraction.
                raise ValueError(
                    f"Row {index}: could not parse SMILES {mol['smiles']!r}"
                )
            # Create data object
            data = Data(x=self._get_node_features(mol_obj),
                        edge_index=self._get_adjacency_info(mol_obj),
                        edge_attr=self._get_edge_features(mol_obj),
                        y=self._get_labels(mol["HIV_active"]),
                        smiles=mol["smiles"]
                        )
            torch.save(data, os.path.join(self.processed_dir, f'{prefix}{index}.pt'))

    def _get_node_features(self, mol):
        """
        This will return a matrix / 2d array of the shape
        [Number of Nodes, Node Feature size] (feature size is 9).
        """
        all_node_feats = [
            [
                atom.GetAtomicNum(),            # Feature 1: Atomic number
                atom.GetDegree(),               # Feature 2: Atom degree
                atom.GetFormalCharge(),         # Feature 3: Formal charge
                atom.GetHybridization(),        # Feature 4: Hybridization
                atom.GetIsAromatic(),           # Feature 5: Aromaticity
                atom.GetTotalNumHs(),           # Feature 6: Total Num Hs
                atom.GetNumRadicalElectrons(),  # Feature 7: Radical Electrons
                atom.IsInRing(),                # Feature 8: In Ring
                atom.GetChiralTag(),            # Feature 9: Chirality
            ]
            for atom in mol.GetAtoms()
        ]
        all_node_feats = np.asarray(all_node_feats)
        # reshape keeps the result 2-D even when there are no atoms
        return torch.tensor(all_node_feats, dtype=torch.float).reshape(-1, 9)

    def _get_edge_features(self, mol):
        """
        This will return a matrix / 2d array of the shape
        [Number of edges, Edge Feature size] (feature size is 2).
        """
        all_edge_feats = []

        for bond in mol.GetBonds():
            edge_feats = [
                bond.GetBondTypeAsDouble(),  # Feature 1: Bond type (as double)
                bond.IsInRing(),             # Feature 2: Rings
            ]
            # Append edge features to matrix (twice, per direction)
            all_edge_feats += [edge_feats, edge_feats]

        all_edge_feats = np.asarray(all_edge_feats)
        # reshape keeps the result 2-D ([0, 2]) for molecules without bonds
        return torch.tensor(all_edge_feats, dtype=torch.float).reshape(-1, 2)

    def _get_adjacency_info(self, mol):
        """
        We could also use rdmolops.GetAdjacencyMatrix(mol)
        but we want to be sure that the order of the indices
        matches the order of the edge features.

        Returns a long tensor of shape [2, Number of edges]; every bond
        contributes both directions, in the same order as the edge features.
        """
        edge_indices = []
        for bond in mol.GetBonds():
            i = bond.GetBeginAtomIdx()
            j = bond.GetEndAtomIdx()
            edge_indices += [[i, j], [j, i]]

        # reshape(-1, 2) also covers the bond-less case, giving shape [2, 0]
        pairs = torch.tensor(edge_indices, dtype=torch.long).reshape(-1, 2)
        return pairs.t().contiguous()

    def _get_labels(self, label):
        """Wrap a single label in an int64 tensor of shape [1]."""
        label = np.asarray([label])
        return torch.tensor(label, dtype=torch.int64)

    def len(self):
        """Number of rows in the CSV loaded by processed_file_names/process."""
        return self.data.shape[0]

    def get(self, idx):
        """ - Equivalent to __getitem__ in pytorch
            - Is not needed for PyG's InMemoryDataset
        """
        prefix = 'data_test_' if self.test else 'data_'
        return torch.load(os.path.join(self.processed_dir, f'{prefix}{idx}.pt'))








# dataset featurizer
import pandas as pd
import torch
import torch_geometric
from torch_geometric.data import Dataset
import numpy as np
import os
from tqdm import tqdm
import deepchem as dc
from rdkit import Chem

# Report the installed torch / torch_geometric versions and whether CUDA is
# available; this runs at import time.
print(f"Torch version: {torch.__version__}")
print(f"Cuda available: {torch.cuda.is_available()}")
print(f"Torch geometric version: {torch_geometric.__version__}")

class MoleculeDataset(Dataset):
    def __init__(self, root, filename, test=False, transform=None, pre_transform=None):
        """
        root = Folder holding the dataset; it contains raw_dir (the source
        CSV) and processed_dir (one saved graph file per CSV row).
        filename = Name of the CSV file inside raw_dir.
        test = Selects the "data_test_" file prefix instead of "data_".
        """
        self.filename = filename
        self.test = test
        super().__init__(root, transform, pre_transform)

    @property
    def raw_file_names(self):
        """ Name of the raw CSV. When it is present in raw_dir no download
            is triggered (download is a no-op in this class anyway).
        """
        return self.filename

    @property
    def processed_file_names(self):
        """ Expected file names in processed_dir; processing is skipped
            when all of them exist. Loads the CSV into self.data as well.
        """
        self.data = pd.read_csv(self.raw_paths[0]).reset_index()
        prefix = 'data_test_' if self.test else 'data_'
        return [f'{prefix}{i}.pt' for i in self.data.index]

    def download(self):
        pass

    def process(self):
        """ Featurize each CSV row with deepchem and store it as one file. """
        self.data = pd.read_csv(self.raw_paths[0]).reset_index()
        prefix = 'data_test_' if self.test else 'data_'
        featurizer = dc.feat.MolGraphConvFeaturizer(use_edges=True)
        n_rows = self.data.shape[0]
        for index, row in tqdm(self.data.iterrows(), total=n_rows):
            # Featurize molecule and convert it to a PyG graph
            smiles = row["smiles"]
            features = featurizer._featurize(Chem.MolFromSmiles(smiles))
            graph = features.to_pyg_graph()
            # Attach the label and the source SMILES string
            graph.y = self._get_label(row["HIV_active"])
            graph.smiles = smiles
            target = os.path.join(self.processed_dir, f'{prefix}{index}.pt')
            torch.save(graph, target)

    def _get_label(self, label):
        """ Wrap a single label in an int64 tensor of shape [1]. """
        return torch.tensor(np.asarray([label]), dtype=torch.int64)

    def len(self):
        """ Number of rows of the CSV held in self.data. """
        n_rows, _ = self.data.shape
        return n_rows

    def get(self, idx):
        """ - Counterpart of __getitem__ in plain pytorch datasets
            - Not required when using PyG's InMemoryDataset
        """
        prefix = 'data_test_' if self.test else 'data_'
        path = os.path.join(self.processed_dir, f'{prefix}{idx}.pt')
        return torch.load(path)








# model
import torch
import torch.nn.functional as F
from torch.nn import Linear, BatchNorm1d, ModuleList
from torch_geometric.nn import TransformerConv, TopKPooling
from torch_geometric.nn import global_mean_pool as gap, global_max_pool as gmp
# Seed PyTorch's global random number generator with a fixed value; this
# runs at import time, before any GNN instance is created.
torch.manual_seed(42)

class GNN(torch.nn.Module):
    """
    Graph network built from TransformerConv layers with TopK pooling.

    One initial TransformerConv block is followed by `model_layers` further
    blocks (conv -> linear + ReLU -> batch norm). After every
    `model_top_k_every_n`-th block the graph is pooled with TopKPooling and
    a graph-level readout (global max pool concatenated with global mean
    pool) is recorded. The readouts are summed and passed through three
    linear layers that end in a single output per graph.
    """

    def __init__(self, feature_size, model_params):
        """
        feature_size = Input width of the node features.
        model_params = Dict with the keys "model_embedding_size",
            "model_attention_heads", "model_layers", "model_dropout_rate",
            "model_top_k_ratio", "model_top_k_every_n",
            "model_dense_neurons" and "model_edge_dim" (scalar values).

        Raises ValueError if "model_top_k_every_n" is smaller than 1.
        """
        super().__init__()
        embedding_size = model_params["model_embedding_size"]
        n_heads = model_params["model_attention_heads"]
        self.n_layers = model_params["model_layers"]
        dropout_rate = model_params["model_dropout_rate"]
        top_k_ratio = model_params["model_top_k_ratio"]
        self.top_k_every_n = model_params["model_top_k_every_n"]
        dense_neurons = model_params["model_dense_neurons"]
        edge_dim = model_params["model_edge_dim"]

        # The pooling schedule is computed as `i % top_k_every_n`; a step of
        # 0 would divide by zero. Reject it up front with a clear message.
        if self.top_k_every_n < 1:
            raise ValueError(
                "model_top_k_every_n must be >= 1, got "
                f"{self.top_k_every_n!r}"
            )

        self.conv_layers = ModuleList([])
        self.transf_layers = ModuleList([])
        self.pooling_layers = ModuleList([])
        self.bn_layers = ModuleList([])

        # Transformation layer
        self.conv1 = TransformerConv(feature_size,
                                    embedding_size,
                                    heads=n_heads,
                                    dropout=dropout_rate,
                                    edge_dim=edge_dim,
                                    beta=True)

        # The linear layers map the conv output (embedding_size * n_heads
        # wide) back to embedding_size.
        self.transf1 = Linear(embedding_size*n_heads, embedding_size)
        self.bn1 = BatchNorm1d(embedding_size)

        # Other layers
        for i in range(self.n_layers):
            self.conv_layers.append(TransformerConv(embedding_size,
                                                    embedding_size,
                                                    heads=n_heads,
                                                    dropout=dropout_rate,
                                                    edge_dim=edge_dim,
                                                    beta=True))

            self.transf_layers.append(Linear(embedding_size*n_heads, embedding_size))
            self.bn_layers.append(BatchNorm1d(embedding_size))
            # One pooling layer per pooling step (blocks 0, n, 2n, ...)
            if i % self.top_k_every_n == 0:
                self.pooling_layers.append(TopKPooling(embedding_size, ratio=top_k_ratio))

        # Linear layers; the input is embedding_size*2 wide because each
        # readout concatenates a max-pool and a mean-pool vector.
        self.linear1 = Linear(embedding_size*2, dense_neurons)
        self.linear2 = Linear(dense_neurons, int(dense_neurons/2))
        self.linear3 = Linear(int(dense_neurons/2), 1)

    def forward(self, x, edge_attr, edge_index, batch_index):
        """
        Run the network on a batch of graphs.

        x = Node features, edge_attr = edge features, edge_index = graph
        connectivity, batch_index = per-node graph assignment used by the
        pooling layers.

        Returns the output of the last linear layer (one value per graph);
        no activation is applied to it here.
        """
        # Initial transformation
        x = self.conv1(x, edge_index, edge_attr)
        x = torch.relu(self.transf1(x))
        x = self.bn1(x)

        # Holds the intermediate graph representations
        global_representation = []

        for i in range(self.n_layers):
            x = self.conv_layers[i](x, edge_index, edge_attr)
            x = torch.relu(self.transf_layers[i](x))
            x = self.bn_layers[i](x)
            # Pool and record a readout on every top_k_every_n-th block.
            # NOTE(review): an earlier "always aggregate last layer" clause
            # compared i with n_layers, which is never reached inside this
            # range, so it had no effect and was dropped. If the last block
            # should always be read out, that needs a deliberate change.
            if i % self.top_k_every_n == 0:
                pooling = self.pooling_layers[i // self.top_k_every_n]
                x, edge_index, edge_attr, batch_index, _, _ = pooling(
                    x, edge_index, edge_attr, batch_index
                    )
                # Add current representation
                global_representation.append(torch.cat([gmp(x, batch_index), gap(x, batch_index)], dim=1))

        x = sum(global_representation)

        # Output block
        # NOTE(review): dropout uses a fixed p=0.8 here rather than
        # model_dropout_rate -- confirm this is intended.
        x = torch.relu(self.linear1(x))
        x = F.dropout(x, p=0.8, training=self.training)
        x = torch.relu(self.linear2(x))
        x = F.dropout(x, p=0.8, training=self.training)
        x = self.linear3(x)

        return x

Torch version: 2.3.0+cu121
Cuda available: False
Torch geometric version: 2.5.3
Torch version: 2.3.0+cu121
Cuda available: False
Torch geometric version: 2.5.3
