In [None]:
import pandas as pd
from rdkit import Chem
from rdkit.Chem import Descriptors
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error
import matplotlib.pyplot as plt

In [None]:
from sklearn.model_selection import ShuffleSplit
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import KFold, cross_val_score
from sklearn.model_selection import StratifiedKFold, cross_val_score

In [None]:
def importStatements():
  import pandas as pd
  from rdkit import Chem
  from rdkit.Chem import Descriptors
  import numpy as np
  from sklearn.model_selection import train_test_split
  from sklearn.metrics import r2_score, mean_squared_error
  import matplotlib.pyplot as plt
  from sklearn.model_selection import ShuffleSplit
  from sklearn.preprocessing import StandardScaler
  from sklearn.ensemble import RandomForestRegressor
  from sklearn.model_selection import RandomizedSearchCV
  from sklearn.model_selection import KFold, cross_val_score
  from sklearn.model_selection import StratifiedKFold, cross_val_score

In [None]:
def CalcRDKitDescriptors(fileName):
  df = pd.read_csv(fileName)
  smiles_strings = df['SMILES'].tolist()
  mySmiles = [Chem.MolFromSmiles(mol) for mol in smiles_strings]
  myDescriptors = [Descriptors.CalcMolDescriptors(mol) for mol in mySmiles]
  return pd.DataFrame(myDescriptors)

In [None]:
def makeTrainAndTest(fileNameTrain, fileNameTest, target):
  dfTrain = pd.read_csv(fileNameTrain)
  dfTest = pd.read_csv(fileNameTest)

  descTrain = CalcRDKitDescriptors(fileNameTrain)
  descTest = CalcRDKitDescriptors(fileNameTest)

  train_X = descTrain.dropna(axis = 1)
  train_y = dfTrain[target]
  test_X = descTest.dropna(axis = 1)
  test_y = dfTest[target]

  common_columns = train_X.columns.intersection(test_X.columns)
  train_X = train_X[common_columns]
  test_X = test_X[common_columns]

  return train_X, train_y, test_X, test_y

In [None]:
modelTypes = {}
modelTypes['RF'] = RandomForestRegressor()

def loopedKfoldCrossVal(modelType, cycleNum, train_X, train_y):
    modelTypes = {'RF': RandomForestRegressor()}

    num_cv = cycleNum
    predictions_filename = f'CV{modelType}_predictions.csv'

    predStats = {'r2_sum': 0, 'rmsd_sum': 0, 'bias_sum': 0, 'sdep_sum': 0}
    predictionStats = pd.DataFrame(data=np.zeros((num_cv, 6)), columns=['Fold', 'Number of Molecules', 'r2', 'rmsd', 'bias', 'sdep'])

    myPreds = pd.DataFrame(index=train_y.index, columns=['Prediction', 'Fold'])
    myPreds['Prediction'] = np.nan
    myPreds['Fold'] = np.nan

    train_test_split = KFold(n_splits=num_cv, shuffle=True, random_state=1)

    for n, (train_idx, test_idx) in enumerate(train_test_split.split(train_X)):
        x_train = train_X.iloc[train_idx]
        x_test = train_X.iloc[test_idx]
        y_train = train_y.iloc[train_idx]
        y_test = train_y.iloc[test_idx]

        model = modelTypes[modelType]

        # Train model
        model.fit(x_train, y_train)

        y_pred = model.predict(x_test)

        # Metrics calculations
        r2 = r2_score(y_test, y_pred)
        rmsd = mean_squared_error(y_test, y_pred, squared=False)
        bias = np.mean(y_pred - y_test)
        sdep = np.std(y_pred - y_test)

        # Update stats
        predStats['r2_sum'] += r2
        predStats['rmsd_sum'] += rmsd
        predStats['bias_sum'] += bias
        predStats['sdep_sum'] += sdep

        # Update predictions
        myPreds.loc[test_idx, 'Prediction'] = y_pred
        myPreds.loc[test_idx, 'Fold'] = n + 1

        # Ensure correct number of values are assigned
        predictionStats.iloc[n] = [n + 1, len(test_idx), r2, rmsd, bias, sdep]

    # Calculate averages
    r2_av = predStats['r2_sum'] / num_cv
    rmsd_av = predStats['rmsd_sum'] / num_cv
    bias_av = predStats['bias_sum'] / num_cv
    sdep_av = predStats['sdep_sum'] / num_cv

    # Create a DataFrame row for averages
    avg_row = pd.DataFrame([['Average', len(train_y), r2_av, rmsd_av, bias_av, sdep_av]], columns=predictionStats.columns)

    # Append average row to the DataFrame
    predictionStats = pd.concat([predictionStats, avg_row], ignore_index=True)

    myPreds.to_csv(predictions_filename, index=True)
    predictionStats.to_csv(f'CV{modelType}_stats.csv', index=False)

    return myPreds, predictionStats

In [None]:
modelTypes = {}
modelTypes['RF'] = RandomForestRegressor()

