# Genome Assembly Using Overlap Graphs

### Part 1: Importing the genome and simulate reads

importing genome file

In [2]:
def read_genome(genome_path="phiX174.fasta"):
    with open(genome_path, 'r') as f:
        lines = [line.strip() for line in f.readlines() if not line.startswith('>')]
        genome = "".join(lines)
    return genome


simulating reads

In [3]:
import random

def add_errors(reads, error_rate):
    # error rate is between 0.001 and 0.1
    if error_rate < 0.001:
        return reads
    noisy_reads = []
    for read in reads:
        noisy_read = ""
        for base in read:
            if random.random() < error_rate:
                noisy_read += random.choice('ACGT'.replace(base, ''))
            else:
                noisy_read += base
        noisy_reads.append(noisy_read)
    return noisy_reads

def simulate_reads(genome, num_reads, read_length, error_rate, circular = True):
    reads = []
    if circular:
        genome = genome + genome[:read_length]
    for _ in range(num_reads):
        start = random.randint(0, len(genome))
        read = genome[start:start + read_length]
        reads.append(read)
    reads = add_errors(reads, error_rate)
    return reads

# Test the functions
reads = simulate_reads(read_genome(), 1000, 150, 0.0)
print(reads[:10])
noisy_reads = add_errors(reads, 0.005)
print(noisy_reads[:10])
error_count = sum(a != b for r, nr in zip(reads, noisy_reads) for a, b in zip(r, nr))
total_len = sum(len(r) for r in reads)
error_pct = error_count / total_len
print(f"Error rate: {error_pct:.4f}")

