In [1]:
import pandas as pd
from Bio import Entrez
from Bio import SeqIO
from tqdm import tqdm
import time
import requests
import io

# ---- Parameters ---- #
FILE = "data/VP_library_all_sequences.csv"
TYPE_FILTER = ["VP", "VT"]
ID_COLUMN = "NCBI_id"
UNIPROT_ID_COLUMN = "Uniprot_id"
TYPE_COLUMN = "code"
OUTPUT_FASTA = "data/full_library_virus_proteins.fasta"
OUTPUT_CSV = "data/full_library_virus_proteins.csv"
ENTREZ_EMAIL = "phitro@bu.edu"  
ENTREZ_API_KEY = "9bb9af72db60905930367f8f543e5ef0d108"       

# UniProt API configuration
UNIPROT_BASE_URL = "https://rest.uniprot.org/uniprotkb/"
UNIPROT_HEADERS = {
    "User-Agent": f"PythonScriptForSequenceFetching/1.0 ({ENTREZ_EMAIL})",
    "Accept": "text/fasta" # Request FASTA format
}
UNIPROT_RATE_LIMIT_DELAY = 1.0 # UniProt recommends ~1 request per second without a specific key/quota

# --- Helper Function: Clean UniProt ID ---
def clean_uniprot_id(uniprot_id):
    """Ensures UniProt ID is canonical (e.g., P05221-1 -> P05221) and handles NaN."""
    if pd.isna(uniprot_id) or not isinstance(uniprot_id, str):
        return ""
    return uniprot_id.split('-')[0].strip()

# ---- Load Data ---- #
df = pd.read_csv(FILE)
filtered = df[df[TYPE_COLUMN].isin(TYPE_FILTER)].copy()

unique_ncbi_ids = sorted(filtered[ID_COLUMN].dropna().astype(str).unique())

# Extract unique UniProt IDs, clean them, and flatten if multiple are in one cell
flat_uniprot_ids = set()
if UNIPROT_ID_COLUMN in filtered.columns:
    for ids_str in filtered[UNIPROT_ID_COLUMN].dropna().astype(str).unique():
        # Replace common separators (semicolon, comma) with a single comma for splitting
        for uniprot_id_raw in ids_str.replace(';', ',').split(','):
            cleaned_id = clean_uniprot_id(uniprot_id_raw)
            if cleaned_id:
                flat_uniprot_ids.add(cleaned_id)
unique_uniprot_ids = sorted(list(flat_uniprot_ids))

print(f"Found {len(unique_ncbi_ids)} unique NCBI IDs of type {TYPE_FILTER}")
print(f"Found {len(unique_uniprot_ids)} unique UniProt IDs to fetch sequences for")

# ---- Set up Entrez ---- #
Entrez.email = ENTREZ_EMAIL
if ENTREZ_API_KEY:
    Entrez.api_key = ENTREZ_API_KEY

# ---- Download FASTA sequences (NCBI) ---- #
# Use a dictionary to store all sequences, with a key representing (identifier, source) for deduplication
all_sequences = {} # Key: (identifier_used_for_header, source), Value: Bio.SeqRecord object
seq_df_rows = [] # List to build the DataFrame rows

for ncbi_id in tqdm(unique_ncbi_ids, desc="Fetching NCBI FASTA"):
    try:
        # Try protein database first (most likely)
        handle = Entrez.efetch(db="protein", id=ncbi_id, rettype="fasta", retmode="text")
        records = list(SeqIO.parse(handle, "fasta"))
        handle.close()
        if records:
            for rec in records:
                # FASTA header format: NCBI_id|original_rec_id (e.g., 12345|gi|67890|ref|NP_123.1|)
                # If multiple records come from one NCBI ID, keep their unique IDs.
                # If only one, using just the NCBI_id can be simpler.
                header_id_for_fasta = f"{ncbi_id}|{rec.id}" if len(records) > 1 else str(ncbi_id)
                
                seq_content = str(rec.seq)
                
                # Use a combined key (header_id_for_fasta, source) to ensure uniqueness
                if (header_id_for_fasta, "NCBI") not in all_sequences:
                    rec.id = header_id_for_fasta # Update Bio.SeqRecord ID for the FASTA file
                    rec.description = f"NCBI {ncbi_id} {rec.description}" # Add context to description
                    all_sequences[(header_id_for_fasta, "NCBI")] = rec
                    seq_df_rows.append({'Identifier': ncbi_id, 'Sequence': seq_content, 'Source': 'NCBI'})
        else:
            print(f"NCBI ID {ncbi_id}: No sequences found.")
    except Exception as e:
        print(f"Failed for NCBI ID: {ncbi_id}\nError: {e}")
    time.sleep(0.11)  # NCBI: <=10/sec with API key, <=3/sec without

