In [1]:
import numpy as np

In [2]:
# ----------------------------
# VCF loader (plain .vcf, not gzipped)
# ----------------------------
def load_vcf_plain_with_ids(path, keep_autosomes=True):
    samples, meta, var_chr, cols = [], [], [], []
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            if line.startswith("##"):
                continue
            if line.startswith("#CHROM"):
                header = line.rstrip("\n").split("\t")
                samples = header[9:]
                continue
            parts = line.rstrip("\n").split("\t")
            if len(parts) < 10:
                continue
            chrom, pos, _id, ref, alt, qual, flt, info, fmt = parts[:9]
            geno = parts[9:]
            if keep_autosomes:
                c = chrom.replace("chr", "")
                if not c.isdigit():
                    continue
                if not (1 <= int(c) <= 22):
                    continue
            dos = []
            for g in geno:
                gt = g.split(":", 1)[0]
                if gt in ("./.", ".|."):
                    dos.append(np.nan)
                else:
                    a = gt.replace("|", "/").split("/")
                    try:
                        dos.append(sum(int(x) for x in a))
                    except:
                        dos.append(np.nan)
            meta.append((chrom, int(pos), ref, alt))
            var_chr.append(chrom.replace("chr", ""))
            cols.append(np.asarray(dos, dtype=float))
    G = np.vstack(cols).T if cols else np.empty((len(samples), 0))
    return samples, meta, var_chr, G

In [3]:
# --- AF from reference (ignore NaNs) ---
def _af_from_ref(G_ref: np.ndarray, eps: float = 1e-6) -> np.ndarray:
    G = np.asarray(G_ref, dtype=float)
    nobs = np.sum(~np.isnan(G), axis=0)
    sums = np.nansum(G, axis=0)           # dosage sum
    with np.errstate(invalid="ignore", divide="ignore"):
        p = np.where(nobs > 0, (sums / 2.0) / nobs, 0.0)
    return np.clip(p, eps, 1.0 - eps)

In [4]:
# --- Generate AF-matched 'safe' cohort aligned to panel ---
def af_matched_null_safe(G_tr: np.ndarray, n_samples: int,
                         seed: int = 123, floor_maf: float | None = None) -> np.ndarray:
    """
    Returns G_safe (n_samples x m) for the exact panel in G_tr.
    Each SNP is sampled under HWE using AFs from G_tr.
    If floor_maf is set (e.g., 0.001), pushes ultra-rares away from 0/1.
    """
    rng = np.random.default_rng(seed)
    p = _af_from_ref(G_tr)        # AF per SNP from REAL train
    if floor_maf is not None:
        maf = np.minimum(p, 1.0 - p)
        low = maf < floor_maf
        # move probability away from boundary while keeping the major allele
        p[low] = np.where(p[low] < 0.5, floor_maf, 1.0 - floor_maf)

    m = p.size
    H1 = rng.binomial(1, p, size=(n_samples, m))
    H2 = rng.binomial(1, p, size=(n_samples, m))
    return (H1 + H2).astype(float)   # 0/1/2 dosages

In [5]:
# --- Minimal VCF writer (keeps CHROM/REF/ALT exactly as in REAL meta) ---
def write_vcf_from_matrix(out_path: str, samples: list[str], meta: list[tuple],
                          G: np.ndarray, phased: bool = False) -> None:
    """
    meta must be a list of (chrom, pos, ref, alt) in the REAL panel order.
    """
    sep = "|" if phased else "/"
    n, m = G.shape
    assert m == len(meta), "G columns must match panel length"
    with open(out_path, "w", encoding="utf-8") as f:
        f.write("##fileformat=VCFv4.2\n")
        f.write('##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">\n')
        header = ["#CHROM","POS","ID","REF","ALT","QUAL","FILTER","INFO","FORMAT"] + samples
        f.write("\t".join(header) + "\n")
        for j, (chrom, pos, ref, alt) in enumerate(meta):
            col = G[:, j]
            gts = []
            for d in col:
                d = int(round(d))
                if d <= 0:   gts.append(f"0{sep}0")
                elif d == 1:gts.append(f"0{sep}1")
                else:        gts.append(f"1{sep}1")
            row = [str(chrom), str(int(pos)), ".", str(ref), str(alt), ".", "PASS", ".", "GT"] + gts
            f.write("\t".join(row) + "\n")

In [6]:
REAL_VCF    = "1000G_65K_SNP_chr1.vcf"
TRAIN_FRAC  = 0.80
RANDOM_SEED = 42
N_SAFE      = 2500          # size of the safe baseline VCF to emit
SAFE_SEED   = 999
FLOOR_MAF   = None          # e.g., 0.001 to further suppress ultra-rares (optional)

# 1) Load REAL panel (same loader you’ve been using)
real_samples, real_meta, VAR_CHR, G_real = load_vcf_plain_with_ids(REAL_VCF)
print("Loaded REAL:", G_real.shape, "(samples × SNPs)")

# 2) Split REAL into train/holdout (match your evaluation pipeline)
rng = np.random.RandomState(RANDOM_SEED)
idx_all = np.arange(G_real.shape[0])
rng.shuffle(idx_all)
cut = int(len(idx_all) * TRAIN_FRAC)
tr_idx, ho_idx = idx_all[:cut], idx_all[cut:]
G_tr = G_real[tr_idx, :]
G_ho = G_real[ho_idx, :]
print("TRAIN/HOLDOUT sizes:", G_tr.shape[0], "/", G_ho.shape[0])

# 3) Generate the SAFE baseline aligned to the exact panel
G_safe = af_matched_null_safe(G_tr, n_samples=N_SAFE, seed=SAFE_SEED, floor_maf=FLOOR_MAF)

# 4) Write VCF (same CHROM/REF/ALT/order as REAL)
safe_samples = [f"SAFE_{i:04d}" for i in range(N_SAFE)]
SAFE_VCF = "SAFE_chr1_binomial_panel.vcf"
write_vcf_from_matrix(SAFE_VCF, safe_samples, real_meta, G_safe, phased=False)
print("Wrote SAFE VCF:", SAFE_VCF, "| shape:", G_safe.shape)

Loaded REAL: (2504, 65535) (samples × SNPs)
TRAIN/HOLDOUT sizes: 2003 / 501
Wrote SAFE VCF: SAFE_chr1_binomial_panel.vcf | shape: (2500, 65535)
