In [1]:
import random
import toyplot
from collections import defaultdict
from typing import List, Set, Tuple, Dict
import numpy as np

##### Get Kmers and their counts from a given sequence

In [2]:
def get_kmer_count_from_sequence(sequence, k=3, cyclic=True):
    """
    Returns dictionary with keys representing all possible kmers in a sequence
    and values counting their occurrence in the sequence.
    """
    # dict to store kmers
    kmers = {}
    
    # count how many times each occurred in this sequence (treated as cyclic)
    for i in range(0, len(sequence)):
        kmer = sequence[i:i + k]
        
        # for cyclic sequence get kmers that wrap from end to beginning
        length = len(kmer)
        if cyclic:
            if len(kmer) != k:
                kmer += sequence[:(k - length)]
        
        # if not cyclic then skip kmers at end of sequence
        else:
            if len(kmer) != k:
                continue
        
        # count occurrence of this kmer in sequence
        if kmer in kmers:
            kmers[kmer] += 1
        else:
            kmers[kmer] = 1
    
    return kmers

##### Get the De Bruijn graph from the Kmers

In [4]:
# def get_debruijn_edges_from_kmers(kmers):
#     """
#     Every possible (k-1)mer (n-1 suffix and prefix of kmers) is assigned
#     to a node, and we connect one node to another if the (k-1)mer overlaps 
#     another. Nodes are (k-1)mers, edges are kmers.
#     """
#     # store edges as tuples in a set
#     edges = set()
    
#     # compare each (k-1)mer
#     for k1 in kmers:
#         for k2 in kmers:
#             if k1 != k2:            
#                 # if they overlap then add to edges
#                 if k1[1:] == k2[:-1]:
#                     edges.add((k1[:-1], k2[:-1]))
#                 if k1[:-1] == k2[1:]:
#                     edges.add((k2[:-1], k1[:-1]))

#     return edges
def get_debruijn_edges_from_kmers(kmers):
    """
    Modified to return both edges and the original kmers that created them.
    Every kmer creates one edge connecting its (k-1) prefix to its (k-1) suffix.
    """
    # store edges as tuples with the full kmer that created them
    edges = []
    
    # create edge for each kmer
    for kmer in kmers:
        # connect prefix to suffix, store original kmer
        edges.append((kmer[:-1], kmer[1:], kmer))
        
    return edges

##### Plot the De Bruijn graph

In [5]:
def plot_debruijn_graph(edges, width=500, height=500):
    "returns a toyplot graph from an input of edges"
    graph = toyplot.graph(
        [i[0] for i in edges],
        [i[1] for i in edges],
        width=width,
        height=height,
        tmarker=">", 
        vsize=25,
        vstyle={"stroke": "black", "stroke-width": 2, "fill": "white"},
        vlstyle={"font-size": "11px"},
        estyle={"stroke": "black", "stroke-width": 2},
        layout=toyplot.layout.FruchtermanReingold(edges=toyplot.layout.CurvedEdges()))
    return graph

##### Generate a random sequence of DNA

In [6]:
def random_sequence(seqlen):
    "Generate a random DNA sequence of a given length "
    return "".join([random.choice("ACGT") for i in range(seqlen)])

##### Find all possible reconstructions of a sequence from its Kmers

In [7]:
def find_all_reconstructions(edges: List[Tuple[str, str, str]], k: int) -> List[str]:
    """
    Finds all possible sequence reconstructions from a de Bruijn graph using all possible Euler paths.
    
    Args:
        edges: List of tuples representing edges in the de Bruijn graph
            Each edge is a tuple of (prefix, suffix, original_kmer)
        k: Length of original kmers
    
    Returns:
        List[str]: List of all possible reconstructed sequences
    """
    def build_graph(edges: List[Tuple[str, str, str]]) -> Dict[str, List[Tuple[str, str]]]:
        """Creates adjacency list representation of the graph, preserving original kmers."""
        graph = defaultdict(list)
        for prefix, suffix, kmer in edges:
            graph[prefix].append((suffix, kmer))
        return graph
    
    def calculate_degrees(edges: List[Tuple[str, str, str]]) -> Tuple[Dict[str, int], Dict[str, int]]:
        """Calculates in-degree and out-degree for all nodes."""
        in_degree = defaultdict(int)
        out_degree = defaultdict(int)
        for prefix, suffix, _ in edges:
            out_degree[prefix] += 1
            in_degree[suffix] += 1
        return in_degree, out_degree
    
    def find_start_nodes(graph: Dict[str, List[Tuple[str, str]]], 
                        edges: List[Tuple[str, str, str]]) -> List[str]:
        """Finds all possible valid start nodes for Euler paths."""
        # For cyclic sequences, any node can be a start
        return list(set(prefix for prefix, _, _ in edges))
    
    def find_euler_paths(graph: Dict[str, List[Tuple[str, str]]], start: str, 
                        target_length: int, path: List[str] = None,
                        kmers_used: List[str] = None) -> List[List[str]]:
        """
        Recursively finds all possible Euler paths starting from given node.
        Tracks original kmers used to ensure correct sequence length.
        """
        if path is None:
            path = [start]
            kmers_used = []
            
        # Check if we've reached target length
        if len(kmers_used) == target_length:
            return [kmers_used]
            
        current = path[-1]
        if not graph[current]:
            return []
            
        # Try all possible next nodes
        all_paths = []
        neighbors = graph[current].copy()  # Copy to avoid modifying during iteration
        
        for next_node, kmer in neighbors:
            # Remove this edge
            graph[current].remove((next_node, kmer))
            
            # Recursively find all paths from next node
            new_paths = find_euler_paths(graph, next_node, target_length,
                                       path + [next_node],
                                       kmers_used + [kmer])
            all_paths.extend(new_paths)
            
            # Restore the edge for backtracking
            graph[current].append((next_node, kmer))
            
        return all_paths
    
    def reconstruct_from_kmers(kmers: List[str]) -> str:
        """Reconstructs sequence from ordered list of kmers."""
        if not kmers:
            return ""
        sequence = kmers[0]
        for i in range(1, len(kmers)):
            sequence += kmers[i][-1]
        return sequence

    # Main reconstruction logic
    graph = build_graph(edges)
    start_nodes = find_start_nodes(graph, edges)
    
    # Calculate target length based on number of kmers needed
    # For cyclic sequence with k-length kmers, need len(sequence) kmers
    target_length = len(set(kmer for _, _, kmer in edges))
    
    # Find all possible reconstructions from all possible start nodes
    all_reconstructions = set()
    for start_node in start_nodes:
        # Create a copy of the graph for each start node
        graph_copy = {k: v.copy() for k, v in graph.items()}
        kmer_paths = find_euler_paths(graph_copy, start_node, target_length)
        
        # Convert kmer paths to sequences
        for kmer_path in kmer_paths:
            sequence = reconstruct_from_kmers(kmer_path)
            if len(sequence) == target_length + k - 1:  # Ensure correct length
                all_reconstructions.add(sequence)
    
    return sorted(list(all_reconstructions))

