# Needleman–Wunsch (Global Alignment) for Hemoglobin α Chains

This notebook implements the Needleman–Wunsch algorithm with configurable scoring matrices (BLOSUM62, BLOSUM80, PAM250) and affine gap penalties (default gap open 11, gap extend 1). It loads sequences from `hemoglobin_209_species_final.fasta` and metadata from `hemoglobin_209_species_final.csv`, supports pilot and full pairwise runs, and exports results for clustering.


In [2]:
# Imports and configuration
from Bio import SeqIO
from Bio.Align import substitution_matrices
import pandas as pd
from dataclasses import dataclass
from typing import Dict, Tuple, List
import itertools
import math
import time

# Paths
FASTA_PATH = "hemoglobin_209_species_final_fasta.fasta"
CSV_PATH = "hemoglobin_209_species_final.csv"

# Default parameters
DEFAULT_MATRIX_NAME = "BLOSUM62"
DEFAULT_GAP_OPEN = 11
DEFAULT_GAP_EXTEND = 1

# Allowed matrix names (used with substitution_matrices.load)
VALID_MATRIX_NAMES = ["BLOSUM62", "BLOSUM80", "PAM250"]

@dataclass
class SequenceRecord:
    protein_id: str
    species: str
    order: str
    superorder: str
    sequence: str



In [3]:
# Load sequences and metadata

def load_sequences(fasta_path: str, csv_path: str) -> List[SequenceRecord]:
    meta = pd.read_csv(csv_path)
    # Ensure index type matches FASTA ids (strings)
    meta['protein_id'] = meta['protein_id'].astype(str)
    meta = meta.set_index('protein_id')

    records: List[SequenceRecord] = []
    for rec in SeqIO.parse(fasta_path, 'fasta'):
        pid = str(rec.id)
        if pid not in meta.index:
            continue
        row = meta.loc[pid]
        records.append(
            SequenceRecord(
                protein_id=pid,
                species=str(row['species']),
                order=str(row['order']),
                superorder=str(row['superorder']),
                sequence=str(rec.seq).upper().replace('U', 'X')  # standardize
            )
        )
    return records

seq_records = load_sequences(FASTA_PATH, CSV_PATH)
print(f"Loaded sequences: {len(seq_records)}")


Loaded sequences: 209


In [4]:
# Scoring matrix helpers (symmetric lookup)

def load_matrix(name: str) -> Dict[Tuple[str, str], int]:
    name = name.upper()
    if name not in VALID_MATRIX_NAMES:
        raise ValueError(f"Unsupported matrix '{name}'. Choose from {VALID_MATRIX_NAMES}")
    mat = substitution_matrices.load(name)
    alphabet = mat.alphabet
    result: Dict[Tuple[str, str], int] = {}
    for a in alphabet:
        for b in alphabet:
            score = int(mat[a, b])
            result[(a, b)] = score
            result[(b, a)] = score
    return result

# Example: mat = load_matrix(DEFAULT_MATRIX_NAME)



In [5]:
# Needleman–Wunsch (global alignment) — affine gap (Gotoh)
NEG_INF = -10**9

