We will generate a number of count tables, described in sections below. 

## import 

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as F
import pyspark.sql.types as T
import pickle
import pandas as pd

## create a spark session

In [2]:
# Spark configuration for this notebook; only the application name is set.
# Written on one line: the previous trailing-backslash form continued into the
# following blank line and would have silently absorbed any line placed there.
conf = SparkConf().setAppName("Count")

# When this cell is re-run in a live kernel, stop the previous session first so
# that a new SparkContext can be created below.
if 'spark' in locals() and spark is not None:
    spark.stop()

# Create a SparkContext with the specified configuration
sc = SparkContext(conf=conf)

# Create a SparkSession from the SparkContext
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/23 16:04:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load in gnomad variants filtered in the last script

In [3]:
# Load the gnomAD variants for all autosomes.
# Sex chromosomes are skipped, see readme.
filtered_variants_glob = "/gpfs/gibbs/pi/reilly/VariantEffects/scripts/noon_data/2.filter/*.csv/*.csv"
df = (
    spark.read
    .options(comment="#", delimiter=",")
    .csv(filtered_variants_glob, header=True)
)

24/01/23 16:04:56 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

## Cast columns to the appropriate types & drop rows with null values

Dropping isn't strictly necessary. We could, for example, drop only the rows with a null Malinois skew when computing Malinois-skew-based metrics, drop only the rows with no phyloP score when computing phyloP-based metrics, and so on. However, each graph would then summarize a different set of variants, which could introduce bias — for example, if phyloP scores are annotated for a nonrandom set of variants. Therefore I will drop rows with null data in any relevant columns prior to subsequent analysis.

In [4]:
# Columns that will be cast to integers.
int_columns = ["POS", "AC", "AN"]

# Columns that will be cast to floats.
float_columns = [
    "AF",
    "K562__ref", "HepG2__ref", "SKNSH__ref",
    "K562__alt", "HepG2__alt", "SKNSH__alt",
    "K562__skew", "HepG2__skew", "SKNSH__skew",
    "cadd_phred",
    "P_ANNO",
    "mean_ref",
    "mean_skew",
    "MAF",
]

# Columns that will be cast to booleans: every column whose name starts with "is_in".
cre_bool_columns = [column for column in df.columns if column.startswith("is_in")]

In [5]:
# Drop every row that has a null in any column (dropna() is called without a
# subset, so all columns are considered). A narrower subset was tried earlier:
#   subset=["CHROM","POS","cadd_phred","P_ANNO","mean_ref","mean_skew","category"]+cre_bool_columns
# NOTE(review): this runs before the type casts in the next cell, so a value
# that only becomes null once cast would not be removed here — confirm the
# input files cannot contain such values.
df = df.dropna()

In [6]:

# Each group of columns is paired with the Spark type it should be cast to.
# The groups are processed in the order int -> float -> bool.
cast_plan = [
    (int_columns, T.IntegerType()),
    (float_columns, T.FloatType()),
    (cre_bool_columns, T.BooleanType()),
]

for column_group, spark_type in cast_plan:
    for column_name in column_group:
        df = df.withColumn(column_name, F.col(column_name).cast(spark_type))

# The typed frame is the starting point for all CRE-level count tables below.
df_cre = df

### Compute pleiotropy

"Pleiotropy" here refers to a variant which is an emVar in multiple cell-types. We're calling emVars as anything with abs(skew)>=0.5 and max(alt activity, ref activity)>=1

In [7]:
# Step 1: for each cell type, flag whether the variant can be called an emVar
# there: |skew| >= 0.5 and the larger of the ref/alt activities >= 1.0.
emvar_cell_types = ["K562", "SKNSH", "HepG2"]
for cell_type in emvar_cell_types:
    strong_skew = F.abs(F.col(f"{cell_type}__skew")) >= 0.5
    peak_activity = F.greatest(F.col(f"{cell_type}__ref"), F.col(f"{cell_type}__alt"))
    df_cre = df_cre.withColumn(f"emVar_{cell_type}", strong_skew & (peak_activity >= 1.0))

