In [1]:
import os
import pandas as pd
import numpy as np
import random
random.seed(716)
import pickle
from collections import namedtuple
from random import sample
from sklearn.utils import shuffle
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, VotingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from statistics import mean, stdev

In [2]:
# Output locations: inputs are read from `out_dir`; per-chunk test data and
# prediction tables are written to `t_dir`.
out_dir = "DTGpred_results"
t_dir = os.path.join(out_dir, 'test_data')
# exist_ok=True keeps the cell idempotent on re-run and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(t_dir, exist_ok=True)

In [3]:
def _read_lines(path):
    """Return the lines of the text file at `path` with the trailing newline removed."""
    # The context manager closes the handle deterministically; a bare open()
    # inside a comprehension leaves it open until garbage collection.
    with open(path) as handle:
        return [line.rstrip('\n') for line in handle]


# positive test set
positive_test = _read_lines(os.path.join(out_dir, "testing_class1.txt"))

# negative test set
negative_test = _read_lines(os.path.join(out_dir, "testing_class0.txt"))

# selected dataset (first column is used as the row index)
selected_dataset = pd.read_csv(os.path.join(out_dir, "selected_dataset.tsv"), delimiter="\t", index_col=0, low_memory=False)
# keep only the rows whose index label appears in either test list
test_dataset = selected_dataset.loc[selected_dataset.index.isin(positive_test + negative_test)]

In [4]:
# Size of the positive test list, then of the negative one (one per line)
print(len(positive_test), len(negative_test), sep="\n")

40
3530


In [5]:
# Define function for drawing chunks of negative examples from a list of genes
def negative_sample_draw(gene_list, l=None, n=0):
    """Return the nth chunk of `l` consecutive items from `gene_list`.

    Parameters
    ----------
    gene_list : sequence
        Ordered collection to slice.
    l : int, optional
        Chunk size. When omitted, ``len(positive_test)`` is looked up at call
        time, so re-running the data-loading cell cannot leave a stale chunk
        size baked into this function.
    n : int
        Zero-based chunk number.

    Returns
    -------
    The slice ``gene_list[n*l : n*l + l]``. It is shorter than `l` (possibly
    empty) when the chunk runs past the end of `gene_list`.
    """
    if l is None:
        l = len(positive_test)
    start = n * l
    return gene_list[start:start + l]

In [6]:
# Define function for creating test set based on draw n. Note: last column is the label
def test_set_n(n=0):
    """Build, leak-check and export the test set for negative chunk `n`.

    The set is made of the rows of `test_dataset` whose index is in
    `positive_test` or in the nth chunk of `negative_test` (chunk size equals
    ``len(positive_test)``). A 'Targets' column is appended: 1.0 for rows
    whose index is in `positive_test`, 0.0 otherwise.

    Side effect: the feature columns (everything except the last column) are
    written to ``<t_dir>/test_data_n{n}.csv``.

    Parameters
    ----------
    n : int
        Zero-based index of the negative chunk to combine with the positives.

    Returns
    -------
    pandas.DataFrame
        The shuffled rows, features first and 'Targets' as the last column.

    Raises
    ------
    ValueError
        If the absolute values of any feature column equal the label column.
    """
    negative_examples = negative_sample_draw(negative_test, l=len(positive_test), n=n)
    test_examples = positive_test + negative_examples

    test_dataset_n = test_dataset.loc[test_dataset.index.isin(test_examples)].copy()
    # Label every row in one vectorised step instead of a per-row .loc write
    test_dataset_n['Targets'] = test_dataset_n.index.isin(positive_test).astype(float)

    # sklearn.utils.shuffle draws from NumPy's RNG, not Python's `random`
    # module, so random.seed() never made this shuffle reproducible. Pass the
    # seed through random_state instead.
    test_dataset_n = shuffle(test_dataset_n, random_state=4)

    # Double-check that the test dataset does not contain labels
    # NOTE(review): Series.equals also requires matching dtypes, so a copy of
    # the labels stored with a non-float dtype would not be caught - confirm
    # whether that matters for this dataset.
    test_data = test_dataset_n.iloc[:, 0:-1]
    labels = test_dataset_n.iloc[:, -1]
    for i, column in enumerate(test_data.columns):
        if test_data.iloc[:, i].abs().equals(labels):
            raise ValueError(
                f"Chunk n: {n} target labels match feature: {i} {column} "
                f"nFeatures: {test_data.shape[1]}"
            )

    # Export test dataset
    test_data.to_csv(os.path.join(t_dir, f'test_data_n{n}.csv'), sep=",", index=True, header=True)
    return test_dataset_n