def nw_global_affine(seq1: str, seq2: str, matrix: Dict[Tuple[str, str], int], gap_open: int, gap_extend: int):
    n = len(seq1)
    m = len(seq2)

    M = [[NEG_INF] * (m + 1) for _ in range(n + 1)]  # match/mismatch state
    Ix = [[NEG_INF] * (m + 1) for _ in range(n + 1)]  # gap in seq2 (deletion)
    Iy = [[NEG_INF] * (m + 1) for _ in range(n + 1)]  # gap in seq1 (insertion)

    trace_M = [[None] * (m + 1) for _ in range(n + 1)]
    trace_Ix = [[None] * (m + 1) for _ in range(n + 1)]
    trace_Iy = [[None] * (m + 1) for _ in range(n + 1)]

    M[0][0] = 0

    for i in range(1, n + 1):
        Ix[i][0] = -gap_open - (i - 1) * gap_extend
        trace_Ix[i][0] = ('Ix', i - 1, 0) if i > 1 else ('M', 0, 0)
    for j in range(1, m + 1):
        Iy[0][j] = -gap_open - (j - 1) * gap_extend
        trace_Iy[0][j] = ('Iy', 0, j - 1) if j > 1 else ('M', 0, 0)

    for i in range(1, n + 1):
        a = seq1[i - 1]
        for j in range(1, m + 1):
            b = seq2[j - 1]
            score_sub = matrix.get((a, b), matrix.get((b, a), -gap_open))

            # M state
            prev_candidates = [
                (M[i - 1][j - 1], 'M'),
                (Ix[i - 1][j - 1], 'Ix'),
                (Iy[i - 1][j - 1], 'Iy'),
            ]
            best_val, best_state = max(prev_candidates, key=lambda x: x[0])
            M[i][j] = best_val + score_sub
            trace_M[i][j] = (best_state, i - 1, j - 1)

            # Ix state (gap in seq2)
            cand_Ix = [
                (M[i - 1][j] - gap_open, 'M'),
                (Ix[i - 1][j] - gap_extend, 'Ix'),
            ]
            best_val, best_state = max(cand_Ix, key=lambda x: x[0])
            Ix[i][j] = best_val
            trace_Ix[i][j] = (best_state, i - 1, j)

            # Iy state (gap in seq1)
            cand_Iy = [
                (M[i][j - 1] - gap_open, 'M'),
                (Iy[i][j - 1] - gap_extend, 'Iy'),
            ]
            best_val, best_state = max(cand_Iy, key=lambda x: x[0])
            Iy[i][j] = best_val
            trace_Iy[i][j] = (best_state, i, j - 1)

    # Traceback
    end_candidates = [(M[n][m], 'M'), (Ix[n][m], 'Ix'), (Iy[n][m], 'Iy')]
    score_final, state = max(end_candidates, key=lambda x: x[0])

    i, j = n, m
    aln1, aln2 = [], []
    matches = 0
    positives = 0
    aligned_non_gap = 0

    while i > 0 or j > 0:
        if state == 'M':
            prev_state, pi, pj = trace_M[i][j]
            a = seq1[i - 1]
            b = seq2[j - 1]
            aln1.append(a)
            aln2.append(b)
            aligned_non_gap += 1
            if a == b:
                matches += 1
            if matrix.get((a, b), matrix.get((b, a), -gap_open)) > 0:
                positives += 1
            i, j = pi, pj
            state = prev_state
        elif state == 'Ix':
            prev_state, pi, pj = trace_Ix[i][j]
            a = seq1[i - 1]
            aln1.append(a)
            aln2.append('-')
            i, j = pi, pj
            state = prev_state
        else:  # 'Iy'
            prev_state, pi, pj = trace_Iy[i][j]
            b = seq2[j - 1]
            aln1.append('-')
            aln2.append(b)
            i, j = pi, pj
            state = prev_state

    aln1 = ''.join(reversed(aln1))
    aln2 = ''.join(reversed(aln2))
    aln_len = len(aln1)
    aligned_positions = matches + (aligned_non_gap - matches)

    identity = matches / aligned_positions if aligned_positions else 0.0
    distance = 1.0 - identity


    return {
        'score': score_final,
        'aln1': aln1,
        'aln2': aln2,
        'identity': identity,
        'distance': distance,
        'alignment_length': aln_len,
        'aligned_non_gap_positions': aligned_non_gap,
        'matches': matches,
    }



In [6]:
# Pilot run on a small subset (Just for 20 samples)
import random

random.seed(42)

matrix = load_matrix(DEFAULT_MATRIX_NAME)

def run_pair(a: SequenceRecord, b: SequenceRecord):
    res = nw_global_affine(a.sequence, b.sequence, matrix=matrix, gap_open=DEFAULT_GAP_OPEN, gap_extend=DEFAULT_GAP_EXTEND)
    return {
        'protein_id_1': a.protein_id,
        'protein_id_2': b.protein_id,
        'species_1': a.species,
        'species_2': b.species,
        'order_1': a.order,
        'order_2': b.order,
        'superorder_1': a.superorder,
        'superorder_2': b.superorder,
        'matrix': DEFAULT_MATRIX_NAME,
        'gap_open': DEFAULT_GAP_OPEN,
        'gap_extend': DEFAULT_GAP_EXTEND,
        **res,
    }

