# Benchmark

## 1) Setup

In [None]:
from typing import List, Union, Dict, Tuple
import pandas as pd
import numpy as np
import os
import pathlib
import json

In [None]:
%load_ext Cython

In [None]:
# Directory from which the running-time JSON files are read and in which the
# metric JSON files are looked up.
RESULTS_PATH = './results'
# Repeat class numbers that are evaluated. The metric code uses
# len(REPEATS_TO_SEARCH) + 1 labels; label 0 is what preprocess_Y yields for
# positions not covered by any of these repeats.
REPEATS_TO_SEARCH = [1, 2, 3, 4]

In [None]:
%%cython

cimport cython
cimport numpy as np
@cython.boundscheck(False)  # Deactivate bounds checking
@cython.wraparound(False)   # Deactivate negative indexing.
def confusion_matrix(np.int8_t[:] true_class, np.int8_t[:] pred_class, long[:,:] output, long length):
    """Accumulate a confusion matrix into ``output`` in place.

    For every position ``i`` in ``range(length)`` the cell
    ``output[true_class[i], pred_class[i]]`` is incremented by one, i.e. rows
    are indexed by the true label and columns by the predicted label.
    ``output`` is not reset before counting and nothing is returned.

    Bounds checking and negative-index wraparound are switched off by the
    decorators, so this function does not verify that ``length`` fits both
    label arrays or that every label is a valid index into ``output``; the
    caller has to guarantee that.
    """
    # NOTE(review): the loop index is a C int while ``length`` is a C long;
    # presumably inputs stay below the C int range - confirm for long sequences.
    cdef int i
    for i in range(length):
        output[true_class[i],pred_class[i]]+=1

In [None]:
def preprocess_Y(filename: str, chromosom: str, length: int,
                 repeats_to_search: List[int]) -> np.ndarray:
    """Read a repeat annotation table into a per-position class array.

    The file is parsed as a header-less, whitespace-separated table whose
    first four columns are chromosome, begin, end and repeat number. Only
    rows of ``chromosom`` whose repeat number is listed in
    ``repeats_to_search`` are used.

    Args:
        filename: Path of the annotation table.
        chromosom: Chromosome name to select (compared with the first column).
        length: Number of positions of the returned array.
        repeats_to_search: Repeat numbers to keep. Each number is used
            directly as a row index of an internal
            ``(len(repeats_to_search) + 1, length)`` matrix, so a kept row
            with a number larger than ``len(repeats_to_search)`` raises
            ``IndexError``.

    Returns:
        ``int8`` array of shape ``(length,)``. A position holds the repeat
        number covering ``[begin, end)``, or 0 when no selected repeat covers
        it. Where several repeats overlap, the smallest number wins because
        ``argmax`` reports the first maximum. An empty ``repeats_to_search``
        yields an all-zero array.
    """
    # Raw string: '\s' inside a normal literal is an invalid escape sequence.
    Ydata = pd.read_csv(filename, sep=r'\s+', header=None, index_col=False,
                        usecols=[0, 1, 2, 3])
    Ydata.columns = ['chromosom', 'begin', 'end', 'repeatnumber']

    # One combined mask; isin() also copes with an empty repeat list, and
    # selecting through .loc avoids mutating a filtered slice in place.
    wanted = ((Ydata['chromosom'] == chromosom)
              & Ydata['repeatnumber'].isin(list(repeats_to_search)))
    Ydata = Ydata.loc[wanted, ['begin', 'end', 'repeatnumber']]

    # Row 0 stays empty so that uncovered positions resolve to class 0.
    Y = np.zeros((len(repeats_to_search) + 1, length), dtype=np.int8)
    for number, begin, end in zip(Ydata['repeatnumber'].values,
                                  Ydata['begin'].values,
                                  Ydata['end'].values):
        Y[number, begin:end] = 1
    return Y.argmax(axis=0).astype(np.int8)

