In [9]:
#Load libraries
import os
import subprocess
import sys
import pandas as pd
import numpy as np
import pygbif


from pathlib import Path
root = Path("/Users/ricardorivero/Documents/GitHub/arenavirus_hantavirus")

In [15]:
import subprocess
import shutil

def call_extract_db(r_script: str, input_rds: str, db_name: str, output_csv: str = None):
    """
    Call the R extract_db script via subprocess.
    
    Parameters
    ----------
    r_script : str
        Path to the R script (e.g., RSD2CSV.R).
    input_rds : str
        Path to the input .rds file.
    db_name : str
        Name of the database/table inside the .rds to extract.
        Use '--list' to print available table names.
    output_csv : str, optional
        Path where the CSV will be written. Not required if db_name == '--list'.
    
    Returns
    -------
    str or list
        - If db_name == '--list': returns a list of available table names (strings).
        - Otherwise: returns the output_csv path if extraction succeeded.
    """
    print("Rscript at:", shutil.which("Rscript"))
    
    # Build command
    if db_name == "--list":
        cmd = ["Rscript", r_script, input_rds, "--list"]
    else:
        if output_csv is None:
            raise ValueError("output_csv must be provided unless db_name == '--list'")
        cmd = ["Rscript", r_script, input_rds, db_name, output_csv]
    
    # Run Rscript
    res = subprocess.run(cmd, text=True, capture_output=True)
    
    print("Return code:", res.returncode)
    print("\n--- STDOUT ---\n", res.stdout)
    print("\n--- STDERR ---\n", res.stderr)
    
    # Raises CalledProcessError if R returned non-zero
    res.check_returncode()
    
    if db_name == "--list":
        # Parse the table names from stdout
        lines = [line.strip(" -") for line in res.stdout.splitlines() if line.strip()]
        return lines
    else:
        return output_csv


# Example usage: list mode
# available = call_extract_db(
#     "arenavirus_hantavirus/R/RSD2CSV.R",
#     "arenavirus_hantavirus/data/database/Project_ArHa_database_2025-08-20.rds",
#     "--list"
# )
# print("Available tables:", available)

# Example usage: extract mode
# csv_path = call_extract_db(
#     "arenavirus_hantavirus/R/RSD2CSV.R",
#     "arenavirus_hantavirus/data/database/Project_ArHa_database_2025-08-20.rds",
#     "descriptives",
#     "arenavirus_hantavirus/data/test_data/paper_data.csv"
# )
# print("CSV written to:", csv_path)

Error in gzfile(file, "rb") : no se puede abrir la conexión
Calls: extract_db -> readRDS -> gzfile
In gzfile(file, "rb") :
  cannot open compressed file '../../data/database/Project_ArHa_database_2025-08-20.rds', probable reason 'No such file or directory'
Ejecución interrumpida


CalledProcessError: Command '['Rscript', '../R/RSD2CSV.R', '../../data/database/Project_ArHa_database_2025-08-20.rds', 'descriptive', '../../data/test_data/paper_data.csv']' returned non-zero exit status 1.

In [11]:
# taxonomy_clean_firstpass.py
import re, time, unicodedata
from typing import Optional, Dict, Any, List
import pandas as pd
from pygbif import species as gbif
import requests_cache

# --- Caching to be polite & fast ---
requests_cache.install_cache("gbif_cache", expire_after=7*24*3600)  # 7 days

# --- Normalization helpers ---
FLAG_TOKENS = {"cf.", "aff.", "sp.", "spp.", "nr.", "gr.", "sensu", "sl.", "ss."}

def normalize_name(name: str) -> Optional[str]:
    """Unicode-normalize, strip authorships/flags, collapse to Genus species."""
    if not isinstance(name, str):
        return None
    x = unicodedata.normalize("NFKC", name).strip()
    x = re.sub(r"\s+", " ", x)
    if x == "" or x.lower() in {"na", "n/a", "unknown", "undetermined"}:
        return None
    # Remove content in parentheses (often authorships)
    x = re.sub(r"\([^)]*\)", "", x).strip()
    # Remove terminal authorships and years (very rough but effective)
    x = re.sub(r"\b[A-Z][a-zA-Z-]+(?:\s*&\s*[A-Z][a-zA-Z-]+)?(?:\s*,\s*\d{4})?$", "", x).strip()
    # Token filter (drop from first flag onwards)
    tokens = x.split(" ")
    cleaned = []
    for t in tokens:
        if t.lower() in FLAG_TOKENS:
            break
        cleaned.append(t)
    if not cleaned:
        return None
    # Keep only Genus + species epithet (binomial) on first pass
    if len(cleaned) >= 2:
        genus = cleaned[0].capitalize()
        species = cleaned[1].lower()
        # Basic sanity: genus starts uppercase alpha
        if not re.match(r"^[A-Z][a-zA-Z-]+$", genus):
            return None
        # species epithet can include hyphen but no capitals/digits
        if not re.match(r"^[a-z-]+$", species):
            return None
        return f"{genus} {species}"
    # If only a genus is present, return None on first pass (too ambiguous)
    return None

