## Description

Simulate reads with SNPs, indels and sequencing errors.

## Data & modules

In [1]:
from Bio import SeqIO, AlignIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
import numpy.random as rd
from scipy.stats import geom, binom, poisson
import numpy as np
from copy import deepcopy

In [2]:
RANDOM_SEED = 42  
rd.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)  


In [3]:
genes = ['PKD1'] + ['PKD1P%i' % i for i in range(1, 7)]
NUCL = ['A', 'C', 'T', 'G']
NUCL_extd = NUCL + ['N']

In [4]:
refseq_list = list(SeqIO.parse('ref.fa', 'fasta'))
refseq_dict = {s.id: s for s in refseq_list}

In [5]:
refseq_lens = [len(refseq_dict[g]) for g in genes]
refseq_len_dict = {g: len(refseq_dict[g]) for g in genes}

## Parameters

In [6]:
# Genomic
SNP_prob = 0.001
INDEL_prob = 0.0001  # joint for insertions and deletions
INDEL_len_mean = 2
# Sequencing
ERR_prob = 0.005
FRAGMENT_LEN_mean = 350

## Mutate the reference genome 

In [7]:
# Create a mutation log to track all mutations
mutation_log = []

# Apply mutations for all genes
mutseq_list = []
for gene_idx, gene in enumerate(genes):
    ref_seq = str(refseq_dict[gene].seq)
    # Create a copy of the reference sequence
    mut_seq = list(ref_seq)
    
    # Apply SNPs
    for pos in range(len(ref_seq)):
        if rd.random() < SNP_prob:
            ref_base = ref_seq[pos]
            # Select a random base different from the reference
            alt_bases = [b for b in NUCL if b != ref_base]
            alt_base = rd.choice(alt_bases)
            
            # Add to mutation log
            mutation_log.append((gene, pos, 'SNP', ref_base, alt_base))
            
            # Apply mutation to sequence
            mut_seq[pos] = alt_base
    
    # Apply indels
    pos = 0
    while pos < len(ref_seq):
        if rd.random() < INDEL_prob:
            # Determine if insertion or deletion
            is_insertion = bool(rd.choice([0, 1]))
            
            # Determine length using geometric distribution
            indel_len = max(1, int(rd.geometric(p=1/INDEL_len_mean)))
            
            if is_insertion:
                # Create random insertion sequence
                ins_seq = ''.join(rd.choice(NUCL, size=indel_len))
                
                # Add to mutation log
                mutation_log.append((gene, pos, 'INS', ref_seq[pos], ins_seq))
                
                # Apply insertion to sequence (insert after current position)
                for i, base in enumerate(ins_seq):
                    mut_seq.insert(pos + 1 + i, base)
                
                # Adjust position
                pos += indel_len
            else:
                # Ensure we don't delete beyond the end of sequence
                indel_len = min(indel_len, len(ref_seq) - pos)
                if indel_len > 0:
                    # Get sequence to be deleted
                    del_seq = ref_seq[pos:pos+indel_len]
                    
                    # Add to mutation log
                    mutation_log.append((gene, pos, 'DEL', del_seq, ''))
                    
                    # Apply deletion to sequence
                    del mut_seq[pos:pos+indel_len]
        
        pos += 1
    
    # Convert the mutated sequence back to a string
    mut_seq = ''.join(mut_seq)
    
    # Create a SeqRecord for the mutated sequence
    mut_seq_record = SeqRecord(Seq(mut_seq), id=gene+'_mutated', description='')
    
    # Add to mutated sequence list
    mutseq_list.append(mut_seq_record)

# Calculate lengths of mutated sequences
mutseq_lens = [len(s) for s in mutseq_list]

# Output all mutated sequences to FASTA file
with open('all_genes_sequences.fasta', 'w') as fasta:
    for gene_idx, gene in enumerate(genes):
        fasta.write(f">{gene}_reference\n")
        fasta.write(f"{str(refseq_dict[gene].seq)}\n")
        fasta.write(f">{gene}_mutated\n")
        fasta.write(f"{str(mutseq_list[gene_idx].seq)}\n")

# Create VCF and TSV files for all genes
with open('all_genes_mutations.vcf', 'w') as f:
    # Write VCF header
    f.write('##fileformat=VCFv4.2\n')
    f.write('##fileDate=20250315\n')
    f.write('##source=PythonScript\n')
    f.write('##reference=ref.fa\n')
    for gene in genes:
        f.write(f'##contig=<ID={gene},length={refseq_len_dict[gene]}>\n')
    f.write('##phasing=none\n')
    f.write('##INFO=<ID=TYPE,Number=A,Type=String,Description="The type of allele, either snp, ins, or del">\n')
    f.write('#CHROM\tPOS\tID\tREF\tALT\tINFO\n')
    
    # Sort mutations by gene and position
    sorted_mutations = sorted(mutation_log, key=lambda x: (x[0], x[1]))
    
    # Process each mutation for VCF output
    for mutation in sorted_mutations:
        gene, pos, mut_type, ref_seq, alt_seq = mutation
        ref_pos = pos  # 0-based position from the log
        ref_sequence = str(refseq_dict[gene].seq)
        
        if mut_type == 'SNP':
            # Single nucleotide polymorphism
            f.write(f"{gene}\t{ref_pos+1}\t.\t{ref_seq}\t{alt_seq}\tTYPE=SNP\n")
            
        elif mut_type == 'DEL':
            # Deletion - use the preceding base as anchor
            if ref_pos > 0:  # Ensure we're not at the start
                preceding_base = ref_sequence[ref_pos-1]
                f.write(f"{gene}\t{ref_pos}\t.\t{preceding_base}{ref_seq}\t{preceding_base}\tTYPE=DEL\n")
            
        elif mut_type == 'INS':
            # Insertion - use the base at position as anchor
            # For VCF, the position is the base before the insertion
            f.write(f"{gene}\t{ref_pos+1}\t.\t{ref_seq}\t{ref_seq}{alt_seq}\tTYPE=INS\n")

