# Taxonomic Assignment of Species
This script takes biotime_unique_entries.csv as input, assigns taxonomic information for each species, and outputs a taxonomy key. It relies on Biopython and calls upon the NCBI, COL, GBIF, and WORMS databases.

# Library and API Credentials

In [28]:
# Core libraries
import os
import time
import math
import glob
import shutil  
import glob
import json 
import pandas as pd
import numpy as np
import requests
import warnings

# Biopython for NCBI taxonomy
from Bio import Entrez

# CLI progress bar
from tqdm import tqdm

# NCBI credentials
Entrez.email = "emduggan@mit.edu"
Entrez.api_key = "2e5155aba559345711a3af676cb6c6703608"

# Defining functions

In [29]:
_warned_inputs = set()  # to suppress repeated genus-only warnings

def save_cache(filename, new_cache):
    # Load existing cache if available
    if os.path.exists(filename):
        with open(filename, "r") as f:
            try:
                existing_cache = json.load(f)
            except json.JSONDecodeError:
                existing_cache = {}
    else:
        existing_cache = {}

    # Merge: only update new keys or changed values
    existing_cache.update(new_cache)

    # Write back merged cache
    with open(filename, "w") as f:
        json.dump(existing_cache, f)

def load_cache(filename):
    if os.path.exists(filename):
        with open(filename) as f:
            return json.load(f)
    return {}

def extract(lineage, rank):
    """Extract the name for a given rank from a lineage list."""
    for entry in lineage:
        if entry.get("Rank", "").lower() == rank.lower():
            return entry.get("ScientificName")
        if entry.get("rank", "").lower() == rank.lower():
            return entry.get("name")
    return None

def ncbi_query(term):
    """
    Query NCBI Taxonomy for full lineage.

    Returns:
        dict or None: Lineage dict, or None if not found.
    """
    term = term.strip().title()
    if term in ncbi_cache:
        return ncbi_cache[term]
    try:
        search = Entrez.esearch(db="taxonomy", term=term, retmode="xml")
        result = Entrez.read(search)
        if not result["IdList"]:
            return None

        taxid = result["IdList"][0]
        fetch = Entrez.efetch(db="taxonomy", id=taxid, retmode="xml")
        record = Entrez.read(fetch)[0]
        lineage = record.get("LineageEx", [])

        result_lineage = {
            "kingdom": extract(lineage, "kingdom"),
            "phylum":  extract(lineage, "phylum"),
            "class":   extract(lineage, "class"),
            "order":   extract(lineage, "order"),
            "family":  extract(lineage, "family"),
            "genus":   extract(lineage, "genus"),
            "source":  "ncbi"
        }

        if not any(result_lineage.values()):
            return None

        ncbi_cache[term] = result_lineage
        return result_lineage
    except Exception:
        return None

#TODO make it so that fuzzy-matching on common names does not occur
def col_query(term):
    """
    Query Catalogue of Life (COL) API for lineage. Rejects fuzzy/common name matches.

    Args:
        term (str): Genus or species name.

    Returns:
        dict or None: Lineage dictionary if matched exactly; else None.
    """
    term = term.strip().title()
    if term in col_cache:
        return col_cache[term]

    url = f"https://api.catalogueoflife.org/nameusage/search?q={term}"
    try:
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
        data = response.json()

        if data['total'] == 0 or not data['result']:
            return None

        result = data['result'][0]
        matched_name = result.get('scientificName', '').strip().title()
        match_type = result.get('matchType', '').upper()

        # Strict match: name must match exactly AND be an exact match type
        if matched_name != term or match_type not in {"EXACT", "EXACT_NAME"}:
            return None

        lineage = result.get('classification', [])

        result_lineage = {
            "kingdom": extract(lineage, "kingdom"),
            "phylum":  extract(lineage, "phylum"),
            "class":   extract(lineage, "class"),
            "order":   extract(lineage, "order"),
            "family":  extract(lineage, "family"),
            "genus":   extract(lineage, "genus"),
            "source":  "col"
        }

        if not any(result_lineage.values()):
            return None

        col_cache[term] = result_lineage
        return result_lineage

    except Exception:
        return None

