# Genome Assembly: 3. De Novo Assembly Algorithms

## Overview
This notebook implements the core assembly algorithms from scratch:

1. **Greedy Overlap-Based Assembly**: Simple but fast; can produce suboptimal results
2. **De Bruijn Graph Assembly**: Optimized approach using k-mers (what modern assemblers use)

We'll implement both to show the algorithmic trade-offs and why the De Bruijn approach is superior.

**Data**: Uses reads from previous notebook (or generates synthetic ones)

## 1. Greedy Overlap Assembly

In [None]:
import numpy as np
from collections import defaultdict, deque
from typing import List, Tuple, Set, Dict, Optional
import time

def calculate_overlap(seq1: str, seq2: str, min_overlap: int = 20) -> Tuple[int, int]:
    """
    Calculate the overlap length between the end of seq1 and the beginning of seq2.
    
    Returns: (overlap_length, num_mismatches)
    """
    max_overlap = min(len(seq1), len(seq2))
    best_overlap = 0
    best_mismatches = float('inf')
    
    for overlap_len in range(max_overlap, min_overlap - 1, -1):
        # Check how well seq1's suffix matches seq2's prefix
        mismatches = 0
        for i in range(overlap_len):
            if seq1[-(overlap_len - i)] != seq2[i]:
                mismatches += 1
        
        # Allow up to 5% mismatches
        if mismatches / overlap_len <= 0.05:
            best_overlap = overlap_len
            best_mismatches = mismatches
            break
    
    return best_overlap, best_mismatches

def merge_reads(seq1: str, seq2: str, overlap: int) -> str:
    """
    Merge two reads that overlap.
    Concatenates seq1 + non-overlapping part of seq2.
    """
    if overlap == 0:
        return seq1 + seq2
    return seq1 + seq2[overlap:]

class GreedyAssembler:
    """
    Simple greedy overlap-based assembler.
    Algorithm:
    1. Find best overlap between all read pairs
    2. Greedily merge reads with highest overlap
    3. Repeat until no more merges possible
    
    Pros: Simple, fast for small genomes
    Cons: Can produce incorrect assemblies due to repeats, doesn't use all information
    """
    
    def __init__(self, min_overlap: int = 20):
        self.min_overlap = min_overlap
        self.contigs = []
    
    def assemble(self, reads: List[str], max_iterations: int = 100) -> List[str]:
        """
        Assemble reads using greedy overlap approach.
        """
        # Start with all reads as single-read contigs
        contigs = reads.copy()
        iteration = 0
        
        while iteration < max_iterations:
            # Find best pair of contigs to merge
            best_pair = None
            best_overlap = 0
            
            # Check all pairs
            for i in range(len(contigs)):
                for j in range(len(contigs)):
                    if i == j:
                        continue
                    
                    overlap, mismatches = calculate_overlap(contigs[i], contigs[j], self.min_overlap)
                    
                    if overlap > best_overlap:
                        best_overlap = overlap
                        best_pair = (i, j, overlap)
            
            # If no good merge found, we're done
            if best_pair is None:
                break
            
            # Merge the best pair
            i, j, overlap = best_pair
            merged = merge_reads(contigs[i], contigs[j], overlap)
            
            # Remove old contigs and add merged one
            new_contigs = [contigs[k] for k in range(len(contigs)) if k not in (i, j)]
            new_contigs.append(merged)
            contigs = new_contigs
            
            iteration += 1
        
        self.contigs = contigs
        return contigs

# Test greedy assembly
test_sequence = "ATGCGATCGATCGATCG" * 5
test_reads = [
    test_sequence[0:30],
    test_sequence[15:45],
    test_sequence[30:60],
    test_sequence[45:75],
    test_sequence[60:90],
    test_sequence[75:105],
]

print("Test reads (with overlaps):")
for i, read in enumerate(test_reads):
    print(f"  {i}: {read}")

print(f"\nOriginal test sequence ({len(test_sequence)} bp):")
print(f"  {test_sequence}")

greedy = GreedyAssembler(min_overlap=15)
greedy_contigs = greedy.assemble(test_reads)

