In [17]:
import xml.etree.ElementTree as ET
import gzip
from pathlib import Path
import pandas as pd
import numpy as np

pd.set_option('display.max_rows', 5000)
DEFAULT_MMCIF_NUM = 50000

# Assuming 'sifts_file_path' is already defined
sifts_file_path = Path('./mypath/SIFTS/2aa3.xml.gz')  # Update this to your actual file path

def SIFTS_tree_parser(file):
    tree = ET.parse(file)
    root = tree.getroot()

    namespace = {'sifts': 'http://www.ebi.ac.uk/pdbe/docs/sifts/eFamily.xsd'}
    
    data = []
    for protein in root.findall('.//sifts:entity[@type="protein"]', namespace):
        entityId = protein.attrib["entityId"]
        for residue in protein.findall('.//sifts:residue[@dbSource="PDBe"]', namespace):
            entry = {
                "entityId": entityId,
                "PDBe_resNum": residue.attrib["dbResNum"],
                "PDBe_resName": residue.attrib["dbResName"],
                "PDB_resNum": None,
                "PDB_resName": None,
                "PDB_chainId": None,
                "UniProt_resNum": None,
                "UniProt_resName": None,
                "dbAccessionId": None,
                "UniProt_secondaryId": None  # Added field for secondaryId
            }
    
            crossRefs = residue.findall('.//sifts:crossRefDb', namespace)
            for crossRef in crossRefs:
                db_source = crossRef.attrib["dbSource"]
                if db_source == "PDB":
                    entry["PDB_resNum"] = crossRef.attrib["dbResNum"]
                    entry["PDB_resName"] = crossRef.attrib["dbResName"]
                    entry["PDB_chainId"] = crossRef.attrib["dbChainId"]
                elif db_source == "UniProt":
                    entry["UniProt_resNum"] = crossRef.attrib["dbResNum"]
                    entry["UniProt_resName"] = crossRef.attrib["dbResName"]
                    entry["dbAccessionId"] = crossRef.attrib["dbAccessionId"]
                    
                    # Look for the secondaryId within sibling dbDetail elements
                    dbDetails = protein.find('.//sifts:dbDetail[@dbSource="UniProt"]', namespace)
                    if dbDetails is not None:
                        entry["UniProt_secondaryId"] = dbDetails.text  # Capture the secondaryId
                       
            data.append(entry)
            
    df = pd.DataFrame(data)
    condition_mask = (df['PDB_resNum'].notnull() & df['UniProt_resNum'].notnull() &
                      (df['PDB_resNum'] != df['UniProt_resNum']) & (df['PDB_resNum'] != "null"))

    # Apply the condition mask within each group to identify if any discrepancies exist
    df['chains_to_change'] = condition_mask
    return df



In [66]:
from pathlib import Path
import gzip
import os
from typing import Callable, Optional, Union
import Bio.PDB.MMCIF2Dict
from src.download.downloadwithThreadPool import run_downloads_with_ThreadPool, url_formation_for_pool
from src.download.downloadwithThreadPool import download_file

def download_file_general(file_type: str, file_name: str, input_path: Path) -> bool:
    """Attempts to download a given file to the specified path."""
    url_path = url_formation_for_pool(file_type,[file_name], str(input_path))[0]
    return download_file(url_path)  # Assume this function returns True on success


def try_file_parser(default_path: Path, file_name: str,
                    parser_function: Callable[[Path], Union[Bio.PDB.MMCIF2Dict.MMCIF2Dict, Optional[any]]],
                    file_type: str) -> Union[Bio.PDB.MMCIF2Dict.MMCIF2Dict, Optional[any], None]:
    """General function to attempt loading a file into a parser, redownloading up to 3 times if necessary."""
    input_path = default_path / file_type
    input_path.mkdir(parents=True, exist_ok=True)
    file_path = input_path / file_name
    print(file_path)

    for attempt in range(3):
        try:
            with gzip.open(file_path, 'rt') as file:
                return parser_function(file)
        except (EOFError, ValueError, OSError) as e:
            print(f"Error processing {file_name}: {e}. Attempting to redownload.")
            file_path.unlink(missing_ok=True)  # Remove potentially corrupt file
            if not download_file_general(file_type, file_name, input_path):
                print(f"Failed to download {file_name}.")

    return None