##### Verify and score the reconstructions (WIP)

In [8]:
def verify_reconstructions(original: str, reconstructions: List[str], k: int) -> List[bool]:
    """
    Verifies if each reconstructed sequence has the same k-mer composition
    and length as the original sequence.
    """
    original_kmers = get_kmer_count_from_sequence(original, k=k, cyclic=True)
    return [
        len(recon) == len(original) and
        get_kmer_count_from_sequence(recon, k=k, cyclic=True) == original_kmers 
        for recon in reconstructions
    ]

# def score_sequence(original: str, reconstructed: str, 
#                   match_score: float = 1.0, 
#                   mismatch_score: float = -0.5,
#                   length_penalty: float = -1.0) -> Tuple[float, Dict]:
#     """
#     Scores a reconstructed sequence against the original, handling unequal lengths.
    
#     Args:
#         original: Original sequence
#         reconstructed: Reconstructed sequence to score
#         match_score: Score for matching bases (default: 1.0)
#         mismatch_score: Score for mismatching bases (default: -0.5)
#         length_penalty: Points deducted per base of length difference (default: -1.0)
    
#     Returns:
#         Tuple[float, Dict]: (score, details)
#             score: Final score including length penalty
#             details: Dictionary with scoring details
#     """
#     # Get length of sequence to compare
#     min_length = min(len(original), len(reconstructed))
#     length_diff = abs(len(original) - len(reconstructed))
    
#     # Score the overlapping portion
#     matches = sum(1 for i in range(min_length) if original[i] == reconstructed[i])
#     mismatches = min_length - matches
    
#     # Calculate base score
#     base_score = (matches * match_score) + (mismatches * mismatch_score)
    
#     # Apply length penalty
#     length_penalty_score = length_diff * length_penalty
#     total_score = base_score + length_penalty_score
    
#     details = {
#         'matches': matches,
#         'mismatches': mismatches,
#         'length_diff': length_diff,
#         'base_score': base_score,
#         'length_penalty': length_penalty_score,
#         'compared_length': min_length,
#         'percent_identity': (matches / min_length * 100) if min_length > 0 else 0
#     }
    
#     return total_score, details
def score_sequence(original: str, reconstructed: str, 
                  circular: bool = True,
                  match_score: float = 1.0, 
                  mismatch_score: float = -0.5,
                  length_penalty: float = -1.0) -> Tuple[float, Dict]:
    """
    Scores a reconstructed sequence against the original, handling both circular and linear sequences.
    
    Args:
        original: Original sequence
        reconstructed: Reconstructed sequence to score
        circular: Whether to treat sequences as circular (default: True)
        match_score: Score for matching bases (default: 1.0)
        mismatch_score: Score for mismatching bases (default: -0.5)
        length_penalty: Points deducted per base of length difference (default: -1.0)
    
    Returns:
        Tuple[float, Dict]: (score, details)
            score: Final score including length penalty
            details: Dictionary with scoring details
    """
    if not circular:
        # For linear sequences, just compare directly
        min_length = min(len(original), len(reconstructed))
        length_diff = abs(len(original) - len(reconstructed))
        
        matches = sum(1 for i in range(min_length) if original[i] == reconstructed[i])
        mismatches = min_length - matches
        
        base_score = (matches * match_score) + (mismatches * mismatch_score)
        length_penalty_score = length_diff * length_penalty
        total_score = base_score + length_penalty_score
        
        details = {
            'matches': matches,
            'mismatches': mismatches,
            'length_diff': length_diff,
            'base_score': base_score,
            'length_penalty': length_penalty_score,
            'compared_length': min_length,
            'percent_identity': (matches / min_length * 100) if min_length > 0 else 0,
            'mode': 'linear'
        }
        
        return total_score, details
    
    # For circular sequences, try all possible rotations
    n = len(reconstructed)
    best_score = float('-inf')
    best_details = {}
    
    for i in range(n):
        # Rotate sequence
        rotated = reconstructed[i:] + reconstructed[:i]
        # Score this rotation as if linear
        score, curr_details = score_sequence(original, rotated, 
                                          circular=False,
                                          match_score=match_score, 
                                          mismatch_score=mismatch_score, 
                                          length_penalty=length_penalty)
        
        if score > best_score:
            best_score = score
            best_details = curr_details.copy()
            best_details['rotation'] = i
            best_details['mode'] = 'circular'
            best_details['aligned_sequence'] = rotated
    
    return best_score, best_details

def score_sequence_with_rotations(original: str, reconstructed: str,
                                match_score: float = 1.0,
                                mismatch_score: float = -0.5,
                                length_penalty: float = -1.0) -> Tuple[float, int, str, Dict]:
    """
    Scores a reconstructed sequence against all possible rotations of the original.
    
    Args:
        original: Original sequence
        reconstructed: Reconstructed sequence to score
        match_score: Score for matching bases (default: 1.0)
        mismatch_score: Score for mismatching bases (default: -0.5)
        length_penalty: Points deducted per base of length difference (default: -1.0)
    
    Returns:
        Tuple[float, int, str, Dict]: (best_score, best_rotation, best_alignment, best_details)
    """
    # Try all possible rotations
    n = len(reconstructed)
    best_score = float('-inf')
    best_rotation = 0
    best_alignment = reconstructed
    best_details = {}
    
    for i in range(n):
        # Rotate sequence
        rotated = reconstructed[i:] + reconstructed[:i]
        # Score this rotation
        score, details = score_sequence(original, rotated, 
                                      match_score, mismatch_score, length_penalty)
        
        if score > best_score:
            best_score = score
            best_rotation = i
            best_alignment = rotated
            best_details = details
            
    return best_score, best_rotation, best_alignment, best_details

