In [None]:
import requests, math
from collections import defaultdict
import pandas as pd

# Base URL of the InterPro REST API; query paths are appended to it.
INTERPRO_URL = "https://www.ebi.ac.uk/interpro/api"
# CATH v4.3.0 REST endpoint; a UniProt accession is appended to look up FunFams.
# (Plain string: there is nothing to interpolate, so no f-prefix.)
CATH_URL = "https://www.cathdb.info/version/v4_3_0/api/rest/uniprot_to_funfam"

In [None]:
import requests, math
from collections import defaultdict
import pandas as pd

# Base URL of the InterPro REST API; query paths are appended to it.
INTERPRO_URL = "https://www.ebi.ac.uk/interpro/api"
# CATH v4.3.0 REST endpoint; a UniProt accession is appended to look up FunFams.
CATH_URL = "https://www.cathdb.info/version/v4_3_0/api/rest/uniprot_to_funfam"


# Source names scanned when no explicit source is requested. Each name is
# interpolated verbatim into the InterPro "/entry/{source}/protein/..." URL.
# Every element must be followed by a comma: adjacent string literals are
# silently concatenated by Python ("PROSITE" "Patterns" -> "PROSITEPatterns"),
# which yields a source name that does not exist.
# NOTE(review): "PROSITE" is presumed to cover PROSITE patterns and "profile"
# PROSITE profiles - confirm against the InterPro API's member-database names.
KNOWN_DATABASES = [
    "InterPro", "cathgene3d", "cdd", "HAMAP", "panther", "Pfam", "PIRSF", "PRINTS", "ssf", "antifam",
    "PROSITE", "profile", "smart", "SFLD", "SUPERFAMILY", "ncbifam",
]
# ================================
# FINAL CONSOLIDATED FUNCTIONS
# ================================

def _get_protein_length(uniprot_acc):
    """
    Fetch protein length from the UniProt REST API.

    Parameters:
    -----------
    uniprot_acc : str
        UniProt accession (e.g., 'P28482')

    Returns:
    --------
    int or None
        Length of the sequence string found under ``sequence.value`` in the
        returned JSON record, or None when the request fails, the body is not
        valid JSON, or no sequence string is present. The lookup is
        best-effort: network, HTTP and JSON problems never raise.
    """
    url = f"https://rest.uniprot.org/uniprotkb/{uniprot_acc}.json"
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError):
        # RequestException: connection, timeout and HTTP-status errors.
        # ValueError: body is not JSON (on requests versions whose JSON
        # decode error is not a RequestException subclass).
        return None

    # Validate the payload shape explicitly instead of hiding shape problems
    # (and genuine bugs) behind a blanket `except Exception`.
    sequence = data.get("sequence") if isinstance(data, dict) else None
    seq = sequence.get("value") if isinstance(sequence, dict) else None
    if isinstance(seq, str) and seq:
        return len(seq)
    return None


# Helper function to extract domains from a specific source
def _extract_domains_from_source(uniprot_acc, source, representative_only=False):
    """
    Extract domains from a specific InterPro database source with pagination support.

    Parameters:
    -----------
    uniprot_acc : str
        UniProt accession
    source : str
        Database source ('pfam', 'smart', etc.); interpolated into the request URL
    representative_only : bool
        If True, only return representative domain hits

    Returns:
    --------
    list of dict
        One dict per matched fragment with keys: accession, name, type, source,
        start, end, score, representative. The list is unsorted. If a request
        fails part-way through pagination, the domains collected so far are
        returned (best-effort; request/JSON errors never raise).
    """
    url = f"{INTERPRO_URL}/entry/{source}/protein/UniProt/{uniprot_acc}?page_size=200"
    # Accession comparison is case-insensitive; normalise the target once
    target_acc = uniprot_acc.upper()
    domains = []

    while url:
        # Keep the try body limited to the calls that can actually fail on I/O
        try:
            response = requests.get(url, timeout=30)
            # 404: nothing at this URL. 204: request succeeded but the body is
            # empty, so there is nothing to decode - stop instead of letting
            # response.json() fail on an empty body.
            if response.status_code in (204, 404):
                break
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError):
            # ValueError covers non-JSON bodies on requests versions whose
            # JSON decode error is not a RequestException subclass.
            break

        # Paginated responses are dicts with "results"; tolerate a bare list too
        results = (data.get("results") or []) if isinstance(data, dict) else data

        for entry in results:
            # Entry-level metadata is shared by every fragment of this entry
            meta = entry.get("metadata", {})
            entry_acc = meta.get("accession")
            entry_name = meta.get("name", "")
            entry_type = meta.get("type")
            source_db = meta.get("source_database")

            for protein in entry.get("proteins", []):
                protein_acc = protein.get("accession")
                if not protein_acc or protein_acc.upper() != target_acc:
                    continue

                for location in protein.get("entry_protein_locations") or []:
                    # The representative flag lives on the location, not on its fragments
                    is_representative = bool(location.get("representative"))
                    if representative_only and not is_representative:
                        continue

                    for fragment in location.get("fragments", []):
                        domains.append({
                            "accession": entry_acc,
                            "name": entry_name,
                            "type": entry_type,
                            "source": source_db,
                            "start": fragment.get("start"),
                            "end": fragment.get("end"),
                            "score": location.get("score"),
                            "representative": is_representative
                        })

        # Follow pagination: "next" holds the URL of the following page, if any
        url = data.get("next") if isinstance(data, dict) else None

    return domains

