In [0]:
%pyspark
import sys
import pandas as pd
pd.set_option('display.max_columns', None)
!pip install joypy
#!pip install PdfPages
sys.path.append('./.local/lib/python3.8/site-packages') #/home/notebook/.local/lib/python3.8/site-packages
sys.path.append('./.local/lib/python3.8/site-packages/')
#import PdfPages
from matplotlib.backends.backend_pdf import PdfPages

import joypy
import numpy as np
import glob
from collections import Counter
import matplotlib.pyplot as plt
import matplotlib.backends.backend_pdf
from matplotlib.offsetbox import AnchoredText
from matplotlib_venn import venn2
from matplotlib import cm
import io
import requests
import os
from pyspark import SparkFiles
from pyspark.sql import functions as F
from pyspark.sql.functions import col,collect_list, concat, lit, when, monotonically_increasing_id, concat_ws 
from pyspark.sql.types import StringType, DoubleType

import seaborn as sns
from decimal import Decimal
from scipy.stats import mannwhitneyu



# Read in scored variants (just de novos and high impact) from CHD cohort


In [2]:
%pyspark
#scored_variants_dedup = spark.read.parquet('s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/tables/scored_variants_HEART_aug13.parquet/')
#scored_variants_dedup = spark.read.parquet('s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/tables/scored_variants_HEART_w_eQTLs.parquet')
scored_variants_dedup = spark.read.parquet('s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/tables/scored_variants_HEART_w_eQTLs_sept15.parquet')
scored_variants_dedup = scored_variants_dedup.withColumn('cadd_score',col('cadd_score').cast(DoubleType()))
scored_variants_dedup.count() # 103477068


In [3]:
%pyspark
scored_variants_dedup.drop('is_proband','info_dp','quality').show()

In [4]:
%pyspark
# GTEx data
kidney = spark.read.csv('s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/tables/gene_burden_testing_KIDNEY.csv',header=True)
heart = spark.read.csv('s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/tables/gene_burden_testing_HEART.csv',header=True)

heart = heart.withColumn("combined_TPM",sum(heart[col] for col in  ['Heart - Left Ventricle', 'Heart - Atrial Appendage']) )
kidney = kidney.withColumn("combined_TPM",sum(kidney[col] for col in ['Kidney - Cortex', 'Kidney - Medulla'] ))

In [5]:
%pyspark
def get_perc_genes(df1,df2,percentage=0,constant=0,drop_mt=False,bottom_perc=0):
    
    if drop_mt:
        df1 = df1.where(~col('genes').startswith('MT-'))
        df2 = df2.where(~col('genes').startswith('MT-'))
        
    df1_len, df2_len = df1.count(), df2.count()
    
    df1_sort = df1.sort('combined_TPM',ascending=False)
    df2_sort = df2.sort('combined_TPM',ascending=False)
    
    if percentage and not constant:
        CUTOFF_1 = int(percentage*df1_len)
        CUTOFF_2 = int(percentage*df2_len)
        df1_genes = df1_sort.select('genes').toPandas().iloc[:CUTOFF_1,:].reset_index(drop=True)['genes']
        df2_genes = df2_sort.select('genes').toPandas().iloc[:CUTOFF_2,:].reset_index(drop=True)['genes']

    elif bottom_perc !=0:
        CUTOFF_1 = int(bottom_perc*df1_len)
        print(CUTOFF_1)
        CUTOFF_2 = int(bottom_perc*df2_len)
        df1_sort = df1.sort('combined_TPM',ascending=True)
        df2_sort = df2.sort('combined_TPM',ascending=True)
        
        df1_genes = df1_sort.select('genes').toPandas().iloc[:CUTOFF_1,:].reset_index(drop=True)['genes']
        df2_genes = df2_sort.select('genes').toPandas().iloc[:CUTOFF_2,:].reset_index(drop=True)['genes']        
    elif constant:
        df1_genes = df1_sort.select('genes').toPandas().iloc[:constant,:].reset_index(drop=True)['genes']
        df2_genes = df2_sort.select('genes').toPandas().iloc[:constant,:].reset_index(drop=True)['genes']
        
    return df1_genes, df2_genes
    
    
def plot_chd_ridgelines(percent_df,stratify_type):
    #stratify_type='Percentage' or 'Constant'
    pg = percent_df.groupby(stratify_type, sort=False)
    
    plt.figure(figsize=(16,24))
    fig, axes = joypy.joyplot(pg,by=stratify_type,linewidth=.5,  ylim = 'own',
    overlap=.5,
    color=[ '#eb4d4b', '#686de0'],
        legend=True,
        alpha=0.85,)
        
    #plt.title('CHD gene burden analysis using GTEx tissue datasets', fontsize=13)
    plt.tight_layout()
    #plt.savefig(f'CHD_gtex_ridgeline.png')
    z.show(plt)
    plt.close()

In [6]:
%pyspark
scored_variants_dedup.where(col('DeNovo_boolean') == 1).groupBy('biospecimen_id').sum('cadd_score').show()

In [7]:
%pyspark
print('Total De Novo.............',scored_variants_dedup.where(col('DeNovo_boolean') == 1).count()) # De Novo High/Moderate
print('Total VEP High Impact.....',scored_variants_dedup.where(col('High_Impact_boolean') == 1).count()) # VEP High Impact 
print('Total CADD > 20...........',scored_variants_dedup.where(col('gt20_boolean') == 1).count()) # CADD greater than 20.0
print('Total eQTL................',scored_variants_dedup.where(col('eqtl_boolean') == 1).count()) # eQTLs

print('Unique De Novo.............',scored_variants_dedup.where(col('DeNovo_boolean') == 1).select('unique_variant_id').distinct().count()) # De Novo High/Moderate
print('Unique VEP High Impact.....',scored_variants_dedup.where(col('High_Impact_boolean') == 1).select('unique_variant_id').distinct().count()) # VEP High Impact 
print('Unique CADD > 20...........',scored_variants_dedup.where(col('gt20_boolean') == 1).select('unique_variant_id').distinct().count()) # CADD greater than 20.0
print('Unique eQTL................',scored_variants_dedup.where(col('eqtl_boolean') == 1).select('unique_variant_id').distinct().count()) # eQTLs

print('Total De Novo genes.............',scored_variants_dedup.where(col('DeNovo_boolean') == 1).select('symbol').distinct().count()) # De Novo High/Moderate
print('Total VEP High Impact genes.....',scored_variants_dedup.where(col('High_Impact_boolean') == 1).select('symbol').distinct().count()) # VEP High Impact 
print('Total CADD > 20 genes...........',scored_variants_dedup.where(col('gt20_boolean') == 1).select('symbol').distinct().count()) # CADD greater than 20.0
print('Total eQTL genes................',scored_variants_dedup.where(col('eqtl_boolean') == 1).select('symbol').distinct().count()) # eQTLs

In [8]:
%pyspark
denovo_BSids = list(Counter(scored_variants_dedup.where(col('DeNovo_boolean') == 1).select('biospecimen_id').toPandas()['biospecimen_id']).values())
#highImpact_BSids = list(Counter(scored_variants_dedup.where(col('High_Impact_boolean') == 1).select('biospecimen_id').toPandas()['biospecimen_id']).values())
#caddGT20_BSids = list(Counter(scored_variants_dedup.where(col('gt20_boolean') == 1).select('biospecimen_id').toPandas()['biospecimen_id']).values())
#eqtl_BSids = list(Counter(scored_variants_dedup.where(col('eqtl_boolean') == 1).select('biospecimen_id').toPandas()['biospecimen_id']).values())


In [9]:
%pyspark
plt.close('all')
plt.hist().select('biospecimen_id').toPandas()['biospecimen_id']).values())
z.show(plt)

In [10]:
%pyspark
plt.close('all')
plt.hist(highImpact_BSids)
z.show(plt)

In [11]:
%pyspark
plt.close('all')
plt.hist(eqtl_BSids)
z.show(plt)

In [12]:
%pyspark
plt.close('all')
plt.hist(caddGT20_BSids)
z.show(plt)

In [13]:
%pyspark
scored_variants_dedup.where(col('symbol').isin(list(heartGenesPerc))) #scored_variants_dedup.where(col('symbol').isin(list(heartGenesPerc)))

In [14]:
%pyspark
plt.close('all')

#means = gene_overlaps = num_genes = num_vars = []
percentages = [.001,.01,.025,.05,.1, .15,.2]
constants = [5,10,20,50,100,250,500,1000]
PERCENTAGE = CONSTANT = 0

with PdfPages('CHD_GTEx_percent_filtering_Genes.pdf') as pdf:
        
    #for n,PERCENTAGE in enumerate(percentages):
    for n,CONSTANT in enumerate(constants):
        
        heartGenesPerc,kidneyGenesPerc = get_perc_genes(heart,kidney,percentage=0,constant=CONSTANT,drop_mt=True)
        #heartGenesPerc,kidneyGenesPerc = get_perc_genes(heart,kidney,percentage=PERCENTAGE,constant=0,drop_mt=True)
        
        #print('Number of Heart genes:',len(heartGenesPerc),' Number of Kidney genes:',len(kidneyGenesPerc))
        # length of intersection   divided by    length of unique set
        percentage_overlap = np.round(100 * (   len(set(list(heartGenesPerc)).intersection(set(list(kidneyGenesPerc)))) / len(set(list(heartGenesPerc) + list(kidneyGenesPerc)))),2  )
        #print('overlap',str(percentage_overlap))
        
        #scored_variants_dedup = scored_variants_dedup.where(col('DeNovo_boolean') == 1)
        #scored_variants_dedup = scored_variants_dedup.where(col('High_Impact_boolean') == 1)
        #scored_variants_dedup = scored_variants_dedup.where(col('gt20_boolean') == 1)
        #scored_variants_dedup = scored_variants_dedup.where(col('eqtl_boolean') == 1)
        
        df_vars_heart = scored_variants_dedup.where(col('symbol').isin(list(heartGenesPerc)))
        df_vars_kidney = scored_variants_dedup.where(col('symbol').isin(list(kidneyGenesPerc)))
        
        heart_variant_count = df_vars_heart.count()
        kidney_variant_count = df_vars_kidney.count()
        #print('Number of Heart variants:',heart_variant_count, 'Number of Kidney variants:', kidney_variant_count)
        
        # get max cadd score in each tissue for imputing indels w/o cadd scores
        MAX_CADD_SCORE_heart = df_vars_heart.agg({"cadd_score": "mean"}).collect()[0][0] # max
        MAX_CADD_SCORE_kidney = df_vars_kidney.agg({"cadd_score": "mean"}).collect()[0][0] # max
        #print(MAX_CADD_SCORE_heart,MAX_CADD_SCORE_kidney)
        #print('Imputing ',str(df_vars_heart.where( col('cadd_score').isNull() ).count()), 'heart variants.')
        #print('Imputing ',str(df_vars_kidney.where( col('cadd_score').isNull() ).count()), 'kidney variants.')
        
        df_vars_heart = df_vars_heart.withColumn('cadd_score_fixed',when(   col('cadd_score').isNull(), MAX_CADD_SCORE_heart   ).otherwise(col('cadd_score')))
        df_vars_kidney = df_vars_kidney.withColumn('cadd_score_fixed',when(   col('cadd_score').isNull(), MAX_CADD_SCORE_kidney   ).otherwise(col('cadd_score')))
        
        #df_vars_heart = df_vars_heart.sort('cadd_score_fixed',ascending=False).limit(2000)  # take just 1000 highest scoring variants
        #df_vars_kidney = df_vars_kidney.sort('cadd_score_fixed',ascending=False).limit(2000)
        #vars_per_person_heart =np.round(df_vars_heart.groupBy('biospecimen_id').count().select(F.mean('count')).collect()[0][0],2) 
        #vars_per_person_kidney = np.round(df_vars_kidney.groupBy('biospecimen_id').count().select(F.mean('count')).collect()[0][0],2) 
        
        # aggregate by gene symbol, not plotting  these
        cadd_gene_sums_heart = df_vars_heart.groupBy("symbol").sum("cadd_score_fixed").select(F.round(col('sum(cadd_score_fixed)')).alias('cadd_score_sum'),'symbol')
        cadd_gene_sums_kidney = df_vars_kidney.groupBy("symbol").sum("cadd_score_fixed").select(F.round(col('sum(cadd_score_fixed)')).alias('cadd_score_sum'),'symbol')
        # get frequency of each gene as a column
        cadd_gene_sums_heart = cadd_gene_sums_heart.join(df_vars_heart.groupBy('symbol').count(),'symbol')#.toPandas()['cadd_score_sum']
        cadd_gene_sums_heart = cadd_gene_sums_kidney.join(df_vars_kidney.groupBy('symbol').count(),'symbol')#.toPandas()['cadd_score_sum']
        
        break
        #cadd_sums_heart = df_vars_heart.groupBy("biospecimen_id").sum("cadd_score_fixed").select('sum(cadd_score_fixed)','biospecimen_id')#.toPandas()['sum(cadd_score_fixed)']
        #cadd_sums_kidney = df_vars_kidney.groupBy("biospecimen_id").sum("cadd_score_fixed").select('sum(cadd_score_fixed)','biospecimen_id')#.toPandas()['sum(cadd_score_fixed)']
        #cadd_sums_heart = cadd_sums_heart.join(df_vars_heart.groupBy('biospecimen_id').count(),'biospecimen_id') # join frequency of variants and cadd sums (both per person)
        #cadd_sums_kidney = cadd_sums_kidney.join(df_vars_kidney.groupBy('biospecimen_id').count(),'biospecimen_id') # join frequency of variants and cadd sums (both per person)
        
        # Normalize by per person ('count' col from previous 2 lines^^), # here we are dividing the (fixed, aka imputed) CADD scores by the number of variants each person has.
        # This is to account for the fact that the  
        
        #cadd_sums_heart = cadd_sums_heart.withColumn('cadd_sum_normed',F.round(col('sum(cadd_score_fixed)') / col('count'),2)).select('cadd_sum_normed').toPandas()['cadd_sum_normed']
        #cadd_sums_kidney = cadd_sums_kidney.withColumn('cadd_sum_normed',F.round(col('sum(cadd_score_fixed)') / col('count'),2)).select('cadd_sum_normed').toPandas()['cadd_sum_normed']  
        
        # append lists for plotting
        #means.append([np.mean(cadd_sums_heart),np.mean(cadd_sums_kidney)]); gene_overlaps.append(percentage_overlap); 
        #num_genes.append([len(heartGenesPerc),len(kidneyGenesPerc)]); num_vars.append([heart_variant_count,kidney_variant_count])
        
        U1, p = mannwhitneyu(cadd_sums_heart, cadd_sums_kidney) # https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.mannwhitneyu.html
        
        #plt.figure(figsize=(9,6))
        plt.hist(cadd_sums_heart, alpha=0.85,bins=8, label=f'Heart',edgecolor='darkred',color= '#eb4d4b')
        plt.hist(cadd_sums_kidney, alpha=0.85,bins=8, label=f'Kidney',edgecolor='darkblue',color =  '#686de0')
        
        #plt.figure(figsize=(6,6))
        #sns.violinplot(data=percent_df[percent_df['Percentage'] == '0.1%'][['heart','kidney']])
        
        legend = plt.legend(loc='upper right',frameon=True).get_frame().set_edgecolor('k')
        
        if CONSTANT!=0: t = str(CONSTANT)
        else:  t = f'{float(100*PERCENTAGE)}%'
        
        plt.title(f'Top {t} expressed genes from GTEx heart and kidney data\nwere used to filter variants from the CHD cohort')
        plt.xlabel('Per-patient normalized CADD scores'); plt.ylabel('Frequency');  plt.grid(axis='y',alpha=.3); plt.gca().set_axisbelow(True) 
    
        s=f'heart genes: {len(heartGenesPerc):,}, heart variants: {heart_variant_count:,}\nkidney genes: {len(kidneyGenesPerc):,}, kidney variants: {kidney_variant_count:,}\nGene Overlap: {percentage_overlap}%; p-value: {Decimal(p):.2E}';   # format nums w/ commas -->   f'{value:,}'
        
        at = AnchoredText(s, loc=2,prop=dict(size=7.5)); t = plt.gca().add_artist(at) #plt.savefig(f'CHD_distribution_gtex_{int(percentage*100)}percent.png')
        
        pdf.savefig() ; z.show(plt); plt.close()    
        
        if CONSTANT==0: STRATIFY_TYPE='Percentage'; STRING_COL_VALUE = f'{float(PERCENTAGE*100)}%'
        else: STRATIFY_TYPE='Constant'; STRING_COL_VALUE = f'{CONSTANT}'
            
        if n==0:                                
            percent_df = pd.concat([pd.DataFrame(cadd_sums_heart),pd.DataFrame(cadd_sums_kidney)],axis=1); percent_df.columns = ['heart','kidney']
    
             # cols may be differrent lengths (if one or more patients has no variants in one of the genes) esp. if 
             # were taking only a small % of top genes, so make sure we use the largest gene list to create % colummn
            #if CONSTANT!=0:   percent_df['Constant'] = np.max([len(cadd_sums_heart), len(cadd_sums_kidney)]) * [f'{CONSTANT}'] 
            #else: percent_df['Percentage'] = np.max([len(cadd_sums_heart), len(cadd_sums_kidney)]) * [f'{float(PERCENTAGE*100)}%'] 
            percent_df[STRATIFY_TYPE] = np.max([len(cadd_sums_heart), len(cadd_sums_kidney)]) * [STRING_COL_VALUE] 
            
        elif  n>0: 
            temp_df = pd.concat([pd.DataFrame(cadd_sums_heart),pd.DataFrame(cadd_sums_kidney)],axis=1); temp_df.columns = ['heart','kidney']
            
            #if CONSTANT!=0:  temp_df['Constant'] = np.max([len(cadd_sums_heart), len(cadd_sums_kidney)]) * [f'{CONSTANT}'] 
            #else: temp_df['Percentage'] = np.max([len(cadd_sums_heart), len(cadd_sums_kidney)]) * [f'{float(PERCENTAGE*100)}%'] 
            temp_df[STRATIFY_TYPE] = np.max([len(cadd_sums_heart), len(cadd_sums_kidney)]) * [STRING_COL_VALUE] 
            percent_df = pd.concat([percent_df,temp_df])

        if constants[-1] == CONSTANT or percentages[-1] == PERCENTAGE:
            #plot_chd_ridgelines(percent_df,stratify_type=STRATIFY_TYPE); 
            pg = percent_df.groupby(STRATIFY_TYPE, sort=False)
            
            plt.figure(figsize=(16,24))
            fig, axes = joypy.joyplot(pg,by=STRATIFY_TYPE,linewidth=.5,  ylim = 'own', overlap=.5,color=[ '#eb4d4b', '#686de0'],legend=True,alpha=0.85,)
            plt.title('Summary of CHD analysis using GTEx tissue datasets', fontsize=13)
            plt.xlabel('Per-patient normalized CADD scores');  plt.tight_layout()
           
            z.show(plt); pdf.savefig();plt.close(); #plt.savefig(f'CHD_gtex_ridgeline.png')
            