print("Starting PDBrenum...")
DEFAULT_PATH = Path("mypath")
mmCIF_files = "2aa3.cif.gz"
SIFTS_files = "2aa3.xml.gz"

mmcif_dict = try_file_parser(DEFAULT_PATH, mmCIF_files, Bio.PDB.MMCIF2Dict.MMCIF2Dict,
                             file_type="mmCIF_assembly" if "assembly" in mmCIF_files else "mmCIF")

SIFTS_data = try_file_parser(DEFAULT_PATH, SIFTS_files,SIFTS_tree_parser, file_type="SIFTS")

Starting PDBrenum...
mypath/mmCIF/2aa3.cif.gz
mypath/SIFTS/2aa3.xml.gz


In [29]:
def column_formation(mmcif_dict):
    mmcif_dict_keys = mmcif_dict.keys()
    aut_seq_all_splitted = list()
    for key in mmcif_dict_keys:
        key_dot_splitted = key.split(".")
        for tab_name_col_name in key_dot_splitted:
            if "auth_seq" in tab_name_col_name:
                if "auth_seq_id" in key:
                    aut_seq_all_splitted.append(key_dot_splitted[:1] + key_dot_splitted[1].split("auth_seq_id"))
                if "auth_seq_num" in key:
                    aut_seq_all_splitted.append(key_dot_splitted[:1] + key_dot_splitted[1].split("auth_seq_num"))

    totaling_combinations = list()
    for table_name_prefix_suffix in aut_seq_all_splitted:
        combinations = list()
        for key in mmcif_dict_keys:
            if table_name_prefix_suffix[0] == key.split(".")[0]:
                # res_num auth_seq_id or auth_seq_num
                if table_name_prefix_suffix[1] in key and table_name_prefix_suffix[2] in key \
                        and "auth_seq_id" in key or "auth_seq_num" in key:
                    combinations.append(key)
                # chain auth_asym_id or strand_id
                if "assembly" in mmcif_dict["data_"]:
                    if table_name_prefix_suffix[1] in key and table_name_prefix_suffix[2] in key \
                            and "orig_auth_asym_id" in key:
                        combinations.append(key)
                else:
                    if table_name_prefix_suffix[1] in key and table_name_prefix_suffix[2] in key \
                            and "auth_asym_id" in key or "strand_id" in key:
                        combinations.append(key)
                # ins_code
                if table_name_prefix_suffix[1] in key and table_name_prefix_suffix[2] in key \
                        and "ins_code" in key:
                    combinations.append(key)
                # monomer_type or auth_comp_id or auth_mon_id or mon_id for _struct_ref_seq_dif
                if table_name_prefix_suffix[1] in key and table_name_prefix_suffix[2] in key \
                        and "auth_comp_id" in key or "auth_mon_id" in key:
                    combinations.append(key)
                elif table_name_prefix_suffix[0] == "_struct_ref_seq_dif" \
                        and "mon_id" in key and "db_mon_id" not in key:
                    combinations.append(key)

        # work assuming all the elements in right order
        # and they are not crossing each other
        if len(combinations) > 4:
            combinations = combinations[:4]

        ordered_combination = list()
        for name in combinations:
            if "auth_seq" in name:
                ordered_combination.insert(0, name)
        for name in combinations:
            if "auth_asym_id" in name or "strand_id" in name:
                ordered_combination.insert(1, name)
        for name in combinations:
            if "ins_code" in name:
                ordered_combination.insert(2, name)
        for name in combinations:
            if "auth_comp_id" in name or "mon_id" in name:
                ordered_combination.insert(3, name)

        # exceptions
        if (  # "pdbx_unobs_or_zero_occ_residues" not in ordered_combination[0]
                "nonpoly_scheme" not in ordered_combination[0]
                and "poly_seq_scheme" not in ordered_combination[0]
                and "ndb_struct_na_base" not in ordered_combination[0]):
            totaling_combinations.append(ordered_combination)

    return totaling_combinations
        
    

