In [1]:
import pandas as pd
import numpy
import random

# generate a random DNA sequence of length 10000bp
def generate_random_sequence(length):
    dna = ""
    for i in range(length):
        dna += random.choice("ACGT")
    return dna

# generate a random DNA sequence of length 10000bp
dna = generate_random_sequence(10000)

In [19]:
import sys
from typing import List, Dict, Iterable
from collections import defaultdict
import random

# ------------------------ Generate Reads ------------------------ #
def generate_reads(dna: str, read_len: int, coverage: float) -> List[str]:
    """Generates reads from a DNA string with specified read length and coverage."""
    genome_len = len(dna)
    num_reads = int(genome_len * coverage / read_len)
    reads = []
    for _ in range(num_reads):
        start_pos = random.randint(0, genome_len - read_len)
        reads.append(dna[start_pos:start_pos + read_len])
    return reads

# ------------------------ Break Reads into k-mers ------------------------ #
def break_reads_into_kmers(reads: List[str], k: int, g: int) -> List[str]:
    """Breaks a list of reads into k-mers with a gap of g."""
    kmers = []
    for read in reads:
        for i in range(0, len(read) - k + 1, g + 1):  # Step by g+1
            kmers.append(read[i:i+k])
    return kmers

# ------------------------ De Bruijn Graph from k-mers ------------------------ #
def de_bruijn_kmers(k_mers: List[str]) -> Dict[str, List[str]]:
    """Forms the de Bruijn graph of a collection of k-mers."""
    adj_list = defaultdict(list)
    for k_mer in k_mers:
        prefix = k_mer[:-1]
        suffix = k_mer[1:]
        adj_list[prefix].append(suffix)
    return dict(adj_list)

# ------------------------ Eulerian Path ------------------------ #
def extend_cycle(cycle: List[str], marked_graph: Dict[str, List[str]]) -> List[str]:
    """Extends the Eulerian cycle from a given node in the marked graph."""
    if cycle:
        cycle.pop()  # remove the repeated node at the end
        new_start_index = next(i for i, node in enumerate(cycle) if node in marked_graph)
        cycle = cycle[new_start_index:] + cycle[:new_start_index]
        cycle.append(cycle[0])  # re-add the repeated node
        current_node = cycle[-1]
    else:
        current_node = next(iter(marked_graph))  # get an arbitrary node from the graph
        cycle = [current_node]
    
    while current_node in marked_graph:
        old_node = current_node
        current_node = marked_graph[old_node].pop()
        if not marked_graph[old_node]:
            del marked_graph[old_node]  # remove the node if no more edges
        cycle.append(current_node)
    
    return cycle

def eulerian_cycle_str(g: Dict[str, List[str]]) -> List[str]:
    """Constructs an Eulerian cycle in a graph. Assumes the graph is Eulerian and connected."""
    cycle = []
    while g:
        cycle = extend_cycle(cycle, g)
    return cycle

def fix_unbalanced(g: Dict[str, List[str]]) -> tuple[str, str]:
    """Finds and fixes unbalanced nodes in the graph."""
    total_degree = defaultdict(int)
    
    for node1, adj_nodes in g.items():
        for node2 in adj_nodes:
            total_degree[node1] += 1  # Out-degree
            total_degree[node2] -= 1  # In-degree

    s, t = None, None
    for node, tot_degree in total_degree.items():
        if tot_degree == 1:
            t = node
        elif tot_degree== -1:
            s = node

    if s and t:
        g.setdefault(s, []).append(t)
    
    return s, t

def eulerian_path(g: Dict[str, List[str]]) -> List[str]:
    """Constructs an Eulerian path in a graph, assuming the graph is nearly Eulerian."""
    s, t = fix_unbalanced(g)
    cycle = eulerian_cycle_str(g)
    
    if s:
        cycle.pop()  # Remove the duplicate last node
        t_index = next(i for i, (u, v) in enumerate(zip(cycle, cycle[1:])) if u == s and v == t)
        cycle = cycle[t_index + 1:] + cycle[:t_index + 1]
    
    return cycle

# ------------------------ String Spelled by a Genome Path ------------------------ #
def genome_path(path: List[str]) -> str:
    """Forms the genome path formed by a collection of patterns."""
    if not path:
        return ""
    result = path[0]
    for i in range(1, len(path)):
        result += path[i][-1]
    return result

# ------------------------ Assemble Sequence ------------------------ #
def assemble_sequence(dna: str, read_len: int, coverage: float, k: int, g: int) -> str:
    """Assembles a DNA sequence from reads using a De Bruijn graph."""
    reads = generate_reads(dna, read_len, coverage)
    kmers = break_reads_into_kmers(reads, k, g)  # Pass g here
    graph = de_bruijn_kmers(kmers)
    path = eulerian_path(graph)
    assembled_sequence = genome_path(path)
    return assembled_sequence

# # ------------------------ Main Execution ------------------------ #
# if __name__ == "__main__":
#     dna_string = input("Enter DNA string: ")
#     read_length = int(input("Enter read length: "))
#     coverage_percentage = float(input("Enter coverage percentage (e.g., 0.5 for 50%): "))
#     kmer_length = int(input("Enter k-mer length: "))

