# Enhanced GNINA Docking Pipeline

A comprehensive molecular docking pipeline using [GNINA](https://github.com/gnina/gnina) with advanced features including:
- Tiered CNN scoring workflow
- Flexible receptor docking
- Parallel processing
- Resume capability
- PDB preparation wizard integration
- Quality control and validation
- Covalent docking support

**References:**
- GNINA: https://github.com/gnina/gnina
- PDB Preparation Wizard: https://github.com/OASolliman590/pdb-prepare-wizard


## 1. Environment Setup and Dependencies

This cell sets up the Google Colab environment, mounts Google Drive, and installs all required dependencies for the GNINA docking pipeline.


In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Import essential libraries
import os
import sys
import subprocess
import pathlib
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
import warnings
import json
import concurrent.futures
import multiprocessing as mp
import threading
import queue
import time
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.offline as pyo

warnings.filterwarnings('ignore')

print("‚úÖ Environment setup complete")
print(f"Python version: {sys.version}")
print(f"Working directory: {os.getcwd()}")


## 2. Configuration and Project Setup

Configure the GNINA docking pipeline parameters, create necessary directories, and set up the project structure.


In [None]:
# Project configuration
WORK_DIR = "/content/drive/MyDrive/GNINA_Docking_Project"
%cd {WORK_DIR}

# GNINA configuration based on https://github.com/gnina/gnina
CONFIG = {
    'project_name': 'GNINA_Docking_Project',
    'gnina_version': 'v1.3.2',  # Latest version from GitHub releases
    'exhaustiveness': 32,
    'num_modes': 20,
    'seed': 42,
    'cnn_scoring': 'rescore',  # Default CNN scoring mode
    'cpu_cores': 4,
    'batch_size': 5,
    'timeout': 300,
    'use_gpu': False,  # Will be detected automatically
}

# Create project directories
dirs_to_create = [
    'ligands_raw', 'ligands_prep', 'receptors_raw', 'receptors_prep',
    'gnina_out', 'results', 'logs', 'visualizations', 'enhanced_analysis'
]

for dir_name in dirs_to_create:
    os.makedirs(dir_name, exist_ok=True)

print("‚úÖ Project directories created")
print(f"Project: {CONFIG['project_name']}")
print(f"GNINA Version: {CONFIG['gnina_version']}")
print(f"Working Directory: {WORK_DIR}")


## 3. Install Dependencies and Download GNINA

Install required Python packages and download the GNINA binary from the official [GNINA repository](https://github.com/gnina/gnina).


In [None]:
# Install Python dependencies
%pip install -q rdkit-pypi meeko pdb2pqr openbabel biopython plip
!apt-get update -qq && apt-get install -y -qq openbabel pdb2pqr

# Download GNINA binary from GitHub releases
# Reference: https://github.com/gnina/gnina/releases
gnina_url = f"https://github.com/gnina/gnina/releases/download/{CONFIG['gnina_version']}/gnina"
!wget -q {gnina_url} -O gnina
!chmod +x gnina

# Verify GNINA installation
print("üîç Verifying GNINA installation...")
!./gnina --version

# GPU detection for CUDA support
try:
    gpu_info = subprocess.run(['nvidia-smi'], capture_output=True, text=True)
    if gpu_info.returncode == 0:
        print("‚úÖ GPU detected:")
        print(gpu_info.stdout.split('\n')[0:3])
        CONFIG['use_gpu'] = True
    else:
        print("‚ö†Ô∏è No GPU detected, using CPU")
        CONFIG['use_gpu'] = False
except:
    print("‚ö†Ô∏è GPU check failed, using CPU")
    CONFIG['use_gpu'] = False

print(f"‚úÖ GNINA {CONFIG['gnina_version']} installed successfully")
print(f"GPU acceleration: {'Enabled' if CONFIG['use_gpu'] else 'Disabled'}")


## 4. File Validation and Pairlist Loading

Validate project structure and load the pairlist.csv file containing receptor-ligand combinations and binding site coordinates.


In [None]:
def validate_project_structure():
    """Validate that required files and directories exist"""
    required_files = ['pairlist.csv']
    required_dirs = ['ligands_raw', 'receptors_raw']
    
    missing_files = [f for f in required_files if not os.path.exists(f)]
    missing_dirs = [d for d in required_dirs if not os.path.exists(d)]
    
    if missing_files or missing_dirs:
        print("‚ùå Missing required files/directories:")
        for item in missing_files + missing_dirs:
            print(f"   - {item}")
        return False
    
    print("‚úÖ Project structure validated")
    return True

def load_and_validate_pairlist():
    """Load and validate pairlist.csv file"""
    try:
        # Load pairlist
        df = pd.read_csv('pairlist.csv')
        
        # Normalize column names
        df.columns = df.columns.str.lower().str.replace(' ', '_')
        
        # Validate required columns
        required_cols = ['receptor', 'ligand', 'center_x', 'center_y', 'center_z', 
                        'size_x', 'size_y', 'size_z']
        missing_cols = [col for col in required_cols if col not in df.columns]
        
        if missing_cols:
            print(f"‚ùå Missing required columns: {missing_cols}")
            return None
        
        # Validate numeric columns
        coord_cols = ['center_x', 'center_y', 'center_z', 'size_x', 'size_y', 'size_z']
        for col in coord_cols:
            if not pd.api.types.is_numeric_dtype(df[col]):
                print(f"‚ùå Column {col} must be numeric")
                return None
        
        # Add site_id if missing
        if 'site_id' not in df.columns:
            df['site_id'] = 'site_1'
            print("‚ö†Ô∏è Added default site_id column")
        
        print(f"‚úÖ Pairlist loaded: {len(df)} entries")
        return df
        
    except Exception as e:
        print(f"‚ùå Error loading pairlist: {e}")
        return None

# Validate and load
if not validate_project_structure():
    print("Please ensure all required files and directories are present")
else:
    pairlist_df = load_and_validate_pairlist()
    if pairlist_df is not None:
        print("\nüìä Pairlist preview:")
        print(pairlist_df.head())


## 5. Core Docking Configuration

Configure the main GNINA docking parameters including CNN scoring modes, performance settings, and docking options. This is the central configuration hub for all docking operations.


In [None]:
# =============================================================================
# Core GNINA Docking Configuration
# =============================================================================

class GNINADockingConfig:
    """Central configuration class for GNINA docking parameters"""
    
    def __init__(self):
        # Base configuration from https://github.com/gnina/gnina
        self.base_config = {
            'gnina_binary': './gnina',
            'exhaustiveness': 32,           # Global search exhaustiveness
            'num_modes': 20,                # Maximum binding modes
            'seed': 42,                     # Random seed for reproducibility
            'cpu_cores': 4,                 # CPU cores to use (fallback when GPU unavailable)
            'timeout': 300,                 # Timeout in seconds
            'min_rmsd_filter': 1.0,         # RMSD filter for pose diversity
            'pose_sort_order': 0,           # 0=CNNscore, 1=CNNaffinity, 2=Energy
            'cnn_rotation': 0,              # CNN rotation evaluation
            'add_hydrogens': True,          # Auto-add hydrogens
            'strip_hydrogens': False,       # Remove polar hydrogens
            'use_gpu': True,                # Prioritize GPU acceleration
            'gpu_device': 0,                # GPU device ID
        }
        
        # CNN scoring modes as per GNINA documentation
        self.cnn_modes = {
            'none': {
                'description': 'No CNNs used - empirical scoring only',
                'speed': 'fastest',
                'accuracy': 'baseline',
                'use_case': 'Quick screening, baseline comparison'
            },
            'rescore': {
                'description': 'CNN used for reranking final poses (default)',
                'speed': 'fast',
                'accuracy': 'good',
                'use_case': 'Large library screening, first-pass ranking'
            },
            'refinement': {
                'description': 'CNN used to refine poses after Monte Carlo',
                'speed': 'medium (10x slower than rescore on GPU)',
                'accuracy': 'better',
                'use_case': 'Focused re-docking, pose refinement'
            },
            'all': {
                'description': 'CNN used throughout entire procedure',
                'speed': 'slowest (extremely intensive)',
                'accuracy': 'best',
                'use_case': 'Final validation, small high-value sets'
            }
        }
        
        # Tiered workflow configuration
        self.tiered_config = {
            'stage_a': {
                'cnn_scoring': 'rescore',
                'exhaustiveness': 12,
                'num_modes': 8,
                'description': 'Fast broad screening',
                'cnn_score_threshold': 0.5,
                'max_ligands_per_receptor': None,
                'top_percentage': None
            },
            'stage_b': {
                'cnn_scoring': 'refinement',
                'exhaustiveness': 24,
                'num_modes': 15,
                'description': 'Balanced refinement',
                'cnn_score_threshold': 0.7,
                'max_ligands_per_receptor': 5,
                'top_percentage': 0.05
            },
            'stage_c': {
                'cnn_scoring': 'all',
                'exhaustiveness': 48,
                'num_modes': 20,
                'description': 'High-accuracy final screening',
                'cnn_score_threshold': 0.8,
                'max_ligands_per_receptor': 2,
                'top_percentage': 0.01
            }
        }
        
        # Performance settings
        self.performance_config = {
            'parallel_processing': True,
            'max_workers': 4,
            'batch_size': 5,
            'resume_capability': True,
            'progress_tracking': True
        }
        
        # Current active configuration
        self.active_config = self.base_config.copy()
        self.active_config['cnn_scoring'] = 'rescore'  # Default mode
    
    def set_cnn_mode(self, mode):
        """Set CNN scoring mode"""
        if mode not in self.cnn_modes:
            raise ValueError(f"Invalid CNN mode: {mode}. Available: {list(self.cnn_modes.keys())}")
        
        self.active_config['cnn_scoring'] = mode
        print(f"‚úÖ CNN scoring mode set to: {mode}")
        print(f"   Description: {self.cnn_modes[mode]['description']}")
        print(f"   Speed: {self.cnn_modes[mode]['speed']}")
        print(f"   Use case: {self.cnn_modes[mode]['use_case']}")
    
    def set_performance_params(self, exhaustiveness=None, num_modes=None, cpu_cores=None):
        """Set performance parameters"""
        if exhaustiveness is not None:
            self.active_config['exhaustiveness'] = exhaustiveness
        if num_modes is not None:
            self.active_config['num_modes'] = num_modes
        if cpu_cores is not None:
            self.active_config['cpu_cores'] = cpu_cores
        
        print("‚úÖ Performance parameters updated:")
        print(f"   Exhaustiveness: {self.active_config['exhaustiveness']}")
        print(f"   Num modes: {self.active_config['num_modes']}")
        print(f"   CPU cores: {self.active_config['cpu_cores']}")
    
    def set_gpu_acceleration(self, use_gpu=True, gpu_device=0):
        """Configure GPU acceleration settings"""
        self.active_config['use_gpu'] = use_gpu
        self.active_config['gpu_device'] = gpu_device
        
        if use_gpu and CONFIG.get('use_gpu', False):
            print(f"‚úÖ GPU acceleration enabled (device {gpu_device})")
        elif use_gpu and not CONFIG.get('use_gpu', False):
            print("‚ö†Ô∏è GPU requested but not available, will use CPU")
            self.active_config['use_gpu'] = False
        else:
            print("‚úÖ CPU acceleration enabled")
    
    def set_tiered_stage(self, stage):
        """Set configuration for a specific tiered stage"""
        if stage not in self.tiered_config:
            raise ValueError(f"Invalid stage: {stage}. Available: {list(self.tiered_config.keys())}")
        
        stage_config = self.tiered_config[stage]
        self.active_config.update({
            'cnn_scoring': stage_config['cnn_scoring'],
            'exhaustiveness': stage_config['exhaustiveness'],
            'num_modes': stage_config['num_modes']
        })
        
        print(f"‚úÖ Tiered stage configuration set: {stage}")
        print(f"   Description: {stage_config['description']}")
        print(f"   CNN scoring: {stage_config['cnn_scoring']}")
        print(f"   Exhaustiveness: {stage_config['exhaustiveness']}")
        print(f"   Num modes: {stage_config['num_modes']}")
    
    def get_gnina_command_base(self):
        """Get base GNINA command arguments with GPU prioritization"""
        cmd_args = [
            '--exhaustiveness', str(self.active_config['exhaustiveness']),
            '--num_modes', str(self.active_config['num_modes']),
            '--seed', str(self.active_config['seed']),
            '--cnn_scoring', self.active_config['cnn_scoring'],
            '--cnn_rotation', str(self.active_config['cnn_rotation']),
            '--min_rmsd_filter', str(self.active_config['min_rmsd_filter']),
            '--pose_sort_order', str(self.active_config['pose_sort_order']),
        ]
        
        # GPU acceleration (prioritized over CPU)
        if self.active_config.get('use_gpu', False) and CONFIG.get('use_gpu', False):
            cmd_args.extend(['--gpu', '--device', str(self.active_config.get('gpu_device', 0))])
            print(f"   üöÄ Using GPU acceleration (device {self.active_config.get('gpu_device', 0)})")
        else:
            # Fallback to CPU
            cmd_args.extend(['--cpu', str(self.active_config['cpu_cores'])])
            print(f"   üíª Using CPU cores: {self.active_config['cpu_cores']}")
        
        if self.active_config['add_hydrogens']:
            cmd_args.append('--addH')
        if self.active_config['strip_hydrogens']:
            cmd_args.append('--stripH')
        
        return cmd_args
    
    def display_configuration(self):
        """Display current configuration"""
        print("üîß Current GNINA Docking Configuration:")
        print(f"   CNN Scoring: {self.active_config['cnn_scoring']}")
        print(f"   Exhaustiveness: {self.active_config['exhaustiveness']}")
        print(f"   Num Modes: {self.active_config['num_modes']}")
        print(f"   Acceleration: {'GPU' if self.active_config.get('use_gpu', False) and CONFIG.get('use_gpu', False) else 'CPU'}")
        if self.active_config.get('use_gpu', False) and CONFIG.get('use_gpu', False):
            print(f"   GPU Device: {self.active_config.get('gpu_device', 0)}")
        else:
            print(f"   CPU Cores: {self.active_config['cpu_cores']}")
        print(f"   Seed: {self.active_config['seed']}")
        print(f"   Timeout: {self.active_config['timeout']}s")
        print(f"   RMSD Filter: {self.active_config['min_rmsd_filter']}√Ö")
        print(f"   Pose Sort: {self.active_config['pose_sort_order']} (0=CNNscore)")
    
    def display_cnn_modes(self):
        """Display available CNN modes"""
        print("üß† Available CNN Scoring Modes:")
        for mode, info in self.cnn_modes.items():
            print(f"   {mode}: {info['description']}")
            print(f"      Speed: {info['speed']}")
            print(f"      Use case: {info['use_case']}")
            print()

# Initialize configuration
docking_config = GNINADockingConfig()

# Display available options
docking_config.display_cnn_modes()
docking_config.display_configuration()

print("\nüí° Configuration Examples:")
print("   # Set CNN mode")
print("   docking_config.set_cnn_mode('refinement')")
print("   ")
print("   # Set performance parameters")
print("   docking_config.set_performance_params(exhaustiveness=48, num_modes=20)")
print("   ")
print("   # Configure GPU acceleration (prioritized)")
print("   docking_config.set_gpu_acceleration(use_gpu=True, gpu_device=0)")
print("   ")
print("   # Set tiered stage")
print("   docking_config.set_tiered_stage('stage_b')")


## 6. Tiered CNN Scoring Workflow

Implement the tiered CNN scoring approach as recommended in the [GNINA documentation](https://github.com/gnina/gnina). This creates a funnel workflow: broad screening ‚Üí focused refinement ‚Üí high-accuracy validation.


In [None]:
# =============================================================================
# Tiered CNN Scoring Workflow Implementation
# Based on GNINA documentation: https://github.com/gnina/gnina
# =============================================================================

class TieredCNNWorkflow:
    """
    Tiered CNN scoring workflow as recommended by GNINA documentation:
    Stage A: Fast broad screening (rescore)
    Stage B: Focused refinement (refinement) 
    Stage C: High-accuracy validation (all)
    """
    
    def __init__(self, docking_config):
        self.config = docking_config
        self.stage_results = {}
        
        # Stage configurations based on GNINA best practices
        self.stages = {
            'A': {
                'name': 'Broad Screening',
                'cnn_scoring': 'rescore',
                'exhaustiveness': 12,
                'num_modes': 8,
                'description': 'Fast broad screening with CNN rescoring',
                'cnn_score_threshold': 0.5,
                'max_ligands_per_receptor': None,
                'top_percentage': None,
                'use_case': 'Large library screening, first-pass ranking'
            },
            'B': {
                'name': 'Focused Refinement', 
                'cnn_scoring': 'refinement',
                'exhaustiveness': 24,
                'num_modes': 15,
                'description': 'Balanced refinement with CNN pose optimization',
                'cnn_score_threshold': 0.7,
                'max_ligands_per_receptor': 5,
                'top_percentage': 0.05,
                'use_case': 'Focused re-docking, pose refinement'
            },
            'C': {
                'name': 'High-Accuracy Validation',
                'cnn_scoring': 'all', 
                'exhaustiveness': 48,
                'num_modes': 20,
                'description': 'High-accuracy final screening with full CNN',
                'cnn_score_threshold': 0.8,
                'max_ligands_per_receptor': 2,
                'top_percentage': 0.01,
                'use_case': 'Final validation, small high-value sets'
            }
        }
    
    def get_stage_config(self, stage):
        """Get configuration for a specific stage"""
        if stage not in self.stages:
            raise ValueError(f"Invalid stage: {stage}. Available: {list(self.stages.keys())}")
        return self.stages[stage]
    
    def filter_ligands_for_stage(self, stage, previous_results, pairlist_df):
        """Filter ligands based on previous stage results"""
        stage_config = self.get_stage_config(stage)
        
        if stage == 'A':
            # Stage A: Use all ligands
            return pairlist_df.copy()
        
        if not previous_results:
            print(f"‚ö†Ô∏è No previous results for stage {stage}, using all ligands")
            return pairlist_df.copy()
        
        # Extract successful results with scores
        successful_results = []
        for result in previous_results:
            if result['status'] == 'success' and 'scores' in result:
                for score_data in result['scores']:
                    if 'cnn_score' in score_data:
                        successful_results.append({
                            'receptor': result['receptor'],
                            'ligand': result['ligand'],
                            'site_id': result['site_id'],
                            'cnn_score': score_data['cnn_score']
                        })
        
        if not successful_results:
            print(f"‚ö†Ô∏è No successful results with scores for stage {stage}")
            return pairlist_df.copy()
        
        # Convert to DataFrame for filtering
        results_df = pd.DataFrame(successful_results)
        
        # Apply filtering criteria
        filtered_pairs = []
        
        for _, row in pairlist_df.iterrows():
            receptor = row['receptor']
            ligand = row['ligand']
            site_id = row['site_id']
            
            # Get scores for this receptor-ligand pair
            pair_scores = results_df[
                (results_df['receptor'] == receptor) & 
                (results_df['ligand'] == ligand) &
                (results_df['site_id'] == site_id)
            ]
            
            if len(pair_scores) == 0:
                continue
            
            # Check CNN score threshold
            max_cnn_score = pair_scores['cnn_score'].max()
            if max_cnn_score < stage_config['cnn_score_threshold']:
                continue
            
            # Check top percentage
            if stage_config['top_percentage'] is not None:
                # Get all scores for this receptor
                receptor_scores = results_df[results_df['receptor'] == receptor]['cnn_score']
                threshold_score = receptor_scores.quantile(1 - stage_config['top_percentage'])
                if max_cnn_score < threshold_score:
                    continue
            
            # Check max ligands per receptor
            if stage_config['max_ligands_per_receptor'] is not None:
                receptor_ligands = results_df[results_df['receptor'] == receptor]['ligand'].unique()
                if len(receptor_ligands) > stage_config['max_ligands_per_receptor']:
                    # Keep only top ligands for this receptor
                    top_ligands = results_df[results_df['receptor'] == receptor].groupby('ligand')['cnn_score'].max().nlargest(stage_config['max_ligands_per_receptor']).index
                    if ligand not in top_ligands:
                        continue
            
            filtered_pairs.append(row)
        
        filtered_df = pd.DataFrame(filtered_pairs)
        print(f"‚úÖ Stage {stage} filtering: {len(filtered_df)}/{len(pairlist_df)} ligands selected")
        
        return filtered_df
    
    def run_stage(self, stage, pairlist_df, previous_results=None):
        """Run a specific stage of the tiered workflow"""
        stage_config = self.get_stage_config(stage)
        
        print(f"\\nüöÄ Starting Stage {stage}: {stage_config['name']}")
        print(f"   Description: {stage_config['description']}")
        print(f"   CNN Scoring: {stage_config['cnn_scoring']}")
        print(f"   Exhaustiveness: {stage_config['exhaustiveness']}")
        print(f"   Num Modes: {stage_config['num_modes']}")
        print(f"   Use Case: {stage_config['use_case']}")
        
        # Filter ligands for this stage
        input_df = self.filter_ligands_for_stage(stage, previous_results, pairlist_df)
        
        if len(input_df) == 0:
            print(f"   ‚ö†Ô∏è No ligands to process for Stage {stage}")
            return []
        
        print(f"   Processing {len(input_df)} ligand-receptor pairs")
        
        # Update docking configuration for this stage
        self.config.set_tiered_stage(f'stage_{stage.lower()}')
        
        # Return the filtered input for processing by the main docking engine
        return input_df
    
    def get_workflow_summary(self):
        """Get summary of the tiered workflow"""
        print("üìä Tiered CNN Scoring Workflow Summary:")
        print("\\nStage A - Broad Screening:")
        print("   ‚Ä¢ CNN Scoring: rescore (fast)")
        print("   ‚Ä¢ Purpose: Large library screening, first-pass ranking")
        print("   ‚Ä¢ Threshold: CNN score ‚â• 0.5")
        print("   ‚Ä¢ Expected: 80-95% of ligands")
        
        print("\\nStage B - Focused Refinement:")
        print("   ‚Ä¢ CNN Scoring: refinement (10x slower than rescore)")
        print("   ‚Ä¢ Purpose: Pose refinement, focused re-docking")
        print("   ‚Ä¢ Threshold: CNN score ‚â• 0.7, top 5% per receptor")
        print("   ‚Ä¢ Expected: 5-20% of ligands")
        
        print("\\nStage C - High-Accuracy Validation:")
        print("   ‚Ä¢ CNN Scoring: all (extremely intensive)")
        print("   ‚Ä¢ Purpose: Final validation, small high-value sets")
        print("   ‚Ä¢ Threshold: CNN score ‚â• 0.8, top 1% per receptor")
        print("   ‚Ä¢ Expected: 1-5% of ligands")
        
        print("\\nüí° Recommended Usage:")
        print("   ‚Ä¢ For large libraries (>1000 ligands): Use stages A ‚Üí B")
        print("   ‚Ä¢ For medium libraries (100-1000): Use stages A ‚Üí B ‚Üí C")
        print("   ‚Ä¢ For small libraries (<100): Use stage B or C directly")

# Initialize tiered workflow
tiered_workflow = TieredCNNWorkflow(docking_config)

# Display workflow information
tiered_workflow.get_workflow_summary()

print("\\nüîß Usage Examples:")
print("   # Run Stage A (broad screening)")
print("   stage_a_input = tiered_workflow.run_stage('A', pairlist_df)")
print("   ")
print("   # Run Stage B (focused refinement)")
print("   stage_b_input = tiered_workflow.run_stage('B', pairlist_df, stage_a_results)")
print("   ")
print("   # Run Stage C (high-accuracy validation)")
print("   stage_c_input = tiered_workflow.run_stage('C', pairlist_df, stage_b_results)")


## 7. Core Docking Engine with Parallel Processing and Resume Capability

The main GNINA docking engine that integrates parallel processing and resume functionality. This is the central component that executes all docking operations.


In [None]:
# =============================================================================
# Core GNINA Docking Engine with Parallel Processing and Resume Capability
# Based on GNINA documentation: https://github.com/gnina/gnina
# =============================================================================

class GNINADockingEngine:
    """
    Core GNINA docking engine with integrated parallel processing and resume capability.
    This is the main component that executes all docking operations.
    """
    
    def __init__(self, docking_config, max_workers=4):
        self.config = docking_config
        self.max_workers = max_workers
        self.gnina_binary = './gnina'
        self.results = []
        self.failures = []
        self.progress_lock = threading.Lock()
        
        # Resume capability
        self.resume_file = "docking_state.json"
        self.state_file = f"results/{self.resume_file}"
        
        # Progress tracking
        self.completed_pairs = set()
        self.total_pairs = 0
        
    def build_gnina_command(self, row, flexible_residues=None):
        """Build GNINA command for a single docking run"""
        receptor = f"receptors_prep/{row['receptor']}.pdbqt"
        ligand = f"ligands_prep/{row['ligand']}.pdbqt"
        
        # Output files
        tag = f"{row['receptor']}_{row['site_id']}_{row['ligand']}"
        output_sdf = f"gnina_out/{tag}_poses.sdf"
        log_file = f"logs/{tag}.log"
        
        # Base command
        cmd = [
            self.gnina_binary,
            "--receptor", receptor,
            "--ligand", ligand,
            "--out", output_sdf,
            "--log", log_file,
        ]
        
        # Docking box parameters
        cmd.extend([
            "--center_x", str(row['center_x']),
            "--center_y", str(row['center_y']),
            "--center_z", str(row['center_z']),
            "--size_x", str(row['size_x']),
            "--size_y", str(row['size_y']),
            "--size_z", str(row['size_z']),
        ])
        
        # Add base configuration arguments (includes GPU/CPU selection)
        cmd.extend(self.config.get_gnina_command_base())
        
        # Flexible receptor parameters
        if flexible_residues:
            flexres_str = ",".join(flexible_residues)
            flex_output = f"gnina_out/{tag}_flex.pdbqt"
            cmd.extend([
                "--flexres", flexres_str,
                "--flexdist", "3.5",
                "--out_flex", flex_output
            ])
        
        return cmd, output_sdf, log_file
    
    def run_single_docking(self, row, flexible_residues=None):
        """Run docking for a single ligand-receptor pair"""
        try:
            # Check if already completed (resume capability)
            pair_id = f"{row['receptor']}_{row['site_id']}_{row['ligand']}"
            if pair_id in self.completed_pairs:
                print(f"   ‚è≠Ô∏è Skipping already completed: {pair_id}")
                return None
            
            # Build command
            cmd, output_sdf, log_file = self.build_gnina_command(row, flexible_residues)
            
            # Run GNINA
            result = subprocess.run(
                cmd, 
                capture_output=True, 
                text=True, 
                timeout=self.config.active_config['timeout']
            )
            
            # Parse results
            if result.returncode == 0 and os.path.exists(output_sdf):
                scores = self.parse_gnina_output(output_sdf, log_file)
                return {
                    'status': 'success',
                    'receptor': row['receptor'],
                    'ligand': row['ligand'],
                    'site_id': row['site_id'],
                    'output_file': output_sdf,
                    'log_file': log_file,
                    'scores': scores,
                    'command': ' '.join(cmd),
                    'pair_id': pair_id
                }
            else:
                return {
                    'status': 'error',
                    'receptor': row['receptor'],
                    'ligand': row['ligand'],
                    'site_id': row['site_id'],
                    'error': result.stderr,
                    'returncode': result.returncode,
                    'pair_id': pair_id
                }
                
        except subprocess.TimeoutExpired:
            return {
                'status': 'timeout',
                'receptor': row['receptor'],
                'ligand': row['ligand'],
                'site_id': row['site_id'],
                'error': f"Timeout after {self.config.active_config['timeout']}s",
                'pair_id': pair_id
            }
        except Exception as e:
            return {
                'status': 'error',
                'receptor': row['receptor'],
                'ligand': row['ligand'],
                'site_id': row['site_id'],
                'error': str(e),
                'pair_id': pair_id
            }
    
    def parse_gnina_output(self, output_sdf, log_file):
        """Parse GNINA output files to extract scores"""
        scores = []
        
        try:
            # Parse SDF file for pose information
            if os.path.exists(output_sdf):
                with open(output_sdf, 'r') as f:
                    content = f.read()
                    
                # Extract scores from SDF data
                lines = content.split('\\n')
                for i, line in enumerate(lines):
                    if 'CNNscore' in line:
                        try:
                            cnn_score = float(line.split()[-1])
                            scores.append({
                                'pose_id': len(scores) + 1,
                                'cnn_score': cnn_score,
                                'cnn_affinity': cnn_score  # Simplified
                            })
                        except:
                            continue
            
            # Parse log file for additional information
            if os.path.exists(log_file):
                with open(log_file, 'r') as f:
                    log_content = f.read()
                    
                # Extract timing and other metrics
                if 'Total time' in log_content:
                    # Add timing information if available
                    pass
                    
        except Exception as e:
            print(f"‚ö†Ô∏è Error parsing output for {output_sdf}: {e}")
        
        return scores
    
    def run_parallel_docking(self, pairlist_df, flexible_residues_dict=None, batch_size=5):
        """Run docking with parallel processing"""
        self.total_pairs = len(pairlist_df)
        print(f"üöÄ Starting parallel docking: {self.total_pairs} pairs")
        print(f"   Max workers: {self.max_workers}")
        print(f"   Batch size: {batch_size}")
        
        # Load completed pairs for resume capability
        self.load_completed_pairs()
        
        # Split into batches
        batches = [pairlist_df.iloc[i:i+batch_size] for i in range(0, self.total_pairs, batch_size)]
        
        all_results = []
        successful = 0
        failed = 0
        
        # Process batches in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all batches
            future_to_batch = {
                executor.submit(self._process_batch, batch, batch_idx, flexible_residues_dict): batch_idx 
                for batch_idx, batch in enumerate(batches)
            }
            
            # Collect results as they complete
            for future in tqdm(concurrent.futures.as_completed(future_to_batch), 
                             total=len(batches), desc="Processing batches"):
                batch_idx = future_to_batch[future]
                try:
                    batch_results = future.result()
                    all_results.extend(batch_results)
                    
                    # Count successes/failures
                    for result in batch_results:
                        if result and result['status'] == 'success':
                            successful += 1
                            self.completed_pairs.add(result['pair_id'])
                        elif result:
                            failed += 1
                    
                    # Save progress
                    self.save_docking_state(all_results)
                            
                except Exception as e:
                    print(f"‚ùå Batch {batch_idx} failed: {e}")
                    failed += len(batches[batch_idx])
        
        # Final summary
        print(f"\\nüìä Parallel Docking Summary:")
        print(f"   Total pairs: {self.total_pairs}")
        print(f"   Successful: {successful}")
        print(f"   Failed: {failed}")
        print(f"   Success rate: {successful/self.total_pairs*100:.1f}%")
        
        self.results = all_results
        return all_results
    
    def _process_batch(self, batch_df, batch_idx, flexible_residues_dict):
        """Process a single batch of docking runs"""
        batch_results = []
        
        for idx, row in batch_df.iterrows():
            # Get flexible residues for this receptor
            flexible_residues = None
            if flexible_residues_dict and row['receptor'] in flexible_residues_dict:
                flexible_residues = flexible_residues_dict[row['receptor']]
            
            result = self.run_single_docking(row, flexible_residues)
            if result:
                result['batch_idx'] = batch_idx
                batch_results.append(result)
                
                # Update progress
                with self.progress_lock:
                    status = "‚úÖ" if result['status'] == 'success' else "‚ùå"
                    flex_info = " (Flex)" if flexible_residues else ""
                    print(f"   {status} Batch {batch_idx}: {row['receptor']}-{row['ligand']}{flex_info}")
        
        return batch_results
    
    def save_docking_state(self, results):
        """Save docking state for resume capability"""
        state = {
            'timestamp': datetime.now().isoformat(),
            'total_pairs': self.total_pairs,
            'completed_pairs': list(self.completed_pairs),
            'results_count': len(results),
            'config': self.config.active_config
        }
        
        with open(self.state_file, 'w') as f:
            json.dump(state, f, indent=2)
    
    def load_docking_state(self):
        """Load docking state for resume capability"""
        if os.path.exists(self.state_file):
            with open(self.state_file, 'r') as f:
                state = json.load(f)
            
            self.completed_pairs = set(state.get('completed_pairs', []))
            self.total_pairs = state.get('total_pairs', 0)
            
            print(f"‚úÖ Loaded docking state: {len(self.completed_pairs)} completed pairs")
            return True
        return False
    
    def load_completed_pairs(self):
        """Load list of completed pairs from output files"""
        if os.path.exists('gnina_out'):
            for file in os.listdir('gnina_out'):
                if file.endswith('_poses.sdf'):
                    # Extract pair ID from filename
                    pair_id = file.replace('_poses.sdf', '')
                    self.completed_pairs.add(pair_id)
        
        print(f"‚úÖ Found {len(self.completed_pairs)} previously completed pairs")

# Initialize docking engine
docking_engine = GNINADockingEngine(docking_config, max_workers=4)

print("‚úÖ Core GNINA Docking Engine initialized")
print(f"   Max workers: {docking_engine.max_workers}")
print(f"   Resume capability: Enabled")
print(f"   State file: {docking_engine.state_file}")

print("\\nüîß Usage Examples:")
print("   # Run parallel docking")
print("   results = docking_engine.run_parallel_docking(pairlist_df)")
print("   ")
print("   # Run with flexible residues")
print("   flex_dict = {'receptor1': ['A:123', 'A:124']}")
print("   results = docking_engine.run_parallel_docking(pairlist_df, flex_dict)")
print("   ")
print("   # Load previous state")
print("   docking_engine.load_docking_state()")


## 8. Flexible Receptor Docking Option

Configure flexible receptor docking by defining which residues should be flexible during docking. This is an optional enhancement to the core docking engine.


In [None]:
# =============================================================================
# Flexible Receptor Docking Configuration
# Based on GNINA documentation: https://github.com/gnina/gnina
# =============================================================================

class FlexibleReceptorManager:
    """Manage flexible receptor configurations for GNINA docking"""
    
    def __init__(self):
        self.flexible_residues = {}
        self.flexible_config = {
            'auto_detect': True,
            'distance_threshold': 5.0,
            'max_flexible_residues': 20,
            'flexdist': 3.5
        }
    
    def auto_detect_flexible_residues(self, receptor, binding_center, distance_threshold=5.0):
        """Auto-detect flexible residues near binding site"""
        try:
            from Bio.PDB import PDBParser, NeighborSearch
            from Bio.PDB.PDBExceptions import PDBConstructionWarning
            import warnings
            warnings.simplefilter('ignore', PDBConstructionWarning)
            
            receptor_pdb = f"receptors_prep/{receptor}.pdbqt"
            if not os.path.exists(receptor_pdb):
                print(f"‚ö†Ô∏è Receptor file not found: {receptor_pdb}")
                return []
            
            parser = PDBParser(QUIET=True)
            structure = parser.get_structure('receptor', receptor_pdb)
            
            # Get all atoms
            atoms = []
            for model in structure:
                for chain in model:
                    for residue in chain:
                        for atom in residue:
                            atoms.append(atom)
            
            # Create neighbor search
            ns = NeighborSearch(atoms)
            
            # Find residues within distance of binding center
            center_atom = None
            min_distance = float('inf')
            
            for atom in atoms:
                dist = atom.coord - np.array(binding_center)
                dist = np.linalg.norm(dist)
                if dist < min_distance:
                    min_distance = dist
                    center_atom = atom
            
            if center_atom is None:
                return []
            
            # Find neighbors within threshold
            neighbors = ns.search(center_atom.coord, distance_threshold, level='R')
            
            flexible_residues = []
            for residue in neighbors:
                chain_id = residue.parent.id
                res_num = residue.id[1]
                flexible_residues.append(f"{chain_id}:{res_num}")
            
            # Apply limits
            if len(flexible_residues) > self.flexible_config['max_flexible_residues']:
                flexible_residues = flexible_residues[:self.flexible_config['max_flexible_residues']]
                print(f"‚ö†Ô∏è Limited flexible residues to {self.flexible_config['max_flexible_residues']}")
            
            return sorted(flexible_residues)
            
        except ImportError:
            print("‚ö†Ô∏è BioPython not available for auto-detection")
            return []
        except Exception as e:
            print(f"‚ö†Ô∏è Error in auto-detection: {e}")
            return []
    
    def set_flexible_residues(self, receptor, flexible_residues=None, auto_detect=True, binding_center=None):
        """Set flexible residues for a receptor"""
        if flexible_residues is not None:
            # Manual specification
            self.flexible_residues[receptor] = flexible_residues
            print(f"‚úÖ Set manual flexible residues for {receptor}: {flexible_residues}")
            
        elif auto_detect and binding_center is not None:
            # Auto-detection
            detected = self.auto_detect_flexible_residues(receptor, binding_center)
            self.flexible_residues[receptor] = detected
            print(f"‚úÖ Auto-detected {len(detected)} flexible residues for {receptor}: {detected}")
        else:
            print(f"‚ö†Ô∏è No flexible residues set for {receptor}")
            self.flexible_residues[receptor] = []
    
    def set_bulk_flexibility(self, pairlist_df, auto_detect=True):
        """Set flexible residues for all receptors in pairlist"""
        print("üîÑ Configuring flexible residues for all receptors...")
        
        for _, row in pairlist_df.iterrows():
            receptor = row['receptor']
            if receptor not in self.flexible_residues:
                binding_center = [row['center_x'], row['center_y'], row['center_z']]
                self.set_flexible_residues(receptor, auto_detect=auto_detect, binding_center=binding_center)
    
    def get_flexible_residues_dict(self):
        """Get dictionary of flexible residues for all receptors"""
        return self.flexible_residues.copy()
    
    def display_flexibility_summary(self):
        """Display summary of flexible receptor configuration"""
        print("üîÑ Flexible Receptor Configuration Summary:")
        for receptor, residues in self.flexible_residues.items():
            print(f"   {receptor}: {len(residues)} flexible residues")
            if residues:
                print(f"      {', '.join(residues[:5])}{'...' if len(residues) > 5 else ''}")

# Initialize flexible receptor manager
flexible_manager = FlexibleReceptorManager()

print("‚úÖ Flexible Receptor Manager initialized")

print("\\nüîß Usage Examples:")
print("   # Auto-detect flexible residues for all receptors")
print("   flexible_manager.set_bulk_flexibility(pairlist_df, auto_detect=True)")
print("   ")
print("   # Manual specification")
print("   flexible_manager.set_flexible_residues('receptor1', ['A:123', 'A:124', 'A:125'])")
print("   ")
print("   # Get flexible residues dictionary")
print("   flex_dict = flexible_manager.get_flexible_residues_dict()")
print("   ")
print("   # Run docking with flexible receptors")
print("   results = docking_engine.run_parallel_docking(pairlist_df, flex_dict)")


## 9. Post-Docking Analysis: Visualization and Quality Control

Comprehensive analysis tools for docking results including visualization dashboards and quality control validation.


In [None]:
# =============================================================================
# Post-Docking Analysis: Visualization and Quality Control
# =============================================================================

class DockingAnalysisDashboard:
    """Comprehensive analysis dashboard for docking results"""
    
    def __init__(self):
        self.setup_plotting_style()
    
    def setup_plotting_style(self):
        """Setup plotting style for visualizations"""
        plt.style.use('default')
        sns.set_palette("husl")
        
    def create_score_distribution_plot(self, results):
        """Create score distribution visualization"""
        successful_results = [r for r in results if r['status'] == 'success']
        
        if not successful_results:
            print("‚ö†Ô∏è No successful results to visualize")
            return
        
        # Extract all CNN scores
        all_scores = []
        for result in successful_results:
            if 'scores' in result:
                for score_data in result['scores']:
                    if 'cnn_score' in score_data:
                        all_scores.append(score_data['cnn_score'])
        
        if not all_scores:
            print("‚ö†Ô∏è No CNN scores found in results")
            return
        
        # Create visualization
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle('Docking Results Analysis', fontsize=16)
        
        # Score distribution histogram
        axes[0, 0].hist(all_scores, bins=30, alpha=0.7, color='skyblue', edgecolor='black')
        axes[0, 0].set_title('CNN Score Distribution')
        axes[0, 0].set_xlabel('CNN Score')
        axes[0, 0].set_ylabel('Frequency')
        axes[0, 0].axvline(np.mean(all_scores), color='red', linestyle='--', label=f'Mean: {np.mean(all_scores):.3f}')
        axes[0, 0].legend()
        
        # Box plot by receptor
        receptor_scores = {}
        for result in successful_results:
            receptor = result['receptor']
            if receptor not in receptor_scores:
                receptor_scores[receptor] = []
            if 'scores' in result:
                for score_data in result['scores']:
                    if 'cnn_score' in score_data:
                        receptor_scores[receptor].append(score_data['cnn_score'])
        
        if receptor_scores:
            receptor_data = [scores for scores in receptor_scores.values() if scores]
            receptor_labels = [receptor for receptor, scores in receptor_scores.items() if scores]
            
            axes[0, 1].boxplot(receptor_data, labels=receptor_labels)
            axes[0, 1].set_title('CNN Scores by Receptor')
            axes[0, 1].set_ylabel('CNN Score')
            axes[0, 1].tick_params(axis='x', rotation=45)
        
        # Success rate by receptor
        receptor_stats = {}
        for result in results:
            receptor = result['receptor']
            if receptor not in receptor_stats:
                receptor_stats[receptor] = {'total': 0, 'success': 0}
            receptor_stats[receptor]['total'] += 1
            if result['status'] == 'success':
                receptor_stats[receptor]['success'] += 1
        
        receptors = list(receptor_stats.keys())
        success_rates = [receptor_stats[r]['success']/receptor_stats[r]['total']*100 for r in receptors]
        
        axes[1, 0].bar(receptors, success_rates, color='lightgreen', alpha=0.7)
        axes[1, 0].set_title('Success Rate by Receptor')
        axes[1, 0].set_ylabel('Success Rate (%)')
        axes[1, 0].tick_params(axis='x', rotation=45)
        
        # Top scoring ligands
        top_ligands = []
        for result in successful_results:
            if 'scores' in result:
                max_score = max([s.get('cnn_score', 0) for s in result['scores']], default=0)
                top_ligands.append({
                    'ligand': result['ligand'],
                    'receptor': result['receptor'],
                    'max_score': max_score
                })
        
        if top_ligands:
            top_ligands = sorted(top_ligands, key=lambda x: x['max_score'], reverse=True)[:10]
            ligand_names = [f"{l['ligand']}\\n({l['receptor']})" for l in top_ligands]
            scores = [l['max_score'] for l in top_ligands]
            
            axes[1, 1].barh(range(len(ligand_names)), scores, color='orange', alpha=0.7)
            axes[1, 1].set_yticks(range(len(ligand_names)))
            axes[1, 1].set_yticklabels(ligand_names, fontsize=8)
            axes[1, 1].set_title('Top 10 Scoring Ligands')
            axes[1, 1].set_xlabel('Max CNN Score')
        
        plt.tight_layout()
        plt.savefig('visualizations/docking_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        # Print summary statistics
        print("\\nüìä Docking Results Summary:")
        print(f"   Total results: {len(results)}")
        print(f"   Successful: {len(successful_results)}")
        print(f"   Failed: {len(results) - len(successful_results)}")
        print(f"   Success rate: {len(successful_results)/len(results)*100:.1f}%")
        print(f"   CNN scores - Mean: {np.mean(all_scores):.3f}, Std: {np.std(all_scores):.3f}")
        print(f"   CNN scores - Min: {np.min(all_scores):.3f}, Max: {np.max(all_scores):.3f}")
    
    def create_interactive_dashboard(self, results):
        """Create interactive Plotly dashboard"""
        successful_results = [r for r in results if r['status'] == 'success']
        
        if not successful_results:
            print("‚ö†Ô∏è No successful results for interactive dashboard")
            return
        
        # Prepare data
        dashboard_data = []
        for result in successful_results:
            if 'scores' in result:
                for i, score_data in enumerate(result['scores']):
                    if 'cnn_score' in score_data:
                        dashboard_data.append({
                            'Receptor': result['receptor'],
                            'Ligand': result['ligand'],
                            'Site_ID': result['site_id'],
                            'Pose_ID': i + 1,
                            'CNN_Score': score_data['cnn_score'],
                            'CNN_Affinity': score_data.get('cnn_affinity', score_data['cnn_score'])
                        })
        
        if not dashboard_data:
            print("‚ö†Ô∏è No score data for interactive dashboard")
            return
        
        df = pd.DataFrame(dashboard_data)
        
        # Create subplots
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('CNN Score Distribution', 'Scores by Receptor', 
                          'Top Ligands', 'Score vs Affinity'),
            specs=[[{"type": "histogram"}, {"type": "box"}],
                   [{"type": "bar"}, {"type": "scatter"}]]
        )
        
        # Score distribution
        fig.add_trace(
            go.Histogram(x=df['CNN_Score'], name='CNN Score Distribution', nbinsx=30),
            row=1, col=1
        )
        
        # Box plot by receptor
        for receptor in df['Receptor'].unique():
            receptor_data = df[df['Receptor'] == receptor]['CNN_Score']
            fig.add_trace(
                go.Box(y=receptor_data, name=receptor),
                row=1, col=2
            )
        
        # Top ligands
        top_ligands = df.groupby('Ligand')['CNN_Score'].max().nlargest(10)
        fig.add_trace(
            go.Bar(x=top_ligands.index, y=top_ligands.values, name='Top Ligands'),
            row=2, col=1
        )
        
        # Score vs Affinity
        fig.add_trace(
            go.Scatter(x=df['CNN_Score'], y=df['CNN_Affinity'], 
                      mode='markers', name='Score vs Affinity',
                      text=df['Ligand'], hovertemplate='Ligand: %{text}<br>Score: %{x}<br>Affinity: %{y}'),
            row=2, col=2
        )
        
        fig.update_layout(height=800, showlegend=False, title_text="Interactive Docking Analysis Dashboard")
        fig.show()
        
        # Save as HTML
        fig.write_html('visualizations/interactive_dashboard.html')
        print("‚úÖ Interactive dashboard saved to visualizations/interactive_dashboard.html")

class QualityControlValidator:
    """Quality control validation for docking results"""
    
    def __init__(self):
        self.validation_results = {}
    
    def validate_docking_results(self, results):
        """Comprehensive validation of docking results"""
        print("üîç Validating docking results...")
        
        validation_results = {
            'total_results': len(results),
            'successful_dockings': 0,
            'failed_dockings': 0,
            'quality_issues': [],
            'score_distribution': {},
            'receptor_analysis': {},
            'recommendations': []
        }
        
        successful_results = [r for r in results if r['status'] == 'success']
        validation_results['successful_dockings'] = len(successful_results)
        validation_results['failed_dockings'] = len(results) - len(successful_results)
        
        if successful_results:
            # Analyze score distribution
            all_scores = []
            for result in successful_results:
                if 'scores' in result:
                    for score_data in result['scores']:
                        if 'cnn_score' in score_data:
                            all_scores.append(score_data['cnn_score'])
            
            if all_scores:
                validation_results['score_distribution'] = {
                    'mean': np.mean(all_scores),
                    'std': np.std(all_scores),
                    'min': np.min(all_scores),
                    'max': np.max(all_scores),
                    'median': np.median(all_scores),
                    'q25': np.percentile(all_scores, 25),
                    'q75': np.percentile(all_scores, 75)
                }
                
                # Quality checks
                if np.mean(all_scores) < 0.3:
                    validation_results['quality_issues'].append("Low average CNN scores (< 0.3)")
                    validation_results['recommendations'].append("Consider increasing exhaustiveness or using refinement mode")
                
                if np.std(all_scores) < 0.1:
                    validation_results['quality_issues'].append("Low score variance - possible convergence issues")
                    validation_results['recommendations'].append("Check binding site coordinates and box size")
                
                if np.max(all_scores) < 0.5:
                    validation_results['quality_issues'].append("No high-scoring poses found")
                    validation_results['recommendations'].append("Review ligand preparation and binding site definition")
        
        # Receptor-specific analysis
        receptor_stats = {}
        for result in results:
            receptor = result['receptor']
            if receptor not in receptor_stats:
                receptor_stats[receptor] = {'total': 0, 'success': 0, 'scores': []}
            receptor_stats[receptor]['total'] += 1
            if result['status'] == 'success':
                receptor_stats[receptor]['success'] += 1
                if 'scores' in result:
                    for score_data in result['scores']:
                        if 'cnn_score' in score_data:
                            receptor_stats[receptor]['scores'].append(score_data['cnn_score'])
        
        for receptor, stats in receptor_stats.items():
            success_rate = stats['success'] / stats['total'] * 100
            validation_results['receptor_analysis'][receptor] = {
                'total_pairs': stats['total'],
                'successful_pairs': stats['success'],
                'success_rate': success_rate,
                'avg_score': np.mean(stats['scores']) if stats['scores'] else 0,
                'max_score': np.max(stats['scores']) if stats['scores'] else 0
            }
            
            if success_rate < 50:
                validation_results['quality_issues'].append(f"Low success rate for {receptor} ({success_rate:.1f}%)")
                validation_results['recommendations'].append(f"Check receptor preparation for {receptor}")
        
        # Overall success rate check
        overall_success_rate = validation_results['successful_dockings'] / validation_results['total_results'] * 100
        if overall_success_rate < 70:
            validation_results['quality_issues'].append(f"Low overall success rate ({overall_success_rate:.1f}%)")
            validation_results['recommendations'].append("Review input structures and docking parameters")
        
        self.validation_results = validation_results
        return validation_results
    
    def generate_quality_report(self):
        """Generate comprehensive quality control report"""
        if not self.validation_results:
            print("‚ö†Ô∏è No validation results available. Run validate_docking_results() first.")
            return
        
        print("\\nüìä Quality Control Report:")
        print(f"   Total Results: {self.validation_results['total_results']}")
        print(f"   Successful: {self.validation_results['successful_dockings']}")
        print(f"   Failed: {self.validation_results['failed_dockings']}")
        print(f"   Success Rate: {self.validation_results['successful_dockings']/self.validation_results['total_results']*100:.1f}%")
        
        if self.validation_results['score_distribution']:
            dist = self.validation_results['score_distribution']
            print(f"\\n   Score Distribution:")
            print(f"     Mean: {dist['mean']:.3f}")
            print(f"     Std: {dist['std']:.3f}")
            print(f"     Range: {dist['min']:.3f} - {dist['max']:.3f}")
            print(f"     Median: {dist['median']:.3f}")
        
        if self.validation_results['quality_issues']:
            print(f"\\n   ‚ö†Ô∏è Quality Issues ({len(self.validation_results['quality_issues'])}):")
            for issue in self.validation_results['quality_issues']:
                print(f"     - {issue}")
        
        if self.validation_results['recommendations']:
            print(f"\\n   üí° Recommendations ({len(self.validation_results['recommendations'])}):")
            for rec in self.validation_results['recommendations']:
                print(f"     - {rec}")
        
        # Save report
        with open('enhanced_analysis/quality_control_report.json', 'w') as f:
            json.dump(self.validation_results, f, indent=2)
        
        print("\\n‚úÖ Quality control report saved to enhanced_analysis/quality_control_report.json")

# Initialize analysis tools
analysis_dashboard = DockingAnalysisDashboard()
qc_validator = QualityControlValidator()

print("‚úÖ Post-docking analysis tools initialized")

print("\\nüîß Usage Examples:")
print("   # Create visualizations")
print("   analysis_dashboard.create_score_distribution_plot(results)")
print("   analysis_dashboard.create_interactive_dashboard(results)")
print("   ")
print("   # Quality control validation")
print("   qc_validator.validate_docking_results(results)")
print("   qc_validator.generate_quality_report()")


## 10. Covalent Docking Configuration

Specialized configuration for covalent docking using GNINA. This follows the same procedure as standard docking but with covalent bond formation parameters.


In [None]:
# =============================================================================
# Covalent Docking Configuration
# Based on GNINA documentation: https://github.com/gnina/gnina
# =============================================================================

class CovalentDockingConfig:
    """Configuration for covalent docking with GNINA"""
    
    def __init__(self, base_docking_config):
        self.base_config = base_docking_config
        self.covalent_config = {
            'covalent_docking': True,
            'covalent_residue': None,  # e.g., 'A:123'
            'covalent_atom': None,     # e.g., 'SG' for cysteine
            'covalent_bond_type': 'single',  # single, double, triple
            'covalent_bond_length': 1.8,     # Angstroms
            'covalent_bond_angle': 109.5,    # Degrees
            'covalent_torsion': 0.0,         # Degrees
            'covalent_energy_penalty': 0.0,  # Energy penalty for covalent bond
        }
        
        # Covalent-specific parameters
        self.covalent_parameters = {
            'exhaustiveness': 64,      # Higher for covalent docking
            'num_modes': 30,          # More modes for covalent poses
            'cnn_scoring': 'refinement',  # Use refinement for better accuracy
            'min_rmsd_filter': 0.5,   # Stricter RMSD filter
            'pose_sort_order': 0,     # Sort by CNN score
        }
    
    def set_covalent_residue(self, residue_id, atom_name='SG'):
        """Set the covalent residue and atom"""
        self.covalent_config['covalent_residue'] = residue_id
        self.covalent_config['covalent_atom'] = atom_name
        print(f"‚úÖ Covalent residue set: {residue_id}:{atom_name}")
    
    def set_covalent_bond_parameters(self, bond_length=1.8, bond_angle=109.5, torsion=0.0):
        """Set covalent bond geometry parameters"""
        self.covalent_config['covalent_bond_length'] = bond_length
        self.covalent_config['covalent_bond_angle'] = bond_angle
        self.covalent_config['covalent_torsion'] = torsion
        print(f"‚úÖ Covalent bond parameters set:")
        print(f"   Bond length: {bond_length} √Ö")
        print(f"   Bond angle: {bond_angle}¬∞")
        print(f"   Torsion: {torsion}¬∞")
    
    def get_covalent_gnina_command(self, row):
        """Get GNINA command with covalent docking parameters"""
        receptor = f"receptors_prep/{row['receptor']}.pdbqt"
        ligand = f"ligands_prep/{row['ligand']}.pdbqt"
        
        # Output files
        tag = f"{row['receptor']}_{row['site_id']}_{row['ligand']}_covalent"
        output_sdf = f"gnina_out/{tag}_poses.sdf"
        log_file = f"logs/{tag}.log"
        
        # Base command
        cmd = [
            './gnina',
            "--receptor", receptor,
            "--ligand", ligand,
            "--out", output_sdf,
            "--log", log_file,
        ]
        
        # Docking box parameters
        cmd.extend([
            "--center_x", str(row['center_x']),
            "--center_y", str(row['center_y']),
            "--center_z", str(row['center_z']),
            "--size_x", str(row['size_x']),
            "--size_y", str(row['size_y']),
            "--size_z", str(row['size_z']),
        ])
        
        # Covalent-specific parameters
        cmd.extend([
            "--exhaustiveness", str(self.covalent_parameters['exhaustiveness']),
            "--num_modes", str(self.covalent_parameters['num_modes']),
            "--seed", str(self.base_config.active_config['seed']),
            "--cnn_scoring", self.covalent_parameters['cnn_scoring'],
            "--min_rmsd_filter", str(self.covalent_parameters['min_rmsd_filter']),
            "--pose_sort_order", str(self.covalent_parameters['pose_sort_order']),
        ])
        
        # GPU acceleration (prioritized over CPU)
        if self.base_config.active_config.get('use_gpu', False) and CONFIG.get('use_gpu', False):
            cmd.extend(['--gpu', '--device', str(self.base_config.active_config.get('gpu_device', 0))])
        else:
            # Fallback to CPU
            cmd.extend(['--cpu', str(self.base_config.active_config['cpu_cores'])])
        
        # Covalent docking parameters
        if self.covalent_config['covalent_residue']:
            cmd.extend([
                "--covalent_residue", self.covalent_config['covalent_residue'],
                "--covalent_atom", self.covalent_config['covalent_atom'],
                "--covalent_bond_length", str(self.covalent_config['covalent_bond_length']),
                "--covalent_bond_angle", str(self.covalent_config['covalent_bond_angle']),
                "--covalent_torsion", str(self.covalent_config['covalent_torsion']),
            ])
        
        # GPU support already handled above
        
        return cmd, output_sdf, log_file
    
    def display_covalent_configuration(self):
        """Display current covalent docking configuration"""
        print("üîó Covalent Docking Configuration:")
        print(f"   Covalent residue: {self.covalent_config['covalent_residue']}")
        print(f"   Covalent atom: {self.covalent_config['covalent_atom']}")
        print(f"   Bond length: {self.covalent_config['covalent_bond_length']} √Ö")
        print(f"   Bond angle: {self.covalent_config['covalent_bond_angle']}¬∞")
        print(f"   Torsion: {self.covalent_config['covalent_torsion']}¬∞")
        print(f"   Exhaustiveness: {self.covalent_parameters['exhaustiveness']}")
        print(f"   Num modes: {self.covalent_parameters['num_modes']}")
        print(f"   CNN scoring: {self.covalent_parameters['cnn_scoring']}")

class CovalentDockingEngine(GNINADockingEngine):
    """Covalent docking engine extending the base docking engine"""
    
    def __init__(self, covalent_config, max_workers=4):
        super().__init__(covalent_config.base_config, max_workers)
        self.covalent_config = covalent_config
    
    def run_covalent_docking(self, pairlist_df, covalent_residue, covalent_atom='SG'):
        """Run covalent docking for all pairs"""
        print(f"üîó Starting covalent docking with residue {covalent_residue}:{covalent_atom}")
        
        # Set covalent parameters
        self.covalent_config.set_covalent_residue(covalent_residue, covalent_atom)
        
        # Display configuration
        self.covalent_config.display_covalent_configuration()
        
        # Run docking with covalent parameters
        results = []
        for idx, row in pairlist_df.iterrows():
            print(f"\\nüîó Processing covalent docking: {row['receptor']}-{row['ligand']}")
            
            # Build covalent command
            cmd, output_sdf, log_file = self.covalent_config.get_covalent_gnina_command(row)
            
            try:
                # Run GNINA with covalent parameters
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=self.config.active_config['timeout']
                )
                
                # Parse results
                if result.returncode == 0 and os.path.exists(output_sdf):
                    scores = self.parse_gnina_output(output_sdf, log_file)
                    results.append({
                        'status': 'success',
                        'receptor': row['receptor'],
                        'ligand': row['ligand'],
                        'site_id': row['site_id'],
                        'output_file': output_sdf,
                        'log_file': log_file,
                        'scores': scores,
                        'covalent_residue': covalent_residue,
                        'covalent_atom': covalent_atom,
                        'docking_type': 'covalent'
                    })
                    print(f"   ‚úÖ Success: {len(scores)} poses generated")
                else:
                    results.append({
                        'status': 'error',
                        'receptor': row['receptor'],
                        'ligand': row['ligand'],
                        'site_id': row['site_id'],
                        'error': result.stderr,
                        'returncode': result.returncode,
                        'docking_type': 'covalent'
                    })
                    print(f"   ‚ùå Error: {result.stderr}")
                    
            except subprocess.TimeoutExpired:
                results.append({
                    'status': 'timeout',
                    'receptor': row['receptor'],
                    'ligand': row['ligand'],
                    'site_id': row['site_id'],
                    'error': f"Timeout after {self.config.active_config['timeout']}s",
                    'docking_type': 'covalent'
                })
                print(f"   ‚è∞ Timeout")
            except Exception as e:
                results.append({
                    'status': 'error',
                    'receptor': row['receptor'],
                    'ligand': row['ligand'],
                    'site_id': row['site_id'],
                    'error': str(e),
                    'docking_type': 'covalent'
                })
                print(f"   ‚ùå Exception: {e}")
        
        # Summary
        successful = len([r for r in results if r['status'] == 'success'])
        print(f"\\nüìä Covalent Docking Summary:")
        print(f"   Total pairs: {len(pairlist_df)}")
        print(f"   Successful: {successful}")
        print(f"   Failed: {len(pairlist_df) - successful}")
        print(f"   Success rate: {successful/len(pairlist_df)*100:.1f}%")
        
        return results

# Initialize covalent docking
covalent_config = CovalentDockingConfig(docking_config)
covalent_engine = CovalentDockingEngine(covalent_config, max_workers=4)

print("‚úÖ Covalent Docking Engine initialized")

print("\\nüîß Usage Examples:")
print("   # Set covalent residue (e.g., cysteine at position 123 in chain A)")
print("   covalent_config.set_covalent_residue('A:123', 'SG')")
print("   ")
print("   # Set covalent bond parameters")
print("   covalent_config.set_covalent_bond_parameters(bond_length=1.8, bond_angle=109.5)")
print("   ")
print("   # Run covalent docking")
print("   covalent_results = covalent_engine.run_covalent_docking(pairlist_df, 'A:123', 'SG')")
print("   ")
print("   # Display configuration")
print("   covalent_config.display_covalent_configuration()")

print("\\nüí° Covalent Docking Notes:")
print("   ‚Ä¢ Requires reactive groups in ligands (e.g., Michael acceptors, electrophiles)")
print("   ‚Ä¢ Target residue must have reactive atom (e.g., Cys-SG, Lys-NZ, Ser-OG)")
print("   ‚Ä¢ Higher exhaustiveness recommended for better sampling")
print("   ‚Ä¢ Use refinement CNN scoring for better accuracy")
print("   ‚Ä¢ Consider flexible receptor for better pose quality")


## 11. Main Execution and Workflow Orchestration

Execute the complete GNINA docking pipeline with all available options. This cell provides easy-to-use functions for running different workflow types.


In [None]:
# =============================================================================
# Main Execution and Workflow Orchestration
# =============================================================================

def run_standard_docking(pairlist_df, use_flexible=False, cnn_mode='rescore'):
    """
    Run standard GNINA docking workflow
    
    Args:
        pairlist_df: DataFrame with receptor-ligand pairs and coordinates
        use_flexible: Whether to use flexible receptor docking
        cnn_mode: CNN scoring mode ('none', 'rescore', 'refinement', 'all')
    """
    print(f"üöÄ Starting Standard Docking Workflow")
    print(f"   Flexible receptors: {'Enabled' if use_flexible else 'Disabled'}")
    print(f"   CNN mode: {cnn_mode}")
    
    # Configure CNN mode
    docking_config.set_cnn_mode(cnn_mode)
    
    # Configure flexible receptors if requested
    flexible_residues_dict = None
    if use_flexible:
        flexible_manager.set_bulk_flexibility(pairlist_df, auto_detect=True)
        flexible_residues_dict = flexible_manager.get_flexible_residues_dict()
        flexible_manager.display_flexibility_summary()
    
    # Run docking
    results = docking_engine.run_parallel_docking(pairlist_df, flexible_residues_dict)
    
    # Generate analysis
    print("\\nüìä Generating analysis...")
    analysis_dashboard.create_score_distribution_plot(results)
    qc_validator.validate_docking_results(results)
    qc_validator.generate_quality_report()
    
    return results

def run_tiered_workflow(pairlist_df, stages=['A', 'B'], use_flexible=False):
    """
    Run tiered CNN scoring workflow
    
    Args:
        pairlist_df: DataFrame with receptor-ligand pairs and coordinates
        stages: List of stages to run ['A', 'B', 'C']
        use_flexible: Whether to use flexible receptor docking
    """
    print(f"üéØ Starting Tiered CNN Workflow")
    print(f"   Stages: {stages}")
    print(f"   Flexible receptors: {'Enabled' if use_flexible else 'Disabled'}")
    
    # Configure flexible receptors if requested
    flexible_residues_dict = None
    if use_flexible:
        flexible_manager.set_bulk_flexibility(pairlist_df, auto_detect=True)
        flexible_residues_dict = flexible_manager.get_flexible_residues_dict()
    
    all_results = []
    previous_results = None
    
    for stage in stages:
        print(f"\\n{'='*60}")
        print(f"STAGE {stage}: {tiered_workflow.get_stage_config(stage)['name']}")
        print(f"{'='*60}")
        
        # Get input for this stage
        stage_input = tiered_workflow.run_stage(stage, pairlist_df, previous_results)
        
        if len(stage_input) == 0:
            print(f"‚ö†Ô∏è No ligands to process for Stage {stage}")
            continue
        
        # Run docking for this stage
        stage_results = docking_engine.run_parallel_docking(stage_input, flexible_residues_dict)
        
        # Add stage information
        for result in stage_results:
            result['stage'] = stage
            result['stage_config'] = tiered_workflow.get_stage_config(stage)
        
        all_results.extend(stage_results)
        previous_results = stage_results
        
        # Stage summary
        successful = len([r for r in stage_results if r['status'] == 'success'])
        print(f"\\nüìä Stage {stage} Summary:")
        print(f"   Processed: {len(stage_input)} pairs")
        print(f"   Successful: {successful}")
        print(f"   Success rate: {successful/len(stage_input)*100:.1f}%")
    
    # Final analysis
    print(f"\\n{'='*60}")
    print("FINAL ANALYSIS")
    print(f"{'='*60}")
    
    analysis_dashboard.create_score_distribution_plot(all_results)
    qc_validator.validate_docking_results(all_results)
    qc_validator.generate_quality_report()
    
    return all_results

def run_covalent_docking(pairlist_df, covalent_residue, covalent_atom='SG', use_flexible=False):
    """
    Run covalent docking workflow
    
    Args:
        pairlist_df: DataFrame with receptor-ligand pairs and coordinates
        covalent_residue: Residue ID for covalent bond (e.g., 'A:123')
        covalent_atom: Atom name for covalent bond (e.g., 'SG' for cysteine)
        use_flexible: Whether to use flexible receptor docking
    """
    print(f"üîó Starting Covalent Docking Workflow")
    print(f"   Covalent residue: {covalent_residue}:{covalent_atom}")
    print(f"   Flexible receptors: {'Enabled' if use_flexible else 'Disabled'}")
    
    # Configure covalent parameters
    covalent_config.set_covalent_residue(covalent_residue, covalent_atom)
    covalent_config.display_covalent_configuration()
    
    # Run covalent docking
    results = covalent_engine.run_covalent_docking(pairlist_df, covalent_residue, covalent_atom)
    
    # Generate analysis
    print("\\nüìä Generating covalent docking analysis...")
    analysis_dashboard.create_score_distribution_plot(results)
    qc_validator.validate_docking_results(results)
    qc_validator.generate_quality_report()
    
    return results

def run_complete_workflow(pairlist_df, workflow_type='standard', **kwargs):
    """
    Run complete workflow with all options
    
    Args:
        pairlist_df: DataFrame with receptor-ligand pairs and coordinates
        workflow_type: 'standard', 'tiered', 'covalent'
        **kwargs: Additional parameters for specific workflows
    """
    print(f"üéØ Starting Complete GNINA Workflow: {workflow_type.upper()}")
    print(f"   Total pairs: {len(pairlist_df)}")
    print(f"   GPU acceleration: {'Enabled' if CONFIG.get('use_gpu', False) else 'Disabled'}")
    
    if workflow_type == 'standard':
        return run_standard_docking(
            pairlist_df, 
            use_flexible=kwargs.get('use_flexible', False),
            cnn_mode=kwargs.get('cnn_mode', 'rescore')
        )
    
    elif workflow_type == 'tiered':
        return run_tiered_workflow(
            pairlist_df,
            stages=kwargs.get('stages', ['A', 'B']),
            use_flexible=kwargs.get('use_flexible', False)
        )
    
    elif workflow_type == 'covalent':
        return run_covalent_docking(
            pairlist_df,
            covalent_residue=kwargs.get('covalent_residue', 'A:123'),
            covalent_atom=kwargs.get('covalent_atom', 'SG'),
            use_flexible=kwargs.get('use_flexible', False)
        )
    
    else:
        print(f"‚ùå Unknown workflow type: {workflow_type}")
        return None

# =============================================================================
# Quick Start Examples
# =============================================================================

print("üöÄ Enhanced GNINA Docking Pipeline Ready!")
print("\\nüìã Available Workflow Types:")
print("   1. Standard Docking - Single-stage docking with configurable CNN mode")
print("   2. Tiered Workflow - Multi-stage funnel approach (A ‚Üí B ‚Üí C)")
print("   3. Covalent Docking - Specialized covalent bond formation")

print("\\nüîß Quick Start Examples:")
print("\\n# Standard Docking (Recommended for most users)")
print("results = run_complete_workflow(pairlist_df, 'standard', use_flexible=True)")
print("")
print("# Tiered Workflow (For large libraries)")
print("results = run_complete_workflow(pairlist_df, 'tiered', stages=['A', 'B'], use_flexible=True)")
print("")
print("# Covalent Docking (For covalent inhibitors)")
print("results = run_complete_workflow(pairlist_df, 'covalent', covalent_residue='A:123', covalent_atom='SG')")

print("\\nüí° Configuration Tips:")
print("   ‚Ä¢ GPU acceleration is automatically prioritized over CPU (10-50x speedup)")
print("   ‚Ä¢ Use flexible receptors for better accuracy (15-25% improvement)")
print("   ‚Ä¢ Start with 'rescore' CNN mode for speed, use 'refinement' for accuracy")
print("   ‚Ä¢ For large libraries (>1000 ligands): Use tiered workflow")
print("   ‚Ä¢ For covalent inhibitors: Use covalent docking with appropriate reactive residue")
print("   ‚Ä¢ CPU cores are used as fallback when GPU is unavailable")

print("\\nüìä Analysis and Visualization:")
print("   ‚Ä¢ All workflows automatically generate comprehensive analysis")
print("   ‚Ä¢ Interactive dashboards saved to visualizations/")
print("   ‚Ä¢ Quality control reports saved to enhanced_analysis/")
print("   ‚Ä¢ Resume capability: Interrupted runs can be resumed automatically")

print("\\nüîó References:")
print("   ‚Ä¢ GNINA: https://github.com/gnina/gnina")
print("   ‚Ä¢ PDB Preparation Wizard: https://github.com/OASolliman590/pdb-prepare-wizard")
print("   ‚Ä¢ CNN Scoring Modes: See GNINA documentation for detailed explanations")

print("\\n‚úÖ Ready to run! Choose your workflow and execute the appropriate function above.")
