In [None]:
# @markdown ## 4. High Throughput Molecular Docking Pipeline 🚀 (Required)
# @markdown ### Instructions:
# @markdown a. Please **click the Run button** ▶️ on the left side of the code cell and follow any prompts.

# @markdown b. Upload your files when prompted 📤:

# @markdown    - Receptor files (.pdbqt or .zip) 🏢

# @markdown    - Ligand files (.pdbqt or .zip) 💊

# @markdown    - Parameter files (.txt or .zip) ⚙️

# @markdown c. Wait for processing to complete ⏱️

# @markdown d. Results will be automatically downloaded 📥

import glob
import os
import shutil
import subprocess
import sys
import time
import urllib.request
import zipfile
from datetime import datetime
from pathlib import Path

from google.colab import files

# File name of the AutoDock Vina binary; the pipeline looks for it (and runs
# it) under this name in the current working directory.
VINA_FILE = 'vina_1.2.5_linux_x86_64'

# Working directories that are wiped and recreated when a pipeline starts.
REQUIRED_DIRS = ['receptor', 'ligand', 'conf', 'output']

# Upload category -> file extension accepted for that category.
VALID_EXTENSIONS = dict(
    receptor='.pdbqt',
    ligand='.pdbqt',
    param='.txt',
)

class DockingPipeline:
    """Batch molecular-docking workflow driven by the AutoDock Vina binary.

    Creating an instance recreates the working directories and makes sure the
    Vina binary is present.  ``run_pipeline`` then prompts for receptor,
    ligand and parameter uploads, docks every ligand against every receptor
    that has a matching parameter file, and packages the results into a zip
    archive that is offered for download.
    """

    # Release asset fetched by download_vina(); it is stored locally under the
    # module-level VINA_FILE name.
    VINA_URL = (
        "https://github.com/ccsb-scripps/AutoDock-Vina/releases/download/"
        "v1.2.5/vina_1.2.5_linux_x86_64"
    )

    # Upload category -> working directory that stores files of that category.
    DIR_MAPPING = {'receptor': 'receptor', 'ligand': 'ligand', 'param': 'conf'}

    # Used when the user supplies no usable number of docking modes.
    DEFAULT_NUM_MODES = 10

    def __init__(self):
        """Record the start time, reset the work dirs and fetch Vina."""
        self.start_time = datetime.now()
        self.successful_dockings = []
        self.setup_directories()
        self.download_vina()

    def setup_directories(self):
        """Delete and recreate every directory listed in REQUIRED_DIRS."""
        for directory in REQUIRED_DIRS:
            if os.path.exists(directory):
                shutil.rmtree(directory)
            os.makedirs(directory)
            print(f"✓ Created directory: {directory}/")

    def download_vina(self):
        """Download the AutoDock Vina binary if it is not already present.

        The file is fetched to a temporary ``.part`` name, marked executable
        and only then renamed to VINA_FILE, so an interrupted download is
        never mistaken for a usable binary on the next run.

        Raises:
            RuntimeError: if the binary cannot be downloaded or installed.
        """
        if os.path.exists(VINA_FILE):
            print("✓ AutoDock Vina is ready")
            return

        print("\n📥 Downloading AutoDock Vina...")
        partial = f"{VINA_FILE}.part"
        try:
            urllib.request.urlretrieve(self.VINA_URL, partial)
            # Equivalent of `chmod +x`: add the execute bits to the file mode.
            os.chmod(partial, os.stat(partial).st_mode | 0o111)
            os.replace(partial, VINA_FILE)
        except OSError as err:
            if os.path.exists(partial):
                os.remove(partial)
            raise RuntimeError(
                f"Could not download AutoDock Vina: {err}") from err
        print("✓ AutoDock Vina downloaded and configured")

    def _sanitize_filename(self, filename):
        """Return *filename* with spaces (and a paired "(n)") made path-safe.

        Spaces become underscores.  When the base name contains both "(" and
        ")", the opening parenthesis becomes "_" and the closing one is
        dropped.  The extension is left untouched.
        """
        base, ext = os.path.splitext(filename)
        if '(' in base and ')' in base:
            base = base.replace(' ', '_').replace('(', '_').replace(')', '')
        base = base.replace(' ', '_')
        return f"{base}{ext}"

    def _process_single_file(self, filename, file_type):
        """Validate one uploaded file and move it into its working directory.

        Args:
            filename: name of the uploaded file in the current directory.
            file_type: 'receptor', 'ligand' or 'param'.

        Returns:
            A one-element list with the stored file name, or an empty list if
            the file was rejected, already present, or could not be moved.
        """
        if not filename.endswith(VALID_EXTENSIONS[file_type]):
            print(f"❌ Invalid file type: {filename}")
            return []

        clean_filename = self._sanitize_filename(filename)
        if clean_filename != filename and os.path.exists(filename):
            try:
                os.rename(filename, clean_filename)
                filename = clean_filename
            except Exception as e:
                # Best effort: carry on with the original name.
                print(f"⚠️ Warning: Could not rename file: {str(e)}")

        target_dir = self.DIR_MAPPING[file_type]
        target_path = os.path.join(target_dir, filename)

        try:
            if not os.path.exists(target_path):
                shutil.move(filename, target_path)
                print(f"✓ Moved: {filename} -> {target_dir}/")
                return [filename]
            else:
                print(f"⚠️ File exists, skipped: {filename}")
                return []
        except Exception as e:
            print(f"❌ Error processing {filename}: {str(e)}")
            return []

    def _process_zip(self, zip_file, file_type):
        """Extract *zip_file* and collect every file of the expected type.

        The archive is unpacked into a temporary directory that is always
        removed afterwards.  Files with the extension required for
        *file_type* are moved (under a sanitized name) into the matching
        working directory; names that already exist there are skipped.

        Returns:
            List of the file names that were stored.
        """
        temp_dir = f'temp_{file_type}_{int(time.time())}'
        processed_files = []

        try:
            os.makedirs(temp_dir, exist_ok=True)
            with zipfile.ZipFile(zip_file, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)
            print(f"✓ Extracted: {zip_file}")

            valid_ext = VALID_EXTENSIONS[file_type]
            target_dir = self.DIR_MAPPING[file_type]

            # `filenames` (not `files`) so the google.colab `files` module is
            # not shadowed inside this method.
            for root, _, filenames in os.walk(temp_dir):
                # Archives created on macOS carry "__MACOSX/._<name>" stubs
                # that share the real file's extension but are not usable
                # input files.
                if '__MACOSX' in root.split(os.sep):
                    continue
                for name in filenames:
                    if name.startswith('._') or not name.endswith(valid_ext):
                        continue

                    src_path = os.path.join(root, name)
                    clean_filename = self._sanitize_filename(name)
                    dst_path = os.path.join(target_dir, clean_filename)

                    if not os.path.exists(dst_path):
                        shutil.move(src_path, dst_path)
                        processed_files.append(clean_filename)
                        print(f"  ↳ Moved: {clean_filename}")
                    else:
                        print(f"  ⚠️ Skipped duplicate: {name}")

        except Exception as e:
            print(f"❌ Error processing {zip_file}: {str(e)}")
        finally:
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)

        return processed_files

    def handle_file_upload(self, file_type):
        """Prompt for an upload and file every received item.

        Args:
            file_type: 'receptor', 'ligand' or 'param'.

        Returns:
            List of the file names stored in the working directory.
        """
        prompts = {
            'receptor': "📂 Please upload receptor files (.pdbqt or .zip):",
            'ligand': "📂 Please upload ligand files (.pdbqt or .zip):",
            'param': "📂 Please upload parameter files (.txt or .zip):"
        }

        print(f"\n{prompts[file_type]}")
        uploaded = files.upload()
        processed_files = []

        for filename in uploaded:
            if filename.endswith('.zip'):
                print(f"\n📦 Processing zip file: {filename}")
                processed_files.extend(self._process_zip(filename, file_type))
            else:
                print(f"\n📄 Processing file: {filename}")
                processed_files.extend(
                    self._process_single_file(filename, file_type))

        return processed_files

    @staticmethod
    def _is_readable(file_path):
        """Return True if the first character of *file_path* can be read."""
        try:
            with open(file_path, 'r') as f:
                f.read(1)
            return True
        except Exception:
            print(f"⚠️ Warning: Cannot read file {file_path}")
            return False

    def verify_files(self):
        """Check that each input category has at least one readable file.

        Prints a per-category listing of the readable files.

        Returns:
            True when receptors, ligands and parameter files are all present
            and readable, False otherwise.
        """
        checks = (
            ("Receptor files", "receptor", glob.glob('receptor/*.pdbqt')),
            ("Ligand files", "ligand", glob.glob('ligand/*.pdbqt')),
            ("Parameter files", "parameter", glob.glob('conf/*.txt')),
        )

        print("\n📋 File Verification Results:")
        print("---------------------------")

        missing = []
        for index, (title, noun, paths) in enumerate(checks):
            # Every section after the first is separated by a blank line.
            lead = "\n" if index else ""
            print(f"{lead}{title} ({len(paths)}):")
            readable = [p for p in paths if self._is_readable(p)]
            for path in readable:
                print(f"  ✓ {os.path.basename(path)}")
            if not readable:
                missing.append(noun)

        if missing:
            print("\n❌ Validation failed:")
            for noun in missing:
                print(f"  - No valid {noun} files")
            return False

        print("\n✅ All files verified successfully")
        return True

    def read_parameter_file(self, param_file):
        """Read the docking box definition from a ``key = value`` text file.

        Text after a ``#`` is treated as a comment.  Only the first ``=`` on
        a line separates key from value, so a stray second ``=`` yields an
        "invalid value" warning for that line instead of rejecting the whole
        file.  Values that are not numbers are reported and ignored.

        Returns:
            Dict mapping parameter name to float, or None if the file cannot
            be read or lacks any of center_x/y/z and size_x/y/z.
        """
        required_params = {'center_x', 'center_y', 'center_z',
                           'size_x', 'size_y', 'size_z'}
        params = {}

        try:
            with open(param_file, 'r') as f:
                content = f.read()

            print(f"\n📄 Parameter file: {os.path.basename(param_file)}")
            print("-" * 40)
            print(content)
            print("-" * 40)

            for raw_line in content.splitlines():
                line = raw_line.split('#', 1)[0]
                if '=' not in line:
                    continue
                key, _, value = line.partition('=')
                try:
                    params[key.strip()] = float(value.strip())
                except ValueError:
                    print(f"⚠️ Invalid value in parameter file: {raw_line.strip()}")

            missing = required_params - set(params)
            if missing:
                raise ValueError(f"Missing required parameters: {missing}")

            print("✓ Parameters validated successfully")
            return params

        except Exception as e:
            print(f"❌ Error reading parameter file: {str(e)}")
            return None

    def find_matching_param_file(self, receptor_file, param_files):
        """Pick the parameter file whose name contains the receptor's prefix.

        The prefix is the receptor file's base name up to the first "_".

        Returns:
            The first matching path from *param_files*, or None if no file
            name contains the prefix.
        """
        receptor_base = os.path.splitext(os.path.basename(receptor_file))[0]
        receptor_base = receptor_base.split('_')[0]

        matching_params = [p for p in param_files
                           if receptor_base in os.path.basename(p)]

        if not matching_params:
            print(f"⚠️ No parameter file found for {receptor_base}")
            return None

        if len(matching_params) > 1:
            print(f"⚠️ Multiple parameter files found for {receptor_base}, using first one:")
            for p in matching_params:
                print(f"  - {os.path.basename(p)}")

        return matching_params[0]

    def show_progress(self, current, total):
        """Redraw a one-line text progress bar for *current* of *total*."""
        if total <= 0:
            # Nothing to report, and the ratio below would divide by zero.
            return
        percent = (current / total) * 100
        bar_length = 30
        filled_length = int(bar_length * current / total)
        bar = '=' * filled_length + '-' * (bar_length - filled_length)
        print(f"\r⏳ Progress: [{bar}] {percent:.1f}% ({current}/{total})", end='')
        if current == total:
            print()

    def run_docking(self, receptor, ligand, params, num_modes):
        """Dock one ligand against one receptor with AutoDock Vina.

        Vina is started without a shell, with its arguments passed as a list,
        so file names containing quotes or shell metacharacters are handed
        over verbatim.  Its combined stdout/stderr is written to a ``.log``
        file next to the output file.

        Args:
            receptor: path to the receptor .pdbqt file.
            ligand: path to the ligand .pdbqt file.
            params: dict holding center_x/y/z and size_x/y/z.
            num_modes: value passed to Vina's ``--num_modes`` option.

        Returns:
            Path of the docked output file, or None if Vina could not be run
            or exited with a non-zero status.
        """
        receptor_stem = os.path.splitext(os.path.basename(receptor))[0]
        ligand_stem = os.path.splitext(os.path.basename(ligand))[0]
        base_name = f"{receptor_stem}_{ligand_stem}"
        output_file = os.path.join('output', f"{base_name}_docked.pdbqt")
        log_file = os.path.join('output', f"{base_name}_docked.log")

        cmd = [os.path.join('.', VINA_FILE),
               '--receptor', receptor,
               '--ligand', ligand]
        for key in ('center_x', 'center_y', 'center_z',
                    'size_x', 'size_y', 'size_z'):
            cmd += [f'--{key}', str(params[key])]
        cmd += ['--num_modes', str(num_modes), '--out', output_file]

        print(f"\n🔄 Starting docking:")
        print(f"  Receptor: {os.path.basename(receptor)}")
        print(f"  Ligand: {os.path.basename(ligand)}")
        print(f"  Output: {os.path.basename(output_file)}")
        print(f"  Log: {os.path.basename(log_file)}")

        try:
            with open(log_file, 'w') as log:
                completed = subprocess.run(
                    cmd, stdout=log, stderr=subprocess.STDOUT, check=False)
            exit_code = completed.returncode

            if exit_code != 0:
                print(f"❌ Docking failed (exit code: {exit_code})")
                if os.path.exists(log_file):
                    with open(log_file, 'r') as log:
                        print("\n📋 Log content:")
                        print("-" * 40)
                        print(log.read())
                        print("-" * 40)
                return None

            print(f"✓ Docking completed successfully")
            return output_file

        except Exception as e:
            print(f"❌ Error during docking: {str(e)}")
            return None

    def execute_docking_batch(self, receptor_paths, ligand_paths, param_paths, num_modes):
        """Dock every ligand against every receptor that has parameters.

        A receptor without a usable parameter file is skipped; its ligand
        combinations still count towards the progress bar so the bar can
        reach 100%.

        Returns:
            List of output file paths for the dockings that succeeded.
        """
        total_combinations = len(receptor_paths) * len(ligand_paths)
        current_combination = 0
        successful_dockings = []

        print(f"\n🔬 Starting batch docking:")
        print(f"  - {len(receptor_paths)} receptor(s)")
        print(f"  - {len(ligand_paths)} ligand(s)")
        print(f"  - Total combinations: {total_combinations}")

        for receptor in receptor_paths:
            param_file = self.find_matching_param_file(receptor, param_paths)
            params = self.read_parameter_file(param_file) if param_file else None
            if not params:
                current_combination += len(ligand_paths)
                self.show_progress(current_combination, total_combinations)
                continue

            for ligand in ligand_paths:
                current_combination += 1
                self.show_progress(current_combination, total_combinations)

                result = self.run_docking(receptor, ligand, params, num_modes)
                if result:
                    successful_dockings.append(result)

        print(f"\n✓ Completed {len(successful_dockings)} successful dockings")
        return successful_dockings

    def package_results(self, docking_files, download=True):
        """Zip the docking outputs, their logs and a text summary.

        Args:
            docking_files: paths of the docked output files to include.
            download: when True (the default) the archive is also handed to
                ``files.download``; pass False to only create it.

        Returns:
            Name of the created zip archive.

        Raises:
            Exception: any error raised while building the archive is
                reported and re-raised.
        """
        print("\n📦 Creating results package...")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_zip = f'docking_results_{timestamp}.zip'

        try:
            with zipfile.ZipFile(output_zip, 'w') as zipf:
                print("  ↳ Adding docking results and logs...")
                for file in docking_files:
                    if os.path.exists(file):
                        zipf.write(file, arcname=os.path.basename(file))
                        print(f"    ✓ Added {os.path.basename(file)}")

                        # run_docking writes the log next to the output file
                        # under the same stem.
                        log_file = os.path.splitext(file)[0] + '.log'
                        if os.path.exists(log_file):
                            zipf.write(log_file, arcname=os.path.basename(log_file))
                            print(f"    ✓ Added {os.path.basename(log_file)}")
                        else:
                            print(f"    ⚠️ Missing log file for {os.path.basename(file)}")

                os.makedirs('output', exist_ok=True)
                summary_path = 'output/docking_summary.txt'
                self.create_basic_summary(docking_files, summary_path)

                if os.path.exists(summary_path):
                    zipf.write(summary_path, arcname=os.path.basename(summary_path))
                    print(f"    ✓ Added basic summary")

            print(f"\n✅ Results package created: {output_zip}")
            if download:
                files.download(output_zip)
            return output_zip

        except Exception as e:
            print(f"❌ Error creating results package: {str(e)}")
            raise

    def create_basic_summary(self, docking_files, summary_path):
        """Write a plain-text run summary to *summary_path*.

        The summary holds the run date, the time elapsed since the pipeline
        was created, the number of successful dockings recorded on the
        instance, and the base name of every file in *docking_files*.

        Returns:
            *summary_path* on success, None if the file could not be written.
        """
        try:
            with open(summary_path, 'w', encoding='utf-8') as f:
                f.write("Molecular Docking Summary\n")
                f.write("=======================\n\n")
                f.write(f"Run Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Duration: {datetime.now() - self.start_time}\n\n")
                f.write("Statistics\n")
                f.write("-----------\n")
                f.write(f"Total successful dockings: {len(self.successful_dockings)}\n\n")
                f.write("Completed Dockings:\n")
                for file in docking_files:
                    f.write(f"- {os.path.basename(file)}\n")

            return summary_path
        except Exception as e:
            print(f"⚠️ Error creating summary: {str(e)}")
            return None

    @staticmethod
    def _parse_num_modes(raw, default=10):
        """Turn the user's answer into a positive int, else *default*.

        ``int()`` is attempted directly rather than pre-checking with
        ``str.isdigit()``, which accepts characters (for example "²") that
        ``int()`` rejects.
        """
        try:
            value = int(raw)
        except (TypeError, ValueError):
            return default
        return value if value > 0 else default

    def run_pipeline(self):
        """Run upload, verification, docking, packaging and download in order.

        Stops early (returning None) when required input files are missing or
        when no docking succeeds.  Any unexpected error is reported and
        re-raised.
        """
        try:
            print("\n🚀 Starting Molecular Docking Pipeline")
            print("======================================")

            print("\n📂 Step 1: File Upload and Processing")
            print("------------------------------------")
            self.handle_file_upload('receptor')
            self.handle_file_upload('ligand')
            self.handle_file_upload('param')

            if not self.verify_files():
                print("\n❌ Pipeline stopped: Missing required files")
                return

            print("\n⚙️ Step 2: Configure Docking Parameters")
            print("--------------------------------------")
            answer = input("Enter number of docking modes (default: 10): ").strip()
            num_modes = self._parse_num_modes(answer, self.DEFAULT_NUM_MODES)
            print(f"✓ Using {num_modes} docking modes")

            print("\n🔬 Step 3: Running Molecular Docking")
            print("-----------------------------------")
            receptor_paths = glob.glob('receptor/*.pdbqt')
            ligand_paths = glob.glob('ligand/*.pdbqt')
            param_paths = glob.glob('conf/*.txt')

            self.successful_dockings = self.execute_docking_batch(
                receptor_paths, ligand_paths, param_paths, num_modes)

            if not self.successful_dockings:
                print("\n❌ No successful docking results")
                return

            print("\n📦 Step 4: Packaging Results")
            print("--------------------------")
            # Packaging only; the single download is triggered right below so
            # the archive is not downloaded twice.
            output_zip = self.package_results(
                self.successful_dockings, download=False)
            print("\n📥 Downloading results package...")
            files.download(output_zip)

            duration = datetime.now() - self.start_time
            print("\n✨ Pipeline Complete")
            print("==================")
            print(f"Duration: {duration}")
            total_attempts = len(receptor_paths) * len(ligand_paths)
            print(f"Successful dockings: {len(self.successful_dockings)}/{total_attempts}")

        except Exception as e:
            print(f"\n❌ Pipeline error: {str(e)}")
            raise

def main():
    """Create a DockingPipeline and run it from start to finish.

    A keyboard interrupt is reported and swallowed; any other exception is
    reported and then re-raised.
    """
    try:
        DockingPipeline().run_pipeline()
    except KeyboardInterrupt:
        print("\n\n⚠️ Process interrupted by user")
    except Exception as exc:
        print(f"\n❌ Error: {exc}")
        raise


# Run the pipeline when this cell/script is executed directly.
if __name__ == "__main__":
    main()