# Importing Plugins

In [1]:
from Bio import SeqIO
from Bio import Align
from Bio.Align import substitution_matrices
from collections import defaultdict
from datetime import datetime
import pandas as pd
import multiprocessing
# import Bio.Entrez as ez
# ez.email = "ratlos@habmalnefrage.de"

# Setting Analysis Parameters
## Adjust parameters to your requirements

In [2]:
path="/home/john/Excercises/Sequence_Analysis/Protein_Blast/" # Adjust your working directory
dataset_index=1 # Choose a set of data to analyse, the higher the number, the longer calculation will take
dataset=[["query_seq.fasta","synthetic_seqs.fasta"],["NP_477249.fasta","seqdump.txt"],["Q72GW4.fasta","EF-Tu_BLAST.fasta"],["Q72GW4.fasta","uniprotkb_proteome_UP000000625_AND_revi_2024_01_23.fasta"]]
query=dataset[dataset_index][0]
database_file=dataset[dataset_index][1]
kmer = 3
tscore = 13 # default 13
aligner = Align.PairwiseAligner
gap_open = 10 # default 10
gap_extend = 2 #default 2
joining_threshold = 36 # default 36

# Choose and Load Scoring Matrix
## Adjust parameters to your requirements

In [3]:
avaliable_scoring_matrices=["BLOSUM90","BLOSUM45","PAM120"]
index_of_scoring_matrix=0 # number refers to the list index of the matrix of choice from "avaliable_scoring_matrices"
scoringmatrix_df = pd.read_csv(f"{path}{avaliable_scoring_matrices[index_of_scoring_matrix]}.csv", index_col=0) # Number format must be compatible with int8 or the script will not run
scoringmatrix = scoringmatrix_df.to_dict(orient='dict', index=True) # put the name of any available scoringmatrix_dict here
matrix_for_local_alignments = "BLOSUM45"
matrix_for_global_alignments = "BLOSUM45"
local_score_threshold = 15
global_score_threshold = 50
print(f"{avaliable_scoring_matrices[index_of_scoring_matrix]}_df")

BLOSUM90_df


# Initialize Variables

In [4]:
# Variables
len_query=0
name_query=""
seq_query=""
job_ID=""
len_query_minus_kmer=0
cores=multiprocessing.cpu_count()

# Lists
database_items=[]
db_entries=[]
hit_IDs=[]
selected_hit_IDs=[]

# Dictionaries
database_dict={}
scan_dict={}
kmer_dict={}
diagonals_dict={}
top10_dict={}
nolh_dict={}
chains_dict={}
locals_dict={}
globals_dict={}


# Parsing Query Sequence

In [5]:
def parse_query_sequence(path, query):
    print("Parsing query sequence")
    with open(f"{path}query") as handle:
        for seq_record in SeqIO.parse(handle, "fasta"):
            print("Query:")
            print(seq_record.id)
            print("Query Sequence:", repr(seq_record.seq))
            print("Query sequence length:", len(seq_record))
            print()
            len_query = len(seq_record)
            name_query = seq_record.id
            seq_query = seq_record.seq
            return name_query, len_query, seq_query

# Creating Job-ID

In [6]:
def generate_job_ID(name_query):
    job_id=str(name_query+datetime.now().strftime('%Y%m-%d%H-%M%S-'))
    return(job_ID)

# Parsing Database

In [7]:
def parse_patabase(path, database_file):
    print("Parsing database")
    db_entries=[]
    sequence_count=0
    with open(f"{path}database_file") as handle: # this is a fasta file containing 127 sequences coming from an NCBI blast of the query sequence
        for seq_record in SeqIO.parse(handle, "fasta"):
            sequence_count+=1
            database_items.append(seq_record.id)
            database_dict[f"ID_{seq_record.id}"] = seq_record.id
            database_dict[f"Length_{seq_record.id}"] = len(seq_record)
            database_dict[f"Sequence_{seq_record.id}"] = seq_record.seq
            scan_dict[f"Scan_hits_{seq_record.id}"] = pd.DataFrame(index=range(len(seq_record) - kmer + 1), columns=range(len_query - kmer + 1)).fillna(False)
            scan_dict[f"Scan_diagonals_{seq_record.id}"] = pd.DataFrame(index=range(len(seq_record) - kmer + 1), columns=range(len_query - kmer + 1)).fillna(0)
            scan_dict[f"Scan_diagonals_{seq_record.id}"] = scan_dict[f"Scan_diagonals_{seq_record.id}"].astype('int16')
            db_entries.append(seq_record.id)
    print(sequence_count,"sequences parsed")
    return(db_entries, database_dict, scan_dict)

