In [1]:
import concurrent.futures
import os
import socket
import urllib.error
import urllib.request

In [2]:
# Read the list of PDB IDs to fetch; IDs are lower-cased before use.
with open('pdbname.txt', 'r', encoding='utf-8') as file:
    content = file.read().lower()

# Accept commas and/or any whitespace (spaces, newlines) as separators and
# drop empty entries, so a trailing newline or ", " between IDs does not
# produce malformed IDs such as "1abc\n" or "".
pdb_filenames = content.replace(',', ' ').split()

# Destination folder for the downloaded files; create it up front so the
# workers do not fail with FileNotFoundError on a fresh checkout.
output_dir = "pdb_folder"
os.makedirs(output_dir, exist_ok=True)

In [None]:
def download_pdb(pdb_id, timeout=60):
    """Download a single PDB file from RCSB into ``output_dir``.

    The payload is fetched completely before the destination file is opened,
    so a failed or timed-out download never leaves an empty or truncated
    ``.pdb`` file behind.

    Args:
        pdb_id: PDB identifier, used verbatim in the URL and the file name.
        timeout: Socket timeout in seconds passed to ``urlopen``. It applies
            to each blocking network operation (connect, read), not to the
            total duration of the download.

    Returns:
        True if the file was written, False if any error was reported.
        Errors are printed, never raised.
    """
    url = f"https://files.rcsb.org/download/{pdb_id}.pdb"
    fname = os.path.join(output_dir, f"{pdb_id}.pdb")

    try:
        # Safe to call from several threads at once thanks to exist_ok.
        os.makedirs(output_dir, exist_ok=True)

        with urllib.request.urlopen(url, timeout=timeout) as response:
            data = response.read()

        # Only touch the file system once the whole body has arrived.
        with open(fname, "wb") as f:
            f.write(data)

        print(f"✅ downloaded successfully: {pdb_id}")
        return True
    except urllib.error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be handled first.
        print(f"❌ HTTP error {e.code}: {pdb_id}")
    except urllib.error.URLError as e:
        # A timeout during connect is wrapped in URLError; report it as a
        # timeout rather than as a generic URL error.
        if isinstance(e.reason, (TimeoutError, socket.timeout)):
            print(f"⏳ Download timeout ({timeout} seconds): {pdb_id}")
        else:
            print(f"❌ URL error: {pdb_id} - {e.reason}")
    except (TimeoutError, socket.timeout):
        # socket.timeout is only an alias of TimeoutError from Python 3.10 on.
        print(f"⏳ Download timeout ({timeout} seconds): {pdb_id}")
    except Exception as e:
        print(f"❌ unknown error: {pdb_id} - {e}")
    return False

# Download multiple PDB files in parallel
def download_pdb_files(pdb_filenames, max_workers=10):
    """Download several PDB files in parallel using a thread pool.

    IDs are stripped of surrounding whitespace, blank entries are skipped and
    duplicates are removed (first occurrence wins) so that two threads never
    write the same file at the same time.

    Timeouts are the responsibility of ``download_pdb``: futures yielded by
    ``as_completed`` are already finished, so a timeout on ``result()`` here
    could never trigger.

    Args:
        pdb_filenames: Iterable of PDB ID strings.
        max_workers: Maximum number of concurrent download threads.
    """
    # dict.fromkeys de-duplicates while preserving the input order.
    pdb_ids = list(dict.fromkeys(
        pdb_id.strip() for pdb_id in pdb_filenames if pdb_id and pdb_id.strip()
    ))
    if not pdb_ids:
        return

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_pdb = {executor.submit(download_pdb, pdb_id): pdb_id for pdb_id in pdb_ids}

        for future in concurrent.futures.as_completed(future_to_pdb):
            pdb_id = future_to_pdb[future]
            try:
                # Re-raises anything that escaped the worker function.
                future.result()
            except Exception as e:
                print(f"❌ Task failed {pdb_id}: {e}")

In [None]:
download_pdb_files(pdb_filenames, max_workers=10)