# Instructions
This notebook takes a Pathogenic scoring lookup table and calculates the variant load for all protein-encoding genes from raw fasta mtDNA files or a list of patient variants.

Before uploading a lookup table ensure the mtDNA variant lookup table has the following three column headings: Position, Reference, Mutation, Pathogenicity_Score

The position column corresponds to the reference mtDNA unaligned genome and the Mutation column contains all possible variants at each position.



1. First run the cell below to import the necessary libraries.

In [5]:
#@title Load packages


#%%capture
#makes cell silent

from IPython.display import clear_output
#!sudo apt install clustalo
#!curl -sL haplogrep.now.sh | bash
#clear_output()
#!pip install gradio==4.44.1 # make sure match with version used on server
#!pip install Biopython
#!pip install tqdm
#clear_output()
!conda env list # check conda is working
!clustalo --version # check this is working
from Bio import SeqIO, AlignIO
from Bio.Align.Applications import ClustalwCommandline,ClustalOmegaCommandline  # , MuscleCommandline
import os
import pandas as pd
import tqdm
from tqdm import tqdm
import numpy as np
from Bio import pairwise2
from Bio.pairwise2 import format_alignment
import gradio as gr
clear_output()
import pandas as pd
from Bio import SeqIO
from Bio.Align.Applications import ClustalOmegaCommandline
import gradio as gr
import re
#!wget https://github.com/Iqbalwasim01/mtDNA/archive/refs/heads/main.zip
#!unzip -o main.zip
#clear_output()


2. Now run the cell below to load the user-friendly interface. The following file uploads will be required:


*   Patient data: Either a FASTA or .txt file containing unaligned patient mtDNA genomes OR a .txt file containing patient variants.

  *   If a FASTA file is provided for patient mtDNA genomes, upload a FASTA, .txt, or .gb file containing a mtDNA reference genome.

*   A .csv file for pathogenic score lookup. Before uploading a lookup table ensure the mtDNA variant lookup table has the following four column headings: Position, Reference, Mutation, Pathogenicity_Score










In [2]:
#@title VLS calculator

