# Re-Implementation of profileHMM to benchmark against Needleman-Wunsch for Sequence Alignment

In [1]:
%pip install Biopython

[31mERROR: Could not find an activated virtualenv (required).[0m[31m
[0mNote: you may need to restart the kernel to use updated packages.


In [3]:
#!/usr/bin/env python3
import argparse
import json
from Bio import SeqIO
import numpy as np
from collections import Counter

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Process the Data

In [None]:

# Path to the FASTA file
fasta_file = 'unitpro_data.fasta'

# Parse FASTA file
sequences = []
for record in SeqIO.parse(fasta_file, "fasta"):
    sequences.append({
        "id": record.id,
        "description": record.description,
        "sequence": str(record.seq)
    })


#print descriptions
for seq in sequences:
    print(seq["description"])


# Count species occurrences
species = [seq["description"].split("OS=")[1].split(" OX=")[0] for seq in sequences]
print(Counter(species))

# Verify loaded data
print(f"Number of sequences: {len(sequences)}")
print(f"Example sequence: {sequences[0]}")

with open("c2_domain_dataset.fasta", "w") as file:
    for seq in sequences:
        file.write(f">{seq['id']} {seq['description']}\n{seq['sequence']}\n")


FileNotFoundError: [Errno 2] No such file or directory: 'unitpro_data.fasta'

Run MSA from terminal:
mafft --auto c2_domain_dataset.fasta > aligned_c2_domain.fasta


Trying out profile HMM

In [None]:
from Bio import SeqIO
from collections import Counter

class ProfileHMM:
    def __init__(self):
        self.transition_probabilities = {}
        self.emission_probabilities = {}
        self.alphabet = None

    def train(self, aligned_sequences):
        # Extract the alphabet (excluding gaps)
        self.alphabet = list(set("".join(aligned_sequences).replace("-", "")))

        match_counts = Counter()
        insertion_counts = Counter()
        transition_counts = Counter()

        # Process each column in the alignment
        for col in zip(*aligned_sequences):
            non_gaps = [c for c in col if c != "-"]
            gaps = [c for c in col if c == "-"]

            if len(non_gaps) > len(gaps):  # Majority non-gap → Match state
                match_counts.update(non_gaps)
                transition_counts["Match"] += 1
            else:  # Majority gap → Deletion state
                transition_counts["Deletion"] += 1

            # Count transitions between states (basic example)
            prev_state = None
            for c in col:
                if c != "-":
                    curr_state = "Match"
                else:
                    curr_state = "Deletion"
                if prev_state is not None:
                    transition_counts[f"{prev_state}->{curr_state}"] += 1
                prev_state = curr_state

        # Calculate probabilities
        total_matches = sum(match_counts.values())
        self.emission_probabilities = {
            char: match_counts[char] / total_matches for char in self.alphabet
        }

        total_transitions = sum(transition_counts.values())
        self.transition_probabilities = {
            state: count / total_transitions for state, count in transition_counts.items()
        }

    def print_model(self):
        print("Transition Probabilities:", self.transition_probabilities)
        print("Emission Probabilities:", self.emission_probabilities)


# Load the aligned sequences from your FASTA file
file_path = "aligned_c2_domain.fasta"
aligned_sequences = [str(record.seq) for record in SeqIO.parse(file_path, "fasta")]

# Train and display the Profile HMM
hmm = ProfileHMM()
hmm.train(aligned_sequences)
hmm.print_model()


Transition Probabilities: {'Deletion': 0.023544498589381894, 'Deletion->Deletion': 0.6137813114473797, 'Deletion->Match': 0.1109771736342652, 'Match->Deletion': 0.1055997264255792, 'Match': 0.005026929982046679, 'Match->Match': 0.14107035992134737}
Emission Probabilities: {'D': 0.06050531914893617, 'I': 0.043617021276595745, 'T': 0.05518617021276596, 'R': 0.0581781914893617, 'K': 0.07293882978723404, 'A': 0.052327127659574466, 'G': 0.0495345744680851, 'N': 0.03776595744680851, 'S': 0.08131648936170213, 'P': 0.059375, 'W': 0.01077127659574468, 'Q': 0.052194148936170214, 'F': 0.039029255319148935, 'M': 0.02267287234042553, 'V': 0.06230053191489362, 'C': 0.014827127659574469, 'H': 0.026063829787234042, 'E': 0.07386968085106382, 'L': 0.10186170212765958, 'Y': 0.025664893617021275}


## Needleman-Wunsch

In [None]:
'''Script for computing sequence alignments using Needleman-Wunsch with
   affine gap penalties.
Arguments:
    f - FASTA file with sequences in FASTA format.
    s - JSON with the score matrix for alignment.
    d - The gap opening penalty for the alignment.
    e - The gap extension penalty for the alignment.

Outputs:
    Prints alignment to console.

Example Usage:
    python 2c.py -f sequences.fasta -s score_matrix.json -d 430 -e 30
'''

D = 0
U = 1
L = 2

M = 0
IX = 1
IY = 2


'''Computes the actual string alignments given the traceback matrix.
Arguments:
    x: the first string we're aligning
    y: the second string we're aligning
    t: the traceback matrix, which stores values that point to which
       prior matrix was used to reach a given location in each of the
       3 matrices.
Returns:
    a_x: the string for the alignment of x's sequence
    a_y: the string for the alignment of y's sequence
'''
def traceback(x, y, t, state):
    ''' Complete this function. '''
    curr_x = len(x)
    curr_y = len(y)

    a_x = ""
    a_y = ""

    k = state

    while(curr_x > 0 or curr_y > 0):
        try:
            dir, next_k = t[k][curr_x][curr_y]
        except:
            print(state, curr_x, curr_y)

        if next_k == M:
            a_x = x[curr_x - 1] + a_x
            a_y = y[curr_y - 1] + a_y
            curr_x -= 1
            curr_y -= 1
        elif next_k == IX:
            a_x = x[curr_x - 1] + a_x
            a_y = "-" + a_y
            curr_x -= 1
        elif next_k == IY:
            a_x = "-" + a_x
            a_y = y[curr_y - 1] + a_y
            curr_y -= 1

        k = dir
    return a_x, a_y


'''Computes the score and alignment of two strings using an affine gap penalty.
Arguments:
    x: the first string we're aligning
    y: the second string we're aligning
    s: the score matrix
    d: the gap opening penalty
    e: the gap extension penalty
Returns:
    score: the score of the optimal sequence alignment
    a_x: the aligned first string
    a_y: the aligned second string
The latter two are computed using the above traceback method.
'''
def affine_sequence_alignment(x, y, s, d, e):
    ''' Recurrence matrix, redefine/use as necessary. '''
    m = None
    ''' Recurrence matrix, redefine/use as necessary. '''
    i_x = None
    ''' Recurrence matrix, redefine/use as necessary. '''
    i_y = None
    ''' Traceback matrix, redefine/use as necessary. '''
    t = None

    # a_x, a_y = traceback(x, y, t)
    ''' Complete this function. '''

    m = [[float("-inf") for _ in range(len(y) + 1)] for _ in range(len(x) + 1)]
    i_x = [[float("-inf") for _ in range(len(y) + 1)] for _ in range(len(x) + 1)]
    i_y = [[float("-inf") for _ in range(len(y) + 1)] for _ in range(len(x) + 1)]
    # t = [[None for _ in range(len(y) + 1)] for _ in range(len(x) + 1)]

    t = [[[None for _ in range(len(y) + 1)] for _ in range(len(x) + 1)] for _ in range(3)]

    m[0][0] = i_x[0][0] = i_y[0][0] = 0

    for i in range(1, len(x) + 1):
        t[IX][i][0] = (U, IX)
        i_x[i][0] = i_x[i-1][0] - e

    for j in range(1, len(y) + 1):
        i_y[0][j] = i_y[0][j-1] - e
        t[IY][0][j] = (L, IY)

    for i in range(1, len(x) + 1):
        for j in range(1, len(y) + 1):

            score = s[x[i - 1]][y[j - 1]]

            x_open = m[i - 1][j] - d
            x_extend = i_x[i - 1][j] - e

            x_opt = [x_open, x_extend]

            y_open = m[i][j - 1] - d
            y_extend = i_y[i][j - 1] - e

            y_opt = [y_open, float("-inf") ,y_extend]

            i_x[i][j] = max(x_opt)
            i_y[i][j] = max(y_opt)

            prev_i_x = i_x[i - 1][j - 1] + score
            prev_i_y = i_y[i - 1][j - 1] + score

            dag = m[i - 1][j - 1] + score

            m_opt = [dag, prev_i_x, prev_i_y]

            m[i][j] = max(dag, prev_i_x, prev_i_y)
            t[M][i][j] = (m_opt.index(m[i][j]), M)

            t[IX][i][j] = (x_opt.index(i_x[i][j]), IX)

            t[IY][i][j] = (y_opt.index(i_y[i][j]), IY)

    max_v = max(m[-1][-1], i_x[-1][-1], i_y[-1][-1])
    if max_v == m[-1][-1]:
        state = M
    elif max_v == i_x[-1][-1]:
        state = IX
    else:
        state = IY

    return max_v, traceback(x, y, t, state)


'''Prints two aligned sequences formatted for convenient inspection.
Arguments:
    a_x: the first sequence aligned
    a_y: the second sequence aligned
Outputs:
    Prints aligned sequences (80 characters per line) to console
'''
def print_alignment(a_x, a_y):
    assert len(a_x) == len(a_y), "Sequence alignment lengths must be the same."
    for i in range(1 + (len(a_x) // 80)):
        start = i * 80
        end = (i + 1) * 80
        print(a_x[start:end])
        print(a_y[start:end])
        print()


def alignment_script(fasta_file, score_matrix_file, d, e):
    # parser = argparse.ArgumentParser(
    #     description='Calculate sequence alignments for two sequences with an affine gap penalty.')
    # parser.add_argument('-f', action="store", dest="f", type=str, required=True)
    # parser.add_argument('-s', action="store", dest="s", type=str, required=True)
    # parser.add_argument('-d', action="store", dest="d", type=float, required=True)
    # parser.add_argument('-e', action="store", dest="e", type=float, required=True)

    # fasta_file = f
    # score_matrix_file = s
    # d = d
    # e = e

    with open(fasta_file) as f:
        _, x, _, y = [line.strip() for line in f.readlines()]
    with open(score_matrix_file) as f:
        s = json.loads(f.readlines()[0])

    score, (a_x, a_y) = affine_sequence_alignment(x, y, s, d, e)
    print("Alignment:")
    print_alignment(a_x, a_y)
    print("Score: " + str(score))


## MSA Needleman-Wunsch

In [None]:
# Constants for traceback directions
D = 0  # Diagonal
U = 1  # Up
L = 2  # Left

# States in the alignment matrices
M = 0   # Match/mismatch
IX = 1  # Gap in y (insertion in x)
IY = 2  # Gap in x (deletion in x)

class MSA:
    def __init__(self, sequences=None, s=None, d=5, e=1):
        '''Initializes the MSA object.

        Arguments:
            sequences: Optional list of sequences to initialize the MSA.
            s: Scoring matrix (dictionary of dictionaries).
            d: Gap opening penalty.
            e: Gap extension penalty.
        '''
        self.s = s  # Scoring matrix
        self.d = d  # Gap opening penalty
        self.e = e  # Gap extension penalty
        self.msa_sequences_info = []  # List to hold tuples of (sequence, score)
        self.pairwise_scores = {}  # Dictionary to hold pairwise scores
        self.pairwise_alignments = {}  # Dictionary to hold pairwise alignments

        if sequences:
            # Build MSA from initial sequences
            self.build_msa(sequences)

    def build_msa(self, sequences):
        '''Builds the initial MSA using the progressive alignment approach.'''
        n = len(sequences)

        # Compute all pairwise alignments and scores
        for i in range(n):
            for j in range(i + 1, n):
                score, (aligned_seq1, aligned_seq2) = affine_sequence_alignment(
                    sequences[i], sequences[j], self.s, self.d, self.e)
                self.pairwise_scores[(i, j)] = score
                self.pairwise_alignments[(i, j)] = (aligned_seq1, aligned_seq2)
                self.pairwise_scores[(j, i)] = score  # Symmetric
                self.pairwise_alignments[(j, i)] = (aligned_seq2, aligned_seq1)

        # Find the pair with the highest alignment score
        max_pair = max(self.pairwise_scores, key=self.pairwise_scores.get)
        max_score = self.pairwise_scores[max_pair]
        print(f"Initial pair chosen: Sequences {max_pair} with score {max_score}")

        # Initialize MSA with the best scoring pair
        seq1_idx, seq2_idx = max_pair
        aligned_seq1, aligned_seq2 = self.pairwise_alignments[max_pair]
        self.msa_sequences_info = [
            (aligned_seq1, max_score),
            (aligned_seq2, max_score)
        ]

        included_indices = {seq1_idx, seq2_idx}
        remaining_indices = set(range(n)) - included_indices

        while remaining_indices:
            best_score = float("-inf")
            best_seq_idx = None
            best_target_idx = None

            # Find the sequence that aligns best to any sequence already in the MSA
            for seq_idx in remaining_indices:
                for target_idx in included_indices:
                    score = self.pairwise_scores.get((seq_idx, target_idx))
                    if score is not None and score > best_score:
                        best_score = score
                        best_seq_idx = seq_idx
                        best_target_idx = target_idx

            print(f"Adding sequence {best_seq_idx} aligned to sequence {best_target_idx} with score {best_score}")

            # Align the new sequence to the target sequence
            aligned_new_seq, aligned_target_seq = self.pairwise_alignments[(best_seq_idx, best_target_idx)]

            # Merge the new sequence into the MSA
            self.add_sequence(aligned_new_seq, aligned_target_seq, best_target_idx, best_score)

            included_indices.add(best_seq_idx)
            remaining_indices.remove(best_seq_idx)

    def add_sequence(self, aligned_seq, aligned_target_seq, target_idx, score):
        '''Adds a new aligned sequence to the MSA based on alignment with a target sequence in the MSA.

        Arguments:
            aligned_seq: The aligned new sequence to add to the MSA.
            aligned_target_seq: The aligned target sequence (from the pairwise alignment).
            target_idx: The index of the target sequence in the MSA.
            score: The alignment score for the new sequence.
        '''
        # Extract sequences and scores
        msa_sequences = [seq_info[0] for seq_info in self.msa_sequences_info]

        # Update the target sequence in the MSA to match aligned_target_seq
        msa_seq = msa_sequences[target_idx]
        updated_msa_seq = self._merge_sequences(msa_seq, aligned_target_seq)

        # Insert gaps into other MSA sequences to maintain alignment
        for i, seq in enumerate(msa_sequences):
            if i == target_idx:
                msa_sequences[i] = updated_msa_seq
                continue
            msa_sequences[i] = self._insert_gaps(seq, updated_msa_seq)

        # Update the sequences in msa_sequences_info
        for i, (seq, seq_score) in enumerate(self.msa_sequences_info):
            self.msa_sequences_info[i] = (msa_sequences[i], seq_score)

        # Add the new aligned sequence and its score to msa_sequences_info
        self.msa_sequences_info.append((aligned_seq, score))

    def align_sequence(self, sequence):
        '''Aligns a new sequence to the existing MSA without adding it to the MSA.

        Arguments:
            sequence: The sequence to align to the MSA.

        Returns:
            score: The best alignment score.
            aligned_sequence: The aligned sequence corresponding to the input sequence.
            adjusted_msa_sequences: The adjusted MSA sequences (not modifying the internal MSA).
        '''
        best_score = float("-inf")
        best_aligned_seq = None
        best_aligned_msa = None

        # Extract sequences from msa_sequences_info
        msa_sequences = [seq_info[0] for seq_info in self.msa_sequences_info]

        # Align the new sequence to each sequence in the MSA
        for target_idx, msa_seq in enumerate(msa_sequences):
            # Remove gaps from the target MSA sequence
            target_seq_no_gaps = msa_seq.replace('-', '')

            # Perform pairwise alignment
            score, (aligned_seq, aligned_target_seq) = affine_sequence_alignment(
                sequence, target_seq_no_gaps, self.s, self.d, self.e)

            # Update the target MSA sequence to include gaps from the alignment
            adjusted_target_seq = self._merge_sequences(msa_seq, aligned_target_seq)

            # Adjust other MSA sequences to include gaps
            adjusted_msa = []
            for i, seq in enumerate(msa_sequences):
                if i == target_idx:
                    adjusted_msa.append(adjusted_target_seq)
                else:
                    adjusted_msa.append(self._insert_gaps(seq, adjusted_target_seq))

            if score > best_score:
                best_score = score
                best_aligned_seq = aligned_seq
                best_aligned_msa = adjusted_msa

        return best_score, best_aligned_seq, best_aligned_msa

    def _merge_sequences(self, msa_seq, aligned_target_seq):
        '''Updates the msa_seq to include gaps as per aligned_target_seq.

        Arguments:
            msa_seq: The sequence from the MSA (with existing gaps).
            aligned_target_seq: The aligned target sequence from pairwise alignment.

        Returns:
            updated_msa_seq: The updated MSA sequence with gaps adjusted.
        '''
        updated_msa_seq = ""
        msa_idx = 0
        for char in aligned_target_seq:
            if char == '-':
                updated_msa_seq += '-'
            else:
                if msa_idx < len(msa_seq):
                    updated_msa_seq += msa_seq[msa_idx]
                    msa_idx += 1
                else:
                    updated_msa_seq += '-'
        return updated_msa_seq

    def _insert_gaps(self, seq, reference_seq):
        '''Inserts gaps into seq to match the gaps in reference_seq.

        Arguments:
            seq: The sequence to insert gaps into.
            reference_seq: The reference sequence with desired gaps.

        Returns:
            new_seq: The sequence with gaps inserted to align with reference_seq.
        '''
        new_seq = ""
        seq_idx = 0
        for ref_char in reference_seq:
            if ref_char == '-':
                new_seq += '-'
            else:
                if seq_idx < len(seq):
                    new_seq += seq[seq_idx]
                    seq_idx += 1
                else:
                    new_seq += '-'
        return new_seq

    def get_alignment(self):
        '''Returns the current MSA sequences.'''
        return [seq_info[0] for seq_info in self.msa_sequences_info]

    def print_alignment(self, sequences=None):
        '''Prints the aligned sequences.

        Arguments:
            sequences: Optional list of sequences to print. If None, prints the internal MSA sequences.
        '''
        if sequences is None:
            sequences = self.get_alignment()
        for i in range(0, len(sequences[0]), 80):
            for seq in sequences:
                print(seq[i:i+80])
            print()

    def print_alignment_with_scores(self):
        '''Prints all sequences in the MSA along with their alignment scores.'''
        max_seq_length = max(len(seq_info[0]) for seq_info in self.msa_sequences_info)
        for i in range(0, max_seq_length, 80):
            for seq, score in self.msa_sequences_info:
                print(f"{seq[i:i+80]}  (Score: {score})")
            print()

# def affine_sequence_alignment(x, y, s, d, e):
#     '''Aligns sequence x and y using affine gap penalties.

#     Arguments:
#         x: The first sequence (string).
#         y: The second sequence (string).
#         s: Scoring matrix (dictionary of dictionaries).
#         d: Gap opening penalty.
#         e: Gap extension penalty.

#     Returns:
#         max_v: The alignment score.
#         (aligned_x, aligned_y): Tuple of aligned sequences.
#     '''
#     len_x = len(x)
#     len_y = len(y)

#     # Initialize the matrices
#     m = [[float("-inf")] * (len_y + 1) for _ in range(len_x + 1)]
#     i_x = [[float("-inf")] * (len_y + 1) for _ in range(len_x + 1)]
#     i_y = [[float("-inf")] * (len_y + 1) for _ in range(len_x + 1)]
#     t = [[[None] * (len_y + 1) for _ in range(len_x + 1)] for _ in range(3)]

#     m[0][0] = i_x[0][0] = i_y[0][0] = 0

#     # Initialize first column
#     for i in range(1, len_x + 1):
#         i_x[i][0] = -d - (i - 1) * e
#         t[IX][i][0] = (U, IX)

#     # Initialize first row
#     for j in range(1, len_y + 1):
#         i_y[0][j] = -d - (j - 1) * e
#         t[IY][0][j] = (L, IY)

#     # Fill in the matrices
#     for i in range(1, len_x + 1):
#         for j in range(1, len_y + 1):
#             # Score for match/mismatch
#             score = s[x[i - 1]][y[j - 1]]

#             # Update matrices for matches/mismatches
#             m_options = [
#                 m[i - 1][j - 1] + score,
#                 i_x[i - 1][j - 1] + score,
#                 i_y[i - 1][j - 1] + score
#             ]
#             m[i][j] = max(m_options)
#             t[M][i][j] = (m_options.index(m[i][j]), M)

#             # Update matrices for gaps in y (insertion in x)
#             x_open = m[i - 1][j] - d
#             x_extend = i_x[i - 1][j] - e
#             i_x[i][j] = max(x_open, x_extend)
#             t[IX][i][j] = (0 if x_open >= x_extend else 1, IX)

#             # Update matrices for gaps in x (deletion in x)
#             y_open = m[i][j - 1] - d
#             y_extend = i_y[i][j - 1] - e
#             i_y[i][j] = max(y_open, y_extend)
#             t[IY][i][j] = (0 if y_open >= y_extend else 1, IY)

#     # Determine the starting point for traceback
#     max_v = max(m[len_x][len_y], i_x[len_x][len_y], i_y[len_x][len_y])
#     if max_v == m[len_x][len_y]:
#         state = M
#     elif max_v == i_x[len_x][len_y]:
#         state = IX
#     else:
#         state = IY

#     # Perform traceback to get the aligned sequences
#     aligned_x, aligned_y = traceback(x, y, t, state)
#     return max_v, (aligned_x, aligned_y)

# def traceback(x, y, t, state):
#     '''Constructs the aligned sequences from the traceback matrix.'''
#     curr_x = len(x)
#     curr_y = len(y)

#     aligned_x = ""
#     aligned_y = ""

#     k = state

#     while curr_x > 0 or curr_y > 0:
#         dir, next_k = t[k][curr_x][curr_y]

#         if k == M:
#             # Diagonal move: align x[i - 1] with y[j - 1]
#             aligned_x = x[curr_x - 1] + aligned_x
#             aligned_y = y[curr_y - 1] + aligned_y
#             curr_x -= 1
#             curr_y -= 1
#         elif k == IX:
#             # Up move: gap in y (insertion in x)
#             aligned_x = x[curr_x - 1] + aligned_x
#             aligned_y = '-' + aligned_y
#             curr_x -= 1
#         elif k == IY:
#             # Left move: gap in x (deletion in x)
#             aligned_x = '-' + aligned_x
#             aligned_y = y[curr_y - 1] + aligned_y
#             curr_y -= 1

#         k = dir

#     return aligned_x, aligned_y


In [None]:
def read_fasta(fasta_file):
  with open(fasta_file) as f:
    lines = [line.strip() for line in f.readlines()]

  return lines[1::2]

def read_score_matrix(score_matrix_file):
  with open(score_matrix_file) as f:
    s = json.loads(f.readlines()[0])
  return s


## NW MSA-Testing

In [None]:
init_path = '/content/drive/MyDrive/Classes/Fall 2024/CS 4775/Final Project Files/HW1 Testing'
s_mat = read_score_matrix(f'{init_path}/score_matrix.json')
seqs_1 = read_fasta(f'{init_path}/test-2c-1.fasta')
seqs_2 = read_fasta(f'{init_path}/test-2c-2.fasta')
seqs_3 = read_fasta(f'{init_path}/test-2c-3.fasta')

tests = [
    [seqs_1, s_mat, 430, 30],
    [seqs_1, s_mat, 430, 60],
    [seqs_1, s_mat, 430, 430],
    [seqs_2, s_mat, 200, 50],
    [seqs_2, s_mat, 200, 200],
    [seqs_3, s_mat, 430, 30],
    [seqs_3, s_mat, 100, 100],
]

#T1_1
for test in tests:
  msa = MSA(*test)
  msa.print_alignment_with_scores()



Initial pair chosen: Sequences (0, 1) with score 153
TGCCAATCGTCGACTGCC  (Score: 153)
TACCA---GTCACATGTC  (Score: 153)

Initial pair chosen: Sequences (0, 1) with score 93
TGCCAATCGTCGACTGCC  (Score: 93)
TACCA---GTCACATGTC  (Score: 93)

Initial pair chosen: Sequences (0, 1) with score -359
TGCCAATCGTCGACTGCC  (Score: -359)
TACCAGTCA-C-A-TGTC  (Score: -359)

Initial pair chosen: Sequences (0, 1) with score 688
TGCCAAGCTGTC-GACCC  (Score: 688)
--ACAAGCTGTCGGGCCT  (Score: 688)

Initial pair chosen: Sequences (0, 1) with score 471
TGCCAAGCTGTC-GACCC  (Score: 471)
-A-CAAGCTGTCGGGCCT  (Score: 471)

Initial pair chosen: Sequences (0, 1) with score 3824
ATAGAGAAACATAGACAGACGTTTTACGTACGCTGGAAAGCAGACCTCATCATAGGGCGCCTTGTAACCTGGATA  (Score: 3824)
AGAGGGTAACATAGACAGACGTAATACGAACGCTGGAAAGCAG---TCA--GTTGCGCACCTTGTAACCTGGATA  (Score: 3824)

Initial pair chosen: Sequences (0, 1) with score 4319
ATAGAGAAACATAGACAGACGTTTTACGTACGCTGGAAAGCAGACCTCA-TCATAGGGCGCCTTGTAACCTGGATA  (Score: 4319)
AGAGGGTAACATAGACA

## HMM Implementation

## Benchmark Clustal, MAFFT, Muscle

### Installs

In [None]:
from Bio.Align.Applications import MafftCommandline, MuscleCommandline

In [None]:
# Install MAFFT
!apt-get install mafft

# Install MUSCLE
!apt-get install muscle

# Install ClustalW
!apt-get install clustalw

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fonts-lato libauthen-sasl-perl libclone-perl libdata-dump-perl libencode-locale-perl
  libfile-listing-perl libfont-afm-perl libhtml-form-perl libhtml-format-perl libhtml-parser-perl
  libhtml-tagset-perl libhtml-tree-perl libhttp-cookies-perl libhttp-daemon-perl libhttp-date-perl
  libhttp-message-perl libhttp-negotiate-perl libio-html-perl libio-socket-ssl-perl
  liblwp-mediatypes-perl liblwp-protocol-https-perl libmailtools-perl libnet-http-perl
  libnet-smtp-ssl-perl libnet-ssleay-perl libruby3.0 libtry-tiny-perl liburi-perl libwww-perl
  libwww-robotrules-perl lynx lynx-common mailcap mime-support netbase perl-openssl-defaults rake
  ruby ruby-net-telnet ruby-rubygems ruby-webrick ruby-xmlrpc ruby3.0 rubygems-integration
Suggested packages:
  libdigest-hmac-perl libgssapi-perl libcrypt-ssleay-perl libsub-name-perl libbusiness-isbn-

### MAFFT

In [None]:
from Bio.Align.Applications import MafftCommandline
from Bio import AlignIO

# MAFFT Sequence Parsing

file_path = '/content/drive/MyDrive/Classes/Fall 2024/CS 4775/Final Project Files'

dataset_file_path = f'{file_path}/c2_domain_dataset.fasta'

# Define the input and output file names
input_file = dataset_file_path
mafft_output = f"{file_path}/mafft_aligned.fasta"

# Create a MAFFT command-line object
mafft_cline = MafftCommandline(input=input_file, inputorder=True)

# Execute MAFFT and capture the alignment
stdout, stderr = mafft_cline()

# Write the alignment to a file
with open(mafft_output, "w") as handle:
    handle.write(stdout)

# Read the alignment using AlignIO
alignment = AlignIO.read(mafft_output, "fasta")

# Print the alignment
print(alignment)



Alignment with 35 rows and 3399 columns
MATHLSHPQRRPLLRQAIKIRRRRVRDLQDPPP-QATQEVQV--...--- sp|B2RUP2|UN13D_MOUSE
MVDSVGFAEAWR---------------AQFPDSEPPRMELRS--...--- sp|F1LXF1|BCR_RAT
MM------------------------------------------...--- sp|G3XA57|RFIP2_MOUSE
--------------------------------------------...--- sp|O08625|SYT10_RAT
MA------------------------------------------...--- sp|O08835|SYT11_RAT
MASNPD--------------------------------------...--- sp|O08874|PKN2_RAT
MF------------------------------------------...--- sp|O15484|CAN5_HUMAN
MG------------------------------------------...--- sp|O35646|CAN6_MOUSE
--------------------------------------------...--- sp|O43581|SYT7_HUMAN
MA----------------------YNWQTEPNRAEPQEGGHD--...--- sp|O70173|P3C2G_RAT
MSLLNP--------------------VLLPP-------KVKA--...-RL sp|O89040|PLCB2_RAT
MAKSS---------------------------------------...--- sp|O95294|RASL1_HUMAN
MSD-----------------------------------------...--- sp|O95741|CPNE6_HUMAN
MQ---------------------------

### MUSCLE

In [None]:
from Bio.Align.Applications import MuscleCommandline

# Define the input and output file names
muscle_output = f"{file_path}/muscle_aligned.fasta"

# Create a MUSCLE command-line object
muscle_cline = MuscleCommandline(input=input_file, out=muscle_output)

# Execute MUSCLE
stdout, stderr = muscle_cline()

# Read the alignment using AlignIO
alignment = AlignIO.read(muscle_output, "fasta")

# Print the alignment
print(alignment)


Alignment with 35 rows and 2194 columns
------------------------------------MEIKEEGT...--- sp|Q9QWG5|INP4B_RAT
MAYNWQTEPNRAEPQEGGHDHQQCHHADQHLSSRQVRLGFDQLV...I-- sp|O70173|P3C2G_RAT
-----------------------------------------MSL...--- sp|O89040|PLCB2_RAT
---------------------------------------MAKPY...--- sp|Q9QW07|PLCB4_RAT
--------------------------------------------...--- sp|G3XA57|RFIP2_MOUSE
--------------------------------------------...LRL sp|Q4KWH8|PLCH1_HUMAN
------------------------------MAVEDEGLRVFQSV...--- sp|Q14644|RASA3_HUMAN
----MAAAAPAAAAASSEAPAASATAEPEAGDQDSREVRVLQSL...--- sp|Q15283|RASA2_HUMAN
-----------------------------------------MSF...--- sp|P47712|PA24A_HUMAN
-----------------------------------------MSF...--- sp|P47713|PA24A_MOUSE
----------------------------------------MATC...--- sp|P10820|PERF_MOUSE
--------------------------------------------...--- sp|P0C871|PA24B_MOUSE
--------------------------------------------...--- sp|Q9HAU4|SMUF2_HUMAN
--------------------

### ClustalW

In [None]:
from Bio.Align.Applications import ClustalwCommandline
clustalw_output = f"{file_path}/clustalw_aligned.fasta"

# Create a ClustalW command-line object
clustalw_cline = ClustalwCommandline("clustalw", infile=input_file, outfile=clustalw_output, output="FASTA")

# Execute ClustalW
stdout, stderr = clustalw_cline()

# Read the alignment using AlignIO
alignment = AlignIO.read(clustalw_output, "fasta")

# Print the alignment
print(alignment)

Alignment with 35 rows and 2155 columns
--------------------------------------------...--- sp|P47712|PA24A_HUMAN
--------------------------------------------...--- sp|P47713|PA24A_MOUSE
--------------------------------------------...L-- sp|O89040|PLCB2_RAT
--------------------------------------------...--- sp|Q9QW07|PLCB4_RAT
--------------------------------------------...--- sp|Q14644|RASA3_HUMAN
--------------------------------------------...--- sp|Q15283|RASA2_HUMAN
--------------------------------------------...--- sp|Q96FN4|CPNE2_HUMAN
--------------------------------------------...--- sp|Q99829|CPNE1_HUMAN
--------------------------------------------...--- sp|O95741|CPNE6_HUMAN
--------------------------------------------...--- sp|P17252|KPCA_HUMAN
--------------------------------------------...--- sp|P63319|KPCG_RAT
--------------------------------------------...--- sp|Q9HAU4|SMUF2_HUMAN
--------------------------------------------...--- sp|Q9HCE7|SMUF1_HUMAN
-------------------

### NW MSA Same Format

In [None]:
class MSA:
    def __init__(self, input_file=None, s=None, d=5, e=1):
        '''Initializes the MSA object.

        Arguments:
            input_file: Path to the input file containing sequences.
            s: Scoring matrix (dictionary of dictionaries).
            d: Gap opening penalty.
            e: Gap extension penalty.
        '''
        self.s = s  # Scoring matrix
        self.d = d  # Gap opening penalty
        self.e = e  # Gap extension penalty
        self.msa_sequences_info = []  # List to hold tuples of (header, sequence, score)
        self.pairwise_scores = {}  # Dictionary to hold pairwise scores
        self.pairwise_alignments = {}  # Dictionary to hold pairwise alignments

        if input_file:
            # Read sequences from the input file
            sequences = self.read_sequences(input_file)
            # Build MSA from sequences
            self.build_msa(sequences)

    def read_sequences(self, input_file):
        '''Reads sequences from an input file.

        Arguments:
            input_file: Path to the input file.

        Returns:
            sequences: List of tuples (header, sequence).
        '''
        sequences = []
        for record in SeqIO.parse(input_file, "fasta"):
            # Retain the full header
            header = record.description
            sequences.append((header, str(record.seq)))
        return sequences

    def build_msa(self, sequences):
        '''Builds the initial MSA using the progressive alignment approach.'''
        n = len(sequences)

        # Compute all pairwise alignments and scores
        for i in range(n):
            header_i, seq_i = sequences[i]
            for j in range(i + 1, n):
                header_j, seq_j = sequences[j]
                score, (aligned_seq1, aligned_seq2) = affine_sequence_alignment(
                    seq_i, seq_j, self.s, self.d, self.e)
                self.pairwise_scores[(i, j)] = score
                self.pairwise_alignments[(i, j)] = (aligned_seq1, aligned_seq2)
                self.pairwise_scores[(j, i)] = score  # Symmetric
                self.pairwise_alignments[(j, i)] = (aligned_seq2, aligned_seq1)

        # Find the pair with the highest alignment score
        max_pair = max(self.pairwise_scores, key=self.pairwise_scores.get)
        max_score = self.pairwise_scores[max_pair]
        print(f"Initial pair chosen: Sequences {max_pair} with score {max_score}")

        # Initialize MSA with the best scoring pair
        seq1_idx, seq2_idx = max_pair
        header1, _ = sequences[seq1_idx]
        header2, _ = sequences[seq2_idx]
        aligned_seq1, aligned_seq2 = self.pairwise_alignments[max_pair]
        self.msa_sequences_info = [
            (header1, aligned_seq1, max_score),
            (header2, aligned_seq2, max_score)
        ]

        included_indices = {seq1_idx, seq2_idx}
        remaining_indices = set(range(n)) - included_indices

        while remaining_indices:
            best_score = float("-inf")
            best_seq_idx = None
            best_target_idx = None

            # Find the sequence that aligns best to any sequence already in the MSA
            for seq_idx in remaining_indices:
                for target_idx in included_indices:
                    score = self.pairwise_scores.get((seq_idx, target_idx))
                    if score is not None and score > best_score:
                        best_score = score
                        best_seq_idx = seq_idx
                        best_target_idx = target_idx

            print(f"Adding sequence {best_seq_idx} aligned to sequence {best_target_idx} with score {best_score}")

            # Align the new sequence to the target sequence
            aligned_new_seq, aligned_target_seq = self.pairwise_alignments[(best_seq_idx, best_target_idx)]
            header_new, _ = sequences[best_seq_idx]

            # Merge the new sequence into the MSA
            self.add_sequence(header_new, aligned_new_seq, aligned_target_seq, best_target_idx, best_score)

            included_indices.add(best_seq_idx)
            remaining_indices.remove(best_seq_idx)

    def add_sequence(self, header, aligned_seq, aligned_target_seq, target_idx, score):
        '''Adds a new aligned sequence to the MSA based on alignment with a target sequence in the MSA.

        Arguments:
            header: The header of the new sequence.
            aligned_seq: The aligned new sequence to add to the MSA.
            aligned_target_seq: The aligned target sequence (from the pairwise alignment).
            target_idx: The index of the target sequence in the MSA.
            score: The alignment score for the new sequence.
        '''
        # Extract sequences and scores
        msa_sequences = [seq_info[1] for seq_info in self.msa_sequences_info]

        # Update the target sequence in the MSA to match aligned_target_seq
        msa_seq = msa_sequences[target_idx]
        updated_msa_seq = self._merge_sequences(msa_seq, aligned_target_seq)

        # Insert gaps into other MSA sequences to maintain alignment
        for i, seq in enumerate(msa_sequences):
            if i == target_idx:
                msa_sequences[i] = updated_msa_seq
                continue
            msa_sequences[i] = self._insert_gaps(seq, updated_msa_seq)

        # Update the sequences in msa_sequences_info
        for i, (header_i, seq, seq_score) in enumerate(self.msa_sequences_info):
            self.msa_sequences_info[i] = (header_i, msa_sequences[i], seq_score)

        # Adjust the new sequence to align with updated MSA
        aligned_new_seq = self._insert_gaps(aligned_seq, updated_msa_seq)

        # Add the new aligned sequence and its score to msa_sequences_info
        self.msa_sequences_info.append((header, aligned_new_seq, score))

    def _merge_sequences(self, msa_seq, aligned_target_seq):
        '''Updates the msa_seq to include gaps as per aligned_target_seq.

        Arguments:
            msa_seq: The sequence from the MSA (with existing gaps).
            aligned_target_seq: The aligned target sequence from pairwise alignment.

        Returns:
            updated_msa_seq: The updated MSA sequence with gaps adjusted.
        '''
        updated_msa_seq = ""
        msa_idx = 0
        for char in aligned_target_seq:
            if char == '-':
                updated_msa_seq += '-'
            else:
                if msa_idx < len(msa_seq):
                    updated_msa_seq += msa_seq[msa_idx]
                    msa_idx += 1
                else:
                    updated_msa_seq += '-'
        return updated_msa_seq

    def _insert_gaps(self, seq, reference_seq):
        '''Inserts gaps into seq to match the gaps in reference_seq.

        Arguments:
            seq: The sequence to insert gaps into.
            reference_seq: The reference sequence with desired gaps.

        Returns:
            new_seq: The sequence with gaps inserted to align with reference_seq.
        '''
        new_seq = ""
        seq_idx = 0
        for ref_char in reference_seq:
            if ref_char == '-':
                new_seq += '-'
            else:
                if seq_idx < len(seq):
                    new_seq += seq[seq_idx]
                    seq_idx += 1
                else:
                    new_seq += '-'
        return new_seq

    def get_alignment(self):
        '''Returns the current MSA sequences along with headers.'''
        return [(seq_info[0], seq_info[1]) for seq_info in self.msa_sequences_info]

    def print_alignment(self, line_length=60):
        '''Prints the aligned sequences in the specified format.

        Arguments:
            line_length: The number of characters per line for the sequences.
        '''
        sequences_info = self.get_alignment()
        for header, sequence in sequences_info:
            print(f">{header}")
            for i in range(0, len(sequence), line_length):
                print(sequence[i:i+line_length])
            print()

    def write_custom_alignment(self, filename, line_length=60):
        '''Writes the alignment to a file in the specified custom format.

        Arguments:
            filename: The name of the output file.
            line_length: The number of characters per line for the sequences.
        '''
        sequences_info = self.get_alignment()
        with open(filename, 'w') as f:
            for header, sequence in sequences_info:
                f.write(f">{header}\n")
                for i in range(0, len(sequence), line_length):
                    f.write(sequence[i:i+line_length] + '\n')
                f.write('\n')
        print(f"Alignment written to {filename} in custom format.")

# Define the BLOSUM62 scoring matrix
from Bio.Align import substitution_matrices

pam30 = substitution_matrices.load("PAM30")
pam30_dict = {row: {col: pam30[row, col] for col in pam30.alphabet} for row in pam30.alphabet}

In [None]:
msa = MSA(input_file=input_file, s=pam30_dict, d=9, e=1)

# Write the alignment to a file in the custom format
msa.write_custom_alignment(f'{file_path}/msa_alignment_aligned.fasta')

# Print the alignment
msa.print_alignment()

KeyboardInterrupt: 

### Final Processing

In [None]:
from Bio import AlignIO
from Bio.Align import MultipleSeqAlignment

def sort_alignment(input_alignment_file, output_alignment_file, file_format):
    # Read the alignment
    alignment = AlignIO.read(input_alignment_file, file_format)

    # Sort the sequences based on their IDs
    sorted_records = sorted(alignment, key=lambda record: record.id)

    # Create a new MultipleSeqAlignment object
    sorted_alignment = MultipleSeqAlignment(sorted_records)

    # Write the sorted alignment to a file
    AlignIO.write(sorted_alignment, output_alignment_file, file_format)

    return sorted_alignment


In [None]:
mafft_sorted_output = f"{file_path}/mafft_aligned_sorted.fasta"
sorted_mafft_alignment = sort_alignment(mafft_output, mafft_sorted_output, "fasta")
muscle_sorted_output = f"{file_path}/muscle_aligned_sorted.fasta"
sorted_muscle_alignment = sort_alignment(muscle_output, muscle_sorted_output, "fasta")
clustalw_sorted_output = f"{file_path}/clustalw_aligned_sorted.fasta"
sorted_clustalw_alignment = sort_alignment(clustalw_output, clustalw_sorted_output, "fasta")

# Get the sequence IDs from each sorted alignment
mafft_ids = [record.id for record in sorted_mafft_alignment]
muscle_ids = [record.id for record in sorted_muscle_alignment]
clustalw_ids = [record.id for record in sorted_clustalw_alignment]

# Verify that the IDs are the same across all alignments
if mafft_ids == muscle_ids == clustalw_ids:
    print("All alignments have sequences in the same order.")
else:
    print("Sequence order differs between alignments.")

    # Identify differences
    print("MAFFT IDs:", mafft_ids)
    print("MUSCLE IDs:", muscle_ids)
    print("ClustalW IDs:", clustalw_ids)


All alignments have sequences in the same order.


## Benchmark

#### bali_score setup

In [None]:
!apt-get update
!apt-get install -y libexpat1-dev
!unzip bali_score_src.zip

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building depe

In [None]:
!apt-get install automake autoconf libtool
!apt-get install xsltproc
!apt-get reinstall libexpat1 libexpat1-dev

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
autoconf is already the newest version (2.71-2).
automake is already the newest version (1:1.16.5-1.3).
libtool is already the newest version (2.4.6-15build2).
0 upgraded, 0 newly installed, 0 to remove and 51 not upgraded.
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
xsltproc is already the newest version (1.1.34-4ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 51 not upgraded.
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
0 upgraded, 0 newly installed, 2 reinstalled, 0 to remove and 51 not upgraded.
Need to get 239 kB of archives.
After this operation, 0 B of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 libexpat1 amd64 2.4.7-1ubuntu0.4 [91.2 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 libexpat

In [None]:
!ls /usr/lib/x86_64-linux-gnu

#### Scoring Script

In [48]:
from Bio import AlignIO
from Bio.Align import MultipleSeqAlignment
from itertools import combinations
import sys

def reorder_alignment(reference, test):
    """
    Reorder the test alignment to match the order of the reference alignment by sequence IDs.
    """
    ref_ids = [record.id for record in reference]
    test_dict = {record.id: record for record in test}

    # Ensure all IDs in the reference are present in the test
    if not all(ref_id in test_dict for ref_id in ref_ids):
        missing = [ref_id for ref_id in ref_ids if ref_id not in test_dict]
        print(f"Error: Missing sequences in test alignment: {', '.join(missing)}")
        sys.exit(1)

    # Reorder test alignment to match reference
    reordered_test = MultipleSeqAlignment([test_dict[ref_id] for ref_id in ref_ids])
    return reordered_test

def calculate_sp_score(reference, test):
    """
    Calculate Sum-of-Pairs (SP) score for a test alignment given a reference alignment.
    """
    sp_matches = 0
    total_pairs = 0

    # Use the minimum number of columns to avoid out-of-range errors
    num_columns = min(reference.get_alignment_length(), test.get_alignment_length())

    for col in range(num_columns):
        ref_col = [seq[col] for seq in reference]
        test_col = [seq[col] for seq in test]

        # Generate all pairwise combinations for this column
        ref_pairs = set(combinations(ref_col, 2))
        test_pairs = set(combinations(test_col, 2))

        # Count matching pairs
        sp_matches += len(ref_pairs & test_pairs)
        total_pairs += len(ref_pairs)

    sp_score = sp_matches / total_pairs if total_pairs > 0 else 0
    return sp_score

def calculate_tc_score(reference, test):
    """
    Calculate Total Column (TC) score for a test alignment given a reference alignment.
    """
    column_matches = 0

    # Use the minimum number of columns to avoid out-of-range errors
    num_columns = min(reference.get_alignment_length(), test.get_alignment_length())

    for col in range(num_columns):
        ref_col = [seq[col] for seq in reference]
        test_col = [seq[col] for seq in test]

        if ref_col == test_col:
            column_matches += 1

    tc_score = column_matches / num_columns if num_columns > 0 else 0
    return tc_score

def score_files(ref_file, test_file):
    # Load the alignments
    reference = AlignIO.read(ref_file, "msf")
    test = AlignIO.read(test_file, "msf")

    # Ensure the alignments are loaded as MultipleSeqAlignment objects
    if not isinstance(reference, MultipleSeqAlignment) or not isinstance(test, MultipleSeqAlignment):
        print("Error: Failed to parse the alignments as MultipleSeqAlignment objects.")
        sys.exit(1)

    # Reorder the test alignment to match the reference
    reordered_test = reorder_alignment(reference, test)

    # Calculate scores
    sp_score = calculate_sp_score(reference, reordered_test)
    tc_score = calculate_tc_score(reference, reordered_test)

    # Output results
    return {"SP": sp_score, "TC": tc_score}

#### Chat Response

In [51]:
import os
import subprocess
from Bio import AlignIO, SeqIO
from Bio.Align import MultipleSeqAlignment
import pandas as pd

# Define the base directory for your files
file_path = 'bb3_release/RV11'

# List all files with .tfa extension and remove the extension
test_cases = [os.path.splitext(file)[0] for file in os.listdir(file_path) if file.endswith('.tfa')]

# Alignment tools with direct command-line calls
alignment_tools = {
    # 'mafft': {
    #     'command': lambda input_file, output_file: f"mafft --auto --inputorder {input_file} > {output_file}",
    #     'output_ext': '_mafft_aligned.fasta'
    # },
    'muscle': {
        'command': lambda input_file, output_file: f"muscle -in {input_file} -out {output_file} -msf",
        'output_ext': '_muscle_aligned.msf'
    },
    'clustalw': {
        'command': lambda input_file, output_file: f"clustalw -INFILE={input_file} -OUTFILE={output_file} -OUTPUT=GCG/MSF",
        'output_ext': '_clustalw_aligned.msf'
    }
}

def get_original_ids(tfa_file):
    original_ids = []
    for record in SeqIO.parse(tfa_file, 'fasta'):
        original_ids.append(record.id)
    return original_ids

def sort_alignment(input_alignment_file, output_alignment_file, file_format, original_ids):
    # Read the alignment
    alignment = AlignIO.read(input_alignment_file, file_format)

    # Create a dictionary of records with IDs as keys
    record_dict = {record.id: record for record in alignment}

    # Reorder the records based on original IDs
    sorted_records = []
    for oid in original_ids:
        if oid in record_dict:
            record = record_dict[oid]
            # Ensure the record ID matches the original ID exactly
            record.id = oid
            sorted_records.append(record)
        else:
            print(f"Warning: ID {oid} not found in alignment.")
            return None  # Alignment IDs don't match

    # Create a new MultipleSeqAlignment object
    sorted_alignment = MultipleSeqAlignment(sorted_records)

    # Write the sorted alignment to a file
    AlignIO.write(sorted_alignment, output_alignment_file, file_format)

    return sorted_alignment

# Function to run bali_score
def run_bali_score(reference_msf, test_alignment, score_output):
    score = score_files(reference_msf, test_alignment)
    print(score)
    with open(score_output, 'w') as f_out:
        f_out.write(str(score))
    return process

# Main benchmarking loop
results = []

for test_case in test_cases:
    tfa_file = os.path.join(file_path, f'{test_case}.tfa')
    msf_file = os.path.join(file_path, f'{test_case}.msf')

    # Check if files exist
    if not os.path.exists(tfa_file) or not os.path.exists(msf_file):
        print(f'Files for {test_case} not found.')
        continue

    # Get original sequence IDs
    original_ids = get_original_ids(tfa_file)

    for tool_name, tool_info in alignment_tools.items():
        output_file = os.path.join(f'{file_path}/results', f'{test_case}{tool_info["output_ext"]}')
        # sorted_output_file = os.path.join(f'{file_path}/results', f'{test_case}_{tool_name}_sorted.msf')
        score_output = os.path.join(f'{file_path}/results', f'{test_case}_{tool_name}_score.txt')

        print(f'Running {tool_name} on {test_case}...')

        # Construct the command
        command = tool_info['command'](tfa_file, output_file)
        print(f'Executing command: {command}')

        # Run the alignment tool
        process = subprocess.run(command, shell=True, capture_output=True, text=True)

        # Check for errors
        if process.returncode != 0:
            print(f"Error running {tool_name} on {test_case}:")
            print(process.stderr)
            continue

        # Sort the alignment
        # sorted_alignment = sort_alignment(output_file, sorted_output_file, 'fasta', original_ids)

        # if sorted_alignment is None:
        #     print(f"Error sorting alignment for {test_case} with {tool_name}.")
        #     continue
        
        # # Convert the sorted FASTA alignment to MSF using seqret (part of EMBOSS)
        # msf_output_file = sorted_output_file.replace("_sorted.fasta", "_sorted.msf")
        # seqret_command = f"seqret -sequence {sorted_output_file} -outseq {msf_output_file} -osformat2 msf"
        
        # seqret_process = subprocess.run(seqret_command, shell=True, capture_output=True, text=True)
        # if seqret_process.returncode != 0:
        #     print("Error converting to MSF format:")
        #     print(seqret_process.stderr)
        #     continue

        # Now run bali_score using the MSF file if needed, or continue using the FASTA file
        # This depends on what format bali_score expects. If bali_score can handle FASTA directly, use sorted_output_file.
        # If it requires MSF, use msf_output_file instead:
        # process = run_bali_score(xml_file, sorted_output_file, score_output)
        # If you want to use MSF instead:
        process = run_bali_score(msf_file, output_file, score_output)

        # Check for errors
        if process.returncode != 0:
            print(f"Error running bali_score on {test_case} with {tool_name}:")
            print(process.stderr)
            continue
        
        # Read the score
        with open(score_output, 'r') as f:
            score = f.read().strip()

        # Append to results
        results.append({
            'test_case': test_case,
            'tool': tool_name,
            'score': score
        })

        print(f'Results for {tool_name} on {test_case}:')
        print(score)
        print('---')

# Display results in a DataFrame
df = pd.DataFrame(results)
print(df)


Running muscle on BB11031...
Executing command: muscle -in bb3_release/RV11/BB11031.tfa -out bb3_release/RV11/results/BB11031_muscle_aligned.msf -msf
{'SP': 0.08151774785801713, 'TC': 0.0028208744710860366}
Results for muscle on BB11031:
{'SP': 0.08151774785801713, 'TC': 0.0028208744710860366}
---
Running clustalw on BB11031...
Executing command: clustalw -INFILE=bb3_release/RV11/BB11031.tfa -OUTFILE=bb3_release/RV11/results/BB11031_clustalw_aligned.msf -OUTPUT=GCG/MSF
{'SP': 0.11699415029248537, 'TC': 0.07528641571194762}
Results for clustalw on BB11031:
{'SP': 0.11699415029248537, 'TC': 0.07528641571194762}
---
Running muscle on BB11025...
Executing command: muscle -in bb3_release/RV11/BB11025.tfa -out bb3_release/RV11/results/BB11025_muscle_aligned.msf -msf
{'SP': 0.05719557195571956, 'TC': 0.0}
Results for muscle on BB11025:
{'SP': 0.05719557195571956, 'TC': 0.0}
---
Running clustalw on BB11025...
Executing command: clustalw -INFILE=bb3_release/RV11/BB11025.tfa -OUTFILE=bb3_release

### HMM REIMPLEMENTATION WORKS YIELDS NON -inf and no infinite loop

In [None]:
from Bio import SeqIO
from collections import Counter
import numpy as np
import math
from typing import List, Tuple, Dict

class ProfileHMM:
    """
    A Profile Hidden Markov Model for Multiple Sequence Alignment.
    """

    def __init__(self, pseudocount: float = 0.1):
        """
        Initialize the Profile HMM.
        :param pseudocount: Small value to prevent zero probabilities.
        """
        self.transition_probabilities: Dict[str, float] = {}  # Transition probabilities between states
        self.match_emissions: Dict[int, Dict[str, float]] = {}  # Emission probabilities for Match states
        self.alphabet: List[str] = []  # Set of characters in the sequences
        self.pseudocount = pseudocount

    def train(self, aligned_sequences: List[str]):
        """
        Train the Profile HMM on a set of aligned sequences.
        :param aligned_sequences: List of pre-aligned sequences.
        """
        # Extract the alphabet (excluding gaps)
        self.alphabet = sorted(list(set("".join(aligned_sequences).replace("-", ""))))

        seq_length = len(aligned_sequences[0])  # Length of alignment
        num_sequences = len(aligned_sequences)

        # Initialize match emission counts with pseudocounts
        match_counts = {pos: {char: self.pseudocount for char in self.alphabet}
                       for pos in range(seq_length)}
        # Removed insert_counts since Insert (I) states are not used

        # Count occurrences in match states
        for seq in aligned_sequences:
            model_pos = 0  # Current position in the model (Match states)
            for pos in range(seq_length):
                if seq[pos] != '-':
                    # Match state emission
                    match_counts[model_pos][seq[pos]] += 1
                    model_pos += 1
                else:
                    # Deletion: Do not count insertions; handle deletions in transitions
                    pass  # No action needed for deletions in emissions

        # Calculate match state emission probabilities
        self.match_emissions = {
            pos: {char: count / sum(char_counts.values())
                  for char, count in char_counts.items()}
            for pos, char_counts in match_counts.items()
        }

        # Estimate transition probabilities
        self._estimate_transitions(aligned_sequences)

    def _estimate_transitions(self, aligned_sequences: List[str]):
        """
        Estimate transition probabilities using sequence data, including pseudocounts for unseen transitions.
        """
        transition_counts = Counter()

        # Define possible transitions
        possible_transitions = {
            'START': ['M'],
            'M': ['M', 'D', 'END'],
            'D': ['M', 'D', 'END']
        }

        # Count transitions based on sequences
        for seq in aligned_sequences:
            prev_state = 'START'
            for pos in range(len(seq)):
                if seq[pos] != '-':
                    curr_state = 'M'
                else:
                    curr_state = 'D'
                transition_key = f"{prev_state}->{curr_state}"
                transition_counts[transition_key] += 1
                prev_state = curr_state
            # After the end of the sequence, transition to END
            transition_key = f"{prev_state}->END"
            transition_counts[transition_key] += 1

        # Add pseudocounts for all possible transitions
        for from_state, to_states in possible_transitions.items():
            for to_state in to_states:
                transition = f"{from_state}->{to_state}"
                if transition not in transition_counts:
                    transition_counts[transition] = self.pseudocount
                else:
                    transition_counts[transition] += self.pseudocount  # Add pseudocount to existing counts

        # Normalize transition counts to probabilities
        self.transition_probabilities = {}
        from_states = set(transition.split('->')[0] for transition in transition_counts)
        for from_state in from_states:
            transitions_from_state = {trans: count for trans, count in transition_counts.items()
                                      if trans.startswith(f"{from_state}->")}
            total = sum(transitions_from_state.values())
            for trans, count in transitions_from_state.items():
                self.transition_probabilities[trans] = count / total

    def viterbi_alignment(self, sequence: str) -> Tuple[float, List[str]]:
        """
        Perform Viterbi alignment of a sequence to the profile HMM.
        :param sequence: Sequence to align.
        :return: Tuple of (log-likelihood, aligned sequence).
        """
        seq_length = len(sequence)
        model_length = len(self.match_emissions)

        # Define states
        states = ['M', 'D']  # Only Match and Deletion states
        state_indices = {state: idx for idx, state in enumerate(states)}
        num_states = len(states)

        # Initialize Viterbi matrices
        # Dimensions: (model_length +1) x num_states x (seq_length +1)
        # Using log probabilities to prevent underflow
        log_viterbi = np.full((model_length + 1, num_states, seq_length + 1), -np.inf)
        backtrack = np.full((model_length + 1, num_states, seq_length + 1), None, dtype=object)

        # Start state: Initialize transition from START to M
        if 'START->M' in self.transition_probabilities:
            log_viterbi[0, state_indices['M'], 0] = math.log(self.transition_probabilities['START->M'])
            backtrack[0, state_indices['M'], 0] = ('START', 0, 0)
        else:
            print("No transition from START to M state.")
            return -np.inf, []

        # Iterate over the model positions
        for model_pos in range(0, model_length + 1):
            for seq_pos in range(0, seq_length + 1):
                for current_state in states:
                    current_idx = state_indices[current_state]

                    # Determine emission and movement based on current state
                    if current_state == 'M':
                        if model_pos == model_length:
                            continue  # No Match state after the last position
                        if seq_pos == 0:
                            continue  # Cannot emit without a sequence character
                        emission_char = sequence[seq_pos - 1]
                        emission_prob = self.match_emissions[model_pos].get(emission_char, self.pseudocount)
                        emission_log = math.log(emission_prob)
                        prev_seq_pos = seq_pos - 1
                        prev_model_pos = model_pos - 1
                    elif current_state == 'D':
                        emission_log = 0  # No emission for Delete
                        prev_seq_pos = seq_pos
                        prev_model_pos = model_pos - 1

                    # Iterate over possible previous states
                    for prev_state in states + ['START']:
                        if model_pos == 0 and prev_state != 'START':
                            continue  # Only START can transition to the first state
                        if model_pos > 0 and prev_state == 'START':
                            continue  # START only transitions at model_pos == 0

                        # Determine transition
                        transition = f"{prev_state}->{current_state}"
                        if transition not in self.transition_probabilities:
                            continue  # Transition not possible

                        trans_prob = self.transition_probabilities[transition]
                        trans_log = math.log(trans_prob)

                        # Get previous Viterbi score
                        if prev_state == 'START':
                            prev_score = 0  # log(1) = 0
                        else:
                            prev_idx = state_indices.get(prev_state, None)
                            if prev_idx is None:
                                continue
                            prev_score = log_viterbi[prev_model_pos, prev_idx, prev_seq_pos]
                            if prev_score == -np.inf:
                                continue  # No valid path

                        # Calculate new score
                        score = prev_score + trans_log + emission_log

                        # Update Viterbi matrix if new score is better
                        if score > log_viterbi[model_pos, current_idx, seq_pos]:
                            log_viterbi[model_pos, current_idx, seq_pos] = score
                            backtrack[model_pos, current_idx, seq_pos] = (prev_model_pos, state_indices.get(prev_state, -1), prev_seq_pos)

        # Transition to END state
        best_score = -np.inf
        best_end_state = None

        for state in states:
            state_idx = state_indices[state]
            transition = f"{state}->END"
            if transition not in self.transition_probabilities:
                continue
            trans_prob = self.transition_probabilities[transition]
            trans_log = math.log(trans_prob)
            score = log_viterbi[model_length, state_idx, seq_length] + trans_log
            if score > best_score:
                best_score = score
                best_end_state = state_idx

        if best_end_state is None:
            return -np.inf, []

        # Trace back to find the best path
        best_path = []
        curr_pos = (model_length, best_end_state, seq_length)

        while curr_pos != (0, state_indices['M'], 0):
            model_pos, state_idx, seq_pos = curr_pos
            state = list(state_indices.keys())[list(state_indices.values()).index(state_idx)]
            best_path.append((model_pos, state, seq_pos))
            prev = backtrack[model_pos, state_idx, seq_pos]
            if prev is None:
                break
            prev_model_pos, prev_state_idx, prev_seq_pos = prev
            if prev_state_idx == -1:
                break
            curr_pos = (prev_model_pos, prev_state_idx, prev_seq_pos)

        best_path.reverse()

        # Construct the aligned sequence
        aligned_sequence = []
        for pos, state, s_pos in best_path:
            if state == 'M':
                aligned_sequence.append(sequence[s_pos - 1] if s_pos > 0 else '-')
            elif state == 'D':
                aligned_sequence.append('-')

        return best_score, aligned_sequence

    def score_alignment(self, alignment: List[str]) -> float:
        """
        Score a multiple sequence alignment.
        :param alignment: List of aligned sequences.
        :return: Alignment score.
        """
        total_score = 0.0

        for seq in alignment:
            seq_score = self._sequence_likelihood(seq)
            total_score += seq_score

        return total_score

    def _sequence_likelihood(self, sequence: str) -> float:
        """
        Calculate the likelihood of a sequence given the profile HMM.
        :param sequence: Sequence to score.
        :return: Log-likelihood of the sequence.
        """
        log_likelihood = 0.0

        model_length = len(self.match_emissions)
        model_pos = 0

        for pos in range(len(sequence)):
            char = sequence[pos]
            if model_pos < model_length and char != '-':
                if char in self.match_emissions[model_pos]:
                    log_likelihood += math.log(self.match_emissions[model_pos][char])
                else:
                    log_likelihood += math.log(self.pseudocount)
                model_pos += 1
            else:
                # Deletion state: no emission, probability is 1 (log(1) = 0)
                log_likelihood += 0

        return log_likelihood

    def generate_consensus(self) -> str:
        """
        Generate a consensus sequence based on emission probabilities.
        :return: Consensus sequence.
        """
        consensus = []
        for pos in range(len(self.match_emissions)):
            most_probable_char = max(
                self.match_emissions[pos], key=self.match_emissions[pos].get
            )
            consensus.append(most_probable_char)

        return ''.join(consensus)

    def print_model(self):
        """
        Print the trained HMM model's parameters.
        """
        print("Transition Probabilities:")
        for trans, prob in self.transition_probabilities.items():
            print(f"  {trans}: {prob:.4f}")

        print("\nEmission Probabilities (Match States):")
        for pos, emissions in self.match_emissions.items():
            print(f"  Position {pos + 1}:")
            for char, prob in emissions.items():
                print(f"    {char}: {prob:.4f}")
# Example usage
def main():
    # Define aligned sequences
    aligned_sequences = [
        'ACGT-ATCG',
        'ACGTT-TCG',
        'A-GTTA-CG',
        'ACGT--TCG'
    ]

    # Create and train Profile HMM
    hmm = ProfileHMM()
    hmm.train(aligned_sequences)

    # Print the trained model
    hmm.print_model()

    # Generate consensus sequence
    consensus = hmm.generate_consensus()
    print("\nConsensus Sequence:", consensus)

    # Score existing alignment
    alignment_score = hmm.score_alignment(aligned_sequences)
    print("Alignment Score:", alignment_score)

    # Align a new sequence
    new_sequence = 'ACGTTACG'
    alignment_log_likelihood, alignment = hmm.viterbi_alignment(new_sequence)
    print("New Sequence Alignment:", ''.join(alignment))
    print("Alignment Log-Likelihood:", alignment_log_likelihood)

if __name__ == '__main__':
    main()


Transition Probabilities:
  D->M: 0.8095
  D->D: 0.1746
  D->END: 0.0159
  START->M: 1.0000
  M->M: 0.6964
  M->D: 0.1683
  M->END: 0.1353

Emission Probabilities (Match States):
  Position 1:
    A: 0.9318
    C: 0.0227
    G: 0.0227
    T: 0.0227
  Position 2:
    A: 0.0227
    C: 0.7045
    G: 0.2500
    T: 0.0227
  Position 3:
    A: 0.0227
    C: 0.0227
    G: 0.7045
    T: 0.2500
  Position 4:
    A: 0.0227
    C: 0.0227
    G: 0.0227
    T: 0.9318
  Position 5:
    A: 0.4773
    C: 0.0227
    G: 0.0227
    T: 0.4773
  Position 6:
    A: 0.0227
    C: 0.4773
    G: 0.0227
    T: 0.4773
  Position 7:
    A: 0.0227
    C: 0.4773
    G: 0.4773
    T: 0.0227
  Position 8:
    A: 0.0417
    C: 0.0417
    G: 0.8750
    T: 0.0417
  Position 9:
    A: 0.2500
    C: 0.2500
    G: 0.2500
    T: 0.2500

Consensus Sequence: ACGTACCGA
Alignment Score: -14.5818129761372
New Sequence Alignment: ACGTTACG--
Alignment Log-Likelihood: -16.442099469229454


In [None]:
!pip install Bio

Collecting Bio
  Downloading bio-1.7.1-py3-none-any.whl.metadata (5.7 kB)
Collecting biopython>=1.80 (from Bio)
  Downloading biopython-1.84-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting gprofiler-official (from Bio)
  Downloading gprofiler_official-1.0.0-py3-none-any.whl.metadata (11 kB)
Collecting mygene (from Bio)
  Downloading mygene-3.2.2-py2.py3-none-any.whl.metadata (10 kB)
Collecting biothings-client>=0.2.6 (from mygene->Bio)
  Downloading biothings_client-0.3.1-py2.py3-none-any.whl.metadata (9.8 kB)
Downloading bio-1.7.1-py3-none-any.whl (280 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.0/281.0 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading biopython-1.84-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m49.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading gprofiler_official-1.0.0-py3-none-any.whl

In [None]:
#alternative implementation of profile HMM
from Bio import SeqIO
from collections import defaultdict
import math

class ProfileHMM:
    def __init__(self, alphabet=None, pseudocount=0.1):
        """
        Initialize Profile HMM

        Args:
            alphabet (list): List of characters in the sequence alphabet
            pseudocount (float): Small value added to prevent zero probabilities
        """
        self.alphabet = alphabet or []
        self.pseudocount = pseudocount

        # Transition probabilities between states
        self.transition_probs = {
            'start': defaultdict(float),
            'match': defaultdict(float),
            'insert': defaultdict(float),
            'delete': defaultdict(float),
            'end': defaultdict(float)
        }

        # Emission probabilities for match states
        self.emission_probs = []

        # Model parameters
        self.sequence_length = 0

    def train(self, aligned_sequences):
        """
        Train the Profile HMM using aligned sequences

        Args:
            aligned_sequences (list): List of aligned sequences
        """
        if not aligned_sequences:
            raise ValueError("No sequences provided for training")

        # Determine alphabet and sequence length
        self.alphabet = list(set("".join(aligned_sequences).replace("-", "")))
        self.sequence_length = len(aligned_sequences[0])

        # Initialize count matrices
        match_counts = [defaultdict(int) for _ in range(self.sequence_length)]
        insert_counts = [defaultdict(int) for _ in range(self.sequence_length)]
        transition_counts = {
            'start': defaultdict(int),
            'match': defaultdict(int),
            'insert': defaultdict(int),
            'delete': defaultdict(int),
            'end': defaultdict(int)
        }

        # Count occurrences in the alignment
        for seq in aligned_sequences:
            state = 'start'
            for pos, char in enumerate(seq):
                if char == '-':
                    # Deletion state
                    if state == 'match':
                        transition_counts['match']['delete'] += 1
                        state = 'delete'
                    elif state == 'delete':
                        transition_counts['delete']['delete'] += 1
                    elif state == 'start':
                        transition_counts['start']['delete'] += 1
                else:
                    # Match or Insert state
                    if state == 'start':
                        transition_counts['start']['match'] += 1
                        state = 'match'
                    elif state == 'delete':
                        transition_counts['delete']['match'] += 1
                        state = 'match'

                    # Update counts
                    if state == 'match':
                        match_counts[pos][char] += 1
                        transition_counts['match']['match'] += 1
                    elif state == 'insert':
                        insert_counts[pos][char] += 1
                        transition_counts['insert']['insert'] += 1

            # Handle end state transitions
            transition_counts[state]['end'] += 1

        # Calculate probabilities with pseudocounts
        def calculate_prob_dict(counts):
            total = sum(counts.values()) + len(self.alphabet) * self.pseudocount
            return {k: (counts.get(k, 0) + self.pseudocount) / total for k in self.alphabet}

        # Emission probabilities for match states
        self.emission_probs = [calculate_prob_dict(counts) for counts in match_counts]

        # Transition probabilities
        def normalize_transitions(counts):
            total = sum(counts.values()) + self.pseudocount * len(counts)
            return {k: (v + self.pseudocount) / total for k, v in counts.items()}

        self.transition_probs = {
            k: normalize_transitions(v) for k, v in transition_counts.items()
        }

    def score_sequence(self, sequence):
        """
        Calculate the log probability of a sequence given the HMM

        Args:
            sequence (str): Sequence to score

        Returns:
            float: Log probability of the sequence
        """
        if len(sequence) != self.sequence_length:
            raise ValueError(f"Sequence length must be {self.sequence_length}")

        log_prob = 0.0
        state = 'start'

        for pos, char in enumerate(sequence):
            # Transition probability
            if state == 'start':
                log_prob += math.log(self.transition_probs['start']['match'])
                state = 'match'

            # Emission probability for match state
            log_prob += math.log(self.emission_probs[pos].get(char, self.pseudocount))

            # Transition to next state (simplified)
            if char == '-':
                log_prob += math.log(self.transition_probs['match']['delete'])
                state = 'delete'
            else:
                log_prob += math.log(self.transition_probs['match']['match'])

        # End state transition
        log_prob += math.log(self.transition_probs[state]['end'])

        return log_prob

    def print_model(self):
        """
        Print the model parameters
        """
        print("Transition Probabilities:")
        for state, transitions in self.transition_probs.items():
            print(f"{state}: {transitions}")

        print("\nEmission Probabilities:")
        for pos, probs in enumerate(self.emission_probs):
            print(f"Position {pos}: {probs}")

def load_aligned_sequences(file_path):
    """
    Load aligned sequences from a FASTA file

    Args:
        file_path (str): Path to the FASTA file

    Returns:
        list: List of aligned sequences
    """
    try:
        aligned_sequences = [str(record.seq) for record in SeqIO.parse(file_path, "fasta")]

        # Validate that all sequences have the same length
        if len(set(len(seq) for seq in aligned_sequences)) > 1:
            raise ValueError("Sequences in the alignment must have equal length")

        return aligned_sequences

    except FileNotFoundError:
        print(f"Error: File {file_path} not found.")
        return []
    except Exception as e:
        print(f"An error occurred while loading sequences: {e}")
        return []

def main():
    # Load the aligned sequences from a FASTA file
    file_path = "aligned_c2_domain.fasta"
    aligned_sequences = load_aligned_sequences(file_path)

    if not aligned_sequences:
        print("No sequences to process.")
        return

    # Train and display the Profile HMM
    hmm = ProfileHMM()
    hmm.train(aligned_sequences)

    # Print model details
    print("\n--- Profile HMM Model Details ---")
    hmm.print_model()

    # Score individual sequences
    print("\n--- Sequence Scoring ---")
    for i, seq in enumerate(aligned_sequences[:5], 1):
        try:
            score = hmm.score_sequence(seq)
            print(f"Sequence {i} score: {score}")
        except Exception as e:
            print(f"Error scoring sequence {i}: {e}")

if __name__ == "__main__":
    main()


--- Profile HMM Model Details ---
Transition Probabilities:
start: {'delete': 0.956081081081081, 'match': 0.04391891891891892}
match: {'match': 0.9202716820260541, 'delete': 0.07957477427571027, 'end': 0.0001535436982354517}
insert: {}
delete: {'delete': 0.9691356655180507, 'match': 0.030512860333982178, 'end': 0.00035147414796711323}
end: {}

Emission Probabilities:
Position 0: {'P': 0.03333333333333333, 'S': 0.03333333333333333, 'C': 0.03333333333333333, 'L': 0.03333333333333333, 'D': 0.03333333333333333, 'A': 0.03333333333333333, 'Q': 0.03333333333333333, 'N': 0.03333333333333333, 'F': 0.03333333333333333, 'R': 0.03333333333333333, 'E': 0.03333333333333333, 'Y': 0.03333333333333333, 'T': 0.03333333333333333, 'K': 0.03333333333333333, 'I': 0.03333333333333333, 'W': 0.03333333333333333, 'V': 0.03333333333333333, 'M': 0.3666666666666667, 'H': 0.03333333333333333, 'G': 0.03333333333333333}
Position 1: {'P': 0.03333333333333333, 'S': 0.3666666666666667, 'C': 0.03333333333333333, 'L': 0.

# PyHMMR for Validation

In [None]:
# Install necessary Python libraries
!pip install biopython pyhmmer


Collecting biopython
  Downloading biopython-1.84-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting pyhmmer
  Downloading pyhmmer-0.10.15-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (14 kB)
Downloading biopython-1.84-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyhmmer-0.10.15-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.4/13.4 MB[0m [31m10.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyhmmer, biopython
Successfully installed biopython-1.84 pyhmmer-0.10.15


In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')


In [None]:
# Compatibility: Ensure Python is installed natively for ARM (M2 MacBook)
!python3 --version

# Reinstall pyhmmer with forced compilation (if needed)
!pip install --no-binary pyhmmer pyhmmer

# Install psutil library (macOS-specific, skip if not applicable)
!brew install psutil


Python 3.10.12
/bin/bash: line 1: brew: command not found


In [None]:
from pyhmmer import easel, plan7
from google.colab import drive
import os

# Step 1: Mount Google Drive
drive.mount('/content/drive')

# Step 2: Define the amino acid alphabet
alphabet = easel.Alphabet.amino()

# Step 3: Load the multiple sequence alignment
msa_path = "/content/drive/MyDrive/Final Project Files/aligned_c2_domain.fasta"

# Ensure the alignment file exists
if not os.path.exists(msa_path):
    raise FileNotFoundError(f"Alignment file not found: {msa_path}")

# Load the MSA and assign a name
with easel.MSAFile(msa_path, digital=True, alphabet=alphabet) as msa_file:
    msa = msa_file.read()
msa.name = b"c2_domain"

# Step 4: Build the HMM
builder = plan7.Builder(alphabet)
background = plan7.Background(alphabet)
hmm, _, _ = builder.build_msa(msa, background)

# Save the HMM
output_hmm_path = "/content/drive/MyDrive/Final Project Files/c2_domain.hmm"
with open(output_hmm_path, "wb") as output_file:
    hmm.write(output_file)

# Step 5: Remove gaps from the sequence database
original_sequence_path = "/content/drive/MyDrive/Final Project Files/aligned_c2_domain.fasta"
clean_file_path = "/content/drive/MyDrive/Final Project Files/cleaned_c2_domain.fasta"

# Clean the sequence file
if not os.path.exists(original_sequence_path):
    raise FileNotFoundError(f"Sequence file not found: {original_sequence_path}")

with open(original_sequence_path, "r") as input_file, open(clean_file_path, "w") as output_file:
    for line in input_file:
        if line.startswith(">"):
            output_file.write(line)  # Write headers as is
        else:
            output_file.write(line.replace("-", "").strip() + "\n")  # Remove gaps

# Step 6: Run the pipeline with the cleaned sequence file
sequence_database_path = clean_file_path
pipeline = plan7.Pipeline(alphabet, background=background)

# Ensure the cleaned file exists
if not os.path.exists(sequence_database_path):
    raise FileNotFoundError(f"Cleaned sequence file not found: {sequence_database_path}")

with easel.SequenceFile(sequence_database_path, digital=True, alphabet=alphabet) as seq_file:
    hits = pipeline.search_hmm(hmm, seq_file)

# Step 7: Print hits and save results
results_path = "/content/drive/MyDrive/Final Project Files/c2_domain_results.domtbl"

with open(results_path, "w") as f:
    # Write header
    f.write("Sequence\tScore\tE-value\n")
    # Write results
    for hit in hits:
        name = hit.name.decode("ascii")
        score = hit.score
        evalue = hit.evalue
        f.write(f"{name}\t{score:.2f}\t{evalue:.2g}\n")
        print(f"Sequence: {name}, Score: {score:.2f}, E-value: {evalue:.2g}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Sequence: sp|Q14644|RASA3_HUMAN, Score: 477.29, E-value: 2.5e-145
Sequence: sp|Q15283|RASA2_HUMAN, Score: 457.65, E-value: 2.2e-139
Sequence: sp|O95294|RASL1_HUMAN, Score: 457.33, E-value: 2.7e-139
Sequence: sp|P17252|KPCA_HUMAN, Score: 384.97, E-value: 2.1e-117
Sequence: sp|P63319|KPCG_RAT, Score: 384.80, E-value: 2.3e-117
Sequence: sp|Q4KWH8|PLCH1_HUMAN, Score: 366.59, E-value: 7.5e-112
Sequence: sp|Q3UHC7|DAB2P_MOUSE, Score: 354.61, E-value: 3.2e-108
Sequence: sp|P47712|PA24A_HUMAN, Score: 323.70, E-value: 7.1e-99
Sequence: sp|P47713|PA24A_MOUSE, Score: 321.66, E-value: 2.9e-98
Sequence: sp|Q99829|CPNE1_HUMAN, Score: 320.95, E-value: 4.8e-98
Sequence: sp|Q96FN4|CPNE2_HUMAN, Score: 316.75, E-value: 9e-97
Sequence: sp|Q9JIR4|RIMS1_RAT, Score: 311.27, E-value: 4.1e-95
Sequence: sp|P0C871|PA24B_MOUSE, Score: 301.52, E-value: 3.6e-92
Sequence: sp|O95741|CPNE6_H

THIS WORKS AS OF WED 430

In [None]:
from Bio import SeqIO
from collections import Counter
import math
import numpy as np

# Load aligned sequences from FASTA file
aligned_file = "/content/drive/MyDrive/Final Project Files/aligned_c2_domain.fasta"
aligned_sequences = [str(record.seq) for record in SeqIO.parse(aligned_file, "fasta")]

# ProfileHMM implementation (as provided previously)
class ProfileHMM:
    def __init__(self, pseudocount: float = 0.1):
        self.transition_probabilities = {}
        self.emission_probabilities = {}
        self.alphabet = []
        self.pseudocount = pseudocount

    def train(self, aligned_sequences):
        self.alphabet = sorted(list(set("".join(aligned_sequences).replace("-", ""))))
        match_counts = Counter()
        transition_counts = Counter()

        for seq in aligned_sequences:
            prev_state = 'START'
            for c in seq:
                curr_state = 'M' if c != '-' else 'D'
                if curr_state == 'M':
                    match_counts[c] += 1
                transition_counts[f"{prev_state}->{curr_state}"] += 1
                prev_state = curr_state
            transition_counts[f"{prev_state}->END"] += 1

        total_matches = sum(match_counts.values())
        self.emission_probabilities = {
            char: (match_counts[char] + self.pseudocount) / (total_matches + len(self.alphabet) * self.pseudocount)
            for char in self.alphabet
        }

        from_states = set(t.split("->")[0] for t in transition_counts)
        for state in from_states:
            transitions = {t: c for t, c in transition_counts.items() if t.startswith(f"{state}->")}
            total = sum(transitions.values())
            for t, c in transitions.items():
                self.transition_probabilities[t] = c / total

    def viterbi_alignment(self, sequence: str):
        """
        Perform Viterbi alignment of a sequence to the profile HMM.
        :param sequence: Sequence to align.
        :return: Tuple of (log-likelihood, aligned sequence).
        """
        seq_len = len(sequence)
        states = ['M', 'D']  # Match and Deletion states
        state_indices = {state: i for i, state in enumerate(states)}
        num_states = len(states)

        # Initialize Viterbi and backtrack matrices
        # Dimensions: (seq_len + 1) x num_states
        log_viterbi = np.full((seq_len + 1, num_states), -np.inf)
        backtrack = np.full((seq_len + 1, num_states), None, dtype=object)

        # Initialize start state
        if 'START->M' in self.transition_probabilities:
            log_viterbi[0, state_indices['M']] = math.log(self.transition_probabilities['START->M'])
            backtrack[0, state_indices['M']] = 'START'
        else:
            print("No transition from START to M")
            return -np.inf, []

        # Fill Viterbi matrix
        for i in range(1, seq_len + 1):
            for j, state in enumerate(states):
                if state == 'M':
                    # Match state: emit sequence[i-1]
                    emission_char = sequence[i - 1]
                    emission_prob = self.emission_probabilities.get(emission_char, self.pseudocount)
                    emission_log = math.log(emission_prob)

                    # Consider transitions from all possible previous states
                    for prev_state in states:
                        trans_key = f"{prev_state}->{state}"
                        if trans_key in self.transition_probabilities:
                            trans_prob = self.transition_probabilities[trans_key]
                            trans_log = math.log(trans_prob)
                            score = log_viterbi[i - 1, state_indices[prev_state]] + trans_log + emission_log
                            if score > log_viterbi[i, j]:
                                log_viterbi[i, j] = score
                                backtrack[i, j] = prev_state
                elif state == 'D':
                    # Deletion state: insert gap, do not consume sequence
                    # Emission probability is 1 (log(1) = 0)
                    emission_log = 0

                    # Consider transitions from all possible previous states
                    for prev_state in states:
                        trans_key = f"{prev_state}->{state}"
                        if trans_key in self.transition_probabilities:
                            trans_prob = self.transition_probabilities[trans_key]
                            trans_log = math.log(trans_prob)
                            score = log_viterbi[i - 1, state_indices[prev_state]] + trans_log + emission_log
                            if score > log_viterbi[i, j]:
                                log_viterbi[i, j] = score
                                backtrack[i, j] = prev_state

        # Termination: transition to END
        best_score = -np.inf
        best_end_state = None
        for j, state in enumerate(states):
            trans_key = f"{state}->END"
            if trans_key in self.transition_probabilities:
                trans_prob = self.transition_probabilities[trans_key]
                trans_log = math.log(trans_prob)
                score = log_viterbi[seq_len, j] + trans_log
                if score > best_score:
                    best_score = score
                    best_end_state = j

        if best_end_state is None:
            return -np.inf, []

        # Backtrack to find alignment
        alignment = []
        current_state = best_end_state
        current_i = seq_len

        while current_i > 0:
            prev_state = backtrack[current_i, current_state]
            if prev_state == 'M':
                alignment.append(sequence[current_i - 1])
                current_state = state_indices['M']
                current_i -= 1
            elif prev_state == 'D':
                alignment.append('-')
                current_state = state_indices['D']
                # current_i remains the same
            elif prev_state == 'START':
                break
            else:
                break

        alignment.reverse()
        return best_score, alignment

    def print_model(self):
        print("Transition Probabilities:")
        for trans, prob in self.transition_probabilities.items():
            print(f"{trans}: {prob:.4f}")
        print("\nEmission Probabilities:")
        for char, prob in self.emission_probabilities.items():
            print(f"{char}: {prob:.4f}")

# Train the ProfileHMM on the aligned sequences
hmm = ProfileHMM()
hmm.train(aligned_sequences)
hmm.print_model()


Transition Probabilities:
M->M: 0.9134
M->D: 0.0865
M->END: 0.0002
D->D: 0.9690
D->M: 0.0306
D->END: 0.0003
START->D: 0.9714
START->M: 0.0286

Emission Probabilities:
A: 0.0583
C: 0.0188
D: 0.0553
E: 0.0747
F: 0.0411
G: 0.0568
H: 0.0263
I: 0.0434
K: 0.0684
L: 0.0967
M: 0.0213
N: 0.0371
P: 0.0616
Q: 0.0504
R: 0.0591
S: 0.0796
T: 0.0524
V: 0.0612
W: 0.0121
Y: 0.0255


In [None]:
# Load the MSA and assign a name
with easel.MSAFile(msa_path, digital=True, alphabet=alphabet) as msa_file:
    msa = msa_file.read()

# Assign a name to the MSA
msa.name = b"c2_domain"  # Name must be a byte string

# Build the HMM
builder = plan7.Builder(alphabet)
background = plan7.Background(alphabet)
pyhmmer_hmm, _, _ = builder.build_msa(msa, background)

# Save the HMM
pyhmmer_hmm_path = "/content/drive/MyDrive/Final Project Files/c2_domain.hmm"
with open(pyhmmer_hmm_path, "wb") as f:
    pyhmmer_hmm.write(f)



In [None]:
# Train your custom ProfileHMM
custom_hmm = ProfileHMM()
custom_hmm.train(aligned_sequences)

# Test sequences from the same dataset
test_file = "/content/drive/MyDrive/Final Project Files/unitpro_data.fasta"
test_sequences = [str(record.seq) for record in SeqIO.parse(test_file, "fasta")]

# Ensure `custom_hmm` refers to your ProfileHMM instance
profilehmm_scores = []
for sequence in test_sequences:
    # Use custom_hmm for your implementation
    score, _ = custom_hmm.viterbi_alignment(sequence)  # Custom ProfileHMM
    profilehmm_scores.append((sequence, score))

# Use pyhmmer pipeline for testing with the pyhmmer HMM
pipeline = plan7.Pipeline(alphabet, background=background)
pyhmmer_scores = []
with easel.SequenceFile(test_file, digital=True, alphabet=alphabet) as seq_file:
    hits = pipeline.search_hmm(pyhmmer_hmm, seq_file)

for hit in hits:
    pyhmmer_scores.append((hit.name.decode("ascii"), hit.score))





KeyboardInterrupt: 

In [None]:
# Compare top hits from both models
profilehmm_scores.sort(key=lambda x: x[1], reverse=True)
pyhmmer_scores.sort(key=lambda x: x[1], reverse=True)

# Save comparison to a file
results_path = "/content/drive/MyDrive/Final Project Files/comparison_results.txt"
with open(results_path, "w") as f:
    f.write("ProfileHMM Top Hits:\n")
    for seq, score in profilehmm_scores[:10]:
        f.write(f"Sequence: {seq[:30]}... Score: {score:.2f}\n")

    f.write("\npyhmmer Top Hits:\n")
    for name, score in pyhmmer_scores[:10]:
        f.write(f"Sequence: {name} Score: {score:.2f}\n")


NameError: name 'pyhmmer_scores' is not defined

In [None]:
import matplotlib.pyplot as plt

# Extract scores for comparison
profilehmm_top_scores = [score for _, score in profilehmm_scores[:10]]
pyhmmer_top_scores = [score for _, score in pyhmmer_scores[:10]]

# Plot the scores
plt.figure(figsize=(10, 6))
plt.plot(profilehmm_top_scores, label="ProfileHMM", marker="o")
plt.plot(pyhmmer_top_scores, label="pyhmmer", marker="x")
plt.xlabel("Rank")
plt.ylabel("Alignment Score")
plt.title("Comparison of Top Hits: ProfileHMM vs pyhmmer")
plt.legend()
plt.show()


NameError: name 'pyhmmer_scores' is not defined