# MODS
Molecular Oncology Database of sdAbs

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Dict, Optional, List, Any
import matplotlib.pyplot as plt
import seaborn as sns  
import math
from collections import Counter, defaultdict
from abnumber import Chain
import json
import os, glob, re
from Bio.PDB import PDBParser
from Bio import SeqIO


# Columns of the final df
columns = ['Source',
           'Nanobody_ID',
           'Species',
           'Antigen',
           'DOI',
           'PBD',
           'Sequence',
           'CDR1',
           'CDR2',
           'CDR3',
           'FR1',
           'FR2',
           'FR3',
           'FR4',
           'Antigen_type',
           'Antigen_affinity_nM']


project_path = '/Users/javi/Library/CloudStorage/GoogleDrive-javiersp4.mail@gmail.com/.shortcut-targets-by-id/1b8C40NjyKMxPdoX0dkX-gW70Vgt8wzHU/VHH_ANALYSIS'

# from url

# ican_path = os.path.join(project_path, 'Data/iCAN/iCAN_db.xls')
ican_path = 'https://static-content.springer.com/esm/art%3A10.1186%2Fs12864-017-4204-6/MediaObjects/12864_2017_4204_MOESM4_ESM.xls'
plabdab_path = 'https://opig.stats.ox.ac.uk/webapps/plabdab-nano/static/downloads/all_sequences.csv.gz'

# downloaded

# folder paths
sabdab_path = os.path.join(project_path, 'Data/SAbDab-nano/all_nano_structures')
nbthermo_path = os.path.join(project_path, 'Data/NbThermo/main.c8f2e3ee8beed8c7b505.js')
sdAbDB_alpaca_path = os.path.join(project_path, 'Data/sdAb-DB/ALPACA')
sdAbDB_llama_path = os.path.join(project_path, 'Data/sdAb-DB/LLAMA')
sdAbDB_dromed_path = os.path.join(project_path, 'Data/sdAb-DB/DROMEDARIO')
sdAbDB_bactrian_path = os.path.join(project_path, 'Data/sdAb-DB/BACTRIAN CAMEL')
sdAbDB_unspecified_path = os.path.join(project_path,'Data/sdAb-DB/unspecified species')


# file paths
indi_abg_m_path = os.path.join(project_path, 'Data/INDI/abgenbank_meta.tsv')
indi_abg_s_path = os.path.join(project_path, 'Data/INDI/abgenbank_sequence.tsv')
indi_manual_m_path = os.path.join(project_path, 'Data/INDI/manual_meta.tsv')
indi_manual_s_path = os.path.join(project_path, 'Data/INDI/manual_sequence.tsv')
indi_structural_m_path = os.path.join(project_path, 'Data/INDI/structure_meta.tsv')
indi_structural_s_path = os.path.join(project_path, 'Data/INDI/structure_sequence.tsv')


###  Extract FR sequences ###

def extract_fr_sequences(row):
    sequence = row['Sequence']
    cdr1 = row['CDR1']
    cdr2 = row['CDR2']
    cdr3 = row['CDR3']

    fr1 = ""
    fr2 = ""
    fr3 = ""
    fr4 = ""

    # Find positions of CDRs
    cdr1_start = sequence.find(cdr1)
    cdr1_end = cdr1_start + len(cdr1) if cdr1_start != -1 else -1

    cdr2_start = sequence.find(cdr2, cdr1_end if cdr1_end != -1 else 0)
    cdr2_end = cdr2_start + len(cdr2) if cdr2_start != -1 else -1

    cdr3_start = sequence.find(cdr3, cdr2_end if cdr2_end != -1 else 0)
    cdr3_end = cdr3_start + len(cdr3) if cdr3_start != -1 else -1

    # Extract FR sequences based on CDR positions
    if cdr1_start != -1:
        fr1 = sequence[:cdr1_start]
        if cdr2_start != -1:
            fr2 = sequence[cdr1_end:cdr2_start]
            if cdr3_start != -1:
                fr3 = sequence[cdr2_end:cdr3_start]
                fr4 = sequence[cdr3_end:]
            else:
                fr3 = sequence[cdr2_end:]
        else:
            if cdr3_start != -1:
                fr2 = sequence[cdr1_end:cdr3_start]
                fr4 = sequence[cdr3_end:]
            else:
                fr2 = sequence[cdr1_end:]
    else: # No CDR1 found, assume FR1 is empty
         if cdr2_start != -1:
            fr2 = sequence[:cdr2_start]
            if cdr3_start != -1:
                fr3 = sequence[cdr2_end:cdr3_start]
                fr4 = sequence[cdr3_end:]
            else:
                fr3 = sequence[cdr2_end:]
         else: # No CDR1 or CDR2 found
            if cdr3_start != -1:
                fr3 = sequence[:cdr3_start]
                fr4 = sequence[cdr3_end:]
            else: # No CDRs found
                fr4 = sequence


    return fr1, fr2, fr3, fr4

### Splits sequences ###

def _looks_like_heavy_chain(seq: str) -> bool:
    
    _AA = set("ACDEFGHIKLMNPQRSTVWY")
    _VH_START = re.compile(r'^(QVQL|EVQL|QVEL|QVHL|QVVQ|QVQLV)', re.IGNORECASE)
    
    if not isinstance(seq, str): 
        return False
    s = seq.strip().upper()
    if not (80 <= len(s) <= 170):    
        return False
    if any(ch not in _AA for ch in s):
        return False
    return bool(_VH_START.match(s)) or True  

def _split_one(seq: str):

    cols = ['FR1','CDR1','FR2','CDR2','FR3','CDR3','FR4']
    if not isinstance(seq, str) or not seq.strip():
        return pd.Series([None]*7, index=cols)
    try:
        ch = Chain(seq.strip().upper(), scheme='imgt')
        return pd.Series([
            str(ch.fr1_seq) if ch.fr1_seq else None,
            str(ch.cdr1_seq) if ch.cdr1_seq else None,
            str(ch.fr2_seq) if ch.fr2_seq else None,
            str(ch.cdr2_seq) if ch.cdr2_seq else None,
            str(ch.fr3_seq) if ch.fr3_seq else None,
            str(ch.cdr3_seq) if ch.cdr3_seq else None,
            str(ch.fr4_seq) if ch.fr4_seq else None,
        ], index=cols)
    except Exception as e:
        return pd.Series([None]*7, index=cols)

def fill_fr_cdr_abnumber(df: pd.DataFrame, seq_col: str = 'Sequence', only_heavy_like: bool = True) -> pd.DataFrame:

    out = df.copy()

    for c in ['FR1','FR2','FR3','FR4','CDR1','CDR2','CDR3']:
        if c not in out.columns:
            out[c] = None

    if only_heavy_like:
        mask = out[seq_col].apply(_looks_like_heavy_chain)
        splits = out.loc[mask, seq_col].apply(_split_one)
        out.loc[mask, ['FR1','CDR1','FR2','CDR2','FR3','CDR3','FR4']] = splits.values
    else:
        splits = out[seq_col].apply(_split_one)
        for c in ['FR1','CDR1','FR2','CDR2','FR3','CDR3','FR4']:
            out[c] = splits[c]
    return out

# Data Integration

## PLabDab-nano

In [None]:
plabdab_raw = pd.read_csv(plabdab_path)

print(plabdab_raw.info())
plabdab_raw.head(2)

In [None]:
plabdab = plabdab_raw.copy()
plabdab.drop(columns=['source'], inplace=True)
plabdab['Source'] = 'PLabDab-nano'

# Extract CDR sequences
plabdab['cdr_sequences'] = plabdab['cdr_sequences'].apply(lambda x: json.loads(x.replace("'", '"')))
plabdab['CDR1'] = plabdab['cdr_sequences'].apply(lambda x: x.get('CDRH1', ''))
plabdab['CDR2'] = plabdab['cdr_sequences'].apply(lambda x: x.get('CDRH2', ''))
plabdab['CDR3'] = plabdab['cdr_sequences'].apply(lambda x: x.get('CDRH3', ''))