#     assembled_dna = assemble_sequence(dna_string, read_length, coverage_percentage, kmer_length)
#     print("Assembled DNA sequence:", assembled_dna)

In [20]:
dna_string = generate_random_sequence(1000)
read_length = 20
coverage_percentage = 1.0
kmer_length = 10
gap = 1

assembled_dna = assemble_sequence(dna_string, read_length, coverage_percentage, kmer_length, gap)

StopIteration: 

In [6]:
import numpy as np

def construct_profile_hmm(alignment, alphabet, theta):
    # Preprocess the alignment
    alignment = [list(seq) for seq in alignment]
    n_seqs = len(alignment)
    seq_length = len(alignment[0])
    
    # Count the number of non-gap characters in each column
    col_counts = [sum(1 for seq in alignment if seq[i] != '-') for i in range(seq_length)]
    
    # Determine which columns are matches (M) and which are inserts (I)
    match_columns = [i for i, count in enumerate(col_counts) if count / n_seqs > theta]
    
    # Initialize transition and emission matrices
    n_match_states = len(match_columns)
    n_states = 2 * n_match_states + 3  # S, I0, M1, D1, I1, ..., Mn, Dn, In, E
    transitions = np.zeros((n_states, n_states))
    emissions = np.zeros((n_states, len(alphabet)))
    
    # Helper function to get state index
    def state_index(state_type, state_num):
        if state_type == 'S':
            return 0
        elif state_type == 'I':
            return 2 * state_num + 1
        elif state_type == 'M':
            return 2 * state_num
        elif state_type == 'D':
            return 2 * state_num + 1
        elif state_type == 'E':
            return n_states - 1
    
    # Count transitions and emissions
    for seq in alignment:
        prev_state = 'S'
        prev_state_num = 0
        
        for i, char in enumerate(seq):
            if i in match_columns:
                state_num = match_columns.index(i) + 1
                if char == '-':
                    state_type = 'D'
                else:
                    state_type = 'M'
                    emissions[state_index('M', state_num)][alphabet.index(char)] += 1
            else:
                state_num = sum(1 for col in match_columns if col < i)
                if char != '-':
                    state_type = 'I'
                    emissions[state_index('I', state_num)][alphabet.index(char)] += 1
                else:
                    continue  # Skip gaps in insert columns
            
            transitions[state_index(prev_state, prev_state_num)][state_index(state_type, state_num)] += 1
            prev_state, prev_state_num = state_type, state_num
        
        # Transition to end state
        transitions[state_index(prev_state, prev_state_num)][state_index('E', 0)] += 1
    
    # Normalize transitions and emissions
    for i in range(n_states):
        row_sum = np.sum(transitions[i])
        if row_sum > 0:
            transitions[i] /= row_sum
    
    for i in range(n_states):
        row_sum = np.sum(emissions[i])
        if row_sum > 0:
            emissions[i] /= row_sum
    
    return transitions, emissions

def format_matrix(matrix, row_labels, col_labels):
    output = "\t" + "\t".join(col_labels) + "\n"
    for i, row in enumerate(matrix):
        output += f"{row_labels[i]}\t" + "\t".join(f"{x:.3f}" if x > 0 else "0" for x in row) + "\n"
    return output

# Main function to process input and generate output
def main():
    # Hardcoded input
    input_data = """0.289
--------
A B C D E
--------
EBA
E-D
EB-
EED
EBD
EBE
E-D
"""

    # Process input
    lines = input_data.strip().split('\n')
    theta = float(lines[0])
    alphabet = lines[2].split()
    alignment = [line.strip() for line in lines[4:]]

    # Construct the profile HMM
    transitions, emissions = construct_profile_hmm(alignment, alphabet, theta)

    # Prepare labels for output
    n_match_states = (transitions.shape[0] - 3) // 2
    state_labels = ['S', 'I0'] + sum([[f'M{i}', f'D{i}', f'I{i}'] for i in range(1, n_match_states + 1)], []) + ['E']

    # Format and print the output
    print(format_matrix(transitions, state_labels, state_labels))
    print("--------")
    print(format_matrix(emissions, state_labels, alphabet))

if __name__ == "__main__":
    main()

	S	I0	M1	D1	I1	M2	D2	I2	M3	D3	I3	E
S	0	0	1.000	0	0	0	0	0	0
I0	0	0	0	0	0	0	0	0	0
M1	0	0	0	0	0.714	0.286	0	0	0
D1	0	0	0	0	0	0	0	0	0
I1	0	0	0	0	0	0	0.800	0.200	0
M2	0	0	0	0	0	0	1.000	0	0
D2	0	0	0	0	0	0	0	0	1.000
I2	0	0	0	0	0	0	0	0	1.000
M3	0	0	0	0	0	0	0	0	0

--------
	A	B	C	D	E
S	0	0	0	0	0
I0	0	0	0	0	0
M1	0	0	0	0	1.000
D1	0	0	0	0	0
I1	0	0.800	0	0	0.200
M2	0	0	0	0	0
D2	0.167	0	0	0.667	0.167
I2	0	0	0	0	0
M3	0	0	0	0	0