# Step 2: pleiotropy is the number of cell types in which the variant is an
# emVar, i.e. the sum of the per-cell-type flags cast to integers.
emvar_as_int = [F.col(f"emVar_{cell_type}").cast("int") for cell_type in emvar_cell_types]
pleio_expr = emvar_as_int[0]
for flag in emvar_as_int[1:]:
    pleio_expr = pleio_expr + flag
df_cre = df_cre.withColumn("pleio", pleio_expr)

# compute count tables

All count tables will be broken down by each of the CRE types. 

## PhyloP vs rarity
- add column : "significant"/"not significant" : threshold is 2.27
- count table of significance VS category
- dump to disc

In [8]:
# Flag each variant whose P_ANNO value reaches the 2.27 significance threshold.
phylop_threshold = 2.27
df_phylop_significant = df_cre.withColumn(
    "phylop_significant",
    F.col("P_ANNO") >= phylop_threshold,
)

In [None]:
# Count variants per rarity category x significance flag x CRE membership flags.
phylop_group_columns = ["category", "phylop_significant"] + cre_bool_columns
phylop_count_table = df_phylop_significant.groupBy(phylop_group_columns).count()

In [21]:
# Base directory under which every count table in this notebook is written
# (each table name is appended directly to this string, hence the trailing slash).
data_base_path="/home/mcn26/varef/scripts/noon_data/3.count/"

In [12]:
#ACTION: triggers the Spark job and writes the table as a single CSV part file.
phylop_count_output = data_base_path + "phylop_count_table"
(
    phylop_count_table
    .coalesce(1)
    .write
    .csv(phylop_count_output, mode="overwrite", header=True)
)

                                                                                

## PhyloP VS pleiotropy
Are variants which cause skew in different numbers of cell-types conserved more or less?

In [10]:
# Count variants per pleiotropy level x significance flag x CRE membership flags.
phylop_pleio_group_columns = ["pleio", "phylop_significant"] + cre_bool_columns
phylop_pleio = df_phylop_significant.groupBy(phylop_pleio_group_columns).count()

In [11]:
#ACTION: triggers the Spark job and writes the table as a single CSV part file.
phylop_pleio_output = data_base_path + "phylop_pleio"
(
    phylop_pleio
    .coalesce(1)
    .write
    .csv(phylop_pleio_output, mode="overwrite", header=True)
)

24/01/22 18:08:28 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

## CADD vs rarity
Similar approach to phyloP above.

Cutoffs are 
- All
- score≥10
- score≥20
- score≥30
- score≥40
- score≥50

In [13]:
# One boolean column per CADD phred cutoff; the column is named "CADD>=<cutoff>".
cadd_thresholds = [10, 20, 30, 40, 50]
cadd_columns = [f"CADD>={threshold}" for threshold in cadd_thresholds]

df_cadd_cutoff = df_cre
for threshold, flag_name in zip(cadd_thresholds, cadd_columns):
    df_cadd_cutoff = df_cadd_cutoff.withColumn(flag_name, F.col("cadd_phred") >= threshold)

# Save the flag-column names so they do not need to be hard-coded elsewhere.
with open("cadd_columns.pkl", 'wb') as cadd_names_file:
    pickle.dump(cadd_columns, cadd_names_file)

# Count variants per rarity category x CADD cutoff flags x CRE membership flags.
cadd_group_columns = ["category"] + cadd_columns + cre_bool_columns
cadd_count_table = df_cadd_cutoff.groupBy(cadd_group_columns).count()


In [22]:
#ACTION: triggers the Spark job and writes the table as a single CSV part file.
cadd_count_output = data_base_path + "CADD_count_table"
(
    cadd_count_table
    .coalesce(1)
    .write
    .csv(cadd_count_output, mode="overwrite", header=True)
)