# Cleaning columns
plabdab.rename(columns={'sequence': 'Sequence',
                        'ID': 'Nanobody_ID',
                        'organism': 'Species',
                        'targets_mentioned': 'Antigen'}, inplace=True)

plabdab[['FR1', 'FR2', 'FR3', 'FR4']] = plabdab.apply(extract_fr_sequences, axis=1, result_type='expand')


plabdab['DOI']=None
plabdab['PBD']=None
plabdab['Antigen_type']=None
plabdab['Antigen_affinity_nM']=None

# Select and reorder columns
plabdab = plabdab[columns]
plabdab.head(2)

In [None]:
plabdab.info()
plabdab.to_csv('plabdab_df.csv')

## SAbDab-nano

In [None]:
NB_KEYWORDS = (
    "NANOBODY","VHH","SINGLE-DOMAIN","SINGLE DOMAIN","CAMELID",
    "HEAVY CHAIN ANTIBODY","VH","SINGLE DOMAIN ANTIBODY"
)

def read_header(path):
    parser = PDBParser(QUIET=True) 
    structure_id = os.path.splitext(os.path.basename(path))[0]
    structure = parser.get_structure(structure_id, path)
    header = parser.get_header() 
    doi = None
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as fh:
            text = fh.read()
        m = re.search(r"10\.\d{4,9}/\S+\w", text)
        if m:
            doi = m.group(0)
    except Exception:
        pass
    return header, doi

def seqres_sequences(path):

    chain_to_seq = {}
    for rec in SeqIO.parse(path, "pdb-seqres"):
        chain = None
        if ":" in rec.id:
            chain = rec.id.split(":")[-1]
        elif "_" in rec.id:
            chain = rec.id.split("_")[-1]
        if chain:
            chain_to_seq[chain] = str(rec.seq).upper()
    return chain_to_seq

def normalize_compnd_source(header):

    chain2mol, chain2org = {}, {}
    comp = header.get("compound") or {}
    src  = header.get("source")   or {}

    mol_to_chains = defaultdict(list)
    for mol_id, d in comp.items():
        molname = (d.get("molecule") or "").strip()
        chains = [c.strip() for c in str(d.get("chain") or "").split(",") if c.strip()]
        for c in chains:
            chain2mol[c] = molname
            mol_to_chains[mol_id].append(c)

    for mol_id, d in src.items():
        org = (d.get("organism_scientific") or d.get("organism_common") or "").strip()
        chains = mol_to_chains.get(mol_id, [])
        for c in chains:
            chain2org[c] = org

    return chain2mol, chain2org

def detect_nanobody_chains(chain2mol, chain2seq, title):
    title_u = (title or "").upper()
    nb = [c for c, mol in chain2mol.items() if any(k in (mol or "").upper() for k in NB_KEYWORDS)]
    if nb:
        return sorted(set(nb))
    if any(k in title_u for k in ("NANOBODY","VHH","ANTIBODY")):
        for c, seq in chain2seq.items():
            if 95 <= len(seq) <= 140:
                nb.append(c)
    return sorted(set(nb))

def infer_antigen(chain2mol, chain2seq, nb_chains):
    non_nb = [c for c in chain2seq if c not in nb_chains]
    if not non_nb:
        return None, None
    by_mol = defaultdict(list)
    for c in non_nb:
        molname = (chain2mol.get(c) or "Unknown protein").strip()
        by_mol[molname].append(c)
    def total_len(chs): return sum(len(chain2seq.get(ch,"")) for ch in chs)
    antigen_mol, chains = max(by_mol.items(), key=lambda kv: total_len(kv[1]))
    antigen_type = "protein" if total_len(chains) > 0 else None
    return antigen_mol, antigen_type

def process_pdb_simple(path):
    header, doi = read_header(path)
    pdb_id = os.path.splitext(os.path.basename(path))[0].upper()
    title  = header.get("name") or ""
    chain2mol, chain2org = normalize_compnd_source(header)
    chain2seq = seqres_sequences(path)

    nb_chains = detect_nanobody_chains(chain2mol, chain2seq, title)
    antigen_name, antigen_type = infer_antigen(chain2mol, chain2seq, nb_chains)

    rows = []
    for c in nb_chains:
        rows.append({
            'Source': 'SAbDab-nano',
            'Nanobody_ID': f"{pdb_id}_{c}",
            'Species': chain2org.get(c) or None,
            'Antigen': antigen_name,
            'DOI': doi,
            'PBD': pdb_id,
            'Sequence': chain2seq.get(c),
            'CDR1': None, 'CDR2': None, 'CDR3': None,
            'FR1': None, 'FR2': None, 'FR3': None, 'FR4': None,
            'Antigen_type': antigen_type,
            'Antigen_affinity_nM': None
        })
    return pd.DataFrame(rows, columns=columns)

# Process folder

all_rows = []
pdb_files = sorted(glob.glob(os.path.join(sabdab_path, "*.pdb")))
print(f"Encontrados {len(pdb_files)} PDBs en {sabdab_path}")

for p in pdb_files:
    try:
        df_one = process_pdb_simple(p)
        if not df_one.empty:
            all_rows.append(df_one)
    except Exception as e:
        print(f"[WARN] {os.path.basename(p)} -> {e}")

sabdab_df = pd.concat(all_rows, ignore_index=True) if all_rows else pd.DataFrame(columns=columns)
print("Nanobody-chains:", len(sabdab_df))
sabdab_df.head()

## ICAN

In [None]:
ican_raw = pd.read_excel(ican_path)
ican_raw.head(2)

In [None]:
# Formatting

ican = ican_raw.copy()
ican.rename(columns={'Name': 'Nanobody_ID',
                     'Antigen ': 'Antigen',
                     'Antigen source': 'Antigen_type',
                     'Source Organism': 'Species',
                     'Protein seq': 'Sequence',
                     'PDB ID': 'PBD',
                     'CDR1 seq': 'CDR1',
                     'CDR2 seq': 'CDR2',
                     'CDR3 seq': 'CDR3',
                     }, inplace=True)

ican['DOI']=None
ican['Antigen_affinity_nM']=None
ican['FR1']=None
ican['FR2']=None
ican['FR3']=None
ican['FR4']=None

ican['Source'] = 'ICAN'

ican[['FR1', 'FR2', 'FR3', 'FR4']] = ican.apply(extract_fr_sequences, axis=1, result_type='expand')

ican = ican[columns]
print(ican.info())
ican.head()

## INDI

### Abgen

In [None]:
# AbGenbank sequence
abgen_s = pd.read_csv(indi_abg_s_path, sep='\t')
abgen_s.head(2)

In [None]:
# AbGenbank metadata
abgen_m = pd.read_csv(indi_abg_m_path, sep='\t')
abgen_m.head(2)

In [None]:
def _parse_antigen_from_description(text: str | None) -> str | None:
    """
    Try to extract an antigen token from a description string.
    Heuristics:
      1) 'anti-XXXX'  -> return XXXX
      2) 'against XXXX' -> return XXXX
    Allowed token chars: letters, digits, hyphen, underscore, dot.
    """
    if not isinstance(text, str) or not text.strip():
        return None
    s = text.strip()

    # Pattern 1: 'anti-XXXX'
    m = re.search(r'anti-([A-Za-z0-9][A-Za-z0-9\-\._]*)', s, flags=re.IGNORECASE)
    if m:
        token = m.group(1).strip('.,;:()[]{} ')
        return token or None

    # Pattern 2: 'against XXXX'
    m = re.search(r'\bagainst\s+([A-Za-z0-9][A-Za-z0-9\-\._]*)', s, flags=re.IGNORECASE)
    if m:
        token = m.group(1).strip('.,;:()[]{} ')
        return token or None

    return None