pilot_n = 20
subset = random.sample(seq_records, min(pilot_n, len(seq_records)))
pairs = list(itertools.combinations(subset, 2))

pilot_results = [run_pair(a, b) for a, b in pairs]

pilot_df = pd.DataFrame(pilot_results)
print(f"Pilot pairs: {len(pairs)}; Pilot rows: {len(pilot_df)}")
print("Columns:", list(pilot_df.columns))

if not pilot_df.empty and all(c in pilot_df.columns for c in ['score','identity','similarity']):
    pilot_df[['score','identity','similarity']].describe()
else:
    print("Pilot DataFrame is empty or missing expected columns ('score','identity','similarity').")


Pilot pairs: 190; Pilot rows: 190
Columns: ['protein_id_1', 'protein_id_2', 'species_1', 'species_2', 'order_1', 'order_2', 'superorder_1', 'superorder_2', 'matrix', 'gap_open', 'gap_extend', 'score', 'aln1', 'aln2', 'identity', 'distance', 'alignment_length', 'aligned_non_gap_positions', 'matches']
Pilot DataFrame is empty or missing expected columns ('score','identity','similarity').


In [8]:
# Pilot summary stats
if not pilot_df.empty:
    display(pilot_df[['score','identity','distance']].describe())
else:
    print("No pilot results to summarize.")


Unnamed: 0,score,identity,distance
count,190.0,190.0,190.0
mean,361.784211,0.595556,0.404444
std,297.99835,0.277944,0.277944
min,-70.0,0.147059,0.007092
25%,-42.75,0.234844,0.197183
50%,552.5,0.760563,0.239437
75%,592.0,0.802817,0.765156
max,730.0,0.992908,0.852941


In [9]:
# Full pairwise run (may take time) (For all 209 samples)
from itertools import combinations

full_pairs = list(combinations(seq_records, 2))
print(f"Total pairs: {len(full_pairs)}")

mat = load_matrix(DEFAULT_MATRIX_NAME)

def run_pair_row(a: SequenceRecord, b: SequenceRecord):
    res = nw_global_affine(a.sequence, b.sequence, matrix=mat, gap_open=DEFAULT_GAP_OPEN, gap_extend=DEFAULT_GAP_EXTEND)
    return {
        'protein_id_1': a.protein_id,
        'protein_id_2': b.protein_id,
        'species_1': a.species,
        'species_2': b.species,
        'order_1': a.order,
        'order_2': b.order,
        'superorder_1': a.superorder,
        'superorder_2': b.superorder,
        'matrix': DEFAULT_MATRIX_NAME,
        'gap_open': DEFAULT_GAP_OPEN,
        'gap_extend': DEFAULT_GAP_EXTEND,
        **res,
    }

start = time.perf_counter()
full_results = [run_pair_row(a, b) for a, b in full_pairs]
elapsed = time.perf_counter() - start
print(f"Time to compute all alignments: {elapsed:.2f} seconds")

full_df = pd.DataFrame(full_results)
print(f"Full rows: {len(full_df)}")

# Save full results
full_df.to_csv('nw_full_results.csv', index=False)
print("Saved nw_full_results.csv")

# Build identity distance matrix: 1 - identity
ids = sorted({r.protein_id for r in seq_records})
idx = pd.Index(ids, name='protein_id')
dist_df = pd.DataFrame(1.0, index=idx, columns=ids)

for _, row in full_df[['protein_id_1','protein_id_2','identity']].iterrows():
    i = row['protein_id_1']
    j = row['protein_id_2']
    d = 1.0 - float(row['identity'])
    dist_df.at[i, j] = d
    dist_df.at[j, i] = d

# Zero diagonal
for i in ids:
    dist_df.at[i, i] = 0.0

# Save distance matrix
dist_df.to_csv('nw_distance_identity.csv')
print("Saved nw_distance_identity.csv")


Total pairs: 21736
Time to compute all alignments: 666.15 seconds
Full rows: 21736
Saved nw_full_results.csv
Saved nw_distance_identity.csv


