In [1]:
import pandas as pd
import os
import requests
import warnings
import py3Dmol
from chembl_webresource_client.new_client import new_client as client
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem.MolStandardize.rdMolStandardize import FragmentParent
from rdkit import DataStructs
from jazzy.api import molecular_vector_from_smiles as mol_vect
import numpy as np
import pubchempy as pcp
import plotly.graph_objects as go
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.model_selection import RandomizedSearchCV
from sklearn.kernel_ridge import KernelRidge
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_squared_error
import optuna

In [4]:
warnings.filterwarnings("ignore")

In [5]:
working_dir = os.getcwd()
# dir for useful stuff for the actual essay
graphs_rel_path = r"project_results/graphs"
project_results_graphs = os.path.join(working_dir, graphs_rel_path)

In [1]:
def abs_file_path(rel_path):
    working_dir = os.getcwd()
    abs_file_path = os.path.join(working_dir, rel_path.replace("\\", "/"))
    return abs_file_path

In [1]:
def get_publication_years(list_mol_id):
    pub_years = []
    if type(list_mol_id[0]) == int:
        for cid in list_mol_id:
            # call the PubChem API for info about molecule
            url = f"https://pubchem.ncbi.nlm.nih.gov/rest/pug_view/data/compound/{cid}/JSON"
            response = requests.get(url)
            data = response.json()
            # get the publication year
            for section in data['Record']['Section']:
                if section["TOCHeading"] == 'Names and Identifiers':
                    publication_year = int(section["Section"][-2]["Information"][0]["Value"]["DateISO8601"][0][0:4])
                    if not publication_year:
                        pub_years.append(None)
                        print(f"could not find publication year of CID {cid}")
                    else:
                        pub_years.append(publication_year)
    else:
        for chembl_id in list_mol_id:
            # get activities associated with the molecule
            activities = client.activity.filter(molecule_chembl_id=chembl_id)
            # extract publication year
            publication_year = activities[0]["document_year"]
            # print the molecule ID if document_year is not specified
            if not publication_year:
                pub_years.append(None)
                print(f"could not find publication year of ChEMLB ID {chembl_id}")
            else:
                pub_years.append(publication_year)
    return pub_years

In [2]:
def smiles_from_mol_id(list_mol_id):
    # returns a list of smiles strings for given list of mol ids
    list_smiles = []
    if type(list_mol_id[0]) == int:
        for cid in list_mol_id:
            compound = pcp.Compound.from_cid(cid)
            smiles = compound.isomeric_smiles
            list_smiles.append(smiles)
    else:
        for chembl_id in list_mol_id:
            molecule = client.molecule
            compound = molecule.filter(chembl_id=chembl_id)[0]
            list_smiles.append(compound['molecule_structures']["canonical_smiles"])
    return list_smiles

In [2]:
def halflife_formatting(source_df, isozyme):
    # creates a correctly formatted list of half-life values from df
    halflife = []
    if isozyme == "3A4":
        df_adjusted = source_df
        halflife = df_adjusted["Standard Value"]
    if isozyme == "RLM":
        df_adjusted = source_df.replace({">30": '30'})
        halflife = df_adjusted["Half-life (minutes)"]
    if isozyme == "HLC":
        df_adjusted = source_df
        halflife = df_adjusted["Half-life"]

    # scale the half-lives so that the values are always between 0 and 1 (inclusive - the min value is 0, max is 1)
    reshaped_halflife = np.array(halflife).reshape(-1, 1)
    scaler = MinMaxScaler().fit(reshaped_halflife)
    halflife_scaled = scaler.transform(reshaped_halflife)
    halflife_scaled = [val[0] for val in halflife_scaled]
    return halflife_scaled

