# Format input data into SMILEs

11th November 2025


This notebook is for reformatting any input file into SMILES strings.

<br><br>


In [1]:
# Setup
from pathlib import Path
from tqdm import tqdm
import sys, csv
import numpy as np
import pandas as pd
import seaborn as sns
print(sys.executable)
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import sqlite3
import os 
from pathlib import Path
from tqdm.notebook import tqdm
from tqdm import tqdm
#from mol_utils import MolecularClassifier
from rdkit import Chem, RDLogger
import pyarrow
import fastparquet
import cirpy


from mol_utils import classify_smiles


/fsx/alex/MolecularTypeClassifier-1/.pixi/envs/default/bin/python


<br>

In [2]:
# Local setup for data imports

scratch_dir = Path("scratch")
scratch_dir.mkdir(parents=True, exist_ok=True)


In [19]:
from __future__ import annotations

import json
import tarfile
from typing import List, Optional

import pandas as pd
from tqdm import tqdm


def _log_info(message: str) -> None:
    """Write an informational message using tqdm."""
    tqdm.write(message)


def _log_warning(message: str) -> None:
    """Write a warning message using tqdm."""
    tqdm.write(f"WARNING: {message}")


# Optional RDKit import for SDF handling
try:
    from rdkit import Chem  # type: ignore
    _HAS_RDKIT = True
except Exception as rdkit_exc:  # pragma: no cover
    _log_warning(f"RDKit not available: {rdkit_exc}. SDF parsing will not work.")
    Chem = None  # type: ignore
    _HAS_RDKIT = False


def read_csv_column(path: Path, column: str) -> pd.DataFrame:
    """Read a CSV and return only the requested column as a DataFrame."""
    _log_info(f"Loading CSV: {path} (column={column})")
    df = pd.read_csv(path)
    if column not in df.columns:
        raise KeyError(f"Column '{column}' not found in {path}. Available: {list(df.columns)}")
    return df[[column]].dropna().reset_index(drop=True)


def read_xls_column(path: Path, column: str) -> pd.DataFrame:
    """Read an Excel file and return only the requested column as a DataFrame."""
    _log_info(f"Loading Excel: {path} (column={column})")
    df = pd.read_excel(path)
    if column not in df.columns:
        raise KeyError(f"Column '{column}' not found in {path}. Available: {list(df.columns)}")
    return df[[column]].dropna().reset_index(drop=True)


def read_dat_delimited_column(path: Path, column: str, sep: Optional[str] = None) -> pd.DataFrame:
    """Read a .dat file and return only the requested column.

    Tries tab, comma, and pipe separators if none is provided.
    """
    _log_info(f"Loading DAT: {path} (column={column})")
    seps = [sep] if sep else ["\t", ",", "|"]
    last_err: Optional[Exception] = None
    for s in seps:
        try:
            df = pd.read_csv(path, sep=s)
            if column in df.columns:
                return df[[column]].dropna().reset_index(drop=True)
            last_err = KeyError(f"Column '{column}' not in columns parsed with sep '{s}'")
        except Exception as e:
            last_err = e
    raise RuntimeError(f"Failed to parse {path} for column '{column}'. Last error: {last_err}")


def read_sdf_smiles(path: Path, smiles_field: str = "canonical_smiles") -> pd.DataFrame:
    """Read an SDF file and extract SMILES from a specified property or from the molecule."""
    if not _HAS_RDKIT:
        raise ImportError("RDKit is required to parse SDF files. Please install rdkit-pypi.")
    _log_info(f"Loading SDF: {path} (field={smiles_field})")
    supplier = Chem.SDMolSupplier(str(path), removeHs=False)  # type: ignore[attr-defined]
    smiles_list: List[str] = []
    for mol in tqdm(supplier, desc=f"Reading {path.name}"):
        if mol is None:
            continue
        value = mol.GetProp(smiles_field) if mol.HasProp(smiles_field) else None
        if not value:
            try:
                value = Chem.MolToSmiles(mol)  # type: ignore[attr-defined]
            except Exception:
                value = None
        if value:
            smiles_list.append(value)
    return pd.DataFrame({"canonical_smiles": smiles_list}).dropna().reset_index(drop=True)