In [7]:
# Define function for averaging prediction probabilities
def averaging_predictions(results):
    """Average column ``1`` of `results` per gene and rank genes by that mean.

    Parameters
    ----------
    results : pandas.DataFrame
        Must contain a 'gene' column and a column labelled with the integer 1.

    Returns
    -------
    pandas.DataFrame
        Columns 'Gene' and 'Avg_probability', one row per distinct gene,
        sorted by 'Avg_probability' in descending order.
    """
    gene_column = results['gene']
    rows = [
        {
            'Gene': gene,
            'Avg_probability': results.loc[gene_column == gene, 1].mean(),
        }
        for gene in gene_column.unique()
    ]
    ranked = pd.DataFrame(rows)
    return ranked.sort_values(by='Avg_probability', ascending=False)

In [8]:
# Define function to evaluate model m range r on test set n
def model_performance_test(m = "Ensemble", r = 2, n = 0):
    """Score saved models 0..r-1 of kind `m` on test chunk `n` and export the results.

    For each ``i`` in ``range(r)`` the model pickled at
    ``<out_dir>/draw_{i}/{m}_m{i}.sav`` is loaded and applied to the test set
    returned by ``test_set_n(n)``.

    Two tab-separated files are written to `t_dir`:

    * ``{m}_test_n{n}_full.tsv`` - one row per (gene, model i) holding both
      class probabilities plus that model's accuracy, precision, recall, F1
      and ROC AUC, each formatted to five decimals.
    * ``{m}_test_n{n}_avg.tsv`` - the per-gene mean of the class-1 probability
      as produced by ``averaging_predictions``.

    Parameters
    ----------
    m : str
        Model name used in the pickle file name and the output file names.
    r : int
        Number of saved models to evaluate (indices 0..r-1).
    n : int
        Test chunk index passed to ``test_set_n``.

    Returns
    -------
    None
    """
    # Create test data (X_test) and labels (y_test); the label is the last column
    test_set = test_set_n(n=n)
    gene_names = np.array(test_set.index.to_list())
    X_test = test_set.iloc[:, 0:-1].values
    y_test = test_set.iloc[:, -1].values

    per_model_predictions = []

    # Evaluate range r of models m
    for i in range(r):
        # load model
        # NOTE(security): pickle.load can execute arbitrary code stored in the
        # file - only load model files produced by a trusted training run.
        model_path = os.path.join(out_dir, f'draw_{i}', f'{m}_m{i}.sav')
        with open(model_path, 'rb') as model_file:  # closed even if unpickling fails
            model = pickle.load(model_file)

        # predictions and performance metrics
        y_pred = model.predict(X_test)
        pred = model.predict_proba(X_test)
        metrics = {
            'Accuracy': accuracy_score(y_test, y_pred),
            'Precision': precision_score(y_test, y_pred),
            'Recall': recall_score(y_test, y_pred),
            'F1': f1_score(y_test, y_pred),
            'ROC_AUC': roc_auc_score(y_test, pred[:, 1]),
        }

        # Assemble the result frame in one step, in the exported column order,
        # instead of adding columns to a column-subset copy.
        test_predictions = pd.DataFrame({
            'gene': gene_names,
            0: pred[:, 0],
            1: pred[:, 1],
            'model': m,
            'sample': i,
            **{name: '{0:0.5f}'.format(value) for name, value in metrics.items()},
        })

        per_model_predictions.append(test_predictions)

    # export results tables
    df_predictions = pd.concat(per_model_predictions, sort=False, join='outer', axis=0, ignore_index=True)
    df_predictions.to_csv(os.path.join(t_dir, f'{m}_test_n{n}_full.tsv'), index=False, sep="\t")

    predictions = averaging_predictions(df_predictions)
    predictions.to_csv(os.path.join(t_dir, f'{m}_test_n{n}_avg.tsv'), index=False, sep="\t")


In [9]:
# Evaluate Ensemble models 0..88 on test chunks 0..88.
# NOTE(review): 89 chunks matches ceil(3530 / 40) from the class sizes printed
# above; the model count of 89 is taken as given - confirm against the
# training run that produced the draw_* directories.
N_TEST_CHUNKS = 89
N_MODELS = 89
for chunk in range(N_TEST_CHUNKS):
    model_performance_test(m="Ensemble", r=N_MODELS, n=chunk)