In [6]:
# ==== Python deps ====
!pip install --quiet pandas matplotlib gradio

In [12]:
import gzip
import requests
from io import BytesIO
from Bio import SeqIO

url = "https://ftp.ensembl.org/pub/release-111/fasta/dicentrarchus_labrax/dna/Dicentrarchus_labrax.dlabrax2021.dna.toplevel.fa.gz"

# Télécharger dans la mémoire et décompresser
response = requests.get(url, stream=True)
response.raise_for_status()

with gzip.open(BytesIO(response.content), "rt") as handle:
    records = list(SeqIO.parse(handle, "fasta"))

print(f"Nombre total de contigs/chromosomes : {len(records)}\n")

# Aperçu des 5 premiers
for rec in records[:5]:
    print(f"ID: {rec.id}")
    print(f"Description: {rec.description}")
    print(f"Taille: {len(rec.seq):,} bases\n")

# Résumé global
lengths = [len(r.seq) for r in records]
print(f"Taille totale (bases) : {sum(lengths):,}")
print(f"Taille moyenne contig : {sum(lengths)//len(lengths):,}")
print(f"Max contig : {max(lengths):,}")
print(f"Min contig : {min(lengths):,}")


Nombre total de contigs/chromosomes : 302

ID: CAJNNU010000019.1
Description: CAJNNU010000019.1 dna:primary_assembly primary_assembly:dlabrax2021:CAJNNU010000019.1:1:37882625:1 REF
Taille: 37,882,625 bases

ID: CAJNNU010000021.1
Description: CAJNNU010000021.1 dna:primary_assembly primary_assembly:dlabrax2021:CAJNNU010000021.1:1:35031035:1 REF
Taille: 35,031,035 bases

ID: CAJNNU010000020.1
Description: CAJNNU010000020.1 dna:primary_assembly primary_assembly:dlabrax2021:CAJNNU010000020.1:1:34940063:1 REF
Taille: 34,940,063 bases

ID: CAJNNU010000004.1
Description: CAJNNU010000004.1 dna:primary_assembly primary_assembly:dlabrax2021:CAJNNU010000004.1:1:34614956:1 REF
Taille: 34,614,956 bases

ID: CAJNNU010000011.1
Description: CAJNNU010000011.1 dna:primary_assembly primary_assembly:dlabrax2021:CAJNNU010000011.1:1:34597903:1 REF
Taille: 34,597,903 bases

Taille totale (bases) : 695,892,153
Taille moyenne contig : 2,304,278
Max contig : 37,882,625
Min contig : 1,880


In [13]:
!apt-get update -y && apt-get install -y --no-install-recommends wget gzip samtools minimap2
!pip install --quiet fastapi uvicorn python-multipart nest_asyncio pyngrok


Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:5 https://cli.github.com/packages stable InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:8 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:12 http://archive.ubuntu.com/ubuntu jammy-updates/restricted amd64 Packages [5,691 kB]
Get:13 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [9,280 kB]
Get:14 http://ar

In [6]:
import shutil, subprocess

def ensure_binaries():
    missing = [t for t in ("minimap2","samtools","wget","gzip","ca-certificates") if shutil.which(t) is None]
    if missing:
        print("Installing:", " ".join(missing))
        subprocess.run(["apt-get","update","-y"], check=True)
        subprocess.run(["apt-get","install","-y","--no-install-recommends",
                        "minimap2","samtools","wget","gzip","ca-certificates"], check=True)
    # Vérif
    subprocess.run(["minimap2","--version"], check=True)
    subprocess.run("samtools --version | head -n1", shell=True, check=True)

ensure_binaries()


Installing: minimap2 samtools ca-certificates


In [10]:
# --- REMPLACE ta fonction d'échantillonnage par celle-ci (détection binaire du gz) ---
import gzip, random, tempfile

def sample_fastq_reservoir(src_fastq: str, n_reads=100_000, seed=42) -> tuple[str, int]:
    """
    Tire n_reads au hasard uniformément (reservoir) d'un FASTQ ou FASTQ.GZ,
    en détectant le gzip par signature binaire (pas par extension).
    Retourne (chemin_sample, total_reads_vus).
    """
    src = Path(src_fastq)
    # détection gzip robuste
    with open(src, "rb") as fh:
        is_gz = fh.read(2) == b"\x1f\x8b"
    opener = (lambda p: gzip.open(p, "rt")) if is_gz else (lambda p: open(p, "r"))

    rng = random.Random(seed)
    out_dir = Path(tempfile.mkdtemp(prefix="reservoir_"))
    out = out_dir / f"sample_{n_reads}.fastq"

    reservoir, k = [], 0
    rec = []
    with opener(src) as f:
        for i, line in enumerate(f):
            rec.append(line)
            if len(rec) == 4:       # NB: suppose FASTQ en 4 lignes (classique ONT/Illumina)
                k += 1
                if len(reservoir) < n_reads:
                    reservoir.append(list(rec))
                else:
                    j = rng.randrange(k)
                    if j < n_reads:
                        reservoir[j] = list(rec)
                rec.clear()
    # écrit le sous-échantillon
    with open(out, "w") as g:
        for r in reservoir:
            g.write("".join(r))
    if len(reservoir) < n_reads:
        print(f"ℹ️ Fichier plus petit que n={n_reads}: on a pris {len(reservoir)} reads sur {k}.")
    return str(out), k


In [12]:
# ==== PATCH ANTI BadGzipFile : correction chemin + sampling robuste ====
from pathlib import Path
import gzip, random, tempfile

# 1) Corrige le chemin si le contenu n'est pas gz (renomme .fastq.gz -> .fastq)
def fix_fastq_path(path: str) -> str:
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"FASTQ introuvable: {p}")
    with open(p, "rb") as fh:
        is_gz = (fh.read(2) == b"\x1f\x8b")
    if p.suffix == ".gz" and not is_gz:
        # renomme en .fastq (sans recompresser)
        q = p.with_suffix("") if p.name.endswith(".fastq.gz") else p.with_suffix(".fastq")
        p.rename(q)
        print(f"ℹ️ Le fichier n'est pas gz → renommé en: {q}")
        return str(q)
    print(f"OK: gzip_detected={is_gz} pour {p}")
    return str(p)

# 2) Sampling réservoir avec détection binaire (aucune dépendance à l'extension)
def sample_fastq_reservoir(src_fastq: str, n_reads=100_000, seed=42) -> tuple[str, int]:
    """
    Tire n_reads reads au hasard (uniformément) d'un FASTQ **ou** FASTQ.GZ.
    Détection gzip par signature binaire. Retourne (chemin_sample, total_reads_vus).
    """
    src = Path(src_fastq)
    with open(src, "rb") as fh:
        is_gz = (fh.read(2) == b"\x1f\x8b")
    opener = (lambda p: gzip.open(p, "rt")) if is_gz else (lambda p: open(p, "r"))

    rng = random.Random(seed)
    out_dir = Path(tempfile.mkdtemp(prefix="reservoir_"))
    out = out_dir / f"sample_{n_reads}.fastq"

    reservoir, k, rec = [], 0, []
    with opener(src) as f:
        for i, line in enumerate(f):
            rec.append(line)
            if len(rec) == 4:  # 4 lignes par read
                k += 1
                if len(reservoir) < n_reads:
                    reservoir.append(list(rec))
                else:
                    j = rng.randrange(k)
                    if j < n_reads:
                        reservoir[j] = list(rec)
                rec.clear()
    with open(out, "w") as g:
        for r in reservoir:
            g.write("".join(r))
    if len(reservoir) < n_reads:
        print(f"ℹ️ Fichier plus petit que n={n_reads}: {len(reservoir)} reads tirés sur {k}.")
    return str(out), k

# 3) Applique la correction au chemin FASTQ existant, puis test d’ouverture
FASTQ = fix_fastq_path(FASTQ)