# Create comprehensive TSV file for mutations
with open('all_genes_mutations.tsv', 'w') as tsv:
    # Write TSV header
    tsv.write('GENE\tPOS\tREF\tMUT\tTYPE\n')
    
    # Create a mapping of mutations per gene
    gene_mutation_maps = {}
    gene_insertion_sequences = {}
    
    for gene in genes:
        gene_mutation_maps[gene] = {}
        gene_insertion_sequences[gene] = {}
    
    # Process all mutations
    for mutation in sorted_mutations:
        gene, pos, mut_type, ref_seq, alt_seq = mutation
        if mut_type == 'SNP':
            gene_mutation_maps[gene][pos] = ('SNP', ref_seq, alt_seq)
        elif mut_type == 'DEL':
            for i in range(len(ref_seq)):
                gene_mutation_maps[gene][pos + i] = ('DEL', ref_seq[i], '-')
        elif mut_type == 'INS':
            # Store insertion point and sequence
            gene_insertion_sequences[gene][pos] = alt_seq
    
    # Process each gene and position
    for gene in genes:
        ref_sequence = str(refseq_dict[gene].seq)
        current_pos = 0
        
        while current_pos < len(ref_sequence):
            ref_base = ref_sequence[current_pos]
            
            # Check for a mutation at this position
            if current_pos in gene_mutation_maps[gene]:
                mut_type, ref, alt = gene_mutation_maps[gene][current_pos]
                tsv.write(f"{gene}\t{current_pos+1}\t{ref}\t{alt}\t{mut_type}\n")
            else:
                # No mutation at this position - it's a match
                tsv.write(f"{gene}\t{current_pos+1}\t{ref_base}\t{ref_base}\tMATCH\n")
            
            # Check for an insertion after this position
            if current_pos in gene_insertion_sequences[gene]:
                inserted_seq = gene_insertion_sequences[gene][current_pos]
                # Write insertion rows without position numbers
                for base in inserted_seq:
                    tsv.write(f"{gene}\t\t-\t{base}\tINS\n")
            
            current_pos += 1

print("Exported to all_genes_mutations.vcf, all_genes_sequences.fasta, and all_genes_mutations.tsv")

Exported to all_genes_mutations.vcf, all_genes_sequences.fasta, and all_genes_mutations.tsv


In [8]:
mutseq_lens = [len(s) for s in mutseq_list]

## Simulate reads
Using mutseq_list as reference

In [64]:
N_reads = 100000

In [10]:
r1_list = []
r2_list = []
# Select genes to simulate reads from 
selected_genes = rd.choice(len(genes), size=N_reads)  
start_locations = []
for rid, gid in enumerate(selected_genes):
    # Sample the DNA fragment length
    # We use a shifted geometric distribution with minimum value = 150, mean value = FRAGMENT_LEN_mean
    FRAGLEN = 150 + geom.rvs(p=1/(FRAGMENT_LEN_mean-150))
    # Choose the read starting location. We just sample uniformly, this could be made 
    # more realistically but whatever. Note that this depends on FRAGLEN.
    START = rd.choice(mutseq_lens[gid]-FRAGLEN+1)
    read_description = genes[gid] + '; ' + str(START)+'-'+str(START+FRAGLEN)
    # Take the genome sequences, add sequencing errors.
    # r1 is the forward read, r2 is the backward read
    r1 = list(mutseq_list[gid][START:(START+150)])
    is_mutated1 = rd.choice(2, size=len(r1), p=[1-ERR_prob, ERR_prob])
    r1 = [rd.choice(NUCL) if mut else c for c,mut in zip(r1, is_mutated1)]
    r1 = SeqRecord(Seq(''.join(r1)), id = 'Read'+str(rid), name='', description = read_description)
    r1.letter_annotations['phred_quality'] = [42]*150
    assert len(r1) == 150
    # Note: we reverse complement r2
    r2 = list(mutseq_list[gid][(START+FRAGLEN-150):(START+FRAGLEN)].reverse_complement())
    is_mutated2 = rd.choice(2, size=len(r2), p=[1-ERR_prob, ERR_prob])
    r2 = [rd.choice(NUCL) if mut else c for c,mut in zip(r2, is_mutated2)]
    r2 = SeqRecord(Seq(''.join(r2)), id = 'Read'+str(rid), name='', description = read_description)
    r2.letter_annotations['phred_quality'] = [42]*150
    assert len(r2) == 150
    r1_list.append(r1)
    r2_list.append(r2)

## Save the reads and SNP locations

In [11]:
with open('simulated_r1.fq', 'w') as h:
    SeqIO.write(r1_list, h, 'fastq')
with open('simulated_r2.fq', 'w') as h:
    SeqIO.write(r2_list, h, 'fastq')