# ATBI6ILV - Algorithms and Tools in Bioinformatics 
### Pt 2: Sequence Alignment Algorithms | FH Krems, SS 2024

## Name: Scripnic Dinu
## Effort: 20 hours

In [38]:
# Dependencies
! pip install numpy pandas biopython nbconvert blosum



## 1. Position Specific Scoring
Implement a framework for position-specific scoring in a programming language of your choice:
* Implement the import of aligned sequences (given in a text file)  
* A position-specific scoring matrix is to be calculated on the basis of these read-in sequences 
* Finally, it shall be possible to read in any number of sequences and output the score values of these sequences (in relation to the calculated scoring matrix) 

In [39]:
import numpy as np
import pandas as pd

class AlignedSequences:

    @classmethod
    def import_sequences(cls, filepath: str):
        with open(filepath, "r") as f:
            sequences = f.read().splitlines()
        return cls(sequences)
    
    def validate_input(self, sequences: list[str]):
        seq_len = len(sequences[0])
        if not all(len(seq) == seq_len for seq in sequences):
            raise ValueError("All sequences must have the same length.")
        return sequences

    def __init__(self, sequences: list[str]):
        sequences = self.validate_input(sequences)
        self.sequences = sequences
        self.sequence_length = len(sequences[0])
        self.bases = 'ACGT'

        self._count_matrix = None
        self._frequency_matrix = None
        self._weight_matrix = None
        self._score = None
        self._consensus_sequence = None
    
    def print_count_matrix(self):
        print("Count Matrix:")
        print(pd.DataFrame(self.count_matrix, index=list(self.bases), columns=list(self.consensus_sequence)))

    def print_frequency_matrix(self):
        print("Frequency Matrix:")
        print(pd.DataFrame(self.frequency_matrix, index=list(self.bases), columns=list(self.consensus_sequence)))

    def print_weight_matrix(self):
        print("Weight Matrix:")
        print(pd.DataFrame(self.weight_matrix, index=list(self.bases), columns=list(self.consensus_sequence)))

            
    @property
    def count_matrix(self) -> np.ndarray:
        if self._count_matrix is None:
            self._count_matrix = self._calculate_count_matrix()
        return self._count_matrix

    @property
    def frequency_matrix(self) -> np.ndarray:
        if self._frequency_matrix is None:
            self._frequency_matrix = self._calculate_frequency_matrix()
        return self._frequency_matrix

    @property
    def weight_matrix(self) -> np.ndarray:
        if self._weight_matrix is None:
            self._weight_matrix = self._calculate_weight_matrix()
        return self._weight_matrix

    def score(self, evaluation_sequences) -> list[float]:
        if self._score is None:
            self._score = list()
            for sequence in evaluation_sequences:
                self._score.append(self._calculate_score(sequence))
        return self._score
    
    @property
    def consensus_sequence(self) -> str:
        if self._consensus_sequence is None:
            self._consensus_sequence = self._calculate_consensus_sequence()
        return self._consensus_sequence

    def _calculate_score(self, sequence) -> list[float]:
        base_to_index = {base: index for index, base in enumerate(self.bases)}
        score = 0
        for position, base in enumerate(sequence):
            if base in base_to_index:
                score += self.weight_matrix[base_to_index[base], position]  # Fixed indexing
            else:
                score += self.weight_matrix.min()  # This will still assign the minimum score
        return score
          
    def _calculate_count_matrix(self) -> np.ndarray:
        base_to_index = {base: index for index, base in enumerate(self.bases)}
        matrix_height = len(self.bases)  
        matrix_width = self.sequence_length  
        count_matrix = np.zeros((matrix_height, matrix_width), dtype=int)
        
        for seq in self.sequences:
            for position, base in enumerate(seq):
                if base in base_to_index:
                    count_matrix[base_to_index[base], position] += 1
        return count_matrix

    def _calculate_frequency_matrix(self) -> np.ndarray:
        pseudocount = 1
        adjusted_matrix = self.count_matrix + pseudocount
        seq_count = len(self.sequences)
        frequency_matrix = adjusted_matrix / (seq_count + 4 * pseudocount)
        return frequency_matrix

    def _calculate_weight_matrix(self) -> np.ndarray:
        background_frequency = 0.25
        with np.errstate(divide='ignore', invalid='ignore'):
            log_odds_matrix = np.log(self.frequency_matrix / background_frequency)
            log_odds_matrix = np.nan_to_num(log_odds_matrix)
        return log_odds_matrix
    
    def _calculate_consensus_sequence(self) -> str:
        return ''.join(self.bases[np.argmax(row)] for row in self.count_matrix.transpose())


