In [1]:
!ls -lh /content/CHOP


total 596K
-rw-r--r-- 1 root root  117 Jan 15 16:43 CDL-068-99.ped
-rw-r--r-- 1 root root 558K Jan 15 16:43 CDL-068-99.vcf.gz
-rw-r--r-- 1 root root  31K Jan 15 16:43 CDL-068-99.vcf.gz.tbi


In [2]:
with open("/content/CHOP/CDL-068-99.ped") as f:
    for i in range(5):
        print(f.readline())


CDL-068-99	CDL-068-99M	P1	P2	2	1

CDL-068-99	CDL-068-99F	P1	P2	1	1

CDL-068-99	CDL-068-99P	CDL-068-99F	CDL-068-99M	2	2





In [3]:
with open("/content/CHOP/CDL-068-99.ped") as f:
    print(f.readline())


CDL-068-99	CDL-068-99M	P1	P2	2	1



In [4]:
import pandas as pd

ped_path = "/content/CHOP/CDL-068-99.ped"

ped = pd.read_csv(
    ped_path,
    sep=r"\s+",
    header=None
)

ped.head()


Unnamed: 0,0,1,2,3,4,5
0,CDL-068-99,CDL-068-99M,P1,P2,2,1
1,CDL-068-99,CDL-068-99F,P1,P2,1,1
2,CDL-068-99,CDL-068-99P,CDL-068-99F,CDL-068-99M,2,2


In [5]:
base_cols = ["FID", "IID", "PID", "MID", "Sex", "Phenotype"]

num_snps = (ped.shape[1] - 6) // 2
snp_cols = []

for i in range(num_snps):
    snp_cols.extend([f"SNP{i+1}_A", f"SNP{i+1}_B"])

ped.columns = base_cols + snp_cols

ped.head()


Unnamed: 0,FID,IID,PID,MID,Sex,Phenotype
0,CDL-068-99,CDL-068-99M,P1,P2,2,1
1,CDL-068-99,CDL-068-99F,P1,P2,1,1
2,CDL-068-99,CDL-068-99P,CDL-068-99F,CDL-068-99M,2,2


In [11]:
family = ped6["FID"].iloc[0]
fam = ped6[ped6["FID"] == family].copy()

# proband: phenotype==2 (common convention)
proband_row = fam[fam["Phenotype"] == 2].iloc[0]
proband = proband_row["IID"]

# parents: from proband PID/MID (best), else infer by sex labels
father = proband_row["PID"]
mother = proband_row["MID"]

print("Family:", family)
print("Proband:", proband)
print("Father:", father)
print("Mother:", mother)


Family: CDL-068-99
Proband: CDL-068-99P
Father: CDL-068-99F
Mother: CDL-068-99M


In [12]:
from cyvcf2 import VCF

vcf_path = vcfs[0]  # first VCF found
vcf = VCF(vcf_path)

print("VCF samples:", vcf.samples)


VCF samples: ['CDL-068-99M', 'CDL-068-99F', 'CDL-068-99P']


In [13]:
trio = [father, mother, proband]
missing = [s for s in trio if s not in vcf.samples]
if missing:
    raise ValueError(f"These PED IDs were not found in VCF samples: {missing}\nCheck naming mismatches.")
else:
    print("✅ Trio sample IDs match the VCF.")


✅ Trio sample IDs match the VCF.


In [27]:
# Step 1: Define where your files live in Colab
BASE = "/content/CHOP"

# Edit these 2 filenames to match what you uploaded
VCF_GZ = f"{BASE}/CDL-068-99.vcf.gz"
PED    = f"{BASE}/CDL-068-99.ped"

# Output prefix (will create 2 TSV files)
OUT_PREFIX = f"{BASE}/CDL-068-99"
print("VCF:", VCF_GZ)
print("PED:", PED)
print("OUT:", OUT_PREFIX)


VCF: /content/CHOP/CDL-068-99.vcf.gz
PED: /content/CHOP/CDL-068-99.ped
OUT: /content/CHOP/CDL-068-99


In [28]:
# Step 2: Imports (no pysam needed)
import gzip
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


In [29]:
# Step 3: Parse PED and extract (mother_id, father_id, proband_id)

def parse_ped(ped_path: str) -> Tuple[str, str, str]:
    rows = []
    with open(ped_path, "rt") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            parts = line.split()
            if len(parts) < 6:
                raise ValueError(f"PED line has <6 columns: {line}")
            fid, iid, father, mother, sex, pheno = parts[:6]
            rows.append((fid, iid, father, mother, sex, pheno))

    affected = [r for r in rows if r[5] == "2"]
    if len(affected) != 1:
        raise ValueError(f"Expected exactly 1 affected proband (PHENO=2); found {len(affected)}")

    _, proband_id, father_id, mother_id, _, _ = affected[0]
    return mother_id, father_id, proband_id

