In [1]:
### IMPORT LIBRARIES ###
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (TimeoutException,
                                       UnexpectedAlertPresentException,
                                       NoSuchElementException)
import argparse
import pandas as pd
from io import StringIO
from concurrent.futures import ThreadPoolExecutor, as_completed
from bioservices import UniProt
import time
from typing import List, Tuple, Optional

### CONFIGURATION ###
# Number of attempts TargetCrawler._handle_exceptions makes before it gives
# up and returns an error row.
MAX_RETRIES = 3
# NOTE(review): not referenced anywhere in the visible code -- presumably a
# page-load timeout in seconds; confirm it is still needed.
PAGE_LOAD_TIMEOUT = 30
# Timeout (seconds) handed to WebDriverWait by SEACrawler.
DRIVER_TIMEOUT = 200
# max_workers for the ThreadPoolExecutor in TargetCrawler.process_uniprot_ids.
MAX_WORKERS = 50
# Number of attempts per UniProt query in TargetCrawler._get_uniprot_entry.
UNIPROT_RETRIES = 3
# Pause (seconds, passed to time.sleep) after a failed UniProt query.
UNIPROT_DELAY = 1

### CORE FUNCTIONALITY ###
class TargetCrawler:
    """Base class for the browser-driven target-prediction crawlers.

    Each instance owns one Selenium Chrome driver (``self.driver``) and one
    bioservices ``UniProt`` client (``self.uniprot``). The class provides the
    retry wrapper, the error-row factory and the UniProt lookup helpers that
    concrete crawlers build on.

    The browser is an external process: call :meth:`close` (or use the
    instance as a context manager) when the crawler is no longer needed.
    """

    # Label written to the ``platform`` column of error rows produced by
    # ``_handle_exceptions``. Subclasses may override it; the empty default
    # keeps the previous output for classes that do not.
    platform: str = ''

    def __init__(self, headless: bool = True):
        self.driver = self._init_driver(headless)
        self.uniprot = UniProt(verbose=False)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def close(self) -> None:
        """Quit the browser if one is running; safe to call more than once."""
        driver = getattr(self, 'driver', None)
        if driver is not None:
            driver.quit()
            self.driver = None

    def _init_driver(self, headless: bool) -> "webdriver.Chrome":
        """Create the Chrome driver, adding ``--headless=new`` if requested."""
        options = webdriver.ChromeOptions()
        # Presumably silences Chrome's "DevTools listening ..." console noise.
        options.add_experimental_option('excludeSwitches', ['enable-logging'])
        if headless:
            options.add_argument("--headless=new")
        return webdriver.Chrome(options=options)

    @staticmethod
    def _bound_argument(func, args: tuple, kwargs: dict, name: str,
                        default: str = '') -> str:
        """Return the value ``func`` would receive for parameter ``name``.

        Works whether the caller supplied the argument by keyword or by
        position; returns ``default`` when it cannot be determined.
        """
        import inspect  # local: only needed here, keeps the file's imports untouched

        if name in kwargs:
            return kwargs[name]
        try:
            bound = inspect.signature(func).bind_partial(*args, **kwargs)
        except (TypeError, ValueError):
            # Signature unavailable or the arguments do not fit it.
            return default
        return bound.arguments.get(name, default)

    def _handle_exceptions(self, func, *args, **kwargs) -> Optional[pd.DataFrame]:
        """Call ``func(*args, **kwargs)`` with retries and error reporting.

        ``TimeoutException`` and ``UnexpectedAlertPresentException`` are
        retried up to ``MAX_RETRIES`` times with exponential backoff. Any
        other exception, or running out of retries, yields a one-row error
        frame (see ``_create_error_df``) instead of propagating.

        Returns:
            Whatever ``func`` returns on success, otherwise the error frame.
        """
        # The crawlers pass CpdName positionally, so it has to be recovered
        # from the call signature rather than from kwargs alone.
        compound = self._bound_argument(func, args, kwargs, 'CpdName')
        platform = kwargs.get('platform', self.platform)

        for attempt in range(1, MAX_RETRIES + 1):
            try:
                return func(*args, **kwargs)
            except (TimeoutException, UnexpectedAlertPresentException) as e:
                print(f"Attempt {attempt} failed: {str(e)}")
                if attempt < MAX_RETRIES:
                    time.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                print(f"Critical error: {str(e)}")
                return self._create_error_df(compound, platform, str(e))
        return self._create_error_df(compound, platform,
                                     f"Max retries ({MAX_RETRIES}) exceeded")

    @staticmethod
    def _create_error_df(compound: str, platform: str, message: str) -> pd.DataFrame:
        """Build a one-row frame marking a failed crawl.

        The row uses the regular result columns, with ``'error'`` in
        ``UniProt_name`` and the message in ``prob``.
        """
        return pd.DataFrame({
            'compound': [compound],
            'platform': [platform],
            'UniProt_name': ['error'],
            'prob': [message]
        })

    @staticmethod
    def _first_tsv_value(tsv: str) -> Optional[str]:
        """Return the first cell of the first data row of a TSV response.

        The first non-blank line is treated as the header. ``None`` is
        returned when there is no data row or its first cell is empty.
        """
        rows = [line for line in tsv.splitlines() if line.strip()]
        if len(rows) < 2:
            return None
        return rows[1].split('\t')[0].strip() or None

    def _get_uniprot_entry(self, uniprot_id: str) -> str:
        """Look up one identifier in UniProt, restricted to organism 9606.

        Returns:
            The first cell of the first hit, ``'no_entry_found'`` when the
            search has no data row, or ``'query_failed'`` when every one of
            the ``UNIPROT_RETRIES`` attempts raised.
        """
        for attempt in range(1, UNIPROT_RETRIES + 1):
            try:
                res = self.uniprot.search(
                    f"{uniprot_id}+AND+organism_id:9606",
                    frmt="tsv",
                    columns="id",
                    limit=1
                )
                return self._first_tsv_value(res) or 'no_entry_found'
            except Exception as e:
                print(f"UniProt query failed: {str(e)}")
                if attempt < UNIPROT_RETRIES:
                    time.sleep(UNIPROT_DELAY)
        return 'query_failed'

    def process_uniprot_ids(self, ids: str) -> str:
        """Resolve a whitespace-separated list of identifiers via UniProt.

        Lookups run concurrently, but the result keeps the input order.

        Returns:
            The per-identifier results joined with ``'|'``, or
            ``'invalid_input'`` when ``ids`` is not a string (``None``, NaN,
            ``pd.NA``, ...) or contains no identifiers.
        """
        if not isinstance(ids, str):
            return 'invalid_input'
        tokens = ids.split()
        if not tokens:
            return 'invalid_input'

        entries = []
        # No point starting more threads than there are identifiers.
        with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, len(tokens))) as executor:
            futures = [executor.submit(self._get_uniprot_entry, token)
                       for token in tokens]
            # Collect in submission order so the output is deterministic.
            for future in futures:
                try:
                    entries.append(future.result())
                except Exception as e:
                    print(f"UniProt processing error: {str(e)}")
                    entries.append('processing_error')

        return '|'.join(filter(None, entries))

