# Final Scraper

In [None]:
#=============================================================Import all necessary libraries here=============================================================================================

import time, requests,os
from abc import ABC, abstractmethod
from xml.etree import ElementTree as ET
from tqdm import tqdm

#======================================================================The Abstract Interface=====================================================================================================

# This is the abstract base class that every database backend must implement, including any new database a future scientist wants to add.
# Each database provides its data in its own way, so there is no one-size-fits-all implementation; a new backend only has to supply the two methods declared below.

#===============================================================================================================================================================================================

class ProteinDatabase(ABC):
    """Interface that every protein-database backend has to implement."""

    @abstractmethod
    def search_accessions(self, domain, organism_id=None, max_proteins=None):
        """Look up the accessions annotated with *domain*.

        Concrete backends return a list of accession strings, optionally
        restricted to ``organism_id`` and capped at ``max_proteins``.
        """

    @abstractmethod
    def fetch_fasta(self, accession):
        """Download the FASTA record of a single *accession* as text."""

#======================================================================Implementing the Uniprot Version==========================================================================================

# Uniprot allows for organism ID.

#================================================================================================================================================================================================
class UniProtDatabase(ProteinDatabase):
    """UniProtKB backend: searches entries by Pfam cross-reference and downloads their FASTA."""

    SEARCH = "https://rest.uniprot.org/uniprotkb/search"  # Search endpoint
    FASTA = "https://rest.uniprot.org/uniprotkb/"  # Prefix of the per-entry FASTA URL
    PAGE_SIZE = 500  # Results requested per search page
    TIMEOUT = 30  # Seconds to wait on a single HTTP request before giving up

    def search_accessions(self, domain, organism_id=None, max_proteins=None):
        """Return the primary accessions of entries cross-referenced to a Pfam domain.

        Args:
            domain: Pfam identifier, e.g. "PF00018".
            organism_id: optional organism (taxonomy) ID appended to the query.
            max_proteins: optional cap on the number of accessions; 0 returns [].

        Returns:
            A list of accession strings, at most ``max_proteins`` long when a cap is given.

        Raises:
            RuntimeError: if a search request does not return HTTP 200, or if
                the search produced no hits at all.
            requests.exceptions.RequestException: on network failure or timeout.
        """
        if max_proteins == 0:  # Nothing requested, so skip the network entirely
            return []

        query = f"xref:Pfam-{domain}"  # Base query string for the UniProt API
        if organism_id:
            query += f" AND organism_id:{organism_id}"  # Organism restriction

        # Never ask for a bigger page than the caller can use.
        page_size = self.PAGE_SIZE
        if max_proteins and 0 < max_proteins < page_size:
            page_size = max_proteins

        url = self.SEARCH
        params = {"query": query, "fields": "accession", "size": page_size}
        ids = []

        while url:
            r = requests.get(url, params=params, timeout=self.TIMEOUT)
            if r.status_code != 200:
                raise RuntimeError(f"UniProt search failed: {r.status_code}")

            ids.extend(hit["primaryAccession"] for hit in r.json()["results"])

            if max_proteins and len(ids) >= max_proteins:  # Collected enough
                break

            # UniProt advertises the next page in the HTTP "Link" header
            # (rel="next"), not in the JSON body. That URL already carries the
            # query, page size and cursor, so it is followed without params.
            url = r.links.get("next", {}).get("url")
            params = None

        # ----- guards after pagination -----
        if not ids:
            raise RuntimeError(f"No hits for {domain} (organism={organism_id})")
        return ids[:max_proteins] if max_proteins else ids

    def fetch_fasta(self, accession):
        """Download one entry as a two-line FASTA record (header + unwrapped sequence).

        Raises:
            RuntimeError: if the download does not return HTTP 200.
            requests.exceptions.RequestException: on network failure or timeout.
        """
        r = requests.get(f"{self.FASTA}{accession}.fasta", timeout=self.TIMEOUT)
        if r.status_code != 200:
            raise RuntimeError(f"UniProt efetch {accession}: {r.status_code}")
        hdr, *seq = r.text.rstrip().split("\n")  # First line is the header, the rest is sequence
        # Join the wrapped sequence lines into one line and drop any '-' characters.
        return "\n".join([hdr, "".join(s.replace('-', '') for s in seq)])

#===============================================================Implementing the CDDDatabase Version==============================================================================================

# The NCBI services used for CDD are rate-limited: they do not allow too many queries from the same IP address in a short timeframe, so an API key may be needed.

#==================================================================================================================================================================================================

