# **MAGENTA:** The Global **MA**ngrove **GEN**e Ca**TA**logue


---
# SUMMARY

Los manglares son una conocida reserva de diversidad biológica y un ecosistema altamente productivo. Diversos estudios metagenómicos en diferentes partes del mundo han reconocido a la comunidad microbiana del manglar como un agente importante dentro de los ciclos biogeoquímicos, en los cuales se llevan a cabo procesos tales como la transformación del carbono, la fotosíntesis, la fijación de nitrógeno y la reducción de azufre. En la actualidad, sin embargo, no contamos con una herramienta informática que nos permita entender estos procesos y relaciones a una **escala global**.
MAGENTA (o Global MAngrove GENe CaTAlogue) actúa como un catálogo global de genes únicos y no redundantes a nivel de especie (agrupados al 95% de identidad de nucleótidos) que, a partir de los datos de libre acceso disponibles en bases de datos especializadas (WGS, metagenomas de acceso público - ENA) de cinco de los principales hábitats de la comunidad microbiana del manglar (rizosfera, agua de mar, sedimento, suelo, humedal), busca formular nuevas hipótesis sobre la abundancia, distribución y funciones metabólicas de los microorganismos en el ecosistema del manglar, con miras a atender esta necesidad.


# MAGENTA — descarga y enriquecimiento de metadatos

El contexto de MAGENTA

Al integrar metagenomas públicos, incorporamos sesgos de curación heterogénea (especialmente en NCBI). Usando coordenadas como eje común, incorporamos metadatos de repositorios globales y de acceso publico. Su contexto ambiental explicito altitud, clima (BIO1–BIO19), ecorregión marina, huella humana, tipo de suelos, pH, secuestro de carbono, cobertura vegetal (canopy), y diversidad de especies, sitúan a cada metadato en su nicho ambiental.  Esta estandarización de metadatos permite modelos comparativos y reproducibles, con el potencial de generar irmformación util en el desarrollo de estrategias de conservación, restauración, y la generación de hipótesis funcionales

## Reproducibilidad y entorno

- Fecha de generación del _notebook_: **2025-08-11**
- Requisitos (sugeridos): `python>=3.10`, `pandas`, `requests`, `tqdm`, `rasterio`, `geopandas`, `numpy`, `elevation`, `biopython`, `entrez-direct`, `sra-tools`, `aria2c`/`wget`, opcionalmente `aspera`.
- Sistema de archivos: el flujo se diseñó para coexistir con dos estilos de rutas previamente usadas en MAGENTA.


## 0) Estandarización de rutas y configuración

In [None]:
from pathlib import Path
import os
import subprocess
import shlex
import shutil
import pandas as pd
import numpy as np
import requests
from urllib.parse import urlencode
from tqdm import tqdm
import geopandas as gpd
from shapely.geometry import Point
import rasterio
from rasterio.warp import transform as rio_transform

# MAGENTA_DIR: respeta env externo; si no, usa el por defecto
os.environ.setdefault("MAGENTA_DIR", "/nfs/testing/.jbalvino/MAGENTA/MAGENTA_DIR")
PROJECT_DIR = Path(os.environ.get("MAGENTA_DIR", Path.cwd())).resolve()

RAW_DIR  = PROJECT_DIR / "rawdata" / "fastq"
META_DIR = PROJECT_DIR / "metadata"
OUT_DIR  = PROJECT_DIR / "outputs"
LOGS_DIR = PROJECT_DIR / "logs"
AUX_DIR  = PROJECT_DIR / "aux"
for d in [RAW_DIR, META_DIR, OUT_DIR, LOGS_DIR, AUX_DIR]:
    d.mkdir(parents=True, exist_ok=True)

META_BASE = "metadatos_manglar"
META_FILE = OUT_DIR / f"{META_BASE}.csv"

def save_tsv(df: pd.DataFrame, path: Path):
    path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(path, sep="\t", index=False)

def load_tsv(path: Path) -> pd.DataFrame:
    return pd.read_csv(path, sep="\t", dtype=str)

def ensure_float(df: pd.DataFrame, cols):
    for c in cols:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    return df

def km_to_deg(km: float) -> float:
    return km / 111.32

print("PROJECT_DIR:", PROJECT_DIR)

# --- Entorno seguro para curl/EDirect y requests ---
CA_BUNDLE = certifi.where()
SAFE_ENV = dict(os.environ)
SAFE_ENV["CURL_CA_BUNDLE"] = CA_BUNDLE
SAFE_ENV["SSL_CERT_FILE"]  = CA_BUNDLE
SAFE_ENV.setdefault("TERM", "dumb")
SAFE_ENV.setdefault("EUTILS_TOOL", "magenta_edirect")
SAFE_ENV.setdefault("EUTILS_EMAIL", "jbalvino@masternew")
os.environ["REQUESTS_CA_BUNDLE"] = CA_BUNDLE

def run_cmd(cmd: str, retries: int = 3) -> int:
    print(">>", cmd)
    last = 1
    for i in range(1, retries+1):
        last = subprocess.call(cmd, shell=True, env=SAFE_ENV)
        if last == 0:
            return 0
        print(f"[run_cmd][retry {i}/{retries}] exit={last}")
    return last

def have_edirect() -> bool:
    for tool in ("esearch", "efetch"):
        code = subprocess.call(f"command -v {tool} >/dev/null 2>&1", shell=True, env=SAFE_ENV)
        if code != 0:
            return False
    return True

def postfilter_wgs_illumina_csv(csv_in: Path, csv_out: Path):
    """Filtra RunInfo por LibraryStrategy=WGS y Platform=ILLUMINA (case-insensitive)."""
    if not csv_in.exists() or csv_in.stat().st_size == 0:
        return
    with open(csv_in, newline='', encoding='utf-8', errors="ignore") as f:
        reader = csv.DictReader(f)
        rows = list(reader)
        if not rows:
            csv_out.write_text("") ; return
        fields = reader.fieldnames
    def ok(row):
        ls = (row.get("LibraryStrategy") or "").upper()
        pf = (row.get("Platform") or "").upper()
        return ("WGS" in ls) and ("ILLUMINA" in pf)
    kept = [r for r in rows if ok(r)]
    with open(csv_out, "w", newline='', encoding='utf-8') as g:
        w = csv.DictWriter(g, fieldnames=fields)
        w.writeheader()
        for r in kept:
            w.writerow(r)

### 1. Configuración y utilidades iniciales  

Importa librerías, define rutas estándar para el proyecto, crea las carpetas necesarias, configura un entorno seguro para ejecutar comandos de EDirect/requests, y declara funciones auxiliares (lectura/escritura de TSV, conversión de tipos, conversión km→grados, ejecución robusta de comandos y filtrado de metadatos WGS+ILLUMINA).  

**Entradas:**  
Ninguna (solo variables de entorno como `MAGENTA_DIR` si están definidas).  

**Salidas:**  
- Estructura de carpetas lista (`rawdata`, `metadata`, `outputs`, `logs`, `aux`)  
- Variables globales de rutas  
- Entorno seguro de red para `requests` y `curl/EDirect`  
- Funciones auxiliares disponibles (`save_tsv`, `load_tsv`, `ensure_float`, `km_to_deg`, `run_cmd`, `have_edirect`, `postfilter_wgs_illumina_csv`)  

**Parámetros editables:**  
- `MAGENTA_DIR`  
- `EUTILS_TOOL`  
- `EUTILS_EMAIL`  
- Número de reintentos en `run_cmd`  




# 1) Descarga de metadatos (ENA + NCBI)

### 1.1 ENA Portal API

In [None]:
# %%
ENA_BASE = "https://www.ebi.ac.uk/ena/portal/api/search"

def ena_query_for_mangrove() -> str:
    # scientific_name preferido + texto libre 'mangrove' como respaldo
    terms = '(scientific_name="mangrove metagenome" OR mangrove)'
    return f'(library_strategy="WGS" AND instrument_platform="ILLUMINA" AND {terms})'

ENA_FIELDS = [
    "run_accession","sample_accession","study_accession",
    "library_strategy","library_layout","instrument_platform",
    "collection_date","country",
    "location","lat","lon",
    "fastq_ftp","fastq_http","fastq_md5"
]

params = {
    "result": "read_run",
    "query":  ena_query_for_mangrove(),
    "fields": ",".join(ENA_FIELDS),
    "format": "tsv",
    "limit":  0
}
url = f"{ENA_BASE}?{urlencode(params)}"
print("URL ENA:", url)

r = requests.get(url, timeout=180, verify=CA_BUNDLE)
r.raise_for_status()

ena_tsv = META_DIR / "ena_mangrove.tsv"
with open(ena_tsv, "wb") as f:
    f.write(r.content)

ena_df = pd.read_csv(ena_tsv, sep="\t", dtype=str)
print("ENA metadatos filas:", len(ena_df), "->", ena_tsv)

## ENA Portal API — Manglar  

Consulta el portal de ENA (API) con una query específica para **metagenomas de manglar**. Luego los carga en un DataFrame de pandas para su posterior análisis.  

**Entradas:**  
- Endpoint público de ENA (`https://www.ebi.ac.uk/ena/portal/api/search`)  
- Parámetros de búsqueda (`params`) incluyendo filtro por:  
  - `library_strategy="WGS"`  
  - `instrument_platform="ILLUMINA"`  
  - `scientific_name="mangrove metagenome"` o término libre `mangrove`.  

**Salidas:**  
- Archivo `metadata/ena_mangrove.tsv`  
- DataFrame `ena_df` en memoria con los metadatos cargados.  

**Parámetros editables:**  
- `ena_query_for_mangrove()` (criterio de búsqueda en ENA)  
- `ENA_FIELDS` (campos a descargar, incluye coordenadas `lat`, `lon` y `location`)  
- `limit` (número de resultados, 0 = todos)  
- `timeout` (segundos de espera de la petición HTTP)

### 1.2 NCBI/SRA — EDirect

In [None]:
# %%
query_ncbi = (
    '("WGS"[Strategy] AND "ILLUMINA"[Platform]) AND '
    '( ( mangrove[All Fields] OR mangroves[All Fields] ) '
    '  AND ( metagenome[All Fields] OR metagenomic[All Fields] OR metagenomics[All Fields] ) )'
)

sra_raw_csv = META_DIR / "sra_mangrove_raw_runinfo.csv"
sra_csv     = META_DIR / "sra_mangrove_wgs_illumina.csv"

cmd = 'esearch -db sra -query {q} | efetch -format runinfo > {out}'.format(
    q=shlex.quote(query_ncbi),
    out=shlex.quote(str(sra_raw_csv))
)
print("Comando EDirect:")
print(cmd)

ret = run_cmd(cmd)
need_fallback = (ret != 0) or (not sra_raw_csv.exists()) or (sra_raw_csv.stat().st_size == 0)
if need_fallback:
    print("[SRA][WARN] EDirect falló o sin filas. Intentando fallback RunInfo CSV…")
    url_fb = "https://trace.ncbi.nlm.nih.gov/Traces/sra-db-be/runinfo?term=" + quote_plus(query_ncbi)
    print("[SRA][fallback] GET", url_fb)
    try:
        rr = requests.get(url_fb, timeout=180, verify=CA_BUNDLE)
        rr.raise_for_status()
        txt = rr.text
        if txt and "Run,ReleaseDate,LoadDate" in txt.splitlines()[0]:
            sra_raw_csv.write_text(txt, encoding="utf-8")
            print("[SRA][fallback] OK ->", sra_raw_csv)
        else:
            print("[SRA][fallback][WARN] Respuesta no luce como RunInfo CSV.")
    except Exception as e:
        print("[SRA][fallback][ERROR]", e)

postfilter_wgs_illumina_csv(sra_raw_csv, sra_csv)
if sra_csv.exists() and sra_csv.stat().st_size > 0:
    print("NCBI RunInfo (filtrado WGS+ILLUMINA) ->", sra_csv)
else:
    print("NCBI RunInfo no disponible.")

## NCBI/SRA — EDirect con fallback RunInfo (Manglar)  

Ejecuta una consulta en **NCBI SRA** usando **EDirect** (`esearch` + `efetch`) para obtener metadatos `RunInfo` de corridas relacionadas con **metagenomas de manglar**.  
- Refuerza el filtrado a **WGS + ILLUMINA** mediante un post-procesamiento.  
- Si EDirect falla o devuelve vacío, recurre a un **fallback** descargando el `RunInfo CSV` directamente desde el portal web de NCBI.  

**Entradas:**  
- Herramientas `esearch` y `efetch` disponibles en el `PATH`.  
- Endpoint web de NCBI SRA RunInfo como respaldo.  
- Query definida en `query_ncbi` que incluye:  
  - `"WGS"[Strategy] AND "ILLUMINA"[Platform]`  
  - `(mangrove OR mangroves)`  
  - `(metagenome OR metagenomic OR metagenomics)`  

**Salidas:**  
- `metadata/sra_mangrove_raw_runinfo.csv` → resultado bruto (EDirect o fallback).  
- `metadata/sra_mangrove_wgs_illumina.csv` → resultado filtrado solo con corridas WGS+ILLUMINA.  

**Parámetros editables:**  
- `query_ncbi` (criterio de búsqueda en NCBI).  
- Rutas de salida (`sra_raw_csv`, `sra_csv`).  
- `timeout` (segundos de espera en el fallback HTTP).  
- Número de reintentos en `run_cmd`.  


### 1.3 Unificación y filtro técnico mínimo

In [None]:
# %%
def normalize_ena(df: pd.DataFrame) -> pd.DataFrame:
    return df.rename(columns={
        "run_accession":      "Run",
        "sample_accession":   "Sample",
        "study_accession":    "Study",
        "library_layout":     "LibraryLayout",
        "library_strategy":   "LibraryStrategy",
        "instrument_platform":"Platform",
        "lat":                "latitude",
        "lon":                "longitude",
        "location":           "geographic_location",
    })

def normalize_sra(df: pd.DataFrame) -> pd.DataFrame:
    ren = {}
    if "lat" in df.columns: ren["lat"] = "latitude"
    if "lon" in df.columns: ren["lon"] = "longitude"
    return df.rename(columns=ren)

def _parse_ns_ew(val: str):
    if not isinstance(val, str):
        return None, None
    tokens = re.findall(r'([+-]?\d+(?:\.\d+)?)([NnSsEeWw]?)', val.strip())
    nums = []
    for num, hemi in tokens:
        x = float(num)
        if hemi.upper() == 'S': x = -abs(x)
        elif hemi.upper() == 'N': x =  abs(x)
        elif hemi.upper() == 'W': x = -abs(x)
        elif hemi.upper() == 'E': x =  abs(x)
        nums.append(x)
    if len(nums) >= 2:
        return nums[0], nums[1]
    return None, None

def _extract_lat_lon_from_text(s: str):
    if not isinstance(s, str):
        return np.nan, np.nan
    lat, lon = _parse_ns_ew(s)
    if lat is not None and lon is not None:
        return lat, lon
    nums = re.findall(r'([+-]?\d+(?:\.\d+)?)', s)
    if len(nums) >= 2:
        return float(nums[0]), float(nums[1])
    return np.nan, np.nan

def _valid_lat_lon(lat, lon):
    try:
        if pd.isna(lat) or pd.isna(lon):
            return False
        lat = float(lat); lon = float(lon)
        return np.isfinite(lat) and np.isfinite(lon) and -90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0
    except Exception:
        return False

ENA_BASE = "https://www.ebi.ac.uk/ena/portal/api/search"