def _smoke_test_read(path: str, n_reads=1):
    p = Path(path)
    with open(p, "rb") as fh:
        is_gz = (fh.read(2) == b"\x1f\x8b")
    opener = (lambda q: gzip.open(q, "rt")) if is_gz else (lambda q: open(q, "r"))
    with opener(p) as f:
        rec, got = [], 0
        for i, line in enumerate(f):
            rec.append(line)
            if len(rec) == 4:
                got += 1
                rec.clear()
                if got >= n_reads:
                    break
    print(f"✅ Smoke-test OK: {got} read(s) lus avec {'gzip' if is_gz else 'texte'}")

_smoke_test_read(FASTQ, 1)


ℹ️ Le fichier n'est pas gz → renommé en: /content/input.fastq
✅ Smoke-test OK: 1 read(s) lus avec texte


In [40]:
# ========== eDNA — Bar européen (Dicentrarchus labrax) : robuste par chromosome ==========
import os, re, io, json, gzip, random, shutil, tempfile, statistics, subprocess, requests
from pathlib import Path
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# ----------------- Config -----------------
SAMPLE_N   = 100_000   # nb de reads tirés uniformément (None = tout)
REPLICATES = 1
SEED       = 42
TOPN       = 30
THREADS    = int(os.environ.get("EDNA_THREADS", os.cpu_count() or 2))

# GCF dlabrax2021 (RefSeq) — génome + rapport d'assemblage
FTP_DIR = "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/905/237/075/GCF_905237075.1_dlabrax2021"
CACHE   = Path("~/.cache/edna").expanduser(); CACHE.mkdir(parents=True, exist_ok=True)
NUC_GZ  = CACHE / "GCF_905237075.1_dlabrax2021_genomic.fna.gz"
NUC_FA  = CACHE / "GCF_905237075.1_dlabrax2021_genomic.fna"
ASM_RPT = CACHE / "GCF_905237075.1_dlabrax2021_assembly_report.txt"

# ----------------- Utilitaires shell -----------------
def run(cmd):
    print(">>", " ".join(map(str, cmd)))
    subprocess.run(cmd, check=True)

def run_cap(cmd):
    return subprocess.run(cmd, check=True, capture_output=True, text=True).stdout

def ensure_tools():
    for t in ("minimap2", "samtools"):
        try:
            subprocess.run([t, "--version"], check=True, capture_output=True)
        except Exception as e:
            raise RuntimeError(f"{t} introuvable. Installe : !apt-get -y install {t}") from e

# ----------------- FASTQ -----------------
def resolve_fastq():
    x = globals().get("FASTQ", None)
    if isinstance(x, (str, os.PathLike)) and x and Path(x).exists():
        return str(Path(x))
    for p in ["/content/input.fastq.gz", "/content/input.fastq",
              "/content/sample.fastq.gz", "/content/sample.fastq"]:
        if Path(p).exists():
            return p
    raise FileNotFoundError("Aucun FASTQ trouvé. Place-le à /content/input.fastq(.gz) ou définis FASTQ='…'.")

# ----------------- Téléchargements de référence -----------------
def ensure_reference():
    if not NUC_FA.exists():
        run(["wget", "-O", str(NUC_GZ), f"{FTP_DIR}/GCF_905237075.1_dlabrax2021_genomic.fna.gz"])
        run(["gunzip", "-f", str(NUC_GZ)])
    if not ASM_RPT.exists():
        run(["wget", "-O", str(ASM_RPT), f"{FTP_DIR}/GCF_905237075.1_dlabrax2021_assembly_report.txt"])

def fasta_has_mito(fp: Path) -> bool:
    with open(fp, "r", errors="ignore") as f:
        for line in f:
            if line.startswith(">") and re.search(r"(mitochond|(^|\W)MT(\W|$)|(^|\W)chrM(\W|$)|^NC_\d+)", line, flags=re.I):
                return True
    return False

