In [1]:
import numpy as np
import os

In [2]:
class Node:
    def __init__(self, left=None, right=None, sequences=None, id=None, name=None):
        self.left = left
        self.right = right
        self.sequences = sequences
        self.id = id
        self.name = name

    def is_leaf(self):
        return self.left is None and self.right is None

In [3]:
class MSASolver:
    def __init__(self, fasta_folder, match, mismatch, gap):
        self.match = match
        self.mismatch = mismatch
        self.gap = gap
        self.fasta_folder = fasta_folder

        self.fasta_sequences = {}
        self.fasta_file_map = {} 
        self.fasta_file_names = []
        self.load_fasta()

        self.sequences = list(self.fasta_sequences.values())
        self.sequence_ids = list(self.fasta_sequences.keys())

        self.num_seqs = len(self.sequences)
        self.aligned_sequences = []
        self.distance_matrix = []
        self.upgma_root = None

    def load_fasta(self):
        fasta_dict = {}
        fasta_map = {}
        fasta_file_names = []
        index = 0

        for file in sorted(os.listdir(self.fasta_folder)):
            if not file.lower().endswith(".fasta"):
                continue

            path = os.path.join(self.fasta_folder, file)

            with open(path, "r") as f:
                lines = f.read().strip().splitlines()

                header = lines[0].replace(">", "").strip()
                sequence = "".join(lines[1:]).replace(" ", "").upper()

                fasta_dict[header] = sequence
                fasta_map[index] = (file, header)
                fasta_file_names.append(file.replace(".fasta", ""))

                index += 1

        self.fasta_sequences = fasta_dict
        self.fasta_file_map = fasta_map
        self.fasta_file_names = fasta_file_names

    # ============================================================
    # FASE 1 — NW PAR-A-PAR E MATRIZ DE DISTÂNCIAS
    # ============================================================

    def needleman_wunsch(self, seq1, seq2):
        size_s1 = len(seq1)
        size_s2 = len(seq2)
        matrix = np.zeros((size_s1 + 1, size_s2 + 1))
        
        matrix[:,0] = np.linspace(0, size_s1 * self.gap, size_s1 + 1) 
        matrix[0,:] = np.linspace(0, size_s2 * self.gap, size_s2 + 1)

        for line in range(1, size_s1+1):
            for column in range(1, size_s2+1):
                top = matrix[line, column-1] + self.gap
                left = matrix[line-1, column] + self.gap
                diagonal = matrix[line-1, column-1] + (self.match if seq1[line-1] == seq2[column-1] else self.mismatch)
                matrix[line, column] = max([top, left, diagonal])

        line_position = len(seq1)
        column_position = len(seq2)
        reversed_s1 = []
        reversed_s2 = []

        matches = 0
        mismatches = 0
        gaps = 0
        while line_position > 0 or column_position > 0:
            actual = matrix[line_position, column_position]

            if column_position == 0:
                reversed_s1.append(seq1[line_position-1])
                reversed_s2.append("-")
                line_position -= 1
                gaps += 1

                continue
            
            if line_position == 0:
                reversed_s1.append("-")
                reversed_s2.append(seq2[column_position-1])
                column_position -= 1
                gaps += 1

                continue

            if (actual == matrix[line_position-1][column_position-1] + (self.match if seq1[line_position-1] == seq2[column_position-1] else self.mismatch)
            ):        
                reversed_s1.append(seq1[line_position-1])
                reversed_s2.append(seq2[column_position-1])
                line_position -= 1
                column_position -= 1
                if seq1[line_position-1] == seq2[column_position-1]:
                    matches += 1
                else:
                    mismatches += 1

            elif (actual == matrix[line_position-1][column_position] + self.gap
            ):
                reversed_s1.append(seq1[line_position-1])
                reversed_s2.append("-")
                line_position -= 1
                gaps += 1

            else:
                reversed_s1.append("-")
                reversed_s2.append(seq2[column_position-1])
                column_position -= 1
                gaps += 1                        

        aligned_s1 = "".join(reversed_s1)[::-1]
        aligned_s2 = "".join(reversed_s2)[::-1]

        return aligned_s1, aligned_s2, matches, mismatches, gaps
    
    def pairwise_nw_distance(self, seq1, seq2):
        aligned_s1, aligned_s2, matches, mismatches, gaps = self.needleman_wunsch(seq1, seq2)

        return mismatches / (matches + mismatches)

    def compute_distance_matrix(self):
        print("--- Fase 1: Calculando Matriz de Distâncias ---")
        dist_matrix = np.zeros((self.num_seqs, self.num_seqs))

        for i in range(self.num_seqs):
            for j in range(i + 1, self.num_seqs):
                d = self.pairwise_nw_distance(self.sequences[i], self.sequences[j])
                dist_matrix[i][j] = dist_matrix[j][i] = d
                print(f"Dist({i}, {j}) = {d:.3f}")

        return dist_matrix

    # ============================================================
    # FASE 2 — ÁRVORE GUIA UPGMA
    # ============================================================

    def build_guide_tree(self):
        print("\n--- Fase 2: Construindo Árvore Guia (UPGMA) ---")

        clusters = [
            Node(sequences=[(i, self.sequences[i])], id=i, name=f"{self.sequence_ids[i].split(" ")[0]} {self.fasta_file_names[i]}")
            for i in range(self.num_seqs)
        ]

        d_mat = self.distance_matrix.copy()

        while len(clusters) > 1:
            x, y = -1, -1
            min_dist = float("inf")

            for i in range(len(d_mat)):
                for j in range(i + 1, len(d_mat)):
                    if d_mat[i][j] < min_dist:
                        min_dist = d_mat[i][j]
                        x, y = i, j

            node_a = clusters[x]
            node_b = clusters[y]
            print(f"Fundindo clusters {node_a.id} e {node_b.id}  (Dist: {min_dist:.3f})")

            new_node = Node(left=node_a, right=node_b,
                            id=f"({node_a.id}+{node_b.id})")

            new_dists = []
            for k in range(len(d_mat)):
                if k != x and k != y:
                    new_dists.append((d_mat[x][k] + d_mat[y][k]) / 2)

            for idx in sorted([x, y], reverse=True):
                d_mat = np.delete(d_mat, idx, axis=0)
                d_mat = np.delete(d_mat, idx, axis=1)
                clusters.pop(idx)

            new_row = np.array(new_dists + [0])
            d_mat = np.vstack((d_mat, new_row[:-1]))
            d_mat = np.column_stack((d_mat, new_row))

            clusters.append(new_node)

        return clusters[0]

    # ============================================================
    # FASE 3 — ALINHAMENTO PROGRESSIVO
    # ============================================================

    def get_column_score(self, col_a, col_b):
        score = 0
        for a in col_a:
            for b in col_b:
                if a == "-" and b == "-":
                    score += 0
                elif a == "-" or b == "-":
                    score += self.gap
                elif a == b:
                    score += self.match
                else:
                    score += self.mismatch
        return score

    def align_profiles(self, profile_a, profile_b):
        ids_a = [id_ for (id_, _) in profile_a]
        ids_b = [id_ for (id_, _) in profile_b]

        seqs_a = [seq for (_, seq) in profile_a]
        seqs_b = [seq for (_, seq) in profile_b]

        n = len(seqs_a[0])
        m = len(seqs_b[0])

        dp = np.zeros((n + 1, m + 1))

        len_a = len(seqs_a)
        len_b = len(seqs_b)

        for i in range(1, n + 1):
            dp[i][0] = dp[i-1][0] + self.gap * len_b
        for j in range(1, m + 1):
            dp[0][j] = dp[0][j-1] + self.gap * len_a

        for i in range(1, n + 1):
            for j in range(1, m + 1):
                col_a = [s[i-1] for s in seqs_a]
                col_b = [s[j-1] for s in seqs_b]

                score_match = dp[i-1][j-1] + self.get_column_score(col_a, col_b)
                score_del = dp[i-1][j] + self.gap * len_b
                score_ins = dp[i][j-1] + self.gap * len_a

                dp[i][j] = max(score_match, score_del, score_ins)

        aligned_a = [""] * len_a
        aligned_b = [""] * len_b

        i, j = n, m

        while i > 0 or j > 0:
            if i > 0 and j > 0:
                col_a = [s[i-1] for s in seqs_a]
                col_b = [s[j-1] for s in seqs_b]
                diag_score = dp[i-1][j-1] + self.get_column_score(col_a, col_b)

                if dp[i][j] == diag_score:
                    for k in range(len_a):
                        aligned_a[k] = seqs_a[k][i-1] + aligned_a[k]
                    for k in range(len_b):
                        aligned_b[k] = seqs_b[k][j-1] + aligned_b[k]
                    i -= 1; j -= 1
                    continue

            if i > 0 and dp[i][j] == dp[i-1][j] + self.gap * len_b:
                for k in range(len_a):
                    aligned_a[k] = seqs_a[k][i-1] + aligned_a[k]
                for k in range(len_b):
                    aligned_b[k] = "-" + aligned_b[k]
                i -= 1
            else:
                for k in range(len_a):
                    aligned_a[k] = "-" + aligned_a[k]
                for k in range(len_b):
                    aligned_b[k] = seqs_b[k][j-1] + aligned_b[k]
                j -= 1

        new_profile = []

        for id_, seq in zip(ids_a, aligned_a):
            new_profile.append((id_, seq))

        for id_, seq in zip(ids_b, aligned_b):
            new_profile.append((id_, seq))

        return new_profile
    
    def progressive_alignment(self):
        print("\n=== Fase 3: Alinhamento Progressivo ===")
        aligned_sequences = self.traverse_and_align(self.upgma_root)
        return aligned_sequences

    def traverse_and_align(self, node):
        if node.is_leaf():
            return node.sequences

        left_profile = self.traverse_and_align(node.left)
        right_profile = self.traverse_and_align(node.right)

        print(f"Alinhando perfil ({len(left_profile)}) com ({len(right_profile)})...")

        return self.align_profiles(left_profile, right_profile)

    def print_distance_matrix(self):
        print("\n--- MATRIZ DE DISTÂNCIAS ---")

        labels = [l.split(" ")[0] for l in self.sequence_ids]
        n = len(labels)

        header = " " * 16
        header += "  ".join([f"{i:>6}" for i in range(n)])
        print(header)

        for i in range(n):
            row = f"{i}. {labels[i]:<12} "

            if i == 0:
                row += "0.000".rjust(8)
            for j in range(i):
                row += f"{self.distance_matrix[i][j]:>8.3f}"
            print(row)

        return

    def print_sequence_map(self):
        print("\n--- MAPA DE SEQUÊNCIAS ---")
        for idx, (filename, header) in self.fasta_file_map.items():
            print(f"Sequência {idx}: >{header} - {filename}")

    def walk_recursive_guide_tree(self, node, prefix="", is_left=True):
        if node.is_leaf():
            print(prefix + ("└── " if is_left else "┌── ") + node.name)
            return

        branch = "└── " if is_left else "┌── "
        print(prefix + branch + f"Nó {node.id}")

        new_prefix = prefix + ("    " if is_left else "│   ")
        self.walk_recursive_guide_tree(node.right, new_prefix, False)
        
        new_prefix = prefix + ("    " if is_left else "│   ")
        self.walk_recursive_guide_tree(node.left, new_prefix, True)
        
        return
    
    def print_guide_tree(self):
        print("\n--- ÁRVORE UPGMA ---")
        self.walk_recursive_guide_tree(self.upgma_root)

        return

    def print_aligned_sequences(self):
        print("\n--- ALINHAMENTO MÚLTIPLO DE SEQUÊNCIAS ---")
        for id_orig, seq in self.aligned_sequences:
            print(f"Sequência {id_orig}: {seq}")

        return

    def run(self, print_matrix=False, print_upgma=False, print_sequences=True):
        self.distance_matrix = self.compute_distance_matrix()
        if print_matrix:
            self.print_distance_matrix()

        self.upgma_root = self.build_guide_tree()
        if print_upgma:
            self.print_guide_tree()
        
        self.aligned_sequences = self.progressive_alignment()
        if print_sequences:
            self.print_aligned_sequences()

        return

