In [1]:
import json
import requests
import warnings
from typing import Optional
import os
import time
import pandas as pd
from tqdm import tqdm

In [2]:
def request_limited(url: str,
                    rtype: str = "GET",
                    num_attempts: int = 3,
                    sleep_time=0.5,
                    **kwargs) -> Optional["requests.models.Response"]:
    """
    HTTP request with retries and back-off based on the response code

    Only failures that can plausibly succeed on a later try are retried:
    HTTP 429 (rate limit), HTTP 5xx (server error) and network-level
    errors raised by ``requests``. Any other non-200 status (e.g. 404)
    makes the function give up immediately.

    Parameters
    ----------
    url : str
        The url for the request
    rtype : str
        The request type (one of ["GET", "POST"])
    num_attempts : int
        In case of a failed retrieval, the number of attempts to try again,
        i.e. at most ``num_attempts + 1`` requests are sent
    sleep_time : float
        Base waiting time in seconds. The wait after the k-th failed
        request (counting from 1) is ``k * sleep_time``.
    **kwargs : dict
        The keyword arguments to pass to the request. A ``timeout`` of
        30 seconds is used unless the caller supplies one.

    Returns
    -------

    response : requests.models.Response
        The server response object. Only returned if the status code is
        200, otherwise returns None.

    """

    if rtype not in ["GET", "POST"]:
        warnings.warn("Request type not recognized")
        return None

    # ``requests`` applies no timeout by default, so a stalled connection
    # would otherwise block the caller indefinitely.
    kwargs.setdefault("timeout", 30)
    send = requests.get if rtype == "GET" else requests.post

    for attempt in range(num_attempts + 1):
        try:
            response = send(url, **kwargs)
        except requests.RequestException as err:
            # Connection problems and timeouts count as a failed attempt
            warnings.warn("Request failed: " + str(err))
            status_code = None
        else:
            status_code = response.status_code
            if status_code == 200:
                return response
            if status_code != 429 and not 500 <= status_code < 600:
                # e.g. 404: repeating the request cannot change the answer
                warnings.warn("Request failed with status code " +
                              str(status_code) + ", not retrying")
                return None

        if attempt == num_attempts:
            # No retries left, so there is nothing to wait for
            break

        curr_sleep = (1 + attempt) * sleep_time
        if status_code == 429:
            warnings.warn("Too many requests, waiting " + str(curr_sleep) +
                          " s")
        elif status_code is not None:
            warnings.warn("Server error encountered. Retrying")
        time.sleep(curr_sleep)

    warnings.warn("Too many failures on requests. Exiting...")
    return None


def get_nonpolymer_bound_components(entry_id='7rfs', 
                                    url_root='https://data.rcsb.org/rest/v1/core/entry/'):
    """
    Fetch an entry record and return its list of unchecked residue types

    The record is requested from ``url_root + entry_id`` and parsed as
    JSON. The value returned is whatever is stored under
    ``pdbx_vrpt_summary -> restypes_notchecked_for_bond_angle_geometry``;
    this notebook uses it as the list of ligand component ids
    (NOTE(review): confirm against the RCSB schema that this field is the
    intended ligand source).

    Parameters
    ----------
    entry_id : str
        The entry id appended to ``url_root``
    url_root : str
        The base url of the entry endpoint

    Returns
    -------

    ligands : list or None
        The value found under the keys above, or None if the request
        failed or the keys are missing.

    Raises
    ------
    json.JSONDecodeError
        If the response body is not valid JSON.

    """
    url = url_root + entry_id
    response = request_limited(url)
    if response is None or response.status_code != 200:
        warnings.warn("Retrieval failed, returning None")
        return None

    all_info = json.loads(response.text)
    try:
        return all_info['pdbx_vrpt_summary']['restypes_notchecked_for_bond_angle_geometry']
    except (KeyError, TypeError):
        # KeyError: a key is absent; TypeError: the JSON has another shape.
        # Anything else (e.g. KeyboardInterrupt) must not be swallowed.
        warnings.warn("No ligands found, returning None")
        return None



def get_comp_smiles(comp_id='4WI', 
                    url_root='https://data.rcsb.org/rest/v1/core/chemcomp/', 
                    stero=True):
    """
    Fetch a chemical component record and return its SMILES descriptor

    The record is requested from ``url_root + comp_id`` and parsed as JSON.

    Parameters
    ----------
    comp_id : str
        The component id appended to ``url_root``
    url_root : str
        The base url of the chemical component endpoint
    stero : bool
        If True, return the value stored under
        ``rcsb_chem_comp_descriptor -> smilesstereo``; otherwise the value
        under ``rcsb_chem_comp_descriptor -> smiles``.

    Returns
    -------

    smiles : str or None
        The descriptor value, or None if the request failed or the keys
        are missing.

    Raises
    ------
    json.JSONDecodeError
        If the response body is not valid JSON.

    """
    url = url_root + comp_id
    response = request_limited(url)
    if response is None or response.status_code != 200:
        warnings.warn("Retrieval failed, returning None")
        return None

    comp_info = json.loads(response.text)
    descriptor_key = 'smilesstereo' if stero else 'smiles'
    try:
        return comp_info['rcsb_chem_comp_descriptor'][descriptor_key]
    except (KeyError, TypeError):
        # KeyError: a key is absent; TypeError: the JSON has another shape.
        # Anything else (e.g. KeyboardInterrupt) must not be swallowed.
        warnings.warn("No smiles found, returning None")
        return None