evaluation_sequences = ['ATATGG', 'GGATGA']

aligned_sequences = AlignedSequences.import_sequences("sequences.txt")


print("Position Specific Scoring Matrix:")
aligned_sequences.print_count_matrix()
aligned_sequences.print_frequency_matrix()
aligned_sequences.print_weight_matrix()
for seq, score in zip(evaluation_sequences, aligned_sequences.score(evaluation_sequences)):
    print(f"Score for {seq}: {score:.4f}")

Position Specific Scoring Matrix:
Count Matrix:
    T   G   G   A   G   A
A  29  19  25  33  11  29
C  15  23  21  21  32  20
G  20  31  30  27  33  28
T  36  27  24  19  24  23
Frequency Matrix:
          T         G         G         A         G         A
A  0.288462  0.192308  0.250000  0.326923  0.115385  0.288462
C  0.153846  0.230769  0.211538  0.211538  0.317308  0.201923
G  0.201923  0.307692  0.298077  0.269231  0.326923  0.278846
T  0.355769  0.269231  0.240385  0.192308  0.240385  0.230769
Weight Matrix:
          T         G         G         A         G         A
A  0.143101 -0.262364  0.000000  0.268264 -0.773190  0.143101
C -0.485508 -0.080043 -0.167054 -0.167054  0.238411 -0.213574
G -0.213574  0.207639  0.175891  0.074108  0.268264  0.109199
T  0.352821  0.074108 -0.039221 -0.262364 -0.039221 -0.080043
Score for ATATGG: 0.3323
Score for GGATGA: 0.1431


## 2. Needleman-Wunsch Algorithm and Smith-Waterman Algorithm
Implement a framework for applying the Needleman-Wunsch and Smith-Waterman alignment algorithms in a programming language of your choice: 
* It shall be possible to treat two sequences of any length 
* Substitution matrix and gap costs must be passed to the alignment function 
* The result is the alignment plus specification of its quality (score) 
* Users shall be optionally enabled to treat overlaps specifically

In [40]:
# Utils for printing matrices
SEQUENCE_1 = "HEAGAWGHEE"
SEQUENCE_2 = "PAWHEAE"
import numpy as np
import pandas as pd
import blosum as bl


def print_scoring_matrix(scoring_matrix, sequence_1, sequence_2):
    # Initialize the scoring matrix
    longer_sequence = None
    shorter_sequence = None
    if len(sequence_1) > len(sequence_2):
        longer_sequence = sequence_1
        shorter_sequence = sequence_2
    elif len(sequence_1) < len(sequence_2):
        longer_sequence = sequence_2
        shorter_sequence = sequence_1
    else:
        # If the sequences are of equal length, then it doesn't matter which is on X or Y axis
        longer_sequence = sequence_1
        shorter_sequence = sequence_2
    row_names = ["-"] + list(shorter_sequence)
    col_names = ["-"] + list(longer_sequence)
    df = pd.DataFrame(scoring_matrix, index=row_names, columns=col_names)
    print(df)

