From 76321c087772f168dd3c6458ea44ec137362db15 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 14 Nov 2025 16:23:43 +0000 Subject: [PATCH 01/16] Initial plan From ead2d2067efcd56af65546f89458d1bcdcf35039 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 14 Nov 2025 16:32:35 +0000 Subject: [PATCH 02/16] Add configuration management feature with tests and documentation Co-authored-by: danishirfan21 <44131991+danishirfan21@users.noreply.github.com> --- .gitignore | 14 +- CONFIGURATION.md | 589 ++++++++++++++++++++++++++++ examples/sample-config.yaml | 74 ++++ src/config_manager.py | 762 ++++++++++++++++++++++++++++++++++++ src/requirements.txt | 4 +- src/test_config_manager.py | 683 ++++++++++++++++++++++++++++++++ 6 files changed, 2121 insertions(+), 5 deletions(-) create mode 100644 CONFIGURATION.md create mode 100644 examples/sample-config.yaml create mode 100644 src/config_manager.py create mode 100644 src/test_config_manager.py diff --git a/.gitignore b/.gitignore index f1d563d..6b46cf9 100644 --- a/.gitignore +++ b/.gitignore @@ -20,10 +20,6 @@ venv.bak/ # ============================== # Distribution / Packaging # ============================== -__pycache__/ -*.py[cod] -*$py.class -*.so .Python build/ develop-eggs/ @@ -150,3 +146,13 @@ ENV/ .pytest_cache/ .coverage htmlcov/ +*.out +*~ +*.swo + +# ============================== +# Cortex specific +# ============================== +.cortex/ +*.yaml.bak +/tmp/ diff --git a/CONFIGURATION.md b/CONFIGURATION.md new file mode 100644 index 0000000..734c12d --- /dev/null +++ b/CONFIGURATION.md @@ -0,0 +1,589 @@ +# Configuration Management for Cortex Linux + +## Overview + +Cortex Linux's Configuration Management feature enables you to export, share, and import system configurations for reproducibility and team collaboration. This feature is essential for: + +- **Team Collaboration**: Share exact development environments with team members +- **Infrastructure as Code**: Version control your system configurations +- **Disaster Recovery**: Quickly restore systems to known-good states +- **Onboarding**: New team members can replicate production environments instantly +- **CI/CD**: Ensure consistent environments across development, staging, and production + +## Installation + +### Prerequisites + +- Python 3.8 or higher +- Cortex Linux 0.2.0 or compatible version +- System package managers: apt, pip3, npm (depending on what you want to export/import) + +### Dependencies + +Install required Python dependencies: + +```bash +pip3 install pyyaml>=6.0.1 +``` + +### System Requirements + +- Ubuntu 24.04 LTS (or compatible Debian-based distribution) +- Sufficient disk space for configuration files +- Root/sudo access for package installation + +## Usage + +The Configuration Manager provides three main commands: + +1. **export** - Export current system configuration +2. **import** - Import and apply configuration +3. **diff** - Compare current system with configuration file + +### Exporting Configuration + +#### Basic Export + +Export your current system configuration: + +```bash +python3 config_manager.py export --output my-config.yaml +``` + +This creates a YAML file containing: +- Cortex version +- OS version +- Installed packages (apt, pip, npm) +- User preferences +- Selected environment variables + +#### Export with Hardware Information + +Include hardware profile in the export: + +```bash +python3 config_manager.py export --output dev-machine.yaml --include-hardware +``` + +Hardware information includes: +- CPU model and core count +- GPU details (NVIDIA, AMD, Intel) +- RAM size +- Storage devices +- Network interfaces + +#### Export Packages Only + +Export only package information (no preferences or hardware): + +```bash +python3 config_manager.py export --output packages.yaml --packages-only +``` + +#### Export Without Preferences + +Export everything except user preferences: + +```bash +python3 config_manager.py export --output config.yaml --no-preferences +``` + +### Importing Configuration + +#### Preview Changes (Dry-Run) + +Preview what would change without applying anything: + +```bash +python3 config_manager.py import dev-machine.yaml --dry-run +``` + +Output shows: +- Packages to install +- Packages to upgrade/downgrade +- Preferences that will change +- Warnings about compatibility + +#### Apply Configuration + +Import and apply the configuration: + +```bash +python3 config_manager.py import dev-machine.yaml +``` + +This will: +1. Validate compatibility +2. Install missing packages +3. Upgrade outdated packages +4. Update user preferences + +#### Force Import + +Skip compatibility checks (use with caution): + +```bash +python3 config_manager.py import dev-machine.yaml --force +``` + +#### Selective Import + +Import only packages: + +```bash +python3 config_manager.py import dev-machine.yaml --packages-only +``` + +Import only preferences: + +```bash +python3 config_manager.py import dev-machine.yaml --preferences-only +``` + +### Comparing Configurations + +Show differences between current system and configuration file: + +```bash +python3 config_manager.py diff production-config.yaml +``` + +Output includes: +- Number of packages to install +- Number of packages to upgrade/downgrade +- Packages already installed +- Changed preferences +- Compatibility warnings + +## Configuration File Format + +Configuration files are in YAML format with the following structure: + +```yaml +cortex_version: 0.2.0 +exported_at: '2025-11-14T14:23:15.123456' +os: ubuntu-24.04 + +hardware: # Optional + cpu: + model: AMD Ryzen 9 5950X + cores: 16 + architecture: x86_64 + gpu: + - vendor: NVIDIA + model: RTX 4090 + vram: 24576 + cuda: '12.3' + ram: 65536 + storage: + - type: nvme + size: 2097152 + device: nvme0n1 + network: + interfaces: + - name: eth0 + speed_mbps: 1000 + max_speed_mbps: 1000 + +packages: + - name: docker + version: 24.0.7-1 + source: apt + - name: numpy + version: 1.24.0 + source: pip + - name: typescript + version: 5.0.0 + source: npm + +preferences: + confirmations: minimal + verbosity: normal + +environment_variables: + LANG: en_US.UTF-8 + SHELL: /bin/bash +``` + +### Field Descriptions + +- **cortex_version**: Version of Cortex Linux that created this config +- **exported_at**: ISO timestamp of export +- **os**: Operating system identifier (e.g., ubuntu-24.04) +- **hardware**: Optional hardware profile from HardwareProfiler +- **packages**: List of installed packages with name, version, and source +- **preferences**: User preferences for Cortex behavior +- **environment_variables**: Selected environment variables + +### Package Sources + +Supported package sources: + +- **apt**: System packages via APT/dpkg +- **pip**: Python packages via pip/pip3 +- **npm**: Node.js global packages via npm + +## Integration with SandboxExecutor + +For enhanced security, ConfigManager can integrate with SandboxExecutor to safely install packages: + +```python +from config_manager import ConfigManager +from sandbox_executor import SandboxExecutor + +# Create instances +executor = SandboxExecutor() +manager = ConfigManager(sandbox_executor=executor) + +# All package installations will go through sandbox +manager.import_configuration('config.yaml') +``` + +Benefits: +- Commands are validated before execution +- Resource limits prevent runaway installations +- Audit logging of all operations +- Rollback capability on failures + +## Best Practices + +### Version Control Your Configs + +Store configuration files in Git: + +```bash +git add environments/ +git commit -m "Add production environment config" +git push +``` + +### Use Meaningful Filenames + +Name files descriptively: + +``` +dev-machine-john.yaml +production-web-server.yaml +ml-training-gpu-rig.yaml +team-baseline-2024-11.yaml +``` + +### Always Test with Dry-Run First + +Before applying any configuration: + +```bash +# 1. Check differences +python3 config_manager.py diff config.yaml + +# 2. Dry-run to see exactly what will happen +python3 config_manager.py import config.yaml --dry-run + +# 3. Apply if everything looks good +python3 config_manager.py import config.yaml +``` + +### Regular Backups + +Export your configuration regularly: + +```bash +# Daily backup script +python3 config_manager.py export \ + --output "backups/config-$(date +%Y-%m-%d).yaml" \ + --include-hardware +``` + +### Team Onboarding Workflow + +1. **Team Lead**: Export reference configuration + ```bash + python3 config_manager.py export --output team-baseline.yaml --include-hardware + ``` + +2. **Share**: Commit to repository or share via secure channel + +3. **New Member**: Preview then import + ```bash + python3 config_manager.py import team-baseline.yaml --dry-run + python3 config_manager.py import team-baseline.yaml + ``` + +### Environment-Specific Configs + +Maintain separate configs for different environments: + +``` +configs/ +├── development.yaml +├── staging.yaml +└── production.yaml +``` + +### Selective Operations + +Use selective import for fine-grained control: + +```bash +# Update only packages, keep local preferences +python3 config_manager.py import prod.yaml --packages-only + +# Update only preferences, keep packages +python3 config_manager.py import team-prefs.yaml --preferences-only +``` + +## Troubleshooting + +### Compatibility Errors + +**Problem**: "Incompatible configuration: Incompatible major version" + +**Solution**: Configuration was created with a different major version of Cortex. Use `--force` to bypass (risky) or update Cortex version. + +### OS Mismatch Warnings + +**Problem**: "Warning: OS mismatch (config=ubuntu-24.04, current=ubuntu-22.04)" + +**Solution**: Configuration may not work perfectly on different OS versions. Proceed with caution or update your OS. + +### Package Installation Failures + +**Problem**: Some packages fail to install + +**Solution**: +1. Check network connectivity +2. Update package indexes: `sudo apt-get update` +3. Check for conflicting packages +4. Review failed packages in output and install manually if needed + +### Permission Errors + +**Problem**: "Permission denied" when installing packages + +**Solution**: Run with appropriate privileges: +```bash +# Use sudo for system package installation +sudo python3 config_manager.py import config.yaml +``` + +### Missing Package Managers + +**Problem**: npm or pip packages fail because manager not installed + +**Solution**: Install missing package managers first: +```bash +sudo apt-get install npm python3-pip +``` + +### Large Package Lists + +**Problem**: Import takes very long with many packages + +**Solution**: +1. Use `--packages-only` to skip other operations +2. Consider splitting into smaller configs +3. Increase timeout if using SandboxExecutor + +### YAML Syntax Errors + +**Problem**: "Failed to load configuration file: YAML error" + +**Solution**: Validate YAML syntax: +```bash +python3 -c "import yaml; yaml.safe_load(open('config.yaml'))" +``` + +## Advanced Usage + +### Programmatic API + +Use ConfigManager in Python scripts: + +```python +from config_manager import ConfigManager + +manager = ConfigManager() + +# Export +manager.export_configuration( + output_path='config.yaml', + include_hardware=True, + package_sources=['apt', 'pip'] +) + +# Import with dry-run +result = manager.import_configuration( + config_path='config.yaml', + dry_run=True +) + +# Check diff +diff = manager.diff_configuration(config) +print(f"To install: {len(diff['packages_to_install'])}") +``` + +### Custom Package Sources + +Extend detection for additional package managers: + +```python +class CustomConfigManager(ConfigManager): + def detect_cargo_packages(self): + # Implement Rust cargo package detection + pass + + def detect_installed_packages(self, sources=None): + packages = super().detect_installed_packages(sources) + if 'cargo' in (sources or []): + packages.extend(self.detect_cargo_packages()) + return packages +``` + +### Batch Operations + +Process multiple configurations: + +```bash +# Export all team members +for user in team_members; do + python3 config_manager.py export \ + --output "team/$user-config.yaml" +done + +# Compare all configs +for config in team/*.yaml; do + echo "=== $config ===" + python3 config_manager.py diff "$config" +done +``` + +## Security Considerations + +### Sensitive Data + +Configuration files may contain sensitive information: + +- Package versions that reveal security vulnerabilities +- Environment variables with API keys or tokens +- Hardware details useful for targeted attacks + +**Recommendations**: +- Review exported configs before sharing +- Sanitize environment variables +- Use `.gitignore` for sensitive configs +- Encrypt configs containing secrets + +### Sandboxed Installation + +Always use SandboxExecutor for production imports: + +```python +from sandbox_executor import SandboxExecutor +from config_manager import ConfigManager + +executor = SandboxExecutor( + max_memory_mb=2048, + timeout_seconds=600, + enable_rollback=True +) +manager = ConfigManager(sandbox_executor=executor) +``` + +### Validation + +Configuration validation checks: +- Version compatibility +- OS compatibility +- Package source availability + +Use `--dry-run` extensively before applying configurations. + +## API Reference + +### ConfigManager Class + +#### Constructor + +```python +ConfigManager(sandbox_executor=None) +``` + +Parameters: +- `sandbox_executor` (optional): SandboxExecutor instance for safe command execution + +#### Methods + +##### export_configuration() + +```python +export_configuration( + output_path: str, + include_hardware: bool = True, + include_preferences: bool = True, + package_sources: List[str] = None +) -> str +``` + +Export system configuration to YAML file. + +##### import_configuration() + +```python +import_configuration( + config_path: str, + dry_run: bool = False, + selective: Optional[List[str]] = None, + force: bool = False +) -> Dict[str, Any] +``` + +Import configuration from YAML file. + +##### diff_configuration() + +```python +diff_configuration(config: Dict[str, Any]) -> Dict[str, Any] +``` + +Compare current system state with configuration. + +##### validate_compatibility() + +```python +validate_compatibility(config: Dict[str, Any]) -> Tuple[bool, Optional[str]] +``` + +Validate if configuration can be imported. + +##### detect_installed_packages() + +```python +detect_installed_packages(sources: List[str] = None) -> List[Dict[str, Any]] +``` + +Detect all installed packages from specified sources. + +## Contributing + +Contributions are welcome! Areas for improvement: + +- Additional package manager support (cargo, gem, etc.) +- Configuration validation schemas +- Migration tools between versions +- GUI for configuration management +- Cloud storage integration + +## License + +Cortex Linux Configuration Management is part of the Cortex Linux project. + +## Support + +- **Issues**: https://github.com/danishirfan21/cortex/issues +- **Discord**: https://discord.gg/uCqHvxjU83 +- **Email**: mike@cortexlinux.com + +--- + +**Version**: 0.2.0 +**Last Updated**: November 2024 diff --git a/examples/sample-config.yaml b/examples/sample-config.yaml new file mode 100644 index 0000000..30fc171 --- /dev/null +++ b/examples/sample-config.yaml @@ -0,0 +1,74 @@ +cortex_version: 0.2.0 +exported_at: '2025-11-14T14:23:15.123456' +os: ubuntu-24.04 + +hardware: + cpu: + model: AMD Ryzen 9 5950X 16-Core Processor + cores: 16 + architecture: x86_64 + gpu: + - vendor: NVIDIA + model: NVIDIA GeForce RTX 4090 + vram: 24576 + cuda: '12.3' + ram: 65536 + storage: + - type: nvme + size: 2097152 + device: nvme0n1 + - type: ssd + size: 1048576 + device: sda + network: + interfaces: + - name: eth0 + speed_mbps: 1000 + max_speed_mbps: 1000 + +packages: + # System packages (APT) + - name: docker.io + version: 24.0.7-1ubuntu0 + source: apt + - name: git + version: 1:2.43.0-1ubuntu1 + source: apt + - name: curl + version: 8.5.0-2ubuntu1 + source: apt + - name: build-essential + version: 12.10ubuntu1 + source: apt + + # Python packages (PIP) + - name: numpy + version: 1.24.0 + source: pip + - name: pandas + version: 2.0.0 + source: pip + - name: torch + version: 2.1.0 + source: pip + - name: transformers + version: 4.35.0 + source: pip + + # Node.js global packages (NPM) + - name: typescript + version: 5.0.0 + source: npm + - name: eslint + version: 8.0.0 + source: npm + +preferences: + confirmations: minimal + verbosity: normal + +environment_variables: + LANG: en_US.UTF-8 + LANGUAGE: en_US:en + LC_ALL: en_US.UTF-8 + SHELL: /bin/bash diff --git a/src/config_manager.py b/src/config_manager.py new file mode 100644 index 0000000..304b240 --- /dev/null +++ b/src/config_manager.py @@ -0,0 +1,762 @@ +#!/usr/bin/env python3 +""" +Configuration Manager for Cortex Linux +Handles export/import of system state for reproducibility. + +Part of Cortex Linux - AI-native OS that needs to export/import system configurations. +""" + +import os +import json +import yaml +import subprocess +import re +from typing import Dict, List, Optional, Any, Set, Tuple +from datetime import datetime +from pathlib import Path + + +class ConfigManager: + """ + Manages configuration export/import for Cortex Linux. + + Features: + - Export current system state to YAML (packages, configs, preferences) + - Import configuration from YAML file + - Validate version compatibility between export and import + - Support dry-run mode (preview without applying) + - Generate diff between current state and config file + - Handle selective export/import (packages only, configs only, etc.) + """ + + CORTEX_VERSION = "0.2.0" + + def __init__(self, sandbox_executor=None): + """ + Initialize ConfigManager. + + Args: + sandbox_executor: Optional SandboxExecutor instance for safe command execution + """ + self.sandbox_executor = sandbox_executor + self.cortex_dir = Path.home() / '.cortex' + self.preferences_file = self.cortex_dir / 'preferences.yaml' + + # Ensure .cortex directory exists + self.cortex_dir.mkdir(mode=0o700, exist_ok=True) + + def detect_apt_packages(self) -> List[Dict[str, Any]]: + """ + Detect installed APT packages. + + Returns: + List of package dictionaries with name, version, and source + """ + packages = [] + + try: + result = subprocess.run( + ['dpkg-query', '-W', '-f=${Package}\t${Version}\n'], + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode == 0: + for line in result.stdout.strip().split('\n'): + if line.strip(): + parts = line.split('\t') + if len(parts) >= 2: + packages.append({ + 'name': parts[0], + 'version': parts[1], + 'source': 'apt' + }) + except (subprocess.TimeoutExpired, FileNotFoundError) as e: + pass + + return packages + + def detect_pip_packages(self) -> List[Dict[str, Any]]: + """ + Detect installed PIP packages. + + Returns: + List of package dictionaries with name, version, and source + """ + packages = [] + + # Try pip3 first, then pip + for pip_cmd in ['pip3', 'pip']: + try: + result = subprocess.run( + [pip_cmd, 'list', '--format=json'], + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode == 0: + pip_packages = json.loads(result.stdout) + for pkg in pip_packages: + packages.append({ + 'name': pkg['name'], + 'version': pkg['version'], + 'source': 'pip' + }) + break # Success, no need to try other pip commands + except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError): + continue + + return packages + + def detect_npm_packages(self) -> List[Dict[str, Any]]: + """ + Detect globally installed NPM packages. + + Returns: + List of package dictionaries with name, version, and source + """ + packages = [] + + try: + result = subprocess.run( + ['npm', 'list', '-g', '--depth=0', '--json'], + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode == 0: + npm_data = json.loads(result.stdout) + dependencies = npm_data.get('dependencies', {}) + + for name, info in dependencies.items(): + version = info.get('version', 'unknown') + packages.append({ + 'name': name, + 'version': version, + 'source': 'npm' + }) + except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError): + pass + + return packages + + def detect_installed_packages(self, sources: List[str] = None) -> List[Dict[str, Any]]: + """ + Detect all installed packages from specified sources. + + Args: + sources: List of package sources to detect ['apt', 'pip', 'npm'] + If None, detects from all sources + + Returns: + List of package dictionaries sorted by name + """ + if sources is None: + sources = ['apt', 'pip', 'npm'] + + all_packages = [] + + if 'apt' in sources: + all_packages.extend(self.detect_apt_packages()) + + if 'pip' in sources: + all_packages.extend(self.detect_pip_packages()) + + if 'npm' in sources: + all_packages.extend(self.detect_npm_packages()) + + # Remove duplicates based on name and source + seen = set() + unique_packages = [] + for pkg in all_packages: + key = (pkg['name'], pkg['source']) + if key not in seen: + seen.add(key) + unique_packages.append(pkg) + + # Sort by name + unique_packages.sort(key=lambda x: x['name']) + + return unique_packages + + def _detect_os_version(self) -> str: + """ + Detect OS version from /etc/os-release. + + Returns: + OS version string (e.g., 'ubuntu-24.04') + """ + try: + with open('/etc/os-release', 'r') as f: + os_release = f.read() + + # Extract distribution name and version + name_match = re.search(r'ID=([^\n]+)', os_release) + version_match = re.search(r'VERSION_ID="?([^"\n]+)"?', os_release) + + if name_match and version_match: + name = name_match.group(1).strip().strip('"') + version = version_match.group(1).strip() + return f"{name}-{version}" + + return "unknown" + except Exception: + return "unknown" + + def _load_preferences(self) -> Dict[str, Any]: + """ + Load user preferences from ~/.cortex/preferences.yaml. + + Returns: + Dictionary of preferences + """ + if self.preferences_file.exists(): + try: + with open(self.preferences_file, 'r') as f: + return yaml.safe_load(f) or {} + except Exception: + pass + + return {} + + def _save_preferences(self, preferences: Dict[str, Any]) -> None: + """ + Save user preferences to ~/.cortex/preferences.yaml. + + Args: + preferences: Dictionary of preferences to save + """ + try: + with open(self.preferences_file, 'w') as f: + yaml.safe_dump(preferences, f, default_flow_style=False) + except Exception as e: + raise RuntimeError(f"Failed to save preferences: {e}") + + def export_configuration(self, + output_path: str, + include_hardware: bool = True, + include_preferences: bool = True, + package_sources: List[str] = None) -> str: + """ + Export current system configuration to YAML file. + + Args: + output_path: Path to save YAML configuration file + include_hardware: Include hardware profile from HardwareProfiler + include_preferences: Include user preferences + package_sources: List of package sources to export ['apt', 'pip', 'npm'] + If None, exports all + + Returns: + Success message with file path + """ + if package_sources is None: + package_sources = ['apt', 'pip', 'npm'] + + # Build configuration dictionary + config = { + 'cortex_version': self.CORTEX_VERSION, + 'exported_at': datetime.now().isoformat(), + 'os': self._detect_os_version(), + } + + # Add hardware profile if requested + if include_hardware: + try: + from hwprofiler import HardwareProfiler + profiler = HardwareProfiler() + config['hardware'] = profiler.profile() + except Exception as e: + config['hardware'] = {'error': f'Failed to detect hardware: {e}'} + + # Add packages + config['packages'] = self.detect_installed_packages(sources=package_sources) + + # Add preferences if requested + if include_preferences: + config['preferences'] = self._load_preferences() + + # Add environment variables (selected safe ones) + config['environment_variables'] = {} + safe_env_vars = ['LANG', 'LANGUAGE', 'LC_ALL', 'PATH', 'SHELL'] + for var in safe_env_vars: + if var in os.environ: + config['environment_variables'][var] = os.environ[var] + + # Write to file + try: + output_path_obj = Path(output_path) + output_path_obj.parent.mkdir(parents=True, exist_ok=True) + + with open(output_path_obj, 'w') as f: + yaml.safe_dump(config, f, default_flow_style=False, sort_keys=False) + + return f"Configuration exported successfully to {output_path}" + except Exception as e: + raise RuntimeError(f"Failed to export configuration: {e}") + + def validate_compatibility(self, config: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """ + Validate if configuration can be imported on this system. + + Args: + config: Configuration dictionary from YAML + + Returns: + Tuple of (is_compatible, reason_if_not) + """ + # Check required fields + if 'cortex_version' not in config: + return False, "Missing cortex_version field in configuration" + + if 'os' not in config: + return False, "Missing os field in configuration" + + if 'packages' not in config: + return False, "Missing packages field in configuration" + + # Check cortex version compatibility + config_version = config['cortex_version'] + current_version = self.CORTEX_VERSION + + # Parse versions (simple major.minor.patch comparison) + try: + config_parts = [int(x) for x in config_version.split('.')] + current_parts = [int(x) for x in current_version.split('.')] + + # Major version must match + if config_parts[0] != current_parts[0]: + return False, f"Incompatible major version: config={config_version}, current={current_version}" + + # Minor version: current should be >= config + if current_parts[1] < config_parts[1]: + return False, f"Configuration requires newer Cortex version: {config_version} > {current_version}" + except Exception: + # If version parsing fails, be lenient + pass + + # Check OS compatibility (warn but allow) + config_os = config.get('os', 'unknown') + current_os = self._detect_os_version() + + if config_os != current_os and config_os != 'unknown' and current_os != 'unknown': + # Don't fail, just warn in the return message + return True, f"Warning: OS mismatch (config={config_os}, current={current_os}). Proceed with caution." + + return True, None + + def diff_configuration(self, config: Dict[str, Any]) -> Dict[str, Any]: + """ + Compare current system state with configuration file. + + Args: + config: Configuration dictionary from YAML + + Returns: + Dictionary with differences + """ + diff = { + 'packages_to_install': [], + 'packages_to_upgrade': [], + 'packages_to_downgrade': [], + 'packages_already_installed': [], + 'preferences_changed': {}, + 'warnings': [] + } + + # Get current packages + current_packages = self.detect_installed_packages() + current_pkg_map = { + (pkg['name'], pkg['source']): pkg['version'] + for pkg in current_packages + } + + # Compare packages from config + config_packages = config.get('packages', []) + for pkg in config_packages: + name = pkg['name'] + version = pkg['version'] + source = pkg['source'] + key = (name, source) + + if key not in current_pkg_map: + diff['packages_to_install'].append(pkg) + else: + current_version = current_pkg_map[key] + if current_version != version: + # Simple version comparison (would need proper semver in production) + try: + if self._compare_versions(current_version, version) < 0: + diff['packages_to_upgrade'].append({ + **pkg, + 'current_version': current_version + }) + else: + diff['packages_to_downgrade'].append({ + **pkg, + 'current_version': current_version + }) + except Exception: + # If comparison fails, treat as upgrade + diff['packages_to_upgrade'].append({ + **pkg, + 'current_version': current_version + }) + else: + diff['packages_already_installed'].append(pkg) + + # Compare preferences + current_prefs = self._load_preferences() + config_prefs = config.get('preferences', {}) + + for key, value in config_prefs.items(): + if key not in current_prefs or current_prefs[key] != value: + diff['preferences_changed'][key] = { + 'current': current_prefs.get(key), + 'new': value + } + + # Add warnings + if diff['packages_to_downgrade']: + diff['warnings'].append( + f"Warning: {len(diff['packages_to_downgrade'])} packages will be downgraded" + ) + + return diff + + def _compare_versions(self, version1: str, version2: str) -> int: + """ + Compare two version strings. + + Args: + version1: First version string + version2: Second version string + + Returns: + -1 if version1 < version2, 0 if equal, 1 if version1 > version2 + """ + # Simple version comparison (extract numeric parts) + v1_parts = re.findall(r'\d+', version1) + v2_parts = re.findall(r'\d+', version2) + + # Pad to same length + max_len = max(len(v1_parts), len(v2_parts)) + v1_parts += ['0'] * (max_len - len(v1_parts)) + v2_parts += ['0'] * (max_len - len(v2_parts)) + + for p1, p2 in zip(v1_parts, v2_parts): + n1, n2 = int(p1), int(p2) + if n1 < n2: + return -1 + elif n1 > n2: + return 1 + + return 0 + + def import_configuration(self, + config_path: str, + dry_run: bool = False, + selective: Optional[List[str]] = None, + force: bool = False) -> Dict[str, Any]: + """ + Import configuration from YAML file. + + Args: + config_path: Path to YAML configuration file + dry_run: If True, preview changes without applying + selective: Import only specified sections ['packages', 'preferences'] + If None, imports all + force: Skip compatibility checks + + Returns: + Summary dictionary with results + """ + # Load configuration + try: + with open(config_path, 'r') as f: + config = yaml.safe_load(f) + except Exception as e: + raise RuntimeError(f"Failed to load configuration file: {e}") + + # Validate compatibility + if not force: + is_compatible, reason = self.validate_compatibility(config) + if not is_compatible: + raise RuntimeError(f"Incompatible configuration: {reason}") + elif reason: # Warning + print(f"⚠️ {reason}") + + # If dry run, return diff + if dry_run: + diff = self.diff_configuration(config) + return { + 'dry_run': True, + 'diff': diff, + 'message': 'Dry-run completed. Use import without --dry-run to apply changes.' + } + + # Determine what to import + if selective is None: + selective = ['packages', 'preferences'] + + summary = { + 'installed': [], + 'upgraded': [], + 'failed': [], + 'skipped': [], + 'preferences_updated': False + } + + # Import packages + if 'packages' in selective: + diff = self.diff_configuration(config) + packages_to_process = ( + diff['packages_to_install'] + + diff['packages_to_upgrade'] + + diff['packages_to_downgrade'] + ) + + for pkg in packages_to_process: + try: + success = self._install_package(pkg) + if success: + if pkg in diff['packages_to_install']: + summary['installed'].append(pkg['name']) + else: + summary['upgraded'].append(pkg['name']) + else: + summary['failed'].append(pkg['name']) + except Exception as e: + summary['failed'].append(f"{pkg['name']} ({str(e)})") + + # Import preferences + if 'preferences' in selective: + config_prefs = config.get('preferences', {}) + if config_prefs: + try: + self._save_preferences(config_prefs) + summary['preferences_updated'] = True + except Exception as e: + summary['failed'].append(f"preferences ({str(e)})") + + return summary + + def _install_package(self, pkg: Dict[str, Any]) -> bool: + """ + Install a single package using appropriate package manager. + + Args: + pkg: Package dictionary with name, version, source + + Returns: + True if successful, False otherwise + """ + name = pkg['name'] + version = pkg.get('version', '') + source = pkg['source'] + + if self.sandbox_executor: + # Use SandboxExecutor for safe installation + try: + if source == 'apt': + # For apt, we typically install latest or specific version + if version: + command = f"sudo apt-get install -y {name}={version}" + else: + command = f"sudo apt-get install -y {name}" + result = self.sandbox_executor.execute(command) + return result.success + + elif source == 'pip': + if version: + command = f"pip3 install {name}=={version}" + else: + command = f"pip3 install {name}" + result = self.sandbox_executor.execute(command) + return result.success + + elif source == 'npm': + if version: + command = f"npm install -g {name}@{version}" + else: + command = f"npm install -g {name}" + result = self.sandbox_executor.execute(command) + return result.success + + except Exception: + return False + else: + # Direct installation (not recommended in production) + try: + if source == 'apt': + if version: + cmd = ['sudo', 'apt-get', 'install', '-y', f'{name}={version}'] + else: + cmd = ['sudo', 'apt-get', 'install', '-y', name] + result = subprocess.run(cmd, capture_output=True, timeout=300) + return result.returncode == 0 + + elif source == 'pip': + if version: + cmd = ['pip3', 'install', f'{name}=={version}'] + else: + cmd = ['pip3', 'install', name] + result = subprocess.run(cmd, capture_output=True, timeout=300) + return result.returncode == 0 + + elif source == 'npm': + if version: + cmd = ['npm', 'install', '-g', f'{name}@{version}'] + else: + cmd = ['npm', 'install', '-g', name] + result = subprocess.run(cmd, capture_output=True, timeout=300) + return result.returncode == 0 + + except Exception: + return False + + return False + + +def main(): + """CLI entry point for configuration manager.""" + import argparse + import sys + + parser = argparse.ArgumentParser(description='Cortex Configuration Manager') + subparsers = parser.add_subparsers(dest='command', help='Command to execute') + + # Export command + export_parser = subparsers.add_parser('export', help='Export system configuration') + export_parser.add_argument('--output', '-o', required=True, help='Output file path') + export_parser.add_argument('--include-hardware', action='store_true', + help='Include hardware information') + export_parser.add_argument('--no-preferences', action='store_true', + help='Exclude user preferences') + export_parser.add_argument('--packages-only', action='store_true', + help='Export only packages') + + # Import command + import_parser = subparsers.add_parser('import', help='Import configuration') + import_parser.add_argument('config_file', help='Configuration file to import') + import_parser.add_argument('--dry-run', action='store_true', + help='Preview changes without applying') + import_parser.add_argument('--force', action='store_true', + help='Skip compatibility checks') + import_parser.add_argument('--packages-only', action='store_true', + help='Import only packages') + import_parser.add_argument('--preferences-only', action='store_true', + help='Import only preferences') + + # Diff command + diff_parser = subparsers.add_parser('diff', help='Show configuration differences') + diff_parser.add_argument('config_file', help='Configuration file to compare') + + args = parser.parse_args() + + if not args.command: + parser.print_help() + sys.exit(1) + + manager = ConfigManager() + + try: + if args.command == 'export': + include_hardware = args.include_hardware + include_preferences = not args.no_preferences + + if args.packages_only: + include_hardware = False + include_preferences = False + + message = manager.export_configuration( + output_path=args.output, + include_hardware=include_hardware, + include_preferences=include_preferences + ) + print(message) + + elif args.command == 'import': + selective = None + if args.packages_only: + selective = ['packages'] + elif args.preferences_only: + selective = ['preferences'] + + result = manager.import_configuration( + config_path=args.config_file, + dry_run=args.dry_run, + selective=selective, + force=args.force + ) + + if args.dry_run: + print("\n🔍 Dry-run results:\n") + diff = result['diff'] + + if diff['packages_to_install']: + print(f"📦 Packages to install: {len(diff['packages_to_install'])}") + for pkg in diff['packages_to_install'][:5]: + print(f" - {pkg['name']} ({pkg['source']})") + if len(diff['packages_to_install']) > 5: + print(f" ... and {len(diff['packages_to_install']) - 5} more") + + if diff['packages_to_upgrade']: + print(f"\n⬆️ Packages to upgrade: {len(diff['packages_to_upgrade'])}") + for pkg in diff['packages_to_upgrade'][:5]: + print(f" - {pkg['name']} ({pkg.get('current_version')} → {pkg['version']})") + if len(diff['packages_to_upgrade']) > 5: + print(f" ... and {len(diff['packages_to_upgrade']) - 5} more") + + if diff['preferences_changed']: + print(f"\n⚙️ Preferences to change: {len(diff['preferences_changed'])}") + for key in diff['preferences_changed']: + print(f" - {key}") + + if diff['warnings']: + print("\n⚠️ Warnings:") + for warning in diff['warnings']: + print(f" {warning}") + + print(f"\n{result['message']}") + else: + print("\n✅ Import completed:\n") + if result['installed']: + print(f"📦 Installed: {len(result['installed'])} packages") + if result['upgraded']: + print(f"⬆️ Upgraded: {len(result['upgraded'])} packages") + if result['failed']: + print(f"❌ Failed: {len(result['failed'])} packages") + for pkg in result['failed']: + print(f" - {pkg}") + if result['preferences_updated']: + print("⚙️ Preferences updated") + + elif args.command == 'diff': + with open(args.config_file, 'r') as f: + config = yaml.safe_load(f) + + diff = manager.diff_configuration(config) + + print("\n📊 Configuration Differences:\n") + print(f"Packages to install: {len(diff['packages_to_install'])}") + print(f"Packages to upgrade: {len(diff['packages_to_upgrade'])}") + print(f"Packages to downgrade: {len(diff['packages_to_downgrade'])}") + print(f"Packages already installed: {len(diff['packages_already_installed'])}") + print(f"Preferences changed: {len(diff['preferences_changed'])}") + + if diff['warnings']: + print("\n⚠️ Warnings:") + for warning in diff['warnings']: + print(f" {warning}") + + except Exception as e: + print(f"❌ Error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/src/requirements.txt b/src/requirements.txt index 65c3c15..6494229 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -5,6 +5,9 @@ rich>=13.0.0 # Beautiful terminal progress bars and formatting plyer>=2.0.0 # Desktop notifications (optional but recommended) +# Configuration Management +pyyaml>=6.0.1 + # Testing Dependencies (dev) pytest>=7.0.0 pytest-asyncio>=0.21.0 @@ -16,4 +19,3 @@ pytest-cov>=4.0.0 # - lspci (usually pre-installed) # - lsblk (usually pre-installed) # - ip (usually pre-installed) - diff --git a/src/test_config_manager.py b/src/test_config_manager.py new file mode 100644 index 0000000..0d0779a --- /dev/null +++ b/src/test_config_manager.py @@ -0,0 +1,683 @@ +#!/usr/bin/env python3 +""" +Unit tests for ConfigManager. +Tests all functionality with mocked system calls. +""" + +import unittest +from unittest.mock import patch, mock_open, MagicMock, call +import tempfile +import shutil +import yaml +import json +import os +from pathlib import Path +from config_manager import ConfigManager + + +class TestConfigManager(unittest.TestCase): + """Test cases for ConfigManager.""" + + def setUp(self): + """Set up test fixtures.""" + self.temp_dir = tempfile.mkdtemp() + self.config_manager = ConfigManager() + + # Override cortex_dir to use temp directory + self.config_manager.cortex_dir = Path(self.temp_dir) / '.cortex' + self.config_manager.cortex_dir.mkdir(exist_ok=True) + self.config_manager.preferences_file = self.config_manager.cortex_dir / 'preferences.yaml' + + def tearDown(self): + """Clean up test fixtures.""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + + @patch('subprocess.run') + def test_detect_apt_packages_success(self, mock_run): + """Test successful detection of APT packages.""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "package1\t1.0.0\npackage2\t2.0.0\n" + mock_run.return_value = mock_result + + packages = self.config_manager.detect_apt_packages() + + self.assertEqual(len(packages), 2) + self.assertEqual(packages[0]['name'], 'package1') + self.assertEqual(packages[0]['version'], '1.0.0') + self.assertEqual(packages[0]['source'], 'apt') + self.assertEqual(packages[1]['name'], 'package2') + self.assertEqual(packages[1]['version'], '2.0.0') + + @patch('subprocess.run') + def test_detect_apt_packages_failure(self, mock_run): + """Test APT package detection with failure.""" + mock_run.side_effect = FileNotFoundError() + + packages = self.config_manager.detect_apt_packages() + + self.assertEqual(len(packages), 0) + + @patch('subprocess.run') + def test_detect_pip_packages_success(self, mock_run): + """Test successful detection of PIP packages.""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = json.dumps([ + {'name': 'numpy', 'version': '1.24.0'}, + {'name': 'requests', 'version': '2.28.0'} + ]) + mock_run.return_value = mock_result + + packages = self.config_manager.detect_pip_packages() + + self.assertEqual(len(packages), 2) + self.assertEqual(packages[0]['name'], 'numpy') + self.assertEqual(packages[0]['version'], '1.24.0') + self.assertEqual(packages[0]['source'], 'pip') + + @patch('subprocess.run') + def test_detect_pip_packages_failure(self, mock_run): + """Test PIP package detection with failure.""" + mock_run.side_effect = FileNotFoundError() + + packages = self.config_manager.detect_pip_packages() + + self.assertEqual(len(packages), 0) + + @patch('subprocess.run') + def test_detect_npm_packages_success(self, mock_run): + """Test successful detection of NPM packages.""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = json.dumps({ + 'dependencies': { + 'typescript': {'version': '5.0.0'}, + 'eslint': {'version': '8.0.0'} + } + }) + mock_run.return_value = mock_result + + packages = self.config_manager.detect_npm_packages() + + self.assertEqual(len(packages), 2) + names = [p['name'] for p in packages] + self.assertIn('typescript', names) + self.assertIn('eslint', names) + + @patch('subprocess.run') + def test_detect_npm_packages_failure(self, mock_run): + """Test NPM package detection with failure.""" + mock_run.side_effect = FileNotFoundError() + + packages = self.config_manager.detect_npm_packages() + + self.assertEqual(len(packages), 0) + + @patch.object(ConfigManager, 'detect_apt_packages') + @patch.object(ConfigManager, 'detect_pip_packages') + @patch.object(ConfigManager, 'detect_npm_packages') + def test_detect_all_packages(self, mock_npm, mock_pip, mock_apt): + """Test detection of all packages from all sources.""" + mock_apt.return_value = [ + {'name': 'curl', 'version': '7.0.0', 'source': 'apt'} + ] + mock_pip.return_value = [ + {'name': 'numpy', 'version': '1.24.0', 'source': 'pip'} + ] + mock_npm.return_value = [ + {'name': 'typescript', 'version': '5.0.0', 'source': 'npm'} + ] + + packages = self.config_manager.detect_installed_packages() + + self.assertEqual(len(packages), 3) + sources = [p['source'] for p in packages] + self.assertIn('apt', sources) + self.assertIn('pip', sources) + self.assertIn('npm', sources) + + @patch.object(ConfigManager, 'detect_apt_packages') + @patch.object(ConfigManager, 'detect_pip_packages') + def test_detect_selective_packages(self, mock_pip, mock_apt): + """Test selective package detection.""" + mock_apt.return_value = [ + {'name': 'curl', 'version': '7.0.0', 'source': 'apt'} + ] + mock_pip.return_value = [ + {'name': 'numpy', 'version': '1.24.0', 'source': 'pip'} + ] + + # Only detect apt packages + packages = self.config_manager.detect_installed_packages(sources=['apt']) + + self.assertEqual(len(packages), 1) + self.assertEqual(packages[0]['source'], 'apt') + mock_apt.assert_called_once() + mock_pip.assert_not_called() + + @patch.object(ConfigManager, 'detect_installed_packages') + @patch.object(ConfigManager, '_detect_os_version') + @patch.object(ConfigManager, '_load_preferences') + def test_export_configuration_minimal(self, mock_prefs, mock_os, mock_packages): + """Test export with minimal settings.""" + mock_packages.return_value = [ + {'name': 'test-pkg', 'version': '1.0.0', 'source': 'apt'} + ] + mock_os.return_value = 'ubuntu-24.04' + mock_prefs.return_value = {'confirmations': 'minimal'} + + output_path = os.path.join(self.temp_dir, 'config.yaml') + + result = self.config_manager.export_configuration( + output_path=output_path, + include_hardware=False, + include_preferences=True + ) + + self.assertIn('exported successfully', result) + self.assertTrue(os.path.exists(output_path)) + + # Verify contents + with open(output_path, 'r') as f: + config = yaml.safe_load(f) + + self.assertEqual(config['cortex_version'], '0.2.0') + self.assertEqual(config['os'], 'ubuntu-24.04') + self.assertIn('exported_at', config) + self.assertEqual(len(config['packages']), 1) + self.assertEqual(config['packages'][0]['name'], 'test-pkg') + self.assertIn('preferences', config) + self.assertEqual(config['preferences']['confirmations'], 'minimal') + + @patch.object(ConfigManager, 'detect_installed_packages') + @patch.object(ConfigManager, '_detect_os_version') + @patch('hwprofiler.HardwareProfiler') + def test_export_configuration_with_hardware(self, mock_hwprofiler_class, mock_os, mock_packages): + """Test export with hardware profile.""" + mock_packages.return_value = [] + mock_os.return_value = 'ubuntu-24.04' + + # Mock HardwareProfiler instance + mock_profiler = MagicMock() + mock_profiler.profile.return_value = { + 'cpu': {'model': 'Intel i7', 'cores': 8}, + 'ram': 16384 + } + mock_hwprofiler_class.return_value = mock_profiler + + output_path = os.path.join(self.temp_dir, 'config.yaml') + + self.config_manager.export_configuration( + output_path=output_path, + include_hardware=True + ) + + with open(output_path, 'r') as f: + config = yaml.safe_load(f) + + self.assertIn('hardware', config) + self.assertEqual(config['hardware']['cpu']['model'], 'Intel i7') + self.assertEqual(config['hardware']['ram'], 16384) + + @patch.object(ConfigManager, 'detect_installed_packages') + @patch.object(ConfigManager, '_detect_os_version') + def test_export_configuration_packages_only(self, mock_os, mock_packages): + """Test export with packages only.""" + mock_packages.return_value = [ + {'name': 'test-pkg', 'version': '1.0.0', 'source': 'apt'} + ] + mock_os.return_value = 'ubuntu-24.04' + + output_path = os.path.join(self.temp_dir, 'config.yaml') + + self.config_manager.export_configuration( + output_path=output_path, + include_hardware=False, + include_preferences=False + ) + + with open(output_path, 'r') as f: + config = yaml.safe_load(f) + + self.assertIn('packages', config) + self.assertNotIn('hardware', config) + + @patch.object(ConfigManager, '_detect_os_version') + def test_validate_compatibility_success(self, mock_os): + """Test validation of compatible configuration.""" + mock_os.return_value = 'ubuntu-24.04' + + config = { + 'cortex_version': '0.2.0', + 'os': 'ubuntu-24.04', + 'packages': [] + } + + is_compatible, reason = self.config_manager.validate_compatibility(config) + + self.assertTrue(is_compatible) + self.assertIsNone(reason) + + def test_validate_compatibility_missing_fields(self): + """Test validation with missing required fields.""" + config = { + 'os': 'ubuntu-24.04' + } + + is_compatible, reason = self.config_manager.validate_compatibility(config) + + self.assertFalse(is_compatible) + self.assertIn('cortex_version', reason) + + def test_validate_compatibility_version_mismatch(self): + """Test validation with incompatible version.""" + config = { + 'cortex_version': '1.0.0', # Major version different + 'os': 'ubuntu-24.04', + 'packages': [] + } + + is_compatible, reason = self.config_manager.validate_compatibility(config) + + self.assertFalse(is_compatible) + self.assertIn('major version', reason) + + @patch.object(ConfigManager, '_detect_os_version') + def test_validate_compatibility_os_warning(self, mock_os): + """Test validation with OS mismatch (warning).""" + mock_os.return_value = 'ubuntu-22.04' + + config = { + 'cortex_version': '0.2.0', + 'os': 'ubuntu-24.04', + 'packages': [] + } + + is_compatible, reason = self.config_manager.validate_compatibility(config) + + self.assertTrue(is_compatible) + self.assertIsNotNone(reason) + self.assertIn('Warning', reason) + self.assertIn('OS mismatch', reason) + + @patch.object(ConfigManager, 'detect_installed_packages') + def test_diff_configuration_no_changes(self, mock_packages): + """Test diff with identical configurations.""" + current_packages = [ + {'name': 'curl', 'version': '7.0.0', 'source': 'apt'} + ] + mock_packages.return_value = current_packages + + config = { + 'packages': current_packages, + 'preferences': {} + } + + diff = self.config_manager.diff_configuration(config) + + self.assertEqual(len(diff['packages_to_install']), 0) + self.assertEqual(len(diff['packages_to_upgrade']), 0) + self.assertEqual(len(diff['packages_already_installed']), 1) + + @patch.object(ConfigManager, 'detect_installed_packages') + def test_diff_configuration_new_packages(self, mock_packages): + """Test diff with new packages to install.""" + mock_packages.return_value = [ + {'name': 'curl', 'version': '7.0.0', 'source': 'apt'} + ] + + config = { + 'packages': [ + {'name': 'curl', 'version': '7.0.0', 'source': 'apt'}, + {'name': 'wget', 'version': '1.0.0', 'source': 'apt'} + ], + 'preferences': {} + } + + diff = self.config_manager.diff_configuration(config) + + self.assertEqual(len(diff['packages_to_install']), 1) + self.assertEqual(diff['packages_to_install'][0]['name'], 'wget') + + @patch.object(ConfigManager, 'detect_installed_packages') + def test_diff_configuration_upgrades(self, mock_packages): + """Test diff with packages to upgrade.""" + mock_packages.return_value = [ + {'name': 'curl', 'version': '7.0.0', 'source': 'apt'} + ] + + config = { + 'packages': [ + {'name': 'curl', 'version': '8.0.0', 'source': 'apt'} + ], + 'preferences': {} + } + + diff = self.config_manager.diff_configuration(config) + + self.assertEqual(len(diff['packages_to_upgrade']), 1) + self.assertEqual(diff['packages_to_upgrade'][0]['name'], 'curl') + self.assertEqual(diff['packages_to_upgrade'][0]['current_version'], '7.0.0') + + @patch.object(ConfigManager, '_load_preferences') + @patch.object(ConfigManager, 'detect_installed_packages') + def test_diff_configuration_preferences(self, mock_packages, mock_prefs): + """Test diff with changed preferences.""" + mock_packages.return_value = [] + mock_prefs.return_value = {'confirmations': 'normal'} + + config = { + 'packages': [], + 'preferences': {'confirmations': 'minimal', 'verbosity': 'high'} + } + + diff = self.config_manager.diff_configuration(config) + + self.assertEqual(len(diff['preferences_changed']), 2) + self.assertIn('confirmations', diff['preferences_changed']) + self.assertIn('verbosity', diff['preferences_changed']) + + @patch.object(ConfigManager, 'validate_compatibility') + @patch.object(ConfigManager, 'diff_configuration') + def test_import_configuration_dry_run(self, mock_diff, mock_validate): + """Test import in dry-run mode.""" + mock_validate.return_value = (True, None) + mock_diff.return_value = { + 'packages_to_install': [{'name': 'wget', 'version': '1.0.0', 'source': 'apt'}], + 'packages_to_upgrade': [], + 'packages_to_downgrade': [], + 'packages_already_installed': [], + 'preferences_changed': {}, + 'warnings': [] + } + + # Create test config file + config_path = os.path.join(self.temp_dir, 'test_config.yaml') + with open(config_path, 'w') as f: + yaml.safe_dump({ + 'cortex_version': '0.2.0', + 'os': 'ubuntu-24.04', + 'packages': [] + }, f) + + result = self.config_manager.import_configuration( + config_path=config_path, + dry_run=True + ) + + self.assertTrue(result['dry_run']) + self.assertIn('diff', result) + self.assertIn('message', result) + + @patch.object(ConfigManager, 'validate_compatibility') + @patch.object(ConfigManager, 'diff_configuration') + @patch.object(ConfigManager, '_install_package') + @patch.object(ConfigManager, '_save_preferences') + def test_import_configuration_success(self, mock_save_prefs, mock_install, mock_diff, mock_validate): + """Test successful import.""" + mock_validate.return_value = (True, None) + mock_diff.return_value = { + 'packages_to_install': [{'name': 'wget', 'version': '1.0.0', 'source': 'apt'}], + 'packages_to_upgrade': [], + 'packages_to_downgrade': [], + 'packages_already_installed': [], + 'preferences_changed': {}, + 'warnings': [] + } + mock_install.return_value = True + + # Create test config file + config_path = os.path.join(self.temp_dir, 'test_config.yaml') + with open(config_path, 'w') as f: + yaml.safe_dump({ + 'cortex_version': '0.2.0', + 'os': 'ubuntu-24.04', + 'packages': [{'name': 'wget', 'version': '1.0.0', 'source': 'apt'}], + 'preferences': {'confirmations': 'minimal'} + }, f) + + result = self.config_manager.import_configuration( + config_path=config_path, + dry_run=False + ) + + self.assertEqual(len(result['installed']), 1) + self.assertIn('wget', result['installed']) + self.assertTrue(result['preferences_updated']) + mock_install.assert_called_once() + mock_save_prefs.assert_called_once() + + @patch.object(ConfigManager, 'validate_compatibility') + def test_import_configuration_incompatible(self, mock_validate): + """Test import with incompatible configuration.""" + mock_validate.return_value = (False, "Incompatible version") + + # Create test config file + config_path = os.path.join(self.temp_dir, 'test_config.yaml') + with open(config_path, 'w') as f: + yaml.safe_dump({ + 'cortex_version': '999.0.0', + 'os': 'ubuntu-24.04', + 'packages': [] + }, f) + + with self.assertRaises(RuntimeError) as context: + self.config_manager.import_configuration( + config_path=config_path, + dry_run=False + ) + + self.assertIn('Incompatible', str(context.exception)) + + @patch.object(ConfigManager, 'validate_compatibility') + @patch.object(ConfigManager, 'diff_configuration') + @patch.object(ConfigManager, '_install_package') + def test_import_configuration_selective_packages(self, mock_install, mock_diff, mock_validate): + """Test selective import (packages only).""" + mock_validate.return_value = (True, None) + mock_diff.return_value = { + 'packages_to_install': [{'name': 'wget', 'version': '1.0.0', 'source': 'apt'}], + 'packages_to_upgrade': [], + 'packages_to_downgrade': [], + 'packages_already_installed': [], + 'preferences_changed': {}, + 'warnings': [] + } + mock_install.return_value = True + + # Create test config file + config_path = os.path.join(self.temp_dir, 'test_config.yaml') + with open(config_path, 'w') as f: + yaml.safe_dump({ + 'cortex_version': '0.2.0', + 'os': 'ubuntu-24.04', + 'packages': [{'name': 'wget', 'version': '1.0.0', 'source': 'apt'}], + 'preferences': {'confirmations': 'minimal'} + }, f) + + result = self.config_manager.import_configuration( + config_path=config_path, + dry_run=False, + selective=['packages'] + ) + + self.assertEqual(len(result['installed']), 1) + self.assertFalse(result['preferences_updated']) + + @patch.object(ConfigManager, 'validate_compatibility') + @patch.object(ConfigManager, 'diff_configuration') + @patch.object(ConfigManager, '_save_preferences') + def test_import_configuration_selective_preferences(self, mock_save_prefs, mock_diff, mock_validate): + """Test selective import (preferences only).""" + mock_validate.return_value = (True, None) + mock_diff.return_value = { + 'packages_to_install': [], + 'packages_to_upgrade': [], + 'packages_to_downgrade': [], + 'packages_already_installed': [], + 'preferences_changed': {}, + 'warnings': [] + } + + # Create test config file + config_path = os.path.join(self.temp_dir, 'test_config.yaml') + with open(config_path, 'w') as f: + yaml.safe_dump({ + 'cortex_version': '0.2.0', + 'os': 'ubuntu-24.04', + 'packages': [], + 'preferences': {'confirmations': 'minimal'} + }, f) + + result = self.config_manager.import_configuration( + config_path=config_path, + dry_run=False, + selective=['preferences'] + ) + + self.assertEqual(len(result['installed']), 0) + self.assertTrue(result['preferences_updated']) + mock_save_prefs.assert_called_once() + + def test_error_handling_invalid_yaml(self): + """Test error handling with malformed YAML file.""" + config_path = os.path.join(self.temp_dir, 'invalid.yaml') + with open(config_path, 'w') as f: + f.write("{ invalid yaml content [") + + with self.assertRaises(RuntimeError) as context: + self.config_manager.import_configuration(config_path) + + self.assertIn('Failed to load', str(context.exception)) + + def test_error_handling_missing_file(self): + """Test error handling with missing configuration file.""" + config_path = os.path.join(self.temp_dir, 'nonexistent.yaml') + + with self.assertRaises(RuntimeError) as context: + self.config_manager.import_configuration(config_path) + + self.assertIn('Failed to load', str(context.exception)) + + @patch.object(ConfigManager, 'validate_compatibility') + @patch.object(ConfigManager, 'diff_configuration') + @patch.object(ConfigManager, '_install_package') + def test_error_handling_package_install_fails(self, mock_install, mock_diff, mock_validate): + """Test handling of package installation failures.""" + mock_validate.return_value = (True, None) + mock_diff.return_value = { + 'packages_to_install': [ + {'name': 'pkg1', 'version': '1.0.0', 'source': 'apt'}, + {'name': 'pkg2', 'version': '2.0.0', 'source': 'apt'} + ], + 'packages_to_upgrade': [], + 'packages_to_downgrade': [], + 'packages_already_installed': [], + 'preferences_changed': {}, + 'warnings': [] + } + # First package succeeds, second fails + mock_install.side_effect = [True, False] + + # Create test config file + config_path = os.path.join(self.temp_dir, 'test_config.yaml') + with open(config_path, 'w') as f: + yaml.safe_dump({ + 'cortex_version': '0.2.0', + 'os': 'ubuntu-24.04', + 'packages': [ + {'name': 'pkg1', 'version': '1.0.0', 'source': 'apt'}, + {'name': 'pkg2', 'version': '2.0.0', 'source': 'apt'} + ] + }, f) + + result = self.config_manager.import_configuration( + config_path=config_path, + dry_run=False + ) + + self.assertEqual(len(result['installed']), 1) + self.assertEqual(len(result['failed']), 1) + + def test_compare_versions(self): + """Test version comparison.""" + # Equal versions + self.assertEqual(self.config_manager._compare_versions('1.0.0', '1.0.0'), 0) + + # First version less than second + self.assertEqual(self.config_manager._compare_versions('1.0.0', '2.0.0'), -1) + self.assertEqual(self.config_manager._compare_versions('1.0.0', '1.1.0'), -1) + self.assertEqual(self.config_manager._compare_versions('1.0.0', '1.0.1'), -1) + + # First version greater than second + self.assertEqual(self.config_manager._compare_versions('2.0.0', '1.0.0'), 1) + self.assertEqual(self.config_manager._compare_versions('1.1.0', '1.0.0'), 1) + self.assertEqual(self.config_manager._compare_versions('1.0.1', '1.0.0'), 1) + + def test_preferences_save_and_load(self): + """Test saving and loading preferences.""" + preferences = { + 'confirmations': 'minimal', + 'verbosity': 'normal' + } + + self.config_manager._save_preferences(preferences) + loaded = self.config_manager._load_preferences() + + self.assertEqual(loaded, preferences) + + @patch('subprocess.run') + def test_install_package_apt_with_sandbox(self, mock_run): + """Test package installation via APT with SandboxExecutor.""" + mock_executor = MagicMock() + mock_result = MagicMock() + mock_result.success = True + mock_executor.execute.return_value = mock_result + + self.config_manager.sandbox_executor = mock_executor + + pkg = {'name': 'curl', 'version': '7.0.0', 'source': 'apt'} + result = self.config_manager._install_package(pkg) + + self.assertTrue(result) + mock_executor.execute.assert_called_once() + call_args = mock_executor.execute.call_args[0][0] + self.assertIn('curl', call_args) + self.assertIn('apt-get install', call_args) + + @patch('subprocess.run') + def test_install_package_pip_direct(self, mock_run): + """Test package installation via PIP without SandboxExecutor.""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_run.return_value = mock_result + + pkg = {'name': 'numpy', 'version': '1.24.0', 'source': 'pip'} + result = self.config_manager._install_package(pkg) + + self.assertTrue(result) + mock_run.assert_called_once() + call_args = mock_run.call_args[0][0] + self.assertIn('pip3', call_args) + self.assertIn('numpy==1.24.0', call_args) + + @patch('subprocess.run') + def test_install_package_npm_direct(self, mock_run): + """Test package installation via NPM without SandboxExecutor.""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_run.return_value = mock_result + + pkg = {'name': 'typescript', 'version': '5.0.0', 'source': 'npm'} + result = self.config_manager._install_package(pkg) + + self.assertTrue(result) + mock_run.assert_called_once() + call_args = mock_run.call_args[0][0] + self.assertIn('npm', call_args) + self.assertIn('typescript@5.0.0', call_args) + + +if __name__ == '__main__': + unittest.main() From e42ba6787409dbcbe4e58408d4cb84979df02181 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 14 Nov 2025 16:56:54 +0000 Subject: [PATCH 03/16] Address code review feedback: add constants, improve version comparison, refactor methods Co-authored-by: danishirfan21 <44131991+danishirfan21@users.noreply.github.com> --- =23.0 | 2 + src/config_manager.py | 163 ++++++++++++++++++++++++++++-------------- src/requirements.txt | 1 + 3 files changed, 112 insertions(+), 54 deletions(-) create mode 100644 =23.0 diff --git a/=23.0 b/=23.0 new file mode 100644 index 0000000..e7cf74d --- /dev/null +++ b/=23.0 @@ -0,0 +1,2 @@ +Defaulting to user installation because normal site-packages is not writeable +Requirement already satisfied: packaging in /usr/lib/python3/dist-packages (24.0) diff --git a/src/config_manager.py b/src/config_manager.py index 304b240..8d799fb 100644 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -31,6 +31,16 @@ class ConfigManager: CORTEX_VERSION = "0.2.0" + # Timeout constants + DETECTION_TIMEOUT = 30 # seconds for package detection + INSTALLATION_TIMEOUT = 300 # seconds for package installation + + # Package sources + SOURCE_APT = 'apt' + SOURCE_PIP = 'pip' + SOURCE_NPM = 'npm' + DEFAULT_SOURCES = [SOURCE_APT, SOURCE_PIP, SOURCE_NPM] + def __init__(self, sandbox_executor=None): """ Initialize ConfigManager. @@ -59,7 +69,7 @@ def detect_apt_packages(self) -> List[Dict[str, Any]]: ['dpkg-query', '-W', '-f=${Package}\t${Version}\n'], capture_output=True, text=True, - timeout=30 + timeout=self.DETECTION_TIMEOUT ) if result.returncode == 0: @@ -70,7 +80,7 @@ def detect_apt_packages(self) -> List[Dict[str, Any]]: packages.append({ 'name': parts[0], 'version': parts[1], - 'source': 'apt' + 'source': self.SOURCE_APT }) except (subprocess.TimeoutExpired, FileNotFoundError) as e: pass @@ -93,7 +103,7 @@ def detect_pip_packages(self) -> List[Dict[str, Any]]: [pip_cmd, 'list', '--format=json'], capture_output=True, text=True, - timeout=30 + timeout=self.DETECTION_TIMEOUT ) if result.returncode == 0: @@ -102,7 +112,7 @@ def detect_pip_packages(self) -> List[Dict[str, Any]]: packages.append({ 'name': pkg['name'], 'version': pkg['version'], - 'source': 'pip' + 'source': self.SOURCE_PIP }) break # Success, no need to try other pip commands except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError): @@ -124,7 +134,7 @@ def detect_npm_packages(self) -> List[Dict[str, Any]]: ['npm', 'list', '-g', '--depth=0', '--json'], capture_output=True, text=True, - timeout=30 + timeout=self.DETECTION_TIMEOUT ) if result.returncode == 0: @@ -136,7 +146,7 @@ def detect_npm_packages(self) -> List[Dict[str, Any]]: packages.append({ 'name': name, 'version': version, - 'source': 'npm' + 'source': self.SOURCE_NPM }) except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError): pass @@ -155,30 +165,27 @@ def detect_installed_packages(self, sources: List[str] = None) -> List[Dict[str, List of package dictionaries sorted by name """ if sources is None: - sources = ['apt', 'pip', 'npm'] + sources = self.DEFAULT_SOURCES all_packages = [] - if 'apt' in sources: + if self.SOURCE_APT in sources: all_packages.extend(self.detect_apt_packages()) - if 'pip' in sources: + if self.SOURCE_PIP in sources: all_packages.extend(self.detect_pip_packages()) - if 'npm' in sources: + if self.SOURCE_NPM in sources: all_packages.extend(self.detect_npm_packages()) - # Remove duplicates based on name and source - seen = set() - unique_packages = [] + # Remove duplicates based on name and source (more efficient) + unique_packages_dict = {} for pkg in all_packages: key = (pkg['name'], pkg['source']) - if key not in seen: - seen.add(key) - unique_packages.append(pkg) + unique_packages_dict[key] = pkg # Sort by name - unique_packages.sort(key=lambda x: x['name']) + unique_packages = sorted(unique_packages_dict.values(), key=lambda x: x['name']) return unique_packages @@ -190,7 +197,11 @@ def _detect_os_version(self) -> str: OS version string (e.g., 'ubuntu-24.04') """ try: - with open('/etc/os-release', 'r') as f: + os_release_path = Path('/etc/os-release') + if not os_release_path.exists(): + return "unknown" + + with open(os_release_path, 'r') as f: os_release = f.read() # Extract distribution name and version @@ -429,7 +440,31 @@ def diff_configuration(self, config: Dict[str, Any]) -> Dict[str, Any]: def _compare_versions(self, version1: str, version2: str) -> int: """ - Compare two version strings. + Compare two version strings using packaging library for robustness. + + Args: + version1: First version string + version2: Second version string + + Returns: + -1 if version1 < version2, 0 if equal, 1 if version1 > version2 + """ + try: + from packaging import version + v1 = version.parse(version1) + v2 = version.parse(version2) + if v1 < v2: + return -1 + elif v1 > v2: + return 1 + return 0 + except Exception: + # Fallback to simple numeric comparison + return self._simple_version_compare(version1, version2) + + def _simple_version_compare(self, version1: str, version2: str) -> int: + """ + Simple fallback version comparison (extract numeric parts). Args: version1: First version string @@ -512,38 +547,58 @@ def import_configuration(self, # Import packages if 'packages' in selective: - diff = self.diff_configuration(config) - packages_to_process = ( - diff['packages_to_install'] + - diff['packages_to_upgrade'] + - diff['packages_to_downgrade'] - ) - - for pkg in packages_to_process: - try: - success = self._install_package(pkg) - if success: - if pkg in diff['packages_to_install']: - summary['installed'].append(pkg['name']) - else: - summary['upgraded'].append(pkg['name']) - else: - summary['failed'].append(pkg['name']) - except Exception as e: - summary['failed'].append(f"{pkg['name']} ({str(e)})") + self._import_packages(config, summary) # Import preferences if 'preferences' in selective: - config_prefs = config.get('preferences', {}) - if config_prefs: - try: - self._save_preferences(config_prefs) - summary['preferences_updated'] = True - except Exception as e: - summary['failed'].append(f"preferences ({str(e)})") + self._import_preferences(config, summary) return summary + def _import_packages(self, config: Dict[str, Any], summary: Dict[str, Any]) -> None: + """ + Import packages from configuration. + + Args: + config: Configuration dictionary + summary: Summary dictionary to update with results + """ + diff = self.diff_configuration(config) + packages_to_process = ( + diff['packages_to_install'] + + diff['packages_to_upgrade'] + + diff['packages_to_downgrade'] + ) + + for pkg in packages_to_process: + try: + success = self._install_package(pkg) + if success: + if pkg in diff['packages_to_install']: + summary['installed'].append(pkg['name']) + else: + summary['upgraded'].append(pkg['name']) + else: + summary['failed'].append(pkg['name']) + except Exception as e: + summary['failed'].append(f"{pkg['name']} ({str(e)})") + + def _import_preferences(self, config: Dict[str, Any], summary: Dict[str, Any]) -> None: + """ + Import preferences from configuration. + + Args: + config: Configuration dictionary + summary: Summary dictionary to update with results + """ + config_prefs = config.get('preferences', {}) + if config_prefs: + try: + self._save_preferences(config_prefs) + summary['preferences_updated'] = True + except Exception as e: + summary['failed'].append(f"preferences ({str(e)})") + def _install_package(self, pkg: Dict[str, Any]) -> bool: """ Install a single package using appropriate package manager. @@ -561,7 +616,7 @@ def _install_package(self, pkg: Dict[str, Any]) -> bool: if self.sandbox_executor: # Use SandboxExecutor for safe installation try: - if source == 'apt': + if source == self.SOURCE_APT: # For apt, we typically install latest or specific version if version: command = f"sudo apt-get install -y {name}={version}" @@ -570,7 +625,7 @@ def _install_package(self, pkg: Dict[str, Any]) -> bool: result = self.sandbox_executor.execute(command) return result.success - elif source == 'pip': + elif source == self.SOURCE_PIP: if version: command = f"pip3 install {name}=={version}" else: @@ -578,7 +633,7 @@ def _install_package(self, pkg: Dict[str, Any]) -> bool: result = self.sandbox_executor.execute(command) return result.success - elif source == 'npm': + elif source == self.SOURCE_NPM: if version: command = f"npm install -g {name}@{version}" else: @@ -591,28 +646,28 @@ def _install_package(self, pkg: Dict[str, Any]) -> bool: else: # Direct installation (not recommended in production) try: - if source == 'apt': + if source == self.SOURCE_APT: if version: cmd = ['sudo', 'apt-get', 'install', '-y', f'{name}={version}'] else: cmd = ['sudo', 'apt-get', 'install', '-y', name] - result = subprocess.run(cmd, capture_output=True, timeout=300) + result = subprocess.run(cmd, capture_output=True, timeout=self.INSTALLATION_TIMEOUT) return result.returncode == 0 - elif source == 'pip': + elif source == self.SOURCE_PIP: if version: cmd = ['pip3', 'install', f'{name}=={version}'] else: cmd = ['pip3', 'install', name] - result = subprocess.run(cmd, capture_output=True, timeout=300) + result = subprocess.run(cmd, capture_output=True, timeout=self.INSTALLATION_TIMEOUT) return result.returncode == 0 - elif source == 'npm': + elif source == self.SOURCE_NPM: if version: cmd = ['npm', 'install', '-g', f'{name}@{version}'] else: cmd = ['npm', 'install', '-g', name] - result = subprocess.run(cmd, capture_output=True, timeout=300) + result = subprocess.run(cmd, capture_output=True, timeout=self.INSTALLATION_TIMEOUT) return result.returncode == 0 except Exception: diff --git a/src/requirements.txt b/src/requirements.txt index 6494229..81aca17 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -7,6 +7,7 @@ plyer>=2.0.0 # Desktop notifications (optional but recommended) # Configuration Management pyyaml>=6.0.1 +packaging>=23.0 # Testing Dependencies (dev) pytest>=7.0.0 From 7e010c5ac1f21d94bd843f6670ba7b7fd4c52268 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 14 Nov 2025 16:57:22 +0000 Subject: [PATCH 04/16] Remove spurious file from packaging install Co-authored-by: danishirfan21 <44131991+danishirfan21@users.noreply.github.com> --- =23.0 | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 =23.0 diff --git a/=23.0 b/=23.0 deleted file mode 100644 index e7cf74d..0000000 --- a/=23.0 +++ /dev/null @@ -1,2 +0,0 @@ -Defaulting to user installation because normal site-packages is not writeable -Requirement already satisfied: packaging in /usr/lib/python3/dist-packages (24.0) From 21c6b52f6b0723534083cd2e26f7656a5e856900 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 14 Nov 2025 17:12:25 +0000 Subject: [PATCH 05/16] Add comprehensive docstrings to helper methods Co-authored-by: danishirfan21 <44131991+danishirfan21@users.noreply.github.com> --- src/config_manager.py | 84 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 74 insertions(+), 10 deletions(-) diff --git a/src/config_manager.py b/src/config_manager.py index 8d799fb..cbd60eb 100644 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -464,14 +464,38 @@ def _compare_versions(self, version1: str, version2: str) -> int: def _simple_version_compare(self, version1: str, version2: str) -> int: """ - Simple fallback version comparison (extract numeric parts). + Fallback version comparison using numeric extraction. + + Used when the packaging library is unavailable or fails to parse + version strings. Extracts numeric components and compares them + sequentially, padding shorter versions with zeros. + + This method provides a basic version comparison by extracting all + numeric parts from the version strings and comparing them position + by position. It handles simple version schemes well but may not + correctly handle complex pre-release tags or build metadata. Args: - version1: First version string - version2: Second version string + version1: First version string (e.g., "1.2.3", "2.0.0-rc1") + version2: Second version string to compare against Returns: - -1 if version1 < version2, 0 if equal, 1 if version1 > version2 + int: -1 if version1 < version2 + 0 if versions are equal + 1 if version1 > version2 + + Example: + >>> _simple_version_compare("1.2.3", "1.2.4") + -1 + >>> _simple_version_compare("2.0.0", "1.9.9") + 1 + >>> _simple_version_compare("1.0", "1.0.0") + 0 + + Note: + This is a simplified comparison that only considers numeric parts. + Complex version schemes (pre-release tags, build metadata) may not + be handled correctly. Prefer using packaging.version when available. """ # Simple version comparison (extract numeric parts) v1_parts = re.findall(r'\d+', version1) @@ -557,11 +581,32 @@ def import_configuration(self, def _import_packages(self, config: Dict[str, Any], summary: Dict[str, Any]) -> None: """ - Import packages from configuration. + Import packages from configuration and update system state. + + This method processes package installations by first computing the + difference between the current system state and the target configuration + using diff_configuration(). It then attempts to install, upgrade, or + downgrade packages as needed. + + The method continues processing all packages even if individual packages + fail to install, ensuring maximum success. Failed installations are + tracked in the summary for user review. Args: - config: Configuration dictionary - summary: Summary dictionary to update with results + config: Configuration dictionary containing package specifications + Expected to have 'packages' key with list of package dicts + summary: Summary dictionary to update with results. Modified in-place + with keys: 'installed', 'upgraded', 'failed' + + Updates: + summary['installed']: List of successfully installed package names + summary['upgraded']: List of successfully upgraded package names + summary['failed']: List of failed package names (with error details) + + Note: + Uses _install_package() internally for actual package installation. + Each package is categorized based on diff results (install vs upgrade). + Errors are caught and logged to allow processing to continue. """ diff = self.diff_configuration(config) packages_to_process = ( @@ -585,11 +630,30 @@ def _import_packages(self, config: Dict[str, Any], summary: Dict[str, Any]) -> N def _import_preferences(self, config: Dict[str, Any], summary: Dict[str, Any]) -> None: """ - Import preferences from configuration. + Import user preferences from configuration and save to disk. + + Extracts preferences from the configuration dictionary and saves them + to the user's Cortex preferences file at ~/.cortex/preferences.yaml. + If preferences are empty or missing, no action is taken. + + This method handles the persistence of user-configurable settings such + as confirmation levels, verbosity settings, and other behavioral + preferences for the Cortex system. Args: - config: Configuration dictionary - summary: Summary dictionary to update with results + config: Configuration dictionary containing optional 'preferences' key + with user preference settings as a dictionary + summary: Summary dictionary to update with results. Modified in-place + with keys: 'preferences_updated', 'failed' + + Updates: + summary['preferences_updated']: Set to True on successful save + summary['failed']: Appends error message if save fails + + Note: + Uses _save_preferences() internally to persist to disk. + Errors during save are caught and added to failed list with details. + If config has no preferences or they are empty, silently succeeds. """ config_prefs = config.get('preferences', {}) if config_prefs: From e12e5d91ecd4ae294dbb188cc5292908ac47bb7a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 19 Nov 2025 00:28:35 +0000 Subject: [PATCH 06/16] Address PR review feedback: improve security, fix imports, enhance validation Co-authored-by: danishirfan21 <44131991+danishirfan21@users.noreply.github.com> --- CONFIGURATION.md | 4 ++-- src/config_manager.py | 48 ++++++++++++++++++++++++++++++++++---- src/test_config_manager.py | 2 +- 3 files changed, 46 insertions(+), 8 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 734c12d..88a499d 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -23,7 +23,7 @@ Cortex Linux's Configuration Management feature enables you to export, share, an Install required Python dependencies: ```bash -pip3 install pyyaml>=6.0.1 +pip3 install pyyaml>=6.0.1 packaging>=23.0 ``` ### System Requirements @@ -213,7 +213,7 @@ environment_variables: - **hardware**: Optional hardware profile from HardwareProfiler - **packages**: List of installed packages with name, version, and source - **preferences**: User preferences for Cortex behavior -- **environment_variables**: Selected environment variables +- **environment_variables**: Selected environment variables (exported for reference only; not automatically restored during import) ### Package Sources diff --git a/src/config_manager.py b/src/config_manager.py index cbd60eb..6ad010a 100644 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -11,7 +11,7 @@ import yaml import subprocess import re -from typing import Dict, List, Optional, Any, Set, Tuple +from typing import Dict, List, Optional, Any, Tuple from datetime import datetime from pathlib import Path @@ -83,6 +83,7 @@ def detect_apt_packages(self) -> List[Dict[str, Any]]: 'source': self.SOURCE_APT }) except (subprocess.TimeoutExpired, FileNotFoundError) as e: + # Silently handle errors - package manager may not be available pass return packages @@ -149,6 +150,7 @@ def detect_npm_packages(self) -> List[Dict[str, Any]]: 'source': self.SOURCE_NPM }) except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError): + # Silently handle errors - npm may not be installed or global packages unavailable pass return packages @@ -265,7 +267,7 @@ def export_configuration(self, Success message with file path """ if package_sources is None: - package_sources = ['apt', 'pip', 'npm'] + package_sources = self.DEFAULT_SOURCES # Build configuration dictionary config = { @@ -388,9 +390,16 @@ def diff_configuration(self, config: Dict[str, Any]) -> Dict[str, Any]: # Compare packages from config config_packages = config.get('packages', []) for pkg in config_packages: - name = pkg['name'] - version = pkg['version'] - source = pkg['source'] + name = pkg.get('name') + version = pkg.get('version') + source = pkg.get('source') + + if not name or not source: + diff['warnings'].append( + f"Malformed package entry skipped: {pkg}" + ) + continue # Skip malformed package entries + key = (name, source) if key not in current_pkg_map: @@ -501,6 +510,14 @@ def _simple_version_compare(self, version1: str, version2: str) -> int: v1_parts = re.findall(r'\d+', version1) v2_parts = re.findall(r'\d+', version2) + # Handle case where no numeric parts found + if not v1_parts and not v2_parts: + return 0 # Both have no numeric parts, treat as equal + if not v1_parts: + return -1 # version1 has no numeric parts, consider it less + if not v2_parts: + return 1 # version2 has no numeric parts, consider it greater + # Pad to same length max_len = max(len(v1_parts), len(v2_parts)) v1_parts += ['0'] * (max_len - len(v1_parts)) @@ -663,6 +680,21 @@ def _import_preferences(self, config: Dict[str, Any], summary: Dict[str, Any]) - except Exception as e: summary['failed'].append(f"preferences ({str(e)})") + def _validate_package_identifier(self, identifier: str) -> bool: + """ + Validate package name or version contains only safe characters. + + Prevents command injection by ensuring package identifiers only contain + alphanumeric characters and common package naming characters. + + Args: + identifier: Package name or version string to validate + + Returns: + bool: True if identifier is safe, False otherwise + """ + return bool(re.match(r'^[a-zA-Z0-9._:@=+\-]+$', identifier)) + def _install_package(self, pkg: Dict[str, Any]) -> bool: """ Install a single package using appropriate package manager. @@ -677,6 +709,12 @@ def _install_package(self, pkg: Dict[str, Any]) -> bool: version = pkg.get('version', '') source = pkg['source'] + # Validate package identifiers to prevent command injection + if not self._validate_package_identifier(name): + return False + if version and not self._validate_package_identifier(version): + return False + if self.sandbox_executor: # Use SandboxExecutor for safe installation try: diff --git a/src/test_config_manager.py b/src/test_config_manager.py index 0d0779a..bf15995 100644 --- a/src/test_config_manager.py +++ b/src/test_config_manager.py @@ -5,7 +5,7 @@ """ import unittest -from unittest.mock import patch, mock_open, MagicMock, call +from unittest.mock import patch, MagicMock import tempfile import shutil import yaml From 70c403d9de10941a5bac9b3a650765458e1b49ee Mon Sep 17 00:00:00 2001 From: Danish Irfan <44131991+danishirfan21@users.noreply.github.com> Date: Wed, 19 Nov 2025 06:04:37 +0000 Subject: [PATCH 07/16] support NPM scoped packages safely and fix outdated comment --- src/config_manager.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/config_manager.py b/src/config_manager.py index 6ad010a..718d1db 100644 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -407,7 +407,7 @@ def diff_configuration(self, config: Dict[str, Any]) -> Dict[str, Any]: else: current_version = current_pkg_map[key] if current_version != version: - # Simple version comparison (would need proper semver in production) + # Compare versions (uses packaging library with fallback to simple comparison) try: if self._compare_versions(current_version, version) < 0: diff['packages_to_upgrade'].append({ @@ -686,6 +686,7 @@ def _validate_package_identifier(self, identifier: str) -> bool: Prevents command injection by ensuring package identifiers only contain alphanumeric characters and common package naming characters. + Supports NPM scoped packages (@scope/package) while blocking path traversal. Args: identifier: Package name or version string to validate @@ -693,7 +694,9 @@ def _validate_package_identifier(self, identifier: str) -> bool: Returns: bool: True if identifier is safe, False otherwise """ - return bool(re.match(r'^[a-zA-Z0-9._:@=+\-]+$', identifier)) + # Allow standard characters plus exactly one forward slash for NPM scoped packages + # This prevents path traversal (../, ../../, etc.) while allowing @scope/package + return bool(re.match(r'^[a-zA-Z0-9._:@=+\-]+(/[a-zA-Z0-9._\-]+)?$', identifier)) def _install_package(self, pkg: Dict[str, Any]) -> bool: """ From 0893bb8dba7b029f95f3cf2bfb0ed1db0ccc6de0 Mon Sep 17 00:00:00 2001 From: Danish Irfan <44131991+danishirfan21@users.noreply.github.com> Date: Wed, 19 Nov 2025 16:42:24 +0500 Subject: [PATCH 08/16] Update CONFIGURATION.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- CONFIGURATION.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 88a499d..f4bc2d4 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -579,7 +579,7 @@ Cortex Linux Configuration Management is part of the Cortex Linux project. ## Support -- **Issues**: https://github.com/danishirfan21/cortex/issues +- **Issues**: https://github.com/cortexlinux/cortex/issues - **Discord**: https://discord.gg/uCqHvxjU83 - **Email**: mike@cortexlinux.com From c056bfa6146f6ec725a508dda1eff44f792a1da7 Mon Sep 17 00:00:00 2001 From: Danish Irfan <44131991+danishirfan21@users.noreply.github.com> Date: Wed, 19 Nov 2025 12:12:27 +0000 Subject: [PATCH 09/16] code feedback changes --- CONFIGURATION.md | 15 +++++++++------ src/config_manager.py | 5 +++++ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index f4bc2d4..c90ca70 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -261,7 +261,7 @@ git push Name files descriptively: -``` +```text dev-machine-john.yaml production-web-server.yaml ml-training-gpu-rig.yaml @@ -313,7 +313,7 @@ python3 config_manager.py export \ Maintain separate configs for different environments: -``` +```text configs/ ├── development.yaml ├── staging.yaml @@ -417,7 +417,10 @@ result = manager.import_configuration( dry_run=True ) -# Check diff +# Check diff - load the config file first +import yaml +with open('config.yaml', 'r') as f: + config = yaml.safe_load(f) diff = manager.diff_configuration(config) print(f"To install: {len(diff['packages_to_install'])}") ``` @@ -579,9 +582,9 @@ Cortex Linux Configuration Management is part of the Cortex Linux project. ## Support -- **Issues**: https://github.com/cortexlinux/cortex/issues -- **Discord**: https://discord.gg/uCqHvxjU83 -- **Email**: mike@cortexlinux.com +- **Issues**: [https://github.com/cortexlinux/cortex/issues](https://github.com/cortexlinux/cortex/issues) +- **Discord**: [https://discord.gg/uCqHvxjU83](https://discord.gg/uCqHvxjU83) +- **Email**: [mike@cortexlinux.com](mailto:mike@cortexlinux.com) --- diff --git a/src/config_manager.py b/src/config_manager.py index 718d1db..f3ef7a0 100644 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -581,6 +581,7 @@ def import_configuration(self, summary = { 'installed': [], 'upgraded': [], + 'downgraded': [], 'failed': [], 'skipped': [], 'preferences_updated': False @@ -638,6 +639,8 @@ def _import_packages(self, config: Dict[str, Any], summary: Dict[str, Any]) -> N if success: if pkg in diff['packages_to_install']: summary['installed'].append(pkg['name']) + elif pkg in diff['packages_to_downgrade']: + summary['downgraded'].append(pkg['name']) else: summary['upgraded'].append(pkg['name']) else: @@ -888,6 +891,8 @@ def main(): print(f"📦 Installed: {len(result['installed'])} packages") if result['upgraded']: print(f"⬆️ Upgraded: {len(result['upgraded'])} packages") + if result.get('downgraded'): + print(f"⬇️ Downgraded: {len(result['downgraded'])} packages") if result['failed']: print(f"❌ Failed: {len(result['failed'])} packages") for pkg in result['failed']: From 14088eb537b974e3250024576365e84bd5edb1cb Mon Sep 17 00:00:00 2001 From: Danish Irfan <44131991+danishirfan21@users.noreply.github.com> Date: Wed, 19 Nov 2025 14:05:24 +0000 Subject: [PATCH 10/16] feedback resolved --- src/config_manager.py | 208 +++++++++++++++++++++++------------------- 1 file changed, 112 insertions(+), 96 deletions(-) diff --git a/src/config_manager.py b/src/config_manager.py index f3ef7a0..7bd6c1e 100644 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -82,7 +82,7 @@ def detect_apt_packages(self) -> List[Dict[str, Any]]: 'version': parts[1], 'source': self.SOURCE_APT }) - except (subprocess.TimeoutExpired, FileNotFoundError) as e: + except (subprocess.TimeoutExpired, FileNotFoundError): # Silently handle errors - package manager may not be available pass @@ -361,6 +361,46 @@ def validate_compatibility(self, config: Dict[str, Any]) -> Tuple[bool, Optional return True, None + def _categorize_package(self, pkg: Dict[str, Any], current_pkg_map: Dict[Tuple[str, str], str]) -> Tuple[str, Optional[Dict[str, Any]]]: + """ + Categorize a package as install, upgrade, downgrade, or already installed. + + Args: + pkg: Package dictionary from config + current_pkg_map: Map of (name, source) to current version + + Returns: + Tuple of (category, package_data) where category is one of: + 'install', 'upgrade', 'downgrade', 'already_installed', 'skip' + package_data is the modified package dict (with current_version if applicable) + """ + name = pkg.get('name') + version = pkg.get('version') + source = pkg.get('source') + + if not name or not source: + return 'skip', None + + key = (name, source) + + if key not in current_pkg_map: + return 'install', pkg + + current_version = current_pkg_map[key] + if current_version == version: + return 'already_installed', pkg + + # Compare versions + try: + pkg_with_version = {**pkg, 'current_version': current_version} + if self._compare_versions(current_version, version) < 0: + return 'upgrade', pkg_with_version + else: + return 'downgrade', pkg_with_version + except Exception: + # If comparison fails, treat as upgrade + return 'upgrade', {**pkg, 'current_version': current_version} + def diff_configuration(self, config: Dict[str, Any]) -> Dict[str, Any]: """ Compare current system state with configuration file. @@ -390,43 +430,18 @@ def diff_configuration(self, config: Dict[str, Any]) -> Dict[str, Any]: # Compare packages from config config_packages = config.get('packages', []) for pkg in config_packages: - name = pkg.get('name') - version = pkg.get('version') - source = pkg.get('source') - - if not name or not source: - diff['warnings'].append( - f"Malformed package entry skipped: {pkg}" - ) - continue # Skip malformed package entries - - key = (name, source) + category, pkg_data = self._categorize_package(pkg, current_pkg_map) - if key not in current_pkg_map: - diff['packages_to_install'].append(pkg) - else: - current_version = current_pkg_map[key] - if current_version != version: - # Compare versions (uses packaging library with fallback to simple comparison) - try: - if self._compare_versions(current_version, version) < 0: - diff['packages_to_upgrade'].append({ - **pkg, - 'current_version': current_version - }) - else: - diff['packages_to_downgrade'].append({ - **pkg, - 'current_version': current_version - }) - except Exception: - # If comparison fails, treat as upgrade - diff['packages_to_upgrade'].append({ - **pkg, - 'current_version': current_version - }) - else: - diff['packages_already_installed'].append(pkg) + if category == 'skip': + diff['warnings'].append(f"Malformed package entry skipped: {pkg}") + elif category == 'install': + diff['packages_to_install'].append(pkg_data) + elif category == 'upgrade': + diff['packages_to_upgrade'].append(pkg_data) + elif category == 'downgrade': + diff['packages_to_downgrade'].append(pkg_data) + elif category == 'already_installed': + diff['packages_already_installed'].append(pkg_data) # Compare preferences current_prefs = self._load_preferences() @@ -701,6 +716,64 @@ def _validate_package_identifier(self, identifier: str) -> bool: # This prevents path traversal (../, ../../, etc.) while allowing @scope/package return bool(re.match(r'^[a-zA-Z0-9._:@=+\-]+(/[a-zA-Z0-9._\-]+)?$', identifier)) + def _install_with_sandbox(self, name: str, version: Optional[str], source: str) -> bool: + """ + Install package using sandbox executor. + + Args: + name: Package name + version: Package version (optional) + source: Package source (apt/pip/npm) + + Returns: + True if successful, False otherwise + """ + try: + if source == self.SOURCE_APT: + command = f"sudo apt-get install -y {name}={version}" if version else f"sudo apt-get install -y {name}" + elif source == self.SOURCE_PIP: + command = f"pip3 install {name}=={version}" if version else f"pip3 install {name}" + elif source == self.SOURCE_NPM: + command = f"npm install -g {name}@{version}" if version else f"npm install -g {name}" + else: + return False + + result = self.sandbox_executor.execute(command) + return result.success + except Exception: + return False + + def _install_direct(self, name: str, version: Optional[str], source: str) -> bool: + """ + Install package directly using subprocess (not recommended in production). + + Args: + name: Package name + version: Package version (optional) + source: Package source (apt/pip/npm) + + Returns: + True if successful, False otherwise + """ + try: + if source == self.SOURCE_APT: + cmd = ['sudo', 'apt-get', 'install', '-y', f'{name}={version}' if version else name] + if not version: + cmd = ['sudo', 'apt-get', 'install', '-y', name] + else: + cmd = ['sudo', 'apt-get', 'install', '-y', f'{name}={version}'] + elif source == self.SOURCE_PIP: + cmd = ['pip3', 'install', f'{name}=={version}'] if version else ['pip3', 'install', name] + elif source == self.SOURCE_NPM: + cmd = ['npm', 'install', '-g', f'{name}@{version}'] if version else ['npm', 'install', '-g', name] + else: + return False + + result = subprocess.run(cmd, capture_output=True, timeout=self.INSTALLATION_TIMEOUT) + return result.returncode == 0 + except Exception: + return False + def _install_package(self, pkg: Dict[str, Any]) -> bool: """ Install a single package using appropriate package manager. @@ -722,66 +795,9 @@ def _install_package(self, pkg: Dict[str, Any]) -> bool: return False if self.sandbox_executor: - # Use SandboxExecutor for safe installation - try: - if source == self.SOURCE_APT: - # For apt, we typically install latest or specific version - if version: - command = f"sudo apt-get install -y {name}={version}" - else: - command = f"sudo apt-get install -y {name}" - result = self.sandbox_executor.execute(command) - return result.success - - elif source == self.SOURCE_PIP: - if version: - command = f"pip3 install {name}=={version}" - else: - command = f"pip3 install {name}" - result = self.sandbox_executor.execute(command) - return result.success - - elif source == self.SOURCE_NPM: - if version: - command = f"npm install -g {name}@{version}" - else: - command = f"npm install -g {name}" - result = self.sandbox_executor.execute(command) - return result.success - - except Exception: - return False + return self._install_with_sandbox(name, version or None, source) else: - # Direct installation (not recommended in production) - try: - if source == self.SOURCE_APT: - if version: - cmd = ['sudo', 'apt-get', 'install', '-y', f'{name}={version}'] - else: - cmd = ['sudo', 'apt-get', 'install', '-y', name] - result = subprocess.run(cmd, capture_output=True, timeout=self.INSTALLATION_TIMEOUT) - return result.returncode == 0 - - elif source == self.SOURCE_PIP: - if version: - cmd = ['pip3', 'install', f'{name}=={version}'] - else: - cmd = ['pip3', 'install', name] - result = subprocess.run(cmd, capture_output=True, timeout=self.INSTALLATION_TIMEOUT) - return result.returncode == 0 - - elif source == self.SOURCE_NPM: - if version: - cmd = ['npm', 'install', '-g', f'{name}@{version}'] - else: - cmd = ['npm', 'install', '-g', name] - result = subprocess.run(cmd, capture_output=True, timeout=self.INSTALLATION_TIMEOUT) - return result.returncode == 0 - - except Exception: - return False - - return False + return self._install_direct(name, version or None, source) def main(): From 1abff24dc629f8b1fcbb43be9a6b4f79d888aa0f Mon Sep 17 00:00:00 2001 From: Danish Irfan <44131991+danishirfan21@users.noreply.github.com> Date: Wed, 19 Nov 2025 14:30:21 +0000 Subject: [PATCH 11/16] feedback change --- src/config_manager.py | 4 ---- 1 file changed, 4 deletions(-) mode change 100644 => 100755 src/config_manager.py diff --git a/src/config_manager.py b/src/config_manager.py old mode 100644 new mode 100755 index 7bd6c1e..8851345 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -758,10 +758,6 @@ def _install_direct(self, name: str, version: Optional[str], source: str) -> boo try: if source == self.SOURCE_APT: cmd = ['sudo', 'apt-get', 'install', '-y', f'{name}={version}' if version else name] - if not version: - cmd = ['sudo', 'apt-get', 'install', '-y', name] - else: - cmd = ['sudo', 'apt-get', 'install', '-y', f'{name}={version}'] elif source == self.SOURCE_PIP: cmd = ['pip3', 'install', f'{name}=={version}'] if version else ['pip3', 'install', name] elif source == self.SOURCE_NPM: From bd468f31a4148ebd99bdef067b6aea710b1b3999 Mon Sep 17 00:00:00 2001 From: Danish Irfan <44131991+danishirfan21@users.noreply.github.com> Date: Wed, 19 Nov 2025 15:51:44 +0000 Subject: [PATCH 12/16] Remove shebang from config_manager.py to resolve bot warning --- src/config_manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/config_manager.py b/src/config_manager.py index 8851345..f4a81af 100755 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 """ Configuration Manager for Cortex Linux Handles export/import of system state for reproducibility. From 9205eddadab3c8aa7c7fa6e7911571a66f9f9f83 Mon Sep 17 00:00:00 2001 From: Danish Irfan <44131991+danishirfan21@users.noreply.github.com> Date: Wed, 19 Nov 2025 17:46:45 +0000 Subject: [PATCH 13/16] Refactor main() function to reduce cognitive complexity - Extract argument parser setup to _setup_argument_parser() - Extract command handlers to separate functions (_handle_export_command, _handle_import_command, _handle_diff_command) - Extract result printing to _print_dry_run_results() and _print_import_results() - Extract package list printing to _print_package_list() - Reduce main() cognitive complexity from 73 to <15 --- src/config_manager.py | 218 ++++++++++++++++++++++++------------------ 1 file changed, 127 insertions(+), 91 deletions(-) diff --git a/src/config_manager.py b/src/config_manager.py index f4a81af..d9eabcb 100755 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -795,10 +795,9 @@ def _install_package(self, pkg: Dict[str, Any]) -> bool: return self._install_direct(name, version or None, source) -def main(): - """CLI entry point for configuration manager.""" +def _setup_argument_parser(): + """Create and configure argument parser for CLI.""" import argparse - import sys parser = argparse.ArgumentParser(description='Cortex Configuration Manager') subparsers = parser.add_subparsers(dest='command', help='Command to execute') @@ -829,6 +828,128 @@ def main(): diff_parser = subparsers.add_parser('diff', help='Show configuration differences') diff_parser.add_argument('config_file', help='Configuration file to compare') + return parser + + +def _print_package_list(packages: List[Dict[str, Any]], max_display: int = 5) -> None: + """Print a list of packages with optional truncation.""" + for pkg in packages[:max_display]: + if 'current_version' in pkg: + print(f" - {pkg['name']} ({pkg.get('current_version')} → {pkg['version']})") + else: + print(f" - {pkg['name']} ({pkg['source']})") + + if len(packages) > max_display: + print(f" ... and {len(packages) - max_display} more") + + +def _print_dry_run_results(result: Dict[str, Any]) -> None: + """Print dry-run results in a formatted manner.""" + print("\n🔍 Dry-run results:\n") + diff = result['diff'] + + if diff['packages_to_install']: + print(f"📦 Packages to install: {len(diff['packages_to_install'])}") + _print_package_list(diff['packages_to_install']) + + if diff['packages_to_upgrade']: + print(f"\n⬆️ Packages to upgrade: {len(diff['packages_to_upgrade'])}") + _print_package_list(diff['packages_to_upgrade']) + + if diff['preferences_changed']: + print(f"\n⚙️ Preferences to change: {len(diff['preferences_changed'])}") + for key in diff['preferences_changed']: + print(f" - {key}") + + if diff['warnings']: + print("\n⚠️ Warnings:") + for warning in diff['warnings']: + print(f" {warning}") + + print(f"\n{result['message']}") + + +def _print_import_results(result: Dict[str, Any]) -> None: + """Print import results in a formatted manner.""" + print("\n✅ Import completed:\n") + + if result['installed']: + print(f"📦 Installed: {len(result['installed'])} packages") + if result['upgraded']: + print(f"⬆️ Upgraded: {len(result['upgraded'])} packages") + if result.get('downgraded'): + print(f"⬇️ Downgraded: {len(result['downgraded'])} packages") + if result['failed']: + print(f"❌ Failed: {len(result['failed'])} packages") + for pkg in result['failed']: + print(f" - {pkg}") + if result['preferences_updated']: + print("⚙️ Preferences updated") + + +def _handle_export_command(manager: 'ConfigManager', args) -> None: + """Handle the export command.""" + include_hardware = args.include_hardware + include_preferences = not args.no_preferences + + if args.packages_only: + include_hardware = False + include_preferences = False + + message = manager.export_configuration( + output_path=args.output, + include_hardware=include_hardware, + include_preferences=include_preferences + ) + print(message) + + +def _handle_import_command(manager: 'ConfigManager', args) -> None: + """Handle the import command.""" + selective = None + if args.packages_only: + selective = ['packages'] + elif args.preferences_only: + selective = ['preferences'] + + result = manager.import_configuration( + config_path=args.config_file, + dry_run=args.dry_run, + selective=selective, + force=args.force + ) + + if args.dry_run: + _print_dry_run_results(result) + else: + _print_import_results(result) + + +def _handle_diff_command(manager: 'ConfigManager', args) -> None: + """Handle the diff command.""" + with open(args.config_file, 'r') as f: + config = yaml.safe_load(f) + + diff = manager.diff_configuration(config) + + print("\n📊 Configuration Differences:\n") + print(f"Packages to install: {len(diff['packages_to_install'])}") + print(f"Packages to upgrade: {len(diff['packages_to_upgrade'])}") + print(f"Packages to downgrade: {len(diff['packages_to_downgrade'])}") + print(f"Packages already installed: {len(diff['packages_already_installed'])}") + print(f"Preferences changed: {len(diff['preferences_changed'])}") + + if diff['warnings']: + print("\n⚠️ Warnings:") + for warning in diff['warnings']: + print(f" {warning}") + + +def main(): + """CLI entry point for configuration manager.""" + import sys + + parser = _setup_argument_parser() args = parser.parse_args() if not args.command: @@ -839,96 +960,11 @@ def main(): try: if args.command == 'export': - include_hardware = args.include_hardware - include_preferences = not args.no_preferences - - if args.packages_only: - include_hardware = False - include_preferences = False - - message = manager.export_configuration( - output_path=args.output, - include_hardware=include_hardware, - include_preferences=include_preferences - ) - print(message) - + _handle_export_command(manager, args) elif args.command == 'import': - selective = None - if args.packages_only: - selective = ['packages'] - elif args.preferences_only: - selective = ['preferences'] - - result = manager.import_configuration( - config_path=args.config_file, - dry_run=args.dry_run, - selective=selective, - force=args.force - ) - - if args.dry_run: - print("\n🔍 Dry-run results:\n") - diff = result['diff'] - - if diff['packages_to_install']: - print(f"📦 Packages to install: {len(diff['packages_to_install'])}") - for pkg in diff['packages_to_install'][:5]: - print(f" - {pkg['name']} ({pkg['source']})") - if len(diff['packages_to_install']) > 5: - print(f" ... and {len(diff['packages_to_install']) - 5} more") - - if diff['packages_to_upgrade']: - print(f"\n⬆️ Packages to upgrade: {len(diff['packages_to_upgrade'])}") - for pkg in diff['packages_to_upgrade'][:5]: - print(f" - {pkg['name']} ({pkg.get('current_version')} → {pkg['version']})") - if len(diff['packages_to_upgrade']) > 5: - print(f" ... and {len(diff['packages_to_upgrade']) - 5} more") - - if diff['preferences_changed']: - print(f"\n⚙️ Preferences to change: {len(diff['preferences_changed'])}") - for key in diff['preferences_changed']: - print(f" - {key}") - - if diff['warnings']: - print("\n⚠️ Warnings:") - for warning in diff['warnings']: - print(f" {warning}") - - print(f"\n{result['message']}") - else: - print("\n✅ Import completed:\n") - if result['installed']: - print(f"📦 Installed: {len(result['installed'])} packages") - if result['upgraded']: - print(f"⬆️ Upgraded: {len(result['upgraded'])} packages") - if result.get('downgraded'): - print(f"⬇️ Downgraded: {len(result['downgraded'])} packages") - if result['failed']: - print(f"❌ Failed: {len(result['failed'])} packages") - for pkg in result['failed']: - print(f" - {pkg}") - if result['preferences_updated']: - print("⚙️ Preferences updated") - + _handle_import_command(manager, args) elif args.command == 'diff': - with open(args.config_file, 'r') as f: - config = yaml.safe_load(f) - - diff = manager.diff_configuration(config) - - print("\n📊 Configuration Differences:\n") - print(f"Packages to install: {len(diff['packages_to_install'])}") - print(f"Packages to upgrade: {len(diff['packages_to_upgrade'])}") - print(f"Packages to downgrade: {len(diff['packages_to_downgrade'])}") - print(f"Packages already installed: {len(diff['packages_already_installed'])}") - print(f"Preferences changed: {len(diff['preferences_changed'])}") - - if diff['warnings']: - print("\n⚠️ Warnings:") - for warning in diff['warnings']: - print(f" {warning}") - + _handle_diff_command(manager, args) except Exception as e: print(f"❌ Error: {e}", file=sys.stderr) sys.exit(1) From a3209975bb8f9fda71dd3f9f61a31460769e311e Mon Sep 17 00:00:00 2001 From: Danish Irfan <44131991+danishirfan21@users.noreply.github.com> Date: Wed, 19 Nov 2025 19:07:34 +0000 Subject: [PATCH 14/16] Add proper type annotations and harden package identifier validation - Add ClassVar to typing imports - Annotate DEFAULT_SOURCES as ClassVar[List[str]] - Fix function signatures to use Optional[List[str]] for Python 3.8 compatibility - Harden _validate_package_identifier with allow_slash parameter - Explicitly reject path-like inputs (./pkg, ../pkg, /abs, ~user, .., /.) - Only allow slash for NPM scoped packages when allow_slash=True - Update call sites to pass allow_slash=True only for NPM package names --- src/config_manager.py | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/src/config_manager.py b/src/config_manager.py index d9eabcb..c8c8f9c 100755 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -10,7 +10,7 @@ import yaml import subprocess import re -from typing import Dict, List, Optional, Any, Tuple +from typing import Dict, List, Optional, Any, Tuple, ClassVar from datetime import datetime from pathlib import Path @@ -38,7 +38,7 @@ class ConfigManager: SOURCE_APT = 'apt' SOURCE_PIP = 'pip' SOURCE_NPM = 'npm' - DEFAULT_SOURCES = [SOURCE_APT, SOURCE_PIP, SOURCE_NPM] + DEFAULT_SOURCES: ClassVar[List[str]] = [SOURCE_APT, SOURCE_PIP, SOURCE_NPM] def __init__(self, sandbox_executor=None): """ @@ -154,7 +154,7 @@ def detect_npm_packages(self) -> List[Dict[str, Any]]: return packages - def detect_installed_packages(self, sources: List[str] = None) -> List[Dict[str, Any]]: + def detect_installed_packages(self, sources: Optional[List[str]] = None) -> List[Dict[str, Any]]: """ Detect all installed packages from specified sources. @@ -251,7 +251,7 @@ def export_configuration(self, output_path: str, include_hardware: bool = True, include_preferences: bool = True, - package_sources: List[str] = None) -> str: + package_sources: Optional[List[str]] = None) -> str: """ Export current system configuration to YAML file. @@ -697,23 +697,34 @@ def _import_preferences(self, config: Dict[str, Any], summary: Dict[str, Any]) - except Exception as e: summary['failed'].append(f"preferences ({str(e)})") - def _validate_package_identifier(self, identifier: str) -> bool: + def _validate_package_identifier(self, identifier: str, allow_slash: bool = False) -> bool: """ Validate package name or version contains only safe characters. Prevents command injection by ensuring package identifiers only contain alphanumeric characters and common package naming characters. - Supports NPM scoped packages (@scope/package) while blocking path traversal. + Supports NPM scoped packages (@scope/package) when allow_slash=True. Args: identifier: Package name or version string to validate + allow_slash: Whether to allow a single slash (for NPM scoped packages) Returns: bool: True if identifier is safe, False otherwise """ - # Allow standard characters plus exactly one forward slash for NPM scoped packages - # This prevents path traversal (../, ../../, etc.) while allowing @scope/package - return bool(re.match(r'^[a-zA-Z0-9._:@=+\-]+(/[a-zA-Z0-9._\-]+)?$', identifier)) + # Reject path-like patterns immediately + if identifier.startswith('.') or identifier.startswith('/') or identifier.startswith('~'): + return False + if '..' in identifier or '/.' in identifier: + return False + + # Apply character whitelist with optional slash support + if allow_slash: + # Allow exactly one forward slash for NPM scoped packages (@scope/package) + return bool(re.match(r'^[a-zA-Z0-9._:@=+\-]+(/[a-zA-Z0-9._\-]+)?$', identifier)) + else: + # No slashes allowed for versions or non-NPM packages + return bool(re.match(r'^[a-zA-Z0-9._:@=+\-]+$', identifier)) def _install_with_sandbox(self, name: str, version: Optional[str], source: str) -> bool: """ @@ -784,9 +795,11 @@ def _install_package(self, pkg: Dict[str, Any]) -> bool: source = pkg['source'] # Validate package identifiers to prevent command injection - if not self._validate_package_identifier(name): + # Allow slash only for NPM package names (for scoped packages like @scope/package) + allow_slash = (source == self.SOURCE_NPM) + if not self._validate_package_identifier(name, allow_slash=allow_slash): return False - if version and not self._validate_package_identifier(version): + if version and not self._validate_package_identifier(version, allow_slash=False): return False if self.sandbox_executor: From bf4c04592b6f9ecb8eafc5c06980c88a76de1931 Mon Sep 17 00:00:00 2001 From: Danish Irfan <44131991+danishirfan21@users.noreply.github.com> Date: Wed, 19 Nov 2025 19:27:50 +0000 Subject: [PATCH 15/16] Enforce ownership and permissions on .cortex directory - Add _enforce_directory_security() method to check and fix directory security - Verify directory ownership matches current process uid/gid - Attempt to fix ownership with os.chown() if privileges allow - Enforce mode 0o700 (owner-only access) with os.chmod() - Verify chmod succeeded by re-checking actual permissions - Raise PermissionError with detailed message if security invariants fail - Run security checks even when directory pre-exists - Prevents security vulnerabilities from incorrect directory permissions --- src/config_manager.py | 55 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/src/config_manager.py b/src/config_manager.py index c8c8f9c..b466c7b 100755 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -46,13 +46,66 @@ def __init__(self, sandbox_executor=None): Args: sandbox_executor: Optional SandboxExecutor instance for safe command execution + + Raises: + PermissionError: If directory ownership or permissions cannot be secured """ self.sandbox_executor = sandbox_executor self.cortex_dir = Path.home() / '.cortex' self.preferences_file = self.cortex_dir / 'preferences.yaml' - # Ensure .cortex directory exists + # Ensure .cortex directory exists with secure permissions self.cortex_dir.mkdir(mode=0o700, exist_ok=True) + self._enforce_directory_security(self.cortex_dir) + + def _enforce_directory_security(self, directory: Path) -> None: + """ + Enforce ownership and permission security on a directory. + + Ensures the directory is owned by the current user and has mode 0o700 + (read/write/execute for owner only). + + Args: + directory: Path to the directory to secure + + Raises: + PermissionError: If ownership or permissions cannot be secured + """ + try: + # Get directory statistics + stat_info = directory.stat() + current_uid = os.getuid() + current_gid = os.getgid() + + # Check and fix ownership if needed + if stat_info.st_uid != current_uid or stat_info.st_gid != current_gid: + try: + os.chown(directory, current_uid, current_gid) + except PermissionError: + raise PermissionError( + f"Directory {directory} is owned by uid={stat_info.st_uid}, " + f"gid={stat_info.st_gid}, but process is running as uid={current_uid}, " + f"gid={current_gid}. Insufficient privileges to change ownership." + ) + + # Enforce mode 0o700 + os.chmod(directory, 0o700) + + # Verify the chmod succeeded + stat_info = directory.stat() + actual_mode = stat_info.st_mode & 0o777 + if actual_mode != 0o700: + raise PermissionError( + f"Failed to set secure permissions on {directory}. " + f"Expected mode 0o700, but actual mode is {oct(actual_mode)}. " + f"Security invariant failed." + ) + except OSError as e: + if isinstance(e, PermissionError): + raise + raise PermissionError( + f"Failed to enforce security on {directory}: {e}" + ) def detect_apt_packages(self) -> List[Dict[str, Any]]: """ From d2597cffc3f8df7a4b20c472158edad5989d09b8 Mon Sep 17 00:00:00 2001 From: Danish Irfan <44131991+danishirfan21@users.noreply.github.com> Date: Wed, 19 Nov 2025 19:38:40 +0000 Subject: [PATCH 16/16] Add downgrade display to dry-run output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Display packages_to_downgrade section in _print_dry_run_results() - Show downgrade count and package list consistently with install/upgrade - Use ⬇️ emoji for visual consistency with import results --- src/config_manager.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/config_manager.py b/src/config_manager.py index b466c7b..ff6e91c 100755 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -922,6 +922,10 @@ def _print_dry_run_results(result: Dict[str, Any]) -> None: print(f"\n⬆️ Packages to upgrade: {len(diff['packages_to_upgrade'])}") _print_package_list(diff['packages_to_upgrade']) + if diff['packages_to_downgrade']: + print(f"\n⬇️ Packages to downgrade: {len(diff['packages_to_downgrade'])}") + _print_package_list(diff['packages_to_downgrade']) + if diff['preferences_changed']: print(f"\n⚙️ Preferences to change: {len(diff['preferences_changed'])}") for key in diff['preferences_changed']: