# ⚠️ Data setup instructions

This notebook generates `apasites_with_negatives.csv` by combining PolyASite positive sites
with sampled negative controls and extracting sequence windows from hg38.

The required input files are too large for GitHub. To run this notebook locally, make sure
you download the following and place them in the **working directory** (same folder as this notebook):

1. **PolyASite clusters (hg38)**
   - Download: https://polyasite.unibas.ch/download/ (choose hg38 clusters BED)
   - Save as: `polyasite_hg38_clusters.bed`

2. **hg38 chromosome sizes**
   - UCSC file: http://hgdownload.soe.ucsc.edu/goldenPath/hg38/bigZips/hg38.chrom.sizes
   - Save as: `hg38.chrom.sizes`

3. **hg38 genome FASTA**
   - Download and unzip (UCSC version with `chr*` names):
     ```bash
     curl -L https://hgdownload.soe.ucsc.edu/goldenPath/hg38/bigZips/latest/hg38.fa.gz -o hg38.fa.gz
     gunzip -f hg38.fa.gz
     ```
   - This creates `hg38.fa`

4. **FASTA index**
   - Required for fast random access:
     ```bash
     samtools faidx hg38.fa
     ```
   - This creates `hg38.fa.fai` alongside `hg38.fa`

---

After downloading, you should have these files in the notebook directory:

- `polyasite_hg38_clusters.bed`
- `hg38.chrom.sizes`
- `hg38.fa`
- `hg38.fa.fai`

The notebook will then generate:
- `apasites_with_negatives.csv` → balanced positives and negatives with sequence windows

In [2]:
#!/usr/bin/env python3
"""
Generate matched negatives for PolyASite positives, normalize chromosome names,
and write a strand-oriented sequence window per row.

Improvements in this version:
- Robust FASTA handling: auto-detects whether FASTA uses 'chr' prefixes and maps names.
- Sequence extraction is strand-oriented (reverse-complement for '-' strand).
- Optional filtering to primary chromosomes and skipping ALT/unlocalized contigs.

Example (notebook or CLI):
  python generate_negatives.py \
    --positives polyasite_hg38_clusters.bed \
    --chrom-sizes hg38.chrom.sizes \
    --out apasites_with_negatives.csv \
    --k-neg 1 --min-distance 200 \
    --primary-only --skip-alt \
    --fasta hg38.fa --window-up 200 --window-down 100
"""

import argparse
import csv
import random
import sys
from collections import defaultdict
from typing import Dict, List, Tuple, Optional

# ------------------------------
# I/O helpers
# ------------------------------

def read_chrom_sizes(path: str) -> Dict[str, int]:
    sizes = {}
    with open(path) as f:
        for line in f:
            if not line.strip() or line.startswith("#"):
                continue
            parts = line.strip().split()
            if len(parts) >= 2:
                sizes[parts[0]] = int(parts[1])
    if not sizes:
        raise ValueError(f"No chromosome sizes parsed from {path}")
    return sizes

def parse_bed_line(line: str):
    parts = line.rstrip("\n").split("\t")
    if len(parts) < 3:
        return None
    chrom = parts[0]
    start = int(parts[1])
    end = int(parts[2])
    name = parts[3] if len(parts) > 3 else ""
    score = parts[4] if len(parts) > 4 else "0"
    strand = parts[5] if len(parts) > 5 else "+"
    return chrom, start, end, name, score, strand

# ------------------------------
# Chromosome name normalization
# ------------------------------

PRIMARY_SET_NOCHR = {str(i) for i in range(1, 23)} | {"X", "Y", "M", "MT"}
PRIMARY_SET_CHR   = {f"chr{i}" for i in range(1, 23)} | {"chrX", "chrY", "chrM"}

def has_chr_prefix(names: List[str]) -> bool:
    return any(n.startswith("chr") for n in names)

def looks_alt_or_unlocalized(ch: str) -> bool:
    base = ch.replace("chr", "")
    return ("_" in ch) or base.startswith("GL") or base.startswith("KI")