def get_ligands_smiles(entry_id='7rfs', 
                       stero=True):
    """
    Map every ligand reported for an entry to its SMILES descriptor

    Parameters
    ----------
    entry_id : str
        The entry id passed to ``get_nonpolymer_bound_components``
    stero : bool
        Forwarded to ``get_comp_smiles``

    Returns
    -------

    ligands2smiles : dict
        One key per distinct ligand, in order of first appearance. The
        value is whatever ``get_comp_smiles`` returned for it, which may
        be None.

    Raises
    ------
    TypeError
        If no ligand list could be retrieved for the entry.

    """
    ligands = get_nonpolymer_bound_components(entry_id=entry_id)
    if ligands is None:
        # Iterating over None used to raise a bare TypeError here. The
        # exception type is kept because callers catch TypeError to detect
        # entries without ligands; only the message is made explicit.
        raise TypeError("No ligands could be retrieved for entry " +
                        str(entry_id))

    ligands2smiles = {}
    for ligand in ligands:
        # Query each distinct ligand only once
        if ligand not in ligands2smiles:
            ligands2smiles[ligand] = get_comp_smiles(comp_id=ligand, stero=stero)
    return ligands2smiles

In [3]:
def download_comp_sdf_file(comp_id='4WI', 
                           ideal=True, 
                           url_root='https://files.rcsb.org/ligands/download/', 
                           save_root='./'):
    """
    Download the SDF file of a chemical component and save it to disk

    Parameters
    ----------
    comp_id : str
        The component id used to build the file name
    ideal : bool
        If True, fetch ``<comp_id>_ideal.sdf``; otherwise
        ``<comp_id>_model.sdf``
    url_root : str
        The base url the file name is appended to
    save_root : str
        The directory to write into. It is created, including missing
        parent directories, if it does not exist.

    Returns
    -------

    save_path : str or None
        The path of the written file, or None if the download failed.

    Raises
    ------
    ValueError
        If the server answered successfully but with an empty body.

    """
    suffix = '_ideal.sdf' if ideal else '_model.sdf'
    file_name = comp_id + suffix

    response = request_limited(url_root + file_name)
    if response is None or response.status_code != 200:
        warnings.warn("Retrieval failed, returning None")
        return None

    sdf_text = str(response.text)
    if not sdf_text:
        raise ValueError("Empty SDF file received for " + file_name)

    # makedirs also creates missing parents and tolerates a directory that
    # already exists, unlike an exists() check followed by mkdir()
    os.makedirs(save_root, exist_ok=True)
    save_path = os.path.join(save_root, file_name)
    with open(save_path, 'w') as f:
        f.write(sdf_text)
    return save_path

In [5]:
# Read the comma-separated entry ids. Surrounding whitespace (including the
# trailing newline of the file) is stripped and empty items are dropped, so
# that every id can be used verbatim in URLs and directory names.
with open('./gett_pdb/pdbbid.csv', 'r') as f:
    text = f.read()
pdb_ids = [pdb_id.strip() for pdb_id in text.split(',') if pdb_id.strip()]

In [6]:
# Collect the SMILES of every ligand and download one ideal SDF file per
# ligand into a directory named after the entry.
ligands_smiles = {}
error_list = []
for i in tqdm(pdb_ids):
    entry_dir = f'./gett_pdb/{i}'
    # An existing entry directory means the entry was handled before
    if os.path.exists(entry_dir):
        continue
    try:
        entry_smiles = get_ligands_smiles(entry_id=i, stero=True)
        ligands_smiles.update(entry_smiles)
        for comp_id in entry_smiles:
            download_comp_sdf_file(comp_id=comp_id, ideal=True, save_root=entry_dir)
    except TypeError:
        # Raised e.g. when no ligand list came back for the entry; remember
        # the id and move on to the next one
        error_list.append(i)


100%|██████████| 1634/1634 [04:42<00:00,  5.79it/s]


In [10]:
# Persist the ids collected in error_list, each followed by a single space
with open('./gett_pdb/error_list.txt', 'w') as error_file:
    error_file.writelines(failed_id + ' ' for failed_id in error_list)

['7N6N', '7MGR', '7JOY', '7VFB', '7DVP', '7DVX', '7DVY', '7DW0', '7DW6', '7MB4', '7MB5', '7MB6', '7MB7', '7MB8', '7MB9', '3AW1', '7KQE', '7U0E', '7U09', '7RZQ', '7Y9N', '7E3O', '7VGR', '7VGS', '7RZR', '7RZS', '7RZT', '7RZU', '7RZV', '7E7X', '7X9E', '8CXQ', '8CY9', '8CYC', '8CYD', '8DRS', '7W1S', '7E86', '7VYR', '8DRT', '7ZF8', '7EK0', '7QCS', '7E0B', '7SKZ', '7SL5', '7CH5', '7E7Y', '8D36', '7EU2', '7F4W', '7DHG', '7XDA', '7CHF', '8D6Z', '6XE1', '7NTJ', '8CZI', '8DTR', '8DTT', '8DTX', '7CHC', '7R9D', '7KGR', '7DCD', '8DAO', '7STS', '7SUE', '7EJL', '7JLT', '6M71', '7EJM', '7EJN', '6YHU', '7VPH', '6WIQ', '7WZO', '7JQB', '7VPG', '7W0Q', '7W0T', '6M5I', '7X6Y', '7X6Z', '7X70', '4ZUH', '7X25', '7X29', '7X2A', '8DGW', '5ZQG', '8DGV', '8DGX', '7F60', '2Q6G', '7F2O', '7LFZ']