# --- GBIF resolution with gentle backoff ---
def gbif_resolve(name: str, min_confidence: int = 85, retries: int = 3, sleep: float = 0.3) -> Dict[str, Any]:
    """Resolve via GBIF backbone; return rich dict with match metadata."""
    last_err = None
    for i in range(retries):
        try:
            res = gbif.name_backbone(name=name, strict=False)
            if not res or "matchType" not in res:
                return {"best_match": None, "confidence": None, "status": "no_match", "note": "empty_response"}
            # Consider a match acceptable if GBIF gives a canonical name + reasonable confidence
            conf = res.get("confidence", 0)
            can = res.get("canonicalName") or res.get("scientificName")
            status = res.get("status")  # e.g., "ACCEPTED", "SYNONYM"
            match_type = res.get("matchType")
            accepted_usage_key = res.get("acceptedUsageKey") or res.get("usageKey")
            accepted_name = None
            if status == "SYNONYM" and res.get("acceptedUsageKey"):
                # Look up the accepted name quickly (cached)
                acc = gbif.name_usage(key=res["acceptedUsageKey"])
                accepted_name = acc.get("canonicalName") or acc.get("scientificName")
            out = {
                "query": name,
                "best_match": can,
                "accepted_name": accepted_name,
                "usageKey": res.get("usageKey"),
                "acceptedUsageKey": accepted_usage_key,
                "rank": res.get("rank"),
                "status": status,
                "matchType": match_type,
                "confidence": conf,
                "kingdom": res.get("kingdom"),
                "phylum": res.get("phylum"),
                "class": res.get("class"),
                "order": res.get("order"),
                "family": res.get("family"),
                "genus": res.get("genus"),
                "note": None,
                "source": "GBIF",
            }
            if conf is None or conf < min_confidence:
                out["note"] = f"low_confidence({conf})"
                out["status"] = "low_confidence"
            return out
        except Exception as e:
            last_err = str(e)
            time.sleep(sleep * (i + 1))
    return {"query": name, "best_match": None, "confidence": None, "status": "error", "note": last_err, "source": "GBIF"}

def firstpass_cleaning(species_list: List[str],
                       min_confidence: int = 85) -> pd.DataFrame:
    """
    Normalize and resolve names; return tidy mapping table.
    Columns: original, normalized, cleaned_name, match_status, source, usageKey, acceptedUsageKey, confidence, note
    """
    records = []
    for raw in species_list:
        normalized = normalize_name(raw)
        if not normalized:
            records.append({
                "original": raw, "normalized": None, "cleaned_name": None,
                "match_status": "unusable_input", "source": None,
                "usageKey": None, "acceptedUsageKey": None, "confidence": None, "note": "no_binomial"
            })
            continue
        m = gbif_resolve(normalized, min_confidence=min_confidence)
        cleaned = None
        status = m.get("status")
        if status in {"ACCEPTED", "low_confidence"} and m.get("best_match"):
            cleaned = m["best_match"]
        elif status == "SYNONYM" and (m.get("accepted_name") or m.get("best_match")):
            cleaned = m.get("accepted_name") or m.get("best_match")
        elif status == "no_match":
            cleaned = normalized  # keep normalized binomial but flag as no_match
        records.append({
            "original": raw,
            "normalized": normalized,
            "cleaned_name": cleaned,
            "match_status": status,
            "source": m.get("source"),
            "usageKey": m.get("usageKey"),
            "acceptedUsageKey": m.get("acceptedUsageKey"),
            "confidence": m.get("confidence"),
            "note": m.get("note"),
        })
    return pd.DataFrame.from_records(records)

if __name__ == "__main__":
    # Example pipeline
    host_data = host_data = pd.read_csv(root / "data" / "test_data" / "host_data.csv")
    species_list = host_data["host_species"].astype(str).unique().tolist()

    mapping = firstpass_cleaning(species_list, min_confidence=85)

    # Merge back into your dataset
    host_data = host_data.merge(mapping[["original","cleaned_name","match_status","confidence"]],
                                left_on="host_species", right_on="original", how="left") \
                         .drop(columns=["original"]) \
                         .rename(columns={"cleaned_name":"species_clean"})

    host_data.to_csv(root / "data" / "test_data" / "host_data.cleaned.csv",
                    index=False, encoding="utf-8")

    mapping.to_csv(root / "data" / "test_data" / "host_species_cleaning_map.csv",
                index=False, encoding="utf-8")
    print("Wrote host_data.cleaned.csv and host_species_cleaning_map.csv")

Wrote host_data.cleaned.csv and host_species_cleaning_map.csv


In [12]:
#Quantify changes
# Count mismatches between original and cleaned species names
def quantify_corrections(host_data):
    """
    Compare original host_species vs species_clean.
    Returns dictionary with counts of corrected, unchanged, and missing values.
    """
    total = len(host_data)
    # Exclude rows where species_clean is NA (unmatched / unusable input)
    valid = host_data.dropna(subset=["species_clean"])
    
    corrected = (valid["host_species"] != valid["species_clean"]).sum()
    unchanged = (valid["host_species"] == valid["species_clean"]).sum()
    missing = host_data["species_clean"].isna().sum()
    
    return {
        "total_rows": total,
        "with_cleaned_match": len(valid),
        "corrected": corrected,
        "unchanged": unchanged,
        "missing_or_unmatched": missing,
        "correction_rate": corrected / total if total > 0 else 0
    }

# Example usage

host_data_cleaned = pd.read_csv(root / "data" / "test_data" / "host_data.cleaned.csv")

summary = quantify_corrections(host_data_cleaned)
print(summary)

{'total_rows': 55716, 'with_cleaned_match': 52028, 'corrected': 0, 'unchanged': 52028, 'missing_or_unmatched': 3688, 'correction_rate': 0.0}


In [13]:
# Get rows where the cleaning failed (species_clean is NaN)
unmatched = host_data[host_data_cleaned["species_clean"].isna()]

# Show the unique original values that failed to resolve
unmatched_species = unmatched["host_species"].unique().tolist()

print("Number of unmatched species:", len(unmatched_species))
print("Unmatched species:", unmatched_species)
unmatched_counts = unmatched["host_species"].value_counts()
print(unmatched_counts)

unmatched

Number of unmatched species: 1
Unmatched species: [nan]
Series([], Name: count, dtype: int64)