mother_id, father_id, proband_id = parse_ped(PED)
print("Mother :", mother_id)
print("Father :", father_id)
print("Proband:", proband_id)


Mother : CDL-068-99M
Father : CDL-068-99F
Proband: CDL-068-99P


In [30]:
# Step 4: Read VCF samples

def read_vcf_samples(vcf_gz_path: str) -> List[str]:
    with gzip.open(vcf_gz_path, "rt") as f:
        for line in f:
            if line.startswith("#CHROM"):
                parts = line.rstrip("\n").split("\t")
                return parts[9:]
    raise ValueError("Could not find #CHROM header line in VCF")

samples = read_vcf_samples(VCF_GZ)
print("Num samples in VCF:", len(samples))
print("First few samples:", samples[:5])

# Confirm trio exists in VCF
for s in (mother_id, father_id, proband_id):
    if s not in samples:
        raise ValueError(f"Sample '{s}' from PED not found in VCF header!")

i_m = samples.index(mother_id)
i_f = samples.index(father_id)
i_p = samples.index(proband_id)
print("Indices (mother,father,proband):", i_m, i_f, i_p)


Num samples in VCF: 3
First few samples: ['CDL-068-99M', 'CDL-068-99F', 'CDL-068-99P']
Indices (mother,father,proband): 0 1 2


In [31]:
# Step 5: Parsing helpers

def parse_info(info_str: str) -> Dict[str, str]:
    out = {}
    if not info_str or info_str == ".":
        return out
    for entry in info_str.split(";"):
        if "=" in entry:
            k, v = entry.split("=", 1)
            out[k] = v
        else:
            out[entry] = "True"
    return out

def to_int(x: Optional[str]) -> Optional[int]:
    if x is None:
        return None
    try:
        return int(x)
    except Exception:
        return None

def to_float(x: Optional[str]) -> Optional[float]:
    if x is None:
        return None
    try:
        return float(x)
    except Exception:
        return None

def parse_sample(fmt_keys: List[str], sample_str: str) -> Dict[str, Optional[str]]:
    vals = sample_str.split(":")
    d = {}
    for i, k in enumerate(fmt_keys):
        v = vals[i] if i < len(vals) else None
        if v in (".", "./.", ".|.", ""):
            v = None
        d[k] = v
    return d

def gt_tuple(gt: Optional[str]) -> Optional[Tuple[str, str]]:
    if gt is None:
        return None
    gt = gt.replace("|", "/")
    if gt in (".", "./."):
        return None
    a = gt.split("/")
    if len(a) != 2:
        return None
    return (a[0], a[1])

def is_hom_ref(gt: Optional[str]) -> bool:
    return gt_tuple(gt) == ("0", "0")

def is_het(gt: Optional[str]) -> bool:
    return gt_tuple(gt) in (("0", "1"), ("1", "0"))

def parse_ad(ad: Optional[str]) -> Optional[List[int]]:
    if ad is None:
        return None
    try:
        return [int(x) for x in ad.split(",")]
    except Exception:
        return None

def vaf_from_ad(ad: Optional[List[int]]) -> Optional[float]:
    if not ad or len(ad) < 2:
        return None
    ref, alt = ad[0], ad[1]
    tot = ref + alt
    if tot == 0:
        return None
    return alt / tot


In [32]:
# Step 6: Variant container + streaming iterator

@dataclass
class Variant:
    chrom: str
    pos: int
    ref: str
    alt: str
    qual: Optional[float]
    flt: str
    info: Dict[str, str]
    mother: Dict[str, Optional[str]]
    father: Dict[str, Optional[str]]
    proband: Dict[str, Optional[str]]

    def is_snv(self) -> bool:
        return len(self.ref) == 1 and all(len(a) == 1 for a in self.alt.split(","))

def iter_variants(vcf_gz_path: str):
    with gzip.open(vcf_gz_path, "rt") as f:
        for line in f:
            if line.startswith("#"):
                continue
            parts = line.rstrip("\n").split("\t")
            chrom, pos, _id, ref, alt, qual, flt, info_str, fmt = parts[:9]
            fmt_keys = fmt.split(":")
            sm = parts[9:]

            m  = parse_sample(fmt_keys, sm[i_m])
            fa = parse_sample(fmt_keys, sm[i_f])
            pr = parse_sample(fmt_keys, sm[i_p])

            yield Variant(
                chrom=chrom,
                pos=int(pos),
                ref=ref,
                alt=alt,
                qual=None if qual == "." else float(qual),
                flt=flt,
                info=parse_info(info_str),
                mother=m,
                father=fa,
                proband=pr,
            )


In [33]:
# Step 7: Strict de novo logic

