In [1]:
import os

import json
import yaml

import pandas as pd
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.types as t
import pyspark.sql.functions as f

import glow

In [2]:
snakefile_path = os.getcwd() + "/../Snakefile"
snakefile_path

'/data/nasif12/home_if12/hoelzlwi/Projects/ALS/workflow/notebooks/../Snakefile'

In [3]:
# del snakemake

In [4]:
try:
    snakemake
except NameError:
    from snakemk_util import load_rule_args
    
    snakemake = load_rule_args(
        snakefile = snakefile_path,
        rule_name = 'liftover_glow',
        default_wildcards={
#             'ds_dir': 'noCAT_samplefilter_maxol20_privvar'
            'ds_dir': 'gtex_noCAT_samplefilter_maxol20_privvar'
        }
    )

['Prostate', 'Breast_Mammary_Tissue', 'Adipose_Subcutaneous', 'Liver', 'Artery_Coronary', 'Cells_EBV_transformed_lymphocytes', 'Brain_Anterior_cingulate_cortex_BA24', 'Brain_Nucleus_accumbens_basal_ganglia', 'Brain_Substantia_nigra', 'Artery_Aorta', 'Thyroid', 'Adrenal_Gland', 'Heart_Left_Ventricle', 'Spleen', 'Esophagus_Muscularis', 'Esophagus_Gastroesophageal_Junction', 'Brain_Caudate_basal_ganglia', 'Heart_Atrial_Appendage', 'Minor_Salivary_Gland', 'Lung', 'Nerve_Tibial', 'Whole_Blood', 'Testis', 'Brain_Cortex', 'Esophagus_Mucosa', 'Fibroblasts_Prokisch', 'Uterus', 'Muscle_Skeletal', 'Brain_Cerebellar_Hemisphere', 'Small_Intestine_Terminal_Ileum', 'Pancreas', 'Brain_Putamen_basal_ganglia', 'Brain_Spinal_cord_cervical_c_1', 'Pituitary', 'Skin_Not_Sun_Exposed_Suprapubic', 'Brain_Cerebellum', 'Artery_Tibial', 'Brain_Hypothalamus', 'Skin_Sun_Exposed_Lower_leg', 'Vagina', 'Brain_Amygdala', 'Ovary', 'Brain_Frontal_Cortex_BA9', 'Adipose_Visceral_Omentum', 'Colon_Sigmoid', 'Cells_Transforme

In [5]:
print(json.dumps(snakemake.__dict__, indent=2))

{
  "resources": {
    "_cores": 1,
    "_nodes": 1,
    "mem_mb": 40960,
    "threads": 10
  },
  "input": {
    "chain_file": "/s/raw/ucsc/hg38/liftOver/hg38ToHg19.over.chain.gz",
    "input_vcf": "/s/raw/als/kaggle/end-als/genomics-data/AnswerALS_subset_annovar.hg38_anno_and_geno.no_intergenic.vcf.gz",
    "reference_fasta": "/s/project/kaggle-als/GRCh37.p13.genome.fa",
    "reference_fasta_hg38": "/s/genomes/human/hg38/GRCh38.primary_assembly.genome.fa"
  },
  "params": {
    "max_records_in_ram": 50000,
    "chroms": [
      "chr1",
      "chr2",
      "chr3",
      "chr4",
      "chr5",
      "chr6",
      "chr7",
      "chr8",
      "chr9",
      "chr10",
      "chr11",
      "chr12",
      "chr13",
      "chr14",
      "chr15",
      "chr16",
      "chr17",
      "chr18",
      "chr19",
      "chr20",
      "chr21",
      "chr22",
      "chrX"
    ]
  },
  "output": {
    "normalized_vcf_pq": "/s/project/kaggle-als/liftover_vcf/normalized_vcf_hg38.parquet",
    "lifted_vcf": "/

In [6]:
import os

try:
    snakemake
except NameError:
    from snakemk_util import load_rule_args
    snakemake = load_rule_args(
        snakefile = os.getcwd() + '/../Snakefile',
        rule_name = 'liftover_glow',
        root=os.getcwd() + "/..",
    )

In [7]:
MEM = os.popen("ulimit -m").read()
if MEM.startswith("unlimited"):
    print("Memory not constrained, using all available memory...")
    import psutil
    MEM = psutil.virtual_memory().available / 1024
MEM = int(MEM)
N_CPU = int(os.popen("nproc").read())
print("memory: %dk" % MEM)
print("number of cores: %d" % N_CPU)

memory: 1024000000k
number of cores: 256


In [8]:
os.environ['PYSPARK_SUBMIT_ARGS'] = " ".join([
    '--driver-memory %dk' % MEM,
    'pyspark-shell'
])
os.environ['PYSPARK_SUBMIT_ARGS']
MAX_FAILURES=4
spark = (
    SparkSession.builder
    .appName('glow_liftover')
    .config("spark.jars.packages", ",".join([
        "io.projectglow:glow-spark3_2.12:1.0.0",
#         "io.delta:delta-core_2.12:0.8.0",
    ]))
#    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
#    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.local.dir", os.environ.get("TMP"))
    .config("spark.master", f"local[{N_CPU},{MAX_FAILURES}]")
    .config("spark.sql.shuffle.partitions", "2001")
#     .config("spark.sql.execution.arrow.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
#     .config("spark.sql.execution.useObjectHashAggregateExec", "false")
#     .config("spark.network.timeout", "1800s")
    .config("spark.driver.maxResultSize", "48G")
    .config("spark.default.parallelism", N_CPU / 2)
    .config("spark.files.maxPartitionBytes", 33554432)
#     .config("spark.databricks.io.cache.enabled", "true") # only enable when local storage is actually on local SSD
    .config("spark.task.maxFailures", MAX_FAILURES)
    .getOrCreate()
)
glow.register(spark)
spark

In [9]:
df = (
    spark
    .read
    .option("flattenInfoFields", False)
    .format('vcf')
    .load(snakemake.input['input_vcf'])
)

In [10]:
df = df.drop("attributes")

In [11]:
df.printSchema()

root
 |-- contigName: string (nullable = true)
 |-- start: long (nullable = true)
 |-- end: long (nullable = true)
 |-- names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- referenceAllele: string (nullable = true)
 |-- alternateAlleles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- qual: double (nullable = true)
 |-- filters: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- splitFromMultiAllelic: boolean (nullable = true)
 |-- genotypes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sampleId: string (nullable = true)
 |    |    |-- conditionalQuality: integer (nullable = true)
 |    |    |-- MQ0: integer (nullable = true)
 |    |    |-- alleleDepths: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- PID: string (nullable = true)
 |    |    |-- phased: boolean (nullable = true)
 |    |    |-- calls: array (nul

In [13]:
df = df.where(
    f.col('contigName').isin(snakemake.params['chroms'])
)

In [14]:
df = glow.transform("split_multiallelics", df)
df = glow.transform("normalize_variants", df, reference_genome_path=snakemake.input['reference_fasta_hg38'])

In [15]:
# sort by chromosome and by location
df = (
   df
   .repartitionByRange(2048, f.col("contigName"), f.col("start"))
   .sortWithinPartitions(["contigName", "start"])
   .persist()
)

In [9]:
OUTPUT_VCF_PQ_PATH = snakemake.output["normalized_vcf_pq"]
OUTPUT_VCF_PQ_PATH

'/s/project/kaggle-als/liftover_vcf/normalized_vcf_hg38.parquet'

In [18]:
(
    df
    .write
    .parquet(OUTPUT_VCF_PQ_PATH, mode="overwrite", partitionBy=["contigName", ])
)

In [10]:
df = (
    spark
    .read
    .format("parquet")
    .load(OUTPUT_VCF_PQ_PATH)
)

In [11]:
df.rdd.getNumPartitions()

174

In [12]:
df.printSchema()

root
 |-- start: long (nullable = true)
 |-- end: long (nullable = true)
 |-- names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- referenceAllele: string (nullable = true)
 |-- alternateAlleles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- qual: double (nullable = true)
 |-- filters: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- splitFromMultiAllelic: boolean (nullable = true)
 |-- INFO_OLD_MULTIALLELIC: string (nullable = true)
 |-- genotypes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sampleId: string (nullable = true)
 |    |    |-- conditionalQuality: integer (nullable = true)
 |    |    |-- MQ0: integer (nullable = true)
 |    |    |-- alleleDepths: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- PID: string (nullable = true)
 |    |    |-- phased: boolean (nullable = true)
 |    |    |-- calls:

In [13]:
lifted_df = glow.transform('lift_over_variants', df, chain_file=snakemake.input['chain_file'], reference_file=snakemake.input['reference_fasta'])

In [14]:
lifted_df.printSchema()

root
 |-- start: long (nullable = true)
 |-- end: long (nullable = true)
 |-- names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- referenceAllele: string (nullable = true)
 |-- alternateAlleles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- qual: double (nullable = true)
 |-- filters: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- splitFromMultiAllelic: boolean (nullable = true)
 |-- INFO_OLD_MULTIALLELIC: string (nullable = true)
 |-- genotypes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sampleId: string (nullable = true)
 |    |    |-- conditionalQuality: integer (nullable = true)
 |    |    |-- MQ0: integer (nullable = true)
 |    |    |-- alleleDepths: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- PID: string (nullable = true)
 |    |    |-- phased: boolean (nullable = true)
 |    |    |-- calls:

In [15]:
# lifted_df.write.format("bigvcf").save(snakemake.output['lifted_vcf'])
lifted_df.write.format("vcf").mode("overwrite").save(snakemake.output['lifted_vcf'])