## RSALOR: Ready-to-Use Notebook
[![PyPi Version](https://img.shields.io/pypi/v/rsalor.svg)](https://pypi.org/project/rsalor/) [![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT) [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](
https://colab.research.google.com/github/3BioCompBio/RSALOR/blob/main/colab_notebook_RSALOR.ipynb)

<img src="https://raw.githubusercontent.com/3BioCompBio/RSALOR/main/Logo.png" height="250" align="right" style="height:200px;">

Ready-to-Use Notebook to run the RSALOR model:
 - Upload or fetch an MSA file
 - Upload or fetch a 3D structure file
 - Run predictions on all single-site mutations

The `rsalor` package combines structural data (Relative Solvent Accessibility, RSA) and evolutionary data (Log Odd Ratio, LOR from MSA) to evaluate effects of missense mutations in proteins.
It computes the `RSA*LOR` score for each single-site missense mutation in a target protein by combining multiple computational steps into a fast and user-friendly tool.
Source code in the [RSALOR GitHub](https://github.com/3BioCompBio/RSALOR).

**Please cite**:
- [Matsvei Tsishyn, Pauline Hermans, Fabrizio Pucci, Marianne Rooman (2025). Residue conservation and solvent accessibility are (almost) all you need for predicting mutational effects in proteins. Bioinformatics, btaf322](https://doi.org/10.1093/bioinformatics/btaf322).

- [Pauline Hermans, Matsvei Tsishyn, Martin Schwersensky, Marianne Rooman, Fabrizio Pucci (2024). Exploring evolution to uncover insights into protein mutational stability. Molecular Biology and Evolution, 42(1), msae267](https://doi.org/10.1093/molbev/msae267).

In [None]:
#@title Install dependencies

%pip install rsalor
%pip install requests

In [None]:
#@title Imports and utility functions

# Imports ----------------------------------------------------------------------
import functools
import os
import tarfile
import time
from typing import Callable

import requests
from Bio.PDB import PDBParser, PPBuilder, PDBList
from google.colab import files
from requests import Response
from rsalor import MSA
from rsalor.sequence import Sequence, FastaReader, PairwiseAlignment


# Init paths -------------------------------------------------------------------
# Shared notebook state: initialized to None so that the guard checks at the top of
# the later cells raise a clear error when the cells are executed in the wrong order
msa_path = None     # path to the MSA file (set by the MSA cell)
pdb_path = None     # path to the '.pdb' file (set by the 3D structure cell)
chain = None        # ID of the target chain in the PDB structure
output_path = None  # path to the output CSV file (set by the RSALOR run cell)


# Dependencies (small helper functions) ----------------------------------------
def clip_string(input_str: str, max_len: int=100) -> str:
  """Return 'input_str' unchanged if it has at most 'max_len' characters, else its first 'max_len' characters followed by '...'."""
  if len(input_str) > max_len:
    return f"{input_str[:max_len]}..."
  return input_str


# Dependencies (fetch functions) -----------------------------------------------
def is_valid_pdb_id(pdb_id :str) -> bool:
  """Tell whether 'pdb_id' has the shape of a PDB ID (case-insensitive).

  Accepted shape: exactly 4 characters, a digit among 1-9 followed by
  3 characters among 0-9 and A-Z.
  """
  LEADING_DIGITS = "123456789"
  DIGITS_AND_LETTERS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
  candidate = pdb_id.upper()
  if len(candidate) != 4:
    return False
  if candidate[0] not in LEADING_DIGITS:
    return False
  return all(char in DIGITS_AND_LETTERS for char in candidate[1:])

def fetch_structure_by_pdb(pdb_id: str) -> str:
  """Fetch a '.pdb' file from the PDB and return its path.

  The file is retrieved with Biopython's 'PDBList' into the current working
  directory. A retrieved file with the '.ent' extension is renamed to '.pdb'.

  Args:
    pdb_id: PDB ID (like '6m0j'); case and surrounding whitespace are ignored.

  Returns:
    Path to the retrieved '.pdb' file.

  Raises:
    ValueError: if 'pdb_id' is not a valid PDB ID or if the fetch has failed.
  """
  pdb_id = pdb_id.lower().strip()
  if not is_valid_pdb_id(pdb_id):
    raise ValueError(f"❌ pdb_id='{pdb_id}' is not a valid PDB Id.")
  pdb_fetcher = PDBList()
  file_path = pdb_fetcher.retrieve_pdb_file(pdb_id, file_format="pdb", pdir="./")
  # NOTE(review): a failed download is presumably not always reported as None
  # (may depend on the Biopython version), so also check that the file exists
  if file_path is None or not os.path.isfile(file_path):
    raise ValueError(f"❌ Fetch pdb_id='{pdb_id}' has failed.")
  if file_path.endswith(".ent"):
    file_path_old = file_path
    file_path = file_path.removesuffix(".ent") + ".pdb"
    # os.replace overwrites a '.pdb' file left by a previous run on every platform
    os.replace(file_path_old, file_path)
  return file_path

def fetch_structure_by_uniprot(uniprot_id: str, timeout: float=30.0) -> str:
  """Fetch a '.pdb' file from the AlphaFoldDB by its UniProt ID and return its path.

  Args:
    uniprot_id: UniProt ID (like 'Q9LW00'); case and surrounding whitespace are ignored.
    timeout: timeout (sec.) of the HTTP request, so that an unresponsive server cannot block the notebook forever.

  Returns:
    Path to the saved '.pdb' file (in the current working directory).

  Raises:
    ValueError: if the request fails or returns an HTTP error status.
  """

  # Init
  uniprot_id = uniprot_id.upper().strip()
  filename = f"AF-{uniprot_id}-F1-model_v6.pdb"
  url = f"https://alphafold.ebi.ac.uk/files/{filename}"
  print(f" * fetch 3D structure from '{url}'")

  # Request structure
  try:
    r = requests.get(url, timeout=timeout)
    r.raise_for_status()
  except requests.RequestException as err:
    raise ValueError(f"❌ AlphaFoldDB structure fetch failed for UniProt ID '{uniprot_id}' from '{url}': {err}") from err

  # Save structure and return path
  with open(filename, "wb") as fs:
    fs.write(r.content)
  return filename

def fetch_msa_by_uniprot(uniprot_id: str, timeout: float=30.0) -> str:
  """Fetch an MSA file from the AlphaFoldDB by its UniProt ID and return its path.

  The MSA is streamed to a temporary '.part' file which is moved to its final
  name only once the download is complete, so a failed download never leaves
  a truncated MSA under the returned name.

  Args:
    uniprot_id: UniProt ID (like 'Q9LW00'); case and surrounding whitespace are ignored.
    timeout: timeout (sec.) of the HTTP request, so that an unresponsive server cannot block the notebook forever.

  Returns:
    Path to the saved '.a3m' file (in the current working directory).

  Raises:
    ValueError: if the request fails, returns an HTTP error status or if the file cannot be written.
  """

  # Init
  uniprot_id = uniprot_id.upper().strip()
  filename = f"AF-{uniprot_id}-F1-msa_v6.a3m"
  partial_filename = f"{filename}.part"
  url = f"https://alphafold.ebi.ac.uk/files/msa/{filename}"
  print(f" * fetch MSA from '{url}'")

  # Stream and write MSA to file and return path
  try:
    with requests.get(url, stream=True, timeout=timeout) as resp:
      resp.raise_for_status()
      with open(partial_filename, "wb") as fs:
        for chunk in resp.iter_content(1 << 14): # 16 KB chunks
          if chunk:
            fs.write(chunk)
    os.replace(partial_filename, filename)
  except (requests.RequestException, OSError) as err:
    # Remove the incomplete download (if any) before reporting the failure
    if os.path.exists(partial_filename):
      os.remove(partial_filename)
    raise ValueError(f"❌ AlphaFoldDB MSA fetch failed for UniProt ID '{uniprot_id}': download failed from '{url}': {err}") from err
  return filename

def fetch_sequence_by_uniprot(uniprot_id: str, timeout: float=30.0) -> str:
  """Fetch a FASTA sequence from UniProt by its UniProt ID and return its path.

  Args:
    uniprot_id: UniProt ID (like 'Q9LW00'); case and surrounding whitespace are ignored.
    timeout: timeout (sec.) of the HTTP request, so that an unresponsive server cannot block the notebook forever.

  Returns:
    Path to the saved '.fasta' file (in the current working directory).

  Raises:
    ValueError: if the request fails or returns an HTTP error status.
  """

  # Init
  uniprot_id = uniprot_id.upper().strip()
  filename = f"{uniprot_id}.fasta"
  url = f"https://rest.uniprot.org/uniprotkb/{filename}"
  print(f" * fetch FASTA sequence from '{url}'")

  # Request fasta
  try:
    r = requests.get(url, timeout=timeout)
    r.raise_for_status()
  except requests.RequestException as err:
    raise ValueError(f"❌ UniProt FASTA sequence fetch failed for UniProt ID '{uniprot_id}' from '{url}': {err}") from err

  # Save fasta and return path
  with open(filename, "wb") as fs:
    fs.write(r.content)
  return filename

# Dependencies (MMSeqs2 API) ---------------------------------------------------

def repeated_request(request_function: Callable):
  """Decorator to repeat a request up to 5 times before throwing an error.

  An attempt fails when 'request_function' raises an exception; the HTTP status
  code of a returned response is only logged, it is not checked. The wrapper
  sleeps 5 sec. between two attempts.

  Args:
    request_function: function performing the request and returning its response.

  Returns:
    Wrapped function returning the response of the first successful attempt.
    It keeps the name and docstring of 'request_function', so that decorators
    stacked on top of it report the real function name.

  Raises:
    ValueError: (from the wrapped function) if all attempts have failed;
      the error of the last attempt is attached as its cause.
  """
  MAX_REQUEST_TRIES = 5
  FAIL_SLEEP_TIME = 5.0
  name = request_function.__name__

  @functools.wraps(request_function)
  def wrapper(*args, **kwargs) -> Response:
    # The first positional argument (if any) is only used to label the logs
    target = clip_string(str(args[0])) if args else "<no positional argument>"
    print(f" * Request [{name}] for '{target}' ({MAX_REQUEST_TRIES} repeats)")
    last_error = None
    for i in range(MAX_REQUEST_TRIES):
      print(f"   - [{name}] repeat [{i+1}/{MAX_REQUEST_TRIES}] ...")
      try:
        res: Response = request_function(*args, **kwargs)
      except Exception as err:
        print(f"   - [{name}] Error on attempt [{i+1}/{MAX_REQUEST_TRIES}]: {err}")
        last_error = err
        # No need to wait after the last attempt: the error is raised right away
        if i + 1 < MAX_REQUEST_TRIES:
          time.sleep(FAIL_SLEEP_TIME)
      else:
        print(f"   - [{name}] status: {res.status_code}")
        return res
    raise ValueError(f"Too many failed attempts ({MAX_REQUEST_TRIES}) for request '{name}'.") from last_error

  return wrapper

def json_request(request_function: Callable):
  """Decorator to run a request and parse its output as JSON.

  Args:
    request_function: function performing the request and returning a response
      object exposing 'json()', 'status_code' and 'text'.

  Returns:
    Wrapped function returning the parsed JSON content of the response.
    It keeps the name and docstring of 'request_function'.

  Raises:
    ValueError: (from the wrapped function) if the response cannot be parsed as
      JSON; the message reports the HTTP status and a truncated response text.
  """

  @functools.wraps(request_function)
  def wrapper(*args, **kwargs):
    res: Response = request_function(*args, **kwargs)
    try:
      return res.json()
    except Exception as err:
      msg = (
        f"Failed to parse response from '{request_function.__name__}' as JSON.\n"
        f"Original error: {err}\n"
        f"HTTP status: {res.status_code}\n"
        f"Response (truncated): {clip_string(res.text)}"
      )
      raise ValueError(msg) from err

  return wrapper

@json_request
@repeated_request
def submit_mmseqs2(seq: str, mode: str, url: str, endpoint: str, timeout: float, query_name: str="query"):
  """POST the sequence 'seq' as a FASTA query named 'query_name' to '{url}/{endpoint}'."""
  # Spaces, tabs and newlines of the query name are replaced by '_'
  blanks_to_underscore = str.maketrans({" ": "_", "\t": "_", "\n": "_"})
  query_name = query_name.strip().translate(blanks_to_underscore)
  assert len(query_name) > 0, f"ERROR in submit_mmseqs2(): invalid query_name='{query_name}'"
  fasta_query = f">{query_name}\n{seq}\n"
  payload = {"q": fasta_query, "mode": mode}
  return requests.post(f"{url}/{endpoint}", data=payload, timeout=timeout)

@json_request
@repeated_request
def get_mmseqs2_status(id: str, url: str, timeout: float):
  """GET the ticket of the request 'id' from the server at 'url'."""
  ticket_url = f"{url}/ticket/{id}"
  return requests.get(ticket_url, timeout=timeout)

@repeated_request
def download_from_mmseqs2(id: str, url: str, timeout: float) -> Response:
  """GET the downloadable result of the request 'id' from the server at 'url'."""
  download_url = f"{url}/result/download/{id}"
  return requests.get(download_url, timeout=timeout)

class MMSeqs2API:
  """Client for the MSA endpoint of an MMSeqs2 server (default: 'https://api.colabfold.com').

  A run submits a query sequence, polls the request status until it is complete,
  downloads the '.tar.gz' result, extracts it and concatenates the expected
  '.a3m' files into a single MSA file.
  """

  # Constructor
  def __init__(
    self,
    tmp_dir: str,
    n_status_loop: int=100,
    loop_timeout: float = 5.0,
    single_request_timeout: float=6.02,
    use_env: bool = False,
    use_filter: bool = True,
    url: str="https://api.colabfold.com",
    endpoint: str="ticket/msa",
  ):
    """Set the API parameters and create the temporary directory.

    Args:
      tmp_dir: directory where the result is downloaded and extracted (created if missing, parents included).
      n_status_loop: maximum number of status checks while waiting for the request.
      loop_timeout: sleep time (sec.) between two status checks.
      single_request_timeout: timeout (sec.) passed to each HTTP request.
      use_env: if True, request an 'env' mode and also collect 'bfd.mgnify30.metaeuk30.smag30.a3m'.
      use_filter: if False, request a 'nofilter' mode.
      url: base URL of the server.
      endpoint: endpoint (relative to 'url') used to submit the query.
    """

    # Set properties
    self.tmp_dir = tmp_dir
    self.url = url
    self.endpoint = endpoint
    self.n_status_loop = n_status_loop
    self.loop_timeout = loop_timeout
    self.single_request_timeout = single_request_timeout
    self.use_env = use_env
    self.use_filter = use_filter
    if self.use_filter:
      self.mode = "env" if self.use_env else "all"
    else:
      self.mode = "env-nofilter" if self.use_env else "nofilter"

    # Init tmp directory (also works for nested paths and already existing directories)
    os.makedirs(self.tmp_dir, exist_ok=True)

    # Init paths
    self.tar_gz_file = os.path.join(self.tmp_dir, "out.tar.gz")
    self.a3m_files = [os.path.join(self.tmp_dir, "uniref.a3m")]
    if self.use_env:
      self.a3m_files.append(os.path.join(self.tmp_dir, "bfd.mgnify30.metaeuk30.smag30.a3m"))

  # Run API
  def run(
    self,
    sequence: str,
    save_path: str,
    query_name: str="query",
  ) -> str:
    """Generate an MSA for 'sequence' with the MMSeqs2 server and save it to 'save_path'.

    Args:
      sequence: query protein sequence.
      save_path: path of the MSA file to write.
      query_name: name of the query sequence in the submitted FASTA.

    Returns:
      'save_path'.

    Raises:
      Exception: if the API reports an error status, does not return a request ID,
        does not complete the request within the status loop or if the download fails.
    """
    WAITING_STATUSES = ("UNKNOWN", "RUNNING", "PENDING")

    # Log
    print(f"Run MSA API to MMSeqs2 server '{self.url}/{self.endpoint}': ")
    print(f"   - sequence: '{clip_string(sequence)}'")
    print(f"   - save_path: '{save_path}'")

    # Submit to MMSeqs2 API
    print("Submit request:")
    out = submit_mmseqs2(sequence, self.mode, self.url, self.endpoint, self.single_request_timeout, query_name=query_name)
    status = out["status"]
    if status in ["ERROR", "MAINTENANCE"]:
      raise Exception(f"MMseqs2 API is giving errors with API status: '{status}' (please try again later)")
    # Read the ID only after the status check, so that a response without 'id'
    # is reported with its API status instead of a bare KeyError
    ticket_id = out.get("id")
    if ticket_id is None:
      raise Exception(f"MMseqs2 API did not return a request ID with API status: '{status}' (please try again later)")

    # Wait for request to be completed
    print("Wait for request to be completed:")
    for i in range(self.n_status_loop):
      if status not in WAITING_STATUSES:
        break
      print(f"   - sleep [{i+1}/{self.n_status_loop}] for {self.loop_timeout:.1f} sec. (status='{status}') ...")
      time.sleep(self.loop_timeout)
      out = get_mmseqs2_status(ticket_id, self.url, self.single_request_timeout)
      status = out["status"]

    # Status loop finished and request still not complete
    if status in WAITING_STATUSES:
      msg = (
        f"MMseqs2 API ERROR: maximum number of sleep loop exceeded.\n"
        f" - {self.n_status_loop} loops (of {self.loop_timeout:.1f} sec. each)\n"
        f" - request ID='{ticket_id}'\n"
      )
      raise Exception(msg)

    # Final API status check
    if status != "COMPLETE":
      raise Exception(f"MMseqs2 API is giving errors with API status: '{status}' (please try again later)")

    # Download results
    print("Download MSA:")
    download_output = download_from_mmseqs2(ticket_id, self.url, self.single_request_timeout)
    # Fail here with the HTTP status rather than later when reading an invalid archive
    if not download_output.ok:
      raise Exception(f"MMseqs2 API download failed with HTTP status: {download_output.status_code} (request ID='{ticket_id}')")
    with open(self.tar_gz_file, "wb") as fs:
      fs.write(download_output.content)

    # Extract a3m files from '.tar.gz'
    print("Collect MSA data:")
    with tarfile.open(self.tar_gz_file) as tar_gz:
      tar_gz.extractall(self.tmp_dir, filter="data")

    # Collect all '.a3m' lines (NUL characters are removed)
    a3m_lines: list[str] = []
    for a3m_file in self.a3m_files:
      with open(a3m_file, "r") as fs:
        a3m_lines.extend(line.replace("\x00", "") for line in fs)
    msa_str = "".join(a3m_lines)

    # Save MSA to file
    print("Save MSA to file:")
    print(f"   - save_path: '{save_path}'")
    with open(save_path, "w") as fs:
      fs.write(msa_str)

    # Return
    return save_path


In [None]:
#@title 3D Structure options (upload file or fetch from PDB or AlphaFold-DB)

# Define upload method ---------------------------------------------------------
upload_method = "upload_local_file" # @param ["upload_local_file","fetch_from_pdb","fetch_from_alphafold_db"]
#@markdown ---------------------------------------------------------------------
retrieval_id = "" # @param {"type":"string","placeholder":"PDB ID like '6m0j' or UniProt ID like 'Q9LW00'"}
#@markdown - if using `fetch_from_pdb`, specify a **PDB ID** (like `6m0j`)
#@markdown - if using `fetch_from_alphafold_db`, specify a **UniProt ID** (like `Q9LW00`)


# Reset state ------------------------------------------------------------------
# 'pdb_path' and 'chain' are checked by the guards of the later cells: clear both so
# that a failure in this cell, or a chain selected for a previous structure, cannot
# pass these guards
pdb_path = None
chain = None

# Case: Drag-and-drop local file -----------------------------------------------
if upload_method == "upload_local_file":

  # Drag-and-drop file picker
  uploaded = files.upload()
  if not uploaded:
    raise ValueError("❌ No PDB file was uploaded.")
  uploaded_name = list(uploaded.keys())[0]

  # Guardian for correct extension ('pdb_path' stays None on failure)
  if not uploaded_name.endswith(".pdb"):
    raise ValueError(f"❌ Uploaded PDB file '{uploaded_name}' should have extention '.pdb'.")
  pdb_path = uploaded_name
  print(f" * ✅ PDB file uploaded: '{pdb_path}'")

# Case: fetch file from the PDB ------------------------------------------------
elif upload_method == "fetch_from_pdb":

  # Error if retrieval_id is not filled
  if retrieval_id is None or retrieval_id.strip() == "":
    raise ValueError(f"❌ If upload_method='{upload_method}', please specify a retrieval_id.")

  # Fetch PDB file
  pdb_path = fetch_structure_by_pdb(retrieval_id)
  print(f" * ✅ PDB file fetched: '{pdb_path}'")

# Case: fetch file from the AlphaFold-DB ---------------------------------------
elif upload_method == "fetch_from_alphafold_db":

  # Error if retrieval_id is not filled
  if retrieval_id is None or retrieval_id.strip() == "":
    raise ValueError(f"❌ If upload_method='{upload_method}', please specify a retrieval_id.")

  # Fetch PDB file
  pdb_path = fetch_structure_by_uniprot(retrieval_id)
  print(f" * ✅ AF-DB file fetched: '{pdb_path}'")

# Case: error ------------------------------------------------------------------
else:
  raise ValueError(f"❌ Unknown upload_method='{upload_method}'.")

# Validate and Log -------------------------------------------------------------
# Parse PDB file
pdb_name = pdb_path.removesuffix(".pdb")
pp_builer = PPBuilder()
structure = PDBParser(QUIET=True).get_structure("protein", pdb_path)

# Log uploaded PDB file
# NOTE: the loop variable must not be named 'chain', otherwise the global 'chain'
# (ID of the target chain, checked by the later cells) would be overwritten by a
# chain object of the structure
sequences_by_chain: dict[str, Sequence] = {}
for pdb_chain in structure[0]: # loop only on the first model
  aa_seq_segments = pp_builer.build_peptides(pdb_chain)
  aa_seq: str = "".join(str(pp.get_sequence()) for pp in aa_seq_segments) # concatenate fragments if multiple
  print(f"    - chain {pdb_chain.id} (L={len(aa_seq)}): '{aa_seq}'")
  sequences_by_chain[pdb_chain.id] = Sequence(f"{pdb_name}_{pdb_chain.id}", aa_seq)
print(f" * ✅ Choose target chain among {len(sequences_by_chain)} detected chain(s): '{''.join(sequences_by_chain.keys())}'")

# Select chain if there is only one
if len(sequences_by_chain) == 1:
  chain = list(sequences_by_chain.keys())[0]
  print(f" * ✅ Target chain set to: chain='{chain}'")


In [None]:
#@title MSA options (upload file, fetch from AlphaFold-DB or run MMSeqs2 API)

# Define upload method ---------------------------------------------------------
upload_method = "upload_local_file" # @param ["upload_local_file","fetch_from_alphafold_db","run_mmseqs2_api"]
#@markdown ---------------------------------------------------------------------
uniprot_id = "" # @param {"type":"string","placeholder":"UniProt ID like 'Q9LW00'"}
#@markdown - if using `fetch_from_alphafold_db`, specify a **UniProt ID** (like `Q9LW00`)
#@markdown ---------------------------------------------------------------------
query_sequence = "" # @param {"type":"string","placeholder":"protein sequence like 'VSVELPAPSSWKKLFYPNKVGSVKKTEVVFVAPTGEEISNRKQLEQYLKSHPGNPAIAEFDWTTSG'"}
#@markdown - if using `run_mmseqs2_api`, specify a protein sequence
query_name = "query_sequence_1" # @param {"type":"string","placeholder":"set name of the query sequence"}
mmseqs2_use_filtering = True # @param {"type":"boolean"}
mmseqs2_use_environmental_sequences = False # @param {"type":"boolean"}

# Reset state ------------------------------------------------------------------
# 'msa_path' is checked by the guards of the later cells: it stays None if this cell
# fails before an MSA file is obtained
msa_path = None

# Case: Drag-and-drop local file -----------------------------------------------
if upload_method == "upload_local_file":

  # Drag-and-drop file picker
  uploaded = files.upload()
  if not uploaded:
    raise ValueError("❌ No MSA file was uploaded.")
  uploaded_name = list(uploaded.keys())[0]

  # Guardian for correct extension ('msa_path' stays None on failure)
  if not any(uploaded_name.endswith(ext) for ext in MSA.ACCEPTED_EXTENTIONS):
    raise ValueError(f"❌ Uploaded MSA file '{uploaded_name}' should have extention among {MSA.ACCEPTED_EXTENTIONS}.")
  msa_path = uploaded_name
  print(f" * ✅ MSA file uploaded: '{msa_path}'")

# Case: fetch file from the AlphaFold-DB ---------------------------------------
elif upload_method == "fetch_from_alphafold_db":

  # Error if uniprot_id is not filled
  if uniprot_id is None or uniprot_id.strip() == "":
    raise ValueError(f"❌ If upload_method='{upload_method}', please specify a uniprot_id.")

  # Fetch MSA file
  msa_path = fetch_msa_by_uniprot(uniprot_id)
  print(f" * ✅ MSA file fetched from AF-DB: '{msa_path}'")

# Case: run MMSeqs2 API --------------------------------------------------------
elif upload_method == "run_mmseqs2_api":

  # Pre-process query_sequence (remove all whitespace characters, set to upper case)
  if query_sequence is None or query_sequence.strip() == "":
    raise ValueError(f"❌ If upload_method='{upload_method}', please specify a query_sequence.")
  query_sequence = "".join(query_sequence.split()).upper()
  if not all(aa in "ACDEFGHIKLMNPQRSTVWY" for aa in query_sequence):
    raise ValueError(f"❌ query_sequence contains invalid amino acids.")

  # Pre-process query_name
  if query_name is None or query_name.strip() == "":
    raise ValueError(f"❌ If upload_method='{upload_method}', please specify a query_name.")
  query_name = query_name.strip().replace(" ", "_")

  # Run MMSeqs2 API
  api = MMSeqs2API(
    f"./{query_name}_mmseqs2_out",
    use_env=mmseqs2_use_environmental_sequences,
    use_filter=mmseqs2_use_filtering,
  )
  msa_path = api.run(query_sequence, f"./{query_name}.a3m", query_name)
  print(f" * ✅ MSA file obtained from MMSeqs2 API: '{msa_path}'")

# Case: error ------------------------------------------------------------------
else:
  raise ValueError(f"❌ Unknown upload_method='{upload_method}'.")

# Validate and Log -------------------------------------------------------------
# Read the first sequence of the MSA file.
# NOTE: the variable name 'taget_sequence' (sic) is read by the 'Select PDB chain' cell
taget_sequence = FastaReader.read_first_sequence(msa_path)
print(f" * ✅ taget sequence (L={len(taget_sequence)}): '{taget_sequence.sequence}'")



In [None]:
#@title Select PDB chain
chain = "" # @param {"type":"string","placeholder":"select a single chains like 'A'"}

# Input guardians
if msa_path == "" or msa_path is None:
  raise ValueError(f"❌ ERROR: msa_path is not set: Please first select an MSA.")
if pdb_path == "" or pdb_path is None:
  raise ValueError(f"❌ ERROR: pdb_path is not set: Please first, select a 3D structure.")

# Auto-select the chain if the field is left empty and the structure has a single
# chain (otherwise the empty form value would discard the chain that was already
# set by the 3D structure cell)
if chain == "" and len(sequences_by_chain) == 1:
  chain = list(sequences_by_chain.keys())[0]
  print(f" * ✅ Target chain set to: chain='{chain}'")

# Guardians (on failure, 'chain' is reset to None so that RSALOR cannot be run with an invalid chain)
if len(chain) != 1:
  chain_failed = chain
  chain = None
  raise ValueError(f"❌ chain='{chain_failed}' should be a string of length 1.")
if chain not in sequences_by_chain:
  chain_failed = chain
  chain = None
  raise ValueError(f"❌ chain='{chain_failed}' not in PDB '{pdb_path}' (among chains '{''.join(sequences_by_chain.keys())}')")

# Show alignment between the first sequence of the MSA and the sequence of the selected chain
print(f" * ✅ MSA target sequence aligned to chain '{chain}' in PDB structure.")
align = PairwiseAlignment(taget_sequence, sequences_by_chain[chain])
align.show();


In [None]:
#@title Run RSALOR and Settings

# Output settings
#@markdown ### Output settings
output_name = "" # @param {"type":"string","placeholder":"<msa_name>_rsalor"}
#@markdown - leave empty for auto
sep = "," # @param {"type":"string","placeholder":"CSV separator like ',' or ';'"}
#@markdown - separator in output CSV file

# RSALOR run settings
#@markdown ---------------------------------------------------------------------
#@markdown ### RSALOR settings
theta_regularization = 0.01 # @param {"type":"number","placeholder":"0.01"}
#@markdown - regularization term for LOR/LR at amino acid frequencies level
seqid_weights = 0.80 # @param {"type":"number","placeholder":"0.80"}
#@markdown - seqid threshold to consider two sequences in the same cluster for weighting (leave empty to ignore)
min_seqid = 0.35 # @param {"type":"number","placeholder":"0.35"}
#@markdown - discard sequences which seqid with target sequence is below (leave empty to ignore)
num_threads = 2 # @param {"type":"integer","placeholder":"2"}
#@markdown - number of threads (CPUs) for weights evaluation (in the C++ backend)

# Input guardians
if msa_path == "" or msa_path is None:
  raise ValueError(f"❌ ERROR: msa_path is not set: Please first select an MSA.")
if pdb_path == "" or pdb_path is None:
  raise ValueError(f"❌ ERROR: pdb_path is not set: Please first, select a 3D structure.")
if chain == "" or chain is None:
  raise ValueError(f"❌ ERROR: chain is not set: Please first, select a chain.")

# Run RSALOR
# 'output_path' is checked by the guard of the download cell: it stays None until
# the scores are successfully saved
output_path = None
msa = MSA(
    msa_path, pdb_path, chain,
    theta_regularization=theta_regularization,
    seqid_weights=seqid_weights,
    min_seqid=min_seqid,
    num_threads=num_threads,
    verbose=True,
)

# Set path of the scores file (named after the MSA if 'output_name' is left empty)
if output_name == "" or output_name is None:
  output_name = f"{msa.name}_rsalor"
scores_path = f"{output_name}.csv"

# Compute and save scores
rsalor_scores = msa.save_scores(
    scores_path,
    sep=sep,
    round_digit=6,
    log_results=False,
)

# Set 'output_path' only now, so that it never points to a file that was not written
output_path = scores_path



In [None]:
#@title Download RSALOR output

# Refuse to download as long as no output path has been set
if output_path is None or output_path == "":
  raise ValueError(f"❌ ERROR: output_path is not set: Please first run RSALOR.")

# Hand the output file over to the Colab download helper
files.download(output_path)