In [None]:
#my codes are jupyter lab format!!!!

In [5]:
!pip install biopython



In [None]:
#scoring matrix

In [7]:
%%bash
cat > scoring.txt << 'EOF'
A C G T -
A  5 -4 -4 -4 -8
C -4  5 -4 -4 -8
G -4 -4  5 -4 -8
T -4 -4 -4  5 -8
- -8 -8 -8 -8  0
EOF

In [9]:
from Bio import Entrez, SeqIO
Entrez.email = "kkm001@ucr.edu"  

In [11]:
from typing import Tuple, Dict

NEG_INF = float('-inf')
# Reading fasta and and concatinating the DNA sequences
def read_fasta(path: str) -> str:
    seq = []
    with open(path) as f:
        for line in f:
            if line.startswith('>'): continue
            seq.append(line.strip())
    return ''.join(seq)

def read_scoring(path: str) -> Dict[Tuple[str,str],int]:
    mat = {}
    with open(path) as f:
        keys = f.readline().split()
        for line in f:
            parts = line.split()
            a = parts[0]
            for b,v in zip(keys, parts[1:]):
                mat[(a,b)] = int(v)
    return mat
# scoring of 2 elements
def sp_pair(a, b, scoring):
    return scoring[(a,b)]
# scoring of 3 elements
def sp_triple(x, y, z, scoring):
    return sp_pair(x,y,scoring) + sp_pair(x,z,scoring) + sp_pair(y,z,scoring)

def forward_scores(x, y, z, scoring):
    m,n,p = len(x), len(y), len(z)
    prev = [[NEG_INF]*(p+1) for _ in range(n+1)]
    prev[0][0] = 0
    #Computing scores of string y and string z.
    for j in range(n+1):
        for k in range(p+1):
            if j and k:#substitution or matching case
                prev[j][k] = max(prev[j][k], prev[j-1][k-1] + sp_triple('-',y[j-1],z[k-1],scoring))
            if j:#gap in z
                prev[j][k] = max(prev[j][k], prev[j-1][k]   + sp_triple('-',y[j-1],'-',scoring))
            if k:#gap in y
                prev[j][k] = max(prev[j][k], prev[j][k-1]   + sp_triple('-','-',z[k-1],scoring))
       
   #Computing scores of string x,y and string z. 
    for i in range(1,m+1):
        curr = [[NEG_INF]*(p+1) for _ in range(n+1)]#creating a 3D list
        curr[0][0] = prev[0][0] + sp_triple(x[i-1],'-','-',scoring)
        for j in range(n+1):
            for k in range(p+1):
                best = prev[j][k] + sp_triple(x[i-1],'-','-',scoring)#allign x[i-1] with gaps in y and z
                # foloowing checks if we can find a better alignment
                if j and k:#substitution or matching case
                    best = max(best, prev[j-1][k-1] + sp_triple(x[i-1],y[j-1],z[k-1],scoring))
                if j:#gap in z
                    best = max(best, prev[j-1][k]   + sp_triple(x[i-1],y[j-1],'-',scoring))
                if k:#gap in y
                    best = max(best, prev[j][k-1]   + sp_triple(x[i-1],'-',z[k-1],scoring))
                if j and k: #gap in x
                    best = max(best, curr[j-1][k-1] + sp_triple('-',y[j-1],z[k-1],scoring))
                if j:#gap in x and z
                    best = max(best, curr[j-1][k]   + sp_triple('-',y[j-1],'-',scoring))
                if k:# gap in x and y
                    best = max(best, curr[j][k-1]   + sp_triple('-','-',z[k-1],scoring))
                curr[j][k] = best
        prev = curr
    return prev
#I have a gut feeling that these recursive steps can be written in a fewer lines of code But this is the best i could do:(
def backward_scores(x, y, z, scoring):#will be used for hirschberg
    plane = forward_scores(x[::-1], y[::-1], z[::-1], scoring)
    n,p = len(y), len(z)
    out = [[0]*(p+1) for _ in range(n+1)]
    for j in range(n+1):
        for k in range(p+1):
            out[j][k] = plane[n-j][p-k]
    return out