# def evaluate_reconstructions(original: str, reconstructions: List[str],
#                            match_score: float = 1.0,
#                            mismatch_score: float = -0.5,
#                            length_penalty: float = -1.0) -> List[Dict]:
#     """
#     Evaluates all reconstructions against the original sequence.
    
#     Args:
#         original: Original sequence
#         reconstructions: List of reconstructed sequences
#         match_score: Score for matching bases (default: 1.0)
#         mismatch_score: Score for mismatching bases (default: -0.5)
#         length_penalty: Points deducted per base of length difference (default: -1.0)
    
#     Returns:
#         List[Dict]: List of evaluation results for each reconstruction
#     """
#     evaluations = []
    
#     for recon in reconstructions:
#         # Get best score and alignment
#         score, rotation, aligned, details = score_sequence_with_rotations(
#             original, recon, match_score, mismatch_score, length_penalty)
        
#         evaluation = {
#             'sequence': recon,
#             'score': score,
#             'rotation': rotation,
#             'aligned': aligned,
#             'length': len(recon),
#             'length_diff': details['length_diff'],
#             'matches': details['matches'],
#             'mismatches': details['mismatches'],
#             'base_score': details['base_score'],
#             'length_penalty': details['length_penalty'],
#             'percent_identity': details['percent_identity']
#         }
        
#         evaluations.append(evaluation)
    
#     # Sort evaluations by score, descending
#     evaluations.sort(key=lambda x: x['score'], reverse=True)
#     return evaluations
def evaluate_reconstructions(original: str, 
                           reconstructions: List[str],
                           circular: bool = True,
                           match_score: float = 1.0,
                           mismatch_score: float = -0.5,
                           length_penalty: float = -1.0) -> List[Dict]:
    """
    Evaluates all reconstructions against the original sequence.
    
    Args:
        original: Original sequence
        reconstructions: List of reconstructed sequences
        circular: Whether to treat sequences as circular (default: True)
        match_score: Score for matching bases (default: 1.0)
        mismatch_score: Score for mismatching bases (default: -0.5)
        length_penalty: Points deducted per base of length difference (default: -1.0)
    
    Returns:
        List[Dict]: List of evaluation results for each reconstruction
    """
    evaluations = []
    
    for recon in reconstructions:
        # Get best score and details
        score, details = score_sequence(
            original, recon, 
            circular=circular,
            match_score=match_score, 
            mismatch_score=mismatch_score, 
            length_penalty=length_penalty)
        
        evaluation = {
            'sequence': recon,
            'score': score,
            'length': len(recon),
            'length_diff': details['length_diff'],
            'matches': details['matches'],
            'mismatches': details['mismatches'],
            'base_score': details['base_score'],
            'length_penalty': details['length_penalty'],
            'percent_identity': details['percent_identity'],
            'mode': details['mode']
        }
        
        # Add rotation information for circular sequences
        if circular:
            evaluation['rotation'] = details['rotation']
            evaluation['aligned'] = details['aligned_sequence']
        
        evaluations.append(evaluation)
    
    # Sort evaluations by score, descending
    evaluations.sort(key=lambda x: x['score'], reverse=True)
    return evaluations

##### Usage and testing

In [12]:
# # Set random seed for reproducibility
# random.seed(42)

# # Generate random genome
# genome = random_sequence(100)
# print("Original sequence:", genome)
# print("Length:", len(genome))

# # Get k-mers and build de Bruijn graph
# k = 10
# kmers = get_kmer_count_from_sequence(genome, k=k, cyclic=True)
# edges = get_debruijn_edges_from_kmers(kmers)

# # Find all possible reconstructions
# reconstructions = find_all_reconstructions(edges, k)

# # Add some reconstructions with different lengths for demonstration
# reconstructions.extend([
#     reconstructions[0][:-2],  # Shorter
#     reconstructions[0] + "AG"  # Longer
# ])

# print(f"\nFound {len(reconstructions)} possible reconstructions")

# # Evaluate reconstructions with default scoring
# print("\nEvaluating reconstructions (match=1.0, mismatch=-0.5, length_penalty=-1.0):")
# evaluations = evaluate_reconstructions(genome, reconstructions)

# print("\nResults sorted by score:")
# print("Score | %ID  | Len∆ | Sequence")
# print("-" * 60)
# for eval in evaluations:
#     print(f"{eval['score']:5.1f} | {eval['percent_identity']:4.1f}% | {eval['length_diff']:4d} | {eval['aligned']}")
    
# # Example with different scoring
# print("\nAlternative scoring (match=2.0, mismatch=-1.0, length_penalty=-2.0):")
# alt_evaluations = evaluate_reconstructions(genome, reconstructions, 
#                                         match_score=2.0, 
#                                         mismatch_score=-1.0,
#                                         length_penalty=-2.0)

# print("\nResults with alternative scoring:")
# print("Score | %ID  | Len∆ | Sequence")
# print("-" * 60)
# for eval in alt_evaluations:
#     print(f"{eval['score']:5.1f} | {eval['percent_identity']:4.1f}% | {eval['length_diff']:4d} | {eval['aligned']}")

# # Print detailed statistics for best reconstruction
# best = evaluations[0]
# print(f"\nBest reconstruction details:")
# print(f"Score: {best['score']:.1f}")
# print(f"Base score: {best['base_score']:.1f}")
# print(f"Length penalty: {best['length_penalty']:.1f}")
# print(f"Matches: {best['matches']}")
# print(f"Mismatches: {best['mismatches']}")
# print(f"Length difference: {best['length_diff']}")
# print(f"Percent Identity: {best['percent_identity']:.1f}%")
# print(f"Rotation needed: {best['rotation']} positions")

# Set random seed for reproducibility
random.seed(42)
    
# Generate random genome
genome = random_sequence(50)
print("Original sequence:", genome)
print("Length:", len(genome))

# Get k-mers and build de Bruijn graph
k = 4
kmers = get_kmer_count_from_sequence(genome, k=k, cyclic=True)
edges = get_debruijn_edges_from_kmers(kmers)

# Find all possible reconstructions
reconstructions = find_all_reconstructions(edges, k)