print(f"Downloaded {len([rec for (id,src),rec in all_sequences.items() if src == 'NCBI'])} unique NCBI protein sequences.")

# ---- Download FASTA sequences (UniProt) ---- #
for uniprot_id in tqdm(unique_uniprot_ids, desc="Fetching UniProt FASTA"):
    if not uniprot_id: # Skip empty UniProt IDs
        continue
    try:
        response = requests.get(f"{UNIPROT_BASE_URL}{uniprot_id}.fasta", headers=UNIPROT_HEADERS, timeout=10)
        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)

        fasta_content = response.text
        records = list(SeqIO.parse(io.StringIO(fasta_content), "fasta"))
        
        if records:
            for rec in records:
                # UniProt FASTA header from API: >sp|P12345|ABCDE_HUMAN ...
                # We want header: UniProt_id_from_DF|original_rec_id (e.g., P12345|sp|P12345|ABCDE_HUMAN)
                header_id_for_fasta = f"{uniprot_id}|{rec.id}"
                
                seq_content = str(rec.seq)

                # Use a combined key (header_id_for_fasta, source) to ensure uniqueness
                if (header_id_for_fasta, "UniProt") not in all_sequences:
                    rec.id = header_id_for_fasta # Update Bio.SeqRecord ID for the FASTA file
                    rec.description = f"UniProt {uniprot_id} {rec.description}" # Add context to description
                    all_sequences[(header_id_for_fasta, "UniProt")] = rec
                    seq_df_rows.append({'Identifier': uniprot_id, 'Sequence': seq_content, 'Source': 'UniProt'})
        else:
            print(f"UniProt ID {uniprot_id}: No sequences found.")
    except requests.exceptions.RequestException as e:
        print(f"Failed for UniProt ID: {uniprot_id}\nError: {e}")
        if hasattr(e, 'response') and e.response is not None:
            print(f"  Response status: {e.response.status_code}, content: {e.response.text[:200]}...")
    except Exception as e:
        print(f"Failed for UniProt ID (unexpected error): {uniprot_id}\nError: {e}")
    time.sleep(UNIPROT_RATE_LIMIT_DELAY) # Be polite to UniProt API

print(f"Downloaded {len([rec for (id,src),rec in all_sequences.items() if src == 'UniProt'])} unique UniProt protein sequences.")

# ---- Combine all SeqRecords and write to single FASTA file ---- #
# Convert dictionary values (SeqRecord objects) to a list
all_seq_records_list = [rec for rec in all_sequences.values()] 
print(f"Total {len(all_seq_records_list)} unique protein sequences (NCBI and UniProt combined).")

with open(OUTPUT_FASTA, "w") as out_handle:
    SeqIO.write(all_seq_records_list, out_handle, "fasta")

# ---- Write sequence metadata to CSV file ---- #
seq_df = pd.DataFrame(seq_df_rows)
seq_df.to_csv(OUTPUT_CSV, index=False)

print(f"Wrote all sequences to {OUTPUT_FASTA}")
print(f"Wrote sequence metadata to {OUTPUT_CSV}")

Found 1550 unique NCBI IDs of type ['VP', 'VT']
Found 103 unique UniProt IDs to fetch sequences for


Fetching NCBI FASTA: 100%|██████████| 1550/1550 [10:41<00:00,  2.42it/s]


Downloaded 1550 unique NCBI protein sequences.


Fetching UniProt FASTA:   5%|▍         | 5/103 [00:06<02:13,  1.37s/it]

UniProt ID A0A2I6J2E4: No sequences found.


Fetching UniProt FASTA: 100%|██████████| 103/103 [02:20<00:00,  1.36s/it]

Downloaded 102 unique UniProt protein sequences.
Total 1652 unique protein sequences (NCBI and UniProt combined).
Wrote all sequences to data/full_library_virus_proteins.fasta
Wrote sequence metadata to data/full_library_virus_proteins.csv



