In [34]:
# Load libraries
import pandas as pd
import time
import random
from typing import List, Optional

from Bio import Entrez, SeqIO

In [65]:
# Set email (required by NCBI)
Entrez.email = "James.Chang@bcm.edu"
number_of_samples = 100  # Number of samples to retrieve per query fragment

# File paths
search_config_path = "/home/azureuser/cloudfiles/code/Users/jc62/projects/direct_sequence_analysis/data/ncbi_search_parameters/ncbi_retrieval_config.csv"
query_path = "/home/azureuser/cloudfiles/code/Users/jc62/projects/direct_sequence_analysis/data/ncbi_search_parameters/ads_system_names_short.csv"
output_file_csv = "/home/azureuser/cloudfiles/code/Users/jc62/projects/direct_sequence_analysis/data/ncbi_search_parameters/ads_sequences_100_500_short.csv"
output_file_fasta = "/home/azureuser/cloudfiles/code/Users/jc62/projects/direct_sequence_analysis/data/ncbi_search_parameters/ads_sequences_100_500_short.fasta"

In [36]:
# Load config and query information
class SearchQueryBuilder:
    def __init__(self, return_maximum: int, step: int, delay: float, txid: str, email: str):
        self.return_maximum = return_maximum
        self.step = step
        self.delay = delay
        self.txid = txid
        self.email = email
        self.query = ""

    @classmethod
    def from_csv(cls, path: str) -> "SearchQueryBuilder":
        """Create instance from CSV containing return_maximum, step, delay."""
        df = pd.read_csv(path)
        if df.empty or df.shape[0] < 1:
            raise ValueError("Configuration CSV is empty or malformed.")

        row = df.iloc[0]
        return cls(
            return_maximum=int(row["return_maximum"]),
            step=int(row["step"]),
            delay=float(row["delay"]),
            txid=str(row.get("txid","txid2")),  # Default to 2 (Bacteria) if not provided
            email=str(row.get("email"))
        )

    def build_query_from_list(self, terms: List[str]) -> str:
        self.query = self._format_terms(terms)
        return self.query

    def build_query_from_csv(self, path: str) -> str:
        terms = pd.read_csv(path, header=None)[0].tolist()
        self.query = self._format_terms(terms)
        return self.query

    def _format_terms(self, terms: List[str]) -> str:
        return " OR ".join([f"{term}[Title]" for term in terms])

In [37]:
# Fetch and process protein sequences from NCBI
class NCBIProteinFetcher:
    def __init__(self, email: str, return_maximum: int):
        Entrez.email = email
        self.return_maximum = return_maximum

    # Fetch sequences from NCBI using protein IDs
    def fetch_sequences(self, protein_ids):
        """Fetch GenBank-format protein records from NCBI."""
        handle = Entrez.efetch(
            db="protein",
            id=",".join(protein_ids),
            rettype="gb",
            retmode="text",
            retmax=self.return_maximum
        )
        records = list(SeqIO.parse(handle, "genbank"))
        handle.close()
        return records

In [38]:
# Search and fetch records
def safe_esearch(query, return_maximum: int, delay: float):
    time.sleep(delay)
    handle = Entrez.esearch(
        db="protein", 
        term=query, 
        retmax=return_maximum
    )
    record = Entrez.read(handle)
    handle.close()
    return record

def fetch_and_update_sequences(protein_ids):
    records = fetcher.fetch_sequences(protein_ids)
    return records

In [39]:
# Define a class to retrieve one query term
class ProteinSequenceRetriever:
    def __init__(self, return_maximum: int, step: int, delay: float):
        self.return_maximum = return_maximum
        self.step = step
        self.delay = delay

    def fetch_entry_ids(self, query: str) -> List[str]:
        """Fetch protein IDs from NCBI for a given query."""
        search_result = safe_esearch(
            query, 
            return_maximum=self.return_maximum, 
            delay=self.delay
        )
        return search_result.get("IdList", [])

    def fetch_entry_sequences(self, protein_ids: List[str]) -> List:
        """Fetch protein sequences for given protein IDs."""
        records_entry = []

        for i in range(0, len(protein_ids), self.step):
            print(f"Processing records from {i} to {min(i + self.step, len(protein_ids))}")
            chunk = protein_ids[i:min(i + self.step, len(protein_ids))]

            try:
                records_chunk = fetch_and_update_sequences(chunk)
                records_entry.extend(records_chunk)
            except Exception as e:
                print(f"Error fetching chunk {i}-{i+self.step}: {e}")
            
            time.sleep(self.delay)  # Respect NCBI's rate limits

        return records_entry

In [40]:
# Overall retrieve sequences by query
def retrieve_sequences_by_query(subquery: str):
    records_all = []
    subquery = f"{subquery} AND {builder.txid}[Organism]"
    print(f"Searching for: {subquery}")
    protein_ids = retriever.fetch_entry_ids(subquery)
    records = retriever.fetch_entry_sequences(protein_ids)
    records_all.extend(records)

    return records_all

In [44]:
# Retrieve sequences from NCBI
# Load config and query list
builder = SearchQueryBuilder.from_csv(search_config_path)
query = builder.build_query_from_csv(query_path)

# Instantiate classes
fetcher = NCBIProteinFetcher(
    email=builder.email, 
    return_maximum=builder.return_maximum
)
retriever = ProteinSequenceRetriever(
    return_maximum=builder.return_maximum, 
    step=builder.step, 
    delay=builder.delay
)

In [45]:
# Retrieve all sequences
records_total = []

for subquery in query.split(" OR "):
    records = retrieve_sequences_by_query(subquery)
    records_total.extend(records)
print(f"Total records retrieved: {len(records_total)}") 

In [None]:
# If randomzed sampling is required
random_records = random.sample(records_total, number_of_samples)
for record in random_records: 
    print(f">{record.id} {record.description}\n{record.seq}\n")
records_total = random_records

In [46]:
# Write sequences to files:
# Extract desired fields into a list of dicts
records_data = [{
    "ID": record.id,
    "Description": record.description,
    "Sequence": str(record.seq)
} for record in records_total]

# Create a DataFrame and write to csv 
records_data_df = pd.DataFrame(records_data)
records_data_df.to_csv(output_file_csv, index=False)

# Write to fasta
with open(output_file_fasta, "w") as fasta_out:
    SeqIO.write(records_total, fasta_out, "fasta")

In [68]:
# Write sequences to files:
# Extract desired fields into a list of dicts

# Create a DataFrame and write to csv 
records_data_df = pd.DataFrame(filtered_records)
records_data_df.to_csv(output_file_csv, index=False)

# Write to fasta
with open(output_file_fasta, "w") as fasta_out:
    SeqIO.write(filtered_records, fasta_out, "fasta")