In [1]:
import time
import tracemalloc
import csv

In [2]:
UP = (-1,0)
LEFT = (0, -1)
TOPLEFT = (-1, -1)
ORIGIN = (0, 0)

def traceback_global(v, w, pointers):
  i,j = len(v), len(w)
  new_v = []
  new_w = []
  while True:
      di, dj = pointers[i][j]
      if (di,dj) == LEFT:
          new_v.append('-')
          new_w.append(w[j-1])
      elif (di,dj) == UP:
          new_v.append(v[i-1])
          new_w.append('-')
      elif (di,dj) == TOPLEFT:
          new_v.append(v[i-1])
          new_w.append(w[j-1])
      i, j = i + di, j + dj
      if (i <= 0 and j <= 0):
          break
  return ''.join(new_v[::-1])+'\n'+''.join(new_w[::-1])



def global_align(v, w, delta):
  """
  Returns the score of the maximum scoring alignment of the strings v and w, as well as the actual alignment as
  computed by traceback_global.

  :param: v
  :param: w
  :param: delta
  """
  start_time = time.time()
  tracemalloc.start()
  
  M = [[0 for j in range(len(w)+1)] for i in range(len(v)+1)]
  pointers = [[ORIGIN for j in range(len(w)+1)] for i in range(len(v)+1)]
  score, alignment = None, None

  # YOUR CODE HERE
  M[0][0] = 0;
  for i in range(1, len(v) + 1):
    cur_v = v[i - 1]
    M[i][0] = M[i - 1][0] + delta[cur_v]["-"]
    pointers[i][0] = UP

  for i in range(1, len(w) + 1):
    cur_w = w[i - 1]
    M[0][i] = M[0][i - 1] + delta["-"][cur_w]
    pointers[0][i] = LEFT

  for i in range(1, len(v) + 1):
    for j in range(1, len(w) + 1):
      cur_v = v[i - 1]
      cur_w = w[j - 1]

      left_score = M[i][j - 1] + delta["-"][cur_w]
      top_score = M[i - 1][j] + delta[cur_v]["-"]

      M[i][j] = max(left_score, M[i - 1][j - 1] + delta[cur_v][cur_w], top_score)

      if M[i][j] == left_score:
        pointers[i][j] = LEFT
      elif M[i][j] == top_score:
        pointers[i][j] = UP
      else:
        pointers[i][j] = TOPLEFT

  score = M[len(v)][len(w)]

  alignment = traceback_global(v,w, pointers)

  # Stop memory tracking and calculate time elapsed
  end_time = time.time()
  current, peak = tracemalloc.get_traced_memory()
  tracemalloc.stop()
  elapsed_time = end_time - start_time

  print(f"Time taken: {elapsed_time:.4f} seconds")
  print(f"Current memory usage: {current / 10**6:.2f} MB")
  print(f"Peak memory usage: {peak / 10**6:.2f} MB")

  return alignment, score, elapsed_time, current, peak

In [None]:
keys = ['A', 'C', 'T', 'G', '-']
delta = {}
for i in range(len(keys)):
    delta[keys[i]] = {k : v for (k,v) in zip(keys, [1 if keys[i] == keys[j]  else -1 for j in range(len(keys))])}

global_align("TAGATA", "GTAGGCTTAAGGTTA", delta)
print("keys")

Time taken: 0.0002 seconds
Current memory usage: 0.00 MB
Peak memory usage: 0.00 MB
keys
GTAGTGTTA
-TAGA--TA