def ensure_combined_fasta():
    comb = CACHE / "dlabrax2021_plus_mito.fa"
    if fasta_has_mito(NUC_FA):
        return NUC_FA, Path(str(NUC_FA)+".mmi")
    # Ajoute NC_026074 si besoin
    mito_fa = CACHE / "dlabrax_mito_NC_026074.fa"
    if not mito_fa.exists():
        url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
        params = {"db":"nuccore","id":"NC_026074","rettype":"fasta","retmode":"text",
                  "tool":"edna-colab","email":"you@example.com"}
        r = requests.get(url, params=params, timeout=120); r.raise_for_status()
        mito_fa.write_text(r.text)
    with open(comb, "wb") as out:
        out.write(open(NUC_FA, "rb").read()); out.write(b"\n"); out.write(open(mito_fa, "rb").read())
    return comb, Path(str(comb)+".mmi")

# ----------------- Index minimap2 -----------------
def ensure_mmi(fa: Path, mmi: Path):
    if not mmi.exists():
        run(["minimap2", "-d", str(mmi), str(fa)])

# ----------------- Échantillonnage uniforme (réservoir) -----------------
def sample_fastq_reservoir(src_fastq: str, n_reads=100_000, seed=42) -> tuple[str, int]:
    src = Path(src_fastq)
    gz  = src.suffix == ".gz"
    op  = (lambda p: gzip.open(p, "rt")) if gz else (lambda p: open(p, "r"))
    rng = random.Random(seed)
    tmp = Path(tempfile.mkdtemp(prefix="reservoir_"))
    out = tmp / f"sample_{n_reads}.fastq"

    reservoir, k = [], 0
    rec = []
    with op(src) as f:
        for i, line in enumerate(f):
            rec.append(line)
            if len(rec) == 4:
                k += 1
                if len(reservoir) < n_reads:
                    reservoir.append(list(rec))
                else:
                    j = rng.randrange(k)
                    if j < n_reads:
                        reservoir[j] = list(rec)
                rec.clear()
    with open(out, "w") as g:
        for r in reservoir:
            g.write("".join(r))
    return str(out), k

# ----------------- Helpers analyse -----------------
def guess_preset_from_fastq(fastq_path, sample=20000):
    with open(fastq_path, "rb") as fh: gz = fh.read(2) == b"\x1f\x8b"
    op = (lambda p: gzip.open(p, "rt")) if gz else (lambda p: open(p, "r"))
    lens=[]
    with op(fastq_path) as f:
        for i, line in enumerate(f):
            if i % 4 == 1:
                lens.append(len(line.strip()))
                if len(lens) >= sample: break
    med = int(statistics.median(lens)) if lens else 0
    if med < 500: preset = "sr"
    elif med < 10000: preset = "map-ont"
    else: preset = "map-hifi"
    return preset, med, len(lens)

def q30_counts_by_contig(bam_path: Path):
    txt = run_cap(["samtools", "view", "-q", "30", "-F", "0x904", str(bam_path)])
    contigs=[]
    if txt.strip():
        for L in txt.splitlines():
            parts = L.split("\t", 4)
            if len(parts) >= 3:
                contigs.append(parts[2])
    ser = pd.Series(contigs, name="chrom").value_counts().rename_axis("chrom").reset_index(name="q30_primary")
    return ser

def mito_contigs_from_fasta(fp: Path):
    mito=[]
    with open(fp, "r", errors="ignore") as f:
        for line in f:
            if line.startswith(">"):
                name = line[1:].split()[0]
                if ("mitochond" in line.lower()) or re.search(r"(^mt$|^chrm$|^nc_\d+)", name, flags=re.I):
                    mito.append(name)
    return sorted(set(mito))