def df_creation(sequence, metadata, antigen_type_rule: str = "none"):

    # Validate required columns
    required_meta_cols = {'id', 'organism', 'description', 'nucleotide sequence'}
    required_seq_cols  = {'sequence', 'CDR1', 'CDR2', 'CDR3', 'Comma-separated db-specific IDS'}
    missing_meta = required_meta_cols - set(metadata.columns)
    missing_seq  = required_seq_cols  - set(sequence.columns)
    if missing_meta:
        raise ValueError(f"Missing required metadata columns: {missing_meta}")
    if missing_seq:
        raise ValueError(f"Missing required sequence columns: {missing_seq}")

    # Normalize IDs in meta
    metadata = metadata.copy()
    metadata['id'] = metadata['id'].astype(str).str.strip()

    # Normalize and explode accessions in the sequence table
    sequence = sequence.copy()
    sequence['Comma-separated db-specific IDS'] = sequence['Comma-separated db-specific IDS'].fillna('').astype(str)
    sequence['accession_list'] = sequence['Comma-separated db-specific IDS'].apply(
        lambda s: [tok.strip() for tok in s.split(',') if tok.strip()]
    )
    seqs_exploded = sequence.explode('accession_list', ignore_index=True).rename(
        columns={'accession_list': 'accession'}
    )

    # Merge m:1 (many sequence rows -> one meta row)
    merged = seqs_exploded.merge(
        metadata,
        left_on='accession',
        right_on='id',
        how='left',
        validate='m:1'
    )

    # Map merged rows to the final schema
    rows = []
    for _, r in merged.iterrows():
        nb_id = r.get('id') or r.get('accession') or None
        species = r.get('organism') or None
        antigen = _parse_antigen_from_description(r.get('description'))

        doi = None
        pbd = None
        sequence = r.get('sequence') or None

        cdr1 = r.get('CDR1') or None
        cdr2 = r.get('CDR2') or None
        cdr3 = r.get('CDR3') or None

        fr1 = fr2 = fr3 = fr4 = None
        if antigen_type_rule == "protein_if_antigen":
            antigen_type = "protein" if antigen else None
        else:
            antigen_type = None

        antigen_affinity = None

        rows.append({
            'Nanobody_ID': nb_id,
            'Species': species,
            'Antigen': antigen,
            'DOI': doi,
            'PBD': pbd,
            'Sequence': sequence,
            'CDR1': cdr1,
            'CDR2': cdr2,
            'CDR3': cdr3,
            'FR1': fr1,
            'FR2': fr2,
            'FR3': fr3,
            'FR4': fr4,
            'Antigen_type': antigen_type,
            'Antigen_affinity_nM': antigen_affinity
        })

    final_df = pd.DataFrame(rows, columns=columns)
    final_df = final_df.drop_duplicates().reset_index(drop=True)
    return final_df

# df creation and formatting
abgen_df = df_creation(abgen_s, abgen_m)

abgen_df['Source'] = 'INDI - AbGenbank'
abgen_df[['FR1', 'FR2', 'FR3', 'FR4']] = abgen_df.apply(extract_fr_sequences, axis=1, result_type='expand')

abgen_df.head()

### Structural

In [None]:
meta = pd.read_csv(indi_structural_m_path, sep="\t")
seq  = pd.read_csv(indi_structural_s_path,  sep="\t")

fixed_cols = {"sequence","CDR1","CDR2","CDR3"}
present_fixed = [c for c in fixed_cols if c in seq.columns]
id_cols = [c for c in seq.columns if c not in present_fixed]

seq_long = seq.melt(
    id_vars=present_fixed,
    value_vars=id_cols,
    var_name="id_col",
    value_name="raw_id"
).dropna(subset=["raw_id"])

seq_long["raw_id"] = seq_long["raw_id"].astype(str)
seq_long["raw_id"] = seq_long["raw_id"].str.split(r"[,\s;]+")

seq_long = seq_long.explode("raw_id")

seq_long["Nanobody_ID"] = seq_long["raw_id"].str.strip()

# Merge
meta_min = meta.rename(columns={"id": "Nanobody_ID"})
merged = seq_long.merge(meta_min, on="Nanobody_ID", how="left")

structural_df = pd.DataFrame({
    "Source": "INDI - Structural",
    "Nanobody_ID": merged["Nanobody_ID"],
    "Species": merged.get("species"),
    "Antigen": merged.get("antigen"),
    "DOI": merged.get("doi"),
    "PBD": merged.get("pdb"),
    "Sequence": merged.get("sequence"),
    "CDR1": merged.get("CDR1"),
    "CDR2": merged.get("CDR2"),
    "CDR3": merged.get("CDR3"),
    "FR1": None,
    "FR2": None,
    "FR3": None,
    "FR4": None,
    "Antigen_type": merged.get("antigen_type"),
    "Antigen_affinity_nM": merged.get("antigen_affinity_nM"),
})[columns]

print(structural_df.shape)
structural_df.head()

### Manual

In [None]:
def build_manual_df(meta_path: str, seq_path: str) -> pd.DataFrame:
    meta = pd.read_csv(meta_path, sep="\t") 
    seq  = pd.read_csv(seq_path,  sep="\t")  

    ids_col = "Comma-separated db-specific IDS"
    seq = seq.rename(columns={ids_col: "ids"})
    seq["ids"] = seq["ids"].astype(str).fillna("")
    seq["ids"] = seq["ids"].str.split(",")
    seq = seq.explode("ids", ignore_index=True)
    seq["ids"] = seq["ids"].astype(str).str.strip()

    seq = seq[seq["ids"] != ""].copy()

    meta_min = meta.rename(columns={"id": "Nanobody_ID"})
    seq_min  = seq.rename(columns={
        "ids": "Nanobody_ID",
        "sequence": "Sequence"
    })
    
    meta_min = meta.rename(columns={"id": "Nanobody_ID"})[["Nanobody_ID"]].drop_duplicates()

    merged = pd.merge(seq_min, meta_min[["Nanobody_ID"]], on="Nanobody_ID", how="left")

    out = pd.DataFrame({
        "Source":              "INDI - Manual",
        "Nanobody_ID":         merged["Nanobody_ID"],
        "Species":             None,
        "Antigen":             None,
        "DOI":                 None,
        "PBD":                 None,
        "Sequence":            merged["Sequence"],
        "CDR1":                merged.get("CDR1"),
        "CDR2":                merged.get("CDR2"),
        "CDR3":                merged.get("CDR3"),
        "FR1":                 None,
        "FR2":                 None,
        "FR3":                 None,
        "FR4":                 None,
        "Antigen_type":        None,
        "Antigen_affinity_nM": None
    })

    out = out.reindex(columns=columns)
    return out

manual_df = build_manual_df(indi_manual_m_path, indi_manual_s_path)
print(manual_df.shape)
manual_df.head()

## sdAb-DB

In [None]:
from Bio import SeqIO

_SEP = re.compile(r"\s*\|\s*")

def fasta_to_df(path: str) -> pd.DataFrame:
    rows = []
    for rec in SeqIO.parse(path, "fasta-pearson"):
        header = rec.description.strip()
        parts = _SEP.split(header, maxsplit=2)
        parts += [""] * (3 - len(parts))
        id_, species, antigen = parts[:3]
        rows.append({
            "Nanobody_ID": id_,
            "Species": species.strip() or None,
            "Antigen": antigen.strip() or None,
            "Sequence": str(rec.seq).upper() or None
        })
    return pd.DataFrame(rows, columns=["Nanobody_ID","Species","Antigen","Sequence"])

def sdab_folder_to_df(folder: str) -> pd.DataFrame:
    files = sorted(Path(folder).glob("*.txt"))
    frames = [fasta_to_df(str(p)) for p in files]
    return (pd.concat(frames, ignore_index=True)
            if frames else pd.DataFrame(columns=["Nanobody_ID","Species","Antigen","Sequence"]))
    

sdab_llama = sdab_folder_to_df(sdAbDB_llama_path)
sdab_alpaca = sdab_folder_to_df(sdAbDB_alpaca_path)
sdab_arabcamel = sdab_folder_to_df(sdAbDB_dromed_path)
sdab_bactriant = sdab_folder_to_df(sdAbDB_bactrian_path)
sdab_unspecified = sdab_folder_to_df(sdAbDB_unspecified_path)

