# **ZincSight**: Interpretable prediction of zinc ion locations in proteins

⚠️ **Important Instructions:**

- If ZincSight crashes, please reset the runtime manually:
  Go to `Runtime` → `Disconnect and delete runtime`, then refresh the page and try again.
  <p></p>
- If the issue continues, contact **mechtinger1@mail.tau.ac.il** and include the **protein IDs or structures** you used for reproduction and debugging.
  <p></p>
- For maximum speed, change the hardware accelerator: Go to `Runtime` → `Change runtime type` → select `TPU`.
  <p></p>



In [None]:
#@title üß¨ ZincSight: Structure Input & Configuration {display-mode: "form"}

#@markdown **Enter your protein(s)** in the field below or upload **PDB** or **mmCIF** structures.
#@markdown - Query IDs: **PDB** (ASU or biological assembly), **AlphaFoldDB** (UniProt or Model ID), **ESM Metagenomic Atlas**, or **TED Domains**.
#@markdown - To upload your own structure files (including `.tar.gz`), check the appropriate box below.
#@markdown - **Note:** All input folders will be flattened (all files moved to the root query folder).

# --- UI CONTROLS ---

#@markdown ### üîë 1. Enter Identifiers
structure_ids = "" #@param {type:"string"}

#@markdown ---
#@markdown ### üìÇ 2. Import Local Files
upload_structures = False #@param {type:"boolean"}
upload_tar_gz = False #@param {type:"boolean"}
upload_id_list_txt = False #@param {type:"boolean"}

#@markdown ---
#@markdown ### ‚òÅÔ∏è 3. Import from Google Drive
import_from_google_drive = True # @param {"type":"boolean"}
drive_path = "" #@param {type:"string"}
#@markdown *Path relative to 'My Drive'. Can be a file, folder, or .tar.gz archive.*

#@markdown ---
#@markdown ### ‚öôÔ∏è 4. Settings
initialize_fresh_query = True #@param {type:"boolean"}
include_histidine_rotamers = True #@param {type: "boolean"}
create_pymol_sessions=False #@param {type: "boolean"}

#@markdown ###  5. output folder name
name_of_output_folder = "" #@param {type:"string"}


# --- IMPLEMENTATION ---

import re, shutil, tarfile, os
from pathlib import Path
from google.colab import files, drive

# Single flat folder that receives every query structure gathered by this cell
# (browser uploads, extracted archives and Google Drive imports).
QUERY_DIR = Path("/content/query_structures")

def setup_environment():
    """Make sure QUERY_DIR exists, emptying it first when a fresh query is requested.

    Reads the `initialize_fresh_query` form flag: when it is set and the
    folder already exists, the whole folder is deleted before being recreated.
    """
    needs_reset = bool(initialize_fresh_query) and QUERY_DIR.exists()
    if needs_reset:
        print("üßπ Initializing query: clearing previous files.")
        shutil.rmtree(QUERY_DIR)

    # Recreate (or simply keep) the folder; missing parents are created too.
    os.makedirs(QUERY_DIR, exist_ok=True)

def flatten_directory(root=None):
    """Move every nested file directly into *root* and remove the emptied subfolders.

    Args:
        root: Folder to flatten. Defaults to QUERY_DIR when omitted, which is
            how the rest of this notebook calls it.

    Files from different subfolders that share a name collide at the root;
    the one moved last may replace the earlier one (shutil.move semantics).
    """
    root = QUERY_DIR if root is None else Path(root)

    # Snapshot the tree before touching it so moves cannot disturb the walk.
    entries = list(root.rglob("*"))

    for entry in entries:
        if entry.is_file() and entry.parent != root:
            shutil.move(str(entry), str(root / entry.name))

    # Remove directories deepest-first. Walking top-down (as rglob yields them)
    # tries to delete a parent while its child folder still exists, so nested
    # folders were left behind.
    subdirs = sorted(
        (entry for entry in entries if entry.is_dir()),
        key=lambda d: len(d.parts),
        reverse=True,
    )
    for subdir in subdirs:
        try:
            subdir.rmdir()  # Only succeeds on empty directories
        except OSError:
            # Best effort: keep anything that is unexpectedly non-empty
            # (or is a symlink) instead of aborting the import.
            pass

def _safe_tar_members(tar, dest):
    """Return the archive members, raising if any would land outside *dest*.

    Fallback for interpreters without tarfile extraction filters: only regular
    files and directories whose resolved path stays inside *dest* are accepted.
    """
    dest = Path(dest).resolve()
    members = tar.getmembers()
    for member in members:
        target = (dest / member.name).resolve()
        inside = target == dest or dest in target.parents
        if not inside or not (member.isfile() or member.isdir()):
            raise tarfile.TarError(f"unsafe archive member: {member.name}")
    return members