print(f"\nGreedy assembly result ({len(greedy_contigs)} contigs):")
for i, contig in enumerate(greedy_contigs):
    print(f"  Contig {i} ({len(contig)} bp): {contig}")
    match = "✓ MATCHES" if contig in test_sequence else "✗ doesn't match"
    print(f"             {match}")

## 2. De Bruijn Graph Assembly (Optimized)

**Why De Bruijn?**
- More efficient: O(n) instead of O(n²) pairwise comparisons
- Handles repeats better: Explicit branching points
- Produces unambiguous regions as contigs

In [None]:
class DeBruijnGraph:
    """
    De Bruijn graph for assembly.
    - Nodes: (k-1)-mers
    - Edges: k-mers connecting (k-1)-mers
    """
    
    def __init__(self, k: int):
        self.k = k
        self.edges = defaultdict(lambda: defaultdict(int))  # edges[prefix][base] = count
        self.in_degree = defaultdict(int)   # in-degree per node
        self.out_degree = defaultdict(int)  # out-degree per node
    
    def add_sequence(self, sequence: str):
        """
        Add all k-mers from a sequence to the graph.
        """
        for i in range(len(sequence) - self.k + 1):
            kmer = sequence[i:i + self.k]
            prefix = kmer[:-1]
            suffix = kmer[1:]
            base = kmer[-1]
            
            self.edges[prefix][base] += 1
            self.out_degree[prefix] += 1
            self.in_degree[suffix] += 1
    
    def get_nodes(self) -> Set[str]:
        """Get all (k-1)-mers that are nodes in the graph."""
        nodes = set(self.edges.keys())
        for suffix in self.in_degree.keys():
            nodes.add(suffix)
        return nodes
    
    def get_coverage(self, node: str) -> int:
        """Get the coverage (sum of edge weights) for a node."""
        return sum(self.edges[node].values())
    
    def simplify(self, min_coverage: int = 1):
        """
        Remove low-coverage nodes (likely errors).
        This is error correction.
        """
        nodes_to_remove = []
        for node in self.get_nodes():
            coverage = self.get_coverage(node)
            if coverage < min_coverage:
                nodes_to_remove.append(node)
        
        for node in nodes_to_remove:
            # Remove outgoing edges
            if node in self.edges:
                del self.edges[node]
            
            # Remove incoming edges
            for prefix in self.edges:
                if node in self.edges[prefix]:
                    del self.edges[prefix][node]

class DeBruijnAssembler:
    """
    Modern De Bruijn graph-based assembler.
    Implements contig extraction from the graph.
    """
    
    def __init__(self, k: int = 21):
        self.k = k
        self.graph = DeBruijnGraph(k)
    
    def assemble(self, reads: List[str], min_coverage: int = 2) -> List[str]:
        """
        Assemble reads using De Bruijn graph approach.
        Returns contigs (unambiguous paths).
        """
        # Build graph from reads
        for read in reads:
            self.graph.add_sequence(read)
        
        # Error correction: remove low-coverage edges
        self.graph.simplify(min_coverage)
        
        # Extract contigs
        contigs = self.extract_contigs()
        
        return contigs
    
    def extract_contigs(self) -> List[str]:
        """
        Extract linear contigs from the De Bruijn graph.
        A contig is a path with in-degree = out-degree = 1 at internal nodes.
        """
        contigs = []
        visited_edges = set()
        
        # Find all starting points (in-degree = 0 or in-degree ≠ out-degree)
        nodes = self.graph.get_nodes()
        
        for start_node in nodes:
            out_deg = self.graph.out_degree[start_node]
            in_deg = self.graph.in_degree[start_node]
            
            # Try to extend from this node
            if out_deg > 0:  # Can go somewhere
                for next_base in self.graph.edges[start_node]:
                    if (start_node, next_base) in visited_edges:
                        continue
                    
                    # Trace a path from here
                    path = self._trace_path(start_node, next_base, visited_edges)
                    if path:
                        contig = self._path_to_sequence(start_node, path)
                        contigs.append(contig)
        
        return contigs
    
    def _trace_path(self, start: str, first_base: str, visited: Set) -> List[str]:
        """
        Trace a linear path through the graph.
        Returns list of bases (edges).
        """
        path = [first_base]
        current = start[1:] + first_base  # New node: suffix of start + new base
        visited.add((start, first_base))
        
        # Continue while path is unambiguous
        while len(self.graph.edges[current]) == 1:
            # Get the only outgoing edge
            next_base = list(self.graph.edges[current].keys())[0]
            
            # Check if we've created a cycle (infinite loop)
            if (current, next_base) in visited:
                break
            
            path.append(next_base)
            visited.add((current, next_base))
            current = current[1:] + next_base
        
        return path
    
    def _path_to_sequence(self, start: str, path: List[str]) -> str:
        """
        Convert a path (list of bases) to a sequence.
        """
        return start + ''.join(path)