In [1]:
def isz_csv_data_formatting(source_csv_file, isozyme, sep=","):
    # creates a csv with the columns mol_idx, smiles, half-life, published
    source_df = pd.read_csv(abs_file_path(source_csv_file), sep=sep)

    if isozyme + ".csv" in os.listdir(abs_file_path("project_resources")):
        print(f"{isozyme}.csv already exists in dir")

    else:
        if isozyme == "3A4":
            # additional formatting, since not all molecules have the desired property
            source_df = source_df[source_df["Standard Type"] == "T1/2"]

        elif isozyme == "RLM":
            # get rid of mols with half-life ">30"
            source_df = source_df[source_df["Half-life (minutes)"] != ">30"]

        try:
            mol_ids = list(source_df["Molecule ChEMBL ID"])
        except KeyError:
            mol_ids = list(source_df["PUBCHEM_CID"])

        final_df = pd.DataFrame()
        publication_years = get_publication_years(mol_ids)
        orig_smiles = smiles_from_mol_id(mol_ids)
        cleaned_smiles = []

        # replace smiles with two disjoint parts with only the more relevant one
        for smi in orig_smiles:
            mol = Chem.MolFromSmiles(smi)
            cleaned_mol = FragmentParent(mol)  # for further info on how this is done see this class
            cleaned_smi = Chem.MolToSmiles(cleaned_mol)
            cleaned_smiles.append(cleaned_smi)

        final_df["mol_idx"] = list(range(1, len(mol_ids)+1))
        final_df["smiles"] = cleaned_smiles
        final_df["half-life"] = list(halflife_formatting(source_df, isozyme))
        final_df["published"] = publication_years
        final_df.to_csv(abs_file_path(f"project_resources/{isozyme}.csv"), index=False)
        print(f"{isozyme}.csv was successfully created")

In [1]:
def split_csv_data_formatting(isozyme, smiles_as_index, split_smiles, split_type, include_year=False, years=None):
    # saves ML splits as csv files containing mol idxs, smiles and half-lives
    for split in ["train", "test"]:
        location = f"project_resources/base_splits/{split_type}"
        file_name = f"{isozyme}_{split}.csv"
        # check if file already exists
        try:
            with open(f"{location}/{file_name}") as f:
                f.close()
            print(f"{file_name} already exists in {location}")

        except FileNotFoundError:
            split_df = pd.DataFrame()
            split_indexes = []
            split_halflifes = []
            isz_scaff_split_smiles = split_smiles[split]

            # get the index and half-life values for each smiles in data split
            for smi in isz_scaff_split_smiles:
                smi_idx = smiles_as_index[isozyme][smi][0]  # numerical index of smiles
                split_indexes.append(smi_idx)
                mol_halflife = smiles_as_index[isozyme][smi][1]  # half-life value for the specific molecule
                split_halflifes.append(mol_halflife)

            split_df["index"] = split_indexes
            split_df["smiles"] = isz_scaff_split_smiles
            split_df["half-life"] = split_halflifes
            if include_year:
                split_df["published"] = years[isozyme][split]
            split_df.to_csv(abs_file_path(f"{location}/{file_name}"), index=False)
            print(f"{file_name} was successfully created in {location}")

In [7]:
def fp_from_smiles(list_smiles):
    list_fingerprint = []
    for smi in list_smiles:
        mol = Chem.MolFromSmiles(smi)
        fingerprint = AllChem.GetMorganFingerprintAsBitVect(mol, useChirality=True, radius=2, nBits=124)
        vector = np.array(fingerprint)
        list_fingerprint.append(vector)
    # takes a list of smiles strings,output is a corresponding Morgan fingerprint as a list
    return list_fingerprint

In [2]:
def create_interactive_scatter_plot(x_data, y_data, x_label, y_label, title, data_legend, include_diagonal=False, y_error=None):
    # Create the scatter plot
    fig = go.Figure()

    # Add scatter trace
    scatter_trace = go.Scatter(x=x_data, y=y_data, mode='markers', name=data_legend)

    # Add error bars if y_error is provided
    if y_error is not None:
        scatter_trace.error_y = dict(type='data', array=y_error, visible=True)

    fig.add_trace(scatter_trace)

    # Add diagonal line at x=y if include_diagonal is True
    if include_diagonal:
        diagonal_trace = go.Scatter(x=[min(x_data), max(x_data)], y=[min(x_data), max(x_data)],
                                    mode='lines', name='The Diagonal', line=dict(color='orange', dash='dash'))
        fig.add_trace(diagonal_trace)

    # Update layout with labels and title
    fig.update_layout(
        xaxis=dict(title=x_label, range=[min(x_data)-0.03, max(x_data)+0.03]),
        yaxis=dict(title=y_label, range=[min(y_data)-0.01, max(y_data)+0.01]),
        title=title,
        showlegend=True,
        width=800,  # Set the width of the plot
        height=600,  # Set the height of the plot
    )

    # Show the interactive plot
    fig.show()

