In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import joblib

from interpret.glassbox import ExplainableBoostingClassifier as EBM
from interpret.perf import ROC
from interpret import show
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, auc, roc_curve
from sklearn.utils import resample
from tqdm import tqdm

### Functions

In [None]:

def load_data(filepath, sep="\t", header=None, columns=None):
    '''
    this function loads the data from the given file path and returns a pandas dataframe
    filepath: str, path to the file e.g. "data/train.csv"
    sep: str, separator used in the file e.g. ",", "\t"
    header: int, row number to use as the column names e.g. 0, None
    columns: list, list of column names to use e.g. ["col1", "col2", "col3"]

    '''
    data_df = pd.read_csv(filepath, sep=sep, header=header, names=columns)
    
    return data_df

In [None]:
def preprocess_data(data_df, label_column, removed_labels, chosen_labels = {'TP': 0, 'FP': 1, 'FP_CA': 1}):
    '''
    this function preprocesses the data by removing the rows with the removed labels and mapping the labels to the chosen labels
    data_df: pandas dataframe, dataframe to preprocess
    label_column: str, name of the column containing the labels
    removed_labels: list, list of labels to remove from the data
    chosen_labels: dict, dictionary to map the labels to the chosen labels

    '''
    data_df = data_df[~data_df[label_column].isin(removed_labels)]
    data_df = data_df.dropna(subset=[label_column])
    data_df[label_column] = data_df[label_column].map(chosen_labels)
    return data_df

In [None]:
def balance_data(data_df, label_column, size=1.0):
    '''
    this function balances the data by upsampling the minority class and downsampling the majority class
    data_df: pandas dataframe, dataframe to balance
    label_column: str, name of the column containing the labels
    size: float, size of the minority class relative to the majority class
    
    '''
    mx = data_df[label_column].value_counts().max() # get the maximum count of the labels
    TP = data_df[data_df[label_column] == 0]
    FP = data_df[data_df[label_column] == 1]

    if len(TP)  == mx:
        TP_downsampled = TP
        FP_upsampled = resample(FP, replace=True, n_samples=int(len(TP)*size), random_state=27)
    else:
        FP_upsampled = FP
        TP_downsampled = resample(TP, replace=True, n_samples=int(len(FP)*size), random_state=27)
    
        
    balanced_data_df = pd.concat([TP_downsampled, FP_upsampled])
    return balanced_data_df

In [None]:
def split_data(data_df, label_column, drop_columns, scaler_flag=False, random_state=42):
    '''
    this function splits the data into training and testing sets
    data_df: pandas dataframe, dataframe to split
    label_column: str, name of the column containing the labels
    drop_columns: list, list of columns to drop from the data
    scaler_flag: bool, flag to scale the data
    random_state: int, random state for reproducibility
        
    '''
    
    X = data_df.drop([label_column, *drop_columns], axis=1)
    y = data_df[label_column]
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.10, random_state=random_state)
    
    if scaler_flag:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
    
    return X_train, X_test, y_train, y_test

In [None]:
def train_ebm(X_train, y_train, random_seed=42):
    '''
    this function trains the Explainable Boosting Machine model
    X_train: numpy array, training features
    y_train: numpy array, training labels
    random_seed: int, random state for reproducibility
    
    '''
    ebm = EBM(random_state=random_seed)
    n_chunks = 100
    X_train_chunks = np.array_split(X_train, n_chunks)
    y_train_chunks = np.array_split(y_train, n_chunks)
    for i in tqdm(range(n_chunks)):
        ebm.fit(X_train_chunks[i], y_train_chunks[i])
    
    return ebm