# Add some reconstructions with different lengths
reconstructions.extend([
    reconstructions[0][:-2],  # Shorter
    reconstructions[0] + "AG"  # Longer
])

print(f"\nFound {len(reconstructions)} possible reconstructions")

# Evaluate with default scoring for circular DNA
print("\nEvaluating as circular DNA (match=1.0, mismatch=-0.5, length_penalty=-1.0):")
circular_evals = evaluate_reconstructions(genome, reconstructions, circular=True)

print("\nAll circular DNA results sorted by score:")
print("Score | %ID  | Len∆ | Mode     | Rot | Sequence")
print("-" * 80)
for eval in circular_evals:
    print(f"{eval['score']:5.1f} | {eval['percent_identity']:4.1f}% | "
          f"{eval['length_diff']:4d} | {eval['mode']:<8} | "
          f"{eval['rotation']:3d} | {eval['aligned']}")

# Evaluate with default scoring for linear DNA
print("\nEvaluating as linear DNA (match=1.0, mismatch=-0.5, length_penalty=-1.0):")
linear_evals = evaluate_reconstructions(genome, reconstructions, circular=False)

print("\nAll linear DNA results sorted by score:")
print("Score | %ID  | Len∆ | Mode     | Sequence")
print("-" * 80)
for eval in linear_evals:
    print(f"{eval['score']:5.1f} | {eval['percent_identity']:4.1f}% | "
          f"{eval['length_diff']:4d} | {eval['mode']:<8} | {eval['sequence']}")

# Alternative scoring example
print("\nAlternative scoring (match=2.0, mismatch=-1.0, length_penalty=-2.0):")

# Circular DNA with alternative scoring
print("\nAll circular DNA results with alternative scoring:")
alt_circular_evals = evaluate_reconstructions(genome, reconstructions, 
                                            circular=True,
                                            match_score=2.0, 
                                            mismatch_score=-1.0,
                                            length_penalty=-2.0)

print("Score | %ID  | Len∆ | Mode     | Rot | Sequence")
print("-" * 80)
for eval in alt_circular_evals:
    print(f"{eval['score']:5.1f} | {eval['percent_identity']:4.1f}% | "
          f"{eval['length_diff']:4d} | {eval['mode']:<8} | "
          f"{eval['rotation']:3d} | {eval['aligned']}")

# Linear DNA with alternative scoring
print("\nAll linear DNA results with alternative scoring:")
alt_linear_evals = evaluate_reconstructions(genome, reconstructions,
                                          circular=False,
                                          match_score=2.0,
                                          mismatch_score=-1.0,
                                          length_penalty=-2.0)

print("Score | %ID  | Len∆ | Mode     | Sequence")
print("-" * 80)
for eval in alt_linear_evals:
    print(f"{eval['score']:5.1f} | {eval['percent_identity']:4.1f}% | "
          f"{eval['length_diff']:4d} | {eval['mode']:<8} | {eval['sequence']}")

# Print detailed statistics for best reconstructions
print("\nDetailed Statistics:")
print("\nBest circular reconstruction:")
best_circular = circular_evals[0]
print(f"Score: {best_circular['score']:.1f}")
print(f"Base score: {best_circular['base_score']:.1f}")
print(f"Length penalty: {best_circular['length_penalty']:.1f}")
print(f"Matches: {best_circular['matches']}")
print(f"Mismatches: {best_circular['mismatches']}")
print(f"Length difference: {best_circular['length_diff']}")
print(f"Percent Identity: {best_circular['percent_identity']:.1f}%")
print(f"Rotation needed: {best_circular['rotation']} positions")

print("\nBest linear reconstruction:")
best_linear = linear_evals[0]
print(f"Score: {best_linear['score']:.1f}")
print(f"Base score: {best_linear['base_score']:.1f}")
print(f"Length penalty: {best_linear['length_penalty']:.1f}")
print(f"Matches: {best_linear['matches']}")
print(f"Mismatches: {best_linear['mismatches']}")
print(f"Length difference: {best_linear['length_diff']}")
print(f"Percent Identity: {best_linear['percent_identity']:.1f}%")

Original sequence: AAGCCCAATAAACCACTCTGACTGGCCGAATAGGGATATAGGCAACGACA
Length: 50

Found 6338 possible reconstructions

Evaluating as circular DNA (match=1.0, mismatch=-0.5, length_penalty=-1.0):

All circular DNA results sorted by score:
Score | %ID  | Len∆ | Mode     | Rot | Sequence
--------------------------------------------------------------------------------
 50.0 | 100.0% |    0 | circular |  19 | AAGCCCAATAAACCACTCTGACTGGCCGAATAGGGATATAGGCAACGACA
 47.0 | 96.0% |    0 | circular |  19 | AAGCCGAATAAACCACTCTGACTGGCCCAATAGGGATATAGGCAACGACA
 44.0 | 92.0% |    0 | circular |  19 | AAGCCCAATAAACCACTGACTCTGGCCGAATAGGGATATAGGCAACGACA
 41.0 | 88.0% |    0 | circular |  19 | AACGACAATAAACCACTCTGACTGGCCGAATAGGGATATAGGCAAAGCCC
 41.0 | 88.0% |    0 | circular |  19 | AAGCCGAATAAACCACTGACTCTGGCCCAATAGGGATATAGGCAACGACA
 39.5 | 86.0% |    0 | circular |  19 | AAAGCCGACAAACCACTCTGACTGGCCCAATAGGGATATAGGCAACGAAT
 39.5 | 86.0% |    0 | circular |  19 | AACCAATAAAGCCCACTCTGACTGGCCGAATAGGGATATAGGCAAC

#### Old stuff

In [None]:
import random
import toyplot

def get_kmer_count_from_sequence(sequence, k=3, cyclic=True):
    """
    Returns dictionary with keys representing all possible kmers in a sequence
    and values counting their occurrence in the sequence.
    """
    # dict to store kmers
    kmers = {}
    
    # count how many times each occurred in this sequence (treated as cyclic)
    for i in range(0, len(sequence)):
        kmer = sequence[i:i + k]
        
        # for cyclic sequence get kmers that wrap from end to beginning
        length = len(kmer)
        if cyclic:
            if len(kmer) != k:
                kmer += sequence[:(k - length)]
        
        # if not cyclic then skip kmers at end of sequence
        else:
            if len(kmer) != k:
                continue
        
        # count occurrence of this kmer in sequence
        if kmer in kmers:
            kmers[kmer] += 1
        else:
            kmers[kmer] = 1
    
    return kmers