In [10]:
def to_rdkit_fingerprint(fps):
    rdkit_fingerprints = []
    for prnt in fps:
        bitstring = "".join(prnt.astype(str))
        fp = DataStructs.cDataStructs.CreateFromBitString(bitstring)
        rdkit_fingerprints.append(fp)
    return rdkit_fingerprints

In [2]:
def tanimoto(fps, fp2s):
    tanimoto_similarities = []
    fps = to_rdkit_fingerprint(fps)
    fp2s = to_rdkit_fingerprint(fp2s)
    for x in fps:
        fpsx = []
        for y in fp2s:
            fpsx.append(DataStructs.TanimotoSimilarity(x, y))
        max_tanimoto = max(fpsx)
        tanimoto_similarities.append(round(max_tanimoto, 3))
    return tanimoto_similarities

In [2]:
def param_tuning(x_train, x_test, y_train, y_test, type_ml_use):
    # !!! určování hodnot pro param tuning, lze vylepšit pomocí np.random.randint

    if type_ml_use == 'linear':
        param_grid = {
            'fit_intercept': [True],
            'alpha': [1e-5, 1e-4, 1e-3, 1e-2],
            'l1_ratio': [0, 0.1, 0.5, 0.9, 1]
        }
        reg = ElasticNet()

    if type_ml_use == 'KRR':
        param_grid = {
            "alpha": np.logspace(-4, 1, 20),
            "gamma": np.logspace(-14, 0, 20),
            "kernel": ['linear', 'laplacian', 'rbf']
        }
        reg = KernelRidge()

    if type_ml_use == 'GB':
        param_grid = {
            'n_estimators': [10, 20, 50, 200, 400],
            'learning_rate': [0.02, 0.05],
            'max_depth': [1, 2, 3, 5],
        }
        reg = GradientBoostingRegressor()

    if type_ml_use == 'RF':
        param_grid = {
            'max_depth': [None, 2, 3, 5, 10],
            'max_features': ['auto', 'sqrt', 'log2'],
            'n_estimators': [10, 20, 50, 100, 200],
        }
        reg = RandomForestRegressor()

    if type_ml_use == 'ANN':
        param_grid = {
            'learning_rate_init': [0.001, 0.002, 0.005, 0.01, 0.02, 0.05],
            'hidden_layer_sizes': [[5], [10], [20], [50], [5]*2, [10]*2, [20]*2, [50]*2, [5]*3, [10]*3]
        }
        reg = MLPRegressor()

    grid = RandomizedSearchCV(reg, param_grid, cv=KFold(n_splits=5, shuffle=True), verbose=0)
    grid.fit(x_train, y_train)
    best_reg = grid.best_estimator_
    y_train_predict = best_reg.predict(x_train)
    y_test_predict = best_reg.predict(x_test)
    abs_error = np.abs(y_test_predict-y_test)
    print(f"     best {type_ml_use} hyperparams: {best_reg}")
    # retrain on best hyperparameters
    best_reg.fit(x_train, y_train)

    return y_train_predict, y_test_predict, abs_error

In [1]:
def mol_predict_and_std(models, x_train, x_test, y_train, y_test):
    y_test_avg_predict_dict = {}
    std_dict = {}
    rmsd_dict = {}
    for model in models:
        y_test_predicts = []

        for i in range(3):
            asdf, y_test_predict, ghjk = param_tuning(x_train, x_test, y_train, y_test, model)
            # asdf, ghjk ... dummy variables, are not needed here
            y_test_predicts.append(y_test_predict)

        y_test_predicts_array = np.array(y_test_predicts)

        y_test_avg_predict = np.average(y_test_predicts_array, axis=0)
        standard_deviation = np.std(y_test_predicts_array, axis=0)
        rmsd = np.sqrt(np.average(np.square(y_test_avg_predict-y_test)))
        # root-mean-square deviation

        y_test_avg_predict_dict[model] = y_test_avg_predict
        std_dict[model] = standard_deviation
        rmsd_dict[model] = rmsd
    return y_test_avg_predict_dict, std_dict, rmsd_dict

In [1]:
def show(smi, style='stick'):
    mol = Chem.MolFromSmiles(smi)
    mol = Chem.AddHs(mol)
    AllChem.EmbedMolecule(mol)
    AllChem.MMFFOptimizeMolecule(mol, maxIters=200)
    mblock = Chem.MolToMolBlock(mol)

    view = py3Dmol.view(width=500, height=400)
    view.addModel(mblock, 'mol')
    view.setStyle({style: {}})
    view.zoomTo()
    view.show()