# concat all the files
sdab_all = pd.concat([sdab_llama, sdab_alpaca, sdab_arabcamel, sdab_bactriant, sdab_unspecified], ignore_index=True)

sdab_all['Source'] = 'sdAb-DB'
sdab_all['DOI'] = None
sdab_all['PBD'] = None
sdab_all['CDR1'] = None
sdab_all['CDR2'] = None
sdab_all['CDR3'] = None
sdab_all['FR1'] = None
sdab_all['FR2'] = None
sdab_all['FR3'] = None
sdab_all['FR4'] = None
sdab_all['Antigen_type'] = None
sdab_all['Antigen_affinity_nM'] = None

sdab_all = sdab_all[columns]
print(sdab_all.info())
sdab_all.head()

## NbThermo

In [None]:
def _js_string_unescape(s: str) -> str:
    """
    Decode common JavaScript string escapes so that the content becomes valid JSON.
    Handles sequences like \n, \t, \", \\, \/, plus hex/unicode escapes.
    """
    # Normalize escaped slashes used in JSON-in-JS
    s = s.replace(r'\/', '/')
    # Decode using python's unicode_escape
    return bytes(s, 'utf-8').decode('unicode_escape')

def _find_json_string_in_js(js_text: str) -> str:
    """Locate the embedded JSON string inside main.js and return its raw content."""
    # Pattern A: JSON.parse(' ... ')
    m = re.search(r'JSON\.parse\(\s*([\'"])(.*?)\1\s*\)', js_text, flags=re.DOTALL)
    if m:
        return m.group(2)
    # Pattern B: var data='[...]';
    m = re.search(r'\b(?:var|let|const)\s+\w+\s*=\s*([\'"])(\s*\[\s*\{.*?\}\s*\]\s*)\1', js_text, flags=re.DOTALL)
    if m:
        return m.group(2)
    # Pattern C: fallback = longest quoted array
    candidates = list(re.finditer(r'([\'"])(\s*\[\s*\{.*?\}\s*\]\s*)\1', js_text, flags=re.DOTALL))
    if candidates:
        best = max(candidates, key=lambda mm: len(mm.group(2)))
        return best.group(2)
    raise ValueError("Could not locate embedded JSON string in main.js")

def _clean(s: Optional[str]) -> Optional[str]:
    return s.strip() if isinstance(s, str) and s.strip() else None

def _strip_hyphens(s: Optional[str]) -> Optional[str]:
    return s.replace('-', '') if isinstance(s, str) else None

def _pick_first_antigen(binding: Dict[str, Any]):
    if not isinstance(binding, dict):
        return (None, None, None)
    for k in sorted([k for k in binding.keys() if k.lower().startswith("antigen")]):
        item = binding.get(k) or {}
        name = _clean(item.get('name'))
        typ  = _clean(item.get('type'))
        aff  = item.get('affinity')
        return (name, typ, aff if isinstance(aff, (int, float)) else None)
    return (None, None, None)

def _map_record(rec: Dict[str, Any]) -> Dict[str, Any]:
    info    = rec.get('Info') or {}
    origin  = rec.get('Origin') or {}
    binding = rec.get('Binding') or {}
    seq     = rec.get('Sequence') or {}
    struct  = rec.get('Structure') or {}

    doi = _clean((info.get('Reference') or {}).get('DOI'))
    species = _clean((origin.get('Source') or {}).get('value'))
    antigen_name, antigen_type, antigen_aff = _pick_first_antigen(binding)

    seq_raw = _clean((seq.get('Raw') or {}).get('value'))
    cdr1 = _strip_hyphens(_clean((seq.get('CDR1') or {}).get('value')))
    cdr2 = _strip_hyphens(_clean((seq.get('CDR2') or {}).get('value')))
    cdr3 = _strip_hyphens(_clean((seq.get('CDR3') or {}).get('value')))
    fr1  = _strip_hyphens(_clean((seq.get('Framework1') or {}).get('value')))
    fr2  = _strip_hyphens(_clean((seq.get('Framework2') or {}).get('value'))) or \
           _strip_hyphens(_clean((seq.get('Framework2') or {}).get('Values')))
    fr3  = _strip_hyphens(_clean((seq.get('Framework3') or {}).get('value')))
    fr4  = _strip_hyphens(_clean((seq.get('Framework4') or {}).get('value')))
    pdb_val = _clean((struct.get('PDB') or {}).get('value'))

    return {
        'Nanobody_ID': _clean(rec.get('id')),
        'Species': species,
        'Antigen': antigen_name,
        'DOI': doi,
        'PBD': pdb_val,
        'Sequence': seq_raw,
        'CDR1': cdr1,
        'CDR2': cdr2,
        'CDR3': cdr3,
        'FR1': fr1,
        'FR2': fr2,
        'FR3': fr3,
        'FR4': fr4,
        'Antigen_type': antigen_type,
        'Antigen_affinity_nM': antigen_aff
    }

def nbthermo_from_main_js(js_path: str) -> pd.DataFrame:
    with open(js_path, 'r', encoding='utf-8', errors='ignore') as f:
        js_text = f.read()
    raw_js_string = _find_json_string_in_js(js_text)
    decoded = _js_string_unescape(raw_js_string)
    data = json.loads(decoded)  # now it's valid JSON
    rows = [_map_record(rec) for rec in data]
    df = pd.DataFrame(rows, columns=columns)
    return df

filepath = os.path.join(project_path, 'Data/NbThermo/main.c8f2e3ee8beed8c7b505.js')
nb_thermo = nbthermo_from_main_js(filepath)

nb_thermo['Source'] = 'NbThermo'
nb_thermo = nb_thermo[columns]
nb_thermo.head(3)

# Merge dfs

In [None]:
# Save dfs

plabdab.to_csv('plabdab_df.csv')
sabdab_df.to_csv('sabdab_df.csv')
ican.to_csv('ican_df.csv')
abgen_df.to_csv('INDI_abgen_df.csv')
structural_df.to_csv('INDI_structural_df.csv')
manual_df.to_csv('INDI_manual_df.csv')
sdab_all.to_csv('sdAb-DB.csv')
nb_thermo.to_csv('nb_thermo_df.csv')

# load saved dfs

saved_ican = pd.read_csv('ican_df.csv', index_col=0)
saved_abgen = pd.read_csv('INDI_abgen_df.csv', index_col=0)
saved_structural = pd.read_csv('INDI_structural_df.csv', index_col=0)
saved_manual = pd.read_csv('INDI_manual_df.csv', index_col=0)
saved_plabdab = pd.read_csv('plabdab_df.csv', index_col=0)
saved_sabdab = pd.read_csv('sabdab_df.csv', index_col=0)
saved_sdabdb = pd.read_csv('sdAb-DB_df.csv', index_col=0)
saved_thermo = pd.read_csv('nb_thermo_df.csv', index_col=0)

dfs = [saved_ican, saved_abgen, saved_structural, saved_manual, saved_plabdab, saved_sabdab, saved_sdabdb, saved_thermo]

# merge dfs

merged_df_databases = pd.concat(dfs, ignore_index=True)

# save
merged_df_databases.to_csv('nanobodies_merged.csv')
merged_df_databases.to_excel('nanobodies_merged.xlsx')

print(merged_df_databases.info())
merged_df_databases.head()

# Data cleaning

In [None]:
merged_df = pd.read_csv('nanobodies_merged.csv', index_col=0)
print(merged_df.info())
merged_df.head()

## Remove empty species

In [None]:
merged_df = merged_df[merged_df['Species'].notnull()]
merged_df.info()

## Select species

In [None]:
merged_df['Species'] = (
    merged_df['Species']
    .astype(str).str.strip().str.lower()
    .replace({'nan': None, '': None})
)

