In [None]:
from __future__ import annotations

import re
from pathlib import Path
from typing import Iterator, Optional, Tuple, Set, List

# ===== Patterns / constants =====
# A bare column-header row: the four column names rsid / chromosome / position /
# genotype separated by whitespace, in any letter case, with nothing else on the line.
HEADER_LINE_RE = re.compile(r"^\s*rsid\s+chromosome\s+position\s+genotype\s*$", re.I)
# A complete token of the form "rs<digits>" (case-insensitive). Despite the name,
# the pattern is anchored at both ends, so it accepts whole rsIDs only.
_RS_PREFIX_RE  = re.compile(r"^rs\d+$", re.I)
# A genome-build mention: "build 36/37/38", "GRCh36/37/38" or "hg18/19/38"
# (case-insensitive; whitespace after "build" / "grch" is optional).
BUILD_RE       = re.compile(r"(build\s*(36|37|38)|grch\s*(36|37|38)|hg(18|19|38))", re.I)

def _normalize_build_tag(s: str) -> Optional[str]:
    """Return '36','37','38' or None from a build-ish string."""
    s = s.lower()
    if "hg18" in s or "36" in s:
        return "36"
    if "hg19" in s or "37" in s:
        return "37"
    if "hg38" in s or "38" in s:
        return "38"
    return None

def _detect_build_from_txt(path: Path) -> Optional[str]:
    """Detect the genome build announced in a file's leading comment lines.

    Only the comment preamble is inspected: blank lines are skipped, lines
    starting with "#" are searched with ``BUILD_RE``, and scanning stops at the
    first line that is neither blank nor a comment.

    Args:
        path: Text file to inspect.

    Returns:
        '36', '37' or '38' for the first comment line that names a build,
        or None when the preamble names none.
    """
    # "utf-8-sig" drops a leading byte-order mark. With plain "utf-8" the BOM
    # stays glued to the first "#", that line no longer looks like a comment,
    # and the scan would stop before reading any of the preamble.
    with path.open("r", encoding="utf-8-sig", errors="ignore") as f:
        for raw in f:
            s = raw.strip()
            if not s:
                continue
            if not s.startswith("#"):
                # First data/header line: the comment preamble is over.
                break
            m = BUILD_RE.search(s)
            if m:
                b = _normalize_build_tag(m.group(0))
                if b:
                    return b
    return None

def find_txt_files(root: Path, follow_symlinks: bool = False) -> List[Path]:
    """Recursively collect the ``*.txt`` files below ``root``.

    Args:
        root: Directory to search.
        follow_symlinks: When True, a ``*.txt`` entry that is a symlink to a
            regular file is included (as its resolved target). When False,
            symlink entries are skipped entirely.

    Returns:
        Sorted list of resolved paths with duplicates removed, so a file
        reachable under several names is reported once.
    """
    found: Set[Path] = set()
    for candidate in root.rglob("*.txt"):
        try:
            # is_file() follows links, so symlinks must be filtered explicitly;
            # otherwise follow_symlinks=False would still return linked files.
            if candidate.is_symlink() and not follow_symlinks:
                continue
            if candidate.is_file():
                found.add(candidate.resolve())
        except OSError:
            # Entry vanished or became unreadable while scanning: best effort, skip it.
            continue
    return sorted(found)

def _iter_data_lines(path: Path) -> Iterator[str]:
    """Yield only the data rows of ``path``.

    Blank lines, lines whose first non-blank character is "#", and rows
    matching ``HEADER_LINE_RE`` are skipped. Each yielded row has its trailing
    newline removed but is otherwise untouched, so the original tabs and
    spacing survive into the output files.

    Args:
        path: Text file to read.

    Yields:
        One string per data row, in file order.
    """
    # "utf-8-sig" strips a leading byte-order mark; with plain "utf-8" a BOM in
    # front of the first "#" would make that comment line pass as a data row.
    with path.open("r", encoding="utf-8-sig", errors="ignore") as f:
        for raw in f:
            s = raw.strip()
            if not s or s.startswith("#") or HEADER_LINE_RE.match(s):
                continue
            yield raw.rstrip("\n")  # keep original spacing/tabs for output

def _parse_minimal(s: str) -> Optional[Tuple[str, str, str]]:
    """Return (id_token, chrom, pos) or None if malformed."""
    parts = re.split(r"\s+", s)
    if len(parts) < 3:
        return None
    return parts[0], parts[1], parts[2]