# Test De Bruijn assembly
print("De Bruijn Graph Assembly Test")
print("="*50)

db_assembler = DeBruijnAssembler(k=15)
db_contigs = db_assembler.assemble(test_reads, min_coverage=1)

print(f"De Bruijn assembly result ({len(db_contigs)} contigs):")
for i, contig in enumerate(db_contigs):
    print(f"  Contig {i} ({len(contig)} bp): {contig}")
    match = "✓ MATCHES" if contig in test_sequence else "✗ doesn't match"
    print(f"             {match}")

## 3. Comparison on Realistic Data

In [None]:
def generate_reads_from_sequence(sequence: str, read_length: int = 100, coverage: int = 10, 
                                 error_rate: float = 0.01, seed: int = 42) -> List[str]:
    """
    Generate reads from a sequence with controlled coverage and error rate.
    """
    import random
    random.seed(seed)
    
    num_reads = (len(sequence) * coverage) // read_length
    reads = []
    bases = ['A', 'T', 'G', 'C']
    
    for _ in range(num_reads):
        # Random position
        start = random.randint(0, max(0, len(sequence) - read_length))
        read = sequence[start:start + read_length]
        
        # Introduce errors
        read_list = list(read)
        for i in range(len(read_list)):
            if random.random() < error_rate:
                read_list[i] = random.choice([b for b in bases if b != read_list[i]])
        
        reads.append(''.join(read_list))
    
    return reads

# Generate a test genome with some repeats
base = "ATGCGATCGATCGATCGATCG"
test_genome = (base * 20) + (base * 5)  # One region is duplicated
print(f"Test genome: {len(test_genome)} bp (with one repeat region)\n")

# Generate reads
test_reads_large = generate_reads_from_sequence(test_genome, read_length=100, coverage=10, error_rate=0.005)
print(f"Generated {len(test_reads_large)} reads from test genome")

# Benchmark both assemblers
print("\n" + "="*70)
print("BENCHMARK: Greedy vs De Bruijn Assembly")
print("="*70)

# Greedy
print("\n1. Greedy Overlap-Based Assembly")
start_time = time.time()
greedy_large = GreedyAssembler(min_overlap=15)
greedy_contigs_large = greedy_large.assemble(test_reads_large[:500], max_iterations=50)  # Limit for speed
greedy_time = time.time() - start_time

greedy_total_bp = sum(len(c) for c in greedy_contigs_large)
print(f"   Time: {greedy_time:.3f}s")
print(f"   Contigs: {len(greedy_contigs_large)}")
print(f"   Total assembled: {greedy_total_bp} bp")

# De Bruijn
print("\n2. De Bruijn Graph Assembly")
start_time = time.time()
db_large = DeBruijnAssembler(k=21)
db_contigs_large = db_large.assemble(test_reads_large, min_coverage=2)
db_time = time.time() - start_time

db_total_bp = sum(len(c) for c in db_contigs_large)
print(f"   Time: {db_time:.3f}s")
print(f"   Contigs: {len(db_contigs_large)}")
print(f"   Total assembled: {db_total_bp} bp")

print(f"\n3. Comparison")
print(f"   De Bruijn is {greedy_time/db_time:.1f}x faster")
print(f"   De Bruijn assembled {db_total_bp} bp vs Greedy {greedy_total_bp} bp")
print(f"   Reference length: {len(test_genome)} bp")

## 4. Error Correction in De Bruijn Graphs

In [None]:
def analyze_error_correction_effect(reads: List[str], k: int = 21):
    """
    Show how different coverage thresholds affect error correction.
    """
    results = []
    
    for min_cov in [1, 2, 3, 5, 10]:
        assembler = DeBruijnAssembler(k=k)
        contigs = assembler.assemble(reads, min_coverage=min_cov)
        
        total_bp = sum(len(c) for c in contigs)
        results.append({
            'min_coverage': min_cov,
            'num_contigs': len(contigs),
            'total_bp': total_bp,
            'avg_contig_size': total_bp / len(contigs) if contigs else 0
        })
    
    return results

