In [1]:
# Constants
MSA_FILEPATH = "./data/alignment/intersection.sto"
CODES_PATH = "./data/codes/intersection.txt"
EVALUES_SAVE_PATH = "./data/evalues/intersection.tsv"

MSA_NAME = b"KUNITZ"

# Controls
NEGATIVE_CONTROL_PATH = "./data/test/negative.fasta"
POSITIVE_CONTROL_PATH = "./data/test/positive.fasta"

In [2]:
import pyhmmer

# Build the HMM
alphabet = pyhmmer.easel.Alphabet.amino()
builder = pyhmmer.plan7.Builder(alphabet,)
background = pyhmmer.plan7.Background(alphabet)

# read the MSA
with pyhmmer.easel.MSAFile(MSA_FILEPATH, digital=True, alphabet=alphabet) as msa_file:
    msa = msa_file.read()
    msa.name = MSA_NAME

hmm, _, _ =  builder.build_msa(msa, background)

# Load the HMM from a file called test.hmm  
# with pyhmmer.plan7.HMMFile("./test.hmm") as hmm_file:
#     hmm = hmm_file.read()

# save the HMM to a file called models/union.hmm
with open("./models/foldseek.hmm", "wb") as hmm_file:
    hmm.write(hmm_file)

In [4]:
import requests
import urllib.parse
import tqdm 
import os

# Parse the codes to exclude from the search in the positive set
# Split at , and at _ to get the codes
with open(CODES_PATH, "r") as codes_file:
    codes = codes_file.read().split(",")
    
    # Remove empty lines
    codes = [code for code in codes if code != ""]
    
    # Remove the _ and everything after it
    codes = [code.split("_")[0] for code in codes]

# Downloads the given uniprot query as a fasta file
# and returns the path to the file
def download_uniprot_fasta(query: str, output_path: str, eclude_codes :list[str] = None, overwrite  : bool = False) -> str:
    """Download the given uniprot query as a fasta file"""
    # Set the URL for the Data API
    # first we format the query so that it can be used in the url
    
    # add the codes to exclude (if any)
    # the code works like this: NOT (xref:pdb-3tgi) NOT (xref:pdb-3tgi) ...
    if eclude_codes != None:
        query += " AND "
        query += " ".join([f"NOT (xref:pdb-{code})" for code in eclude_codes])        
    query = urllib.parse.quote(query)
    url = f'https://rest.uniprot.org/uniprotkb/stream?format=fasta&query={query}'
    
    # print(url)
    
    # download and show progress
    response = requests.get(url, stream=True)
    total_size_in_bytes= int(response.headers.get('content-length', 0))
    
    # Check if the request was successful
    if response.status_code != 200:
        print(f"Failed to download {query}")
        return None
    
    # If the file already exists and overwrite is false, return the path
    if not overwrite and os.path.exists(output_path):
        print(f"File {output_path} already exists")
        return output_path
    
    # download the file displaying the progress in megabytes
    block_size = 1024 #1 Kibibyte
    with open(output_path, 'wb') as file, tqdm.tqdm(total=total_size_in_bytes, unit='iB', unit_scale=True) as progress_bar:
        for data in response.iter_content(block_size):
            progress_bar.update(len(data))
            file.write(data)        

    return output_path

NEGATIVE_CONTROL_QUERY = "NOT (xref:pfam-PF00014) AND (length:[58 TO *]) AND (reviewed:true)"
POSITIVE_CONTROL_QUERY = "xref:pfam-PF00014 AND (length:[58 TO *]) AND (reviewed:true)"

# Download the negative control
download_uniprot_fasta(NEGATIVE_CONTROL_QUERY, NEGATIVE_CONTROL_PATH)

# # Download the positive control
download_uniprot_fasta(POSITIVE_CONTROL_QUERY, POSITIVE_CONTROL_PATH, overwrite  = True) # Uncomment this to download all the positives
# download_uniprot_fasta(POSITIVE_CONTROL_QUERY, POSITIVE_CONTROL_PATH, codes, overwrite  = True) # Uncomment this to download all the positives minus the ones in the codes file

File ./data/test/negative.fasta already exists


112kiB [00:00, 243kiB/s]  


'./data/test/positive.fasta'

In [5]:
import math

# Computes the MCC from the TP, FP, TN, FN
def compute_mcc(tp, fp, tn, fn):
    
    numerator = tp * tn - fp * fn
    denominator = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    
    # if the denominator is 0 return 0 (invalid value)
    if denominator == 0:
        return 0
    
    
    return numerator / denominator

