In [None]:
import numpy as np
import pandas as pd
from statsmodels.iolib.smpickle import load_pickle

# I/O
# Tab-separated gnomAD mutation-rate table; loaded through read_murates().
MURATES = "resources/gnomad/gnomad_v2.supplement-f10.murates-long.tsv"
# Pickled fitted model; its `params` supply the betas passed to calculate_maps().
NEUTRAL_MODEL = "results/manuscript/maps/maps-calibrated.pickle"
# NOTE(review): MATRIX is not referenced in the visible code -- the main cell
# reads a hard-coded "../../results/figures/..." path instead; confirm which
# input is intended.
MATRIX = "results/05_final-matrix/final_matrix.tsv"

###
# Functions
###

def apply_status(df: pd.DataFrame, unbound_field: str, bound_field: str) -> np.ndarray:
    """Label every row of ``df`` from two indicator columns.

    The first matching rule wins:

    1. ``"Unbound"``   -- ``df[unbound_field] == 1``
    2. ``"Bound"``     -- ``df[bound_field] == 1``
    3. ``"NaN"``       -- both fields are missing
    4. ``"Ambiguous"`` -- anything else (e.g. both 0, or one missing and the
       other not equal to 1)

    Args:
        df: Table holding both indicator columns.
        unbound_field: Name of the column flagging the unbound state.
        bound_field: Name of the column flagging the bound state.

    Returns:
        A NumPy array of strings, one label per row of ``df``.
    """
    unbound = df[unbound_field]
    bound = df[bound_field]

    # np.select evaluates the conditions in order, which reproduces the
    # precedence of a nested np.where chain without the nesting.
    conditions = [
        (unbound == 1).to_numpy(),
        (bound == 1).to_numpy(),
        (unbound.isna() & bound.isna()).to_numpy(),
    ]
    labels = ["Unbound", "Bound", "NaN"]
    return np.select(conditions, labels, default="Ambiguous")
    
def read_murates(filepath: str) -> pd.DataFrame:
    """Load the gnomAD mutation-rate table into a DataFrame.

    Args:
        filepath: Location of the tab-separated mutation-rate file.

    Returns:
        The parsed table, read with the C parser engine.
    """
    # read_table defaults to a tab separator.
    murates = pd.read_table(filepath, engine="c")
    return murates


def calculate_maps(snvs: pd.DataFrame, factor: list, betas) -> pd.DataFrame:
    """Compute observed vs. expected singleton statistics per stratum.

    The calculation runs in two stages:

    1. ``snvs`` is grouped by every column in ``factor`` and the ``singleton``
       and ``isvar`` columns are summed. For each of these groups the expected
       singleton count is
       ``(mu_snp * betas.mu_snp + betas.const) * context_nvar``.
    2. The per-group rows are summed again over the first three entries of
       ``factor`` and the summary statistics are derived.

    Args:
        snvs: Table containing the ``factor`` columns plus ``singleton`` and
            ``isvar``.
        factor: Grouping columns. Must hold at least three entries (the first
            three define the reported strata) and must include ``"mu_snp"``,
            because that column is read back after the first grouping.
        betas: Object exposing ``mu_snp`` and ``const`` attributes (for
            example the ``params`` of a fitted statsmodels model).

    Returns:
        One row per stratum with the three stratum columns followed by
        ``singleton_count``, ``expected_singelton_count``, ``context_nvar``,
        ``ps`` (singleton_count / context_nvar), ``maps``
        ((observed - expected) / context_nvar) and ``sem``
        (sqrt(ps * (1 - ps) / context_nvar)).

    Raises:
        IndexError: If ``factor`` has fewer than three entries.
        KeyError: If ``"mu_snp"`` is not one of the grouping columns.
    """
    # Strata reported in the final table.
    summary_keys = [factor[0], factor[1], factor[2]]

    # Stage 1: observed counts for every combination of the grouping columns.
    # No per-group list of methylation levels is collected and exploded here:
    # exploding would repeat the already-summed counts once per distinct level
    # and the stage-2 sum would then double-count those groups.
    per_group = (
        snvs.groupby(factor)
        .agg(
            singleton_count=("singleton", "sum"),
            context_nvar=("isvar", "sum"),
        )
        .reset_index()
    )

    # Expected singleton count. The misspelled column name is kept on purpose
    # because it is part of the returned table.
    per_group["expected_singelton_count"] = (
        per_group["mu_snp"] * betas.mu_snp + betas.const
    ) * per_group["context_nvar"]

    # Stage 2: collapse onto the reported strata.
    agg = per_group.groupby(summary_keys).agg(
        {
            "singleton_count": "sum",
            "expected_singelton_count": "sum",
            "context_nvar": "sum",
        }
    )

    # Proportion of singletons.
    agg["ps"] = agg["singleton_count"] / agg["context_nvar"]

    # Observed minus expected singletons per variant, and the standard error
    # of the singleton proportion.
    agg["maps"] = (
        agg["singleton_count"] - agg["expected_singelton_count"]
    ) / agg["context_nvar"]
    agg["sem"] = (agg["ps"] * (1 - agg["ps"]) / agg["context_nvar"]) ** 0.5

    # Turn the stratum keys back into ordinary columns.
    return agg.reset_index()

## Main

In [18]:
# Input matrix for the MAPS calculation
# NOTE(review): this path is hard-coded and relative to a different directory
# than the I/O constants at the top of the file (MATRIX is unused) -- confirm.
matrix = pd.read_csv(
    "../../results/figures/matrix-subset.for_maps.tsv",
    sep="\t",
)

# Fitted model and its coefficients (betas)
model = load_pickle(NEUTRAL_MODEL)
betas = model.params

# Mutation-rate table
murate_df = read_murates(MURATES)

# Attach mutation rates to every matrix row; unmatched rows are kept
# NOTE(review): if the mutation-rate table holds several rows per
# (ref, alt, context), this left merge repeats the matching matrix rows -- verify.
murate_keys = ["ref", "alt", "context"]
lobs = matrix.merge(murate_df, how="left", on=murate_keys)

# MAPS per activity quantile / dpwm class / pwm stat class
maps = calculate_maps(
    lobs,
    ["activity_quantile_mid", "dpwm_class", "pwm_stat_class", "mu_snp"],
    betas,
)

In [19]:
###
# Example output: first rows of the `maps` table built by calculate_maps()
###

maps.head()

Unnamed: 0,activity_quantile_mid,dpwm_class,pwm_stat_class,singleton_count,expected_singelton_count,context_nvar,ps,maps,sem
0,1,GoB,0,1182,1264.520198,2819,0.419298,-0.029273,0.009294
1,1,GoB,1,468,597.373957,1418,0.330042,-0.091237,0.012487
2,1,LoB,0,3390,3489.564328,7841,0.432343,-0.012698,0.005595
3,1,LoB,1,3200,3220.073753,7208,0.443951,-0.002785,0.005852
4,2,GoB,0,1288,1450.508296,3265,0.394487,-0.049773,0.008553
