# Em's VCF pipeline  
A simple step-by-step workbook to take reads, align to a reference genome and generate per-sample vcfs or gvcf files.  
_The real work cells start after the methods section_

## _Methods section for you to copy in on any publications._

**Option A — Ultra‑concise (Nature/Science one‑liner)**  
Paired‑end reads were quality‑checked (FastQC), adapter/quality‑trimmed (cutadapt with NextSeq poly‑G trimming), aligned to the reference with BWA‑MEM2, sorted and duplicate‑marked, and small variants were called with FreeBayes in genome windows and concatenated/indexed with BCFtools; per‑sample VCFs were exported from the cohort VCF. The pipeline is publicaly available: https://github.com/Stephen-Richards-scripts/_Ems_vcf_pipeline/tree/main. [1](https://www.bioinformatics.babraham.ac.uk/projects/fastqc/)[2](https://cutadapt.readthedocs.io/en/stable/)[3](https://github.com/bwa-mem2/bwa-mem2)[4](https://gatk.broadinstitute.org/hc/en-us/articles/360037224932-MarkDuplicatesSpark)[5](https://github.com/freebayes/freebayes)[6](https://samtools.github.io/bcftools/bcftools.html)

---

**Option B — Short paragraph with example commands and versions (for supplement or journals that permit brief methods)**  
We used a custom lab pipeline (https://github.com/Stephen-Richards-scripts/_Ems_vcf_pipeline). Raw FASTQ files were assessed with **FastQC** (v0.12.x) and summarized with **MultiQC** (v1.14). Adapter and quality trimming used **cutadapt** (v5.x) with `--nextseq-trim=20`, `-q 15,15`, `-m 36`, `--pair-filter=any`, and `--trim-n`, supplying i5/i7 adapter sequences where available. Reads were aligned to the reference FASTA with **BWA‑MEM2** (v2.2.x), piped to **SAMtools** (v1.18+) for coordinate sorting. PCR/optical duplicates were marked with **GATK MarkDuplicatesSpark** (v4.x; local Spark), then BAMs were indexed. Joint variant calling used **FreeBayes** (v1.3.x) in fixed‑size genomic windows; chunk VCFs were concatenated and indexed with **BCFtools** (v1.18+) and tabix. Individual per‑sample VCFs were exported from the cohort VCF with `bcftools view` and indexed. Key QC metrics (flagstat and bcftools stats) were recorded. Example commands (paths omitted):

```bash
# Reference indexing
samtools faidx ref.fa
bwa-mem2 index ref.fa

# QC and trimming
fastqc -t 2 -o qc/fastqc_raw *.fastq.gz
multiqc qc/fastqc_raw -o qc/fastqc_raw
cutadapt -j 6 -q 15,15 -m 36 --pair-filter=any --trim-n --nextseq-trim=20 \
  -g file:adapters.fasta -G file:adapters.fasta -a file:adapters.fasta -A file:adapters.fasta \
  -o sample.R1.trimmed.fastq.gz -p sample.R2.trimmed.fastq.gz \
  sample_R1.fastq.gz sample_R2.fastq.gz

# Align → sort
bwa-mem2 mem -t 32 -v 1 -R '@RG\tID:sample\tSM:sample\tPL:ILLUMINA' ref.fa \
  sample.R1.trimmed.fastq.gz sample.R2.trimmed.fastq.gz \
| samtools sort -@ 12 -o sample.sorted.bam

# Duplicate marking (default: GATK MarkDuplicatesSpark)
gatk MarkDuplicatesSpark -I sample.sorted.bam -O sample.dedup.bam \
  --spark-runner LOCAL --spark-master local[24]
samtools index sample.dedup.bam

# FreeBayes (example for one window), then concatenate all chunks
freebayes -f ref.fa --region "ctg1:1-500000" --min-mapping-quality 20 --min-base-quality 20 \
  --use-best-n-alleles 4 --limit-coverage 500 --skip-coverage 800 *.dedup.bam \
| bgzip -@ 1 -c > chunks/ctg1_1_500000.vcf.gz
bcftools concat --threads 8 -Oz -o cohort.freebayes.vcf.gz chunks/*.vcf.gz
bcftools index --threads 8 -t cohort.freebayes.vcf.gz

# Per-sample VCFs
bcftools query -l cohort.freebayes.vcf.gz | \
  xargs -I{} sh -c 'bcftools view -s "{}" -Oz -o per-sample/{}.vcf.gz cohort.freebayes.vcf.gz && \
                    bcftools index -t per-sample/{}.vcf.gz'


## 0. Environment Setup
All required software is inside the conda environment:  
**Bondlab_phylo_env**

You should see this kernel selected in the top‑right of your Jupyter window.  
If not, run the following on the FARM login node:

1. module load conda  
2. conda activate /group/jbondgrp2/stephenRichards/_conda_envs/Bondlab_phylo_env  
3. python -m ipykernel install --user --name Bondlab_phylo_env --display-name "Python [conda env:Bondlab_phylo_env]"

### 0.1 CPU Check
The next cell checks how many CPUs are available in your OnDemand session or SLURM allocation.  
If the number looks too small for your job, restart your Jupyter session with more CPUs.

In [1]:
# check cpus available and set global THREADS
import os
THREADS = int(os.getenv("SLURM_CPUS_PER_TASK"))
print("CPUs available:", THREADS)

CPUs available: 64


### 0.2 Python library imports  

In [2]:
# Standard library
import os
import sys
import glob
import gzip
import json
import math
import random
import re
import shutil
import subprocess
from pathlib import Path
from datetime import datetime

# UI and display
from ipywidgets import Dropdown, Button, VBox, HTML, Output
from IPython.display import display, clear_output, HTML, Markdown

# Data + plotting
import pandas as pd
import matplotlib.pyplot as plt

In [3]:
global_start_time = datetime.now()

## 1. Input Data

This pipeline requires two inputs:
1. **A reference genome** (FASTA)
2. **A directory of Illumina paired‑end reads**, one directory per project or batch

Spider genomes are typically large (human‑sized or larger), and sequencing facilities often produce **many files**, usually:

- `sampleX_R1.fastq.gz`
- `sampleX_R2.fastq.gz`

Please name your read files **something meaningful** (e.g. `Genus_species_sampleID_R1.fastq.gz`).  
Long names are fine — meaningful names help track samples through the pipeline.

### 1.1 Preparing Your Input Files
Before running the pipeline, place your reference genome and read files in the expected locations on the FARM storage.  
**1. Reference Genome**  
If it is not alreeady there, copy your reference FASTA file into:  
/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/data/references/  

**2. Sequencing Reads**  
Create a new directory for your reads inside:  
_Ems_vcf_pipeline/data/read_directories/  
_The name of your new directory will be copied into the results directory wherre you will see the results._  

To save storage, **soft‑link** your FASTQ files into this directory:  
ln -s /path/to/your/reads/*.fastq.gz   new_directory/  

**3. Select Your Inputs Below**  
The next cell will let you choose:  
- the reference FASTA   
- the directory containing your read files   

Once selected, continue with the pipeline.

In [4]:
import os
from pathlib import Path
from ipywidgets import Dropdown, Button, VBox, Output
from IPython.display import display, clear_output, Markdown

out = Output()

PROJECT_ROOT = "/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline"
refs_dir     = f"{PROJECT_ROOT}/data/references"
reads_root   = f"{PROJECT_ROOT}/data/read_directories"

# --- Helper functions ---
def list_refs():
    return sorted([f for f in os.listdir(refs_dir)
                   if f.lower().endswith((".fa",".fasta",".fna"))])

def list_read_dirs():
    return sorted([d for d in os.listdir(reads_root)
                   if os.path.isdir(os.path.join(reads_root, d))])

def pair_fastqs(dirpath):
    """Return dict sample -> {R1,R2} and list of unpaired files."""
    files = sorted(Path(dirpath).glob("*.fastq*"))
    pairs = {}
    unpaired = []

    tmp = {}
    for f in files:
        b = f.name
        if "_R1" in b:
            stem = b.split("_R1")[0]
            tmp.setdefault(stem, {})["R1"] = f
        elif "_R2" in b:
            stem = b.split("_R2")[0]
            tmp.setdefault(stem, {})["R2"] = f
        else:
            unpaired.append(f)

    for stem, rec in tmp.items():
        if "R1" in rec and "R2" in rec:
            pairs[stem] = rec
        else:
            unpaired.extend(rec.values())

    return pairs, unpaired

# --- Widgets ---
ref_dd  = Dropdown(options=list_refs(), description="Reference:", layout={'width':'70%'})
reads_dd= Dropdown(options=list_read_dirs(), description="Reads:", layout={'width':'70%'})
btn     = Button(description="Select", button_style="success")

display(VBox([ref_dd, reads_dd, btn, out]))

def safe_symlink(target: Path, link_path: Path):
    """Create or refresh a symlink."""
    link_path.parent.mkdir(parents=True, exist_ok=True)
    if link_path.exists() or link_path.is_symlink():
        try:
            if link_path.is_symlink() and link_path.resolve() == target.resolve():
                return  # already correct
            link_path.unlink()
        except Exception:
            # As a fallback, rename to .bak and continue
            link_path.rename(link_path.with_suffix(link_path.suffix + ".bak"))
    link_path.symlink_to(target)

def on_click(_):
    clear_output(wait=True)
    display(VBox([ref_dd, reads_dd, btn]))

    ref_src = Path(refs_dir) / ref_dd.value         # canonical source FASTA
    rd      = Path(reads_root) / reads_dd.value

    pairs, unpaired = pair_fastqs(rd)

    # Create run directory and reference view
    run       = Path(PROJECT_ROOT) / "results" / reads_dd.value
    ref_work  = run / "reference"
    ref_work.mkdir(parents=True, exist_ok=True)

    # Symlink FASTA into the run/reference dir
    ref_link = ref_work / ref_src.name
    safe_symlink(ref_src, ref_link)

    # ---- Save to globals ----
    # Keep both the source FASTA and the symlink path:
    # - REF_FASTA_SRC: where indexes will be built and live (permanent)
    # - REF_FASTA:     a convenience alias used throughout (points to the symlink in run/reference)
    global REFERENCE, READS_DIR, PAIRS, UNPAIRED, RUN_DIR, REF_FASTA, REF_FASTA_SRC
    REFERENCE     = str(ref_link)     # maintained for backward compatibility with your later cells
    REF_FASTA     = REFERENCE         # alias used later
    REF_FASTA_SRC = str(ref_src)      # canonical source FASTA (index location)
    READS_DIR     = str(rd)
    PAIRS         = pairs
    UNPAIRED      = unpaired
    RUN_DIR       = str(run)

    # Summary
    with out:
        print("✓ Reference:", ref_src.name)
        print("✓ Read set:", reads_dd.value)
        print("✓ Samples detected:", len(pairs))
        if unpaired:
            print("⚠ Unpaired files:", len(unpaired))
        print("Run directory:", run)
        print("Reference symlink in run:", ref_link)

btn.on_click(on_click)

VBox(children=(Dropdown(description='Reference:', layout=Layout(width='70%'), options=('GCA_036925085.1_qdBraP…

## 2. Pipeline preparation:   
From here on, **_you can run automatically by selecting "Run selected cell and all below" from the Run menu._**   
Rough timings - 200 cores, millipede Brachycybe_productor 295Mb reference, 48 10X sample set - 4 hours 20 mins (still need to paralellize/fix the report)  
### 2.1 Index reference files (if necessary):  
This takes a few minutes to run, if the index files have not already been made.  
**Note the spider genomes need a lot of memory to index, 64Gb was not enough - I needed 128 Gb of mem for this cell to work**  
64 cores and 128Gb ram is probably a minimum to run a spider genome, I would try 240 and 480 (or a multiple of 32, so 224 cores and 448Gb ram as the bwa-mem2 alignment tried to use 32 cores per job, 256 would be better but thats the whole machine, so you might not get that much as easily if someone else is using it.)

In [5]:
## 2. Pipeline preparation: Index reference files (if necessary)
import os, subprocess
from pathlib import Path

# ---- Require globals from selection cell ----
need = {"REF_FASTA", "REF_FASTA_SRC", "RUN_DIR"}
missing = [k for k in need if k not in globals()]
if missing:
    raise RuntimeError(f"{', '.join(missing)} not set. Run the selection cell first.")

REF_FASTA_SRC = Path(REF_FASTA_SRC)             # canonical FASTA in refs_dir
REF_FASTA_LINK_DIR = Path(RUN_DIR) / "reference" # where we keep symlinks
REF_FASTA_LINK_DIR.mkdir(parents=True, exist_ok=True)

def run(cmd):
    print("  $", " ".join(map(str, cmd)))
    subprocess.run(cmd, check=True)

def safe_symlink(target: Path, link_path: Path):
    link_path.parent.mkdir(parents=True, exist_ok=True)
    if link_path.exists() or link_path.is_symlink():
        try:
            if link_path.is_symlink() and link_path.resolve() == target.resolve():
                return
            link_path.unlink()
        except Exception:
            link_path.rename(link_path.with_suffix(link_path.suffix + ".bak"))
    link_path.symlink_to(target)

fasta_src = str(REF_FASTA_SRC)
print("Reference FASTA (source):", fasta_src)
print("Run reference dir (symlinks):", REF_FASTA_LINK_DIR)

# ---- Expected index files for samtools & bwa-mem2 ----
# samtools faidx:
fai_src = REF_FASTA_SRC.with_suffix(REF_FASTA_SRC.suffix + ".fai")

# bwa-mem2 creates a set of files next to the FASTA; the common set is:
#   .0123  .amb  .ann  .bwt.2bit.64  .pac
bwa_exts = [".0123", ".amb", ".ann", ".bwt.2bit.64", ".pac"]
bwa_src_files = [REF_FASTA_SRC.with_suffix(REF_FASTA_SRC.suffix + ext) for ext in bwa_exts]

print("\n[1/2] samtools faidx")
if not fai_src.exists():
    print("  Creating .fai index in source directory")
    run(["samtools", "faidx", fasta_src])
else:
    print("  .fai index exists in source, skipping.")

print("\n[2/2] bwa-mem2 index")
# If any expected file is missing, (re)build
if not all(p.exists() for p in bwa_src_files):
    print("  Building bwa-mem2 index in source directory")
    run(["bwa-mem2", "index", fasta_src])
    # Refresh the list in case filenames vary by version
    bwa_src_files = [REF_FASTA_SRC.with_suffix(REF_FASTA_SRC.suffix + ext) for ext in bwa_exts]
else:
    print("  bwa-mem2 index exists in source, skipping.")

# ---- Ensure symlinks to FASTA and all index files in the run/reference dir ----
print("\nLinking into run/reference:")
# Link the FASTA itself
fasta_link = REF_FASTA_LINK_DIR / REF_FASTA_SRC.name
safe_symlink(REF_FASTA_SRC, fasta_link)
print("  ->", fasta_link)

# Link samtools faidx
fai_link = REF_FASTA_LINK_DIR / (REF_FASTA_SRC.name + ".fai")
safe_symlink(fai_src, fai_link)
print("  ->", fai_link)

# Link bwa-mem2 index files that actually exist
for p in bwa_src_files:
    if p.exists():
        link_p = REF_FASTA_LINK_DIR / p.name
        safe_symlink(p, link_p)
        print("  ->", link_p)

print("\nDone. Reference indexes live in source; run dir has symlinks.")

Reference FASTA (source): /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/data/references/GCA_036925085.1_qdBraProd1.0.pri_genomic.fa
Run reference dir (symlinks): /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/reference

[1/2] samtools faidx
  .fai index exists in source, skipping.

[2/2] bwa-mem2 index
  bwa-mem2 index exists in source, skipping.

Linking into run/reference:
  -> /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/reference/GCA_036925085.1_qdBraProd1.0.pri_genomic.fa
  -> /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/reference/GCA_036925085.1_qdBraProd1.0.pri_genomic.fa.fai
  -> /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/reference/GCA_036925085.1_qdBraProd1.0.pri_genomic.fa.0123

## 3. Pipeline: QC on raw reads
### 3.1 Runs FastQC on raw R1/R2 reads for each sample.  
If reports already exist, this cell will skip them.  
(Optionally run the next cell to build a MultiQC summary.)

In [6]:
## 3. QC on raw reads (FastQC) — parallel by sample
from pathlib import Path
import os, re, shutil, subprocess, concurrent.futures, time
from datetime import timedelta

# ---- Required globals from earlier cells ----
for name in ["RUN_DIR", "PAIRS", "THREADS"]:
    if name not in globals() or globals()[name] in (None, "", {}):
        raise RuntimeError(f"Missing required global: {name}. Run the selection cell first.")

# Tool check
if shutil.which("fastqc") is None:
    raise RuntimeError("fastqc not found in PATH.")

# Output dir
global QC_RAW_DIR
QC_RAW_DIR = Path(RUN_DIR) / "qc" / "fastqc_raw"
QC_RAW_DIR.mkdir(parents=True, exist_ok=True)

def run(cmd):
    print("  $", " ".join(cmd))
    subprocess.run(cmd, check=True)

def fastqc_report_html(outdir: Path, fq_path: str) -> Path:
    base = Path(fq_path).name
    base = re.sub(r'\.(fastq|fq)(\.gz)?$', '', base, flags=re.IGNORECASE)
    return outdir / f"{base}_fastqc.html"

if not PAIRS:
    print("No paired FASTQs found (PAIRS is empty). Did you select the correct read directory?")
else:
    # ---------- Build worklist, skip if both R1/R2 HTMLs exist ----------
    jobs = []
    skipped = 0
    for sample_id, paths in sorted(PAIRS.items()):
        r1, r2 = str(paths["R1"]), str(paths["R2"])
        missing = [p for p in (r1, r2) if not os.path.exists(p)]
        if missing:
            print(f"[WARN] Skipping {sample_id} — missing file(s): {', '.join(missing)}")
            continue

        r1_html = fastqc_report_html(QC_RAW_DIR, r1)
        r2_html = fastqc_report_html(QC_RAW_DIR, r2)
        if r1_html.exists() and r2_html.exists():
            skipped += 1
            continue

        jobs.append((sample_id, r1, r2))

    total_samples = len(PAIRS)
    todo = len(jobs)
    print(f"Running FastQC on raw reads: {total_samples} sample(s) "
          f"(to do={todo}, skipped={skipped})")

    if todo == 0:
        print("Nothing to do.\nRaw FastQC outputs are already present at:", QC_RAW_DIR)
    else:
        # ---------- Simple concurrency policy ----------
        PER_JOB = 2              # fixed FastQC threads per sample
        THREADS = int(THREADS)

        # workers * PER_JOB <= THREADS
        workers = max(1, min(todo, THREADS // PER_JOB))

        print(f"Scheduling {workers} FastQC worker(s), {PER_JOB} thread(s) each "
            f"(<= {THREADS} total threads)")

        # ---------- Worker ----------
        def work_one(sample_id: str, r1: str, r2: str):
            # FastQC can take both mates at once; -t applies to the process
            cmd = ["fastqc", "-t", str(PER_JOB), "-o", str(QC_RAW_DIR), r1, r2]
            # We don't print every command to keep output tidy; uncomment if needed:
            # print("  $", " ".join(cmd))
            res = subprocess.run(cmd, capture_output=True, text=True)
            if res.returncode != 0:
                raise RuntimeError(res.stderr.strip() or "fastqc failed")

        # ---------- Run pool with basic progress ----------
        start = time.time()
        done = 0
        failures = []

        def eta_text(d, t, elapsed):
            if d <= 0: return "--:--:--"
            rate = d / max(elapsed, 1e-6)
            rem = t - d
            secs = int(rem / rate) if rate > 0 else 0
            return str(timedelta(seconds=secs))

        print(f"[Progress] 0/{todo} | ETA=--:--:--", end="", flush=True)
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as ex:
            futmap = {ex.submit(work_one, sid, r1, r2): sid for sid, r1, r2 in jobs}
            for fut in concurrent.futures.as_completed(futmap):
                sid = futmap[fut]
                try:
                    fut.result()
                except Exception as e:
                    failures.append((sid, str(e)))
                finally:
                    done += 1
                    eta = eta_text(done, todo, time.time() - start)
                    print(f"\r[Progress] {done}/{todo} | ETA={eta}   ", end="", flush=True)

        print()  # newline after progress

        # ---------- Summary ----------
        wrote = todo - len(failures)
        print("\nRaw FastQC finished.")
        print("Outputs:", QC_RAW_DIR)
        print(f"Summary: wrote={wrote}, skipped={skipped}, failed={len(failures)}, total={total_samples}")
        if failures:
            print("Failed samples:")
            for sid, msg in failures:
                print(f"  - {sid}: {msg}")

Running FastQC on raw reads: 48 sample(s) (to do=0, skipped=48)
Nothing to do.
Raw FastQC outputs are already present at: /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/qc/fastqc_raw


### 3.2 MultiQC
After this next MultiQC cell is run it is best to downlaod the html file to your laptop and open in a regular browser with javascript to see the charts.

In [7]:
## 3A. (Optional) MultiQC summary of raw FastQC reports
from pathlib import Path
import subprocess

if "QC_RAW_DIR" not in globals():
    raise RuntimeError("QC_RAW_DIR is not set. Run the FastQC cell first.")

print("Building MultiQC report for raw FastQC…")
subprocess.run(["multiqc", str(QC_RAW_DIR), "-o", str(QC_RAW_DIR)], check=True)
print("Done. See:", Path(QC_RAW_DIR) / "multiqc_report.html")

Building MultiQC report for raw FastQC…



[91m///[0m ]8;id=258751;https://multiqc.info\[1mMultiQC[0m]8;;\ [2mv1.33[0m

[34m       file_search[0m | Search path: /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/qc/fastqc_raw


[34m        searching [0m| ████████████████████████████████████████ 100% 193/193                                                   .zip[0m

[34m            fastqc[0m | Found 96 reports
[34m     write_results[0m | Existing reports found, adding suffix to filenames. Use '--force' to overwrite.
[34m     write_results[0m | Data        : /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/qc/fastqc_raw/multiqc_data_1
[34m     write_results[0m | Report      : /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/qc/fastqc_raw/multiqc_report_1.html
[34m           multiqc[0m | MultiQC complete



Done. See: /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/qc/fastqc_raw/multiqc_report.html


## 4. Trimming adapters with cutadapt  
I am assuming the sequence came of a Novaseq or similar 2 color illumina machine for this pipeline (I will need to make a different pipeline for different data.) So I have the flag   --nextseq-trim=20 \          # specialized 3′ poly-G trim in the trimming command

In [8]:
## 4. Trimming with cutadapt — parallel by sample
from pathlib import Path
import os, re, shutil, subprocess, concurrent.futures, time
from datetime import timedelta

# Globals needed: RUN_DIR, PAIRS, THREADS, PROJECT_ROOT
for name in ["RUN_DIR", "PAIRS", "THREADS", "PROJECT_ROOT"]:
    if name not in globals() or not globals()[name]:
        raise RuntimeError(f"Missing global: {name}")

# Tool check
if shutil.which("cutadapt") is None:
    raise RuntimeError("cutadapt not found in PATH.")

TRIM_DIR = Path(RUN_DIR) / "trimmed"
TRIM_DIR.mkdir(parents=True, exist_ok=True)
ADAPTERS = Path(PROJECT_ROOT) / "data" / "adapters" / "adapters.fasta"

def print_adapters_file_missing_error_message():
    print("""
adapters file missing. Please create a FASTA file at:
    <PROJECT_ROOT>/data/adapters/adapters.fasta
Example:
>i5
AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTCCTATTGAGTGTAGATCTCGGTGGTCGCCGTATCATT
>i7
GATCGGAAGAGCACACGTCTGAACTCCAGTCACCGTGCTGGATCTCGTATGCCGTCTTCTGCTTG
""")

print(f"Trimming with cutadapt (THREADS={THREADS})")

if not PAIRS:
    print("No paired FASTQs found (PAIRS is empty). Did you select the correct read directory?")
else:
    # Infer output extension from the first R1
    first_r1 = next(iter(PAIRS.values()))["R1"]
    gz = str(first_r1).lower().endswith(".gz")
    EXT = ".fastq.gz" if gz else ".fastq"

    # ---------- Build worklist, skipping samples with both outputs present ----------
    jobs = []
    skipped = 0
    for sample, paths in sorted(PAIRS.items()):
        r1 = str(paths["R1"])
        r2 = str(paths["R2"])

        out1 = TRIM_DIR / f"{sample}.R1.trimmed{EXT}"
        out2 = TRIM_DIR / f"{sample}.R2.trimmed{EXT}"

        # Input existence check
        missing = [p for p in (r1, r2) if not os.path.exists(p)]
        if missing:
            print(f"[WARN] Skipping {sample} — missing file(s): {', '.join(missing)}")
            continue

        # Idempotency
        if out1.exists() and out2.exists():
            skipped += 1
            continue

        jobs.append((sample, r1, r2, out1, out2))

    total = len(PAIRS)
    todo = len(jobs)
    print(f"Samples: {total} (to do={todo}, skipped={skipped})")

    if todo == 0:
        print("Nothing to trim.\nOutputs already present in:", TRIM_DIR)
    else:
        # ---------- Concurrency policy (simple, safe) ----------
        THREADS = int(THREADS)
        PER_JOB = int(globals().get("FASTQ_TRIM_THREADS_PER_SAMPLE", 6))  # set to 2 or 4; 4 is a good default
        if PER_JOB < 1:
            PER_JOB = 1
        if PER_JOB > THREADS:
            PER_JOB = THREADS
        workers = max(1, min(todo, THREADS // PER_JOB))
        print(f"Scheduling {workers} cutadapt worker(s), {PER_JOB} thread(s) each (<= {THREADS} total threads)")

        # ---------- Worker ----------
        def trim_one(sample: str, r1: str, r2: str, out1: Path, out2: Path):
            # Base cutadapt arguments:
            # -j: threads for this process
            # -q: quality trimming (5' and 3' per read)
            # -m: minimum length
            # --pair-filter=any: drop pair if any mate falls below length/filters (good default)
            # --trim-n: trim terminal Ns
            cmd = [
                "cutadapt",
                "-j", str(PER_JOB),
                "-q", "15,15",
                "-m", "36",
                "--pair-filter=any",
                "--trim-n",
                "--nextseq-trim=20",   # specialized 3' poly-G trim for 2 color chemistry
                "-o", str(out1),
                "-p", str(out2),
            ]
            if ADAPTERS.exists():
                # Provide adapters for 5' and 3' of both mates
                cmd += ["-g", f"file:{ADAPTERS}", "-G", f"file:{ADAPTERS}",
                        "-a", f"file:{ADAPTERS}", "-A", f"file:{ADAPTERS}"]
            else:
                # Still run without adapters, but warn once per job
                print_adapters_file_missing_error_message()

            cmd += [r1, r2]
            # Capture stderr to surface any errors; cutadapt writes its report to stderr by default
            res = subprocess.run(cmd, capture_output=True, text=True)
            if res.returncode != 0:
                raise RuntimeError(res.stderr.strip() or "cutadapt failed")
            # Optional: write the cutadapt report next to outputs
            report_path = TRIM_DIR / f"{sample}.cutadapt.txt"
            with open(report_path, "w") as rf:
                rf.write(res.stderr)

        # ---------- Run pool with progress ----------
        start = time.time()
        done = 0
        failures = []

        def eta_text(d, t, elapsed):
            if d <= 0: return "--:--:--"
            rate = d / max(elapsed, 1e-6)
            rem = t - d
            secs = int(rem / rate) if rate > 0 else 0
            return str(timedelta(seconds=secs))

        print(f"[Progress] 0/{todo} | ETA=--:--:--", end="", flush=True)
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as ex:
            futmap = {ex.submit(trim_one, s, r1, r2, o1, o2): s for (s, r1, r2, o1, o2) in jobs}
            for fut in concurrent.futures.as_completed(futmap):
                sid = futmap[fut]
                try:
                    fut.result()
                except Exception as e:
                    failures.append((sid, str(e)))
                finally:
                    done += 1
                    print(f"\r[Progress] {done}/{todo} | ETA={eta_text(done, todo, time.time()-start)}   ", end="", flush=True)

        print()  # newline
        wrote = todo - len(failures)
        print("\nTrim completed.")
        print("Outputs:", TRIM_DIR)
        print(f"Summary: wrote={wrote}, skipped={skipped}, failed={len(failures)}, total={total}")
        if failures:
            print("Failed samples:")
            for sid, msg in failures:
                print(f"  - {sid}: {msg}")

Trimming with cutadapt (THREADS=64)
Samples: 48 (to do=0, skipped=48)
Nothing to trim.
Outputs already present in: /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/trimmed


## 5. Align | Sort, Mark Duplicates, Index and Cleanup  
It will take some time to create the bams as they are streaming to the sort, so you won't see the intermediate files (to save space and time). This is obviously a slow bit doing the alignment work - give it some cores.    

In [9]:
## 5. Align → Sort (streamed) → MarkDuplicates(+index) → Cleanup  [quiet logs + concise console]
from pathlib import Path
import os, shutil, subprocess, concurrent.futures, time
from datetime import timedelta

# ===== Required globals =====
for name in ["RUN_DIR", "TRIM_DIR", "REF_FASTA", "PAIRS", "THREADS"]:
    if name not in globals() or not globals()[name]:
        raise RuntimeError(f"Missing global: {name}")

RUN_DIR   = Path(RUN_DIR)
TRIM_DIR  = Path(TRIM_DIR)
REF_FASTA = Path(REF_FASTA)

ALN_DIR   = RUN_DIR / "alignments"
LOGS_DIR  = ALN_DIR / "logs"
ALN_DIR.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)

READ_SET_NAME = globals().get("READ_SET_NAME", None)

# ===== Tool availability =====
def _which_or_raise(name):
    if shutil.which(name) is None:
        raise RuntimeError(f"'{name}' not found in PATH.")
_which_or_raise("bwa-mem2")
_which_or_raise("samtools")

DEDUP_TOOL = globals().get("DEDUP_TOOL", "spark").lower()
if DEDUP_TOOL in ("classic", "spark"):
    _which_or_raise("gatk")
elif DEDUP_TOOL == "sambamba":
    _which_or_raise("sambamba")
else:
    raise RuntimeError(f"Unknown DEDUP_TOOL='{DEDUP_TOOL}'. Use 'classic', 'spark', or 'sambamba'.")

# ===== Thread policy (per-sample) =====
BWA_THREADS   = int(globals().get("BWA_THREADS", 32))
SORT_THREADS  = int(globals().get("SORT_THREADS", 12))
DEDUP_THREADS = int(globals().get("DEDUP_THREADS", 24))
PER_SAMPLE_THREADS = int(globals().get("PER_SAMPLE_THREADS", max(BWA_THREADS, SORT_THREADS, DEDUP_THREADS)))
THREADS = int(THREADS)
if PER_SAMPLE_THREADS < 1:
    PER_SAMPLE_THREADS = 1
if PER_SAMPLE_THREADS > THREADS:
    PER_SAMPLE_THREADS = THREADS

def _calc_workers(todo:int) -> int:
    return max(1, min(todo, THREADS // PER_SAMPLE_THREADS if PER_SAMPLE_THREADS else 1))

# ===== Utilities =====
def log_path(sample: str) -> Path:
    return LOGS_DIR / f"{sample}.align_dedup.log"

def append_log(p: Path, text: str):
    with open(p, "a") as fh:
        fh.write(text)
        if not text.endswith("\n"):
            fh.write("\n")

def run_silently(cmd, sample: str, use_shell=False, env=None) -> tuple[int, str, str]:
    """
    Run a command, capture stdout/stderr, append both to the per-sample log.
    Return (returncode, stdout, stderr).
    """
    lp = log_path(sample)
    append_log(lp, f"$ {' '.join(cmd) if isinstance(cmd, list) else cmd}")
    try:
        if use_shell:
            res = subprocess.run(cmd, shell=True, check=False, executable="/bin/bash",
                                 env=env, capture_output=True, text=True)
        else:
            res = subprocess.run(cmd, check=False, env=env, capture_output=True, text=True)
    except Exception as e:
        append_log(lp, f"[exec error] {e}")
        return (1, "", str(e))

    if res.stdout:
        append_log(lp, res.stdout)
    if res.stderr:
        append_log(lp, res.stderr)
    return (res.returncode, res.stdout, res.stderr)

def trimmed_read(sample, tag):
    gz = TRIM_DIR / f"{sample}.{tag}.trimmed.fastq.gz"
    fq = TRIM_DIR / f"{sample}.{tag}.trimmed.fastq"
    return gz if gz.exists() else fq

def rg_string(sample: str):
    LB = READ_SET_NAME if READ_SET_NAME else sample
    return f'@RG\\tID:{sample}\\tSM:{sample}\\tPL:ILLUMINA\\tLB:{LB}\\tPU:{sample}'

# ===== Per-sample pipeline =====
def process_one_sample(sample: str):
    t0 = time.time()
    lp = log_path(sample)
    append_log(lp, f"=== {sample} ===")

    r1 = trimmed_read(sample, "R1")
    r2 = trimmed_read(sample, "R2")
    if not r1.exists() or not r2.exists():
        return (sample, False, "missing trimmed reads")

    bam_sorted = ALN_DIR / f"{sample}.sorted.bam"
    bam_dedup  = ALN_DIR / f"{sample}.dedup.bam"
    bai_dedup  = ALN_DIR / f"{sample}.dedup.bam.bai"
    metrics    = ALN_DIR / f"{sample}.metrics.txt"

    # 1) Align+Sort (quiet; logs captured)
    if bam_sorted.exists():
        print(f"[{sample}] Align+Sort: ✓ (exists)")
    else:
        t1 = time.time()
        # Quiet bwa-mem2 with -v 1; capture everything to log
        cmd = (
            "set -o pipefail; "
            f"bwa-mem2 mem -t {BWA_THREADS} -v 1 -R \"{rg_string(sample)}\" \"{REF_FASTA}\" \"{r1}\" \"{r2}\" "
            f"| samtools sort -@ {SORT_THREADS} -o \"{bam_sorted}\""
        )
        rc, _, _ = run_silently(cmd, sample, use_shell=True)
        dt = timedelta(seconds=int(time.time() - t1))
        if rc == 0 and bam_sorted.exists():
            print(f"[{sample}] Align+Sort: ✓ in {dt}")
        else:
            return (sample, False, f"Align+Sort failed (see log: {lp.name})")

    # 2) Mark duplicates + index (quiet)
    if bam_dedup.exists() and bai_dedup.exists():
        print(f"[{sample}] MarkDup+Index: ✓ (exists)")
    else:
        t2 = time.time()
        if DEDUP_TOOL == "classic":
            cmd = [
                "gatk", "MarkDuplicates",
                "-I", str(bam_sorted),
                "-O", str(bam_dedup),
                "-M", str(metrics),
                "--CREATE_INDEX", "true",
                "--VALIDATION_STRINGENCY", "LENIENT",
                "--verbosity", "ERROR",
            ]
            rc, _, _ = run_silently(cmd, sample)
            dt = timedelta(seconds=int(time.time() - t2))
            if rc != 0 or not (bam_dedup.exists() and bai_dedup.exists()):
                return (sample, False, f"MarkDuplicates failed (see log: {lp.name})")
            print(f"[{sample}] MarkDup+Index: ✓ in {dt}")

        elif DEDUP_TOOL == "spark":
            tmp = os.environ.get("TMPDIR", None) or os.environ.get("TMP", None) or "/tmp"
            cmd = [
                "gatk", "MarkDuplicatesSpark",
                "-I", str(bam_sorted),
                "-O", str(bam_dedup),
                "--read-validation-stringency", "LENIENT",
                "--spark-runner", "LOCAL",
                "--spark-master", f"local[{DEDUP_THREADS}]",
                "--conf", f"spark.local.dir={tmp}",
                "--verbosity", "ERROR",
            ]
            rc, _, _ = run_silently(cmd, sample)
            # Spark sometimes does not auto-index; index silently
            if rc == 0 and not (bai_dedup.exists()):
                rc2, _, _ = run_silently(["samtools", "index", "-@", "8", str(bam_dedup)], sample)
                if rc2 != 0:
                    return (sample, False, f"Index failed (see log: {lp.name})")
            dt = timedelta(seconds=int(time.time() - t2))
            if rc == 0 and bam_dedup.exists() and bai_dedup.exists():
                print(f"[{sample}] MarkDup(+Index): ✓ in {dt}")
            else:
                return (sample, False, f"MarkDuplicatesSpark failed (see log: {lp.name})")

        elif DEDUP_TOOL == "sambamba":
            cmd = ["sambamba", "markdup", "--quiet", "-t", str(DEDUP_THREADS), str(bam_sorted), str(bam_dedup)]
            rc, _, _ = run_silently(cmd, sample)
            if rc != 0 or not bam_dedup.exists():
                return (sample, False, f"sambamba markdup failed (see log: {lp.name})")
            # Index quietly
            rc2, _, _ = run_silently(["samtools", "index", "-@", "8", str(bam_dedup)], sample)
            if rc2 != 0:
                return (sample, False, f"Index failed (see log: {lp.name})")
            dt = timedelta(seconds=int(time.time() - t2))
            print(f"[{sample}] MarkDup(+Index): ✓ in {dt}")

    # 3) Cleanup
    if bam_sorted.exists() and bam_dedup.exists() and bai_dedup.exists():
        try:
            bam_sorted.unlink()
            print(f"[{sample}] Cleanup: removed temp sorted BAM")
        except Exception:
            pass

    dt_all = timedelta(seconds=int(time.time() - t0))
    return (sample, True, f"{dt_all}")

# ===== Worklist & concurrency =====
jobs = []
skipped = 0
for sample in sorted(PAIRS.keys()):
    bam_dedup  = ALN_DIR / f"{sample}.dedup.bam"
    bai_dedup  = ALN_DIR / f"{sample}.dedup.bam.bai"
    if (bam_dedup.exists() and bai_dedup.exists()):
        skipped += 1
    else:
        jobs.append(sample)

total = len(PAIRS)
todo  = len(jobs)
workers = _calc_workers(todo)

print(f"Align→Sort→Dedup for {total} sample(s) (to do={todo}, skipped={skipped})")
print(f"Concurrency: workers={workers} | PER_SAMPLE_THREADS={PER_SAMPLE_THREADS} "
      f"(BWA={BWA_THREADS}, SORT={SORT_THREADS}, DEDUP={DEDUP_THREADS}) | THREADS(total)={THREADS}")

if todo == 0:
    print("Nothing to do. Outputs already present in:", ALN_DIR)
else:
    start = time.time()
    done = 0
    failures = []

    def eta_text(d, t, elapsed):
        if d <= 0: return "--:--:--"
        rate = d / max(elapsed, 1e-6)
        rem = t - d
        secs = int(rem / rate) if rate > 0 else 0
        return str(timedelta(seconds=secs))

    print(f"[Progress] 0/{todo} | ETA=--:--:--", end="", flush=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as ex:
        futmap = {ex.submit(process_one_sample, s): s for s in jobs}
        for fut in concurrent.futures.as_completed(futmap):
            sid = futmap[fut]
            try:
                sample, ok, msg = fut.result()
                if not ok:
                    failures.append((sid, msg))
            except Exception as e:
                failures.append((sid, str(e)))
            finally:
                done += 1
                print(f"\r[Progress] {done}/{todo} | ETA={eta_text(done, todo, time.time()-start)}   ", end="", flush=True)

    print()  # newline
    wrote = todo - len(failures)
    print("\nAlignment+Dedup complete.")
    print("Outputs:", ALN_DIR)
    print(f"Summary: wrote={wrote}, skipped={skipped}, failed={len(failures)}, total={total}")
    if failures:
        print("Failed samples (see per-sample logs in 'alignments/logs/'):")
        for sid, msg in failures:
            print(f"  - {sid}: {msg}")

Align→Sort→Dedup for 48 sample(s) (to do=0, skipped=48)
Concurrency: workers=1 | PER_SAMPLE_THREADS=32 (BWA=32, SORT=12, DEDUP=24) | THREADS(total)=64
Nothing to do. Outputs already present in: /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments


## 6. Variant calling (VCF generation)
### 6.1 Variant calling with FreeBayes (creates a cohort.freebayes.vcf.gz file)

In [10]:
## FreeBayes by fixed-size windows (robust pool, resumable, with logs + progress)
from pathlib import Path
import subprocess, os, time, math, re
from datetime import timedelta

# Optional: Jupyter progress bar
use_widgets = False
try:
    from IPython.display import display
    import ipywidgets as widgets
    use_widgets = True
except Exception:
    use_widgets = False

# ----------------- Globals / paths -----------------
RUN_DIR   = Path(RUN_DIR)
ALN_DIR   = RUN_DIR / "alignments"
VAR_DIR   = RUN_DIR / "variants" / "freebayes"
TMP_DIR   = VAR_DIR / "_tmp_chunks"
for d in (VAR_DIR, TMP_DIR):
    d.mkdir(parents=True, exist_ok=True)

REF_FASTA = Path(REF_FASTA)
THREADS   = int(THREADS)

# Tuning knobs (override via globals if you like):
FB_CHUNK_BP = 500000
FB_CHUNK_BP        = int(globals().get("FB_CHUNK_BP", 500_000))  # 500 Kb windows by default
FB_THREADS_PER_JOB = int(globals().get("FB_THREADS_PER_JOB", 2))   # 1-2 is typical (1 for FB, 1 for bgzip)
BCFTOOLS_THREADS   = int(globals().get("BCFTOOLS_THREADS", min(8, THREADS)))

# Gather BAMs
bams = sorted(ALN_DIR.glob("*.dedup.bam"))
if not bams:
    raise RuntimeError("No deduplicated BAMs found")

# Ensure indexes
fai_path = Path(str(REF_FASTA) + ".fai")
if not fai_path.exists():
    subprocess.run(["samtools", "faidx", str(REF_FASTA)], check=True)
for b in bams:
    bai = Path(str(b) + ".bai")
    if not bai.exists():
        subprocess.run(["samtools", "index", str(b)], check=True)

# ----------------- Build windows -----------------
# Read contigs and lengths from .fai
contigs = []
with open(fai_path) as f:
    for line in f:
        name, length, *_ = line.strip().split("\t")
        contigs.append((name, int(length)))

# Make windows: 1-based inclusive ranges: start..end, end <= length
windows = []
for ctg, L in contigs:
    if L <= FB_CHUNK_BP:
        windows.append((ctg, 1, L))
    else:
        nwin = math.ceil(L / FB_CHUNK_BP)
        for i in range(nwin):
            start = i * FB_CHUNK_BP + 1
            end   = min(L, (i+1) * FB_CHUNK_BP)
            windows.append((ctg, start, end))

# Sort windows by length descending (reduces long-tail), then by contig order & start
# (The concat order later is by FASTA order, not this; this sort is just scheduling)
windows.sort(key=lambda w: (w[2]-w[1]+1), reverse=True)

# ----------------- Helpers -----------------
def chunk_basename(ctg: str, start: int, end: int) -> str:
    # Sanitize contig name for filenames
    safe = re.sub(r"[^A-Za-z0-9_.-]", "_", ctg)
    return f"{safe}_{start}_{end}"

def out_paths_for(ctg: str, start: int, end: int):
    base = chunk_basename(ctg, start, end)
    out_vcf = TMP_DIR / f"{base}.vcf.gz"
    log     = TMP_DIR / f"{base}.log"
    return out_vcf, log

# Compute idempotency stats
already_done = sum(1 for (ctg,s,e) in windows if out_paths_for(ctg,s,e)[0].exists())
total_chunks = len(windows) - already_done

# Concurrency: one FB process ≈ 1 CPU; allow 1 more for bgzip if FB_THREADS_PER_JOB=2
PER_JOB = max(1, FB_THREADS_PER_JOB)
jobs = max(1, min(len(windows), THREADS // PER_JOB))

print(f"FreeBayes windowed calling:")
print(f"- Windows: {len(windows)} total (chunk size ~{FB_CHUNK_BP:,} bp)")
print(f"- Concurrency: up to {jobs} worker(s) | threads/job={PER_JOB} | total THREADS={THREADS}")
if already_done:
    print(f"- Resuming: {already_done} window(s) already completed.")

def chunk_cmd(ctg: str, start: int, end: int):
    out_vcf, log = out_paths_for(ctg, start, end)
    if out_vcf.exists():
        return None
    bgzip_threads = max(1, PER_JOB - 1)
    region = f"{ctg}:{start}-{end}"
    bam_args = " ".join(f'"{b}"' for b in bams)
    # NOTE the leading space before {bam_args}
    cmd = (
        'set -o pipefail; '                       # make the pipeline fail if freebayes fails
        f'freebayes -f "{REF_FASTA}" '
        f'--region "{region}" '
        f'--min-mapping-quality 20 --min-base-quality 20 --use-best-n-alleles 4 --limit-coverage 500 --skip-coverage 800 '
        f'{bam_args} '                            # <-- space added
        f'2> "{log}" | bgzip -@ {bgzip_threads} -c > "{out_vcf}"'
    )
    return cmd

# Prepare queue of commands
queue = [chunk_cmd(ctg, s, e) for (ctg, s, e) in windows]
queue = [c for c in queue if c]  # drop None (already-done)
procs = {}  # pid -> (Popen, cmd)

# ----------------- Progress UI -----------------
start_ts = time.time()

if use_widgets:
    pbar = widgets.IntProgress(
        value=already_done, min=0, max=already_done + total_chunks, step=1,
        description='FreeBayes:', bar_style=''
    )
    status = widgets.HTML()
    box = widgets.VBox([pbar, status])
    display(box)
else:
    print(f"[Progress] {already_done}/{already_done + total_chunks} completed | active=0 | queue={len(queue)} | ETA=--:--:--")

def eta_text(done, total, elapsed):
    if done <= 0 or elapsed <= 1e-6:
        return "--:--:--"
    rate = done / elapsed  # chunks/sec
    remain = max(0, total - done)
    secs = int(remain / rate) if rate > 0 else 0
    return str(timedelta(seconds=secs))

def count_done():
    # robust to external completion: count actual files on disk
    return sum(1 for (ctg,s,e) in windows if out_paths_for(ctg,s,e)[0].exists())

def update_progress():
    done_now = count_done()
    active = len(procs)
    qrem   = len(queue)
    elapsed = time.time() - start_ts
    eta = eta_text(done_now - already_done, total_chunks, elapsed)
    if use_widgets:
        pbar.max = already_done + total_chunks
        pbar.value = done_now
        pbar.description = f"{done_now}/{already_done + total_chunks}"
        status.value = f"<code>active={active} | queue={qrem} | ETA={eta}</code>"
    else:
        print(f"\r[Progress] {done_now}/{already_done + total_chunks} completed | active={active} | queue={qrem} | ETA={eta}    ",
              end='', flush=True)

def launch():
    while queue and len(procs) < jobs:
        cmd = queue.pop(0)
        p = subprocess.Popen(cmd, shell=True, executable="/bin/bash")
        procs[p.pid] = (p, cmd)

# Prime & loop
launch()
update_progress()

while procs or queue:
    time.sleep(0.5)
    finished = []
    for pid, (p, cmd) in list(procs.items()):
        ret = p.poll()
        if ret is not None:
            finished.append(pid)
            if ret != 0:
                print(f"\n[WARN] Window exited {ret}: {cmd}")
    for pid in finished:
        procs.pop(pid, None)
    launch()
    update_progress()

if not use_widgets:
    print()  # newline

# ----------------- Concatenate all window VCFs (FASTA order) -----------------
# Build the list in strict FASTA order (contigs order, then start asc)
chunk_vcfs = []
for ctg, L in contigs:
    # all windows for this contig in order
    starts = []
    for path in TMP_DIR.glob(f"{re.sub(r'[^A-Za-z0-9_.-]', '_', ctg)}_*_*.vcf.gz"):
        # parse start_end from filename tail
        m = re.search(r"_(\d+)_(\d+)\.vcf\.gz$", path.name)
        if m:
            starts.append((int(m.group(1)), int(m.group(2)), path))
    for _s, _e, p in sorted(starts, key=lambda t: t[0]):
        chunk_vcfs.append(str(p))

if not chunk_vcfs:
    raise RuntimeError("No chunk VCFs were produced; check logs in _tmp_chunks/")

vcf_gz = VAR_DIR / "cohort.freebayes.vcf.gz"
subprocess.run(["bcftools", "concat", "--threads", str(BCFTOOLS_THREADS), "-Oz", "-o", str(vcf_gz)] + chunk_vcfs, check=True)
subprocess.run(["bcftools", "index", "--threads", str(BCFTOOLS_THREADS), "-t", str(vcf_gz)], check=True)

if use_widgets:
    status.value = f"<b>Done.</b> Output: <code>{vcf_gz}</code>"
else:
    print("Done. Output:", vcf_gz)
print("Done. Output:", vcf_gz)

FreeBayes windowed calling:
- Windows: 932 total (chunk size ~500,000 bp)
- Concurrency: up to 32 worker(s) | threads/job=2 | total THREADS=64
- Resuming: 932 window(s) already completed.


VBox(children=(IntProgress(value=932, description='FreeBayes:', max=932), HTML(value='')))

Checking the headers and starting positions of 932 files
[W::bcf_hdr_check_sanity] GQ should be declared as Type=Integer
Concatenating /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/variants/freebayes/_tmp_chunks/JAUPFP010000001.1_1_500000.vcf.gz	0.315665 seconds
Concatenating /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/variants/freebayes/_tmp_chunks/JAUPFP010000001.1_500001_1000000.vcf.gz	0.280712 seconds
Concatenating /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/variants/freebayes/_tmp_chunks/JAUPFP010000001.1_1000001_1500000.vcf.gz	0.248040 seconds
Concatenating /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/variants/freebayes/_tmp_chunks/JAUPFP010000001.1_1500001_2000000.vcf.gz	0.247072 seconds
Concatenating /

Done. Output: /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/variants/freebayes/cohort.freebayes.vcf.gz


### 6.2 Split cohort VCF into individual per-sample VCFs

In [11]:
## Split FreeBayes cohort VCF into per-sample VCFs (parallel + progress)

from pathlib import Path
import subprocess, concurrent.futures, os, time
from datetime import timedelta

# Optional: Jupyter progress bar (fallback to text if widgets not available)
use_widgets = False
try:
    from IPython.display import display
    import ipywidgets as widgets
    use_widgets = True
except Exception:
    use_widgets = False

# --- Paths / globals from earlier cells ---
RUN_DIR  = Path(RUN_DIR)  # e.g., results/<read_set>
VAR_DIR  = RUN_DIR / "variants" / "freebayes"
COHORT_VCF = VAR_DIR / "cohort.freebayes.vcf.gz"
PER_SAMPLE_DIR = VAR_DIR / "per-sample"
PER_SAMPLE_DIR.mkdir(parents=True, exist_ok=True)

# --- Sanity checks ---
if not COHORT_VCF.exists():
    raise RuntimeError(f"Cohort VCF not found: {COHORT_VCF}. Run the FreeBayes calling cell first.")

tbi = Path(str(COHORT_VCF) + ".tbi")
if not tbi.exists():
    subprocess.run(["bcftools", "index", "-t", str(COHORT_VCF)], check=True)

# --- Get sample IDs from the cohort VCF header ---
res = subprocess.run(["bcftools", "query", "-l", str(COHORT_VCF)],
                     check=True, capture_output=True, text=True)
samples_all = [s for s in res.stdout.strip().splitlines() if s]

if not samples_all:
    raise RuntimeError("No samples found in cohort VCF header.")

# --- Build worklist (skip samples that already have both files) ---
work = []
skipped = 0
for s in samples_all:
    out_vcf = PER_SAMPLE_DIR / f"{s}.vcf.gz"
    out_tbi = Path(str(out_vcf) + ".tbi")
    if out_vcf.exists() and out_tbi.exists():
        skipped += 1
    else:
        work.append((s, out_vcf))

total = len(work)
print(f"Per-sample export: {len(samples_all)} sample(s) total "
      f"(to do={total}, skipped={skipped})")

if total == 0:
    print("Nothing to do.")
else:
    # --- Parallelism (safe default): use ~1/4 of THREADS, at least 2, up to #samples ---
    THREADS = int(THREADS)  # from your session
    default_workers = max(2, THREADS // 4)
    workers = min(default_workers, total)
    # Allow override via a global if you want: JOBS_PER_SAMPLE_SPLIT
    workers = int(globals().get("JOBS_PER_SAMPLE_SPLIT", workers))
    print(f"Running {workers} parallel worker(s)...")

    # Progress UI
    start_ts = time.time()
    done = 0
    if use_widgets:
        pbar = widgets.IntProgress(value=0, min=0, max=total, description='Split:')
        status = widgets.HTML()
        box = widgets.VBox([pbar, status])
        display(box)
    else:
        print(f"[Progress] 0/{total} | ETA=--:--:--", end='', flush=True)

    def eta_text(done, total, elapsed):
        if done <= 0 or elapsed <= 0:
            return "--:--:--"
        rate = done / elapsed  # samples per sec
        remain = total - done
        secs = int(remain / rate) if rate > 0 else 0
        return str(timedelta(seconds=secs))

    def update_progress():
        #nonlocal done
        elapsed = time.time() - start_ts
        eta = eta_text(done, total, elapsed)
        if use_widgets:
            pbar.value = done
            pbar.description = f"{done}/{total}"
            status.value = f"<code>workers={workers} | ETA={eta}</code>"
        else:
            print(f"\r[Progress] {done}/{total} | ETA={eta}    ", end='', flush=True)

    # Worker function
    def extract_one(sample: str, out_vcf: Path):
        # 1) bcftools view -s <sample> -Oz -o <out>
        subprocess.run(["bcftools", "view", "-s", sample, "-Oz",
                        "-o", str(out_vcf), str(COHORT_VCF)],
                       check=True)
        # 2) tabix index
        subprocess.run(["bcftools", "index", "-t", str(out_vcf)], check=True)
        return sample

    # Launch pool
    failures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as ex:
        future_map = {ex.submit(extract_one, s, out): s for s, out in work}
        for fut in concurrent.futures.as_completed(future_map):
            s = future_map[fut]
            try:
                fut.result()
            except Exception as e:
                failures.append((s, str(e)))
                print(f"\n[WARN] {s}: failed → {e}")
            finally:
                done += 1
                update_progress()

    # Finalize progress line
    if not use_widgets:
        print()

    # Summary
    written = total - len(failures)
    print(f"\nDone. Per-sample VCFs in: {PER_SAMPLE_DIR}")
    print(f"Summary: written={written}, skipped={skipped}, failed={len(failures)}, total={len(samples_all)}")
    if failures:
        print("Failed samples:")
        for s, msg in failures:
            print(f"  - {s}: {msg}")


Per-sample export: 48 sample(s) total (to do=0, skipped=48)
Nothing to do.


## 7. Run Summary stats 

In [22]:
# =========================
# Final Summary Cell (FAST + updated for current pipeline)
# =========================

print("Started summary report cell (fast version)...")

import os, re, json, shutil, subprocess, concurrent.futures
from pathlib import Path
from datetime import datetime
from IPython.display import display, Markdown

# ---------- Helpers ----------
def have_tool(name: str) -> bool:
    return shutil.which(name) is not None

def run_cmd(cmd, check=True, capture=True, text=True):
    """Run command and return stdout (default)."""
    p = subprocess.run(cmd,
                       check=False,
                       capture_output=capture,
                       text=text)
    if check and p.returncode != 0:
        raise RuntimeError(f"Command failed: {' '.join(map(str, cmd))}\n{p.stderr}")
    return p.stdout if capture else ""

def human_bytes(n):
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    f = float(n)
    i = 0
    while f >= 1024 and i < len(units) - 1:
        f /= 1024
        i += 1
    return f"{f:.2f} {units[i]}"

def dir_size_bytes(path: Path) -> int:
    """Fast directory size via du -sb; fallback to python walk."""
    if not path.exists():
        return 0
    if have_tool("du"):
        try:
            out = run_cmd(["du", "-sb", str(path)], check=True).strip()
            return int(out.split()[0])
        except Exception:
            pass
    # fallback
    total = 0
    for root, _, files in os.walk(path):
        for f in files:
            p = Path(root) / f
            try:
                total += p.stat().st_size
            except Exception:
                pass
    return total

def parse_flagstat(flagstat_text: str):
    """Extract total reads, mapped reads/% , properly paired/% , duplicate reads from samtools flagstat output."""
    total = mapped_n = mapped_pct = proper_n = proper_pct = dup_n = None
    for line in flagstat_text.splitlines():
        line = line.strip()
        m = re.match(r"(\d+)\s+\+\s+\d+\s+in total", line)
        if m:
            total = int(m.group(1)); continue
        m = re.match(r"(\d+)\s+\+\s+\d+\s+mapped\s+\(([\d\.]+)%", line)
        if m:
            mapped_n = int(m.group(1)); mapped_pct = float(m.group(2)); continue
        m = re.match(r"(\d+)\s+\+\s+\d+\s+properly paired\s+\(([\d\.]+)%", line)
        if m:
            proper_n = int(m.group(1)); proper_pct = float(m.group(2)); continue
        m = re.match(r"(\d+)\s+\+\s+\d+\s+duplicates", line)
        if m:
            dup_n = int(m.group(1)); continue
    return total, mapped_n, mapped_pct, proper_n, proper_pct, dup_n

def parse_picard_metrics_percent_duplication(metrics_path: Path):
    """Parse PERCENT_DUPLICATION from Picard/GATK MarkDuplicates metrics text."""
    if not metrics_path.exists():
        return None
    try:
        lines = metrics_path.read_text(errors="ignore").splitlines()
        header_idx = None
        for i, ln in enumerate(lines):
            if ln.strip().startswith("LIBRARY\t"):
                header_idx = i
                break
        if header_idx is None or header_idx + 1 >= len(lines):
            return None
        header = lines[header_idx].split("\t")
        data = lines[header_idx + 1].split("\t")
        if "PERCENT_DUPLICATION" in header and len(header) == len(data):
            j = header.index("PERCENT_DUPLICATION")
            return float(data[j])
    except Exception:
        return None
    return None

# ---------- Required globals ----------
for req in ["PROJECT_ROOT", "RUN_DIR", "PAIRS", "REF_FASTA"]:
    if req not in globals() or globals()[req] in (None, "", {}):
        raise RuntimeError(f"{req} not set. Run the selection/setup cells first.")

PROJECT_ROOT = Path(PROJECT_ROOT)
RUN_DIR = Path(RUN_DIR)
PAIRS = globals().get("PAIRS", {})
REF_FASTA = Path(globals().get("REF_FASTA"))

THREADS = int(globals().get("THREADS", os.cpu_count() or 1))

# Where things live in the current pipeline
QC_RAW_DIR = RUN_DIR / "qc" / "fastqc_raw"
TRIM_DIR   = RUN_DIR / "trimmed"
ALN_DIR    = RUN_DIR / "alignments"
VAR_DIR    = RUN_DIR / "variants" / "freebayes"
PER_SAMPLE_DIR = VAR_DIR / "per-sample"

VCF_GZ  = VAR_DIR / "cohort.freebayes.vcf.gz"
VCF_TBI = Path(str(VCF_GZ) + ".tbi")

READ_SET_NAME = RUN_DIR.name

REPORT_MD = RUN_DIR / "run_report.md"
MANIFEST_JSON = RUN_DIR / "run_manifest.json"

# ---------- Tool checks ----------
for tool in ["samtools", "bcftools"]:
    if not have_tool(tool):
        raise RuntimeError(f"{tool} not found in PATH (activate the conda env).")

# ---------- Caching ----------
CACHE_DIR = RUN_DIR / "_cache"
CACHE_DIR.mkdir(exist_ok=True)
ALIGN_CACHE_PATH = CACHE_DIR / "align_stats_cache.json"
VCF_CACHE_PATH   = CACHE_DIR / "vcf_stats_cache.json"

align_cache = {}
if ALIGN_CACHE_PATH.exists():
    try:
        align_cache = json.loads(ALIGN_CACHE_PATH.read_text())
    except Exception:
        align_cache = {}

vcf_cache = {}
if VCF_CACHE_PATH.exists():
    try:
        vcf_cache = json.loads(VCF_CACHE_PATH.read_text())
    except Exception:
        vcf_cache = {}

# ---------- Alignment stats (parallel) ----------
# You can switch to a much faster but less detailed mode:
# FAST_MODE=True uses samtools idxstats only (no dup/proper-pair %).
FAST_MODE = bool(globals().get("SUMMARY_FAST_MODE", True))

# Cap workers to avoid thrashing the shared filesystem.
# On many HPC filesystems, 4–8 parallel flagstats is a sweet spot.
MAX_WORKERS = int(globals().get("SUMMARY_WORKERS", min(32, max(1, THREADS // 2))))

def sample_alignment_stats(sample_id: str):
    bam = ALN_DIR / f"{sample_id}.dedup.bam"
    if not bam.exists():
        return sample_id, None

    # cache key: mtime + size
    st = bam.stat()
    key = f"{st.st_mtime_ns}:{st.st_size}"
    cached = align_cache.get(sample_id)
    if cached and cached.get("_key") == key:
        return sample_id, cached

    # metrics filenames that may exist depending on dedup tool:
    # - classic: sample.metrics.txt
    # - spark: typically none unless you add -M; keep as None
    # - sambamba: none
    metrics_candidates = [
        ALN_DIR / f"{sample_id}.metrics.txt",
        ALN_DIR / f"{sample_id}.markdup.metrics.txt",
        ALN_DIR / f"{sample_id}.MarkDuplicates.metrics.txt",
    ]
    dup_rate = None
    for mp in metrics_candidates:
        dup_rate = parse_picard_metrics_percent_duplication(mp)
        if dup_rate is not None:
            break

    if FAST_MODE:
        # Very fast summary: mapped/unmapped by contig (no proper-pair %, no duplicates)
        # idxstats output: contig length mapped unmapped
        txt = run_cmd(["samtools", "idxstats", str(bam)], check=True)
        mapped_reads = 0
        unmapped_reads = 0
        for line in txt.splitlines():
            parts = line.split("\t")
            if len(parts) >= 4:
                mapped_reads += int(parts[2])
                unmapped_reads += int(parts[3])
        total_reads = mapped_reads + unmapped_reads
        rec = {
            "_key": key,
            "total_reads": total_reads,
            "mapped_reads": mapped_reads,
            "mapped_pct": (100.0 * mapped_reads / total_reads) if total_reads else None,
            "proper_pairs": None,
            "proper_pairs_pct": None,
            "dup_reads": None,
            "dup_rate_metric": dup_rate,
            "bam": str(bam),
        }
    else:
        fs = run_cmd(["samtools", "flagstat", str(bam)], check=True)
        total, mapped_n, mapped_pct, proper_n, proper_pct, dup_n = parse_flagstat(fs)
        rec = {
            "_key": key,
            "total_reads": total,
            "mapped_reads": mapped_n,
            "mapped_pct": mapped_pct,
            "proper_pairs": proper_n,
            "proper_pairs_pct": proper_pct,
            "dup_reads": dup_n,
            "dup_rate_metric": dup_rate,  # may be None for MarkDuplicatesSpark runs
            "bam": str(bam),
        }

    align_cache[sample_id] = rec
    return sample_id, rec

samples = sorted(PAIRS.keys())
align_stats = {}

if samples:
    todo = [s for s in samples if (ALN_DIR / f"{s}.dedup.bam").exists()]
    if todo:
        print(f"Computing alignment stats for {len(todo)} BAM(s) "
              f"({'FAST idxstats' if FAST_MODE else 'flagstat'} mode) "
              f"with {MAX_WORKERS} workers...")
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
            futs = [ex.submit(sample_alignment_stats, s) for s in todo]
            for fut in concurrent.futures.as_completed(futs):
                sid, rec = fut.result()
                if rec is not None:
                    align_stats[sid] = rec
    else:
        print("No dedup BAMs found to summarize.")
else:
    print("PAIRS is empty; no samples to summarize.")

# save cache
ALIGN_CACHE_PATH.write_text(json.dumps(align_cache, indent=2))

# ---------- VCF-level stats (cached) ----------
vcf_stats = {
    "vcf_path": str(VCF_GZ),
    "indexed": VCF_TBI.exists(),
    "samples": [],
    "n_variants": None,
    "n_snps": None,
    "n_indels": None,
    "ts_tv": None
}

if VCF_GZ.exists():
    v_st = VCF_GZ.stat()
    v_key = f"{v_st.st_mtime_ns}:{v_st.st_size}"
    cached = vcf_cache.get("cohort")
    if cached and cached.get("_key") == v_key:
        vcf_stats.update({k: cached.get(k) for k in vcf_stats.keys()})
    else:
        # sample list is cheap
        vcf_stats["samples"] = run_cmd(["bcftools", "query", "-l", str(VCF_GZ)], check=True).split()

        # bcftools stats can take time, but we cache it and only recompute if VCF changes
        stats_txt = run_cmd(["bcftools", "stats", str(VCF_GZ)], check=True)

        for line in stats_txt.splitlines():
            if line.startswith("SN"):
                parts = line.split("\t")
                if len(parts) >= 4:
                    label = parts[2].strip().lower()
                    val = parts[3].strip()
                    if label == "number of records:":
                        vcf_stats["n_variants"] = int(val)
                    elif label == "number of snps:":
                        vcf_stats["n_snps"] = int(val)
                    elif label == "number of indels:":
                        vcf_stats["n_indels"] = int(val)
            elif line.startswith("TSTV"):
                parts = line.split("\t")
                if len(parts) >= 6 and parts[2].upper() == "ALL":
                    try:
                        vcf_stats["ts_tv"] = float(parts[5])
                    except Exception:
                        pass

        vcf_cache["cohort"] = {"_key": v_key, **vcf_stats}
        VCF_CACHE_PATH.write_text(json.dumps(vcf_cache, indent=2))
else:
    print(f"[WARN] Cohort VCF not found: {VCF_GZ}")

# ---------- Per-sample VCF existence summary ----------
per_sample_vcfs = []
if PER_SAMPLE_DIR.exists():
    per_sample_vcfs = sorted(PER_SAMPLE_DIR.glob("*.vcf.gz"))

# ---------- Disk usage ----------
disk = shutil.disk_usage(str(RUN_DIR))
sizes = {
    "qc_raw":     dir_size_bytes(QC_RAW_DIR),
    "trimmed":    dir_size_bytes(TRIM_DIR),
    "alignments": dir_size_bytes(ALN_DIR),
    "variants":   dir_size_bytes(VAR_DIR),
}

# ---------- Build Markdown report ----------
lines = []
lines.append(f"# Run Report — {READ_SET_NAME}\n")
lines.append(f"_Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}_\n")
lines.append("## Paths")
lines.append(f"- Project root: `{PROJECT_ROOT}`")
lines.append(f"- Run dir: `{RUN_DIR}`")
lines.append(f"- Reference (run-local): `{REF_FASTA}`" if REF_FASTA else "- Reference: NA")
lines.append("")

lines.append("## Input summary")
lines.append(f"- Samples detected (paired FASTQs): **{len(PAIRS)}**")
lines.append(f"- Deduplicated BAMs present: **{len(list(ALN_DIR.glob('*.dedup.bam')))}**")
lines.append(f"- Raw FastQC dir: `{QC_RAW_DIR}`")
lines.append(f"- Trimmed reads dir: `{TRIM_DIR}`")
lines.append("")

lines.append("## Alignment summary (per sample)")
if align_stats:
    if FAST_MODE:
        lines.append("| Sample | Total reads | Mapped (%) | Dup. rate (metrics) | BAM |")
        lines.append("|---|---:|---:|---:|---|")
        for sid in sorted(align_stats.keys()):
            st = align_stats[sid]
            m_pct = f"{st['mapped_pct']:.2f}%" if st.get("mapped_pct") is not None else "NA"
            dup_rate = f"{st['dup_rate_metric']*100:.2f}%" if st.get("dup_rate_metric") is not None else "NA"
            lines.append(f"| {sid} | {st['total_reads'] or 'NA'} | {m_pct} | {dup_rate} | `{st['bam']}` |")
    else:
        lines.append("| Sample | Total reads | Mapped (%) | Proper pairs (%) | Duplicates | Dup. rate (metrics) | BAM |")
        lines.append("|---|---:|---:|---:|---:|---:|---|")
        for sid in sorted(align_stats.keys()):
            st = align_stats[sid]
            m_pct = f"{st['mapped_pct']:.2f}%" if st.get("mapped_pct") is not None else "NA"
            p_pct = f"{st['proper_pairs_pct']:.2f}%" if st.get("proper_pairs_pct") is not None else "NA"
            dup_reads = f"{st['dup_reads']:,}" if st.get("dup_reads") is not None else "NA"
            dup_rate = f"{st['dup_rate_metric']*100:.2f}%" if st.get("dup_rate_metric") is not None else "NA"
            lines.append(f"| {sid} | {st['total_reads'] or 'NA'} | {m_pct} | {p_pct} | {dup_reads} | {dup_rate} | `{st['bam']}` |")
else:
    lines.append("_No alignment stats found. Did alignment/dedup complete?_")
lines.append("")

lines.append("## VCF summary (FreeBayes cohort)")
if VCF_GZ.exists():
    lines.append(f"- Cohort VCF: `{VCF_GZ}` (indexed: **{'yes' if vcf_stats['indexed'] else 'no'}**)")
    lines.append(f"- Samples in cohort VCF: **{len(vcf_stats['samples'])}**")
    lines.append(f"- Total variants: **{vcf_stats['n_variants'] if vcf_stats['n_variants'] is not None else 'NA'}**")
    lines.append(f"- SNPs: **{vcf_stats['n_snps'] if vcf_stats['n_snps'] is not None else 'NA'}**")
    lines.append(f"- Indels: **{vcf_stats['n_indels'] if vcf_stats['n_indels'] is not None else 'NA'}**")
    lines.append(f"- Ts/Tv: **{vcf_stats['ts_tv'] if vcf_stats['ts_tv'] is not None else 'NA'}**")
else:
    lines.append(f"- Cohort VCF not found at `{VCF_GZ}`")
lines.append("")

lines.append("## Per-sample VCF export")
lines.append(f"- Per-sample VCF directory: `{PER_SAMPLE_DIR}`")
lines.append(f"- Per-sample VCFs present: **{len(per_sample_vcfs)}**")
lines.append("")

lines.append("## Disk usage for this run")
lines.append(f"- Run dir free/total: {human_bytes(disk.free)} / {human_bytes(disk.total)}")
for k, v in sizes.items():
    lines.append(f"- {k}: {human_bytes(v)}")
lines.append("")

# Write + render
REPORT_MD.write_text("\n".join(lines))
display(Markdown(REPORT_MD.read_text()))

# Machine-readable manifest
manifest = {
    "read_set": READ_SET_NAME,
    "run_dir": str(RUN_DIR),
    "reference": str(REF_FASTA) if REF_FASTA else None,
    "samples": sorted(PAIRS.keys()),
    "alignment": align_stats,
    "vcf": vcf_stats,
    "per_sample_vcfs": [str(p) for p in per_sample_vcfs],
    "sizes_bytes": {k: int(v) for k, v in sizes.items()},
    "generated_at": datetime.now().isoformat(),
    "summary_fast_mode": FAST_MODE,
    "workers_used": MAX_WORKERS,
}
MANIFEST_JSON.write_text(json.dumps(manifest, indent=2))

display(Markdown(f"**Saved:**  \n- `{REPORT_MD}`  \n- `{MANIFEST_JSON}`"))

Started summary report cell (fast version)...
Computing alignment stats for 48 BAM(s) (FAST idxstats mode) with 32 workers...


# Run Report — Brachycybe_producta_48sample_readset

_Generated: 2026-01-28 16:14:14_

## Paths
- Project root: `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline`
- Run dir: `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset`
- Reference (run-local): `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/reference/GCA_036925085.1_qdBraProd1.0.pri_genomic.fa`

## Input summary
- Samples detected (paired FASTQs): **48**
- Deduplicated BAMs present: **48**
- Raw FastQC dir: `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/qc/fastqc_raw`
- Trimmed reads dir: `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/trimmed`

## Alignment summary (per sample)
| Sample | Total reads | Mapped (%) | Dup. rate (metrics) | BAM |
|---|---:|---:|---:|---|
| CCGPMC021_BMEA101904_S1_L004 | 22363212 | 93.88% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101904_S1_L004.dedup.bam` |
| CCGPMC021_BMEA101907_S2_L004 | 23320672 | 93.77% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101907_S2_L004.dedup.bam` |
| CCGPMC021_BMEA101912_S25_L004 | 42856058 | 52.91% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101912_S25_L004.dedup.bam` |
| CCGPMC021_BMEA101919_S26_L004 | 18675920 | 35.67% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101919_S26_L004.dedup.bam` |
| CCGPMC021_BMEA101923_S27_L004 | 23470912 | 64.37% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101923_S27_L004.dedup.bam` |
| CCGPMC021_BMEA101927_S28_L004 | 62496500 | 51.65% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101927_S28_L004.dedup.bam` |
| CCGPMC021_BMEA101931_S29_L004 | 59604075 | 52.28% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101931_S29_L004.dedup.bam` |
| CCGPMC021_BMEA101935_S30_L004 | 57351238 | 42.06% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101935_S30_L004.dedup.bam` |
| CCGPMC021_BMEA101941_S31_L004 | 23537934 | 66.89% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101941_S31_L004.dedup.bam` |
| CCGPMC021_BMEA101945_S32_L004 | 36415162 | 49.50% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101945_S32_L004.dedup.bam` |
| CCGPMC021_BMEA101949_S3_L004 | 37618608 | 90.09% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101949_S3_L004.dedup.bam` |
| CCGPMC021_BMEA101982_S4_L004 | 41378178 | 93.58% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101982_S4_L004.dedup.bam` |
| CCGPMC021_BMEA101986_S5_L004 | 39824396 | 93.32% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101986_S5_L004.dedup.bam` |
| CCGPMC021_BMEA101990_S6_L004 | 29435367 | 92.75% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101990_S6_L004.dedup.bam` |
| CCGPMC021_BMEA101993_S7_L004 | 45531673 | 93.75% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101993_S7_L004.dedup.bam` |
| CCGPMC021_BMEA101997_S33_L004 | 4857630 | 38.02% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA101997_S33_L004.dedup.bam` |
| CCGPMC021_BMEA102003_S34_L004 | 30403871 | 49.55% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102003_S34_L004.dedup.bam` |
| CCGPMC021_BMEA102008_S35_L004 | 5538510 | 54.56% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102008_S35_L004.dedup.bam` |
| CCGPMC021_BMEA102014_S36_L004 | 40772219 | 44.52% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102014_S36_L004.dedup.bam` |
| CCGPMC021_BMEA102020_S8_L004 | 22673203 | 91.87% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102020_S8_L004.dedup.bam` |
| CCGPMC021_BMEA102038_S9_L004 | 30411727 | 51.38% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102038_S9_L004.dedup.bam` |
| CCGPMC021_BMEA102042_S10_L004 | 35530123 | 49.15% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102042_S10_L004.dedup.bam` |
| CCGPMC021_BMEA102046_S37_L004 | 32351914 | 54.02% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102046_S37_L004.dedup.bam` |
| CCGPMC021_BMEA102048_S38_L004 | 36699162 | 27.94% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102048_S38_L004.dedup.bam` |
| CCGPMC021_BMEA102051_S39_L004 | 63418990 | 39.26% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102051_S39_L004.dedup.bam` |
| CCGPMC021_BMEA102055_S11_L004 | 36222568 | 43.85% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102055_S11_L004.dedup.bam` |
| CCGPMC021_BMEA102060_S12_L004 | 41078316 | 50.48% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102060_S12_L004.dedup.bam` |
| CCGPMC021_BMEA102066_S40_L004 | 47665064 | 39.97% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102066_S40_L004.dedup.bam` |
| CCGPMC021_BMEA102070_S41_L004 | 25921169 | 37.87% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102070_S41_L004.dedup.bam` |
| CCGPMC021_BMEA102074_S13_L004 | 42492220 | 45.01% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102074_S13_L004.dedup.bam` |
| CCGPMC021_BMEA102080_S42_L004 | 30420527 | 35.52% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102080_S42_L004.dedup.bam` |
| CCGPMC021_BMEA102083_S14_L004 | 23069192 | 47.69% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102083_S14_L004.dedup.bam` |
| CCGPMC021_BMEA102090_S43_L004 | 42443460 | 34.44% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102090_S43_L004.dedup.bam` |
| CCGPMC021_BMEA102092_S44_L004 | 97846213 | 37.70% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102092_S44_L004.dedup.bam` |
| CCGPMC021_BMEA102093_S45_L004 | 67964932 | 42.16% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102093_S45_L004.dedup.bam` |
| CCGPMC021_BMEA102098_S15_L004 | 32844911 | 90.27% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102098_S15_L004.dedup.bam` |
| CCGPMC021_BMEA102104_S16_L004 | 33147837 | 94.42% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102104_S16_L004.dedup.bam` |
| CCGPMC021_BMEA102107_S17_L004 | 19146618 | 47.84% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102107_S17_L004.dedup.bam` |
| CCGPMC021_BMEA102112_S18_L004 | 14601129 | 92.57% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102112_S18_L004.dedup.bam` |
| CCGPMC021_BMEA102116_S19_L004 | 28603973 | 91.83% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102116_S19_L004.dedup.bam` |
| CCGPMC021_BMEA102119_S20_L004 | 29475742 | 48.89% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102119_S20_L004.dedup.bam` |
| CCGPMC021_BMEA102125_S21_L004 | 35357858 | 41.96% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102125_S21_L004.dedup.bam` |
| CCGPMC021_BMEA102129_S22_L004 | 40271030 | 95.30% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102129_S22_L004.dedup.bam` |
| CCGPMC021_BMEA102135_S23_L004 | 32532956 | 94.60% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102135_S23_L004.dedup.bam` |
| CCGPMC021_BMEA102137_S46_L004 | 46447395 | 43.59% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102137_S46_L004.dedup.bam` |
| CCGPMC021_BMEA102140_S24_L004 | 14927820 | 92.26% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102140_S24_L004.dedup.bam` |
| CCGPMC021_BMEA102175_S47_L004 | 61634979 | 38.86% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102175_S47_L004.dedup.bam` |
| CCGPMC021_BMEA102192_S48_L004 | 69232328 | 41.12% | NA | `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/alignments/CCGPMC021_BMEA102192_S48_L004.dedup.bam` |

## VCF summary (FreeBayes cohort)
- Cohort VCF: `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/variants/freebayes/cohort.freebayes.vcf.gz` (indexed: **yes**)
- Samples in cohort VCF: **48**
- Total variants: **18019086**
- SNPs: **13926477**
- Indels: **2698982**
- Ts/Tv: **NA**

## Per-sample VCF export
- Per-sample VCF directory: `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/variants/freebayes/per-sample`
- Per-sample VCFs present: **48**

## Disk usage for this run
- Run dir free/total: 1.76 TB / 15.00 TB
- qc_raw: 179.34 MB
- trimmed: 129.29 GB
- alignments: 144.91 GB
- variants: 119.02 GB


**Saved:**  
- `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/run_report.md`  
- `/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/results/Brachycybe_producta_48sample_readset/run_manifest.json`

## 8 End of Pipeline

In [23]:
# 🚧 Stop here unless explicitly allowed
import os, sys

display(Markdown("""
# 🎉 **Pipeline completed sucessfully!**
All analysis steps completed **successfully**.
system exit 
"""))

global_end_time = datetime.now()
global_run_time = global_end_time - global_start_time
print("total run time = ", global_run_time)
print("\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n")

raise SystemExit(0)  # clean stop


# 🎉 **Pipeline completed sucessfully!**
All analysis steps completed **successfully**.
system exit 


total run time =  0:22:06.500772




















SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


## 9. Endnotes: Odd/cool cells from the writing process put here for storage - _these are not the pipeline_

## SNAKEMAKE launcher cell -   
Keep for later improvement in case I go this way in the future

In [None]:
# === Snakemake launcher cell ============================================
# Run the Snakemake workflow from inside the Jupyter notebook.
#
# Requirements:
#  - workflow/Snakefile exists (we'll build it next)
#  - profiles/slurm/ contains Snakemake profile files (optional)
#  - PAIRS, READ_SET_NAME, REF_FASTA exist (from earlier cells)
#
# Modes:
#   mode = "local"  → single-node run (good for tiny test dataset)
#   mode = "slurm"  → submit jobs to Slurm using your profile
#
# =======================================================================

import os, sys, subprocess
from pathlib import Path
from IPython.display import display, Markdown

# --- Select mode ---
mode = "local"        # <-- change to "slurm" when ready
#mode = "slurm"

# --- Safety checks ---
if "PAIRS" not in globals() or len(PAIRS) == 0:
    raise RuntimeError("PAIRS dict is empty. Run your read‑selection cell first.")

if "REF_FASTA" not in globals():
    raise RuntimeError("REF_FASTA not defined. Run reference‑indexing cell first.")

# --- Directory setup ---
PROJECT_ROOT = Path(PROJECT_ROOT)
print("project_root = ", PROJECT_ROOT)
READ_SET_NAME = READ_SET_NAME if "READ_SET_NAME" in globals() else read_dir_dropdown.value
RUN_DIR = PROJECT_ROOT / "results" / READ_SET_NAME
CFG_DIR = PROJECT_ROOT / "workflow" / "config"
CFG_DIR.mkdir(parents=True, exist_ok=True)
SNAKEFILE = PROJECT_ROOT / "Snakefile"

# --- Write samples.tsv from PAIRS ---
samples_tsv = CFG_DIR / "samples.tsv"
with open(samples_tsv, "w") as f:
    f.write("sample\tR1\tR2\n")
    for sid, p in sorted(PAIRS.items()):
        f.write(f"{sid}\t{p['R1']}\t{p['R2']}\n")

# --- Write config.yaml ---
config_yaml = CFG_DIR / "config.yaml"
config_yaml.write_text(
    f"""
project_root: {PROJECT_ROOT}
read_set: {READ_SET_NAME}
reference: {Path(REF_FASTA).name}      # basename only; Snakefile resolves path
caller: bcftools                       # or "gatk"
threads_per_sample: 16
mem_gb_per_sample: 64
"""
)

display(Markdown(f"### 📝 Wrote config files:\n- `{samples_tsv}`\n- `{config_yaml}`"))

# --- Snakemake command ---
snk = [
    "snakemake",
    "--directory", str(PROJECT_ROOT),
    "-s", str(SNAKEFILE),
    "--cores", "1",                         # local scheduler cores
    "--printshellcmds",
    #"--rerun-incomplete",
    "--forceall",                          # for debug
]

if mode == "slurm":
    snk += [
        "--profile", "/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/profiles/slurm",
        "--jobs", "80",
    ]
else:
    # local mode for tiny tests
    snk += ["--cores", str(CPU_THREADS_SUGGESTED)]

display(Markdown(f"### 🚀 Running Snakemake in **{mode}** mode...\n```\n{' '.join(snk)}\n```"))

# --- Run Snakemake and stream output ---
process = subprocess.Popen(snk, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)

for line in process.stdout:
    print(line, end="")

process.wait()

if process.returncode == 0:
    display(Markdown("### 🎉 Snakemake pipeline complete!"))
else:
    raise RuntimeError(f"Snakemake failed with exit code {process.returncode}.")


#### Make tiny test dataset cell - keep in case needed

In [None]:
# tests/make_tiny_dataset.py
#
# Generates a tiny reference + two paired-end samples.
# - Sample1 matches reference
# - Sample2 has a few SNPs on ctgA
# - A fraction of reads include real adapter tails (i7 on R1 3', i5 on R2 3')
# - Outputs gzipped FASTQs
#
# Uses real adapters from:
#   /group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline/data/adapters/adapters.fasta
#
# After running, you can symlink/copy:
#   tests/tiny/ref/tiny.fa        -> data/references/tiny.fa
#   tests/tiny/reads/*.fastq.gz   -> data/read_directories/tiny_test/
#
from pathlib import Path
import random
import gzip
import os
import re

# -----------------------------
# Configuration
# -----------------------------
PROJECT_ROOT = Path("/group/jbondgrp2/stephenRichards/_Analysis_projects/_Ems_vcf_pipeline")

ADAPTERS_REAL = PROJECT_ROOT / "data" / "adapters" / "adapters.fasta"
OUT_ROOT      = Path("tests") / "tiny"
REF_DIR       = OUT_ROOT / "ref"
READS_DIR     = OUT_ROOT / "reads"
ADAPT_DIR     = OUT_ROOT / "adapters"    # optional local reference to adapters for the tiny test

REF_NAME      = "tiny.fa"
SAMPLES       = ["sample1", "sample2"]

READ_LEN      = 100     # length per end
INSERT_MEAN   = 300     # typical insert length
N_PAIRS       = 3000    # pairs per sample; keep small to run fast
ADAPT_FRAC    = 0.30    # fraction of pairs that will include adapter tails

RNG_SEED      = 42

# -----------------------------
# Helpers
# -----------------------------
def rand_dna(n):
    return "".join(random.choice("ACGT") for _ in range(n))

def rc(seq):
    comp = str.maketrans("ACGT", "TGCA")
    return seq.translate(comp)[::-1]

def load_adapters(fa_path):
    """
    Parse a simple FASTA file containing:
      >i5
      <seq>
      >i7
      <seq>
    Returns dict {'i5': seq, 'i7': seq}
    """
    if not fa_path.exists():
        raise FileNotFoundError(f"Adapters FASTA not found at: {fa_path}")

    adapters = {}
    name = None
    seqs = []
    with open(fa_path, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            if line.startswith(">"):
                if name and seqs:
                    adapters[name] = "".join(seqs).upper()
                name = line[1:].strip().split()[0]  # take token after '>'
                seqs = []
            else:
                seqs.append(line.strip())
        if name and seqs:
            adapters[name] = "".join(seqs).upper()

    # Normalize keys: we expect 'i5' and 'i7'
    norm = {}
    for k, v in adapters.items():
        key = k.lower()
        if key in ("i5", "i7"):
            norm[key] = re.sub(r"[^ACGT]", "", v.upper())
    if "i5" not in norm or "i7" not in norm:
        raise ValueError(f"Adapters file must contain >i5 and >i7 entries. Found keys: {list(norm.keys())}")
    return norm

def write_fastq_gz(path, records):
    with gzip.open(path, "wt") as f:
        for name, seq, qual in records:
            f.write(f"@{name}\n{seq}\n+\n{qual}\n")

# -----------------------------
# Main
# -----------------------------
def main():
    random.seed(RNG_SEED)
    REF_DIR.mkdir(parents=True, exist_ok=True)
    READS_DIR.mkdir(parents=True, exist_ok=True)
    ADAPT_DIR.mkdir(parents=True, exist_ok=True)

    # Load real adapters
    adapters = load_adapters(ADAPTERS_REAL)
    i5 = adapters["i5"]
    i7 = adapters["i7"]

    # Symlink/copy adapters into tiny test dir (optional convenience)
    target_adapt = ADAPT_DIR / "adapters.fasta"
    if not target_adapt.exists():
        try:
            target_adapt.symlink_to(ADAPTERS_REAL)
        except Exception:
            # Fallback: copy
            target_adapt.write_text(ADAPTERS_REAL.read_text())

    # Build a tiny reference
    contigs = {
        "ctgA": list(rand_dna(30000)),
        "ctgB": list(rand_dna(20000)),
    }

    # SNP sites that sample2 will carry as ALT (ref stays as-is)
    snp_sites = [1000, 5000, 12000, 18000, 25000]  # on ctgA
    # For sample2 we’ll flip bases at these positions (simulate true polymorphisms)
    alt_bases = {}
    for pos in snp_sites:
        refb = contigs["ctgA"][pos]
        alts = [b for b in "ACGT" if b != refb]
        altb = random.choice(alts)
        alt_bases[pos] = altb

    ref_path = REF_DIR / REF_NAME
    with open(ref_path, "w") as f:
        for name, seq_list in contigs.items():
            f.write(f">{name}\n")
            seq = "".join(seq_list)
            for i in range(0, len(seq), 60):
                f.write(seq[i:i+60] + "\n")

    print(f"[ok] Wrote reference: {ref_path}")

    # Build per-sample sequence dicts (strings)
    seqs_s1 = {k: "".join(v) for k, v in contigs.items()}
    # Sample2: apply ALT at snp_sites on ctgA (only to sample2 basis)
    s2_ctgA = list(seqs_s1["ctgA"])
    for pos, alt in alt_bases.items():
        s2_ctgA[pos] = alt
    seqs_s2 = dict(seqs_s1)
    seqs_s2["ctgA"] = "".join(s2_ctgA)

    # Generate paired reads with a fraction containing adapter tails.
    # Model: For ADAPT_FRAC of fragments, create an insert shorter than READ_LEN,
    # so R1 and R2 have 3' overhangs filled by adapters (R1 gets i7 tail; R2 gets i5 tail).
    def generate_pairs(sample, seqs, n_pairs=N_PAIRS, read_len=READ_LEN, insert_mean=INSERT_MEAN, adapt_frac=ADAPT_FRAC):
        qual = "I" * read_len  # simple high-quality score
        r1_records = []
        r2_records = []

        keys = list(seqs.keys())
        for idx in range(n_pairs):
            ctg = random.choice(keys)
            seq = seqs[ctg]

            # Choose insert length
            if random.random() < adapt_frac:
                # short fragment (force adapter tails)
                frag_len = random.randint(30, read_len - 10)  # 20..90-ish
            else:
                # near mean with some variance
                frag_len = max(read_len + 20, int(random.gauss(insert_mean, insert_mean * 0.1)))

            if len(seq) <= frag_len + 10:
                # pick another contig if tiny; if still small, clamp
                frag_len = min(frag_len, len(seq) - 10)

            start = random.randint(0, max(0, len(seq) - frag_len - 1))
            frag = seq[start:start+frag_len]

            # R1 = forward 5'->3' from frag start
            r1seq = frag[:read_len]
            # R2 = reverse complement from frag end
            r2seq = rc(frag[-read_len:])

            # If fragment is shorter than read length, append adapter tails on 3'
            if len(frag) < read_len:
                overhang = read_len - len(frag)
                # R1 3' tail with i7
                r1_tail = (i7 * ((overhang // len(i7)) + 1))[:overhang]
                r1seq = frag + r1_tail

                # R2 3' tail with i5 (remember r2seq is RC of end of frag; appending tail in sequence-space)
                r2_tail = (i5 * ((overhang // len(i5)) + 1))[:overhang]
                # Because r2seq is already reverse-complement of fragment end, to simulate a real adapter tail
                # on the 3' end in read-space, we append the adapter *as-is* (not RC) to r2seq.
                r2seq = r2seq + r2_tail

            name = f"{sample}_{ctg}_{start}"
            r1_records.append((name + "/1", r1seq, qual))
            r2_records.append((name + "/2", r2seq, qual))

        return r1_records, r2_records

    r1_s1, r2_s1 = generate_pairs("sample1", seqs_s1)
    r1_s2, r2_s2 = generate_pairs("sample2", seqs_s2)

    # Write gzipped FASTQs
    out_r1_s1 = READS_DIR / "sample1_R1.fastq.gz"
    out_r2_s1 = READS_DIR / "sample1_R2.fastq.gz"
    out_r1_s2 = READS_DIR / "sample2_R1.fastq.gz"