def Variant_load_calculation_haploexclude_all_characters(table, list_seq, list_var, reference_seq, Haplomarker_percentage, APOGEE_threshold, Haplomarker_total_count):
    table = pd.read_csv(table.name)
    reference_seq = list(SeqIO.parse(reference_seq.name, "fasta"))
    list_seq = list(SeqIO.parse(list_seq.name, "fasta"))
    haplovariants_table=pd.read_csv("mtDNA-main/Haplo_context.csv")
    haplovariants_table=haplovariants_table[(haplovariants_table['Percentage']>=Haplomarker_percentage) & (haplovariants_table['total']>=Haplomarker_total_count)] # subset halpovariants_table based variant frequency

    table['Position'] = table['Position'].values - 1  # Adjust Position to match Python indexing
    haplovariants_table['Position'] = haplovariants_table['Position'].values - 1

    results_df = pd.DataFrame(columns=["Patient", "Variant Load Score"])
    for patient in tqdm(range(0, len(list_seq)), desc="Sifting through mutations per patient"):
        SeqIO.write(list_seq[patient], "./unaligned_seq_patient.fasta", "fasta") # write the aligned patient sequence to use later for halpocontext
        Sequences = reference_seq + list_seq[patient:patient+1]
        with open("./unaligned_seq.fasta", "w") as unaligned_file:
            SeqIO.write(Sequences, unaligned_file, "fasta")

        clustalw_cline = ClustalOmegaCommandline("clustalo", infile="./unaligned_seq.fasta", outfile="aligned_seq.aln", force=True)
        clustalw_cline()

        sequences = list(SeqIO.parse("aligned_seq.aln", "fasta"))
        list_seq_aligned = str(sequences[1].seq)
        ref_aligned = str(sequences[0].seq)

        new_positions = [aligned_position for aligned_position in range(0, len(ref_aligned)) if ref_aligned[aligned_position] != '-']

        unaligned_ref_positions = list(range(0, 16569))
        ref_to_aligned_positions = dict(zip(unaligned_ref_positions, new_positions))

        haplovariants_table['Aligned_Positions'] = haplovariants_table['Position'].map(ref_to_aligned_positions) # need to also map the aligned positions to the haplocontext/variant table
        haplovariants_table.set_index('Aligned_Positions', inplace=True)
        table['Aligned_Positions'] = table['Position'].map(ref_to_aligned_positions)
        table.set_index('Aligned_Positions', inplace=True)
        Position = table.index.unique()


        !./haplogrep classify --in "unaligned_seq_patient.fasta" --output "results.txt" --format fasta  --lineage 1 #--extend-report #--hetLevel 0.1
        !grep " " " results.dot > results.txt
        df_hap=pd.read_csv("results.txt",sep="\t").loc[:,"Haplogroup"]
        Parent_haplo=[i for i in df_hap.values][0]
        #print(f'{Parent_haplo}')

        Variant_positions = []
        patient_scores = []
        for position in Position:
            current_position_mutations = table.loc[position,:]
            # this block of code subsets haplogroup marker rows with the best match to the patient haplogroup
            prefixes = [Parent_haplo[:i] for i in range(1, len(Parent_haplo) + 1)] # create all possible haplonames left to right from current haplogroup
            matching_rows = haplovariants_table[haplovariants_table['Haplogroup_marker'].apply(lambda x: any(x.startswith(prefix) for prefix in prefixes))].copy()
            matching_rows.loc[:, 'char_length'] = matching_rows['Haplogroup_marker'].apply(len) # get length all haplomatches
            Haplo_max=matching_rows['char_length'].max() # find the best match i.e. most character match and by one of the prefixes
            matching_rows = matching_rows[(matching_rows['char_length']==Haplo_max) &  (matching_rows['Haplogroup_marker'].isin(prefixes))] # subset the haplogroup with the greatest character length and must match one of the original prefixes
            current_pos_haplo_mutations = matching_rows[matching_rows.index == position] # match position to get final haplomarker
            if isinstance(current_position_mutations, pd.Series):
                current_position_mutations = current_position_mutations.to_frame().T
            base = list_seq_aligned[position]
            Score = current_position_mutations[current_position_mutations['Mutation'] == base]['Pathogenicity_Score']
            unaligned_position = current_position_mutations[current_position_mutations['Mutation'] == base]['Position'] + 1
            Reference_base = current_position_mutations[current_position_mutations['Mutation'] == base]['Reference']
            if (base in current_pos_haplo_mutations['Mutation'].values):
                patient_scores.append(np.nan)
                Variant_positions.append(f"Excluded from score as native to haplogroup: \n{Reference_base.iloc[0]}{unaligned_position.iloc[0]}{base}")
            elif(Score.empty or Score.iloc[0] < APOGEE_threshold):
                patient_scores.append(np.nan)
            else:
                patient_scores.append(Score.iloc[0])
                Variant_positions.append(f"{Reference_base.iloc[0]}{unaligned_position.iloc[0]}{base}")

        #print(f'{matching_rows.head(n=3)}')

        # NEED TO FIX THIS TO EXTRACT HVARIANTS AS NOW CHANGED FORMAT AFTER USING UNALIGNED FASTA!!!!
        Variants_array = np.asarray(Variant_positions + ["NA"] * (100 - len(Variant_positions)))
        Variants_df = pd.DataFrame([Variants_array], columns=[f'Pathogenic_Variant{i}' for i in range(0, 100)])
        new_row = pd.DataFrame([[f"Patient {str(sequences[1].id)}", np.nansum(patient_scores)]], columns=["Patient", "Variant Load Score"])
        new_row = pd.concat([new_row,df_hap,Variants_df], axis=1)
        results_df = pd.concat([results_df, new_row], ignore_index=True)

        yield results_df