In [72]:
def test_global_align(csv_file, delta):
    # Load the sequences from the CSV file
    sequences = []
    with open(csv_file, mode='r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        for row in reader:
            sequences.append((row['sequence1'], row['sequence2']))
    
    # Test on the first 10 rows
    row_list = []
    for idx, (seq1, seq2) in enumerate(sequences[:10]):
        print(f"Test {idx + 1}:")
        aligned_seq, score, elapsed_time, current_mem, peak_mem = global_align(seq1, seq2, delta)
        current_memory = current_mem / 10**6  # Convert bytes to MB
        peak_memory = peak_mem / 10**6  # Convert bytes to MB

        print(f"Aligned Seq: {aligned_seq}")
        print(f"Score: {score}")
        print()

        row_list.append([aligned_seq, score, elapsed_time, current_memory, peak_memory])
    
    with open('alignment_results.csv', 'w', newline='') as file: 
        writer = csv.writer(file)
        # Write header row
        writer.writerow(['Sequences', 'Score', 'Elapsed Time (s)', 'Current Memory (MB)', 'Peak Memory (MB)'])
        # Write to CSV
        writer.writerows(row_list)


In [73]:
if __name__ == "__main__":
    # Define the scoring matrix (delta)
    keys = ['A', 'C', 'T', 'G', '-']
    delta = {}
    for i in range(len(keys)):
        delta[keys[i]] = {k : v for (k,v) in zip(keys, [1 if keys[i] == keys[j] else -1 for j in range(len(keys))])}
    
    # Replace with the path to your CSV file
    csv_file = "dataset/rv11_sequence_pairs.csv"
    test_global_align(csv_file, delta)

Test 1:
Time taken: 0.1704 seconds
Current memory usage: 0.19 MB
Peak memory usage: 0.19 MB
Aligned Seq: AAGAAAG-TTAGAT-ATCAG-ATGTGCTG-TT-AA-AGTGTTTGGTACGGGAAAATGTATAGGCGAGCTAGCCT--GCATAA
AA-AAAGGTTAGATGAGAAGCA-GTAATCCTTGAACA-TGATAGGAAA---AAAA-GAA-ATGA-A-CT-GACTCAGAAAAA
Score: 14

Test 2:


Time taken: 0.1621 seconds
Current memory usage: 0.23 MB
Peak memory usage: 0.24 MB
Aligned Seq: AA-GAAAGTTAG--ATATCAGATGTGCTG---TTA-AAGTG-TT-T-GGTACGGGAA----AATGTATAGGCGA-GCTAGCCT-GCAT-AA
CGTGAAA-TTCGGGATA-CAGA-GT-CAGGCATTCGAAG-GATTATTGGTAGGGGGGCGCTAA-G-ATAAGCGATGAT-G--TAGTTTTAA
Score: 13

Test 3:
Time taken: 0.0983 seconds
Current memory usage: 0.22 MB
Peak memory usage: 0.22 MB
Aligned Seq: AAGAAAGTTA-GATA-TCA-G-ATGTG--C-TGTTAA--AG-TGTTTG-GTACGGGA-AAATG--TA--TA---G-GCGAGCTAGCCTGCATAA
--GAAA-TTATG-TGGTAAAGTAAGCGAACCTGTCACGGAGATGAT-GAGAA-GG-ATAAA-GAATAAATAAAAGTG-G-G--AGC--G-AGA-
Score: 4

Test 4:
Time taken: 0.1290 seconds
Current memory usage: 0.27 MB
Peak memory usage: 0.27 MB
Aligned Seq: --AA-----GAAA-G-T--T-A-GAT--ATCA-GATGTGC---TGTT-A-AA-G-TG-TT-TGG-T-A-CGGGA-AAAT-GTATAGGCGAGCTA-GCCTGCATAA
GTAACATCAGAAAAGATGGTGACGAAGGAAAAAGA-G-GCGAATGGTGACAACGATGGTTATTTATGAGCGTGACAAAAAGAAAAAG--AGTTAAGG-T--A-AA
Score: 1

Test 5:
Time taken: 0.1011 seconds
Current memory usage: 0.29 MB
Peak memory

In [5]:
#Hirschberg Implementation 
class TreeNode: #customized nodes to use in the hirschberg recursion tree 
    def __init__(self, indices, i_star, j_split, left_child=None, right_child=None):
        self.indices = indices
        self.i_star = i_star
        self.j_split = j_split
        self.left_child = left_child 
        self.right_child = right_child 

    def add_child(self, child_node):
        self.children.append(child_node)

    def __repr__(self, level=0): #use this as chance to just print the tree here?...
        tree = "   "*level + f"{self.indices} ({self.i_star}, {self.j_split})\n"
        if self.left_child != None: 
            tree += self.left_child.__repr__(level + 1)
        if self.right_child != None: 
            tree += self.right_child.__repr__(level + 1)
        return tree


def forward(v, w, delta):
    len_w = len(w); len_v = len(v)
    M = [[None for j in range(len_w)] for i in range(len_v)]
    M[0][0] = 0
    for i in range(1, len_v):
        M[i][0] = M[i-1][0] + delta[v[i]]['-']
    for j in range(1, len_w):
        for i in range(0, len_v):
            if (i == 0):
                M[i][j] = M[i][j-1] + delta[w[j]]['-']
            else: 
                left_score = M[i][j - 1] + delta['-'][w[j]]
                top_score = M[i-1][j] + delta[v[i]]['-']
                M[i][j] = max(left_score, M[i-1][j-1] + delta[v[i]][w[j]], top_score)
            #clear value from colum no longer in use 
            if (j >= 2):
                M[i][j-2] = None
    columns = list(zip(*M))
    return columns[len_w - 1]

def backward(v, w, delta):
    len_w = len(w); len_v = len(v)
    M = [[None for j in range(len_w)] for i in range(len_v)]
    M[-1][-1] = 0
    for i in range(len_v - 2, -1, -1):
        M[i][len_w - 1] = M[i + 1][len_w-1] + delta[v[i]]['-']
    for j in range(len_w - 2, -1, -1):
        for i in range(len_v - 1, -1, -1):
            if (i == len(v) - 1):
                M[i][j] = M[i][j+1] + delta[v[i]]['-']
            else:
                left_score = M[i][j+1] + delta['-'][w[j]]
                top_score = M[i+1][j] + delta[v[i]]['-']
                M[i][j] = max(left_score, M[i+1][j+1] + delta[v[i+1]][w[j+1]], top_score)
            #clear value from column no longer in use
            if (j + 2 < len(w)):
                M[i][j + 2] = None
    columns = list(zip(*M))
    return columns[0]

def hirschberg_recurse(v, w, delta, i, j, i_prime, j_prime):
    #print("currently doing", i, j, i_prime, j_prime)
    v_adjusted = "-"+v; w_adjusted = '-'+w
    if (j_prime - j > 1):
        col_split = j_prime // 2 #round down to nearest int
        if (col_split == j): col_split = j + 1 #so suffix is never empty
        #calculating i_star
        prefixes = forward(v_adjusted[i: i_prime + 1], w_adjusted[j : col_split + 1], delta)
        suffixes = backward(v_adjusted[i: i_prime + 1], w_adjusted[col_split: j_prime + 1], delta)
        weights = [p + s for p, s in zip(prefixes, suffixes)]
        i_star = weights.index(max(weights)) 

        j_split = (int)(j+((j_prime - j)/2))
        #print(i, j, i_prime, j_prime, "<-- would report")#report (but also is this where I wanna do it?)
        #print(i_star + i, j_split, "SHOULD REPORT")

        left = hirschberg_recurse(v, w, delta, i, j, i_star + i, j_split)
        right = hirschberg_recurse(v, w, delta, i_star + i, j_split, i_prime, j_prime)
        #print("finished", i, j, i_prime, j_prime)
        #just make them nodes here before returning them? 
        tree = TreeNode((i, j, i_prime, j_prime), i_star, j_split, left, right)
        return tree #[((i, j, i_prime, j_prime), i_star + i, j_split), left, right]
    else: #base case
        #print("finished", i, j, i_prime, j_prime)
        #print(i, j, i_prime, j_prime, "<-- would report")
        leaf = TreeNode((i, j, i_prime, j_prime), '-', '-')
        return leaf #[(i, j, i_prime, j_prime), '-', '-']


def short_align(v_sub, w_sub):
    if len(v_sub) == 0:
        return "-"*len(w_sub), w_sub
    if len(w_sub) == 0:
        return v_sub, "-"*len(v_sub)
    if len(v_sub) == 1 and len(w_sub)==0:
        return v_sub, w_sub #match or miss match 
    v_align = []
    w_align = []
    #there has gotta be a more efficent way to do this... plus i feel like it might overwrite...
    for i in range(len(v_sub)):
        for j in range(len(w_sub)):
            if v_sub[i] == w_sub[j]:
                v_align.append(v_sub[i])
                w_align.append(w_sub[j])
                continue
            else: 
                v_align.append("-")
                w_align.append(w_sub[i])
    return "".join(v_align), "".join(w_align)
        

def traceback_hirschberg(v, w, node): 
    if node == None: 
        return ""
    i, j, i_prime, j_prime = node.indices
    #base cases: a sequence is empty
    if i_prime - i == 0:
        return "-"*(j_prime-j), w[j:j_prime]
    if j_prime - j == 0:
        return "-"*(i_prime-i), w[i:i_prime]
    
    #special case: a sequence has length of 1
    if i_prime - i == 1 or j_prime - j == 1:
        return short_align(v[i:i_prime], w[j:j_prime])
    
    #backtrace subtrees
    v_left, w_left = traceback_hirschberg(v, w, node.left_child)
    v_right, w_right = traceback_hirschberg(v, w, node.right_child)
    v_align = v_left + v_right
    w_align = w_left + w_right
    return v_align, w_align

def hirschberg(v, w, delta, i, j, i_prime, j_prime):
    nodes = hirschberg_recurse(v, w, delta, i, j, i_prime, j_prime)
    v_align, w_align = traceback_hirschberg(v, w, nodes)
    alignmet = f"{w_align}\n{v_align}"
    return alignmet

In [120]:
keys = ['A', 'C', 'T', 'G', '-']
delta = {}
for i in range(len(keys)):
    delta[keys[i]] = {k : v for (k,v) in zip(keys, [1 if keys[i] == keys[j]  else -1 for j in range(len(keys))])}
    
print(hirschberg("TG", "ATCG", delta, 0, 0, 2, 4))

ATCG
-T-G


In [None]:
"""
I was bored and ran your code through some other test cases and it kinda got stuck
so i ask Chatgpt to give me some idea, and these are the suggestion
Please take it as a grain of salt, but also feel free to take a look if you are stuck
"""

class TreeNode:  # Customized nodes for the Hirschberg recursion tree
    def __init__(self, indices, i_star, j_split, left_child=None, right_child=None):
        self.indices = indices
        self.i_star = i_star
        self.j_split = j_split
        self.left_child = left_child
        self.right_child = right_child

    def __repr__(self, level=0):  # Recursive tree visualization
        tree = "   " * level + f"{self.indices} ({self.i_star}, {self.j_split})\n"
        if self.left_child:
            tree += self.left_child.__repr__(level + 1)
        if self.right_child:
            tree += self.right_child.__repr__(level + 1)
        return tree


def forward(v, w, delta):
    len_w, len_v = len(w), len(v)
    # ISSUE: Originally, a full 2D matrix `M` was being allocated, which uses unnecessary memory.
    # FIX: Use a 2-column matrix to keep only the necessary columns at each step.
    M = [[0] * 2 for _ in range(len_v)]
    M[0][0] = 0

    for i in range(1, len_v):
        M[i][0] = M[i - 1][0] + delta[v[i]]['-']  # First column initialization
    for j in range(1, len_w):
        for i in range(len_v):
            if i == 0:
                # ISSUE: Referenced `M[i][j - 1]` instead of using the correct column index.
                # FIX: Use modular indexing with `% 2` to alternate between columns.
                # it is because we only saved two columns at a time, therefore we alternate between 0/1
                M[i][j % 2] = M[i][(j - 1) % 2] + delta['-'][w[j]]
            else:
                left_score = M[i][(j - 1) % 2] + delta['-'][w[j]]
                top_score = M[i - 1][j % 2] + delta[v[i]]['-']
                diag_score = M[i - 1][(j - 1) % 2] + delta[v[i]][w[j]]
                M[i][j % 2] = max(left_score, top_score, diag_score)

    # ISSUE: Returned the last column directly but with unnecessary transpose operations.
    # FIX: Return the relevant column directly by iterating over `M`.
    return [M[i][(len_w - 1) % 2] for i in range(len_v)]


def backward(v, w, delta):
    len_w, len_v = len(w), len(v)
    # ISSUE: Same as in `forward`, used a full 2D matrix unnecessarily.
    # FIX: Use a 2-column matrix to save memory.
    M = [[0] * 2 for _ in range(len_v)]
    M[-1][0] = 0

    for i in range(len_v - 2, -1, -1):
        M[i][0] = M[i + 1][0] + delta[v[i]]['-']
    for j in range(len_w - 2, -1, -1):
        for i in range(len_v - 1, -1, -1):
            if i == len_v - 1:
                M[i][j % 2] = M[i][(j + 1) % 2] + delta['-'][w[j]]
            else:
                left_score = M[i][(j + 1) % 2] + delta['-'][w[j]]
                top_score = M[i + 1][j % 2] + delta[v[i]]['-']
                diag_score = M[i + 1][(j + 1) % 2] + delta[v[i + 1]][w[j + 1]]
                M[i][j % 2] = max(left_score, top_score, diag_score)

    # FIX: Return the first column of `M` to match the `forward` function's output.
    return [M[i][0] for i in range(len_v)]


def hirschberg_recurse(v, w, delta, i, j, i_prime, j_prime):
    v_adjusted = "-" + v
    w_adjusted = "-" + w
    if j_prime - j > 1:
        col_split = j + (j_prime - j) // 2
        prefixes = forward(v_adjusted[i:i_prime + 1], w_adjusted[j:col_split + 1], delta)
        suffixes = backward(v_adjusted[i:i_prime + 1], w_adjusted[col_split:j_prime + 1], delta)
        # ISSUE: Improper calculation of `i_star` leading to index errors.
        # FIX: Use `weights.index(max(weights))` to determine the correct split point.
        weights = [p + s for p, s in zip(prefixes, suffixes)]
        i_star = weights.index(max(weights))
        left = hirschberg_recurse(v, w, delta, i, j, i_star + i, col_split)
        right = hirschberg_recurse(v, w, delta, i_star + i, col_split, i_prime, j_prime)
        return TreeNode((i, j, i_prime, j_prime), i_star, col_split, left, right)
    else:
        return TreeNode((i, j, i_prime, j_prime), '-', '-')


def short_align(v_sub, w_sub):
    if len(v_sub) == 0:
        return "-" * len(w_sub), w_sub
    if len(w_sub) == 0:
        return v_sub, "-" * len(v_sub)
    # ISSUE: Nested loops with overwriting caused alignment issues.
    # FIX: Use a single loop with `zip` to compare characters directly.
    v_align, w_align = [], []
    for a, b in zip(v_sub, w_sub):
        if a == b:
            v_align.append(a)
            w_align.append(b)
        else:
            v_align.append(a)
            w_align.append(b)
    return "".join(v_align), "".join(w_align)


def traceback_hirschberg(v, w, node):
    if node is None:
        return "", ""
    i, j, i_prime, j_prime = node.indices
    if i_prime - i == 0:
        return "-" * (j_prime - j), w[j:j_prime]
    if j_prime - j == 0:
        return v[i:i_prime], "-" * (i_prime - i)
    # ISSUE: Special cases for short sequences were not handled correctly.
    # FIX: Add conditions for lengths of 1 for both sequences.
    if i_prime - i == 1 or j_prime - j == 1:
        return short_align(v[i:i_prime], w[j:j_prime])
    v_left, w_left = traceback_hirschberg(v, w, node.left_child)
    v_right, w_right = traceback_hirschberg(v, w, node.right_child)
    return v_left + v_right, w_left + w_right


def hirschberg(v, w, delta):
    root = hirschberg_recurse(v, w, delta, 0, 0, len(v), len(w))
    # ISSUE: Incorrectly concatenated alignment results.
    # FIX: Properly return the formatted alignment strings.
    v_align, w_align = traceback_hirschberg(v, w, root)
    return f"{w_align}\n{v_align}"