In [None]:
species_to_keep = ['camelus dromedarius', 'camelus bactrianus', 
       'lama glama', 'lama glama(lama pacos)', 'vicugna pacos', 'human cytomegalovirus, lama glama',
       'lama pacos', 'llama glama', 'llama', 'lama', "lama glama, escherichia coli (strain k12)"
       'lama glama', 'vicugna pacos huacaya', 'alpaca', 'arabian camel',
       'bactrian camel']

df_species_clean = merged_df[merged_df['Species'].isin(species_to_keep)]
print(df_species_clean['Species'].unique())
print(df_species_clean.info())

## Consolidate categories

In [None]:
species_category_map = {
    'vicugna pacos huacaya': 'Vicugna pacos',
    'alpaca': 'Vicugna pacos',
    'vicugna pacos': 'Vicugna pacos',
    'lama glama': 'Lama glama',
    'lama glama(lama pacos)': 'Lama glama',
    'lama pacos': 'Lama glama',
    'llama glama': 'Lama glama',
    'llama': 'Lama glama',
    'lama': 'Lama glama',
    'lama glama, escherichia coli (strain k12)': 'Lama glama',
    'human cytomegalovirus, lama glama': 'Lama glama',
    'bactrian camel': 'Bactrian camel',
    'camelus dromedarius': 'Camelus dromedarius',
    'camelus bactrianus': 'Camelus dromedarius',
    'arabian camel': 'Camelus dromedarius',
}

df_sp_con = df_species_clean.copy()
df_sp_con['Species_Category'] = df_sp_con['Species'].map(species_category_map)

df_sp_con.drop(columns=['Species'], inplace=True)
df_sp_con.rename(columns={'Species_Category': 'Species'}, inplace=True)

df_sp_con = df_sp_con[columns]
print(df_sp_con.info())
df_sp_con['Species'].value_counts()


## Remove empty sequences


In [None]:
def explore_sequences(df):
    
    # no sequence
    noseq_ = len(df[df['Sequence'].isnull()])
    # no sequence but with CDRs
    nosequ_withcdr_ = len(df[(df['Sequence'].isnull()) & (df['CDR1'].notnull())])
    # sequence and no CDRs
    wseq_nocdr_ = len(df[(df['Sequence'].notnull()) & (df['CDR1'].isnull())])
    # no sequence but with CDRs and FRs
    noseq_wcdr_wfr_ = len(df[(df['Sequence'].isnull()) & (df['CDR1'].notnull()) & (df['FR1'].notnull())])
    # sequence and CDRs and FRs
    wseq_wcdr_wfr_ = len(df[(df['Sequence'].notnull()) & (df['CDR1'].notnull()) & (df['FR1'].notnull())])
    # no sequence or no CDRs
    noseq_or_nocdr_ = len(df[(df['Sequence'].isnull()) | (df['CDR1'].isnull())])
    # no sequence and no CDRs
    noseq_and_nocdr_ = len(df[(df['Sequence'].isnull()) & (df['CDR2'].isnull())])
    print(f'no sequence: {noseq_}')
    print(f'no sequence but with CDRs: {nosequ_withcdr_}')
    print(f'sequence and no CDRs: {wseq_nocdr_}')
    print(f'no sequence but with CDRs and FRs: {noseq_wcdr_wfr_}')
    print(f'sequence and CDRs and FRs: {wseq_wcdr_wfr_}')
    print(f'no sequence or no CDRs: {noseq_or_nocdr_}')
    print(f'no sequence and no CDRs: {noseq_and_nocdr_}')
    
    return noseq_, nosequ_withcdr_, wseq_nocdr_, noseq_wcdr_wfr_, noseq_or_nocdr_, noseq_and_nocdr_ 

# Explore sequences
noseq, nosequ_withcdr, wseq_nocdr, noseq_wcdr_wfr, noseq_or_nocdr, noseq_and_nocdr = explore_sequences(df_sp_con)

In [None]:
# Remove Empty Sequences and cdrs frs
df_rm_no_seq_no_cdr = df_sp_con.dropna(subset=['Sequence', 'CDR1'], how='all')
print(f'Before removal: {len(df_species_clean)}')
print(f'After removal: {len(df_rm_no_seq_no_cdr)}')
df_rm_no_seq_no_cdr.info()

## Fill in empty CDRs

In [None]:
# Fill
filled_cdrs = fill_fr_cdr_abnumber(df_rm_no_seq_no_cdr, seq_col='Sequence', only_heavy_like=True)
wseq_nocdr_filled = len(filled_cdrs[(filled_cdrs['Sequence'].notnull()) & (filled_cdrs['CDR1'].isnull())])
print(f'missing CDRs before: {wseq_nocdr}')
print(f'missing CDRs after: {wseq_nocdr_filled}')

# Check filled CDRs
noseq_, nosequ_withcdr_, wseq_nocdr_, noseq_wcdr_wfr_, noseq_or_nocdr_, noseq_and_nocdr_ = explore_sequences(filled_cdrs)

## Reconstruct Sequence from CDRs and FRs

In [None]:
recon_seq = filled_cdrs.copy()

# Identify rows where Sequence is missing but CDRs and FRs are present
condition = (recon_seq['Sequence'].isnull()) & \
            (recon_seq['CDR1'].notnull()) & (recon_seq['CDR2'].notnull()) & (recon_seq['CDR3'].notnull()) & \
            (recon_seq['FR1'].notnull()) & (recon_seq['FR2'].notnull()) & (recon_seq['FR3'].notnull()) & (recon_seq['FR4'].notnull())


print(f'no seq but cdr and fr: {len(recon_seq[condition])}')

# Reconstruct Sequence for these rows
recon_seq.loc[condition, 'Sequence'] = recon_seq.loc[condition].apply(
    lambda row: row['FR1'] + row['CDR1'] + row['FR2'] + row['CDR2'] + row['FR3'] + row['CDR3'] + row['FR4'],
    axis=1
)

In [None]:
# Verify the reconstruction
print("Number of rows with empty Sequence and non-empty CDRs/FRs before reconstruction:")
print(len(recon_seq[(recon_seq['Sequence'].isnull()) & (recon_seq['CDR1'].notnull()) & (recon_seq['FR1'].notnull())]))

print("\nNumber of rows with empty Sequence after reconstruction:")
print(len(recon_seq[recon_seq['Sequence'].isnull()]))
noseq_, nosequ_withcdr_, wseq_nocdr_, noseq_wcdr_wfr_, noseq_or_nocdr_, noseq_and_nocdr_ = explore_sequences(recon_seq)
print(recon_seq.info())

In [None]:
# remove unsuccessful splits
df_rm_no_cdr = recon_seq.dropna(subset=['CDR1'], how='all')
print(f'Before removal: {len(recon_seq)}')
print(f'After removal: {len(df_rm_no_cdr)}')
df_rm_no_cdr.info()

## Drop duplicates in Sequence

In [None]:
# Count and remove duplicates based on 'Sequence' column
df_rm_duplicates = df_rm_no_cdr.copy()
initial_rows = len(df_rm_duplicates)
df_rm_duplicates.drop_duplicates(subset=['Sequence'], inplace=True)
rows_after_dropping = len(df_rm_duplicates)
duplicates_removed = initial_rows - rows_after_dropping

print(f"Initial number of rows: {initial_rows}")
print(f"Number of rows after dropping duplicates (based on Sequence): {rows_after_dropping}")
print(f"Number of duplicate sequences removed: {duplicates_removed}")

df_rm_duplicates.info()

## Check for sorted CDRs

In [None]:
def seq_is_sorted(s: str) -> bool:
    if not isinstance(s, str) or not s.strip():
        return False
    return list(s) == sorted(s)

df_rm_duplicates = df_rm_duplicates.copy()

df_rm_duplicates['Any_CDR_sorted'] = df_rm_duplicates.apply(
    lambda r: all(seq_is_sorted(r[c]) for c in ['CDR1','CDR2','CDR3']),
    axis=1
)