In [4]:
match = 1
mismatch = -1
gap = -1

coi_folder = "coi_sequences"

In [5]:
msa = MSASolver(coi_folder, match, mismatch, gap)
msa.run(print_sequences=False)

--- Fase 1: Calculando Matriz de Distâncias ---
Dist(0, 1) = 0.246
Dist(0, 2) = 0.353
Dist(0, 3) = 0.697
Dist(0, 4) = 0.232
Dist(0, 5) = 0.510
Dist(1, 2) = 0.367
Dist(1, 3) = 0.697
Dist(1, 4) = 0.157
Dist(1, 5) = 0.523
Dist(2, 3) = 0.752
Dist(2, 4) = 0.346
Dist(2, 5) = 0.527
Dist(3, 4) = 0.710
Dist(3, 5) = 0.731
Dist(4, 5) = 0.512

--- Fase 2: Construindo Árvore Guia (UPGMA) ---
Fundindo clusters 1 e 4  (Dist: 0.157)
Fundindo clusters 0 e (1+4)  (Dist: 0.239)
Fundindo clusters 2 e (0+(1+4))  (Dist: 0.355)
Fundindo clusters 5 e (2+(0+(1+4)))  (Dist: 0.520)
Fundindo clusters 3 e (5+(2+(0+(1+4))))  (Dist: 0.728)