# ---------- Assembly report: mapping contig -> chromosome / unplaced ----------
def build_contig_to_chrom_map() -> dict:
    """Retourne un dict {accn -> '1'/'2'/... ou 'unplaced'} en lisant l'assembly_report NCBI."""
    headers = None
    rows = []
    with open(ASM_RPT, "r", encoding="utf-8", errors="ignore") as fh:
        for line in fh:
            if not line.strip():
                continue
            if line.startswith("#"):
                if line.startswith("# Sequence-Name"):
                    headers = line[1:].strip().split("\t")
                continue
            parts = line.strip().split("\t")
            if headers and len(parts) == len(headers):
                rows.append(dict(zip(headers, parts)))
    if not rows:
        return {}
    df = pd.DataFrame(rows)

    def chrom_label(r):
        role = (r.get("Sequence-Role","") or "").lower()
        loc  = (r.get("Assigned-Molecule-Location/Type","") or "").lower()
        assigned = (r.get("Assigned-Molecule","") or "").strip()
        if role in {"assembled-molecule","chromosome"} or loc == "chromosome":
            return assigned if assigned else (r.get("Sequence-Name") or "unplaced")
        return "unplaced"

    df["chrom_label"] = df.apply(chrom_label, axis=1)

    mapping = {}
    for _, r in df.iterrows():
        for key in ("RefSeq-Accn","GenBank-Accn","Sequence-Name"):
            k = r.get(key)
            if k:
                mapping[k] = r["chrom_label"]
    return mapping

def counts_per_chromosome(df_q30: pd.DataFrame) -> pd.DataFrame:
    contig2chrom = build_contig_to_chrom_map()
    df = df_q30.copy()
    df["chrom_label"] = df["chrom"].map(lambda x: contig2chrom.get(x, "unplaced"))
    out = (df.groupby("chrom_label", as_index=False)["q30_primary"]
             .sum()
             .sort_values("q30_primary", ascending=False))
    return out

# ----------------- Graphiques -----------------
def donut_nuc_mito(nuc_count, mito_count, title, out_png: Path):
    total = nuc_count + mito_count
    sns.set_theme(style="whitegrid")
    labels, vals = ["Nucléaire","Mitochondrial"], [nuc_count, mito_count]
    colors = sns.color_palette("deep", 2)
    fig, ax = plt.subplots(figsize=(5,5))
    ax.pie(vals, labels=labels, autopct=lambda p: f"{p:.1f}%", startangle=90,
           pctdistance=0.78, colors=colors, textprops={'fontsize':11})
    centre = plt.Circle((0,0), 0.55, fc="white"); fig.gca().add_artist(centre)
    ax.set_title(title + f"\nTotal bar: {total:,} reads (Q≥30)", fontsize=12)
    plt.tight_layout(); fig.savefig(out_png, dpi=220, bbox_inches="tight"); plt.close(fig)

def barplot_top_contigs(df_counts: pd.DataFrame, out_png: Path, topN=30, title=""):
    if df_counts.empty:
        fig, ax = plt.subplots(figsize=(8, 2.8))
        ax.text(0.5, 0.5, "Aucun contig (Q≥30)", ha="center", va="center"); ax.axis("off")
        fig.savefig(out_png, dpi=220, bbox_inches="tight"); plt.close(fig); return
    sns.set_theme(context="talk", style="whitegrid")
    top = df_counts.sort_values("q30_primary", ascending=True).tail(topN)
    fig, ax = plt.subplots(figsize=(10, max(3.8, 0.28*len(top))))
    sns.barplot(data=top, y="chrom", x="q30_primary", ax=ax)
    ax.set_xlabel("Lectures assignées (primaires, MAPQ ≥ 30)")
    ax.set_ylabel("Chromosome / Contig (bar)")
    ax.set_title(title)
    for c in ax.containers:
        ax.bar_label(c, fmt="%.0f", padding=2, fontsize=9)
    plt.tight_layout(); fig.savefig(out_png, dpi=240, bbox_inches="tight"); plt.close(fig)

