# Claude Tools - Universal Colab Integration

**Advanced integration supporting multiple Claude projects simultaneously**

Based on the original `UNIVERSAL_COLAB_CLAUDE_INTEGRATION.py`, cleaned up and improved.

## Features
- 🚀 **Multi-project support** - Handle multiple Claude instances
- 🔒 **Secure communication** - Service account based
- 🧠 **AI integration** - Gemini, OpenAI support (currently a placeholder response; real calls require API key configuration)
- 📊 **Advanced monitoring** - Real-time status tracking
- 🔄 **Auto-recovery** - Error handling and retry logic


In [None]:
# Install dependencies
# The leading `!` is a notebook shell escape: each line runs pip as a shell command.
# Google auth and API client libraries.
!pip install google-auth google-auth-oauthlib google-auth-httplib2 google-api-python-client
# AI SDKs (anthropic, openai) and python-dotenv.
!pip install anthropic openai python-dotenv

In [None]:
# Mount Google Drive
# Drive is mounted at /content/drive; the processor defined later in this
# notebook keeps its command and result files under
# /content/drive/MyDrive/claude-tools-commands, so this cell must run first.
from google.colab import drive
drive.mount('/content/drive')
print("✅ Google Drive mounted")

In [None]:
# Universal Claude Processor (Enhanced)
import os
import json
import time
import threading
import subprocess
import sys
from datetime import datetime
from typing import Dict, Any, List
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr

class UniversalClaudeProcessor:
    """Dispatch command dictionaries to code, shell, package, file and status handlers.

    A command is a dict with a ``'type'`` key selecting the handler and an
    optional ``'project'`` key (default ``'default'``) used to tag the work.
    Every handler returns a dict containing at least a boolean ``'success'``
    key; failures also carry an ``'error'`` string.

    Attributes:
        project_name: Name passed to the constructor.
        session_id: ``"<project_name>_<unix seconds at construction>"``.
        api_folder: Drive folder holding command files and a ``results``
            subfolder; both are created on construction.
        active_projects: Set of project names seen by ``process_command``.
        command_queue: Empty list created on construction (not used by the
            methods of this class).
        start_time: ``time.time()`` at construction, used to report uptime.
    """

    def __init__(self, project_name="claude_universal"):
        """Create the command/result folders and announce the session.

        Args:
            project_name: Prefix used when building ``session_id``.
        """
        self.project_name = project_name
        self.session_id = f"{project_name}_{int(time.time())}"
        self.api_folder = "/content/drive/MyDrive/claude-tools-commands"
        self.active_projects = set()
        self.command_queue = []
        # Recorded here so uptime is meaningful even when the caller never
        # assigns ``start_time`` itself; an external assignment still wins.
        self.start_time = time.time()

        # Create folders
        os.makedirs(self.api_folder, exist_ok=True)
        os.makedirs(f"{self.api_folder}/results", exist_ok=True)

        print(f"🚀 Universal Claude Processor: {self.session_id}")
        print(f"📁 Monitoring: {self.api_folder}")

    def process_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """Route ``command`` to the handler selected by its ``'type'`` key.

        Args:
            command: Command dict. Recognised types are ``execute_code``
                (needs ``'code'``), ``install_package`` (needs ``'packages'``),
                ``shell_command`` (needs ``'command'``), ``ai_query``,
                ``file_operation`` and ``system_status``.

        Returns:
            The handler's result dict. Unknown types and any exception raised
            while dispatching (for example a missing required key) produce
            ``{'success': False, 'error': ...}``; exceptions additionally
            include a ``'traceback'`` string.
        """
        # Imported locally: the module-level imports do not provide
        # ``traceback``, and without it the error path below would itself
        # fail with NameError.
        import traceback

        cmd_type = command.get('type', 'unknown')
        project = command.get('project', 'default')

        try:
            # Inside the try so an unhashable project value is reported as a
            # failed command instead of escaping as an exception.
            self.active_projects.add(project)

            if cmd_type == 'execute_code':
                return self._execute_python_code(command['code'], project)
            elif cmd_type == 'install_package':
                return self._install_packages(command['packages'])
            elif cmd_type == 'shell_command':
                return self._execute_shell(command['command'])
            elif cmd_type == 'ai_query':
                return self._handle_ai_query(command)
            elif cmd_type == 'file_operation':
                return self._handle_file_operation(command)
            elif cmd_type == 'system_status':
                return self._get_system_status()
            else:
                return {'success': False, 'error': f'Unknown command: {cmd_type}'}

        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'traceback': traceback.format_exc()
            }

    def _execute_python_code(self, code: str, project: str) -> Dict[str, Any]:
        """Run ``code`` with ``exec`` in a fresh globals dict and capture its output.

        NOTE(review): ``exec`` runs the code with the full privileges of this
        process; the per-call globals dict only separates variable names, it
        is not a sandbox. Only feed this commands from a trusted source.

        Args:
            code: Python source to execute.
            project: Project name exposed to the code as ``PROJECT_NAME``.

        Returns:
            On success: ``success``, ``output`` (captured stdout), ``stderr``
            (captured stderr), ``project`` and ``timestamp``. On failure:
            ``success``, ``output``, ``stderr``, ``error`` and ``project``.
        """
        stdout_capture = StringIO()
        stderr_capture = StringIO()

        try:
            # Create project namespace
            project_globals = {
                '__name__': f'__claude_project_{project}__',
                'PROJECT_NAME': project,
                'SESSION_ID': self.session_id
            }

            with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
                exec(code, project_globals)

            return {
                'success': True,
                'output': stdout_capture.getvalue(),
                'stderr': stderr_capture.getvalue(),
                'project': project,
                'timestamp': datetime.now().isoformat()
            }

        except SystemExit as e:
            # sys.exit()/exit() inside the executed code must not terminate
            # the process that is running the command.
            return {
                'success': False,
                'output': stdout_capture.getvalue(),
                'stderr': stderr_capture.getvalue(),
                'error': f'SystemExit: {e.code}',
                'project': project
            }
        except Exception as e:
            return {
                'success': False,
                'output': stdout_capture.getvalue(),
                'stderr': stderr_capture.getvalue(),
                'error': str(e),
                'project': project
            }

    def _install_packages(self, packages) -> Dict[str, Any]:
        """Install one or more packages with ``pip`` using the current interpreter.

        Args:
            packages: A single package specifier or a list of them.

        Returns:
            ``{'success': bool, 'results': [...]}`` where ``success`` is true
            only if every install succeeded and each entry in ``results``
            describes one package (``package``, ``success``, ``error`` and,
            when pip actually ran, ``output``).
        """
        if isinstance(packages, str):
            packages = [packages]

        results = []
        for package in packages:
            try:
                # Each install is limited to 60 seconds; a timeout is reported
                # through the generic except branch below.
                result = subprocess.run([
                    sys.executable, '-m', 'pip', 'install', package
                ], capture_output=True, text=True, timeout=60)

                results.append({
                    'package': package,
                    'success': result.returncode == 0,
                    'output': result.stdout,
                    'error': result.stderr if result.returncode != 0 else None
                })
            except Exception as e:
                results.append({
                    'package': package,
                    'success': False,
                    'error': str(e)
                })

        return {
            'success': all(r['success'] for r in results),
            'results': results
        }

    def _execute_shell(self, command: str) -> Dict[str, Any]:
        """Run ``command`` through the shell after a substring deny-list check.

        The deny-list is a case-insensitive substring match, so it also
        rejects harmless commands that merely contain one of the fragments
        (for example anything containing ``format``) and does not stop a
        determined caller. It is a guard against accidents, not a security
        boundary.

        Args:
            command: Shell command line, executed with ``shell=True`` and a
                30 second timeout.

        Returns:
            ``success`` (exit status was 0), ``output`` (stdout), ``error``
            (stderr, only when the exit status is non-zero) and
            ``return_code``; or ``success``/``error`` when the command was
            blocked, timed out or could not be started.
        """
        # Basic safety check
        dangerous_commands = ['rm -rf', 'sudo rm', 'dd if=', 'format', 'fdisk']
        lowered = command.lower()
        if any(danger in lowered for danger in dangerous_commands):
            return {
                'success': False,
                'error': 'Dangerous command blocked for safety'
            }

        try:
            result = subprocess.run(
                command, shell=True,
                capture_output=True, text=True, timeout=30
            )

            return {
                'success': result.returncode == 0,
                'output': result.stdout,
                'error': result.stderr if result.returncode != 0 else None,
                'return_code': result.returncode
            }

        except subprocess.TimeoutExpired:
            return {'success': False, 'error': 'Command timed out'}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def _handle_ai_query(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """Return a placeholder response for an AI query.

        No AI service is called; the result only echoes the first 50
        characters of the prompt.

        Args:
            command: Command dict; reads ``'prompt'`` (default ``''``) and
                ``'model'`` (default ``'gemini'``).

        Returns:
            ``success``, ``response``, ``model`` and an explanatory ``note``.
        """
        query = command.get('prompt', '')
        model = command.get('model', 'gemini')

        # This would integrate with actual AI APIs
        # For now, return a placeholder
        return {
            'success': True,
            'response': f'AI query processed: {query[:50]}...',
            'model': model,
            'note': 'AI integration requires API keys configuration'
        }

    def _handle_file_operation(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """Read, write or list a path named in ``command``.

        Args:
            command: Command dict; reads ``'operation'`` (``read``, ``write``
                or ``list``; default ``read``), ``'filepath'`` and, for
                ``write``, ``'content'`` (default ``''``).

        Returns:
            ``read``: ``success`` and ``content``. ``write``: ``success`` and
            ``message``. ``list``: ``success`` and ``files``. Unknown
            operations and any exception yield ``success: False`` with an
            ``error`` string.
        """
        operation = command.get('operation', 'read')
        filepath = command.get('filepath', '')

        try:
            if operation == 'read':
                # Explicit encoding so text round-trips the same way
                # regardless of the runtime's locale.
                with open(filepath, 'r', encoding='utf-8') as f:
                    content = f.read()
                return {'success': True, 'content': content}

            elif operation == 'write':
                content = command.get('content', '')
                with open(filepath, 'w', encoding='utf-8') as f:
                    f.write(content)
                return {'success': True, 'message': f'File written: {filepath}'}

            elif operation == 'list':
                files = os.listdir(filepath)
                return {'success': True, 'files': files}

            else:
                return {'success': False, 'error': f'Unknown operation: {operation}'}

        except Exception as e:
            return {'success': False, 'error': str(e)}

    def _get_system_status(self) -> Dict[str, Any]:
        """Report session information and host resource usage.

        Requires the third-party ``psutil`` package; if it is missing the
        ImportError propagates to the caller.

        Returns:
            ``{'success': True, 'system': {...}}`` with ``session_id``,
            ``active_projects``, ``uptime`` (seconds since ``start_time``, or
            0 when no start time is recorded), ``memory_usage``,
            ``cpu_usage`` and ``disk_usage`` (percentages from psutil).
        """
        import psutil

        started = getattr(self, 'start_time', None)
        uptime = time.time() - started if started is not None else 0

        return {
            'success': True,
            'system': {
                'session_id': self.session_id,
                'active_projects': list(self.active_projects),
                'uptime': uptime,
                'memory_usage': psutil.virtual_memory().percent,
                'cpu_usage': psutil.cpu_percent(),
                'disk_usage': psutil.disk_usage('/').percent
            }
        }

# Initialize processor
# Constructing the processor creates the command and results folders on Drive
# and prints the session banner.
processor = UniversalClaudeProcessor()
# Recorded so the system_status command can report uptime relative to now.
processor.start_time = time.time()

In [None]:
# Start the universal command processor
def start_universal_processor():
    """Poll the command folder and process command files until interrupted.

    Once per second the module-level ``processor``'s ``api_folder`` is
    scanned for ``command_*.json`` files. Each file is parsed as JSON, passed
    to ``processor.process_command``, and the result is written to
    ``results/result_*.json`` before the command file is deleted. A file that
    fails at any step is renamed to ``<name>_error.json`` and not retried.

    Roughly every 30 seconds the set of active projects is printed. The loop
    exits on ``KeyboardInterrupt``; any other exception at loop level is
    printed and polling resumes after 5 seconds.
    """
    print("👁️ Starting Universal Claude Processor")
    print("=====================================\n")

    json_suffix = '.json'
    error_suffix = '_error.json'
    status_interval = 30  # seconds between active-project reports

    processed_files = set()
    # A monotonic clock gives a true "every N seconds" report; testing the
    # wall-clock second modulo 30 only fires when a poll happens to land on
    # that exact second.
    last_status = time.monotonic()

    while True:
        try:
            # Monitor command folder (sorted so the processing order is stable)
            for filename in sorted(os.listdir(processor.api_folder)):
                if not (filename.startswith('command_') and filename.endswith(json_suffix)):
                    continue
                # Quarantined files keep the command_ prefix and .json suffix,
                # so they must be skipped explicitly or they would be picked
                # up and renamed again on every poll.
                if filename.endswith(error_suffix):
                    continue

                filepath = os.path.join(processor.api_folder, filename)

                if filepath in processed_files:
                    continue

                # Process command
                try:
                    with open(filepath, 'r', encoding='utf-8') as f:
                        command = json.load(f)

                    project = command.get('project', 'unknown')
                    cmd_type = command.get('type', 'unknown')

                    print(f"📝 [{project}] Processing: {cmd_type}")

                    # Process the command
                    result = processor.process_command(command)

                    # Save result
                    result_filename = filename.replace('command_', 'result_')
                    result_path = os.path.join(processor.api_folder, 'results', result_filename)

                    with open(result_path, 'w', encoding='utf-8') as f:
                        json.dump(result, f, indent=2)

                    # Clean up command file
                    os.remove(filepath)
                    processed_files.add(filepath)

                    status = "✅" if result.get('success') else "❌"
                    print(f"{status} [{project}] Completed: {cmd_type}")

                except Exception as e:
                    print(f"❌ Error processing {filename}: {e}")
                    # Never retry this path, even if the move below fails.
                    processed_files.add(filepath)
                    # Move problematic file
                    error_path = filepath[:-len(json_suffix)] + error_suffix
                    try:
                        os.rename(filepath, error_path)
                    except OSError as rename_error:
                        print(f"❌ Could not move {filename}: {rename_error}")

            # Show status every 30 seconds
            now = time.monotonic()
            if now - last_status >= status_interval:
                last_status = now
                if processor.active_projects:
                    # str() guards against non-string project names coming
                    # from command JSON.
                    names = ', '.join(str(name) for name in processor.active_projects)
                    print(f"📊 Active projects: {names}")

            time.sleep(1)  # Check every second

        except KeyboardInterrupt:
            print("\n🛑 Universal processor stopped")
            break
        except Exception as e:
            print(f"❌ Processor error: {e}")
            time.sleep(5)  # Wait before retrying

# Start the processor
# This call blocks: the polling loop runs until the cell is interrupted
# (KeyboardInterrupt), so nothing placed after it will execute until then.
start_universal_processor()