# Computes the accuracy from the TP, FP, TN, FN
def compute_accuracy(tp, fp, tn, fn):
    return (tp + tn) / (tp + fp + tn + fn)

# Computes the precision and recall from the TP, FP, TN, FN
# also returns the f1 score
def compute_precision_and_recall(tp, fp, tn, fn):
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    
    return precision, recall, 2 * precision * recall / (precision + recall)

# Appling the HMM to a sequence database and CVing the E-Value

In [10]:
# The evalues tested during the cross validation
# from 1e10-3 to 1e-12
TESTED_EVALUES = [10 ** -i for i in range(2, 20)] # Uncomment this to test all the evalues
# TESTED_EVALUES = [10 ** -5] # Uncomment this to test only one evalue

# the results of the cross validation
# saves the evalue, accuracy, precision, recall, f1, tp, fp, tn, fn
cv_results = []

# open both files (negative and positive) and count the amount of sequences
# this is done to compute the true negatives and true positives
positive_file = pyhmmer.easel.SequenceFile(POSITIVE_CONTROL_PATH, digital=True, alphabet=alphabet)
negative_file = pyhmmer.easel.SequenceFile(NEGATIVE_CONTROL_PATH, digital=True, alphabet=alphabet)

# count the amount of sequences in the files (they are the same, independent of the evalue)
positive_ = len(positive_file.read_block())
negative_ = len(negative_file.read_block())

# print the fact that the files were read
print("Read the files")

# iterate over the evalues
for evalue in TESTED_EVALUES:

    # rewind the files to the beginning
    positive_file.rewind()
    negative_file.rewind()

    # print the evalue being tested
    print(f"Testing evalue: {evalue}")

    # Create the pipeline
    pipeline = pyhmmer.plan7.Pipeline(alphabet, background=background, E=evalue)

    # Search the negative control
    negative_hits = pipeline.search_hmm(hmm, negative_file, )
        
    # Search the positive control
    positive_hits = pipeline.search_hmm(hmm, positive_file, )
        
    # Compute a confusion matrix    
    true_positives = len(positive_hits)
    false_positives = len(negative_hits)
    true_negatives = negative_ - false_positives
    false_negatives = positive_ - true_positives
    
    # print all the metrics
    print(f"TP: {true_positives}, FP: {false_positives}, TN: {true_negatives}, FN: {false_negatives}")

    # Compute the metrics and save them
    accuracy = compute_accuracy(true_positives, false_positives, true_negatives, false_negatives)
    precision, recall, f1 = compute_precision_and_recall(true_positives, false_positives, true_negatives, false_negatives)
    mcc = compute_mcc(true_positives, false_positives, true_negatives, false_negatives)
    cv_results.append((evalue, f1, accuracy, precision, recall, mcc, true_positives, false_positives, true_negatives, false_negatives))
    
# close the files
positive_file.close()
negative_file.close()

Read the files
Testing evalue: 1e-05
TP: 345, FP: 0, TN: 553177, FN: 2


# Identifying the Wrongly classified Sequences

In [7]:
# Create the pipeline
pipeline = pyhmmer.plan7.Pipeline(alphabet, background=background, E=10**-5)

# Search the positive control
positive_file = pyhmmer.easel.SequenceFile(POSITIVE_CONTROL_PATH, digital=True, alphabet=alphabet)

# Get all the positive names
all_positive_names = [str(seq.name) for seq in positive_file.read_block()]

# Search the positive control
positive_file.rewind()
positive_hits = pipeline.search_hmm(hmm, positive_file, )
positive_file.close()

# Get the names of the positive hits
positive_hits_names = [str(hit.name) for hit in positive_hits]

# Get the names of the wrongly classified sequences
wrongly_classified = [name for name in all_positive_names if name not in positive_hits_names]
wrongly_classified

["b'sp|O62247|BLI5_CAEEL'", "b'sp|D3GGZ8|BLI5_HAECO'"]

# Saving E-Values

In [None]:

# Save a TSV with the results
with open(EVALUES_SAVE_PATH, "w") as file:
    file.write("evalue\tf1\taccuracy\tprecision\trecall\tmcc\tTP\tFP\tTN\tFN\n")
    for result in cv_results:
        file.write("\t".join(map(str, result)) + "\n")