In [41]:
def needleman_wunsh_algorithm(
    sequence_1: str,
    sequence_2: str,
    substitution_matrix: dict[str, dict[str, float]],
    gap_cost: int,
) -> tuple[str, str, int]:
    """
    Needleman-Wunsh algorithm for global sequence alignment.

    Returns:
        tuple[str, int]: X-axis alignment, Y-axis alignment, quality score
    """
    # Decide X and Y axes
    longer_sequence = None
    shorter_sequence = None
    if len(sequence_1) > len(sequence_2):
        longer_sequence = sequence_1
        shorter_sequence = sequence_2
    elif len(sequence_1) < len(sequence_2):
        longer_sequence = sequence_2
        shorter_sequence = sequence_1
    else:
        # If the sequences are of equal length, then it doesn't matter which is on X or Y axis
        longer_sequence = sequence_1
        shorter_sequence = sequence_2

    # Initialize the scoring matrix
    scoring_matrix = np.zeros((len(shorter_sequence) + 1, len(longer_sequence) + 1))
    for x in range(1, len(longer_sequence) + 1):
        scoring_matrix[0, x] = scoring_matrix[0, x - 1] + gap_cost
    for y in range(1, len(shorter_sequence) + 1):
        scoring_matrix[y, 0] = scoring_matrix[y - 1, 0] + gap_cost
    print_scoring_matrix(scoring_matrix, sequence_1, sequence_2)

    # Fill in the scoring matrix
    for x in range(1, len(longer_sequence) + 1):
        for y in range(1, len(shorter_sequence) + 1):
            match = (
                scoring_matrix[y - 1, x - 1]
                + substitution_matrix[shorter_sequence[y - 1]][longer_sequence[x - 1]]
            )
            delete = scoring_matrix[y - 1, x] + gap_cost
            insert = scoring_matrix[y, x - 1] + gap_cost
            scoring_matrix[y, x] = max(match, delete, insert)

    print_scoring_matrix(scoring_matrix, sequence_1, sequence_2)

    # Traceback
    alignment_x = ""
    alignment_y = ""
    x, y = len(longer_sequence), len(shorter_sequence)
    while x > 0 or y > 0:
        if (
            x > 0
            and y > 0
            and scoring_matrix[y, x]
            == scoring_matrix[y - 1, x - 1]
            + substitution_matrix[shorter_sequence[y - 1]][longer_sequence[x - 1]]
        ):
            alignment_x = longer_sequence[x - 1] + alignment_x
            alignment_y = shorter_sequence[y - 1] + alignment_y
            x -= 1
            y -= 1
        elif y > 0 and scoring_matrix[y, x] == scoring_matrix[y - 1, x] + gap_cost:
            alignment_x = "-" + alignment_x
            alignment_y = shorter_sequence[y - 1] + alignment_y
            y -= 1
        else:
            alignment_x = longer_sequence[x - 1] + alignment_x
            alignment_y = "-" + alignment_y
            x -= 1
    
    print()
    print(alignment_x)
    print(alignment_y)
    
    return alignment_x, alignment_y, 0


_ = needleman_wunsh_algorithm(SEQUENCE_1, SEQUENCE_2, bl.BLOSUM(50), -8)

      -    H     E     A     G     A     W     G     H     E     E
-   0.0 -8.0 -16.0 -24.0 -32.0 -40.0 -48.0 -56.0 -64.0 -72.0 -80.0
P  -8.0  0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0
A -16.0  0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0
W -24.0  0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0
H -32.0  0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0
E -40.0  0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0
A -48.0  0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0
E -56.0  0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0
      -     H     E     A     G     A     W     G     H     E     E
-   0.0  -8.0 -16.0 -24.0 -32.0 -40.0 -48.0 -56.0 -64.0 -72.0 -80.0
P  -8.0  -2.0  -9.0 -17.0 -25.0 -33.0 -41.0 -49.0 -57.0 -65.0 -73.0
A -16.0 -10.0  -3.0  -4.0 -12.0 -20.0 -28.0 -36.0 -44.0 -52.0 -60.0
W -24.0 -18.0 -11.0  -6.0  -7.0 -15.0  -5.0 -13.0 -21.0 -29.0 -37.0
H -32.0 -14.0 -18.0 -13.0  -8.0  -9.0 -13.0  -7.0  -3.0 -