def Variant_load_calculation(table, list_seq, list_var, reference_seq, Haplomarker_percentage, APOGEE_threshold, Haplomarker_total_count):
    table = pd.read_csv(table.name)
    reference_seq = list(SeqIO.parse(reference_seq.name, "fasta"))
    list_seq = list(SeqIO.parse(list_seq.name, "fasta"))

    table['Position'] = table['Position'].values - 1  # Adjust Position to match Python indexing

    results_df = pd.DataFrame(columns=["Patient", "Variant Load Score"])
    for patient in tqdm(range(0, len(list_seq)), desc="Sifting through mutations per patient"):
        SeqIO.write(list_seq[patient], "./unaligned_seq_patient.fasta", "fasta") # write the aligned patient sequence to use later for halpocontext
        Sequences = reference_seq + list_seq[patient:patient + 1]
        with open("./unaligned_seq.fasta", "w") as unaligned_file:
            SeqIO.write(Sequences, unaligned_file, "fasta")

        clustalw_cline = ClustalOmegaCommandline("clustalo", infile="./unaligned_seq.fasta", outfile="aligned_seq.aln", force=True)
        clustalw_cline()

        sequences = list(SeqIO.parse("aligned_seq.aln", "fasta"))
        list_seq_aligned = str(sequences[1].seq)
        ref_aligned = str(sequences[0].seq)

        new_positions = [aligned_position for aligned_position in range(0, len(ref_aligned)) if ref_aligned[aligned_position] != '-']

        unaligned_ref_positions = list(range(0, 16569))
        ref_to_aligned_positions = dict(zip(unaligned_ref_positions, new_positions))

        table['Aligned_Positions'] = table['Position'].map(ref_to_aligned_positions)
        table.set_index('Aligned_Positions', inplace=True)
        Position = table.index.unique()

        Variant_positions = []
        patient_scores = []
        for position in Position:
            current_position_mutations = table.loc[position,:]
            if isinstance(current_position_mutations, pd.Series):
                current_position_mutations = current_position_mutations.to_frame().T
            base = list_seq_aligned[position]
            Score = current_position_mutations[current_position_mutations['Mutation'] == base]['Pathogenicity_Score']
            unaligned_position = current_position_mutations[current_position_mutations['Mutation'] == base]['Position'] + 1
            Reference_base = current_position_mutations[current_position_mutations['Mutation'] == base]['Reference']
            if Score.empty or Score.iloc[0] < APOGEE_threshold:
                patient_scores.append(np.nan)
            else:
                patient_scores.append(Score.iloc[0])
                Variant_positions.append(f"{Reference_base.iloc[0]}{unaligned_position.iloc[0]}{base}")

        # NEED TO FIX THIS TO EXTRACT HVARIANTS AS NOW CHANGED FORMAT AFTER USING UNALIGNED FASTA!!!!
        !./haplogrep classify --in "unaligned_seq_patient.fasta" --output "results.txt" --format fasta  --lineage 1 #--extend-report #--hetLevel 0.1
        !grep " " " results.dot > results.txt
        df_hap=pd.read_csv("results.txt",sep="\t").loc[:,"Haplogroup"]
        Variants_array = np.asarray(Variant_positions + ["NA"] * (100 - len(Variant_positions)))
        Variants_df = pd.DataFrame([Variants_array], columns=[f'Pathogenic_Variant{i}' for i in range(0, 100)])
        new_row = pd.DataFrame([[f"Patient {str(sequences[1].id)}", np.nansum(patient_scores)]], columns=["Patient", "Variant Load Score"])
        new_row = pd.concat([new_row,df_hap,Variants_df], axis=1)
        results_df = pd.concat([results_df, new_row], ignore_index=True)

        yield results_df