def is_strict_denovo(v: Variant) -> bool:
    return (
        is_hom_ref(v.mother.get("GT")) and
        is_hom_ref(v.father.get("GT")) and
        is_het(v.proband.get("GT"))
    )


In [34]:
# Step 8: Quality filters

def passes_quality_filters(
    v: Variant,
    *,
    require_pass: bool = True,
    min_qual: float = 30.0,
    min_dp: int = 15,
    min_gq: int = 30,
    min_alt_reads: int = 8,
    min_vaf: float = 0.35,
    max_vaf: float = 0.65,
    max_parent_alt_reads: int = 0,
    max_parent_vaf: float = 0.01,
    require_vqslod_gt: Optional[float] = 0.0,   # set None to disable
    apply_gatk_like_info_filters: bool = True,
) -> bool:

    # 1) FILTER / QUAL
    if require_pass and v.flt not in ("PASS", "."):
        return False
    if v.qual is None or v.qual < min_qual:
        return False

    # 2) Optional VQSLOD gate
    if require_vqslod_gt is not None:
        vq = to_float(v.info.get("VQSLOD"))
        if vq is not None and vq <= require_vqslod_gt:
            return False

    # 3) Optional INFO hard filters (only if the tag exists)
    if apply_gatk_like_info_filters:
        qd = to_float(v.info.get("QD"))
        if qd is not None and qd < 2.0:
            return False
        fs = to_float(v.info.get("FS"))
        if fs is not None and fs > 60.0:
            return False
        sor = to_float(v.info.get("SOR"))
        if sor is not None and sor > 3.0:
            return False
        mq = to_float(v.info.get("MQ"))
        if mq is not None and mq < 40.0:
            return False
        rprs = to_float(v.info.get("ReadPosRankSum"))
        if rprs is not None and rprs < -8.0:
            return False
        mqrs = to_float(v.info.get("MQRankSum"))
        if mqrs is not None and mqrs < -12.5:
            return False

    # 4) FORMAT DP/GQ in all trio members
    for s in (v.proband, v.mother, v.father):
        dp = to_int(s.get("DP"))
        gq = to_int(s.get("GQ"))
        if dp is None or dp < min_dp:
            return False
        if gq is None or gq < min_gq:
            return False

    # 5) Proband allele balance + alt support
    ad_p = parse_ad(v.proband.get("AD"))
    if ad_p is None:
        return False
    vaf_p = vaf_from_ad(ad_p)
    if vaf_p is None:
        return False
    if ad_p[1] < min_alt_reads:
        return False
    if not (min_vaf <= vaf_p <= max_vaf):
        return False

    # 6) Parents should have ~no alt evidence
    for parent in (v.mother, v.father):
        ad = parse_ad(parent.get("AD"))
        if ad is None:
            return False
        vaf = vaf_from_ad(ad)
        alt_reads = ad[1] if len(ad) > 1 else 0

        if alt_reads > max_parent_alt_reads:
            return False
        if vaf is not None and vaf > max_parent_vaf:
            return False

    return True


In [35]:
# Step 9: TSV writer

def write_tsv(path: str, variants: List[Variant]) -> None:
    header = [
        "CHROM", "POS", "REF", "ALT", "TYPE", "QUAL", "FILTER",
        "VQSLOD", "QD", "FS", "SOR", "MQ", "MQRankSum", "ReadPosRankSum",
        "MOTHER_GT", "MOTHER_DP", "MOTHER_GQ", "MOTHER_AD", "MOTHER_VAF",
        "FATHER_GT", "FATHER_DP", "FATHER_GQ", "FATHER_AD", "FATHER_VAF",
        "PROBAND_GT", "PROBAND_DP", "PROBAND_GQ", "PROBAND_AD", "PROBAND_VAF",
    ]
    with open(path, "wt") as out:
        out.write("\t".join(header) + "\n")

        for v in variants:
            def svals(s: Dict[str, Optional[str]]):
                ad = parse_ad(s.get("AD"))
                return (
                    s.get("GT"),
                    to_int(s.get("DP")),
                    to_int(s.get("GQ")),
                    s.get("AD"),
                    vaf_from_ad(ad),
                )

            mgt, mdp, mgq, mad, mvaf = svals(v.mother)
            fgt, fdp, fgq, fad, fvaf = svals(v.father)
            pgt, pdp, pgq, pad, pvaf = svals(v.proband)

            row = [
                v.chrom, str(v.pos), v.ref, v.alt,
                "SNV" if v.is_snv() else "INDEL",
                "." if v.qual is None else f"{v.qual:.2f}",
                v.flt,
                v.info.get("VQSLOD", "."),
                v.info.get("QD", "."),
                v.info.get("FS", "."),
                v.info.get("SOR", "."),
                v.info.get("MQ", "."),
                v.info.get("MQRankSum", "."),
                v.info.get("ReadPosRankSum", "."),
                mgt or ".", str(mdp or "."), str(mgq or "."), mad or ".", f"{mvaf:.4f}" if mvaf is not None else ".",
                fgt or ".", str(fdp or "."), str(fgq or "."), fad or ".", f"{fvaf:.4f}" if fvaf is not None else ".",
                pgt or ".", str(pdp or "."), str(pgq or "."), pad or ".", f"{pvaf:.4f}" if pvaf is not None else ".",
            ]
            out.write("\t".join(row) + "\n")