def gbif_query(term):
    """
    Query GBIF species match API for lineage.

    Args:
        term (str): Genus or species name.

    Returns:
        dict or None: Lineage dict if found and valid, else None.
    """
    term = term.strip().title()
    if term in gbif_cache:
        return gbif_cache[term]

    url = f"https://api.gbif.org/v1/species/match?name={term}"
    try:
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
        data = response.json()

        # Reject results without a confident match
        if data.get("matchType") == "NONE" or data.get("confidence", 0) < 90:
            return None

        result_lineage = {
            "kingdom": data.get("kingdom"),
            "phylum":  data.get("phylum"),
            "class":   data.get("class"),
            "order":   data.get("order"),
            "family":  data.get("family"),
            "genus":   data.get("genus"),  # no fallback
            "source":  "gbif"
        }

        gbif_cache[term] = result_lineage
        return result_lineage
    except Exception:
        return None

def worms_query(term):
    """
    Query WoRMS (World Register of Marine Species) for lineage.

    Returns:
        dict or None: Lineage dict, or None if not found.
    """
    term = term.strip().title()
    if term in worms_cache:
        return worms_cache[term]
    url = f"http://www.marinespecies.org/rest/AphiaRecordsByName/{term}?like=false&marine_only=false"
    try:
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        if not data or not isinstance(data, list):
            return None

        entry = data[0]
        result_lineage = {
            "kingdom": entry.get("kingdom"),
            "phylum":  entry.get("phylum"),
            "class":   entry.get("class"),
            "order":   entry.get("order"),
            "family":  entry.get("family"),
            "genus":   entry.get("genus"),
            "source":  "worms"
        }

        if not any(result_lineage.values()):
            return None

        worms_cache[term] = result_lineage
        return result_lineage
    except Exception:
        return None

def query_taxonomy_lineage(term, allow_genus_fallback=True, db_order=None, raise_errors=True):
    """
    Query taxonomy hierarchy using NCBI, then fallback to COL, GBIF, WoRMS.

    Args:
        term (str): Binomial or genus name (e.g., "Panthera leo" or "Panthera").
        allow_genus_fallback (bool): Whether to try genus-level lookup if species fails.
        db_order (list[str]): Priority order of databases to query.
        raise_errors (bool): If True, raise query exceptions; else warn and continue.

    Returns:
        dict: Taxonomic lineage with source label. Fields may be None.
    """
    original_term = term  # store before any cleanup
    term = term.strip().title()
    parts = term.split()
    genus = parts[0]

    # Only warn for genus-only original inputs
    warning_key = original_term.lower()
    if len(parts) < 2 and warning_key not in _warned_inputs:
        warnings.warn(f"Input '{original_term}' looks like a genus-only name. Only genus-level query will be attempted.")
        _warned_inputs.add(warning_key)

    db_order = db_order or ["ncbi", "gbif", "worms", "col"]
    query_funcs = {
        "ncbi": ncbi_query,
        "col": col_query,
        "gbif": gbif_query,
        "worms": worms_query}

    for db in db_order:
        if db not in query_funcs:
            raise ValueError(f"Unrecognized database: '{db}'")

        fn = query_funcs[db]

        try:
            # Try species-level query
            if len(parts) >= 2:
                result = fn(term)
                if result and result.get("class"):
                    return result

            # Try genus-level fallback
            if allow_genus_fallback:
                genus_result = fn(genus)
                if genus_result and genus_result.get("class"):
                    if "(genus-level)" not in genus_result["source"]:
                        genus_result["source"] += " (genus-level)"
                    if not genus_result.get("genus"):
                        genus_result["genus"] = genus
                    return genus_result

        except Exception as e:
            msg = f"Failed querying {db} for '{term}': {e}"
            if raise_errors:
                raise RuntimeError(msg) from e
            # warnings.warn(msg)
            continue

    # Total failure — return all Nones including genus
    return {
        "kingdom": None, "phylum": None, "class": None,
        "order": None, "family": None, "genus": None,
        "source": "not_found"
    }


# Creating shared session and caches

In [None]:
# species list
df = pd.read_csv("../data/biotime_unique_entries.csv")
species_list = df["genus_species"].dropna().str.strip().str.title().unique()

# Create shared session for reuse
session = requests.Session()
timeout = 10  # seconds

# create cache subfolder
os.makedirs("../data/cache", exist_ok=True)

# Load persistent caches
ncbi_cache = load_cache("../data/cache/ncbi_cache.json")
col_cache = load_cache("../data/cache/col_cache.json")
gbif_cache = load_cache("../data/cache/gbif_cache.json")
worms_cache = load_cache("../data/cache/worms_cache.json")