In [58]:
old = column_formation(mmcif_dict)

In [69]:
def column_formation_refactored(mmcif_dict):
    mmcif_dict_keys = mmcif_dict.keys()
    relevant_keys = ["auth_seq_id", "auth_seq_num", "orig_auth_asym_id", "auth_asym_id", "strand_id", "ins_code", "auth_comp_id", "auth_mon_id", "mon_id"]
    filtered_keys = [key for key in mmcif_dict_keys if any(relevant_term in key for relevant_term in relevant_keys)]
    
    # Split and filter keys based on auth_seq_id and auth_seq_num
    aut_seq_keys = [key.split('.') for key in filtered_keys if "auth_seq_id" in key or "auth_seq_num" in key]
    
    # Generate prefix and suffix for matching
    prefix_suffix = {(key[0], key[1].replace("auth_seq_id", "").replace("auth_seq_num", "")) for key in aut_seq_keys}
    
    # Generate combinations based on the prefixes and suffixes
    combinations = []
    for prefix, suffix in prefix_suffix:
        combination = [key for key in filtered_keys if key.startswith(prefix) and suffix in key]
        
        # Apply conditions to filter and order combinations
        ordered_combination = sorted(set(combination), key=lambda x: ("auth_seq_id" in x or "auth_seq_num" in x, "auth_asym_id" in x or "strand_id" in x, "ins_code" in x, "auth_comp_id" in x or "mon_id" in x))
        
        if ordered_combination and not any(term in ordered_combination[0] for term in ["nonpoly_scheme", "poly_seq_scheme", "ndb_struct_na_base"]):
            combinations.append(ordered_combination)
    
    return combinations

In [46]:
table = set()
for s in r:
    for f in s:
        table.add(f)

In [47]:
table_old = set()
for s in old:
    for f in s:
        table_old.add(f)

In [48]:
table_old-table

{'_struct_mon_prot_cis.auth_asym_id',
 '_struct_mon_prot_cis.auth_seq_id',
 '_struct_mon_prot_cis.pdbx_auth_asym_id_2',
 '_struct_mon_prot_cis.pdbx_auth_seq_id_2',
 '_struct_ref_seq_dif.mon_id',
 '_struct_site.pdbx_auth_seq_id'}

In [70]:
old = column_formation(mmcif_dict)

In [71]:
old

[['_struct_ref_seq_dif.pdbx_auth_seq_num',
  '_struct_ref_seq_dif.pdbx_pdb_strand_id',
  '_struct_ref_seq_dif.pdbx_pdb_ins_code',
  '_struct_ref_seq_dif.mon_id'],
 ['_struct_conf.beg_auth_seq_id',
  '_struct_conf.beg_auth_asym_id',
  '_struct_conf.pdbx_beg_PDB_ins_code',
  '_struct_conf.beg_auth_comp_id'],
 ['_struct_conf.end_auth_seq_id',
  '_struct_conf.end_auth_asym_id',
  '_struct_conf.pdbx_end_PDB_ins_code',
  '_struct_conf.end_auth_comp_id'],
 ['_struct_mon_prot_cis.auth_seq_id',
  '_struct_mon_prot_cis.auth_asym_id',
  '_struct_mon_prot_cis.pdbx_PDB_ins_code',
  '_struct_mon_prot_cis.auth_comp_id'],
 ['_struct_mon_prot_cis.pdbx_auth_seq_id_2',
  '_struct_mon_prot_cis.pdbx_auth_asym_id_2',
  '_struct_mon_prot_cis.pdbx_PDB_ins_code_2',
  '_struct_mon_prot_cis.pdbx_auth_comp_id_2'],
 ['_struct_sheet_range.beg_auth_seq_id',
  '_struct_sheet_range.beg_auth_asym_id',
  '_struct_sheet_range.pdbx_beg_PDB_ins_code',
  '_struct_sheet_range.beg_auth_comp_id'],
 ['_struct_sheet_range.end_au