def plot_counts_per_chromosome(df_chr: pd.DataFrame, out_png: Path,
                               title="Bar — séquences par chromosome (Q≥30)"):
    if df_chr.empty:
        fig, ax = plt.subplots(figsize=(8, 2.8))
        ax.text(0.5, 0.5, "Aucun chromosome détecté (Q≥30)", ha="center", va="center"); ax.axis("off")
        fig.savefig(out_png, dpi=220, bbox_inches="tight"); plt.close(fig); return
    sns.set_theme(context="talk", style="whitegrid")
    fig, ax = plt.subplots(figsize=(12, max(3.6, 0.35*len(df_chr))))
    sns.barplot(data=df_chr, y="chrom_label", x="q30_primary", ax=ax)
    ax.set_xlabel("Lectures de bar (primaires, MAPQ ≥ 30)")
    ax.set_ylabel("Chromosome")
    ax.set_title(title)
    for c in ax.containers:
        ax.bar_label(c, fmt="%.0f", padding=2, fontsize=10)
    plt.tight_layout(); fig.savefig(out_png, dpi=240, bbox_inches="tight"); plt.close(fig)

# ----------------- Pipeline principal -----------------
def analyze_fastq(fastq_path: str,
                  sample_n: int | None = SAMPLE_N,
                  threads: int = THREADS,
                  topn: int = TOPN,
                  seed: int = SEED):
    ensure_tools()
    ensure_reference()
    fa, mmi = ensure_combined_fasta()
    ensure_mmi(fa, mmi)

    export = Path("/content/edna_out"); export.mkdir(parents=True, exist_ok=True)
    mito_names = set(mito_contigs_from_fasta(fa))

    all_summaries = []

    for rep in range(1, REPLICATES+1):
        # 1) échantillon
        if sample_n is None:
            sample_path, total_seen = fastq_path, None
            print(f"[rep{rep}] Pas d'échantillonnage (tout le FASTQ).")
        else:
            sample_path, total_seen = sample_fastq_reservoir(fastq_path, n_reads=sample_n, seed=seed+(rep-1))
            print(f"[rep{rep}] Échantillon uniforme: {sample_n} reads tirés sur {total_seen}.")

        # 2) preset
        preset, med, n_len = guess_preset_from_fastq(sample_path)
        print(f"[rep{rep}] Preset auto: {preset} (médiane ~{med} bp; n_len={n_len})")

        # 3) alignement → BAM trié + index
        work = Path(tempfile.mkdtemp(prefix=f"edna_rep{rep}_"))
        bam = work / "aligned.sorted.bam"
        p1 = subprocess.Popen(["minimap2","-t",str(threads),"-ax",preset,str(mmi),str(sample_path)], stdout=subprocess.PIPE)
        p2 = subprocess.Popen(["samtools","view","-bS","-"], stdin=p1.stdout, stdout=subprocess.PIPE)
        p3 = subprocess.Popen(["samtools","sort","-o", str(bam)], stdin=p2.stdout)
        p1.stdout.close(); p2.stdout.close(); p3.communicate()
        run(["samtools","index", str(bam)])
        print(f"[rep{rep}] BAM:", bam)

        # 4) stats + q30 comptages
        fs = run_cap(["samtools","flagstat", str(bam)])
        m = re.search(r"(\d+)\s+\+\s+\d+\s+mapped", fs)
        t = re.search(r"(\d+)\s+\+\s+\d+\s+in total", fs)
        mapped_total = int(m.group(1)) if m else None
        reads_total  = int(t.group(1)) if t else None
        pct_mapped   = round(100*mapped_total/reads_total, 2) if (mapped_total and reads_total) else None

        idx = run_cap(["samtools","idxstats", str(bam)])
        df_idx = pd.read_csv(io.StringIO(idx), sep="\t", header=None,
                             names=["chrom","length","mapped","unmapped"], engine="python")
        df_idx = df_idx[df_idx["chrom"]!="*"].copy()

        df_q30 = q30_counts_by_contig(bam).merge(df_idx[["chrom","length"]], on="chrom", how="left")

        # ---- Nouveau : par CHROMOSOME (assembly_report)
        df_chr = counts_per_chromosome(df_q30)
        chr_png = export / f"bar_per_chromosome_rep{rep}.png"
        plot_counts_per_chromosome(df_chr, chr_png, "Bar — séquences par chromosome (Q≥30)")
        df_chr.to_csv(export / f"q30_per_chromosome_rep{rep}.csv", index=False)

        # Comptages bar / mito / nuc
        bar_primary_q30_total = int(df_q30["q30_primary"].sum())
        mito_q30    = int(df_q30.loc[df_q30["chrom"].isin(mito_names), "q30_primary"].sum())
        nuclear_q30 = bar_primary_q30_total - mito_q30
        mito_pct_q30    = round(100*mito_q30/max(1, bar_primary_q30_total), 2)
        nuclear_pct_q30 = round(100 - mito_pct_q30, 2)

        # 5) figures complémentaires
        donut_png = export / f"donut_nuc_vs_mito_rep{rep}.png"
        donut_nuc_mito(nuclear_q30, mito_q30, f"Bar — Nucléaire vs Mito (rep{rep})", donut_png)

        bar_png = export / f"bar_top{topn}_contigs_rep{rep}.png"
        barplot_top_contigs(df_q30, bar_png, topn, f"Bar — Lectures par contig (rep{rep}, Q≥30)")

        # 6) exports tabulaires
        df_q30.sort_values("q30_primary", ascending=False).to_csv(export / f"q30_counts_per_contig_rep{rep}.csv", index=False)

        # 7) résumé
        summary = {
            "replicate": rep,
            "fastq_used": sample_path if sample_n is not None else fastq_path,
            "sampled_reads": sample_n if sample_n is not None else None,
            "reads_total_flagstat": reads_total,
            "mapped_total_flagstat": mapped_total,
            "mapped_pct_flagstat": pct_mapped,
            "bar_primary_q30_total": bar_primary_q30_total,
            "bar_mito_primary_q30": mito_q30,
            "bar_nuclear_primary_q30": nuclear_q30,
            "bar_mito_pct_q30": mito_pct_q30,
            "bar_nuclear_pct_q30": nuclear_pct_q30,
            "preset": preset,
            "median_read_len": med,
        }
        all_summaries.append(summary)

    df_sum = pd.DataFrame(all_summaries)
    export.mkdir(exist_ok=True, parents=True)
    df_sum.to_csv(export/"summary_bar_alignment_q30.csv", index=False)

    print("\n=== RÉSUMÉ ALIGNEMENT BAR (Q≥30) ===")
    print(df_sum.to_string(index=False))
    print(f"\n📁 Exports dans: {export}")
    print(f" - Résumés: {export/'summary_bar_alignment_q30.csv'}")
    print(f" - Comptes par contig: q30_counts_per_contig_rep*.csv")
    print(f" - Comptes par chromosome: q30_per_chromosome_rep*.csv")
    print(f" - Figures: donut_nuc_vs_mito_rep*.png, bar_top{TOPN}_contigs_rep*.png, bar_per_chromosome_rep*.png")

    return df_sum, export