# Filter out the rows that meet the condition
sorted_rows = df_rm_duplicates[df_rm_duplicates['Any_CDR_sorted']]

print("Rows with any sorted CDR:", len(sorted_rows))
sorted_rows['Source'].value_counts()
sorted_rows.head()
df_rm_duplicates = df_rm_duplicates[~df_rm_duplicates['Any_CDR_sorted']]
df_rm_duplicates.info()

## Has not antigen or DOI

In [None]:
df_rm_duplicates = df_rm_duplicates.dropna(subset=['Antigen', 'DOI'], how='all')
df_rm_duplicates.info()

## Eliminate extra text

In [None]:
def drop_predict_rows(df):
    """
    Drop rows where any column contains the string '[predict]'.
    """
    mask = df.applymap(lambda x: isinstance(x, str) and "(predict)" in x)
    return df[~mask.any(axis=1)].copy()

df_processed = drop_predict_rows(df_rm_duplicates)

print("Before:", len(df_rm_duplicates))
print("After:", len(df_processed))

## Save df

In [None]:
df_processed.to_excel('nanobodies_merged_clean.xlsx', index=False)
df_processed.to_csv('nanobodies_merged_clean.csv', index=False)

# Graphs and stats

In [None]:
df_cleaned = pd.read_csv('nanobodies_merged_clean.csv')
df_cleaned.head(2)
df_cleaned.info()

In [None]:
species_counts = df_cleaned['Species'].value_counts()

plt.figure(figsize=(10, 6))
species_counts.plot(kind='bar')
plt.title('Number of nanobodies per Species', fontsize=22, pad=12)
plt.xlabel('Species Category', fontsize=18, labelpad=8)
plt.ylabel('Number of nanobodies', fontsize=18, labelpad=8)
plt.xticks(rotation=45, ha='right', fontsize=12)
plt.tight_layout()
plt.show()

### Descriptive statistics for the sequences

In [None]:
SEQ_COLS = ['Sequence','CDR1','CDR2','CDR3','FR1','FR2','FR3','FR4']

def length_describe(df: pd.DataFrame, by_species: bool = False):
    """
    Compute descriptive stats of sequence lengths (Sequence, CDRs, FRs).
    Does not add new columns to df.
    """
    def _len(x):
        return len(str(x)) if isinstance(x, str) else None

    if by_species:
        return (
            df.groupby("Species")[SEQ_COLS]
              .agg(lambda col: col.dropna().map(_len).describe())
        )
    else:
        return df[SEQ_COLS].agg(lambda col: col.dropna().map(_len).describe())

global_stats = length_describe(df_cleaned, by_species=False)

print("=== Global stats ===")
global_stats

In [None]:
def _len_or_none(x):
    return len(x) if isinstance(x, str) else None

def length_stats_by_species(df: pd.DataFrame, cols=SEQ_COLS, combine=False):
    """
    Compute per-Species descriptive stats of lengths for the given sequence columns.
    Does NOT add any column to df.

    Returns:
      - if combine=False: dict {col_name: stats_df}
      - if combine=True:  single DataFrame with a column MultiIndex (col_name, stat)
    """
    results = {}
    for col in cols:
        tmp = df[['Species', col]].dropna(subset=[col]).copy()
        tmp['len'] = tmp[col].map(_len_or_none)
        # drop None lengths
        tmp = tmp.dropna(subset=['len'])
        # group and describe
        stats = tmp.groupby('Species')['len'].describe()  # count, mean, std, min, 25%, 50%, 75%, max
        results[col] = stats

    if not combine:
        return results

    # Combine into a single DF with a MultiIndex on columns: (col_name, stat)
    combined = pd.concat(results, axis=1)
    # Optional: sort species and columns for readability
    combined = combined.sort_index(axis=0).sort_index(axis=1, level=0)
    return combined

stats_dict = length_stats_by_species(df_cleaned, combine=True)
stats_dict.head()
stats = length_stats_by_species(df_cleaned, combine=False)
stats['FR2'].describe()

In [None]:
def plot_kde_by_species(df, col='CDR3'):
    """
    Smoothed density curves (KDE) of sequence lengths by species.
    """
    tmp = df[['Species', col]].dropna(subset=[col]).copy()
    tmp['len'] = tmp[col].map(lambda x: len(x) if isinstance(x, str) else None)
    tmp = tmp.dropna(subset=['len'])

    plt.figure(figsize=(8, 5))
    palette = plt.get_cmap("tab10")

    for i, sp in enumerate(tmp['Species'].unique()):
        vals = tmp.loc[tmp['Species'] == sp, 'len']
        sns.kdeplot(vals, label=str(sp), color=palette(i))

    plt.xlabel(f"{col} length (aa)")
    plt.ylabel("Density")
    plt.title(f"Smoothed distribution of {col} length by Species")
    plt.legend()
    plt.tight_layout()
    plt.show()
    
plot_kde_by_species(df_cleaned, col='FR2')

In [None]:
def plot_box_by_species(df, col='CDR3'):
    """
    Boxplot of sequence lengths by species.
    """
    tmp = df[['Species', col]].dropna(subset=[col]).copy()
    tmp['len'] = tmp[col].map(lambda x: len(x) if isinstance(x, str) else None)
    tmp = tmp.dropna(subset=['len'])

    plt.figure(figsize=(8, 5))
    palette = plt.get_cmap("tab10")
    species_list = tmp['Species'].unique()

    data = [tmp.loc[tmp['Species'] == sp, 'len'].values for sp in species_list]
    box = plt.boxplot(data,
                      labels=[str(sp) for sp in species_list],
                      patch_artist=True,
                      showfliers=False)

    for patch, color in zip(box['boxes'], [palette(i) for i in range(len(species_list))]):
        patch.set_facecolor(color)

    plt.ylabel(f"{col} length (aa)")
    plt.title(f"Boxplot of {col} length by Species")
    plt.xticks(rotation=20, ha='right')
    plt.tight_layout()
    plt.show()
    
plot_box_by_species(df_cleaned, col='CDR3')

### Create expanded dfs

In [None]:
def add_length_columns(df, seq_cols=SEQ_COLS):
    df = df.copy()
    for col in seq_cols:
        df[f"{col}_len"] = df[col].apply(lambda x: len(x) if isinstance(x, str) else None)
    return df

df_with_len = add_length_columns(df_cleaned)
# df_with_len.to_csv('df_with_len.csv', index=False)
df_with_len.head()

### Check for outliers

In [None]:
def remove_outliers_multi(df, k=4):

    df = df.copy()
    mask = pd.Series(True, index=df.index)

    len_cols = [c for c in df.columns if c.endswith("_len")]

    for col in len_cols:
        series = df[col].dropna()
        if series.empty:
            continue
        q1, q3 = series.quantile([0.25, 0.75])
        iqr = q3 - q1
        lower, upper = q1 - k * iqr, q3 + k * iqr
        mask &= df[col].between(lower, upper) | df[col].isna()

    return df[mask]

df_no_outliers = remove_outliers_multi(df_with_len)

print("Before:", len(df_with_len))
print("After:", len(df_no_outliers))

In [None]:
SEQ_COLS = ['Sequence_len','CDR1_len','CDR2_len','CDR3_len','FR1_len','FR2_len','FR3_len','FR4_len']
for i in SEQ_COLS:
    plt.hist(df_with_len[i],bins=50)
    plt.title({str(i)})
    plt.show()
df_no_outliers.to_csv('df_no_outliers.csv', index=False)

### Conservación de secuencias

In [None]:
AA_VALID = set(list("ACDEFGHIKLMNPQRSTVWY"))

def _clean_seq(s: Optional[str]) -> Optional[str]:
    if not isinstance(s, str) or not s.strip():
        return None
    s = s.strip().upper().replace(" ", "")
    s = "".join(ch if ch in AA_VALID else "-" for ch in s)
    return s if s else None