def build_outputs(
    root: str | Path,
    out_rsids: str | Path = "./work/23andme_rsids.txt",
    out_nonrsid_grch37: str | Path = "./work/23andme_nonrsid_grch37.txt",
    follow_symlinks: bool = True,
) -> dict:
    """Scan every ``*.txt`` file under ``root`` and write two summary files.

    * ``out_rsids``: every distinct ID matching ``_RS_PREFIX_RE``, lower-cased,
      one per line, ordered by the numeric part.
    * ``out_nonrsid_grch37``: every distinct data row whose ID is not an rsID,
      taken only from files whose detected build is '37', ordered by
      chromosome (1..22, X, Y, MT/M, then anything else), position and ID.
      Rows are written with their original text.

    Parent directories of both output files are created if missing, and both
    files are overwritten.

    Args:
        root: Directory to search recursively.
        out_rsids: Destination for the rsID list.
        out_nonrsid_grch37: Destination for the non-rsID build-37 rows.
        follow_symlinks: Forwarded to ``find_txt_files``.

    Returns:
        A dict with the keys ``scanned_files``, ``unique_rsids``,
        ``nonrsid_rows_grch37``, ``out_rsids`` and ``out_nonrsid_grch37``
        (the two paths as strings).
    """
    root = Path(root)
    out_rsids = Path(out_rsids)
    out_nonrsid_grch37 = Path(out_nonrsid_grch37)
    out_rsids.parent.mkdir(parents=True, exist_ok=True)
    out_nonrsid_grch37.parent.mkdir(parents=True, exist_ok=True)

    all_rsids: Set[str] = set()
    nonrsid_lines_grch37: Set[str] = set()

    txts = find_txt_files(root, follow_symlinks=follow_symlinks)
    for p in txts:
        build = _detect_build_from_txt(p)  # '36','37','38' or None
        for line in _iter_data_lines(p):
            parsed = _parse_minimal(line)
            if not parsed:
                continue
            id0 = parsed[0]
            if _RS_PREFIX_RE.match(id0):
                all_rsids.add(id0.lower())
            elif build == "37":
                # keep original row text for output
                nonrsid_lines_grch37.add(line)

    # Write rsIDs (sorted naturally by numeric part)
    def _rs_sort_key(rs: str):
        m = re.match(r"^rs(\d+)$", rs)
        return (int(m.group(1)) if m else 10**18, rs)

    with out_rsids.open("w", encoding="utf-8") as f:
        for rs in sorted(all_rsids, key=_rs_sort_key):
            f.write(rs + "\n")

    # chromosome order: 1..22,X,Y,MT, then others (built once, not per key call)
    chrom_order = {str(i): i for i in range(1, 23)}
    chrom_order.update({"X": 23, "Y": 24, "MT": 25, "M": 25})

    # Write GRCh37 non-rs rows (sorted by chrom,pos,id when parseable)
    def _line_sort_key(s: str):
        # split() ignores leading/trailing whitespace, so an indented row
        # still yields id/chrom/pos in the expected columns.
        parts = s.split()
        id0 = parts[0] if parts else "~"
        chrom = parts[1] if len(parts) > 1 else "ZZ"
        # isdecimal() only accepts characters that int() can convert.
        pos = int(parts[2]) if len(parts) > 2 and parts[2].isdecimal() else 10**18
        chrom_rank = chrom_order.get(chrom.upper(), 10**6)
        # The raw row is the final tiebreaker: rows that share chrom/pos/id
        # (e.g. different genotypes) would otherwise come out in set-iteration
        # order, which varies between interpreter runs.
        return (chrom_rank, pos, id0.lower(), s)

    with out_nonrsid_grch37.open("w", encoding="utf-8") as f:
        for row in sorted(nonrsid_lines_grch37, key=_line_sort_key):
            f.write(row + "\n")

    return {
        "scanned_files": len(txts),
        "unique_rsids": len(all_rsids),
        "nonrsid_rows_grch37": len(nonrsid_lines_grch37),
        "out_rsids": str(out_rsids),
        "out_nonrsid_grch37": str(out_nonrsid_grch37),
    }


In [None]:
# Write the path of every regular *.zip file under the 23andMe data directory
# to zips.txt in the current directory, one path per line (overwrites the file).
!find ./biovault-data/snp/23andme -type f -name '*.zip' > zips.txt

In [None]:
%%bash
# Unpack every archive listed in zips.txt into the directory that holds it.
# IFS= and -r keep each path exactly as written (no trimming, no backslash escapes).
while IFS= read -r zipfile
do
  dir=$(dirname "$zipfile")
  echo "Unzipping: $zipfile -> $dir"
  # -o overwrite without prompting, -q quiet, -d extraction directory
  unzip -oq "$zipfile" -d "$dir"
done <zips.txt


In [None]:
# Delete every directory named __MACOSX and every regular file whose name starts
# with "._" (presumably macOS metadata unpacked from the archives - confirm),
# then list the *.txt files that remain under the data directory.
!find ./biovault-data/snp/23andme -name '__MACOSX' -type d -exec rm -rf {} +
!find ./biovault-data/snp/23andme -type f -name '._*' -delete
!find ./biovault-data/snp/23andme -type f -name '*.txt'

In [None]:
# Process the unpacked data with the default output paths (under ./work) and
# display the returned summary dict as the cell output.
stats = build_outputs("./biovault-data/snp/23andme")
stats