['AGGAGTGATGTAATGTCTAAAGGTAAAAAACGTTCTGGCGCTCGCCCTGGTCGTCCGCAGCCGTTGCGAGGTACTAAAGGCAAGCGTAAAGGCGCTCGTCTTTGGTATGTAGGTGGTCAACAATTTTAATTGCAGGGGCTTCGGCCCCTT', 'TGCCGACCCTAAATTTTTTGCCTGTTTGGTTCGCTTTGAGTCTTCTTCGGTTCCGACTACCCTCCCGACTGCCTATGATGTTTATCCTTTGAATGGTCGCCATGATGGTGGTTATTATACCGTCAAGGACTGTGTGACTATTGACGTCCT', 'ATGCTTTGCGTGACTATTTTCGTGATATTGGTCGTATGGTTCTTGCTGCCGAGGGTCGCAAGGCTAATGATTCACACGCCGACTGCTATCAGTATTTTTGTGTGCCTGAGTATGGTACAGCTAATGGCCGTCTTCATTTCCATGCGGTGC', 'CTATTATGGAAAACACCAATCTTTCCAAGCAACAGCAGGTTTCCGAGATTATGCGCCAAATGCTTACTCAAGCTCAAACGGCTGGTCAGTATTTTACCAATGACCAAATCAAAGAAATGACTCGCAAGGTTAGTGCTGAGGTTGACTTAG', 'ACAAAGTTTGGATTGCTACTGACCGCTCTCGTGCTCGTCGCTGCGTTGAGGCTTGCGTTTATGGTACGCTGGACTTTGTGGGATACCCTCGCTTTCCTGCTCCTGTTGAGTTTATTGCTGCCGTCATTGCTTATTATGTTCATCCCGTCA', 'TTTCGTATGCAGGGCGTTGAGTTCGATAATGGTGATATGTATGTTGACGGCCATAAGGCTGCTTCTGACGTTCGTGATGAGTTTGTATCTGTTACTGAGAAGTTAATGGATGAATTGGCACAATGCTACAATGTGCTCCCCCAACTTGAT', 'TGGTTGTGGCCTGTTGATGCTAAAGGTGAGCCGCTTAAAGCTACCAGTTATATGGCTGTTGGTTTCTATGTGGC

### Part 2: building an overlap graph

find max overlap

In [4]:
def overlapKMP(read1, read2):
    """
    Returns the length of the maximum overlap between read1 and read2
    using the Knuth-Morris-Pratt algorithm
    O(l = read_length) time complexity
    """
    combined = read2 + "#" + read1
    pi = [0] * len(combined)
    for i in range(1, len(combined)):
        j = pi[i - 1]
        while j > 0 and combined[i] != combined[j]:
            j = pi[j - 1]
        if combined[i] == combined[j]:
            j += 1
        pi[i] = j
    return pi[-1]

str1 = "ACGTACGT"
str2 = "GTACGTAC"
print(overlapKMP(str1, str2))

6


Overlap graph class

In [None]:
class OverlapGraph:
    def __init__(self, reads, min_overlap = 1):
        self.nodes = set()
        self.edges = {}
        # self.node_data = {}
        self.min_overlap = min_overlap
        self.build_graph(reads)
    
    def connect_nodes(self, node1, node2):
        """
        Connects node1 to node2 if they overlap by at least min_overlap
        O(m) time complexity
        """
        overlap1 = overlapKMP(node1, node2)
        if overlap1 >= self.min_overlap:
            self.edges[(node1, node2)] = overlap1
        overlap2 = overlapKMP(node2, node1)
        if overlap2 >= self.min_overlap:
            self.edges[(node2, node1)] = overlap2
    
    def add_edges(self, node):
        """
        Connects the given node to all other nodes in the graph
        O(N^2 * l) time complexity
        """
        for n in self.nodes:
            if n == node:
                continue
            self.connect_nodes(n, node)
    
    def add_node(self, node, connect = True):
        if node not in self.nodes:
            self.nodes.add(node)
            if connect:
                self.add_edges(node)
        
    def add_nodes(self, nodes):
        for node in nodes:
            self.add_node(node)
    
    def build_graph(self, reads):
        self.add_nodes(reads)
        for node in self.nodes:
            self.add_edges(node)

    def find_heaviest_edge(self):
        """ finding heaviest edge in O(N) time """
        return max(self.edges, key=self.edges.get) if self.edges else None
    
    def remove_edge(self, edge):
        del self.edges[edge]

    def remove_edges(self, edges):
        for edge in edges:
            self.remove_edge(edge)

    def remove_node(self, node):
        """
        Removes the given node from the graph
        """
        if node not in self.nodes:
            return
        edges_to_del = [e for e in self.edges if node in e]
        self.remove_edges(edges_to_del)
        self.nodes.remove(node)
    
    def merge_nodes(self, edge):
        """
        Merges the two nodes in the given edge
        """
        if edge not in self.edges:
            return
        n1, n2 = edge
        overlap = self.edges[edge]
        merged_node = n1 + n2[overlap:]
        connected_edges = [e for e in self.edges if n1 in e or n2 in e]
        neighbors = set(n for e in connected_edges for n in e if n != n1 and n != n2)
        self.add_node(merged_node, connect=False)
        # no need to add add new edges, just update the ends of the existing ones
        for n in neighbors:
            weight = self.edges.pop((n, n1), 0)
            if weight:
                self.edges[(n, merged_node)] = weight
            weight = self.edges.pop((n2, n), 0)
            if weight:
                self.edges[(merged_node, n)] = weight
            # self.connect_nodes(merged_node, n)
        self.remove_node(n1)
        self.remove_node(n2)
        # return merged_node

    def __str__(self):
        result = "OverlapGraph:\n"
        result += f"  Nodes ({len(self.nodes)}): {', '.join(sorted(self.nodes))}\n"
        result += f"  Edges ({len(self.edges)}):\n"
        
        for (n1, n2), weight in sorted(self.edges.items(), key=lambda x: x[1], reverse=True):
            result += f"    {n1} -> {n2}: {weight}\n"
        
        return result
    

# Test the OverlapGraph class
reads = simulate_reads(read_genome(), 5, 5, 0.0)
graph = OverlapGraph(reads)
print(graph)
while graph.edges:
    max_edge = graph.find_heaviest_edge()
    graph.merge_nodes(max_edge)
    print(graph)

OverlapGraph:
  Nodes (5): ACGAC, CCTGG, CTGGC, GTTCA, TGTCA
  Edges (7):
    CCTGG -> CTGGC: 4
    GTTCA -> ACGAC: 1
    CCTGG -> GTTCA: 1
    ACGAC -> CCTGG: 1
    TGTCA -> ACGAC: 1
    CTGGC -> CCTGG: 1
    ACGAC -> CTGGC: 1

OverlapGraph:
  Nodes (4): ACGAC, CCTGGC, GTTCA, TGTCA
  Edges (3):
    GTTCA -> ACGAC: 1
    TGTCA -> ACGAC: 1
    ACGAC -> CCTGGC: 1

OverlapGraph:
  Nodes (3): CCTGGC, GTTCACGAC, TGTCA
  Edges (1):
    GTTCACGAC -> CCTGGC: 1

OverlapGraph:
  Nodes (2): GTTCACGACCTGGC, TGTCA
  Edges (0):



### Part 3: greedy assembly algorithm

In [20]:
def greedy_scs(reads, min_overlap = 1):
    graph = OverlapGraph(reads, min_overlap)
    while graph.edges:
        max_edge = graph.find_heaviest_edge()
        graph.merge_nodes(max_edge)
    return list(graph.nodes)

reads = simulate_reads(read_genome(), 100, 100, 0.0)
scs = greedy_scs(reads)
print(scs)
print(len(scs))

['ATATGGCTGTTGGTTTCTATGTGGCTAAATACGTTAACAAAAAGTCAGATATGGACCTTGCTGCTAAAGGTCTAGGAGCTAAAGAATGGAACAACTCACTGACGCGTTGGATGAGGAGAAGTGGCTTAATATGCTTGGCACGTTCGTCAAGGACTGGTTTAGATATGAGTCACATTTTGTTCATGGTAGAGATTCTCTTGTTGACATTTTAAAAGAGCGTGGATTACTATCTGAGTCCGATGCTGTTCAACCACTAATAGGTAAGAAATCATGAGTCAAGTTACTGAACAATCCGTACGTTTCCAGACCGCTTTGGCCTCTATTAAGCTCATTCAGGCTTCTGCCGTTTTGGATTTAACCGAAGATTGAGTGTGAGGTTATAACGCCGAAGCGGTAAAAATTTTAATTTTTGCCGCTGAGGGGTTGACCAAGCGAAGCGCGGTAGGTTTTCTGCTTAGGAGTTTAATCATGTTTCAGACTTTTATTTCTCGCCATAATTCAAACTTTTTTTCTGATAAGCTGGTTCTCACTTCTGTTACTCCAGCTTCTTCGGCACCTGTTTTACAGACACCTAAAGCTACATCGTCAACGTTATATTTTGATAGTTTGACGGTTAATGCTGGTAATGGTGGTTTTCTTCATTGCATTCAGATGGATACATCTGTCATCCAACCTGCAGAGTTTTATCGCTTCCATGACGCAGAAGTTAACACTTTCGGATATTTCTGATGAGTCGAAAAATTATCTTGATAAAGCAGGAATTACTACTGCTTGTTTACGAATTAAATCGAAGTGGACTGCTGGCGGAAAATGAGAAAATTCGACCTATCCTTGCGCAGCTCGAGAAGCTCTTACTTTGCGACCTTTCGCCATCAACTAACGATTCTGTCAAAAACGCCGCTAATCAGGTTGTTTCTGTTGGTGCTGATATTGCTTTTGATGCCGACCCTAAATTTTTTGCCTGTTTGGTTCGCTTTGAGTCTTCTTCGGTTCCGACT

### Part 4: Evaluation

#### Evaluation functions

In [21]:
def evaluate_assembly(contigs, original_genome, circular=True):
    if circular:
        max_contig_len = max(len(c) for c in contigs) if contigs else 0
        working_genome = original_genome + original_genome[:max_contig_len]
    else:
        working_genome = original_genome
    
    metrics = {
        'genome_length': len(original_genome),
        'assembly_length': sum(len(c) for c in contigs),
        'contigs_count': len(contigs),
        'aligned_bases': 0,
        'mismatches': 0,
        'unaligned_contigs': 0,
        'positions_covered': set(),
    }
    
    for contig in contigs:
        best_pos = find_best_alignment(contig, working_genome)
        
        if best_pos['position'] == -1:
            metrics['unaligned_contigs'] += 1
            continue
        
        metrics['aligned_bases'] += best_pos['aligned_length']
        metrics['mismatches'] += best_pos['mismatches']
        
        # track positions covered by the contig
        start = best_pos['position']
        end = start + len(contig)
        
        for pos in range(start, end):
            # map back to original genome coordinates
            original_pos = pos % len(original_genome)
            metrics['positions_covered'].add(original_pos)
    
    metrics['genome_coverage'] = len(metrics['positions_covered']) / len(original_genome)
    if metrics['aligned_bases'] > 0:
        metrics['sequence_identity'] = 1 - (metrics['mismatches'] / metrics['aligned_bases'])
    else:
        metrics['sequence_identity'] = 0
    
    # N50
    sorted_contigs = sorted(contigs, key=len, reverse=True)
    cumulative_length = 0
    n50 = 0
    l50 = 0
    
    for i, contig in enumerate(sorted_contigs):
        cumulative_length += len(contig)
        if cumulative_length >= metrics['assembly_length'] / 2:
            n50 = len(contig)
            l50 = i + 1
            break
    
    metrics['N50'] = n50
    metrics['L50'] = l50
    
    # convert positions_covered to a count
    metrics['positions_covered'] = len(metrics['positions_covered'])
    
    return metrics

def find_best_alignment(contig, genome, k=10, max_positions=100):
    if len(contig) <= k:
        return find_best_alignment_exhaustive(contig, genome)
    
    # building k-mer index for the genome
    kmer_positions = {}
    for i in range(len(genome) - k + 1):
        kmer = genome[i:i+k]
        if kmer not in kmer_positions:
            kmer_positions[kmer] = []
        kmer_positions[kmer].append(i)
    
    # searching for the best alignment
    position_counts = {}
    for i in range(len(contig) - k + 1):
        kmer = contig[i:i+k]
        if kmer in kmer_positions:
            for pos in kmer_positions[kmer]:
                # checking all possible positions
                genome_start = pos - i
                if genome_start >= 0 and genome_start + len(contig) <= len(genome):
                    position_counts[genome_start] = position_counts.get(genome_start, 0) + 1
    
    # sorting candidate positions by count
    candidate_positions = sorted(position_counts.items(), key=lambda x: x[1], reverse=True)
    
    # finding the best alignment among top candidates
    best_position = -1
    best_score = -float('inf')
    best_mismatches = len(contig)
    
    for position, _ in candidate_positions[:max_positions]:
        if position + len(contig) > len(genome):
            continue
            
        mismatches = 0
        for j in range(len(contig)):
            if contig[j] != genome[position + j]:
                mismatches += 1
        
        score = len(contig) - mismatches
        
        if score > best_score:
            best_score = score
            best_position = position
            best_mismatches = mismatches
    
    return {
        'position': best_position,
        'aligned_length': len(contig),
        'mismatches': best_mismatches,
        'score': best_score
    }

def find_best_alignment_exhaustive(contig, genome):
    """
    Finds the best alignment of a contig in the genome using an exhaustive search.
    O(n * m) time complexity.
    """
    best_position = -1
    best_score = -float('inf')
    best_mismatches = len(contig)
    
    for i in range(len(genome) - len(contig) + 1):
        mismatches = 0
        for j in range(len(contig)):
            if contig[j] != genome[i + j]:
                mismatches += 1
        
        score = len(contig) - mismatches
        
        if score > best_score:
            best_score = score
            best_position = i
            best_mismatches = mismatches
    
    return {
        'position': best_position,
        'aligned_length': len(contig),
        'mismatches': best_mismatches,
        'score': best_score
    }

def detect_chimeric_contigs(contigs, original_genome, circular=True):
    """
    Detects chimeric contigs by checking for unexpected gaps between the two halves of the contig.
    returns: list of indices of chimeric contigs
    """
    if circular:
        max_contig_len = max(len(c) for c in contigs) if contigs else 0
        working_genome = original_genome + original_genome[:max_contig_len]
    else:
        working_genome = original_genome
    
    chimeric_contigs = []
    
    for i, contig in enumerate(contigs):
        if len(contig) < 20:
            continue
            
        first_half = contig[:len(contig)//2]
        second_half = contig[len(contig)//2:]
        
        align1 = find_best_alignment(first_half, working_genome)
        align2 = find_best_alignment(second_half, working_genome)
        
        if align1['position'] != -1 and align2['position'] != -1:
            expected_pos2 = align1['position'] + len(first_half)
            actual_pos2 = align2['position']
            
            # calculate distance between the two halves
            if circular:
                distance = min(
                    abs(actual_pos2 - expected_pos2),
                    abs(actual_pos2 - expected_pos2 + len(original_genome)),
                    abs(actual_pos2 - expected_pos2 - len(original_genome))
                )
            else:
                distance = abs(actual_pos2 - expected_pos2)
            
            if distance > 10:
                chimeric_contigs.append(i)
    
    return chimeric_contigs

# def visualize_genome_coverage(contigs, original_genome, circular=True):
#     """
#     Visualizes the coverage of the genome by the assembled contigs.
#     Returns a string with a text-based visualization.
#     """
#     coverage = [0] * len(original_genome)
    
#     if circular:
#         max_contig_len = max(len(c) for c in contigs) if contigs else 0
#         working_genome = original_genome + original_genome[:max_contig_len]
#     else:
#         working_genome = original_genome
    
#     for contig in contigs:
#         alignment = find_best_alignment(contig, working_genome)
#         if alignment['position'] == -1:
#             continue
        
#         start_pos = alignment['position']
#         end_pos = start_pos + len(contig)
        
#         for pos in range(start_pos, end_pos):
#             # map back to original genome coordinates
#             original_pos = pos % len(original_genome)
#             coverage[original_pos] += 1
    
#     max_coverage = max(coverage) if coverage else 0
#     visualization = ["Genome coverage:"]
#     # visualization.append("Position | Coverage")
#     visualization.append("position" + " " * 5 + "coverage")
#     visualization.append("-" * 30)
    
#     chunk_size = max(1, len(original_genome) // 50)
    
#     for i in range(0, len(original_genome), chunk_size):
#         end = min(i + chunk_size, len(original_genome))
#         avg_coverage = sum(coverage[i:end]) / (end - i)
#         bar = "#" * int(avg_coverage * 20 / max_coverage) if max_coverage > 0 else ""
#         visualization.append(f"{i:8d} - {end-1:8d} | {avg_coverage:5.1f} {bar}")
    
#     return "\n".join(visualization)

def test_assembly(reads, original_genome, min_overlap=20, circular=True):
    """
    returns tuple: (metrics, assembled_contigs)
    """
    print("Assembling genome...")
    assembled_contigs = greedy_scs(reads, min_overlap)
    
    print("Evaluating assembly...")
    metrics = evaluate_assembly(assembled_contigs, original_genome, circular)
    # Detect chimeric contigs
    chimeric_indices = detect_chimeric_contigs(assembled_contigs, original_genome, circular)
    metrics['chimeric_contigs'] = len(chimeric_indices)
    # Visualize genome coverage
    # coverage_viz = visualize_genome_coverage(assembled_contigs, original_genome, circular)
    
    print(f"=== Assembly Evaluation ===")
    print(f"Original genome length: {metrics['genome_length']} bp")
    print(f"Assembly length: {metrics['assembly_length']} bp")
    print(f"Number of contigs: {metrics['contigs_count']}")
    print(f"N50: {metrics['N50']} bp")
    print(f"L50: {metrics['L50']}")
    print(f"Genome coverage: {metrics['genome_coverage']*100:.2f}%")
    print(f"Sequence identity: {metrics['sequence_identity']*100:.2f}%")
    print(f"Unaligned contigs: {metrics['unaligned_contigs']}")
    print(f"Chimeric contigs: {metrics['chimeric_contigs']}")
    # print("\n" + coverage_viz)
    
    return metrics, assembled_contigs

In [25]:
import pandas as pd
from datetime import datetime

def run_parameter_sweep(genome, num_reads_list, read_length_list, error_rate_list, min_overlap_list, circular=True, output_format="csv"):
    all_results = []
    
    # Counter to track progress
    total_runs = len(num_reads_list) * len(read_length_list) * len(error_rate_list) * len(min_overlap_list)
    current_run = 0
    
    for n in num_reads_list:
        for l in read_length_list:
            for er in error_rate_list:
                for o in min_overlap_list:
                    current_run += 1
                    print(f"\nRun {current_run}/{total_runs}: num_reads={n}, read_length={l}, error_rate={er}, min_overlap={o}")
                    
                    reads = simulate_reads(genome, n, l, er, circular=circular)
                    
                    metrics, contigs = test_assembly(reads, genome, o, circular=circular)
                    
                    run_metrics = {
                        'num_reads': n,
                        'read_length': l,
                        'error_rate': er,
                        'min_overlap': o,
                        'total_reads_length': n * l,
                        'coverage': (n * l) / len(genome),
                    }
                    
                    # Add all metrics
                    for key, value in metrics.items():
                        if not isinstance(value, (set, list, dict)):
                            run_metrics[key] = value
                    
                    all_results.append(run_metrics)
    
    results_df = pd.DataFrame(all_results)
    
    # Save results to a file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    main_file = f"parameter_sweep_results_{timestamp}.csv"
    results_df.to_csv(main_file, index=False)
    
    print(f"\nAll results have been successfully saved to file: {main_file}")
    return main_file
    
# Example usage:
num_reads = [300]
read_length = [150]
error_rate = [0.01]
min_overlap = [10]

genome = read_genome()

# Use automatic format - will choose CSV if openpyxl is not installed
results_path = run_parameter_sweep(genome, num_reads, read_length, error_rate, min_overlap, circular=True)


Run 1/1: num_reads=300, read_length=150, error_rate=0.01, min_overlap=10
Assembling genome...
Evaluating assembly...
=== Assembly Evaluation ===
Original genome length: 5386 bp
Assembly length: 32275 bp
Number of contigs: 129
N50: 276 bp
L50: 38
Genome coverage: 100.00%
Sequence identity: 98.71%
Unaligned contigs: 0
Chimeric contigs: 0

All results have been successfully saved to file: parameter_sweep_results_20250310_004144.csv


In [None]:
num_reads = [300, 500, 700, 900, 1100]
read_length = [100, 150, 200, 250]
error_rate = [0.0, 0.001, 0.005, 0.01, 0.05, 0.1]
min_overlap = [1, 10, 20, 30]

genome = read_genome()
excel_file = run_parameter_sweep(genome, num_reads, read_length, error_rate, min_overlap, circular=True)


Run 1/1: num_reads=700, read_length=150, error_rate=0.0, min_overlap=10
Assembling genome...
Evaluating assembly...
=== Assembly Evaluation ===
Original genome length: 5386 bp
Assembly length: 5487 bp
Number of contigs: 1
N50: 5487 bp
L50: 1
Genome coverage: 100.00%
Sequence identity: 100.00%
Unaligned contigs: 0
Chimeric contigs: 0

All results have been successfully saved to file: parameter_sweep_results_20250310_003627.csv


#### Visualization

Figure 1: N50 and Contiguity vs. N and l

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

data = pd.read_csv("experiments_results.csv")

fig, axs = plt.subplots(2, 2, figsize=(14, 12))
fig.suptitle('N50 and Contiguity vs. Number of Reads and Read Length', fontsize=16)

# Filter for error rate 0 to focus on the effect of reads and length
filtered_data = data[data["Error Rate"] == 0]

# Plot 1: N50 vs Number of Reads
read_counts = sorted(filtered_data["Number of Reads"].unique())
n50_by_reads = [filtered_data[filtered_data["Number of Reads"] == n]["N50"].mean() for n in read_counts]

axs[0, 0].plot(read_counts, n50_by_reads, 'o-', color='blue', linewidth=2, markersize=8)
axs[0, 0].set_title('N50 vs Number of Reads', fontsize=14)
axs[0, 0].set_xlabel('Number of Reads', fontsize=12)
axs[0, 0].set_ylabel('Average N50', fontsize=12)
axs[0, 0].grid(True, linestyle='--', alpha=0.7)

# Plot 2: Number of Contigs vs Number of Reads
contigs_by_reads = [filtered_data[filtered_data["Number of Reads"] == n]["Number of contigs"].mean() for n in read_counts]

axs[0, 1].plot(read_counts, contigs_by_reads, 'o-', color='red', linewidth=2, markersize=8)
axs[0, 1].set_title('Number of Contigs vs Number of Reads', fontsize=14)
axs[0, 1].set_xlabel('Number of Reads', fontsize=12)
axs[0, 1].set_ylabel('Average Number of Contigs', fontsize=12)
axs[0, 1].grid(True, linestyle='--', alpha=0.7)

# Plot 3: N50 vs Read Length
read_lengths = sorted(filtered_data["Read Length"].unique())
n50_by_length = [filtered_data[filtered_data["Read Length"] == l]["N50"].mean() for l in read_lengths]

axs[1, 0].plot(read_lengths, n50_by_length, 'o-', color='green', linewidth=2, markersize=8)
axs[1, 0].set_title('N50 vs Read Length', fontsize=14)
axs[1, 0].set_xlabel('Read Length', fontsize=12)
axs[1, 0].set_ylabel('Average N50', fontsize=12)
axs[1, 0].grid(True, linestyle='--', alpha=0.7)

# Plot 4: Number of Contigs vs Read Length
contigs_by_length = [filtered_data[filtered_data["Read Length"] == l]["Number of contigs"].mean() for l in read_lengths]

axs[1, 1].plot(read_lengths, contigs_by_length, 'o-', color='purple', linewidth=2, markersize=8)
axs[1, 1].set_title('Number of Contigs vs Read Length', fontsize=14)
axs[1, 1].set_xlabel('Read Length', fontsize=12)
axs[1, 1].set_ylabel('Average Number of Contigs', fontsize=12)
axs[1, 1].grid(True, linestyle='--', alpha=0.7)

# value labels to the points
for i, n in enumerate(read_counts):
    axs[0, 0].annotate(f"{n50_by_reads[i]:.1f}", (n, n50_by_reads[i]), 
                      textcoords="offset points", xytext=(0,10), ha='center')
    axs[0, 1].annotate(f"{contigs_by_reads[i]:.1f}", (n, contigs_by_reads[i]), 
                      textcoords="offset points", xytext=(0,10), ha='center')

for i, l in enumerate(read_lengths):
    axs[1, 0].annotate(f"{n50_by_length[i]:.1f}", (l, n50_by_length[i]), 
                      textcoords="offset points", xytext=(0,10), ha='center')
    axs[1, 1].annotate(f"{contigs_by_length[i]:.1f}", (l, contigs_by_length[i]), 
                      textcoords="offset points", xytext=(0,10), ha='center')

plt.tight_layout(rect=[0, 0, 1, 0.96])  # Adjust for the suptitle
plt.savefig("n50_contigs_vs_reads_length.png", dpi=300)
plt.show()

Figure 2: Number of Contigs vs. Error Rate

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

data = pd.read_csv("experiments_results.csv")

# group by error rate and calculate the average number of contigs
grouped_data = data.groupby("Error Rate")["Number of contigs"].mean().reset_index()

# simple line graph
plt.figure(figsize=(10, 6))
plt.plot(grouped_data["Error Rate"], grouped_data["Number of contigs"], 
         'o-', color='blue', linewidth=2, markersize=8)

# labels and title
plt.title("Average Number of Contigs vs Error Rate", fontsize=14)
plt.xlabel("Error Rate", fontsize=12)
plt.ylabel("Average Number of Contigs", fontsize=12)

# value labels on each point
for x, y in zip(grouped_data["Error Rate"], grouped_data["Number of contigs"]):
    plt.text(x, y + 15, f"{y:.1f}", ha='center', fontsize=10)

# grid lines
plt.grid(True, linestyle='--', alpha=0.7)

plt.tight_layout()
plt.savefig("simple_contigs_vs_error_rate.png", dpi=300)
plt.show()

Figure 3: Sequence Identity by Error Rate and Minimum Overlap

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

data = pd.read_csv("experiments_results.csv")

heatmap_data = data.groupby(["Error Rate", "Min Overlap"])["Sequence identity"].mean().reset_index()
pivot_data = heatmap_data.pivot("Error Rate", "Min Overlap", "Sequence identity")

plt.figure(figsize=(10, 7))

# values below 95% in red tones and above 95% in blue tones
center = 95
vmin = pivot_data.min().min()
vmax = 100  # Maximum possible sequence identity

ax = sns.heatmap(pivot_data, annot=True, fmt=".1f", 
                cmap="coolwarm_r", center=center,
                vmin=vmin, vmax=vmax,
                linewidth=0.5, 
                cbar_kws={'label': 'Sequence Identity (%)'})

plt.title("Sequence Identity by Error Rate and Minimum Overlap", fontsize=16)
plt.xlabel("Minimum Overlap", fontsize=14)
plt.ylabel("Error Rate", fontsize=14)

for _, spine in ax.spines.items():
    spine.set_visible(True)

plt.tight_layout()
plt.savefig("sequence_identity_heatmap_centered.png", dpi=300)
plt.show()