def normalize_to_target(ch: str, target_has_chr: bool) -> str:
    """
    Normalize a chromosome name to target 'chr' style and harmonize mito.
    """
    c = ch
    if target_has_chr:
        if not c.startswith("chr"):
            c = "chr" + c
        if c in {"chrMT", "chrMt", "chrm", "chrmt"}:
            c = "chrM"
    else:
        if c.startswith("chr"):
            c = c[3:]
        if c.upper() == "MT":
            c = "M"
    return c

def build_bed_chrom_mapper(sizes_keys: List[str], bed_keys: List[str]) -> Dict[str, str]:
    sizes_has_chr = has_chr_prefix(sizes_keys)
    mapper = {}
    for bch in bed_keys:
        mapper[bch] = normalize_to_target(bch, sizes_has_chr)
    return mapper

# ------------------------------
# FASTA chrom-style helpers
# ------------------------------

def fasta_has_chr_prefix(fa) -> bool:
    try:
        for k in fa.keys():
            # As soon as we see a key, decide
            return k.startswith("chr")
    except Exception:
        return False
    return False

def normalize_for_fasta(ch: str, fasta_target_has_chr: bool) -> str:
    """
    Translate a normalized BED/CSV chrom name to the style used by the FASTA.
    """
    return normalize_to_target(ch, fasta_target_has_chr)

# ------------------------------
# BED readers using normalization and filters
# ------------------------------

def read_bed_points(
    path: str,
    chrom_mapper: Dict[str, str],
    primary_only: bool,
    skip_alt: bool,
    assume_strand_plus_if_missing: bool = True,
) -> Dict[str, List[Tuple[int, str, str]]]:
    """
    Convert BED intervals to cleavage points:
      if strand "+": pos = end - 1
      if strand "-": pos = start
    Returns dict: chrom -> list of (pos, strand, name) using *normalized* chrom keys.
    """
    d = defaultdict(list)

    with open(path) as f:
        for line in f:
            line=line.strip()
            if not line or line.startswith("#") or line.startswith("track"):
                continue
            rec = parse_bed_line(line)
            if rec is None:
                continue
            chrom, start, end, name, score, strand = rec

            norm_chrom = chrom_mapper.get(chrom, chrom)

            if primary_only:
                if has_chr_prefix([norm_chrom]):
                    if norm_chrom not in PRIMARY_SET_CHR:
                        continue
                else:
                    if norm_chrom not in PRIMARY_SET_NOCHR:
                        continue

            if skip_alt and looks_alt_or_unlocalized(norm_chrom):
                continue

            if strand not in ["+", "-"]:
                strand = "+" if assume_strand_plus_if_missing else strand

            pos = end - 1 if strand == "+" else start
            d[norm_chrom].append((pos, strand, name))

    for c in d:
        d[c].sort(key=lambda x: x[0])

    if not d:
        sys.stderr.write(
            "[WARN] No BED points loaded after normalization/filters. "
            "Check chromosome styles or filters.\n"
        )
    return d

def read_bed_regions(
    path: str,
    chrom_mapper: Dict[str, str],
    primary_only: bool,
    skip_alt: bool,
) -> Dict[str, List[Tuple[int,int,str]]]:
    d = defaultdict(list)
    rid = 0
    with open(path) as f:
        for line in f:
            line=line.strip()
            if not line or line.startswith("#") or line.startswith("track"):
                continue
            rec = parse_bed_line(line)
            if rec is None:
                continue
            chrom, start, end, name, score, strand = rec

            norm_chrom = chrom_mapper.get(chrom, chrom)

            if primary_only:
                if has_chr_prefix([norm_chrom]):
                    if norm_chrom not in PRIMARY_SET_CHR:
                        continue
                else:
                    if norm_chrom not in PRIMARY_SET_NOCHR:
                        continue

            if skip_alt and looks_alt_or_unlocalized(norm_chrom):
                continue

            region_id = name if name else f"region_{rid}"
            rid += 1
            d[norm_chrom].append((start, end, region_id))

    for c in d:
        d[c].sort()
    return d

# ------------------------------
# Interval / sampling helpers
# ------------------------------