#cp  CHD_GTEx_constant_filtering.pdf  ~/cavatica/projects/taylordm/taylor-urbs-r03-kf-cardiac/
#percent_df = percent_df.iloc[::-1]

In [15]:
%sh
#ls 
cp CHD_GTEx_percent_filtering_bsID_2.pdf  ~/cavatica/projects/taylordm/taylor-urbs-r03-kf-cardiac/ 


In [16]:
%pyspark
cadd_gene_sums_heart.sort('count',ascending=False).show(5)


In [17]:
%pyspark
cadd_gene_sums_heart.sort('cadd_score_sum',ascending=False).show(5)

In [18]:
%pyspark
cadd_gene_sums_heart.select('symbol').distinct().count()

In [19]:
%pyspark

#df_vars_heart.drop('cadd_score','is_proband','quality','info_dp','reference','alternate','chromosome').toPandas().to_csv(f'CHD_GTEx_heart_{float(PERCENTAGE*100)}%.csv',index=False)
#df_vars_kidney.drop('cadd_score','is_proband','quality','info_dp','reference','alternate','chromosome').toPandas().to_csv(f'CHD_GTEx_kidney_{float(PERCENTAGE*100)}%.csv',index=False)  
#df_vars_heart.drop('cadd_score','is_proband','quality','info_dp','reference','alternate','chromosome').write.parquet(f'CHD_GTEx_heart_{float(PERCENTAGE*100)}%.parquet')
#df_vars_kidney.drop('cadd_score','is_proband','quality','info_dp','reference','alternate','chromosome').write.parquet(f'CHD_GTEx_kidney_{float(PERCENTAGE*100)}%.parquet')


