# HapMap preprocessing (Phase III polymorphic)

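This notebook downloads HapMap Phase III *polymorphic* genotypes (hapmap_format, NCBI build 36) and uses CEU as the control population.
It automatically selects as the CASE population the non-CEU population with the largest cohort present on both chr2 and chr10.
It then extracts two regions (chr2_5Mb, chr10_1Mb), aligns CEU and CASE genotypes on their common SNPs, creates per-region CASE train/test splits, downloads the matching phased haplotype files, and builds control haplotype blocks/histograms for Method 2.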


In [1]:
from pathlib import Path
import re
import json
import gzip
import time
import urllib.request
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

# Robust PROJECT_ROOT detection

def find_project_root(start: Path = None) -> Path:
    """Locate the repository root by walking upward from ``start``.

    The search begins at ``start`` (or the current working directory when
    ``start`` is None), resolved to an absolute path. The first directory on
    the way up that contains a ``.git`` entry or a ``requirements.txt`` file
    is returned. When no ancestor carries either marker, the resolved
    starting directory itself is returned.
    """
    origin = (Path.cwd() if start is None else Path(start)).resolve()
    markers = (".git", "requirements.txt")
    found = next(
        (
            folder
            for folder in (origin, *origin.parents)
            if any((folder / marker).exists() for marker in markers)
        ),
        None,
    )
    return origin if found is None else found

PROJECT_ROOT = find_project_root()
print("PROJECT_ROOT =", PROJECT_ROOT)

# Directory layout; every path hangs off the detected project root.
DATA_DIR = PROJECT_ROOT / "data"
RAW_DIR = DATA_DIR / "raw" / "hapmap"
PROC_DIR = DATA_DIR / "processed" / "hapmap"

GENO_DIR = RAW_DIR / "genotypes"
PHASE_DIR = RAW_DIR / "phasing" / "HapMap3_r2"
PHASE_META_DIR = RAW_DIR / "phasing" / "HapMap3_r2_meta"

REGION_DIR = PROC_DIR / "regions"
COHORT_DIR = PROC_DIR / "cohorts"
BLOCK_DIR = PROC_DIR / "blocks"
HAP_DIR = PROC_DIR / "haplotypes"
PHASE_INFO_DIR = PROC_DIR / "phasing"

# Create every directory once (idempotent thanks to exist_ok=True).
for d in [GENO_DIR, PHASE_DIR, PHASE_META_DIR, REGION_DIR, COHORT_DIR, BLOCK_DIR, HAP_DIR, PHASE_INFO_DIR]:
    d.mkdir(parents=True, exist_ok=True)


# Cache switches: when False, existing downloads / processed files are reused.
FORCE_REDOWNLOAD = False
FORCE_REBUILD = False

# CASE train/test split (consumed by split_case).
SPLIT_SEED = 0
TRAIN_FRAC = 0.7

# Region selection: choose the window of WINDOW_BP_* base pairs whose SNP
# count is closest to TARGET_SNPS_* (consumed by choose_window_by_bp).
TARGET_SNPS_CHR2 = 311
TARGET_SNPS_CHR10 = 610
WINDOW_BP_CHR2 = 5_000_000
WINDOW_BP_CHR10 = 1_000_000

TOP_K = 50

POLYMORPHIC_BASE = "https://ftp.ncbi.nlm.nih.gov/hapmap/genotypes/latest_phaseIII_ncbi_b36/hapmap_format/polymorphic/"
PHASE_BASE = "https://ftp.ncbi.nlm.nih.gov/hapmap/phasing/2009-02_phaseIII/HapMap3_r2"


PROJECT_ROOT = /Users/erkmenerken/Desktop/proje430


In [2]:


def download_file(url: str, dst: Path, overwrite: bool = False):
    """Download ``url`` to ``dst`` via a ``.part`` temp file, validating before publishing.

    If ``dst`` already exists with non-zero size and ``overwrite`` is False, the
    download is skipped. Otherwise the payload is streamed in 1 MiB chunks to
    ``<dst>.part``. For ``.gz`` targets the gzip magic bytes are checked *on the
    temp file*. Only a validated file is moved into place, so a bad download can
    never be mistaken for a cached one on the next run.

    Raises:
        RuntimeError: if ``dst`` ends in ``.gz`` but the payload is not gzip
            (e.g. an HTML error page). The temp file is removed first.
        urllib.error.URLError / OSError: network or filesystem failures; the
            partial temp file is removed before re-raising.
    """
    dst.parent.mkdir(parents=True, exist_ok=True)
    if dst.exists() and dst.stat().st_size > 0 and not overwrite:
        print(f"  Already exists, skipping: {dst.name} ({dst.stat().st_size/1e6:.2f} MB)")
        return

    tmp = dst.with_suffix(dst.suffix + ".part")
    if tmp.exists():
        tmp.unlink()

    print(f"  Downloading: {url}")
    print(f"  Saving to  : {dst}")

    req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
    try:
        with urllib.request.urlopen(req) as resp, open(tmp, "wb") as f:
            while True:
                chunk = resp.read(1024 * 1024)
                if not chunk:
                    break
                f.write(chunk)

        # Gzip magic check BEFORE publishing, so a corrupt file is never left at dst.
        if dst.suffix == ".gz":
            with open(tmp, "rb") as fh:
                magic = fh.read(2)
            if magic != b"\x1f\x8b":
                raise RuntimeError(f"Downloaded file does not look like gzip: {dst}")
    except BaseException:
        # Includes KeyboardInterrupt: never leave a half-written temp file behind.
        if tmp.exists():
            tmp.unlink()
        raise

    # replace() (unlike rename()) also overwrites an existing dst on Windows.
    tmp.replace(dst)


def _find_sample_columns(columns: List[str]):
    cols = list(columns)
    if "QCcode" in cols:
        qc_idx = cols.index("QCcode")
        return cols[: qc_idx + 1], cols[qc_idx + 1 :]
    # Fallback (rarely needed)
    return cols[:11], cols[11:]


def list_polymorphic_files() -> List[Tuple[str, str]]:
    """Scrape the polymorphic genotype directory listing at ``POLYMORPHIC_BASE``.

    Returns:
        Unique ``(chromosome, population)`` string pairs, e.g. ``("2", "CEU")``,
        in order of first appearance in the listing HTML.
    """
    req = urllib.request.Request(POLYMORPHIC_BASE, headers={"User-Agent": "Mozilla/5.0"})
    with urllib.request.urlopen(req) as resp:
        html = resp.read().decode("utf-8", errors="ignore")
    pattern = r"genotypes_chr(\d+)_([A-Z]{3})_phase3\.2_nr\.b36_fwd\.txt\.gz"
    # A filename can occur several times in an HTML listing (e.g. link target and
    # link text). dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(re.findall(pattern, html)))


def count_individuals_from_remote_header(url: str) -> int:
    """Count sample columns in a remote gzipped HapMap genotype file.

    Only the first (header) line is decompressed and read before the
    connection is closed. Sample columns are those after ``QCcode``
    (see ``_find_sample_columns``).
    """
    request = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
    with urllib.request.urlopen(request) as response, gzip.GzipFile(fileobj=response) as stream:
        first_line = stream.readline()
    fields = first_line.decode("utf-8", errors="replace").strip().split()
    return len(_find_sample_columns(fields)[1])


In [3]:

# Discover CASE population (largest cohort in chr2 & chr10)


matches = list_polymorphic_files()
if len(matches) == 0:
    raise RuntimeError("Could not list polymorphic HapMap files. Check network access or base URL.")

# Index the listing: chromosome -> populations, and (chromosome, population) -> filename.
pops_by_chr = {}
file_map = {}
for chrom_text, pop_code in matches:
    chrom_num = int(chrom_text)
    pops_by_chr.setdefault(chrom_num, set()).add(pop_code)
    file_map[(chrom_num, pop_code)] = f"genotypes_chr{chrom_num}_{pop_code}_phase3.2_nr.b36_fwd.txt.gz"

# Candidates must exist on both chromosomes. CEU is reserved as the control and is
# only admitted as a last resort, in which case CASE_POP may end up equal to CEU.
shared_pops = pops_by_chr.get(2, set()) & pops_by_chr.get(10, set())
cand_pops = sorted(shared_pops - {"CEU"}) or sorted(shared_pops)
if not cand_pops:
    raise RuntimeError("No populations present in both chr2 and chr10 in the polymorphic directory.")


def _remote_cohort_size(pop_code):
    """Header-only individual counts for one population on chr2 and chr10."""
    n_chr2 = count_individuals_from_remote_header(POLYMORPHIC_BASE + file_map[(2, pop_code)])
    n_chr10 = count_individuals_from_remote_header(POLYMORPHIC_BASE + file_map[(10, pop_code)])
    return {"pop": pop_code, "n_chr2": n_chr2, "n_chr10": n_chr10, "min_n": min(n_chr2, n_chr10)}


pop_sizes = [_remote_cohort_size(pop_code) for pop_code in cand_pops]

# Largest min(n_chr2, n_chr10) first; population code breaks ties alphabetically.
pop_sizes_sorted = sorted(pop_sizes, key=lambda rec: (-rec["min_n"], rec["pop"]))

top5 = pop_sizes_sorted[:5]
print("Top candidate CASE populations (by min(n_chr2, n_chr10)):")
for rec in top5:
    print(f"  {rec['pop']}: n_chr2={rec['n_chr2']}, n_chr10={rec['n_chr10']}, min={rec['min_n']}")

CASE_POP = pop_sizes_sorted[0]["pop"]
print("Selected CASE_POP =", CASE_POP)


Top candidate CASE populations (by min(n_chr2, n_chr10)):
  MKK: n_chr2=171, n_chr10=171, min=171
  YRI: n_chr2=167, n_chr10=167, min=167
  LWK: n_chr2=90, n_chr10=90, min=90
  GIH: n_chr2=88, n_chr10=88, min=88
  TSI: n_chr2=88, n_chr10=88, min=88
Selected CASE_POP = MKK


In [4]:

# Download CEU + CASE_POP genotype files


def _genotype_manifest_entry(chrom, pop):
    """Download spec (display name, source URL, local destination) for one genotype file."""
    fname = f"genotypes_chr{chrom}_{pop}_phase3.2_nr.b36_fwd.txt.gz"
    return {
        "name": f"{pop} genotypes chr{chrom}",
        "url": POLYMORPHIC_BASE + fname,
        "dst": GENO_DIR / fname,
    }


# Order: chr2 (CEU, CASE_POP), then chr10 (CEU, CASE_POP).
MANIFEST = [_genotype_manifest_entry(c, p) for c in (2, 10) for p in ("CEU", CASE_POP)]

print("Starting genotype downloads...")
for entry in MANIFEST:
    print(f"=== {entry['name']} ===")
    download_file(entry["url"], entry["dst"], overwrite=FORCE_REDOWNLOAD)

print("Genotype downloads ready.")


Starting genotype downloads...
=== CEU genotypes chr2 ===
  Already exists, skipping: genotypes_chr2_CEU_phase3.2_nr.b36_fwd.txt.gz (6.85 MB)
=== MKK genotypes chr2 ===
  Already exists, skipping: genotypes_chr2_MKK_phase3.2_nr.b36_fwd.txt.gz (8.24 MB)
=== CEU genotypes chr10 ===
  Already exists, skipping: genotypes_chr10_CEU_phase3.2_nr.b36_fwd.txt.gz (4.36 MB)
=== MKK genotypes chr10 ===
  Already exists, skipping: genotypes_chr10_MKK_phase3.2_nr.b36_fwd.txt.gz (5.12 MB)
Genotype downloads ready.


In [5]:

# Region selection


def load_positions_rsids(geno_path: Path) -> pd.DataFrame:
    """Read the ``rs#`` and ``pos`` columns of a gzipped HapMap genotype file.

    Returns:
        DataFrame with columns ``rs#`` (str) and ``pos`` (int), sorted by
        position with a stable sort (ties keep file order), index reset to 0..n-1.
    """
    print(f" Reading rs# + pos from {geno_path.name}")
    # pandas' C parser handles sep=r"\s+" natively, so engine="python" (much
    # slower on whole-chromosome files) is not needed.
    df = pd.read_csv(
        geno_path,
        sep=r"\s+",
        compression="gzip",
        usecols=["rs#", "pos"],
        dtype={"rs#": str, "pos": int},
    )
    df = df.dropna().sort_values("pos", kind="stable").reset_index(drop=True)
    print(f" Loaded {len(df)} SNP positions.")
    return df


def choose_window_by_bp(df_pos: pd.DataFrame, window_bp: int, target_snps: int) -> Dict[str, int]:
    """Pick the fixed-width window whose SNP count is closest to ``target_snps``.

    Every SNP position is tried as a window start. The window
    ``[start, start + window_bp]`` is inclusive on both ends (``searchsorted``
    with ``side="right"``), which matches the inclusive filter used when
    genotypes are parsed. Ties on ``|count - target_snps|`` go to the leftmost start.

    Args:
        df_pos: DataFrame with an integer ``pos`` column (need not be pre-sorted).
        window_bp: window width in base pairs.
        target_snps: desired number of SNPs inside the window.

    Returns:
        Dict with ``start_bp``, ``end_bp``, ``snps_in_window``, ``end_pos_actual``
        (last SNP position inside the window), ``total_snps_chr``, ``min_pos``, ``max_pos``.

    Raises:
        ValueError: if ``df_pos`` contains no positions.
    """
    # searchsorted needs ascending input; sorting is a no-op for already-sorted data.
    pos = np.sort(df_pos["pos"].to_numpy(np.int64))
    n = len(pos)
    if n == 0:
        raise ValueError("choose_window_by_bp: df_pos contains no SNP positions.")

    # ends[i] = index one past the last SNP with pos <= pos[i] + window_bp.
    ends = np.searchsorted(pos, pos + window_bp, side="right")
    counts = ends - np.arange(n)

    best_i = int(np.argmin(np.abs(counts - target_snps)))

    start = int(pos[best_i])
    end_idx = int(ends[best_i] - 1)
    end_pos_actual = int(pos[end_idx]) if end_idx >= best_i else start

    return {
        "start_bp": start,
        "end_bp": int(start + window_bp),
        "snps_in_window": int(counts[best_i]),
        "end_pos_actual": end_pos_actual,
        "total_snps_chr": int(n),
        "min_pos": int(pos[0]),
        "max_pos": int(pos[-1]),
    }

# Window bounds are derived from CEU SNP density, so only CEU files are needed here.
ceu_chr2_path = GENO_DIR / "genotypes_chr2_CEU_phase3.2_nr.b36_fwd.txt.gz"
ceu_chr10_path = GENO_DIR / "genotypes_chr10_CEU_phase3.2_nr.b36_fwd.txt.gz"

if not all(p.exists() for p in (ceu_chr2_path, ceu_chr10_path)):
    raise FileNotFoundError("Missing CEU genotype files. Re-run the download cell.")

chr2_df, chr10_df = (load_positions_rsids(p) for p in (ceu_chr2_path, ceu_chr10_path))

chr2_plan = choose_window_by_bp(chr2_df, WINDOW_BP_CHR2, TARGET_SNPS_CHR2)
chr10_plan = choose_window_by_bp(chr10_df, WINDOW_BP_CHR10, TARGET_SNPS_CHR10)

print("windows selected:")
for _label, _plan in (("chr2 (5Mb, ~311 SNPs):", chr2_plan), ("chr10 (1Mb, ~610 SNPs):", chr10_plan)):
    print(_label, _plan)


 Reading rs# + pos from genotypes_chr2_CEU_phase3.2_nr.b36_fwd.txt.gz
 Loaded 113979 SNP positions.
 Reading rs# + pos from genotypes_chr10_CEU_phase3.2_nr.b36_fwd.txt.gz
 Loaded 73116 SNP positions.
windows selected:
chr2 (5Mb, ~311 SNPs): {'start_bp': 88196854, 'end_bp': 93196854, 'snps_in_window': 311, 'end_pos_actual': 91649657, 'total_snps_chr': 113979, 'min_pos': 5703, 'max_pos': 242710183}
chr10 (1Mb, ~610 SNPs): {'start_bp': 8729009, 'end_bp': 9729009, 'snps_in_window': 610, 'end_pos_actual': 9726663, 'total_snps_chr': 73116, 'min_pos': 68983, 'max_pos': 135372380}


In [6]:



MISSING_TOKENS = {"", "NN", "NA", "N", "00", "--", "??"}


def genotype_row_to_counts(geno_strs, allele_a, allele_b, count_mode="minor"):
    """Encode one SNP's genotype strings as per-individual allele counts.

    Each call is whitespace-stripped; non-strings become ``""``. A call is usable
    when it is exactly two characters long and not in ``MISSING_TOKENS``.

    Args:
        geno_strs: iterable of genotype calls such as ``"AG"``, one per individual.
        allele_a, allele_b: the two alleles listed for the SNP.
        count_mode: ``"minor"`` counts whichever allele is rarer among usable calls
            (ties count ``allele_b``); ``"allele_a"`` always counts ``allele_a``.

    Returns:
        ``(counts, counted, other)``: an int8 array with 0/1/2 copies of the counted
        allele per individual (-1 for unusable calls), then the counted and other allele.

    Raises:
        ValueError: for an unknown ``count_mode``.
    """
    calls = [s.strip() if isinstance(s, str) else "" for s in np.asarray(geno_strs, dtype=object)]
    usable = [len(s) == 2 and s not in MISSING_TOKENS for s in calls]

    def _dosage(call, allele):
        # Copies of ``allele`` in a two-character call.
        return (call[0] == allele) + (call[1] == allele)

    if count_mode == "minor":
        tally_a = sum(_dosage(c, allele_a) for c, ok in zip(calls, usable) if ok)
        tally_b = sum(_dosage(c, allele_b) for c, ok in zip(calls, usable) if ok)
        counted, other = (allele_a, allele_b) if tally_a < tally_b else (allele_b, allele_a)
    elif count_mode == "allele_a":
        counted, other = allele_a, allele_b
    else:
        raise ValueError(f"Unknown count_mode={count_mode}")

    dosages = np.full(len(calls), -1, dtype=np.int8)
    for idx, (call, ok) in enumerate(zip(calls, usable)):
        if ok:
            dosages[idx] = _dosage(call, counted)

    return dosages, counted, other


def parse_genotypes_window(geno_path: Path, start_bp: int, end_bp: int, chunksize: int = 2000, count_mode="minor"):
    """Stream a gzipped HapMap genotype file and encode SNPs inside ``[start_bp, end_bp]``.

    Only biallelic single-nucleotide SNPs (``alleles`` like ``"A/G"``) are kept.
    Each kept SNP is encoded with ``genotype_row_to_counts`` using ``count_mode``.

    Args:
        geno_path: gzipped hapmap_format genotype file.
        start_bp, end_bp: inclusive window bounds in base pairs.
        chunksize: rows per pandas chunk while streaming.
        count_mode: forwarded to ``genotype_row_to_counts``.

    Returns:
        ``(G, sample_ids, snp_ids, positions, counted_alleles, other_alleles)`` where
        ``G`` is an int8 array of shape (n_individuals, n_snps) holding 0/1/2 counts of
        the counted allele (-1 = missing). ``positions`` is int32 and ascending, and all
        per-SNP arrays follow the same column order as ``G``.

    Raises:
        RuntimeError: if no SNP in the window survives filtering.
    """
    print(f"Parsing genotypes from {geno_path.name}")
    print(f"   Window: [{start_bp}, {end_bp}] bp (inclusive)")
    print(f"   Count mode: {count_mode}")

    # Header only (nrows=0): sample IDs are the columns after QCcode.
    header = pd.read_csv(
        geno_path,
        sep=r"\s+",
        compression="gzip",
        nrows=0,
        dtype=str,
    )
    _, sample_cols = _find_sample_columns(header.columns)
    sample_ids = np.array(sample_cols, dtype=object)
    print(f" Individuals detected: {len(sample_ids)}")

    G_cols, snp_ids, positions, counted_alleles, other_alleles = [], [], [], [], []

    # The C parser supports sep=r"\s+" and is far faster than engine="python" on
    # whole-chromosome files. The context manager closes the file even on error.
    with pd.read_csv(
        geno_path,
        sep=r"\s+",
        compression="gzip",
        dtype=str,
        chunksize=chunksize,
    ) as reader:
        for chunk in reader:
            pos_int = pd.to_numeric(chunk["pos"], errors="coerce")
            in_window = chunk.loc[(pos_int >= start_bp) & (pos_int <= end_bp)]
            if in_window.empty:
                continue

            for _, row in in_window.iterrows():
                rsid = row.get("rs#", None)
                alleles = row.get("alleles", None)
                pos = row.get("pos", None)

                # Missing fields arrive as float NaN rather than None; skip anything non-string.
                if not isinstance(rsid, str) or not isinstance(alleles, str) or not isinstance(pos, str):
                    continue
                if "/" not in alleles:
                    continue

                a, b = [x.strip() for x in alleles.split("/", 1)]
                if len(a) != 1 or len(b) != 1:
                    continue  # indels / multi-base alleles are not supported

                counts, counted, other = genotype_row_to_counts(
                    row[sample_cols].values, a, b, count_mode=count_mode
                )

                G_cols.append(counts)
                snp_ids.append(rsid)
                positions.append(int(pos))
                counted_alleles.append(counted)
                other_alleles.append(other)

    if not G_cols:
        raise RuntimeError("No SNPs were kept. Check start/end window values.")

    G = np.stack(G_cols, axis=1)  # (N_individuals, M_snps): one column per SNP
    # int32 is sufficient for build-36 coordinates (largest chromosome < 2**31 bp).
    positions = np.array(positions, dtype=np.int32)

    # Stable sort keeps file order for SNPs that share a position.
    order = np.argsort(positions, kind="stable")
    G = G[:, order]
    positions = positions[order]
    snp_ids = np.array(snp_ids, dtype=object)[order]
    counted_alleles = np.array(counted_alleles, dtype=object)[order]
    other_alleles = np.array(other_alleles, dtype=object)[order]

    print(f" Kept SNPs: {G.shape[1]} | Individuals: {G.shape[0]}")
    print(f" Missing rate: {float(np.mean(G == -1)):.4f}")
    print(f" Kept position range: {int(positions.min())} .. {int(positions.max())}")

    return G, sample_ids, snp_ids, positions, counted_alleles, other_alleles


In [7]:

# Extract + align CEU vs CASE_POP regions


# Raw genotype inputs, using the same filename scheme as the download cell.
_GENO_FMT = "genotypes_chr{chrom}_{pop}_phase3.2_nr.b36_fwd.txt.gz"
ceu_chr2_path = GENO_DIR / _GENO_FMT.format(chrom=2, pop="CEU")
ceu_chr10_path = GENO_DIR / _GENO_FMT.format(chrom=10, pop="CEU")
case_chr2_path = GENO_DIR / _GENO_FMT.format(chrom=2, pop=CASE_POP)
case_chr10_path = GENO_DIR / _GENO_FMT.format(chrom=10, pop=CASE_POP)

# Per-population region extracts (raw, not aligned).
CEU_REGION_CHR2 = REGION_DIR / "CEU_chr2_5Mb.npz"
CEU_REGION_CHR10 = REGION_DIR / "CEU_chr10_1Mb.npz"
CASE_REGION_CHR2 = REGION_DIR / f"{CASE_POP}_chr2_5Mb.npz"
CASE_REGION_CHR10 = REGION_DIR / f"{CASE_POP}_chr10_1Mb.npz"

# Region extracts restricted to SNPs shared by CEU and CASE_POP.
ALIGNED_CEU_CHR2 = REGION_DIR / f"CEU_chr2_5Mb.common_with_{CASE_POP}.npz"
ALIGNED_CASE_CHR2 = REGION_DIR / f"{CASE_POP}_chr2_5Mb.common_with_CEU.npz"
ALIGNED_CEU_CHR10 = REGION_DIR / f"CEU_chr10_1Mb.common_with_{CASE_POP}.npz"
ALIGNED_CASE_CHR10 = REGION_DIR / f"{CASE_POP}_chr10_1Mb.common_with_CEU.npz"


def _parse_region_for_alignment(geno_path, start_bp, end_bp):
    """Parse a genotype window with ``count_mode="allele_a"`` and return it as a dict.

    Counting the first listed allele, rather than each population's own minor
    allele, keeps the encoding comparable across populations.
    ``_align_by_snp`` compares the ``counted_alleles`` of both populations.
    """
    field_names = ("G", "sample_ids", "snp_ids", "positions", "counted_alleles", "other_alleles")
    parsed = parse_genotypes_window(geno_path, start_bp, end_bp, count_mode="allele_a")
    return dict(zip(field_names, parsed))


def _align_by_snp(ceu, case, region_tag):
    ceu_rsids = [str(x) for x in ceu["snp_ids"]]
    case_rsids = [str(x) for x in case["snp_ids"]]
    ceu_idx = {rs: i for i, rs in enumerate(ceu_rsids)}
    case_idx = {rs: i for i, rs in enumerate(case_rsids)}

    common_rsids = [rs for rs in ceu_rsids if rs in case_idx]
    dropped_only_ceu = len(ceu_rsids) - len(common_rsids)
    dropped_only_case = len(case_rsids) - len(common_rsids)

    idx_ceu = []
    idx_case = []
    mismatch = 0
    for rs in common_rsids:
        i = ceu_idx[rs]
        j = case_idx[rs]
        if (ceu["positions"][i] != case["positions"][j]):
            mismatch += 1
            continue
        if (ceu["counted_alleles"][i] != case["counted_alleles"][j]) or (ceu["other_alleles"][i] != case["other_alleles"][j]):
            mismatch += 1
            continue
        idx_ceu.append(i)
        idx_case.append(j)

    print(f"[{region_tag}] CEU SNPs: {len(ceu_rsids)} | {CASE_POP} SNPs: {len(case_rsids)}")
    print(f"[{region_tag}] Common rsIDs: {len(common_rsids)} | mismatched dropped: {mismatch}")
    print(f"[{region_tag}] Dropped only-in-CEU: {dropped_only_ceu} | only-in-{CASE_POP}: {dropped_only_case}")

    if len(idx_ceu) == 0:
        raise RuntimeError(f"No aligned SNPs remain for {region_tag} after filtering.")

    ceu_aligned = {
        "G": ceu["G"][:, idx_ceu],
        "sample_ids": ceu["sample_ids"],
        "snp_ids": ceu["snp_ids"][idx_ceu],
        "positions": ceu["positions"][idx_ceu],
        "counted_alleles": ceu["counted_alleles"][idx_ceu],
        "other_alleles": ceu["other_alleles"][idx_ceu],
    }
    case_aligned = {
        "G": case["G"][:, idx_case],
        "sample_ids": case["sample_ids"],
        "snp_ids": case["snp_ids"][idx_case],
        "positions": case["positions"][idx_case],
        "counted_alleles": case["counted_alleles"][idx_case],
        "other_alleles": case["other_alleles"][idx_case],
    }

    assert np.array_equal(ceu_aligned["snp_ids"], case_aligned["snp_ids"])
    assert np.array_equal(ceu_aligned["positions"], case_aligned["positions"])
    assert np.array_equal(ceu_aligned["counted_alleles"], case_aligned["counted_alleles"])

    return ceu_aligned, case_aligned


def _write_region_npz(out_path, chrom, plan, payload, note, saved_label):
    """Write a region payload to ``out_path`` (.npz) plus a sibling ``.meta.json``.

    Shared implementation for ``_save_region`` and ``_save_aligned_region``, which
    differ only in the metadata ``note`` and the log label.
    """
    np.savez_compressed(
        out_path,
        G=payload["G"].astype(np.int8),
        sample_ids=payload["sample_ids"],
        snp_ids=payload["snp_ids"],
        positions=payload["positions"],
        counted_alleles=payload["counted_alleles"],
        other_alleles=payload["other_alleles"],
        # Legacy aliases kept for existing readers. With count_mode="allele_a" these hold
        # the counted/other alleles from the ``alleles`` column, NOT necessarily minor/major.
        minor_alleles=payload["counted_alleles"],
        major_alleles=payload["other_alleles"],
        chrom=str(chrom),
    )
    meta = {
        "region_name": out_path.stem,
        "chrom": str(chrom),
        "created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
        "window_start_bp": int(plan["start_bp"]),
        "window_end_bp": int(plan["end_bp"]),
        "snps_in_window": int(payload["G"].shape[1]),
        "individuals": int(payload["G"].shape[0]),
        "count_mode": "allele_a",
        "note": note,
    }
    # with_suffix replaces only the final ".npz", e.g. "X.common_with_Y.npz" -> "X.common_with_Y.meta.json".
    out_path.with_suffix(".meta.json").write_text(json.dumps(meta, indent=2))
    print(f" {saved_label}: {out_path.relative_to(PROJECT_ROOT)}")


def _save_region(out_path, region_tag, chrom, plan, payload, pop_label):
    """Save a raw (not aligned) population window. ``region_tag`` is unused; kept for call-site symmetry."""
    _write_region_npz(
        out_path,
        chrom,
        plan,
        payload,
        note=f"Raw {pop_label} window using CEU-derived region bounds (not aligned).",
        saved_label="Saved region",
    )


def _save_aligned_region(out_path, region_tag, chrom, plan, payload, other_pop):
    """Save a window aligned against ``other_pop``. ``region_tag`` is unused; kept for call-site symmetry."""
    _write_region_npz(
        out_path,
        chrom,
        plan,
        payload,
        note=f"Aligned with {other_pop} on common SNPs; G counts allele_a from the alleles column.",
        saved_label="Saved aligned",
    )

# Build (or reuse) raw and aligned region files for chr2 and chr10.


def _load_aligned_npz(path):
    """Load an aligned-region .npz into a plain dict and close the archive immediately."""
    with np.load(path, allow_pickle=True) as archive:
        return {key: archive[key] for key in archive.files}


def _prepare_region(chrom, region_tag, plan, ceu_geno_path, case_geno_path,
                    ceu_region_path, case_region_path, aligned_ceu_path, aligned_case_path):
    """Create or reuse the raw and aligned CEU/CASE_POP region files for one chromosome.

    Raw windows are parsed when the region file is missing, ``FORCE_REBUILD`` is set,
    or the alignment must be recomputed. Cached files are only checked for
    existence, not against the current ``plan`` bounds. Set ``FORCE_REBUILD = True``
    after changing window parameters.

    Returns:
        ``(ceu_raw, case_raw, ceu_aligned, case_aligned)``. Raw payloads are ``None``
        when not parsed; aligned payloads are always plain dicts of arrays.
    """
    need_ceu_raw = (not ceu_region_path.exists()) or FORCE_REBUILD
    need_case_raw = (not case_region_path.exists()) or FORCE_REBUILD
    need_align = (not aligned_ceu_path.exists()) or (not aligned_case_path.exists()) or FORCE_REBUILD

    ceu_raw = None
    case_raw = None
    if need_ceu_raw or need_align:
        ceu_raw = _parse_region_for_alignment(ceu_geno_path, plan["start_bp"], plan["end_bp"])
    if need_case_raw or need_align:
        case_raw = _parse_region_for_alignment(case_geno_path, plan["start_bp"], plan["end_bp"])

    if need_ceu_raw:
        _save_region(ceu_region_path, region_tag, chrom, plan, ceu_raw, "CEU")
    else:
        print(f" Using existing CEU region file for chr{chrom}")

    if need_case_raw:
        _save_region(case_region_path, region_tag, chrom, plan, case_raw, CASE_POP)
    else:
        print(f" Using existing {CASE_POP} region file for chr{chrom}")

    if need_align:
        ceu_aligned, case_aligned = _align_by_snp(ceu_raw, case_raw, region_tag)
        _save_aligned_region(aligned_ceu_path, region_tag, chrom, plan, ceu_aligned, CASE_POP)
        _save_aligned_region(aligned_case_path, region_tag, chrom, plan, case_aligned, "CEU")
    else:
        print(f" Using existing aligned files for chr{chrom}")
        ceu_aligned = _load_aligned_npz(aligned_ceu_path)
        case_aligned = _load_aligned_npz(aligned_case_path)
        assert np.array_equal(ceu_aligned["snp_ids"], case_aligned["snp_ids"])
        assert np.array_equal(ceu_aligned["positions"], case_aligned["positions"])

    return ceu_raw, case_raw, ceu_aligned, case_aligned


ceu2_raw, case2_raw, ceu2_aligned, case2_aligned = _prepare_region(
    2, "chr2_5Mb", chr2_plan,
    ceu_chr2_path, case_chr2_path,
    CEU_REGION_CHR2, CASE_REGION_CHR2,
    ALIGNED_CEU_CHR2, ALIGNED_CASE_CHR2,
)

ceu10_raw, case10_raw, ceu10_aligned, case10_aligned = _prepare_region(
    10, "chr10_1Mb", chr10_plan,
    ceu_chr10_path, case_chr10_path,
    CEU_REGION_CHR10, CASE_REGION_CHR10,
    ALIGNED_CEU_CHR10, ALIGNED_CASE_CHR10,
)

print("Alignment complete: CEU/CASE_POP common SNP files ready.")


 Using existing CEU region file for chr2
 Using existing MKK region file for chr2
 Using existing aligned files for chr2
 Using existing CEU region file for chr10
 Using existing MKK region file for chr10
 Using existing aligned files for chr10
Alignment complete: CEU/CASE_POP common SNP files ready.


In [8]:

# Cohort splits per region (CASE_POP train/test, CEU control)


def load_region_npz(path: Path) -> Dict[str, np.ndarray]:
    """Load every array stored in an ``.npz`` archive into a plain dict.

    ``np.load`` on an ``.npz`` returns a lazily-reading ``NpzFile`` that keeps the
    file open. The ``with`` block reads each array eagerly and then releases the
    handle. ``allow_pickle=True`` is needed for the object arrays (IDs, alleles),
    so only load trusted files.
    """
    with np.load(path, allow_pickle=True) as archive:
        return {key: archive[key] for key in archive.files}


def split_case(sample_ids: List[str], seed: int = 0, frac_train: float = 0.7):
    """Randomly partition case samples into train and test sets.

    A ``numpy.random.default_rng(seed)`` permutation is cut after
    ``round(n * frac_train)`` entries. The cut is clamped to ``[1, n - 1]``
    (``max`` applied last), so with two or more samples both sides are
    non-empty.

    Returns:
        ``(train_idx, test_idx, train_ids, test_ids)``: index lists into
        ``sample_ids`` and the corresponding IDs.
    """
    id_list = list(sample_ids)
    shuffled = np.random.default_rng(seed).permutation(len(id_list))
    total = len(id_list)
    requested = int(round(total * frac_train))
    cut = max(1, min(total - 1, requested))

    train_part, test_part = shuffled[:cut], shuffled[cut:]
    return (
        train_part.tolist(),
        test_part.tolist(),
        [id_list[k] for k in train_part],
        [id_list[k] for k in test_part],
    )

def _ids_as_str(region):
    """Sample IDs of a loaded region payload, coerced to ``str``."""
    return [str(s) for s in region["sample_ids"]]


def _rel(path):
    """``path`` relative to PROJECT_ROOT, as a string for JSON."""
    return str(path.relative_to(PROJECT_ROOT))


def _region_summary(ceu_region, case_region, ctrl_npz, train_npz, test_npz, n_ceu, n_train, n_test):
    """Per-region entry of the cohort summary JSON (key order is part of the file format)."""
    return {
        "ceu_region": _rel(ceu_region),
        "case_region": _rel(case_region),
        "ceu_control_npz": _rel(ctrl_npz),
        "case_train_npz": _rel(train_npz),
        "case_test_npz": _rel(test_npz),
        "n_ceu": n_ceu,
        "n_case_train": n_train,
        "n_case_test": n_test,
    }


ceu_chr2, ceu_chr10, case_chr2, case_chr10 = [
    load_region_npz(p)
    for p in (ALIGNED_CEU_CHR2, ALIGNED_CEU_CHR10, ALIGNED_CASE_CHR2, ALIGNED_CASE_CHR10)
]

# CEU control: every individual in the aligned region (public reference).
ceu_ctrl_idx_chr2 = list(range(len(ceu_chr2["sample_ids"])))
ceu_ctrl_idx_chr10 = list(range(len(ceu_chr10["sample_ids"])))

# CASE_POP train/test per region (same seed for both regions).
(case_train_idx_chr2, case_test_idx_chr2,
 case_train_ids_chr2, case_test_ids_chr2) = split_case(
    _ids_as_str(case_chr2), seed=SPLIT_SEED, frac_train=TRAIN_FRAC
)
(case_train_idx_chr10, case_test_idx_chr10,
 case_train_ids_chr10, case_test_ids_chr10) = split_case(
    _ids_as_str(case_chr10), seed=SPLIT_SEED, frac_train=TRAIN_FRAC
)

print(f"Cohort sizes (chr2): CEU control= {len(ceu_ctrl_idx_chr2)} CASE train= {len(case_train_idx_chr2)} CASE test= {len(case_test_idx_chr2)}")
print(f"Cohort sizes (chr10): CEU control= {len(ceu_ctrl_idx_chr10)} CASE train= {len(case_train_idx_chr10)} CASE test= {len(case_test_idx_chr10)}")

# Cohort NPZ outputs.
ceu_ctrl_chr2_npz = COHORT_DIR / "hapmap_CEU_control_chr2_5Mb.npz"
ceu_ctrl_chr10_npz = COHORT_DIR / "hapmap_CEU_control_chr10_1Mb.npz"
case_train_chr2_npz = COHORT_DIR / f"hapmap_{CASE_POP}_case_train_chr2_5Mb.npz"
case_test_chr2_npz = COHORT_DIR / f"hapmap_{CASE_POP}_case_test_chr2_5Mb.npz"
case_train_chr10_npz = COHORT_DIR / f"hapmap_{CASE_POP}_case_train_chr10_1Mb.npz"
case_test_chr10_npz = COHORT_DIR / f"hapmap_{CASE_POP}_case_test_chr10_1Mb.npz"

for _npz_path, _ids, _idx in (
    (ceu_ctrl_chr2_npz, ceu_chr2["sample_ids"], ceu_ctrl_idx_chr2),
    (ceu_ctrl_chr10_npz, ceu_chr10["sample_ids"], ceu_ctrl_idx_chr10),
    (case_train_chr2_npz, np.array(case_train_ids_chr2, dtype=object), case_train_idx_chr2),
    (case_test_chr2_npz, np.array(case_test_ids_chr2, dtype=object), case_test_idx_chr2),
    (case_train_chr10_npz, np.array(case_train_ids_chr10, dtype=object), case_train_idx_chr10),
    (case_test_chr10_npz, np.array(case_test_ids_chr10, dtype=object), case_test_idx_chr10),
):
    np.savez_compressed(_npz_path, sample_ids=_ids, indices=np.array(_idx, dtype=int))

case_pop_json = COHORT_DIR / "hapmap_case_pop.json"
case_pop_payload = {
    "case_pop": CASE_POP,
    "created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
    "split_seed": SPLIT_SEED,
    "split_frac_train": TRAIN_FRAC,
    "phased_paths_json": _rel(PHASE_INFO_DIR / f"{CASE_POP.lower()}_phased_paths.json"),
    "regions": {
        "chr2_5Mb": _region_summary(
            ALIGNED_CEU_CHR2, ALIGNED_CASE_CHR2,
            ceu_ctrl_chr2_npz, case_train_chr2_npz, case_test_chr2_npz,
            len(ceu_ctrl_idx_chr2), len(case_train_idx_chr2), len(case_test_idx_chr2),
        ),
        "chr10_1Mb": _region_summary(
            ALIGNED_CEU_CHR10, ALIGNED_CASE_CHR10,
            ceu_ctrl_chr10_npz, case_train_chr10_npz, case_test_chr10_npz,
            len(ceu_ctrl_idx_chr10), len(case_train_idx_chr10), len(case_test_idx_chr10),
        ),
    },
}

case_pop_json.write_text(json.dumps(case_pop_payload, indent=2))
print("Saved CASE_POP cohort summary:", case_pop_json.relative_to(PROJECT_ROOT))


Cohort sizes (chr2): CEU control= 165 CASE train= 120 CASE test= 51
Cohort sizes (chr10): CEU control= 165 CASE train= 120 CASE test= 51
Saved CASE_POP cohort summary: data/processed/hapmap/cohorts/hapmap_case_pop.json


In [9]:


# Resolve + download phased files for CEU and CASE_POP

import ftplib
import gzip
import re

FTP_HOST = "ftp.ncbi.nlm.nih.gov"
PHASE_BASE_DIR = "/hapmap/phasing/2009-02_phaseIII/HapMap3_r2"
MIN_PHASED_INDIV = 30


def ftp_list_files(dirpath: str):
    """List ``(name, size_bytes)`` for every entry in ``dirpath`` on ``FTP_HOST``.

    Uses an anonymous login. Sizes come from the FTP ``SIZE`` command. When the
    server cannot report a size, 0 is recorded (best effort); callers use the
    size only to rank candidates.
    """
    files = []
    with ftplib.FTP(FTP_HOST) as ftp:
        ftp.login()
        ftp.cwd(dirpath)
        names = ftp.nlst()
        # nlst() leaves the session in ASCII mode, and many servers refuse SIZE
        # in ASCII mode. Switch to binary so real sizes are reported.
        try:
            ftp.voidcmd("TYPE I")
        except Exception:
            pass  # best effort: fall back to whatever SIZE returns
        for name in names:
            if name in (".", ".."):
                continue
            try:
                size = ftp.size(name) or 0
            except Exception:
                size = 0  # SIZE is not standardized; treat as unknown
            files.append((name, size))
    return files


def phased_individual_count(path: Path) -> int:
    """Count distinct individuals with at least one ``_A``/``_B`` haplotype column.

    Only the header line of the gzipped phased file is read. The individual ID
    is the column name minus its two-character haplotype suffix.
    """
    with gzip.open(path, "rt", encoding="utf-8", errors="replace") as handle:
        columns = handle.readline().split()
    return len({col[:-2] for col in columns if col.endswith(("_A", "_B"))})


def pick_phased_file_in_dir(pop: str, chrom: int, subdir: str):
    """Choose the best ``*.phased.gz`` file for ``chrom`` in one FTP directory.

    Looks in ``PHASE_BASE_DIR/<pop>[/<subdir>]``. A name matches when it contains
    ``chr<chrom>`` not followed by another digit (so ``chr1`` does not match
    ``chr10``). Files containing ``qc.poly`` are preferred, then larger files. The
    first listed file wins ties.

    Returns:
        Dict with ``dirpath``, ``name``, ``size``, ``score``, ``subdir``, or ``None``
        when the directory cannot be listed or holds no matching file.
    """
    dirpath = f"{PHASE_BASE_DIR}/{pop}" + (f"/{subdir}" if subdir else "")
    try:
        listing = ftp_list_files(dirpath)
    except Exception:
        return None  # directory absent or unreachable: caller tries the next location

    chrom_re = re.compile(rf"chr{chrom}(?!\d)")
    matching = [
        {
            "dirpath": dirpath,
            "name": name,
            "size": size,
            "score": int("qc.poly" in name),
            "subdir": subdir,
        }
        for name, size in listing
        if chrom_re.search(name) and name.endswith(".phased.gz")
    ]
    if not matching:
        return None

    return max(matching, key=lambda cand: (cand["score"], cand["size"]))


def resolve_phased_for_chrom(pop: str, chrom: int, min_ind: int = 30):
    """Find, download and validate a phased file with at least ``min_ind`` individuals.

    Directories are tried in the order UNRELATED, TRIOS, then the population root.
    Each candidate is downloaded (or reused from cache) before its individuals are
    counted, so rejected candidates also stay on disk.

    Returns:
        Dict with ``path``, ``url``, ``n_ind`` and ``source`` (subdir name or ``"ROOT"``).

    Raises:
        RuntimeError: if no location yields a file with enough individuals.
    """
    for subdir in ("UNRELATED", "TRIOS", ""):
        info = pick_phased_file_in_dir(pop, chrom, subdir)
        if info is None:
            continue

        rel_parts = [pop, subdir] if subdir else [pop]
        local_dir = PHASE_DIR.joinpath(*rel_parts)
        local_dir.mkdir(parents=True, exist_ok=True)

        url = "/".join([PHASE_BASE, *rel_parts, info["name"]])
        dst = local_dir / info["name"]
        download_file(url, dst, overwrite=FORCE_REDOWNLOAD)

        n_ind = phased_individual_count(dst)
        source = subdir or "ROOT"
        print(f"Phased individuals ({pop}, chr{chrom}, {source}): {n_ind}")

        if n_ind >= min_ind:
            return {"path": str(dst), "url": url, "n_ind": n_ind, "source": source}

    raise RuntimeError(f"Phased cohort for {pop} chr{chrom} still too small (<{min_ind}) across directories.")

# Resolve phased files in a fixed order: CEU chr2, CEU chr10, CASE_POP chr2, CASE_POP chr10.
# CEU is the public control panel; CASE_POP is the auto-selected case population.
ceu_chr2_info, ceu_chr10_info, case_chr2_info, case_chr10_info = [
    resolve_phased_for_chrom(pop_code, chrom_num, min_ind=MIN_PHASED_INDIV)
    for pop_code in ("CEU", CASE_POP)
    for chrom_num in (2, 10)
]

phase_info = {
    "case_pop": CASE_POP,
    "CEU": {"chr2": ceu_chr2_info, "chr10": ceu_chr10_info},
    "case": {"chr2": case_chr2_info, "chr10": case_chr10_info},
}

phase_info_path = PHASE_INFO_DIR / f"{CASE_POP.lower()}_phased_paths.json"
phase_info_path.write_text(json.dumps(phase_info, indent=2))
print("Saved phased path info:", phase_info_path.relative_to(PROJECT_ROOT))


  Already exists, skipping: hapmap3_r2_b36_fwd.consensus.qc.poly.chr2_ceu.unr.phased.gz (1.80 MB)
Phased individuals (CEU, chr2, UNRELATED): 17
  Already exists, skipping: hapmap3_r2_b36_fwd.consensus.qc.poly.chr2_ceu.phased.gz (4.59 MB)
Phased individuals (CEU, chr2, TRIOS): 88
  Already exists, skipping: hapmap3_r2_b36_fwd.consensus.qc.poly.chr10_ceu.unr.phased.gz (1.13 MB)
Phased individuals (CEU, chr10, UNRELATED): 17
  Already exists, skipping: hapmap3_r2_b36_fwd.consensus.qc.poly.chr10_ceu.phased.gz (2.88 MB)
Phased individuals (CEU, chr10, TRIOS): 88
  Already exists, skipping: hapmap3_r2_b36_fwd.consensus.qc.poly.chr2_mkk.unr.phased.gz (4.77 MB)
Phased individuals (MKK, chr2, UNRELATED): 87
  Already exists, skipping: hapmap3_r2_b36_fwd.consensus.qc.poly.chr10_mkk.unr.phased.gz (2.97 MB)
Phased individuals (MKK, chr10, UNRELATED): 87
Saved phased path info: data/processed/hapmap/phasing/mkk_phased_paths.json


In [10]:

from collections import Counter
import json
import numpy as np
import pandas as pd



def load_phased_df(phase_gz_path: Path) -> pd.DataFrame:
    """Read a gzipped, whitespace-delimited phased haplotype table and normalize its key columns.

    Every cell is read as a string. The SNP-ID column (first of ``rsid`` / ``rs#`` / ``snp`` /
    ``marker``, case-insensitive) is renamed to ``rsID``. The position column (first of ``pos`` /
    ``position`` / ``bp``, case-insensitive; otherwise the first column whose name starts with
    "position", e.g. ``position_b36``) is renamed to ``pos``. All other columns (the
    ``<ID>_A`` / ``<ID>_B`` haplotype columns) are left untouched.

    Args:
        phase_gz_path: path (``Path`` or ``str``) to the gzip-compressed phased file.

    Returns:
        The table with normalized ``rsID`` / ``pos`` column names (when present).

    Raises:
        ValueError: if neither an ID column nor a position column can be identified.
    """
    phase_gz_path = Path(phase_gz_path)
    # r"\s+" is supported natively by the C parser, so there is no need for the (much slower)
    # Python engine on these ~100k-row files.
    df = pd.read_csv(
        phase_gz_path,
        sep=r"\s+",
        compression="gzip",
        dtype=str,
    )

    lower_map = {c.lower(): c for c in df.columns}

    def _first_present(candidates):
        # Original column name of the first candidate present (case-insensitive), else None.
        return next((lower_map[c] for c in candidates if c in lower_map), None)

    rs_col = _first_present(("rsid", "rs#", "snp", "marker"))
    pos_col = _first_present(("pos", "position", "bp"))
    if pos_col is None:
        # Build-suffixed headers such as "position_b36" are otherwise missed.
        pos_col = next((c for c in df.columns if c.lower().startswith("position")), None)

    if rs_col is None and pos_col is None:
        raise ValueError(f"Phased file missing rsID/pos columns. Columns: {list(df.columns)}")

    renames = {}
    if rs_col is not None and rs_col != "rsID":
        renames[rs_col] = "rsID"
    if pos_col is not None and pos_col != "pos":
        renames[pos_col] = "pos"
    if renames:
        df = df.rename(columns=renames)

    print(
        f"Phased columns: file={phase_gz_path.name}, rsID_col={rs_col or 'None'}, pos_col={pos_col or 'None'}, rows={len(df)}"
    )

    return df


def phased_individuals_from_columns(df: pd.DataFrame) -> list:
    """Return the sorted individual IDs that own both an ``<ID>_A`` and an ``<ID>_B`` column.

    Raises:
        ValueError: if no column name ends in ``_A`` or ``_B``.
    """
    suffixed = [col for col in df.columns if col.endswith(("_A", "_B"))]
    if not suffixed:
        raise ValueError("No _A/_B haplotype columns found in phased file.")

    present = set(df.columns)
    stems = {col[:-2] for col in suffixed}  # strip the two-character "_A" / "_B" suffix
    return [stem for stem in sorted(stems) if stem + "_A" in present and stem + "_B" in present]


def _first_row_index(keys) -> dict:
    """Map each key (skipping None) to the positional index of its first occurrence in ``keys``."""
    lookup = {}
    for row_i, key in enumerate(keys):
        if key is not None:
            lookup.setdefault(key, row_i)
    return lookup


def make_phased_compatible_region(
    region,
    phased_df: pd.DataFrame,
    region_name: str,
    phase_name: str = "",
    min_rsid_matches: int = 5,
):
    """Restrict a genotype region to the SNPs present in a phased table and align the two.

    SNPs are matched by rsID when ``phased_df`` has an ``rsID`` column and at least
    ``min_rsid_matches`` region SNPs match that way. Otherwise they are matched by base-pair
    position (``pos`` column). Each matched SNP is paired with the FIRST phased row carrying its
    key, so duplicate phased rows cannot shift the allele matrices out of register with ``G_sub``.

    Args:
        region: mapping with ``sample_ids``, ``snp_ids``, ``positions`` and ``G`` (either
            samples x SNPs or SNPs x samples), plus optionally ``minor_alleles`` or
            ``counted_alleles`` (one entry per region SNP).
        phased_df: phased table as returned by ``load_phased_df`` (``<ID>_A`` / ``<ID>_B`` columns).
        region_name: label used in log/error messages; returned as ``region_name``.
        phase_name: phased file name used in error messages; returned as ``phased_source``.
        min_rsid_matches: minimum rsID hits required before falling back to position matching.

    Returns:
        dict with:
        - ``G_sub``: samples x matched SNPs.
        - ``snp_ids_sub``, ``positions_sub``, ``minor_alleles_sub``: one entry per matched SNP.
        - ``alleles_A`` / ``alleles_B``: matched SNPs x phased individuals, rows in the same SNP
          order as ``G_sub``'s columns.
        - Bookkeeping keys: ``phased_individuals``, ``snp_idx_in_region``, ``match_method``,
          ``phased_source``.

    Raises:
        ValueError: no usable key column, or rsID matching failed and there is no ``pos`` column.
        RuntimeError: no SNP matched, or ``G`` has an unexpected shape.
    """
    region_sample_ids = np.array([str(x) for x in region["sample_ids"]], dtype=object)
    region_snp_ids = np.array([str(x) for x in region["snp_ids"]], dtype=object)
    region_positions = np.array(region["positions"], dtype=int)

    phased_indivs = phased_individuals_from_columns(phased_df)
    print(f"[{region_name}] Phased individuals: {len(phased_indivs)}")

    has_rsid = "rsID" in phased_df.columns
    has_pos = "pos" in phased_df.columns
    if not (has_rsid or has_pos):
        raise ValueError(
            f"[{region_name}] Phased file has neither an rsID nor a pos column after normalization."
        )

    # 1) rsID matching. row_lookup maps a key to the positional index of its first phased row.
    row_lookup = _first_row_index(phased_df["rsID"].astype(str).tolist()) if has_rsid else {}
    region_keys = region_snp_ids.tolist()
    keep_snp_mask = np.array([key in row_lookup for key in region_keys], dtype=bool)
    match_method = "rsid"

    # 2) Too few rsID hits (or no rsID column): fall back to position matching.
    n_rsid_matches = int(keep_snp_mask.sum())
    if n_rsid_matches < min_rsid_matches:
        if not has_pos:
            raise ValueError(f"[{region_name}] rsID match failed and phased file has no position column.")
        print(
            f"[{region_name}] rsID matching found {n_rsid_matches} SNPs "
            f"(< {min_rsid_matches}); falling back to position matching."
        )
        phased_pos = pd.to_numeric(phased_df["pos"], errors="coerce")
        # Unparseable positions become None and can never match.
        row_lookup = _first_row_index(int(p) if pd.notna(p) else None for p in phased_pos.tolist())
        region_keys = [int(p) for p in region_positions.tolist()]
        keep_snp_mask = np.array([key in row_lookup for key in region_keys], dtype=bool)
        match_method = "pos"

    snp_idx_in_region = np.flatnonzero(keep_snp_mask)
    snp_ids_sub = region_snp_ids[keep_snp_mask]
    positions_sub = region_positions[keep_snp_mask]

    print(f"[{region_name}] SNPs matched: {len(snp_ids_sub)} / {len(region_snp_ids)} (method={match_method})")

    if len(snp_ids_sub) == 0:
        phased_rsids_head = phased_df["rsID"].astype(str).head(10).tolist() if has_rsid else []
        phased_pos_head = (
            pd.to_numeric(phased_df["pos"], errors="coerce").dropna().astype(int).head(10).tolist()
            if has_pos
            else []
        )
        raise RuntimeError(
            f"[{region_name}] No SNPs matched. Phased file={phase_name}. "
            f"Region rsIDs head={region_snp_ids[:10].tolist()}, phased rsIDs head={phased_rsids_head}. "
            f"Region pos head={region_positions[:10].tolist()}, phased pos head={phased_pos_head}."
        )

    # G may be stored either way round; G_sub is always (samples x matched SNPs).
    G = region["G"]
    if G.shape[0] == len(region_sample_ids):
        G_sub = G[:, snp_idx_in_region]
    elif G.shape[1] == len(region_sample_ids):
        G_sub = G[snp_idx_in_region, :].T
    else:
        raise RuntimeError(f"[{region_name}] Unexpected G shape: {G.shape}")

    sample_ids_sub = region_sample_ids

    # Exactly one phased row per matched SNP, in matched-SNP order (positional selection).
    row_order = [row_lookup[region_keys[i]] for i in snp_idx_in_region]
    phased_sub = phased_df.iloc[row_order]

    A_cols = [f"{i}_A" for i in phased_indivs]
    B_cols = [f"{i}_B" for i in phased_indivs]
    alleles_A = phased_sub[A_cols].to_numpy(dtype=object)
    alleles_B = phased_sub[B_cols].to_numpy(dtype=object)

    minor_all = region.get("minor_alleles", region.get("counted_alleles"))
    minor_alleles_sub = np.asarray(minor_all)[snp_idx_in_region] if minor_all is not None else None

    print(f"[{region_name}] Phased allele matrices: A={alleles_A.shape}, B={alleles_B.shape}")

    return {
        "region_name": region_name,
        "G_sub": G_sub,
        "sample_ids_sub": sample_ids_sub,
        "snp_ids_sub": snp_ids_sub,
        "positions_sub": positions_sub,
        "minor_alleles_sub": minor_alleles_sub,
        "alleles_A": alleles_A,
        "alleles_B": alleles_B,
        "phased_individuals": phased_indivs,
        "snp_idx_in_region": snp_idx_in_region,
        "match_method": match_method,
        "phased_source": phase_name or "unknown",
    }


def adjacent_r2_from_G(G, min_called=10):
    """Squared Pearson correlation (r^2) between each pair of adjacent SNP columns of ``G``.

    Column ``j`` is compared with column ``j + 1``; rows are individuals (as in ``G_sub``).
    Negative entries (and NaN) are treated as missing, so each pair uses only the rows called at
    both SNPs. A pair gets r^2 = 0.0 when fewer than ``min_called`` rows are jointly called or
    when either SNP has zero variance over those rows.

    Args:
        G: 2-D array-like of genotype codes (individuals x SNPs).
        min_called: minimum number of jointly called individuals required to estimate r^2.

    Returns:
        float ndarray of length ``max(n_snps - 1, 0)``.

    Raises:
        ValueError: if ``G`` is not 2-D.
    """
    X = np.asarray(G, dtype=float)
    if X.ndim != 2:
        raise ValueError(f"G must be 2-D (individuals x SNPs), got shape {X.shape}")
    n_snps = X.shape[1]
    # max(..., 0): an empty SNP axis yields an empty result instead of np.zeros(-1) raising.
    out = np.zeros(max(n_snps - 1, 0), dtype=float)
    for j in range(n_snps - 1):
        x = X[:, j]
        y = X[:, j + 1]
        both = (x >= 0) & (y >= 0)
        if both.sum() < min_called:
            continue  # too little data: leave r^2 at 0.0
        xv = x[both] - x[both].mean()
        yv = y[both] - y[both].mean()
        denom = np.sqrt((xv * xv).sum() * (yv * yv).sum())
        if denom > 0:
            out[j] = float(((xv * yv).sum() / denom) ** 2)
    return out


def build_blocks_from_adjacent_r2(r2, threshold=0.8, min_snps=5, max_snps=80):
    """Partition SNPs ``0 .. len(r2)`` into contiguous LD blocks from adjacent-pair r^2 values.

    Steps:
      1. A new block starts at SNP ``j + 1`` whenever ``r2[j] < threshold``.
      2. Blocks shorter than ``min_snps`` are merged into the preceding block. A short block at
         the start, which has nothing before it, is carried forward into the following block
         instead.
      3. Blocks longer than ``max_snps`` are split into the fewest near-equal chunks that fit.
         This avoids tiny tail blocks (e.g. 81 SNPs -> 41 + 40, not 80 + 1).

    Args:
        r2: sequence of adjacent-pair r^2 values (length n_snps - 1).
        threshold: r^2 below which a block boundary is placed.
        min_snps: minimum preferred block size (a region with fewer SNPs stays one block).
        max_snps: maximum block size; must be >= 1.

    Returns:
        list of inclusive ``(start, end)`` SNP-index tuples covering every SNP exactly once.

    Raises:
        ValueError: if ``max_snps < 1`` (the split step would never terminate).
    """
    if max_snps < 1:
        raise ValueError(f"max_snps must be >= 1, got {max_snps}")
    n_snps = len(r2) + 1

    # 1) Cut wherever adjacent LD drops below the threshold.
    cuts = [0] + [j + 1 for j, v in enumerate(r2) if v < threshold] + [n_snps]
    raw_blocks = [(cuts[i], cuts[i + 1] - 1) for i in range(len(cuts) - 1)]

    # 2) Absorb short blocks: backwards when possible, forwards while nothing precedes them.
    merged = []
    carry_start = None
    for s, e in raw_blocks:
        if carry_start is not None:
            s, carry_start = carry_start, None
        if e - s + 1 >= min_snps:
            merged.append((s, e))
        elif merged:
            merged[-1] = (merged[-1][0], e)
        else:
            carry_start = s
    if carry_start is not None:  # every SNP ended up in one short leading run
        merged.append((carry_start, n_snps - 1))

    # 3) Split over-long blocks into balanced chunks of at most max_snps SNPs.
    final = []
    for s, e in merged:
        size = e - s + 1
        n_chunks = -(-size // max_snps)  # ceil division
        base, extra = divmod(size, n_chunks)
        for k in range(n_chunks):
            length = base + (1 if k < extra else 0)
            final.append((s, s + length - 1))
            s += length
    return final


def hap_strings_for_block(allele_matrix, start, end):
    """Build one haplotype string per column by concatenating rows ``start..end`` (inclusive).

    ``allele_matrix`` is indexed ``[row, column]``; in this notebook rows are SNPs and columns
    are phased individuals, with each cell holding one allele string.
    """
    window = allele_matrix[start : end + 1, :]
    return ["".join(column.tolist()) for column in window.T]


def compute_maf_from_G(G, idx):
    """Per-ROW allele frequency over the columns selected by ``idx``, ignoring missing calls.

    For each row: (sum of non-negative entries in the selected columns) / (2 * number of such
    entries). Negative entries are treated as missing, and rows with no called entries yield
    0.0. To get one value per SNP, pass a (SNPs x samples) matrix with ``idx`` selecting sample
    columns.
    """
    selected = G[:, idx]
    called_mask = selected >= 0
    n_called = np.count_nonzero(called_mask, axis=1).astype(np.int32)
    allele_total = np.where(called_mask, selected, 0).sum(axis=1).astype(np.float64)
    # max(n_called, 1) avoids 0/0 for rows with no calls (their allele_total is 0).
    return allele_total / (2.0 * np.maximum(n_called, 1))


def save_control_haplotypes(region_name, blocks, pc, control_ids, top_k=50):
    """Write per-block control haplotype histograms (top-k plus an "other" bucket) to JSON.

    Both haplotypes (``alleles_A`` and ``alleles_B``) of every phased individual in ``pc``
    contribute. ``control_ids`` is currently unused: the phased individuals are not required to
    overlap the genotype cohort's IDs (see the ``note`` written to the file).

    For each ``(start, end)`` block (inclusive indices into ``pc``'s matched SNPs) the JSON stores:
    - the ``top_k`` most common haplotype strings and their counts;
    - the count of all remaining ("other") haplotypes;
    - ``other_minor_frac``: the per-SNP fraction of "other" haplotypes carrying
      ``pc["minor_alleles_sub"]``. When every haplotype is in the top-k, this falls back to
      per-SNP allele frequencies computed from ``pc["G_sub"]`` (samples x SNPs).

    Alleles are compared character by character with the block's haplotype string, so this
    assumes single-character allele codes.

    Returns:
        Path of the written ``<region_name>.control_haplotypes.phased_compatible.json`` in HAP_DIR.

    Raises:
        ValueError: if ``pc["minor_alleles_sub"]`` is None.
    """
    phased_ids = [str(x) for x in pc["phased_individuals"]]
    control_cols = list(range(len(phased_ids)))

    print(f"  phased-compatible individuals used for control: {len(control_cols)}")

    alleles_A = pc["alleles_A"]
    alleles_B = pc["alleles_B"]
    minor = pc["minor_alleles_sub"]
    if minor is None:
        raise ValueError(f"[{region_name}] minor_alleles_sub is required to compute other_minor_frac.")

    # Per-SNP fallback frequencies. G_sub is (samples x SNPs) while compute_maf_from_G returns
    # one value per ROW, so pass the transpose and select every sample column. (Passing G_sub
    # directly produced per-sample values that were then sliced by SNP index.)
    G_sub = pc["G_sub"]
    p_ctrl_pc = compute_maf_from_G(G_sub.T, list(range(G_sub.shape[0])))
    if len(p_ctrl_pc) != len(pc["snp_ids_sub"]):
        raise RuntimeError(
            f"[{region_name}] fallback frequencies have length {len(p_ctrl_pc)}, "
            f"expected one per SNP ({len(pc['snp_ids_sub'])})."
        )

    block_payload = []
    for block_id, (s, e) in enumerate(blocks):
        block_size = e - s + 1
        hA_all = hap_strings_for_block(alleles_A, s, e)
        hB_all = hap_strings_for_block(alleles_B, s, e)

        ctr = Counter()
        for col in control_cols:
            ctr[hA_all[col]] += 1
            ctr[hB_all[col]] += 1

        total = int(sum(ctr.values()))
        top = ctr.most_common(top_k)
        top_haps = [h for h, _ in top]
        top_counts = [int(c) for _, c in top]
        top_set = set(top_haps)
        other_count = int(total - sum(top_counts))

        # Minor-allele tally over haplotypes outside the top-k, weighted by how often each occurs.
        block_minor = minor[s : e + 1]
        other_minor_counts = np.zeros(block_size, dtype=np.float64)
        for hap, count in ctr.items():
            if hap in top_set:
                continue
            for j, allele in enumerate(hap):
                if allele == block_minor[j]:
                    other_minor_counts[j] += count

        if other_count > 0:
            other_minor_frac = (other_minor_counts / other_count).tolist()
        else:
            other_minor_frac = p_ctrl_pc[s : e + 1].tolist()

        block_payload.append({
            "block_id": int(block_id),
            "start_snp_index": int(s),
            "end_snp_index": int(e),
            "num_snps": int(block_size),
            "total_haplotypes_counted": total,
            "top_k": int(top_k),
            "top_haplotypes": top_haps,
            "top_counts": top_counts,
            "other_count": other_count,
            "other_minor_frac": other_minor_frac,
        })

    out = HAP_DIR / f"{region_name}.control_haplotypes.phased_compatible.json"
    payload = {
        "region_name": region_name,
        "note": "Built on phased SNP intersection; control uses all phased individuals (no ID overlap required).",
        "counts_from": "CONTROL cohort only, restricted to phased-compatible individuals",
        "phased_compatible": {
            "num_individuals_total": int(len(phased_ids)),
            "num_control_individuals_used": int(len(control_cols)),
            "num_snps_total": int(len(pc["snp_ids_sub"])),
        },
        "blocks": block_payload,
    }
    out.write_text(json.dumps(payload, indent=2))
    print(f"Saved control haplotypes → {out.relative_to(PROJECT_ROOT)}")
    return out


# CEU is the control reference: use the CEU phased files recorded by the phasing cell.
phase_info = json.loads((PHASE_INFO_DIR / f"{CASE_POP.lower()}_phased_paths.json").read_text())
PHASE_CHR2 = Path(phase_info["CEU"]["chr2"]["path"])
PHASE_CHR10 = Path(phase_info["CEU"]["chr10"]["path"])

for name in ["ceu_chr2", "ceu_chr10"]:
    if name not in globals():
        raise RuntimeError(f"Missing `{name}`. Run the region+cohorts loading cell first.")


def _cached_pc_is_reusable(var_name: str, phase_path: Path) -> bool:
    """Return True if global ``var_name`` holds a phased-compatible region that may be reused.

    The cached value is discarded when FORCE_REBUILD is set, when it lacks the keys this cell
    reads, or when it records a ``phased_source`` different from ``phase_path.name``. Cached
    values without that key are treated as matching.
    """
    if FORCE_REBUILD or var_name not in globals():
        return False
    cached = globals()[var_name]
    if "match_method" not in cached or "phased_individuals" not in cached:
        return False
    return cached.get("phased_source", phase_path.name) == phase_path.name


if _cached_pc_is_reusable("pc2", PHASE_CHR2):
    print(" pc2 already exists")
else:
    df_phase_chr2 = load_phased_df(PHASE_CHR2)
    pc2 = make_phased_compatible_region(ceu_chr2, df_phase_chr2, "chr2_5Mb", PHASE_CHR2.name)
    print(" Created pc2")

if _cached_pc_is_reusable("pc10", PHASE_CHR10):
    print(" pc10 already exists")
else:
    df_phase_chr10 = load_phased_df(PHASE_CHR10)
    pc10 = make_phased_compatible_region(ceu_chr10, df_phase_chr10, "chr10_1Mb", PHASE_CHR10.name)
    print(" Created pc10")

print(f"[chr2_5Mb] SNPs matched={len(pc2['snp_ids_sub'])}, phased individuals={len(pc2['phased_individuals'])}, match={pc2['match_method']}")
print(f"[chr10_1Mb] SNPs matched={len(pc10['snp_ids_sub'])}, phased individuals={len(pc10['phased_individuals'])}, match={pc10['match_method']}")


TAG_CHR2 = f"CEU_chr2_5Mb.common_with_{CASE_POP}"
TAG_CHR10 = f"CEU_chr10_1Mb.common_with_{CASE_POP}"


# LD-block construction parameters (adjacent-SNP r^2 cut-off and block size bounds).
BLOCK_PARAMS = {"threshold": 0.8, "min_snps": 5, "max_snps": 80}

print("Building phased-compatible blocks for chr2")
pc2_adj = adjacent_r2_from_G(pc2["G_sub"])
blocks_chr2 = build_blocks_from_adjacent_r2(pc2_adj, **BLOCK_PARAMS)
print(f" chr2 phased-compatible SNPs: {pc2['G_sub'].shape[1]} | blocks: {len(blocks_chr2)}")

print("Building phased-compatible blocks for chr10")
pc10_adj = adjacent_r2_from_G(pc10["G_sub"])
blocks_chr10 = build_blocks_from_adjacent_r2(pc10_adj, **BLOCK_PARAMS)
print(f" chr10 phased-compatible SNPs: {pc10['G_sub'].shape[1]} | blocks: {len(blocks_chr10)}")



def save_blocks(tag, blocks, block_params, pc):
    """Write one region's LD-block partition to ``BLOCK_DIR / f"{tag}.blocks.json"``.

    ``blocks`` holds inclusive ``(start, end)`` indices into ``pc``'s phased-compatible SNPs.
    The JSON also records ``block_params``, the SNP count (``pc["G_sub"].shape[1]``), the matched
    SNP IDs, and ``pc``'s ``match_method`` / ``phased_source`` ("unknown" when absent).
    Prints the saved path relative to PROJECT_ROOT and returns it.
    """
    destination = BLOCK_DIR / f"{tag}.blocks.json"
    n_snps = int(pc["G_sub"].shape[1])
    n_blocks = int(len(blocks))
    block_records = [
        {"start": int(first), "end": int(last), "num_snps": int(last - first + 1)}
        for first, last in blocks
    ]

    payload = {}
    payload["region_tag"] = tag
    payload["block_params"] = block_params
    payload["num_snps_phased_compatible"] = n_snps
    payload["num_blocks"] = n_blocks
    payload["blocks"] = block_records
    payload["snp_ids_sub"] = [str(snp_id) for snp_id in pc["snp_ids_sub"]]
    payload["match_method"] = pc.get("match_method", "unknown")
    payload["phased_source"] = pc.get("phased_source", "unknown")

    destination.write_text(json.dumps(payload, indent=2))
    print(f"Saved blocks -> {destination.relative_to(PROJECT_ROOT)}")
    return destination


# Persist block boundaries first, then the control haplotype histograms built on those blocks.
blocks_chr2_out = save_blocks(tag=TAG_CHR2, blocks=blocks_chr2, block_params=BLOCK_PARAMS, pc=pc2)
blocks_chr10_out = save_blocks(tag=TAG_CHR10, blocks=blocks_chr10, block_params=BLOCK_PARAMS, pc=pc10)

hap_chr2_out = save_control_haplotypes(
    region_name=TAG_CHR2, blocks=blocks_chr2, pc=pc2, control_ids=None, top_k=TOP_K
)
hap_chr10_out = save_control_haplotypes(
    region_name=TAG_CHR10, blocks=blocks_chr10, pc=pc10, control_ids=None, top_k=TOP_K
)

# Summary of written artifacts (absolute paths, then the haplotype files relative to the repo).
print("Done building phased-compatible control haplotype histograms.")
print("Saved blocks:", blocks_chr2_out, blocks_chr10_out)
print("Saved haplotypes:", hap_chr2_out, hap_chr10_out)
print(" chr2 →", hap_chr2_out.relative_to(PROJECT_ROOT))
print(" chr10 →", hap_chr10_out.relative_to(PROJECT_ROOT))


Phased columns: file=hapmap3_r2_b36_fwd.consensus.qc.poly.chr2_ceu.phased.gz, rsID_col=rsID, pos_col=None, rows=116430
[chr2_5Mb] Phased individuals: 88
[chr2_5Mb] SNPs matched: 274 / 300 (method=rsid)
[chr2_5Mb] Phased allele matrices: A=(274, 88), B=(274, 88)
 Created pc2
Phased columns: file=hapmap3_r2_b36_fwd.consensus.qc.poly.chr10_ceu.phased.gz, rsID_col=rsID, pos_col=None, rows=73832
[chr10_1Mb] Phased individuals: 88
[chr10_1Mb] SNPs matched: 541 / 588 (method=rsid)
[chr10_1Mb] Phased allele matrices: A=(541, 88), B=(541, 88)
 Created pc10
[chr2_5Mb] SNPs matched=274, phased individuals=88, match=rsid
[chr10_1Mb] SNPs matched=541, phased individuals=88, match=rsid
Building phased-compatible blocks for chr2
 chr2 phased-compatible SNPs: 274 | blocks: 5
Building phased-compatible blocks for chr10
 chr10 phased-compatible SNPs: 541 | blocks: 10
Saved blocks -> data/processed/hapmap/blocks/CEU_chr2_5Mb.common_with_MKK.blocks.json
Saved blocks -> data/processed/hapmap/blocks/CEU_chr

In [11]:
# -----------------------------
# Final file existence check
# -----------------------------

def _check(path: Path, label: str):
    """Print an ``OK:`` or ``PROBLEM:`` line for ``label``, depending on whether ``path`` exists.

    The path is displayed relative to PROJECT_ROOT.
    """
    exists = path.exists()
    shown = path.relative_to(PROJECT_ROOT)
    print(f"{'OK:' if exists else 'PROBLEM:'} {label}: {shown}")

# Every artifact this notebook is expected to produce, grouped by section as (path, label).
_EXPECTED_OUTPUTS = {
    "Raw genotypes:": [
        (GENO_DIR / "genotypes_chr2_CEU_phase3.2_nr.b36_fwd.txt.gz", "CEU genotypes chr2"),
        (GENO_DIR / "genotypes_chr10_CEU_phase3.2_nr.b36_fwd.txt.gz", "CEU genotypes chr10"),
        (GENO_DIR / f"genotypes_chr2_{CASE_POP}_phase3.2_nr.b36_fwd.txt.gz", f"{CASE_POP} genotypes chr2"),
        (GENO_DIR / f"genotypes_chr10_{CASE_POP}_phase3.2_nr.b36_fwd.txt.gz", f"{CASE_POP} genotypes chr10"),
    ],
    "Aligned regions:": [
        (ALIGNED_CEU_CHR2, "CEU chr2 aligned"),
        (ALIGNED_CEU_CHR10, "CEU chr10 aligned"),
        (ALIGNED_CASE_CHR2, f"{CASE_POP} chr2 aligned"),
        (ALIGNED_CASE_CHR10, f"{CASE_POP} chr10 aligned"),
    ],
    "Cohorts:": [
        (COHORT_DIR / "hapmap_case_pop.json", "case_pop JSON"),
        (COHORT_DIR / "hapmap_CEU_control_chr2_5Mb.npz", "CEU control chr2"),
        (COHORT_DIR / "hapmap_CEU_control_chr10_1Mb.npz", "CEU control chr10"),
        (COHORT_DIR / f"hapmap_{CASE_POP}_case_train_chr2_5Mb.npz", f"{CASE_POP} case train chr2"),
        (COHORT_DIR / f"hapmap_{CASE_POP}_case_test_chr2_5Mb.npz", f"{CASE_POP} case test chr2"),
        (COHORT_DIR / f"hapmap_{CASE_POP}_case_train_chr10_1Mb.npz", f"{CASE_POP} case train chr10"),
        (COHORT_DIR / f"hapmap_{CASE_POP}_case_test_chr10_1Mb.npz", f"{CASE_POP} case test chr10"),
    ],
    "Blocks + haplotypes:": [
        (BLOCK_DIR / f"CEU_chr2_5Mb.common_with_{CASE_POP}.blocks.json", "Blocks chr2"),
        (BLOCK_DIR / f"CEU_chr10_1Mb.common_with_{CASE_POP}.blocks.json", "Blocks chr10"),
        (HAP_DIR / f"CEU_chr2_5Mb.common_with_{CASE_POP}.control_haplotypes.phased_compatible.json", "Haplotypes chr2"),
        (HAP_DIR / f"CEU_chr10_1Mb.common_with_{CASE_POP}.control_haplotypes.phased_compatible.json", "Haplotypes chr10"),
    ],
    "Phased paths JSON:": [
        (PHASE_INFO_DIR / f"{CASE_POP.lower()}_phased_paths.json", f"{CASE_POP} phased paths"),
    ],
}

# Print the per-file report, then fail loudly if anything is missing: a verification cell
# that only prints "PROBLEM:" lets a broken pipeline look successful under Run All.
_missing_outputs = []
for _section, _entries in _EXPECTED_OUTPUTS.items():
    print(_section)
    for _path, _label in _entries:
        _check(_path, _label)
        if not _path.exists():
            _missing_outputs.append(_label)

if _missing_outputs:
    raise FileNotFoundError(f"{len(_missing_outputs)} expected output(s) missing: {_missing_outputs}")


Raw genotypes:
OK: CEU genotypes chr2: data/raw/hapmap/genotypes/genotypes_chr2_CEU_phase3.2_nr.b36_fwd.txt.gz
OK: CEU genotypes chr10: data/raw/hapmap/genotypes/genotypes_chr10_CEU_phase3.2_nr.b36_fwd.txt.gz
OK: MKK genotypes chr2: data/raw/hapmap/genotypes/genotypes_chr2_MKK_phase3.2_nr.b36_fwd.txt.gz
OK: MKK genotypes chr10: data/raw/hapmap/genotypes/genotypes_chr10_MKK_phase3.2_nr.b36_fwd.txt.gz
Aligned regions:
OK: CEU chr2 aligned: data/processed/hapmap/regions/CEU_chr2_5Mb.common_with_MKK.npz
OK: CEU chr10 aligned: data/processed/hapmap/regions/CEU_chr10_1Mb.common_with_MKK.npz
OK: MKK chr2 aligned: data/processed/hapmap/regions/MKK_chr2_5Mb.common_with_CEU.npz
OK: MKK chr10 aligned: data/processed/hapmap/regions/MKK_chr10_1Mb.common_with_CEU.npz
Cohorts:
OK: case_pop JSON: data/processed/hapmap/cohorts/hapmap_case_pop.json
OK: CEU control chr2: data/processed/hapmap/cohorts/hapmap_CEU_control_chr2_5Mb.npz
OK: CEU control chr10: data/processed/hapmap/cohorts/hapmap_CEU_control_c