def _domain_sort_key(domain):
    """Sort key: order domains by start, then end; missing coordinates sort last."""
    start = domain["start"]
    end = domain["end"]
    return (start if start is not None else float('inf'),
            end if end is not None else float('inf'))


def get_interpro_domains(uniprot_acc, source=None, representative_only=False):
    """
    Get protein domain coordinates from InterPro API.

    Parameters:
    -----------
    uniprot_acc : str
        UniProt accession (e.g., 'P28482')
    source : str, list, or None
        - str: Query a specific database ('pfam', 'smart', 'prosite', etc.)
        - list: Query multiple specific databases (['pfam', 'smart'])
        - None (or empty): Query every database in KNOWN_DATABASES
    representative_only : bool or None
        - True: Only return representative domain hits (reduces redundancy)
        - False: Return all hits (this is the default)
        - None: Auto-decide (True if no source is given, False if source is specified)

    Returns:
    --------
    list of dict
        Each domain contains: accession, name, type, source, start, end, score, representative
        Sorted by start coordinate, then end coordinate (missing coordinates last)

    Notes:
    ------
    Progress messages are printed when a list of sources is given or when all
    databases are scanned; a single string source is queried silently.
    """
    # Implement the documented "auto" mode: representative hits only when
    # scanning everything, all hits when the caller picked the source(s).
    if representative_only is None:
        representative_only = not source

    # Single database given as a string: query it without any console output
    if source and not isinstance(source, list):
        domains = _extract_domains_from_source(uniprot_acc, source, representative_only)
        return sorted(domains, key=_domain_sort_key)

    if source:
        databases = source
        print(f"Querying specified databases for {uniprot_acc}: {', '.join(source)}")
    else:
        databases = KNOWN_DATABASES
        print(f"Scanning all databases for {uniprot_acc}...")
    if representative_only:
        print("(filtering for representative hits only)")

    all_domains = []
    available_dbs = []  # "db(count)" labels for databases that returned hits
    for db in databases:
        domains = _extract_domains_from_source(uniprot_acc, db, representative_only)
        if domains:
            available_dbs.append(f"{db}({len(domains)})")
            all_domains.extend(domains)

    # The full scan always prints the "Found data in" line (even when empty);
    # only an explicit source list reports the "no data" message.
    if available_dbs or not source:
        print(f"Found data in: {', '.join(available_dbs)}")
    else:
        print("No data found in specified databases")

    return sorted(all_domains, key=_domain_sort_key)