### CRAWLER IMPLEMENTATIONS ###
class SEACrawler(TargetCrawler):
    """Crawler for the SEA web server at http://sea.bkslab.org/.

    Calling the instance with a SMILES string and a compound name returns a
    frame with the columns ``compound``, ``platform``, ``UniProt_name`` and
    ``prob`` for every row of the result table whose P-Value is below
    ``P_VALUE_CUTOFF``.
    """

    # Label written to the ``platform`` column of this crawler's rows.
    platform = 'SEA'
    # Rows with a P-Value at or above this threshold are dropped.
    P_VALUE_CUTOFF = 0.05

    def __call__(self, smiles: str, CpdName: str) -> pd.DataFrame:
        """Run the SEA query through the shared retry/error wrapper."""
        return self._handle_exceptions(self._crawl_sea, smiles, CpdName)

    def _crawl_sea(self, smiles: str, CpdName: str) -> pd.DataFrame:
        """Submit ``smiles`` to SEA and return the significant hits.

        Raises:
            TimeoutException: if the input field or the result table does
                not appear within ``DRIVER_TIMEOUT`` seconds.
            KeyError: if the result table lacks the expected columns.
        """
        self.driver.get('http://sea.bkslab.org/')
        wait = WebDriverWait(self.driver, DRIVER_TIMEOUT)

        # The trailing newline is sent along with the SMILES, as before.
        wait.until(
            EC.presence_of_element_located((By.NAME, 'query_custom_targets_paste'))
        ).send_keys(smiles + '\n')

        # Wait for results
        wait.until(EC.presence_of_element_located((By.XPATH, '//table/tbody')))

        # Parse table
        html = self.driver.find_element(By.XPATH, '//table').get_attribute('outerHTML')
        table = pd.read_html(StringIO(html))[0]

        # Coerce instead of astype(float): one unparsable cell becomes NaN
        # (and is filtered out) rather than discarding every hit for the
        # compound.
        p_values = pd.to_numeric(table['P-Value'], errors='coerce')
        hits = table.loc[p_values < self.P_VALUE_CUTOFF,
                         ['Target Key', 'P-Value']].copy()
        hits.columns = ['UniProt_name', 'prob']

        hits.insert(0, 'compound', CpdName)
        hits.insert(1, 'platform', self.platform)
        return hits
