In [24]:
import random


In [25]:

def read_genome(filepath):
    """
    Reads a genome from a FASTA file.
    Assumes that the file has a header line (starting with '>') and then sequence lines.
    Returns the concatenated genome sequence as a string.
    """
    genome = ""
    with open(filepath, "r") as file:
        for line in file:
            if line.startswith(">"):
                continue
            genome += line.strip()
    return genome


In [26]:

def generate_error_free_reads(genome, num_reads, read_length, circular=True):
    """
    Generates error-free reads from the provided genome sequence.

    Parameters:
      genome (str): The genome sequence.
      num_reads (int): The number of reads to generate.
      read_length (int): The length of each read.
      circular (bool): If True, treats the genome as circular (wraps around if needed).

    Returns:
      List[str]: A list of error-free reads.
    """
    reads = []
    genome_length = len(genome)
    for _ in range(num_reads):
        # Select a start position uniformly at random over the genome.
        start = random.randint(0, genome_length - 1)
        if circular:
            # If the read extends past the end, wrap-around to the beginning.
            if start + read_length <= genome_length:
                read = genome[start:start+read_length]
            else:
                part1 = genome[start:]
                part2 = genome[:(read_length - (genome_length - start))]
                read = part1 + part2
        else:
            # For non-circular genomes, ensure the read is fully contained.
            if start + read_length > genome_length:
                start = random.randint(0, genome_length - read_length)
            read = genome[start:start+read_length]
        reads.append(read)
    return reads


In [29]:
if __name__ == "__main__":
    # Read the PhiX genome from file
    genome = read_genome("sequence.fasta")
    
    # Define parameters (you can adjust these values as needed)
    num_reads = 100     # N: number of reads (e.g., 100 to 1,000,000)
    read_length = 50    # L: length of each read (e.g., 50 to 150)
    
    # Generate error-free reads
    reads = generate_error_free_reads(genome, num_reads, read_length, circular=True)
    
    # Calculate average coverage: (num_reads * read_length) / genome_length
    coverage = (num_reads * read_length) / len(genome)
    print("Generated {} error-free reads of length {}.".format(num_reads, read_length))
    print("Average coverage of the genome: {:.2f}".format(coverage))
    
    # Optionally, print the first few reads
    for i, read in enumerate(reads[:5]):
        print("Read {}: {}".format(i+1, read))

Generated 100 error-free reads of length 50.
Average coverage of the genome: 0.93
Read 1: TCTGGCGCTCGCCCTGGTCGTCCGCAGCCGTTGCGAGGTACTAAAGGCAA
Read 2: GCGGTAGGTTTTCTGCTTAGGAGTTTAATCATGTTTCAGACTTTTATTTC
Read 3: TGCTGGAGGCCTCCACTATGAAATCGCGTAGAGGCTTTGCTATTCAGCGT
Read 4: AAGCGCCGTGGATGCCTGACCGTACCGAGGCTAACCCTAATGAGCTTAAT
Read 5: TGGCATTCAGTCGGCGACTTCACGCCAGAATACGAAAGACCAGGTATATG


In [30]:
def introduce_errors(read, error_prob):
    """
    Introduce sequencing errors in a read.
    For each base in the read, with probability error_prob, substitute it with a different nucleotide.
    
    Parameters:
      read (str): The error-free read.
      error_prob (float): The probability of an error at each base (0.001 ≤ P ≤ 0.1).
    
    Returns:
      str: The read after introducing errors.
    """
    bases = ['A', 'C', 'G', 'T']
    error_read = []
    for base in read:
        if random.random() < error_prob:
            # Replace the current base with one of the other nucleotides
            new_base = random.choice([b for b in bases if b != base])
            error_read.append(new_base)
        else:
            error_read.append(base)
    return ''.join(error_read)


In [31]:

def generate_error_prone_reads(genome, num_reads, read_length, error_prob, circular=True):
    """
    Generate error-prone reads from a genome sequence.
    
    This function first generates error-free reads and then simulates sequencing errors by
    introducing mismatches based on the given error probability.
    
    Parameters:
      genome (str): The genome sequence.
      num_reads (int): Number of reads to generate.
      read_length (int): Length of each read.
      error_prob (float): Probability of an error (mismatch) at each base.
      circular (bool): Whether the genome is circular. Defaults to True.
      
    Returns:
      List[str]: A list of error-prone reads.
    """
    # First, generate error-free reads using the previously defined function.
    error_free_reads = generate_error_free_reads(genome, num_reads, read_length, circular=circular)
    # Introduce errors in each read.
    error_prone_reads = [introduce_errors(read, error_prob) for read in error_free_reads]
    return error_prone_reads


In [54]:

