# 6/22/23 - This notebook contains a workflow to extract the variants from the CHD cohort in order to count the frequency (the number) of variants throughtout the genome. We use the Chromosome location ontology (CHLO) (as of 8/1/2023 CHLO has been renamed HSCLO to make it human specific) to bin variants into 10k chunks of the genome.

### RUN THIS NOTEBOOK BEFORE THE create_nodes_edges notebook!

# This notebook was converted to jupyter notebook format from Apache zeppelin notebook format using  a command line tool ([ze2nb](https://runawayhorse001.github.io/LearningApacheSpark/ze2nb.html)). The zeppelin notebook was used to prepare data on the Kids First Variant WorkBench 

In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
import time

pd.set_option('display.max_columns', None)


In [2]:
MAF_CUTOFF = 0.01
CHD_variants = spark.table('variants').where((F.array_contains(F.col('studies'), 'SD_PREASA7S')) & (F.col('variant_class') == 'SNV'))\
                                .where((F.col('gnomad_genomes_3_1_1.af') <= MAF_CUTOFF)  | (F.col("gnomad_genomes_3_1_1").isNull()))   # rare freqency, or not in gnomad at all (presumably very rare)

print(CHD_variants.count())
CHD_variants.limit(50).toPandas().head()


In [3]:
CHD_csq = spark.table('consequences').where((F.array_contains(F.col('study_ids'), 'SD_PREASA7S')) & (F.col('variant_class') == 'SNV') )

# filter for high impact lower down
CHD_csq
print(CHD_csq.count())                               
CHD_csq.limit(50).toPandas().head(5)

In [4]:
CHD_csq_pr = CHD_csq.where(F.col('biotype') == 'protein_coding') # reduce size of this df so its faster to merge
CHD_csq_pr.count()

In [5]:
o = spark.table('occurrences').where(F.col('study_id').isin(['SD_PREASA7S']) &  (F.col('is_hi_conf_denovo') == True) & (F.col('variant_class') == 'SNV'))
print(o.count())
o.limit(5).toPandas().head(5)


In [6]:
#cols_keep = [ 'chromosome','start','reference', 'alternate', 'end', 'hgvsg']#, 'variant_class',  'zygosity',  'transmissions',
                                                     # 'gnomad_genomes_3_1_1', 'dbsnp_id', 'clinvar_id','clin_sig']  # 'frequencies',  '1k_genomes', 'topmed',  'name',

#CHD_csq_slct = CHD_csq.where(F.col('impact') == 'HIGH').select(cols_keep)
#CHD_csq_gene_info =  CHD_csq_pr.select( ['hgvsg','ensembl_gene_id','symbol'])


In [7]:
cols_keep = [ 'chromosome','start','reference', 'alternate', 'end', 'hgvsg']#, 'variant_class',  'zygosity',  'transmissions',
                                                     # 'gnomad_genomes_3_1_1', 'dbsnp_id', 'clinvar_id','clin_sig']  # 'frequencies',  '1k_genomes', 'topmed',  'name',

CHD_var_slct = CHD_variants.select(cols_keep)
CHD_csq_slct = CHD_csq.where(F.col('impact') == 'HIGH').select(cols_keep)
CHD_occ_slct = o.select(cols_keep) 

CHD_csq_gene_info =  CHD_csq_pr.select( ['hgvsg','ensembl_gene_id','symbol'])



In [8]:
chd_csq_occ = CHD_csq_slct.union(CHD_occ_slct).dropDuplicates()
print(chd_csq_occ.count())

In [9]:
j = CHD_var_slct.join(chd_csq_occ.select('hgvsg'),on='hgvsg',how='inner')
print(j.count())
j.show()            # 285,954


In [11]:
# test that start and hgsvg always match so we know that its a SNP?

split_col = F.split(j['hgvsg'], ':g.')

j_ = j.withColumn('split_hgsvg',split_col.getItem(1))

j_ = j_.withColumn("location",F.regexp_replace(F.col("split_hgsvg"),"(\D)",""))

#j_ = j_.select(['start','reference', 'alternate', 'name', 'end','chromosome','location','split_hgsvg'])

j_.show(3)

location_df = j_

# taylordm/taylor-urbs-r03-kf-cardiac/files/64593f02a716c906e00175cd

In [12]:
chro = spark.read.csv('s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/DataDistillery/OWLNETS_node_metadata.csv',header=True)#,sep='\t'))
chro = chro.toPandas()

chro['chromosome'] = [i.split(':')[0].replace('chr','') for i in chro['node_id']]
chro = chro[chro['node_id'].str.contains(':')]
chro['start'] = [i.split(':')[-1].split('-')[0] for i in chro['node_id']]
chro['end'] = [i.split(':')[-1].split('-')[1] for i in chro['node_id']]

