In [1]:
from abc import ABC, abstractmethod
from Bio.Data import CodonTable
from Bio.Seq import Seq
from copy import deepcopy

import os
import numpy as np
import pandas as pd
import random
import warnings

This notebook puts together the end-to-end tests of evSeq. We will construct inputs that correspond to known outputs, then run them through evSeq to make sure the software outputs what we expect.

Steps:
1. Build a random amino acid sequence. This is our reference AA.
2. Decide if we are operating in detailed_refseq mode or not. If not, then the random amino acid sequence gives what we expect for the full plate.
3. Build variants of the input amino acid sequences. Decide if we have a mixed well or not. A mixed well means we need a second amino acid sequence.
4. Build both a "NNN" and no variation version of the reference sequence corresponding to positions where we expect mutations.
5. Choose a random codon to encode the amino acid changes in the sequence.
6. For building reads, choose:
    1. Ratio of different sequences. This decides whether or not they are recognized by `variable_thresh`
    2. Number of total sequences *at each position*. This decides whether a well is considered dead or not by `variable_count`
7. Build a set of perfect reads up to the total number of target sequences. 
8. Add on bad sequences that should get filtered out by the `bp_q_cutoff`, `length_cutoff`, and `average_q_cutoff` parameters. Also add indels.
9. Pad on additional sequence information that creates frameshifts. Add to an input file for processing.

The BioPython codon table leaves stop codons out of its forward table and keeps only one codon per amino acid in its reverse table, so we wrap it here and fix both shortcomings:

In [3]:
class CustomCodonTable():
    """
    Wrapper around BioPython's standard DNA codon table that fills in two gaps:
    stop codons are added to the forward mapping, and the reverse mapping lists
    every codon for an amino acid. Deliberately built by composition rather than
    by inheriting from the BioPython class.

    Attributes
    ----------
    codon_to_aa : dict
        Codon -> amino acid character, with every stop codon mapped to "*".
    aa_to_codon : dict
        Amino acid character (or "*") -> list of all codons that encode it.
    """
    def __init__(self):

        # Start from BioPython's standard DNA table. Its docs do not spell out the
        # contents, but a spot check looked right. evSeq ships its own codon table, so
        # sourcing this one independently keeps validation separate from the code under test.
        standard_table = CodonTable.standard_dna_table

        # Forward mapping: copy BioPython's dictionary, then add the stop codons,
        # which BioPython leaves out of `forward_table`.
        self.codon_to_aa = dict(standard_table.forward_table)
        self.codon_to_aa.update({stop: "*" for stop in standard_table.stop_codons})

        # Reverse mapping: BioPython keeps just one codon per amino acid, so
        # collect every codon under its amino acid instead.
        self.aa_to_codon = {}
        for codon, aa in self.codon_to_aa.items():
            self.aa_to_codon.setdefault(aa, []).append(codon)

We need global variables that define the bounds on our testing. Everything is randomly produced, so we need to set ranges on our sampling:

In [4]:
# Import evSeq globals needed for testing
from evSeq.util.globals import (BARCODE_LENGTH, 
                                ADAPTER_F,
                                ADAPTER_LENGTH_F,
                                ADAPTER_R,
                                ADAPTER_LENGTH_R)

# NOTE: where a MIN_/MAX_ pair below is fed to `NP_RNG.integers` (e.g. the refseq
# length, frameshift, primer length, and cutoff bounds), the MAX_ value is the
# *exclusive* upper bound of the draw, which is why several maxima end in 1.

# Reference sequence bounds (IN NUMBER OF AAS!!)
MIN_REFSEQ_LEN = 50
MAX_REFSEQ_LEN = 251

# Bounds on readlength (in bases). The assertion below checks that the shortest
# reference sequence (in bases) is no longer than the shortest readlength.
MIN_READLENGTH = 150
MAX_READLENGTH = 301
assert (MIN_REFSEQ_LEN * 3) <= MIN_READLENGTH

# Bounds on quality cutoffs
MIN_BP_QUAL_CUTOFF = 15
MAX_BP_QUAL_CUTOFF = 36

MIN_GLOBAL_QUAL_CUTOFF = 15
MAX_GLOBAL_QUAL_CUTOFF = 36

# Upper bound passed to `NP_RNG.integers` when generating quality scores
MAX_QUAL_ALLOWED = 41

# Bounds on sequence length cutoffs
MIN_SEQLEN_CUTOFF = 0.0
MAX_SEQLEN_CUTOFF = 1.0

# Bounds on number of indels added
MIN_INDELS_ADDED = 1
MAX_INDELS_ADDED = 4

# Bounds on primer lengths
PRIMER_MIN_LEN = 10
PRIMER_MAX_LEN = 41

# Bounds on the frameshift for a reference seq (number of extra bases
# placed before/after the codon sequence)
FRAMESHIFT_MIN = 0
FRAMESHIFT_MAX = 3

# Bounds on position identification
MIN_VARIABLE_THRESH = 0.0
MAX_VARIABLE_THRESH = 1.0
MIN_VARIABLE_COUNT = 0
MAX_VARIABLE_COUNT = 21

# Number of variants in a well
MIN_N_VARIANTS = 1
MAX_N_VARIANTS = 4

# Number of reads in a well
MIN_N_READS = 0
MAX_N_READS = 101

# Bounds on number of mutations per sequence (as a fraction
# of the number of amino acids captured by the read-length)
MIN_PERC_MUTATED = 0.0
MAX_PERC_MUTATED = 0.2

# Number of amino acids that can have noise added to them (as
# a fraction of the number of amino acids that have been mutated)
MIN_NOISE_PERC = 0.0
MAX_NOISE_PERC = 0.5

# Number of amino acids to rescue in overlapping regions (as
# a fraction of the total number of rescue options)
RESCUE_FREQ = 0.5

# Number of noisy reads added to the fastq
MIN_DUD_READS = 0
MAX_DUD_READS = 11

# Base for quality score calculation (offset added to a quality score
# before it is converted to a character)
Q_SCORE_BASE = 33

# Allowed nucleotides
ALLOWED_NUCLEOTIDES = ("A", "T", "C", "G")

# Codon table and allowed amino acid characters. `ALLOWED_AAS` is sorted, so
# `INT_TO_AA` maps 0..N_AAS-1 onto the characters in sorted order. The keys come from
# `aa_to_codon`, so the stop character "*" is included.
CODON_TABLE = CustomCodonTable()
ALLOWED_AAS = tuple(sorted(list(CODON_TABLE.aa_to_codon.keys())))
INT_TO_AA = dict(enumerate(ALLOWED_AAS))
N_AAS = len(ALLOWED_AAS)