# Creating K-mers and Neighbors Matrix

In [8]:
def create_kmers(kmer, len_query, seq_query):
    aa=["A","C","D","E","F","G","H","I","K","L","M","N","P","Q","R","S","T","V","W","X","Y"]

    # Creating Kmers dictionary: Making a dictionary containing all possible single-point mutations of a given kmer
    kmer_dict = {}
    print("Defining K-mers and all possible single-point mutants")
    for i in range(len_query - kmer):
        kmer_dict[f"kmer_{i}"] = [str(seq_query[i:i+kmer]), 0]
        for j in range(kmer):
            for k in range(len(aa)):
                peptide = kmer_dict[f"kmer_{i}"][0]
                mutant = ""
                for l in range(kmer):
                    if l != j:
                        mutant += peptide[l]
                    else:
                        mutant += aa[k]
                kmer_dict[f"{i}_{j}_{k}"] = [mutant, 0]
    print("Creation of K-Mers Completed")
#     df = pd.DataFrame.from_dict(kmer_dict, orient='index', columns=['Sequence', 'Score'])
#     print(df)
    # Scoring K-mers: 
    print("Scoring K-mers")
    for i in range(len_query-kmer):
        for j in range(kmer):
            score = 0
            score += scoringmatrix[kmer_dict[f"kmer_{i}"][0][j]][kmer_dict[f"kmer_{i}"][0][j]]
            kmer_dict[f"kmer_{i}"][1] = score
            for k in range(len(aa)):
                score += scoringmatrix[kmer_dict[f"kmer_{i}"][0][j]][kmer_dict[f"{i}_{j}_{k}"][0][j]]
                kmer_dict[f"{i}_{j}_{k}"][1] = score
#    df = pd.DataFrame.from_dict(kmer_dict, orient='index', columns=['Sequence', 'Score'])
#    print(df)
    return kmer_dict

# Creating Diagonals

In [9]:
def create_diagonals(database_dict, kmer_dict, tscore, db_entries):
    diagonals_dict=defaultdict(dict)
    print("Creating diagonals")
    direct_hits=0
    mismatched_hits=0
    for i in range(len(db_entries)):
        diagonals_dict[db_entries[i]] = defaultdict(list)
        hitstart_db=0
        hitstart_query=0
        score=0
        db_seq=str(database_dict[f"Sequence_{db_entries[i]}"])
        for j in range(len_query-kmer):
            for k in range(len(db_seq)-kmer):
                if kmer_dict[f"kmer_{j}"][0]==db_seq[k:k+kmer]: # Search for a direct seed hit
                    direct_hits +=1
                    score = kmer_dict[f"kmer_{j}"][1]
                    hitstart_query = j # index of the first amino acid in the query sequence matching the database sequence
                    hitstart_db = k # index of the first amino acid in the database sequence matching the first amino acid of the tested kmer
                    hitend_query = j+kmer # index of the last amino acid in the query sequence matched to the database sequence
                    hitend_db = k+kmer # index of the last amino acid in the database sequence matched to the tested kmer
                    # Scan forward
                    query_pos = j
                    db_pos = k
                    hit_seq = kmer_dict[f"kmer_{j}"][0]
                    while query_pos < (len_query-kmer-1) and db_pos < len(db_seq)-kmer-1:
                        query_pos += 1
                        db_pos += 1
                        if kmer_dict[f"kmer_{query_pos}"][0]==db_seq[db_pos:db_pos+kmer]: # try expanding the seed hit with another direct hit
                            hitend_query += 1 # update coordinates
                            hitend_db += 1 # update coordinates
                            direct_hits += 1
                            hit_seq = hit_seq + kmer_dict[f"kmer_{query_pos}"][0][-1] # add the last letter of the matching k-mer to the hit sequence
                            score += kmer_dict[f"kmer_{query_pos}"][1] # update the score of the expanded seed
                        else: # try expanding the seed hit with a mismatched, positive scored kmer
                            for l in range(kmer):
                                for m in range(len(aa)):
                                    if kmer_dict[f"{query_pos}_{l}_{m}"][1]>0:
                                        if kmer_dict[f"{query_pos}_{l}_{m}"][0]==db_seq[db_pos:db_pos+kmer]:
                                            hitend_query += 1
                                            hitend_db += 1
                                            mismatched_hits += 1
                                            hit_seq = hit_seq + kmer_dict[f"{query_pos}_{l}_{m}"][0][-1] # add the last letter of the matching k-mer to the hit sequence
                                            score += kmer_dict[f"{query_pos}_{l}_{m}"][1] # update the score of the expanded seed
                                        else:
                                            break
                    # Scan backward
                    query_pos = j
                    db_pos = k
                    while query_pos > 1 and db_pos > 1:
                        query_pos -= 1
                        db_pos -= 1
                        if kmer_dict[f"kmer_{query_pos}"][0]==db_seq[db_pos:db_pos+kmer]:
                            hitstart_query -= 1
                            hitstart_db -= 1
                            direct_hits += 1
                            hit_seq = kmer_dict[f"kmer_{query_pos}"][0][0] + hit_seq 
                            score += kmer_dict[f"kmer_{query_pos}"][1]
                        else:
                            for l in range(kmer):
                                for m in range(len(aa)):
                                    if kmer_dict[f"{query_pos}_{l}_{m}"][1]>0:
                                        if kmer_dict[f"{query_pos}_{l}_{m}"][0]==db_seq[db_pos:db_pos+kmer]:
                                            hitstart_query -= 1
                                            hitstart_db -= 1
                                            mismatched_hits += 1
                                            hit_seq = hit_seq + kmer_dict[f"{query_pos}_{l}_{m}"][0][-1]
                                            score += kmer_dict[f"{query_pos}_{l}_{m}"][1]
                                        else:
                                            break
                    # Store hits
                    if score >= tscore:
                        diagonals_dict[db_entries[i]][f"{hitstart_db}_{hitend_db}"] = [score, hitstart_query, hitend_query, hitstart_db, hitend_db, hit_seq]
    print("Diagonals created")
