# Some general utilities used throughout these notebooks

In [1]:
import matplotlib
from matplotlib import pyplot
from statistics import median, mean
from collections import defaultdict

# Make the plots used here look kinda like ggplots, by switching pyplot over to the style named "ggplot".
# NOTE(review): this runs as soon as the file is executed/imported, and presumably affects every figure
# created afterwards in the same session -- confirm against the matplotlib style documentation.
pyplot.style.use("ggplot")

# Codon position (CP) -> color. Shared by every plot so that CPs are always drawn the same way.
#
# CPs 1, 2 and 3 use a (hopefully colorblind-friendly) grayscale: black, gray and white, respectively.
# These should be drawn with black borders (BORDERCOLOR) so that the gray and white remain visible on a
# light background.
#
# "CP 4" is not a real codon position: it is shorthand for "positions in non-coding regions of the
# genome", which are often interesting to summarize alongside the real CPs. It is drawn in green, which is
# not quite as colorblind-friendly; adding a pattern or similar in the future would be a good idea.
cp2color = {
    1: "#000000",
    2: "#888888",
    3: "#ffffff",
    4: "#29D321",
}
BORDERCOLOR = "#000000"

# The blue used by the ggplot-style matplotlib color settings
# (taken from https://gist.github.com/huyng/816622).
SCATTERPLOT_PT_COLOR = "#348ABD"

# Sequence name -> the easier-to-remember name used in the report.
seq2name = {
    "edge_6104": "CAMP",
    "edge_1671": "BACT1",
    "edge_2358": "BACT2",
}
# The sequence names themselves, in the same order as the keys of seq2name.
SEQS = list(seq2name)

# Sequence name -> sequence length. Used in a few places, notably gene stats and gene utils.
seq2len = {
    "edge_1671": 2153394,
    "edge_2358": 2806161,
    "edge_6104": 1289244,
}

# Sequence name -> boolean flag, used in the phasing analysis. The original note on this mapping describes
# it as "whether or not this edge is in an isolated component of the graph".
# NOTE(review): the variable name suggests circularity instead -- confirm that these mean the same thing here.
seq2iscircular = {
    "edge_1671": True,
    "edge_2358": True,
    "edge_6104": False,
}

## Utility functions

In [36]:
def use_thousands_sep(mpl_axis):
    """Makes the major tick labels of an axis use thousands separators (e.g. for positions).

    Modified from https://stackoverflow.com/a/25973637 to work better with integers: matplotlib seems
    to store all tick values as floats internally, even essentially integral ones, so formatting them
    directly leaves a trailing ".0". We use float.is_integer() to detect such values and format them as
    ints instead (see https://stackoverflow.com/a/21583817).

    Parameters
    ==========

    mpl_axis: matplotlib axis object
        For an arbitrary pyplot figure, you can pass pyplot.gca().xaxis or pyplot.gca().yaxis.
        Its major formatter is replaced in place.

    Returns
    =======

    None
    """

    def _format_tick(x, pos):
        # pos is part of the FuncFormatter callback signature, but isn't needed here.
        if x.is_integer():
            return "{:,}".format(int(x))
        return "{:,}".format(x)

    mpl_axis.set_major_formatter(matplotlib.ticker.FuncFormatter(_format_tick))

In [1]:
def next_cp(cp):
    """Returns the codon position that follows cp, cycling 1 -> 2 -> 3 -> 1.

    Parameters
    ==========

    cp: int
        A codon position; must be equal to 1, 2, or 3.

    Returns
    =======

    next: int
        The codon position after cp.

    Raises
    ======

    ValueError
        If cp is not equal to 1, 2, or 3.
    """
    # (current CP, CP that comes after it); checked in the order 1, 2, 3.
    transitions = ((1, 2), (2, 3), (3, 1))
    for current, following in transitions:
        if cp == current:
            return following
    raise ValueError("Invalid CP")

In [37]:
def get_parent_gene_info_of_position(pos, genes_df):
    """Finds the gene(s), if any, that contain a given position.

    Every row of genes_df is scanned on each call, so this is pretty slow when called thousands of
    times in a row. If you need this for lots of positions at once, you should probably use
    get_parent_gene_info_of_many_positions() (also defined in this file) instead.

    Parameters
    ==========

    pos: int
        1-indexed position in a genome.

    genes_df: pd.DataFrame
        Result from calling parse_sco(). Only the index (gene numbers) and the LeftEnd and RightEnd
        columns are used here; a gene contains pos if LeftEnd <= pos <= RightEnd (both ends inclusive).

    Returns
    =======

    parent_info: list
        Gene numbers (as ints) of the genes containing this position, in the order in which the genes
        appear in genes_df. Usually just one element (position located in a single gene), but this can
        have multiple elements if genes overlap at this position, or be empty if this position is
        located in an intergenic region.
    """
    return [
        int(gene.Index)
        for gene in genes_df.itertuples()
        if int(gene.LeftEnd) <= pos <= int(gene.RightEnd)
    ]