def build_positive_buffers(positives: List[int], min_distance: int) -> List[Tuple[int,int]]:
    intervals = []
    for p in positives:
        s = max(0, p - min_distance)
        e = p + min_distance + 1
        intervals.append((s,e))
    if not intervals:
        return intervals
    intervals.sort()
    merged = [intervals[0]]
    for s,e in intervals[1:]:
        ps,pe = merged[-1]
        if s <= pe:
            merged[-1] = (ps, max(pe, e))
        else:
            merged.append((s,e))
    return merged

def contains_pos(intervals: List[Tuple[int,int]], pos: int) -> bool:
    lo, hi = 0, len(intervals)
    while lo < hi:
        mid = (lo+hi)//2
        if intervals[mid][0] <= pos:
            lo = mid + 1
        else:
            hi = mid
    for i in range(max(0, lo-5), min(len(intervals), lo+5)):
        s,e = intervals[i]
        if s <= pos < e:
            return True
    return False

# ------------------------------
# Sequence helpers
# ------------------------------

_RC = str.maketrans({"A":"T","T":"A","G":"C","C":"G","N":"N"})

def reverse_complement(seq: str) -> str:
    return seq.translate(_RC)[::-1]

def _fa_ok(fa) -> bool:
    return (fa is not None) and hasattr(fa, "get_seq")

def _extract_seq_upper(fa, chrom, start_1based, end_inclusive):
    """
    pyfaidx can return either an object with `.seq` or a plain str (when as_raw=True).
    Normalize to an uppercase A/C/G/T/N string here.
    """
    res = fa.get_seq(chrom, start_1based, end_inclusive)
    s = res.seq if hasattr(res, "seq") else str(res)
    s = s.upper()
    # keep only A/C/G/T/N
    return "".join(c if c in "ACGTN" else "N" for c in s)

def fetch_window_seq(
    fa,
    chrom: str,
    pos: int,
    strand: str,
    up: int,
    down: int,
    chrom_len: Optional[int],
    fasta_target_has_chr: Optional[bool]
) -> str:
    if fa is None or not hasattr(fa, "get_seq") or chrom_len is None:
        return ""
    ch_for_fa = normalize_for_fasta(chrom, bool(fasta_target_has_chr))
    start = max(0, pos - up)
    end   = min(chrom_len, pos + down)
    if end <= start:
        return ""
    try:
        # pyfaidx coordinates are 1-based inclusive
        s = _extract_seq_upper(fa, ch_for_fa, start + 1, end)
    except Exception:
        return ""
    if strand == "-":
        s = reverse_complement(s)
    return s

def motif_present_upstream(
    fa,
    chrom: str,
    pos: int,
    strand: str,
    upstream_len: int,
    motifs: List[str],
    fasta_target_has_chr: Optional[bool]
) -> bool:
    if fa is None or not hasattr(fa, "get_seq"):
        return False
    ch_for_fa = normalize_for_fasta(chrom, bool(fasta_target_has_chr))
    start = max(0, pos - upstream_len)
    end = pos
    if start >= end:
        return False
    try:
        seq = _extract_seq_upper(fa, ch_for_fa, start + 1, end)
    except Exception:
        return False
    if strand == "-":
        seq = reverse_complement(seq)
    seq = seq.replace("U","T")
    for m in motifs:
        if m and m.upper().replace("U","T") in seq:
            return True
    return False

# ------------------------------
# Main
# ------------------------------