In [42]:
def smith_waterman_algorithm(
    sequence_1: str,
    sequence_2: str,
    substitution_matrix: dict[str, dict[str, float]],
    gap_cost: int,
) -> tuple[str, str, int]:
    """
    Smith-Waterman algorithm for local sequence alignment.

    Returns:
        tuple[str, int]: alignment and quality score
    """
    # Decide X and Y axes
    longer_sequence = None
    shorter_sequence = None
    if len(sequence_1) > len(sequence_2):
        longer_sequence = sequence_1
        shorter_sequence = sequence_2
    elif len(sequence_1) < len(sequence_2):
        longer_sequence = sequence_2
        shorter_sequence = sequence_1
    else:
        # If the sequences are of equal length, then it doesn't matter which is on X or Y axis
        longer_sequence = sequence_1
        shorter_sequence = sequence_2

    # Initialize the scoring matrix
    scoring_matrix = np.zeros((len(shorter_sequence) + 1, len(longer_sequence) + 1))
    print_scoring_matrix(scoring_matrix, sequence_1, sequence_2)

    # Fill in the scoring matrix
    for x in range(1, len(longer_sequence) + 1):
        for y in range(1, len(shorter_sequence) + 1):
            match = (
                scoring_matrix[y - 1, x - 1]
                + substitution_matrix[shorter_sequence[y - 1]][longer_sequence[x - 1]]
            )
            delete = scoring_matrix[y - 1, x] + gap_cost
            insert = scoring_matrix[y, x - 1] + gap_cost
            scoring_matrix[y, x] = max(match, delete, insert, 0)

    print_scoring_matrix(scoring_matrix, sequence_1, sequence_2)

    # Find the maximum score
    max_score = 0
    max_x = 0
    max_y = 0
    for x in range(1, len(longer_sequence) + 1):
        for y in range(1, len(shorter_sequence) + 1):
            if scoring_matrix[y, x] > max_score:
                max_score = scoring_matrix[y, x]
                max_x = x
                max_y = y

    # Traceback
    alignment_x = ""
    alignment_y = ""
    x, y = max_x, max_y

    while x > 0 or y > 0:
        if (
            x > 0
            and y > 0
            and scoring_matrix[y, x]
            == scoring_matrix[y - 1, x - 1]
            + substitution_matrix[shorter_sequence[y - 1]][longer_sequence[x - 1]]
        ):
            alignment_x = longer_sequence[x - 1] + alignment_x
            alignment_y = shorter_sequence[y - 1] + alignment_y
            x -= 1
            y -= 1
        elif y > 0 and scoring_matrix[y, x] == scoring_matrix[y - 1, x] + gap_cost:
            alignment_x = "-" + alignment_x
            alignment_y = shorter_sequence[y - 1] + alignment_y
            y -= 1
        elif x > 0 and scoring_matrix[y, x] == scoring_matrix[y, x - 1] + gap_cost:
            alignment_x = longer_sequence[x - 1] + alignment_x
            alignment_y = "-" + alignment_y
            x -= 1
        else:
            break
    print()
    print(alignment_x)
    print(alignment_y)
    
    return alignment_x, alignment_y, max_score


_ = smith_waterman_algorithm(SEQUENCE_1, SEQUENCE_2, bl.BLOSUM(50), -8)

     -    H    E    A    G    A    W    G    H    E    E
-  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
P  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
A  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
W  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
H  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
E  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
A  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
E  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
     -     H     E     A     G     A     W     G     H     E     E
-  0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0
P  0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0
A  0.0   0.0   0.0   5.0   0.0   5.0   0.0   0.0   0.0   0.0   0.0
W  0.0   0.0   0.0   0.0   2.0   0.0  20.0  12.0   4.0   0.0   0.0
H  0.0  10.0   2.0   0.0   0.0   0.0  12.0  18.0  22.0  14.0   6.0
E  0.0   2.0  16.0   8.0   0.0   0.0   4.0  10.0  18.0  28.0  20.0
A  0.0   0.0   8.0

## 3. BLAST Algorithm
Implement the core routines of the BLAST algorithm: 
* The algorithm is given a sequence as well as a „database“ (i.e., set of sequences) * You do not have to implement a FSM 
* Use a substitution matrix and thresholds for the similarity of words (these are also inputs to the function) 
* The result is the information, which DB sequence the given sequence is similar to plus the score and the position of the alignment

In [43]:
from Bio.Align import substitution_matrices
from Bio import SeqIO

AMINO_ACIDS = "ACDEFGHIKLMNPQRSTVWY"

# calculate the similarity between two sequences
def sequence_similarity(seq1: str, seq2:str, matrix: np.ndarray) -> float:
    if len(seq1) != len(seq2):
        raise ValueError("Sequences must be of the same length.")
    return sum(matrix[seq1[i], seq2[i]] for i in range(len(seq1)))

