From bfac19cce26a2d44369f93d45c1f532677bbe0b2 Mon Sep 17 00:00:00 2001 From: yaya1738 <65517364+yaya1738@users.noreply.github.com> Date: Thu, 4 Dec 2025 09:14:23 +0200 Subject: [PATCH] feat: Add requirements importer for multi-language dependency files (#126) Implements package import functionality for: - requirements.txt (Python/pip) - package.json (Node.js/npm) - Gemfile (Ruby/bundler) - Cargo.toml (Rust/cargo) - go.mod (Go modules) Features: - Auto-detection of requirements files - Dry-run mode for preview - Dev dependency support - Unified CLI interface - 50+ unit tests --- README_IMPORT.md | 235 +++++++++ cortex/import_cli.py | 216 ++++++++ cortex/requirements_importer.py | 648 +++++++++++++++++++++++ tests/test_requirements_importer.py | 762 ++++++++++++++++++++++++++++ 4 files changed, 1861 insertions(+) create mode 100644 README_IMPORT.md create mode 100644 cortex/import_cli.py create mode 100644 cortex/requirements_importer.py create mode 100644 tests/test_requirements_importer.py diff --git a/README_IMPORT.md b/README_IMPORT.md new file mode 100644 index 0000000..3701f67 --- /dev/null +++ b/README_IMPORT.md @@ -0,0 +1,235 @@ +# Cortex Import - Package Requirements Importer + +Import dependencies from various package manager requirement files. + +## Quick Start + +```bash +# Import from a single file +cortex import requirements.txt +cortex import package.json + +# Import from all detected files +cortex import --all + +# Preview what would be installed +cortex import --dry-run requirements.txt + +# Include dev dependencies +cortex import --dev package.json +``` + +## Supported File Formats + +| File | Package Manager | Language | +|------|-----------------|----------| +| `requirements.txt` | pip | Python | +| `package.json` | npm | Node.js | +| `Gemfile` | bundler | Ruby | +| `Cargo.toml` | cargo | Rust | +| `go.mod` | go | Go | + +## Commands + +### Import Single File + +```bash +cortex import + +# Examples: +cortex import requirements.txt +cortex import package.json +cortex import Gemfile +cortex import Cargo.toml +cortex import go.mod +``` + +### Import All Detected Files + +```bash +cortex import --all + +# From a specific directory +cortex import --all --dir /path/to/project +``` + +### Detect Files Only + +```bash +cortex import --detect + +# Output: +# Detected requirements files: +# - requirements.txt +# - package.json +``` + +### Dry Run (Preview) + +```bash +cortex import --dry-run requirements.txt + +# Output: +# Parsing requirements.txt (pip)... +# Dependencies (3): +# - flask (2.0.1) +# - requests (2.28.0) +# - django +# Installing... +# Success: [DRY RUN] Would install 3 packages via pip +``` + +### Include Dev Dependencies + +```bash +cortex import --dev package.json +``` + +## Options + +| Option | Short | Description | +|--------|-------|-------------| +| `--all` | `-a` | Import from all detected files | +| `--detect` | `-d` | Detect files without importing | +| `--dry-run` | `-n` | Preview without installing | +| `--dev` | | Include dev dependencies | +| `--verbose` | `-v` | Show detailed output | +| `--dir` | | Directory to search (default: `.`) | + +## File Format Examples + +### requirements.txt (Python) + +```txt +flask==2.0.1 +requests>=2.28.0 +django~=4.0 +celery[redis] +# Comments are ignored +``` + +Supported specifiers: +- Exact: `package==1.0.0` +- Range: `package>=1.0,<2.0` +- Compatible: `package~=1.0` +- Extras: `package[extra1,extra2]` +- Environment markers: `package; python_version >= "3.8"` + +### package.json (Node.js) + +```json +{ + "dependencies": { + "express": "^4.18.0", + "lodash": "~4.17.21" + }, + "devDependencies": { + "jest": "^29.0.0", + "eslint": "^8.0.0" + } +} +``` + +### Gemfile (Ruby) + +```ruby +source 'https://rubygems.org' + +gem 'rails', '7.0.0' +gem 'pg', '>= 0.18', '< 2.0' + +group :development do + gem 'rubocop' +end + +group :test do + gem 'rspec', '3.12.0' +end +``` + +### Cargo.toml (Rust) + +```toml +[package] +name = "myproject" +version = "0.1.0" + +[dependencies] +serde = "1.0" +tokio = { version = "1.28", features = ["full"] } + +[dev-dependencies] +criterion = "0.5" +``` + +### go.mod (Go) + +```go +module example.com/myproject + +go 1.21 + +require ( + github.com/gin-gonic/gin v1.9.1 + github.com/spf13/cobra v1.7.0 +) +``` + +Indirect dependencies (marked with `// indirect`) are skipped. + +## Python API + +```python +from cortex.requirements_importer import ( + RequirementsImporter, + RequirementsParser, + PackageJsonParser, +) + +# Create importer +importer = RequirementsImporter(dry_run=True, verbose=True) + +# Detect files in current directory +files = importer.detect_files('.') +print(f"Found: {files}") + +# Parse a single file +result = importer.parse_file('requirements.txt') +print(f"Dependencies: {len(result.dependencies)}") +print(f"Package manager: {result.package_manager.value}") + +# Install +success, message = importer.install(result, include_dev=True) +print(f"Install: {message}") + +# Get summary +print(importer.summary()) +``` + +## Exit Codes + +| Code | Description | +|------|-------------| +| 0 | Success | +| 1 | Error (file not found, parse error, install failed) | + +## Requirements + +- Python 3.8+ +- Package managers must be installed for installation: + - pip (Python) - usually included with Python + - npm (Node.js) - install Node.js + - bundler (Ruby) - `gem install bundler` + - cargo (Rust) - install Rust via rustup + - go (Go) - install Go + +## Files + +- `cortex/requirements_importer.py` - Core implementation +- `cortex/import_cli.py` - CLI commands +- `tests/test_requirements_importer.py` - Unit tests (50+ tests) +- `README_IMPORT.md` - This documentation + +## Related Issue + +- [#126 Package Import from Requirements Files](https://github.com/cortexlinux/cortex/issues/126) diff --git a/cortex/import_cli.py b/cortex/import_cli.py new file mode 100644 index 0000000..c35496e --- /dev/null +++ b/cortex/import_cli.py @@ -0,0 +1,216 @@ +""" +CLI commands for importing dependencies from requirements files. + +Usage: + cortex import requirements.txt + cortex import package.json + cortex import --all + cortex import --detect +""" + +import argparse +import sys +from pathlib import Path +from typing import List, Optional + +from cortex.requirements_importer import ( + RequirementsImporter, + PackageManager, +) + + +def create_import_parser(subparsers: argparse._SubParsersAction) -> argparse.ArgumentParser: + """Create the import subcommand parser.""" + import_parser = subparsers.add_parser( + 'import', + help='Import dependencies from requirements files', + description='Parse and install dependencies from requirements.txt, package.json, ' + 'Gemfile, Cargo.toml, or go.mod files.' + ) + + import_parser.add_argument( + 'file', + nargs='?', + help='Requirements file to import (e.g., requirements.txt, package.json)' + ) + + import_parser.add_argument( + '--all', '-a', + action='store_true', + dest='import_all', + help='Import from all detected requirements files' + ) + + import_parser.add_argument( + '--detect', '-d', + action='store_true', + help='Detect requirements files without importing' + ) + + import_parser.add_argument( + '--dry-run', '-n', + action='store_true', + help='Show what would be installed without actually installing' + ) + + import_parser.add_argument( + '--dev', + action='store_true', + help='Include dev dependencies' + ) + + import_parser.add_argument( + '--verbose', '-v', + action='store_true', + help='Show detailed output' + ) + + import_parser.add_argument( + '--dir', + type=str, + default='.', + help='Directory to search for requirements files (default: current directory)' + ) + + import_parser.set_defaults(func=handle_import) + + return import_parser + + +def handle_import(args: argparse.Namespace) -> int: + """Handle the import command.""" + importer = RequirementsImporter( + dry_run=args.dry_run, + verbose=args.verbose + ) + + # Detect mode - just list found files + if args.detect: + files = importer.detect_files(args.dir) + if files: + print("Detected requirements files:") + for f in files: + print(f" - {f}") + return 0 + else: + print("No requirements files detected.") + return 0 + + # Import all mode + if args.import_all: + files = importer.detect_files(args.dir) + if not files: + print("No requirements files detected.") + return 1 + + print(f"Found {len(files)} requirements file(s)") + results = [] + for file_path in files: + result = importer.parse_file(file_path) + results.append(result) + + total_deps = len(result.dependencies) + if args.dev: + total_deps += len(result.dev_dependencies) + + print(f"\n{file_path} ({result.package_manager.value}):") + print(f" Dependencies: {len(result.dependencies)}") + if result.dev_dependencies: + print(f" Dev dependencies: {len(result.dev_dependencies)}") + + if result.errors: + print(f" Warnings: {len(result.errors)}") + for err in result.errors: + print(f" - {err}") + + # Install + success, message = importer.install(result, include_dev=args.dev) + status = "OK" if success else "FAILED" + print(f" Install: [{status}] {message}") + + return 0 + + # Single file mode + if args.file: + file_path = args.file + if not Path(file_path).exists(): + print(f"Error: File not found: {file_path}") + return 1 + + result = importer.parse_file(file_path) + + print(f"\nParsing {file_path} ({result.package_manager.value})...") + + if result.errors: + print("\nWarnings:") + for err in result.errors: + print(f" - {err}") + + print(f"\nDependencies ({len(result.dependencies)}):") + for dep in result.dependencies: + version_str = f" ({dep.version})" if dep.version else "" + print(f" - {dep.name}{version_str}") + + if result.dev_dependencies: + print(f"\nDev dependencies ({len(result.dev_dependencies)}):") + for dep in result.dev_dependencies: + version_str = f" ({dep.version})" if dep.version else "" + print(f" - {dep.name}{version_str}") + + # Install + print("\nInstalling...") + success, message = importer.install(result, include_dev=args.dev) + + if success: + print(f"Success: {message}") + return 0 + else: + print(f"Failed: {message}") + return 1 + + # No file specified + print("Error: Please specify a file or use --all/--detect") + print("\nUsage:") + print(" cortex import requirements.txt") + print(" cortex import package.json") + print(" cortex import --all") + print(" cortex import --detect") + return 1 + + +def main(args: Optional[List[str]] = None) -> int: + """Main entry point for standalone CLI.""" + parser = argparse.ArgumentParser( + prog='cortex-import', + description='Import dependencies from requirements files' + ) + + # Create a temporary subparsers for standalone use + subparsers = parser.add_subparsers(dest='command') + create_import_parser(subparsers) + + # Also support direct usage without 'import' subcommand + parser.add_argument( + 'file', + nargs='?', + help='Requirements file to import' + ) + parser.add_argument('--all', '-a', action='store_true', dest='import_all') + parser.add_argument('--detect', '-d', action='store_true') + parser.add_argument('--dry-run', '-n', action='store_true') + parser.add_argument('--dev', action='store_true') + parser.add_argument('--verbose', '-v', action='store_true') + parser.add_argument('--dir', type=str, default='.') + parser.set_defaults(func=handle_import) + + parsed_args = parser.parse_args(args) + + if hasattr(parsed_args, 'func'): + return parsed_args.func(parsed_args) + + parser.print_help() + return 1 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/cortex/requirements_importer.py b/cortex/requirements_importer.py new file mode 100644 index 0000000..145c6cc --- /dev/null +++ b/cortex/requirements_importer.py @@ -0,0 +1,648 @@ +""" +Requirements Importer - Parse and install dependencies from various package files. + +Supports: +- requirements.txt (Python/pip) +- package.json (Node.js/npm) +- Gemfile (Ruby/bundler) +- Cargo.toml (Rust/cargo) +- go.mod (Go/go modules) +""" + +import json +import os +import re +import subprocess +import sys +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Dict, List, Optional, Tuple + + +class PackageManager(Enum): + """Supported package managers.""" + PIP = "pip" + NPM = "npm" + BUNDLER = "bundler" + CARGO = "cargo" + GO = "go" + + +@dataclass +class Dependency: + """Represents a single dependency.""" + name: str + version: Optional[str] = None + extras: List[str] = field(default_factory=list) + markers: Optional[str] = None + source: Optional[str] = None # Original file this came from + + def __str__(self) -> str: + if self.version: + return f"{self.name}=={self.version}" + return self.name + + +@dataclass +class ParseResult: + """Result of parsing a requirements file.""" + dependencies: List[Dependency] + dev_dependencies: List[Dependency] + package_manager: PackageManager + source_file: str + errors: List[str] = field(default_factory=list) + + +class RequirementsParser: + """Parse requirements.txt files (Python/pip).""" + + @staticmethod + def parse(file_path: str) -> ParseResult: + """Parse a requirements.txt file.""" + dependencies = [] + errors = [] + + path = Path(file_path) + if not path.exists(): + return ParseResult( + dependencies=[], + dev_dependencies=[], + package_manager=PackageManager.PIP, + source_file=file_path, + errors=[f"File not found: {file_path}"] + ) + + with open(path, 'r') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + + # Skip empty lines and comments + if not line or line.startswith('#'): + continue + + # Skip -r (recursive includes) and other flags for now + if line.startswith('-'): + if line.startswith('-r ') or line.startswith('--requirement'): + errors.append(f"Line {line_num}: Recursive includes not supported: {line}") + continue + + try: + dep = RequirementsParser._parse_requirement_line(line) + dep.source = file_path + dependencies.append(dep) + except ValueError as e: + errors.append(f"Line {line_num}: {e}") + + return ParseResult( + dependencies=dependencies, + dev_dependencies=[], + package_manager=PackageManager.PIP, + source_file=file_path, + errors=errors + ) + + @staticmethod + def _parse_requirement_line(line: str) -> Dependency: + """Parse a single requirement line.""" + # Remove inline comments + if ' #' in line: + line = line.split(' #')[0].strip() + + # Handle environment markers (e.g., ; python_version >= "3.8") + markers = None + if ';' in line: + line, markers = line.split(';', 1) + line = line.strip() + markers = markers.strip() + + # Handle extras (e.g., package[extra1,extra2]) + extras = [] + extras_match = re.match(r'^([a-zA-Z0-9_-]+)\[([^\]]+)\](.*)$', line) + if extras_match: + name = extras_match.group(1) + extras = [e.strip() for e in extras_match.group(2).split(',')] + remainder = extras_match.group(3) + else: + # Parse name and version specifier + # Supports: ==, >=, <=, >, <, ~=, != + match = re.match(r'^([a-zA-Z0-9_.-]+)\s*(.*)?$', line) + if not match: + raise ValueError(f"Invalid requirement: {line}") + name = match.group(1) + remainder = match.group(2) or '' + + # Extract version from remainder + version = None + if remainder: + # Handle exact version (==) + if '==' in remainder: + version = remainder.split('==')[1].strip() + # For other specifiers, store the whole thing + elif any(op in remainder for op in ['>=', '<=', '>', '<', '~=', '!=']): + version = remainder.strip() + + return Dependency( + name=name, + version=version, + extras=extras, + markers=markers + ) + + +class PackageJsonParser: + """Parse package.json files (Node.js/npm).""" + + @staticmethod + def parse(file_path: str) -> ParseResult: + """Parse a package.json file.""" + dependencies = [] + dev_dependencies = [] + errors = [] + + path = Path(file_path) + if not path.exists(): + return ParseResult( + dependencies=[], + dev_dependencies=[], + package_manager=PackageManager.NPM, + source_file=file_path, + errors=[f"File not found: {file_path}"] + ) + + try: + with open(path, 'r') as f: + data = json.load(f) + except json.JSONDecodeError as e: + return ParseResult( + dependencies=[], + dev_dependencies=[], + package_manager=PackageManager.NPM, + source_file=file_path, + errors=[f"Invalid JSON: {e}"] + ) + + # Parse dependencies + if 'dependencies' in data: + for name, version in data['dependencies'].items(): + dep = Dependency( + name=name, + version=PackageJsonParser._normalize_version(version), + source=file_path + ) + dependencies.append(dep) + + # Parse devDependencies + if 'devDependencies' in data: + for name, version in data['devDependencies'].items(): + dep = Dependency( + name=name, + version=PackageJsonParser._normalize_version(version), + source=file_path + ) + dev_dependencies.append(dep) + + return ParseResult( + dependencies=dependencies, + dev_dependencies=dev_dependencies, + package_manager=PackageManager.NPM, + source_file=file_path, + errors=errors + ) + + @staticmethod + def _normalize_version(version: str) -> str: + """Normalize npm version specifier.""" + # Remove ^ and ~ prefixes for exact matching + # Keep as-is for installation + return version + + +class GemfileParser: + """Parse Gemfile files (Ruby/bundler).""" + + @staticmethod + def parse(file_path: str) -> ParseResult: + """Parse a Gemfile.""" + dependencies = [] + dev_dependencies = [] + errors = [] + current_group = None + + path = Path(file_path) + if not path.exists(): + return ParseResult( + dependencies=[], + dev_dependencies=[], + package_manager=PackageManager.BUNDLER, + source_file=file_path, + errors=[f"File not found: {file_path}"] + ) + + with open(path, 'r') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + + # Skip empty lines and comments + if not line or line.startswith('#'): + continue + + # Track groups + group_match = re.match(r'^group\s+:(\w+)', line) + if group_match: + current_group = group_match.group(1) + continue + + if line == 'end': + current_group = None + continue + + # Parse gem declarations + gem_match = re.match(r"^gem\s+['\"]([^'\"]+)['\"](?:,\s*['\"]([^'\"]+)['\"])?", line) + if gem_match: + name = gem_match.group(1) + version = gem_match.group(2) + + dep = Dependency( + name=name, + version=version, + source=file_path + ) + + # Assign to dev or regular dependencies based on group + if current_group in ('development', 'test'): + dev_dependencies.append(dep) + else: + dependencies.append(dep) + + return ParseResult( + dependencies=dependencies, + dev_dependencies=dev_dependencies, + package_manager=PackageManager.BUNDLER, + source_file=file_path, + errors=errors + ) + + +class CargoTomlParser: + """Parse Cargo.toml files (Rust/cargo).""" + + @staticmethod + def parse(file_path: str) -> ParseResult: + """Parse a Cargo.toml file.""" + dependencies = [] + dev_dependencies = [] + errors = [] + + path = Path(file_path) + if not path.exists(): + return ParseResult( + dependencies=[], + dev_dependencies=[], + package_manager=PackageManager.CARGO, + source_file=file_path, + errors=[f"File not found: {file_path}"] + ) + + try: + # Simple TOML parser for Cargo.toml + content = path.read_text() + + # Parse [dependencies] section + deps = CargoTomlParser._parse_section(content, 'dependencies') + for name, version in deps.items(): + dependencies.append(Dependency( + name=name, + version=version, + source=file_path + )) + + # Parse [dev-dependencies] section + dev_deps = CargoTomlParser._parse_section(content, 'dev-dependencies') + for name, version in dev_deps.items(): + dev_dependencies.append(Dependency( + name=name, + version=version, + source=file_path + )) + + except Exception as e: + errors.append(f"Error parsing Cargo.toml: {e}") + + return ParseResult( + dependencies=dependencies, + dev_dependencies=dev_dependencies, + package_manager=PackageManager.CARGO, + source_file=file_path, + errors=errors + ) + + @staticmethod + def _parse_section(content: str, section: str) -> Dict[str, str]: + """Parse a TOML section for dependencies.""" + result = {} + + # Find the section + section_pattern = rf'^\[{re.escape(section)}\]' + section_match = re.search(section_pattern, content, re.MULTILINE) + if not section_match: + return result + + # Extract content until next section or end + start = section_match.end() + next_section = re.search(r'^\[', content[start:], re.MULTILINE) + if next_section: + section_content = content[start:start + next_section.start()] + else: + section_content = content[start:] + + # Parse key-value pairs + for line in section_content.split('\n'): + line = line.strip() + if not line or line.startswith('#'): + continue + + # Simple format: name = "version" + simple_match = re.match(r'^([a-zA-Z0-9_-]+)\s*=\s*"([^"]+)"', line) + if simple_match: + result[simple_match.group(1)] = simple_match.group(2) + continue + + # Complex format: name = { version = "x", features = [...] } + complex_match = re.match(r'^([a-zA-Z0-9_-]+)\s*=\s*\{.*version\s*=\s*"([^"]+)"', line) + if complex_match: + result[complex_match.group(1)] = complex_match.group(2) + + return result + + +class GoModParser: + """Parse go.mod files (Go modules).""" + + @staticmethod + def parse(file_path: str) -> ParseResult: + """Parse a go.mod file.""" + dependencies = [] + errors = [] + + path = Path(file_path) + if not path.exists(): + return ParseResult( + dependencies=[], + dev_dependencies=[], + package_manager=PackageManager.GO, + source_file=file_path, + errors=[f"File not found: {file_path}"] + ) + + in_require_block = False + + with open(path, 'r') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + + # Skip empty lines and comments + if not line or line.startswith('//'): + continue + + # Track require blocks + if line == 'require (': + in_require_block = True + continue + + if line == ')': + in_require_block = False + continue + + # Parse single-line require + single_require = re.match(r'^require\s+(\S+)\s+(\S+)', line) + if single_require: + dependencies.append(Dependency( + name=single_require.group(1), + version=single_require.group(2), + source=file_path + )) + continue + + # Parse require block entries + if in_require_block: + # Format: module/path v1.2.3 + parts = line.split() + if len(parts) >= 2: + # Skip indirect dependencies + if '// indirect' not in line: + dependencies.append(Dependency( + name=parts[0], + version=parts[1], + source=file_path + )) + + return ParseResult( + dependencies=dependencies, + dev_dependencies=[], + package_manager=PackageManager.GO, + source_file=file_path, + errors=errors + ) + + +class RequirementsImporter: + """Main class for importing and installing dependencies.""" + + # Mapping of file names to parsers + PARSERS = { + 'requirements.txt': RequirementsParser, + 'requirements-dev.txt': RequirementsParser, + 'requirements_dev.txt': RequirementsParser, + 'package.json': PackageJsonParser, + 'Gemfile': GemfileParser, + 'Cargo.toml': CargoTomlParser, + 'go.mod': GoModParser, + } + + def __init__(self, dry_run: bool = False, verbose: bool = False): + self.dry_run = dry_run + self.verbose = verbose + self.results: List[ParseResult] = [] + + def detect_files(self, directory: str = '.') -> List[str]: + """Detect supported requirements files in a directory.""" + detected = [] + dir_path = Path(directory) + + for filename in self.PARSERS.keys(): + file_path = dir_path / filename + if file_path.exists(): + detected.append(str(file_path)) + + return detected + + def parse_file(self, file_path: str) -> ParseResult: + """Parse a single requirements file.""" + filename = Path(file_path).name + + # Find appropriate parser + parser = None + for pattern, parser_class in self.PARSERS.items(): + if filename == pattern or filename.startswith('requirements'): + if filename.endswith('.txt'): + parser = RequirementsParser + else: + parser = parser_class + break + elif filename == pattern: + parser = parser_class + break + + if parser is None: + # Try to guess by extension/content + if file_path.endswith('.txt'): + parser = RequirementsParser + elif file_path.endswith('.json'): + parser = PackageJsonParser + elif file_path.endswith('.toml'): + parser = CargoTomlParser + elif 'Gemfile' in file_path: + parser = GemfileParser + elif file_path.endswith('.mod'): + parser = GoModParser + else: + return ParseResult( + dependencies=[], + dev_dependencies=[], + package_manager=PackageManager.PIP, + source_file=file_path, + errors=[f"Unknown file type: {file_path}"] + ) + + result = parser.parse(file_path) + self.results.append(result) + return result + + def parse_all(self, directory: str = '.') -> List[ParseResult]: + """Parse all detected requirements files in a directory.""" + files = self.detect_files(directory) + results = [] + + for file_path in files: + result = self.parse_file(file_path) + results.append(result) + + return results + + def install(self, result: ParseResult, include_dev: bool = False) -> Tuple[bool, str]: + """Install dependencies from a parse result.""" + if self.dry_run: + deps = result.dependencies + (result.dev_dependencies if include_dev else []) + return True, f"[DRY RUN] Would install {len(deps)} packages via {result.package_manager.value}" + + pm = result.package_manager + deps = result.dependencies + (result.dev_dependencies if include_dev else []) + + if not deps: + return True, "No dependencies to install" + + try: + if pm == PackageManager.PIP: + return self._install_pip(deps) + elif pm == PackageManager.NPM: + return self._install_npm(deps) + elif pm == PackageManager.BUNDLER: + return self._install_bundler(result.source_file) + elif pm == PackageManager.CARGO: + return self._install_cargo(result.source_file) + elif pm == PackageManager.GO: + return self._install_go(result.source_file) + else: + return False, f"Unsupported package manager: {pm}" + except Exception as e: + return False, str(e) + + def _install_pip(self, deps: List[Dependency]) -> Tuple[bool, str]: + """Install Python packages via pip.""" + packages = [] + for dep in deps: + if dep.version: + packages.append(f"{dep.name}=={dep.version}" if not any( + op in dep.version for op in ['>=', '<=', '>', '<', '~=', '!='] + ) else f"{dep.name}{dep.version}") + else: + packages.append(dep.name) + + cmd = [sys.executable, '-m', 'pip', 'install'] + packages + if self.verbose: + print(f"Running: {' '.join(cmd)}") + + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode == 0: + return True, f"Installed {len(packages)} packages" + return False, result.stderr + + def _install_npm(self, deps: List[Dependency]) -> Tuple[bool, str]: + """Install Node.js packages via npm.""" + packages = [f"{dep.name}@{dep.version}" if dep.version else dep.name for dep in deps] + + cmd = ['npm', 'install'] + packages + if self.verbose: + print(f"Running: {' '.join(cmd)}") + + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode == 0: + return True, f"Installed {len(packages)} packages" + return False, result.stderr + + def _install_bundler(self, gemfile_path: str) -> Tuple[bool, str]: + """Install Ruby gems via bundler.""" + cmd = ['bundle', 'install'] + if self.verbose: + print(f"Running: {' '.join(cmd)}") + + env = os.environ.copy() + env['BUNDLE_GEMFILE'] = gemfile_path + + result = subprocess.run(cmd, capture_output=True, text=True, env=env) + if result.returncode == 0: + return True, "Bundle install completed" + return False, result.stderr + + def _install_cargo(self, cargo_toml_path: str) -> Tuple[bool, str]: + """Install Rust crates via cargo.""" + # Navigate to directory containing Cargo.toml + cargo_dir = str(Path(cargo_toml_path).parent) + + cmd = ['cargo', 'build'] + if self.verbose: + print(f"Running: {' '.join(cmd)} in {cargo_dir}") + + result = subprocess.run(cmd, capture_output=True, text=True, cwd=cargo_dir) + if result.returncode == 0: + return True, "Cargo build completed" + return False, result.stderr + + def _install_go(self, go_mod_path: str) -> Tuple[bool, str]: + """Install Go modules.""" + go_dir = str(Path(go_mod_path).parent) + + cmd = ['go', 'mod', 'download'] + if self.verbose: + print(f"Running: {' '.join(cmd)} in {go_dir}") + + result = subprocess.run(cmd, capture_output=True, text=True, cwd=go_dir) + if result.returncode == 0: + return True, "Go modules downloaded" + return False, result.stderr + + def summary(self) -> str: + """Generate a summary of all parsed files.""" + lines = ["Requirements Import Summary", "=" * 40] + + for result in self.results: + lines.append(f"\n{result.source_file} ({result.package_manager.value}):") + lines.append(f" Dependencies: {len(result.dependencies)}") + lines.append(f" Dev Dependencies: {len(result.dev_dependencies)}") + if result.errors: + lines.append(f" Errors: {len(result.errors)}") + for err in result.errors: + lines.append(f" - {err}") + + return '\n'.join(lines) diff --git a/tests/test_requirements_importer.py b/tests/test_requirements_importer.py new file mode 100644 index 0000000..2e7be1c --- /dev/null +++ b/tests/test_requirements_importer.py @@ -0,0 +1,762 @@ +""" +Tests for the Requirements Importer module. +""" + +import json +import os +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from cortex.requirements_importer import ( + CargoTomlParser, + Dependency, + GemfileParser, + GoModParser, + PackageJsonParser, + PackageManager, + ParseResult, + RequirementsImporter, + RequirementsParser, +) + + +class TestDependency: + """Tests for the Dependency dataclass.""" + + def test_dependency_str_with_version(self): + dep = Dependency(name="flask", version="2.0.1") + assert str(dep) == "flask==2.0.1" + + def test_dependency_str_without_version(self): + dep = Dependency(name="flask") + assert str(dep) == "flask" + + def test_dependency_with_extras(self): + dep = Dependency(name="requests", version="2.28.0", extras=["security", "socks"]) + assert dep.extras == ["security", "socks"] + + def test_dependency_with_markers(self): + dep = Dependency(name="pywin32", markers='sys_platform == "win32"') + assert dep.markers == 'sys_platform == "win32"' + + +class TestRequirementsParser: + """Tests for Python requirements.txt parser.""" + + def test_parse_simple_requirements(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("flask\nrequests\ndjango\n") + f.flush() + + result = RequirementsParser.parse(f.name) + + os.unlink(f.name) + + assert result.package_manager == PackageManager.PIP + assert len(result.dependencies) == 3 + assert result.dependencies[0].name == "flask" + assert result.dependencies[1].name == "requests" + assert result.dependencies[2].name == "django" + + def test_parse_with_versions(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("flask==2.0.1\nrequests>=2.28.0\ndjango~=4.0\n") + f.flush() + + result = RequirementsParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 3 + assert result.dependencies[0].version == "2.0.1" + assert result.dependencies[1].version == ">=2.28.0" + assert result.dependencies[2].version == "~=4.0" + + def test_parse_with_comments(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("# This is a comment\nflask==2.0.1\n# Another comment\nrequests\n") + f.flush() + + result = RequirementsParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 2 + + def test_parse_with_inline_comments(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("flask==2.0.1 # web framework\nrequests # HTTP library\n") + f.flush() + + result = RequirementsParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 2 + assert result.dependencies[0].name == "flask" + assert result.dependencies[0].version == "2.0.1" + + def test_parse_with_extras(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("requests[security,socks]\nflask[async]\n") + f.flush() + + result = RequirementsParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 2 + assert result.dependencies[0].name == "requests" + assert result.dependencies[0].extras == ["security", "socks"] + assert result.dependencies[1].extras == ["async"] + + def test_parse_with_markers(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write('pywin32; sys_platform == "win32"\nuvloop; sys_platform != "win32"\n') + f.flush() + + result = RequirementsParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 2 + assert result.dependencies[0].markers == 'sys_platform == "win32"' + + def test_parse_empty_lines(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("flask\n\n\nrequests\n\n") + f.flush() + + result = RequirementsParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 2 + + def test_parse_file_not_found(self): + result = RequirementsParser.parse("/nonexistent/requirements.txt") + + assert len(result.dependencies) == 0 + assert len(result.errors) == 1 + assert "File not found" in result.errors[0] + + def test_parse_recursive_includes_warning(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("flask\n-r dev-requirements.txt\nrequests\n") + f.flush() + + result = RequirementsParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 2 + assert len(result.errors) == 1 + assert "Recursive includes not supported" in result.errors[0] + + +class TestPackageJsonParser: + """Tests for Node.js package.json parser.""" + + def test_parse_simple_package_json(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + data = { + "name": "test-project", + "dependencies": { + "express": "^4.18.0", + "lodash": "~4.17.21" + } + } + json.dump(data, f) + f.flush() + + result = PackageJsonParser.parse(f.name) + + os.unlink(f.name) + + assert result.package_manager == PackageManager.NPM + assert len(result.dependencies) == 2 + assert result.dependencies[0].name == "express" + assert result.dependencies[0].version == "^4.18.0" + + def test_parse_with_dev_dependencies(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + data = { + "dependencies": { + "express": "^4.18.0" + }, + "devDependencies": { + "jest": "^29.0.0", + "eslint": "^8.0.0" + } + } + json.dump(data, f) + f.flush() + + result = PackageJsonParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 1 + assert len(result.dev_dependencies) == 2 + assert result.dev_dependencies[0].name == "jest" + + def test_parse_empty_package_json(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json.dump({"name": "empty-project"}, f) + f.flush() + + result = PackageJsonParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 0 + assert len(result.dev_dependencies) == 0 + assert len(result.errors) == 0 + + def test_parse_invalid_json(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + f.write("{ invalid json }") + f.flush() + + result = PackageJsonParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.errors) == 1 + assert "Invalid JSON" in result.errors[0] + + def test_parse_file_not_found(self): + result = PackageJsonParser.parse("/nonexistent/package.json") + + assert len(result.errors) == 1 + assert "File not found" in result.errors[0] + + +class TestGemfileParser: + """Tests for Ruby Gemfile parser.""" + + def test_parse_simple_gemfile(self): + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write("gem 'rails', '7.0.0'\ngem 'pg'\ngem 'puma', '~> 5.0'\n") + f.flush() + + result = GemfileParser.parse(f.name) + + os.unlink(f.name) + + assert result.package_manager == PackageManager.BUNDLER + assert len(result.dependencies) == 3 + assert result.dependencies[0].name == "rails" + assert result.dependencies[0].version == "7.0.0" + assert result.dependencies[1].name == "pg" + assert result.dependencies[1].version is None + + def test_parse_with_groups(self): + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + content = """ +gem 'rails', '7.0.0' + +group :development do + gem 'rubocop' +end + +group :test do + gem 'rspec', '3.12.0' +end + +gem 'pg' +""" + f.write(content) + f.flush() + + result = GemfileParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 2 # rails, pg + assert len(result.dev_dependencies) == 2 # rubocop, rspec + + def test_parse_with_comments(self): + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write("# This is a Gemfile\ngem 'rails'\n# Another gem\ngem 'pg'\n") + f.flush() + + result = GemfileParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 2 + + def test_parse_double_quotes(self): + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write('gem "rails", "7.0.0"\ngem "pg"\n') + f.flush() + + result = GemfileParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 2 + assert result.dependencies[0].name == "rails" + + def test_parse_file_not_found(self): + result = GemfileParser.parse("/nonexistent/Gemfile") + + assert len(result.errors) == 1 + assert "File not found" in result.errors[0] + + +class TestCargoTomlParser: + """Tests for Rust Cargo.toml parser.""" + + def test_parse_simple_cargo_toml(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f: + content = """ +[package] +name = "test-project" +version = "0.1.0" + +[dependencies] +serde = "1.0" +tokio = "1.28" +""" + f.write(content) + f.flush() + + result = CargoTomlParser.parse(f.name) + + os.unlink(f.name) + + assert result.package_manager == PackageManager.CARGO + assert len(result.dependencies) == 2 + assert result.dependencies[0].name == "serde" + assert result.dependencies[0].version == "1.0" + + def test_parse_with_dev_dependencies(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f: + content = """ +[dependencies] +serde = "1.0" + +[dev-dependencies] +criterion = "0.5" +""" + f.write(content) + f.flush() + + result = CargoTomlParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 1 + assert len(result.dev_dependencies) == 1 + assert result.dev_dependencies[0].name == "criterion" + + def test_parse_complex_dependency(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f: + content = """ +[dependencies] +serde = { version = "1.0", features = ["derive"] } +tokio = "1.28" +""" + f.write(content) + f.flush() + + result = CargoTomlParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 2 + assert result.dependencies[0].name == "serde" + assert result.dependencies[0].version == "1.0" + + def test_parse_no_dependencies(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f: + content = """ +[package] +name = "test" +version = "0.1.0" +""" + f.write(content) + f.flush() + + result = CargoTomlParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 0 + assert len(result.errors) == 0 + + def test_parse_file_not_found(self): + result = CargoTomlParser.parse("/nonexistent/Cargo.toml") + + assert len(result.errors) == 1 + assert "File not found" in result.errors[0] + + +class TestGoModParser: + """Tests for Go go.mod parser.""" + + def test_parse_simple_go_mod(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.mod', delete=False) as f: + content = """module example.com/myproject + +go 1.21 + +require github.com/gin-gonic/gin v1.9.1 +require github.com/spf13/cobra v1.7.0 +""" + f.write(content) + f.flush() + + result = GoModParser.parse(f.name) + + os.unlink(f.name) + + assert result.package_manager == PackageManager.GO + assert len(result.dependencies) == 2 + assert result.dependencies[0].name == "github.com/gin-gonic/gin" + assert result.dependencies[0].version == "v1.9.1" + + def test_parse_require_block(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.mod', delete=False) as f: + content = """module example.com/myproject + +go 1.21 + +require ( + github.com/gin-gonic/gin v1.9.1 + github.com/spf13/cobra v1.7.0 + github.com/stretchr/testify v1.8.4 +) +""" + f.write(content) + f.flush() + + result = GoModParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 3 + + def test_parse_skip_indirect(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.mod', delete=False) as f: + content = """module example.com/myproject + +go 1.21 + +require ( + github.com/gin-gonic/gin v1.9.1 + golang.org/x/sys v0.8.0 // indirect +) +""" + f.write(content) + f.flush() + + result = GoModParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 1 + assert result.dependencies[0].name == "github.com/gin-gonic/gin" + + def test_parse_with_comments(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.mod', delete=False) as f: + content = """module example.com/myproject + +// This is a comment +go 1.21 + +require github.com/gin-gonic/gin v1.9.1 +""" + f.write(content) + f.flush() + + result = GoModParser.parse(f.name) + + os.unlink(f.name) + + assert len(result.dependencies) == 1 + + def test_parse_file_not_found(self): + result = GoModParser.parse("/nonexistent/go.mod") + + assert len(result.errors) == 1 + assert "File not found" in result.errors[0] + + +class TestRequirementsImporter: + """Tests for the main RequirementsImporter class.""" + + def test_detect_files(self): + with tempfile.TemporaryDirectory() as tmpdir: + # Create test files + (Path(tmpdir) / "requirements.txt").write_text("flask\n") + (Path(tmpdir) / "package.json").write_text('{"dependencies":{}}') + + importer = RequirementsImporter() + detected = importer.detect_files(tmpdir) + + assert len(detected) == 2 + assert any("requirements.txt" in f for f in detected) + assert any("package.json" in f for f in detected) + + def test_detect_no_files(self): + with tempfile.TemporaryDirectory() as tmpdir: + importer = RequirementsImporter() + detected = importer.detect_files(tmpdir) + + assert len(detected) == 0 + + def test_parse_file_auto_detect_txt(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("flask\nrequests\n") + f.flush() + + importer = RequirementsImporter() + result = importer.parse_file(f.name) + + os.unlink(f.name) + + assert result.package_manager == PackageManager.PIP + assert len(result.dependencies) == 2 + + def test_parse_file_auto_detect_json(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json.dump({"dependencies": {"express": "^4.0.0"}}, f) + f.flush() + + importer = RequirementsImporter() + result = importer.parse_file(f.name) + + os.unlink(f.name) + + assert result.package_manager == PackageManager.NPM + + def test_parse_all(self): + with tempfile.TemporaryDirectory() as tmpdir: + (Path(tmpdir) / "requirements.txt").write_text("flask\ndjango\n") + (Path(tmpdir) / "package.json").write_text( + '{"dependencies":{"express":"^4.0.0"}}' + ) + + importer = RequirementsImporter() + results = importer.parse_all(tmpdir) + + assert len(results) == 2 + + def test_dry_run_install(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("flask\nrequests\n") + f.flush() + + importer = RequirementsImporter(dry_run=True) + result = importer.parse_file(f.name) + success, message = importer.install(result) + + os.unlink(f.name) + + assert success + assert "[DRY RUN]" in message + assert "2 packages" in message + + def test_summary(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("flask\nrequests\n") + f.flush() + + importer = RequirementsImporter() + importer.parse_file(f.name) + summary = importer.summary() + + os.unlink(f.name) + + assert "Requirements Import Summary" in summary + assert "Dependencies: 2" in summary + + @patch('cortex.requirements_importer.subprocess.run') + def test_install_pip(self, mock_run): + mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") + + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("flask==2.0.1\nrequests\n") + f.flush() + + importer = RequirementsImporter() + result = importer.parse_file(f.name) + success, message = importer.install(result) + + os.unlink(f.name) + + assert success + mock_run.assert_called_once() + call_args = mock_run.call_args[0][0] + assert 'pip' in ' '.join(call_args) + assert 'install' in call_args + + @patch('cortex.requirements_importer.subprocess.run') + def test_install_npm(self, mock_run): + mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json.dump({"dependencies": {"express": "^4.0.0"}}, f) + f.flush() + + importer = RequirementsImporter() + result = importer.parse_file(f.name) + success, message = importer.install(result) + + os.unlink(f.name) + + assert success + mock_run.assert_called_once() + + @patch('cortex.requirements_importer.subprocess.run') + def test_install_failure(self, mock_run): + mock_run.return_value = MagicMock( + returncode=1, stdout="", stderr="Installation failed" + ) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("nonexistent-package-12345\n") + f.flush() + + importer = RequirementsImporter() + result = importer.parse_file(f.name) + success, message = importer.install(result) + + os.unlink(f.name) + + assert not success + assert "Installation failed" in message + + def test_install_empty_dependencies(self): + importer = RequirementsImporter() + result = ParseResult( + dependencies=[], + dev_dependencies=[], + package_manager=PackageManager.PIP, + source_file="test.txt" + ) + + success, message = importer.install(result) + + assert success + assert "No dependencies" in message + + def test_install_with_dev_dependencies(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + data = { + "dependencies": {"express": "^4.0.0"}, + "devDependencies": {"jest": "^29.0.0"} + } + json.dump(data, f) + f.flush() + + importer = RequirementsImporter(dry_run=True) + result = importer.parse_file(f.name) + success, message = importer.install(result, include_dev=True) + + os.unlink(f.name) + + assert success + assert "2 packages" in message # Both express and jest + + +class TestEdgeCases: + """Test edge cases and error handling.""" + + def test_unknown_file_type(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.xyz', delete=False) as f: + f.write("some content") + f.flush() + + importer = RequirementsImporter() + result = importer.parse_file(f.name) + + os.unlink(f.name) + + assert "Unknown file type" in result.errors[0] + + def test_requirements_with_hashes(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("flask==2.0.1 --hash=sha256:abc123\n") + f.flush() + + result = RequirementsParser.parse(f.name) + + os.unlink(f.name) + + # Should still parse the package (hash is ignored for simplicity) + assert len(result.dependencies) == 1 + + def test_gemfile_complex_syntax(self): + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + content = """ +source 'https://rubygems.org' + +gem 'rails', '7.0.0' +gem 'pg', '>= 0.18', '< 2.0' +""" + f.write(content) + f.flush() + + result = GemfileParser.parse(f.name) + + os.unlink(f.name) + + # Should parse both gems + assert len(result.dependencies) == 2 + + def test_verbose_mode(self, capsys): + importer = RequirementsImporter(verbose=True, dry_run=True) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write("flask\n") + f.flush() + + result = importer.parse_file(f.name) + importer.install(result) + + os.unlink(f.name) + # Verbose mode should not crash, dry run prevents actual output + + +class TestIntegration: + """Integration tests with real file structures.""" + + def test_full_workflow(self): + with tempfile.TemporaryDirectory() as tmpdir: + # Create a realistic project structure + (Path(tmpdir) / "requirements.txt").write_text( + "flask==2.0.1\nrequests>=2.28.0\n" + ) + (Path(tmpdir) / "package.json").write_text(json.dumps({ + "name": "test-project", + "dependencies": {"express": "^4.18.0"}, + "devDependencies": {"jest": "^29.0.0"} + })) + + importer = RequirementsImporter(dry_run=True) + + # Detect files + files = importer.detect_files(tmpdir) + assert len(files) == 2 + + # Parse all + results = importer.parse_all(tmpdir) + assert len(results) == 2 + + # Get summary + summary = importer.summary() + assert "pip" in summary.lower() + assert "npm" in summary.lower() + + # Dry run install + for result in results: + success, message = importer.install(result) + assert success + assert "[DRY RUN]" in message