# Preprocessing

This is a neat script that downloads UniProt data and extracts subcellular localization annotations for each protein. We'll use this data to later train the Random Forest model.

## Shell script

This script downloads the UniProt data and decompresses it. Takes around 3 minutes to run. We later extract the sequences and re-write them as FASTA ourselves; however, you are more than welcome to use the original FASTA files if you prefer using:

```bash
wget ftp://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/complete/uniprot_sprot.fasta.gz
gunzip uniprot_sprot.fasta.gz
```

In [None]:
%%sh
set -euo pipefail
mkdir -p data/raw
cd data/raw

echo "Downloading UniProtKB/Swiss-Prot..."
wget -q ftp://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/complete/uniprot_sprot.dat.gz

In [None]:
%%sh
echo "Verifying checksum..."
MD5_DAT_EXPECTED="ecfb866a5de8f27497af396735f09b30"

MD5_DAT_ACTUAL=$(md5sum data/raw/uniprot_sprot.dat.gz | cut -d' ' -f1)

if [[ "$MD5_DAT_ACTUAL" != "$MD5_DAT_EXPECTED" ]]; then
  echo "ERROR: DAT checksum mismatch ($MD5_DAT_ACTUAL)"
  exit 1
fi

echo "Download and verification complete."

In [None]:
%%sh
echo "Decompressing..."
gunzip -f data/raw/uniprot_sprot.dat.gz

## Python module

The following Python module contains the code that processes the UniProt data and extracts subcellular localization annotations for each protein. It also splits the data into training, validation, and test sets, and saves them to CSV files.

In [None]:
import os
import re
from typing import List, Dict, Any

import pandas as pd
from Bio import SwissProt, SeqIO
from Bio.SeqUtils.ProtParam import ProteinAnalysis

from sklearn.model_selection import train_test_split

In [None]:
# paths for raw data
INPUT_DAT = "data/raw/uniprot_sprot.dat"
# processed annotation and filtering
INPUT_ANN = "data/processed/annotations.csv"
INPUT_FASTA = "data/processed/nonredundant.fasta"

# outputs
OUTPUT_CSV = "data/processed/annotations.csv"
OUTPUT_FASTA = "data/processed/filtered.fasta"
OUTPUT_FEAT = "data/processed/features.csv"

# split datasets
OUTPUT_X_TRAIN = "data/processed/X/train.csv"
OUTPUT_X_TMP   = "data/processed/X/tmp.csv"
OUTPUT_X_VAL   = "data/processed/X/val.csv"
OUTPUT_X_TEST  = "data/processed/X/test.csv"

# split labels
OUTPUT_Y_TRAIN = "data/processed/Y/train.csv"
OUTPUT_Y_TMP   = "data/processed/Y/tmp.csv"
OUTPUT_Y_VAL   = "data/processed/Y/val.csv"
OUTPUT_Y_TEST  = "data/processed/Y/test.csv"

In [None]:
# create processed output directories
for path in [
    OUTPUT_CSV, OUTPUT_FASTA, OUTPUT_FEAT,
    OUTPUT_X_TRAIN, OUTPUT_X_TMP, OUTPUT_X_VAL, OUTPUT_X_TEST,
    OUTPUT_Y_TRAIN, OUTPUT_Y_TMP, OUTPUT_Y_VAL, OUTPUT_Y_TEST,
    ]:
    os.makedirs(os.path.dirname(path), exist_ok=True)

We manually exclude a few terms that indicates non-experimental evidence or are not specific enough to be useful for localization prediction. Additionally, we map some specific biological locations to more general terms to reduce the number of unique labels. This should be extended as needed, as this list is far from exhaustive.