def read_glycan_csv_fourth_column(path: Path) -> pd.DataFrame:
    """Read a CSV and return the 4th column (index 3)."""
    _log_info(f"Loading Glycan CSV: {path} (4th column)")
    df = pd.read_csv(path, header=0)
    if df.shape[1] < 4:
        raise ValueError(f"Expected at least 4 columns in {path}, got {df.shape[1]}")
    col_name = df.columns[3]
    out = df[[col_name]].dropna().reset_index(drop=True)
    out.columns = ["glycan_field"]
    return out

def extract_field(data, field):
    if isinstance(data, dict):
        for k, v in data.items():
            if k == field and v:
                yield v
            else:
                yield from extract_field(v, field)
    elif isinstance(data, list):
        for item in data:
            yield from extract_field(item, field)

def read_rips_from_tar_json(path: Path, json_field: str = "translation") -> pd.DataFrame:
    values = []
    with tarfile.open(path, "r") as tf:
        members = [m for m in tf.getmembers() if m.isfile() and m.name.lower().endswith(".json")]
        for m in tqdm(members, desc=f"Reading {path.name}"):
            f = tf.extractfile(m)
            if f is None:
                continue
            try:
                data = json.load(f)
                for val in extract_field(data, json_field):
                    values.append(str(val))
            except Exception as e:
                _log_warning(f"Failed to parse {m.name} inside {path.name}: {e}")
            finally:
                f.close()
    return pd.DataFrame({json_field: values}).dropna().reset_index(drop=True)


_log_info("Data loader utilities ready.")



Data loader utilities ready.


In [4]:
# Functions to convert different inputs to SMILES


#########################################
#                                       #
#        For canonical peptides         #
#                                       #
#########################################

# pip install rdkit-pypi
from rdkit import Chem, RDLogger

# Silence RDKit warnings
RDLogger.DisableLog('rdApp.*')

def seq_to_smiles(seq: str) -> str | None:
    """
    Convert a DNA/RNA/canonical peptides sequence string to canonical SMILES using RDKit.
    Handles FASTA headers, whitespace, and is case-insensitive.
    """
    if not isinstance(seq, str) or not seq.strip():
        return None
    # Clean input: strip FASTA headers and whitespace
    s = "".join(ln.strip() for ln in seq.splitlines() if not ln.startswith(">"))
    s = s.replace(" ", "").replace("\t", "").upper()
    try:
        mol = Chem.MolFromSequence(s)
        if mol is None:
            return None
        Chem.SanitizeMol(mol)
        # directly call MolToSmiles from rdmolfiles for canonical SMILES
        return Chem.rdmolfiles.MolToSmiles(mol)
    except Exception:
        return None



#########################################
#                                       #
#     For non-canonical peptides        #
#                                       #
#########################################

# pip install rdkit-pypi
from rdkit import Chem, RDLogger
RDLogger.DisableLog('rdApp.*')

# Standard 20 amino acids (one-letter)
_AA20 = set("ACDEFGHIKLMNPQRSTVWY")

# Common noncanonical / ambiguous mappings â†’ standard
# U: selenocysteineâ†’C, O: pyrrolysineâ†’K, B: Asxâ†’N, Z: Glxâ†’Q, J: Leu/Ileâ†’L, X: unknownâ†’G
_NONCANON_MAP = {
    'U': 'C',  # selenocysteine
    'O': 'K',  # pyrrolysine
    'B': 'N',  # Asx (D/N)
    'Z': 'Q',  # Glx (E/Q)
    'J': 'L',  # Leu/Ile
    'X': 'G',  # unknown -> glycine
}

def _strip_fasta_headers(seq: str) -> str:
    return "".join(ln.strip() for ln in str(seq).splitlines() if ln and not ln.startswith(">"))