def Variant_load_calculation_variant(table, list_seq, list_var, reference_seq, Haplomarker_percentage, APOGEE_threshold, Haplomarker_total_count):

    #Read in input
    table = pd.read_csv(table)
    list_var = open(list_var, "r")
    patient_list = list_var.read() #CHANGE TO READLINE
    patient_list = patient_list.split('\n')

    table['Position'] = table['Position'].values - 1  # Adjust Position to match Python indexing

    results_df = pd.DataFrame(columns=["Patient", "Variant Load Score"])


    for patient in range(0, len(patient_list) -1):  #for each patient

        patient = str(patient_list[patient])
        pat_variants = patient.split(";") #splits patient variants by ;
        pat_variants = [i.replace(' ','') for i in pat_variants] #removes white spaces
        id = pat_variants[0] #get patient ID
        pat_variants = pat_variants[1:len(pat_variants)] #takes variants by excluding first element, SHOULD CHANGE TO AVOID ERRORS


        Variant_positions = []
        patient_scores = []

        #Get haplotype

        haplo_input = [id, "1-16569", "?"] + [i for i in pat_variants if '-' not in i] #makes input for haplogrep, removes variants with mutation as '-'
        with open("hsd.txt", "w") as f:
           f.write("ID\tRange\tHaplogroup\tPolymorphisms\n")
           f.write("\t".join(haplo_input))
        f.close()

        !./haplogrep classify --in "hsd.txt" --output "results.txt" --format hsd --lineage 1 > /dev/null 2>&1 #--extend-report #--hetLevel 0.1
        df_hap=pd.read_csv("results.txt",sep="\t").loc[:,"Haplogroup"]


        for variant in pat_variants: #for each variant
            variant = [x for x in re.split(r'([0-9]+)', variant) if x] #splits by numbers and letters, removing empty elements

            #get position and base

            if 'm.' in variant[0]: #if new format is used, MAY HAVE TO ADD EXTRA LINES IF BASE SEQ IS NOT ALWAYS THERE
                position = variant[1]
                base = variant[2][0]
            else: #if old format used
                if bool(re.search("^[A-Z]" , variant[0])): #check if format contains base sequence
                    position = variant[1]
                    base = variant[0]
                else:
                    position = variant[0]
                    base = None

            #last letter = mutation (within indent x2)

            mutation = variant[-1][-1]

            #get info from lookup table (Pathogenicity score, and base if not included in input) using position and mutation

            try:
                score = table[(table['Position'] == (int(position) -1)) & (table['Mutation'] == mutation) ]['Pathogenicity_Score'].values[0] #map position to position of ref sequence
            except IndexError: #if mutation is not within lookup table, skip to next variant
                continue


            if base is None:  #add base info if missing
                base = table[(table['Position'] == (int(position) -1)) & (table['Mutation'] == mutation) ]['Reference'].values[0]
            else:
                pass


            # filtering

            if score >= APOGEE_threshold: #skip variant if below cutoff threshold
                patient_scores.append(score)
                Variant_positions.append(f"{base}{position}{mutation}")
            else:
                continue


            #calculate VLS like Wasim (per patient)

        Variants_array = np.asarray(Variant_positions + ["NA"] * (100 - len(Variant_positions)))
        Variants_df = pd.DataFrame([Variants_array], columns=[f'Pathogenic_Variant{i}' for i in range(0, 100)])
        new_row = pd.DataFrame([[f"{id}", np.nansum(patient_scores)]], columns=["Patient", "Variant Load Score"])
        new_row = pd.concat([new_row,df_hap,Variants_df], axis=1)
        results_df = pd.concat([results_df, new_row], ignore_index=True)
        #!rm results.dot results.txt hsd.txt #remove haplogrep files as it seems to cause issues if left there

        yield results_df