# Random number generators. Both are seeded with the same value (the sum of the
# character codes of "PDawg") so that the generated test data is reproducible.
RANDOM_SEED = sum(ord(char) for char in "PDawg")
NP_RNG = np.random.default_rng(RANDOM_SEED)
RANDOM_RNG = random.Random(RANDOM_SEED)

# Barcode file. The path is relative to the working directory of the notebook.
# `N_INDICES` is the number of rows in the file; `N_PLATES` is the number of
# distinct values in its `IndexPlate` column.
INDEX_DF = pd.read_csv("../evSeq/util/index_map.csv")
N_INDICES = len(INDEX_DF)
N_PLATES = len(INDEX_DF.IndexPlate.unique())

In [5]:
def reverse_complement(seq):
    """
    Return the reverse complement of `seq` as a plain string, computed with
    BioPython's `Seq.reverse_complement`.
    """
    flipped = Seq(seq).reverse_complement()
    return str(flipped)

# Use qualities as an ordinal encoding
def ord_to_chr(qualities):
    """
    Convert integer quality scores into a quality string.

    `Q_SCORE_BASE` is added to `qualities` in a single element-wise operation
    (so `qualities` must support `+` with an int, e.g. a NumPy integer array),
    and each shifted value is then mapped to its character with `chr`.
    """
    return "".join(map(chr, qualities + Q_SCORE_BASE))

We will store basic variables used for building test data in a configuration class, defined below:

In [6]:
class QualityGenerator():
    """
    Draws random integer quality scores between a fixed floor and `MAX_QUAL_ALLOWED`.
    """
    def __init__(self, min_q_allowed):
        # Low bound handed to `NP_RNG.integers` by `generate_qualities`
        self.min_q_allowed = min_q_allowed

    def generate_qualities(self, size):
        """
        Return `size` quality scores drawn with `NP_RNG.integers`, using
        `min_q_allowed` as the low bound and `MAX_QUAL_ALLOWED` as the high bound.
        """
        low, high = self.min_q_allowed, MAX_QUAL_ALLOWED
        return NP_RNG.integers(low, high, size = size)

class FakeRefseq():
    """
    A randomly generated reference sequence: an amino acid sequence, one codon per
    amino acid, optional frameshift bases on either side, and forward/reverse
    primer seeds. Also records how long the full sequenceable construct is once
    adapters and barcodes are counted.
    """
    def __init__(self):

        # Draw the length (in amino acids) and then the amino acids themselves
        self.refseq_len = NP_RNG.integers(MIN_REFSEQ_LEN, MAX_REFSEQ_LEN)
        sampled_aa_ints = NP_RNG.choice(N_AAS, size = self.refseq_len)
        self.aa_refseq = [INT_TO_AA[sampled] for sampled in sampled_aa_ints]

        # Encode each amino acid with a randomly chosen codon
        self.codon_refseq = self.aa_seq_to_codon_seq(self.aa_refseq)
        self.codon_refseq_len = self.refseq_len * 3

        # Number of frameshift bases before and after the codon sequence
        self.frameshift_front = NP_RNG.integers(FRAMESHIFT_MIN, FRAMESHIFT_MAX)
        self.frameshift_back = NP_RNG.integers(FRAMESHIFT_MIN, FRAMESHIFT_MAX)

        # The frameshift bases themselves
        self.frameshift_bp_front = RANDOM_RNG.choices(ALLOWED_NUCLEOTIDES,
                                                      k = self.frameshift_front)
        self.frameshift_bp_back = RANDOM_RNG.choices(ALLOWED_NUCLEOTIDES,
                                                     k = self.frameshift_back)

        # Primer seeds: lengths first, then the bases
        self.primer_seed_len_f = NP_RNG.integers(PRIMER_MIN_LEN, PRIMER_MAX_LEN)
        self.primer_seed_len_r = NP_RNG.integers(PRIMER_MIN_LEN, PRIMER_MAX_LEN)
        self.primer_seed_f = RANDOM_RNG.choices(ALLOWED_NUCLEOTIDES,
                                                k = self.primer_seed_len_f)
        self.primer_seed_r = RANDOM_RNG.choices(ALLOWED_NUCLEOTIDES,
                                                k = self.primer_seed_len_r)

        # Total readable window: codons plus everything that flanks them
        insert_len = (self.codon_refseq_len +
                      self.frameshift_front +
                      self.frameshift_back)
        primer_len = self.primer_seed_len_f + self.primer_seed_len_r
        machinery_len = ADAPTER_LENGTH_F + ADAPTER_LENGTH_R + 2 * BARCODE_LENGTH
        self.readable_window_len = insert_len + primer_len + machinery_len

    def define_refseq_windows(self, readlength):
        """
        Works out which amino acid positions are covered by the forward and the
        reverse read for the given `readlength`, and stores:

        - `forward_readable_aas` / `reverse_readable_aas`: lists of position indices
        - `mutable_aa_inds`: sorted array of the unique positions in either list
        - `double_count_inds`: set of positions present in both lists
        """
        # Bases left for the codon region in each read once the barcode, adapter,
        # primer seed, and frameshift bases have been accounted for
        usable_len = readlength - BARCODE_LENGTH
        codon_bases_f = (usable_len - self.frameshift_front -
                         ADAPTER_LENGTH_F - self.primer_seed_len_f)
        codon_bases_r = (usable_len - self.frameshift_back -
                         ADAPTER_LENGTH_R - self.primer_seed_len_r)

        # Convert to a number of amino acids. The forward count rounds down.
        # NOTE(review): the reverse count rounds *up*, which looks like it can include
        # a codon that is only partially covered by the reverse read -- confirm this
        # is intended.
        n_aas_f = codon_bases_f // 3
        n_aas_r = int(np.ceil(codon_bases_r / 3))

        # Clip both windows to the bounds of the reference sequence
        forward_stop = min(n_aas_f, self.refseq_len)
        reverse_start = max(self.refseq_len - n_aas_r, 0)

        # Readable positions for each read direction
        self.forward_readable_aas = list(range(0, forward_stop))
        self.reverse_readable_aas = list(range(reverse_start, self.refseq_len))

        # Unique positions covered by either read, in ascending order
        mutable_aa_set = set(self.forward_readable_aas + self.reverse_readable_aas)
        self.mutable_aa_inds = np.array(sorted(mutable_aa_set), dtype = int)

        # Every mutable position must be a valid index into the reference sequence
        assert mutable_aa_set.issubset(set(range(self.refseq_len)))

        # Positions captured by both the forward and the reverse read
        self.double_count_inds = (set(self.forward_readable_aas) &
                                  set(self.reverse_readable_aas))

    def assign_qualities(self, min_q_allowed):
        """
        Generates quality scores for every sequence component and stores them as
        instance attributes. In generation order these are the primer seeds, the
        barcodes, the adapters, the frameshift bases, and the codon refseq.
        """
        q_generator = QualityGenerator(min_q_allowed)

        # (attribute name, number of scores), listed in the order they are drawn
        quality_specs = (
            ("primer_qualities_f", self.primer_seed_len_f),
            ("primer_qualities_r", self.primer_seed_len_r),
            ("fbc_qualities", BARCODE_LENGTH),
            ("rbc_qualities", BARCODE_LENGTH),
            ("adapter_qualities_f", ADAPTER_LENGTH_F),
            ("adapter_qualities_r", ADAPTER_LENGTH_R),
            ("frameshift_front_qualities", self.frameshift_front),
            ("frameshift_back_qualities", self.frameshift_back),
            ("base_variable_qualities", self.codon_refseq_len),
        )
        for attr_name, n_scores in quality_specs:
            setattr(self, attr_name, q_generator.generate_qualities(n_scores))

    @staticmethod
    def aa_seq_to_codon_seq(aa_seq):
        """
        Returns a list holding one randomly chosen codon for each amino acid in `aa_seq`.
        """
        return [RANDOM_RNG.choice(CODON_TABLE.aa_to_codon[aa]) for aa in aa_seq]

    @property
    def refseq_bp_seq(self):
        """
        Returns the sequence of the refseq that will be fed into evSeq: the forward
        primer seed, front frameshift bases, codons, back frameshift bases, and
        reverse primer seed joined into one string.
        """
        pieces = [
            *self.primer_seed_f,
            *self.frameshift_bp_front,
            *self.codon_refseq,
            *self.frameshift_bp_back,
            *self.primer_seed_r,
        ]
        return "".join(pieces)

        