def noncanonical_peptide_seq_to_smiles(seq: str, mode: str = "lenient") -> str | None:
    """
    Convert a non-canonical peptide sequence to canonical SMILES using RDKit.
    - mode="lenient": map known noncanonicals; drop any remaining unknown letters.
    - mode="strict":  fail (return None) if any unknown letters remain after mapping.
    """
    if not isinstance(seq, str) or not seq.strip():
        return None

    # Clean: remove FASTA headers/whitespace; keep letters only, uppercase
    s = _strip_fasta_headers(seq)
    s = "".join(ch for ch in s if ch.isalpha()).upper()
    if not s:
        return None

    # Normalize: map common noncanonicals/ambiguities
    out_chars = []
    unknown_seen = False
    for ch in s:
        if ch in _AA20:
            out_chars.append(ch)
        elif ch in _NONCANON_MAP:
            out_chars.append(_NONCANON_MAP[ch])
        else:
            # unknown symbol (e.g., modified residues encoded as letters)
            if mode == "strict":
                return None
            unknown_seen = True
            # lenient: drop it
            continue

    norm = "".join(out_chars)
    if not norm:
        return None

    try:
        # RDKit peptide builders
        mol = Chem.MolFromSequence(norm)
        if mol is None:
            mol = Chem.MolFromFASTA(norm)
        if mol is None:
            return None
        Chem.SanitizeMol(mol)
        # Canonical SMILES
        return Chem.rdmolfiles.MolToSmiles(mol)
    except Exception:
        return None






#########################################
#                                       #
#             For glycans               #
#                                       #
#########################################


# pip install requests tqdm
import time
import requests
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

try:
    from tqdm.auto import tqdm
except Exception:
    tqdm = None

GLYCOSMOS_BASE = "https://api.glycosmos.org/glycanformatconverter/2.10.4"
CONVERT_PATH = "/wurcs2iupaccondensed"

def _is_wurcs(s: str) -> bool:
    return isinstance(s, str) and s.strip().upper().startswith("WURCS=")

def _session_with_retries(total=5, backoff=0.5) -> requests.Session:
    s = requests.Session()
    retry = Retry(
        total=total,
        backoff_factor=backoff,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset(["POST"]),
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry, pool_connections=100, pool_maxsize=100)
    s.mount("http://", adapter)
    s.mount("https://", adapter)
    return s

def _post_batch(wurcs_batch, session: requests.Session, timeout: float = 30.0) -> list[str | None]:
    """
    Send a batch (list) of WURCS strings.
    Returns a list aligned to the input batch, each item is IUPAC string or None.
    """
    url = f"{GLYCOSMOS_BASE}{CONVERT_PATH}"
    for attempt in range(3):  # light manual retries on top of session retries
        try:
            r = session.post(url, json=wurcs_batch, timeout=timeout)
            if r.status_code == 413 and len(wurcs_batch) > 1:
                # Payload too large: split and recurse.
                mid = len(wurcs_batch) // 2
                return (_post_batch(wurcs_batch[:mid], session, timeout) +
                        _post_batch(wurcs_batch[mid:], session, timeout))
            r.raise_for_status()
            data = r.json()
            out = []
            for obj in data if isinstance(data, list) else []:
                out.append(obj.get("IUPACcondensed") if isinstance(obj, dict) else None)
            if len(out) != len(wurcs_batch):
                out = (out + [None] * len(wurcs_batch))[:len(wurcs_batch)]
            return out
        except Exception:
            time.sleep(0.4 * (attempt + 1))
    return [None] * len(wurcs_batch)