ERROR:root:KeyboardInterrupt while sending command.         (1726 + 10) / 10743]
Traceback (most recent call last):
  File "/home/mcn26/.conda/envs/mcn_vareff/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/mcn26/.conda/envs/mcn_vareff/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/mcn26/.conda/envs/mcn_vareff/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt

KeyboardInterrupt: 



## CADD VS pleiotropy

In [None]:
# Count variants per pleiotropy level x CADD cutoff flags x CRE membership flags.
cadd_pleio_group_columns = ["pleio"] + cadd_columns + cre_bool_columns
cadd_pleio_table = df_cadd_cutoff.groupBy(cadd_pleio_group_columns).count()

In [None]:
#ACTION: triggers the Spark job and writes the table as a single CSV part file.
cadd_pleio_output = data_base_path + "CADD_pleio"
(
    cadd_pleio_table
    .coalesce(1)
    .write
    .csv(cadd_pleio_output, mode="overwrite", header=True)
)

## Malinois: reference activity & skew vs rarity


In [8]:
# Add a mean_alt column: the average of the alt activity over the three cell types.
summed_alt_activity = F.col("K562__alt") + F.col("HepG2__alt") + F.col("SKNSH__alt")
df_cre = df_cre.withColumn("mean_alt", summed_alt_activity / 3)

### helper functions

In [9]:
def get_column_names(var):
    """Return the first element of every entry in ``var``, in order.

    ``var`` is a sequence of ``[column_name, condition]`` entries as produced
    by the cutoff builders; only the names are returned, as a new list.
    """
    return [entry[0] for entry in var]

def dump_cutoff_names_to_disc(var,name):
    """Pickle the column names of the cutoffs in ``var`` to ``<name>.pkl``.

    Saving the names lets other files load them instead of hard-coding them
    (hard-coding the thresholds is ugly enough already).

    Note that '.pkl' is always appended to ``name``.
    """
    target_path = name+'.pkl'
    with open(target_path, 'wb') as handle:
        pickle.dump(get_column_names(var), handle)

#Ugly code! Really ought to combine make_reference_cutoffs & make_skew_cutoffs into one function that takes a list of intervals
#then a second function that can make intervals based on start/stop/step
def make_reference_cutoffs(name):
    """Build the reference-activity bins for column ``name``.

    Returns a list of ``[label, condition]`` entries: one open-ended bin below
    -6, twelve unit-wide half-open bins [i, i+1) for i = -6 .. 5, and one
    open-ended bin from 6 upwards. Each label is prefixed with ``name``.
    """
    column = F.col(name)

    cutoffs = [[f"{name}_(-Inf,-6)", (column < -6)]]
    for lower in range(-6, 6):
        upper = lower + 1
        cutoffs.append([f"{name}_[{lower},{upper})", (column >= lower) & (column < upper)])
    cutoffs.append([f"{name}_[6,Inf)", (column >= 6)])
    return cutoffs

def make_skew_cutoffs(name):
    """Build the skew bins for column ``name``.

    Bin edges are expressed in integer half-steps (edge = i * 0.5) so the
    labels never suffer from accumulated floating-point error. The result is a
    list of ``[label, condition]`` entries: an open-ended bin below -4.0,
    sixteen half-open bins of width 0.5 covering [-4.0, 4.0), and an
    open-ended bin from 4.0 upwards. Each label is prefixed with ``name``.

    NOTE(review): the top bin is labelled "(4.0, Inf)" although its condition
    is ``>= 4.0`` (i.e. it includes 4.0). The label is kept as-is because it is
    used as a column name in the outputs.
    """
    start_int = -9   # first half-step index; its bin is replaced by the open-ended lower bin
    end_int = 9      # exclusive upper bound of the half-step indices
    step_int = 1     # one half-step, i.e. a bin width of 0.5

    cutoffs = []
    for i in range(start_int, end_int, step_int):
        if i == start_int:
            entry = [f"{name}_(-Inf, -4.0)", (F.col(name) < -4.0)]
        elif i == end_int - step_int:
            entry = [f"{name}_(4.0, Inf)", (F.col(name) >= 4.0)]
        else:
            lower = i * 0.5
            upper = (i + step_int) * 0.5
            entry = [
                f"{name}_[{lower:.1f}, {upper:.1f})",
                (F.col(name) >= lower) & (F.col(name) < upper),
            ]
        cutoffs.append(entry)
    return cutoffs