error_corr_results = analyze_error_correction_effect(test_reads_large[:500], k=21)

print("Error Correction: Effect of Minimum K-mer Coverage Threshold")
print("="*70)
print(f"{'Min Coverage':<15} {'Contigs':<12} {'Total BP':<15} {'Avg Contig Size'}")
print("-"*70)

for r in error_corr_results:
    print(f"{r['min_coverage']:<15} {r['num_contigs']:<12} {r['total_bp']:<15} {r['avg_contig_size']:.0f}")

print("\nInterpretation:")
print("- Higher thresholds remove more spurious k-mers (from errors)")
print("- But may also remove real low-coverage regions")
print("- Typical choice: min_coverage = 2-3 for diploid, 3-5 for haploid")

## 5. The Effect of K-mer Size

In [None]:
def test_kmer_sizes(reads: List[str], k_values: List[int] = [15, 21, 31, 51]):
    """
    Show how different k-mer sizes affect assembly quality.
    """
    results = []
    
    for k in k_values:
        assembler = DeBruijnAssembler(k=k)
        contigs = assembler.assemble(reads, min_coverage=2)
        
        total_bp = sum(len(c) for c in contigs)
        results.append({
            'k': k,
            'num_contigs': len(contigs),
            'total_bp': total_bp,
            'avg_contig_size': total_bp / len(contigs) if contigs else 0,
            'longest_contig': max(len(c) for c in contigs) if contigs else 0
        })
    
    return results

kmer_results = test_kmer_sizes(test_reads_large, k_values=[15, 21, 31, 51])

print("\nEffect of K-mer Size on Assembly Quality")
print("="*90)
print(f"{'K':<5} {'Contigs':<12} {'Total BP':<15} {'Avg Size':<12} {'Longest Contig'}")
print("-"*90)

for r in kmer_results:
    print(f"{r['k']:<5} {r['num_contigs']:<12} {r['total_bp']:<15} {r['avg_contig_size']:.0f}bp {r['longest_contig']:<14}bp")

print("\nInterpretation:")
print("- Smaller k: More k-mers → more connections → longer contigs")
print("  BUT: More false connections at repeats")
print("\n- Larger k: Fewer spurious connections → cleaner assembly")
print("  BUT: Sensitivity to errors (errors break reads into more k-mers)")
print("\n- Best practice: Use k around L/√C where L=read length, C=coverage")
print(f"\n  For your reads (L≈{len(test_reads_large[0])}, C≈10): k ≈ {int(len(test_reads_large[0])/np.sqrt(10))}")

## 6. Handling Repeats: A Key Challenge

In [None]:
print("The Repeat Problem in Genome Assembly")
print("="*70)
print()

# Create a genome with repeats
repeat_unit = "ATGCGATCGATCG"
unique1 = repeat_unit + "TTTT"
repeat_region = repeat_unit * 3  # 3 copies of repeat
unique2 = repeat_unit + "GGGG"
repeat_genome = unique1 + repeat_region + unique2

print(f"Test genome with repeats:")
print(f"  [Unique region 1]--[3x repeated unit]--[Unique region 2]")
print(f"  Total length: {len(repeat_genome)} bp\n")

# Generate reads
repeat_reads = generate_reads_from_sequence(repeat_genome, read_length=100, coverage=15)

print(f"Generated {len(repeat_reads)} reads (15x coverage)")
print()

# Assemble with different k-mer sizes
print("Assembly results with different k-mer sizes:")
print("-"*70)

for k in [15, 21, 31]:
    assembler = DeBruijnAssembler(k=k)
    contigs = assembler.assemble(repeat_reads, min_coverage=3)
    
    print(f"\nk={k}:")
    print(f"  Contigs: {len(contigs)}")
    for i, contig in enumerate(sorted(contigs, key=len, reverse=True)[:3]):
        print(f"    Contig {i+1}: {len(contig)} bp")
        # Check if it's from unique or repeat region
        if contig in repeat_genome:
            print(f"             (matches unique region)")
        elif repeat_unit in contig:
            count = contig.count(repeat_unit)
            print(f"             (contains {count}x repeat unit - ambiguous!)")