def dp_fill(x, y, z, scoring):#DP table
    m,n,p = len(x), len(y), len(z)
    dp = [[[NEG_INF]*(p+1) for _ in range(n+1)] for _ in range(m+1)]#best score for aligning prefixes
    bt = [[[None]*(p+1)   for _ in range(n+1)] for _ in range(m+1)]# backtracking info
    dp[0][0][0] = 0
    for i in range(m+1):#recursions for filling table entries
        for j in range(n+1):
            for k in range(p+1):
                if i:
                    v = dp[i-1][j][k] + sp_triple(x[i-1],'-','-',scoring)
                    if v>dp[i][j][k]: dp[i][j][k],bt[i][j][k] = v,('X',i-1,j,k)
                if j:
                    v = dp[i][j-1][k] + sp_triple('-',y[j-1],'-',scoring)
                    if v>dp[i][j][k]: dp[i][j][k],bt[i][j][k] = v,('Y',i,j-1,k)
                if k:
                    v = dp[i][j][k-1] + sp_triple('-','-',z[k-1],scoring)
                    if v>dp[i][j][k]: dp[i][j][k],bt[i][j][k] = v,('Z',i,j,k-1)
                if i and j:
                    v = dp[i-1][j-1][k] + sp_triple(x[i-1],y[j-1],'-',scoring)
                    if v>dp[i][j][k]: dp[i][j][k],bt[i][j][k] = v,('XY',i-1,j-1,k)
                if i and k:
                    v = dp[i-1][j][k-1] + sp_triple(x[i-1],'-',z[k-1],scoring)
                    if v>dp[i][j][k]: dp[i][j][k],bt[i][j][k] = v,('XZ',i-1,j,k-1)
                if j and k:
                    v = dp[i][j-1][k-1] + sp_triple('-',y[j-1],z[k-1],scoring)
                    if v>dp[i][j][k]: dp[i][j][k],bt[i][j][k] = v,('YZ',i,j-1,k-1)
                if i and j and k:
                    v = dp[i-1][j-1][k-1] + sp_triple(x[i-1],y[j-1],z[k-1],scoring)
                    if v>dp[i][j][k]: dp[i][j][k],bt[i][j][k] = v,('XYZ',i-1,j-1,k-1)
    #backtracking--->
    A,B,C = [],[],[]
    i,j,k = m,n,p 
    while i or j or k:
        op,ii,jj,kk = bt[i][j][k]
        if op=='XYZ': A.append(x[ii]);B.append(y[jj]);C.append(z[kk])
        elif op=='XY':A.append(x[ii]);B.append(y[jj]);C.append('-')
        elif op=='XZ':A.append(x[ii]);B.append('-');C.append(z[kk])
        elif op=='YZ':A.append('-');B.append(y[jj]);C.append(z[kk])
        elif op=='X': A.append(x[ii]);B.append('-');C.append('-')
        elif op=='Y': A.append('-');B.append(y[jj]);C.append('-')
        elif op=='Z': A.append('-');B.append('-');C.append(z[kk])
        i,j,k = ii,jj,kk
    return ''.join(reversed(A)), ''.join(reversed(B)), ''.join(reversed(C)), dp[m][n][p]
#hirschberg for reducing space foro the 3D DP table.
def hirschberg3(x, y, z, scoring):
    m,n,p = len(x), len(y), len(z)
    if m*n*p <= 10000 or m<=1 or n<=1 or p<=1:
        return dp_fill(x,y,z,scoring)#for small inputs
    i_mid = m//2#divide sequence x by 2(to apply the divie and conquer in our DP)
    F = forward_scores(x[:i_mid], y, z, scoring)#this combined with the next few steps find the indes of y and z in the best alignment
    G = backward_scores(x[i_mid:], y, z, scoring)
    best,j_mid,k_mid = NEG_INF,0,0
    for j in range(n+1):
        for k in range(p+1):
            val=F[j][k]+G[j][k]
            if val>best: best,j_mid,k_mid=val,j,k
    A1,B1,C1,_ = hirschberg3(x[:i_mid], y[:j_mid], z[:k_mid], scoring)#divide and conquer recursions
    A2,B2,C2,_ = hirschberg3(x[i_mid:], y[j_mid:], z[k_mid:], scoring)
    return A1+A2, B1+B2, C1+C2, best


In [31]:
x = "GATT"
y = "GACT"
z = "GAT"

scoring = read_scoring("scoring.txt")

%time A, B, C, score = hirschberg3(x, y, z, scoring)

# output
print("Score:", score)
print(">seq1_aligned\n", A)
print(">seq2_aligned\n", B)
print(">seq3_aligned\n", C)

CPU times: user 163 μs, sys: 58 μs, total: 221 μs
Wall time: 162 μs
Score: 25
>seq1_aligned
 GATT
>seq2_aligned
 GACT
>seq3_aligned
 GA-T


In [13]:
#test case 1
def fetch_and_save(acc):
    handle = Entrez.efetch(db="nucleotide", id=acc, rettype="fasta", retmode="text")
    rec = SeqIO.read(handle, "fasta")
    handle.close()
    fn = f"{acc}.fasta"
    with open(fn, "w") as f:
        SeqIO.write(rec, f, "fasta")
    return str(rec.seq)