def extract_archive(file_path):
    """Unpack a gzip-compressed tar archive into QUERY_DIR and flatten the result.

    Args:
        file_path: Path object of the archive (`.tar.gz` / `.tgz`).

    The archive comes from the user (upload or Drive), so members are
    extracted through the "data" filter where the interpreter provides it,
    which rejects entries that would escape the target folder. Any failure is
    reported on stdout instead of being raised, so the rest of the cell keeps
    running.
    """
    print(f"üì¶ Extracting: {file_path.name}...")
    try:
        with tarfile.open(file_path, "r:gz") as tar:
            if hasattr(tarfile, "data_filter"):
                tar.extractall(path=QUERY_DIR, filter="data")
            else:
                # Older Python without extraction filters: validate manually.
                tar.extractall(
                    path=QUERY_DIR,
                    members=_safe_tar_members(tar, QUERY_DIR),
                )
        flatten_directory()
    except Exception as e:
        print(f"‚ùå Extraction error: {e}")

def parse_ids(text):
    """Split free-form text into a list of ID tokens.

    Any run of whitespace and/or commas acts as a separator. Tokens are
    returned in input order; empty tokens are dropped and duplicates are kept
    (de-duplication happens later, when the final ID list is assembled).
    """
    pieces = re.split(r"[\s,]+", text.strip())
    return list(filter(None, pieces))

def handle_drive_import(rel_path):
    """Mount Google Drive and bring the item at *rel_path* into QUERY_DIR.

    Args:
        rel_path: Location relative to "My Drive"; leading slashes are ignored.

    Depending on what the path points at:
      * a `.tar.gz` / `.tgz` name -> handed to extract_archive();
      * a folder -> every file beneath it is copied straight into QUERY_DIR,
        without recreating the folder hierarchy;
      * anything else -> copied as a single file.
    A missing path is reported on stdout and nothing is imported.
    """
    drive.mount("/content/drive", force_remount=False)
    src = Path("/content/drive/MyDrive") / rel_path.lstrip('/')

    if not src.exists():
        print(f"‚ùå Not found on Drive: {src}")
        return

    # The archive check comes first, exactly as in the if/elif ordering.
    if src.name.endswith((".tar.gz", ".tgz")):
        extract_archive(src)
        return

    if not src.is_dir():
        shutil.copy2(src, QUERY_DIR)
        print(f"üìÑ Imported: {src.name}")
        return

    print(f"üìÇ Importing files from Drive folder: {src.name} (Flattening...)")
    copied = 0
    for entry in src.rglob("*"):
        if not entry.is_file():
            continue
        # Copy into the root of QUERY_DIR, dropping the source sub-path.
        shutil.copy2(entry, QUERY_DIR)
        copied += 1
    print(f"   -> Copied {copied} files.")

# --- EXECUTION ---
# Gathers identifiers and structure files from every enabled source, then
# prints a summary. `structure_ids_for_download` is read by the next cell.

setup_environment()
final_ids = parse_ids(structure_ids)

# Local Uploads
if upload_id_list_txt:
    print("üì§ Upload .txt ID list:")
    for content in files.upload().values():
        # "utf-8-sig" also accepts plain UTF-8 but strips a leading BOM, which
        # would otherwise be glued onto the first identifier.
        final_ids.extend(parse_ids(content.decode("utf-8-sig")))

if upload_structures:
    print("üì§ Upload PDB/mmCIF structures:")
    # The returned contents are not used here: the code relies on the upload
    # landing in the current working directory, hence the temporary chdir.
    os.chdir(QUERY_DIR)
    try:
        files.upload()
    finally:
        # Always restore the working directory, even if the upload is
        # cancelled or fails, so later steps do not run inside QUERY_DIR.
        os.chdir('/content')

if upload_tar_gz:
    print("üì§ Upload .tar.gz archive:")
    for name, content in files.upload().items():
        tmp = Path(name)
        tmp.write_bytes(content)
        extract_archive(tmp)
        tmp.unlink()

# Drive Import
if import_from_google_drive and drive_path:
    handle_drive_import(drive_path)

# Force flatten one last time to be safe
flatten_directory()

# --- RESULTS SUMMARY ---

# dict.fromkeys de-duplicates while preserving first-seen order.
unique_ids = list(dict.fromkeys(final_ids))
structure_ids_for_download = ",".join(unique_ids)

# Identify all structural files (shallow scan is sufficient now)
valid_exts = {'.pdb', '.cif', '.ent', '.gz'}
found_files = [
    f for f in QUERY_DIR.glob("*")
    if f.is_file() and f.suffix.lower() in valid_exts
]

print("\n" + "‚îÄ"*50)
print("üöÄ **ZincSight: Input Ready (FLATTENED)**")
print(f"‚Ä¢ Identifiers to fetch: {len(unique_ids)}")
print(f"‚Ä¢ Local files detected: {len(found_files)}")
print("‚Ä¢ Folder structure: All files moved to root ‚úÖ")