# Class that holds parameters that will be needed by evSeq
class Config():
    def __init__(self, detailed = True):
        """
        Builds a set of conditions that might be passed into an evSeq run: the
        reference sequences, the readlength, and the filtering/threshold parameters.

        Parameters
        ----------
        detailed : bool
            When True, `N_INDICES` reference sequences are built; otherwise
            `N_PLATES` are built.
        """
        # Record whether or not this is a detailed run
        self.detailed = detailed

        # Build as many reference sequences as the mode calls for
        if detailed:
            n_refs_needed = N_INDICES
        else:
            n_refs_needed = N_PLATES
        self.refseqs = [FakeRefseq() for _ in range(n_refs_needed)]

        # Choose the readlength. Its upper bound is capped by the full sequenceable
        # length of the shortest reference sequence.
        window_lens = [refseq.readable_window_len for refseq in self.refseqs]
        readlength_cap = min(min(window_lens), MAX_READLENGTH)
        self.readlength = NP_RNG.integers(MIN_READLENGTH, readlength_cap)

        # Draw the input variables that define the run
        self.average_q_cutoff = NP_RNG.integers(MIN_GLOBAL_QUAL_CUTOFF,
                                                MAX_GLOBAL_QUAL_CUTOFF)
        self.bp_q_cutoff = NP_RNG.integers(MIN_BP_QUAL_CUTOFF, MAX_BP_QUAL_CUTOFF)
        self.length_cutoff = NP_RNG.uniform(MIN_SEQLEN_CUTOFF, MAX_SEQLEN_CUTOFF)
        self.variable_thresh = NP_RNG.uniform(MIN_VARIABLE_THRESH, MAX_VARIABLE_THRESH)
        self.variable_count = NP_RNG.integers(MIN_VARIABLE_COUNT, MAX_VARIABLE_COUNT)

        # Base quality scores start one above the stricter of the two quality
        # cutoffs. Each reference sequence then gets its qualities and its
        # readable/mutable windows for the chosen readlength.
        min_q_allowed = max(self.average_q_cutoff, self.bp_q_cutoff) + 1
        for refseq in self.refseqs:
            refseq.assign_qualities(min_q_allowed)
            refseq.define_refseq_windows(self.readlength)

The basic data generator is a FakeWell instance. This holds methods for building fake data from the Config object.