def glycans_wurcs_to_cols_fast(
    df: pd.DataFrame,
    colname: str = "glycan_field",
    smiles_col: str = "smiles",
    iupac_col: str = "iupac_condensed",
    chunk_size: int = 200,
    max_workers: int = 4,
    cache: dict[str, str | None] | None = None,
    show_progress: bool = True,           # ðŸ‘ˆ enable/disable progress bar
    progress_desc: str = "Converting WURCSâ†’IUPAC",
) -> pd.DataFrame:
    """
    Faster conversion with batching, optional parallelism, caching, and a tqdm progress bar.
    """
    out = df.copy()
    series = out[colname].astype("string", copy=False)

    # Collect unique, valid WURCS
    unique_valid = [w for w in pd.Series(series.unique()) if _is_wurcs(w)]
    if not unique_valid:
        out[iupac_col] = None
        out[smiles_col] = None
        return out

    # Initialize / reuse cache
    local_cache: dict[str, str | None] = {} if cache is None else cache
    to_query = [w for w in unique_valid if w not in local_cache]
    if not to_query:
        out[iupac_col] = series.map(lambda w: local_cache.get(w) if _is_wurcs(w) else None)
        out[smiles_col] = None
        return out

    batches = [to_query[i:i + chunk_size] for i in range(0, len(to_query), chunk_size)]
    session = _session_with_retries()

    # Prepare progress bar (counts items, not batches)
    pbar = None
    if show_progress and tqdm is not None:
        pbar = tqdm(total=len(to_query), desc=progress_desc, unit="glycan")

    def _record_batch(batch, results):
        for w, iupac in zip(batch, results):
            local_cache[w] = iupac
        if pbar is not None:
            # advance by the number of items processed in this batch
            pbar.update(len(batch))

    if max_workers and max_workers > 1 and len(batches) > 1:
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            future_map = {ex.submit(_post_batch, b, session): b for b in batches}
            for fut in as_completed(future_map):
                b = future_map[fut]
                res = fut.result()
                _record_batch(b, res)
    else:
        for b in batches:
            res = _post_batch(b, session)
            _record_batch(b, res)

    if pbar is not None:
        pbar.close()

    # Map back to rows
    out[iupac_col] = series.map(lambda w: local_cache.get(w) if _is_wurcs(w) else None)
    out[smiles_col] = None
    return out


# pip install requests
import requests
import pandas as pd

GLYCOSMOS_BASE = "https://api.glycosmos.org/glycanformatconverter/2.10.4"

def _is_wurcs(s: str) -> bool:
    return isinstance(s, str) and s.strip().upper().startswith("WURCS=")

def _wurcs_to_iupac(wurcs: str) -> str | None:
    """
    Convert a single WURCS string to IUPAC-condensed using Glycosmos.
    Returns IUPAC string or None on failure.
    """
    if not _is_wurcs(wurcs):
        return None
    try:
        # API accepts a JSON array of WURCS strings; we send one item
        url = f"{GLYCOSMOS_BASE}/wurcs2iupaccondensed"
        r = requests.post(url, json=[wurcs], timeout=60)
        r.raise_for_status()
        data = r.json()
        if isinstance(data, list) and data and isinstance(data[0], dict):
            return data[0].get("IUPACcondensed")
    except Exception:
        return None
    return None

def glycans_wurcs_to_cols(df: pd.DataFrame,
                          colname: str = "glycan_field",
                          smiles_col: str = "smiles",
                          iupac_col: str = "iupac_condensed") -> pd.DataFrame:
    """
    For a DataFrame with WURCS in `colname`:
      - Writes IUPAC-condensed to `iupac_condensed`
      - Sets `smiles` to None (placeholder until you add an IUPACâ†’SMILES tool)
    Keeps rows as-is; no drops.
    """
    out = df.copy()
    # IUPAC from WURCS
    out[iupac_col] = out[colname].apply(_wurcs_to_iupac)
    # No direct SMILES yet without a glycan IUPACâ†’SMILES library
    out[smiles_col] = None
    return out


In [5]:
# Now, import all input files required
# Define paths
SM = Path("/fsx/data/raw/drugbank/DrugBank_SM_drugs.csv")
oligos = Path("/fsx/data/raw/DNA.RNA_seq/random_DNA_RNA_sequences_10000.csv")
can_peptides = Path("/fsx/data/raw/TPDB/main.xlsx")
noncan_peptides = Path("/fsx/data/raw/NCPbook_noncanonicals/NCP.book_Homo_sapiens.dat")
cyclic_pep_lariat = Path("/fsx/data/raw/CycPeptMPDB/CycPeptMPDB_Peptide_Shape_Lariat.csv")
cyclic_pep_circle = Path("/fsx/data/raw/CycPeptMPDB/CycPeptMPDB_Peptide_Shape_Circle.csv")
nat_prod = Path("/fsx/data/raw/supernatural/supernatural3-11-2025.sdf")
glycans = Path("/fsx/data/raw/GlycoShape/GlycoShape_smiles.csv")
RIPs = Path("/fsx/data/raw/mibig/mibig_json_4.0.tar")

