# Pre-Filter Regions for Diffmeth Analysis with LIMMA

Small modifications of Elena's notebook and Alan's version of it:
* https://github.com/guardant/ezotenko-gh/blob/2023_01_30_SRT_deconvolution_panel_v2_EZ/analysis/2023_01_30_SRT_deconvolution_panel_v2_EZ/notebooks/data/04_filter_regions_standard_cpg_clusters.ipynb
* https://github.com/guardant/gh-aselewa/blob/main/projects/2023_07_10_HYPER_design_AS/notebooks/04_filter_regions_standard_cpg_clusters.ipynb

Here we will use a rough filter to reduce large region sets prior to differential methylation analysis with `limma`/`edgeR`.

## Initialization

### Paths and Global Variables

In [None]:
import os
import pandas as pd

# Widen the notebook display limits so large summary tables are shown in full.
pd.options.display.max_rows = 1000
pd.options.display.max_columns = 500

In [None]:
#--- Parameters
# Name of the region set; used to build the region file name and the results directory.
REGIONS = 'hg19_cpg_clusters_k3_s150_w150'
# Methylation-rate thresholds for the (currently commented-out) low/high group filter.
FILTER_RATE_LOW = 0.4
FILTER_RATE_HIGH = 0.6
# Minimum number of groups at/above the high rate (target) and at/below the low rate (other).
FILTER_TARGET_NUM = 1
FILTER_OTHER_NUM = 1  # FILTER_OTHER_NUM was changed to 1
# Minimum number of CpGs a region must contain to be kept.
FILTER_MIN_CPGs_REGION = 8
# Number of observations for computing region meth rate
FILTER_COV = 10
# Number of observations per sample group
FILTER_NOBS = 2

#--- Local Paths
ROOT_DIR = '/home/ubuntu/git/etsang/projects'
PROJECT_SLUG = '2023_10_10_SRT_hyper_tissue_dmr_selection_EKT'
PROJECT_DIR = f"{ROOT_DIR}/{PROJECT_SLUG}/work"

ALAN_PROJECT_DIR = "/home/ubuntu/data/2023_07_10_HYPER_design_AS"
# Samples
SAMPLES_PATH = f"{ALAN_PROJECT_DIR}/stage/metadata/loyfer2022_samples_with_blueprint.tsv"
# Meth data
PARQUET_PATH = f"{ALAN_PROJECT_DIR}/stage/frag_scores_k3/loyfer2022/"
# Blueprint data (for erythroblasts and megakaryocytes)
BLUEPRINT_PARQUET_PATH = f"{ALAN_PROJECT_DIR}/stage/frag_scores_k3/blueprint/"
# Units/regions file
REGION_PATH = f"{ALAN_PROJECT_DIR}/stage/metadata/{REGIONS}.ov_cpg_loci.tsv.gz"
# Where to store the results
RESULTS_DIR = f"{PROJECT_DIR}/bp_loyfer_meth_summaries/standard-{REGIONS}.filtered"

#--- Other parameters
# Column names for a BED-style description of a region.
REGION_BED_COLS = ['region_chr', 'region_start', 'region_end', 'region_id']

### Spark Setup

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import IntegerType, ArrayType
import pyspark.sql.functions as sfunc
from pyspark.sql.functions import col

In [None]:
# Environment variables read by PySpark/pyarrow; set them before the context is created.
os.environ["SPARK_HOME"] = "/home/ubuntu/mambaforge/envs/2023_10_10_SRT_hyper_tissue_dmr_selection_EKT/lib/python3.10/site-packages/pyspark"
os.environ["JAVA_HOME"] = "/usr"
# NOTE(review): scratch space is "/temp" (not "/tmp") here and in spark.local.dir
# below -- confirm this directory exists on the machine running the notebook.
os.environ["SPARK_LOCAL_DIRS"] = "/temp"
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

# Memory sizing and scratch directory for the Spark application.
spark_conf = SparkConf()
spark_conf.set("spark.executor.memory", "16g")
spark_conf.set("spark.driver.memory", "64g")
spark_conf.set("spark.driver.maxResultSize", "32g")
# Parquet predicate pushdown is a Spark SQL option, so the key needs the
# "spark.sql." prefix; the previous key "spark.parquet.filterPushdown" is not
# a recognised setting and had no effect.
spark_conf.set("spark.sql.parquet.filterPushdown", "true")
spark_conf.set("spark.local.dir", "/temp")
# The returned list of (key, value) pairs is not the last expression of the
# cell, so it is not displayed; kept only as a quick sanity call.
spark_conf.getAll()

# Create the context/session used by the rest of the notebook (`sc`, `spark`).
sc = SparkContext(conf=spark_conf)
sc.setLogLevel("ERROR")
spark = SparkSession(sc)

### Regions
Filter to CpG clusters with at least 8 CpGs

In [None]:
# Count rows per region_id in the region/CpG-loci table and store the tally as
# `n_cpgs` (presumably one row per overlapping CpG locus -- confirm against the
# file). Only `region_id` is needed for the count, so parsing is restricted to
# that column to avoid materialising the whole table in memory.
region_df = (
    pd.read_csv(REGION_PATH, sep='\t', usecols=['region_id'])
    .groupby('region_id')
    .size()
    .to_frame('n_cpgs')
    .reset_index()
)