In [None]:
def file_to_array(filename: str,
                  shape: np.array,
                  dnabrnn: bool = False) -> np.array:
    """Convert a DeepGRP or dna-brnn prediction table into a label array.

    The tab-separated, header-less file is read with the column names
    ``file, chr, start, end, class``; for dna-brnn input the leading
    ``file`` name is left out. Every row writes its ``class`` value into
    the half-open range ``[start, end)`` of an ``int8`` array of the given
    ``shape`` that is initialised with zeros. Rows are applied in file
    order, so later rows overwrite earlier ones where they overlap.
    """
    columns = ["file", "chr", "start", "end", "class"]
    if dnabrnn:
        columns = columns[1:]
    table = pd.read_csv(filename, header=None, sep="\t", names=columns)
    intervals = table.filter(columns[-3:], axis=1)

    labels = np.zeros(shape, dtype=np.int8)
    # Iterate the rows as (start, end, class) triples.
    for start, end, label in intervals.values:
        labels[start:end] = label
    return labels

In [None]:
def mcc(C):
    """Multi-class Matthews correlation coefficient of a confusion matrix.

    Follows the formulation used by sklearn's ``matthews_corrcoef``.

    Args:
        C: Square confusion matrix; rows are summed as the true-class
            totals and columns as the predicted-class totals.

    Returns:
        The coefficient. When the denominator is zero (for example when all
        samples fall into a single true or predicted class) 0.0 is returned,
        as sklearn does, instead of dividing by zero and yielding NaN.
    """
    C = np.asarray(C)
    t_sum = C.sum(axis=1, dtype=np.float64)
    p_sum = C.sum(axis=0, dtype=np.float64)
    n_correct = np.trace(C, dtype=np.float64)
    n_samples = p_sum.sum()
    cov_ytyp = n_correct * n_samples - np.dot(t_sum, p_sum)
    cov_ypyp = n_samples**2 - np.dot(p_sum, p_sum)
    cov_ytyt = n_samples**2 - np.dot(t_sum, t_sum)
    denominator = cov_ytyt * cov_ypyp
    if denominator == 0:
        # Degenerate matrix: the correlation is undefined, report 0.0.
        return 0.0
    return cov_ytyp / np.sqrt(denominator)

In [None]:
def calculate_metrics(predictions_class: np.array, true_class: np.array):
    """Compute per-class and overall classification metrics.

    A square confusion matrix with ``len(REPEATS_TO_SEARCH) + 1`` labels is
    filled by ``confusion_matrix`` (rows: true label, columns: predicted
    label). The per-class rates are derived from it one-vs-rest and returned
    as lists; ``TotalACC`` and ``totalMCC`` are scalars over all classes and
    ``confusionmatrix`` is the matrix as nested lists. Divisions by zero are
    not guarded against.
    """
    n_classes = len(REPEATS_TO_SEARCH) + 1
    cnf_matrix = np.zeros((n_classes, n_classes), dtype=int)
    confusion_matrix(true_class, predictions_class, cnf_matrix,
                     true_class.shape[0])

    # One-vs-rest counts per class.
    tp = np.diag(cnf_matrix).astype(float)
    fp = (cnf_matrix.sum(axis=0) - tp).astype(float)
    fn = (cnf_matrix.sum(axis=1) - tp).astype(float)
    tn = (cnf_matrix.sum() - (fp + fn + tp)).astype(float)

    recall = tp / (tp + fn)
    precision = tp / (tp + fp)
    raw_metrics = {
        # Sensitivity / recall / true positive rate
        "TPR": recall,
        # Specificity / true negative rate
        "TNR": tn / (tn + fp),
        # Precision / positive predictive value
        "PPV": precision,
        # Negative predictive value
        "NPV": tn / (tn + fn),
        # Fall-out / false positive rate
        "FPR": fp / (fp + tn),
        # False negative rate
        "FNR": fn / (tp + fn),
        # False discovery rate
        "FDR": fp / (tp + fp),
        # Per-class accuracy
        "ACC": (tp + tn) / (tp + fp + fn + tn),
        # F1 score
        "F1": 2 * recall * precision / (recall + precision),
        # Fraction of positions whose label matches exactly
        "TotalACC": (true_class == predictions_class).sum()
                    / true_class.shape[0],
        # Per-class (binary) Matthews correlation coefficient
        "MCC": ((tp * tn) - (fp * fn)) / np.sqrt(
            (tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)),
        # Multi-class Matthews correlation coefficient
        "totalMCC": mcc(cnf_matrix),
    }

    # Arrays become plain lists so the result can be dumped as JSON.
    metrics = {
        key: value.tolist() if isinstance(value, np.ndarray) else value
        for key, value in raw_metrics.items()
    }
    metrics['confusionmatrix'] = cnf_matrix.tolist()
    return metrics