# Load datasets into DataFrames
_log_info("Starting dataset imports...")

# Approved drugs (SMs): column 'moldb_smiles'
df_sm = read_csv_column(SM, "moldb_smiles")

# Oligos/Nucleotides: column 'Sequence'
df_oligos = read_csv_column(oligos, "Sequence")
df_oligos["smiles"] = df_oligos["Sequence"].apply(seq_to_smiles)

# Peptides (canonical): column 'Sequence'
df_can_peptides = read_xls_column(can_peptides, "Sequence")
df_can_peptides["smiles"] = df_can_peptides["Sequence"].apply(seq_to_smiles)

# Non-canonical peptides: column 'NCP_sequence'
df_noncan_peptides = read_dat_delimited_column(noncan_peptides, "NCP_sequence")
df_noncan_peptides["smiles"] = df_noncan_peptides["NCP_sequence"].apply(
    lambda s: noncanonical_peptide_seq_to_smiles(s, mode="lenient")
)

# CycPeptMPDB (Macrocycles-lariat): column 'SMILES'
df_cyclic_pep_lariat = read_csv_column(cyclic_pep_lariat, "SMILES")

# CycPeptMPDB (Macrocycles-circular): column 'SMILES'
df_cyclic_pep_circle = read_csv_column(cyclic_pep_circle, "SMILES")

# Natural products: SDF field 'canonical_smiles'
df_nat_prod = read_sdf_smiles(nat_prod, smiles_field="canonical_smiles")

# Glycans: 4th column in GlyTouCan
df_glycans = read_csv_column(glycans, "smiles")

# RIPs: 'translation' field in each JSON inside tar
df_rips = read_rips_from_tar_json(RIPs, json_field="translation")
df_rips["smiles"] = df_rips["translation"].apply(
    lambda s: noncanonical_peptide_seq_to_smiles(s, mode="lenient")
)

# Basic summary
summary = {
    "df_sm": len(df_sm),
    "df_oligos": len(df_oligos),
    "df_can_peptides": len(df_can_peptides),
    "df_noncan_peptides": len(df_noncan_peptides),
    "df_cyclic_pep_lariat": len(df_cyclic_pep_lariat),
    "df_cyclic_pep_circle": len(df_cyclic_pep_circle),
    "df_nat_prod": len(df_nat_prod),
    "df_glycans": len(df_glycans),
    "df_rips": len(df_rips),
}
_log_info(f"Import summary: {summary}")
summary


Starting dataset imports...
Loading CSV: /fsx/data/raw/drugbank/DrugBank_SM_drugs.csv (column=moldb_smiles)
Loading CSV: /fsx/data/raw/DNA.RNA_seq/random_DNA_RNA_sequences_10000.csv (column=Sequence)
Loading Excel: /fsx/data/raw/TPDB/main.xlsx (column=Sequence)
Loading DAT: /fsx/data/raw/NCPbook_noncanonicals/NCP.book_Homo_sapiens.dat (column=NCP_sequence)
Loading CSV: /fsx/data/raw/CycPeptMPDB/CycPeptMPDB_Peptide_Shape_Lariat.csv (column=SMILES)
Loading CSV: /fsx/data/raw/CycPeptMPDB/CycPeptMPDB_Peptide_Shape_Circle.csv (column=SMILES)
Loading SDF: /fsx/data/raw/supernatural/supernatural3-11-2025.sdf (field=canonical_smiles)


Reading supernatural3-11-2025.sdf:   0%|          | 0/121636 [00:00<?, ?it/s]

