In [None]:
!pip install mordred
!pip install rdkit -q
!pip install scikit-multilearn -q
!pip install torchmetrics -q



In [None]:
#Importing Libraries
import gc
import torch
import pickle
import statistics
import numpy as np
import pandas as pd
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import Descriptors
from rdkit.Chem import rdMolDescriptors
from mordred import Calculator, descriptors
from sklearn.feature_selection import RFECV
import statsmodels.stats.weightstats as stests
from sklearn.metrics import classification_report
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import RandomizedSearchCV
from skmultilearn.model_selection import IterativeStratification
from sklearn.metrics import make_scorer, roc_auc_score, precision_score, f1_score, recall_score
from torchmetrics.classification import MultilabelF1Score, MultilabelAUROC, MultilabelPrecision, MultilabelRecall

In [None]:
def format_list(input_str):
    """
    Turn a stringified list such as "['a', 'b']" into "a;b".

    Leading/trailing square brackets are stripped, every single quote is
    removed, the remainder is split on commas and each piece is trimmed of
    surrounding whitespace before being joined with ';'.

    :param input_str: Text holding comma separated items
    :type input_str: string
    :return: The items joined by ';'
    :rtype: string
    """
    without_brackets = input_str.strip("[]")
    without_quotes = without_brackets.replace("'", "")
    return ';'.join(piece.strip() for piece in without_quotes.split(','))

def check_and_replace(description, mapping=None):
    """
    Replace every ';'-separated descriptor according to a lookup table.

    Each row of the table is a sequence whose first element is the original
    descriptor and whose second element is its replacement. A descriptor
    without a matching row is dropped; a descriptor matching several rows
    contributes one replacement per matching row, in table order.

    :param description: Text separated by ';'
    :type description: string
    :param mapping: Rows of ``[original, replacement, ...]``. When omitted,
        the notebook-level ``replace`` list is used, so the function can still
        be passed directly to ``Series.apply``.
    :type mapping: list of lists, optional
    :return: Text replaced according to the mapping
    :rtype: string
    """
    if mapping is None:
        # Backward-compatible fallback: the trial loop rebinds the global
        # `replace` before applying this function to a column.
        mapping = replace

    return ';'.join(
        row[1]
        for label in description.split(';')
        for row in mapping
        if label == row[0]
    )

def make_unique(labels):
   """
   Takes a text separated by ";" and removes duplicate entries.

   The first occurrence of every label is kept and the original order is
   preserved, so the result is the same on every run (a plain ``set`` would
   yield an arbitrary order).

   :param labels: Text separated by ';'
   :type labels: string
   :return: The distinct labels joined by ';'
   :rtype: string
   """
   # dict keys are unique and keep insertion order.
   return ';'.join(dict.fromkeys(labels.split(';')))

def x_y_split(df):
  """
  Splits the incoming dataset into X and y for classification.

  X holds the 'IsomericSMILES' and 'CID' columns. y holds every remaining
  column once 'IsomericSMILES', 'Descriptors', 'CID' and, when present,
  'Descriptor Count' have been dropped.

  :param df: A molecular dataset for odor prediction
  :type df: pandas Dataframe
  :return: The identifier columns (x) and the label columns (y), both copies
  :rtype: tuple of pandas dataframes
  :raises KeyError: if 'IsomericSMILES', 'CID' or 'Descriptors' is missing
  """
  x = df[['IsomericSMILES', 'CID']].copy()

  non_label_columns = ['IsomericSMILES', 'Descriptors', 'CID']
  # 'Descriptor Count' is optional; check for it explicitly instead of relying
  # on a bare `except`, which would also hide unrelated errors.
  if 'Descriptor Count' in df.columns:
    non_label_columns.append('Descriptor Count')

  y = df.drop(non_label_columns, axis=1).copy()
  return x, y