In [None]:
def calculate_all(results, chr_length, references, outputs, is_dnabrnn=False):
    """Compute metrics for every result entry from DeepGRP or dna-brnn files.

    Each key of ``results`` is split into a folder name and a FASTA file
    name; the chromosome name is derived from the file name. For every
    reference registered for that folder, the true labels are built with
    ``preprocess_Y`` and each model that is not yet present in
    ``outputs[reference name][chromosome]`` is scored with
    ``calculate_metrics``. A reference that is not a string is treated as a
    pair of files: labels of the second file are kept only where they differ
    from the labels of the first.

    ``outputs`` is updated in place and also returned, so entries that were
    already computed are skipped.
    """
    for result_key in results:
        models = results[result_key]
        foldername, chrfile = os.path.split(result_key)
        chromosom = chrfile.replace('.fa', '')
        chromosom = chromosom.replace("telomere_to_telomere_X", "CM020874.1")
        seqlen = int(chr_length.loc[(foldername, chromosom)])

        for name, reference in references[foldername].items():
            scored = outputs.setdefault(name, {}).setdefault(chromosom, {})
            if scored.keys() == models.keys():
                # Every model of this entry has been evaluated already.
                continue

            if isinstance(reference, str):
                Ytrue = preprocess_Y(reference, chromosom, seqlen,
                                     REPEATS_TO_SEARCH)
            else:
                first_file, second_file = reference[0], reference[1]
                Ytrue_diff = preprocess_Y(first_file, chromosom, seqlen,
                                          REPEATS_TO_SEARCH)
                Ytrue = preprocess_Y(second_file, chromosom, seqlen,
                                     REPEATS_TO_SEARCH)
                # Positions on which both annotations agree are reset to 0.
                Ytrue[Ytrue == Ytrue_diff] = 0

            pending = [model for model in models if model not in scored]
            for model in pending:
                # NOTE(review): the replacement below also removes the
                # underscore in front of the chromosome name - presumably
                # this matches the prediction file naming; confirm.
                predfilename = "{}_{}.fa_{}.tsv".format(
                    foldername, chromosom, model
                ).replace("_CM020874.1","telomere_to_telomere_X")
                Ypred = file_to_array(predfilename, Ytrue.shape, is_dnabrnn)
                scored[model] = calculate_metrics(Ypred, Ytrue)
                # Release the prediction array before loading the next one.
                del Ypred
            del Ytrue
    return outputs

In [None]:
def load_chrom_sizes(directory="data"):
    """Collect all ``*.chrom.sizes`` tables found in ``directory``.

    Each file is read as a tab-separated, header-less table with the columns
    ``chromosome`` and ``sequence length``. The genome build is the file name
    without the ``.chrom.sizes`` suffix.

    Returns:
        DataFrame with the single column ``sequence length``, indexed by
        ``(genomebuild, chromosome)`` so that a lookup such as
        ``chr_length.loc[(foldername, chromosom)]`` in ``calculate_all``
        resolves to one row. The frame is empty (but has the same layout)
        when no file is found.
    """
    suffix = ".chrom.sizes"
    frames = []
    for sizes_file in pathlib.Path(directory).glob("*" + suffix):
        table = pd.read_csv(sizes_file,
                            sep='\t',
                            header=None,
                            names=['chromosome', 'sequence length'])
        # Path.stem would only strip ".sizes" and leave e.g. "hg19.chrom".
        table["genomebuild"] = sizes_file.name[:-len(suffix)]
        frames.append(table)
    if frames:
        # DataFrame.append was removed in pandas 2.0; concatenate once.
        sizes = pd.concat(frames, ignore_index=True)
    else:
        sizes = pd.DataFrame(
            columns=['chromosome', 'sequence length', 'genomebuild'])
    return sizes.set_index(['genomebuild', 'chromosome']).sort_index()


