## Chapter 3: Genome Assembly
This chapter assumes error-free sequencing reads and perfect coverage.

`kmer_composition` returns every k-mer of `text` as a list, in order of appearance.

In [3]:
from typing import List, Dict, Iterable
def kmer_composition(text: str, k: int) -> Iterable[str]:
    return [text[i:i+k] for i in range(len(text)-k+1)]

In [4]:
kmer_composition('CAATCCAAC',5)

['CAATC', 'AATCC', 'ATCCA', 'TCCAA', 'CCAAC']

`genome_path` reconstructs a string from k-mers that are already ordered so that consecutive k-mers overlap by k-1 characters.

In [9]:
def genome_path(path: List[str]) -> str:
    return path[0] + ''.join(kmer[-1] for kmer in path[1:])

In [10]:
genome_path(['ACCGA','CCGAA','CGAAG','GAAGC','AAGCT'])

'ACCGAAGCT'
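
Since `genome_path` undoes `kmer_composition` when the k-mers are kept in order, a quick round-trip check can catch off-by-one slips in either function. The sketch below repeats both definitions so that it runs on its own; the test string is just the one used earlier.

```python
from typing import List

def kmer_composition(text: str, k: int) -> List[str]:
    """All k-mers of text, in order of appearance."""
    return [text[i:i + k] for i in range(len(text) - k + 1)]

def genome_path(path: List[str]) -> str:
    """Spell the string from consecutive k-mers that overlap by k-1 characters."""
    return path[0] + ''.join(kmer[-1] for kmer in path[1:])

# Round trip: splitting into ordered k-mers and gluing them back
# should return the original text for every valid k.
text = 'CAATCCAAC'
for k in range(1, len(text) + 1):
    assert genome_path(kmer_composition(text, k)) == text
```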

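`overlap_graph` returns the overlap graph as an adjacency list stored in a dict. Two k-mers overlap when the suffix of the first (its last k-1 characters) equals the prefix of the second (its first k-1 characters). A node can have a self-loop, and k-mers with no outgoing edge do not appear as keys.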

In [30]:
def overlap_graph(patterns: List[str]) -> Dict[str, List[str]]:
    overlap_dict = {}
    for kmer_pre in patterns:
        for kmer_post in patterns:
            if is_overlap(kmer_pre,kmer_post): #if 2 kmers in kmer^2 overlap, added to overlap_dict
                if kmer_pre not in overlap_dict:
                    overlap_dict[kmer_pre] = [kmer_post]
                elif (kmer_post not in overlap_dict[kmer_pre]):
                    overlap_dict[kmer_pre].append(kmer_post)
    return overlap_dict

In [18]:
def is_overlap(kmer_pre:str, kmer_post:str)-> bool: #checks whether 2 kmers overlap
    return kmer_pre[1:]==kmer_post[:-1]

In [31]:
overlap_graph(['ACT','CTT','TTT'])

{'ACT': ['CTT'], 'CTT': ['TTT'], 'TTT': ['TTT']}

In [32]:
overlap_graph(['AAG','AGA','ATT','CTA','CTC','GAT','TAC','TCT','TCT','TTC'])

{'AAG': ['AGA'],
 'AGA': ['GAT'],
 'ATT': ['TTC'],
 'CTA': ['TAC'],
 'CTC': ['TCT'],
 'GAT': ['ATT'],
 'TCT': ['CTA', 'CTC'],
 'TTC': ['TCT']}

A Hamiltonian path visits each node of a graph exactly once.
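
To make the definition concrete, here is a minimal backtracking sketch that searches the overlap graph for a Hamiltonian path. The function name `hamiltonian_path` is an assumption introduced for illustration and is not part of the notebook. The search is exponential in the worst case, so it is only practical for a handful of k-mers.

```python
from typing import List, Optional

def hamiltonian_path(patterns: List[str]) -> Optional[List[str]]:
    """Backtracking search for an ordering that uses every k-mer exactly once."""
    n = len(patterns)

    def overlaps(a: str, b: str) -> bool:
        # suffix of a equals prefix of b
        return a[1:] == b[:-1]

    def extend(path: List[int], used: List[bool]) -> Optional[List[int]]:
        if len(path) == n:
            return path
        for j in range(n):
            if not used[j] and overlaps(patterns[path[-1]], patterns[j]):
                used[j] = True
                found = extend(path + [j], used)
                if found is not None:
                    return found
                used[j] = False  # backtrack
        return None

    # Work with indices rather than strings so repeated k-mers stay distinct.
    for start in range(n):
        used = [False] * n
        used[start] = True
        found = extend([start], used)
        if found is not None:
            return [patterns[i] for i in found]
    return None

print(hamiltonian_path(['TTT', 'ACT', 'CTT']))  # ['ACT', 'CTT', 'TTT']
```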

`de_bruijn_string` builds the de Bruijn graph of `text` as an adjacency list stored in a dict. The nodes are the (k-1)-mers (the overlaps), and each k-mer contributes one edge from its prefix to its suffix.

In [40]:
def de_bruijn_string(text: str, k: int) -> Dict[str, List[str]]:
    bruijn_dict = {}
    for i in range(len(text)-k+1):
        overlap_pre = text[i:i+k-1] #k-1 mer
        overlap_post = text[i+1:i+k]
        if overlap_pre not in bruijn_dict:
            bruijn_dict[overlap_pre] = [overlap_post]
            continue
        bruijn_dict[overlap_pre].append(overlap_post)
    return bruijn_dict
    

In [41]:
de_bruijn_string('ACGTGTATA',3)

{'AC': ['CG'],
 'CG': ['GT'],
 'GT': ['TG', 'TA'],
 'TG': ['GT'],
 'TA': ['AT'],
 'AT': ['TA']}

`de_bruijn_kmers` builds the same kind of de Bruijn graph directly from a list of k-mers, so the original text is not needed.

In [46]:
def de_bruijn_kmers(k_mers: List[str]) -> Dict[str, List[str]]:
    de_bruijn = {}
    for kmer in k_mers:
        if kmer[:-1] not in de_bruijn:
            de_bruijn[kmer[:-1]]=[kmer[1:]]
            continue
        de_bruijn[kmer[:-1]].append(kmer[1:])
    return de_bruijn

In [47]:
de_bruijn_kmers(["GAGG","CAGG","GGGG","GGGA","CAGG","AGGG","GGAG"])

{'GAG': ['AGG'],
 'CAG': ['AGG', 'AGG'],
 'GGG': ['GGG', 'GGA'],
 'AGG': ['GGG'],
 'GGA': ['GAG']}
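
The two constructions should agree whenever the k-mers are the composition of a text. The sketch below is one way to check that, written with `collections.defaultdict` to avoid the explicit membership test. The names `de_bruijn_from_kmers` and `de_bruijn_from_text` are assumptions chosen so they do not clash with the functions above.

```python
from collections import defaultdict
from typing import Dict, List

def de_bruijn_from_kmers(k_mers: List[str]) -> Dict[str, List[str]]:
    """One edge prefix -> suffix per k-mer; repeated k-mers give repeated edges."""
    graph = defaultdict(list)
    for kmer in k_mers:
        graph[kmer[:-1]].append(kmer[1:])
    return dict(graph)

def de_bruijn_from_text(text: str, k: int) -> Dict[str, List[str]]:
    """Build the graph from the k-mer composition of text."""
    k_mers = [text[i:i + k] for i in range(len(text) - k + 1)]
    return de_bruijn_from_kmers(k_mers)

graph = de_bruijn_from_text('ACGTGTATA', 3)
assert graph['GT'] == ['TG', 'TA']  # GT has two outgoing edges
```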

- Eulerian path: a path that traverses each edge of the graph exactly once
- Eulerian cycle: an Eulerian path that returns to its starting node
- A graph has an Eulerian cycle if it is (1) balanced (in(v) = out(v) for every node v) and (2) strongly connected
- Assembly on the de Bruijn graph is an Eulerian path problem; assembly on the overlap graph is a Hamiltonian path problem
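
The balance condition in the list above is easy to test directly. The following sketch reports out-degree minus in-degree for every unbalanced node. The helper name `degree_imbalance` is an assumption, and the check covers only condition (1); it does not test strong connectivity.

```python
from collections import Counter
from typing import Dict, Hashable, List

def degree_imbalance(g: Dict[Hashable, List[Hashable]]) -> Dict[Hashable, int]:
    """Map each unbalanced node to out-degree minus in-degree."""
    diff = Counter()
    for node, targets in g.items():
        diff[node] += len(targets)   # outgoing edges
        for target in targets:
            diff[target] -= 1        # incoming edge at the target
    return {node: d for node, d in diff.items() if d != 0}

balanced = {0: [3], 1: [0], 2: [1, 6], 3: [2], 4: [2], 5: [4],
            6: [5, 8], 7: [9], 8: [7], 9: [6]}
print(degree_imbalance(balanced))  # {}
```

An empty result means every node is balanced. A result with one `+1` node and one `-1` node is the signature of a graph that can have an Eulerian path but not an Eulerian cycle.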

In [97]:
import random
def eulerian_cycle(g: Dict[int, List[int]]) -> Iterable[int]:
    if not g:
        return []  # Return an empty cycle for an empty graph

    # Start with a random walk to form the initial cycle
    cycle = make_cycle(random.choice(list(g.keys())), g)

    # Continue until all edges are used
    while any(g.values()):
        for i, node in enumerate(cycle):
            if g[node]:  # If there are unused edges from this node
                new_cycle = make_cycle(node, g)
                cycle = cycle[:i] + new_cycle + cycle[i+1:]
                break

    return cycle


In [96]:
def make_cycle(start: int, g: Dict[int, List[int]]) -> List[int]:
    cycle = [start]
    while True:
        if not g[start]:  # No more edges to follow from the current node
            break
        next_node = g[start].pop()  # Remove and return the last item
        cycle.append(next_node)
        start = next_node

    return cycle

In [98]:
eulerian_cycle({0:[3],1:[0],2:[1,6],3:[2],4:[2],5:[4],6:[5,8],7:[9],8:[7],9:[6]})

[9, 6, 5, 4, 2, 1, 0, 3, 2, 6, 8, 7, 9]

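The Eulerian cycle function can be reused to find an Eulerian path. A graph that has an Eulerian path but no Eulerian cycle has exactly two unbalanced nodes: the start node has one extra outgoing edge and the end node has one extra incoming edge. Adding an artificial edge from the end node to the start node balances the graph, so we can find an Eulerian cycle and then cut it at the artificial edge to recover the path.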

In [299]:
def eulerian_path(g: Dict[int, List[int]]) -> Iterable[int]:
    Unbalanced_nodes = unabalanced_nodes(g)
    if len(Unbalanced_nodes)==0:
        return eulerian_cycle(g)
    print("unabalanced_nodes",Unbalanced_nodes)
    if Unbalanced_nodes[0] not in g:
        g[Unbalanced_nodes[0]] = [Unbalanced_nodes[1]]
    else:
        g[Unbalanced_nodes[0]].append(Unbalanced_nodes[1])
    path = eulerian_cycle(g)
    print(path)
    return rearrange_path(Unbalanced_nodes, path)


In [194]:
def unabalanced_nodes(g: Dict[int,List[int]]) -> List[int]: #returns the unbalanced pair as [end, start]: index 0 has an extra incoming edge, index 1 has an extra outgoing edge
    unabalanced_nodes = []
    in_degree = {node: 0 for node in g}
    out_degree = {node:len(edges) for node,edges in g.items()}

    for edges in g.values():
        for node in edges:
            if node in g:
                in_degree[node]+=1
            else:
                in_degree[node]=1
                out_degree[node]=0
    #print(out_degree,in_degree)
    for node in in_degree:
        #print(node)
        if in_degree[node]<out_degree[node]:
            unabalanced_nodes.insert(1,node)
        elif in_degree[node]>out_degree[node]:
            unabalanced_nodes.insert(0,node)
    
    return unabalanced_nodes

In [274]:
import random

def rearrange_path(edge: List[int],arr: List[int]) -> List[int]:
    # Find where the artificial edge (edge[0] -> edge[1]) occurs in the cycle
    edge_indices = []
    for i in range(len(arr) - 1):
        if arr[i] == edge[0] and arr[i + 1] == edge[1]:
            edge_indices.append(i)
    # Cut the cycle at that edge; [:-1] drops the cycle's repeated last node
    return arr[edge_indices[0]+1:-1] + arr[:edge_indices[0]+1]

In [266]:
def eulerian_cycle(g: Dict[int, List[int]]) -> Iterable[int]:
    if not g:
        return []  # Return an empty cycle for an empty graph

    # Start with a random walk to form the initial cycle
    start = random.choice(list(g.keys()))
    cycle = [start]
    while True:
        if not g[start]:  # No more edges to follow from the current node
            break
        next_node = g[start].pop()  # Remove and return the last item
        cycle.append(next_node)
        start = next_node

    # Continue until all edges are used
    while any(g.values()):
        for i, node in enumerate(cycle):
            if g[node]:  # If there are unused edges from this node
                #new_cycle, g = make_cycle(node, g)
                start = node
                new_cycle = [start]
                while True:
                    if not g[start]:  # No more edges to follow from the current node
                        break
                    next_node = g[start].pop()  # Remove and return the last item
                    new_cycle.append(next_node)
                    start = next_node
                print("new_cycle",new_cycle)
                cycle = cycle[:i] + new_cycle + cycle[i+1:]
                #cycle = new_cycle
                print("cycle",cycle)
                break

    return cycle

In [247]:
def make_cycle(start: int, g: Dict[int, List[int]]):
    cycle = [start]
    while True:
        if not g[start]:  # No more edges to follow from the current node
            break
        next_node = g[start].pop()  # Remove and return the last item
        cycle.append(next_node)
        start = next_node

    return cycle, g

In [190]:
def remove_consecutive_duplicates(arr):
    if not arr:
        return []

    # Initialize the result list with the first element of the input array
    result = [arr[0]]

    # Iterate through the array starting from the second element
    for i in range(1, len(arr)):
        if arr[i] != arr[i - 1]:
            result.append(arr[i])

    return result

The three outputs below were recorded with an earlier `rearrange_path` that sliced `arr[i+1:]` instead of `arr[i+1:-1]`, so each one repeats a node (`2, 2`, `1, 1`, `3, 3`): the cycle's start node, which appears at both ends of the cycle. With the `[:-1]` slice the results are `[8, 7, 9, 6, 5, 4, 2, 1, 0, 3, 2, 6, 8, 9]` for the first two calls and `[6, 7, 8, 9, 6, 3, 0, 2, 1, 3, 4]` for the third.

In [161]:
rearrange_path([9,8],[2, 6, 8, 9, 8, 7, 9, 6, 5, 4, 2, 1, 0, 3, 2])

[8, 7, 9, 6, 5, 4, 2, 1, 0, 3, 2, 2, 6, 8, 9]

In [156]:
eulerian_path({0:[3],1:[0],2:[1,6],3:[2],4:[2],5:[4],6:[5,8],7:[9],8:[7,9],9:[6]})

[8, 7, 9, 6, 5, 4, 2, 1, 1, 0, 3, 2, 6, 8, 9]

In [187]:
g = {0:[2],1:[3],2:[1],3:[0,4],6:[3,7],7:[8],8:[9],9:[6]}
eulerian_path(g)

[6, 7, 8, 9, 6, 3, 0, 2, 1, 3, 3, 4]

The Eulerian path algorithm can now be used for string reconstruction:
`StringReconstruction(Patterns)
    dB ← DeBruijn(Patterns)
    path ← EulerianPath(dB)
    Text ← PathToGenome(path)
    return Text`

In [240]:
def string_reconstruction(patterns: List[str], k: int) -> str:
    dB = de_bruijn_kmers(patterns)
    print(dB)
    path = eulerian_path(dB)
    text = genome_path(path)
    return text

In [296]:
patterns = ['GG','AC','GA','CT']
dB = de_bruijn_kmers(patterns)
eulerian_path(dB)
string_reconstruction(patterns,3)

unabalanced_nodes ['T', 'G']
new_cycle ['G', 'G']
cycle ['C', 'T', 'G', 'G', 'A', 'C']
['C', 'T', 'G', 'G', 'A', 'C']
{'G': ['G', 'A'], 'A': ['C'], 'C': ['T']}
unabalanced_nodes ['T', 'G']
new_cycle ['G', 'G']
cycle ['T', 'G', 'G', 'A', 'C', 'T']
['T', 'G', 'G', 'A', 'C', 'T']


'GGACT'

In [289]:
string_reconstruction(['ACG','CGT','GTG','TGT','GTA','TAT','ATA'],3)

{'AC': ['CG'], 'CG': ['GT'], 'GT': ['TG', 'TA'], 'TG': ['GT'], 'TA': ['AT'], 'AT': ['TA']}
unabalanced_nodes ['TA', 'AC']
new_cycle ['TA', 'AT', 'TA']
cycle ['TG', 'GT', 'TA', 'AT', 'TA', 'AC', 'CG', 'GT', 'TG']
['TG', 'GT', 'TA', 'AT', 'TA', 'AC', 'CG', 'GT', 'TG']


'ACGTGTATA'

In [300]:
string_reconstruction(["ACG","CGT","GTA","TAC"],3)

{'AC': ['CG'], 'CG': ['GT'], 'GT': ['TA'], 'TA': ['AC']}


'TACGTA'
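
Several different strings can share the same k-mer composition, so a useful sanity check is to confirm that the reconstructed text has exactly the input k-mers as its composition, counting repeats. The helper name `spells_all_kmers` is an assumption introduced for this sketch, and it expects a non-empty list of equal-length k-mers.

```python
from collections import Counter
from typing import List

def spells_all_kmers(text: str, patterns: List[str]) -> bool:
    """True if the k-mer composition of text equals patterns as a multiset."""
    k = len(patterns[0])
    found = Counter(text[i:i + k] for i in range(len(text) - k + 1))
    return found == Counter(patterns)

# The three reconstructions shown above all pass the check.
assert spells_all_kmers('GGACT', ['GG', 'AC', 'GA', 'CT'])
assert spells_all_kmers('ACGTGTATA', ['ACG', 'CGT', 'GTG', 'TGT', 'GTA', 'TAT', 'ATA'])
assert spells_all_kmers('TACGTA', ['ACG', 'CGT', 'GTA', 'TAC'])
```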

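`k_universal_string` applies string reconstruction to the set of all binary k-mers, producing a k-universal circular binary string, that is, a circular string that contains every binary k-mer exactly once.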

In [334]:
def k_universal_string(k: int) -> str:
    k_binary = k_binary_str(k)
    dB= de_bruijn_kmers(k_binary)
    path = eulerian_path(dB)
    binary_str = "".join([b[-1] for b in path[:-1]])
    return binary_str

In [313]:
#gives list of binary strings
def k_binary_str(k: int)-> List[str]:
    if k==1:
        return ['0','1']
    previous_binary = k_binary_str(k-1)
    k_binary = [binary+'0' for binary in previous_binary] + [binary+'1' for binary in previous_binary]
    return k_binary

In [338]:
k_universal_string(3)

'01110100'
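
Because the Eulerian cycle starts at a random node, the exact string can differ between runs, so it helps to verify the defining property rather than compare against a fixed answer. The sketch below reads the string circularly and checks that every binary k-mer occurs exactly once. The name `is_k_universal_circular` is an assumption.

```python
from itertools import product

def is_k_universal_circular(s: str, k: int) -> bool:
    """True if the circular string s contains each binary k-mer exactly once."""
    wrapped = s + s[:k - 1]  # let windows wrap around the end
    seen = [wrapped[i:i + k] for i in range(len(s))]
    expected = {''.join(bits) for bits in product('01', repeat=k)}
    # Same count and same set together imply no k-mer is repeated or missing.
    return len(seen) == len(expected) and set(seen) == expected

assert is_k_universal_circular('01110100', 3)
assert not is_k_universal_circular('00000000', 3)
```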