# Identification parameters merged into every NCBI request below; add an "api_key" entry for a faster querying rate.
NCBI_EXTRAS = dict(tool="ProteinBatchDownloader", email="me@example.com")
class CDDDatabase(ProteinDatabase):
    """NCBI CDD backend: esearch finds CDD records, elink maps them to proteins, efetch downloads FASTA."""

    TIMEOUT = 30  # Seconds to wait on a single HTTP request before giving up

    def search_accessions(self, domain, organism_id=None, max_proteins=None):
        """Return the protein IDs linked to the CDD records matching *domain*.

        Args:
            domain: search term for the "cdd" database, e.g. "cd00184".
            organism_id: optional taxonomy ID, added as ``txid<ID>[Organism:exp]``.
            max_proteins: optional cap on the number of protein IDs; 0 returns [].

        Returns:
            A list of unique protein ID strings in the order the links were
            returned, at most ``max_proteins`` long when a cap is given.

        Raises:
            RuntimeError: if esearch or elink does not return HTTP 200, or if
                no protein was found.
            requests.exceptions.RequestException: on network failure or timeout.
        """
        term = domain + (f" AND txid{organism_id}[Organism:exp]" if organism_id else "")

        if max_proteins == 0:  # Nothing requested, so skip the network entirely
            return []

        # retmax: results requested per page; retstart: offset of the current page.
        retmax, retstart, cdd_ids = 10000, 0, []

        # --- eSearch pagination ---
        while True:
            r = requests.get(
                "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi",
                params={**NCBI_EXTRAS, "db": "cdd", "term": term,
                        "retmax": retmax, "retstart": retstart, "retmode": "xml"},
                timeout=self.TIMEOUT,
            )
            if r.status_code != 200:
                raise RuntimeError(f"CDD esearch failed: {r.status_code}")

            # Every <Id> element in the reply is one CDD hit of this page.
            batch = [x.text for x in ET.fromstring(r.text).findall(".//Id")]
            cdd_ids.extend(batch)

            if len(batch) < retmax:  # Short (or empty) page: this was the last one
                break
            if max_proteins and len(cdd_ids) >= max_proteins:  # Collected enough
                break
            retstart += retmax  # Move the offset to the next page

        # --- map CDD → protein -----------------------------
        # A dict is used as an insertion-ordered set so a protein reached
        # through several links is only reported (and later downloaded) once.
        unique_prot_ids = {}
        for i in range(0, len(cdd_ids), 500):  # elink is queried with 500 IDs at a time
            chunk = cdd_ids[i:i + 500]
            r = requests.get(
                "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi",
                params={**NCBI_EXTRAS, "dbfrom": "cdd", "db": "protein",
                        "id": ",".join(chunk), "retmode": "xml"},
                timeout=self.TIMEOUT,
            )
            if r.status_code != 200:
                raise RuntimeError(f"CDD elink failed: {r.status_code}")
            for node in ET.fromstring(r.text).findall(".//LinkSetDb/Link/Id"):
                unique_prot_ids.setdefault(node.text, None)

            if max_proteins and len(unique_prot_ids) >= max_proteins:  # Collected enough
                break

        prot_ids = list(unique_prot_ids)

        # ----- guards -----
        if not prot_ids:
            raise RuntimeError(f"No hits for {domain} (organism={organism_id})")
        return prot_ids[:max_proteins] if max_proteins else prot_ids

    def fetch_fasta(self, accession):
        """Download one protein as a two-line FASTA record (header + unwrapped sequence).

        Raises:
            RuntimeError: if efetch does not return HTTP 200.
            requests.exceptions.RequestException: on network failure or timeout.
        """
        r = requests.get(
            "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi",
            params={**NCBI_EXTRAS, "db": "protein", "id": accession, "rettype": "fasta", "retmode": "text"},
            timeout=self.TIMEOUT,
        )
        if r.status_code != 200:
            raise RuntimeError(f"NCBI efetch {accession}: {r.status_code}")
        hdr, *seq = r.text.rstrip().split("\n")  # First line is the header, the rest is sequence
        # Join the wrapped sequence lines into one line and drop any '-' characters.
        return "\n".join([hdr, "".join(s.replace('-', '') for s in seq)])

#===============================================================Implementing the PDBDatabase Version========================================================================================

# Allows the user to define the organism they are interested in.