def Variant_load_calculation_variant_haploexclude(table, list_seq, list_var, reference_seq, Haplomarker_percentage, APOGEE_threshold, Haplomarker_total_count):

    #Read in input
    table = pd.read_csv(table)
    list_var = open(list_var, "r")
    patient_list = list_var.read() #CHANGE TO READLINE
    patient_list = patient_list.split('\n')
    haplovariants_table=pd.read_csv("mtDNA-main/Haplo_context.csv")
    haplovariants_table=haplovariants_table[(haplovariants_table['Percentage']>=Haplomarker_percentage) & (haplovariants_table['total']>=Haplomarker_total_count)] # subset halpovariants_table based variant frequency


    table['Position'] = table['Position'].values - 1  # Adjust Position to match Python indexing

    results_df = pd.DataFrame(columns=["Patient", "Variant Load Score"])


    for patient in range(0, len(patient_list)-1):  #for each patient

        patient = str(patient_list[patient])
        pat_variants = patient.split(";") #splits patient variants by ;
        pat_variants = [i.replace(' ','') for i in pat_variants] #removes white spaces
        id = pat_variants[0] #get patient ID
        pat_variants = pat_variants[1:len(pat_variants)] #takes variants by excluding first element, SHOULD CHANGE TO AVOID ERRORS


        Variant_positions = []
        patient_scores = []

        #Get haplotype

        haplo_input = [id, "1-16569", "?"] + [i for i in pat_variants if '-' not in i] #makes input for haplogrep, removes variants with mutation as '-'
        with open("hsd.txt", "w") as f:
           f.write("ID\tRange\tHaplogroup\tPolymorphisms\n")
           f.write("\t".join(haplo_input))
        f.close()

        !./haplogrep classify --in "hsd.txt" --output "results.txt" --format hsd --lineage 1 > /dev/null 2>&1 #--extend-report #--hetLevel 0.1
        df_hap=pd.read_csv("results.txt",sep="\t").loc[:,"Haplogroup"]

        for variant in pat_variants: #for each variant
            variant = [x for x in re.split(r'([0-9]+)', variant) if x] #splits by numbers and letters, removing empty elements

            #get position and base

            if 'm.' in variant[0]: #if new format is used, MAY HAVE TO ADD EXTRA LINES IF BASE SEQ IS NOT ALWAYS THERE
                position = variant[1]
                base = variant[2][0]
            else: #if old format used
                if bool(re.search("^[A-Z]" , variant[0])): #check if format contains base sequence
                    position = variant[1]
                    base = variant[0]
                else:
                    position = variant[0]
                    base = None

            #last letter = mutation (within indent x2)

            mutation = variant[-1][-1]

            #get info from lookup table (Pathogenicity score, and base if not included in input) using position and mutation

            try:
                score = table[(table['Position'] == (int(position) -1)) & (table['Mutation'] == mutation) ]['Pathogenicity_Score'].values[0] #map position to position of ref sequence
            except IndexError: #if mutation is not within lookup table, skip to next variant
                continue

            if base is None: #add base info if missing
                base = table[(table['Position'] == (int(position) -1)) & (table['Mutation'] == mutation) ]['Reference'].values[0]
            else:
                pass


            #get haplotype mutations for filtering

            Parent_haplo=[i for i in df_hap.values][0]

            prefixes = [Parent_haplo[:i] for i in range(1, len(Parent_haplo) + 1)] # create all possible haplonames left to right from current haplogroup
            matching_rows = haplovariants_table[haplovariants_table['Haplogroup_marker'].apply(lambda x: any(x.startswith(prefix) for prefix in prefixes))].copy()
            matching_rows.loc[:, 'char_length'] = matching_rows['Haplogroup_marker'].apply(len) # get length all haplomatches
            Haplo_max=matching_rows['char_length'].max() # find the best match i.e. most character match and by one of the prefixes
            matching_rows = matching_rows[(matching_rows['char_length']==Haplo_max) &  (matching_rows['Haplogroup_marker'].isin(prefixes))] # subset the haplogroup with the greatest character length and must match one of the original prefixes
            current_pos_haplo_mutations = matching_rows[matching_rows['Position'] == int(position)] # match position to get final haplomarker


            #filtering


            if score >= APOGEE_threshold: #skip variant if below cutoff threshold
                pass
            else:
                continue

            if (mutation in current_pos_haplo_mutations['Mutation'].values):
                Variant_positions.append(f"Excluded from score as native to haplogroup: \n{base}{position}{mutation}")
            else:
                patient_scores.append(score)
                Variant_positions.append(f"{base}{position}{mutation}")



            #calculate VLS like Wasim (per patient)

        Variants_array = np.asarray(Variant_positions + ["NA"] * (100 - len(Variant_positions)))
        Variants_df = pd.DataFrame([Variants_array], columns=[f'Pathogenic_Variant{i}' for i in range(0, 100)])
        new_row = pd.DataFrame([[f"{id}", np.nansum(patient_scores)]], columns=["Patient", "Variant Load Score"])
        new_row = pd.concat([new_row,df_hap,Variants_df], axis=1)
        results_df = pd.concat([results_df, new_row], ignore_index=True)
        #!rm results.dot results.txt hsd.txt #remove haplogrep files as it seems to cause issues if left there

        yield results_df


def save_results(results):
    results.to_csv('./results.csv', index=False)
    return './results.csv'




In [3]:
# Create the Gradio interface

#---------------------------
# STEP 2 : We create the gradio user friendly interface below.
#---------------------------