def short_read_sequencing(sequence, nreads, readlen):
    "generate short reads from a circular genome"
    
    # do not allow reads to be longer than genome
    assert len(sequence) > readlen, "readlen must be shorter than sequence"
    
    # get random start positions of short reads
    starts = [random.randint(0, len(sequence)) for i in range(nreads)]
    
    # return reads as a list, generate reads by slicing from sequence
    reads = []
    for position in starts:
        end = position + readlen
        
        # if read extends past end then loop to beginning of sequence
        if end > len(sequence):
            read = sequence[position:len(sequence)] + sequence[0:end-len(sequence)]
        else:
            read = sequence[position:position + readlen]
        
        # append to reads list
        reads.append(read)
    return reads

def get_kmer_count_from_reads(reads, k=3):
    "Combines results of 'get_kmer_count_from_sequence()' across many reads"
   
    # a dictionary to store kmer counts in
    kmers = {}
    
    # iterate over reads
    for read in reads:
        
        # get kmer count for this read
        ikmers = get_kmer_count_from_sequence(read, k, cyclic=False)
        
        # add this kmer count to the global kmer counter across all reads
        for key, value in ikmers.items():
            if key in kmers:
                kmers[key] += value
            else:
                kmers[key] = value
                
    # return kmer counts
    return kmers

# def get_debruijn_edges_from_kmers(kmers):
#     """
#     Every possible (k-1)mer (n-1 suffix and prefix of kmers) is assigned
#     to a node, and we connect one node to another if the (k-1)mer overlaps 
#     another. Nodes are (k-1)mers, edges are kmers.
#     """
#     # store edges as tuples in a set
#     edges = set()
    
#     # compare each (k-1)mer
#     for k1 in kmers:
#         for k2 in kmers:
#             if k1 != k2:            
#                 # if they overlap then add to edges
#                 if k1[1:] == k2[:-1]:
#                     edges.add((k1[:-1], k2[:-1]))
#                 if k1[:-1] == k2[1:]:
#                     edges.add((k2[:-1], k1[:-1]))

#     return edges
def get_debruijn_edges_from_kmers(kmers):
    """
    Modified to return both edges and the original kmers that created them.
    Every kmer creates one edge connecting its (k-1) prefix to its (k-1) suffix.
    """
    # store edges as tuples with the full kmer that created them
    edges = []
    
    # create edge for each kmer
    for kmer in kmers:
        # connect prefix to suffix, store original kmer
        edges.append((kmer[:-1], kmer[1:], kmer))
        
    return edges

def plot_debruijn_graph(edges, width=500, height=500):
    "returns a toyplot graph from an input of edges"
    graph = toyplot.graph(
        [i[0] for i in edges],
        [i[1] for i in edges],
        width=width,
        height=height,
        tmarker=">", 
        vsize=25,
        vstyle={"stroke": "black", "stroke-width": 2, "fill": "white"},
        vlstyle={"font-size": "11px"},
        estyle={"stroke": "black", "stroke-width": 2},
        layout=toyplot.layout.FruchtermanReingold(edges=toyplot.layout.CurvedEdges()))
    return graph

def random_sequence(seqlen):
    "Generate a random DNA sequence of a given length "
    return "".join([random.choice("ACGT") for i in range(seqlen)])

# set a random seed 
random.seed(123)

# get a random genome sequence
genome1 = random_sequence(25)
genome1

# get kmers
kmers = get_kmer_count_from_sequence(genome1, k=3, cyclic=True)

# get db graph
edges = get_debruijn_edges_from_kmers(kmers)

# plot db graph
plot_debruijn_graph(edges, width=600, height=400);

# print the true sequence
print("the true sequence: {}".format(genome1))

In [None]:
import random
from collections import defaultdict
from typing import List, Set, Tuple, Dict
import numpy as np

def find_all_reconstructions(edges: List[Tuple[str, str, str]], k: int) -> List[str]:
    """
    Finds all possible sequence reconstructions from a de Bruijn graph using all possible Euler paths.
    
    Args:
        edges: List of tuples representing edges in the de Bruijn graph
            Each edge is a tuple of (prefix, suffix, original_kmer)
        k: Length of original kmers
    
    Returns:
        List[str]: List of all possible reconstructed sequences
    """
    def build_graph(edges: List[Tuple[str, str, str]]) -> Dict[str, List[Tuple[str, str]]]:
        """Creates adjacency list representation of the graph, preserving original kmers."""
        graph = defaultdict(list)
        for prefix, suffix, kmer in edges:
            graph[prefix].append((suffix, kmer))
        return graph
    
    def calculate_degrees(edges: List[Tuple[str, str, str]]) -> Tuple[Dict[str, int], Dict[str, int]]:
        """Calculates in-degree and out-degree for all nodes."""
        in_degree = defaultdict(int)
        out_degree = defaultdict(int)
        for prefix, suffix, _ in edges:
            out_degree[prefix] += 1
            in_degree[suffix] += 1
        return in_degree, out_degree
    
    def find_start_nodes(graph: Dict[str, List[Tuple[str, str]]], 
                        edges: List[Tuple[str, str, str]]) -> List[str]:
        """Finds all possible valid start nodes for Euler paths."""
        # For cyclic sequences, any node can be a start
        return list(set(prefix for prefix, _, _ in edges))
    
    def find_euler_paths(graph: Dict[str, List[Tuple[str, str]]], start: str, 
                        target_length: int, path: List[str] = None,
                        kmers_used: List[str] = None) -> List[List[str]]:
        """
        Recursively finds all possible Euler paths starting from given node.
        Tracks original kmers used to ensure correct sequence length.
        """
        if path is None:
            path = [start]
            kmers_used = []
            
        # Check if we've reached target length
        if len(kmers_used) == target_length:
            return [kmers_used]
            
        current = path[-1]
        if not graph[current]:
            return []
            
        # Try all possible next nodes
        all_paths = []
        neighbors = graph[current].copy()  # Copy to avoid modifying during iteration
        
        for next_node, kmer in neighbors:
            # Remove this edge
            graph[current].remove((next_node, kmer))
            
            # Recursively find all paths from next node
            new_paths = find_euler_paths(graph, next_node, target_length,
                                       path + [next_node],
                                       kmers_used + [kmer])
            all_paths.extend(new_paths)
            
            # Restore the edge for backtracking
            graph[current].append((next_node, kmer))
            
        return all_paths
    
    def reconstruct_from_kmers(kmers: List[str]) -> str:
        """Reconstructs sequence from ordered list of kmers."""
        if not kmers:
            return ""
        sequence = kmers[0]
        for i in range(1, len(kmers)):
            sequence += kmers[i][-1]
        return sequence

    # Main reconstruction logic
    graph = build_graph(edges)
    start_nodes = find_start_nodes(graph, edges)
    
    # Calculate target length based on number of kmers needed
    # For cyclic sequence with k-length kmers, need len(sequence) kmers
    target_length = len(set(kmer for _, _, kmer in edges))
    
    # Find all possible reconstructions from all possible start nodes
    all_reconstructions = set()
    for start_node in start_nodes:
        # Create a copy of the graph for each start node
        graph_copy = {k: v.copy() for k, v in graph.items()}
        kmer_paths = find_euler_paths(graph_copy, start_node, target_length)
        
        # Convert kmer paths to sequences
        for kmer_path in kmer_paths:
            sequence = reconstruct_from_kmers(kmer_path)
            if len(sequence) == target_length + k - 1:  # Ensure correct length
                all_reconstructions.add(sequence)
    
    return sorted(list(all_reconstructions))