def get_morgan(df):
  """
  This function takes in a dataframe and returns
  a featurized dataframe with morgan fingerprints.

  Fingerprints are computed with radius 4 and 2048 bits, with feature
  invariants and chirality enabled. As a side effect the columns 'molecule'
  and 'MorganFP' are added to ``df``.

  The returned frame has one row per input row, in input order, and a fresh
  0..n-1 index regardless of the index of ``df``.

  :param df: A molecular dataset for odor prediction with SMILES strings
  :type df: pandas Dataframe
  :return: A featurized dataframe.
  :rtype: pandas dataframes
  :raises ValueError: if a SMILES string cannot be parsed into a molecule
  """
  df['molecule'] = df['IsomericSMILES'].apply(lambda x: Chem.MolFromSmiles(x))

  # Fail early with a readable message instead of an opaque error from the
  # fingerprint call when a molecule could not be built.
  unparsed = df.loc[df['molecule'].isna(), 'IsomericSMILES']
  if not unparsed.empty:
    raise ValueError(
      "Could not parse {} SMILES string(s), e.g. {!r}".format(len(unparsed), unparsed.iloc[0])
    )

  df['MorganFP'] = df['molecule'].apply(lambda x: rdMolDescriptors.GetMorganFingerprintAsBitVect(x,radius=4,nBits=2048,useFeatures=True,useChirality=True))

  # Iterate over the values themselves: looking rows up with df['MorganFP'][i]
  # is label based and breaks when the index is not 0..n-1 (e.g. after rows
  # were filtered out).
  bit_rows = [np.array(fingerprint) for fingerprint in df['MorganFP']]
  if not bit_rows:
    return pd.DataFrame()

  # One 2-D array -> one DataFrame, instead of concatenating a frame per row.
  morganfp = pd.DataFrame(np.vstack(bit_rows))

  return morganfp

def iterative_train_test_split(X, y, test_size):
  """
  Train-test split for a multilabel task using second order
  iterative stratification.

  Two folds are requested from IterativeStratification with the per-fold
  sample distribution [test_size, 1.0-test_size]; the first split it yields
  is used and applied positionally (``iloc``) to both frames.

  :param X: Feature dataframe
  :type X: pandas Dataframe
  :param y: Label dataframe with the same number of rows as X
  :type y: pandas Dataframe
  :param test_size: Fraction passed as the first fold's share
  :type test_size: float
  :return: X_train, y_train, X_test, y_test (in that order)
  :rtype: pandas dataframes
  """
  fold_fractions = [test_size, 1.0-test_size]
  splitter = IterativeStratification(n_splits=2, order=2, sample_distribution_per_fold=fold_fractions)

  # Only the first (train, test) pair of row positions is needed.
  first_split = next(splitter.split(X, y))
  train_rows, test_rows = first_split

  return X.iloc[train_rows], y.iloc[train_rows], X.iloc[test_rows], y.iloc[test_rows]

In [None]:
# Score lists for the reference (umbrella) model: one empty list per metric,
# macro-averaged metrics first, then the micro-averaged ones.
umbrella_scores = {
    metric_key: []
    for metric_key in (
        'f1_macro',
        'auroc_macro',
        'precision_macro',
        'recall_macro',
        'f1_micro',
        'auroc_micro',
        'precision_micro',
        'recall_micro',
    )
}

In [None]:
# Dataset whose 'CID', 'IsomericSMILES' and 'Descriptors' columns are used by
# the cells below to build the randomised label sets.
dataset = pd.read_csv('alldesc_dataset.csv')
# Dataset handed to x_y_split below to train and score the reference model.
umbrella = pd.read_csv('computer_dataset_11.csv')

In [None]:
# Normalise the descriptor column to ';'-separated labels and keep only the
# columns the randomisation trials need.
dataset['Descriptors'] = dataset['Descriptors'].apply(format_list)
dataset = dataset[['CID', 'IsomericSMILES', 'Descriptors']]

# Reference model: Morgan fingerprints -> random forest on the umbrella labels.
X, y = x_y_split(umbrella)
morgan = get_morgan(X)
train_x, train_y, test_x, test_y = iterative_train_test_split(morgan, y, 0.2)
clf = RandomForestClassifier(random_state=0)
clf.fit(train_x, train_y)

y_hat = clf.predict(test_x)

# Build the metric inputs once instead of once per metric.
num_labels = len(train_y.columns)
preds = torch.tensor(y_hat, dtype=torch.float)
target = torch.tensor(test_y.values, dtype=torch.long)