#    print("Direct hits found:",direct_hits)
#    print("Mismatched hits found:",mismatched_hits)
    kmer_dict={} # Purge kmer_dict
    return diagonals_dict, kmer_dict

# Filtering the 10 Best Diagonals

In [10]:
def filter_top10(diagonals_dict):
    print("Filtering best diagonals")
    print("Top 10 hits above threshold for each database sequence:")
    top10_dict = defaultdict(dict)
    for db_entry, inner_dict in diagonals_dict.items():
        diagonals_for_entry = list(inner_dict.values())
        if diagonals_for_entry:
            diagonals_for_entry.sort(key=lambda x: x[0], reverse=True)
            top10_dict[db_entry] = defaultdict(list)
            top10_dict[db_entry] = diagonals_for_entry[:min(len(diagonals_for_entry), 10)]
    diagonals_dict={} # Purge diagonals_dict
    return top10_dict, diagonals_dict

# Filtering Non-Overlapping Hits (NOLH)

In [11]:
def filter_nolh(top10_dict):
    print("Filtering best non-overlapping hits")
    nolh_dict = defaultdict(dict)
    for db_entry, inner_list in top10_dict.items():
        nolh = []
        for values in inner_list:
            if not any(
                (
                    values[3] in range(hit[3], hit[4] + 1)
                    or values[4] in range(hit[3], hit[4] + 1)
                    or values[1] in range(hit[1], hit[2] + 1)
                    or values[2] in range(hit[1], hit[2] + 1)
                )
                for hit in nolh
            ):
                nolh.append(values)
        nolh.sort(key=lambda x: x[0], reverse=True)
        nolh_dict[db_entry] = nolh
    return nolh_dict


# Chaining Non-Overlapping Hits