#===========================================================================================================================================================================================
class PDBDatabase(ProteinDatabase):
    """RCSB PDB backend: searches polymer entities by annotation ID and downloads entry FASTA files."""

    SEARCH = "https://search.rcsb.org/rcsbsearch/v2/query"  # Search API endpoint
    TIMEOUT = 30  # Seconds to wait on a single HTTP request before giving up

    def search_accessions(self, domain, organism_id=None, max_proteins=None):
        """Return the PDB entry IDs that have a polymer entity annotated with *domain*.

        Args:
            domain: annotation ID to match exactly, e.g. "PF00018".
            organism_id: optional taxonomy ID matched against the source-organism lineage.
            max_proteins: optional cap on the number of entry IDs; 0 returns [].

        Returns:
            A list of unique entry IDs (e.g. "1ABC") in the order the search
            returned them, at most ``max_proteins`` long when a cap is given.

        Raises:
            RuntimeError: if the search request fails, or if there were no hits.
            requests.exceptions.RequestException: on network failure or timeout.
        """
        if max_proteins == 0:  # Nothing requested, so skip the network entirely
            return []

        domain_node = {  # Terminal query node: exact match on the annotation ID
            "type": "terminal", "service": "text",
            "parameters": {
                "attribute": "rcsb_polymer_entity_annotation.annotation_id",
                "operator":  "exact_match",
                "value":     domain
            }
        }

        if organism_id is not None:  # Combine with an organism restriction
            tax_node = {
                "type": "terminal", "service": "text",
                "parameters": {
                    "attribute": "rcsb_entity_source_organism.taxonomy_lineage.id",
                    "operator":  "exact_match",
                    "value":     str(organism_id)
                }
            }
            query = {"type": "group", "logical_operator": "and",
                     "nodes": [domain_node, tax_node]}
        else:
            query = domain_node

        # A dict is used as an insertion-ordered set: hits look like "1ABC_1",
        # several of which can belong to the same entry "1ABC". Keeping the
        # search order makes the result (and its truncation) reproducible.
        unique_entries = {}
        start, rows = 0, 1000  # Offset and page size for chunked downloads
        while True:
            payload = {
                "query": query,
                "return_type": "polymer_entity",
                "request_options": {"paginate": {"start": start, "rows": rows}}
            }
            r = requests.post(self.SEARCH, json=payload, timeout=self.TIMEOUT)
            if r.status_code == 204:  # "No Content": no (further) results, not a failure
                break
            if r.status_code != 200:
                raise RuntimeError(f"PDB search failed: {r.status_code} {r.text}")

            chunk = [h["identifier"] for h in r.json().get("result_set", [])]
            if not chunk:  # Empty page: pagination is finished
                break
            for identifier in chunk:
                unique_entries.setdefault(identifier.split("_")[0], None)

            # Count unique entries, not entity hits, when deciding to stop early.
            if max_proteins and len(unique_entries) >= max_proteins:
                break
            if len(chunk) < rows:  # Short page: this was the last one
                break
            start += rows  # Move on to the next page

        entry_ids = list(unique_entries)

        # ----- guards -----
        if not entry_ids:
            raise RuntimeError(f"No hits for {domain} (organism={organism_id})")
        return entry_ids[:max_proteins] if max_proteins else entry_ids

    # --------------------------------------------------------------------------
    def fetch_fasta(self, accession, organism_id=None):
        """Download one PDB entry FASTA; optionally keep only chains
        whose header mentions the organism_id substring.

        Header lines (starting with ">") are returned unchanged; '-' characters
        are removed from sequence lines only. Surrounding whitespace is
        stripped, so the result does not end with a newline.

        Raises:
            RuntimeError: if the download does not return HTTP 200, or if no
                FASTA record is left (e.g. after organism filtering).
            requests.exceptions.RequestException: on network failure or timeout.
        """
        r = requests.get(f"https://www.rcsb.org/fasta/entry/{accession}", timeout=self.TIMEOUT)
        if r.status_code != 200:
            raise RuntimeError(f"PDB efetch {accession}: {r.status_code}")

        fasta = r.text
        if organism_id is not None:
            # Split into records; every record but the first loses its leading ">".
            blocks = fasta.strip().split("\n>")
            fasta = "\n".join(
                (b if b.startswith(">") else ">" + b)
                for b in blocks
                if f"({organism_id})" in b.split("\n", 1)[0]  # Test the header line only
            )

        if ">" not in fasta:  # No record left
            raise RuntimeError(
                f"No chains for organism {organism_id} found in {accession}"
            )

        cleaned = [
            line if line.startswith(">") else line.replace("-", "")
            for line in fasta.strip().split("\n")
        ]
        return "\n".join(cleaned)

#===============================================================Orchestrator Function==============================================================================================

# This function brings together the functions that have already been defined earlier in the file. It is pivotal to running the program.