In [None]:
# Keep only regions with at least FILTER_MIN_CPGs_REGION CpGs.
region_df = region_df.loc[region_df['n_cpgs'] >= FILTER_MIN_CPGs_REGION]
# Spark copy holding just the surviving region IDs, used later as a join filter.
region_ddf = spark.createDataFrame(region_df[['region_id']])

In [None]:
# Display (n_regions, n_columns) of the region table after the CpG-count filter.
region_df.shape

### Samples

In [None]:
# Load the sample sheet and decide which rows to keep based on `super_group`.
samples_pdf = pd.read_csv(SAMPLES_PATH, sep='\t')
super_group = samples_pdf['super_group']
# Excluded: samples without a super group, or whose super group starts with 'Blueprint-'.
is_excluded = super_group.isna() | super_group.str.startswith('Blueprint-')
ridxs = ~is_excluded
# drop umbilical endothelium
ridxs = ridxs & (super_group != 'Umbilical-Endothelium')
# Subset to the kept rows and expose the super group under the name `sample_group`.
samples_pdf = samples_pdf.loc[ridxs].assign(sample_group=lambda df: df['super_group'])
# Sample IDs used to filter the methylation data.
FILTER_SAMPLES = samples_pdf['sample_id'].tolist()
# Display: number of retained samples, number of distinct sample groups.
len(FILTER_SAMPLES), samples_pdf['sample_group'].nunique()

Which sample groups do not have enough samples for diff meth analysis?

In [None]:
# Number of retained samples in each sample group (Series indexed by sample_group).
summary = samples_pdf.groupby('sample_group').size()
# Display the per-group counts.
summary

In [None]:
# Display the sample groups with fewer than FILTER_NOBS samples.
summary.loc[summary.lt(FILTER_NOBS)]

### Methylation Data

In [None]:
# Read both methylation parquet datasets, keeping only rows for the retained samples.
meth_ddf = spark.read.parquet(PARQUET_PATH).filter(col('sample_id').isin(FILTER_SAMPLES))
# NOTE(review): FILTER_SAMPLES is built after dropping every sample whose
# super_group starts with 'Blueprint-'. Confirm that the Blueprint samples
# wanted here carry a different super_group; otherwise this read adds no rows.
add_ddf = spark.read.parquet(BLUEPRINT_PARQUET_PATH).filter(col('sample_id').isin(FILTER_SAMPLES))
# Stack the two datasets. `union` pairs columns by position, which would
# silently mix fields if the two parquet schemas list columns in a different
# order; `unionByName` pairs them by column name instead.
meth_ddf = meth_ddf.unionByName(add_ddf)
meth_ddf.printSchema()

## Aggregate Methylation Data Across Groups - not needed anymore since filtering only on CpG count

In [None]:
# samples_ddf = spark.createDataFrame(samples_pdf[['sample_id', 'sample_group']])

In [None]:
# agg_meth_ddf = meth_ddf\
#     .filter(col('region_number_total')>=FILTER_COV)\
#     .join(samples_ddf, meth_ddf.sample_id==samples_ddf.sample_id)\
#     .drop(samples_ddf.sample_id)\
#     .groupby(['region_id', 'sample_group'])\
#     .agg(sfunc.avg('region_meth_rate').alias('region_meth_rate'),
#          sfunc.count('region_meth_rate').alias('nobs'))
# agg_meth_ddf = agg_meth_ddf\
#     .filter(col('nobs')>=FILTER_NOBS)\
#     .drop(agg_meth_ddf.nobs)
# agg_meth_ddf.createOrReplaceTempView('meth_data')
# agg_meth_ddf.printSchema()

## Filter Regions - skipping this section now that we've added a CpG filter

In [None]:
# SQL_QUERY_DMR = """
# SELECT region_id, 
#     SUM(CAST((region_meth_rate<={rate_low}) as INTEGER)) as low_group_number,
#     SUM(CAST((region_meth_rate>={rate_high}) as INTEGER)) as high_group_number
# FROM meth_data
# GROUP BY region_id
# """

In [None]:
# dmr_ddf = spark.sql(SQL_QUERY_DMR.format(rate_low=FILTER_RATE_LOW, rate_high=FILTER_RATE_HIGH))
# dmr_ddf = dmr_ddf.persist()

In [None]:
# %%time
# select_region_ddf = dmr_ddf\
#     .filter(col('low_group_number')>=FILTER_OTHER_NUM)\
#     .filter(col('high_group_number')>=FILTER_TARGET_NUM)

## Get Methylation Values for Selected Regions

In [None]:
# Columns written to the results: sample, region, and the two region-level
# methylation fields selected from the methylation data.
RV_COLS = ['sample_id', 'region_id', 'region_number_total', 'region_meth_rate']

In [None]:
%%time
rv_ddf = meth_ddf\
    .join(region_ddf, region_ddf.region_id==meth_ddf.region_id)\
    .drop(region_ddf.region_id)\
    .select(*RV_COLS)
    

In [None]:
%%time
rv_ddf.write.csv(RESULTS_DIR, sep='\t')