In [20]:
%pyspark
plt.close('all')
plt.hist(cadd_gene_sums_heart.select('cadd_score_sum').toPandas()['cadd_score_sum'].values,bins=5)
z.show(plt)

In [21]:
%pyspark
cadd_gene_sums_kidney.sort('sum(cadd_score_fixed)',ascending=False).show()

In [22]:
%pyspark
.show()

In [23]:
%pyspark
df_vars_heart.sort('cadd_score_fixed',ascending=False).drop('start','rsID','is_proband','quality',
                                                            'info_dp','reference','alternate','chromosome',
                                                            'name','hgvsg','biospecimen_id').limit(1000).show()

In [24]:
%pyspark
df_vars_heart.where((col('cadd_score').isNull())).count()

In [25]:
%pyspark
df_vars_kidney.count()

In [26]:
%pyspark
print(df_vars_heart.count())
#df_vars_heart.where(col('cadd_score').isNull()).count()

In [27]:
%pyspark
df_vars_heart.agg({"cadd_score": "mean"}).collect()[0][0]

In [28]:
%pyspark
cadd_sums_kidney.show()

In [29]:
%pyspark
ls

In [30]:
%pyspark
df_vars_heart.drop('cadd_score','is_proband','quality','info_dp','reference','alternate','chromosome').count()

In [31]:
%pyspark
plt.close('all')
plt.plot(pd.DataFrame(means),[ '#eb4d4b', '#686de0'])
z.show(plt)

In [32]:
%pyspark
pd.DataFrame(means)
pd.DataFrame(gene_overlaps)
pd.DataFrame(num_genes)
pd.DataFrame(num_vars)

In [33]:
%pyspark
#plt.figure(figsize=(9,6))
data = [cadd_sums_heart, cadd_sums_kidney]
fig, ax = plt.subplots()
#ax.set_title(f'Distributions of (per person) CADD scores from the\nCHD cohort (top {int(percentage*100)}% of GTEx expressed genes in heart vs kidney)')
ax.boxplot(data)
ax.grid(alpha=.25)
plt.xticks([1, 2], ['GTEx Heart gene list', 'GTEx Kidney gene list'])
#plt.savefig(f'CHD_bloxplot_gtex_{int(percentage*100)}percent.png')

z.show(plt)
plt.close()

In [34]:
%pyspark
plt.figure(figsize=(6,6))
sns.violinplot(data=percent_df[percent_df['Percentage'] == '0.1%'][['heart','kidney']])
z.show(plt)

# Now compute aggregated (per person) CADD scores for the HuBMAP Heart (by study and cluster) data


In [36]:
%pyspark
hm_heart = spark.read.csv('s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/tables/hubmap_heart_cluster_genes/integrated_heart_hubmap_sciseq_7samples_markers.csv',header=True)

hm_heart_2 = hm_heart.withColumn("p_val_adj_double", col('p_val_adj').cast(DoubleType()))
hm_heart_2 = hm_heart_2.where((col('_c0') != 1) &\
                            (col('p_val_adj') < .05) &\
                            (col('avg_log2FC') > .48))\
                            .select( list(set(hm_heart_2.columns) - set(["pct.1","pct.2","_c0"])) )
                            
hm_heart_2 = hm_heart_2.where(col('SYMBOL') != 'NA')
print(hm_heart_2.count())


hm_kidney = spark.read.csv('s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/tables/hubmap_heart_cluster_genes/integrated_kidney_hubmap_10xseq_7samples_markers.csv',header=True)

hm_kidney_2 = hm_kidney.withColumn("p_val_adj_double", col('p_val_adj').cast(DoubleType()))
hm_kidney_2 = hm_kidney_2.where((col('_c0') != 1) &\
                            (col('p_val_adj') < .05) &\
                            (col('avg_log2FC') > .48))\
                            .select( list(set(hm_kidney_2.columns) - set(["pct.1","pct.2","_c0"])) )
                            
hm_kidney_2 = hm_kidney_2.where(col('SYMBOL') != 'NA')
print(hm_kidney_2.count())

In [37]:
%pyspark
Counter(hm_heart_2.select('cluster').toPandas()['cluster'])

In [38]:
%pyspark
def get_hubmap_genes(df1,df2,percentage=0,constant=0,drop_mt=False,bottom_perc=0):
    
    if drop_mt:
        df1 = df1.where(~col('SYMBOL').startswith('MT-'))
        df2 = df2.where(~col('SYMBOL').startswith('MT-'))
        
    df1_len, df2_len = df1.count(), df2.count()
    
    df1_sort = df1.sort('p_val_adj_double',ascending=True)
    df2_sort = df2.sort('p_val_adj_double',ascending=True)
    
    if percentage and not constant:
        CUTOFF_1 = int(percentage*df1_len)
        CUTOFF_2 = int(percentage*df2_len)
        df1_genes = df1_sort.select('SYMBOL').toPandas().iloc[:CUTOFF_1,:].reset_index(drop=True)['SYMBOL']
        df2_genes = df2_sort.select('SYMBOL').toPandas().iloc[:CUTOFF_2,:].reset_index(drop=True)['SYMBOL']
    '''
    elif bottom_perc !=0:
        CUTOFF_1 = int(bottom_perc*df1_len)
        print(CUTOFF_1)
        CUTOFF_2 = int(bottom_perc*df2_len)
        df1_sort = df1.sort('combined_TPM',ascending=True)
        df2_sort = df2.sort('combined_TPM',ascending=True)
        
        df1_genes = df1_sort.select('genes').toPandas().iloc[:CUTOFF_1,:].reset_index(drop=True)['genes']
        df2_genes = df2_sort.select('genes').toPandas().iloc[:CUTOFF_2,:].reset_index(drop=True)['genes']        
    elif constant:
        df1_genes = df1_sort.select('genes').toPandas().iloc[:constant,:].reset_index(drop=True)['genes']
        df2_genes = df2_sort.select('genes').toPandas().iloc[:constant,:].reset_index(drop=True)['genes']
    '''
    return df1_genes, df2_genes