def verify_reconstructions(original: str, reconstructions: List[str], k: int) -> List[bool]:
    """
    Verifies if each reconstructed sequence has the same k-mer composition
    and length as the original sequence.
    """
    original_kmers = get_kmer_count_from_sequence(original, k=k, cyclic=True)
    return [
        len(recon) == len(original) and
        get_kmer_count_from_sequence(recon, k=k, cyclic=True) == original_kmers 
        for recon in reconstructions
    ]

def score_sequence(original: str, reconstructed: str, 
                  match_score: float = 1.0, 
                  mismatch_score: float = -0.5,
                  length_penalty: float = -1.0) -> Tuple[float, Dict]:
    """
    Scores a reconstructed sequence against the original, handling unequal lengths.
    
    Args:
        original: Original sequence
        reconstructed: Reconstructed sequence to score
        match_score: Score for matching bases (default: 1.0)
        mismatch_score: Score for mismatching bases (default: -0.5)
        length_penalty: Points deducted per base of length difference (default: -1.0)
    
    Returns:
        Tuple[float, Dict]: (score, details)
            score: Final score including length penalty
            details: Dictionary with scoring details
    """
    # Get length of sequence to compare
    min_length = min(len(original), len(reconstructed))
    length_diff = abs(len(original) - len(reconstructed))
    
    # Score the overlapping portion
    matches = sum(1 for i in range(min_length) if original[i] == reconstructed[i])
    mismatches = min_length - matches
    
    # Calculate base score
    base_score = (matches * match_score) + (mismatches * mismatch_score)
    
    # Apply length penalty
    length_penalty_score = length_diff * length_penalty
    total_score = base_score + length_penalty_score
    
    details = {
        'matches': matches,
        'mismatches': mismatches,
        'length_diff': length_diff,
        'base_score': base_score,
        'length_penalty': length_penalty_score,
        'compared_length': min_length,
        'percent_identity': (matches / min_length * 100) if min_length > 0 else 0
    }
    
    return total_score, details

def score_sequence_with_rotations(original: str, reconstructed: str,
                                match_score: float = 1.0,
                                mismatch_score: float = -0.5,
                                length_penalty: float = -1.0) -> Tuple[float, int, str, Dict]:
    """
    Scores a reconstructed sequence against all possible rotations of the original.
    
    Args:
        original: Original sequence
        reconstructed: Reconstructed sequence to score
        match_score: Score for matching bases (default: 1.0)
        mismatch_score: Score for mismatching bases (default: -0.5)
        length_penalty: Points deducted per base of length difference (default: -1.0)
    
    Returns:
        Tuple[float, int, str, Dict]: (best_score, best_rotation, best_alignment, best_details)
    """
    # Try all possible rotations
    n = len(reconstructed)
    best_score = float('-inf')
    best_rotation = 0
    best_alignment = reconstructed
    best_details = {}
    
    for i in range(n):
        # Rotate sequence
        rotated = reconstructed[i:] + reconstructed[:i]
        # Score this rotation
        score, details = score_sequence(original, rotated, 
                                      match_score, mismatch_score, length_penalty)
        
        if score > best_score:
            best_score = score
            best_rotation = i
            best_alignment = rotated
            best_details = details
            
    return best_score, best_rotation, best_alignment, best_details

def evaluate_reconstructions(original: str, reconstructions: List[str],
                           match_score: float = 1.0,
                           mismatch_score: float = -0.5,
                           length_penalty: float = -1.0) -> List[Dict]:
    """
    Evaluates all reconstructions against the original sequence.
    
    Args:
        original: Original sequence
        reconstructions: List of reconstructed sequences
        match_score: Score for matching bases (default: 1.0)
        mismatch_score: Score for mismatching bases (default: -0.5)
        length_penalty: Points deducted per base of length difference (default: -1.0)
    
    Returns:
        List[Dict]: List of evaluation results for each reconstruction
    """
    evaluations = []
    
    for recon in reconstructions:
        # Get best score and alignment
        score, rotation, aligned, details = score_sequence_with_rotations(
            original, recon, match_score, mismatch_score, length_penalty)
        
        evaluation = {
            'sequence': recon,
            'score': score,
            'rotation': rotation,
            'aligned': aligned,
            'length': len(recon),
            'length_diff': details['length_diff'],
            'matches': details['matches'],
            'mismatches': details['mismatches'],
            'base_score': details['base_score'],
            'length_penalty': details['length_penalty'],
            'percent_identity': details['percent_identity']
        }
        
        evaluations.append(evaluation)
    
    # Sort evaluations by score, descending
    evaluations.sort(key=lambda x: x['score'], reverse=True)
    return evaluations