In [None]:
_EXCLUDE_TERMS = {"probable", "potential", "by similarity", "prediction"}
ALLOWED_LOCS = {
    "Cytoplasm",
    "Nucleus",
    "Secreted",
    "Mitochondrion",
    "Periplasm",
    "Virion",
    "Plastid",
    "Membrane",
    "Peroxisome",
    "Endoplasmic Reticulum",
    "Golgi Apparatus",
    "Lysosome",
    "Vacuole",
    "Cell Projection",
    "Cell Surface",
    "Cell Junction",
    "Endosome",
}
_SYNONYM_MAP = {
    "host membrane": "Membrane",
    "host cell membrane": "Membrane",
    "cell membrane": "Membrane",
    "cell outer membrane": "Membrane",
    "plasma membrane": "Membrane",
    "mitochondrial matrix": "Mitochondrion",
    "mitochondrion matrix": "Mitochondrion",
    "endoplasmic reticulum lumen": "Endoplasmic Reticulum",
    "er lumen": "Endoplasmic Reticulum",
    "golgi": "Golgi Apparatus",
    # extend as needed
}

In [None]:
def _clean_and_primary(text: str) -> str:
    text = re.split(r"Note=", text, maxsplit=1)[0]
    text = re.sub(r"\{.*?\}|\(.*?\)", "", text)
    part = re.split(r"[.;]", text, maxsplit=1)[0].strip()
    if not part:
        return ""
    low = part.lower()
    if low in _SYNONYM_MAP:
        canon = _SYNONYM_MAP[low]
    else:
        canon = part.title()
    return canon if canon in ALLOWED_LOCS else ""

We exclude multi-compartment entries for this iteration. Single-label classifiers can’t handle proteins annotated to two or more compartments - splitting multi-labels naively can inflate class counts and introduce bias.

Some more considerations:

+ Performance benchmarks (speed, memory) become harder to interpret when outputs are vectors rather than one label.
+ Decision-support tools often struggle to map multi-compartment calls to single ACMG evidence codes.
+ Rare two-compartment combinations will have very few examples. This can undermine learning.

Future iterations can revisit multi-label approaches once the single-label pipeline produces a good benchmark, as clinically, mis- or multi-localization can be disease-relevant, and it is always good to retain them and preserve for downstream pathway analysis.

In [None]:
def extract_protein_data(dat_file: str) -> List[Dict[str, Any]]:
    """
    Parse a UniProt .dat file and return only entries with exactly one
    experimentally-verified subcellular location from ALLOWED_LOCS.
    """
    results: List[Dict[str, Any]] = []

    try:
        handle = open(dat_file)
    except OSError as e:
        raise RuntimeError(f"Cannot open file {dat_file}: {e}")

    with handle:
        for rec in SwissProt.parse(handle):
            locs: List[str] = []

            # 1) try structured API
            if hasattr(rec, "subcellular_locations") and rec.subcellular_locations:
                for loc_tuple in rec.subcellular_locations:
                    loc = loc_tuple.location or ""
                    cleaned = _clean_and_primary(loc)
                    if cleaned:
                        locs.append(cleaned)
            else:
                # 2) fallback to scanning comments
                for comment in rec.comments:
                    if not comment.upper().startswith("SUBCELLULAR LOCATION:"):
                        continue
                    body = comment.split(":", 1)[1]
                    for piece in re.split(r"[;]", body):
                        cleaned = _clean_and_primary(piece)
                        if cleaned:
                            locs.append(cleaned)

            # exclude non-experimental evidence
            combined = " ".join(rec.comments).lower()
            if any(term in combined for term in _EXCLUDE_TERMS):
                continue

            # dedupe and require exactly one compartment
            unique = list(dict.fromkeys(locs))
            if len(unique) != 1:
                continue

            results.append(
                {
                    "entry_name": rec.entry_name,
                    "sequence": rec.sequence,
                    "localization": unique[0],
                }
            )

    return results

In [None]:
if os.path.exists(OUTPUT_CSV):
    df = pd.read_csv(OUTPUT_CSV)
    print(f"Loaded existing annotations from {OUTPUT_CSV}")
else:
    print(f"No existing annotations found, extracting from {INPUT_DAT}")
    print("Extracting protein data...")
    protein_data = extract_protein_data(INPUT_DAT)

    df = pd.DataFrame(protein_data)
    print(f"Data shape: {df.shape}")

    df.to_csv(OUTPUT_CSV, index=False)
    print(f"\nWrote annotations to {OUTPUT_CSV}!")
    print(f"DataFrame saved with {len(df)} entries")