def enrich_coords_from_ena_by_runs(run_ids, chunk_size=200):
    """Devuelve DataFrame: Run, latitude, longitude, geographic_location (ENA read_run)."""
    records = []
    run_ids = [r for r in run_ids if isinstance(r, str) and r.strip()]
    for i in range(0, len(run_ids), chunk_size):
        chunk = run_ids[i:i+chunk_size]
        ors = " OR ".join([f'run_accession="{r}"' for r in chunk])
        params = {
            "result": "read_run",
            "query":  ors,
            "fields": ",".join(["run_accession", "lat", "lon", "location"]),
            "format": "tsv",
            "limit":  0,
        }
        url = f"{ENA_BASE}?{urlencode(params)}"
        try:
            r = requests.get(url, timeout=180, verify=CA_BUNDLE)
            r.raise_for_status()
            if r.content:
                dfc = pd.read_csv(io.StringIO(r.content.decode("utf-8")), sep="\t", dtype=str)
                if not dfc.empty:
                    dfc = dfc.rename(columns={
                        "run_accession": "Run",
                        "lat": "latitude",
                        "lon": "longitude",
                        "location": "geographic_location",
                    })
                    records.append(dfc)
        except Exception as e:
            print(f"[ENA][enrich run][WARN] chunk {i//chunk_size}: {e}")
    if not records:
        return pd.DataFrame(columns=["Run","latitude","longitude","geographic_location"])
    return pd.concat(records, ignore_index=True, sort=False)

def enrich_coords_from_ena_by_samples(sample_ids, chunk_size=200):
    """Devuelve DataFrame: Sample, latitude, longitude, geographic_location (ENA read_sample)."""
    records = []
    sample_ids = [s for s in sample_ids if isinstance(s, str) and s.strip()]
    for i in range(0, len(sample_ids), chunk_size):
        chunk = sample_ids[i:i+chunk_size]
        ors = " OR ".join([f'sample_accession="{s}"' for s in chunk])
        params = {
            "result": "sample",
            "query":  ors,
            "fields": ",".join(["sample_accession","lat","lon","location","country"]),
            "format": "tsv",
            "limit":  0,
        }
        url = f"{ENA_BASE}?{urlencode(params)}"
        try:
            r = requests.get(url, timeout=180, verify=CA_BUNDLE)
            r.raise_for_status()
            if r.content:
                dfc = pd.read_csv(io.StringIO(r.content.decode("utf-8")), sep="\t", dtype=str)
                if not dfc.empty:
                    dfc = dfc.rename(columns={
                        "sample_accession": "Sample",
                        "lat": "latitude",
                        "lon": "longitude",
                        "location": "geographic_location",
                    })
                    records.append(dfc)
        except Exception as e:
            print(f"[ENA][enrich sample][WARN] chunk {i//chunk_size}: {e}")
    if not records:
        return pd.DataFrame(columns=["Sample","latitude","longitude","geographic_location"])
    return pd.concat(records, ignore_index=True, sort=False)

def fill_and_filter_geo(all_df: pd.DataFrame) -> pd.DataFrame:
    # Asegura columnas mínimas
    for col in ["latitude","longitude","geographic_location","Sample","Run"]:
        if col not in all_df.columns:
            all_df[col] = np.nan

    # (1) ENA por Run
    need_geo = all_df["Run"].notna() & (all_df["latitude"].isna() | all_df["longitude"].isna())
    runs_to_enrich = all_df.loc[need_geo, "Run"].dropna().astype(str).unique().tolist()
    if runs_to_enrich:
        print(f"[GEO] ENA read_run por Run (n={len(runs_to_enrich)})…")
        enr = enrich_coords_from_ena_by_runs(runs_to_enrich, chunk_size=200)
        if not enr.empty:
            all_df = all_df.merge(enr[["Run","latitude","longitude","geographic_location"]],
                                  on="Run", how="left", suffixes=("", "_ena"))
            for col in ["latitude","longitude","geographic_location"]:
                all_df[col] = all_df[col].fillna(all_df.get(f"{col}_ena"))
                if f"{col}_ena" in all_df.columns:
                    all_df.drop(columns=[f"{col}_ena"], inplace=True)

    # (2) ENA por Sample
    still_missing = (all_df["latitude"].isna() | all_df["longitude"].isna()) & all_df["Sample"].notna()
    samples_to_enrich = all_df.loc[still_missing, "Sample"].dropna().astype(str).unique().tolist()
    if samples_to_enrich:
        print(f"[GEO] ENA sample por Sample (n={len(samples_to_enrich)})…")
        ens = enrich_coords_from_ena_by_samples(samples_to_enrich, chunk_size=200)
        if not ens.empty:
            all_df = all_df.merge(ens[["Sample","latitude","longitude","geographic_location"]],
                                  on="Sample", how="left", suffixes=("", "_sena"))
            for col in ["latitude","longitude","geographic_location"]:
                all_df[col] = all_df[col].fillna(all_df.get(f"{col}_sena"))
                if f"{col}_sena" in all_df.columns:
                    all_df.drop(columns=[f"{col}_sena"], inplace=True)

    # (3) Parseo texto geographic_location
    still_missing2 = all_df["latitude"].isna() | all_df["longitude"].isna()
    if still_missing2.any():
        print(f"[GEO] Parseando geographic_location para {int(still_missing2.sum())} filas…")
        latlon = all_df.loc[still_missing2, "geographic_location"].apply(_extract_lat_lon_from_text)
        lat_parsed = latlon.map(lambda t: t[0])
        lon_parsed = latlon.map(lambda t: t[1])
        all_df.loc[still_missing2, "latitude"]  = all_df.loc[still_missing2, "latitude"].fillna(lat_parsed)
        all_df.loc[still_missing2, "longitude"] = all_df.loc[still_missing2, "longitude"].fillna(lon_parsed)

    # Convertir a numérico y validar
    all_df["latitude"]  = pd.to_numeric(all_df["latitude"], errors="coerce")
    all_df["longitude"] = pd.to_numeric(all_df["longitude"], errors="coerce")
    valid_mask = all_df.apply(lambda r: _valid_lat_lon(r["latitude"], r["longitude"]), axis=1)
    before_n = len(all_df)
    all_df = all_df[valid_mask].copy()
    after_n  = len(all_df)
    print(f"[GEO] Filtrado por coordenadas válidas: {before_n} -> {after_n}")
    return all_df

# ---- Unificación ----
ena_df_norm = pd.DataFrame()
sra_df_norm = pd.DataFrame()

if 'ena_df' in globals() and not ena_df.empty:
    ena_df_norm = normalize_ena(ena_df.copy())
    ena_df_norm["source"] = "ENA"
    ena_df_norm["ecosystem"] = "mangrove"

if (META_DIR / "sra_mangrove_wgs_illumina.csv").exists():
    sra_df = pd.read_csv(META_DIR / "sra_mangrove_wgs_illumina.csv", dtype=str)
    if not sra_df.empty:
        sra_df_norm = normalize_sra(sra_df.copy())
        sra_df_norm["source"] = "SRA"
        sra_df_norm["ecosystem"] = "mangrove"

frames = [x for x in [ena_df_norm, sra_df_norm] if not x.empty]
if not frames:
    print("[WARN] No hay tablas para unir.")
    all_df = pd.DataFrame()
else:
    all_df = pd.concat(frames, ignore_index=True, sort=False)

# Columnas mínimas
for col in ["Run","Sample","Study","LibraryLayout","LibraryStrategy","Platform",
            "latitude","longitude","geographic_location","ecosystem","source"]:
    if col not in all_df.columns:
        all_df[col] = np.nan

# Filtro WGS + ILLUMINA y de-dup por Run
mask = (
    all_df["LibraryStrategy"].fillna("").str.upper().str.contains("WGS") &
    all_df["Platform"].fillna("").str.upper().str.contains("ILLUMINA")
)
all_df = all_df[mask].copy()
all_df["Run"] = all_df["Run"].astype(str)
all_df = all_df.drop_duplicates(subset=["Run"], keep="first")

# Enriquecer coordenadas y filtrar geo válido
if not all_df.empty:
    all_df = fill_and_filter_geo(all_df)

# Exportar
if all_df.empty:
    print("[WARN] Tras filtrar por coordenadas válidas no quedaron corridas.")
    out_eco = OUT_DIR / "mangrove_metadata.tsv"
    save_tsv(pd.DataFrame(), out_eco)
    unified_tsv = META_DIR / "metadatos_unificados.tsv"
    save_tsv(pd.DataFrame(), unified_tsv)
else:
    out_eco = OUT_DIR / "mangrove_metadata.tsv"
    save_tsv(all_df, out_eco)
    unified_tsv = META_DIR / "metadatos_unificados.tsv"
    save_tsv(all_df, unified_tsv)
    print(f"[OK] mangrove: {len(all_df)} filas -> {out_eco}")
    print(f"[OK] Metadatos unificados (manglar, geo-válidos) -> {unified_tsv}")

    summary = (
        all_df.groupby(["ecosystem","source"])["Run"]
        .nunique()
        .reset_index()
        .rename(columns={"Run":"n_runs"})
        .sort_values(["ecosystem","source"])
    )
    log_path = LOGS_DIR / "summary_mangrove_wgs_illumina.tsv"
    save_tsv(summary, log_path)
    print(f"[OK] Resumen -> {log_path}")

# Unificación y filtros técnicos de metadatos genómicos

Combina y depura metadatos de **ENA** y **NCBI/SRA** para generar un dataset unificado con coordenadas geográficas validadas.

1. **Normalización** de columnas (`normalize_ena`, `normalize_sra`).
2. **Unificación** de tablas de ENA y SRA.
3. **Filtrado técnico**:  
   - Mantiene solo registros con `LibraryStrategy` metagenómica.  
   - (Opcional) restringe además a `WGS` y `ILLUMINA`.  
4. **Eliminación de duplicados** por `Run`.
5. **Enriquecimiento geográfico**:  
   - Consulta coordenadas en ENA (`read_run`, `sample`).  
   - Parseo de campos de texto (`geographic_location`).  
   - Validación de lat/lon.
6. **Exportación** de resultados en formatos `TSV`.

---

##  Funciones principales

- `normalize_ena(df)`: renombra columnas clave de ENA.  
- `normalize_sra(df)`: renombra columnas clave de SRA.  
- `_parse_ns_ew(val)`: interpreta coordenadas con hemisferio N/S/E/W.  
- `_extract_lat_lon_from_text(s)`: extrae lat/lon desde cadenas libres.  
- `_valid_lat_lon(lat, lon)`: valida rango y formato de coordenadas.  
- `enrich_coords_from_ena_by_runs(run_ids)`: obtiene coordenadas desde ENA por *Run*.  
- `enrich_coords_from_ena_by_samples(sample_ids)`: obtiene coordenadas desde ENA por *Sample*.  
- `fill_and_filter_geo(all_df)`: completa coordenadas faltantes y filtra inválidas.

---

##  Entradas

- `metadata/ena_mangrove.tsv` → tabla ENA.  
- `metadata/sra_mangrove_wgs_illumina.csv` → tabla NCBI/SRA.  

---

##  Salidas

- `metadata/metadatos_unificados.tsv` → tabla consolidada.  
- `output/mangrove_metadata.tsv` → dataset final filtrado por ecosistema.  
- `logs/summary_mangrove_wgs_illumina.tsv` → resumen por `ecosystem` y `source`.  
- `metadata/metadatos_unificados.csv` → versión CSV con todos los metadatos asociados.  

---

##  Parámetros editables

- **Mapeos de columnas** (`rename`).  
- **Filtros aplicados**:  
  - `metagenomic`  
  - `WGS`  
  - `ILLUMINA`

## 2) Descarga y conversión de lecturas (ENA/NCBI)

### 2.1 ENA — descarga paralela con aria2c

In [None]:
import os
import sys
import re
import subprocess
import datetime
from pathlib import Path
from urllib.parse import unquote
from shutil import which
import pandas as pd

# ---------------------------------------------------------------------
# Rutas fijas del proyecto (ajústalas si hace falta)
# ---------------------------------------------------------------------
PROJECT_DIR = Path("/nfs/testing/.jbalvino/MAGENTA/MAGENTA_DIR").resolve()
META_PATH   = PROJECT_DIR / "metadata" / "metadatos_unificados.tsv"
ENA_DIR     = PROJECT_DIR / "rawdata" / "ena"
CONV_DIR    = PROJECT_DIR / "rawdata" / "convertidos"
TMP_FQD     = PROJECT_DIR / "tmp_fqd"
LOGS_DIR    = PROJECT_DIR / "logs"
AUX_DIR     = PROJECT_DIR / "aux"
SRA_IMG     = "docker://ncbi/sra-tools:3.1.1"  # fallback en contenedor (si existe)

for d in (ENA_DIR, CONV_DIR, TMP_FQD, LOGS_DIR, AUX_DIR):
    d.mkdir(parents=True, exist_ok=True)

if not META_PATH.exists():
    print(f"[ERROR] No se encuentra {META_PATH}")
    sys.exit(1)

RUNLOG = (LOGS_DIR / f"descarga_all_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.log").open("w")

def logprint(*args):
    msg = " ".join(str(a) for a in args)
    print(msg)
    try:
        RUNLOG.write(msg + "\n"); RUNLOG.flush()
    except Exception:
        pass

def run_cmd(cmd_list, check=True, env=None):
    """Ejecuta comando mostrando stdout/stderr en el log. Devuelve True/False."""
    logprint("[CMD]", " ".join(cmd_list))
    try:
        res = subprocess.run(
            cmd_list, check=check, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env
        )
        out = (res.stdout.decode(errors="ignore") + res.stderr.decode(errors="ignore")).strip()
        if out:
            logprint("[OUT]", out)
        return True
    except subprocess.CalledProcessError as e:
        out = ((e.stdout.decode(errors="ignore") if e.stdout else "") +
               (e.stderr.decode(errors="ignore") if e.stderr else "")).strip()
        if out:
            logprint("[ERR]", out)
        return False

# ---------------------------------------------------------------------
# EXCLUSIONES (opcional)
# ---------------------------------------------------------------------
EXCLUDE_RUNS = set(r.strip() for r in os.environ.get("EXCLUDE_RUNS", "").split(",") if r.strip())

# ---------------------------------------------------------------------
# CA corporativa / TLS
# ---------------------------------------------------------------------
def apply_corporate_ca():
    corp_ca = os.environ.get("CORP_CA", "").strip()
    sys_cas = [
        "/etc/pki/tls/certs/ca-bundle.crt",
        "/etc/ssl/certs/ca-certificates.crt",
        "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
    ]
    vdbc = which("vdb-config")
    def vdb_set(k, v):
        if vdbc:
            run_cmd([vdbc, "-s", f"{k}={v}"], check=False)

    if vdbc:
        run_cmd([vdbc, "--restore-defaults"], check=False)

    if corp_ca and Path(corp_ca).exists():
        os.environ["SSL_CERT_FILE"] = corp_ca
        os.environ["REQUESTS_CA_BUNDLE"] = corp_ca
        os.environ["CURL_CA_BUNDLE"] = corp_ca
        vdb_set("/tls/verify-peer", "true")
        vdb_set("/tls/use-system-ca-cert", "false")
        vdb_set("/tls/ca-file", corp_ca)
        logprint(f"[TLS] CA corporativa aplicada: {corp_ca}")
        return corp_ca

    for p in sys_cas:
        if Path(p).exists():
            os.environ.setdefault("SSL_CERT_FILE", p)
            os.environ.setdefault("REQUESTS_CA_BUNDLE", p)
            os.environ.setdefault("CURL_CA_BUNDLE", p)
            vdb_set("/tls/verify-peer", "true")
            vdb_set("/tls/use-system-ca-cert", "yes")
            vdb_set("/tls/ca-file", p)
            logprint(f"[TLS] CA del sistema aplicada: {p}")
            return p

    vdb_set("/tls/verify-peer", "true")
    vdb_set("/tls/use-system-ca-cert", "yes")
    logprint("[TLS] Sin CA explícita; configuración por defecto.")
    return None

CA_IN_USE = apply_corporate_ca()

def wget_base_args():
    base = [which("wget") or "wget", "-c", "--tries=10", "--read-timeout=30"]
    if os.environ.get("ALLOW_INSECURE_WGET") == "1":
        base.insert(1, "--no-check-certificate")
    elif CA_IN_USE and Path(CA_IN_USE).exists():
        base.extend(["--ca-certificate", CA_IN_USE])
    return base