if found_files:
    # Show one detected file so the user can sanity-check the import.
    print(f"‚Ä¢ Sample file: {found_files[0].name}")

if not unique_ids and not found_files:
    print("\n‚ö†Ô∏è **Warning:** No data found. Check your inputs.")
print("‚îÄ"*50)

In [None]:
#@title Execute ZincSight (Auto-Batching 50k + Resume Log) {display-mode: "form"}
from IPython.utils.capture import capture_output
import os
import sys
import shutil
import math
import multiprocessing
import platform
from pathlib import Path
from google.colab import drive

# --- 1. SETUP & DEPENDENCIES ---
# One-time installation, guarded by a marker file: when the marker already
# exists, the clone/pip step is skipped on re-runs of this cell.
SETUP_MARKER = "/content/ENV_SETUP.marker"
if not os.path.exists(SETUP_MARKER):
    print("üîß Installing dependencies (this takes ~2 mins)...")
    # capture_output() keeps the git/pip console output out of the notebook.
    with capture_output():
        # Clone only if the repository folder is not already present.
        if not os.path.exists("/content/ZincSight"):
            !git clone https://github.com/MECHTI1/ZincSight.git
        %cd /content/ZincSight
        !pip install -r requirements.txt
        sys.path.append("/content/ZincSight")
    # NOTE(review): the marker is written unconditionally after the block; if a
    # shell command above fails without raising, later runs would still skip
    # installation -- confirm this is acceptable.
    open(SETUP_MARKER, "w").close()

# Drive is mounted on every run because results and the resume log live there.
drive.mount("/content/drive")
# Put the cloned repository first on the import path so `main_execute` resolves to it.
sys.path.insert(0, "/content/ZincSight")
from main_execute import execute_zincsight

# --- 2. CONFIGURATION ---
BATCH_SIZE = 50000  # maximum number of local files placed in one batch

# Working folders on the Colab VM.
ROOT_QUERY = Path("/content/query_structures")
OUTPUT_DIR = Path("/content/output")

# Results folder on Google Drive, prefixed with the name chosen in the form,
# and the resume log stored inside it.
DRIVE_DEST = Path(f"/content/drive/MyDrive/{name_of_output_folder}_ZincSight_results")
LOG_FILE = DRIVE_DEST / "zincsight_batch_log.txt"

for _required_dir in (ROOT_QUERY, OUTPUT_DIR, DRIVE_DEST):
    _required_dir.mkdir(parents=True, exist_ok=True)

# Hardware check: use all but one of the reported CPUs, never fewer than one.
# (cpu_count() reports the CPUs visible to Python; the variable name is historical.)
physical_cores = max(multiprocessing.cpu_count() - 1, 1)

# --- 3. INTELLIGENT BATCH CREATION ---
# Splits the local query files into numbered folders of at most BATCH_SIZE
# files each, and adds one extra batch that only carries the IDs to download.
# The result is `batch_map`: a list of {"name", "path", "ids"} dicts.

# Derive the number from BATCH_SIZE so the message cannot drift from the setting.
print(f"üßπ Organizing files into batches of {BATCH_SIZE:,}...")

# Step A: Flatten everything to a temporary "staging" area
staging_dir = Path("/content/temp_staging")
if staging_dir.exists():
    shutil.rmtree(staging_dir)
staging_dir.mkdir()

# Move all existing files from ROOT_QUERY (at any depth) to staging.
# The tree is snapshotted with list() first: moving files out of a directory
# while rglob is still lazily walking it can skip or revisit entries.
file_count = 0
for item in list(ROOT_QUERY.rglob("*")):
    if item.is_file():
        shutil.move(str(item), str(staging_dir / item.name))
        file_count += 1

# Step B: Create Batches (batch_001, batch_002...)
# Sorting ensures consistent batches on retry
all_files = sorted(staging_dir.glob("*"))
total_files = len(all_files)
num_batches = math.ceil(total_files / BATCH_SIZE)

# NOTE(review): for an ID-only run this creates an empty local "batch_001"
# (executed with no files and no IDs) in addition to the download batch below.
# Kept as-is because it determines the download batch's number -- confirm
# whether the placeholder batch is intended.
if num_batches == 0 and structure_ids_for_download.strip():
    num_batches = 1

print(f"   ‚Ä¢ Found {total_files} local files.")
print(f"   ‚Ä¢ Creating {num_batches} batch(es).")

# NOTE(review): batch names depend only on position in the sorted file list.
# The resume log stays meaningful only if the same full set of files is
# present on every run; re-running with a partial set renumbers the batches.

# Distribute files into batch folders
batch_map = []

