In [1]:
import os
import sys
import subprocess
import numpy as np
import pandas as pd
import pyranges as pr
from io import StringIO


In [2]:
def read_VCF(vcf_file: str, has_phase=False):
    if has_phase:
        fields = "%CHROM\t%POS\t[%GT\t%PS]\n"
        names = ["CHR", "POS", "GT", "PS"]
    else:
        fields = "%CHROM\t%POS\n"
        names = ["CHR", "POS"]
    cmd = ["bcftools", "query", "-f", fields, vcf_file]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    snps = pd.read_csv(StringIO(result.stdout), sep="\t", header=None, names=names)
    if has_phase:
        # Drop entries without phasing output
        snps = snps[(snps["GT"] != ".") & (snps["PS"] != ".")]
        snps["GT"] = snps["GT"].apply(func=lambda v: v[0])  # USEREF
    assert not snps.duplicated().any(), f"{vcf_file} has duplicated rows"
    assert not snps.duplicated(subset=["CHR", "POS"]).any(), (
        f"{vcf_file} has duplicated rows"
    )
    return snps

# Convert to PyRanges
def to_pyranges(df, chrom_col="CHR", start_col="POS", name_col=None):
    pr_df = df[[chrom_col, start_col]].copy()
    pr_df.columns = ["Chromosome", "Start"]
    pr_df["End"] = pr_df["Start"] + 1
    if name_col:
        pr_df[name_col] = df[name_col].values
    return pr.PyRanges(pr_df)

def feature_to_pyranges(feature_df):
    pr_df = feature_df[["#CHR", "START", "END", "feature_id"]].copy()
    pr_df.columns = ["Chromosome", "Start", "End", "feature_id"]
    return pr.PyRanges(pr_df)


In [3]:
prep_dir="../datasets/HT934/trial1/preprocess/"
atac_dir="../datasets/HT934/07052025/ATAC01/"
gex_dir="../datasets/HT934/07052025/GEX01/"

feature_file = os.path.join(prep_dir, "features.tsv.gz")
atac_vcf = os.path.join(atac_dir, "cellSNP.base.vcf.gz")
atac_tmat = os.path.join(atac_dir, "cellSNP.tag.DP.mtx")
# atac_mtx: csr_matrix = mmread(atac_tmat).tocsr()

gex_vcf = os.path.join(gex_dir, "cellSNP.base.vcf.gz")
gex_tmat = os.path.join(gex_dir, "cellSNP.tag.DP.mtx")
# gex_mtx: csr_matrix = mmread(gex_tmat).tocsr()


In [10]:
features = pd.read_csv(feature_file, sep="\t")
features = features[["#CHR", "START", "END", "feature_id", "feature_type", "segID"]]
print(features[:5])
genes = features.loc[features["feature_type"] == "Gene Expression"].copy()
peaks = features.loc[features["feature_type"] == "Peaks"].copy()

atac_snps = read_VCF(atac_vcf)
atac_snps["POS"] -= 1
gex_snps = read_VCF(gex_vcf)
gex_snps["POS"] -= 1

gex_snp_ranges = to_pyranges(gex_snps)
atac_snp_ranges = to_pyranges(atac_snps)
gene_ranges = feature_to_pyranges(genes)
peak_ranges = feature_to_pyranges(peaks)

# Join to find overlaps
gene_overlap = gex_snp_ranges.join(gene_ranges)
peak_overlap = atac_snp_ranges.join(peak_ranges)

# You can merge results back to original SNPs if needed:
gex_snps["feature_id"] = pd.NA
atac_snps["feature_id"] = pd.NA

if not gene_overlap.df.empty:
    gex_snps.loc[gene_overlap.df.index, "feature_id"] = gene_overlap.df["feature_id"].values

if not peak_overlap.df.empty:
    atac_snps.loc[peak_overlap.df.index, "feature_id"] = peak_overlap.df["feature_id"].values

print("Mapped GEX SNPs:", gex_snps["feature_id"].notna().sum())
print(f"total GEX SNPs: {len(gex_snps)}")
print("Mapped ATAC SNPs:", atac_snps["feature_id"].notna().sum())
print(f"total ATAC SNPs: {len(atac_snps)}")



   #CHR   START     END    feature_id     feature_type  segID
0  chr1   13581  138480  LOC127239154  Gene Expression      0
1  chr1   23709   23710  LOC124905685  Gene Expression      0
2  chr1   54611   54612  LOC101928626  Gene Expression      0
3  chr1  112876  112877        OR4F16  Gene Expression      0
4  chr1  205170  205171  LOC100288069  Gene Expression      0
Mapped GEX SNPs: 52299
total GEX SNPs: 304851
Mapped ATAC SNPs: 76551
total ATAC SNPs: 1868231
