In [None]:
## import packages
from datetime import datetime
import os
import pandas as pd
import numpy as np
import random
from itertools import chain
import hail as hl
from hail.linalg import BlockMatrix
import statsmodels.api as sm

In [3]:
# Set up Hail.
# Passing `default_reference` to hl.init() is deprecated; initialise first and
# then set the default reference genome explicitly.
hl.init()
hl.default_reference("GRCh38")


Using hl.init with a default_reference argument is deprecated. To set a default reference genome after initializing hail, call `hl.default_reference` with an argument to set the default reference genome.


Reading spark-defaults.conf to determine GCS requester pays configuration. This is deprecated. Please use `hailctl config set gcs_requester_pays/project` and `hailctl config set gcs_requester_pays/buckets`.

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Running on Apache Spark version 3.3.0
SparkUI available at http://all-of-us-11150-m.us-central1-b.c.terra-vpc-sc-fd39b54c.internal:43339
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.130.post1-c69cd67afb8b
LOGGING: writing to /home/jupyter/workspaces/prswithwgsvsarraydata/hail-20241119-1146-0.2.130.post1-c69cd67afb8b.log


## 

In [4]:
# Load the matrix table written by Supp_WGS_Ancestry_QC and re-key its rows
# by locus only.
# NOTE(review): `bucket` is not assigned in any cell shown here -- confirm it
# is defined before this cell runs on a fresh kernel.
mt_wgs_afr = (
    hl.read_matrix_table(f"{bucket}/WGSData/WGS_GT_HDL_Ancestry_AFR_QCed.mt")
    .key_rows_by("locus")
)

In [7]:
## Load the Sample_quant phenotype table and key it by person_id.
## Empty strings are read as missing; person_id is forced to str while every
## other column type is imputed from the data.
Sample_quant = hl.import_table(
    f"{bucket}/Pheno/quant_all.tsv",
    missing='',
    impute=True,
    types={"person_id": "str"},
).key_by("person_id")