# Example usage:
if __name__ == "__main__":
    # Set random seed for reproducibility
    random.seed(42)
    
    # Generate random genome
    genome = random_sequence(50)
    print("Original sequence:", genome)
    print("Length:", len(genome))
    
    # Get k-mers and build de Bruijn graph
    k = 50
    kmers = get_kmer_count_from_sequence(genome, k=k, cyclic=True)
    edges = get_debruijn_edges_from_kmers(kmers)
    
    # Find all possible reconstructions
    reconstructions = find_all_reconstructions(edges, k)
    
    # Add some reconstructions with different lengths for demonstration
    reconstructions.extend([
        reconstructions[0][:-2],  # Shorter
        reconstructions[0] + "AG"  # Longer
    ])
    
    print(f"\nFound {len(reconstructions)} possible reconstructions")
    
    # Evaluate reconstructions with default scoring
    print("\nEvaluating reconstructions (match=1.0, mismatch=-0.5, length_penalty=-1.0):")
    evaluations = evaluate_reconstructions(genome, reconstructions)
    
    print("\nResults sorted by score:")
    print("Score | %ID  | Len∆ | Sequence")
    print("-" * 60)
    for eval in evaluations:
        print(f"{eval['score']:5.1f} | {eval['percent_identity']:4.1f}% | {eval['length_diff']:4d} | {eval['aligned']}")
        
    # Example with different scoring
    print("\nAlternative scoring (match=2.0, mismatch=-1.0, length_penalty=-2.0):")
    alt_evaluations = evaluate_reconstructions(genome, reconstructions, 
                                            match_score=2.0, 
                                            mismatch_score=-1.0,
                                            length_penalty=-2.0)
    
    print("\nResults with alternative scoring:")
    print("Score | %ID  | Len∆ | Sequence")
    print("-" * 60)
    for eval in alt_evaluations:
        print(f"{eval['score']:5.1f} | {eval['percent_identity']:4.1f}% | {eval['length_diff']:4d} | {eval['aligned']}")
    
    # Print detailed statistics for best reconstruction
    best = evaluations[0]
    print(f"\nBest reconstruction details:")
    print(f"Score: {best['score']:.1f}")
    print(f"Base score: {best['base_score']:.1f}")
    print(f"Length penalty: {best['length_penalty']:.1f}")
    print(f"Matches: {best['matches']}")
    print(f"Mismatches: {best['mismatches']}")
    print(f"Length difference: {best['length_diff']}")
    print(f"Percent Identity: {best['percent_identity']:.1f}%")
    print(f"Rotation needed: {best['rotation']} positions")

#### New Version (test)

In [None]:
import random
from collections import defaultdict
from typing import List, Set, Tuple, Dict
import numpy as np

def get_kmer_count_from_sequence(sequence, k=3, cyclic=True):
    """
    Returns dictionary with keys representing all possible kmers in a sequence
    and values counting their occurrence in the sequence.
    """
    # dict to store kmers
    kmers = {}
    
    # count how many times each occurred in this sequence (treated as cyclic)
    for i in range(0, len(sequence)):
        kmer = sequence[i:i + k]
        
        # for cyclic sequence get kmers that wrap from end to beginning
        length = len(kmer)
        if cyclic:
            if len(kmer) != k:
                kmer += sequence[:(k - length)]
        
        # if not cyclic then skip kmers at end of sequence
        else:
            if len(kmer) != k:
                continue
        
        # count occurrence of this kmer in sequence
        if kmer in kmers:
            kmers[kmer] += 1
        else:
            kmers[kmer] = 1
    
    return kmers

def get_debruijn_edges_from_kmers(kmers):
    """
    Modified to return both edges and the original kmers that created them.
    Every kmer creates one edge connecting its (k-1) prefix to its (k-1) suffix.
    """
    edges = []
    for kmer in kmers:
        edges.append((kmer[:-1], kmer[1:], kmer))
    return edges

def random_sequence(seqlen):
    "Generate a random DNA sequence of a given length"
    return "".join([random.choice("ACGT") for i in range(seqlen)])

def needleman_wunsch(seq1: str, seq2: str, match_score: float = 1.0, 
                    mismatch_score: float = -0.5, gap_score: float = -1.0) -> Tuple[str, str, float]:
    """
    Implements the Needleman-Wunsch global alignment algorithm.
    
    Args:
        seq1, seq2: Sequences to align
        match_score: Score for matching bases
        mismatch_score: Score for mismatching bases
        gap_score: Score for gaps
        
    Returns:
        Tuple[str, str, float]: (aligned_seq1, aligned_seq2, alignment_score)
    """
    # Initialize scoring matrix
    n, m = len(seq1), len(seq2)
    score_matrix = np.zeros((n + 1, m + 1))
    
    # Initialize traceback matrix
    traceback = np.zeros((n + 1, m + 1), dtype=str)
    
    # Initialize first row and column
    for i in range(n + 1):
        score_matrix[i, 0] = i * gap_score
        traceback[i, 0] = 'up'
    for j in range(m + 1):
        score_matrix[0, j] = j * gap_score
        traceback[0, j] = 'left'
    
    # Fill matrices
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            # Calculate scores for all possible moves
            match = score_matrix[i-1, j-1] + (match_score if seq1[i-1] == seq2[j-1] else mismatch_score)
            delete = score_matrix[i-1, j] + gap_score
            insert = score_matrix[i, j-1] + gap_score
            
            # Find best score and move
            score_matrix[i, j] = max(match, delete, insert)
            
            if score_matrix[i, j] == match:
                traceback[i, j] = 'diag'
            elif score_matrix[i, j] == delete:
                traceback[i, j] = 'up'
            else:
                traceback[i, j] = 'left'
    
    # Traceback to find alignment
    aligned1, aligned2 = [], []
    i, j = n, m
    
    while i > 0 or j > 0:
        if traceback[i, j] == 'diag':
            aligned1.append(seq1[i-1])
            aligned2.append(seq2[j-1])
            i -= 1
            j -= 1
        elif traceback[i, j] == 'up':
            aligned1.append(seq1[i-1])
            aligned2.append('-')
            i -= 1
        else:
            aligned1.append('-')
            aligned2.append(seq2[j-1])
            j -= 1
            
    # Reverse alignments and join into strings
    aligned1 = ''.join(reversed(aligned1))
    aligned2 = ''.join(reversed(aligned2))
    
    return aligned1, aligned2, score_matrix[n, m]