def summarize_protein_domains_dict(uniprot_acc, source=None, representative_only=True):
    """
    Bundle a protein's length and its domain hits into a single dict.

    Parameters:
    -----------
    uniprot_acc : str
        UniProt accession
    source : str, list, or None
        Forwarded to get_interpro_domains:
        - str: one specific database
        - list: several specific databases
        - None: all available databases
    representative_only : bool or None
        Forwarded to get_interpro_domains (True: representative hits only,
        False: all hits). Defaults to True here.

    Returns:
    --------
    dict with keys:
        uniprot_acc         - the accession as passed in
        protein_length      - result of _get_protein_length
        domains             - list returned by get_interpro_domains (already sorted)
        available_databases - hit count per database; for a single string
                              source this is {source: number_of_domains}
    """
    # Length lookup first, then the domain query (which may scan many databases)
    protein_length = _get_protein_length(uniprot_acc)
    hits = get_interpro_domains(uniprot_acc, source=source, representative_only=representative_only)

    single_database = bool(source) and not isinstance(source, list)
    if single_database:
        # One named database: report it under the caller-supplied name
        per_database = {source: len(hits)}
    else:
        # List of sources or full scan: tally by the 'source' recorded on each hit
        per_database = {}
        labels = [hit.get('source') for hit in hits]
        for label in filter(None, labels):
            per_database[label] = per_database.get(label, 0) + 1

    return {
        "uniprot_acc": uniprot_acc,
        "protein_length": protein_length,
        "domains": hits,
        "available_databases": per_database
    }


In [None]:
# ================================
# TEST
# ================================
# Smoke test: scan all known databases (source=None) for E7BSV0,
# keeping representative hits only.
result = summarize_protein_domains_dict("E7BSV0", source=None, representative_only=True)

print(f"\nProtein: {result['uniprot_acc']}")
print(f"Length: {result['protein_length']} residues")
print(f"Found data in databases: {list(result['available_databases'].keys())}")
print(f"\nTotal domains found: {len(result['domains'])}")

for i, domain in enumerate(result['domains'], 1):
    # Compare against None rather than truthiness so that a legitimate score
    # of 0 / 0.0 is still displayed instead of "(no score)".
    score = domain.get('score')
    score_str = f" (score: {score})" if score is not None else " (no score)"
    rep_str = " [REPRESENTATIVE]" if domain.get('representative') else ""
    print(f"  {i}. {domain['name']}: {domain['start']}-{domain['end']} [{domain['source']}]{score_str}{rep_str}")


In [None]:
# Compute linker regions from domain coordinates

In [None]:
def cath_gene3d(uniprot_acc):
    """
    Pull CATH FunFams for a UniProt and infer domain spans if present.
    (Some responses include mapped regions to the sequence.)

    Parameters:
    -----------
    uniprot_acc : str
        UniProt accession appended to CATH_URL

    Returns:
    --------
    list of dict
        One dict per region with keys: db, accession, start, end, score.
        Regions lacking a start or end are dropped; the rest are sorted by
        (start, end). An HTTP 404 yields an empty list.

    Raises:
    -------
    requests.exceptions.RequestException
        For connection problems or any non-404 HTTP error status.

    NOTE(review): the field names read here (data, superfamily_id,
    funfam_number, regions, aln_start/start, aln_end/end, evalue/bitscore)
    are assumptions about the CATH response schema - confirm against the API.
    """
    url = f"{CATH_URL}/{uniprot_acc}?content-type=application/json"
    r = requests.get(url, timeout=30)
    if r.status_code == 404:
        return []
    r.raise_for_status()
    data = r.json()

    # Accept either a {"data": [...]} envelope or a bare list of FunFam records
    records = data.get("data", []) if isinstance(data, dict) else data

    hits = []
    for ff in records:
        sfam = ff.get("superfamily_id")
        funfam = ff.get("funfam_number")
        for reg in ff.get("regions", []):
            # Prefer the e-value; test against None so an e-value of exactly
            # 0.0 (the strongest possible hit) is not replaced by the bitscore.
            evalue = reg.get("evalue")
            hits.append({
                "db": "CATH-Gene3D",
                "accession": f"{sfam}/FF{funfam}",
                # Alignment coordinates take precedence over plain start/end
                "start": reg.get("aln_start") or reg.get("start"),
                "end": reg.get("aln_end") or reg.get("end"),
                "score": evalue if evalue is not None else reg.get("bitscore")
            })

    # Keep only regions with both coordinates present
    located = [h for h in hits if h["start"] and h["end"]]
    return sorted(located, key=lambda h: (h["start"], h["end"]))