In [10]:
"""
# Run full NW for multiple matrices (BLOSUM62, BLOSUM80, PAM250)
# from itertools import combinations
# import os
#
# MATRICES = ["BLOSUM62", "BLOSUM80", "PAM250"]
#
# # Precompute all pairs once
# all_pairs = list(combinations(seq_records, 2))
# print(f"Pairs to evaluate: {len(all_pairs)}")
#
# def run_full_for_matrix(matrix_name: str):
#     print(f"\n=== Running NW with {matrix_name} ===")
#     mat = load_matrix(matrix_name)
#
#     def run_pair_row(a: SequenceRecord, b: SequenceRecord):
#         res = nw_global_affine(a.sequence, b.sequence, matrix=mat, gap_open=DEFAULT_GAP_OPEN, gap_extend=DEFAULT_GAP_EXTEND)
#         return {
#             'protein_id_1': a.protein_id,
#             'protein_id_2': b.protein_id,
#             'species_1': a.species,
#             'species_2': b.species,
#             'order_1': a.order,
#             'order_2': b.order,
#             'superorder_1': a.superorder,
#             'superorder_2': b.superorder,
#             'matrix': matrix_name,
#             'gap_open': DEFAULT_GAP_OPEN,
#             'gap_extend': DEFAULT_GAP_EXTEND,
#             **res,
#         }
#
#     results = [run_pair_row(a, b) for a, b in all_pairs]
#     df = pd.DataFrame(results)
#     print(f"Rows: {len(df)}")
#
#     # Save per-matrix results and distance matrix
#     res_path = f"nw_full_results_{matrix_name}.csv"
#     dist_path = f"nw_distance_identity_{matrix_name}.csv"
#     df.to_csv(res_path, index=False)
#     print(f"Saved {res_path}")
#
#     ids = sorted({r.protein_id for r in seq_records})
#     idx = pd.Index(ids, name='protein_id')
#     dist_df = pd.DataFrame(1.0, index=idx, columns=ids)
#
#     for _, row in df[['protein_id_1','protein_id_2','identity']].iterrows():
#         i = row['protein_id_1']
#         j = row['protein_id_2']
#         d = 1.0 - float(row['identity'])
#         dist_df.at[i, j] = d
#         dist_df.at[j, i] = d
#
#     for i in ids:
#         dist_df.at[i, i] = 0.0
#
#     dist_df.to_csv(dist_path)
#     print(f"Saved {dist_path}")
#
# # Run for BLOSUM80 and PAM250 (BLOSUM62 already done, but safe to re-run if desired)
# for name in ["BLOSUM80", "PAM250"]:
#     run_full_for_matrix(name)
"""


'\n# Run full NW for multiple matrices (BLOSUM62, BLOSUM80, PAM250)\n# from itertools import combinations\n# import os\n#\n# MATRICES = ["BLOSUM62", "BLOSUM80", "PAM250"]\n#\n# # Precompute all pairs once\n# all_pairs = list(combinations(seq_records, 2))\n# print(f"Pairs to evaluate: {len(all_pairs)}")\n#\n# def run_full_for_matrix(matrix_name: str):\n#     print(f"\n=== Running NW with {matrix_name} ===")\n#     mat = load_matrix(matrix_name)\n#\n#     def run_pair_row(a: SequenceRecord, b: SequenceRecord):\n#         res = nw_global_affine(a.sequence, b.sequence, matrix=mat, gap_open=DEFAULT_GAP_OPEN, gap_extend=DEFAULT_GAP_EXTEND)\n#         return {\n#             \'protein_id_1\': a.protein_id,\n#             \'protein_id_2\': b.protein_id,\n#             \'species_1\': a.species,\n#             \'species_2\': b.species,\n#             \'order_1\': a.order,\n#             \'order_2\': b.order,\n#             \'superorder_1\': a.superorder,\n#             \'superorder_2\': b.supero