In [7]:
# Create a class that holds all relevant information for a variant
class Variant(FakeData):
    """
    Holds everything needed to describe one sequence variant in a well: which
    positions are mutated, the mutant amino acids and codons, per-read forward and
    reverse codon sequences and quality arrays, and the expected amino acid and
    base counts.

    NOTE(review): the base class `FakeData` is not defined in this part of the
    notebook -- confirm it is defined before this cell runs.
    """
    def __init__(self, well, counts, minimum_reads_allowed):
        """
        Parameters
        ----------
        well
            The owning well. Its `refseq` attribute is read throughout this class, and
            `build_fastq_entry` is called on it by `build_perfect_reads`.
        counts : int
            Total number of reads for this variant.
        minimum_reads_allowed : int
            Number of reads that must stay free of added noise.
        """
        # The well is an instance variable
        self.well = well

        # Set the total number of counts for the variant and the
        # minimum reads that are allowed for it
        self.total_counts = counts
        self.minimum_reads_allowed = minimum_reads_allowed

        # Don't move forward if there are no counts
        self.no_counts = (counts == 0)

        if not self.no_counts:

            # Choose the variable positions and build variants
            self.choose_variable_positions()
            self.build_variants()

            # Assign Q scores. Get expected counts.
            self.incorporate_noisy_positions()

    def choose_variable_positions(self):
        """
        Chooses the variable positions for the variant and assigns mutant
        amino acids/codons. Sets `n_variable_positions`, `mutated_positions`
        (sorted), `variable_aas`, and `variable_codons`.
        """
        warnings.warn("You need to make sure that the amino acid returned is different"
                      " from the existing one. Same goes for the nucleotides.")
        # Get the number of mutations to make
        n_mutable_positions = len(self.well.refseq.mutable_aa_inds)
        min_n_muts = int(MIN_PERC_MUTATED * n_mutable_positions)
        max_n_muts = int(MAX_PERC_MUTATED * n_mutable_positions) + 1
        self.n_variable_positions = NP_RNG.integers(min_n_muts, max_n_muts)

        # Decide on the positions that will be varied within the variant
        # This must be in the readable region.
        self.mutated_positions = NP_RNG.choice(self.well.refseq.mutable_aa_inds,
                                               size = self.n_variable_positions,
                                               replace = False)
        self.mutated_positions.sort()

        # Choose variable amino acids
        self.variable_aas = RANDOM_RNG.choices(ALLOWED_AAS, k = self.n_variable_positions)

        # Choose a random codon for encoding said amino acids. It cannot be the
        # same as the existing codon.
        self.variable_codons = [None] * self.n_variable_positions
        for i, aa in enumerate(self.variable_aas):

            # Get the options for the alternate codons. Again, we cannot
            # reuse the codon that is already present in the refseq
            existing_codon = self.well.refseq.codon_refseq[self.mutated_positions[i]]
            codon_opts = [codon for codon in CODON_TABLE.aa_to_codon[aa]
                          if codon != existing_codon]

            # If there are no alternate codons (e.g., if we selected methionine as
            # our mutant aa and refseq was already methionine), then just sample from
            # leucine
            if len(codon_opts) == 0:
                self.variable_aas[i] = "L"
                codon_opts = CODON_TABLE.aa_to_codon["L"]

            # Select the new codon
            self.variable_codons[i] = RANDOM_RNG.choice(codon_opts)

    def build_variants(self):
        """
        Builds sequence variants based on the refseq sequence and chosen replacement
        amino acids/positions. Sets `base_mut_aa_seq` plus one forward and one
        reverse codon list per read (`mut_codon_seqs_f`, `mut_codon_seqs_r`).
        """
        # Create base mutant sequences based on the refseq
        self.base_mut_aa_seq = self.well.refseq.aa_refseq.copy()
        base_mut_codon_seq = self.well.refseq.codon_refseq.copy()

        # Loop over the mutated positions and add to the base sequence
        assert len(self.mutated_positions) == len(self.variable_aas)
        assert len(self.mutated_positions) == len(self.variable_codons)
        for i, mutated_pos in enumerate(self.mutated_positions):
            self.base_mut_aa_seq[mutated_pos] = self.variable_aas[i]
            base_mut_codon_seq[mutated_pos] = self.variable_codons[i]

        # Create forward and reverse copies of the codon sequences. Make as many copies
        # as there are reads for the variant. The reverse copies are independent of the
        # forward ones so that either can be edited on its own later.
        self.mut_codon_seqs_f = [base_mut_codon_seq.copy() for _ in range(self.total_counts)]
        self.mut_codon_seqs_r = deepcopy(self.mut_codon_seqs_f)

    def id_noisy_positions(self):
        """
        Decides which reads, which mutated positions within those reads, and which
        bases within those positions will have noise added. Also sets
        `expected_combo_counts`.

        Returns
        -------
        noisy_reads : np.ndarray
            Sorted indices of the reads that receive noise.
        noisy_positions : np.ndarray
            One row of amino acid positions per noisy read.
        noisy_bases_by_noisy_pos : list of list of np.ndarray
            For each noisy read and each noisy position, the within-codon base
            offsets (values from 0-2) that receive noise.
        """
        # Decide how many reads for combos of amino acids will have noise
        # added to them. This number must fall above the minimum counts required.
        buffer_region = self.total_counts - self.minimum_reads_allowed
        n_combos_destroyed = NP_RNG.integers(buffer_region) if buffer_region > 0 else 0

        # Get the expected number of both bp and aa combinations
        self.expected_combo_counts = self.total_counts - n_combos_destroyed

        # Decide which reads will have noise added.
        read_inds = np.arange(self.total_counts)
        noisy_reads = NP_RNG.choice(read_inds,
                                    size = n_combos_destroyed,
                                    replace = False)
        noisy_reads.sort()

        # Determine options for noisy positions. The first and last positions of the
        # region covered by both reads are excluded from receiving noise.
        if len(self.well.refseq.double_count_inds) != 0:
            forbidden_noisy_aas = {min(self.well.refseq.double_count_inds),
                                   max(self.well.refseq.double_count_inds)}
        else:
            forbidden_noisy_aas = set()
        noisened_position_options = np.array([pos for pos in self.mutated_positions
                                              if pos not in forbidden_noisy_aas])

        # Determine how many noisy positions per read.
        n_noisey_per_read = int(len(noisened_position_options) *
                                NP_RNG.uniform(MIN_NOISE_PERC, MAX_NOISE_PERC))

        # Decide which amino acids within each read will have noise added.
        noisy_positions = np.array([NP_RNG.choice(noisened_position_options,
                                                  size = n_noisey_per_read,
                                                  replace = False)
                                    for _ in range(n_combos_destroyed)])

        # Determine which bases are noisy for each noisy amino acid: first how many
        # (1-3) and then which of the three codon offsets.
        n_noisy_bases_by_noisy_pos = NP_RNG.integers(1, 4, size = (n_combos_destroyed, n_noisey_per_read))
        noisy_bases_by_noisy_pos = [[NP_RNG.choice(3,
                                                  size = n_noisy_bases,
                                                  replace = False)
                                     for n_noisy_bases in noisy_base_array]
                                    for noisy_base_array in n_noisy_bases_by_noisy_pos]

        # Checks
        assert len(noisy_reads) == len(noisy_positions)

        return noisy_reads, noisy_positions, noisy_bases_by_noisy_pos

    def build_expected_count_arrays(self):
        """
        Builds the starting expected-count arrays, before any noise is subtracted.

        Every amino acid position and every base starts at `total_counts`. Mutated
        positions that fall in the region covered by both reads are doubled, along
        with the three bases of their codon.

        Returns
        -------
        expected_aa_counts : np.ndarray
            One entry per amino acid position of the refseq.
        expected_bp_counts : np.ndarray
            One entry per base of the codon refseq.
        """
        # Create arrays of counts.
        expected_aa_counts = np.full(self.well.refseq.refseq_len,
                                     self.total_counts)
        expected_bp_counts = np.full(self.well.refseq.codon_refseq_len,
                                     self.total_counts)

        # Double positions in the counts where we have overlap. Both arrays are
        # indexed by sequence position (as in `incorporate_noisy_positions`), so the
        # mutated position itself must be used as the index -- not its rank within
        # `mutated_positions`.
        for mutant_pos in self.mutated_positions:
            if mutant_pos in self.well.refseq.double_count_inds:

                # Double aa counts
                expected_aa_counts[mutant_pos] *= 2

                # Double bp counts for the three bases of this codon
                bp_start_ind = mutant_pos * 3
                expected_bp_counts[bp_start_ind:bp_start_ind + 3] *= 2

        return expected_aa_counts, expected_bp_counts

    def incorporate_noisy_positions(self):
        """
        Assigns counts to different bases and amino acids, then assigns quality scores
        to get them to these counts. Sets `expected_aa_counts`, `expected_bp_counts`,
        `f_quals`, and `r_quals`, and may edit codons in `mut_codon_seqs_f` /
        `mut_codon_seqs_r` for rescued positions.
        """
        # Identify noisy reads, positions, and nucleotides
        noisy_reads, noisy_positions, noisy_bases_by_noisy_pos = self.id_noisy_positions()

        # Build expected output counts for amino acids and bases
        self.expected_aa_counts, self.expected_bp_counts = self.build_expected_count_arrays()

        # Create two quality score arrays. One is for the forward read and the other
        # is for the reverse reads. Each has one row per read.
        self.f_quals = np.tile(self.well.refseq.base_variable_qualities.copy(),
                          (self.total_counts, 1))
        self.r_quals = self.f_quals.copy()

        # "Bad" quality is 2 less than the minimum existing quality in the array
        min_existing_q = self.f_quals.min()
        bad_qual_q = min_existing_q - 2

        # Add noise to positions. Adjust counts and qualities accordingly.
        for noisy_read, noisy_position_array, noisy_bp_array in \
            zip(noisy_reads, noisy_positions, noisy_bases_by_noisy_pos):

            # Loop over all positions and adjust basepair quality as appropriate
            for noisy_pos, noisy_base_set in zip(noisy_position_array, noisy_bp_array):

                # Determine count adjustment for the position
                double_count_pos = (noisy_pos in self.well.refseq.double_count_inds)
                count_adj = 2 if double_count_pos else 1

                # If a position is in the double-read region, with some probability
                # one read will rescue the other. If we rescue, then we set only 1
                # codon/amino acid as low quality. The other one is kept fine. We also
                # mutate the low-quality codon again (this should never be counted, providing
                # a test to make sure that we are appropriately ignoring codons).
                # The `and` short-circuits, so a random number is only drawn for
                # positions in the double-read region.
                rescue_check = (double_count_pos and (NP_RNG.uniform() < RESCUE_FREQ))
                rescue = bool(rescue_check)

                # Get the base index for the noisy position
                noisy_base_index_zero = noisy_pos * 3

                # Loop over the base set and add noise
                for noisy_base in noisy_base_set:

                    # Calculate the actual index
                    actual_base_ind = noisy_base_index_zero + noisy_base

                    # If rescuing, choose one of the quality score arrays to update. Also add
                    # an error to the low-quality sequence. This should be ignored by the
                    # evSeq software.
                    if rescue:

                        # Define options for qualities and sequences
                        qual_opts = [self.f_quals, self.r_quals]
                        codon_opts = [self.mut_codon_seqs_f, self.mut_codon_seqs_r]

                        # Choose which quality and sequence array will be updated.
                        # NOTE(review): the target is re-drawn for every noisy base, so
                        # two bases of one codon can land in different reads, leaving
                        # neither copy of the codon clean -- confirm this is intended.
                        target_ind = 0 if NP_RNG.uniform() < 0.5 else 1
                        target_qual_array = qual_opts[target_ind]
                        target_codon_list = codon_opts[target_ind]

                        # Update the qualities in the chosen array
                        target_qual_array[noisy_read, actual_base_ind] = bad_qual_q

                        # Choose a replacement base (this may be the base already present)
                        replacement_codon = list(target_codon_list[noisy_read][noisy_pos])
                        replacement_codon[noisy_base] = RANDOM_RNG.choice(ALLOWED_NUCLEOTIDES)
                        target_codon_list[noisy_read][noisy_pos] = "".join(replacement_codon)

                        # Set the count adjuster. It is only 1 here, because we lost a base.
                        count_adj = 1

                    # Otherwise, we adjust both quality arrays and make no changes to sequence
                    else:

                        # Set quality to be 2 less than the minimum existing quality in
                        # the array of q-scores.
                        self.f_quals[noisy_read, actual_base_ind] = bad_qual_q
                        self.r_quals[noisy_read, actual_base_ind] = bad_qual_q

                    # Adjust the counts.
                    # NOTE(review): the amino acid count is decremented once per noisy
                    # *base*, so a codon with several noisy bases lowers it several
                    # times for a single read -- confirm against how evSeq counts.
                    self.expected_aa_counts[noisy_pos] -= count_adj
                    self.expected_bp_counts[actual_base_ind] -= count_adj

    def build_perfect_reads(self):
        """
        Builds one forward and one reverse fastq entry per read by passing the joined
        codon sequences and their quality arrays to `self.well.build_fastq_entry`.

        Returns
        -------
        fastq_r1, fastq_r2 : list
            The first and second values returned by `build_fastq_entry` for each read
            (presumably the forward and reverse entries -- that method is not shown here).
        """
        # Confirm that we have as many qualities as mutant sequences
        assert len(self.f_quals) == self.total_counts
        assert len(self.f_quals) == len(self.r_quals)
        assert len(self.f_quals) == len(self.mut_codon_seqs_f)
        assert len(self.f_quals) == len(self.mut_codon_seqs_r)

        # Now loop over the remaining sequence variants and build entries.
        fastq_r1 = [None] * self.total_counts
        fastq_r2 = fastq_r1.copy()
        for i in range(self.total_counts):

            # Confirm that mutant sequence length matches with quality score length
            full_forward_bp = "".join(self.mut_codon_seqs_f[i])
            full_forward_q = self.f_quals[i]
            assert len(full_forward_bp) == len(full_forward_q)

            full_rev_bp = "".join(self.mut_codon_seqs_r[i])
            full_rev_q = self.r_quals[i]
            assert len(full_rev_bp) == len(full_rev_q)

            assert len(full_forward_bp) == len(full_rev_bp)

            # Record fastq entries
            fastq_r1[i], fastq_r2[i] = self.well.build_fastq_entry(full_forward_bp,
                                                                   full_rev_bp,
                                                                   full_forward_q,
                                                                   full_rev_q,
                                                                   i)

        return fastq_r1, fastq_r2

    def build_output_counts(self):
        """
        Builds output files for the different `OutputCounts`. Not yet implemented.
        """
        pass


