In [1]:
import numpy as np
import numba
import pandas as pd
import malariagen_data
import itertools
import functools
from pyprojroot import here

In [2]:
def exponential_peak(x, center, amplitude, decay, baseline, floor, ceiling):
    """Symmetric exponential peak function.

    Parameters
    ----------
    x : ndarray
        Independent variable.
    center : int or float
        The center of the peak.
    amplitude : float
        Amplitude parameter.
    decay : float
        Decay parameter.
    baseline : float
        Baseline parameter.
    floor : float
        Minimum value that the result can take.
    ceiling : float
        Maximum value that the result can take.

    Returns
    -------
    y : ndarray

    """

    # locate the index at which to split data into left and right flanks
    ix_split = bisect_right(x, center)

    # compute left flank
    xl = center - x[:ix_split]
    yl = baseline + amplitude * np.exp(-xl / decay)

    # compute right flank
    xr = x[ix_split:] - center
    yr = baseline + amplitude * np.exp(-xr / decay)

    # prepare output
    y = np.concatenate([yl, yr])

    # apply limits
    y = y.clip(floor, ceiling)

    return y

In [3]:
def skewed_exponential_peak(x, center, amplitude, decay, skew, baseline, floor, ceiling):
    """Asymmetric exponential decay peak function.

    Parameters
    ----------
    x : ndarray
        Independent variable.
    center : int or float
        The center of the peak.
    amplitude : float
        Amplitude parameter.
    decay : float
        Decay parameter.
    skew : float
        Skew parameter.
    baseline : float
        Baseline parameter.
    floor : float
        Minimum value that the result can take.
    ceiling : float
        Maximum value that the result can take.

    Returns
    -------
    y : ndarray

    """

    decay_right = 2**(-skew) * decay
    decay_left = 2**skew * decay

    # locate the index at which to split data into left and right flanks
    ix_split = bisect_right(x, center)

    # compute left flank
    xl = center - x[:ix_split]
    yl = baseline + amplitude * np.exp(-xl / decay_left)

    # compute right flank
    xr = x[ix_split:] - center
    yr = baseline + amplitude * np.exp(-xr / decay_right)

    # prepare output
    y = np.concatenate([yl, yr])

    # apply limits
    y = y.clip(floor, ceiling)

    return y

In [4]:
def gaussian_peak(x, center, amplitude, sigma, baseline, floor, ceiling):
    """Gaussian peak function.

    Parameters
    ----------
    x : ndarray
        Independent variable.
    center : int or float
        The center of the peak.
    amplitude : float
        Amplitude parameter.
    sigma : float
        Width parameter.
    baseline : float
        Baseline parameter.
    floor : float
        Minimum value that the result can take.
    ceiling : float
        Maximum value that the result can take.

    Returns
    -------
    y : ndarray

    """

    y = (baseline +
         amplitude *
         np.exp(-(x - center)**2 / (2 * sigma**2)))

    # apply limits
    y = y.clip(floor, ceiling)

    return y

In [5]:
def skewed_gaussian_peak(x, center, amplitude, sigma, skew, baseline, floor, ceiling):
    """Asymmetric Gaussian peak function.

    Parameters
    ----------
    x : ndarray
        Independent variable.
    center : int or float
        The center of the peak.
    amplitude : float
        Amplitude parameter.
    sigma : float
        Width parameter.
    skew : float
        Skew parameter.
    baseline : float
        Baseline parameter.
    floor : float
        Minimum value that the result can take.
    ceiling : float
        Maximum value that the result can take.

    Returns
    -------
    y : ndarray

    """

    sigma_right = 2**(-skew) * sigma
    sigma_left = 2**skew * sigma

    # locate the index at which to split data into left and right flanks
    ix_split = bisect_right(x, center)

    # compute left flank
    xl = center - x[:ix_split]
    yl = (baseline +
          amplitude *
          np.exp(-xl**2 / (2 * sigma_left**2)))

    # compute right flank
    xr = x[ix_split:] - center
    yr = (baseline +
          amplitude *
          np.exp(-xr**2 / (2 * sigma_right**2)))

    # prepare output
    y = np.concatenate([yl, yr])

    # apply limits
    y = y.clip(floor, ceiling)

    return y

In [6]:
@numba.njit
def hampel_filter(x, size, t=3):
    # https://link.springer.com/article/10.1186/s13634-016-0383-6
    # https://towardsdatascience.com/outlier-detection-with-hampel-filter-85ddf523c73d
    
    y = x.copy()
    mad_scale_factor = 1.4826
    
    for i in range(size, len(x) - size):
        # window
        w = x[i - size:i + size]
        # window median
        m = np.median(w)
        # median absolute deviation
        mad = np.median(np.abs(w - m))
        # MAD scale estimate
        s = mad_scale_factor * mad
        # construct response
        if np.abs(x[i] - m) > (t * s):
            y[i] = m
    
    return y

In [7]:
@numba.njit
def recursive_hampel_filter(x, size, t=3):
    # https://link.springer.com/article/10.1186/s13634-016-0383-6
    # https://towardsdatascience.com/outlier-detection-with-hampel-filter-85ddf523c73d
    
    y = x.copy()
    mad_scale_factor = 1.4826
    
    for i in range(size, len(x) - size):
        # window
        w = y[i - size:i + size]
        # window median
        m = np.median(w)
        # median absolute deviation
        mad = np.median(np.abs(w - m))
        # MAD scale estimate
        S = mad_scale_factor * mad
        # construct response
        if np.abs(y[i] - m) > (t * S):
            y[i] = m
    
    return y

In [8]:
@functools.lru_cache(maxsize=None)
def ag_gmap(contig):
    ag3 = malariagen_data.Ag3()
        
    # read in the genetic map file
    df_gmap = pd.read_csv(here() / f"workflow/notebooks/ag_{contig}.gmap", sep="\t")

    # set up an array of per-base recombination rate values
    rr = np.zeros(len(ag3.genome_sequence(contig)), dtype="f8")

    # fill in the recombination rate values from the genetic map file
    for row, next_row in zip(
        itertools.islice(df_gmap.itertuples(), 0, len(df_gmap)-1), 
        itertools.islice(df_gmap.itertuples(), 1, None)):
        
        # N.B., the genetic map file is in units of cM / Mbp
        # we multiple by 1e-6 to convert to cM / bp
        rr[row.pposition-1:next_row.pposition] = row.rrate * 1e-6
    
    # compute mapping from physical to genetic position
    gmap = np.cumsum(rr)
        
    return gmap


In [9]:
def ag_p2g(contig, ppos):
    """Convert physical position (bp) to genetic position (cM)."""
    gmap = ag_gmap(contig)
    gpos = gmap[ppos - 1]
    return gpos

In [10]:
# TODO
# implement a peak_scan method, similar to...
# https://github.com/alimanfoo/shiny-train/blob/peak-fitting-20200916/notebooks/artwork/peak-scans.ipynb

## Testing

In [15]:
# ag_p2g("2L", 10_000_000)

In [16]:
# import matplotlib.pyplot as plt
# %matplotlib inline

In [17]:
# fig, ax = plt.subplots()
# ax.plot(ag_gmap("2L"))
# ax.set_xlabel('Physical position (bp)')
# ax.set_ylabel('Genetic position (cM)');

In [18]:
# fig, ax = plt.subplots()
# ax.plot(ag_gmap("2R"))
# ax.set_xlabel('Physical position (bp)')
# ax.set_ylabel('Genetic position (cM)');