In [33]:
import matplotlib.pyplot as plt

from pathlib import Path
from pandas import read_csv, DataFrame, concat
from sklearn.metrics import auc, RocCurveDisplay
from sklearn.metrics import classification_report
from sklearn.model_selection import StratifiedKFold
from matplotlib.pyplot import subplots, savefig, close
from numpy import linspace, interp, mean, std, minimum, maximum

from sklearn.svm import SVC, LinearSVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

In [2]:
import sys

IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    DATA_DIR = Path('https://pseudouridine.s3.ap-south-1.amazonaws.com/data')
else:
    DATA_DIR = Path('..') / 'data'

In [3]:
SPECIES = [
    'H.Sapiens100',
    'H.Sapiens495',
    'M.musculus472',
    'S.cerevisiae100',
    'S.cerevisiae314'
]

ENCODINGS = [
    'anf',
    'binary',
    'cksnap',
    'dnc',
    'eiip',
    'enac',
    'kmer',
    'knc',
    'nac',
    'ncp',
    'nd',
    'psdp',
    'pse_eiip',
    'pse_knc',
    'psnp',
    'pstp',
    'rc_kmer'
]

In [4]:
ML_MODELS = {
    'svc': lambda params: SVC(**params),
    'linear_svc': lambda params: LinearSVC(**params),
    'logistic': lambda params: LogisticRegression(**params),
    'knn': lambda params: KNeighborsClassifier(**params),
    'random_forest': lambda params: RandomForestClassifier(**params),
}

In [18]:
def visualize_cv_roc(results):
    tprs = []
    aucs = []
    mean_fpr = linspace(0, 1, 100)
    fig, ax = subplots(figsize=(7.5, 7.5))

    for index, (y_true, y_pred) in enumerate(results):
        viz = RocCurveDisplay.from_predictions(
            y_true,
            y_pred,
            name=f'ROC fold {index}',
            alpha=0.3,
            lw=1,
            ax=ax
        )

        interp_tpr = interp(mean_fpr, viz.fpr, viz.tpr)
        interp_tpr[0] = 0.0
        tprs.append(interp_tpr)
        aucs.append(viz.roc_auc)

    ax.plot([0, 1], [0, 1], "k--", label="chance level (AUC = 0.5)")
    mean_tpr = mean(tprs, axis=0)
    mean_tpr[-1] = 1.0
    mean_auc = auc(mean_fpr, mean_tpr)
    std_auc = std(aucs)
    ax.plot(
        mean_fpr,
        mean_tpr,
        color="b",
        label=r"Mean ROC (AUC = %0.2f $\pm$ %0.2f)" % (mean_auc, std_auc),
        lw=2,
        alpha=0.8,
    )

    std_tpr = std(tprs, axis=0)
    tprs_upper = minimum(mean_tpr + std_tpr, 1)
    tprs_lower = maximum(mean_tpr - std_tpr, 0)
    ax.fill_between(
        mean_fpr,
        tprs_lower,
        tprs_upper,
        color="grey",
        alpha=0.2,
        label=r"$\pm$ 1 std. dev.",
    )

    ax.set(
        xlim=[-0.05, 1.05],
        ylim=[-0.05, 1.05],
        xlabel="False Positive Rate",
        ylabel="True Positive Rate",
        title=f"Mean ROC curve with variability\n(Positive label')",
    )
    ax.axis("square")
    return fig