def loopedStratKfoldCrossVal(modelType, cycleNum, train_X, train_y, distributor):

  num_cv = cycleNum

  predictions_filename = f'CV{modelType}_predictions.csv'

  predStats = {'r2_sum': 0, 'rmsd_sum': 0, 'bias_sum': 0, 'sdep_sum': 0}
  predictionStats = pd.DataFrame(data = np.zeros((num_cv, 6)), columns = ['Fold', 'Number of Molecules', 'r2', 'rmsd', 'bias', 'sdep'])

  myPreds = pd.DataFrame(data = np.zeros((len(train_y), 2)), index = train_y.index, columns = ['Prediction', 'Fold'])
  myPreds['Prediction'] = np.nan
  myPreds['Fold'] = np.nan

  train_test_split = StratifiedKFold(n_splits = num_cv, shuffle = True, random_state = 1)

  for n, [train_idx, test_idx] in enumerate(train_test_split.split(train_X, distributor)):

    train_idx = train_y.index[train_idx]
    test_idx = train_y.index[test_idx]

    x_train = train_X.loc[train_idx]
    x_test = train_X.loc[test_idx]
    y_train = train_y.loc[train_idx]
    y_test = train_y.loc[test_idx]

    model = modelTypes[modelType]

    # Train RF model:
    model.fit(x_train, y_train)

    y_pred = model.predict(x_test)

    # Coefficient of determination
    r2 = r2_score(y_test, y_pred)
    # Root mean squared error
    rmsd = mean_squared_error(y_test, y_pred)**0.5
    # Bias
    bias = np.mean(y_pred - y_test)
    # Standard deviation of the error of prediction
    sdep = np.mean(((y_pred - y_test) - np.mean(y_pred - y_test))**2)**0.5

    # Save running sum of results:
    predStats['r2_sum'] += r2
    predStats['rmsd_sum'] += rmsd
    predStats['bias_sum'] += bias
    predStats['sdep_sum'] += sdep

    # Save individual predictions:

    myPreds.loc[test_idx, 'Prediction'] = y_pred
    myPreds.loc[test_idx, 'Fold'] = n + 1

    predictionStats.loc[n, :] = [n + 1, len(test_idx), r2, rmsd, bias, sdep]

  # Average results over resamples:
  r2_av = predStats['r2_sum']/num_cv
  rmsd_av = predStats['rmsd_sum']/num_cv
  bias_av = predStats['bias_sum']/num_cv
  sdep_av = predStats['sdep_sum']/num_cv
  avg_row = pd.DataFrame([['Average', len(train_y), r2_av, rmsd_av, bias_av, sdep_av]], columns=predictionStats.columns)
  predictionStats = pd.concat([predictionStats, avg_row], ignore_index=True)

  myPreds.to_csv(predictions_filename, index=True)
  predictionStats.to_csv(f'CV{modelType}_stats.csv', index=False)

  return myPreds, predictionStats

In [None]:
def modelStats(test_y, y_pred):
    # Coefficient of determination
    r2 = r2_score(test_y, y_pred)
    # Root mean squared error
    rmsd = mean_squared_error(test_y, y_pred)**0.5
    # Bias
    bias = np.mean(y_pred - test_y)
    # Standard deviation of the error of prediction
    sdep = np.mean(((y_pred - test_y) - np.mean(y_pred - test_y))**2)**0.5
    return r2, rmsd, bias, sdep

In [None]:
def plotter(modelType, test_y, y_pred):
    
    r2, rmsd, bias, sdep = modelStats(test_y, y_pred)
    statisticValues = f"r2: {round(r2, 3)}\nrmsd: {round(rmsd, 3)}\nbias: {round(bias, 3)}\nsdep: {round(sdep, 3)}"
    
    nptest_y = test_y.to_numpy() if isinstance(test_y, pd.Series) else test_y
    npy_pred = y_pred
    
    minVal = min(nptest_y.min(), npy_pred.min())
    maxVal = max(nptest_y.max(), npy_pred.max())
    
    a, b = np.polyfit(test_y, y_pred, 1)
    xvals = np.linspace(minVal - 1, maxVal + 1, 100)
    yvals = xvals
    
    plt.plot(xvals, yvals, '--')
    plt.scatter(nptest_y, npy_pred)
    plt.plot(test_y, a*test_y+b)
    plt.xlabel('Measured')
    plt.ylabel('Predicted')
    plt.xlim(minVal - 1, maxVal + 1)
    plt.ylim(minVal - 1, maxVal + 1)
    plt.title(f'{modelType} Model')
    plt.text(0.01, 0.99, statisticValues, transform=plt.gca().transAxes, fontsize=12, verticalalignment='top', horizontalalignment='left')
    plt.savefig(f'{modelType}_model.png')
    plt.show()

In [None]:
def plotModel(modelType, train_X, train_y, test_X, test_y):
  model = modelTypes[modelType]
  model.fit(train_X, train_y)
  y_pred = model.predict(test_X)
  plotter(modelType, test_y, y_pred)

In [None]:
def metaboliteModelStuff(fileNameTrain, fileNameTest):
  train_X, train_y, test_X, test_y = makeTrainAndTest(fileNameTrain, fileNameTest, 'pIC50')
  myPreds, predictionStats = loopedKfoldCrossVal('RF', 10, train_X, train_y)
  plotModel('RF', train_X, train_y, test_X, test_y)