# find all possible mutants of a word with a similarity above a threshold
def find_mutants(target_seq: str, words: list[str], amino_acids: str, matrix: np.ndarray, threshold: float) -> dict[str, int]:
    mutants = {}
    for word in words:
        start_index = target_seq.index(word) 
        for i, char in enumerate(word):
            for amino_acid in amino_acids:
                if amino_acid != char:
                    mutant = word[:i] + amino_acid + word[i+1:] # create a mutant by replacing the character at index i
                    if sequence_similarity(word, mutant, matrix) >= threshold: # check if the similarity score is above the threshold
                        mutants[mutant] = start_index
    return mutants

def get_alignments(db_seq:str, query_seq:str, query_dict:dict, window:int, threshold: float, matrix: np.ndarray) -> dict[str, tuple[int, float]]:
    alignments = {}
    query_len = len(query_seq)

    # loop over the database sequence to find potential matches
    for i in range(len(db_seq) - window + 1):
        subseq = db_seq[i:i + window]
        if subseq in query_dict:
            start_idx = max(0, i - query_dict[subseq]) 
            end_idx = min(len(db_seq), start_idx + query_len) 
            extended_db_subseq = db_seq[start_idx:end_idx] # get the full segment of the database sequence
            if len(extended_db_subseq) == query_len:
                score = sequence_similarity(extended_db_subseq, query_seq, matrix) # calculate the similarity score
                if score > threshold:
                    alignments[extended_db_subseq] = (start_idx, score)

    return alignments

def blast_algo(database_seq:str, target_seq: str, window: int, threshold:float, matrix: np.ndarray) -> list[dict[str, str | int | float]]:
    global AMINO_ACIDS

    query_words = {target_seq[i:i + window]: i for i in range(len(target_seq) - window + 1)}
    extended_words = query_words.copy() 
    extended_words.update(find_mutants(target_seq, list(query_words.keys()), AMINO_ACIDS, matrix, threshold))
    alignments = get_alignments(database_seq, target_seq, extended_words, window, threshold, matrix)

    results = [
        {"sequence": seq, "start_position": start, "score": score}
        for seq, (start, score) in alignments.items()
    ]

    return results

def read_fasta(file_path: str) -> dict[str, str]:
    sequences = {}
    for record in SeqIO.parse(file_path, "fasta"):
        sequences[record.id] = str(record.seq)
    return sequences

fasta = read_fasta("ATBI_BLAST_exemplary input db.fasta")
database_seq = list(fasta.values())[0]
target_seq = "SIRFVRLI"
window = 3
threshold = 5

blast_algo(database_seq, target_seq, window, threshold, substitution_matrices.load("BLOSUM62"))


[{'sequence': 'RIRSVYPV', 'start_position': 298, 'score': 8.0},
 {'sequence': 'TIAFGGCV', 'start_position': 402, 'score': 7.0},
 {'sequence': 'TSAFVETV', 'start_position': 483, 'score': 10.0},
 {'sequence': 'AARVVRSI', 'start_position': 539, 'score': 15.0},
 {'sequence': 'AQNSVRVL', 'start_position': 554, 'score': 8.0},
 {'sequence': 'SLRLIDAM', 'start_position': 576, 'score': 12.0},
 {'sequence': 'SQYSLRLI', 'start_position': 573, 'score': 11.0},
 {'sequence': 'GVEFLRDG', 'start_position': 637, 'score': 7.0},
 {'sequence': 'DITFLKKD', 'start_position': 1274, 'score': 7.0},
 {'sequence': 'GIEFLKRG', 'start_position': 1523, 'score': 7.0},
 {'sequence': 'SLREVRTI', 'start_position': 1560, 'score': 20.0},
 {'sequence': 'AANFCALI', 'start_position': 1706, 'score': 12.0},
 {'sequence': 'NIKFADDL', 'start_position': 1933, 'score': 9.0},
 {'sequence': 'SPNFSKLI', 'start_position': 2219, 'score': 15.0},
 {'sequence': 'IIWFLLLS', 'start_position': 2229, 'score': 6.0},
 {'sequence': 'SVCLGSLI', 