class SuperPredCrawler(TargetCrawler):
    """Crawler for the SuperPred target-prediction form.

    Calling the instance submits a SMILES string, walks through every page
    of the ``#targets`` table and returns the rows as one frame. Failures are
    reported as a one-row frame built by ``_error_df``.
    """

    # Label written to the ``platform`` column of this crawler's rows.
    platform = 'superpred'

    def __call__(self, smiles: str, CpdName: str) -> pd.DataFrame:
        """Run the SuperPred query through the shared retry/error wrapper."""
        return self._handle_exceptions(self._crawl_superpred, smiles, CpdName)

    def _crawl_superpred(self, smiles: str, CpdName: str) -> pd.DataFrame:
        """Submit ``smiles`` and collect all pages of predicted targets.

        Returns:
            The concatenated result pages, or a one-row error frame when the
            form cannot be submitted, the table keeps timing out, an alert
            pops up, or anything else goes wrong while paging.
        """
        self.driver.get('https://prediction.charite.de/subpages/target_prediction.php')

        try:
            self.driver.find_element(By.XPATH, '//*[@id="smiles_string"]').send_keys(smiles)
            self.driver.find_elements(By.XPATH, '/html/body/div[2]/div/div/form/div[2]/div/div/button')[0].click()
            self.driver.find_element(By.XPATH, '/html/body/div[2]/center/form/table/tbody/tr/td/button').click()
        except Exception as e:
            return self._error_df(CpdName, "form submission failed", str(e))

        results = []
        retries = 0
        max_retries = 3

        while retries < max_retries:
            try:
                WebDriverWait(self.driver, 200).until(
                    EC.presence_of_element_located((By.XPATH, '//*[@id="targets"]'))
                )
                results.append(self._read_targets_page(CpdName))

                next_button = self.driver.find_element(By.ID, 'targets_next')
                # get_attribute returns None when the attribute is absent.
                css_classes = next_button.get_attribute("class") or ""
                if "disabled" in css_classes:
                    break
                next_button.click()
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, '//*[@id="targets"]/tbody'))
                )

            except TimeoutException:
                retries += 1
                if retries >= max_retries:
                    return self._error_df(CpdName, "timeout", self.driver.current_url)

            except UnexpectedAlertPresentException as alert_exc:
                return self._error_df(CpdName, "alert", self._dismiss_alert(alert_exc))

            except Exception as e:
                return self._error_df(CpdName, "unexpected error", str(e))

        return pd.concat(results, ignore_index=True) if results else pd.DataFrame()

    def _read_targets_page(self, CpdName: str) -> pd.DataFrame:
        """Parse the currently displayed ``#targets`` page into a frame.

        Raises:
            ValueError: if the table lacks the expected columns.
        """
        table = self.driver.find_element(By.XPATH, '//*[@id="targets"]')
        page = pd.read_html(StringIO(table.get_attribute('outerHTML')))[0]
        if not {'UniProt_name', 'Probability'}.issubset(page.columns):
            raise ValueError("Expected columns not found")

        page = page[['UniProt_name', 'Probability']]
        page.insert(0, 'compound', CpdName)
        page.insert(1, 'platform', self.platform)
        # NOTE(review): successful rows end up with a 'uniprotID' column while
        # _error_df (and the other crawlers' frames) use 'UniProt_name' --
        # confirm this split is intended before concatenating the outputs.
        page.rename(columns={'UniProt_name': 'uniprotID', 'Probability': 'prob'}, inplace=True)
        return page

    def _dismiss_alert(self, alert_exc) -> str:
        """Accept a pending alert if there is one and return its text.

        The driver may already have dismissed the prompt by the time the
        exception is raised; in that case the text carried by the exception
        is used instead of failing with ``NoAlertPresentException``.
        """
        # Local import: this name is not part of the file-level selenium import.
        from selenium.common.exceptions import NoAlertPresentException

        try:
            alert = self.driver.switch_to.alert
            alert_text = alert.text
            alert.accept()
            return alert_text
        except NoAlertPresentException:
            return getattr(alert_exc, 'alert_text', None) or str(alert_exc)

    def _error_df(self, CpdName: str, error_type: str, message: str) -> pd.DataFrame:
        """Build a one-row frame describing a failed SuperPred crawl.

        ``error_type`` goes into ``UniProt_name`` and ``message`` into
        ``prob``.
        """
        return pd.DataFrame({
            'compound': [CpdName],
            'platform': [self.platform],
            'UniProt_name': [error_type],
            'prob': [message]
        })