In [None]:
print(f"\nDataset statistics:")
print(f"Total entries: {len(df)}")
print(f"Unique localizations: {df['localization'].nunique()}")
print(f"Average sequence length: {df['sequence'].str.len().mean():.1f}")

print(f"\nTop 10 most common localizations:")
print(df["localization"].value_counts().head(10))

The following code is for debugging purposes and can be removed. It simply saves the full localization distribution to a text file for later analysis. This is useful to understand the distribution of localizations in the dataset, and to ensure that the filtering is working as expected.

In [None]:
with open("data/localization_distribution.txt", "w") as f:
    f.write("Full localization distribution:\n")
    f.write(df["localization"].value_counts().to_string())
    print(
        f"\nFull localization distribution saved to data/localization_distribution.txt"
    )

In [None]:
# optimization: convert string columns to categorical to save memory
df['entry_name'] = df['entry_name'].astype('category')
df['localization'] = df['localization'].astype('category')

print(f"Optimized memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

In [None]:
print(f"\nQuick data exploration:")
print(f"Sequence length distribution:")
print(df['sequence'].str.len().describe())

print(f"\nData quality checks:")
print(f"Entries with very short sequences (<50 AA): {(df['sequence'].str.len() < 50).sum()}")
print(f"Entries with very long sequences (>2000 AA): {(df['sequence'].str.len() > 2000).sum()}")

print(f"\nSample of processed data:")
print(df.head())

In [None]:
with open(OUTPUT_FASTA, "w") as out:
    for _, row in df.iterrows():
        header = f">{row['entry_name']}|{row['localization']}"
        seq = row["sequence"]
        out.write(f"{header}\n")

        for i in range(0, len(seq), 80):
            out.write(seq[i : i + 80] + "\n")

print(f"Wrote FASTA to {OUTPUT_FASTA}")
print(f"Generated {len(df)} sequences in FASTA format")

We now use [CD-HIT](https://github.com/weizhongli/cdhit/) to cluster the UniProtKB/Swiss-Prot FASTA file at 90% sequence identity, which is a common practice to reduce redundancy in protein datasets.

In [None]:
%%sh
INPUT_FASTA="data/processed/filtered.fasta"
OUTPUT_FASTA="data/processed/nonredundant.fasta"
THREADS=4

echo "Running CD-HIT..."
cd-hit -i "$INPUT_FASTA" \
       -o "$OUTPUT_FASTA" \
       -c 0.90 -n 5 \
       -M 16000 -T $THREADS

if command -v cd-hit &> /dev/null; then
    echo "CD-HIT is installed, proceeding with clustering..."
    cd-hit -i "$INPUT_FASTA" \
       -o "$OUTPUT_FASTA" \
       -c 0.90 -n 5 \
       -M 16000 -T $THREADS
    echo "Clustering completed, nonredundant FASTA at $OUTPUT_FASTA"
else
    echo "CD-HIT is not installed. Please install it to perform clustering."
fi

### Feature engineering

We use the following features for the Random Forest model:

+ **Amino acid composition**: The percentage of each standard amino acid in the protein sequence.
+ **Dipeptide frequencies**: The frequency of each dipeptide (pair of amino acids) in the protein sequence.
+ **GRAVY**: The grand average of hydropathy (GRAVY) of the protein sequence, which is a measure of hydrophobicity.
+ **pI**: The isoelectric point of the protein sequence, which is the pH at which the protein has no net charge.
+ **Sequence length**: The length of the protein sequence.
+ **Original length**: The length of the original protein sequence (before cleaning).

In [None]:
def build_feature_matrix(fasta_path: str, ann_path: str) -> pd.DataFrame:
    """
    Parse a UniProt FASTA and annotation CSV to compute sequence features
    (amino-acid composition, dipeptide frequencies, GRAVY, pI).
    """
    # Load annotation map
    ann = pd.read_csv(ann_path).set_index("entry_name")["localization"].to_dict()

    # Standard amino acids & dipeptides
    standard_aas = list("ACDEFGHIKLMNPQRSTVWY")
    all_dipeps = [a + b for a in standard_aas for b in standard_aas]

    rows = []
    skipped = 0

    for rec in SeqIO.parse(fasta_path, "fasta"):
        # parse from the full description (captures spaces)
        try:
            entry, header_loc = rec.description.split("|", 1)
            header_loc = header_loc.strip()
        except ValueError:
            print(f"Warning: unexpected header format '{rec.description}'")
            continue

        loc = ann.get(entry)
        if loc is None:
            print(f"Warning: no annotation for {entry}")
            continue
        if loc != header_loc:
            print(f"Warning: header says {header_loc}, ann says {loc}")

        # sequence cleaning
        original_seq = str(rec.seq)
        cleaned = "".join([aa for aa in original_seq.upper() if aa in standard_aas])
        if len(cleaned) < 10:
            print(f"Skipping {entry}: cleaned length = {len(cleaned)}")
            skipped += 1
            continue

        # feature dict
        feats = {
            "entry": entry,
            "localization": loc,
            "sequence_length": len(cleaned),
            "original_length": len(original_seq),
        }

        # amino-acid composition + physico-chemical
        try:
            pa = ProteinAnalysis(cleaned)
            comp = { aa: pct/100 for aa, pct in pa.amino_acids_percent.items() }
            feats.update(comp)
            feats["gravy"] = pa.gravy()
            feats["pI"] = pa.isoelectric_point()
        except Exception as e:
            print(f"Warning: feature calc failed for {entry}: {e}")
            for aa in standard_aas:
                feats[aa] = 0.0
            feats.update({"gravy": None, "pI": None})

        # dipeptide freqs (fixed key set)
        dipep = dict.fromkeys(all_dipeps, 0)
        for i in range(len(cleaned) - 1):
            dp = cleaned[i : i + 2]
            dipep[dp] += 1
        total = sum(dipep.values())
        if total > 0:
            for dp in dipep:
                dipep[dp] /= total

        feats.update({f"dp_{dp}": freq for dp, freq in dipep.items()})
        rows.append(feats)

    print(f"Processed {len(rows)} sequences, skipped {skipped}.")
    df = pd.DataFrame(rows).fillna(0)
    return df

In [None]:
df = build_feature_matrix(INPUT_FASTA, INPUT_ANN)
df.to_csv(OUTPUT_FEAT, index=False)
print(f"Wrote features to {OUTPUT_FEAT}")
print(f"Feature matrix shape: {df.shape}")
print(f"Features per sequence: {df.shape[1] - 2}")

For the purposes of this project, we will only stratify the first split to preserve overall class balance in the training set. We will use a plain random split (no stratify) when dividing the held-out data into validation and test sets.

**Why not stratify the held-out data?** Even after removing global singletons, we can still end up with some classes that have only one member in the temporary pool. Therefore, a stratified split there will always fail for any class with fewer than 2 samples.

In [None]:
df = pd.read_csv(OUTPUT_FEAT)
y = df.pop("localization")

X_train, X_tmp, y_train, y_tmp = train_test_split(
    df, y, test_size=0.30, stratify=y, random_state=42
)

X_val, X_test, y_val, y_test = train_test_split(
    X_tmp, y_tmp, test_size=0.50, random_state=42
)

print("Train dist:\n", y_train.value_counts(normalize=True), "\n")
print("Val dist (approx):\n", y_val.value_counts(normalize=True), "\n")
print("Test dist (approx):\n", y_test.value_counts(normalize=True))

In [None]:
X_train.to_csv(OUTPUT_X_TRAIN, index=False)
X_tmp.to_csv(OUTPUT_X_TMP, index=False)
X_val.to_csv(OUTPUT_X_VAL, index=False)
X_test.to_csv(OUTPUT_X_TEST, index=False)

y_train.to_csv(OUTPUT_Y_TRAIN, index=False)
y_tmp.to_csv(OUTPUT_Y_TMP, index=False)
y_val.to_csv(OUTPUT_Y_VAL, index=False)
y_test.to_csv(OUTPUT_Y_TEST, index=False)

print(f"Split data into train/val/test sets:")
print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")
print(f"X_val: {X_val.shape}, y_val: {y_val.shape}")
print(f"X_test: {X_test.shape}, y_test: {y_test.shape}")
print(f"Data written to {OUTPUT_X_TRAIN}, {OUTPUT_Y_TRAIN}, etc.")