In [36]:
# Step 10: Run calling

strict = []
filtered = []

# ---- Customize thresholds here if you want ----
PARAMS = dict(
    require_pass=True,        # require FILTER=PASS (or .)
    min_qual=30.0,            # variant QUAL
    min_dp=15,                # DP in each trio member
    min_gq=30,                # GQ in each trio member
    min_alt_reads=8,          # proband alt reads
    min_vaf=0.35,             # proband VAF lower
    max_vaf=0.65,             # proband VAF upper
    max_parent_alt_reads=0,   # parents alt reads allowed
    max_parent_vaf=0.01,      # parents vaf allowed
    require_vqslod_gt=0.0,    # set to None to disable
    apply_gatk_like_info_filters=True
)

for v in iter_variants(VCF_GZ):
    if not is_strict_denovo(v):
        continue
    strict.append(v)

    if passes_quality_filters(v, **PARAMS):
        filtered.append(v)

strict_path = f"{OUT_PREFIX}.strict_denovo.tsv"
filt_path   = f"{OUT_PREFIX}.filtered_denovo.tsv"

write_tsv(strict_path, strict)
write_tsv(filt_path, filtered)

snv = sum(v.is_snv() for v in filtered)
indel = len(filtered) - snv

print(f"Trio: mother={mother_id}, father={father_id}, proband={proband_id}")
print("Strict de novo candidates:", len(strict))
print(f"Filtered candidates: {len(filtered)} (SNV={snv}, INDEL={indel})")
print("Wrote:", strict_path)
print("Wrote:", filt_path)


Trio: mother=CDL-068-99M, father=CDL-068-99F, proband=CDL-068-99P
Strict de novo candidates: 343
Filtered candidates: 88 (SNV=80, INDEL=8)
Wrote: /content/CHOP/CDL-068-99.strict_denovo.tsv
Wrote: /content/CHOP/CDL-068-99.filtered_denovo.tsv


In [37]:
# Step 11: Load and preview results
import pandas as pd

df = pd.read_csv(filt_path, sep="\t")
df.head(20)


Unnamed: 0,CHROM,POS,REF,ALT,TYPE,QUAL,FILTER,VQSLOD,QD,FS,...,FATHER_GT,FATHER_DP,FATHER_GQ,FATHER_AD,FATHER_VAF,PROBAND_GT,PROBAND_DP,PROBAND_GQ,PROBAND_AD,PROBAND_VAF
0,chr1,9999315,G,A,SNV,464.03,PASS,16.33,12.21,4.815,...,0/0,41,99,410,0.0,0/1,38,99,1919,0.5
1,chr1,90679453,T,C,SNV,500.03,PASS,16.68,13.16,3.053,...,0/0,35,99,350,0.0,0/1,38,99,1919,0.5
2,chr1,157742727,C,A,SNV,610.03,PASS,15.4,12.98,2.488,...,0/0,24,58,240,0.0,0/1,47,99,2423,0.4894
3,chr1,184521693,T,C,SNV,587.03,PASS,16.41,12.49,2.552,...,0/0,30,87,300,0.0,0/1,47,99,2423,0.4894
4,chr1,229575608,A,C,SNV,701.03,PASS,15.03,12.75,1.026,...,0/0,30,87,300,0.0,0/1,55,99,2728,0.5091
5,chr1,239408722,G,C,SNV,340.03,PASS,15.71,15.46,0.0,...,0/0,38,99,380,0.0,0/1,22,99,913,0.5909
6,chr2,28745488,A,G,SNV,976.03,PASS,13.91,17.43,14.117,...,0/0,38,99,380,0.0,0/1,56,99,2036,0.6429
7,chr2,58142201,G,T,SNV,560.03,PASS,15.76,11.67,2.415,...,0/0,34,99,340,0.0,0/1,48,99,2721,0.4375
8,chr2,112297295,G,T,SNV,531.03,PASS,16.91,14.75,4.943,...,0/0,36,99,360,0.0,0/1,36,99,1521,0.5833
9,chr2,118798757,A,"*,T,ATT",INDEL,2120.16,PASS,2.13,11.22,0.451,...,0/0,37,93,37000,0.0,0/1,38,99,181800,0.5