In [39]:
%pyspark
'''   NOT THE CORRECT WAY TO DO THIS -- MUST SPLIT  IT UP BY CLUSTER
plt.close('all')
means = []
gene_overlaps = []
num_genes = []
num_vars = []
#percentages = [.0005,.001,.0015,.002,.003,.004]
percentages = [.001,.01,.025,.05,.1,.15,.2]
percentages = [.01,.025,.05,.1,.25,.5,.75,1]

#for n,CONSTANT in enumerate([5,20,50,100,250,500,1000]):
#for n,PERCENTAGE in enumerate(percentages):
for n,PERCENTAGE in enumerate(percentages):

    heartGenesPerc,kidneyGenesPerc = get_hubmap_genes(hm_heart_2,hm_kidney_2,percentage=PERCENTAGE,constant=0,drop_mt=True)
    
    #print('Number of Heart genes:',len(heartGenesPerc),' Number of Kidney genes:',len(kidneyGenesPerc))
    # length of intersection   divided by    length of unique set
    percentage_overlap = np.round(100 * (   len(set(list(heartGenesPerc)).intersection(set(list(kidneyGenesPerc)))) / len(set(list(heartGenesPerc) + list(kidneyGenesPerc)))),2  )
    #print('overlap',str(percentage_overlap))
    
    df_vars_heart = scored_variants_dedup.where(col('symbol').isin(list(heartGenesPerc)))
    df_vars_kidney = scored_variants_dedup.where(col('symbol').isin(list(kidneyGenesPerc)))
    
    heart_variant_count = df_vars_heart.count()
    kidney_variant_count = df_vars_kidney.count()
    #print('Number of Heart variants:',heart_variant_count, 'Number of Kidney variants:', kidney_variant_count)
    
    # get max cadd score in each tissue for imputing indels w/o cadd scores
    MAX_CADD_SCORE_heart = df_vars_heart.agg({"cadd_score": "max"}).collect()[0][0]
    MAX_CADD_SCORE_kidney = df_vars_kidney.agg({"cadd_score": "max"}).collect()[0][0]
    
    #print(MAX_CADD_SCORE_heart,MAX_CADD_SCORE_kidney)
    #print('Imputing ',str(df_vars_heart.where( col('cadd_score').isNull() ).count()), 'heart variants.')
    #print('Imputing ',str(df_vars_kidney.where( col('cadd_score').isNull() ).count()), 'kidney variants.')
    
    df_vars_heart = df_vars_heart.withColumn('cadd_score_fixed',when(   col('cadd_score').isNull(), MAX_CADD_SCORE_heart   ).otherwise(col('cadd_score')))
    df_vars_kidney = df_vars_kidney.withColumn('cadd_score_fixed',when(   col('cadd_score').isNull(), MAX_CADD_SCORE_kidney   ).otherwise(col('cadd_score')))
    
    #vars_per_person_heart =np.round(df_vars_heart.groupBy('biospecimen_id').count().select(F.mean('count')).collect()[0][0],2) 
    #vars_per_person_kidney = np.round(df_vars_kidney.groupBy('biospecimen_id').count().select(F.mean('count')).collect()[0][0],2) 
    
    cadd_sums_heart = df_vars_heart.groupBy("biospecimen_id").sum("cadd_score_fixed").select('sum(cadd_score_fixed)','biospecimen_id')
    cadd_sums_kidney = df_vars_kidney.groupBy("biospecimen_id").sum("cadd_score_fixed").select('sum(cadd_score_fixed)','biospecimen_id')
    
    cadd_sums_heart = cadd_sums_heart.join(df_vars_heart.groupBy('biospecimen_id').count(),'biospecimen_id') # join frequency of variants and cadd sums (both per person)
    cadd_sums_kidney = cadd_sums_kidney.join(df_vars_kidney.groupBy('biospecimen_id').count(),'biospecimen_id') # join frequency of variants and cadd sums (both per person)
    
    # Normalize by per person ('count' col from previous 2 lines^^), # here we are dividing the (fixed, aka imputed) CADD scores by the number of variants each person has.
    # This is to account for the fact that the  
    cadd_sums_heart = cadd_sums_heart.withColumn('cadd_sum_normed',F.round(col('sum(cadd_score_fixed)') / col('count'),2)).select('cadd_sum_normed').toPandas()['cadd_sum_normed']
    cadd_sums_kidney = cadd_sums_kidney.withColumn('cadd_sum_normed',F.round(col('sum(cadd_score_fixed)') / col('count'),2)).select('cadd_sum_normed').toPandas()['cadd_sum_normed']  
    
    # append lists for plotting
    means.append([np.mean(cadd_sums_heart),np.mean(cadd_sums_kidney)]); gene_overlaps.append(percentage_overlap); 
    num_genes.append([len(heartGenesPerc),len(kidneyGenesPerc)]); num_vars.append([heart_variant_count,kidney_variant_count])
    
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.mannwhitneyu.html
    U1, p = mannwhitneyu(cadd_sums_heart, cadd_sums_kidney)

    #plt.figure(figsize=(9,6))
    plt.hist(cadd_sums_heart, alpha=0.85,bins=8, label=f'Heart',edgecolor='darkred',color= '#eb4d4b')
    plt.hist(cadd_sums_kidney, alpha=0.85,bins=8, label=f'Kidney',edgecolor='darkblue',color =  '#686de0')
    
    #plt.figure(figsize=(6,6))
    #sns.violinplot(data=percent_df[percent_df['Percentage'] == '0.1%'][['heart','kidney']])
    
    legend = plt.legend(loc='upper right',frameon=True).get_frame().set_edgecolor('k')
    #legend.get_frame().set_alpha(None);legend.get_frame().set_facecolor((0, 0, 1, 0.1))
    
    plt.title(f'Top {float(100*PERCENTAGE)}% expressed genes from GTEx heart and kidney data\nwas used to filter variants from the CHD cohort')
    plt.xlabel('Per-patient normalized CADD scores'); plt.ylabel('Frequency');  plt.grid(axis='y',alpha=.3); plt.gca().set_axisbelow(True) 

    s=f'heart genes: {len(heartGenesPerc):,}, heart variants: {heart_variant_count:,}\nkidney genes: {len(kidneyGenesPerc):,}, kidney variants: {kidney_variant_count:,}\nGene Overlap: {percentage_overlap}%\nMann-Whitney U p-value: {Decimal(p):.2E}';   # format nums w/ commas -->   f'{value:,}'
    
    at = AnchoredText(s, loc=2,prop=dict(size=9))

    t = plt.gca().add_artist(at)
    #t.set_bbox(dict(facecolor='red', alpha=0, edgecolor='red'))
    #plt.savefig(f'CHD_distribution_gtex_{int(percentage*100)}percent.png')
    z.show(plt); plt.close()    
    
    if n==0:# and len(cadd_sums_heart) == len(cadd_sums_kidney):                                
        percent_df = pd.concat([pd.DataFrame(cadd_sums_heart),pd.DataFrame(cadd_sums_kidney)],axis=1)
        percent_df.columns = ['heart','kidney']

         # cols may be differrent lengths (if one or more patients has no variants in one of the genes) esp. if 
         # were taking only a small % of top genes, so make sure we use the largest gene list to create % colummn
        percent_df['Percentage'] = np.max([len(cadd_sums_heart), len(cadd_sums_kidney)]) * [f'{float(PERCENTAGE*100)}%'] 
        
    elif  n>0:# and len(cadd_sums_heart) == len(cadd_sums_kidney): 
        temp_df = pd.concat([pd.DataFrame(cadd_sums_heart),pd.DataFrame(cadd_sums_kidney)],axis=1)
        temp_df.columns = ['heart','kidney']
        temp_df['Percentage'] = np.max([len(cadd_sums_heart), len(cadd_sums_kidney)]) * [f'{float(PERCENTAGE*100)}%'] 

        percent_df = pd.concat([percent_df,temp_df])

    #if PERCENTAGE == .01:
    #    break
#percent_df = percent_df.iloc[::-1]
plot_chd_ridgelines(percent_df) '''