# NOTE(review): the AUROC metrics receive hard 0/1 predictions from
# clf.predict rather than scores from predict_proba - confirm this is intended.
f1score_macro = MultilabelF1Score(num_labels=num_labels, average="macro")(preds, target)
auroc_macro = MultilabelAUROC(num_labels=num_labels, average="macro")(preds, target)
precision_macro = MultilabelPrecision(num_labels=num_labels, average="macro")(preds, target)
recall_macro = MultilabelRecall(num_labels=num_labels, average="macro")(preds, target)

f1score_micro = MultilabelF1Score(num_labels=num_labels, average="micro")(preds, target)
auroc_micro = MultilabelAUROC(num_labels=num_labels, average="micro")(preds, target)
precision_micro = MultilabelPrecision(num_labels=num_labels, average="micro")(preds, target)
recall_micro = MultilabelRecall(num_labels=num_labels, average="micro")(preds, target)

# Store plain floats, the same representation rand_scores uses, so both score
# dictionaries can be compared and printed uniformly.
umbrella_scores['f1_macro'].append(float(f1score_macro))
umbrella_scores['auroc_macro'].append(float(auroc_macro))
umbrella_scores['precision_macro'].append(float(precision_macro))
umbrella_scores['recall_macro'].append(float(recall_macro))
umbrella_scores['f1_micro'].append(float(f1score_micro))
umbrella_scores['auroc_micro'].append(float(auroc_micro))
umbrella_scores['precision_micro'].append(float(precision_micro))
umbrella_scores['recall_micro'].append(float(recall_micro))

In [None]:
# Score lists for the randomised-label trials: one empty list per metric,
# macro-averaged metrics first, then the micro-averaged ones.
rand_scores = {
    metric_key: []
    for metric_key in (
        'f1_macro',
        'auroc_macro',
        'precision_macro',
        'recall_macro',
        'f1_micro',
        'auroc_micro',
        'precision_micro',
        'recall_micro',
    )
}

# Ontology table; the code below reads its 'Original Descriptors' and
# 'Umbrella Terms' columns.
labels_df = pd.read_excel('computer_derived_ontology_11.xlsx')
# 'Original Descriptors' values of the rows whose 'Umbrella Terms' is missing.
# Must be computed before the dropna() below removes those rows.
labels_to_remove = labels_df[labels_df['Umbrella Terms'].isna()]['Original Descriptors']
# Drops every row that has a missing value in any column.
labels_df = labels_df.dropna()

In [None]:
trials = 10

# Seeded generator so the shuffled ontologies are the same on every run.
rng = np.random.default_rng(0)

# Compare descriptors against the label *values*; the Series index only holds
# integer row labels, so testing membership in it never matches a descriptor.
removed_labels = set(labels_to_remove)

# Metric name -> torchmetrics class; combined with the averaging mode this
# yields exactly the keys of rand_scores ('f1_macro', ..., 'recall_micro').
metric_classes = {
    'f1': MultilabelF1Score,
    'auroc': MultilabelAUROC,
    'precision': MultilabelPrecision,
    'recall': MultilabelRecall,
}