In [38]:
def get_parent_gene_info_of_many_positions(genes_df):
    """Builds a lookup from every gene-covered position in a genome to its parent gene(s).

    This makes one pass over the genes, which is much faster than calling
    get_parent_gene_info_of_position() lots of times.

    Parameters
    ==========

    genes_df: pd.DataFrame
        Result from calling parse_sco() for this genome's genes. Only the index (gene numbers) and the
        LeftEnd and RightEnd columns are used here.

    Returns
    =======

    pos_to_genes: defaultdict
        Maps position numbers (1-indexed) to a list of the gene number(s) of the genes overlapping
        that position, in the order in which the genes appear in genes_df.

        Positions that aren't in any gene don't get an entry up front. This doesn't make a difference
        when looking a position up with [], since the default value of this defaultdict is [] -- which
        is exactly what those positions' entries would have been anyway.
    """
    pos_to_genes = defaultdict(list)

    for gene in genes_df.itertuples():
        # Both ends of the gene are inclusive, hence the + 1.
        covered_positions = range(int(gene.LeftEnd), int(gene.RightEnd) + 1)
        gene_num = int(gene.Index)

        for pos in covered_positions:
            pos_to_genes[pos].append(gene_num)

    return pos_to_genes

In [39]:
def convert_to_runs(positions):
    """Collapses a (sorted) list of integers into 2-tuple "runs" of consecutive integers.

    Parameters
    ==========

    positions: list of ints
        Assumed to be sorted. (This isn't checked.)

    Returns
    =======

    runs: list of 2-tuples of ints
        Each entry is a 2-tuple (p1, p2), with p1 <= p2, such that [p1, p1 + 1, p1 + 2, ..., p2] are
        all present in positions. An integer without consecutive neighbors p becomes the run (p, p).
        If positions was empty, this will be an empty list.
    """
    if len(positions) == 0:
        return []

    runs = []
    # The run currently being extended spans [run_start, run_end].
    run_start = run_end = positions[0]
    for current in positions[1:]:
        if current - 1 != run_end:
            # current doesn't continue the run, so close the run out and start a new one here.
            runs.append((run_start, run_end))
            run_start = current
        run_end = current

    # The final run is still open once we've run out of positions.
    runs.append((run_start, run_end))
    return runs

In [40]:
# Sanity check covering multi-position runs and single-position runs
# (a placeholder for when we set up formal tests).
assert convert_to_runs(
    [1, 2, 3, 5, 6, 7, 10, 20, 31, 32, 33, 34, 35, 40]
) == [(1, 3), (5, 7), (10, 10), (20, 20), (31, 35), (40, 40)]

In [1]:
def get_p2mincov(percentages, minReadNumber):
    """Computes, for each percentage p, the minimum coverage needed to expect minReadNumber mutated reads.

    For example, if minReadNumber is 5 and p = 50%, then the minimum coverage is (100 * 5) / 50 = 10x.
    This makes sense: if a mutation is present at 50% frequency then, on average, we'd need to see a
    coverage of 10x in order to see 5 copies of the mutated allele. Lowering p or increasing
    minReadNumber results in the minimum coverage increasing.

    Parameters
    ==========

    percentages: iterable of numbers
        Values of p; these should be in the range (0, 50]. (This isn't checked, but p = 0 will cause a
        ZeroDivisionError.)

    minReadNumber: number
        The minimum number of reads supporting a p-mutation we'd need to see.

    Returns
    =======

    p2mincov: dict
        Maps each p to (100 * minReadNumber) / p. Repeated values of p collapse into a single entry.
    """
    scaled_read_number = 100 * minReadNumber
    p2mincov = {}
    for pct in percentages:
        p2mincov[pct] = scaled_read_number / pct
    return p2mincov

# Sanity check. 50 appears twice in the input; repeated percentages collapse into a single dict entry.
assert get_p2mincov([1, 2, 3, 50, 20, 50], 5) == {
    1: 500,
    2: 250,
    3: (500 / 3),
    20: 25,
    50: 10,
}

def get_p2pct(percentages):
    """Maps each percentage p to the equivalent fraction, p / 100 (e.g. 50 -> 0.5).

    Parameters
    ==========

    percentages: iterable of numbers
        Values of p, expressed as percentages.

    Returns
    =======

    p2pct: dict
        Maps each p to p / 100. Repeated values of p collapse into a single entry.
    """
    p2pct = {}
    for pct in percentages:
        p2pct[pct] = pct / 100
    return p2pct

# Sanity check for get_p2pct() on a few representative percentages.
assert get_p2pct([1, 2, 50]) == {1: 0.01, 2: 0.02, 50: 0.5}

In [1]:
def get_meancovs():
    """Computes the mean coverage of each sequence in SEQS, printing a summary along the way.

    Loads pileup data using pileup.load() and then, for each sequence in SEQS, collects
    pileup.get_cov() for every position from 1 to the sequence's length in seq2len (inclusive).
    The mean and median of these values are printed for each sequence.

    Returns
    =======

    seq2meancov: dict
        Maps each sequence name in SEQS to its mean coverage. (The median coverages are only printed,
        not returned.)
    """
    # NOTE(review): imported here rather than at the top of the file, presumably so that pileup is
    # only needed when this function is actually called -- confirm.
    import pileup

    seq2pos2pileup = pileup.load()

    seq2meancov = {}
    for seq in SEQS:
        covs = [
            pileup.get_cov(seq2pos2pileup[seq][pos])
            for pos in range(1, seq2len[seq] + 1)
        ]
        mean_cov = mean(covs)
        median_cov = median(covs)
        print(f"Sequence {seq} has average coverage {mean_cov:,.2f} and median coverage {median_cov:,.2f}.")
        seq2meancov[seq] = mean_cov
    return seq2meancov