def cv_classification_report(results):
    score_sum = classification_report(results[0][0], results[0][1], output_dict=True, zero_division=0)
    for result in results[1:]:
        score = classification_report(result[0], result[1], output_dict=True, zero_division=0)

        score_sum['0']['precision'] += score['0']['precision']
        score_sum['0']['recall'] += score['0']['recall']
        score_sum['0']['f1-score'] += score['0']['f1-score']
        score_sum['0']['support'] += score['0']['support']

        score_sum['1']['precision'] += score['1']['precision']
        score_sum['1']['recall'] += score['1']['recall']
        score_sum['1']['f1-score'] += score['1']['f1-score']
        score_sum['1']['support'] += score['1']['support']

        score_sum['accuracy'] += score['accuracy']

        score_sum['macro avg']['precision'] += score['macro avg']['precision']
        score_sum['macro avg']['recall'] += score['macro avg']['recall']
        score_sum['macro avg']['f1-score'] += score['macro avg']['f1-score']
        score_sum['macro avg']['support'] += score['macro avg']['support']

        score_sum['weighted avg']['precision'] += score['weighted avg']['precision']
        score_sum['weighted avg']['recall'] += score['weighted avg']['recall']
        score_sum['weighted avg']['f1-score'] += score['weighted avg']['f1-score']
        score_sum['weighted avg']['support'] += score['weighted avg']['support']

    score_sum['0']['precision'] /= len(results)
    score_sum['0']['recall'] /= len(results)
    score_sum['0']['f1-score'] /= len(results)
    score_sum['0']['support'] /= len(results)

    score_sum['1']['precision'] /= len(results)
    score_sum['1']['recall'] /= len(results)
    score_sum['1']['f1-score'] /= len(results)
    score_sum['1']['support'] /= len(results)

    score_sum['accuracy'] /= len(results)

    score_sum['macro avg']['precision'] /= len(results)
    score_sum['macro avg']['recall'] /= len(results)
    score_sum['macro avg']['f1-score'] /= len(results)
    score_sum['macro avg']['support'] /= len(results)

    score_sum['weighted avg']['precision'] /= len(results)
    score_sum['weighted avg']['recall'] /= len(results)
    score_sum['weighted avg']['f1-score'] /= len(results)
    score_sum['weighted avg']['support'] /= len(results)

    return score_sum


def train_model_cv(model, dataset, splits=10):
    data = read_csv(dataset / 'all.csv')

    x = data.drop('label', axis=1)
    y = data['label']

    k_fold = StratifiedKFold(n_splits=splits)

    results = []
    for fold, (train, test) in enumerate(k_fold.split(x, y)):
        model.fit(x.iloc[train], y.iloc[train])

        y_true = y.iloc[test]
        y_pred = model.predict(x.iloc[test])

        results.append((y_true, y_pred))

    return results

In [30]:
def train_model(model, dataset):
    data1 = read_csv(dataset / 'train.csv')
    data2 = read_csv(dataset / 'valid.csv')

    data = concat([data1, data2])

    x = data.drop('label', axis=1)
    y = data['label']

    model.fit(x, y)


def simple_classification_report(model, dataset):
    data = read_csv(dataset / 'test.csv')

    x = data.drop('label', axis=1)
    y = data['label']

    return classification_report(model.predict(x), y, output_dict=True, zero_division=0)


def visualize_roc(model, dataset):
    data = read_csv(dataset / 'test.csv')

    x = data.drop('label', axis=1)
    y = data['label']

    RocCurveDisplay.from_predictions(
        model.predict(x),
        y,
        name=f"ROC",
        color="darkorange",
    )
    plt.plot([0, 1], [0, 1], "k--", label="AUC = 0.5")
    plt.axis("square")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")

In [38]:
def perform_experiment(specie, encoding, model_name, model_params=None, cv=True, cv_splits=10, save_dir=None):
    model = ML_MODELS[model_name](model_params if model_params is not None else dict())
    dataset = DATA_DIR / 'processed' / specie / encoding

    if cv:
        results = train_model_cv(model, dataset, cv_splits)
        figure = visualize_cv_roc(results)
        report = cv_classification_report(results)

        if save_dir is None:
            figure.show()
            print(DataFrame(report).transpose())
        else:
            save_dir = Path(save_dir)
            save_dir.mkdir(exist_ok=True)

            exp_name = f'{model_name}({specie})[{encoding}]_cv={cv_splits}'
            print()
            figure.savefig(save_dir / f'{exp_name}.svg')
            close()
            DataFrame(report).transpose().to_csv(save_dir / f'{exp_name}.csv')
    else:
        train_model(model, dataset)
        visualize_roc(model, dataset)
        report = simple_classification_report(model, dataset)

        if save_dir is None:
            print(DataFrame(report).transpose())
        else:
            save_dir = Path(save_dir)
            save_dir.mkdir(exist_ok=True)

            exp_name = f'{model_name}({specie})[{encoding}]'
            savefig(save_dir / f'{exp_name}.svg')
            close()
            DataFrame(report).transpose().to_csv(save_dir / f'{exp_name}.csv')

In [40]:
perform_experiment(SPECIES[1], ENCODINGS[0], 'svc', cv=False, save_dir='../results')