=== Fase 3: Alinhamento Progressivo ===
Alinhando perfil (1) com (1)...
Alinhando perfil (1) com (2)...
Alinhando perfil (1) com (3)...
Alinhando perfil (1) com (4)...
Alinhando perfil (1) com (5)...


In [6]:
msa.print_sequence_map()
msa.print_distance_matrix()
msa.print_guide_tree()
msa.print_aligned_sequences()


--- MAPA DE SEQUÊNCIAS ---
Sequência 0: >KX012779.1 UNVERIFIED: Alligator mississippiensis voucher NZG:BWP36367 cytochrome oxidase subunit 1-like (COI) gene, partial sequence; mitochondrial - Alligator_mississippiensis.fasta
Sequência 1: >KY033225.1 Canis lupus cytochrome c oxidase subunit 1 (COI) gene, partial cds; mitochondrial - Canis_lupus.fasta
Sequência 2: >MK932901.1 Falco peregrinus voucher R79 cytochrome oxidase subunit 1 (COI) gene, partial cds; mitochondrial - Falco_peregrinus.fasta
Sequência 3: >MH290787.1 Panthera tigris isolate FMNH_UN_2137 cytochrome oxidase subunit I (COI) gene, partial cds; mitochondrial - Panthera_tigris.fasta
Sequência 4: >LC619129.1 Vulpes vulpes japonica FA_50 mitochondrial COI gene for cytochrome c oxidase subunit 1, partial cds - Vulpes_vulpes.fasta
Sequência 5: >KY177093.1 Xenopus laevis voucher MTSN 7716 cytochrome oxidase subunit I (COI) gene, partial cds; mitochondrial - Xenopus_laevis.fasta

--- MATRIZ DE DISTÂNCIAS ---
                    