for i in range(num_batches):
    batch_name = f"batch_{i+1:03d}"
    batch_path = ROOT_QUERY / batch_name
    batch_path.mkdir(exist_ok=True)

    start = i * BATCH_SIZE
    for f in all_files[start:start + BATCH_SIZE]:
        shutil.move(str(f), str(batch_path / f.name))

    batch_map.append({
        "name": batch_name,
        "path": batch_path,
        "ids": "",
    })

# Step C: Handle Downloads (The "Download Batch")
if structure_ids_for_download and structure_ids_for_download.strip():
    download_batch_name = f"batch_{num_batches + 1:03d}_downloads"
    download_batch_path = ROOT_QUERY / download_batch_name
    download_batch_path.mkdir(exist_ok=True)

    batch_map.append({
        "name": download_batch_name,
        "path": download_batch_path,
        "ids": structure_ids_for_download,
    })
    print(f"   ‚Ä¢ Added dedicated batch for downloads: {download_batch_name}")

# Cleanup Staging
if staging_dir.exists():
    shutil.rmtree(staging_dir)

# --- 4. LOG FILE FUNCTIONS ---

def get_completed_batches(log_file=None):
    """Return the set of batch names recorded in the resume log.

    Args:
        log_file: Path of the log to read. Defaults to LOG_FILE when omitted.

    Returns:
        A set with one entry per non-blank line (surrounding whitespace
        stripped); an empty set when the log does not exist yet.
    """
    log_path = LOG_FILE if log_file is None else Path(log_file)
    if not log_path.exists():
        return set()
    with open(log_path, "r") as f:
        # Skip blank lines so a stray newline is not counted as a batch named "".
        return {name for name in (line.strip() for line in f) if name}

def mark_batch_complete(b_name):
    """Record *b_name* as finished by appending it as one line to LOG_FILE."""
    with LOG_FILE.open("a") as log:
        # print() writes the name followed by a single newline.
        print(b_name, file=log)

# --- 5. EXECUTION LOOP ---
# Runs ZincSight on every batch in `batch_map`, skipping batches already listed
# in the resume log. Each finished batch is archived to Drive and then logged.

completed_batches = get_completed_batches()

print("\n" + "‚ïê"*40)
print("üöÄ **Starting Execution**")
print(f"   ‚Ä¢ Destination: {DRIVE_DEST}")
print(f"   ‚Ä¢ Log File: {LOG_FILE.name}")
print(f"   ‚Ä¢ Previously Completed: {len(completed_batches)}")
print("‚ïê"*40)

# Names of batches that raised, so the final banner cannot claim full success.
failed_batches = []

for batch in batch_map:
    b_name = batch['name']
    b_path = batch['path']
    b_ids  = batch['ids']

    # --- CHECK LOG FILE ---
    if b_name in completed_batches:
        print(f"\n‚è≠Ô∏è **Skipping {b_name}** (Already marked complete in log)")
        # Clean up the folder since we don't need it
        if b_path.exists():
            shutil.rmtree(str(b_path))
        continue
    # ----------------------

    # Output folder for this specific batch
    current_output = OUTPUT_DIR / b_name
    current_output.mkdir(parents=True, exist_ok=True)

    print(f"\n‚ñ∂Ô∏è **Processing: {b_name}**")
    if b_ids:
        print("   (Downloading IDs...)")
    else:
        print(f"   (Local files: {len(list(b_path.glob('*')))})")

    try:
        execute_zincsight(
            include_histidine_rotamers,
            b_ids,
            str(b_path),
            str(current_output),
            physical_cores,
            create_pymol_sessions
        )

        # Compress and Save: archive the batch output, then copy it to Drive.
        archive_name = shutil.make_archive(str(current_output / b_name), 'gztar', str(current_output))
        final_dest = DRIVE_DEST / f"ZincSight_{b_name}.tar.gz"
        shutil.copy2(archive_name, final_dest)

        print(f"‚úÖ **Completed:** {b_name}")
        print(f"   Saved to: {final_dest.name}")

        # --- UPDATE LOG ---
        # Logged only after the archive has been copied to Drive.
        mark_batch_complete(b_name)
        # ------------------

        # Cleanup: free local disk space (inputs and outputs of this batch).
        shutil.rmtree(str(b_path))
        shutil.rmtree(str(current_output))

    except Exception as e:
        print(f"‚ùå **Failed:** {b_name}")
        print(f"   Error: {e}")
        # We do NOT mark as complete, so it is not skipped on a later run
        failed_batches.append(b_name)

print("\n" + "‚ïê"*40)
if failed_batches:
    # Report failures explicitly instead of announcing overall success.
    print(f"Finished with {len(failed_batches)} failed batch(es): {', '.join(failed_batches)}")
    print("   These batches were not recorded as complete in the log.")
else:
    print("üèÅ **ALL JOBS DONE**")