# Run Taxonomic Assignment with Batching and Caching


In [27]:
batch_size = 8000
flush_interval = 1000
num_batches = math.ceil(len(species_list) / batch_size)

for i in range(num_batches):
    start = i * batch_size
    end = start + batch_size
    batch = species_list[start:end]

    output_path = f"../data/cache/classified_batch_{i+1:03}.csv"

    # Resume support: skip already-processed species in batch
    if os.path.exists(output_path):
        try:
            processed = pd.read_csv(output_path)["genus_species"]
            if len(processed) >= len(batch):
                print(f"Batch {i+1} already fully processed. Skipping.")
                continue
            else:
                print(f"Batch {i+1} partially processed. Resuming unfinished entries.")
                processed_species = set(processed)
        except Exception:
            print(f"Could not read existing batch {i+1}. Reprocessing from scratch.")
            processed_species = set()
    else:
        processed_species = set()


    print(f"Processing batch {i+1}/{num_batches} ({len(batch)} species)")

    buffer = []
    for j, sp in enumerate(tqdm(batch)):
        if sp in processed_species:
            continue

        try:
            result = query_taxonomy_lineage(sp)
        except Exception as e:
            result = {"genus_species": sp, "error": str(e)}
        result["genus_species"] = sp
        buffer.append(result)

        if (j + 1) % flush_interval == 0 or (j + 1) == len(batch):
            df = pd.DataFrame(buffer)
            df.to_csv(output_path, mode='a', index=False, header=not os.path.exists(output_path))
            buffer.clear()

            save_cache("../data/cache/ncbi_cache.json", ncbi_cache)
            save_cache("../data/cache/col_cache.json", col_cache)
            save_cache("../data/cache/gbif_cache.json", gbif_cache)
            save_cache("../data/cache/worms_cache.json", worms_cache)

    print(f"Saved batch {i+1} to {output_path}")

Batch 1 already fully processed. Skipping.
Processing batch 2/5 (8000 species)


100%|██████████| 8000/8000 [14:07:43<00:00,  6.36s/it]         


Saved batch 2 to ../data/cache/classified_batch_002.csv
Processing batch 3/5 (8000 species)


100%|██████████| 8000/8000 [33:51<00:00,  3.94it/s]  


Saved batch 3 to ../data/cache/classified_batch_003.csv
Processing batch 4/5 (8000 species)


100%|██████████| 8000/8000 [34:04<00:00,  3.91it/s]  


Saved batch 4 to ../data/cache/classified_batch_004.csv
Processing batch 5/5 (7056 species)


100%|██████████| 7056/7056 [29:06<00:00,  4.04it/s]  

Saved batch 5 to ../data/cache/classified_batch_005.csv





# Exporting Results

In [31]:
# --- Step 1: Merge all batch CSVs ---
print("Merging all classified batches...")
batch_files = sorted(glob.glob("../data/cache/classified_batch_*.csv"))
df_all = pd.concat([pd.read_csv(f) for f in batch_files], ignore_index=True)

# --- Step 2: Separate classified and unclassified ---
# Use "class" as the required indicator of a successful classification
required_fields = ["class"]  # can expand to include "kingdom", "phylum", etc.

# Successful: all required fields are not null
df_classified = df_all[df_all[required_fields].notna().all(axis=1)]

# Unsuccessful: all required fields are null or there's an error field
if "error" in df_all.columns:
    df_unclassified = df_all[
        df_all[required_fields].isna().all(axis=1) | df_all["error"].notnull()
    ]
else:
    df_unclassified = df_all[df_all[required_fields].isna().all(axis=1)]

# --- Step 3: Export ---
df_classified.to_csv("../data/classified_species.csv", index=False)
df_unclassified.to_csv("../data/unclassified_species.csv", index=False)

# --- Step 4: Summary ---
print("Exported classified and unclassified results.")
print(f"Total species processed:     {len(df_all):,}")
print(f"Classified species:          {len(df_classified):,}")
print(f"Unclassified species:        {len(df_unclassified):,}")


Merging all classified batches...
Exported classified and unclassified results.
Total species processed:     39,556
Classified species:          39,534
Unclassified species:        22


In [32]:
print(query_taxonomy_lineage("Barentsia benedeni"))


{'kingdom': None, 'phylum': None, 'class': None, 'order': None, 'family': None, 'genus': None, 'source': 'not_found'}