In [40]:
%pyspark
plt.figure()
venn2([set(heartGenesPerc_cluster), set(kidneyGenesPerc)], ('Heart', 'Kidney'))
plt.title(f'Overlap of top {int(percentage*100)}% expressed genes in heart and kidney (GTEx)')
plt.show()
plt.savefig(f'HM__venn_diagram_{int(percentage*100)}percent.png')

z.show(plt)
plt.close()

In [41]:
%pyspark
plt.figure(figsize=(9,6))
plt.hist(cadd_sums_heart, alpha=0.5,bins=15, label=f'Heart - Top {int(100*percentage)}% expr. genes')
plt.hist(cadd_sums_kidney, alpha=0.5,bins=15, label=f'Kidney - Top {int(100*percentage)}% expr. genes')
plt.legend(loc='upper right')
plt.title('Distribution of sums of Patient scores')
#plt.grid('x')
z.show(plt)
plt.close()

In [42]:
%pyspark

plt.close('all')
#means = num_genes = num_vars = []

percentages = [.001,.01,.025,.05,.1,.15,.2]
percentages = [.01,.05,.1,.25]

with PdfPages('CHD_HuBMAP_percent_filtering_bsID.pdf') as pdf:
        
    heart_cluster_dict = dict()
    kidney_cluster_dict = dict()
    
    clusters_heart = [str(i) for i in np.sort([int(i) for i in hm_heart_2.select('cluster').distinct().toPandas()['cluster'].values])]
    clusters_kidney = [str(i) for i in np.sort([int(i) for i in hm_kidney_2.select('cluster').distinct().toPandas()['cluster'].values])]
    
    # steps are the same for this for loop, we are just seperating the heart and kidney so we can loop through the clusters of each tissue 
    for n,PERCENTAGE in enumerate(percentages):
        
        # loop through heart clusters
        for CLUSTER in clusters_heart:
            print('-----',CLUSTER,'-----')
            hm_heart_cl = hm_heart_2.where(col('cluster') == CLUSTER) 
            heartGenesPerc, _ = get_hubmap_genes(hm_heart_cl,hm_kidney_2,percentage=PERCENTAGE,constant=0,drop_mt=True)
    
            df_vars_heart = scored_variants_dedup.where(col('symbol').isin(list(heartGenesPerc)))
            #heart_variant_count = df_vars_heart.count()
            MAX_CADD_SCORE_heart = df_vars_heart.agg({"cadd_score": "mean"}).collect()[0][0]
            
            #print('# of genes in cluster:', hm_heart_cl.count()); print(f'# of genes after filtering for top {100*PERCENTAGE}%: {heartGenesPerc.count()}')
            #print(f'# of variants: {heart_variant_count}')
            #print(100*(df_vars_heart.where(col('cadd_score').isNull()).count()/heart_variant_count))
            df_vars_heart = df_vars_heart.withColumn('cadd_score_fixed',when(   col('cadd_score').isNull(), MAX_CADD_SCORE_heart   ).otherwise(col('cadd_score')))
            cadd_sums_heart = df_vars_heart.groupBy("biospecimen_id").sum("cadd_score_fixed").select('sum(cadd_score_fixed)','biospecimen_id')
            cadd_sums_heart = cadd_sums_heart.join(df_vars_heart.groupBy('biospecimen_id').count(),'biospecimen_id') # join frequency of variants and cadd sums (both per person)
            cadd_sums_heart = cadd_sums_heart.withColumn('cadd_sum_normed',F.round(col('sum(cadd_score_fixed)') / col('count'),2)).select('cadd_sum_normed').toPandas()['cadd_sum_normed']
    
            heart_cluster_dict['heart_'+CLUSTER] = cadd_sums_heart
        
            
        # loop through kidney tissue
        for CLUSTER in clusters_kidney:
            print('-----',CLUSTER,'-----')
            hm_kidney_cl = hm_kidney_2.where(col('cluster') == CLUSTER) 
            _ , kidneyGenesPerc = get_hubmap_genes(hm_heart_cl,hm_kidney_cl,percentage=PERCENTAGE,constant=0,drop_mt=True)
    
            df_vars_kidney = scored_variants_dedup.where(col('symbol').isin(list(kidneyGenesPerc)))
            #kidney_variant_count = df_vars_kidney.count()
            MAX_CADD_SCORE_kidney = df_vars_kidney.agg({"cadd_score": "mean"}).collect()[0][0]
            
            #print('# of genes in cluster:', hm_kidney_cl.count()); print(f'# of genes after filtering for top {100*PERCENTAGE}%: {kidneyGenesPerc.count()}')
            #print(f'# of variants: {kidney_variant_count}')
            #print(100*(df_vars_kidney.where(col('cadd_score').isNull()).count()/kidney_variant_count))
            df_vars_kidney = df_vars_kidney.withColumn('cadd_score_fixed',when(   col('cadd_score').isNull(), MAX_CADD_SCORE_kidney   ).otherwise(col('cadd_score')))
            cadd_sums_kidney = df_vars_kidney.groupBy("biospecimen_id").sum("cadd_score_fixed").select('sum(cadd_score_fixed)','biospecimen_id')
            cadd_sums_kidney = cadd_sums_kidney.join(df_vars_kidney.groupBy('biospecimen_id').count(),'biospecimen_id') # join frequency of variants and cadd sums (both per person)
            cadd_sums_kidney = cadd_sums_kidney.withColumn('cadd_sum_normed',F.round(col('sum(cadd_score_fixed)') / col('count'),2)).select('cadd_sum_normed').toPandas()['cadd_sum_normed']  
            kidney_cluster_dict['kidney_'+CLUSTER] = cadd_sums_kidney
    
    
        d = dict(heart_cluster_dict,**kidney_cluster_dict)
        plt.figure(figsize=(8,8))
        box = plt.boxplot(d.values(),vert=False,notch=True, patch_artist=True,flierprops={'marker': '.', 'markersize': 5})#, 'markerfacecolor': 'fuchsia'})
        plt.gca().set_yticklabels(d.keys()); plt.grid(alpha=.4) #axis='x'; #ax.xlabel(rotate=90)
        
        colors = ['red']*len(heart_cluster_dict) + ['blue']*len(kidney_cluster_dict)
        for patch, color in zip(box['boxes'], colors): patch.set_facecolor(color)
        
        plt.title(f'Top {float(100*PERCENTAGE)}% smallest p-values of marker genes from each cluster\nwere used to filter CHD cohort') # from HuBMAP heart and kidney studies
        plt.xlabel('Normalized patient CADD scores'); #plt.ylabel('Tissue_Cluster');
        z.show(plt); pdf.savefig(); plt.close()


In [43]:
%sh

cp  CHD_HuBMAP_percent_filtering_bsID.pdf  ~/cavatica/projects/taylordm/taylor-urbs-r03-kf-cardiac/