In [5]:
# CODE SPECIFIC TO JAZZY:

In [1]:
def mol_fts(smiles):
    features = []
    for smi in smiles:
        try:
            features.append(mol_vect(smi))
        except:
            # "except JazzyError" gives NameError: name 'JazzyError' is not defined
            features.append(np.nan)
    return features

In [1]:
def parse_jazzy_df(df):
    cols = df.columns
    data = {}  # all data from csv file (i.e. mol indexes, smiles, half-lives and features)
    for col in cols:
        data[col] = list(df[col])
    nan_idxs = np.argwhere(np.isnan(data["dgtot"]))
    nan_idxs = [int(idx) for idx in nan_idxs]
    data_clumped = []  # same as data, but in the form [[idx1, smi1, thalf1, fts1], [idx2, smi2, thalf2, fts2],...]]
    for col in cols:
        for i, foo in zip(range(len(data[col])), data[col]):
            if len(data_clumped) < i+1:
                data_clumped.append([])
            data_clumped[i].append(foo)

    # remove all mols for which Jazzy features generation wasn't successful
    num_pops = 0
    for nan_idx in nan_idxs:
        data_clumped.pop(nan_idx - num_pops)
        num_pops += 1
        print(f"     removed index {nan_idx} corresponding to NaN")
    print(f"     {len(data_clumped)}, {data_clumped[0]}")

    # filter out only the features
    mol_features = np.array([feature[3:11] for feature in data_clumped])
    halflives = np.array([feature[2] for feature in data_clumped])
    smiles = np.array([feature[1] for feature in data_clumped])
    contains_nan = np.any(np.isnan(mol_features))

    return smiles, mol_features, halflives, contains_nan

In [1]:
# CODE SPECIFIC TO NEQUIP:

In [2]:
def list_splitter(list_to_split, ratio):
    elements = len(list_to_split)
    middle = int(elements * ratio)
    return [list_to_split[:middle], list_to_split[middle:]]

In [1]:
def get_unique_elements(list_smiles):
    # gets unique symbols from every mol in the list e.g. ["C1=CC=C(C=C1)O", "C1=CSC=C1"] -> ["C", "O", "S"]
    formulae = ""
    for smiles in list_smiles:
        mol = Chem.MolFromSmiles(smiles)
        chemical_formula = Chem.rdMolDescriptors.CalcMolFormula(mol)
        formulae += chemical_formula
    unique_elements = [part for part in formulae if part.isalpha()]
    for idx, element in zip(range(len(unique_elements)), unique_elements):
        two_letter_element = ""
        if element.islower():
            two_letter_element += unique_elements[idx - 1]
            two_letter_element += unique_elements[idx]
            unique_elements.remove(element)
            unique_elements.append(two_letter_element)
    return set(unique_elements)

In [1]:
# CODE SPECIFIC TO OPTUNA:

