diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml deleted file mode 100644 index 6398cc7..0000000 --- a/.github/workflows/codeql.yml +++ /dev/null @@ -1,43 +0,0 @@ -name: "CodeQL Security Analysis" - -on: - push: - branches: [ main ] - pull_request: - branches: [ main ] - schedule: - # Run every Monday at 6:00 AM UTC - - cron: '0 6 * * 1' - -jobs: - analyze: - name: Analyze - runs-on: ubuntu-latest - permissions: - actions: read - contents: read - security-events: write - - strategy: - fail-fast: false - matrix: - language: [ 'python' ] - - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Initialize CodeQL - uses: github/codeql-action/init@v4 - with: - languages: ${{ matrix.language }} - # Use default queries plus security-extended - queries: security-extended - - - name: Autobuild - uses: github/codeql-action/autobuild@v4 - - - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v4 - with: - category: "/language:${{ matrix.language }}" diff --git a/cortex/cli.py b/cortex/cli.py index 17004c6..b56018b 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -3,6 +3,9 @@ import argparse import time import logging +import shutil +import traceback +import urllib.request from typing import List, Optional from datetime import datetime @@ -34,9 +37,7 @@ ) from cortex.validators import ( validate_api_key, - validate_install_request, - validate_installation_id, - ValidationError + validate_install_request ) # Import the new Notification Manager from cortex.notification_manager import NotificationManager @@ -63,7 +64,8 @@ def _get_api_key(self) -> Optional[str]: is_valid, detected_provider, error = validate_api_key() if not is_valid: - self._print_error(error) + if error: + self._print_error(error) cx_print("Run [bold]cortex wizard[/bold] to configure your API key.", "info") cx_print("Or use [bold]CORTEX_PROVIDER=ollama[/bold] for offline mode.", "info") return None @@ -176,11 +178,12 @@ def notify(self, args): return 1 # ------------------------------- - def install(self, software: str, execute: bool = False, dry_run: bool = False): + def install(self, software: str, execute: bool = False, dry_run: bool = False, parallel: bool = False): # Validate input first is_valid, error = validate_install_request(software) if not is_valid: - self._print_error(error) + if error: + self._print_error(error) return 1 api_key = self._get_api_key() @@ -247,48 +250,120 @@ def progress_callback(current, total, step): print(f" Command: {step.command}") print("\nExecuting commands...") - - coordinator = InstallationCoordinator( - commands=commands, - descriptions=[f"Step {i+1}" for i in range(len(commands))], - timeout=300, - stop_on_error=True, - progress_callback=progress_callback - ) - - result = coordinator.execute() - - if result.success: - self._print_success(f"{software} installed successfully!") - print(f"\nCompleted in {result.total_duration:.2f} seconds") - - # Record successful installation - if install_id: - history.update_installation(install_id, InstallationStatus.SUCCESS) - print(f"\nšŸ“ Installation recorded (ID: {install_id})") - print(f" To rollback: cortex rollback {install_id}") - - return 0 - else: - # Record failed installation - if install_id: - error_msg = result.error_message or "Installation failed" - history.update_installation( - install_id, - InstallationStatus.FAILED, - error_msg + + if parallel: + # Use parallel execution + import asyncio + from cortex.install_parallel import run_parallel_install + + def parallel_log_callback(message: str, level: str = "info"): + """Callback for parallel execution logging.""" + if level == "success": + cx_print(f" āœ… {message}", "success") + elif level == "error": + cx_print(f" āŒ {message}", "error") + else: + cx_print(f" ℹ {message}", "info") + + try: + success, parallel_tasks = asyncio.run( + run_parallel_install( + commands=commands, + descriptions=[f"Step {i+1}" for i in range(len(commands))], + timeout=300, + stop_on_error=True, + log_callback=parallel_log_callback + ) ) - - if result.failed_step is not None: - self._print_error(f"Installation failed at step {result.failed_step + 1}") + + # Calculate total duration from tasks + total_duration = 0 + if parallel_tasks: + max_end = max((t.end_time for t in parallel_tasks if t.end_time is not None), default=None) + min_start = min((t.start_time for t in parallel_tasks if t.start_time is not None), default=None) + if max_end is not None and min_start is not None: + total_duration = max_end - min_start + + if success: + self._print_success(f"{software} installed successfully!") + print(f"\nCompleted in {total_duration:.2f} seconds (parallel mode)") + + # Record successful installation + if install_id: + history.update_installation(install_id, InstallationStatus.SUCCESS) + print(f"\nšŸ“ Installation recorded (ID: {install_id})") + print(f" To rollback: cortex rollback {install_id}") + + return 0 + else: + # Find failed task for error reporting + failed_tasks = [t for t in parallel_tasks if t.status.value == "failed"] + error_msg = failed_tasks[0].error if failed_tasks else "Installation failed" + + if install_id: + history.update_installation( + install_id, + InstallationStatus.FAILED, + error_msg + ) + + self._print_error("Installation failed") + if error_msg: + print(f" Error: {error_msg}", file=sys.stderr) + if install_id: + print(f"\nšŸ“ Installation recorded (ID: {install_id})") + print(f" View details: cortex history show {install_id}") + return 1 + + except Exception as e: + if install_id: + history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + self._print_error(f"Parallel execution failed: {str(e)}") + return 1 + + else: + # Use sequential execution (original behavior) + coordinator = InstallationCoordinator( + commands=commands, + descriptions=[f"Step {i+1}" for i in range(len(commands))], + timeout=300, + stop_on_error=True, + progress_callback=progress_callback + ) + + result = coordinator.execute() + + if result.success: + self._print_success(f"{software} installed successfully!") + print(f"\nCompleted in {result.total_duration:.2f} seconds") + + # Record successful installation + if install_id: + history.update_installation(install_id, InstallationStatus.SUCCESS) + print(f"\nšŸ“ Installation recorded (ID: {install_id})") + print(f" To rollback: cortex rollback {install_id}") + + return 0 else: - self._print_error("Installation failed") - if result.error_message: - print(f" Error: {result.error_message}", file=sys.stderr) - if install_id: - print(f"\nšŸ“ Installation recorded (ID: {install_id})") - print(f" View details: cortex history show {install_id}") - return 1 + # Record failed installation + if install_id: + error_msg = result.error_message or "Installation failed" + history.update_installation( + install_id, + InstallationStatus.FAILED, + error_msg + ) + + if result.failed_step is not None: + self._print_error(f"Installation failed at step {result.failed_step + 1}") + else: + self._print_error("Installation failed") + if result.error_message: + print(f" Error: {result.error_message}", file=sys.stderr) + if install_id: + print(f"\nšŸ“ Installation recorded (ID: {install_id})") + print(f" View details: cortex history show {install_id}") + return 1 else: print("\nTo execute these commands, run with --execute flag") print("Example: cortex install docker --execute") @@ -467,12 +542,11 @@ def edit_pref(self, action: str, key: Optional[str] = None, value: Optional[str] except Exception as e: self._print_error(f"Failed to edit preferences: {str(e)}") + traceback.print_exc() return 1 def status(self): """Show system status including security features""" - import shutil - show_banner(show_version=True) console.print() @@ -499,6 +573,32 @@ def status(self): cx_print("Firejail: [bold]Not installed[/bold]", "warning") cx_print(" Install: sudo apt-get install firejail", "info") + # Check Ollama + ollama_host = os.environ.get('OLLAMA_HOST', 'http://localhost:11434') + try: + req = urllib.request.Request(f"{ollama_host}/api/tags", method='GET') + with urllib.request.urlopen(req, timeout=2) as resp: + cx_print(f"Ollama: [bold]Running[/bold] ({ollama_host})", "success") + except Exception: + cx_print(f"Ollama: [bold]Not running[/bold]", "info") + cx_print(" Start: ollama serve", "info") + + console.print() + cx_header("Configuration") + + # Show config file location + config_path = os.path.expanduser('~/.cortex/config.json') + if os.path.exists(config_path): + cx_print(f"Config: {config_path}", "info") + else: + cx_print(f"Config: Not created yet", "info") + + history_path = os.path.expanduser('~/.cortex/history.db') + if os.path.exists(history_path): + cx_print(f"History: {history_path}", "info") + else: + cx_print(f"History: Not created yet", "info") + console.print() return 0 @@ -564,19 +664,20 @@ def main(): subparsers = parser.add_subparsers(dest='command', help='Available commands') # Demo command - demo_parser = subparsers.add_parser('demo', help='See Cortex in action') + _demo_parser = subparsers.add_parser('demo', help='See Cortex in action') # Wizard command - wizard_parser = subparsers.add_parser('wizard', help='Configure API key interactively') + _wizard_parser = subparsers.add_parser('wizard', help='Configure API key interactively') # Status command - status_parser = subparsers.add_parser('status', help='Show system status') + _status_parser = subparsers.add_parser('status', help='Show system status') # Install command install_parser = subparsers.add_parser('install', help='Install software') install_parser.add_argument('software', type=str, help='Software to install') install_parser.add_argument('--execute', action='store_true', help='Execute commands') install_parser.add_argument('--dry-run', action='store_true', help='Show commands only') + install_parser.add_argument('--parallel', action='store_true', help='Execute independent tasks in parallel') # History command history_parser = subparsers.add_parser('history', help='View history') @@ -590,13 +691,13 @@ def main(): rollback_parser.add_argument('--dry-run', action='store_true') # Preferences commands - check_pref_parser = subparsers.add_parser('check-pref', help='Check preferences') - check_pref_parser.add_argument('key', nargs='?') + _check_pref_parser = subparsers.add_parser('check-pref', help='Check preferences') + _check_pref_parser.add_argument('key', nargs='?') - edit_pref_parser = subparsers.add_parser('edit-pref', help='Edit preferences') - edit_pref_parser.add_argument('action', choices=['set', 'add', 'delete', 'list', 'validate']) - edit_pref_parser.add_argument('key', nargs='?') - edit_pref_parser.add_argument('value', nargs='?') + _edit_pref_parser = subparsers.add_parser('edit-pref', help='Edit preferences') + _edit_pref_parser.add_argument('action', choices=['set', 'add', 'delete', 'list', 'validate']) + _edit_pref_parser.add_argument('key', nargs='?') + _edit_pref_parser.add_argument('value', nargs='?') # --- New Notify Command --- notify_parser = subparsers.add_parser('notify', help='Manage desktop notifications') @@ -633,7 +734,7 @@ def main(): elif args.command == 'status': return cli.status() elif args.command == 'install': - return cli.install(args.software, execute=args.execute, dry_run=args.dry_run) + return cli.install(args.software, execute=args.execute, dry_run=args.dry_run, parallel=args.parallel) elif args.command == 'history': return cli.history(limit=args.limit, status=args.status, show_id=args.show_id) elif args.command == 'rollback': diff --git a/cortex/coordinator.py b/cortex/coordinator.py index 431bedb..1703894 100644 --- a/cortex/coordinator.py +++ b/cortex/coordinator.py @@ -9,23 +9,9 @@ from datetime import datetime import logging -logger = logging.getLogger(__name__) +from cortex.validators import DANGEROUS_PATTERNS -# Dangerous patterns that should never be executed -DANGEROUS_PATTERNS = [ - r'rm\s+-rf\s+[/\*]', - r'rm\s+--no-preserve-root', - r'dd\s+if=.*of=/dev/', - r'curl\s+.*\|\s*sh', - r'curl\s+.*\|\s*bash', - r'wget\s+.*\|\s*sh', - r'wget\s+.*\|\s*bash', - r'\beval\s+', - r'base64\s+-d\s+.*\|', - r'>\s*/etc/', - r'chmod\s+777', - r'chmod\s+\+s', -] +logger = logging.getLogger(__name__) class StepStatus(Enum): diff --git a/cortex/install_parallel.py b/cortex/install_parallel.py new file mode 100644 index 0000000..dd688b1 --- /dev/null +++ b/cortex/install_parallel.py @@ -0,0 +1,269 @@ +import asyncio +import time +import subprocess +import re +import concurrent.futures +from typing import List, Dict, Optional, Callable +from dataclasses import dataclass, field +from enum import Enum + +from cortex.validators import DANGEROUS_PATTERNS + + +class TaskStatus(Enum): + PENDING = "pending" + RUNNING = "running" + SUCCESS = "success" + FAILED = "failed" + SKIPPED = "skipped" + + +@dataclass +class ParallelTask: + """Represents a single task in parallel execution.""" + name: str + command: str + description: str + dependencies: List[str] = field(default_factory=list) + status: TaskStatus = TaskStatus.PENDING + output: str = "" + error: str = "" + start_time: Optional[float] = None + end_time: Optional[float] = None + + def duration(self) -> Optional[float]: + if self.start_time and self.end_time: + return self.end_time - self.start_time + return None + + +async def run_single_task(task: ParallelTask, executor, timeout: int, log_callback: Optional[Callable] = None) -> bool: + """Run a single task asynchronously. + + Args: + task: Task to run + executor: Thread pool executor for running blocking subprocess calls + timeout: Command timeout in seconds + log_callback: Optional callback for logging messages + + Returns: + True if task succeeded, False otherwise + """ + task.status = TaskStatus.RUNNING + task.start_time = time.time() + + # Log task start + if log_callback: + log_callback(f"Starting {task.name}…", "info") + + # Validate command for dangerous patterns + for pattern in DANGEROUS_PATTERNS: + if re.search(pattern, task.command, re.IGNORECASE): + task.status = TaskStatus.FAILED + task.error = "Command blocked: matches dangerous pattern" + task.end_time = time.time() + if log_callback: + log_callback(f"Finished {task.name} (failed)", "error") + return False + + try: + # Run command in executor (thread pool) to avoid blocking the event loop + loop = asyncio.get_running_loop() + result = await asyncio.wait_for( + loop.run_in_executor( + executor, + # Use shell=True carefully - commands are validated against dangerous patterns above. + # shell=True is required to support complex shell commands (e.g., pipes, redirects). + lambda: subprocess.run( + task.command, + shell=True, + capture_output=True, + text=True, + timeout=timeout + ) + ), + timeout=timeout + 5 # Slight buffer for asyncio overhead + ) + + task.output = result.stdout + task.error = result.stderr + task.end_time = time.time() + + if result.returncode == 0: + task.status = TaskStatus.SUCCESS + if log_callback: + log_callback(f"Finished {task.name} (ok)", "success") + return True + else: + task.status = TaskStatus.FAILED + if log_callback: + log_callback(f"Finished {task.name} (failed)", "error") + return False + + except asyncio.TimeoutError: + task.status = TaskStatus.FAILED + task.error = f"Command timed out after {timeout} seconds" + task.end_time = time.time() + if log_callback: + log_callback(f"Finished {task.name} (failed)", "error") + return False + + except Exception as e: + task.status = TaskStatus.FAILED + task.error = str(e) + task.end_time = time.time() + if log_callback: + log_callback(f"Finished {task.name} (failed)", "error") + return False + + +async def run_parallel_install( + commands: List[str], + descriptions: Optional[List[str]] = None, + dependencies: Optional[Dict[int, List[int]]] = None, + timeout: int = 300, + stop_on_error: bool = True, + log_callback: Optional[Callable] = None +) -> tuple: + """Execute installation tasks in parallel based on dependency graph. + + Args: + commands: List of commands to execute + descriptions: Optional list of descriptions for each command + dependencies: Optional dict mapping command index to list of dependent indices + e.g., {0: [], 1: [0]} means task 1 depends on task 0 + timeout: Timeout per command in seconds + stop_on_error: If True, cancel dependent tasks when a task fails + log_callback: Optional callback for logging (called with message and level) + + Returns: + tuple[bool, List[ParallelTask]]: Success status and list of all tasks + """ + if not commands: + return True, [] + + if descriptions and len(descriptions) != len(commands): + raise ValueError("Number of descriptions must match number of commands") + + # Create tasks + tasks: Dict[str, ParallelTask] = {} + for i, command in enumerate(commands): + task_name = f"Task {i+1}" + desc = descriptions[i] if descriptions else f"Step {i+1}" + + # Get dependencies for this task (if any commands depend on it, don't use that) + # Instead, find which tasks this task depends on + task_deps = [] + if dependencies: + # Dependencies format: key=task_index -> list of indices it depends on + for dep_idx in dependencies.get(i, []): + task_deps.append(f"Task {dep_idx+1}") + + tasks[task_name] = ParallelTask( + name=task_name, + command=command, + description=desc, + dependencies=task_deps + ) + + # Execution tracking + completed = set() + running = {} + pending = set(tasks.keys()) + failed = set() + + # Thread pool for subprocess calls + executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) + + try: + while pending or running: + # Start tasks whose dependencies are met + ready_to_start = [] + for task_name in pending.copy(): + task = tasks[task_name] + # When stop_on_error=False, accept both completed and failed dependencies + if stop_on_error: + deps_met = all(dep in completed for dep in task.dependencies) + else: + deps_met = all(dep in completed or dep in failed for dep in task.dependencies) + + if deps_met: + ready_to_start.append(task_name) + pending.remove(task_name) + + # If no tasks can be started and none are running, we're stuck (deadlock/cycle detection) + if not ready_to_start and not running and pending: + # Mark remaining tasks as skipped - they have unresolvable dependencies + for task_name in pending: + task = tasks[task_name] + if task.status == TaskStatus.PENDING: + task.status = TaskStatus.SKIPPED + task.error = "Task could not run because dependencies never completed" + if log_callback: + log_callback(f"{task_name} skipped due to unresolved dependencies", "error") + failed.update(pending) + break + + # Create tasks for ready items + for task_name in ready_to_start: + coro = run_single_task(tasks[task_name], executor, timeout, log_callback) + running[task_name] = asyncio.create_task(coro) + + # If nothing is running and nothing is pending, we're done + if not running and not pending: + break + + # Wait for at least one task to finish + if running: + done, _ = await asyncio.wait( + running.values(), + return_when=asyncio.FIRST_COMPLETED + ) + + # Process completed tasks + for task_coro in done: + # Find which task this is + for task_name, running_coro in running.items(): + if running_coro is task_coro: + task = tasks[task_name] + + # Handle cancelled tasks + try: + success = task_coro.result() + except asyncio.CancelledError: + # Task was cancelled due to stop_on_error + task.status = TaskStatus.SKIPPED + task.error = "Task cancelled due to dependency failure" + failed.add(task_name) + del running[task_name] + break + + if success: + completed.add(task_name) + else: + failed.add(task_name) + + # If stop_on_error, skip dependent tasks + if stop_on_error: + dependent_tasks = [ + name for name, t in tasks.items() + if task_name in t.dependencies + ] + for dep_task_name in dependent_tasks: + if dep_task_name in pending: + pending.remove(dep_task_name) + tasks[dep_task_name].status = TaskStatus.SKIPPED + elif dep_task_name in running: + running[dep_task_name].cancel() + + del running[task_name] + break + + finally: + executor.shutdown(wait=True) + + # Check overall success + all_success = len(failed) == 0 + task_list = list(tasks.values()) + + return all_success, task_list diff --git a/cortex/validators.py b/cortex/validators.py index e4e0992..a61fe63 100644 --- a/cortex/validators.py +++ b/cortex/validators.py @@ -9,6 +9,23 @@ from typing import Optional, Tuple +# Dangerous command patterns to block for security +DANGEROUS_PATTERNS = [ + r'rm\s+-rf\s+[/\*]', + r'rm\s+--no-preserve-root', + r'dd\s+if=.*of=/dev/', + r'curl\s+.*\|\s*sh', + r'curl\s+.*\|\s*bash', + r'wget\s+.*\|\s*sh', + r'wget\s+.*\|\s*bash', + r'\beval\s+', + r'base64\s+-d\s+.*\|', + r'>\s*/etc/', + r'chmod\s+777', + r'chmod\s+\+s', +] + + class ValidationError(Exception): """Custom exception for validation errors with user-friendly messages""" diff --git a/requirements.txt b/requirements.txt index 1f24bc5..490190f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ rich>=13.0.0 # Type hints for older Python versions typing-extensions>=4.0.0 +PyYAML>=6.0.0 \ No newline at end of file diff --git a/tests/installer/__init__.py b/tests/installer/__init__.py new file mode 100644 index 0000000..1f56320 --- /dev/null +++ b/tests/installer/__init__.py @@ -0,0 +1 @@ +"""Installer tests package.""" diff --git a/tests/installer/test_parallel_install.py b/tests/installer/test_parallel_install.py new file mode 100644 index 0000000..0253d43 --- /dev/null +++ b/tests/installer/test_parallel_install.py @@ -0,0 +1,317 @@ +"""Tests for parallel installation execution.""" + +import pytest +import asyncio +import time +from cortex.install_parallel import ( + run_parallel_install, + TaskStatus +) + + +class TestParallelExecution: + """Test parallel execution of installation tasks.""" + + def test_parallel_runs_faster_than_sequential(self): + """Verify that parallel execution is faster for independent tasks.""" + async def run_test(): + # Create 3 independent commands using Python's time.sleep (Windows-compatible) + commands = [ + 'python -c "import time; time.sleep(0.1); print(\'Task 1\')"', + 'python -c "import time; time.sleep(0.1); print(\'Task 2\')"', + 'python -c "import time; time.sleep(0.1); print(\'Task 3\')"' + ] + + # Run in parallel + start = time.time() + success, tasks = await run_parallel_install(commands, timeout=10) + parallel_time = time.time() - start + + assert success + assert all(t.status == TaskStatus.SUCCESS for t in tasks) + + # Parallel execution should be faster than sequential (0.3s + overhead) + # On Windows, Python subprocess startup adds significant overhead + # We just verify it completes and doesn't take more than 1 second + assert parallel_time < 1.0, f"Parallel execution took {parallel_time}s, expected < 1.0s" + + asyncio.run(run_test()) + + def test_dependency_order_respected(self): + """Verify that task execution respects dependency order.""" + async def run_test(): + commands = [ + 'python -c "print(\'Task 1\')"', + 'python -c "print(\'Task 2\')"', + 'python -c "print(\'Task 3\')"' + ] + + # Task 1 has no dependencies + # Task 2 depends on Task 1 + # Task 3 depends on Task 2 + dependencies = { + 0: [], # Task 1 (index 0) has no dependencies + 1: [0], # Task 2 (index 1) depends on Task 1 (index 0) + 2: [1] # Task 3 (index 2) depends on Task 2 (index 1) + } + + success, tasks = await run_parallel_install( + commands, + dependencies=dependencies, + timeout=10 + ) + + assert success + assert all(t.status == TaskStatus.SUCCESS for t in tasks) + + asyncio.run(run_test()) + + def test_failure_blocks_dependent_tasks(self): + """Verify that dependent tasks are skipped when a parent task fails.""" + async def run_test(): + commands = [ + 'python -c "exit(1)"', # Task 1 fails + 'python -c "print(\'Task 2\')"', # Task 2 depends on Task 1 + 'python -c "print(\'Task 3\')"' # Task 3 is independent + ] + + # Task 2 depends on Task 1 + dependencies = { + 0: [], # Task 1 has no dependencies + 1: [0], # Task 2 depends on Task 1 (which will fail) + 2: [] # Task 3 is independent + } + + success, tasks = await run_parallel_install( + commands, + dependencies=dependencies, + timeout=10, + stop_on_error=True + ) + + assert not success + assert tasks[0].status == TaskStatus.FAILED + assert tasks[1].status == TaskStatus.SKIPPED # Blocked by failed Task 1 + assert tasks[2].status == TaskStatus.SUCCESS # Independent, should run + + asyncio.run(run_test()) + + def test_all_independent_tasks_run(self): + """Verify that all independent tasks run in parallel.""" + async def run_test(): + commands = [ + 'python -c "print(\'Task 1\')"', + 'python -c "print(\'Task 2\')"', + 'python -c "print(\'Task 3\')"', + 'python -c "print(\'Task 4\')"' + ] + + # All tasks are independent (no dependencies) + dependencies = { + 0: [], + 1: [], + 2: [], + 3: [] + } + + success, tasks = await run_parallel_install( + commands, + dependencies=dependencies, + timeout=10 + ) + + assert success + assert all(t.status == TaskStatus.SUCCESS for t in tasks) + assert len(tasks) == 4 + + asyncio.run(run_test()) + + def test_descriptions_match_tasks(self): + """Verify that descriptions are properly assigned to tasks.""" + async def run_test(): + commands = ['python -c "print(\'Task 1\')"', 'python -c "print(\'Task 2\')"'] + descriptions = ["Install package A", "Start service B"] + + success, tasks = await run_parallel_install( + commands, + descriptions=descriptions, + timeout=10 + ) + + assert success + assert tasks[0].description == "Install package A" + assert tasks[1].description == "Start service B" + + asyncio.run(run_test()) + + def test_invalid_description_count_raises_error(self): + """Verify that mismatched description count raises ValueError.""" + async def run_test(): + commands = ['python -c "print(\'Task 1\')"', 'python -c "print(\'Task 2\')"'] + descriptions = ["Only one description"] # Mismatch + + with pytest.raises(ValueError): + await run_parallel_install( + commands, + descriptions=descriptions, + timeout=10 + ) + + asyncio.run(run_test()) + + def test_command_timeout(self): + """Verify that commands timing out are handled correctly.""" + async def run_test(): + commands = [ + 'python -c "import time; time.sleep(5)"', # This will timeout with 1 second limit + ] + + success, tasks = await run_parallel_install( + commands, + timeout=1 + ) + + assert not success + assert tasks[0].status == TaskStatus.FAILED + assert "timed out" in tasks[0].error.lower() or "timeout" in tasks[0].error.lower() + + asyncio.run(run_test()) + + def test_empty_commands_list(self): + """Verify handling of empty command list.""" + async def run_test(): + success, tasks = await run_parallel_install([], timeout=5) + + assert success + assert len(tasks) == 0 + + asyncio.run(run_test()) + + def test_task_status_tracking(self): + """Verify that task status is properly tracked.""" + async def run_test(): + commands = ['python -c "print(\'Success\')"'] + + success, tasks = await run_parallel_install(commands, timeout=10) + + assert success + task = tasks[0] + assert task.status == TaskStatus.SUCCESS + assert "Success" in task.output + assert task.start_time is not None + assert task.end_time is not None + assert task.duration() is not None + assert task.duration() > 0 + + asyncio.run(run_test()) + + def test_sequential_mode_unchanged(self): + """Verify that sequential mode (no dependencies) still works as expected.""" + async def run_test(): + commands = [ + 'python -c "print(\'Step 1\')"', + 'python -c "print(\'Step 2\')"', + 'python -c "print(\'Step 3\')"' + ] + descriptions = ["Step 1", "Step 2", "Step 3"] + + success, tasks = await run_parallel_install( + commands, + descriptions=descriptions, + timeout=10 + ) + + assert success + assert len(tasks) == 3 + assert all(t.status == TaskStatus.SUCCESS for t in tasks) + assert all(t.description for t in tasks) + + asyncio.run(run_test()) + + def test_log_callback_called(self): + """Verify that log callback is invoked during execution.""" + async def run_test(): + commands = ['python -c "print(\'Test\')"'] + log_messages = [] + + def log_callback(message: str, level: str = "info"): + log_messages.append((message, level)) + + success, _tasks = await run_parallel_install( + commands, + timeout=10, + log_callback=log_callback + ) + + assert success + # Should have at least "Starting" and "Finished" messages + assert len(log_messages) >= 2 + assert any("Starting" in msg[0] for msg in log_messages) + assert any("Finished" in msg[0] for msg in log_messages) + + asyncio.run(run_test()) + + +class TestParallelExecutionIntegration: + """Integration tests for parallel execution with realistic scenarios.""" + + def test_diamond_dependency_graph(self): + """Test diamond-shaped dependency graph: + Task 1 -> Task 2 & Task 3 -> Task 4 + """ + async def run_test(): + commands = [ + 'python -c "print(\'Base\')"', # Task 1 + 'python -c "print(\'Branch A\')"', # Task 2 + 'python -c "print(\'Branch B\')"', # Task 3 + 'python -c "print(\'Final\')"' # Task 4 + ] + + # Task 2 and 3 depend on Task 1 + # Task 4 depends on both Task 2 and 3 + dependencies = { + 0: [], # Task 1 (base) + 1: [0], # Task 2 depends on Task 1 + 2: [0], # Task 3 depends on Task 1 + 3: [1, 2] # Task 4 depends on Task 2 and 3 + } + + success, tasks = await run_parallel_install( + commands, + dependencies=dependencies, + timeout=10 + ) + + assert success + assert all(t.status == TaskStatus.SUCCESS for t in tasks) + + asyncio.run(run_test()) + + def test_mixed_success_and_independent_failure(self): + """Test that independent failures don't block unrelated tasks.""" + async def run_test(): + commands = [ + 'python -c "exit(1)"', # Task 1 fails + 'python -c "print(\'OK\')"', # Task 2 independent + 'python -c "print(\'OK\')"', # Task 3 independent + ] + + dependencies = { + 0: [], + 1: [], + 2: [] + } + + success, tasks = await run_parallel_install( + commands, + dependencies=dependencies, + timeout=10, + stop_on_error=False # Don't stop on error to see all results + ) + + assert not success + assert tasks[0].status == TaskStatus.FAILED + assert tasks[1].status == TaskStatus.SUCCESS + assert tasks[2].status == TaskStatus.SUCCESS + + asyncio.run(run_test())