def _align_by_padding(seqs: List[str]) -> np.ndarray:
    """Pad right with '-' to the max length and return (N, L) char array."""
    if not seqs:
        return np.empty((0, 0), dtype="<U1")
    L = max(len(s) for s in seqs)
    arr = np.full((len(seqs), L), "-", dtype="<U1")
    for i, s in enumerate(seqs):
        arr[i, :len(s)] = list(s)
    return arr

def _column_stats(col: np.ndarray) -> Dict[str, object]:
    """Stats for a single alignment column (array of single-char strings)."""
    vals = [a for a in col if a != "-"]
    n = len(vals)
    if n == 0:
        return {"consensus": "-", "cons_freq": 0.0, "entropy": 0.0, "counts": {}}
    counts = Counter(vals)
    aa_cons, cmax = max(counts.items(), key=lambda kv: kv[1])
    pmax = cmax / n
    # Shannon entropy (bits) over non-gap residues
    H = -sum((c/n) * math.log2(c/n) for c in counts.values())
    return {"consensus": aa_cons, "cons_freq": pmax, "entropy": H, "counts": dict(counts), "n_eff": n}

def conservation_for_region(df: pd.DataFrame, fr_col: str) -> pd.DataFrame:
    """
    Compute per-position conservation for one FR column (e.g., 'FR1').
    Returns a DataFrame with:
      pos, consensus, cons_freq (0-1), entropy (bits), n_eff, and counts (dict)
    """
    seqs = [ _clean_seq(s) for s in df[fr_col].dropna().tolist() ]
    seqs = [s for s in seqs if s and set(s) - {"-"}]  # keep non-empty
    if not seqs:
        return pd.DataFrame(columns=["pos","consensus","cons_freq","entropy","n_eff","counts"])
    aln = _align_by_padding(seqs)  # (N, L)
    stats = [ _column_stats(aln[:, j]) for j in range(aln.shape[1]) ]
    out = pd.DataFrame(stats)
    out.insert(0, "pos", np.arange(1, len(out)+1))  # 1-based
    return out

def conservation_all_FRs(df: pd.DataFrame, fr_cols: List[str] = ["FR1","FR2","FR3","FR4"]) -> Dict[str, pd.DataFrame]:
    """Compute conservation for each FR; returns dict {FR: stats_df}."""
    return { col: conservation_for_region(df, col) for col in fr_cols }

def conservation_by_species(df: pd.DataFrame,
                            species: str,
                            fr_cols: List[str] = ["FR1","FR2","FR3","FR4"]) -> Dict[str, pd.DataFrame]:
    """Same as above, but restricted to a single Species value."""
    sub = df[df["Species"] == species]
    return conservation_all_FRs(sub, fr_cols=fr_cols)

def consensus_strings(stats_dict: Dict[str, pd.DataFrame]) -> Dict[str, str]:
    """Build consensus sequence string per FR from stats tables."""
    cons = {}
    for fr, s in stats_dict.items():
        if s.empty:
            cons[fr] = None
        else:
            cons[fr] = "".join(s["consensus"].astype(str).tolist())
    return cons

fr_stats = conservation_all_FRs(df_no_outliers)

fr3 = fr_stats["FR3"]
print(fr3.sort_values(["cons_freq", "entropy"], ascending=[False, True]).head(10))

print(consensus_strings(fr_stats))
llama_stats = conservation_by_species(df_no_outliers, species="Lama glama")
print(consensus_strings(llama_stats))

In [None]:
def flatten_conservation_dict(conservation_dict: dict, species_name: str) -> pd.DataFrame:

    df = (
        pd.concat(conservation_dict, names=["Region"]) 
          .reset_index(level=0)                       
          .assign(Species=species_name)
    )
    return df

species_list = ["Lama glama", "Vicugna pacos", "Bactrian camel", "Camelus dromedarius"]

all_species_dfs = []
for sp in species_list:
    stats_dict = conservation_by_species(df_no_outliers, species=sp)
    all_species_dfs.append(flatten_conservation_dict(stats_dict, sp))

conservation_all = pd.concat(all_species_dfs, ignore_index=True)
conservation_all.to_csv('conservation_by_species_.csv')
conservation_all.head()

In [None]:
def plot_fr_conservation_heatmap(stats_dict, title="FR conservation heatmap"):
    """
    Plot a heatmap of consensus frequency across all FRs.
    Rows = FR regions (FR1..FR4), columns = positions (aligned per FR).
    """
    frs = ["FR1","FR2","FR3","FR4"]
    data = []
    lengths = []
    for fr in frs:
        if fr in stats_dict and not stats_dict[fr].empty:
            vals = stats_dict[fr]["entropy"].values #cons_freq or entropy
            data.append(vals)
            lengths.append(len(vals))
        else:
            data.append([])
            lengths.append(0)
    max_len = max(lengths)
    
    # pad with NaN so all rows have same length cons_freq
    mat = np.full((len(frs), max_len), np.nan)
    for i, vals in enumerate(data):
        if len(vals) > 0:
            mat[i, :len(vals)] = vals
    
    plt.figure(figsize=(12, 3))
    im = plt.imshow(mat, aspect="auto", cmap="plasma", vmin=0, vmax=1)
    plt.colorbar(im, label="Entropy")
    plt.yticks(range(len(frs)), frs)
    plt.xlabel("Position (aligned per FR)")
    plt.title(title)
    plt.tight_layout()
    plt.show()
    
fr_stats = conservation_all_FRs(df_no_outliers)
plot_fr_conservation_heatmap(fr_stats, "Framework conservation across all species - Entropy")

In [None]:
def plot_fr_conservation_by_species(df, fr_cols=["FR1","FR2","FR3","FR4"]):
    """
    For each species in the DataFrame, compute FR conservation and plot a heatmap.
    """
    species_list = sorted(df["Species"].dropna().unique())

    for sp in species_list:
        sub_stats = conservation_by_species(df, species=sp, fr_cols=fr_cols)
        plot_fr_conservation_heatmap(sub_stats, title=f"FR conservation - Entropy — {sp}")
        
plot_fr_conservation_by_species(df_no_outliers)

### Aminoacid stats

In [None]:
def amino_acid_stats(df, seq_cols=["Sequence"], aa_list=None):
    """
    Compute amino acid statistics for the given sequence columns in a DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame with sequence columns (strings of amino acids).
    seq_cols : list of str
        Columns to analyze.
    aa_list : list of str or None
        If provided, restrict analysis to this set of amino acids.
        If None, will use all 20 standard amino acids.

    Returns
    -------
    stats_dict : dict
        {col_name: DataFrame with per-amino-acid counts, frequencies, mean per sequence, etc.}
    """
    if aa_list is None:
        aa_list = list("ACDEFGHIKLMNPQRSTVWY")  # 20 standard amino acids

    stats_dict = {}
    for col in seq_cols:
        seqs = df[col].dropna().astype(str)

        # Global counts across all sequences
        total_counts = Counter()
        for s in seqs:
            total_counts.update(s)

        # Normalize to provided aa_list
        total_counts = {aa: total_counts.get(aa, 0) for aa in aa_list}
        total_residues = sum(total_counts.values())

        # Per-sequence frequencies (useful for averages/variability)
        per_seq = []
        for s in seqs:
            c = Counter(s)
            per_seq.append([c.get(aa, 0) for aa in aa_list])
        per_seq = pd.DataFrame(per_seq, columns=aa_list)

        # Build stats table
        stats = pd.DataFrame({
            "count_total": total_counts,
            "freq_global": {aa: total_counts[aa] / total_residues if total_residues else 0 for aa in aa_list},
            "mean_per_seq": per_seq.mean().to_dict(),
            "std_per_seq": per_seq.std().to_dict(),
            "min_per_seq": per_seq.min().to_dict(),
            "max_per_seq": per_seq.max().to_dict()
        })
        stats_dict[col] = stats

    return stats_dict

aa_stats = amino_acid_stats(df_no_outliers, seq_cols=["Sequence"])