In [2]:
class HyperparamTuner():
    def __init__(self, model_identifier, X_train, y_train, X_test, y_test):
        self.model_identifier = model_identifier
        self.X_train = X_train
        self.y_train = y_train
        self.X_test = X_test
        self.y_test = y_test

    def sample_params(self, trial: optuna.Trial, model_identifier):
        if model_identifier == 'linear':
            fit_intercept = trial.set_user_attr("fit_intercept", True)
            alpha = trial.suggest_float('alpha', 1e-5, 1e-1)
            l1_ratio = trial.suggest_float('l1_ratio', 0, 1)
            return {
                "fit_intercept": fit_intercept,
                "alpha": alpha,
                "l1_ratio": l1_ratio
            }, ElasticNet()

        if model_identifier == 'KRR':
            alpha = trial.suggest_float("alpha", 1e-4, 1)
            gamma = trial.suggest_float("gamma", 0, 1e-14)
            kernel = trial.suggest_categorical("kernel", ["linear", "laplacian", "rbf"])
            return {
                "alpha": alpha,
                "gamma": gamma,
                "kernel": kernel
            }, KernelRidge()

        if model_identifier == 'GB':
            n_estimators = trial.suggest_categorical("n_estimators", [10, 20, 50, 200, 500])
            learning_rate = trial.suggest_float("learning_rate", 0.005, 1)
            max_depth = trial.suggest_categorical("max_depth", [1, 2, 3, 4, 5])
            return {
                "n_estimators": n_estimators,
                "learning_rate": learning_rate,
                "max_depth": max_depth
            }, GradientBoostingRegressor()

        if model_identifier == 'RF':
            n_estimators = trial.suggest_categorical("n_estimators", [10, 20, 50, 200, 500])
            max_features = trial.suggest_categorical("max_features", ["auto", "sqrt", "log2"])
            max_depth = trial.suggest_categorical("max_depth", [None, 2, 3, 4, 5, 10])
            return {
                "n_estimators": n_estimators,
                "max_features": max_features,
                "max_depth": max_depth
            }, RandomForestRegressor()

        if model_identifier == 'ANN':
            learning_rate_init = trial.suggest_float("learning_rate_init", 0.001, 0.1)
            hidden_layer_sizes = trial.suggest_categorical("hidden_layer_sizes",
                                                           [[5], [10], [20], [50], [5]*2, [10]*2, [20]*2, [50]*2, [5]*3, [10]*3, [50]*3])
            return {
            "learning_rate_init": learning_rate_init,
            "hidden_layer_sizes": hidden_layer_sizes
            }, MLPRegressor()

    def cross_validation_splits(self, X_train, X_test, y_train, y_test, cv_splits=5):
        """
        Splits the data into cv_splits different combinations for cross-validation.

        Parameters:
        - X_train: Training data features
        - X_test: Testing data features
        - y_train: Training data labels
        - y_test: Testing data labels
        - cv_splits: Number of cross-validation splits

        Returns:
        - List of tuples, where each tuple contains (X_train_fold, X_test_fold, y_train_fold, y_test_fold)
        """
        # Initialize StratifiedKFold with the desired number of splits
        kf = KFold(n_splits=cv_splits, shuffle=True)  # random_state=42)

        # Initialize an empty list to store the data splits
        data_splits = []

        # Loop through the cross-validation splits
        for train_index, test_index in kf.split(X_train, y_train):
            X_train_fold, X_val_fold = X_train[train_index], X_train[test_index]
            y_train_fold, y_val_fold = y_train[train_index], y_train[test_index]

            # Append the current split to the list
            data_splits.append((X_train_fold, X_val_fold, y_train_fold, y_val_fold))

        # Append the original test data to the list
        data_splits.append((X_train, X_test, y_train, y_test))

        return data_splits

    def evaluate(self, model, X_test, y_test, return_predictions=False):
        predictions = model.predict(X_test)
        rmsd = mean_squared_error(y_test, predictions, squared=False)
        if return_predictions:
            return rmsd, predictions
        else:
            return rmsd

    def train_test_return(self, parameters, model, return_predictions=False):
        runs = 3
        runs_results = []
        y_tests_predicted = []

        for run in range(runs):
            validation_splits = self.cross_validation_splits(self.X_train, self.X_test, self.y_train, self.y_test)
            cv_fold_results = []
            y_test_predicted = []
            fold_num = 0

            for (X_train_val, X_test_val, y_train_val, y_test_val) in validation_splits:
                fold_num += 1
                model.fit(X_train_val, y_train_val)

                if return_predictions and fold_num == 6:
                    cv_fold_rmsd, validation_predictions = self.evaluate(model, X_test_val, y_test_val, return_predictions=return_predictions)
                    y_test_predicted.append(validation_predictions)
                else:
                    cv_fold_rmsd = self.evaluate(model, X_test_val, y_test_val, return_predictions=False)

                cv_fold_results.append(cv_fold_rmsd)

            runs_results.append(np.mean(cv_fold_results))
            y_tests_predicted.append(y_test_predicted)

        # Calculate the standard deviation of predictions
        y_tests_predicted = np.array(y_tests_predicted)
        std = np.std(y_tests_predicted, axis=0)

        if return_predictions:
            # Return the mean RMSD, average predictions, and standard deviations
            return np.mean(runs_results), np.average(y_tests_predicted, axis=0)[0], std[0]
        else:
            # Return the mean objective/s of these runs
            return np.mean(runs_results)

    def objective(self, trial=None):
        parameters, model = self.sample_params(trial, self.model_identifier)
        return self.train_test_return(parameters, model)