In [8]:
# Class that holds information for a test well
class FakeWell(FakeData):
    """
    Simulated contents of a single well.

    A well holds a reference sequence, a randomly drawn total read count, and
    (unless it is a dud) a list of `Variant` objects that split those reads
    between them. It also knows how to wrap variable-region sequence/quality
    pairs in the barcode, adapter, primer seed, and frameshift bases in order
    to emit fastq entries. The plate name, well name, and barcodes start out
    as `None` and must be assigned externally before any reads are built.
    """
    def __init__(self, config, reference_sequence):
        """
        Parameters
        ----------
        config: Configuration object. This class reads `variable_count`,
            `variable_thresh`, `bp_q_cutoff`, `average_q_cutoff`,
            `length_cutoff`, and `readlength` from it.
        reference_sequence: Reference sequence object for this well.
        """
        # Assign the config and reference sequence objects as instance variables
        self.config = config
        self.refseq = reference_sequence

        # Create plate, well, and barcode variables as a placeholder.
        # This will be filled when the well is passed with others to
        # a FakeRun instance.
        self.platename = None
        self.wellname = None
        self.f_barcode = None
        self.r_barcode = None

        # Get the total number of reads in the well.
        self.total_reads = NP_RNG.integers(MIN_N_READS, MAX_N_READS)

        # Decide on how many variants we want in the well.
        self.assign_n_variants()

        # Decide the relative abundance of each variant, then build variants
        abundances = self.calculate_variant_abundances()

        # Only continue if there are any variants. A dud well still gets an
        # (empty) `variants` list so the attribute always exists.
        if abundances is None:
            self.dud_well = True
            self.variants = []
        else:
            self.dud_well = False
            variant_abundances, minimum_reads_per_variant = abundances
            self.variants = [Variant(self, abundance, minimum_reads_per_variant) for
                             abundance in variant_abundances]

        warnings.warn("Add `murder` to list of functions")

    def assign_n_variants(self):
        """
        Determines how many variants are in the well and stores the result
        in `self.n_variants`.
        """
        # Decide on how many variants we want in the well. The maximum allowed is
        # the minimum of (1) how many variants we can spread reads over to get above
        # the `variable_count` threshold, (2) the most allowed with the given
        # `variable_thresh`, and (3) the `MAX_N_VARIANTS` global.
        count_divisor = 1 if self.config.variable_count == 0 else self.config.variable_count
        variant_limits = [self.total_reads // count_divisor, MAX_N_VARIANTS]

        # A threshold of zero places no limit on the number of variants (and
        # would otherwise be a division by zero), so only apply limit (2) for
        # positive thresholds.
        if self.config.variable_thresh > 0:
            variant_limits.append(int(1 // self.config.variable_thresh) - 1)

        # Make it so that we always have at least one variant
        max_n_variants = max(min(variant_limits), 1)
        self.n_variants = NP_RNG.integers(MIN_N_VARIANTS, max_n_variants + 1)

    def calculate_variant_abundances(self):
        """
        Decide how many counts go to each variant.

        Returns
        -------
        None if the well cannot give every variant its minimum number of
        reads (a dud well). Otherwise a tuple
        `(variant_counts, minimum_reads_per_variant)` where `variant_counts`
        is a list with one read count per variant that sums to
        `self.total_reads`.
        """
        # Each variant must be more abundant than both the variable threshold
        # and the variable counts. A variant also needs at least one read to
        # exist at all, so the minimum is floored at 1.
        minimum_reads_per_variant = max(
            int(np.ceil(self.config.variable_thresh * self.total_reads)),
            self.config.variable_count,
            1
        )

        # If we cannot get reads for all variants, this is a dud well
        if minimum_reads_per_variant * self.n_variants > self.total_reads:
            return None

        # Now assign read counts to each variant. If there is only 1 variant, then it gets
        # all reads
        if self.n_variants == 1:
            variant_counts = [self.total_reads]

        # If there are more than 1 variants, for each variant, we sample
        # from the range of minimum reads per variant to total reads
        # remaining, considering that some reads have already been
        # assigned to variants
        else:
            total_reads_available = self.total_reads
            variants_remaining = self.n_variants
            variant_counts = [None] * self.n_variants
            for i in range(self.n_variants):

                # Get the maximum number of reads that we can sample. This is
                # an exclusive upper bound that leaves the minimum number of
                # reads for every variant still waiting for an assignment.
                max_reads_available_ind = FakeWell.calculate_maximum_reads_ind(total_reads_available,
                                                                               variants_remaining,
                                                                               minimum_reads_per_variant)

                # Sample to get the number of reads for this variant
                if minimum_reads_per_variant == max_reads_available_ind:
                    sampled_variants = minimum_reads_per_variant
                else:
                    sampled_variants = NP_RNG.integers(minimum_reads_per_variant, max_reads_available_ind)
                variant_counts[i] = sampled_variants

                # Update the number of reads available for sampling and the number of variants
                # still needing samples
                total_reads_available -= sampled_variants
                assert total_reads_available > 0
                variants_remaining -= 1

                # If this is the breakpoint, assign the remaining reads to the remaining variant
                if variants_remaining == 1:
                    variant_counts[-1] = total_reads_available
                    break

        # Checks to make sure the counts were assigned appropriately
        assert not any(count is None for count in variant_counts)
        assert sum(variant_counts) == self.total_reads

        return variant_counts, minimum_reads_per_variant

    def build_corrupted_reads(self):
        """
        Builds the reads that are expected to be filtered out.

        Returns
        -------
        (indel_reads, low_q_reads, short_reads): Three `(r1, r2)` tuples of
            fastq entry lists, one tuple per flavor of corrupted read.
        """
        # Choose how many sequences to add of each flavor.
        corruption_levels = NP_RNG.integers(MIN_DUD_READS,
                                            MAX_DUD_READS,
                                            size = 3)

        # Add reads with indels.
        indel_reads = self.build_indel_reads(corruption_levels[0])

        # Add sequences filtered out by the average_q_cutoff.
        low_q_reads = self.build_low_q_reads(corruption_levels[1])

        # Add sequences filtered out by the length_cutoff.
        short_reads = self.build_short_reads(corruption_levels[2])

        return indel_reads, low_q_reads, short_reads

    def build_indel_reads(self, n_indels):
        """
        Builds `n_indels` read pairs of the reference sequence in which both
        the forward and the reverse read carry insertions or deletions.

        Returns
        -------
        (r1s, r2s): Lists of forward and reverse fastq entries.
        """
        # Get the base sequences and qualities
        f_indel_reads = [list(self.base_refseq) for _ in range(n_indels)]
        f_indel_qs = [self.refseq.base_variable_qualities.copy() for _ in range(n_indels)]
        r_indel_reads = [list(self.base_refseq) for _ in range(n_indels)]
        r_indel_qs = [self.refseq.base_variable_qualities.copy() for _ in range(n_indels)]

        # Create lists for storing fastq entries
        r1s = [None] * n_indels
        r2s = [None] * n_indels
        target_f_read = [True, False]

        # Add indels. The forward and reverse reads are mutated independently.
        for read_target in target_f_read:

            # Set targets
            if read_target:
                mutable_positions = self.refseq.forward_readable_aas
                indel_reads = f_indel_reads
                indel_qs = f_indel_qs
            else:
                mutable_positions = self.refseq.reverse_readable_aas
                indel_reads = r_indel_reads
                indel_qs = r_indel_qs

            # Add indels
            for i in range(n_indels):

                # Decide if adding an insertions or deletions and the number
                # to add
                insertion = NP_RNG.uniform() > 0.5
                n_muts = NP_RNG.integers(MIN_INDELS_ADDED, MAX_INDELS_ADDED)

                # Decide on locations of indels
                indel_aa_locs = NP_RNG.choice(mutable_positions,
                                              size = n_muts,
                                              replace = False)
                indel_codon_locs = NP_RNG.integers(0, 3, size = n_muts)
                indel_locs = indel_aa_locs * 3 + indel_codon_locs
                indel_locs = np.sort(indel_locs)[::-1] # So that largest indices are removed first

                # Add indels to sequences
                for indel_loc in indel_locs:

                    # If insertion, add a random character
                    if insertion:
                        indel_reads[i].insert(indel_loc, RANDOM_RNG.choice(ALLOWED_NUCLEOTIDES))
                    else:
                        del(indel_reads[i][indel_loc])

                # Add indels to qualities. Note that scores are appended to or
                # trimmed from the end of the array rather than at the indel
                # positions; only the total length is kept in sync.
                if insertion:
                    new_qs = NP_RNG.integers(self.config.bp_q_cutoff, MAX_QUAL_ALLOWED, size = n_muts)
                    indel_qs[i] = np.concatenate((indel_qs[i], new_qs))
                else:
                    # An explicit end index is used because `[:-0]` would
                    # return an empty array.
                    indel_qs[i] = indel_qs[i][:len(indel_qs[i]) - n_muts]

                # Convert reads to strings
                indel_reads[i] = "".join(indel_reads[i])

        # Store results
        for i in range(n_indels):
            r1s[i], r2s[i] = self.build_fastq_entry(f_indel_reads[i],
                                                    r_indel_reads[i],
                                                    f_indel_qs[i],
                                                    r_indel_qs[i],
                                                    i)

        return r1s, r2s

    def build_low_q_reads(self, n_low_q):
        """
        Builds `n_low_q` read pairs of the unmutated reference sequence whose
        quality scores are all drawn from below `average_q_cutoff`.

        Returns
        -------
        (forward_bad_q, reverse_bad_q): Lists of forward and reverse fastq
            entries.
        """
        # Create as many copies of the reference sequence as we want bad sequences
        bad_seq_copies = [self.base_refseq] * n_low_q
        bad_seq_quals = NP_RNG.integers(0, self.config.average_q_cutoff,
                                        size = (n_low_q, len(self.base_refseq)))

        # Build the fastq entries
        forward_bad_q = [None] * n_low_q
        reverse_bad_q = [None] * n_low_q
        for i, (bad_seq_copy, bad_seq_qual) in enumerate(zip(bad_seq_copies, bad_seq_quals)):
            forward_bad_q[i], reverse_bad_q[i] = self.build_fastq_entry(bad_seq_copy,
                                                                        bad_seq_copy,
                                                                        bad_seq_qual,
                                                                        bad_seq_qual,
                                                                        i)

        return forward_bad_q, reverse_bad_q

    def build_short_reads(self, n_short):
        """
        Builds `n_short` read pairs that are truncated so that the total read
        length falls below `length_cutoff * readlength`.

        Returns
        -------
        (r1, r2): Lists of forward and reverse fastq entries. Both lists are
            empty if the fixed start of a read already reaches the cutoff.
        """
        # Identify the longest possible short read
        max_allowed_length = int(self.config.length_cutoff * self.config.readlength)

        # Determine the maximum allowed readable window that gives the longest
        # possible short read
        forward_readable_max = (max_allowed_length -
                                self.refseq.frameshift_front -
                                self.refseq.primer_seed_len_f -
                                ADAPTER_LENGTH_F - BARCODE_LENGTH)
        reverse_readable_max = (max_allowed_length -
                                self.refseq.frameshift_back -
                                self.refseq.primer_seed_len_r -
                                ADAPTER_LENGTH_R - BARCODE_LENGTH)

        # If we cannot add short reads, just return empty lists
        if forward_readable_max <= 0 or reverse_readable_max <= 0:
            return ([], [])

        # Sample the lengths of the bad fragments
        bad_fragment_lengths_f = NP_RNG.integers(0, forward_readable_max,
                                                 size = n_short)
        bad_fragment_lengths_r = NP_RNG.integers(0, reverse_readable_max,
                                                 size = n_short)

        # Build the fragments
        r1 = [None] * n_short
        r2 = [None] * n_short
        for i, (frag_len_f, frag_len_r) in enumerate(zip(bad_fragment_lengths_f, bad_fragment_lengths_r)):

            # Slice the base reference sequence and the base qualities. The
            # reverse start is clamped at 0 so that a fragment longer than the
            # reference yields the whole reference (as the forward slice does)
            # instead of wrapping around via a negative index.
            min_r = max(self.refseq.codon_refseq_len - frag_len_r, 0)
            newseq_f = self.base_refseq[:frag_len_f]
            newseq_r = self.base_refseq[min_r:]

            newq_f = self.refseq.base_variable_qualities[:frag_len_f]
            newq_r = self.refseq.base_variable_qualities[min_r:]

            # Create entries
            r1[i], r2[i] = self.build_fastq_entry(newseq_f, newseq_r,
                                                  newq_f, newq_r, i)

        return r1, r2

    # Build a fastq entry
    def build_fastq_entry(self,
                          full_forward_bp,
                          full_rev_bp,
                          full_forward_q,
                          full_rev_q,
                          i):
        """
        Builds a pair of 4-line fastq records for one read pair.

        Parameters
        ----------
        full_forward_bp: Variable-region sequence for the forward read.
        full_rev_bp: Variable-region sequence for the reverse read. It is
            reverse complemented inside this function.
        full_forward_q: Numeric qualities matching `full_forward_bp`.
        full_rev_q: Numeric qualities matching `full_rev_bp`. They are
            flipped inside this function.
        i: Integer used to build the record name (`Test{i}`).

        Returns
        -------
        (r1, r2): The forward and reverse fastq records as strings without a
            trailing newline.
        """
        # Get the start sequences and qualities
        start_f_seq, start_f_q = self.build_start_seq_f()
        start_r_seq, start_r_q = self.build_start_seq_r()

        # Whatever is left of the read length after the start sequence is
        # available for reading the variable region
        available_read_f = self.config.readlength - len(start_f_seq)
        available_read_r = self.config.readlength - len(start_r_seq)

        # Get the forward and reverse readable sequences
        forward_readable = full_forward_bp[:available_read_f]
        reverse_readable = reverse_complement(full_rev_bp)[:available_read_r]

        # Get the forward and reverse readable qualities
        forward_qs = ord_to_chr(full_forward_q[:available_read_f])
        reverse_qs = ord_to_chr(np.flip(full_rev_q)[:available_read_r])

        # Complete the forward and reverse sequences
        complete_f_seq = start_f_seq + forward_readable
        complete_r_seq = start_r_seq + reverse_readable
        complete_f_qual = start_f_q + forward_qs
        complete_r_qual = start_r_q + reverse_qs

        # Record fastq entries. The title line of a fastq record must start
        # with "@" for the record to be valid.
        r1 = f"@Test{i}\n{complete_f_seq}\n+\n{complete_f_qual}"
        r2 = f"@Test{i}\n{complete_r_seq}\n+\n{complete_r_qual}"

        return r1, r2

    def build_start_seq_f(self):
        """
        Returns the sequence and qualities for the forward primer, adapter, barcode,
        and frameshift as we would see them in a fastq file
        """
        # Build the sequence
        forward_seed_and_shift = "".join(self.refseq.primer_seed_f +
                                         self.refseq.frameshift_bp_front)
        forward_read_start = (self.f_barcode + ADAPTER_F + forward_seed_and_shift)

        # Build qualities
        quals = np.concatenate((
            self.refseq.fbc_qualities,
            self.refseq.adapter_qualities_f,
            self.refseq.primer_qualities_f,
            self.refseq.frameshift_front_qualities
        ))
        assert len(quals) == len(forward_read_start)

        return forward_read_start, ord_to_chr(quals)

    def build_start_seq_r(self):
        """
        Returns the sequence and qualities for the reverse primer, adapter, barcode,
        and frameshift as we would see them in a fastq file
        """
        # Build the sequence
        reverse_seed_and_shift = reverse_complement("".join(self.refseq.frameshift_bp_back +
                                                            self.refseq.primer_seed_r))
        reverse_read_start = (self.r_barcode + ADAPTER_R + reverse_seed_and_shift)

        # Build qualities
        # NOTE(review): after the reverse complement the primer seed bases come
        # before the frameshift bases in the read, but the qualities below are
        # ordered frameshift-then-primer and are not flipped. Confirm the
        # orientation in which these quality arrays are stored.
        quals = np.concatenate((
            self.refseq.rbc_qualities,
            self.refseq.adapter_qualities_r,
            self.refseq.frameshift_back_qualities,
            self.refseq.primer_qualities_r
        ))

        assert len(quals) == len(reverse_read_start)

        return reverse_read_start, ord_to_chr(quals)

    def build_all_reads(self):
        """
        Generates reads for a well.

        Returns
        -------
        (forward_reads, reverse_reads): Lists of fastq entries holding the
            perfect reads of every variant followed by the corrupted reads.
        """
        # Build lists for holding outputs
        forward_reads = []
        reverse_reads = []

        # Build perfect reads
        for variant in self.variants:
            f_perfect, r_perfect = variant.build_perfect_reads()
            forward_reads.extend(f_perfect)
            reverse_reads.extend(r_perfect)

        # Now augment with bad reads
        for f_bad, r_bad in self.build_corrupted_reads():
            forward_reads.extend(f_bad)
            reverse_reads.extend(r_bad)

        return forward_reads, reverse_reads

    def build_output_counts(self):
        """
        Builds output files for the different `OutputCounts`
        """
        pass


    def murder_well(self):
        """
        With some frequency, screws up the variants in a well to force a DEAD variant.
        """
        pass


    @staticmethod
    def calculate_maximum_reads_ind(total_reads_available, total_variants, minimum_reads_per_variant):
        """
        Returns the exclusive upper bound on the number of reads that can be
        given to one variant while still leaving `minimum_reads_per_variant`
        reads for each of the other `total_variants - 1` variants.
        """
        # Get the upper bound of sampling maximum reads
        return total_reads_available - ((total_variants - 1) * minimum_reads_per_variant) + 1

    @property
    def base_refseq(self):
        """The reference codons joined into a single nucleotide string."""
        return "".join(self.refseq.codon_refseq)

All FakeWell instances are held within a FakeRun instance that assigns barcodes to the fake wells, handles fake input generation, and builds expected outputs.

In [12]:
# Class that holds information for a test run
class FakeRun(FakeData):
    """
    A complete simulated run: one `FakeWell` per row of `INDEX_DF`, all
    sharing a single randomly generated configuration.
    """
    def __init__(self, detailed = True):
        """
        detailed: Whether or not we are using a detailed refseq file. If True,
            every well gets its own reference sequence; otherwise all wells
            on the same index plate share one.
        """
        # Set a random configuration
        self.config = Config(detailed = detailed)

        # Build wells
        self.wells = [None] * len(INDEX_DF)
        unique_plates_found = set()
        refseq_counter = -1
        for i, row in enumerate(INDEX_DF.itertuples()):

            # One path for detailed, increment the refseq counter
            if detailed:
                refseq_counter += 1

            # Otherwise, only increment the counter if it is for a new
            # plate
            elif row.IndexPlate not in unique_plates_found:
                unique_plates_found.add(row.IndexPlate)
                refseq_counter += 1

            # Create a new well
            well = FakeWell(self.config,
                            self.config.refseqs[refseq_counter])

            # Assign information to the well
            well.platename = row.IndexPlate
            well.wellname = row.Well
            well.f_barcode = row.FBC
            well.r_barcode = row.RBC

            # Record
            self.wells[i] = well

    def build_input(self, saveloc):
        """
        Generates the reads of every non-dud well and writes them to
        `testinput_R1_allreads.fastq` and `testinput_R2_allreads.fastq`
        inside the directory `saveloc`.
        """
        warnings.warn("Add an NNN version of the refseq")

        # Loop over all wells. Generate reads
        forward_reads = []
        reverse_reads = []
        for well in self.wells:

            # Produce reads if this is not a dud
            if not well.dud_well:
                f_well_reads, r_well_reads = well.build_all_reads()
                forward_reads.extend(f_well_reads)
                reverse_reads.extend(r_well_reads)

        # Save the fastq files ready for processing
        f_savename = os.path.join(saveloc, "testinput_R1_allreads.fastq")
        r_savename = os.path.join(saveloc, "testinput_R2_allreads.fastq")
        with open(f_savename, "w") as f:
            f.write("\n".join(forward_reads))
        with open(r_savename, "w") as f:
            f.write("\n".join(reverse_reads))

        # Build reference sequence files

    def build_output_counts(self):
        """
        Builds output files for the different `OutputCounts`
        """
        pass

In [10]:
# Build a fake run with the default settings (detailed refseq mode)
test_run = FakeRun()



In [11]:
# Write the simulated R1/R2 fastq files to the current working directory
test_run.build_input("./")

To do:
1. Output refseq
2. Fix warnings
3. Run a test
4. Get output counts