In [11]:
"""
# Compare matrices: within vs between superorders and matrix correlations
# import numpy as np
#
# # Load per-matrix distance matrices
# mat_to_dist = {}
# for name in ["BLOSUM62", "BLOSUM80", "PAM250"]:
#     path = f"nw_distance_identity_{name}.csv" if name != "BLOSUM62" else "nw_distance_identity.csv"
#     mat_to_dist[name] = pd.read_csv(path, index_col=0)
#
# # Helper: compute within/between superorder means
# pid_to_super = {r.protein_id: r.superorder for r in seq_records}
#
# summary_rows = []
# for name, dist_df in mat_to_dist.items():
#     ids = list(dist_df.index)
#     same, diff = [], []
#     for i_idx in range(len(ids)):
#         for j_idx in range(i_idx + 1, len(ids)):
#             i = ids[i_idx]; j = ids[j_idx]
#             dij = float(dist_df.iloc[i_idx, j_idx])
#             if pid_to_super.get(i) == pid_to_super.get(j):
#                 same.append(dij)
#             else:
#                 diff.append(dij)
#     summary_rows.append({
#         'matrix': name,
#       'mean_within_superorder': np.mean(same) if same else np.nan,
#         'mean_between_superorder': np.mean(diff) if diff else np.nan,
#         'separation_gap': (np.mean(diff) - np.mean(same)) if same and diff else np.nan,
#     })
#
# summary_df = pd.DataFrame(summary_rows).sort_values('separation_gap', ascending=False)
# print("\nWithin/between superorder distance summary (higher gap is better):")
# display(summary_df)
#
# # Correlate distance matrices (upper-triangle Spearman)
# from scipy.stats import spearmanr
#
# def upper_triangle_values(df: pd.DataFrame) -> np.ndarray:
#     vals = []
#     n = df.shape[0]
#     for i in range(n):
#         for j in range(i + 1, n):
#             vals.append(float(df.iloc[i, j]))
#     return np.array(vals)
#
# names = list(mat_to_dist.keys())
# print("\nMatrix correlations (Spearman, upper triangle):")
# for i in range(len(names)):
#     for j in range(i + 1, len(names)):
#         a = upper_triangle_values(mat_to_dist[names[i]])
#         b = upper_triangle_values(mat_to_dist[names[j]])
#         rho, p = spearmanr(a, b)
#         print(f"{names[i]} vs {names[j]}: rho={rho:.3f}, p={p:.1e}")
"""


'\n# Compare matrices: within vs between superorders and matrix correlations\n# import numpy as np\n#\n# # Load per-matrix distance matrices\n# mat_to_dist = {}\n# for name in ["BLOSUM62", "BLOSUM80", "PAM250"]:\n#     path = f"nw_distance_identity_{name}.csv" if name != "BLOSUM62" else "nw_distance_identity.csv"\n#     mat_to_dist[name] = pd.read_csv(path, index_col=0)\n#\n# # Helper: compute within/between superorder means\n# pid_to_super = {r.protein_id: r.superorder for r in seq_records}\n#\n# summary_rows = []\n# for name, dist_df in mat_to_dist.items():\n#     ids = list(dist_df.index)\n#     same, diff = [], []\n#     for i_idx in range(len(ids)):\n#         for j_idx in range(i_idx + 1, len(ids)):\n#             i = ids[i_idx]; j = ids[j_idx]\n#             dij = float(dist_df.iloc[i_idx, j_idx])\n#             if pid_to_super.get(i) == pid_to_super.get(j):\n#                 same.append(dij)\n#             else:\n#                 diff.append(dij)\n#     summary_rows.append({\

In [12]:
# Validation: Compare NW implementation vs Biopython pairwise2
from Bio import pairwise2
from Bio.Align import substitution_matrices
import random

VALIDATION_MATRIX = DEFAULT_MATRIX_NAME  # e.g., 'BLOSUM62'
GAP_OPEN = DEFAULT_GAP_OPEN
GAP_EXTEND = DEFAULT_GAP_EXTEND

val_mat = substitution_matrices.load(VALIDATION_MATRIX)
our_val_matrix = load_matrix(VALIDATION_MATRIX)

# Sample pairs for validation
random.seed(7)
val_n = 15
val_subset = random.sample(seq_records, min(val_n, len(seq_records)))
val_pairs = list(itertools.combinations(val_subset, 2))