x = fetch_and_save("NM_013096")
y = fetch_and_save("NM_008218")
z = fetch_and_save("NM_000558")
print(f"Fetched lengths: {len(x)}, {len(y)}, {len(z)}")


Fetched lengths: 557, 569, 577


In [19]:
# scoring = read_scoring("scoring.txt")#using the scoring matrix provided to compute the final score
# %time A, B, C, score = hirschberg3(x, y, z, scoring)

# # output
# print("Score:", score)
# # print(">seq1_aligned\n", A)
# # print(">seq2_aligned\n", B)
# # print(">seq3_aligned\n", C)


In [61]:
#test case 2
def fetch_and_save(acc):
    handle = Entrez.efetch(db="nucleotide", id=acc, rettype="fasta", retmode="text")
    rec = SeqIO.read(handle, "fasta")
    handle.close()
    fn = f"{acc}.fasta"
    with open(fn, "w") as f:
        SeqIO.write(rec, f, "fasta")
    return str(rec.seq)

x = fetch_and_save("NM_001030004")
y = fetch_and_save("NM_178850")
z = fetch_and_save("NM_001287184")
print(f"Fetched lengths: {len(x)}, {len(y)}, {len(z)}")


Fetched lengths: 1558, 1652, 1780


In [21]:
# scoring = read_scoring("scoring.txt")

# %time A, B, C, score = hirschberg3(x, y, z, scoring)

# # output
# print("Score:", score)
# print(">seq1_aligned\n", A)
# print(">seq2_aligned\n", B)
# print(">seq3_aligned\n", C)



In [11]:
#test case 3
def fetch_and_save(acc):
    handle = Entrez.efetch(db="nucleotide", id=acc, rettype="fasta", retmode="text")
    rec = SeqIO.read(handle, "fasta")
    handle.close()
    fn = f"{acc}.fasta"
    with open(fn, "w") as f:
        SeqIO.write(rec, f, "fasta")
    return str(rec.seq)


x = fetch_and_save("NM_010019")
y = fetch_and_save("NM_001243563")
z = fetch_and_save("NM_014326")
print(f"Fetched lengths: {len(x)}, {len(y)}, {len(z)}")

Fetched lengths: 1734, 1825, 2791


In [23]:
# scoring = read_scoring("scoring.txt")

# A, B, C, score = hirschberg3(x, y, z, scoring)

# print("Score:", score)
# # print(">seq1_aligned\n", A)
# # print(">seq2_aligned\n", B)
# # print(">seq3_aligned\n", C)

In [37]:
# Test case 4
def fetch_and_save(acc):
    handle = Entrez.efetch(db="nucleotide", id=acc, rettype="fasta", retmode="text")
    rec = SeqIO.read(handle, "fasta")
    handle.close()
    fn = f"{acc}.fasta"
    with open(fn, "w") as f:
        SeqIO.write(rec, f, "fasta")
    return str(rec.seq)

x = fetch_and_save("NM_008261")
y = fetch_and_save("NM_000457")
z = fetch_and_save("XM_016937951")
print(f"Fetched lengths: {len(x)}, {len(y)}, {len(z)}")

Fetched lengths: 4391, 4739, 4746


In [25]:
# scoring = read_scoring("scoring.txt")

# A, B, C, score = hirschberg3(x, y, z, scoring)

# # output
# print("Score:", score)
# # print(">seq1_aligned\n", A)
# # print(">seq2_aligned\n", B)
# # print(">seq3_aligned\n", C)


In [91]:
# Test case 5

def fetch_and_save(acc):
    handle = Entrez.efetch(db="nucleotide", id=acc, rettype="fasta", retmode="text")
    rec = SeqIO.read(handle, "fasta")
    handle.close()
    fn = f"{acc}.fasta"
    with open(fn, "w") as f:
        SeqIO.write(rec, f, "fasta")
    return str(rec.seq)

x = fetch_and_save("NM_000492")
y = fetch_and_save("NM_021050")
z = fetch_and_save("NM_031506")

print(f"Fetched lengths: {len(x)}, {len(y)}, {len(z)}")

Fetched lengths: 6070, 6305, 6287


In [None]:
scoring = read_scoring("scoring.txt")

A, B, C, score = hirschberg3(x, y, z, scoring)

# output
print("Score:", score)
# print(">seq1_aligned\n", A)
# print(">seq2_aligned\n", B)
# print(">seq3_aligned\n", C)