chro=spark.createDataFrame(chro) 
chro.show(5)



In [13]:
from pyspark.sql.types import IntegerType

chro = chro.withColumn("chromosome", chro["chromosome"].cast(IntegerType()))\
           .withColumn("start", chro["start"].cast(IntegerType()))\
           .withColumn("end", chro["end"].cast(IntegerType()))
           
location_df = location_df.withColumn("chromosome", location_df["chromosome"].cast(IntegerType()))\
           .withColumn("location", location_df["location"].cast(IntegerType()))
         

In [14]:
#spark.read.parquet("s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/DataDistillery/delete_CHD_snp_locs_merged.parquet").show()

In [15]:
#df_merge = location_df.join(chro, (chro.chromosome == location_df.chromosome) &\
#                       (chro.start <= location_df.location) &\
#                       (chro.end >= location_df.location),'leftsemi' )
#df_merge.count()

#df_merge.write \
#    .parquet("s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/DataDistillery/CHD_snp_locs_merged_2.parquet")

In [16]:
#df = spark.read.parquet("s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/DataDistillery/CHD_snp_locs_merged_2.parquet")
#df.head()

In [17]:
from pyspark.sql.functions import isnan, when, count, col   # why id there a null in the chromosome.result?
#location_df.select([count(when(isnan(c), c)).alias(c) for c in location_df.columns]).show()



In [18]:
uniq_chroms = location_df.select('chromosome').distinct().toPandas().sort_values(by='chromosome').dropna()
uniq_chroms = uniq_chroms[(uniq_chroms['chromosome']!='Y')]
uniq_chroms = uniq_chroms[(uniq_chroms['chromosome']!='X')]
uniq_chroms = [i[0] for i in uniq_chroms.dropna().astype(int).sort_values(by='chromosome').values]
uniq_chroms


In [19]:
#location_df_CHROM =  location_df.where(F.col('chromosome') == int(22))
#chro_CHROM =  chro.where(F.col('chromosome') == int(22))
    


In [20]:
#chro_CHROM = chro_CHROM.sample(1/500)
#location_df_CHROM = location_df_CHROM.sample(1/500)
#location_df_CHROM.count()

In [21]:
for CHROM in uniq_chroms[::-1]:
    print(f'Merging chr{CHROM}',end='')
    s = time.time()
    
    location_df_CHROM =  location_df.where(F.col('chromosome') == int(CHROM))
    chro_CHROM =  chro.where(F.col('chromosome') == int(CHROM))
    
    location_df_CHROM = location_df_CHROM.drop('reference','alternate','name')
    chro_CHROM = chro_CHROM.select(F.col('node_id'),F.col('chromosome').alias('chromosome_chro'),F.col('start').alias('start_chro'),F.col('end').alias('end_chro'))

    #  (chro_CHROM.chromosome_chro == location_df_CHROM.chromosome) &\
    chrom_merge = location_df_CHROM.join(chro_CHROM,
                           (chro_CHROM.start_chro <= location_df_CHROM.location) &\
                           (chro_CHROM.end_chro >= location_df_CHROM.location),'right' )    # do right merge. we want locations assigned to variants. we dont want locations if there arent any variants there
    print(f'...Saving chr{CHROM}...',end='')
    
    chrom_merge.write.parquet(f"s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/DataDistillery/merged_snps/CHD_snp_locs_{CHROM}.parquet")
    e = time.time() - s
    elapsed=e//60
    print(f'Chr{CHROM} took {elapsed}min')

In [23]:
merged_all = spark.read.parquet('s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/DataDistillery/merged_snps/*.parquet')


In [24]:
print(merged_all.count())
merged_all.show()

In [25]:
CHD_csq_gene_info.count()




In [26]:
merged_all = merged_all.select(['hgvsg', 'chromosome', 'start', 'end', 'node_id'])
merged_all.count()

In [27]:
merged_all_genes = merged_all.join(CHD_csq_gene_info,on='hgvsg',how='inner')
print(merged_all_genes.count())
merged_all_genes.show()

In [28]:
merged_all_genes.write.parquet('s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/DataDistillery/CHD_merged_genes_inner.parquet')

In [29]:
print(merged_all_genes.count())
merged_all_genes.show()

In [30]:
merged_all_genes.write.parquet(f"s3a://kf-strides-variant-parquet-prd/notebooks/5175e6e3-c3d7-4c19-b51f-6f1ea4dd3700/DataDistillery/CHD_merged_genes.parquet")


In [31]:
print(CHD_csq_gene_info.count())
CHD_csq_gene_info.show()