def score_sequence(original: str, reconstructed: str, 
                  circular: bool = True,
                  match_score: float = 1.0, 
                  mismatch_score: float = -0.5,
                  gap_score: float = -1.0) -> Tuple[float, Dict]:
    """
    Scores a reconstructed sequence against the original using global alignment.
    
    Args:
        original: Original sequence
        reconstructed: Reconstructed sequence to score
        circular: Whether to treat sequences as circular
        match_score: Score for matching bases
        mismatch_score: Score for mismatching bases
        gap_score: Score for gaps
    
    Returns:
        Tuple[float, Dict]: (score, details)
    """
    if not circular:
        # For linear sequences, perform direct alignment
        aligned_orig, aligned_recon, score = needleman_wunsch(
            original, reconstructed, match_score, mismatch_score, gap_score
        )
        
        # Calculate alignment statistics
        matches = sum(1 for i in range(len(aligned_orig)) 
                     if aligned_orig[i] != '-' and aligned_orig[i] == aligned_recon[i])
        mismatches = sum(1 for i in range(len(aligned_orig))
                        if aligned_orig[i] != '-' and aligned_recon[i] != '-' 
                        and aligned_orig[i] != aligned_recon[i])
        gaps = sum(1 for c in aligned_orig + aligned_recon if c == '-')
        
        aligned_length = len(aligned_orig)
        ungapped_length = aligned_length - gaps
        
        details = {
            'matches': matches,
            'mismatches': mismatches,
            'gaps': gaps,
            'score': score,
            'aligned_original': aligned_orig,
            'aligned_reconstructed': aligned_recon,
            'aligned_length': aligned_length,
            'percent_identity': (matches / ungapped_length * 100) if ungapped_length > 0 else 0,
            'mode': 'linear'
        }
        
        return score, details
    
    # For circular sequences, try all possible rotations
    n = len(reconstructed)
    best_score = float('-inf')
    best_details = {}
    
    for i in range(n):
        # Rotate sequence
        rotated = reconstructed[i:] + reconstructed[:i]
        # Score this rotation
        score, curr_details = score_sequence(
            original, rotated, circular=False,
            match_score=match_score, 
            mismatch_score=mismatch_score,
            gap_score=gap_score
        )
        
        if score > best_score:
            best_score = score
            best_details = curr_details.copy()
            best_details['rotation'] = i
            best_details['mode'] = 'circular'
    
    return best_score, best_details

def evaluate_reconstructions(original: str, 
                           reconstructions: List[str],
                           circular: bool = True,
                           match_score: float = 1.0,
                           mismatch_score: float = -0.5,
                           gap_score: float = -1.0) -> List[Dict]:
    """
    Evaluates all reconstructions against the original sequence using alignment.
    
    Args:
        original: Original sequence
        reconstructions: List of reconstructed sequences
        circular: Whether to treat sequences as circular
        match_score: Score for matching bases
        mismatch_score: Score for mismatching bases
        gap_score: Score for gaps
    
    Returns:
        List[Dict]: List of evaluation results for each reconstruction
    """
    evaluations = []
    
    for recon in reconstructions:
        # Get best score and details
        score, details = score_sequence(
            original, recon, 
            circular=circular,
            match_score=match_score, 
            mismatch_score=mismatch_score, 
            gap_score=gap_score
        )
        
        evaluation = {
            'sequence': recon,
            'score': score,
            'matches': details['matches'],
            'mismatches': details['mismatches'],
            'gaps': details['gaps'],
            'aligned_length': details['aligned_length'],
            'percent_identity': details['percent_identity'],
            'mode': details['mode'],
            'aligned_original': details['aligned_original'],
            'aligned_reconstructed': details['aligned_reconstructed']
        }
        
        # Add rotation information for circular sequences
        if circular:
            evaluation['rotation'] = details['rotation']
        
        evaluations.append(evaluation)
    
    # Sort evaluations by score, descending
    evaluations.sort(key=lambda x: x['score'], reverse=True)
    return evaluations

# Example usage and testing code
if __name__ == "__main__":
    # Set random seed for reproducibility
    random.seed(42)
    
    # Generate random genome
    genome = random_sequence(50)
    print("Original sequence:", genome)
    print("Length:", len(genome))
    
    # Get k-mers and build de Bruijn graph
    k = 4
    kmers = get_kmer_count_from_sequence(genome, k=k, cyclic=True)
    edges = get_debruijn_edges_from_kmers(kmers)
    
    # Find all possible reconstructions
    reconstructions = find_all_reconstructions(edges, k)
    
    # Add some reconstructions with different lengths
    reconstructions.extend([
        reconstructions[0][:-2],  # Shorter
        reconstructions[0] + "AG"  # Longer
    ])
    
    print(f"\nFound {len(reconstructions)} possible reconstructions")
    
    # Evaluate with default scoring for circular DNA
    print("\nEvaluating as circular DNA (match=1.0, mismatch=-0.5, gap=-1.0):")
    circular_evals = evaluate_reconstructions(genome, reconstructions, circular=True)
    
    print("\nAll circular DNA results sorted by score:")
    print("Score | %ID  | Gaps | Mode     | Rot | Aligned Original")
    print("-" * 80)
    for eval in circular_evals:
        print(f"{eval['score']:5.1f} | {eval['percent_identity']:4.1f}% | "
              f"{eval['gaps']:4d} | {eval['mode']:<8} | "
              f"{eval.get('rotation', 'N/A'):3} | {eval['aligned_original']}")
        print(" " * 29 + "| Aligned Reconstructed")
        print(" " * 29 + f"| {eval['aligned_reconstructed']}")
        print("-" * 80)
    
    # Evaluate with default scoring for linear DNA
    print("\nEvaluating as linear DNA (match=1.0, mismatch=-0.5, gap=-1.0):")
    linear_evals = evaluate_reconstructions(genome, reconstructions, circular=False)
    
    print("\nAll linear DNA results sorted by score:")
    print("Score | %ID  | Gaps | Mode     | Aligned Original")
    print("-" * 80)
    for eval in linear_evals:
        print(f"{eval['score']:5.1f} | {eval['percent_identity']:4.1f}% | "
              f"{eval['gaps']:4d} | {eval['mode']:<8} | {eval['aligned_original']}")
        print(" " * 29 + "| Aligned Reconstructed")
        print(" " * 29 + f"| {eval['aligned_reconstructed']}")
        print("-" * 80)