Loading Glycan CSV: /fsx/data/raw/Glytoucan/glycan.csv (4th column)
Loading RIPs from TAR JSON: /fsx/data/raw/mibig/mibig_json_4.0.tar (field=translation)


Reading mibig_json_4.0.tar:   0%|          | 0/3013 [00:00<?, ?it/s]

Import summary: {'df_sm': 13335, 'df_oligos': 10000, 'df_can_peptides': 58583, 'df_noncan_peptides': 31806, 'df_cyclic_pep_lariat': 2936, 'df_cyclic_pep_circle': 5530, 'df_nat_prod': 121636, 'df_glycans': 253440, 'df_rips': 0}


{'df_sm': 13335,
 'df_oligos': 10000,
 'df_can_peptides': 58583,
 'df_noncan_peptides': 31806,
 'df_cyclic_pep_lariat': 2936,
 'df_cyclic_pep_circle': 5530,
 'df_nat_prod': 121636,
 'df_glycans': 253440,
 'df_rips': 0}

In [41]:
#Save intermediary outputs just in case
#df_oligos.to_csv("Pos.ctrl_oligos.csv", index=False);
#df_can_peptides.to_csv("Pos.ctrl_CanonicalPeptides.csv", index=False);
#df_noncan_peptides.to_csv("Pos.ctrl_NoncanonicalPeptides.csv", index=False);


<br>

In [33]:
#Pivot longer to merge the dataframes 
#Use the file name (without the "df_" prefix as a unique identifier -> with unique identifier to "Class" column & values to "SMILES" column
#Dataframes to be merged: df_sm, df_oligos, df_can_peptides, df_noncan_peptides, df_cyclic_pep_lariat, df_cyclic_pep_circle, df_nat_prod, df_glycans
def pivot_longer_dfs():
    dfs = []

    # Use 1st column for these
    for name, df in {
        "sm": df_sm,
        "cyclic_pep_lariat": df_cyclic_pep_lariat,
        "cyclic_pep_circle": df_cyclic_pep_circle,
        "nat_prod": df_nat_prod,
        "glycans": df_glycans,

    }.items():
        col = df.columns[0]
        dfs.append(pd.DataFrame({
            "Class": name,
            "SMILES": df[col]
        }))

    # Use 2nd column for these
    for name, df in {
        "oligos": df_oligos,
        "can_peptides": df_can_peptides,
        "noncan_peptides": df_noncan_peptides,
        "rips": df_rips,
    }.items():
        col = df.columns[1]
        dfs.append(pd.DataFrame({
            "Class": name,
            "SMILES": df[col]
        }))

    # Combine all into one long table
    merged = pd.concat(dfs, ignore_index=True)

    # Drop missing or empty SMILES if desired
    merged = merged.dropna(subset=["SMILES"])
    merged = merged[merged["SMILES"].astype(str).str.strip() != ""].reset_index(drop=True)

    return merged

# ---- Run the function ----
df_merged = pivot_longer_dfs()


In [17]:

print(df_merged.head(5))
print("\nSummary by Class:")
print(df_merged["Class"].value_counts())


  Class                                             SMILES
0    sm  CC[C@H](C)[C@H](NC(=O)[C@H](CCC(O)=O)NC(=O)[C@...
1    sm  CC(C)C[C@H](NC(=O)[C@@H](COC(C)(C)C)NC(=O)[C@H...
2    sm  CC(C)C[C@@H](NC(=O)CNC(=O)[C@@H](NC=O)C(C)C)C(...
3    sm  NC(=O)CC[C@@H]1NC(=O)[C@H](CC2=CC=CC=C2)NC(=O)...
4    sm  CC(C)C[C@H](NC(=O)[C@@H](CCCNC(N)=O)NC(=O)[C@H...

Summary by Class:
Class
nat_prod             121636
can_peptides          56300
noncan_peptides       31806
sm                    13335
cyclic_pep_circle      5530
oligos                 5087
cyclic_pep_lariat      2936
glycans                2697
Name: count, dtype: int64


In [35]:
df_merged.to_csv("Pos.ctrl_Molecules_merged.csv", index=False);


<br><br><br><br><br><br>