# Install & Import Dependencies

In [None]:
! pip install rcsb-api
! pip install biopython



In [None]:
! pip install swifter

Collecting swifter
  Downloading swifter-1.4.0.tar.gz (1.2 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.2/1.2 MB[0m [31m39.3 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m24.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: swifter
  Building wheel for swifter (setup.py) ... [?25l[?25hdone
  Created wheel for swifter: filename=swifter-1.4.0-py3-none-any.whl size=16505 sha256=4479dad92ed34cdc2fe92fdd0cb94d1f7648c34a4fd23924aa38e4e8cffdb7b6
  Stored in directory: /root/.cache/pip/wheels/d9/31/ff/ff51141a088571a9f672449e5aad5ea8bb35ca5d95ba135f30
Successfully built swifter
Installing collected packages: swifter
Successfully installed swifter-1.4.0


In [None]:
from rcsbapi.data import DataQuery as Query
import json
from rcsbapi.search import search_attributes as attrs
import pandas as pd
from Bio import pairwise2
from Bio.Seq import Seq
from Bio.Align import substitution_matrices
import re
from Bio.pairwise2 import format_alignment
import os
import subprocess
import swifter

In [None]:
import os
num_cores = os.cpu_count()
if num_cores is not None:
    print(f"Number of logical CPU cores: {num_cores}")
else:
    print("Could not determine CPU count.")

# RCSB Search Query

In [None]:
q1 = attrs.rcsb_entity_source_organism.scientific_name == "Homo sapiens"
q2 = attrs.exptl.method == "X-RAY DIFFRACTION"

In [None]:
query = q1 & q2

In [None]:
results = query()
output = list()
for rid in results:
    output.append(rid)

In [None]:
len(output)

59477

In [None]:
# output[0:5]

# RCSB Data Query

In [None]:
query = Query(
    input_type="entries",
    input_ids=output,
    return_data_list=[
        "exptl.method",
        "polymer_entities.polymer_entity_instances.rcsb_polymer_entity_instance_container_identifiers.entity_id",
        "polymer_entities.uniprots.rcsb_uniprot_protein.sequence",
        "polymer_entities.entity_poly.pdbx_seq_one_letter_code",
        "polymer_entities.uniprots.rcsb_uniprot_protein.source_organism"
    ]
)
query.exec(progress_bar=True)
response_data = query.get_response()
# response_data

100%|██████████| 199/199 [04:31<00:00,  1.36s/it]


In [None]:
len(response_data['data']['entries'])

59477

# Creating Pandas DF

In [None]:
rcsb_ids = list()
rcsb_entity_ids = list()
uniprot_seqs = list()
pbd_ids = list()

for result in response_data['data']['entries']:
  for entity in result['polymer_entities']:
    if entity['uniprots']:
      for uniprot in entity['uniprots']:
        if uniprot['rcsb_uniprot_protein']['source_organism']['taxonomy_id'] == 9606:
          rcsb_ids.append(result['rcsb_id'])
          rcsb_entity_ids.append(entity['polymer_entity_instances'][0]['rcsb_polymer_entity_instance_container_identifiers']['entity_id'])
          uniprot_seqs.append(uniprot['rcsb_uniprot_protein']['sequence'])
          pbd_ids.append(entity['entity_poly']['pdbx_seq_one_letter_code'])

In [None]:
len(rcsb_ids), len(rcsb_entity_ids), len(uniprot_seqs), len(pbd_ids)

(70732, 70732, 70732, 70732)

In [None]:
df = pd.DataFrame(
    data = {'rcsb_id': rcsb_ids, 'rcsb_entity_ids': rcsb_entity_ids, 'uniprot_seq': uniprot_seqs, 'pbd_id': pbd_ids}
)

In [None]:
df.tail(20)

Unnamed: 0,rcsb_id,rcsb_entity_ids,uniprot_seq,pbd_id
70712,2HI9,1,MQLFLLLCLVLLSPQGASLHRHHPREMKKRVEDLHVGATVAPSSRR...,SRRDFTFDLYRALASAAPSQNIFFSPVSISMSLAMLSLGAGSSTKM...
70713,2HIB,1,MAAPALGLVCGRCPELGLVLLLLLLSLLCGAAGSQEAGTGAGAGSL...,LAHSKMVPIPAGVFTMGTDDPQIKQDGEAPARRVTIDAFYMDAYEV...
70714,2HIJ,1,MYSNVIGTVTSGKRKVYLLSLLLIGFWDCVTCHGSPVDICTAKPRD...,HGSPVDICTAKPRDIPMNPMCIYRSPEKKATEDEGSEQKIPEATNR...
70715,2HIJ,2,MYSNVIGTVTSGKRKVYLLSLLLIGFWDCVTCHGSPVDICTAKPRD...,HGSPVDICTAKPRDIPMNPMCIYRSPEKKATEDEGSEQKIPEATNR...
70716,2HIW,1,MLEICLKLVGCKSKKGLSSSSSCYLEEALQRPVASDFEPQGLSEAA...,GAMDPNYDKWEMERTDITMKHKLGGGQYGEVYEGVWKKYSLTVAVK...
70717,2HIZ,1,MAQALPWLLLWMGAGVLPAHGTQHGIRLPLRSGLGGAPLGLRLPRE...,MASMTGGQQMGRGSMAGVLPAHGTQHGIRLPLRSGLGGAPLGLRLP...
70718,2HJK,1,MRVTAPRTVLLLLWGAVALTETWAGSHSMRYFYTAMSRPGRGEPRF...,GSHSMRYFYTAMSRPGRGEPRFIAVGYVDDTQFVRFDSDAASPRMA...
70719,2HJK,2,MSRSVALAVLALLSLSGLEAIQRTPKIQVYSRHPAENGKSNFLNCY...,IQRTPKIQVYSRHPAENGKSNFLNCYVSGFHPSDIEVDLLKNGERI...
70720,2HJL,1,MLVMAPRTVLLLLSAALALTETWAGSHSMRYFYTSVSRPGRGEPRF...,GSHSMRYFYTAMSRPGRGEPRFIAVGYVDDTQFVRFDSDAASPRMA...
70721,2HJL,2,MSRSVALAVLALLSLSGLEAIQRTPKIQVYSRHPAENGKSNFLNCY...,IQRTPKIQVYSRHPAENGKSNFLNCYVSGFHPSDIEVDLLKNGERI...


In [None]:
df.to_excel("project_data_v3.xlsx")

In [None]:
df.to_csv("project_data_v3.csv")

In [None]:
df.shape

(70732, 4)

In [None]:
df = pd.read_csv('project_data_v3.csv')

# Cleaning Data

In [None]:
def sanitize_sequence_advanced(sequence: str) -> str:
    """
    Cleans a protein sequence by replacing common PTMs and modifications with their
    parent amino acid, and then removes any other invalid characters.

    Args:
        sequence: The input protein sequence string, which may be a float (NaN).

    Returns:
        The sanitized protein sequence string.
    """
    # Handle potential missing data (NaNs are floats)
    if not isinstance(sequence, str):
        return "" # Return an empty string for non-string inputs

    # --- Step 1: Define the dictionary of replacements ---
    # This is the single source of truth. Easy to add to later!
    ptm_replacements = {
        "(MSE)": "M",  # Selenomethionine -> Methionine
        "(SEP)": "S",  # Phosphoserine -> Serine
        "(TPO)": "T",  # Phosphothreonine -> Threonine
        "(PTR)": "Y",  # Phosphotyrosine -> Tyrosine
        "(NEP)": "K",  # N-Epsilon-Phospholysine -> Lysine
        "(MLY)": "K",  # Monomethyllysine -> Lysine
        "(M2L)": "K",  # Dimethyllysine -> Lysine
        "(M3L)": "K",  # Trimethyllysine -> Lysine
        "(ALY)": "K",  # Acetyllysine -> Lysine
        "(HLY)": "K",  # Hydroxylysine -> Lysine
        "(M1G)": "R",  # Monomethylarginine -> Arginine
        "(M2G)": "R",  # Dimethylarginine -> Arginine
        "(CIR)": "R",  # Citrulline -> Arginine
        "(HYP)": "P",  # Hydroxyproline -> Proline
        "(CGU)": "E",  # Gamma-carboxyglutamate -> Glutamate
        "(NH2)": "",   # C-Terminal Amidation -> Remove
        "(ACE)": "",   # N-Acetyl Group -> Remove
    }

    processed_seq = sequence
    for mod_code, standard_aa in ptm_replacements.items():
        processed_seq = processed_seq.replace(mod_code, standard_aa)

    # --- Step 2: General cleanup of any remaining non-standard characters ---
    valid_chars = "ACDEFGHIKLMNPQRSTVWY"
    # Replace any character NOT in the valid set with 'X'
    sanitized_seq = re.sub(f"[^{valid_chars}]", "X", processed_seq.upper())

    return sanitized_seq

In [None]:
# df['pbd_id'] = df['pbd_id'].str.replace("(MSE)", "M")
# df['pbd_id'] = df['pbd_id'].str.replace("(TPO)", "T")
# df['pbd_id'] = df['pbd_id'].str.replace("(SEP)", "S")
# df['pbd_id'] = df['pbd_id'].str.replace("(NH2)", "")
# df['pbd_id'] = df['pbd_id'].str.replace("(PTR)", "Y")
# df['pbd_id'] = df['pbd_id'].str.replace("(M3L)", "K")
# df['pbd_id'] = df['pbd_id'].str.replace("(NEP)", "K")
df['pdb_sequence_sanitized'] = df['pbd_id'].apply(sanitize_sequence_advanced)
df['pdb_sequence_sanitized'] = df['pdb_sequence_sanitized'].str.replace("U", "C")

In [None]:
# df[df['pbd_id'].str.contains("\(")]['pbd_id']

# Labeling Data

In [None]:
# df = pd.read_csv('project_data_v3.csv')

In [None]:
df.head()

Unnamed: 0.1,Unnamed: 0,rcsb_id,rcsb_entity_ids,uniprot_seq,pbd_id
0,0,7FRJ,1,MEMEKEFEQIDKSGSWAAIYQDIRHEASDFPCRVAKLPKNKNRNRY...,MEMEKEFEQIDKSGSWAAIYQDIRHEASDFPSRVAKLPKNKNRNRY...
1,1,7FRK,1,MEMEKEFEQIDKSGSWAAIYQDIRHEASDFPCRVAKLPKNKNRNRY...,MEMEKEFEQIDKSGSWAAIYQDIRHEASDFPSRVAKLPKNKNRNRY...
2,2,7FRL,1,MEMEKEFEQIDKSGSWAAIYQDIRHEASDFPCRVAKLPKNKNRNRY...,MEMEKEFEQIDKSGSWAAIYQDIRHEASDFPSRVAKLPKNKNRNRY...
3,3,7FRM,1,MEMEKEFEQIDKSGSWAAIYQDIRHEASDFPCRVAKLPKNKNRNRY...,MEMEKEFEQIDKSGSWAAIYQDIRHEASDFPSRVAKLPKNKNRNRY...
4,4,7FRN,1,MEMEKEFEQIDKSGSWAAIYQDIRHEASDFPCRVAKLPKNKNRNRY...,MEMEKEFEQIDKSGSWAAIYQDIRHEASDFPSRVAKLPKNKNRNRY...


In [None]:
def create_multi_class_mask(uniprot_sequence: str, pdb_construct_sequence: str) -> list[int] | None:
    """
    Generates a multi-class modification mask by globally aligning a UniProt
    sequence with a PDB construct sequence.

    The mask is the same length as the UniProt sequence. Each position is labeled:
    - 0: Maintained (the residue is the same in both sequences)
    - 1: Deleted (the residue is in UniProt but absent in the PDB construct)
    - 2: Mutated (the residue is present but changed to a different amino acid)

    Args:
        uniprot_sequence: The full-length, canonical protein sequence.
        pdb_construct_sequence: The engineered sequence from the PDB.

    Returns:
        A list of integers (0, 1, or 2) representing the modification mask,
        or None if no alignment can be generated.
    """
    try:
      # print("--- Performing Global Alignment ---")

      # Load a standard substitution matrix
      blosum62 = substitution_matrices.load("BLOSUM62")

      # Perform a global alignment. 'd' means we use the provided matrix for scores,
      # 's' means the gap penalties are the same for opening and extending.
      # A high gap open penalty discourages creating gaps.
      alignments = pairwise2.align.globalds(
          uniprot_sequence,
          pdb_construct_sequence,
          blosum62,
          -10,  # Gap open penalty
          -0.5  # Gap extend penalty
      )

      if not alignments:
          print("Error: Could not generate an alignment.")
          return None

      # The best alignment is the first one in the list
      best_alignment = alignments[0]
      aligned_uniprot, aligned_pdb, score, begin, end = best_alignment

      # print("Alignment successful. Generating mask...")

      # --- Generate the multi-class mask ---
      modification_mask = []

      # The alignment strings (aligned_uniprot, aligned_pdb) are the key.
      # They will have the same length, with '-' characters indicating gaps.
      for uniprot_char, pdb_char in zip(aligned_uniprot, aligned_pdb):
          if uniprot_char == '-':
              # This case means there's an insertion in the PDB sequence (e.g., a tag).
              # It doesn't correspond to a position in the UniProt sequence, so we skip it.
              continue

          if pdb_char == '-':
              # A gap in the PDB sequence means the UniProt residue was deleted.
              modification_mask.append(1) # 1 = Deleted
          elif uniprot_char == pdb_char:
              # The characters match, so the residue was maintained.
              modification_mask.append(0) # 0 = Maintained
          else:
              # The characters are different, so the residue was mutated.
              modification_mask.append(2) # 2 = Mutated

      # Sanity check: The final mask must have the same length as the original UniProt sequence.
      if len(modification_mask) != len(uniprot_sequence):
          print(f"Error: Mask length ({len(modification_mask)}) does not match UniProt sequence length ({len(uniprot_sequence)}).")
          return None

      return modification_mask, best_alignment
    except:
      return None, None

def format_alignment_for_display(alignment):
    """Helper function to print the alignment nicely."""
    uniprot_alg, pdb_alg, score, begin, end = alignment

    # Create the connector line for the alignment
    connector = ""
    for u_char, p_char in zip(uniprot_alg, pdb_alg):
        if u_char == p_char:
            connector += "|"
        elif u_char == '-' or p_char == '-':
            connector += " "
        else:
            connector += "."

    return (
        f"Score: {score}\n\n"
        f"UniProt: {uniprot_alg}\n"
        f"         {connector}\n"
        f"PDB    : {pdb_alg}"
    )

In [None]:
i = 2001
pdb_sequence = df.loc[i]['pbd_id'] #df[df['pbd_id'].str.contains('HHHHHH')].loc[i]['pbd_id']
uniprot_sequence = df.loc[i]['uniprot_seq'] #df[df['pbd_id'].str.contains('HHHHHH')].loc[i]['uniprot_seq']

In [None]:
pdb_sequence

'SLGVQPPNFSWVLPGRLAGLALPRLPAHYQFLLDLGVRHLVSLTERGPPHSDSCPGLTLHRLRIPDFCPPAPDQIDRFVQIVDEANARGEAVGVHCALGFGRTGTMLACYLVKERGLAAGDAIAEIRRLRPGSIETYEQEKAVFQFYQRTK'

In [None]:
uniprot_sequence

'MGVQPPNFSWVLPGRLAGLALPRLPAHYQFLLDLGVRHLVSLTERGPPHSDSCPGLTLHRLRIPDFCPPAPDQIDRFVQIVDEANARGEAVGVHCALGFGRTGTMLACYLVKERGLAAGDAIAEIRRLRPGSIETYEQEKAVFQFYQRTK'

In [None]:
pdb_seq = pdb_sequence
uniprot_seq = uniprot_sequence

result = create_multi_class_mask(uniprot_seq, pdb_seq)

if result:
    mask, alignment = result

    print("\n" + "="*80)
    print("RESULTS")
    print("="*80)

    print("\n--- Visual Alignment ---")
    print(format_alignment_for_display(alignment))

    print(f"\n--- Multi-Class Mask (first 100 values) ---")
    print(mask)

    # --- Statistics ---
    maintained_count = mask.count(0)
    deleted_count = mask.count(1)
    mutated_count = mask.count(2)

    print("\n--- Summary ---")
    print(f"UniProt Sequence Length: {len(uniprot_seq)}")
    print(f"Mask Length:             {len(mask)}")
    print(f"Residues Maintained (0): {maintained_count}")
    print(f"Residues Deleted (1):    {deleted_count}")
    print(f"Residues Mutated (2):    {mutated_count}")

In [None]:
# def pd_align(row):
#     try:
#         return create_multi_class_mask(row['uniprot_seq'], row['pbd_id'])[0]
#     except:
#         print("jere")

df['label_mask'] = df.apply(lambda row: create_multi_class_mask(row['uniprot_seq'], row['pdb_sequence_sanitized'])[0], axis=1)

Exception ignored in tp_clear of: <class 'dict'>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/Bio/pairwise2.py", line 1298, in __call__
    return self.score_dict[(charA, charB)]
           ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/Bio/pairwise2.py", line 1298, in __call__
    return self.score_dict[(charA, charB)]
           ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/Bio/pairwise2.py", line 1298, in __call__
    return self.score_dict[(charA, charB)]
           ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^
  [Previous line repeated 335 more times]
KeyError: ('M', 'U')
Exception ignored in garbage collection:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/Bio/pairwise2.py", line 1292, in __call__
    def __call__(self, charA, charB):

SystemError: <function dictionary_match.__call__ at 0x7e70fe1572e0> returned a result with an exception set
Exception ig

In [None]:
df['label_mask'] = df.apply(lambda row: create_multi_class_mask(row['uniprot_seq'], row['pdb_sequence_sanitized'])[0], axis=1)

In [None]:
df['label_mask'] = df.swifter.apply(lambda row: create_multi_class_mask(row['uniprot_seq'], row['pdb_sequence_sanitized'])[0], axis=1)

Pandas Apply:   0%|          | 0/70732 [00:00<?, ?it/s]

Exception ignored in garbage collection:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/Bio/pairwise2.py", line 1292, in __call__
    def __call__(self, charA, charB):

SystemError: <function dictionary_match.__call__ at 0x7a36766980e0> returned a result with an exception set
Exception ignored in garbage collection:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/Bio/pairwise2.py", line 1292, in __call__
    def __call__(self, charA, charB):

SystemError: <function dictionary_match.__call__ at 0x7a36766980e0> returned a result with an exception set
Exception ignored in tp_clear of: <class 'type'>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/Bio/pairwise2.py", line 1292, in __call__
    def __call__(self, charA, charB):

SystemError: <function dictionary_match.__call__ at 0x7a36766980e0> returned a result with an exception set
