# Compute Frag Meth Summaries for a Region Set

Given a region set, this notebook computes fragment-level methylation scores for every sample in the PAT Parquet files. Here we use the hg38 Parquet files.

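SET UP THE SSDs TO BE USED WITH SPARK PRIOR TO RUNNING THIS NOTEBOOK! (Spark's local scratch directory is configured as `/temp` in the Spark Setup cell, so that path must exist and be writable.)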


In [1]:
import pandas as pd
import glob
import numpy as np
import itertools
import functools
import os
import regex as re

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

## Initialisation

### Parameters

In [2]:
# Name of the region panel; reused in the input BED name and the output file name.
REGIONS = 'deconvolution_v2.v23_conv.with_cpg_index'
# Column names applied to the first six columns of the region BED file.
REGION_BED_COLS = [
    'region_chr',
    'region_start',
    'region_end',
    'region_cpg_index_min',
    'region_cpg_index_max',
    'region_id',
]
# Minimum number of called CpGs per fragment (passed to the scoring function).
FILTER_CG_COUNT = 3
# Minimum number of CpGs per region (applied when filtering the region table).
FILTER_CG_COUNT_REGION = 1

#--- Local paths
ROOT_DIR = '/analysis/gh-msun/projects'
PROJECT_SLUG = '2023_06_26_SRT_deconvolution_MS'
PROJECT_DIR = '/'.join([ROOT_DIR, PROJECT_SLUG])

# Regions
REGION_PATH = PROJECT_DIR + '/stage/panel_data/{regions}.bed'.format(regions=REGIONS)

# CpG map; genomic coordinate to CpG index;
CPG_MAP_PATH = ''.join([PROJECT_DIR, '/stage/cpg_loci/cpg_loci_hg19.combined_annot.tsv.gz'])


# BLUEPRINT HG38: s3://gh-bi-lunar/public_data/blueprint/hg38_20160816.pat.db_version.parquet/
PARQUET_PATH_LIST_HG38 = ['/analysis/hg38_20160816.pat.db_version.parquet']

#--- Where to store results
RESULTS_PATH = PROJECT_DIR + (
    '/output/meth_summaries/blueprint_meth_summaries_cg_count_geq_{k}_{regions}.tsv.gz'
    .format(k=FILTER_CG_COUNT, regions=REGIONS)
)

In [3]:
RESULTS_PATH

'/analysis/gh-msun/projects/2023_06_26_SRT_deconvolution_MS/output/meth_summaries/blueprint_meth_summaries_cg_count_geq_3_deconvolution_v2.v23_conv.with_cpg_index.tsv.gz'

### Spark Setup

In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import IntegerType, LongType, ArrayType, StringType, DoubleType
from pyspark.sql.functions import udf, explode, broadcast, count, lit, length, col
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType

In [5]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

## this works for PySpark v3.3.1 - only need to run this once
## (adds the AWS SDK bundle and hadoop-aws packages to the spark-submit arguments)
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages {aws_java},{aws_hadoop} pyspark-shell".format(
    aws_java="com.amazonaws:aws-java-sdk-bundle:1.11.271",
    aws_hadoop="org.apache.hadoop:hadoop-aws:3.1.2")
#####

# UPDATE HOME!
os.environ["SPARK_HOME"] = "/home/ec2-user/mambaforge/envs/2023_06_26_SRT_deconvolution_MS/lib/python3.7/site-packages/pyspark"
# THIS needs to be set-up before running the notebook
os.environ["SPARK_LOCAL_DIRS"] = "/temp"
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

spark_conf = SparkConf()
spark_conf.set("spark.executor.instances", "2")
spark_conf.set("spark.executor.cores", "2")
spark_conf.set("spark.executor.memory", "16g")
spark_conf.set("spark.driver.memory", "64g")
spark_conf.set("spark.driver.maxResultSize", "32g")
# The Parquet predicate-pushdown switch lives in the Spark SQL namespace;
# the previous key ("spark.parquet.filterPushdown") is not read by Spark.
spark_conf.set("spark.sql.parquet.filterPushdown", "true")
spark_conf.set("spark.local.dir", "/temp")

# getOrCreate makes this cell safe to re-run: a second SparkContext(...) call in the
# same kernel raises. NOTE: if a context already exists it is returned as-is and
# `spark_conf` is not re-applied; restart the kernel to change the settings.
sc = SparkContext.getOrCreate(conf=spark_conf)
sc.setLogLevel("ERROR")
spark = SparkSession(sc)



### CpG Map

In [6]:
cpg_map = pd.read_csv(CPG_MAP_PATH, usecols=['chr', 'start', 'end', 'cpg_index', 'cpg_index_hg38'], sep='\t')

In [7]:
%%time
# Rows of the CpG map that have an hg38 index (liftover left some hg19 CpGs unmapped).
ridxs = cpg_map['cpg_index_hg38'].notna()
# Filter once and reuse: the original re-applied the boolean mask four times.
cpg_index_hg19 = cpg_map.loc[ridxs, 'cpg_index']
cpg_index_hg38 = cpg_map.loc[ridxs, 'cpg_index_hg38'].astype(int)
# Both series come from the same filtered rows, so they have equal length and a
# plain zip pairs them exactly as zip_longest did.
hg19_hg38_map = dict(zip(cpg_index_hg19, cpg_index_hg38))
hg38_hg19_map = dict(zip(cpg_index_hg38, cpg_index_hg19))


CPU times: user 15 s, sys: 5.54 s, total: 20.5 s
Wall time: 20.5 s


### Regions

In [8]:
# Read the first six BED columns (named via REGION_BED_COLS), shift the upper CpG
# index down by one (presumably exclusive -> inclusive; the CpG counts computed later
# use max - min + 1), order by the lower CpG index and attach the hg38 CpG indices
# looked up from the hg19 -> hg38 map (NaN where a CpG has no hg38 entry).
region_df = (
    pd.read_csv(REGION_PATH, sep='\t', usecols=range(0, 6), names=REGION_BED_COLS)
    .assign(region_cpg_index_max=lambda df: df['region_cpg_index_max'] - 1)
    .sort_values('region_cpg_index_min')
    .assign(
        region_cpg_index_min_hg38=lambda df: df['region_cpg_index_min'].map(hg19_hg38_map),
        region_cpg_index_max_hg38=lambda df: df['region_cpg_index_max'].map(hg19_hg38_map),
    )
)

len(region_df), region_df['region_id'].nunique()

(1658, 1658)

In [40]:
region_df.head()

Unnamed: 0,region_chr,region_start,region_end,region_cpg_index_min,region_cpg_index_max,region_id,region_cpg_index_min_hg38,region_cpg_index_max_hg38,batch
0,chr1,1114771,1114971,20117,20129,Immune_Broad_B-chr1:1114772-1114971,21119,21131,0
1,chr1,1157450,1157720,21684,21703,Immune_Broad_NK-chr1:1157451-1157720,22686,22705,0
2,chr1,1157879,1158277,21710,21726,Immune_Broad_NK-chr1:1157880-1158277,22712,22728,0
3,chr1,1652503,1652793,41590,41598,Loyfer2022_Preprint_Colon-Ep:Gastric-Ep:Small-...,42716,42724,0
4,chr1,1849567,1849674,46692,46697,Pancreas_Acinar-chr1:1849568-1849674,47819,47824,0


In [9]:
# Keep only regions whose lower AND upper boundary CpG both received an hg38 index.
ridxs = region_df[['region_cpg_index_min_hg38', 'region_cpg_index_max_hg38']].notna().all(axis=1)
region_df = region_df.loc[ridxs].copy()
len(region_df), region_df['region_id'].nunique()

(1658, 1658)

In [10]:
# Number of CpGs spanned by each region in either assembly (bounds are inclusive).
cg_count_hg19 = region_df['region_cpg_index_max'].sub(region_df['region_cpg_index_min']).add(1)
cg_count_hg38 = region_df['region_cpg_index_max_hg38'].sub(region_df['region_cpg_index_min_hg38']).add(1)
# Retain regions that span the same number of CpGs in hg19 and hg38 and that
# contain at least FILTER_CG_COUNT_REGION CpGs.
ridxs = cg_count_hg19.eq(cg_count_hg38) & cg_count_hg19.ge(FILTER_CG_COUNT_REGION)
region_df = region_df.loc[ridxs].copy()
len(region_df), region_df['region_id'].nunique()

(1657, 1657)

In [11]:
# Unmapped (NaN) rows were dropped above, so the hg38 bounds can be stored as integers.
region_df = region_df.astype({
    'region_cpg_index_min_hg38': int,
    'region_cpg_index_max_hg38': int,
})

### PAT PARQUET Files

In [12]:
# Columns selected from every PAT Parquet file.
PAT_COLS = [
    'sample_id',
    'molecule_id',
    'chr',
    'number_molecules',
    'cpg_index_min',
    'cpg_index_max',
    'pat_string',
]

In [13]:
# One Spark DataFrame per Parquet path, restricted to the PAT columns ...
pat_parquet_files = []
for parquet_path in PARQUET_PATH_LIST_HG38:
    pat_parquet_files.append(spark.read.parquet(parquet_path).select(*PAT_COLS))
# ... folded into a single DataFrame, matching columns by name.
pat_hg38_ddf = functools.reduce(DataFrame.unionByName, pat_parquet_files)
pat_hg38_ddf.printSchema()

root
 |-- sample_id: string (nullable = true)
 |-- molecule_id: string (nullable = true)
 |-- chr: string (nullable = true)
 |-- number_molecules: integer (nullable = true)
 |-- cpg_index_min: long (nullable = true)
 |-- cpg_index_max: long (nullable = true)
 |-- pat_string: string (nullable = true)



In [21]:
pat_hg38_ddf.show(6)

+---------+-----------+----+----------------+-------------+-------------+--------------------+
|sample_id|molecule_id| chr|number_molecules|cpg_index_min|cpg_index_max|          pat_string|
+---------+-----------+----+----------------+-------------+-------------+--------------------+
|ERS337091|          2|chr1|               1|           17|           57|TTCTCCTTTCCCCCTTC...|
|ERS337091|          1|chr1|               1|           19|           58|TTCCCCCTTCTCCTTTC...|
|ERS337091|          3|chr1|               1|           99|          115|   CCCCCCCCCCCCCCCTT|
|ERS337091|          6|chr1|               1|          101|          120|CCTCCCCCCCC....TTCTT|
|ERS337091|          5|chr1|               1|          108|          118|         CCCCCCTCCTC|
|ERS337091|          4|chr1|               1|          110|          118|           CCCTTTCTC|
+---------+-----------+----+----------------+-------------+-------------+--------------------+
only showing top 6 rows



In [25]:
# Pull the distinct sample_id rows back to the driver and count them.
unique_sample_id = (
    pat_hg38_ddf
    .select('sample_id')
    .distinct()
    .collect()
)
len(unique_sample_id)

### Compute median fragment count per sample: 297,233,100

In [31]:
# Row count per sample (one row per PAT record), smallest sample first.
read_count_by_sample = (
    pat_hg38_ddf
    .groupBy('sample_id')
    .count()
    .orderBy('count')
)
read_count_by_sample.show(6)

+---------+---------+
|sample_id|    count|
+---------+---------+
|ERS568736| 17780129|
|ERS392586| 55655580|
|ERS392582| 58590887|
|ERS392584| 61896835|
|ERS392580| 64465551|
|ERS661058|163079538|
+---------+---------+
only showing top 6 rows



In [34]:
# Bring the per-sample counts to the driver as a pandas DataFrame (one row per sample).
df_read_count_by_sample = read_count_by_sample.toPandas()
# Summary statistics of the 'count' column; the 50% row is the median quoted in the heading.
df_read_count_by_sample.describe()

Unnamed: 0,count
count,127.0
mean,342518700.0
std,146597900.0
min,17780130.0
25%,279094600.0
50%,297233100.0
75%,395777500.0
max,1057485000.0


In [39]:
df_read_count_by_sample['count'].sum()
# total count: 43,499,869,385

43499869385

## Fragment Level Scoring

In [15]:
# NOTE(review): QUANTILES is not referenced by the scoring code in this cell.
QUANTILES = [0.1, 0.25, 0.75, 0.9]
# k-mer lengths for which methylated / unmethylated / total runs are counted.
KMERS = [1, 3, 4]
# Per-fragment methylation-rate (alpha) thresholds: alpha <= rate and alpha >= rate.
RATES_LEQ = [0.25]
RATES_GEQ = [0.75]

# Output schema of the grouped scoring function; its field names also define the
# column order returned by that function.
RETURN_SCHEMA = (
    StructType()
    .add('sample_id', 'string')
    .add('region_id', 'string')
    .add('number_molecules', 'integer')
    .add('meth_k1', 'integer')
    .add('unmeth_k1', 'integer')
    .add('total_k1', 'integer')
    .add('meth_k3', 'integer')
    .add('unmeth_k3', 'integer')
    .add('total_k3', 'integer')
    .add('meth_k4', 'integer')
    .add('unmeth_k4', 'integer')
    .add('total_k4', 'integer')
    .add('frac_alpha_leq_25pct', 'float')
    .add('frac_alpha_geq_75pct', 'float')
)

def compute_frag_scores(cpg_number_cutoff: int) -> "Callable[[pd.DataFrame], pd.DataFrame]":
    """
    Build the per-group scoring function that is handed to ``applyInPandas``.

    The cutoff is captured in a closure so that the returned callable takes only the
    group's DataFrame.

    Parameters
    ----------
    cpg_number_cutoff : int
        Minimum number of called CpGs ('C' or 'T' characters) a fragment must carry
        inside the region, after trimming, to be scored.

    Returns
    -------
    callable
        ``compute_frag_scores_inner(pat_df) -> pd.DataFrame`` returning one row per
        (region_id, sample_id) with the columns of ``RETURN_SCHEMA``, in that order.
    """

    def compute_frag_scores_inner(pat_df: pd.DataFrame) -> pd.DataFrame:
        """
        Score the fragments in ``pat_df`` and aggregate per (region_id, sample_id).

        Reads the columns 'sample_id', 'region_id', 'number_molecules', 'pat_string',
        'cpg_index_min', 'cpg_index_max', 'region_cpg_index_min' and
        'region_cpg_index_max'. Each pat_string is indexed relative to the fragment's
        'cpg_index_min' (one character per CpG).

        Every metric is weighted by 'number_molecules', so a record that stands for n
        identical molecules contributes n times. Records with number_molecules <= 0
        are dropped (the earlier explode-based expansion counted them as one molecule).

        Returns an empty frame with the ``RETURN_SCHEMA`` columns when no fragment
        passes the filters.
        """
        empty_result = pd.DataFrame(columns=RETURN_SCHEMA.names)
        if pat_df.empty:
            return empty_result

        data = pat_df.copy()

        #--- Trim each fragment's pattern to the part that lies inside the region
        frag_start = data['cpg_index_min']
        data['offset_min'] = (data['region_cpg_index_min'] - frag_start).clip(lower=0)
        data['offset_max'] = np.minimum(
            data['region_cpg_index_max'] - frag_start,
            data['cpg_index_max'] - frag_start)
        # The stop index is floored at 0: a negative stop would otherwise be read by
        # Python as "count from the end" and return a wrong, non-empty slice.
        data['trimmed_pat'] = [
            pat[start:max(stop, 0)]
            for pat, start, stop in zip(
                data['pat_string'], data['offset_min'], data['offset_max'] + 1)
        ]

        #--- Filter molecules based on observed CpG loci
        observed_cpg_number = (
            data['trimmed_pat'].str.count('C') + data['trimmed_pat'].str.count('T'))
        ridxs = (observed_cpg_number >= cpg_number_cutoff) & (data['number_molecules'] > 0)
        data = data[ridxs].copy()
        if data.shape[0] == 0:
            return empty_result

        #--- k-mer methylation states
        # `re` is the third-party `regex` module here; overlapped=True makes every
        # start position count (e.g. 'CCCC' holds two overlapping 'CCC' 3-mers).
        metric_cols = []
        for k in KMERS:
            for prefix, bases in (('meth', 'C'), ('unmeth', 'T'), ('total', 'TC')):
                name = '%s_k%i' % (prefix, k)
                pattern = '[%s]{%i}' % (bases, k)
                data[name] = data['trimmed_pat'].apply(
                    lambda x, pattern=pattern: len(re.findall(pattern, x, overlapped=True)))
                metric_cols.append(name)

        #--- alpha = per-fragment methylation rate; flag fragments beyond each threshold
        data['alpha'] = data['meth_k1'] / data['total_k1']
        frac_cols = []
        for rate in RATES_LEQ:
            name = 'frac_alpha_leq_%ipct' % (100*rate)
            data[name] = np.where(data['alpha'] <= rate, 1, 0)
            frac_cols.append(name)
        for rate in RATES_GEQ:
            name = 'frac_alpha_geq_%ipct' % (100*rate)
            data[name] = np.where(data['alpha'] >= rate, 1, 0)
            frac_cols.append(name)

        #--- Account for records that correspond to multiple molecules
        # Multiplying by the molecule count gives the same sums as duplicating each
        # row number_molecules times, without materialising the duplicated rows.
        weights = data['number_molecules']
        for name in metric_cols + frac_cols:
            data[name] = data[name] * weights

        #--- Aggregate metrics
        rv = data.groupby(['region_id', 'sample_id'])\
            [metric_cols + frac_cols + ['number_molecules']].sum()\
            .reset_index()
        # Flag sums -> fraction of molecules in the group
        for name in frac_cols:
            rv[name] = rv[name] / rv['number_molecules']

        return rv[RETURN_SCHEMA.names]

    return compute_frag_scores_inner


compute_frag_scores_udf = compute_frag_scores(cpg_number_cutoff=FILTER_CG_COUNT)


### Compute for HG38 Data

In [16]:
%%time
# Regions are scored in groups of BATCH_SIZE; each group is one Spark job whose
# result is collected to the driver as a pandas DataFrame.
BATCH_SIZE = 20
region_df['batch'] = np.arange(region_df.shape[0]) // BATCH_SIZE
rv_scores = []
for batch, batch_region_df in region_df.groupby('batch'):
    rv_ov = []
    print('---> Processing batch %i...' % batch)
    for _, row in batch_region_df.iterrows():
        # Fragments overlapping this region: start at or before the region's last
        # CpG and end at or after its first CpG (hg38 CpG indices).
        overlaps_region = (
            (col('cpg_index_min') <= row['region_cpg_index_max_hg38'])
            & (col('cpg_index_max') >= row['region_cpg_index_min_hg38'])
        )
        ov_ddf = (
            pat_hg38_ddf
            .filter(overlaps_region)
            .withColumn('region_id', lit(row['region_id']))
            .withColumn('region_cpg_index_min', lit(row['region_cpg_index_min_hg38']))
            .withColumn('region_cpg_index_max', lit(row['region_cpg_index_max_hg38']))
        )
        rv_ov.append(ov_ddf)
    # Stack the per-region slices, then score each region's fragments in pandas.
    batch_ddf = functools.reduce(DataFrame.union, rv_ov)
    scores_df = (
        batch_ddf
        .groupby('region_id')
        .applyInPandas(compute_frag_scores_udf, schema=RETURN_SCHEMA)
        .toPandas()
    )
    rv_scores.append(scores_df)

---> Processing batch 0...
---> Processing batch 1...
---> Processing batch 2...
---> Processing batch 3...
---> Processing batch 4...
---> Processing batch 5...
---> Processing batch 6...
---> Processing batch 7...
---> Processing batch 8...
---> Processing batch 9...
---> Processing batch 10...
---> Processing batch 11...
---> Processing batch 12...
---> Processing batch 13...
---> Processing batch 14...
---> Processing batch 15...
---> Processing batch 16...
---> Processing batch 17...
---> Processing batch 18...
---> Processing batch 19...
---> Processing batch 22...
---> Processing batch 23...
---> Processing batch 24...
---> Processing batch 25...
---> Processing batch 26...
---> Processing batch 27...
---> Processing batch 28...
---> Processing batch 29...
---> Processing batch 30...
---> Processing batch 31...
---> Processing batch 32...
---> Processing batch 33...
---> Processing batch 34...
---> Processing batch 35...
---> Processing batch 36...
---> Processing batch 37...
--

In [17]:
# Stack the per-batch results. ignore_index=True gives the combined frame a unique
# 0..n-1 index instead of repeating each batch's own 0..k index labels.
scores_hg38_df = pd.concat(rv_scores, ignore_index=True)

## Write Out

In [18]:
# Alias the hg38 results for writing; show (rows, distinct regions, distinct samples).
scores_df = scores_hg38_df
(len(scores_df),) + tuple(scores_df[key].nunique() for key in ('region_id', 'sample_id'))

(206303, 1648, 127)

In [19]:
%%time
# Make sure the output folder exists; to_csv does not create missing directories.
os.makedirs(os.path.dirname(RESULTS_PATH), exist_ok=True)
# Tab-separated, no index column; RESULTS_PATH ends in .gz.
scores_df.to_csv(RESULTS_PATH,
                 sep='\t',
                 index=False)

CPU times: user 3.1 s, sys: 2.43 ms, total: 3.11 s
Wall time: 3.1 s


In [20]:
scores_df.shape

(206303, 14)

In [23]:
scores_df.head()

Unnamed: 0,sample_id,region_id,number_molecules,meth_k1,unmeth_k1,total_k1,meth_k3,unmeth_k3,total_k3,meth_k4,unmeth_k4,total_k4,frac_alpha_leq_25pct,frac_alpha_geq_75pct
0,ERS1022343,Immune_Broad_Neutro-chr1:9147789-9147871,21,64,6,70,23,1,27,5,0,7,0.047619,0.904762
1,ERS1112536,Immune_Broad_Neutro-chr1:9147789-9147871,18,51,17,68,15,0,32,5,0,14,0.111111,0.666667
2,ERS1112540,Immune_Broad_Neutro-chr1:9147789-9147871,13,32,16,48,9,3,20,4,1,9,0.153846,0.538462
3,ERS1138462,Immune_Broad_Neutro-chr1:9147789-9147871,23,65,23,88,20,4,42,8,1,19,0.173913,0.695652
4,ERS1138463,Immune_Broad_Neutro-chr1:9147789-9147871,15,42,9,51,14,1,21,2,0,6,0.066667,0.866667