# Example usage:
if __name__ == "__main__":
    # Assuming you've already read the genome from "Phix_genome.txt" using a function like read_genome.
    genome = read_genome("sequence.fasta")  # Make sure you have the read_genome function from step 2.
    
    num_reads = 100      # N: number of reads (example value)
    read_length = 50    # L: length of each read (example value within 50-150)
    error_prob = 0.01    # P: error probability, for example 1%
    
    error_reads = generate_error_prone_reads(genome, num_reads, read_length, error_prob, circular=True)
    
    print("Generated {} error-prone reads of length {} with error probability {}.".format(num_reads, read_length, error_prob))
    # Print the first 5 error-prone reads for inspection
    for i, read in enumerate(error_reads[:5]):
        print("Error-prone Read {}: {}".format(i+1, read))


Generated 100 error-prone reads of length 50 with error probability 0.01.
Error-prone Read 1: TCTGGTTGGTTGTGGCCTGTTGATGCTAAAGGTGAGCCGCTTAAAGCTAC
Error-prone Read 2: GCCATAATTCAAACTTTTTTTCTGATAAGCTGGTTCTCACTTCTGTTCCT
Error-prone Read 3: GACCAAGCGAAGCGCGGTATGTTTTCTGCTTAGCAGTTTAATCATGTTTC
Error-prone Read 4: TGTGTGCCTGAGTATGGTACAGCTAATGGCCGTCTTCATTTCCATGCGGT
Error-prone Read 5: AGCGAAGCGCGGTAGGTTTTCTGCTTAGGAGTTTAATCATGTTTCAGACT


In [55]:
def overlap(a, b, min_length=1):
    """
    Compute the length of the maximum suffix of 'a' that matches a prefix of 'b'
    with a minimum required overlap of min_length.
    Returns the overlap length (0 if no such overlap exists).
    """
    max_ov = 0
    # Check all possible overlap lengths starting at min_length up to the smaller read length.
    max_possible = min(len(a), len(b))
    for i in range(min_length, max_possible + 1):
        if a[-i:] == b[:i]:
            max_ov = i
    return max_ov


In [56]:

def build_overlap_graph(reads, min_overlap=1):
    """
    Constructs an overlap graph from a list of reads.
    Each node (represented by the read's index) is connected to other nodes with an edge
    if the suffix of one read overlaps with the prefix of the other by at least min_overlap bases.
    
    Returns:
      graph (dict): A dictionary where each key is a read index and each value is another dictionary.
                    For a key i, graph[i] maps a read index j to the overlap length between reads[i] and reads[j].
    """
    graph = {}
    for i, read in enumerate(reads):
        graph[i] = {}
        for j, other in enumerate(reads):
            if i != j:
                ov = overlap(read, other, min_overlap)
                if ov:
                    graph[i][j] = ov
    return graph


In [57]:

def greedy_assembly_overlap_graph(reads, min_overlap=1):
    """
    Assembles reads into contigs using a greedy algorithm guided by an overlap graph.
    
    The method works as follows:
      1. Construct the overlap graph for the current set of reads.
      2. Identify the pair of reads with the maximum overlap (of length at least min_overlap).
      3. Merge the two reads, replacing them with their merged sequence.
      4. Repeat until no overlap of at least min_overlap remains.
    
    Returns:
      List[str]: A list of assembled contigs.
    """
    reads = reads.copy()
    while True:
        # Build the current overlap graph.
        graph = build_overlap_graph(reads, min_overlap)
        best_i, best_j, best_ov = None, None, 0
        
        # Search for the best (i.e., largest) overlap among all pairs.
        for i in graph:
            for j, ov in graph[i].items():
                if ov > best_ov:
                    best_ov = ov
                    best_i, best_j = i, j
                    
        # If the best overlap is below the minimum threshold, stop merging.
        if best_ov < min_overlap:
            break
        
        # Merge the two reads: read[best_i] concatenated with read[best_j] excluding the overlapping part.
        new_read = reads[best_i] + reads[best_j][best_ov:]
        
        # Create a new list of reads: remove the merged reads and add the new merged read.
        new_reads = [reads[k] for k in range(len(reads)) if k not in (best_i, best_j)]
        new_reads.append(new_read)
        reads = new_reads
        
    return reads


In [58]:

# Example usage:
if __name__ == "__main__":
    # For demonstration, we use a toy genome and generate error-free reads.

    # Using a toy genome for demonstration.
    #genome = "AGTCTAGTCTAGCTAGCTAGCTAGCTA"
    genome = read_genome("sequence.fasta")

    num_reads = 100
    read_length = 50
    error_free_reads = generate_error_free_reads(genome, num_reads, read_length, circular=True)
    
    # Assemble the reads using our overlap graph approach.
    contigs = greedy_assembly_overlap_graph(error_free_reads, min_overlap=1)
    
    print("Assembled contigs:")
    for contig in contigs:
        print(contig)