#==================================================================================================================================================================================
def fetch_domain_proteins_fasta(db, domain, organism_id, out_file, max_proteins, retry_attempts, pause):
    """Search *db* for a domain and write the FASTA record of every hit to *out_file*.

    Args:
        db: backend object providing ``search_accessions`` and ``fetch_fasta``.
        domain: domain identifier passed to the search.
        organism_id: organism restriction passed to the search (may be None).
        out_file: path of the FASTA file to write (opened in "w" mode, UTF-8).
        max_proteins: cap on the number of accessions passed to the search (may be None).
        retry_attempts: maximum number of download attempts per accession; values
            below 1 are treated as 1 so every accession is tried at least once.
        pause: seconds to sleep after each accession to avoid hammering the server.

    Raises:
        RuntimeError: if the search returns nothing, or if a download fails
            with anything other than a retryable rate-limit (429) error.
    """
    ids = db.search_accessions(domain, organism_id, max_proteins)
    if not ids:
        raise RuntimeError("No hits")

    attempts = max(1, retry_attempts)
    with open(out_file, "w", encoding="utf-8") as fh:
        for acc in tqdm(ids, desc=db.__class__.__name__):  # Progress bar labelled with the DB class name
            for attempt in range(attempts):
                try:
                    record = db.fetch_fasta(acc)
                except RuntimeError as e:
                    # The backends end their messages with the HTTP status
                    # ("... <accession>: <status>"). Compare the last token
                    # instead of searching the whole text, so an accession that
                    # merely contains "429" is not mistaken for a rate limit.
                    rate_limited = str(e).split()[-1:] == ["429"]
                    if rate_limited and attempt < attempts - 1:
                        time.sleep(2 ** attempt)  # Exponential back-off before retrying
                        continue
                    raise
                fh.write(record + "\n")
                break
            time.sleep(pause)

def run_pipeline(db_name, domain, organism_id=None, out_file="output.fasta", max_proteins=None):
    """Download the FASTA records of one domain from the chosen database.

    Args:
        db_name: backend to use: "uniprot", "cdd" or "pdb" (case-insensitive).
        domain: domain identifier handed to the backend's search.
        organism_id: optional organism restriction; None means no restriction.
        out_file: path of the FASTA file to create.
        max_proteins: optional cap on the number of records.

    Raises:
        FileExistsError: if out_file is already present (it is never overwritten).
        ValueError: if db_name is not one of the known backends.
    """
    # Refuse to clobber an existing result file.
    if os.path.exists(out_file):
        raise FileExistsError(f"{out_file} already exists – will not overwrite.")

    # Lookup table from backend name to backend class.
    backends = {
        "uniprot": UniProtDatabase,
        "cdd": CDDDatabase,
        "pdb": PDBDatabase,
    }
    backend_cls = backends.get(db_name.lower())
    if backend_cls is None:
        raise ValueError("bad db")

    # Instantiate the backend and run the download with
    # 3 attempts per accession and a 0.34 s pause after each accession.
    fetch_domain_proteins_fasta(backend_cls(), domain, organism_id, out_file, max_proteins, 3, 0.34)

    print(f"{db_name}: wrote {out_file}")  # log success

#==============================================================================Main Function==================================================================================================
if __name__ == "__main__":
    # Demo run: one domain query per backend (CDD takes a CDD ID, the others a Pfam ID),
    # no organism restriction, at most 5 proteins each, written to "<backend>.fasta".
    demo_domains = {"uniprot": "PF00018", "cdd": "cd00184", "pdb": "PF00018"}
    for db, demo_domain in demo_domains.items():
        run_pipeline(db, demo_domain, None, f"{db}.fasta", 5)



# Arguments passed to run_pipeline() in the main block above:

# db: database name string ("uniprot", "cdd", or "pdb")

# domain: the domain identifier to search for (a Pfam ID for UniProt and PDB, a CDD ID for CDD) -> Important: this is where the ID is set

# organism_id: None, meaning all organisms -> Important: this is where the organism is specified

# out_file: filename string, e.g., "uniprot.fasta"

# max_proteins: 5 -> Important: this is where the maximum number of proteins is specified


UniProtDatabase: 100%|██████████| 5/5 [00:06<00:00,  1.38s/it]


uniprot: wrote uniprot.fasta


CDDDatabase: 100%|██████████| 5/5 [00:05<00:00,  1.15s/it]


cdd: wrote cdd.fasta


PDBDatabase: 100%|██████████| 5/5 [00:03<00:00,  1.31it/s]

pdb: wrote pdb.fasta