In [12]:
def chaining(nolh_dict, joining_threshold):
    def calculate_joining_penalty(db_end,db_start, joining_threshold):
        gapsize = db_start - db_end
        penalty = 0
        if gapsize > 1:
            penalty = gap_open + gapsize * gap_extend
            if penalty < joining_threshold:
                return True, gapsize, penalty # do this if there is a gap, but the penalty within agreeable limits
            else:
                return False, gapsize, penalty # do this if the gap is too large
        else:
            return True, gapsize, penalty # do this if there is no gap

    print()
    print("Chaining non-overlapping hits")
    chains_dict=defaultdict(dict)
    joining_events=0
    for db_entry, inner_list in nolh_dict.items():
        hits=inner_list
        hits.sort(key=lambda x: x[3], reverse=False) # Sort by hit start position in the database sequence
        segment=""
        chain=[]
        shackle = 0
        segment_score = 0
        gapsize = 0
        while shackle < len(hits):
            if len(hits) - shackle >1: # If there is another element in the list after the current one, check if the two elements can be joined
                join, gapsize, penalty = calculate_joining_penalty(hits[shackle][4],hits[shackle+1][3]) # Arguments handed over: end of current hit and start of next hit
                segment += hits[shackle][5] # add the sequence string
                if join:
                    segment_score += hits[shackle][0] - penalty # add match score and apply penalty for gaps to joined segments
                    for i in range(gapsize): # filling gaps with '-', the sequence of the next non-overlapping hit will be added in the next loop iteration
                       segment += "-"
                    joining_events += 1
                    shackle += 1 # Update loop variable
                    continue
                else:
                    segment_score += hits[shackle][0] # add match score
                    chain.append([segment_score, segment])
                    shackle += 1 # Update loop variable
            else: # If this is the last element in the list, append the last element and end the loop
               segment += hits[shackle][5]
               segment_score += hits[shackle][0]
               chain.append([segment_score, segment])
               break
        chain.sort(key=lambda x: x[0], reverse=True) # sort chains by score
        chains_dict[f"{db_entry}"]=chain[0]
        print("Target ID:",db_entry,"Best scoring chain:",chain[0])

    # df = pd.DataFrame.from_dict(chains_dict, orient='index', columns=['Score','Sequence'])
    # print(df)
    print("Number of segments joined:",joining_events)
    return chains_dict

# Writing Hit Sequences to Fasta File and Parsing of Hit-File

In [13]:
def create_and_parse_hitfile(path, job_ID, chains_dict):
    print("Writing chains to hitfile.fasta")
    hitfile = open(f"{path}/output/{job_ID}_hitfile.fasta", "w")
    for db_entry, value in chains_dict.items(): # db_entry refers to the database sequence that produced a hit
        if value[1]!="":
    #         print(db_entry, value[0])
    #         print(db_entry, value[1])
            hitfile.write(">" + db_entry + " " + "\n" + value[1]+ "\n"+"\n")
    hitfile.close()

    # Parsing hitfile for subsequent alignment
    aligner=Align.PairwiseAligner()
    hitfile = f"{path}/output/{job_ID}_hitfile.fasta"

    hit_IDs=[]
    hitparse_dict={}
    print("Parsing hitfile")
    with open(hitfile) as handle:
        for seq_record in SeqIO.parse(handle, "fasta"):
            hit_IDs.append(seq_record.id)
            hitparse_dict[f"ID_{seq_record.id}"] = seq_record.id
            hitparse_dict[f"Length_{seq_record.id}"] = len(seq_record)
            hitparse_dict[f"Sequence_{seq_record.id}"] = seq_record.seq
    return hit_IDs

# Creating Local Alignments

In [14]:
def local_alignment(path, job_ID, hit_IDs, chain_lightning, db_hit, local_matrix):
    print("Creating local alignments for filtering")
    aligner.mode = "local"
    aligner.open_gap_score = -10
    aligner.extend_gap_score = -0.5
    local_matrix = substitution_matrices.load(matrix_for_local_alignments)
    local_alignments = aligner.align(chain_lightning, db_hit)
    local_scores = aligner.score(chain_lightning, db_hit)
    filename = f"{path}/output/{job_ID}_Local_Alignments.txt"
    with open(filename, "w") as file:
        for i in range(len(local_alignments)):
            file.write(str(f">{hit_ID} Score: {local_scores}\n"))
            file.write(f"{(str(local_alignments[i]))},'\n','\n'")
    return local_scores, local_alignments

def call_local_alignment(path, job_ID, hit_IDs, hitparse_dict, database_dict, matrix_for_global_alignments):
    locals_dict={}
    selected_hit_IDs=[]
    local_matrix=substitution_matrices.load(matrix_for_global_alignments)
    for record in hit_IDs:
        query=hitparse_dict[f"Sequence_{record}"]
        target=database_dict[f"Sequence_{record}"]
        local_scores, local_alignments = local_alignment(record, query, target, matrix_for_local_alignments)
        locals_dict[f"{record}"]=[local_scores, local_alignments]
        if local_scores > local_score_threshold:
            selected_hit_IDs.append(record)
            print(local_scores)
            print(local_alignments[0])
    return selected_hit_IDs