Assembled contigs:
ACGAGAAGACGGTTACGCAGTTTTGCCGCAAGCTGGCTGCTGAACGCCCTGATACCAATAAAATCCCTAAGCATTTGTTTCAGGGTTATTTGAATATCGTGATTATCTTGCTGCTGCATTTCCTGAGCTTAATGCTTGGGAGCGTGGATTACTATCTGAGTCCGATGCTGTTCAACCACTAATAGGTAAGAAATCATGAG
AATCGCGTAGAGGCTTTGCTATTCAGCGTTTGATGAATGCAATGCGACAGCCTGACCGTACCGAGGCTAACCCTAATGAGCTTAATCAAGATGATGCTCGTTATGGTTTCCGTTGCTGCCATCTCAAAAACATTTGGACTGCTCCGCTTCCTCCTGAGACTGAGCTTTCTCGCCTTGCTATTGACTCTACTGTAGACATTTTTACTTTTTATGTCCCTCATCGTCACGTTTATGGTGAACAGTGGATTAAGTTCATGAAG
AACACTGACGTTCTTACTGACGCAGAAGAAAACGTGCGTCAAAAATTACGCCGGGCAATAACGTTTATGTTGGTTTCATGGTTTGGTCTAACTTTACCAACTCACTAAAAACCAAGCTGTCGCTACTTCCCAAGAAGCTGTTCAGAATCAGAATGAGCCGTTATATTTTGATAGTTTGACGGTTAATGCTGGTAATGGTGGTTTTCTTCATTGCATTCAGATGGATACATCTGTCAACGCCGCTAATCAGGTTG
TCTGATAAGCTGGTTCTCACTTCTGTTACTCCAGCTTCTTCGGCACCTGTCGGCGACTTCACGCCAGAATACGAAAGACCAGGTATATGCACAAAATGAGATGCTTGCTTATCAACAGAAGGAGTCTACTGCTCGCGTTGCGTCTATTGCTTCTGCTCTTGCTGGTGGCGCCATGTCTAAATTGTTTGGAGGCGGTCAAAAAGCCGCCTCCGGTTCTGCCGTTTTGGATTTAACCGAAGATGATTTCGATTTTCTGACGAGTAA

In [59]:
if __name__ == '__main__':
    # Define a synthetic toy genome for controlled testing.
    #genome = "ACGTACGTGACGTAGCTAGCTAGCATCGATCGATCGTACGATCG"

    # Define the PhiX genome for controlled testing.
    genome = read_genome("sequence.fasta")

    
    # Parameters for read generation.
    num_reads = 100       # Number of reads (adjustable within 100 to 1,000,000)
    read_length = 50      # Length of each read (adjustable within 50 to 150 for real data)
    error_prob = 0.01     # Error probability for error-prone reads (5% in this example)
    
    # --- Test 1: Assembly on Error-Free Reads ---
    error_free_reads = generate_error_free_reads(genome, num_reads, read_length, circular=True)
    assembled_error_free = greedy_assembly_overlap_graph(error_free_reads, min_overlap=5)
    
    print("Assembly on Error-Free Reads:")
    for i, contig in enumerate(assembled_error_free, 1):
        print("Contig {} (length {}): {}".format(i, len(contig), contig))
    
    # --- Test 2: Assembly on Error-Prone Reads ---
    error_prone_reads = generate_error_prone_reads(genome, num_reads, read_length, error_prob, circular=True)
    assembled_error_prone = greedy_assembly_overlap_graph(error_prone_reads, min_overlap=5)
    
    print("\nAssembly on Error-Prone Reads:")
    for i, contig in enumerate(assembled_error_prone, 1):
        print("Contig {} (length {}): {}".format(i, len(contig), contig))


