From f24d9cb3edaa8818698e9e73bde250dfbf32dfb7 Mon Sep 17 00:00:00 2001 From: kr16h Date: Wed, 17 Dec 2025 13:43:07 +0530 Subject: [PATCH 1/2] created doctor.py, updated cli.py --- cortex/cli.py | 12 ++ cortex/doctor.py | 480 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 492 insertions(+) create mode 100644 cortex/doctor.py diff --git a/cortex/cli.py b/cortex/cli.py index 17004c68..d28b51ee 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -176,6 +176,12 @@ def notify(self, args): return 1 # ------------------------------- + # Run system health checks + def doctor(self): + from cortex.doctor import SystemDoctor + doctor = SystemDoctor() + return doctor.run_checks() + def install(self, software: str, execute: bool = False, dry_run: bool = False): # Validate input first is_valid, error = validate_install_request(software) @@ -544,6 +550,7 @@ def show_rich_help(): table.add_row("history", "View history") table.add_row("rollback ", "Undo installation") table.add_row("notify", "Manage desktop notifications") # Added this line + table.add_row("doctor", "System health check") console.print(table) console.print() @@ -616,6 +623,9 @@ def main(): send_parser.add_argument('--level', choices=['low', 'normal', 'critical'], default='normal') send_parser.add_argument('--actions', nargs='*', help='Action buttons') # -------------------------- + + + doctor_parser = subparsers.add_parser('doctor', help='Run system health check') args = parser.parse_args() @@ -645,6 +655,8 @@ def main(): # Handle the new notify command elif args.command == 'notify': return cli.notify(args) + elif args.command == 'doctor': + return cli.doctor() else: parser.print_help() return 1 diff --git a/cortex/doctor.py b/cortex/doctor.py new file mode 100644 index 00000000..69d521c6 --- /dev/null +++ b/cortex/doctor.py @@ -0,0 +1,480 @@ +""" +System Health Check for Cortex Linux +Performs diagnostic checks and provides fix suggestions. +""" + +import sys +import os +import shutil +import subprocess +from typing import Optional, List +from pathlib import Path + +from rich.table import Table +from rich.panel import Panel +from rich.console import Console +from rich import box + +from cortex.branding import console, cx_print +from cortex.validators import validate_api_key + + +class SystemDoctor: + """ + Performs comprehensive system health checks and diagnostics. + + Checks for: + - Python version compatibility + - Required Python dependencies + - GPU drivers (NVIDIA/AMD) + - CUDA/ROCm availability + - Ollama installation and status + - API key configuration + - Disk space availability + - System memory + + Attributes: + warnings: List of non-critical issues found + failures: List of critical issues that may prevent operation + suggestions: List of fix commands for issues + passes: List of successful checks + """ + + def __init__(self) -> None: + """Initialize the SystemDoctor with empty check lists.""" + self.warnings: List[str] = [] + self.failures: List[str] = [] + self.suggestions: List[str] = [] + self.passes: List[str] = [] + + def run_checks(self) -> int: + """ + Run all health checks and return appropriate exit code. + + Exit codes: + 0: All checks passed, system is healthy + 1: Warnings found, system can operate but has recommendations + 2: Critical failures found, system may not work properly + + Returns: + int: Exit code reflecting system health status (0, 1, or 2) + """ + # Header + console.print() + console.print(Panel.fit( + "[bold cyan]CORTEX SYSTEM CHECK[/bold cyan]", + border_style="cyan", + padding=(1, 4) + )) + console.print() + + # Run all check groups + self._print_section("Python & Dependencies") + self._check_python() + self._check_dependencies() + + self._print_section("GPU & Acceleration") + self._check_gpu_driver() + self._check_cuda() + + self._print_section("AI & Services") + self._check_ollama() + self._check_api_keys() + + self._print_section("System Resources") + self._check_disk_space() + self._check_memory() + + # Print summary + self._print_summary() + + # Return appropriate exit code + if self.failures: + return 2 # Critical failures + elif self.warnings: + return 1 # Warnings only + return 0 # All good + + def _print_section(self, title: str) -> None: + """Print a section header for grouping checks.""" + console.print(f"\n[bold cyan]{title}[/bold cyan]") + + def _print_check( + self, + status: str, + message: str, + suggestion: Optional[str] = None + ) -> None: + """ + Print a check result with appropriate formatting and colors. + + Args: + status: One of "PASS", "WARN", "FAIL", or "INFO" + message: Description of the check result + suggestion: Optional fix command or suggestion + """ + # Define symbols and colors + if status == "PASS": + symbol = "✓" + color = "bold green" + prefix = "[PASS]" + self.passes.append(message) + elif status == "WARN": + symbol = "⚠" + color = "bold yellow" + prefix = "[WARN]" + self.warnings.append(message) + if suggestion: + self.suggestions.append(suggestion) + elif status == "FAIL": + symbol = "✗" + color = "bold red" + prefix = "[FAIL]" + self.failures.append(message) + if suggestion: + self.suggestions.append(suggestion) + else: + symbol = "?" + color = "dim" + prefix = "[INFO]" + + # Print with icon prefix and coloring + console.print( + f" [cyan]CX[/cyan] [{color}]{symbol} {prefix}[/{color}] {message}" + ) + + def _check_python(self) -> None: + """Check Python version compatibility.""" + version = ( + f"{sys.version_info.major}." + f"{sys.version_info.minor}." + f"{sys.version_info.micro}" + ) + + if sys.version_info >= (3, 10): + self._print_check("PASS", f"Python {version}") + + elif sys.version_info >= (3, 8): + self._print_check("WARN",f"Python {version} (3.10+ recommended)", + "Upgrade Python: sudo apt install python3.11") + else: + self._print_check("FAIL",f"Python {version} (3.10+ required)", + "Install Python 3.10+: sudo apt install python3.11") + + def _check_dependencies(self) -> None: + """Check if required Python packages are installed.""" + missing = [] + required = { + 'anthropic': 'Anthropic', + 'openai': 'OpenAI', + 'rich': 'Rich', + 'typer': 'Typer', + 'requests': 'Requests' + } + + for module, name in required.items(): + try: + __import__(module) + except ImportError: + missing.append(name) + + if not missing: + self._print_check("PASS", "Python dependencies installed") + elif len(missing) < 3: + self._print_check( + "WARN", + f"Missing dependencies: {', '.join(missing)}", + "Install dependencies: pip install -r requirements.txt" + ) + else: + self._print_check( + "FAIL", + f"Missing {len(missing)} dependencies: {', '.join(missing)}", + "Install dependencies: pip install -r requirements.txt" + ) + + def _check_gpu_driver(self) -> None: + """Check for GPU drivers (NVIDIA or AMD ROCm).""" + # Check NVIDIA + if shutil.which('nvidia-smi'): + try: + result = subprocess.run( + [ + 'nvidia-smi', + '--query-gpu=driver_version', + '--format=csv,noheader' + ], + capture_output=True, + text=True, + timeout=5 + ) + if result.returncode == 0 and result.stdout.strip(): + version = result.stdout.strip().split('\n')[0] + self._print_check("PASS", f"NVIDIA Driver {version}") + return + except (subprocess.TimeoutExpired, Exception): + pass + + # Check AMD ROCm + if shutil.which('rocm-smi'): + try: + result = subprocess.run( + ['rocm-smi', '--showdriverversion'], + capture_output=True, + text=True, + timeout=5 + ) + if result.returncode == 0: + self._print_check("PASS", "AMD ROCm driver detected") + return + except (subprocess.TimeoutExpired, Exception): + pass + + # No GPU found - this is a warning, not a failure + self._print_check( + "WARN", + "No GPU detected (CPU-only mode)", + "Optional: Install GPU drivers for acceleration" + ) + + def _check_cuda(self) -> None: + """Check CUDA/ROCm availability for GPU acceleration.""" + # Check CUDA + if shutil.which('nvcc'): + try: + result = subprocess.run( + ['nvcc', '--version'], + capture_output=True, + text=True, + timeout=5 + ) + if result.returncode == 0 and 'release' in result.stdout: + version_line = result.stdout.split('release')[1].split(',')[0].strip() + self._print_check("PASS", f"CUDA {version_line}") + return + except (subprocess.TimeoutExpired, Exception): + pass + + # Check ROCm + rocm_info_path = Path('/opt/rocm/.info/version') + if rocm_info_path.exists(): + try: + version = rocm_info_path.read_text(encoding='utf-8').strip() + self._print_check("PASS", f"ROCm {version}") + return + except (OSError, UnicodeDecodeError): + self._print_check("PASS", "ROCm installed") + return + elif Path('/opt/rocm').exists(): + self._print_check("PASS", "ROCm installed") + return + + # Check if PyTorch has CUDA available (software level) + try: + import torch + if torch.cuda.is_available(): + self._print_check("PASS", "CUDA available (PyTorch)") + return + except ImportError: + pass + + self._print_check( + "WARN", + "CUDA/ROCm not found (GPU acceleration unavailable)", + "Install CUDA: https://developer.nvidia.com/cuda-downloads" + ) + + def _check_ollama(self) -> None: + """Check if Ollama is installed and running.""" + # Check if installed + if not shutil.which('ollama'): + self._print_check( + "WARN", + "Ollama not installed", + "Install Ollama: curl https://ollama.ai/install.sh | sh" + ) + return + + # Check if running by testing the API + try: + import requests + response = requests.get( + 'http://localhost:11434/api/tags', + timeout=2 + ) + if response.status_code == 200: + self._print_check("PASS", "Ollama installed and running") + return + except Exception: + pass + + # Ollama installed but not running + self._print_check( + "WARN", + "Ollama installed but not running", + "Start Ollama: ollama serve &" + ) + + def _check_api_keys(self) -> None: + """Check if API keys are configured for cloud models.""" + is_valid, provider, error = validate_api_key() + + if is_valid: + self._print_check("PASS", f"{provider} API key configured") + else: + self._print_check( + "WARN", + "No API keys configured (required for cloud models)", + "Configure API key: export ANTHROPIC_API_KEY=sk-... " + "or run 'cortex wizard'" + ) + + def _check_disk_space(self) -> None: + """Check available disk space for model storage.""" + try: + usage = shutil.disk_usage(os.path.expanduser('~')) + free_gb = usage.free / (1024**3) + total_gb = usage.total / (1024**3) + + if free_gb > 20: + self._print_check( + "PASS", + f"{free_gb:.1f}GB free disk space ({total_gb:.1f}GB total)" + ) + elif free_gb > 10: + self._print_check( + "WARN", + f"{free_gb:.1f}GB free (20GB+ recommended for models)", + "Free up disk space: sudo apt clean && docker system prune -a" + ) + else: + self._print_check( + "FAIL", + f"Only {free_gb:.1f}GB free (critically low)", + "Free up disk space: sudo apt autoremove && sudo apt clean" + ) + except (OSError, Exception) as e: + self._print_check( + "WARN", + f"Could not check disk space: {type(e).__name__}" + ) + + def _check_memory(self) -> None: + """Check system RAM availability.""" + mem_gb = self._get_system_memory() + + if mem_gb is None: + self._print_check("WARN", "Could not detect system RAM") + return + + if mem_gb >= 16: + self._print_check("PASS", f"{mem_gb:.1f}GB RAM") + elif mem_gb >= 8: + self._print_check( + "WARN", + f"{mem_gb:.1f}GB RAM (16GB recommended for larger models)", + "Consider upgrading RAM or use smaller models" + ) + else: + self._print_check( + "FAIL", + f"Only {mem_gb:.1f}GB RAM (8GB minimum required)", + "Upgrade RAM to at least 8GB" + ) + + def _get_system_memory(self) -> Optional[float]: + """ + Get system memory in GB. + + Returns: + float: Total system memory in GB, or None if detection fails + """ + # Try /proc/meminfo (Linux) + try: + with open('/proc/meminfo', 'r', encoding='utf-8') as f: + for line in f: + if line.startswith('MemTotal:'): + mem_kb = int(line.split()[1]) + return mem_kb / (1024**2) + except (OSError, ValueError, IndexError): + pass + + # Try psutil (macOS/BSD/Windows) + try: + import psutil + return psutil.virtual_memory().total / (1024**3) + except ImportError: + pass + + return None + + def _print_summary(self) -> None: + """Print summary table and overall health status with suggestions.""" + console.print() + + # Create summary table + table = Table(show_header=False, box=box.SIMPLE, padding=(0, 1)) + table.add_column("Status", style="bold") + table.add_column("Count", justify="right") + + if self.passes: + table.add_row( + "[green]✓ Passed[/green]", + f"[green]{len(self.passes)}[/green]" + ) + if self.warnings: + table.add_row( + "[yellow]⚠ Warnings[/yellow]", + f"[yellow]{len(self.warnings)}[/yellow]" + ) + if self.failures: + table.add_row( + "[red]✗ Failures[/red]", + f"[red]{len(self.failures)}[/red]" + ) + + console.print(table) + console.print() + + # Overall status panel + if self.failures: + console.print(Panel( + f"[bold red]❌ {len(self.failures)} critical failure(s) found[/bold red]", + border_style="red", + padding=(0, 2) + )) + elif self.warnings: + console.print(Panel( + f"[bold yellow]⚠️ {len(self.warnings)} warning(s) found[/bold yellow]", + border_style="yellow", + padding=(0, 2) + )) + else: + console.print(Panel( + "[bold green]✅ All checks passed! System is healthy.[/bold green]", + border_style="green", + padding=(0, 2) + )) + + # Show fix suggestions if any + if self.suggestions: + console.print() + console.print("[bold cyan]💡 Suggested fixes:[/bold cyan]") + for i, suggestion in enumerate(self.suggestions, 1): + console.print(f" [dim]{i}.[/dim] {suggestion}") + console.print() + + +def run_doctor() -> int: + """ + Run the system doctor and return exit code. + + Returns: + int: Exit code (0 = all good, 1 = warnings, 2 = failures) + """ + doctor = SystemDoctor() + return doctor.run_checks() + + +if __name__ == "__main__": + sys.exit(run_doctor()) \ No newline at end of file From 4f309265efed58d58f2fd34956078619391e90f7 Mon Sep 17 00:00:00 2001 From: Krish Date: Wed, 17 Dec 2025 17:38:10 +0000 Subject: [PATCH 2/2] Updated files, made ui clean, added pytest test file --- cortex/cli.py | 340 ++++++++++++++++++++++++------------------- cortex/doctor.py | 308 ++++++++++++++++++++------------------- tests/test_doctor.py | 126 ++++++++++++++++ 3 files changed, 474 insertions(+), 300 deletions(-) create mode 100644 tests/test_doctor.py diff --git a/cortex/cli.py b/cortex/cli.py index d28b51ee..ba4d3507 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -1,50 +1,35 @@ -import sys -import os import argparse -import time import logging -from typing import List, Optional +import os +import sys +import time from datetime import datetime # Suppress noisy log messages in normal operation logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("cortex.installation_history").setLevel(logging.ERROR) -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) -from LLM.interpreter import CommandInterpreter +from cortex.branding import VERSION, console, cx_header, cx_print, show_banner from cortex.coordinator import InstallationCoordinator, StepStatus -from cortex.installation_history import ( - InstallationHistory, - InstallationType, - InstallationStatus -) +from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType +from cortex.llm.interpreter import CommandInterpreter +from cortex.notification_manager import NotificationManager from cortex.user_preferences import ( PreferencesManager, + format_preference_value, print_all_preferences, - format_preference_value -) -from cortex.branding import ( - console, - cx_print, - cx_step, - cx_header, - show_banner, - VERSION ) from cortex.validators import ( validate_api_key, validate_install_request, - validate_installation_id, - ValidationError ) -# Import the new Notification Manager -from cortex.notification_manager import NotificationManager class CortexCLI: def __init__(self, verbose: bool = False): - self.spinner_chars = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] + self.spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] self.spinner_idx = 0 self.prefs_manager = None # Lazy initialization self.verbose = verbose @@ -57,33 +42,39 @@ def _debug(self, message: str): def _get_api_key(self) -> Optional[str]: # Check if using Ollama (no API key needed) provider = self._get_provider() - if provider == 'ollama': + if provider == "ollama": self._debug("Using Ollama (no API key required)") return "ollama-local" # Placeholder for Ollama is_valid, detected_provider, error = validate_api_key() if not is_valid: self._print_error(error) - cx_print("Run [bold]cortex wizard[/bold] to configure your API key.", "info") - cx_print("Or use [bold]CORTEX_PROVIDER=ollama[/bold] for offline mode.", "info") + cx_print( + "Run [bold]cortex wizard[/bold] to configure your API key.", "info" + ) + cx_print( + "Or use [bold]CORTEX_PROVIDER=ollama[/bold] for offline mode.", "info" + ) return None - api_key = os.environ.get('ANTHROPIC_API_KEY') or os.environ.get('OPENAI_API_KEY') + api_key = os.environ.get("ANTHROPIC_API_KEY") or os.environ.get( + "OPENAI_API_KEY" + ) return api_key def _get_provider(self) -> str: # Check environment variable for explicit provider choice - explicit_provider = os.environ.get('CORTEX_PROVIDER', '').lower() - if explicit_provider in ['ollama', 'openai', 'claude']: + explicit_provider = os.environ.get("CORTEX_PROVIDER", "").lower() + if explicit_provider in ["ollama", "openai", "claude"]: return explicit_provider # Auto-detect based on available API keys - if os.environ.get('ANTHROPIC_API_KEY'): - return 'claude' - elif os.environ.get('OPENAI_API_KEY'): - return 'openai' + if os.environ.get("ANTHROPIC_API_KEY"): + return "claude" + elif os.environ.get("OPENAI_API_KEY"): + return "openai" # Fallback to Ollama for offline mode - return 'ollama' + return "ollama" def _print_status(self, emoji: str, message: str): """Legacy status print - maps to cx_print for Rich output""" @@ -109,7 +100,7 @@ def _animate_spinner(self, message: str): time.sleep(0.1) def _clear_line(self): - sys.stdout.write('\r\033[K') + sys.stdout.write("\r\033[K") sys.stdout.flush() # --- New Notification Method --- @@ -117,38 +108,50 @@ def notify(self, args): """Handle notification commands""" # Addressing CodeRabbit feedback: Handle missing subcommand gracefully if not args.notify_action: - self._print_error("Please specify a subcommand (config/enable/disable/dnd/send)") + self._print_error( + "Please specify a subcommand (config/enable/disable/dnd/send)" + ) return 1 mgr = NotificationManager() - if args.notify_action == 'config': - console.print("[bold cyan]🔧 Current Notification Configuration:[/bold cyan]") - status = "[green]Enabled[/green]" if mgr.config.get('enabled', True) else "[red]Disabled[/red]" + if args.notify_action == "config": + console.print( + "[bold cyan]🔧 Current Notification Configuration:[/bold cyan]" + ) + status = ( + "[green]Enabled[/green]" + if mgr.config.get("enabled", True) + else "[red]Disabled[/red]" + ) console.print(f"Status: {status}") - console.print(f"DND Window: [yellow]{mgr.config['dnd_start']} - {mgr.config['dnd_end']}[/yellow]") + console.print( + f"DND Window: [yellow]{mgr.config['dnd_start']} - {mgr.config['dnd_end']}[/yellow]" + ) console.print(f"History File: {mgr.history_file}") return 0 - elif args.notify_action == 'enable': + elif args.notify_action == "enable": mgr.config["enabled"] = True # Addressing CodeRabbit feedback: Ideally should use a public method instead of private _save_config, # but keeping as is for a simple fix (or adding a save method to NotificationManager would be best). - mgr._save_config() + mgr._save_config() self._print_success("Notifications enabled") return 0 - elif args.notify_action == 'disable': + elif args.notify_action == "disable": mgr.config["enabled"] = False mgr._save_config() - cx_print("Notifications disabled (Critical alerts will still show)", "warning") + cx_print( + "Notifications disabled (Critical alerts will still show)", "warning" + ) return 0 - elif args.notify_action == 'dnd': + elif args.notify_action == "dnd": if not args.start or not args.end: self._print_error("Please provide start and end times (HH:MM)") return 1 - + # Addressing CodeRabbit feedback: Add time format validation try: datetime.strptime(args.start, "%H:%M") @@ -163,25 +166,27 @@ def notify(self, args): self._print_success(f"DND Window updated: {args.start} - {args.end}") return 0 - elif args.notify_action == 'send': + elif args.notify_action == "send": if not args.message: self._print_error("Message required") return 1 - console.print(f"[dim]Sending notification...[/dim]") + console.print("[dim]Sending notification...[/dim]") mgr.send(args.title, args.message, level=args.level, actions=args.actions) return 0 - + else: self._print_error("Unknown notify command") return 1 + # ------------------------------- # Run system health checks def doctor(self): from cortex.doctor import SystemDoctor + doctor = SystemDoctor() return doctor.run_checks() - + def install(self, software: str, execute: bool = False, dry_run: bool = False): # Validate input first is_valid, error = validate_install_request(software) @@ -216,7 +221,9 @@ def install(self, software: str, execute: bool = False, dry_run: bool = False): commands = interpreter.parse(f"install {software}") if not commands: - self._print_error("No commands generated. Please try again with a different request.") + self._print_error( + "No commands generated. Please try again with a different request." + ) return 1 # Extract packages from commands for tracking @@ -225,10 +232,7 @@ def install(self, software: str, execute: bool = False, dry_run: bool = False): # Record installation start if execute or dry_run: install_id = history.record_installation( - InstallationType.INSTALL, - packages, - commands, - start_time + InstallationType.INSTALL, packages, commands, start_time ) self._print_status("⚙️", f"Installing {software}...") @@ -243,6 +247,7 @@ def install(self, software: str, execute: bool = False, dry_run: bool = False): return 0 if execute: + def progress_callback(current, total, step): status_emoji = "⏳" if step.status == StepStatus.SUCCESS: @@ -259,7 +264,7 @@ def progress_callback(current, total, step): descriptions=[f"Step {i+1}" for i in range(len(commands))], timeout=300, stop_on_error=True, - progress_callback=progress_callback + progress_callback=progress_callback, ) result = coordinator.execute() @@ -270,7 +275,9 @@ def progress_callback(current, total, step): # Record successful installation if install_id: - history.update_installation(install_id, InstallationStatus.SUCCESS) + history.update_installation( + install_id, InstallationStatus.SUCCESS + ) print(f"\n📝 Installation recorded (ID: {install_id})") print(f" To rollback: cortex rollback {install_id}") @@ -280,13 +287,13 @@ def progress_callback(current, total, step): if install_id: error_msg = result.error_message or "Installation failed" history.update_installation( - install_id, - InstallationStatus.FAILED, - error_msg + install_id, InstallationStatus.FAILED, error_msg ) if result.failed_step is not None: - self._print_error(f"Installation failed at step {result.failed_step + 1}") + self._print_error( + f"Installation failed at step {result.failed_step + 1}" + ) else: self._print_error("Installation failed") if result.error_message: @@ -303,21 +310,32 @@ def progress_callback(current, total, step): except ValueError as e: if install_id: - history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + history.update_installation( + install_id, InstallationStatus.FAILED, str(e) + ) self._print_error(str(e)) return 1 except RuntimeError as e: if install_id: - history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + history.update_installation( + install_id, InstallationStatus.FAILED, str(e) + ) self._print_error(f"API call failed: {str(e)}") return 1 except Exception as e: if install_id: - history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + history.update_installation( + install_id, InstallationStatus.FAILED, str(e) + ) self._print_error(f"Unexpected error: {str(e)}") return 1 - def history(self, limit: int = 20, status: Optional[str] = None, show_id: Optional[str] = None): + def history( + self, + limit: int = 20, + status: Optional[str] = None, + show_id: Optional[str] = None, + ): """Show installation history""" history = InstallationHistory() @@ -345,7 +363,7 @@ def history(self, limit: int = 20, status: Optional[str] = None, show_id: Option print(f"\nError: {record.error_message}") if record.commands_executed: - print(f"\nCommands executed:") + print("\nCommands executed:") for cmd in record.commands_executed: print(f" {cmd}") @@ -360,16 +378,20 @@ def history(self, limit: int = 20, status: Optional[str] = None, show_id: Option print("No installation records found.") return 0 - print(f"\n{'ID':<18} {'Date':<20} {'Operation':<12} {'Packages':<30} {'Status':<15}") + print( + f"\n{'ID':<18} {'Date':<20} {'Operation':<12} {'Packages':<30} {'Status':<15}" + ) print("=" * 100) for r in records: - date = r.timestamp[:19].replace('T', ' ') - packages = ', '.join(r.packages[:2]) + date = r.timestamp[:19].replace("T", " ") + packages = ", ".join(r.packages[:2]) if len(r.packages) > 2: packages += f" +{len(r.packages)-2}" - print(f"{r.id:<18} {date:<20} {r.operation_type.value:<12} {packages:<30} {r.status.value:<15}") + print( + f"{r.id:<18} {date:<20} {r.operation_type.value:<12} {packages:<30} {r.status.value:<15}" + ) return 0 except Exception as e: @@ -426,12 +448,14 @@ def check_pref(self, key: Optional[str] = None): self._print_error(f"Failed to read preferences: {str(e)}") return 1 - def edit_pref(self, action: str, key: Optional[str] = None, value: Optional[str] = None): + def edit_pref( + self, action: str, key: Optional[str] = None, value: Optional[str] = None + ): """Edit user preferences (add/set, delete/remove, list)""" manager = self._get_prefs_manager() try: - if action in ['add', 'set', 'update']: + if action in ["add", "set", "update"]: if not key or not value: self._print_error("Key and value required") return 1 @@ -440,7 +464,7 @@ def edit_pref(self, action: str, key: Optional[str] = None, value: Optional[str] print(f" New value: {format_preference_value(manager.get(key))}") return 0 - elif action in ['delete', 'remove', 'reset-key']: + elif action in ["delete", "remove", "reset-key"]: if not key: self._print_error("Key required") return 1 @@ -449,23 +473,23 @@ def edit_pref(self, action: str, key: Optional[str] = None, value: Optional[str] # (In a real implementation we would reset to default) return 0 - elif action in ['list', 'show', 'display']: + elif action in ["list", "show", "display"]: return self.check_pref() - elif action == 'reset-all': + elif action == "reset-all": confirm = input("⚠️ Reset ALL preferences? (y/n): ") - if confirm.lower() == 'y': + if confirm.lower() == "y": manager.reset() self._print_success("Preferences reset") return 0 - - elif action == 'validate': - errors = manager.validate() - if errors: - print("❌ Errors found") - else: - self._print_success("Valid") - return 0 + + elif action == "validate": + errors = manager.validate() + if errors: + print("❌ Errors found") + else: + self._print_success("Valid") + return 0 else: self._print_error(f"Unknown action: {action}") @@ -490,15 +514,15 @@ def status(self): cx_print(f"API Provider: [bold]{provider}[/bold]", "success") else: # Check for Ollama - ollama_provider = os.environ.get('CORTEX_PROVIDER', '').lower() - if ollama_provider == 'ollama': + ollama_provider = os.environ.get("CORTEX_PROVIDER", "").lower() + if ollama_provider == "ollama": cx_print("API Provider: [bold]Ollama (local)[/bold]", "success") else: cx_print("API Provider: [bold]Not configured[/bold]", "warning") cx_print(" Run: cortex wizard", "info") # Check Firejail - firejail_path = shutil.which('firejail') + firejail_path = shutil.which("firejail") if firejail_path: cx_print(f"Firejail: [bold]Available[/bold] ({firejail_path})", "success") else: @@ -559,73 +583,88 @@ def show_rich_help(): def main(): parser = argparse.ArgumentParser( - prog='cortex', - description='AI-powered Linux command interpreter', - formatter_class=argparse.RawDescriptionHelpFormatter + prog="cortex", + description="AI-powered Linux command interpreter", + formatter_class=argparse.RawDescriptionHelpFormatter, ) # Global flags - parser.add_argument('--version', '-V', action='version', version=f'cortex {VERSION}') - parser.add_argument('--verbose', '-v', action='store_true', help='Show detailed output') + parser.add_argument( + "--version", "-V", action="version", version=f"cortex {VERSION}" + ) + parser.add_argument( + "--verbose", "-v", action="store_true", help="Show detailed output" + ) - subparsers = parser.add_subparsers(dest='command', help='Available commands') + subparsers = parser.add_subparsers(dest="command", help="Available commands") # Demo command - demo_parser = subparsers.add_parser('demo', help='See Cortex in action') + demo_parser = subparsers.add_parser("demo", help="See Cortex in action") # Wizard command - wizard_parser = subparsers.add_parser('wizard', help='Configure API key interactively') + wizard_parser = subparsers.add_parser( + "wizard", help="Configure API key interactively" + ) # Status command - status_parser = subparsers.add_parser('status', help='Show system status') + status_parser = subparsers.add_parser("status", help="Show system status") # Install command - install_parser = subparsers.add_parser('install', help='Install software') - install_parser.add_argument('software', type=str, help='Software to install') - install_parser.add_argument('--execute', action='store_true', help='Execute commands') - install_parser.add_argument('--dry-run', action='store_true', help='Show commands only') + install_parser = subparsers.add_parser("install", help="Install software") + install_parser.add_argument("software", type=str, help="Software to install") + install_parser.add_argument( + "--execute", action="store_true", help="Execute commands" + ) + install_parser.add_argument( + "--dry-run", action="store_true", help="Show commands only" + ) # History command - history_parser = subparsers.add_parser('history', help='View history') - history_parser.add_argument('--limit', type=int, default=20) - history_parser.add_argument('--status', choices=['success', 'failed']) - history_parser.add_argument('show_id', nargs='?') + history_parser = subparsers.add_parser("history", help="View history") + history_parser.add_argument("--limit", type=int, default=20) + history_parser.add_argument("--status", choices=["success", "failed"]) + history_parser.add_argument("show_id", nargs="?") # Rollback command - rollback_parser = subparsers.add_parser('rollback', help='Rollback installation') - rollback_parser.add_argument('id', help='Installation ID') - rollback_parser.add_argument('--dry-run', action='store_true') + rollback_parser = subparsers.add_parser("rollback", help="Rollback installation") + rollback_parser.add_argument("id", help="Installation ID") + rollback_parser.add_argument("--dry-run", action="store_true") # Preferences commands - check_pref_parser = subparsers.add_parser('check-pref', help='Check preferences') - check_pref_parser.add_argument('key', nargs='?') + check_pref_parser = subparsers.add_parser("check-pref", help="Check preferences") + check_pref_parser.add_argument("key", nargs="?") - edit_pref_parser = subparsers.add_parser('edit-pref', help='Edit preferences') - edit_pref_parser.add_argument('action', choices=['set', 'add', 'delete', 'list', 'validate']) - edit_pref_parser.add_argument('key', nargs='?') - edit_pref_parser.add_argument('value', nargs='?') + edit_pref_parser = subparsers.add_parser("edit-pref", help="Edit preferences") + edit_pref_parser.add_argument( + "action", choices=["set", "add", "delete", "list", "validate"] + ) + edit_pref_parser.add_argument("key", nargs="?") + edit_pref_parser.add_argument("value", nargs="?") # --- New Notify Command --- - notify_parser = subparsers.add_parser('notify', help='Manage desktop notifications') - notify_subs = notify_parser.add_subparsers(dest='notify_action', help='Notify actions') - - notify_subs.add_parser('config', help='Show configuration') - notify_subs.add_parser('enable', help='Enable notifications') - notify_subs.add_parser('disable', help='Disable notifications') - - dnd_parser = notify_subs.add_parser('dnd', help='Configure DND window') - dnd_parser.add_argument('start', help='Start time (HH:MM)') - dnd_parser.add_argument('end', help='End time (HH:MM)') - - send_parser = notify_subs.add_parser('send', help='Send test notification') - send_parser.add_argument('message', help='Notification message') - send_parser.add_argument('--title', default='Cortex Notification') - send_parser.add_argument('--level', choices=['low', 'normal', 'critical'], default='normal') - send_parser.add_argument('--actions', nargs='*', help='Action buttons') + notify_parser = subparsers.add_parser("notify", help="Manage desktop notifications") + notify_subs = notify_parser.add_subparsers( + dest="notify_action", help="Notify actions" + ) + + notify_subs.add_parser("config", help="Show configuration") + notify_subs.add_parser("enable", help="Enable notifications") + notify_subs.add_parser("disable", help="Disable notifications") + + dnd_parser = notify_subs.add_parser("dnd", help="Configure DND window") + dnd_parser.add_argument("start", help="Start time (HH:MM)") + dnd_parser.add_argument("end", help="End time (HH:MM)") + + send_parser = notify_subs.add_parser("send", help="Send test notification") + send_parser.add_argument("message", help="Notification message") + send_parser.add_argument("--title", default="Cortex Notification") + send_parser.add_argument( + "--level", choices=["low", "normal", "critical"], default="normal" + ) + send_parser.add_argument("--actions", nargs="*", help="Action buttons") # -------------------------- - - - doctor_parser = subparsers.add_parser('doctor', help='Run system health check') + + doctor_parser = subparsers.add_parser("doctor", help="Run system health check") args = parser.parse_args() @@ -636,26 +675,30 @@ def main(): cli = CortexCLI(verbose=args.verbose) try: - if args.command == 'demo': + if args.command == "demo": return cli.demo() - elif args.command == 'wizard': + elif args.command == "wizard": return cli.wizard() - elif args.command == 'status': + elif args.command == "status": return cli.status() - elif args.command == 'install': - return cli.install(args.software, execute=args.execute, dry_run=args.dry_run) - elif args.command == 'history': - return cli.history(limit=args.limit, status=args.status, show_id=args.show_id) - elif args.command == 'rollback': + elif args.command == "install": + return cli.install( + args.software, execute=args.execute, dry_run=args.dry_run + ) + elif args.command == "history": + return cli.history( + limit=args.limit, status=args.status, show_id=args.show_id + ) + elif args.command == "rollback": return cli.rollback(args.id, dry_run=args.dry_run) - elif args.command == 'check-pref': + elif args.command == "check-pref": return cli.check_pref(key=args.key) - elif args.command == 'edit-pref': + elif args.command == "edit-pref": return cli.edit_pref(action=args.action, key=args.key, value=args.value) # Handle the new notify command - elif args.command == 'notify': + elif args.command == "notify": return cli.notify(args) - elif args.command == 'doctor': + elif args.command == "doctor": return cli.doctor() else: parser.print_help() @@ -667,5 +710,6 @@ def main(): print(f"❌ Unexpected error: {e}", file=sys.stderr) return 1 -if __name__ == '__main__': - sys.exit(main()) \ No newline at end of file + +if __name__ == "__main__": + sys.exit(main()) diff --git a/cortex/doctor.py b/cortex/doctor.py index 69d521c6..c8a8840c 100644 --- a/cortex/doctor.py +++ b/cortex/doctor.py @@ -12,17 +12,16 @@ from rich.table import Table from rich.panel import Panel -from rich.console import Console from rich import box -from cortex.branding import console, cx_print +from cortex.branding import console from cortex.validators import validate_api_key class SystemDoctor: """ Performs comprehensive system health checks and diagnostics. - + Checks for: - Python version compatibility - Required Python dependencies @@ -32,82 +31,81 @@ class SystemDoctor: - API key configuration - Disk space availability - System memory - + Attributes: warnings: List of non-critical issues found failures: List of critical issues that may prevent operation suggestions: List of fix commands for issues passes: List of successful checks """ - + def __init__(self) -> None: """Initialize the SystemDoctor with empty check lists.""" self.warnings: List[str] = [] self.failures: List[str] = [] self.suggestions: List[str] = [] self.passes: List[str] = [] - + def run_checks(self) -> int: """ Run all health checks and return appropriate exit code. - + Exit codes: 0: All checks passed, system is healthy 1: Warnings found, system can operate but has recommendations 2: Critical failures found, system may not work properly - + Returns: int: Exit code reflecting system health status (0, 1, or 2) """ # Header console.print() - console.print(Panel.fit( - "[bold cyan]CORTEX SYSTEM CHECK[/bold cyan]", - border_style="cyan", - padding=(1, 4) - )) + console.print( + Panel.fit( + "[bold cyan]CORTEX SYSTEM CHECK[/bold cyan]", + border_style="cyan", + padding=(1, 4), + ) + ) console.print() - + # Run all check groups self._print_section("Python & Dependencies") self._check_python() self._check_dependencies() - + self._print_section("GPU & Acceleration") self._check_gpu_driver() self._check_cuda() - + self._print_section("AI & Services") self._check_ollama() self._check_api_keys() - + self._print_section("System Resources") self._check_disk_space() self._check_memory() - + # Print summary self._print_summary() - + # Return appropriate exit code if self.failures: return 2 # Critical failures elif self.warnings: return 1 # Warnings only return 0 # All good - + def _print_section(self, title: str) -> None: """Print a section header for grouping checks.""" console.print(f"\n[bold cyan]{title}[/bold cyan]") - + def _print_check( - self, - status: str, - message: str, - suggestion: Optional[str] = None + self, status: str, message: str, suggestion: Optional[str] = None ) -> None: """ Print a check result with appropriate formatting and colors. - + Args: status: One of "PASS", "WARN", "FAIL", or "INFO" message: Description of the check result @@ -137,12 +135,12 @@ def _print_check( symbol = "?" color = "dim" prefix = "[INFO]" - + # Print with icon prefix and coloring console.print( f" [cyan]CX[/cyan] [{color}]{symbol} {prefix}[/{color}] {message}" ) - + def _check_python(self) -> None: """Check Python version compatibility.""" version = ( @@ -150,175 +148,183 @@ def _check_python(self) -> None: f"{sys.version_info.minor}." f"{sys.version_info.micro}" ) - + if sys.version_info >= (3, 10): self._print_check("PASS", f"Python {version}") - + elif sys.version_info >= (3, 8): - self._print_check("WARN",f"Python {version} (3.10+ recommended)", - "Upgrade Python: sudo apt install python3.11") + self._print_check( + "WARN", + f"Python {version} (3.10+ recommended)", + "Upgrade Python: sudo apt install python3.11", + ) else: - self._print_check("FAIL",f"Python {version} (3.10+ required)", - "Install Python 3.10+: sudo apt install python3.11") - + self._print_check( + "FAIL", + f"Python {version} (3.10+ required)", + "Install Python 3.10+: sudo apt install python3.11", + ) + def _check_dependencies(self) -> None: - """Check if required Python packages are installed.""" + """Check packages from ACTUAL requirements.txt.""" missing = [] - required = { - 'anthropic': 'Anthropic', - 'openai': 'OpenAI', - 'rich': 'Rich', - 'typer': 'Typer', - 'requests': 'Requests' - } - - for module, name in required.items(): - try: - __import__(module) - except ImportError: - missing.append(name) - + requirements_path = Path("requirements.txt") + + if not requirements_path.exists(): + self._print_check("WARN", "No requirements.txt found") + return + + try: + with open(requirements_path) as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): + pkg_name = line.split("==")[0].split(">")[0].split("<")[0] + try: + __import__(pkg_name) + except ImportError: + missing.append(pkg_name) + except Exception: + self._print_check("WARN", "Could not read requirements.txt") + return + if not missing: - self._print_check("PASS", "Python dependencies installed") + self._print_check("PASS", "All requirements.txt packages installed") elif len(missing) < 3: self._print_check( "WARN", - f"Missing dependencies: {', '.join(missing)}", - "Install dependencies: pip install -r requirements.txt" + f"Missing from requirements.txt: {', '.join(missing)}", + "Install dependencies: pip install -r requirements.txt", ) else: self._print_check( "FAIL", - f"Missing {len(missing)} dependencies: {', '.join(missing)}", - "Install dependencies: pip install -r requirements.txt" + f" Missing {len(missing)} packages from requirements.txt: {', '.join(missing[:3])}...", + "Install dependencies: pip install -r requirements.txt", ) - + def _check_gpu_driver(self) -> None: """Check for GPU drivers (NVIDIA or AMD ROCm).""" # Check NVIDIA - if shutil.which('nvidia-smi'): + if shutil.which("nvidia-smi"): try: result = subprocess.run( [ - 'nvidia-smi', - '--query-gpu=driver_version', - '--format=csv,noheader' + "nvidia-smi", + "--query-gpu=driver_version", + "--format=csv,noheader", ], capture_output=True, text=True, - timeout=5 + timeout=5, ) if result.returncode == 0 and result.stdout.strip(): - version = result.stdout.strip().split('\n')[0] + version = result.stdout.strip().split("\n")[0] self._print_check("PASS", f"NVIDIA Driver {version}") return except (subprocess.TimeoutExpired, Exception): pass - + # Check AMD ROCm - if shutil.which('rocm-smi'): + if shutil.which("rocm-smi"): try: result = subprocess.run( - ['rocm-smi', '--showdriverversion'], + ["rocm-smi", "--showdriverversion"], capture_output=True, text=True, - timeout=5 + timeout=5, ) if result.returncode == 0: self._print_check("PASS", "AMD ROCm driver detected") return except (subprocess.TimeoutExpired, Exception): pass - + # No GPU found - this is a warning, not a failure self._print_check( "WARN", - "No GPU detected (CPU-only mode)", - "Optional: Install GPU drivers for acceleration" + "No GPU detected (CPU-only mode supported, local inference will be slower)", # ← NEW + "Optional: Install NVIDIA/AMD drivers for acceleration", ) - + def _check_cuda(self) -> None: """Check CUDA/ROCm availability for GPU acceleration.""" # Check CUDA - if shutil.which('nvcc'): + if shutil.which("nvcc"): try: result = subprocess.run( - ['nvcc', '--version'], - capture_output=True, - text=True, - timeout=5 + ["nvcc", "--version"], capture_output=True, text=True, timeout=5 ) - if result.returncode == 0 and 'release' in result.stdout: - version_line = result.stdout.split('release')[1].split(',')[0].strip() + if result.returncode == 0 and "release" in result.stdout: + version_line = ( + result.stdout.split("release")[1].split(",")[0].strip() + ) self._print_check("PASS", f"CUDA {version_line}") return except (subprocess.TimeoutExpired, Exception): pass - + # Check ROCm - rocm_info_path = Path('/opt/rocm/.info/version') + rocm_info_path = Path("/opt/rocm/.info/version") if rocm_info_path.exists(): try: - version = rocm_info_path.read_text(encoding='utf-8').strip() + version = rocm_info_path.read_text(encoding="utf-8").strip() self._print_check("PASS", f"ROCm {version}") return except (OSError, UnicodeDecodeError): self._print_check("PASS", "ROCm installed") return - elif Path('/opt/rocm').exists(): + elif Path("/opt/rocm").exists(): self._print_check("PASS", "ROCm installed") return - + # Check if PyTorch has CUDA available (software level) try: import torch + if torch.cuda.is_available(): self._print_check("PASS", "CUDA available (PyTorch)") return except ImportError: pass - + self._print_check( "WARN", "CUDA/ROCm not found (GPU acceleration unavailable)", - "Install CUDA: https://developer.nvidia.com/cuda-downloads" + "Install CUDA: https://developer.nvidia.com/cuda-downloads", ) - + def _check_ollama(self) -> None: """Check if Ollama is installed and running.""" # Check if installed - if not shutil.which('ollama'): + if not shutil.which("ollama"): self._print_check( "WARN", "Ollama not installed", - "Install Ollama: curl https://ollama.ai/install.sh | sh" + "Install Ollama: curl https://ollama.ai/install.sh | sh", ) return - + # Check if running by testing the API try: import requests - response = requests.get( - 'http://localhost:11434/api/tags', - timeout=2 - ) + + response = requests.get("http://localhost:11434/api/tags", timeout=2) if response.status_code == 200: self._print_check("PASS", "Ollama installed and running") return except Exception: pass - + # Ollama installed but not running self._print_check( - "WARN", - "Ollama installed but not running", - "Start Ollama: ollama serve &" + "WARN", "Ollama installed but not running", "Start Ollama: ollama serve &" ) - + def _check_api_keys(self) -> None: """Check if API keys are configured for cloud models.""" is_valid, provider, error = validate_api_key() - + if is_valid: self._print_check("PASS", f"{provider} API key configured") else: @@ -326,136 +332,134 @@ def _check_api_keys(self) -> None: "WARN", "No API keys configured (required for cloud models)", "Configure API key: export ANTHROPIC_API_KEY=sk-... " - "or run 'cortex wizard'" + "or run 'cortex wizard'", ) - + def _check_disk_space(self) -> None: """Check available disk space for model storage.""" try: - usage = shutil.disk_usage(os.path.expanduser('~')) + usage = shutil.disk_usage(os.path.expanduser("~")) free_gb = usage.free / (1024**3) total_gb = usage.total / (1024**3) - + if free_gb > 20: self._print_check( - "PASS", - f"{free_gb:.1f}GB free disk space ({total_gb:.1f}GB total)" + "PASS", f"{free_gb:.1f}GB free disk space ({total_gb:.1f}GB total)" ) elif free_gb > 10: self._print_check( "WARN", f"{free_gb:.1f}GB free (20GB+ recommended for models)", - "Free up disk space: sudo apt clean && docker system prune -a" + "Free up disk space: sudo apt clean && docker system prune -a", ) else: self._print_check( "FAIL", f"Only {free_gb:.1f}GB free (critically low)", - "Free up disk space: sudo apt autoremove && sudo apt clean" + "Free up disk space: sudo apt autoremove && sudo apt clean", ) except (OSError, Exception) as e: - self._print_check( - "WARN", - f"Could not check disk space: {type(e).__name__}" - ) - + self._print_check("WARN", f"Could not check disk space: {type(e).__name__}") + def _check_memory(self) -> None: """Check system RAM availability.""" mem_gb = self._get_system_memory() - + if mem_gb is None: self._print_check("WARN", "Could not detect system RAM") return - + if mem_gb >= 16: self._print_check("PASS", f"{mem_gb:.1f}GB RAM") elif mem_gb >= 8: self._print_check( "WARN", f"{mem_gb:.1f}GB RAM (16GB recommended for larger models)", - "Consider upgrading RAM or use smaller models" + "Consider upgrading RAM or use smaller models", ) else: self._print_check( "FAIL", f"Only {mem_gb:.1f}GB RAM (8GB minimum required)", - "Upgrade RAM to at least 8GB" + "Upgrade RAM to at least 8GB", ) - + def _get_system_memory(self) -> Optional[float]: """ Get system memory in GB. - + Returns: float: Total system memory in GB, or None if detection fails """ # Try /proc/meminfo (Linux) try: - with open('/proc/meminfo', 'r', encoding='utf-8') as f: + with open("/proc/meminfo", "r", encoding="utf-8") as f: for line in f: - if line.startswith('MemTotal:'): + if line.startswith("MemTotal:"): mem_kb = int(line.split()[1]) return mem_kb / (1024**2) except (OSError, ValueError, IndexError): pass - + # Try psutil (macOS/BSD/Windows) try: import psutil + return psutil.virtual_memory().total / (1024**3) except ImportError: pass - + return None - + def _print_summary(self) -> None: """Print summary table and overall health status with suggestions.""" console.print() - + # Create summary table table = Table(show_header=False, box=box.SIMPLE, padding=(0, 1)) table.add_column("Status", style="bold") table.add_column("Count", justify="right") - + if self.passes: table.add_row( - "[green]✓ Passed[/green]", - f"[green]{len(self.passes)}[/green]" + "[green]✓ Passed[/green]", f"[green]{len(self.passes)}[/green]" ) if self.warnings: table.add_row( - "[yellow]⚠ Warnings[/yellow]", - f"[yellow]{len(self.warnings)}[/yellow]" + "[yellow]⚠ Warnings[/yellow]", f"[yellow]{len(self.warnings)}[/yellow]" ) if self.failures: - table.add_row( - "[red]✗ Failures[/red]", - f"[red]{len(self.failures)}[/red]" - ) - + table.add_row("[red]✗ Failures[/red]", f"[red]{len(self.failures)}[/red]") + console.print(table) console.print() - + # Overall status panel if self.failures: - console.print(Panel( - f"[bold red]❌ {len(self.failures)} critical failure(s) found[/bold red]", - border_style="red", - padding=(0, 2) - )) + console.print( + Panel( + f"[bold red]❌ {len(self.failures)} critical failure(s) found[/bold red]", + border_style="red", + padding=(0, 2), + ) + ) elif self.warnings: - console.print(Panel( - f"[bold yellow]⚠️ {len(self.warnings)} warning(s) found[/bold yellow]", - border_style="yellow", - padding=(0, 2) - )) + console.print( + Panel( + f"[bold yellow]⚠️ {len(self.warnings)} warning(s) found[/bold yellow]", + border_style="yellow", + padding=(0, 2), + ) + ) else: - console.print(Panel( - "[bold green]✅ All checks passed! System is healthy.[/bold green]", - border_style="green", - padding=(0, 2) - )) - + console.print( + Panel( + "[bold green]✅ All checks passed! System is healthy.[/bold green]", + border_style="green", + padding=(0, 2), + ) + ) + # Show fix suggestions if any if self.suggestions: console.print() @@ -468,7 +472,7 @@ def _print_summary(self) -> None: def run_doctor() -> int: """ Run the system doctor and return exit code. - + Returns: int: Exit code (0 = all good, 1 = warnings, 2 = failures) """ @@ -477,4 +481,4 @@ def run_doctor() -> int: if __name__ == "__main__": - sys.exit(run_doctor()) \ No newline at end of file + sys.exit(run_doctor()) diff --git a/tests/test_doctor.py b/tests/test_doctor.py new file mode 100644 index 00000000..fd1794a3 --- /dev/null +++ b/tests/test_doctor.py @@ -0,0 +1,126 @@ +""" +Unit tests for cortex/doctor.py - System Health Check +""" + +import sys +import pytest +from collections import namedtuple +from unittest.mock import patch, MagicMock, mock_open + +from cortex.doctor import SystemDoctor + + +class TestSystemDoctorInit: + def test_init_empty_lists(self): + doctor = SystemDoctor() + assert doctor.passes == [] + assert doctor.warnings == [] + assert doctor.failures == [] + assert doctor.suggestions == [] + + +class TestPythonVersionCheck: + VersionInfo = namedtuple("VersionInfo", "major minor micro releaselevel serial") + + @pytest.mark.parametrize( + "version_tuple, status", + [ + ((3, 12, 3), "PASS"), + ((3, 9, 0), "WARN"), + ((3, 7, 0), "FAIL"), + ], + ) + def test_python_version_scenarios(self, monkeypatch, version_tuple, status): + doctor = SystemDoctor() + + vi = self.VersionInfo( + version_tuple[0], version_tuple[1], version_tuple[2], "final", 0 + ) + monkeypatch.setattr(sys, "version_info", vi) + + doctor._check_python() + + if status == "PASS": + assert any("Python 3.12.3" in msg for msg in doctor.passes) + elif status == "WARN": + assert any("Python 3.9.0" in msg for msg in doctor.warnings) + else: + assert any("Python 3.7.0" in msg for msg in doctor.failures) + + +class TestRequirementsTxtDependencies: + def test_requirements_txt_all_installed(self): + doctor = SystemDoctor() + mock_content = "anthropic\nopenai\nrich\n" + + with patch("builtins.open", mock_open(read_data=mock_content)): + with patch("builtins.__import__", return_value=MagicMock()): + doctor._check_dependencies() + + assert "All requirements.txt packages installed" in doctor.passes[0] + + def test_some_dependencies_missing(self): + doctor = SystemDoctor() + + def mock_import(name, *args, **kwargs): + if name in ["anthropic", "openai"]: + raise ImportError() + return MagicMock() + + mock_content = "anthropic\nopenai\nrich\n" + with patch("builtins.open", mock_open(read_data=mock_content)): + with patch("builtins.__import__", side_effect=mock_import): + doctor._check_dependencies() + + assert "Missing from requirements.txt" in doctor.warnings[0] + + +class TestGPUDriverCheck: + def test_cpu_only_message(self): + doctor = SystemDoctor() + with patch("shutil.which", return_value=None): + doctor._check_gpu_driver() + assert "CPU-only mode" in doctor.warnings[0] + + +class TestExitCodes: + """ + IMPORTANT: run_checks() calls all checks; without patching, your real system + will produce warnings/failures and exit code 2, which is why your previous + tests saw 2 instead of 1/0. + """ + + @patch.object(SystemDoctor, "_check_python") + @patch.object(SystemDoctor, "_check_dependencies") + @patch.object(SystemDoctor, "_check_gpu_driver") + @patch.object(SystemDoctor, "_check_cuda") + @patch.object(SystemDoctor, "_check_ollama") + @patch.object(SystemDoctor, "_check_api_keys") + @patch.object(SystemDoctor, "_check_disk_space") + @patch.object(SystemDoctor, "_check_memory") + @patch.object(SystemDoctor, "_print_summary") + def test_exit_codes(self, *_mocks): + # all good → 0 + d = SystemDoctor() + d.passes = ["ok"] + d.warnings = [] + d.failures = [] + assert d.run_checks() == 0 + + # warnings only → 1 + d = SystemDoctor() + d.passes = ["ok"] + d.warnings = ["warn"] + d.failures = [] + assert d.run_checks() == 1 + + # failures present → 2 + d = SystemDoctor() + d.passes = ["ok"] + d.warnings = ["warn"] + d.failures = ["fail"] + assert d.run_checks() == 2 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])