rows = []
for a, b in val_pairs:
    # Our NW (affine)
    our_res = nw_global_affine(a.sequence, b.sequence, matrix=our_val_matrix, gap_open=GAP_OPEN, gap_extend=GAP_EXTEND)

    # Biopython pairwise2 (use best alignment)
    alignments = pairwise2.align.globalds(a.sequence, b.sequence, val_mat, -GAP_OPEN, -GAP_EXTEND, one_alignment_only=True)
    if alignments:
        aln = alignments[0]
        bi_score = aln.score
        bi_aln1, bi_aln2 = aln.seqA, aln.seqB
        aligned_positions = sum(1 for x, y in zip(bi_aln1, bi_aln2) if x != '-' and y != '-')
        matches = sum(1 for x, y in zip(bi_aln1, bi_aln2) if x == y and x != '-' and y != '-')
        bi_identity = matches / aligned_positions if aligned_positions else 0.0
    else:
        bi_score = float('nan')
        bi_identity = float('nan')

    rows.append({
        'protein_id_1': a.protein_id,
        'protein_id_2': b.protein_id,
        'our_score': our_res['score'],
        'bi_score': bi_score,
        'our_identity': our_res['identity'],
        'bi_identity': bi_identity,
        'abs_diff_identity': abs(our_res['identity'] - bi_identity) if not pd.isna(bi_identity) else float('nan'),
        'abs_diff_score': abs(our_res['score'] - bi_score) if not pd.isna(bi_score) else float('nan'),
    })

val_df = pd.DataFrame(rows)
print(f"Validated pairs: {len(val_df)}")

print("\nIdentity comparison:")
display(val_df[['our_identity','bi_identity','abs_diff_identity']].describe())

print("\nScore comparison:")
display(val_df[['our_score','bi_score','abs_diff_score']].describe())

print("\nLargest identity differences:")
display(val_df.sort_values('abs_diff_identity', ascending=False).head(5))




Validated pairs: 105

Identity comparison:


Unnamed: 0,our_identity,bi_identity,abs_diff_identity
count,105.0,105.0,105.0
mean,0.676507,0.676507,0.0
std,0.196921,0.196921,0.0
min,0.166667,0.166667,0.0
25%,0.574468,0.574468,0.0
50%,0.758865,0.758865,0.0
75%,0.788732,0.788732,0.0
max,0.978873,0.978873,0.0



Score comparison:


Unnamed: 0,our_score,bi_score,abs_diff_score
count,105.0,105.0,105.0
mean,468.504762,468.504762,0.0
std,215.172119,215.172119,0.0
min,-63.0,-63.0,0.0
25%,417.0,417.0,0.0
50%,552.0,552.0,0.0
75%,586.0,586.0,0.0
max,724.0,724.0,0.0



Largest identity differences:


Unnamed: 0,protein_id_1,protein_id_2,our_score,bi_score,our_identity,bi_identity,abs_diff_identity,abs_diff_score
0,1430744588,15825904,562,562.0,0.765957,0.765957,0.0,0.0
53,1561931297,122437,536,536.0,0.765957,0.765957,0.0,0.0
77,2282135466,122437,542,542.0,0.780142,0.780142,0.0,0.0
76,145301549,526117607,603,603.0,0.788732,0.788732,0.0,0.0
75,145301549,2826373145,-55,-55.0,0.168317,0.168317,0.0,0.0


In [9]:
# Save pilot results
pilot_df.to_csv('nw_pilot_results.csv', index=False)
len(pilot_df), pilot_df.head(3)


(190,
   protein_id_1 protein_id_2           species_1              species_2  \
 0   1864417489   1934719391  Callithrix jacchus   Choloepus didactylus   
 1   1864417489     38502825  Callithrix jacchus  Monodelphis domestica   
 2   1864417489   2441695316  Callithrix jacchus     Nycticebus coucang   
 
     order_1          order_2      superorder_1      superorder_2    matrix  \
 0  Primates           Pilosa  Euarchontoglires         Xenarthra  BLOSUM62   
 1  Primates  Didelphimorphia  Euarchontoglires        Marsupials  BLOSUM62   
 2  Primates         Primates  Euarchontoglires  Euarchontoglires  BLOSUM62   
 
    gap_open  gap_extend  score  \
 0        11           1    -61   
 1        11           1    -51   
 2        11           1    390   
 
                                                 aln1  \
 0  MALLKANKDLISAGLKEFNVLLNQQV--FNDPLISE--EDMVIVVE...   
 1  MALLKANKDLISAGLKEF----NVLLNQQVFNDPLISEEDMVIVVE...   
 2  MALLKANKDLISAGLKEFNVLLNQQVFNDPLISEEDMVIVVEDWMN...   
 
  