with gr.Blocks() as interface: # Whenever you create a gradio interface you start with gr.blocks() which essentially means start a fresh blank page
    gr.Markdown("## Mitochondrial Variant Load Calculator")
    gr.Markdown("# Please upload all three required files")

    with gr.Row(): # gr.row allows you to start placing objects on a fresh row or on the same row.
        table = gr.File(label="Upload Look-up Table (CSV)", file_types=["csv"])
        reference_seq = gr.File(label="Upload Reference Sequence (GenBank)", file_types=["fasta", "gb", "txt"])
        list_seq = gr.File(label="Upload Variant Sequences (FASTA)", file_types=["fasta", "txt"])
        list_var = gr.File(label="Upload Variant List", file_types=["txt"])

    with gr.Row():
        APOGEE_threshold = gr.Slider(label="Exclude scores by specifying minimum pathogenicity score threshold",step=0.01,minimum=0.00,maximum=1.00)
        haploexclude_full = gr.Checkbox(label="Exclude haplogroup markers from score based on haplogroup (e.g. L3f)")
        Haplomarker_total_count = gr.Slider(label="Filter halpogroup exclusion table by minimum clade size (n)",step=1,minimum=0,maximum=200)
        Haplomarker_percentage = gr.Slider(label="Filter halpogroup exclusion table by percentage clade with variant (%)",step=1)

    calculate_button = gr.Button("Calculate Variant Load") # To add a button with a name you simply use gr.Button#()

    output = gr.DataFrame(headers=["Patient", "Variant Load Score", "Haplogroup", # If you want to have a dataframe on your interface you need to specify gr.DataFrame() function
             'Pathogenic_Variant0', 'Pathogenic_Variant1', 'Pathogenic_Variant2', # You need to specify the same heading names and column numbers as the dataframe output from your function i.e. Variant_load_calculation or Variant_load_calculation_haploexclude_all_characters
             'Pathogenic_Variant3', 'Pathogenic_Variant4', 'Pathogenic_Variant5',
             'Pathogenic_Variant6', 'Pathogenic_Variant7', 'Pathogenic_Variant8',
             'Pathogenic_Variant9', 'Pathogenic_Variant10', 'Pathogenic_Variant11',
             'Pathogenic_Variant12', 'Pathogenic_Variant13', 'Pathogenic_Variant14'], label="Results")

    #download_button = gr.Button("Download CSV")
    download_file = gr.File(label="Downloaded Results") # This allows you to add a button to download the results
                                                        # If you want to use the gr.File() to download rather than upload files you simply asign a label name without a file_types=[]

    # This function below takes our raw functions and inputs and selects which version of the function we should use a) Variant_load_calculation_haploexclude_all_characters or b) the default function Variant_load_calculation
    # When the function version has been selected the chosen function then is saved in another object called calc_function which is then used to take the same inputs and calculate variant load scores per patient
    def calculate_variant_load(table, list_seq, list_var, reference_seq, Haplomarker_percentage, haploexclude_full, APOGEE_threshold, Haplomarker_total_count):
        if list_seq:
            if haploexclude_full:
                calc_function = Variant_load_calculation_haploexclude_all_characters # this your original function version 2 selected if checkbox is ticked (i.e. haploexclude_full)
            else:
                calc_function = Variant_load_calculation # this your original function or default function if checkbox is unticked
        if list_var:
            if haploexclude_full:
                calc_function = Variant_load_calculation_variant_haploexclude
            else:
                calc_function = Variant_load_calculation_variant
        results_df = pd.DataFrame(columns=["Patient", "Variant Load Score", "Haplogroup"] + [f'Pathogenic_Variant{i}' for i in range(20)])

        # Whichever function is selected from the if else statement above will be used in the for loop below to retrieve results per patient
        for result in calc_function(table, list_seq, list_var, reference_seq, Haplomarker_percentage, APOGEE_threshold, Haplomarker_total_count):
            results_df = pd.concat([results_df, result], ignore_index=True)
            results_df = results_df.drop_duplicates(keep="first")
            # Save results incrementally
            save_results(results_df)

            results_path = os.path.join("./", 'results.csv')
            yield results_df.drop_duplicates(keep="first"), results_path

    #  The above function within functions is then used in the button below and takes the original inputs and then provides the outputs as a downloadable file.
    calculate_button.click(
        fn=calculate_variant_load, # this is the final wrapper function to compute everything after a button click
        inputs=[table, list_seq, list_var, reference_seq, Haplomarker_percentage, haploexclude_full, APOGEE_threshold, Haplomarker_total_count], # These are the inputs e.g. file uploads or slider values or checkbox selection
        outputs=[output, download_file] # this is the final output produced by the wrapper function
    )


# Launch the app
interface.launch(share=True,height=1100,debug=True)



NameError: name 'gr' is not defined