
###Date of last update: 1-26-26
This will be used
Establish the genomic background model to correct for relatedness, to be used for testing later.

In [None]:
# Initialize Hail with GRCh38

import hail as hl
import pandas as pd
import os

hl.init(default_reference='GRCh38')

In [None]:
# 2. Define Cloud Paths 
bucket = os.getenv("WORKSPACE_BUCKET")
# Path to whole genome sequence data stored in hail matrix table format
WGS_MT_PATH = "gs://fc-aou-datasets-controlled/v8/wgs/short_read/snpindel/acaf_threshold/splitMT/hail.mt"
# Path to GWAS results stored in hail table format (allele frequency and allele count)
GWAS_RESULTS_PATH = "gs://fc-aou-datasets-controlled/AllxAll/v1/ht/ACAF/EUR/phenotype_NS_326.1_ACAF_results.ht"

# Ancestry prediction file found here: gs://fc-aou-datasets-controlled/v8/wgs/short_read/snpindel/aux/ancestry/ancestry_preds.tsv

In [None]:
# Filter genotype data to European ancestry samples
# 3. Process data for European ancestry samples
print("Loading EUR sample IDs...")
!gsutil -u $GOOGLE_PROJECT cp gs://fc-aou-datasets-controlled/v8/wgs/short_read/snpindel/aux/ancestry/ancestry_preds.tsv ./ancestry_temp.tsv
ancestry_df = pd.read_csv("./ancestry_temp.tsv", sep="\t")
eur_ids = set(ancestry_df[ancestry_df['ancestry_pred'] == 'eur']['research_id'].astype(str))
!rm ./ancestry_temp.tsv


# 4. Filter Matrix Table to EUR Samples
wgs_mt = hl.read_matrix_table(WGS_MT_PATH)
wgs_eur_mt = wgs_mt.filter_cols(hl.literal(eur_ids).contains(wgs_mt.s))

In [None]:
# 5. Filter Matrix Table to ~500K Common SNPs (Crash-Proof Version)
import random
import hail as hl

# Get reference
rg = hl.get_reference('GRCh38')
lengths = rg.lengths

# Generate 4,000 intervals
print("Generating random genomic windows...")
n_intervals = 4000
window_size = 50000
chroms = [f"chr{i}" for i in range(1, 23)]

hl_intervals = []
for _ in range(n_intervals):
    chrom = random.choice(chroms)
    chrom_len = lengths[chrom]
    start = random.randint(1, chrom_len - window_size)
    interval = hl.locus_interval(chrom, start, start + window_size, reference_genome='GRCh38')
    hl_intervals.append(interval)

# Convert to Table
print("Converting intervals to Table...")
interval_structs = [{'interval': i} for i in hl_intervals]
interval_ht = hl.Table.parallelize(
    interval_structs, 
    schema=hl.tstruct(interval=hl.tinterval(hl.tlocus('GRCh38')))
).key_by('interval')

# Filter -> Select -> Repartition
print("Filtering intervals and dropping unused fields...")
mt_fast_subset = wgs_eur_mt.select_entries('GT')
mt_fast_subset = mt_fast_subset.filter_rows(hl.is_defined(interval_ht[mt_fast_subset.locus]))

# CRITICAL: Repartition to 1000 to reduce overhead
print("Repartitioning...")
mt_fast_subset = mt_fast_subset.naive_coalesce(1000) 

# QC
print("Calculating QC...")
mt_fast_subset = hl.variant_qc(mt_fast_subset)

# Filter for common SNPs
common_snps_mt = mt_fast_subset.filter_rows(
    (mt_fast_subset.variant_qc.AF[1] > 0.05) & 
    (mt_fast_subset.variant_qc.AF[1] < 0.95) &
    (mt_fast_subset.variant_qc.call_rate > 0.95)
)

# STABILITY FIX: Write an intermediate checkpoint to DISK instead of memory
# This breaks the lineage and prevents the driver from getting overwhelmed
print("Checkpointing intermediate result to disk...")
checkpoint_path = f"{bucket}/tmp/intermediate_common_snps.mt"
common_snps_mt = common_snps_mt.write(checkpoint_path, overwrite=True)
common_snps_mt = hl.read_matrix_table(checkpoint_path)

# Now it is safe to count because the data is fully written to disk
n_total = common_snps_mt.count_rows()
print(f"Variants found: {n_total}")

# Sample
sampling_fraction = 500000 / n_total if n_total > 500000 else 1.0
mt_final = common_snps_mt.sample_rows(sampling_fraction, seed=42)

# Final Save
print("Saving final dataset...")
mt_final_path = f"{bucket}/data/eur_common_snps_500k.mt"
mt_final.write(mt_final_path, overwrite=True)
print(f"Success! Final MatrixTable saved to: {mt_final_path}")

In [None]:
# # 5. Filter Matrix Table to Common SNPs using random 50kb windows (n=1,000) across the genome
# import random
# import hail as hl

# # Get exact chromosome lengths for GRCh38 to avoid "out of bounds" errors
# rg = hl.get_reference('GRCh38')
# lengths = rg.lengths

# # Generate 1,000 valid random 50kb intervals
# print("Generating random genomic windows based on actual chromosome lengths...")
# n_intervals = 1000
# window_size = 50000
# # We only want autosomes for the Null Model (Kinship)
# chroms = [f"chr{i}" for i in range(1, 23)]

# hl_intervals = []
# for _ in range(n_intervals):
#     chrom = random.choice(chroms)
#     chrom_len = lengths[chrom]
#     # Ensure the window doesn't go past the end of the chromosome
#     start = random.randint(1, chrom_len - window_size)
#     interval = hl.locus_interval(chrom, start, start + window_size, reference_genome='GRCh38')
#     hl_intervals.append(interval)

# # Filter the genotype data to the random 50kb windows
# # Use 30 workers on dataproc cluster $$$
# print("Applying fast interval filter...")
# mt_fast_subset = hl.filter_intervals(wgs_eur_mt, hl_intervals)

# # Perform QC and filter for Common SNPs (0.05 < MAF < 0.95 & Call Rate > 0.95)
# print("Calculating QC on subset...")
# mt_fast_subset = hl.variant_qc(mt_fast_subset)
# common_snps_mt = mt_fast_subset.filter_rows(
#     (mt_fast_subset.variant_qc.AF[1] > 0.05) & 
#     (mt_fast_subset.variant_qc.AF[1] < 0.95) &
#     (mt_fast_subset.variant_qc.call_rate > 0.95)
# )

# # 5. Checkpoint to break the lineage
# common_snps_mt = common_snps_mt.checkpoint(f"{bucket}/tmp/fast_common_snps_v2.mt", overwrite=True)

# # 6. Sample down to exactly 150k for Regenie
# n_total = common_snps_mt.count_rows()
# print(f"Variants found: {n_total}")
# sampling_fraction = 150000 / n_total if n_total > 150000 else 1.0
# mt_final = common_snps_mt.sample_rows(sampling_fraction, seed=42)