for count in range(trials):
    # Shuffle the umbrella terms across the original descriptors. `replace`
    # is the global lookup table read by check_and_replace.
    replace = labels_df.copy()
    replace['Umbrella Terms'] = rng.permutation(replace['Umbrella Terms'].to_numpy())
    replace = replace.values.tolist()

    # Changing to Umbrella terms and normalizing it once again
    rand = dataset.copy()
    rand['Descriptors'] = rand['Descriptors'].apply(lambda x: ';'.join([item for item in x.split(';') if item not in removed_labels]))
    rand = rand[rand['Descriptors'] != ''].copy()
    rand['Descriptors'] = rand['Descriptors'].apply(check_and_replace)
    rand['Descriptors'] = rand['Descriptors'].apply(make_unique)
    # Reset the index after filtering so that positional and label based
    # lookups on the remaining rows agree downstream.
    rand = rand[rand['Descriptors'] != ''].reset_index(drop=True)
    rand['Descriptors'] = rand['Descriptors'].apply(lambda x: x.split(';'))

    # One binary column per (shuffled) umbrella term.
    mlb = MultiLabelBinarizer(sparse_output=True)
    mlb.fit(rand['Descriptors'])
    rand = rand.join(pd.DataFrame.sparse.from_spmatrix(mlb.transform(rand['Descriptors']), index=rand.index, columns=mlb.classes_))

    rand_x, rand_y = x_y_split(rand)
    rand_morgan = get_morgan(rand_x)
    rand_morgan_train_x, rand_morgan_train_y, rand_morgan_test_x, rand_morgan_test_y = iterative_train_test_split(rand_morgan, rand_y, 0.2)
    # Memory managing
    del rand_morgan
    del rand_y
    gc.collect()

    clf = RandomForestClassifier(random_state=0)
    clf.fit(rand_morgan_train_x, rand_morgan_train_y.values)

    y_hat = clf.predict(rand_morgan_test_x)

    # Metric inputs for THIS trial: label count, predictions and targets all
    # come from the trial's own split, never from the reference model.
    rand_num_labels = len(rand_morgan_test_y.columns)
    rand_preds = torch.tensor(y_hat, dtype=torch.float)
    rand_target = torch.tensor(rand_morgan_test_y.values, dtype=torch.long)

    # Compute and store every score in one place so that no stale variable
    # from an earlier cell can be appended by mistake.
    for average in ("macro", "micro"):
        for metric_name, metric_cls in metric_classes.items():
            score = metric_cls(num_labels=rand_num_labels, average=average)(rand_preds, rand_target)
            rand_scores[f'{metric_name}_{average}'].append(float(score))


In [None]:
# One-sample z-test per metric: do the randomised-label scores differ from the
# score of the reference (umbrella) model?
for key in umbrella_scores:
   # A single reference score is recorded per metric; pass it to ztest as a
   # plain float (works whether the stored entry is a tensor or a float).
   umbrella_value = float(umbrella_scores[key][0])
   z_statistic, p_value = stests.ztest(rand_scores[key], value=umbrella_value)
   print(key)
   print("Umbrella score", umbrella_value)
   print("Random scores mean", statistics.mean(rand_scores[key]))
   print("Random scores stdev", statistics.stdev(rand_scores[key]))
   print("Z-statistic:", z_statistic)
   print("P-value:", p_value)

f1_macro
Umbrella score [tensor(0.4616)]
Random scores mean 0.46160274744033813
Random scores stdev 0.0
Z-statistic: [nan]
P-value: [nan]
auroc_macro
Umbrella score [tensor(0.6489)]
Random scores mean 0.6008251667022705
Random scores stdev 0.00798470585119466
Z-statistic: [-19.042835]
P-value: [7.5336863e-81]
precision_macro
Umbrella score [tensor(0.5607)]
Random scores mean 0.5003776222467422
Random scores stdev 0.025906122651449027
Z-statistic: [-7.3678956]
P-value: [1.73342441e-13]
recall_macro
Umbrella score [tensor(0.4033)]
Random scores mean 0.40049120485782624
Random scores stdev 0.014226067766120646
Z-statistic: [-0.62128943]
P-value: [0.53440921]
f1_micro
Umbrella score [tensor(0.5356)]
Random scores mean 0.528855049610138
Random scores stdev 0.021633981876177686
Z-statistic: [-0.9869191]
P-value: [0.3236823]
auroc_micro
Umbrella score [tensor(0.6885)]
Random scores mean 0.6711460888385773
Random scores stdev 0.013849106374872562
Z-statistic: [-3.9689913]
P-value: [7.21775229e

  zstat = (value1 - value2 - diff) / std_diff


In [None]:
# Persist the randomised-trial scores.
# NOTE(review): this is an absolute, user-specific path; the notebook will not
# run elsewhere unless it exists - consider a configurable output directory.
with open('/scistor/informatica/asa521/macro_comp_scores.pkl', 'wb') as fp:
    pickle.dump(rand_scores, fp)
# Report success only after the with-block has closed the file.
print('scores saved successfully to file')