In [23]:
from Bio.SeqUtils.ProtParam import ProteinAnalysis
import pandas as pd
import requests
from io import StringIO
from tqdm import tqdm
from modlamp.descriptors import GlobalDescriptor
import pyarrow as pa


In [11]:
import math
import requests
import pandas as pd
from io import StringIO
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.auto import tqdm

def fetch_uniprot_batches_parallel(
    query: str = "reviewed:true",
    batch_size: int = 500,
    total: int = 1000,
    max_workers: int = 32 
) -> pd.DataFrame:
    """
    Parallelized fetch of UniProt search results in TSV format.

    Args:
      query: UniProt query string
      batch_size: number of entries per page
      total: total entries desired
      max_workers: number of threads to use for HTTP fetches

    Returns:
      Pandas DataFrame with up to `total` rows.
    """
    base_url = "https://rest.uniprot.org/uniprotkb/search"
    fields = (
        "accession,id,protein_name,organism_name,length,sequence,mass,"
        "cc_subcellular_location,go_p,go_f,ec,cc_disruption_phenotype,"
        "cc_catalytic_activity,cc_pathway,cc_subcellular_location,"
        "cc_function,cc_domain,cc_induction,cc_pharmaceutical,"
        "cc_disruption_phenotype"
    )

    # how many pages we need
    num_pages = math.ceil(total / batch_size)
    
    def fetch_page(offset: int) -> pd.DataFrame:
        params = {
            "query": query,
            "format": "tsv",
            "fields": fields,
            "size": batch_size,
            "offset": offset
        }
        resp = requests.get(base_url, params=params)
        resp.raise_for_status()
        return pd.read_csv(StringIO(resp.text), sep="\t")

    # kick off all page fetches
    offsets = [i * batch_size for i in range(num_pages)]
    dfs = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch_page, off): off for off in offsets}
        for future in tqdm(as_completed(futures),
                           total=len(futures),
                           desc="UniProt pages"):
            df = future.result()
            if df.empty:
                break
            dfs.append(df)

    # combine and cap to `total`
    all_data = pd.concat(dfs, ignore_index=True)
    return all_data.iloc[:total].reset_index(drop=True)


In [12]:
def fetch_uniprot_batches(query="reviewed:true", batch_size=500, total=10000):
    base_url = "https://rest.uniprot.org/uniprotkb/search"
    all_dataframes = []
    fetched = 0
    params = {
        "query": query,
        "format": "tsv",
        "fields": "accession,id,protein_name,organism_name,length,sequence,mass,cc_subcellular_location,go_p,go_f,ec,cc_disruption_phenotype,cc_catalytic_activity,cc_pathway,cc_subcellular_location,cc_function,cc_domain,cc_induction,cc_pharmaceutical,cc_disruption_phenotype",
        "size": batch_size,
    }

    response = requests.get(base_url, params=params)
    if not response.ok:
        raise Exception(f"Initial fetch failed: {response.status_code} {response.text}")

    with tqdm(total=total, desc="Fetching UniProt entries") as pbar:
        while True:
            df_batch = pd.read_csv(StringIO(response.text), sep="\t")
            if df_batch.empty:
                break

            all_dataframes.append(df_batch)
            batch_size = len(df_batch)
            fetched += batch_size
            pbar.update(batch_size)

            if fetched >= total:
                break

          # Get full next URL from response.links
            next_url = response.links.get("next", {}).get("url", None)
            if not next_url:
                break

            response = requests.get(next_url)
            if not response.ok:
                raise Exception(f"Next fetch failed: {response.status_code} {response.text}")

    return pd.concat(all_dataframes, ignore_index=True)



In [13]:
def compute_biopython_features(seq: str, entry: str) -> dict:
    analysis = ProteinAnalysis(seq)

    try:
        aa_counts = analysis.count_amino_acids()
        aa_percents = analysis.get_amino_acids_percent()
        sec_frac = analysis.secondary_structure_fraction()
    except Exception as e:
        print(f"Failed on sequence: {seq} - {e}")
        return None

    try:
        features = {
            "Entry": entry,
            "Sequence": seq,
            "length": len(seq),
            "mol_weight": analysis.molecular_weight(),
            "iso_point": analysis.isoelectric_point(),
            "aromaticity": analysis.aromaticity(),
            "instability_index": analysis.instability_index(),
            "gravy": analysis.gravy(),
            "helix_frac": sec_frac[0],
            "turn_frac": sec_frac[1],
            "sheet_frac": sec_frac[2],
        }
    except Exception as e:
        print(f"Failed to compute features for sequence: {seq} - {e}")
        return None

    # Add amino acid counts and percents
    for aa in aa_counts:
        features[f"count_{aa}"] = aa_counts[aa]
        features[f"percent_{aa}"] = round(aa_percents[aa],3)

    
    # --- modlamp GlobalDescriptor ---
    try:
        desc = GlobalDescriptor([seq])
        desc.calculate_all()
        modlamp_feats = desc.descriptor[0]
        modlamp_names = [
            "charge_pH7", "boman_index", "aliphatic_index", "hydrophobic_moment"
        ]

        for name, value in zip(modlamp_names, modlamp_feats):
            orig_name = name.split("_", 1)[-1]
            features[name] = value

    except Exception as e:
        print(f"modlamp failed on sequence: {seq} - {e}")
        return None

    return features


In [14]:
from concurrent.futures import ProcessPoolExecutor

def parallel_compute_features(seq_entry_list, max_workers=None):
    """
    seq_entry_list: List of (seq, entry) tuples
    max_workers:    Number of processes to spin up (defaults to os.cpu_count())
    """
    results = []
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # schedule all the jobs
        futures = {
            executor.submit(compute_biopython_features, seq, entry): (seq, entry)
            for seq, entry in seq_entry_list
        }

        # progress over completions
        for future in tqdm(as_completed(futures),
                           total=len(futures),
                           desc="Computing features"):
            seq, entry = futures[future]
            try:
                feat = future.result()
                if feat is not None:
                    results.append(feat)
            except Exception as exc:
                print(f"[Worker error] seq={entry} raised {exc!r}")
    return results

In [16]:
df = fetch_uniprot_batches_parallel(batch_size=250, total=50000, max_workers=256)
df_filtered = df[df['Length'] <= 1024]


UniProt pages: 100%|██████████| 200/200 [00:16<00:00, 12.19it/s]


In [17]:
sequences = df["Sequence"].tolist()
entry = df["Entry"].tolist()
seq_entry_list = list(zip(sequences, entry))

feature_dicts = []
feature_dicts = parallel_compute_features(seq_entry_list, max_workers=32)

Computing features: 100%|██████████| 50000/50000 [00:14<00:00, 3386.64it/s]


In [18]:
master_df = pd.DataFrame(feature_dicts)
master_master_df = master_df.merge(df, on="Sequence", how="left")
master_master_df = master_master_df.rename(columns={"Entry_x": "Entry"})


In [24]:
#save to parquet
df.to_parquet("data.parquet", engine="pyarrow", index=False)