### MAIN EXECUTION ###
if __name__ == '__main__':
    import sys

    # Hard-coded command line (presumably because this runs as a notebook
    # cell, where sys.argv does not belong to the script). Raw strings keep
    # the Windows-style backslashes without relying on the invalid '\S'
    # escape sequence.
    sys.argv = ['script.py',
                '-in', r'tableS\S3_compound_info_QS.csv',
                '-out', r'tableS\S5_target_smile_PA.csv',
                '--headless']
    parser = argparse.ArgumentParser(description='Crawl through Target Prediction Servers')
    parser.add_argument('-in',
        '--input',
        type=str,
        metavar='',
        required=True,
        help='comma-separated csv-table with "name" and "smiles" columns for n compounds')
    parser.add_argument('-out',
        '--output',
        type=str,
        metavar='',
        required=True,
        help='csv-table populated with processed results')
    # Headless switch: a store_true flag, so it is off unless given.
    parser.add_argument('--headless',
                        action='store_true',
                        help='Run browser in headless mode (default: off)')
    args = parser.parse_args()

    # Read the input file first, so a bad path cannot leave browser
    # processes behind.
    try:
        compounds = pd.read_csv(args.input, sep=',')[['name', 'smiles']]
    except Exception as e:
        print(f"读取文件失败: {str(e)}")
        sys.exit(1)

    # Show a sample of the data that was read.
    print("成功读取数据样例:")
    print(compounds.head(3))

    crawlers = []
    results = []
    try:
        # Start one browser per prediction server.
        crawlers.append(SEACrawler(headless=args.headless))
        crawlers.append(SuperPredCrawler(headless=args.headless))

        # Process every compound with every crawler.
        total = len(compounds)
        for current_num, (idx, row) in enumerate(compounds.iterrows(), start=1):
            try:
                name = str(row['name']).strip()
                smiles = str(row['smiles']).strip()
                print(f"正在处理: {name} ({current_num}/{total})")
                for crawler in crawlers:
                    results.append(crawler(smiles, name))
            except Exception as e:
                print(f"处理第{idx}行时出错: {str(e)}")
                continue
    finally:
        # Always shut the browsers down, even if the crawl was interrupted.
        for crawler in crawlers:
            try:
                crawler.driver.quit()
            except Exception as e:
                print(f"Failed to close browser: {str(e)}")

    # Save results (pd.concat raises on an empty list, so guard it).
    if results:
        pd.concat(results).to_csv(args.output, index=False)
        print(f"Results saved to {args.output}")
    else:
        print("No results to save")

成功读取数据样例:
     name                 smiles
0  C4-HSL     CCCC(=O)NC1CCOC1=O
1  C6-HSL   CCCCCC(=O)NC1CCOC1=O
2  C7-HSL  CCCCCCC(=O)NC1CCOC1=O
正在处理: C4-HSL (1/52)
正在处理: C6-HSL (2/52)
正在处理: C7-HSL (3/52)
正在处理: C8-HSL (4/52)
正在处理: C9-HSL (5/52)
正在处理: C10-HSL (6/52)
正在处理: C11-HSL (7/52)
正在处理: C12-HSL (8/52)
正在处理: C13-HSL (9/52)
正在处理: C16-HSL (10/52)
正在处理: C16:1-HSL (11/52)
正在处理: 3-oxo-C6-HSL (12/52)
正在处理: 3-oxo-C9-HSL (13/52)
正在处理: 3-oxo-C11-HSL (14/52)
正在处理: 3-oxo-C12-HSL (15/52)
正在处理: 3-oxo-C13-HSL (16/52)
正在处理: 3-OH-C8-HSL (17/52)
正在处理: 3-OH-C10-HSL (18/52)
正在处理: R-THMF (19/52)
正在处理: AI-2 (20/52)
正在处理: cyclo(Pro-Phe) (21/52)
正在处理: cyclo(Pro-Tyr) (22/52)
正在处理: cyclo(Ala-Val) (23/52)
正在处理: cyclo(Pro-Leu) (24/52)
正在处理: cyclo(L-Pro-L-Tyr) (25/52)
正在处理: cyclo(L-Phe-L-Pro) (26/52)
正在处理: cyclo(L-Pro-D-Leu) (27/52)
正在处理: cyclo(L-Pro-L-Leu) (28/52)
正在处理: cyclo(L-Pro-L-Tyr) (29/52)
正在处理: cyclo(L-Phe-L-Pro) (30/52)
正在处理: cyclo(Gly-L-Phe) (31/52)
正在处理: cyclo(D-Pro-L-Tyr) (32/52)
正在处理: cyclo(L-Leu-L