print("\n" + "="*70)
print("Key insight: Repeats create branching in the graph!")
print("- Can't distinguish which path is correct without additional info")
print("- Solution: Use paired-end reads (next notebook!)")
print("="*70)

## 7. Summary and Algorithm Comparison

In [None]:
comparison_table = """
ALGORITHM COMPARISON
═════════════════════════════════════════════════════════════════════════

GREEDY OVERLAP-BASED ASSEMBLY
─────────────────────────────────────────────────────────────────────────
Algorithm:
  1. Compare all pairs of reads/contigs
  2. Find pair with best overlap
  3. Merge them
  4. Repeat until no more merges possible

Time Complexity: O(n² × L²) where n=number of reads, L=read length
Space Complexity: O(n)

Pros:
  ✓ Conceptually simple
  ✓ Good for small genomes
  ✓ No parameter tuning needed (besides min overlap)

Cons:
  ✗ Very slow for large genomes (n² comparisons)
  ✗ Greedy decisions can produce suboptimal results
  ✗ Can't resolve repeat regions
  ✗ Sensitive to sequencing errors

─────────────────────────────────────────────────────────────────────────

DE BRUIJN GRAPH ASSEMBLY
─────────────────────────────────────────────────────────────────────────
Algorithm:
  1. Extract all k-mers from reads
  2. Build graph: nodes=(k-1)-mers, edges=k-mers
  3. Remove low-coverage k-mers (error correction)
  4. Find Eulerian paths (contigs)

Time Complexity: O(n × L) for graph building, O(n + m) for path finding
  where n=number of reads, L=read length, m=number of k-mers
Space Complexity: O(m) where m=number of unique k-mers

Pros:
  ✓ Much faster: linear in number of reads
  ✓ Handles repeats explicitly (branching in graph)
  ✓ Built-in error correction (k-mer frequency)
  ✓ Scalable to large genomes
  ✓ Used by modern assemblers (SPAdes, Velvet, etc.)

Cons:
  ✗ More complex to understand
  ✗ Requires parameter tuning (k, min_coverage)
  ✗ Still can't fully resolve ambiguous repeats
  ✗ Higher memory usage for very large graphs

─────────────────────────────────────────────────────────────────────────

MODERN ASSEMBLERS TYPICALLY:
  • Use De Bruijn graphs with multiple k-mer sizes
  • Apply graph simplification to remove errors and spurious nodes
  • Use paired-end reads for disambiguation (see next notebook)
  • Include polynomial time/space optimizations

═════════════════════════════════════════════════════════════════════════
"""

print(comparison_table)

## Real-World Tools Built on These Concepts

**De Bruijn Graph Assemblers:**
- **SPAdes** (most popular): Uses multiple k-mer sizes, graph simplification, paired-end reads
- **Velvet**: Original De Bruijn graph assembler
- **ABySS**: Parallel De Bruijn graph assembly

**Other Approaches:**
- **Overlap-Layout-Consensus (OLC)**: Used by long-read assemblers (PacBio, Oxford Nanopore)
- **String Graph Assembly**: Improved OLC variant
- **Hybrid Assemblers**: Combine short reads (De Bruijn) + long reads (OLC)

**Installation & Use:**
```bash
# SPAdes (recommended for Illumina reads)
conda install -c bioconda spades
spades.py -1 reads_R1.fastq -2 reads_R2.fastq -o output_dir -k 21,33,55,77

# Velvet
conda install -c bioconda velvet
velveth output_dir 21 -fastq reads.fastq
velvetg output_dir
```

## Next Steps

**This notebook covered:**
- ✓ Greedy overlap-based assembly (simple, slow)
- ✓ De Bruijn graph assembly (modern, efficient)
- ✓ Error correction via k-mer frequency
- ✓ The effect of k-mer size and coverage thresholds
- ✓ Why repeats are fundamentally challenging

**Next notebook (4) will cover:**
- Building contigs from graph paths
- Using paired-end reads to resolve ambiguities
- Scaffolding: linking contigs into larger structures
- Gap filling
- Output formats (FASTA)