In [None]:
def evaluate_model(ebm, X_test, y_test):
    '''
    this function evaluates the model using the test set
    ebm: Explainable Boosting Machine model
    X_test: numpy array, test features
    y_test: numpy array, test labels
        
        '''
    
    y_pred = ebm.predict(X_test)
    print("Accuracy: ", accuracy_score(y_test, y_pred))
    print("Classification Report: \n", classification_report(y_test, y_pred))
    print("Confusion Matrix: \n", confusion_matrix(y_test, y_pred))
    
    ebm_perf = ROC(ebm.predict_proba).explain_perf(X_test, y_test)
    show(ebm_perf)
    
    conf_matrix = confusion_matrix(y_test, y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(conf_matrix, annot=True, fmt="d", cmap='Blues')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()
    return ebm_perf

In [None]:
def global_explanation(ebm):
    global_explanation = ebm.explain_global()
    show(global_explanation)
    
    return global_explanation

def local_explanation(ebm, X_test, y_test , len = None):
    if len:
        local_explanation = ebm.explain_local(X_test[:len], y_test[:len])
    else:
        local_explanation = ebm.explain_local(X_test, y_test)
    show(local_explanation)
    return local_explanation

In [None]:
def plot_selected_regions_positive(term_scores, all_regions, selected_regions, model_name):
    """
    Plots the mean scores above 1 for selected regions using Plotly.
    
    Parameters:
    term_scores (list of np.ndarray): List of term scores from the EBM model.
    all_regions (list of str): List of all region names corresponding to the term scores.
    selected_regions (list of str): List of region names to be selected for plotting.
    
    Raises:
    ValueError: If the number of selected regions does not match the number of selected term scores.
    """
    # Filter the term scores and regions
    selected_term_scores = []
    for region in selected_regions:
        index = all_regions.index(region)
        selected_term_scores.append(term_scores[index])

    # Ensure the number of selected regions matches the number of selected term scores
    if len(selected_term_scores) != len(selected_regions):
        raise ValueError("The number of selected regions does not match the number of selected term scores.")

    # Initialize a dictionary to store the filtered scores
    region_scores = {}

    # Filter scores above 1 and compute the mean for each selected region
    for i, scores in enumerate(selected_term_scores):
        filtered_scores = scores[scores > 0]
        if len(filtered_scores) > 0:
            region_scores[selected_regions[i]] = filtered_scores.mean()  # Taking the mean value for simplicity

    # Sort the regions by their mean scores above 1 from highest to lowest
    sorted_region_scores = dict(sorted(region_scores.items(), key=lambda item: item[1], reverse=True))

    # Create the bar plot using Plotly
    fig = go.Figure()

    fig.add_trace(go.Bar(
        x=list(sorted_region_scores.keys()),
        y=list(sorted_region_scores.values()),
        marker_color='#006db6'
    ))

    fig.update_layout(
        title=f'{model_name} Scores for Selected Regions',
        xaxis_title='Region Name',
        yaxis_title='Score',
    )

    fig.show()

In [None]:
def plot_selected_regions(term_scores, all_regions, selected_regions, model_name):
    """
    Plots the mean scores above 1 for selected regions using Plotly.
    
    Parameters:
    term_scores (list of np.ndarray): List of term scores from the EBM model.
    all_regions (list of str): List of all region names corresponding to the term scores.
    selected_regions (list of str): List of region names to be selected for plotting.
    
    Raises:
    ValueError: If the number of selected regions does not match the number of selected term scores.
    """
    # Filter the term scores and regions
    selected_term_scores = []
    for region in selected_regions:
        index = all_regions.index(region)
        selected_term_scores.append(term_scores[index])

    # Ensure the number of selected regions matches the number of selected term scores
    if len(selected_term_scores) != len(selected_regions):
        raise ValueError("The number of selected regions does not match the number of selected term scores.")

    # Initialize a dictionary to store the filtered scores
    region_scores = {}

    # compute the mean for each selected region
    for i, scores in enumerate(selected_term_scores):
        region_scores[selected_regions[i]] = scores.max()
        

    # Sort the regions by their mean scores above 1 from highest to lowest
    sorted_region_scores = dict(sorted(region_scores.items(), key=lambda item: item[1], reverse=True))

    # Create the bar plot using Plotly
    fig = go.Figure()

    fig.add_trace(go.Bar(
        x=list(sorted_region_scores.keys()),
        y=list(sorted_region_scores.values()),
        marker_color='#006db6'
    ))

    fig.update_layout(
        title=f'{model_name} Scores for Selected Regions',
        xaxis_title='Region Name',
        yaxis_title='Score',
    )

    fig.show()

In [None]:
def plot_selected_regions_together(term_scores, all_regions, selected_regions, technologies):
    """
    Plots the mean scores for selected regions using Plotly, with scores for different technologies side by side.

    Parameters:
    term_scores (list of np.ndarray): List of term scores from the EBM model.
    all_regions (list of str): List of all region names corresponding to the term scores.
    selected_regions (list of str): List of region names to be selected for plotting.
    technologies (list of str): List of technology names corresponding to the term scores.

    Raises:
    ValueError: If the number of selected regions does not match the number of selected term scores.
    """
    # Filter the term scores and regions
    selected_term_scores = []
    for region in selected_regions:
        if region in all_regions:
            index = all_regions.index(region)
            selected_term_scores.append([ts[index] for ts in term_scores])
        else:
            raise ValueError(f"Region {region} not found in all_regions")

    # Ensure the number of selected regions matches the number of selected term scores
    if len(selected_term_scores) != len(selected_regions):
        raise ValueError("The number of selected regions does not match the number of selected term scores.")

    # Initialize a dictionary to store the scores
    region_scores = {tech: {} for tech in technologies}

    # Compute the max for each selected region for each technology
    for tech, scores_list in zip(technologies, zip(*selected_term_scores)):
        for i, scores in enumerate(scores_list):
            if len(scores) > 0:
                region_scores[tech][selected_regions[i]] = np.max(scores)

    # Custom colors for each technology
    colors = ['#006db6', '#ff6f61', '#2d7f5e']
    colors = ['#000000', '#046bb3', '#8c8c8c']
    # Create the bar plot using Plotly
    fig = go.Figure()

    for tech, color in zip(technologies, colors):
        fig.add_trace(go.Bar(
            x=selected_regions,
            y=[region_scores[tech].get(region, 0) for region in selected_regions],
            name=tech,
            marker_color=color,
        ))

    fig.update_layout(
        title='Scores for Selected Regions by Technology',
        xaxis_title='Region Name',
        yaxis_title='Score',
        barmode='group',
        # font size for selected regions
        xaxis_tickfont=dict(size=22),
        # font size for legend
        legend=dict(
            title='Technology',
            title_font_size=22,
            font_size=24,
        ),
        yaxis_tickfont=dict(size=22),
        font = dict(size=22),
        width = 1920,
        height = 720,

        )

    fig.show()


In [None]:
def save_model(model, model_name):
    joblib.dump(model, f"{model_name}_model.pkl")
    print(f"{model_name} model saved successfully")

## Training Model on Various Technology VCFs


This section extends the notebook to train models on VCFs from different sequencing technologies (Illumina, Ultima, PacBio) and rank their weakest regions of performance (FP and FN).

### Steps:
1. Load and preprocess data for each technology.
2. Train models for each technology.
3. Evaluate models to rank the weakest regions of performance.
4. Compare the weakest regions of performance across technologies.
5. Save the Models.

In [None]:
illumina_data = load_data("./datasets/Illumina_dataset.txt", sep="\t", columns=['chrom', 'pos', 'ref', 'alt', 'filter', 'lowmappabilityall', 'lowGC', 'HighGC', 'Homoploymers', 'SegDup', 'label'], header=0)
ultima_data = load_data("./datasets/Ultima_dataset.txt", sep="\t", columns=['chrom', 'pos', 'ref', 'alt', 'filter', 'lowmappabilityall', 'lowGC', 'HighGC', 'Homoploymers', 'SegDup', 'label'] , header=0)
pacbio_data = load_data("./datasets/PacBio_dataset.txt", sep="\t", columns=['chrom', 'pos', 'ref', 'alt', 'filter', 'lowmappabilityall', 'lowGC', 'HighGC', 'Homoploymers', 'SegDup', 'label'] , header=0)

print("All datasets loaded successfully")

In [None]:
illumina_data = preprocess_data(illumina_data, 'label', ['HARD', 'OUT', 'IGN'], {'TP': 0, 'FP': 1, 'FP_CA': 1})
ultima_data = preprocess_data(ultima_data, 'label', ['HARD', 'OUT', 'IGN'], {'TP': 0, 'FP': 1, 'FP_CA': 1})
pacbio_data = preprocess_data(pacbio_data, 'label', ['HARD', 'OUT', 'IGN'], {'TP': 0, 'FP': 1, 'FP_CA': 1})

print("All datasets preprocessed successfully")

In [None]:
illumina_data = balance_data(illumina_data, 'label')
ultima_data = balance_data(ultima_data, 'label')
pacbio_data = balance_data(pacbio_data, 'label')

print("All datasets balanced successfully")

In [None]:
print(illumina_data['filter'].value_counts())
print(ultima_data['filter'].value_counts())
print(pacbio_data['filter'].value_counts())

In [None]:
illumina_X_train, illumina_X_test, illumina_y_train, illumina_y_test = split_data(illumina_data, 'label', ['chrom', 'pos', 'ref', 'alt', 'filter'], random_state=90)
ultima_X_train, ultima_X_test, ultima_y_train, ultima_y_test = split_data(ultima_data, 'label', ['chrom', 'pos', 'ref', 'alt', 'filter'], random_state=90)
pacbio_X_train, pacbio_X_test, pacbio_y_train, pacbio_y_test = split_data(pacbio_data, 'label', ['chrom', 'pos', 'ref', 'alt', 'filter'],  random_state=90)

print("All datasets split successfully")

In [None]:
models = {'illumina' : (illumina_X_train, illumina_y_train), 'ultima' : (ultima_X_train, ultima_y_train), 'pacbio' : (pacbio_X_train, pacbio_y_train)}

# create the models to access them later
illumina_ebm = None
ultima_ebm = None
pacbio_ebm = None

for model, (X_train, y_train) in models.items():
    print(f"Training {model} model")
    if model == 'illumina':
        illumina_ebm = train_ebm(X_train, y_train)
    elif model == 'ultima':
        ultima_ebm = train_ebm(X_train, y_train)
    else:
        pacbio_ebm = train_ebm(X_train, y_train)
    print(f"{model} model trained successfully")

print("All models trained successfully")


In [None]:
# evaluate the models
illumina_perf = evaluate_model(illumina_ebm, illumina_X_test, illumina_y_test)
ultima_perf = evaluate_model(ultima_ebm, ultima_X_test, ultima_y_test)
pacbio_perf = evaluate_model(pacbio_ebm, pacbio_X_test, pacbio_y_test)

print("All models evaluated successfully")

In [None]:
all_regions = [
    'lowmappabilityall', 'SegDup', 'Homopolymers', 
    'lowmappabilityall & Homopolymers', 'Homopolymers & SegDup', 
    'lowmappabilityall & SegDup', 'lowGC', 
    'lowGC & SegDup', 'lowGC & Homopolymers', 'HighGC'
]

all_ebms = {'Illumina': illumina_ebm, 'Ultima': ultima_ebm, 'PacBio': pacbio_ebm}
selected_regions = ['lowmappabilityall', 'SegDup', 'Homopolymers', 'lowGC', 'HighGC']

for model_name, ebm in all_ebms.items():
    term_scores = ebm.term_scores_
    plot_selected_regions(term_scores, all_regions, selected_regions, model_name)

print("All selected regions plotted successfully")

In [None]:

global_explanation(ultima_ebm)


In [None]:
term_scores = [illumina_ebm.term_scores_, ultima_ebm.term_scores_, pacbio_ebm.term_scores_]
technologies = ['Illumina', 'Ultima', 'PacBio']
all_regions = [
    'lowmappabilityall', 'SegDup', 'Homopolymers', 
    'lowmappabilityall & Homopolymers', 'Homopolymers & SegDup', 
    'lowmappabilityall & SegDup', 'lowGC', 
    'lowGC & SegDup', 'lowGC & Homopolymers', 'HighGC'
]
selected_regions = ['lowmappabilityall', 'SegDup', 'Homopolymers', 'lowGC', 'HighGC']

plot_selected_regions_together(term_scores, all_regions, selected_regions, technologies)


In [None]:
save_model(illumina_ebm, 'illumina')
save_model(ultima_ebm, 'ultima')
save_model(pacbio_ebm, 'pacbio')