# ---------------------------------------------------------------------
# Utilidades generales
# ---------------------------------------------------------------------
def sra_tools_version():
    fqd = which("fasterq-dump")
    if not fqd:
        return (0, 0, 0)
    ok = subprocess.run([fqd, "--version"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    m = re.search(r"fasterq-dump\.(\d+)\.(\d+)\.(\d+)", ok.stdout.decode())
    if m:
        return tuple(int(x) for x in m.groups())
    return (0, 0, 0)

SRA_VER = sra_tools_version()
logprint(f"[SRA-TOOLS] fasterq-dump versión detectada: {'.'.join(map(str,SRA_VER))}")

def parse_semicolon_urls(s: str):
    if not isinstance(s, str):
        return []
    return [u.strip() for u in s.split(";") if u and u.strip()]

def list_fastq_for_run(base_dir: Path, run_id: str):
    outs = []
    for root, _, files in os.walk(base_dir):
        for f in files:
            if f.endswith((".fastq", ".fastq.gz")) and (f.startswith(run_id) or f.startswith(f"{run_id}.sra")):
                outs.append(Path(root) / f)
    return sorted(outs)

def have_apptainer():
    return bool(which("apptainer") or which("singularity"))

def digits_after_prefix(run_id: str) -> str:
    # p.ej. SRR29002871 -> "29002871"
    return re.sub(r"^[A-Za-z]+", "", run_id)

def ena_dir2(run_id: str) -> str | None:
    # Regla ENA: 7 dígitos -> "00x"; 8 -> "0xx"; 9+ -> "xxx"; <=6 -> sin subcarpeta
    d = digits_after_prefix(run_id)
    L = len(d)
    if L <= 6: return None
    if L == 7:  return f"00{d[-1]}"
    if L == 8:  return f"0{d[-2:]}"
    return d[-3:]  # 9+

def ena_fastq_candidates(run_id: str):
    pre6 = run_id[:6]
    d2 = ena_dir2(run_id)
    parts = []
    if d2:
        parts.append((pre6, d2, run_id))
    parts.append((pre6, run_id))  # fallback sin d2
    urls = []
    for scheme in ("https", "http", "ftp"):
        for p in parts:
            base = f"{scheme}://ftp.sra.ebi.ac.uk/vol1/fastq/" + "/".join(p)
            urls += [f"{base}/{run_id}_1.fastq.gz", f"{base}/{run_id}_2.fastq.gz"]
    return urls

def ena_sra_candidates(run_id: str):
    pre6 = run_id[:6]
    d2 = ena_dir2(run_id)
    tuples = []
    if d2:
        tuples.append((pre6, d2, run_id))
    tuples.append((pre6, run_id))
    urls = []
    for scheme in ("https", "http", "ftp"):
        for p in tuples:
            base = f"{scheme}://ftp.sra.ebi.ac.uk/vol1/srr/" + "/".join(p)
            urls.append(f"{base}/{run_id}.sra")
    return urls

def wget_spider(url: str) -> bool:
    cmd = wget_base_args() + ["--spider", url]
    ok = run_cmd(cmd, check=False)
    return ok

def download_from_url(url: str, dest: Path) -> bool:
    dest.parent.mkdir(parents=True, exist_ok=True)
    if dest.exists() and dest.stat().st_size > 0:
        logprint(f"[DL] Ya existe: {dest.name}")
        return True
    cmd = wget_base_args() + [url, "-O", str(dest)]
    ok = run_cmd(cmd, check=False)
    return ok and dest.exists() and dest.stat().st_size > 0

def normalize_fastq_names(run_id: str):
    """Ajusta nombres post-dump: SRR.fastq -> SRR_1.fastq; gestiona single-end y opcional _2 vacío."""
    outs = list_fastq_for_run(CONV_DIR, run_id)
    if not outs:
        return

    # Caso común con input "archivo.sra": queda "SRRxxxxxx.sra.fastq"
    for p in list(outs):
        if p.name.endswith(".sra.fastq"):
            newp = p.with_name(p.name.replace(".sra.fastq", ".fastq"))
            p.rename(newp)
            logprint(f"[FIX] Renombrado {p.name} -> {newp.name}")
    outs = list_fastq_for_run(CONV_DIR, run_id)

    # Si hay exactamente 1 archivo y no es *_1/_2, renombrar a _1
    only = [p for p in outs if p.name == f"{run_id}.fastq" or p.name == f"{run_id}.fastq.gz"]
    if len(outs) == 1 and only:
        p = outs[0]
        suf = (p.suffixes[-2] + p.suffixes[-1]) if "".join(p.suffixes).endswith(".fastq.gz") else p.suffix
        target = p.with_name(f"{run_id}_1{suf}")
        p.rename(target)
        logprint(f"[FIX] Dataset single-end: {p.name} -> {target.name}")
        if os.environ.get("TOUCH_EMPTY_R2") == "1":
            r2 = target.with_name(f"{run_id}_2{suf}")
            if r2.suffix == ".gz":
                run_cmd(["bash", "-lc", f": | gzip -c > {r2}"], check=False)
            else:
                r2.touch()
            logprint(f"[FIX] Creado _2 vacío: {r2.name}")

# ---------------------------------------------------------------------
# Carga metadatos y detección de columnas
# ---------------------------------------------------------------------
logprint(f"[INFO] Cargando {META_PATH}")
df = pd.read_csv(META_PATH, sep="\t", dtype=str, low_memory=False).fillna("")
logprint(f"[INFO] Filas: {len(df):,}")

cols_lower = {c.lower(): c for c in df.columns}
COL_RUN = cols_lower.get("run") or cols_lower.get("run_accession") or cols_lower.get("run id") or cols_lower.get("run_id")

LINK_KEYS = ("ftp_link", "download_path", "fastq_ftp", "fastq_http", "sra_link", "sra_ftp", "sra_http")
COL_LINKS = [cols_lower[k] for k in LINK_KEYS if k in cols_lower]

if not COL_RUN:
    logprint("[ERROR] No se encontró columna Run/run_accession en el TSV.")
    sys.exit(1)

def row_get_links(row):
    links = []
    for col in (COL_LINKS or []):
        links += parse_semicolon_urls(str(row.get(col, "")))
    if not links:
        big = " ".join(map(str, row.values))
        if ";" in big:
            links = parse_semicolon_urls(big)
    return links

def is_ena_row(row):
    return any(".fastq.gz" in u.lower() for u in row_get_links(row))

def is_ncbi_row(row):
    L = [u.lower() for u in row_get_links(row)]
    return any(".lite.1" in u or u.endswith(".sra") for u in L)

# ---------------------------------------------------------------------
# ENA directa (si hay)
# ---------------------------------------------------------------------
def download_ena_row(row):
    run = str(row.get(COL_RUN, "")).strip()
    urls = [u for u in row_get_links(row) if ".fastq.gz" in u.lower()]
    if not urls:
        urls = ena_fastq_candidates(run)

    got = 0
    seen = set()
    aria2 = which("aria2c")
    if aria2 and urls:
        lst = AUX_DIR / f"ena_{run}.txt"
        with open(lst, "w") as f:
            for u in urls:
                if u not in seen:
                    f.write(u + "\n"); seen.add(u)
        cmd = [aria2, "--continue=true",
               "--max-concurrent-downloads=4", "--max-connection-per-server=8",
               "--retry-wait=5", "--max-tries=10", "--file-allocation=none",
               f"--input-file={str(lst)}", f"--dir={str(ENA_DIR)}"]
        run_cmd(cmd, check=False)
        for p in ENA_DIR.glob(f"{run}*.fastq.gz"):
            logprint(f"[ENA] descargado: {p.name}"); got += 1
    else:
        for u in urls:
            if u in seen: continue
            seen.add(u)
            fname = ENA_DIR / os.path.basename(unquote(u))
            if wget_spider(u) and download_from_url(u, fname):
                logprint(f"[ENA] descargado: {fname.name}")
                got += 1
    return got

# ---------------------------------------------------------------------
# Cadena NCBI (igual que el original corregido)
# ---------------------------------------------------------------------
def fasterq_dump_remote(run_id: str) -> bool:
    fqd = which("fasterq-dump")
    if not fqd:
        return False
    threads = os.environ.get("THREADS", "8")
    cmd = [fqd, run_id, "--split-files", "-O", str(CONV_DIR),
           "--threads", threads, "--temp", str(TMP_FQD)]
    ok = run_cmd(cmd, check=False, env=os.environ.copy())
    normalize_fastq_names(run_id)
    outs = list_fastq_for_run(CONV_DIR, run_id)
    return bool(outs)

def fasterq_dump_local(sra_path: Path, run_id: str) -> bool:
    fqd = which("fasterq-dump")
    if not fqd:
        return False
    threads = os.environ.get("THREADS", "8")
    cmd = [fqd, str(sra_path), "--outdir", str(CONV_DIR),
           "--split-files", "--threads", threads, "--temp", str(TMP_FQD)]
    ok = run_cmd(cmd, check=False)
    normalize_fastq_names(run_id)
    outs = list_fastq_for_run(CONV_DIR, run_id)
    if outs and os.environ.get("KEEP_SRA", "0") != "1":
        try: sra_path.unlink(); logprint(f"[NCBI] Eliminado {sra_path.name}")
        except Exception: pass
    return bool(outs)

def fastq_dump_local(sra_path: Path, run_id: str) -> bool:
    fd = which("fastq-dump")
    if not fd:
        return False
    cmd = [fd, str(sra_path), "--outdir", str(CONV_DIR),
           "--split-files", "--gzip", "--skip-technical", "--readids", "--dumpbase", "--clip"]
    ok = run_cmd(cmd, check=False)
    normalize_fastq_names(run_id)
    outs = list_fastq_for_run(CONV_DIR, run_id)
    if outs and os.environ.get("KEEP_SRA", "0") != "1":
        try: sra_path.unlink(); logprint(f"[NCBI] Eliminado {sra_path.name}")
        except Exception: pass
    return bool(outs)

def fasterq_dump_container(sra_path: Path, run_id: str) -> bool:
    runner = which("apptainer") or which("singularity")
    if not runner:
        return False
    bind_arg = f"{str(PROJECT_DIR)}:/work"
    in_path  = f"/work/{sra_path.relative_to(PROJECT_DIR)}"
    out_path = f"/work/{CONV_DIR.relative_to(PROJECT_DIR)}"
    tmp_path = f"/work/{TMP_FQD.relative_to(PROJECT_DIR)}"
    threads = os.environ.get("THREADS", "8")
    cmd = [runner, "exec", "--cleanenv", "--bind", bind_arg,
           SRA_IMG, "fasterq-dump", in_path,
           "--outdir", out_path, "--split-files", "--threads", threads, "--temp", tmp_path]
    ok = run_cmd(cmd, check=False)
    normalize_fastq_names(run_id)
    outs = list_fastq_for_run(CONV_DIR, run_id)
    if outs and os.environ.get("KEEP_SRA", "0") != "1":
        try: sra_path.unlink(); logprint(f"[NCBI] Eliminado {sra_path.name}")
        except Exception: pass
    return bool(outs)

def download_sra_from_ena(run_id: str) -> Path | None:
    for u in ena_sra_candidates(run_id):
        if wget_spider(u):
            dest = CONV_DIR / f"{run_id}.sra"
            if download_from_url(u, dest):
                logprint(f"[ENA-SRA] descargado: {dest.name}")
                return dest
    logprint("[ENA-SRA] No se encontró .sra en ENA.")
    return None

def download_fastq_from_ena(run_id: str) -> int:
    got = 0
    for u in ena_fastq_candidates(run_id):
        dest = CONV_DIR / os.path.basename(unquote(u))
        if wget_spider(u) and download_from_url(u, dest):
            logprint(f"[ENA-FASTQ] descargado: {dest.name}")
            got += 1
    return got

def download_ncbi_row_and_convert(row):
    run = str(row.get(COL_RUN, "")).strip()
    links = row_get_links(row)

    logprint(f"[NCBI] Intento remoto por accesión: {run}")
    if fasterq_dump_remote(run):
        for p in list_fastq_for_run(CONV_DIR, run):
            logprint(f"[NCBI] generado (remoto): {p.name}")
        return

    # Preferir SRA completo si el TSV lo trae
    sra_link  = next((u for u in links if u.lower().endswith(".sra")), None)
    lite_link = next((u for u in links if u.lower().endswith(".lite.1")), None)
    sra_path = None

    if sra_link:
        base = os.path.basename(sra_link)
        sra_path = CONV_DIR / base
        if not sra_path.exists():
            logprint(f"[NCBI] Descargando archivo del TSV: {base}")
            if not download_from_url(sra_link, sra_path):
                logprint("[NCBI] Descarga .sra desde NCBI falló.")
                sra_path = None
        else:
            logprint(f"[NCBI] Ya existe archivo: {sra_path.name}")

    if sra_path and sra_path.exists() and sra_path.stat().st_size > 0:
        if fasterq_dump_local(sra_path, run) or fastq_dump_local(sra_path, run) or fasterq_dump_container(sra_path, run):
            for p in list_fastq_for_run(CONV_DIR, run):
                logprint(f"[NCBI] generado (.sra): {p.name}")
            return
        logprint("[NCBI] Conversión falló con .sra del TSV.")

    # ENA: primero FASTQ (si existen), luego SRA
    logprint("[NCBI] Intento obtener FASTQ directos desde ENA.")
    if download_fastq_from_ena(run) > 0:
        for p in list_fastq_for_run(CONV_DIR, run):
            logprint(f"[NCBI] generado (ENA FASTQ): {p.name}")
        return

    logprint("[NCBI] Intento obtener .sra completo desde ENA.")
    ena_sra = download_sra_from_ena(run)
    if ena_sra and ena_sra.exists() and ena_sra.stat().st_size > 0:
        if fasterq_dump_local(ena_sra, run) or fastq_dump_local(ena_sra, run) or fasterq_dump_container(ena_sra, run):
            for p in list_fastq_for_run(CONV_DIR, run):
                logprint(f"[NCBI] generado (ENA .sra): {p.name}")
            return

    if lite_link:
        base = os.path.basename(lite_link)
        lite_path = CONV_DIR / base
        if not lite_path.exists():
            logprint(f"[NCBI] Descargando archivo .lite.1 del TSV: {base}")
            if not download_from_url(lite_link, lite_path):
                lite_path = None
        if lite_path and lite_path.exists() and lite_path.stat().st_size > 0:
            if fasterq_dump_container(lite_path, run):
                for p in list_fastq_for_run(CONV_DIR, run):
                    logprint(f"[NCBI] generado (.lite.1 → contenedor 3.x): {p.name}")
                return

    logprint("[NCBI] No fue posible generar FASTQ para la corrida.")

# ---------------------------------------------------------------------
# EJECUCIÓN PARA TODAS LAS FILAS
# ---------------------------------------------------------------------
procesadas = set()
ok_ena = ok_ncbi = 0

for i, row in df.iterrows():
    run = str(row.get(COL_RUN, "")).strip()
    if not run:
        continue
    if run in EXCLUDE_RUNS:
        logprint(f"[SKIP] EXCLUDE_RUNS: {run}")
        continue
    if run in procesadas:
        continue
    procesadas.add(run)

    try:
        if is_ena_row(row):
            logprint(f"\n[PROCESS][ENA] idx={i} Run={run}")
            got = download_ena_row(row)
            ok_ena += 1 if got > 0 else 0
            # Si el TSV trae .fastq.gz ya no intentamos NCBI para evitar duplicados
            continue

        if is_ncbi_row(row):
            logprint(f"\n[PROCESS][NCBI] idx={i} Run={run}")
            download_ncbi_row_and_convert(row)
            ok_ncbi += 1
        else:
            logprint(f"[WARN] Fila sin enlaces útiles: idx={i} Run={run}")

    except Exception as e:
        logprint(f"[ERROR] Fallo procesando idx={i} Run={run}: {e}")

# ---------------------------------------------------------------------
# Verificaciones simples
# ---------------------------------------------------------------------
try:
    ena_files = [f for f in os.listdir(ENA_DIR) if f.endswith(".fastq.gz")]
    print(f"[CHK][ENA] archivos (directos): {len(ena_files)}")
except Exception as e:
    print("[CHK][ENA] error:", e)

try:
    conv_files = [f for f in os.listdir(CONV_DIR) if f.endswith(".fastq") or f.endswith(".fastq.gz")]
    print(f"[CHK][NCBI] archivos: {len(conv_files)}")
except Exception as e:
    print("[CHK][NCBI] error:", e)

print(f"Procesadas ENA: {ok_ena}, NCBI: {ok_ncbi}, Total RUNs únicos: {len(procesadas)}")
print("Proceso ENA + NCBI para TODAS las muestras finalizado.")


# Descarga y conversión ENA/NCBI

- Procesa todas las filas del **TSV**.  
- Descarga **FASTQ** desde **ENA** cuando hay `.fastq.gz`.  
- Si no hay, convierte desde **NCBI**, intentando en orden:
  1. `fasterq-dump <RUN>` remoto  
  2. Descarga y conversión de `.sra` (ENA/NCBI)  
  3. Conversión de `.lite.1` con contenedor **sra-tools 3.x** (si existe)  
- Normaliza nombres: `<RUN>_1.fastq` y opcional `<RUN>_2.fastq`.

---

## Entradas
- **TSV**: `metadata/metadatos_unificados.tsv` con columnas:
  - `Run` / `run_accession`
  - `fastq_ftp`, `fastq_http`
  - `sra_ftp`, `sra_http`
  - `ftp_link`
  - `download_path`
- **CA opcional**: `CORP_CA`

---

## Salidas
- **FASTQ** en `rawdata/convertidos/`  
  - *Paired*: `<RUN>_1.fastq[.gz]`, `<RUN>_2.fastq[.gz]`  
  - *Single*: `<RUN>_1.fastq[.gz]`  
- Descargas **ENA** en `rawdata/ena/`  
- **Logs** en: `logs/descarga_all_YYYYmmdd_HHMMSS.log`  
- Temporales en: `tmp_fqd/`

---

## Parámetros editables
- `THREADS`  
- `KEEP_SRA`  
- `EXCLUDE_RUNS`  
- `ALLOW_INSECURE_WGET`  
- `TOUCH_EMPTY_R2`  
- `CORP_CA`  

### Rutas base
- `PROJECT_DIR`  
- `ENA_DIR`  
- `CONV_DIR`  


# 3) Limpieza y enriquecimiento de metadatos (centrado en coordenadas)

## 3.1 Altitud (SRTM)

In [None]:
import elevation

INPUT_TSV = OUT_DIR / "metadatos_unificados.tsv"
OUTPUT_TSV= OUT_DIR / "metadatos_con_altitud.tsv"
BUFFER    = 1.0

df = pd.read_csv(INPUT_TSV, sep="\t")
lat_min, lat_max = df["latitude"].astype(float).min()-BUFFER, df["latitude"].astype(float).max()+BUFFER
lon_min, lon_max = df["longitude"].astype(float).min()-BUFFER, df["longitude"].astype(float).max()+BUFFER

DEM_TIF.parent.mkdir(parents=True, exist_ok=True)
elevation.clip(bounds=(lon_min, lat_min, lon_max, lat_max), output=str(DEM_TIF))
elevation.clean()

vals = []
with rasterio.open(DEM_TIF) as src:
    for _, r in df.iterrows():
        lon, lat = float(r["longitude"]), float(r["latitude"])
        x, y = rio_transform({'init': 'EPSG:4326'}, src.crs, [lon], [lat])
        v = next(src.sample([(x[0], y[0])]))[0]
        vals.append(float(v) if v is not None else np.nan)

df["altitude_m"] = vals
save_tsv(df, OUTPUT_TSV)
print("Altitud añadida ->", OUTPUT_TSV)

**Resumen del bloque — SRTM**  
**Qué hace:** recorta DEM y extrae altitud por coordenada.  
**Entradas:** `outputs/metadatos_con_coord.tsv`, DEM `srtm.tif`.  
**Salidas:** `outputs/metadatos_con_altitud.tsv`.  
**Parámetros editables:** `BUFFER`, `DEM_TIF`.


## 3.3 WorldClim 2.1 (BIO1–BIO19)

In [None]:
INPUT_TSV  = OUT_DIR / "metadatos_con_altitud.tsv"
OUTPUT_TSV = OUT_DIR / "metadatos_con_clima.tsv"

df = pd.read_csv(INPUT_TSV, sep="\t")
for i in range(1, 20):
    df[f"bio{i}"] = np.nan

def extract_from_raster(src, lon, lat):
    try:
        row, col = src.index(lon, lat)
        return float(src.read(1)[row, col])
    except Exception:
        return np.nan

for i in range(1, 19+1):
    tif = CLIM_DIR / f"wc2.1_30s_bio_{i}.tif"
    if not tif.exists():
        print("Falta", tif, "omitiendo bio", i)
        continue
    with rasterio.open(tif) as src:
        for idx,(lon,lat) in enumerate(tqdm(zip(df["longitude"].astype(float), df["latitude"].astype(float)), total=len(df), desc=f"bio{i}")):
            v = extract_from_raster(src, float(lon), float(lat))
            if not np.isfinite(v):
                arr = src.read(1)
                r,c = src.index(float(lon), float(lat))
                r0, r1 = max(0, r-10), min(arr.shape[0], r+11)
                c0, c1 = max(0, c-10), min(arr.shape[1], c+11)
                window = arr[r0:r1, c0:c1]
                valid = window[np.isfinite(window)]
                v = float(valid[0]) if valid.size>0 else np.nan
            df.at[idx, f"bio{i}"] = v

save_tsv(df, OUTPUT_TSV)
print("WorldClim añadido ->", OUTPUT_TSV)

**Resumen del bloque — WorldClim**  
**Qué hace:** extrae BIO1–BIO19 por coordenada con fallback local.  
**Entradas:** `outputs/metadatos_con_altitud.tsv`, `wc2.1_30s_bio_*.tif`.  
**Salidas:** `outputs/metadatos_con_clima.tsv`.  
**Parámetros editables:** `CLIM_DIR`, vecindad.


## 3.4 Ecorregiones marinas (MEOW)

In [None]:
INPUT_TSV  = OUT_DIR / "metadatos_con_clima.tsv"
OUTPUT_TSV = OUT_DIR / "metadatos_con_ecoregion_marina.tsv"

df = pd.read_csv(INPUT_TSV, sep="\t")
gdf_points = gpd.GeoDataFrame(
    df.copy(),
    geometry=[Point(xy) for xy in zip(df["longitude"].astype(float), df["latitude"].astype(float))],
    crs="EPSG:4326"
)

if not MEOW_SHP.exists():
    raise FileNotFoundError(f"No se encontró el shapefile MEOW en {MEOW_SHP}. Ajuste la ruta MEOW_SHP.")
gdf_meow = gpd.read_file(MEOW_SHP).to_crs("EPSG:4326")

joined = gpd.sjoin(
    gdf_points,
    gdf_meow[["ECOREGION", "PROVINCE", "REALM", "geometry"]],
    how="left",
    predicate="intersects"
)

out = joined.drop(columns="geometry")
save_tsv(out, OUTPUT_TSV)
print("MEOW añadido ->", OUTPUT_TSV)

**Resumen del bloque — MEOW**  
**Qué hace:** crea `GeoDataFrame` de puntos y une con MEOW para asignar ecorregión/provincia/reino.  
**Entradas:** `outputs/metadatos_con_clima.tsv`, shapefile `MEOW`.  
**Salidas:** `outputs/metadatos_con_ecoregion_marina.tsv`.  
**Parámetros editables:** `MEOW_SHP`, `predicate`.


## 3.5 Huella humana (HII 2020)

In [None]:
INPUT_TSV = OUT_DIR / "metadatos_con_ecoregion_marina.tsv"
OUTPUT_TSV= OUT_DIR / "metadatos_con_hii.tsv"

if not HII_TIF.exists():
    raise FileNotFoundError(f"No se encontró el raster HII en {HII_TIF}. Ajuste la ruta HII_TIF.")

df = pd.read_csv(INPUT_TSV, sep="\t")
vals = []
with rasterio.open(HII_TIF) as src:
    arr = src.read(1)
    nodata = src.nodata
    for lon, lat in tqdm(zip(df["longitude"].astype(float), df["latitude"].astype(float)), total=len(df), desc="Extrayendo HII"):
        try:
            r, c = src.index(float(lon), float(lat))
            if 0 <= r < arr.shape[0] and 0 <= c < arr.shape[1]:
                v = arr[r, c]
            else:
                v = np.nan
            if (nodata is not None and v == nodata) or not np.isfinite(v):
                r0, r1 = max(0, r-10), min(arr.shape[0], r+11)
                c0, c1 = max(0, c-10), min(arr.shape[1], c+11)
                sub = arr[r0:r1, c0:c1]
                valid = sub[(sub != nodata) & np.isfinite(sub)]
                v = valid[0] if valid.size > 0 else np.nan
            vals.append(float(v) if np.isfinite(v) else np.nan)
        except Exception:
            vals.append(np.nan)

df["human_footprint_2020"] = vals
save_tsv(df, OUTPUT_TSV)
print("HII añadido ->", OUTPUT_TSV)

**Resumen del bloque — HII**  
**Qué hace:** extrae huella humana para cada coordenada con manejo de NoData y fallback local.  
**Entradas:** `outputs/metadatos_con_ecoregion_marina.tsv`, raster `hii_2020.tif`.  
**Salidas:** `outputs/metadatos_con_hii.tsv`.  
**Parámetros editables:** `HII_TIF`, tamaño de vecindad.


## 3.6 Suelos — SoilGrids v2.0 (0–30 cm) + WRB

In [None]:
import time
import math
from typing import Dict, Tuple, Any, Optional, List
import requests
import numpy as np
import pandas as pd
from tqdm import tqdm

INPUT_TSV  = OUT_DIR / "metadatos_con_hii.tsv"
OUTPUT_TSV = OUT_DIR / "metadatos_con_suelos.tsv"

ALL_PROPS = ["bdod", "cec", "clay", "sand", "silt", "phh2o", "soc", "nitrogen", "cfvo", "ocs"]
DEPTHS    = ["0-5cm","5-15cm","15-30cm"]
VALUES    = ["mean","Q0.5"]

REQUESTS_PER_SEC = 6
SLEEP_BETWEEN = 1.0 / REQUESTS_PER_SEC

MAX_RETRIES = 4
RETRY_BACKOFF = [1.0, 2.0, 4.0, 8.0]

SOIL_ENDPOINTS = [
    "https://rest.isric.org/soilgrids/v2.0/properties/query",
    "https://api.soilgrids.org/soilgrids/v2.0/properties/query",
]

CONNECT_TIMEOUT_S = 8
READ_TIMEOUT_S = 20


RADIUSES_KM = [0.0, 0.5, 1.0, 1.5, 2.0]
DIRECTIONS_DEG = [0, 45, 90, 135, 180, 225, 270, 315]

ROUND_DECIMALS: Optional[int] = None  # p.ej., 4 => ~11 m

def move_point(lat: float, lon: float, distance_km: float, bearing_deg: float) -> Tuple[float, float]:
    R = 6371.0
    b = math.radians(bearing_deg)
    lat1 = math.radians(lat); lon1 = math.radians(lon)
    d = distance_km / R
    lat2 = math.asin(math.sin(lat1)*math.cos(d) + math.cos(lat1)*math.sin(d)*math.cos(b))
    lon2 = lon1 + math.atan2(math.sin(b)*math.sin(d)*math.cos(lat1), math.cos(d) - math.sin(lat1)*math.sin(lat2))
    return (math.degrees(lat2), (math.degrees(lon2) + 540) % 360 - 180)

def pick_endpoint() -> str:
    return SOIL_ENDPOINTS[int(time.time()) % len(SOIL_ENDPOINTS)]

_last_request_time = 0.0
def throttle():
    global _last_request_time
    now = time.time()
    wait = SLEEP_BETWEEN - (now - _last_request_time)
    if wait > 0:
        time.sleep(wait)
    _last_request_time = time.time()

def build_params(lat: float, lon: float, properties: List[str]) -> Dict[str, Any]:
    return {
        "lat": float(lat),
        "lon": float(lon),
        "property": ",".join(properties),
        "depth": ",".join(DEPTHS),
        "value": ",".join(VALUES),
        "prefix": "mean",
    }

def do_request(params: Dict[str, Any]) -> Optional[requests.Response]:
    for attempt in range(MAX_RETRIES):
        throttle()
        try:
            resp = requests.get(pick_endpoint(), params=params, timeout=(CONNECT_TIMEOUT_S, READ_TIMEOUT_S))
            if resp.status_code == 200:
                return resp
            if 500 <= resp.status_code < 600:
                time.sleep(RETRY_BACKOFF[min(attempt, len(RETRY_BACKOFF)-1)])
                continue
            return resp
        except requests.RequestException:
            time.sleep(RETRY_BACKOFF[min(attempt, len(RETRY_BACKOFF)-1)])
    return None

def _depth_variants(depth_label: str) -> List[str]:
    return [depth_label, depth_label.replace("-", "_")]

def any_numeric_value(props: Dict[str, Any], prop: str, depth_label: str) -> bool:
    for d in _depth_variants(depth_label):
        for stat in VALUES:
            v = props.get(f"{prop}_{d}_{stat}")
            if isinstance(v, (int, float)):
                return True
    return False

def extract_value(props: Dict[str, Any], prop: str, depth_label: str) -> Optional[float]:

    for d in _depth_variants(depth_label):
        for stat in VALUES:
            v = props.get(f"{prop}_{d}_{stat}")
            if isinstance(v, (int, float)):
                return float(v)
    return None

def probe_cell_has_numeric(lat: float, lon: float) -> bool:
    params = build_params(lat, lon, ["clay"])
    resp = do_request(params)
    if resp is None or resp.status_code != 200:
        return False
    try:
        data = resp.json()
        props = data.get("properties", {})
    except Exception:
        return False
    for d in DEPTHS:
        if any_numeric_value(props, "clay", d):
            return True
    return False

def fetch_all_properties(lat: float, lon: float, props_list: List[str]) -> Optional[Dict[str, Any]]:
    params = build_params(lat, lon, props_list)
    resp = do_request(params)
    if resp is None or resp.status_code != 200:
        return None
    try:
        return resp.json()
    except Exception:
        return None

def enrich_coord_with_fallback(lat: float, lon: float, props_list: List[str]) -> Optional[Dict[str, Any]]:
    for radius in RADIUSES_KM:
        candidates = [(lat, lon)] if radius == 0 else [move_point(lat, lon, radius, b) for b in DIRECTIONS_DEG]
        for lt, ln in candidates:
            if not (-90 <= lt <= 90 and -180 <= ln <= 180):
                continue
            if not probe_cell_has_numeric(lt, ln):
                continue
            data = fetch_all_properties(lt, ln, props_list)
            if data is not None:
                return data
    return None

df = pd.read_csv(INPUT_TSV, sep="\t").reset_index(drop=True)


for p in ALL_PROPS:
    for d in DEPTHS:
        col = f"{p}_{d}_mean"
        if col not in df.columns:
            df[col] = np.nan


def coord_key(lat: float, lon: float) -> Tuple[float, float]:
    if ROUND_DECIMALS is not None:
        return (round(lat, ROUND_DECIMALS), round(lon, ROUND_DECIMALS))
    return (lat, lon)

cache: Dict[Tuple[float, float], Optional[Dict[str, Any]]] = {}

lats = df["latitude"].astype(float).to_numpy()
lons = df["longitude"].astype(float).to_numpy()

unique_keys = []
seen = set()
for lt, ln in zip(lats, lons):
    k = coord_key(lt, ln)
    if k not in seen:
        seen.add(k)
        unique_keys.append(k)

for (lt, ln) in tqdm(unique_keys, desc="SoilGrids (props/dep/values actualizados)"):
    cache[(lt, ln)] = enrich_coord_with_fallback(lt, ln, ALL_PROPS)


for idx, (lt, ln) in enumerate(zip(lats, lons)):
    k = coord_key(lt, ln)
    data = cache.get(k)
    if not data:
        continue
    props_dict = data.get("properties", {}) if isinstance(data, dict) else {}
    for p in ALL_PROPS:
        for d in DEPTHS:
            col = f"{p}_{d}_mean"
            v = extract_value(props_dict, p, d)
            if v is not None:
                df.at[idx, col] = v

save_tsv(df, OUTPUT_TSV)
print("SoilGrids añadido ->", OUTPUT_TSV)


**Resumen del bloque — SoilGrids + WRB**  
**Qué hace:** consulta SoilGrids por punto con control de tasa y reintentos; agrega propiedades y WRB si disponible.  
**Entradas:** `outputs/metadatos_con_hii.tsv`, API SoilGrids.  
**Salidas:** `outputs/metadatos_con_suelos.tsv`.  
**Parámetros editables:** `PROPERTIES`, `DEPTHS`, `MAX_WORKERS`, `REQUESTS_PER_SEC`, `RETRIES`, `BACKOFF`.


## 3.7 GBIF — especies de manglar y biodiversidad de plantas

In [None]:
from __future__ import annotations
import time, math, threading
from typing import Dict, Any, Optional, List, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import Counter

import numpy as np
import pandas as pd
import requests

INPUT_TSV  = OUT_DIR / "metadatos_con_suelos.tsv"
OUTPUT_TSV = OUT_DIR / "metadatos_con_gbif.tsv"

RADIUS_M = 2000
KINGDOM_KEY = None
YEAR_FROM = None
YEAR_TO   = None

GBIF_OCC_URL   = "https://api.gbif.org/v1/occurrence/search"
GBIF_TAXON_URL = "https://api.gbif.org/v1/species/"


PAGE_LIMIT = 300
MAX_PAGES_PER_POINT = 50
MAX_RECORDS_PER_POINT = PAGE_LIMIT * MAX_PAGES_PER_POINT


MAX_WORKERS = 8
GLOBAL_RPS = 3
MAX_BACKOFF_SEC = 60

_print_lock = threading.Lock()
def log(msg: str):
    with _print_lock:
        print(msg, flush=True)

class RateLimiter:
    def __init__(self, rate_per_sec: float, capacity: int | None = None):
        self.rate = float(rate_per_sec)
        self.capacity = int(capacity or max(1, int(rate_per_sec * 2)))
        self.tokens = self.capacity
        self.lock = threading.Lock()
        self.ts = time.time()
    def acquire(self):
        while True:
            with self.lock:
                now = time.time()
                refill = (now - self.ts) * self.rate
                if refill > 0:
                    self.tokens = min(self.capacity, self.tokens + refill)
                    self.ts = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
            time.sleep(0.01)

RATE = RateLimiter(GLOBAL_RPS)

def make_session() -> requests.Session:
    s = requests.Session()
    ad = requests.adapters.HTTPAdapter(pool_connections=MAX_WORKERS*2, pool_maxsize=MAX_WORKERS*2, max_retries=0)
    s.mount("https://", ad); s.mount("http://", ad)
    s.headers.update({"User-Agent": "MAGENTA-GBIF-diversity/2km/1.0 (+local)"})
    return s

SESSION = make_session()

def _get(url: str, params: Dict[str, Any], max_attempts=8) -> Optional[requests.Response]:
    """GET con RPS global y backoff exponencial (+ Retry-After cuando esté presente)."""
    attempt = 0; backoff = 1.0
    while attempt < max_attempts:
        attempt += 1
        RATE.acquire()
        try:
            r = SESSION.get(url, params=params, timeout=30)
        except requests.RequestException:
            time.sleep(min(backoff, MAX_BACKOFF_SEC))
            backoff = min(MAX_BACKOFF_SEC, backoff * 2) + 0.1 * attempt
            continue
        if r.status_code == 200:
            return r
        if r.status_code in (429, 500, 502, 503, 504):
            ra = r.headers.get("Retry-After")
            wait = float(ra) if ra and str(ra).isdigit() else backoff
            time.sleep(min(wait, MAX_BACKOFF_SEC))
            backoff = min(MAX_BACKOFF_SEC, backoff * 2) + 0.1 * attempt
            continue

        return None
    return None

_taxon_cache: Dict[int, str | None] = {}

def gbif_species_name(species_key: int) -> Optional[str]:
    """Resuelve nombre científico desde speciesKey, con caché simple."""
    if species_key in _taxon_cache:
        return _taxon_cache[species_key]
    r = _get(f"{GBIF_TAXON_URL}{species_key}", {})
    if not r:
        _taxon_cache[species_key] = None
        return None
    try:
        js = r.json()
        name = js.get("scientificName") or js.get("canonicalName")
    except Exception:
        name = None
    _taxon_cache[species_key] = name
    return name

def _apply_common_filters(q: Dict[str, Any]):
    q["hasCoordinate"] = "true"
    q["hasGeospatialIssue"] = "false"
    q["occurrenceStatus"] = "PRESENT"
    if KINGDOM_KEY is not None:
        q["kingdomKey"] = KINGDOM_KEY
    if YEAR_FROM is not None:
        q["year"] = f"{YEAR_FROM},{YEAR_TO or ''}".strip(",")

def _bbox_wkt(lat: float, lon: float, radius_m: int) -> str:
    """BBox aproximado en grados a partir de radio en metros (depende de latitud)."""
    dlat = radius_m / 1000.0 / 111.32
    dlon = dlat / max(0.1, math.cos(math.radians(lat)))
    minx = lon - dlon; maxx = lon + dlon
    miny = lat - dlat; maxy = lat + dlat
    return f"POLYGON(({minx:.6f} {miny:.6f},{maxx:.6f} {miny:.6f},{maxx:.6f} {maxy:.6f},{minx:.6f} {maxy:.6f},{minx:.6f} {miny:.6f}))"

def _page_species_counts(params: Dict[str, Any]) -> Tuple[int, Counter]:
    """
    Pagina occurrence/search sumando speciesKey (o taxonKey) en cliente.
    Devuelve (total_reported, species_counter).
    """
    p0 = dict(params); p0["limit"] = 0
    total_reported = 0
    r0 = _get(GBIF_OCC_URL, p0)
    if r0:
        try:
            total_reported = int(r0.json().get("count", 0))
        except Exception:
            total_reported = 0

    species_counter: Counter = Counter()
    fetched = 0
    offset = 0
    pages = 0

    while fetched < min(total_reported, MAX_RECORDS_PER_POINT) and pages < MAX_PAGES_PER_POINT:
        q = dict(params)
        q["limit"] = PAGE_LIMIT
        q["offset"] = offset
        r = _get(GBIF_OCC_URL, q)
        if not r:
            break
        try:
            js = r.json()
        except Exception:
            break
        results = js.get("results", [])
        if not results:
            break
        for rec in results:
            sk = rec.get("speciesKey") or rec.get("taxonKey")
            if isinstance(sk, int):
                species_counter[sk] += 1
        n = len(results)
        fetched += n
        offset += n
        pages += 1
        if n < PAGE_LIMIT:
            break

    return total_reported, species_counter

def gbif_occurrence_counts_spatial(lat: float, lon: float, radius_m: int) -> Tuple[int, List[Tuple[int,int]], str]:
    """
    Devuelve (total_occurrences, [(speciesKey, count), ...], spatial_mode)
      A) geo_distance  (snake_case)
      B) geoDistance   (camelCase)
      C) geometry=BBOX (WKT)
    """
    baseA = {}
    _apply_common_filters(baseA)
    baseA["geo_distance"] = f"{radius_m}m,{lat:.6f},{lon:.6f}"
    total, counter = _page_species_counts(baseA)
    if total > 0 and counter:
        items = sorted(counter.items(), key=lambda x: x[1], reverse=True)
        return total, items, "geo_distance"

    baseB = {}
    _apply_common_filters(baseB)
    baseB["geoDistance"] = f"{radius_m}m,{lat:.6f},{lon:.6f}"
    total, counter = _page_species_counts(baseB)
    if total > 0 and counter:
        items = sorted(counter.items(), key=lambda x: x[1], reverse=True)
        return total, items, "geoDistance"

    baseC = {}
    _apply_common_filters(baseC)
    baseC["geometry"] = _bbox_wkt(lat, lon, radius_m)
    total, counter = _page_species_counts(baseC)
    items = sorted(counter.items(), key=lambda x: x[1], reverse=True)
    return total, items, "bbox_wkt" if total > 0 else "none"

def diversity_metrics(species_counts: List[int], label_suffix: str) -> Dict[str, float]:
    out = {f"gbif_richness_species_{label_suffix}": 0.0,
           f"gbif_shannon_H_{label_suffix}": np.nan,
           f"gbif_pielou_J_{label_suffix}": np.nan,
           f"gbif_simpson_1minD_{label_suffix}": np.nan,
           f"gbif_chao1_{label_suffix}": np.nan}
    S = int(sum(1 for c in species_counts if c > 0))
    out[f"gbif_richness_species_{label_suffix}"] = float(S)
    N = float(sum(species_counts))
    if S == 0 or N <= 0:
        return out
    p = [c / N for c in species_counts if c > 0]
    H = -sum(pi * math.log(pi) for pi in p)
    out[f"gbif_shannon_H_{label_suffix}"] = float(H)
    out[f"gbif_pielou_J_{label_suffix}"] = float(H / math.log(S)) if S > 1 else np.nan
    D = sum(pi * pi for pi in p)
    out[f"gbif_simpson_1minD_{label_suffix}"] = float(1.0 - D)
    F1 = sum(1 for c in species_counts if c == 1)
    F2 = sum(1 for c in species_counts if c == 2)
    chao1 = (S + (F1 * F1) / (2.0 * F2)) if F2 > 0 else (S + (F1 * (F1 - 1)) / 2.0)
    out[f"gbif_chao1_{label_suffix}"] = float(chao1)
    return out

def fetch_point_fixed_2km(lat: float, lon: float) -> Dict[str, Any]:
    out: Dict[str, Any] = {
        "gbif_occurrences_effective": 0,
        "gbif_top_species_effective": None,
        "gbif_scope": ("All" if KINGDOM_KEY is None else str(KINGDOM_KEY)),
        "gbif_radius_effective_m": RADIUS_M,
        "gbif_spatial_mode": None,
        "gbif_error": None
    }

    total, items, spatial_mode = gbif_occurrence_counts_spatial(lat, lon, RADIUS_M)
    out["gbif_spatial_mode"] = spatial_mode
    out["gbif_occurrences_effective"] = int(total) if total else 0

    counts = [c for _, c in items] if items else []
    out.update(diversity_metrics(counts, "2km"))

    top = (items or [])[:5]
    names = []
    for sk, c in top:
        nm = gbif_species_name(sk)
        if nm:
            names.append(f"{nm} ({c})")
    if names:
        out["gbif_top_species_effective"] = "; ".join(names)

    return out

def _timed_fetch(lat: float, lon: float):
    t0 = time.time()
    out = fetch_point_fixed_2km(lat, lon)
    return out, time.time() - t0

df = pd.read_csv(INPUT_TSV, sep="\t").reset_index(drop=True)
if not {"latitude","longitude"}.issubset(df.columns):
    raise ValueError("Se requieren columnas 'latitude' y 'longitude' en el TSV de entrada.")

idxs = [i for i in range(len(df)) if pd.notnull(df.at[i, "latitude"]) and pd.notnull(df.at[i, "longitude"])]

features_by_idx: Dict[int, Dict[str, Any]] = {}
completed = 0
total = len(idxs)
log(f"GBIF diversidad (ALL taxa) **FIJO 2 km** | filas={total} | workers={MAX_WORKERS} | rps={GLOBAL_RPS} | radius={RADIUS_M} m")

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    futs = {ex.submit(_timed_fetch, float(df.at[i, "latitude"]), float(df.at[i, "longitude"])): i for i in idxs}
    for fut in as_completed(futs):
        i = futs[fut]
        try:
            row, elapsed = fut.result()
            ok = "OK"
        except Exception as e:
            row = {"gbif_error": str(e),
                   "gbif_occurrences_effective": 0,
                   "gbif_scope": ("All" if KINGDOM_KEY is None else str(KINGDOM_KEY)),
                   "gbif_radius_effective_m": RADIUS_M,
                   "gbif_spatial_mode": "error"}
            ok = f"ERR:{e}"
            elapsed = np.nan

        features_by_idx[i] = row
        lat = float(df.at[i, "latitude"]); lon = float(df.at[i, "longitude"])
        sfx = "2km"
        log(f"[{completed+1}/{total}] lat={lat:.5f} lon={lon:.5f} "
            f"S={row.get(f'gbif_richness_species_{sfx}', 0)} "
            f"H'={row.get(f'gbif_shannon_H_{sfx}', np.nan)} "
            f"occ={row.get('gbif_occurrences_effective', 0)} "
            f"r_eff={row.get('gbif_radius_effective_m')} mode={row.get('gbif_spatial_mode')} {ok} | {elapsed:.2f}s")
        completed += 1

feat_df = pd.DataFrame.from_dict(features_by_idx, orient="index").reindex(df.index)
out_df = pd.concat([df, feat_df], axis=1)
save_tsv(out_df, OUTPUT_TSV)
print("GBIF añadido ->", OUTPUT_TSV)


**Resumen del bloque — GBIF (manglar y plantas)**  
**Qué hace:** calcula riqueza de plantas y riqueza/top-5 de especies de manglar por coordenada.  
**Entradas:** `outputs/metadatos_con_suelos.tsv`, API GBIF.  
**Salidas:** `outputs/metadatos_con_gbif.tsv`.  
**Parámetros editables:** `MANGROVE_TAXA`, `RADIUS_KM`, `MAX_ROWS`, `REQUEST_PAUSE`.


## 4) Salidas clave y consolidación final

In [None]:
final_tsv = OUT_DIR / "metadatos_con_gbif.tsv"
if final_tsv.exists():
    df_final = pd.read_csv(final_tsv, sep="\t")
    df_final.to_csv(META_FILE, index=False)
    print("Escrito estándar:", META_FILE)
else:
    print("No se encontró", final_tsv)

**Resumen del bloque — Consolidación**  
**Qué hace:** escribe la última tabla enriquecida al nombre estándar `metadatos_enriquecidos.csv`.  
**Entradas:** `outputs/metadatos_con_gbif.tsv`.  
**Salidas:** `outputs/metadatos_enriquecidos.csv`.  
**Parámetros editables:** `META_BASE`, `META_FILE`.


# Resumen de metadatos generados (esperados)

Este resumen describe **qué columnas** deberían haberse añadido durante el flujo de enriquecimiento, asumiendo que todos los insumos locales y APIs estuvieron disponibles y accesibles.

## Núcleo geo y filtrado
- `latitude`, `longitude` (validadas y acotadas a rangos plausibles).

## Altitud (SRTM / DEM)
- `altitude_m` → **1 columna**  
*Requiere `data/dem/srtm.tif` (o ajustar `DEM_TIF`).*

## Clima (WorldClim 2.1, BIO1–BIO19)
- `bio1` … `bio19` → **19 columnas**  
*Requiere TIFFs `wc2.1_30s_bio_1..19.tif` en `data/clim/` (o ajustar `CLIM_DIR`).*

## Ecorregiones marinas (MEOW)
- `ECOREGION`, `PROVINCE`, `REALM` → **3 columnas**  
*Requiere shapefile `meow_ecos.shp` (+ `.shx/.dbf/.prj`) en `data/marine_ecoregions/MEOW/` (o ajustar `MEOW_SHP`).*

## Huella humana (HII 2020)
- `human_footprint_2020` → **1 columna**  
*Requiere `data/human/hii_2020.tif` (o ajustar `HII_TIF`).*

## Suelos (SoilGrids v2.0, 0–30 cm)
Propiedades: `bdod`, `cec`, `clay`, `sand`, `silt`, `phh2o`, `soc`, `nitrogen`, `cfvo`, `ocs`  
Profundidades: `0-5cm`, `5-15cm`, `15-30cm`  
Estadístico consolidado: `mean`  
- Columnas esperadas: **10 propiedades × 3 profundidades = 30 columnas**  
  Formato: `prop_depth_mean`, p.ej. `clay_0-5cm_mean`.  
> Nota: si la API expone `Q0.5` (mediana), el flujo puede intentar leerla, pero la salida consolidada se guarda en `*_mean` cuando hay valores.

## Biodiversidad (GBIF, radio 2 km)
- `gbif_richness_species_2km` (riqueza específica)  
- `gbif_shannon_H_2km` (diversidad de Shannon)  
- `gbif_pielou_J_2km` (equitatividad de Pielou)  
- `gbif_simpson_1minD_2km` (1 − índice de Simpson)  
- `gbif_chao1_2km` (riqueza estimada, Chao1)  
- `gbif_occurrences_effective` (número de ocurrencias contabilizadas)  
- `gbif_top_species_effective` (Top 5 especies con conteos)  
- `gbif_scope`, `gbif_radius_effective_m`, `gbif_spatial_mode`, `gbif_error` (diagnóstico)  
→ **≈ 10 columnas**

---

## Total **adicional** estimado (si todo estuvo disponible)
- Altitud: 1  
- Clima: 19  
- MEOW: 3  
- HII: 1  
- Suelos: 30  
- GBIF (métricas + diagnóstico): ~10  
**Suma mínima esperada:** **64 columnas** adicionales.

> El conteo real puede variar por: presencia/ausencia de insumos locales, errores de red o cambios en APIs. En tales casos se esperan `NaN` y el flujo debería continuar (salvo pasos marcados como obligatorios por el propio notebook).

## Utilidad
- **Contexto ambiental** (clima, altitud, ecorregión, presión antrópica) por muestra.  
- **Propiedades del suelo** (pH, textura, carbono/nitrógeno, densidad aparente) relevantes para funciones microbianas.  
- **Biodiversidad local** (riqueza/diversidad) para relacionar composición/función metagenómica con entorno biológico.  
- **Comparabilidad y reproducibilidad** al estandarizar metadatos para análisis multi-estudio, conservación y restauración.


# 5) Preprocesamiento de lecturas: QC → trimming → ensamblaje

**Binarios requeridos en PATH:** `fastqc`, `multiqc`, `fastp`, `megahit`.

**Variables de entorno opcionales:**
- `MAG_PROJECT_DIR`: fuerza la base del proyecto (por defecto usa el `PROJECT_DIR`).
- `MAG_RAW_DIR`: ruta a FASTQ crudos. Por defecto: `PROJECT_DIR/mags/data/raw`.
- `MAG_QC_THREADS`, `MAG_FASTP_THREADS`, `MAG_FASTP_MAX_WORKERS`, `MAG_VALIDATE_READS`, `MAG_MEGAHIT_THREADS`.


In [None]:
from __future__ import annotations
from pathlib import Path
import os, re, sys, shutil, subprocess, csv, json, gzip
from datetime import datetime
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

# Raíz del proyecto: MAG_PROJECT_DIR tiene prioridad, si no MAGENTA_DIR, si no cwd.
PROJECT_DIR = Path(
    os.environ.get("MAG_PROJECT_DIR", os.environ.get("MAGENTA_DIR", Path.cwd()))
).resolve()

# Estructura base tipo MAGs dentro del proyecto
MAGS_DIR    = PROJECT_DIR / "mags"
DATA_DIR    = MAGS_DIR / "data"            # (no se usa como default de RAW)
RESULTS_DIR = MAGS_DIR / "results"
SCRIPTS_DIR = MAGS_DIR / "scripts"

# Subcarpetas de resultados
QC_DIR      = RESULTS_DIR / "01.qc"
TRIM_DIR    = RESULTS_DIR / "02.trimmed"
ASM_DIR     = RESULTS_DIR / "03.assembly"
ASM_LOG_DIR = ASM_DIR / "logs"

# Metadatos / reportes
PIPE_META          = RESULTS_DIR / "pipeline_meta"
PIPE_META.mkdir(parents=True, exist_ok=True)
PIPE_MANIFEST_CSV  = PIPE_META / "pipeline_manifest.csv"
TRIM_REPORT_CSV    = PIPE_META / "trim_report.csv"
ASM_RESUMEN_CSV    = ASM_DIR   / "resumen_ensamblaje.csv"
VALID_SAMPLES_CSV  = PIPE_META / "muestras_validas.csv"

# Donde 02_descargar_y_convertir_mangrove.py deja los FASTQ
DEFAULT_RAW_SRC = PROJECT_DIR / "rawdata" / "convertidos"
RAW_SRC = Path(os.environ.get("MAG_RAW_DIR", DEFAULT_RAW_SRC))

# Crear estructura necesaria
for d in [MAGS_DIR, DATA_DIR, RESULTS_DIR, SCRIPTS_DIR, QC_DIR, TRIM_DIR, ASM_DIR, ASM_LOG_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# Parámetros
N_THREADS_FASTQC   = int(os.environ.get("MAG_QC_THREADS",        "12"))
N_THREADS_FASTP    = int(os.environ.get("MAG_FASTP_THREADS",     "12"))
MAX_WORKERS_FASTP  = int(os.environ.get("MAG_FASTP_MAX_WORKERS", "4"))
MAX_READS_VALIDATE = int(os.environ.get("MAG_VALIDATE_READS",    "10000"))
MEGAHIT_THREADS    = int(os.environ.get("MAG_MEGAHIT_THREADS",   "40"))
SAMPLE_REGEX       = os.environ.get(
    "MAG_SAMPLE_REGEX",
    r"^([A-Za-z0-9_\-]+)_([12])\.fastq(?:\.gz)?$"
)

def have_bin(b: str) -> bool:
    return shutil.which(b) is not None

def run(cmd, **kwargs) -> int:
    printable = " ".join(map(str, cmd)) if isinstance(cmd, (list,tuple)) else str(cmd)
    print("[CMD]", printable)
    return subprocess.call(cmd, shell=isinstance(cmd, str), **kwargs)

print("[PORTABLE] PROJECT_DIR =", PROJECT_DIR)
print("[PORTABLE] RAW_SRC     =", RAW_SRC)
print("[PORTABLE] RESULTS_DIR  =", RESULTS_DIR)

## Rutas y estructura de directorios

**Base del proyecto (PROJECT_DIR):**  
Determinada en este orden:
1. `MAG_PROJECT_DIR`
2. `MAGENTA_DIR`
3. Directorio de trabajo actual (`cwd`)

**Estructura creada dentro de `PROJECT_DIR`:**
- `mags/`
- `data/` (no se usa como entrada por defecto de FASTQ)
- `results/`
  - `01.qc/` (FastQC crudo y post-trim, MultiQC)
  - `02.trimmed/` (salidas de fastp)
  - `03.assembly/` (ensamblajes MEGAHIT)
- `pipeline_meta/` (manifiestos y reportes)
- `scripts/`

**Entrada por defecto de FASTQ:**  
`rawdata/convertidos/`, generada por el script de descarga y conversión.  
Puede sobrescribirse con la variable `MAG_RAW_DIR`.

---

## Parámetros configurables

Los siguientes parámetros se leen de variables de entorno, con valores por defecto sensatos:

- `MAG_QC_THREADS` (12) → hilos para FastQC  
- `MAG_FASTP_THREADS` (12) → hilos para fastp  
- `MAG_FASTP_MAX_WORKERS` (4) → número máximo de muestras en paralelo para fastp  
- `MAG_VALIDATE_READS` (10000) → lecturas muestreadas para validación de FASTQ  
- `MAG_MEGAHIT_THREADS` (40) → hilos para MEGAHIT  
- `MAG_SAMPLE_REGEX` → expresión regular para identificar muestras a partir del nombre de archivo  

---

## Utilidades del bloque

- **`have_bin(b: str)`**  
  Comprueba si un binario requerido está disponible en el `PATH`.

- **`run(cmd, **kwargs)`**  
  Ejecuta un comando, imprime la línea para trazabilidad y devuelve el código de salida.

---

## Archivos de salida esperados

- `pipeline_manifest.csv` → listado de muestras y rutas de FASTQ detectadas  
- `trim_report.csv` → reporte de trimming  
- `resumen_ensamblaje.csv` → estado de los ensamblajes por muestra  
- `muestras_validas.csv` → resumen de pares de FASTQ validados  

---

## 🔍 5.1 Control de calidad inicial — FastQC + MultiQC


In [None]:
import glob

# 1) Detectar pares (y single-end) a partir de SAMPLE_REGEX
def list_pairs(raw_dir: Path, sample_regex: str) -> dict[str, dict[str, Path]]:
    rx = re.compile(sample_regex)
    buckets: dict[str, dict[str, Path]] = {}
    for p in sorted(raw_dir.rglob("*.fastq*")):
        m = rx.match(p.name)
        if not m:
            continue
        sample, read = m.group(1), m.group(2)
        d = buckets.setdefault(sample, {"R1": None, "R2": None})
        if read == "1" and d["R1"] is None:
            d["R1"] = p
        elif read == "2" and d["R2"] is None:
            d["R2"] = p
    # Aceptar single-end con nombres <sample>.fastq(.gz)
    for p in sorted(raw_dir.rglob("*.fastq*")):
        if rx.match(p.name):
            continue
        base = p.name.replace(".gz", "")
        if base.endswith(".fastq"):
            sample = base[:-6]  # quita ".fastq"
            buckets.setdefault(sample, {"R1": None, "R2": None})
            if buckets[sample]["R1"] is None:
                buckets[sample]["R1"] = p
    return buckets

def write_manifest(pairs: dict[str, dict[str, Path]]) -> pd.DataFrame:
    rows = []
    for s, rr in sorted(pairs.items()):
        rows.append({"sample": s, "R1": str(rr.get("R1") or ""), "R2": str(rr.get("R2") or "")})
    df = pd.DataFrame(rows)
    df.to_csv(PIPE_MANIFEST_CSV, index=False)
    print(f"[META] Manifiesto muestras -> {PIPE_MANIFEST_CSV}")
    return df

# 2) FastQC crudo (paralelo por lotes para no saturar la CLI)
def run_fastqc(inputs: list[Path], outdir: Path, threads: int) -> None:
    outdir.mkdir(parents=True, exist_ok=True)
    if not inputs:
        print("[FASTQC] Sin archivos de entrada.")
        return
    batch = []
    for p in inputs:
        batch.append(str(p))
        if len(batch) >= 64:
            run(["fastqc", "-t", str(threads), "-o", str(outdir)] + batch)
            batch = []
    if batch:
        run(["fastqc", "-t", str(threads), "-o", str(outdir)] + batch)

# Ejecutar manifiesto + FastQC
if not RAW_SRC.exists():
    raise SystemExit(f"[ERROR] No existe RAW_SRC: {RAW_SRC}")

pairs = list_pairs(RAW_SRC, SAMPLE_REGEX)
if not pairs:
    raise SystemExit(f"[ERROR] No se detectaron FASTQ en {RAW_SRC}")

manifest_df = write_manifest(pairs)

raw_fastqs = []
for rr in pairs.values():
    if rr.get("R1"): raw_fastqs.append(rr["R1"])
    if rr.get("R2"): raw_fastqs.append(rr["R2"])

if have_bin("fastqc"):
    run_fastqc(raw_fastqs, QC_DIR / "raw_fastqc", N_THREADS_FASTQC)
    if have_bin("multiqc"):
        run(["multiqc", str(QC_DIR / "raw_fastqc"), "-o", str(QC_DIR / "raw_fastqc")])
else:
    print("[AVISO] 'fastqc' no está en PATH. Se omite QC inicial.")

Este bloque se encarga de **detectar archivos FASTQ** en el directorio de entrada (`RAW_SRC`), generar un **manifiesto de muestras** y ejecutar **FastQC en paralelo** (con soporte opcional para MultiQC).

## Flujo principal

1. **Descubrimiento de muestras (`list_pairs`)**
   - Detecta pares de archivos `R1`/`R2` usando `SAMPLE_REGEX`.
   - Admite también **single-end** con nombres `<sample>.fastq(.gz)`.
   - Devuelve un diccionario con las rutas asociadas a cada muestra.

2. **Generación de manifiesto (`write_manifest`)**
   - Construye un `DataFrame` con columnas:
     - `sample`
     - `R1`
     - `R2`
   - Guarda el archivo `pipeline_manifest.csv`.
   - Imprime la ruta generada.

3. **Ejecución de FastQC (`run_fastqc`)**
   - Revisa que existan archivos de entrada.
   - Crea el directorio `QC_DIR/raw_fastqc` si no existe.
   - Procesa los archivos en **lotes de hasta 64** para no saturar la CLI.
   - Ejecuta:  
     ```bash
     fastqc -t <threads> -o <QC_DIR/raw_fastqc> <archivos>
     ```
     ### Parámetros en *paired-end*

Cuando hay **R1** y **R2**:

- `-I`: archivo de entrada **R2** (lecturas *reverse*).  
- `-O`: archivo de salida de lecturas **R2** procesadas.  
- `--detect_adapter_for_pe`: activa la detección automática de adaptadores para datos *paired-end*.  

4. **MultiQC opcional**
   - Si está disponible, ejecuta:  
     ```bash
     multiqc <QC_DIR/raw_fastqc> -o <QC_DIR/raw_fastqc>
     ```
---

## Condiciones y validaciones

- Si `RAW_SRC` no existe → termina con error.  
- Si no se detectan FASTQ → termina con error.  
- Si `fastqc` no está en el `PATH` → omite la etapa con aviso.  
- Si `multiqc` no está disponible → solo se ejecuta FastQC.  

## Archivos generados

- `pipeline_manifest.csv` → listado de muestras y archivos FASTQ detectados.  
- Resultados de **FastQC** en `QC_DIR/raw_fastqc/`.  
- Reporte combinado de **MultiQC** (si está instalado).  

## 5.2 Trimming de lecturas — *fastp* en paralelo

In [None]:
from __future__ import annotations
from pathlib import Path
import os, re, sys, shutil, subprocess, argparse, shlex
import pandas as pd
from typing import Dict, Optional, List, Tuple

# ---------------------------
# 0) Rutas y parámetros base
# ---------------------------
PROJECT_DIR = Path(os.environ.get("MAG_PROJECT_DIR", os.environ.get("MAGENTA_DIR", Path.cwd()))).resolve()
MAGS_DIR    = PROJECT_DIR / "mags"
RESULTS_DIR = MAGS_DIR / "results"
TRIM_DIR    = RESULTS_DIR / "02.trimmed"
PIPE_META   = RESULTS_DIR / "pipeline_meta"
PIPE_META.mkdir(parents=True, exist_ok=True)

PIPE_MANIFEST_CSV = PIPE_META / "pipeline_manifest.csv"
TRIM_REPORT_CSV   = PIPE_META / "trim_report.csv"

DEFAULT_RAW_SRC = PROJECT_DIR / "rawdata" / "convertidos"
RAW_SRC = Path(os.environ.get("MAG_RAW_DIR", DEFAULT_RAW_SRC))

SAMPLE_REGEX = os.environ.get("MAG_SAMPLE_REGEX", r"^([A-Za-z0-9_\-]+)_([12])\.fastq(?:\.gz)?$")

N_THREADS_FASTP = int(os.environ.get("MAG_FASTP_THREADS", "12"))

def have_bin(b: str) -> bool:
    from shutil import which
    return which(b) is not None

def run(cmd, **kwargs) -> int:
    printable = " ".join(map(str, cmd)) if isinstance(cmd, (list,tuple)) else str(cmd)
    print("[CMD]", printable, flush=True)
    return subprocess.call(cmd, shell=isinstance(cmd, str), **kwargs)

print(f"[TRIM] PROJECT_DIR = {PROJECT_DIR}")
print(f"[TRIM] RAW_SRC     = {RAW_SRC}")
print(f"[TRIM] TRIM_DIR    = {TRIM_DIR}")

# --------------------------------
# 1) Cargar o descubrir 'pairs'
# --------------------------------
def rebuild_pairs_from_manifest(manifest_csv: Path) -> Dict[str, Dict[str, Optional[Path]]]:
    df = pd.read_csv(manifest_csv)
    pairs: Dict[str, Dict[str, Optional[Path]]] = {}
    for _, row in df.iterrows():
        s = str(row["sample"])
        r1 = Path(row["R1"]) if isinstance(row["R1"], str) and row["R1"] else None
        r2 = Path(row["R2"]) if isinstance(row["R2"], str) and row["R2"] else None
        pairs[s] = {"R1": r1, "R2": r2}
    return pairs

def discover_pairs(raw_dir: Path, sample_regex: str) -> Dict[str, Dict[str, Optional[Path]]]:
    rx = re.compile(sample_regex)
    buckets: Dict[str, Dict[str, Optional[Path]]] = {}
    for p in sorted(raw_dir.rglob("*.fastq*")):
        m = rx.match(p.name)
        if not m: continue
        sample, read = m.group(1), m.group(2)
        d = buckets.setdefault(sample, {"R1": None, "R2": None})
        if read == "1" and d["R1"] is None: d["R1"] = p
        elif read == "2" and d["R2"] is None: d["R2"] = p
    # Single-end: <sample>.fastq(.gz)
    for p in sorted(raw_dir.rglob("*.fastq*")):
        if rx.match(p.name): continue
        base = p.name.replace(".gz", "")
        if base.endswith(".fastq"):
            sample = base[:-6]
            buckets.setdefault(sample, {"R1": None, "R2": None})
            if buckets[sample]["R1"] is None:
                buckets[sample]["R1"] = p
    return buckets

# --------------------------------
# 2) fastp (default, sin FastQC/MultiQC)
# --------------------------------
def build_fastp_cmd(R1: Path, R2: Optional[Path], r1_out: Path, r2_out: Optional[Path],
                    threads: int, extra_opts: List[str]) -> List[str]:
    cmd = ["fastp", "-w", str(threads), "-o", str(r1_out)]
    if R2:
        cmd += ["-O", str(r2_out), "-i", str(R1), "-I", str(R2), "--detect_adapter_for_pe"]
    else:
        cmd += ["-i", str(R1)]
    # NOTA: NO añadimos -j/-h para no generar reportes fastp explícitos (evitar ruido).
    # fastp aún podría crear fastp.json/html por defecto en algunos builds; se ignoran.
    cmd += extra_opts
    return cmd

def fastp_one(sample: str, R1: Optional[Path], R2: Optional[Path],
              threads: int, extra_opts: List[str]) -> dict:
    TRIM_DIR.mkdir(parents=True, exist_ok=True)
    if not have_bin("fastp"):
        return {"sample": sample, "ok": False, "reason": "fastp_not_found"}
    if not (R1 and Path(R1).exists()):
        return {"sample": sample, "ok": False, "reason": "no_R1"}
    if R2 and not Path(R2).exists():
        return {"sample": sample, "ok": False, "reason": "R2_missing"}

    r1_out = TRIM_DIR / f"{sample}_R1.fastq.gz"
    r2_out = TRIM_DIR / f"{sample}_R2.fastq.gz" if R2 else None

    code = run(build_fastp_cmd(R1, R2, r1_out, r2_out, threads, extra_opts))
    ok = (code == 0) and r1_out.exists() and (r2_out is None or r2_out.exists())
    return {
        "sample": sample,
        "ok": ok,
        "trimmer": "fastp",
        "R1_out": str(r1_out) if r1_out.exists() else "",
        "R2_out": str(r2_out) if (r2_out and r2_out.exists()) else "",
    }

# --------------------------------
# 3) Trim Galore (sin --fastqc)
# --------------------------------
def _find_tg_outputs_pe(R1: Path, R2: Path, outdir: Path) -> Tuple[Optional[Path], Optional[Path]]:
    base1 = Path(R1).stem
    base2 = Path(R2).stem
    cand1 = list(outdir.glob(f"*{base1}*_val_1.fq.gz")) + list(outdir.glob(f"*{base1}*_val_1.fastq.gz"))
    cand2 = list(outdir.glob(f"*{base2}*_val_2.fq.gz")) + list(outdir.glob(f"*{base2}*_val_2.fastq.gz"))
    if not cand1:
        cand1 = list(outdir.glob("*_val_1.fq.gz")) + list(outdir.glob("*_val_1.fastq.gz"))
    if not cand2:
        cand2 = list(outdir.glob("*_val_2.fq.gz")) + list(outdir.glob("*_val_2.fastq.gz"))
    return (cand1[0] if cand1 else None, cand2[0] if cand2 else None)

def _find_tg_outputs_se(R1: Path, outdir: Path) -> Optional[Path]:
    base1 = Path(R1).stem
    cand = list(outdir.glob(f"*{base1}*trimmed.fq.gz")) + list(outdir.glob(f"*{base1}*trimmed.fastq.gz"))
    if not cand:
        cand = list(outdir.glob("*trimmed.fq.gz")) + list(outdir.glob("*trimmed.fastq.gz"))
    return cand[0] if cand else None

def trim_galore_one(sample: str, R1: Optional[Path], R2: Optional[Path],
                    tg_opts_pe: List[str], tg_opts_se: List[str]) -> dict:
    TRIM_DIR.mkdir(parents=True, exist_ok=True)
    if not have_bin("trim_galore"):
        return {"sample": sample, "ok": False, "reason": "trim_galore_not_found"}
    if not (R1 and Path(R1).exists()):
        return {"sample": sample, "ok": False, "reason": "no_R1"}
    if R2 and not Path(R2).exists():
        return {"sample": sample, "ok": False, "reason": "R2_missing"}

    # Importante: sin --fastqc y con --no_report_file para evitar QC redundante
    base_opts = ["--no_report_file"]
    if R2:
        cmd = ["trim_galore", "--paired"] + base_opts + tg_opts_pe + ["-o", str(TRIM_DIR), str(R1), str(R2)]
    else:
        cmd = ["trim_galore"] + base_opts + tg_opts_se + ["-o", str(TRIM_DIR), str(R1)]
    code = run(cmd)

    if R2:
        out1_src, out2_src = _find_tg_outputs_pe(R1, R2, TRIM_DIR)
        out1_dst = TRIM_DIR / f"{sample}_R1.fastq.gz"
        out2_dst = TRIM_DIR / f"{sample}_R2.fastq.gz"
        if out1_src and not out1_dst.exists(): shutil.move(str(out1_src), str(out1_dst))
        if out2_src and not out2_dst.exists(): shutil.move(str(out2_src), str(out2_dst))
        ok = (code == 0) and out1_dst.exists() and out2_dst.exists()
        return {"sample": sample, "ok": ok, "trimmer": "trim_galore",
                "R1_out": str(out1_dst) if out1_dst.exists() else "",
                "R2_out": str(out2_dst) if out2_dst.exists() else ""}
    else:
        out_src = _find_tg_outputs_se(R1, TRIM_DIR)
        out_dst = TRIM_DIR / f"{sample}_R1.fastq.gz"
        if out_src and not out_dst.exists(): shutil.move(str(out_src), str(out_dst))
        ok = (code == 0) and out_dst.exists()
        return {"sample": sample, "ok": ok, "trimmer": "trim_galore",
                "R1_out": str(out_dst) if out_dst.exists() else "",
                "R2_out": ""}

# --------------------------------
# 4) Main
# --------------------------------
def main(argv: Optional[List[str]] = None) -> int:
    parser = argparse.ArgumentParser(description="Trimming con fastp (default) o Trim Galore, sin QC post-trimming.")
    parser.add_argument("--trimmer", choices=["fastp","trim_galore"], default=os.environ.get("MAG_TRIM_TOOL","fastp"),
                        help="Herramienta de trimming (default: fastp).")
    parser.add_argument("--threads-fastp", type=int, default=N_THREADS_FASTP, help="Hilos para fastp.")
    parser.add_argument("--fastp-opts", type=str, default=os.environ.get("MAG_FASTP_OPTS",""),
                        help="Flags extra para fastp (string estilo shell).")
    parser.add_argument("--tg-pe-opts", type=str,
                        default=os.environ.get("MAG_TRIMGALORE_PE","--cores 20 --quality 15 --length 50"),
                        help="Flags Trim Galore para paired-end (sin --fastqc).")
    parser.add_argument("--tg-se-opts", type=str,
                        default=os.environ.get("MAG_TRIMGALORE_SE","--cores 20 --quality 15 --length 50"),
                        help="Flags Trim Galore para single-end (sin --fastqc).")
    args = parser.parse_args(argv)

    TRIM_DIR.mkdir(parents=True, exist_ok=True)

    # Pairs desde manifiesto o descubrimiento
    if PIPE_MANIFEST_CSV.exists():
        pairs = rebuild_pairs_from_manifest(PIPE_MANIFEST_CSV)
        print(f"[TRIM] Cargado pairs desde {PIPE_MANIFEST_CSV} ({len(pairs)} muestras).")
    else:
        if not RAW_SRC.exists():
            print(f"[ERROR] No existe RAW_SRC: {RAW_SRC}", file=sys.stderr)
            return 2
        pairs = discover_pairs(RAW_SRC, SAMPLE_REGEX)
        print(f"[TRIM] Descubierto pairs escaneando {RAW_SRC} ({len(pairs)} muestras).")
    if not pairs:
        print("[ERROR] No hay muestras detectadas.", file=sys.stderr)
        return 3

    reports: List[dict] = []
    for sample, rr in sorted(pairs.items()):
        R1, R2 = rr.get("R1"), rr.get("R2")
        if args.trimmer == "fastp":
            extra = shlex.split(args.fastp_opts) if args.fastp_opts else []
            rep = fastp_one(sample, R1, R2, threads=args.threads_fastp, extra_opts=extra)
        else:
            tg_pe = shlex.split(args.tg_pe_opts) if args.tg_pe_opts else []
            tg_se = shlex.split(args.tg_se_opts) if args.tg_se_opts else []
            rep = trim_galore_one(sample, R1, R2, tg_pe, tg_se)
        reports.append(rep)

    pd.DataFrame(reports).sort_values("sample").to_csv(TRIM_REPORT_CSV, index=False)
    print(f"[TRIM] Reporte -> {TRIM_REPORT_CSV}")
    print("[TRIM] Finalizado (sin QC post-trim: usar 04_fastqc_parallel.py).")
    return 0

if __name__ == "__main__":
    sys.exit(main())

Implementa un módulo de **trimming de lecturas** para un pipeline de ensamblaje de MAGs.  
Es compatible con dos herramientas de recorte de secuencias:

- **fastp** (por defecto)  
- **Trim Galore**

Su propósito es limpiar las lecturas crudas, generar archivos de salida filtrados y producir un reporte de trimming.

1. Configura rutas de proyecto, resultados y directorios de calidad (`QC_DIR`, `TRIM_DIR`, `PIPE_META`).  
2. Carga un **manifiesto de muestras** (`pipeline_manifest.csv`) con la información de las lecturas.  
3. Ejecuta el trimming con la herramienta seleccionada:
   - **fastp**: usando hilos definidos (`N_THREADS_FASTP`) y soporte para *single-end* o *paired-end*.  
   - **Trim Galore**: ajustando automáticamente según el tipo de datos (single o paired).  
4. Guarda un reporte (`trim_report.csv`) con el detalle de los archivos generados por cada muestra.  

## Inputs
- **Archivo de manifiesto** (`pipeline_manifest.csv`), con columnas:  
  - `sample`: nombre de la muestra  
  - `R1`: archivo de lecturas forward  
  - `R2`: archivo de lecturas reverse (opcional, si es paired-end)  
- Variables de entorno (opcionales):  
  - `MAG_PROJECT_DIR` o `MAGENTA_DIR`: define el directorio base del proyecto.  
  - `MAG_FASTP_THREADS`: número de hilos para `fastp` (default: 12).  
  - `MAG_FASTP_MAX_WORKERS`: número máximo de procesos paralelos (default: 4).  
  - `MAG_TRIM_TOOL`: herramienta de trimming (`fastp` o `trim_galore`).  

## Outputs
- Archivos **FASTQ recortados**, ubicados en `02.trimmed/<sample>/`:  
  - `<sample>_R1.trimmed.fastq.gz`  
  - `<sample>_R2.trimmed.fastq.gz` (si aplica)  
- **Reporte de trimming** (`trim_report.csv`) con columnas:  
  - `sample` → nombre de muestra  
  - `tool` → herramienta usada (`fastp` o `trim_galore`)  
  - `R1` → archivo trimmed forward  
  - `R2` → archivo trimmed reverse (si aplica)  

## Parámetros usados

### Trim Galore
Parámetros mínimos aplicados en el flujo:

- `--paired` → para lecturas pareadas  
- `--no_report_file` → desactiva reportes por muestra (se usa MultiQC después)  
- `--cores 20` → número de hilos  
- `--quality 15` → trimming de bases con calidad < Q15  
- `--length 50` → descarta lecturas post-trimming con longitud < 50 nt  
- `--fastqc` → corre FastQC por muestra  

### fastp (por defecto)

- `-q 15` / `--qualified_quality_phred 15` → calidad mínima por base (Q ≥ 15)  
- `-u 40` / `--unqualified_percent_limit 40` → descarta la lectura si >40% de bases < Q15  
- `-l 15` / `--length_required 15` → descarta lecturas más cortas que 15 nt  
- `--detect_adapter_for_pe` → detección y recorte automático de adaptadores (paired-end)  
- **Poly-G trimming** → activado automáticamente en datos de NovaSeq/NextSeq  

- **Trim Galore** en el pipeline está configurado para ser más estricto con la longitud (≥50 nt) y siempre corre FastQC.  
- **fastp**, por defecto, aplica filtros de calidad equivalentes (Q15) y longitud mínima (≥15 nt), además de recorte automático de adaptadores.  

## 5.3 Ensamblaje metagenómico (MEGAHIT por defecto, MetaSPAdes opcional)

In [None]:
from __future__ import annotations
import os, sys, csv, shutil, subprocess, argparse
from pathlib import Path
from datetime import datetime
from typing import List, Tuple, Optional

# ==========
# Rutas base
# ==========
PROJECT_DIR = Path(os.environ.get("MAG_PROJECT_DIR", os.environ.get("MAGENTA_DIR", Path.cwd()))).resolve()
MAGS_DIR    = PROJECT_DIR / "mags"
RESULTS_DIR = MAGS_DIR / "results"

TRIM_DIR    = RESULTS_DIR / "02.trimmed"
ASM_DIR     = RESULTS_DIR / "03.assembly"
ASM_LOG_DIR = ASM_DIR / "logs"
RESUMEN_CSV = ASM_DIR / "resumen_ensamblaje.csv"

ASM_LOG_DIR.mkdir(parents=True, exist_ok=True)

# ==========
# Utilidades
# ==========
def have_bin(b: str) -> bool:
    from shutil import which
    return which(b) is not None

def run(cmd, log_file: Path | None = None) -> int:
    printable = " ".join(map(str, cmd))
    print("[CMD]", printable, flush=True)
    if log_file:
        with open(log_file, "a") as log:
            log.write(f"[CMD] {printable}\n")
            return subprocess.call(cmd, stdout=log, stderr=log)
    else:
        return subprocess.call(cmd)

def size_sum(f1: Path, f2: Path) -> int:
    try:
        return f1.stat().st_size + f2.stat().st_size
    except Exception:
        return 0

# =========================
# Descubrimiento de pares
# =========================
R1_GLOBS_DEFAULT = [
    "*_R1*.fastq.gz", "*_1*.fastq.gz", "*R1*.fastq.gz", "*1*.fastq.gz",
    "*_R1*.fq.gz",    "*_1*.fq.gz",    "*R1*.fq.gz",    "*1*.fq.gz",
]
TOKEN_PAIRS = [("_R1", "_R2"), ("_1", "_2"), ("R1", "R2"), ("1", "2")]

def _try_match_r2_by_tokens(r1: Path) -> Optional[Path]:
    """Intenta encontrar R2 reemplazando tokens comunes en el nombre de R1, en el mismo directorio."""
    name = r1.name
    for t1, t2 in TOKEN_PAIRS:
        if t1 in name:
            cand = r1.with_name(name.replace(t1, t2, 1))
            if cand.exists():
                return cand
    return None

def _best_effort_r2_in_dir(r1: Path) -> Optional[Path]:
    """
    Si el reemplazo directo falla, intenta localizar un R2 plausible en el mismo directorio:
    - Algún archivo que contenga un token R2 y termine en .f*q.gz
    - Preferir el que comparte mayor prefijo con R1.
    """
    dirp = r1.parent
    candidates: List[Path] = []
    for patt in ["*_R2*.fastq.gz", "*_2*.fastq.gz", "*R2*.fastq.gz", "*2*.fastq.gz",
                 "*_R2*.fq.gz",   "*_2*.fq.gz",   "*R2*.fq.gz",   "*2*.fq.gz"]:
        candidates.extend(dirp.glob(patt))
    if not candidates:
        return None

    def common_prefix_len(a: str, b: str) -> int:
        n = min(len(a), len(b)); i = 0
        while i < n and a[i] == b[i]:
            i += 1
        return i

    best = max(candidates, key=lambda p: common_prefix_len(p.name, r1.name))
    return best

def _collect_sample_dirs(root: Path) -> List[Path]:
    """Devuelve subdirectorios de primer nivel si existen; si no, devuelve [root] para fallback plano."""
    subdirs = [d for d in sorted(root.iterdir()) if d.is_dir()]
    return subdirs if subdirs else [root]

def find_pairs(trim_root: Path, debug: bool=False) -> List[Tuple[str, Path, Path]]:
    """
    Busca pares R1/R2 en estructura:
      trim_root/<sample>/**/{R1,R2}*.f*q.gz
    - Explora recursivamente debajo de cada subcarpeta de muestra.
    - Acepta múltiples patrones de nombres.
    Devuelve: lista de (sample, R1, R2) con sample = nombre del subdirectorio de primer nivel
              o, en fallback plano, el prefijo derivado del archivo R1.
    """
    pairs: List[Tuple[str, Path, Path]] = []
    sample_dirs = _collect_sample_dirs(trim_root)

    for sample_dir in sample_dirs:
        if sample_dir == trim_root and sample_dir.is_dir() and not any(sample_dir.iterdir()):
            continue

        sample_name = sample_dir.name if sample_dir != trim_root else None

        r1_candidates: List[Path] = []
        for patt in R1_GLOBS_DEFAULT:
            r1_candidates.extend(sample_dir.rglob(patt))
        r1_candidates = sorted(set(r1_candidates))

        if debug:
            print(f"[DEBUG] Dir muestra: {sample_dir}")
            for r1 in r1_candidates:
                print(f"[DEBUG]   R1 cand: {r1}")

        used: set[Path] = set()
        for r1 in r1_candidates:
            if r1 in used:
                continue
            r2 = _try_match_r2_by_tokens(r1)
            if r2 is None or not r2.exists():
                r2 = _best_effort_r2_in_dir(r1)
            if r2 and r2.exists() and r2 not in used:
                used.add(r1); used.add(r2)
                if sample_dir == trim_root:
                    sname = r1.name
                    for suf in ["_R1.fastq.gz","_1.fastq.gz","R1.fastq.gz","1.fastq.gz",
                                "_R1.fq.gz","_1.fq.gz","R1.fq.gz","1.fq.gz"]:
                        if sname.endswith(suf):
                            sname = sname[:-len(suf)]
                            break
                else:
                    sname = sample_name
                pairs.append((sname, r1, r2))

        if debug and not pairs:
            print(f"[DEBUG]   No se encontraron pares en {sample_dir}")

    return pairs

# =========================
# Normalización de salidas
# =========================
def normalize_and_prefix_outputs_megahit(sample: str, out_dir: Path, log: Path):
    """
    MEGAHIT produce final.contigs.fa.
    Crear contigs.fasta y renombrar a <sample>_contigs.fasta para uniformidad.
    """
    src = out_dir / "final.contigs.fa"
    mid = out_dir / "contigs.fasta"
    if src.exists():
        try:
            shutil.copyfile(src, mid)
        except Exception as e:
            with open(log, "a") as lg:
                lg.write(f"[WARN] No se pudo copiar a contigs.fasta: {e}\n")
    for fname in ["contigs.fasta", "scaffolds.fasta"]:
        source = out_dir / fname
        if source.exists():
            renamed = out_dir / f"{sample}_{fname}"
            if not renamed.exists():
                source.rename(renamed)
            print(f"[RENAME] {renamed.name}")

def prefix_outputs_metaspades(sample: str, out_dir: Path):
    """
    MetaSPAdes produce contigs.fasta y scaffolds.fasta.
    Renombrar con prefijo de muestra.
    """
    for fname in ["contigs.fasta", "scaffolds.fasta"]:
        source = out_dir / fname
        if source.exists():
            renamed = out_dir / f"{sample}_{fname}"
            if not renamed.exists():
                source.rename(renamed)
            print(f"[RENAME] {renamed.name}")

# ==========
# Ensamblaje
# ==========
def assemble_one(sample: str, R1: Path, R2: Path, assembler: str, threads: int, mem_gb: int,
                 overwrite: bool=False) -> tuple[str, str, str]:
    sample_outdir = ASM_DIR / f"{sample}_assembled"
    log_file = ASM_LOG_DIR / f"{sample}.log"

    # Manejo del directorio de salida
    if sample_outdir.exists():
        if overwrite:
            try:
                shutil.rmtree(sample_outdir)
            except Exception as e:
                return (sample, "ERROR", f"No se pudo borrar salida previa: {e}")
        else:
            return (sample, "SKIPPED", "Ya existe (use --overwrite para rehacer)")

    try:
        if assembler == "megahit":
            if not have_bin("megahit"):
                return (sample, "ERROR", "megahit no encontrado en PATH")
            cmd = [
                "megahit",
                "-1", str(R1),
                "-2", str(R2),
                "-o", str(sample_outdir),   # dejar que megahit cree la carpeta
                "-t", str(threads),
                "--presets", "meta-sensitive",
                # Correcciones añadidas (flags extra de ejemplo):
                "--min-contig-len", "1000",
                "--k-list", "21,29,39,59,79,99,119",
            ]
            code = run(cmd, log_file)
            if code != 0:
                try:
                    print(f"[LOG TAIL] {log_file}")
                    os.system(f"tail -n 200 {log_file}")
                except Exception:
                    pass
                return (sample, "ERROR", f"megahit exit code {code}")
            normalize_and_prefix_outputs_megahit(sample, sample_outdir, log_file)

        elif assembler == "metaspades":
            exe = "metaspades.py" if have_bin("metaspades.py") else ("spades.py" if have_bin("spades.py") else None)
            if exe is None:
                return (sample, "ERROR", "metaspades.py no encontrado en PATH")
            cmd = [exe, "-1", str(R1), "-2", str(R2), "-o", str(sample_outdir), "-t", str(threads)]
            if mem_gb and int(mem_gb) > 0:
                cmd += ["-m", str(int(mem_gb))]
            # Correcciones añadidas (flag extra de ejemplo):
            cmd += ["--only-assembler"]  # si ya tienes corrección hecha
            # cmd += ["--meta"]  # (metaspades.py ya implica modo meta)
            code = run(cmd, log_file)
            if code != 0:
                try:
                    print(f"[LOG TAIL] {log_file}")
                    os.system(f"tail -n 200 {log_file}")
                except Exception:
                    pass
                return (sample, "ERROR", f"metaspades exit code {code}")
            prefix_outputs_metaspades(sample, sample_outdir)

        else:
            return (sample, "ERROR", f"Assembler no soportado: {assembler}")

        return (sample, "OK", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    except Exception as e:
        try:
            print(f"[LOG TAIL] {log_file}")
            os.system(f"tail -n 200 {log_file}")
        except Exception:
            pass
        return (sample, "ERROR", str(e))

def obtener_muestras_pendientes(pairs: List[Tuple[str, Path, Path]]) -> List[str]:
    """
    Filtra muestras sin carpeta de ensamblaje y las ordena por tamaño total ascendente.
    """
    info = []
    for sample, r1, r2 in pairs:
        outfolder = ASM_DIR / f"{sample}_assembled"
        if not outfolder.exists():
            sz = size_sum(r1, r2)
            info.append((sample, sz))
    return [m for m, _ in sorted(info, key=lambda x: x[1])]

# ==========
# Main
# ==========
def main(argv=None) -> int:
    parser = argparse.ArgumentParser(description="Ensamblaje metagenómico en serie (MEGAHIT/MetaSPAdes) con 02.trimmed/<sample>/**/R1,R2.")
    parser.add_argument("--assembler", choices=["megahit","metaspades"],
                        default=os.environ.get("MAG_ASSEMBLER", "megahit"),
                        help="Algoritmo de ensamblaje (default: megahit).")
    parser.add_argument("--threads", type=int,
                        default=int(os.environ.get("MAG_MEGAHIT_THREADS", os.environ.get("MAG_SPADES_THREADS", "40"))),
                        help="Hilos por muestra (default: env o 40).")
    parser.add_argument("--mem-gb", type=int,
                        default=int(os.environ.get("MAG_SPADES_MEM_GB", "0")),
                        help="Memoria (GB) para MetaSPAdes; 0=auto.")
    parser.add_argument("--debug", action="store_true",
                        help="Imprime información de depuración del descubrimiento de pares.")
    parser.add_argument("--overwrite", action="store_true",
                        help="Si existe el directorio de salida de la muestra, lo elimina y vuelve a ensamblar.")
    args = parser.parse_args(argv)

    if not TRIM_DIR.exists():
        print(f"[ERROR] No existe carpeta de trimmed: {TRIM_DIR}", file=sys.stderr)
        return 2

    pairs = find_pairs(TRIM_DIR, debug=args.debug)
    if not pairs:
        print(f"[ERROR] No se detectaron pares R1/R2 dentro de subdirectorios en {TRIM_DIR}", file=sys.stderr)
        if not args.debug:
            print("Sugerencia: ejecute nuevamente con --debug para imprimir candidatos encontrados.", file=sys.stderr)
        return 3

    pendientes = obtener_muestras_pendientes(pairs)
    print(f" Muestras detectadas: {len(pairs)}")
    print(f" Muestras a ensamblar (ordenadas por tamaño): {len(pendientes)}")
    print(f" Iniciando ensamblaje en serie con {args.threads} hilos por muestra usando '{args.assembler}'...\n")

    idx = {s: (r1, r2) for s, r1, r2 in pairs}

    resultados = []
    for sample in pendientes:
        R1, R2 = idx[sample]
        print(f" Ensamblando: {sample}")
        res = assemble_one(sample, R1, R2, args.assembler, args.threads, args.mem_gb, overwrite=args.overwrite)
        resultados.append(res)
        print(f" Resultado: {res[1]} - {res[2]}\n")

    ASM_DIR.mkdir(parents=True, exist_ok=True)
    with open(RESUMEN_CSV, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Muestra", "Estado", "Detalle"])
        writer.writerows(resultados)

    print(f"\n Logs por muestra en: {ASM_LOG_DIR}")
    print(f" Contigs generados en: {ASM_DIR}")
    print(f" Resumen CSV: {RESUMEN_CSV}")
    print("\n Ensamblajes completados.")
    return 0

if __name__ == "__main__":
    sys.exit(main())


# Ensamblaje metagenómico (MEGAHIT / MetaSPAdes)

Ensamblar muestras **paired-end** a partir de lecturas *trimmed*, procesando **una muestra a la vez**, puedes paralelizar este proceso.  
Usa **MEGAHIT** por defecto y **MetaSPAdes** como alternativa.

1. **Localiza pares R1/R2** dentro de `mags/results/02.trimmed/<sample>/**/{R1,R2}*.f*q.gz` con patrones y reemplazo de tokens.
2. **Ordena** las muestras pendientes por **tamaño total** (R1+R2) ascendente.
3. **Ensamblaje** por muestra:
   - **MEGAHIT** (default) con `--presets meta-sensitive` **y** flags adicionales sugeridos.
   - **MetaSPAdes** (`metaspades.py` o `spades.py`) con `--only-assembler` como ejemplo extra.
4. **Normaliza salidas**:
   - MEGAHIT: `final.contigs.fa → contigs.fasta → <sample>_contigs.fasta`.
   - MetaSPAdes: renombra `contigs.fasta`/`scaffolds.fasta` a `<sample>_*`.
5. **Registra logs** por muestra y compone un **resumen CSV**.

## Entradas (inputs)
- Directorio con lecturas **recortadas**: `mags/results/02.trimmed/`
- Archivos esperados:
  - `<sample>/**/R1*.f*q.gz` y `<sample>/**/R2*.f*q.gz`
- **Variables de entorno** (opcionales):
  - `MAG_PROJECT_DIR` o `MAGENTA_DIR`: raíz del proyecto (default: `cwd`).
  - `MAG_ASSEMBLER`: `megahit` o `metaspades` (si no se usa `--assembler`).
  - `MAG_MEGAHIT_THREADS` / `MAG_SPADES_THREADS`: hilos por muestra (fallback: 40).
  - `MAG_SPADES_MEM_GB`: memoria (GB) para MetaSPAdes (0 = auto).

## Salidas (outputs)
- Ensamblajes por muestra: `mags/results/03.assembly/<sample>_assembled/`
  - **MEGAHIT**: `<sample>_contigs.fasta` (a partir de `final.contigs.fa`)
  - **MetaSPAdes**: `<sample>_contigs.fasta` y opcional `<sample>_scaffolds.fasta`
- **Logs**: `mags/results/03.assembly/logs/<sample>.log`
- **Resumen**: `mags/results/03.assembly/resumen_ensamblaje.csv`
  - Columnas: `Muestra`, `Estado` (`OK`, `ERROR`, `SKIPPED`), `Detalle` (timestamp o mensaje)

## Descubrimiento de pares (R1/R2)
- Patrones R1 (recursivo): `*_R1*.fastq.gz`, `*_1*.fq.gz`, `*R1*.fq.gz`, etc.
- R2 por **reemplazo de tokens** (`_R1↔_R2`, `_1↔_2`, `R1↔R2`, `1↔2`) o **búsqueda mejor-esfuerzo** en el mismo directorio (prefiere mayor prefijo compartido).
- Fallback plano: si no hay subcarpetas en `02.trimmed`, deriva el nombre de la muestra del prefijo del archivo.

---

## Ensambladores y **parámetros usados por el script**

### Parámetros — MEGAHIT

- `-1`, `-2` → FASTQ pareados.  
- `-o <sample_outdir>` → carpeta de salida (`<sample>_assembled`).  
- `-t <threads>` → hilos por muestra.  
- `--presets meta-sensitive` → preset sensible para metagenomas.  
- `--min-contig-len 1000` → **nuevo**: contigs mínimos a reportar (ejemplo didáctico).  
- `--k-list 21,29,39,59,79,99,119` → **nuevo**: lista de *k-mers* (ejemplo didáctico).  

Puedes ajustar `--min-contig-len` y `--k-list` según cobertura/heterogeneidad.  

---

### Parámetros — MetaSPAdes

- `-1`, `-2` → FASTQ pareados.  
- `-o <sample_outdir>` → carpeta de salida (`<sample>_assembled`).  
- `-t <threads>` → hilos por muestra.  
- `-m <mem_gb>` → **opcional** si `--mem-gb > 0`; si `0`, deja el autoajuste.  
- `--only-assembler` → **nuevo**: omite la fase de corrección si ya hiciste QC/corrección aguas arriba.  

## Ejemplos prácticos

### MEGAHIT con contigs ≥1 kb y k-list amplio
```bash
python 06_assembly_serial.py --assembler megahit --threads 48
(El script ya incluye --min-contig-len 1000 y --k-list 21,29,39,59,79,99,119 como ejemplo didáctico.)

MetaSPAdes con 64 hilos, 512 GB y solo ensamblado
python 06_assembly_serial.py --assembler metaspades --threads 64 --mem-gb 512