chr_length = load_chrom_sizes()

In [None]:
# Reference annotation files used as ground truth by calculate_all.
# Outer key: folder name taken from the result key; inner key: name under
# which the metrics for that reference are stored in the output dict.
# A value that is not a string is handled by calculate_all as a pair of files.
REFERENCE = {
    "hg19": {"hg19":"repeatmasker/repeats_hg19.tsv"},
    "hg38": {"hg38":"repeatmasker/repeats_hg38.tsv",
             "dfam":"dfam/hg38.dfam.bed",
             #"dfam_and_rm":"dfam_repeatmasker_intersection.csv",
            #"dfam_no_rm":"dfam_not_repeatmasker.csv",
             #"hg19_hg38_similar":"repeatmaskerhg38_nonexacthg19_intersection.tsv",
            },
    "mm10": {"mm10": "mm10.bed"},
}

## 2) Evaluate DeepGRP

In [None]:
# Name of the JSON file (inside RESULTS_PATH) that caches the DeepGRP metrics.
filename = 'deepgrp_results.json'

In [None]:
# Load the DeepGRP running-time records; their keys select what is evaluated.
# A missing or unparsable file is treated as "nothing to evaluate".
try:
    with (pathlib.Path(RESULTS_PATH) / "deepgrp_runningtime.json").open() as file:
        results = json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
    results = {}

In [None]:
# Reuse previously stored DeepGRP metrics so finished entries are skipped;
# start from an empty mapping when the file is missing or unparsable.
try:
    with (pathlib.Path(RESULTS_PATH) / filename).open() as file:
        outputs = json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
    outputs = {}

In [None]:
# Score every DeepGRP result entry against the references; `outputs` is
# extended in place and the same dict is returned.
deepgrp_results = calculate_all(results, chr_length, REFERENCE,outputs)

In [None]:
# Store the DeepGRP metrics. The directory is created first so that a fresh
# checkout without a results folder does not fail after the evaluation ran.
pathlib.Path(RESULTS_PATH).mkdir(parents=True, exist_ok=True)
with pathlib.Path(RESULTS_PATH, filename).open('w') as file:
    json.dump(deepgrp_results, file)

## 3) Evaluate dna-brnn

In [None]:
# Name of the JSON file (inside RESULTS_PATH) that caches the dna-brnn metrics.
filename = 'dnabrnn_results.json'

In [None]:
# Load the dna-brnn running-time records; their keys select what is evaluated.
# A missing or unparsable file is treated as "nothing to evaluate".
try:
    with (pathlib.Path(RESULTS_PATH) / "dnabrnn_runningtime.json").open() as file:
        results = json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
    results = {}

In [None]:
# Reuse previously stored dna-brnn metrics so finished entries are skipped;
# start from an empty mapping when the file is missing or unparsable.
try:
    with (pathlib.Path(RESULTS_PATH) / filename).open() as file:
        outputs = json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
    outputs = {}

In [None]:
# Score every dna-brnn result entry; is_dnabrnn=True makes file_to_array read
# the prediction files without the leading "file" column.
dnabrnnresults = calculate_all(results, chr_length, REFERENCE,outputs, is_dnabrnn=True)

#### Save results

In [None]:
# Store the dna-brnn metrics inside RESULTS_PATH: that is where the cached
# results are read from before the evaluation, so writing to the working
# directory would mean the cache is never found again.
pathlib.Path(RESULTS_PATH).mkdir(parents=True, exist_ok=True)
with pathlib.Path(RESULTS_PATH, filename).open('w') as file:
    json.dump(dnabrnnresults, file)