print(aa_stats["Sequence"].loc[["K","C"], ["count_total","freq_global","mean_per_seq"]])
aa_table = aa_stats["Sequence"].sort_values("freq_global", ascending=False)
pd.set_option("display.max_rows", None)  
display(aa_table)

In [None]:
def plot_aa_composition(stats, title="Amino acid composition in sequences"):
    stats = stats.sort_values("freq_global", ascending=False)
    plt.figure(figsize=(10,5))
    plt.bar(stats.index, stats["freq_global"], color="teal")
    plt.ylabel("Frequency")
    plt.xlabel("Amino acid")
    plt.title(title)
    plt.tight_layout()
    plt.show()

plot_aa_composition(aa_stats["Sequence"])

In [None]:
def aa_composition_by_species(df, aa_list=list("ACDEFGHIKLMNPQRSTVWY")):
    """
    Returns a DataFrame: rows=Species, cols=AA, values=freq_global.
    """
    results = []
    for sp, sub in df.groupby("Species"):
        seqs = sub["Sequence"].dropna().astype(str)
        total_counts = Counter()
        for s in seqs:
            total_counts.update(s)
        total_counts = {aa: total_counts.get(aa, 0) for aa in aa_list}
        total_residues = sum(total_counts.values())
        freqs = {aa: (total_counts[aa]/total_residues if total_residues else 0) for aa in aa_list}
        freqs["Species"] = sp
        results.append(freqs)
    return pd.DataFrame(results).set_index("Species")

aa_species = aa_composition_by_species(df_no_outliers)

plt.figure(figsize=(14,6))
sns.heatmap(aa_species, cmap="plasma", annot=False)
plt.title("Amino acid composition per species")
plt.xlabel("Amino acid")
plt.ylabel("Species")
plt.show()

In [None]:
def aa_composition_by_species(df, seq_col="Sequence", aa_list=list("ACDEFGHIKLMNPQRSTVWY")):
    """
    Compute amino acid frequencies per species.
    Returns a dict {species: DataFrame of amino acid stats}.
    """
    results = {}
    for sp, sub in df.groupby("Species"):
        seqs = sub[seq_col].dropna().astype(str)

        total_counts = Counter()        
        for s in seqs:
            total_counts.update(s)

        # Fill missing amino acids with 0
        total_counts = {aa: total_counts.get(aa, 0) for aa in aa_list}
        total_residues = sum(total_counts.values())

        # Per-sequence table
        per_seq = []
        for s in seqs:
            c = Counter(s)
            per_seq.append([c.get(aa, 0) for aa in aa_list])
        per_seq = pd.DataFrame(per_seq, columns=aa_list)

        stats = pd.DataFrame({
            "count_total": total_counts,
            "freq_global": {aa: total_counts[aa] / total_residues if total_residues else 0 for aa in aa_list},
            "mean_per_seq": per_seq.mean().to_dict(),
            "std_per_seq": per_seq.std().to_dict(),
            "min_per_seq": per_seq.min().to_dict(),
            "max_per_seq": per_seq.max().to_dict()
        })

        results[sp] = stats
    return results

aa_species_stats = aa_composition_by_species(df_no_outliers)
aa_species_stats["Bactrian camel"]

In [None]:
def plot_aa_composition_species(stats_dict, species, title=None):
    """
    Plot amino acid frequencies for a given species.
    """
    stats = stats_dict[species].sort_values("freq_global", ascending=False)

    plt.figure(figsize=(10,5))
    plt.bar(stats.index, stats["freq_global"], color="seagreen")
    plt.ylabel("Frequency")
    plt.xlabel("Amino acid")
    if title is None:
        title = f"Amino acid composition — {species}"
    plt.title(title)
    plt.tight_layout()
    plt.show()

plot_aa_composition_species(aa_species_stats, "Lama glama")

In [None]:
# Analyses on K and C

REGIONS = ['CDR1','CDR2','CDR3','FR1','FR2','FR3','FR4']
AAS = ['K','C']  # Lysine & Cysteine

def _counts_per_seq(seq: str, aas=AAS):
    if not isinstance(seq, str):
        return {aa: 0 for aa in aas}
    c = Counter(seq)
    return {aa: c.get(aa, 0) for aa in aas}

def kc_region_tables(df: pd.DataFrame, regions=REGIONS, aas=AAS):
    """
    Returns a DataFrame with per-region stats for K and C:
      - *_count_total  (suma absoluta en la región)
      - *_freq_global  (proporción sobre todos los aa observados en la región)
      - *_mean_per_seq (media por secuencia)
      - *_std_per_seq  (desviación por secuencia)
    """
    rows = []
    for col in regions:
        seqs = df[col].dropna().astype(str)
        if seqs.empty:
            row = {'Region': col}
            for aa in aas:
                row[f'{aa}_count_total'] = 0
                row[f'{aa}_freq_global'] = 0.0
                row[f'{aa}_mean_per_seq'] = 0.0
                row[f'{aa}_std_per_seq'] = 0.0
            rows.append(row)
            continue

        total_len = sum(len(s) for s in seqs)
        totals = Counter()
        for s in seqs:
            totals.update(s)

        per_seq = pd.DataFrame([_counts_per_seq(s, aas=aas) for s in seqs])

        row = {'Region': col}
        for aa in aas:
            aa_total = totals.get(aa, 0)
            row[f'{aa}_count_total'] = aa_total
            row[f'{aa}_freq_global'] = (aa_total / total_len) if total_len else 0.0
            row[f'{aa}_mean_per_seq'] = float(per_seq[aa].mean())
            row[f'{aa}_std_per_seq']  = float(per_seq[aa].std(ddof=1)) if len(per_seq) > 1 else 0.0
        rows.append(row)

    out = pd.DataFrame(rows).set_index('Region')
    return out

kc_table = kc_region_tables(df_no_outliers)
kc_table  

In [None]:
def plot_kc_freq_global(kc_table: pd.DataFrame, title="K and C — global frequency per region"):
    regions = kc_table.index.tolist()
    x = np.arange(len(regions))
    width = 0.35

    K_freq = kc_table['K_freq_global'].values
    C_freq = kc_table['C_freq_global'].values

    plt.figure(figsize=(9,5))
    palette = plt.get_cmap("tab10")
    plt.bar(x - width/2, K_freq, width, label='K (Lysine)', color=palette(0))
    plt.bar(x + width/2, C_freq, width, label='C (Cysteine)', color=palette(2))

    plt.xticks(x, regions, rotation=0)
    plt.ylabel("Global frequency (fraction)")
    plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.show()

plot_kc_freq_global(kc_table, title="K and C frequency — CDRs & FRs")

In [None]:
def kc_by_species(df, regions=REGIONS, aas=AAS):
    """
    Compute K and C stats per species and region.
    Returns a dict {species: kc_table}.
    """
    out = {}
    for sp, sub in df.groupby("Species"):
        out[sp] = kc_region_tables(sub, regions=regions, aas=aas)
    return out

kc_species_tables = kc_by_species(df_cleaned)
kc_species_tables["Bactrian camel"]

In [None]:
def plot_kc_species_heatmap(kc_species_tables, value="freq_global", aa="K", title=None):
    """
    Build heatmap for one amino acid (K or C), across species (rows) and regions (cols).
    value: one of ["freq_global", "mean_per_seq", "count_total"]
    """
    frames = []
    for sp, tab in kc_species_tables.items():
        row = {col: tab.loc[col, f"{aa}_{value}"] for col in tab.index}
        row["Species"] = sp
        frames.append(row)
    mat = pd.DataFrame(frames).set_index("Species")

    plt.figure(figsize=(10,6))
    sns.heatmap(mat, annot=True, fmt=".3f", cmap="Blues")
    plt.title(title or f"{aa} — {value} per region and species")
    plt.xlabel("Region")
    plt.ylabel("Species")
    plt.tight_layout()
    plt.show()

plot_kc_species_heatmap(kc_species_tables, value="freq_global", aa="K")
plot_kc_species_heatmap(kc_species_tables, value="freq_global", aa="C")