# ----------------- Exécution directe -----------------
if __name__ == "__main__":
    FASTQ = resolve_fastq()
    analyze_fastq(FASTQ, sample_n=SAMPLE_N, threads=THREADS, topn=TOPN, seed=SEED)


>> wget -O /root/.cache/edna/GCF_905237075.1_dlabrax2021_assembly_report.txt https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/905/237/075/GCF_905237075.1_dlabrax2021/GCF_905237075.1_dlabrax2021_assembly_report.txt
[rep1] Échantillon uniforme: 100000 reads tirés sur 205337.
[rep1] Preset auto: map-ont (médiane ~4788 bp; n_len=20000)
>> samtools index /tmp/edna_rep1_7b1u1dif/aligned.sorted.bam
[rep1] BAM: /tmp/edna_rep1_7b1u1dif/aligned.sorted.bam

=== RÉSUMÉ ALIGNEMENT BAR (Q≥30) ===
 replicate                                  fastq_used  sampled_reads  reads_total_flagstat  mapped_total_flagstat  mapped_pct_flagstat  bar_primary_q30_total  bar_mito_primary_q30  bar_nuclear_primary_q30  bar_mito_pct_q30  bar_nuclear_pct_q30  preset  median_read_len
         1 /tmp/reservoir_68exq047/sample_100000.fastq         100000                102453                   4183                 4.08                    937                     1                      936              0.11                99.89 m