# Creating Global Alignments

In [15]:
def global_alignment(path, job_ID, hit_ID, seq_query, db_hit, global_matrix):
    aligner.mode = "global"
    aligner.open_gap_score = -10
    aligner.extend_gap_score = -0.5

    global_alignments = aligner.align(seq_query, db_hit)
    global_scores = aligner.score(seq_query, db_hit)
    return global_scores, global_alignments

def call_global_alignment(path, job_ID, database_dict, selected_hit_IDs, global_score_threshold, seq_query, matrix_for_global_alignments):
    print("Hit IDs:",hit_IDs)
    global_matrix = substitution_matrices.load(matrix_for_global_alignments)
    globals_dict={}
    for record in selected_hit_IDs:
        target=database_dict[f"Sequence_{record}"]
        global_scores, global_alignments = global_alignment(path, job_ID, record,seq_query,target,global_matrix)
        if global_scores > global_score_threshold:
            globals_dict[f"{record}"]=[global_scores, global_alignments]
    return globals_dict
            
def save_global_alignments(job_ID, globals_dict):
    filename = f"{job_ID}_global.txt"
    with open(filename, "w") as file:
        for i in range(len(global_alignments)):
            file.write(str(f">{record} Score: {global_scores}\n"))
            file.write(f"{(str(global_alignments[i]))},'\n','\n'")

# Function to Call Functions

In [16]:
#def get_cpu_count():
#    return multiprocessing.cpu_count()

def process_function(step):
    function, use_multiprocessing, boundary, args, returns = step
    print(function, use_multiprocessing, boundary, args, returns)
    if use_multiprocessing:
        chunk_size = max(1, boundary // multiprocessing.cpu_count())
        chunks = [(i * chunk_size, (i + 1) * chunk_size) for i in range(get_cpu_count())]
        chunks[-1] = (chunks[-1][0], boundary)

        pool = multiprocessing.Pool(processes=get_cpu_count())
        results = pool.starmap(function, [(args, chunk[0], chunk[1]) for chunk in chunks])
        pool.close()
        pool.join()

        # Combine the results from different chunks
        combined_result = []
        for result in results:
            combined_result.extend(result)

        return combined_result

    else:
        return function(args)

# Defining the process

In [17]:
if __name__ == "__main__":
# Syntax of "steps": 1. Name of function to be called | 2. Use multiprocessing for executing a function | 
# 3. Upper boundary of chunk size | 4. Arguments handed to the function | 5. Variables returned by the function
    steps=[
        [parse_query_sequence,False,1,(path, query),"name_query, len_query, seq_query"],
        [generate_job_ID,False,1,(name_query),"job_ID"],
        [parse_patabase,True,len(db_entries),(path, database_file),"db_entries, database_dict, scan_dict"],
        [create_kmers,False,(len_query-kmer), (kmer, len_query, seq_query, scoringmatrix),"kmer_dict"],
        [create_diagonals,True,len(db_entries),(database_dict, kmer_dict, tscore, db_entries),"diagonals_dict, kmer_dict"],
        [filter_top10,True,len(diagonals_dict),(diagonals_dict),"top10_dict, diagonals_dict"],
        [filter_nolh,True,len(top10_dict),(top10_dict),"nolh_dict"],
        [chaining,True,len(nolh_dict),(nolh_dict, joining_threshold),"chains_dict"],
        [create_and_parse_hitfile,False,1,(path, job_ID, chains_dict),""],
        [call_local_alignment,False,1,(path, job_ID, hit_IDs),""],
        [call_global_alignment,False,1,(path, job_ID, selected_hit_IDs),(path, job_ID, database_dict, selected_hit_IDs, global_score_threshold, seq_query, matrix_for_global_alignments),"globals_dict"],
        [save_global_alignments,False, 1,(job_ID, globals_dict),""]
        ]
    for i, step in enumerate(steps):
        result_variable_names = step[4].split(", ")
        results = process_function(step)

        # Assign results to variables based on the provided variable names
        for j, result_var_name in enumerate(result_variable_names):
            globals()[result_var_name] = results[j]

<function parse_query_sequence at 0x7f46f80d4400> False 1 ('/home/john/Excercises/Sequence_Analysis/Protein_Blast/', 'NP_477249.fasta') name_query, len_query, seq_query


TypeError: parse_query_sequence() missing 1 required positional argument: 'query'