def apply_cutoffs(df,cutoffs):
    """Add one column to ``df`` per ``[name, condition]`` entry in ``cutoffs``.

    Columns are added in order via ``withColumn``; the resulting frame is
    returned. With no cutoffs, ``df`` itself is returned.
    """
    result = df
    for entry in cutoffs:
        column_name, condition = entry
        result = result.withColumn(column_name, condition)
    return result

In [10]:
# For every (skew column, reference-activity column) pair we want to bin,
# build the actual cutoffs and keep them next to the column names.
cuts = [
    {
        "skew_name": skew_name,
        'skew_cuts': make_skew_cutoffs(skew_name),
        'ref_name': ref_name,
        'ref_cuts': make_reference_cutoffs(ref_name),
    }
    for skew_name, ref_name in [
        ["mean_skew", "mean_ref"],
        ["K562__skew", "K562__ref"],
        ["HepG2__skew", "HepG2__ref"],
        ["SKNSH__skew", "SKNSH__ref"],
    ]
]

In [11]:
# Write the bin-column names of every skew / reference column to disc.
# NOTE(review): dump_cutoff_names_to_disc appends '.pkl' to `name` itself, so
# the files written here end in '.pkl.pkl' (e.g. 'mean_skew.pkl.pkl'). The
# names are left unchanged because other scripts may load them — confirm and
# fix both sides together.
for cut in cuts:
    for name_key, cuts_key in (("skew_name", "skew_cuts"), ("ref_name", "ref_cuts")):
        dump_cutoff_names_to_disc(var=cut[cuts_key], name=cut[name_key]+".pkl")

In [12]:
# Add every skew bin and every reference bin (for the mean and for each cell
# type) as boolean columns on a working copy of df_cre.
df_working = df_cre
for cut in cuts:
    for cutoffs in (cut["skew_cuts"], cut["ref_cuts"]):
        df_working = apply_cutoffs(df_working, cutoffs)

In [17]:
# Perform the counts:
# - one table per cell type (plus one for the mean)
# - all regions of interest within the same table

tabs = {}

for cut in cuts:
    # Name of the cell type ("mean" for the cross-cell-type columns).
    celltype = cut["skew_name"].split("_")[0]

    # Group by rarity category ("category"), the genomic regions
    # (cre_bool_columns), then the skew and reference-activity bins.
    bin_columns = get_column_names(cut["skew_cuts"]) + get_column_names(cut["ref_cuts"])
    to_group_by = ["category"] + cre_bool_columns + bin_columns

    # Commas and periods have to be removed from the column names because
    # spark will choke on them. Note that df_working keeps the new names for
    # the following iterations.
    renamed_column_map = {}
    for old_name in to_group_by:
        new_name = old_name.replace(',', '^').replace('.', '&')
        renamed_column_map[old_name] = new_name
        df_working = df_working.withColumnRenamed(old_name, new_name)

    # Actually do the counting and store it in 'tabs' under the cell-type name.
    tabs[celltype] = df_working.groupBy(list(renamed_column_map.values())).count()
    
    

In [None]:
# Write every per-cell-type table to disc as a single CSV part file.
for celltype_name, count_table in tabs.items():
    malinois_output = data_base_path+"malinois_"+celltype_name
    count_table.coalesce(1).write.csv(malinois_output, mode="overwrite", header=True)

[Stage 3:==>                                                 (494 + 10) / 12843]