def main():
    ap = argparse.ArgumentParser(description="Generate matched negatives for PolyASite positives.")
    ap.add_argument("--positives", required=True, help="PolyASite BED (BED3/BED6), hg38")
    ap.add_argument("--chrom-sizes", required=True, help="chrom sizes TSV (chrom\\tlength)")
    ap.add_argument("--out", required=True, help="Output CSV")

    ap.add_argument("--k-neg", type=int, default=1, help="Negatives per positive")
    ap.add_argument("--min-distance", type=int, default=200, help="Min distance from any positive (bp)")

    ap.add_argument("--mask-bed", default=None, help="Optional BED to constrain where negatives can be sampled (e.g., 3'UTRs)")
    ap.add_argument("--primary-only", action="store_true", help="Keep only primary chromosomes (1–22, X, Y, M/MT)")
    ap.add_argument("--skip-alt", action="store_true", help="Skip ALT/unlocalized contigs (contain '_' or start with GL/KI)")

    ap.add_argument("--fasta", default=None, help="Optional hg38 FASTA for sequence extraction & motif-aware filtering")
    ap.add_argument("--window-up", type=int, default=200, help="Upstream window size (bp) for 'sequence' column")
    ap.add_argument("--window-down", type=int, default=100, help="Downstream window size (bp) for 'sequence' column")

    ap.add_argument("--motif-aware", action="store_true", help="Avoid negatives with strong PAS motifs upstream")
    ap.add_argument("--motif-upstream", type=int, default=40, help="Upstream window for motif screening (bp)")
    ap.add_argument("--motif-list", default="AAUAAA,AUUAAA,AGUAAA", help="Comma-separated PAS-like hexamers (RNA alphabet allowed)")

    ap.add_argument("--seed", type=int, default=42, help="Random seed")
    args = ap.parse_args()

    random.seed(args.seed)

    # 1) Load chrom sizes
    chrom_sizes = read_chrom_sizes(args.chrom_sizes)
    sizes_keys = list(chrom_sizes.keys())

    # 2) Build chrom mapper by peeking at BED chrom names
    bed_chroms = set()
    with open(args.positives) as f:
        for line in f:
            s = line.strip()
            if not s or s.startswith("#") or s.startswith("track"):
                continue
            rec = parse_bed_line(s)
            if rec is None:
                continue
            bed_chroms.add(rec[0])
            if len(bed_chroms) >= 2000:
                break
    bed_chroms = sorted(bed_chroms)

    chrom_mapper = build_bed_chrom_mapper(sizes_keys, bed_chroms)

    # 3) Read positives (normalized + filtered)
    pos_by_chrom = read_bed_points(
        args.positives,
        chrom_mapper=chrom_mapper,
        primary_only=args.primary_only,
        skip_alt=args.skip_alt,
        assume_strand_plus_if_missing=True,
    )

    # 4) Optional mask
    mask_by_chrom = None
    if args.mask_bed:
        mask_by_chrom = read_bed_regions(
            args.mask_bed,
            chrom_mapper=chrom_mapper,
            primary_only=args.primary_only,
            skip_alt=args.skip_alt,
        )

    # 5) Build positive buffers
    buffers_by_chrom = {
        chrom: build_positive_buffers([p for p,_,_ in plist], args.min_distance)
        for chrom, plist in pos_by_chrom.items()
    }

    # 6) FASTA / pyfaidx -> build a handle and detect its chrom style
    fasta_handle = None
    fasta_target_has_chr = None
    if args.fasta:
        try:
            from pyfaidx import Fasta  # type: ignore
            fasta_handle = Fasta(str(args.fasta), as_raw=True, sequence_always_upper=True)
            fasta_target_has_chr = fasta_has_chr_prefix(fasta_handle)
        except Exception as e:
            print("[WARN] Could not import/use pyfaidx; sequence and motif-aware filtering will be skipped.", e)
            fasta_handle = None
            fasta_target_has_chr = None
            args.motif_aware = False

    motifs = [m.strip() for m in args.motif_list.split(",") if m.strip()]

    # 7) Prepare output rows: positives first
    out_rows = []
    n_pos = 0
    for chrom, plist in pos_by_chrom.items():
        chrom_len = chrom_sizes.get(chrom)
        for pos, strand, name in plist:
            seq = fetch_window_seq(
                fasta_handle, chrom, pos, strand,
                args.window_up, args.window_down, chrom_len,
                fasta_target_has_chr
            )
            out_rows.append({
                "chrom": chrom,
                "pos": pos,
                "strand": strand,
                "label": 1,
                "sequence": seq,
                "source_id": name,
                "region_id": "",
                "notes": ""
            })
            n_pos += 1

    # 8) Sample negatives
    n_neg = 0
    n_skipped_chrom = 0
    for chrom, plist in pos_by_chrom.items():
        chrom_len = chrom_sizes.get(chrom, None)
        if chrom_len is None:
            print(f"[WARN] chrom {chrom} not found in chrom sizes; skipping negatives for this chrom.")
            n_skipped_chrom += 1
            continue

        buffers = buffers_by_chrom[chrom]
        mask = mask_by_chrom.get(chrom, []) if mask_by_chrom else None

        for pos, strand, name in plist:
            to_add = args.k_neg
            attempts = 0
            while to_add > 0 and attempts < 5000:
                attempts += 1

                # sample candidate
                if mask:
                    start, end, rid = random.choice(mask)
                    if end - start <= 1:
                        continue
                    cand = random.randint(start, end - 1)
                    cand_region = rid
                else:
                    cand = random.randint(0, chrom_len - 1)
                    cand_region = ""

                # distance from any positive
                if contains_pos(buffers, cand):
                    continue

                # motif-aware upstream screen
                if args.motif_aware and motif_present_upstream(
                    fasta_handle, chrom, cand, strand,
                    args.motif_upstream, motifs, fasta_target_has_chr
                ):
                    continue

                seq = fetch_window_seq(
                    fasta_handle, chrom, cand, strand,
                    args.window_up, args.window_down, chrom_len,
                    fasta_target_has_chr
                )
                out_rows.append({
                    "chrom": chrom,
                    "pos": cand,
                    "strand": strand,
                    "label": 0,
                    "sequence": seq,
                    "source_id": "",
                    "region_id": cand_region,
                    "notes": "neg"
                })
                n_neg += 1
                to_add -= 1

            if to_add > 0:
                print(f"[WARN] Only generated {args.k_neg - to_add}/{args.k_neg} negatives for {chrom}:{pos}{strand} after {attempts} attempts.")

    # 9) Write CSV
    fieldnames = ["chrom","pos","strand","label","sequence","source_id","region_id","notes"]
    with open(args.out, "w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for r in out_rows:
            w.writerow(r)

    print(f"[OK] Wrote {len(out_rows)} rows (pos+neg) to {args.out}  |  positives={n_pos}, negatives={n_neg}, skipped_chroms={n_skipped_chrom}")
    if n_neg == 0:
        print("[HINT] If negatives are zero, ensure chromosome styles match (chr vs no-chr) and consider --primary-only/--skip-alt.")
    if fasta_handle is None:
        print("[HINT] No FASTA provided or pyfaidx unavailable: 'sequence' column will be empty. Pass --fasta hg38.fa to populate it.")

if __name__ == "__main__":
    main()

usage: ipykernel_launcher.py [-h] --positives POSITIVES --chrom-sizes
                             CHROM_SIZES --out OUT [--k-neg K_NEG]
                             [--min-distance MIN_DISTANCE]
                             [--mask-bed MASK_BED] [--primary-only]
                             [--skip-alt] [--fasta FASTA]
                             [--window-up WINDOW_UP]
                             [--window-down WINDOW_DOWN] [--motif-aware]
                             [--motif-upstream MOTIF_UPSTREAM]
                             [--motif-list MOTIF_LIST] [--seed SEED]
ipykernel_launcher.py: error: the following arguments are required: --positives, --chrom-sizes, --out


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [3]:
# !pip install pyfaidx  # if needed

import os, shlex, sys
positives_bed = "polyasite_hg38_clusters.bed"
chrom_sizes   = "hg38.chrom.sizes"
out_csv       = "apasites_with_negatives.csv"
hg38_fasta    = "hg38.fa"

args = f"""
--positives {positives_bed}
--chrom-sizes {chrom_sizes}
--out {out_csv}
--k-neg 1
--min-distance 200
--primary-only
--skip-alt
--fasta {hg38_fasta}
--window-up 200
--window-down 100
""".strip()

for p in [positives_bed, chrom_sizes, hg38_fasta]:
    if not os.path.exists(p):
        raise FileNotFoundError(f"Path not found: {p}")

sys.argv = ["generate_negatives.py"] + shlex.split(args)
main() 

[OK] Wrote 1137216 rows (pos+neg) to apasites_with_negatives.csv  |  positives=568608, negatives=568608, skipped_chroms=0


unset GITHUB_TOKEN