Assembly on Error-Free Reads:
Contig 1 (length 50): CTTACTTTGCGACCTTTCGCCATCAACTAACGATTCTGTCAAAAACTGAC
Contig 2 (length 50): GACGCCGGATTTGAGAATCAAAAAGAGCTTACTAAAATGCAACTGGACAA
Contig 3 (length 50): CTTTCGGATATTTCTGATGAGTCGAAAAATTATCTTGATAAAGCAGGAAT
Contig 4 (length 50): CAATTTCTGGAAAGACGGTAAAGCTGATGGTATTGGCTCTAATTTGTCTA
Contig 5 (length 50): CTCGTGATTATCTTGCTGCTGCATTTCCTGAGCTTAATGCTTGGGAGCGT
Contig 6 (length 50): TTGTTTCTGGTGCTATGGCTAAAGCTGGTAAAGGACTTCTTGAAGGTACG
Contig 7 (length 50): TTAAGGATGAGTGTTCAAGATTGCTGGAGGCCTCCACTATGAAATCGCGT
Contig 8 (length 50): GCGCAACCTGTGACGACAAATCTGCTCAAATTTATGCGCGCTTCGATAAA
Contig 9 (length 50): TGCTATTGAGGCTTGTGGCATTTCTACTCTTTCTCAATCCCCAATGCTTG
Contig 10 (length 50): CTGGTTATATTGACCATGCCGCTTTTCTTGGCACGATTAACCCTGATACC
Contig 11 (length 50): GATGGATAACCGCATCAAGCTCTTGGAAGAGATTCTGTCTTTTCGTATGC
Contig 12 (length 50): TGACTCGCAAGGTTAGTGCTGAGGTTGACTTAGTTCATCAGCAAACGCAG
Contig 13 (length 50): AATGTCACGCTGATTATTTTGACTTTGAGCGTATCGAGGCTCTTAAACCT
Contig 14 (length

In [60]:
def compute_assembly_metrics(contigs, genome):
    """
    Computes several performance measures for the quality of the assembly.

    Measures:
      - num_contigs: The total number of contigs generated. Fewer contigs generally indicate a more contiguous assembly.
      - total_assembled_length: The sum of the lengths of all contigs. This can be compared to the original genome length.
      - max_contig_length: The length of the largest contig, which ideally should be as long as the genome in a perfect assembly.
      - N50: A common assembly metric defined as the contig length such that 50% of the total assembled bases
             are contained in contigs of that length or longer. A higher N50 value generally indicates a more continuous assembly.
      - genome_reconstructed: A boolean flag indicating whether the original genome is present (as a substring) in any contig.

    Parameters:
      contigs (List[str]): The list of assembled contigs.
      genome (str): The original genome sequence.

    Returns:
      dict: A dictionary containing the computed performance metrics.
    """
    metrics = {}
    # Number of contigs.
    metrics['num_contigs'] = len(contigs)
    
    # Total assembled length.
    total_length = sum(len(contig) for contig in contigs)
    metrics['total_assembled_length'] = total_length
    
    # Maximum contig length.
    metrics['max_contig_length'] = max((len(contig) for contig in contigs), default=0)
    
    # Compute N50: sort contig lengths in descending order and accumulate until reaching half of the total assembled length.
    lengths = sorted((len(contig) for contig in contigs), reverse=True)
    half_total = total_length / 2
    running_sum = 0
    N50 = 0
    for l in lengths:
        running_sum += l
        if running_sum >= half_total:
            N50 = l
            break
    metrics['N50'] = N50
    
    # Check if the genome is fully reconstructed in at least one contig.
    metrics['genome_reconstructed'] = any(genome in contig for contig in contigs)
    
    return metrics


In [67]:

# Example usage (this can be appended to your testing harness):
if __name__ == '__main__':
    # For testing purposes, we can use a toy genome:
    toy_genome = "ACGTACGTGACGTAGCTAGCTAGCATCGATCGATCGTACGATCG"

    # using the Phix genome:
    genome = read_genome("sequence.fasta")

    
    num_reads = 100       
    read_length = 50      
    error_prob = 0.01     # 1% error probability for error-prone reads.
    
    # Generate error-free reads and assemble.
    error_free_reads = generate_error_free_reads(toy_genome, num_reads, read_length, circular=True)
    assembled_error_free = greedy_assembly_overlap_graph(error_free_reads, min_overlap=5)
    metrics_error_free = compute_assembly_metrics(assembled_error_free, toy_genome)
    
    print("Performance Metrics on Error-Free Reads:")
    for key, value in metrics_error_free.items():
        print(f"{key}: {value}")
    
    # Generate error-prone reads and assemble.
    error_prone_reads = generate_error_prone_reads(toy_genome, num_reads, read_length, error_prob, circular=True)
    assembled_error_prone = greedy_assembly_overlap_graph(error_prone_reads, min_overlap=5)
    metrics_error_prone = compute_assembly_metrics(assembled_error_prone, toy_genome)
    
    print("\nPerformance Metrics on Error-Prone Reads:")
    for key, value in metrics_error_prone.items():
        print(f"{key}: {value}")


Performance Metrics on Error-Free Reads:
num_contigs: 1
total_assembled_length: 88
max_contig_length: 88
N50: 88
genome_reconstructed: True

Performance Metrics on Error-Prone Reads:
num_contigs: 8
total_assembled_length: 1280
max_contig_length: 333
N50: 202
genome_reconstructed: True