2024-11-19 11:18:05.176 Hail: INFO: Reading table to impute column types
2024-11-19 11:18:09.895 Hail: INFO: Loading <StructExpression of type struct{person_id: str, Age: int32, is_sex_Male: int32, is_sex_Female: int32, is_White: int32, is_Black_or_African_American: int32, is_Native_Hawaiian_or_Other_Pacific_Islander: int32, is_Asian: int32, is_Middle_Eastern_or_North_African: int32, is_gender_Male: int32, is_gender_Female: int32, is_Hispanic: int32, is_anc_pred_eur: int32, is_anc_pred_amr: int32, is_anc_pred_afr: int32, is_anc_pred_sas: int32, is_anc_pred_eas: int32, is_anc_pred_mid: int32, PC1: float64, PC2: float64, PC3: float64, PC4: float64, PC5: float64, PC6: float64, PC7: float64, PC8: float64, PC9: float64, PC10: float64, PC11: float64, PC12: float64, PC13: float64, PC14: float64, PC15: float64, PC16: float64, BMI: float64, Height: float64, DBP: float64, SBP: float64, HbA1c: float64, leukocyte: float64, Lymphocyte: float64, RBC: float64, Neutrophil: float64, Hemoglobin_concentr

In [8]:
# Keep only the columns whose key appears in Sample_quant, then attach the
# Sample_quant fields to each column, looked up by the sample ID `s`.
mt_wgs_quant = mt_wgs_afr.semi_join_cols(Sample_quant)
mt_wgs_quant = mt_wgs_quant.annotate_cols(**Sample_quant.index(mt_wgs_quant.s))

In [9]:
def SumStats_Wrangle(sumstats_file, phenotype,
                     pval_thresholds=(0.5, 1e-1, 1e-2, 1e-3, 1e-4,
                                      1e-5, 1e-6, 1e-7, 5e-8)):
    """Import a summary-statistics table and add p-value-thresholded betas.

    The file is read with ``hl.import_table``, its ``locus`` string column is
    parsed into a GRCh38 locus, and the table is keyed by that locus. One
    ``beta_thresh<k>`` field is then added per threshold:

    * ``beta_thresh1`` is ``beta_meta_fix_ref_alt`` with no filtering.
    * ``beta_thresh<k>`` for k >= 2 is ``beta_meta_fix_ref_alt`` where
      ``pval_meta`` is below ``pval_thresholds[k - 2]`` and 0 otherwise.

    Parameters
    ----------
    sumstats_file : str
        Path of the summary-statistics file to import.
    phenotype : str
        Not used by this function; kept so existing callers keep working.
    pval_thresholds : sequence of float, optional
        P-value cut-offs, one per thresholded field. The default reproduces
        the original ``beta_thresh2`` ... ``beta_thresh10`` fields.

    Returns
    -------
    hl.Table
        The locus-keyed table with the added ``beta_thresh*`` fields.
    """
    # Read the table with explicit types; empty strings are treated as missing.
    sumstats = hl.import_table(
        sumstats_file,
        types={"is_negative_strand": "bool",
               "beta_meta": "float",
               "beta_meta_fix_ref_alt": "float",
               "se_meta": "float",
               "neglog10_pval_meta": "float",
               "pval_meta": "float"},
        missing="",
    )

    # Replace the string locus with a parsed GRCh38 locus and key by it.
    sumstats = sumstats.annotate(
        locus=hl.parse_locus(sumstats.locus, reference_genome='GRCh38'))
    sumstats = sumstats.key_by("locus")

    # Build every thresholded beta in one pass instead of one annotate per
    # threshold; dict insertion order keeps the fields in threshold order.
    beta = sumstats.beta_meta_fix_ref_alt
    thresholded = {"beta_thresh1": beta}
    for idx, cutoff in enumerate(pval_thresholds, start=2):
        thresholded[f"beta_thresh{idx}"] = hl.if_else(
            sumstats.pval_meta < cutoff, beta, 0)

    return sumstats.annotate(**thresholded)

In [14]:
# Build the thresholded HDL summary statistics and write them to a checkpoint
# (any existing checkpoint at that path is overwritten).
HDL_sumstats = SumStats_Wrangle(
    f'{bucket}/Sumstats_clumped/WGS_HDL_afr_QCed_clumps.tsv', "quant"
).checkpoint(
    f"{bucket}/hail_checkpoints/HDL_clumps_sumstats_afr_checkpoint.ht",
    overwrite=True,
)

2024-11-18 21:42:24.123 Hail: INFO: Reading table without type imputation
  Loading field 'locus' as type str (not specified)
  Loading field 'rsid' as type str (not specified)
  Loading field 'alleles1_wgs' as type str (not specified)
  Loading field 'alleles2_wgs' as type str (not specified)
  Loading field 'alleles1_sumstats_original' as type str (not specified)
  Loading field 'alleles2_sumstats_original' as type str (not specified)
  Loading field 'is_negative_strand' as type bool (user-supplied)
  Loading field 'alleles1_sumstats_fixstrand' as type str (not specified)
  Loading field 'alleles2_sumstats_fixstrand' as type str (not specified)
  Loading field 'beta_meta' as type float64 (user-supplied)
  Loading field 'beta_meta_fix_ref_alt' as type float64 (user-supplied)
  Loading field 'se_meta' as type float64 (user-supplied)
  Loading field 'neglog10_pval_meta' as type float64 (user-supplied)
  Loading field 'varid' as type str (not specified)
  Loading field 'pval_meta' as typ

In [11]:
# Read the HDL summary-statistics table back from its checkpoint path, so this
# cell can be used in place of rebuilding the table once the checkpoint exists.
HDL_sumstats = hl.read_table(f"{bucket}/hail_checkpoints/HDL_clumps_sumstats_afr_checkpoint.ht") 

In [18]:
# Attach the summary-statistics row for each locus as the row field
# `HDL_sumstats`, then checkpoint the annotated matrix table (overwriting any
# existing checkpoint at that path).
mt_wgs_quant = (
    mt_wgs_quant
    .annotate_rows(HDL_sumstats=HDL_sumstats[mt_wgs_quant.locus])
    .checkpoint(f"{bucket}/hail_checkpoints/mt_wgs_quant_checkpoint2.mt", overwrite=True)
)

2024-11-19 11:22:32.549 Hail: INFO: wrote table with 382342 rows in 1 partition to /tmp/__iruid_6005-AuIbzN5AVxKhokyrc9GJNJ
2024-11-19 11:43:37.188 Hail: INFO: wrote matrix table with 382342 rows and 20652 columns in 140126 partitions to gs://fc-secure-9afe7562-2fad-4781-ab60-03528a626c19/hail_checkpoints/mt_wgs_quant_checkpoint2.mt


## Restart point: once the checkpoint MatrixTable has been written, start from here

In [4]:
# Reload the annotated matrix table from its checkpoint path.
mt_wgs_quant = hl.read_matrix_table(f"{bucket}/hail_checkpoints/mt_wgs_quant_checkpoint2.mt")

In [6]:
# The checkpoint write log reported 140126 partitions; reduce to 1000.
# Author's observation: this seems to dramatically speed up the pipeline.
mt_wgs_quant = mt_wgs_quant.repartition(1000)

In [8]:
# Names of the thresholded beta fields inside the HDL_sumstats row struct.
pgs_fields = [f"beta_thresh{i}" for i in range(1, 11)]

# For every sample, sum (thresholded beta * GT) over all rows; the k-th beta
# field produces the k-th score, stored as HDL_pgs.pgs<k>.
# NOTE(review): this multiplies by `GT` directly, so it presumes GT is a
# numeric entry field in this matrix table -- confirm.
mt_wgs_quant = mt_wgs_quant.annotate_cols(
    HDL_pgs=hl.struct(**{
        f"pgs{rank}": hl.agg.sum(mt_wgs_quant.HDL_sumstats[beta_field] * mt_wgs_quant.GT)
        for rank, beta_field in enumerate(pgs_fields, start=1)
    })
)

In [10]:
def export_Scores(mt, pheno, mt_type, method, ancestry):
    """Export per-sample covariates, scores and phenotype to a .bgz file.

    Takes the column table of ``mt``, keeps Age, the two sex indicators,
    PC1-PC16, the ``<pheno>_pgs`` struct and the ``pheno`` field, flattens the
    struct into top-level fields, and exports the result to
    ``{bucket}/Scores/<mt_type>/<pheno>_<method>_<ancestry>.bgz``.

    Parameters
    ----------
    mt : matrix table whose columns carry the fields listed above.
    pheno : str, phenotype field name; also the prefix of the ``_pgs`` struct.
    mt_type, method, ancestry : str, used only to build the output path.

    Returns nothing; the table is written as a side effect.
    """
    out_path = f"{bucket}/Scores/{mt_type}/{pheno}_{method}_{ancestry}.bgz"
    pgs_struct = f"{pheno}_pgs"

    keep = ["Age", 'is_sex_Male', 'is_sex_Female']
    keep += [f"PC{k}" for k in range(1, 17)]
    keep += [pgs_struct, pheno]

    scores = mt.cols().select(*keep)
    # Promote the fields of the score struct to top-level fields, then drop
    # the now-redundant struct.
    scores = scores.annotate(**scores[pgs_struct]).drop(pgs_struct)
    scores.export(out_path)

In [11]:
###### HDL: print a start timestamp, then export the scores
print(f"HDL {datetime.now()}")
export_Scores(mt_wgs_quant, pheno="HDL", mt_type="WGS", method="clump", ancestry="AFR")

HDL 2024-11-19 11:46:33.454331


2024-11-19 11:46:33.455 Hail: WARN: cols(): Resulting column table is sorted by 'col_key'.
    To preserve matrix table column order, first unkey columns with 'key_cols_by()'
2024-11-19 11:51:51.626 Hail: INFO: Coerced sorted dataset       (19 + 12) / 31]
2024-11-19 11:51:53.662 Hail: INFO: merging 137 files totalling 2.3M...3) / 136]
2024-11-19 11:51:54.268 Hail: INFO: while writing:
    gs://fc-secure-9afe7562-2fad-4781-ab60-03528a626c19/Scores/WGS/HDL_clump_AFR.bgz
  merge time: 604.346ms


In [None]:
!gsutil mv f'{bucket}/Scores/WGS/HDL_clump_AFR.bgz' f'{bucket}/Scores/WGS/HDL_clump_AFR.gz'