# File: cortex/api_key_detector.py
"""Auto-detect API keys from common locations.

This module scans common configuration files and locations to find
API keys for supported LLM providers, making onboarding easier.

Implements Issue #255: Auto-detect API keys from common locations
"""

import os
import re
import logging
from pathlib import Path
from typing import Optional, Dict, List, Tuple
from dataclasses import dataclass, field
from enum import Enum

logger = logging.getLogger(__name__)


class Provider(Enum):
    """Supported LLM providers."""
    ANTHROPIC = "anthropic"
    OPENAI = "openai"


@dataclass
class DetectedKey:
    """Represents a detected API key.

    Attributes:
        provider: The LLM provider (anthropic, openai)
        key: The actual API key value (left out of ``repr()`` so the secret
            is not echoed by logging or assertion output)
        source: Where the key was found
        env_var: The environment variable name for this key
    """
    provider: Provider
    # repr=False: the raw secret must never appear in repr(); display code
    # should use ``masked_key`` instead. Equality still compares the key.
    key: str = field(repr=False)
    source: str
    env_var: str

    @property
    def masked_key(self) -> str:
        """Return a masked version of the key for display.

        Keys of 12 characters or fewer are replaced entirely by ``*``;
        longer keys keep their first 8 and last 4 characters.
        """
        if len(self.key) <= 12:
            return "*" * len(self.key)
        return f"{self.key[:8]}...{self.key[-4:]}"


# Patterns to match API keys in files.
#
# Each pattern carries its own ``(?m)`` flag, so ``^`` means "start of a
# line" whether or not the caller passes ``re.MULTILINE``. Shared rules:
#   * ``[^#\n]*?`` - nothing before the variable name on that line may be a
#     ``#``, so commented-out assignments are ignored.
#   * ``(?<![A-Za-z0-9_])`` - the name must not be the tail of a longer
#     identifier (e.g. ``OLD_ANTHROPIC_API_KEY``, ``AZURE_OPENAI_API_KEY``).
#   * Name, separator and value must sit on the same line.
# Group 1 is always the key value without surrounding quotes.
KEY_PATTERNS = {
    Provider.ANTHROPIC: [
        # Shell / dotenv assignment, with or without a leading "export"
        r'(?m)^[^#\n]*?(?<![A-Za-z0-9_])ANTHROPIC_API_KEY[ \t]*=[ \t]*["\']?(sk-ant-[a-zA-Z0-9_-]+)["\']?',
        # "NAME: value" style config entry
        r'(?m)^[^#\n]*?(?<![A-Za-z0-9_])ANTHROPIC_API_KEY[ \t]*:[ \t]*["\']?(sk-ant-[a-zA-Z0-9_-]+)["\']?',
    ],
    Provider.OPENAI: [
        # Shell / dotenv assignment, with or without a leading "export".
        # ``(?!ant-)`` rejects an Anthropic key filed under the OpenAI name.
        r'(?m)^[^#\n]*?(?<![A-Za-z0-9_])OPENAI_API_KEY[ \t]*=[ \t]*["\']?(sk-(?!ant-)[a-zA-Z0-9_-]+)["\']?',
        # "NAME: value" style config entry
        r'(?m)^[^#\n]*?(?<![A-Za-z0-9_])OPENAI_API_KEY[ \t]*:[ \t]*["\']?(sk-(?!ant-)[a-zA-Z0-9_-]+)["\']?',
    ],
}

# Environment variable names for each provider
ENV_VAR_NAMES = {
    Provider.ANTHROPIC: "ANTHROPIC_API_KEY",
    Provider.OPENAI: "OPENAI_API_KEY",
}

# Common locations to search for API keys
SEARCH_LOCATIONS = [
    # Shell configuration files
    "~/.bashrc",
    "~/.bash_profile",
    "~/.zshrc",
    "~/.zprofile",
    "~/.profile",
    # Environment files
    "~/.env",
    "./.env",
    "./.env.local",
    # Config directories
    "~/.config/cortex/.env",
    "~/.config/cortex/config",
    "~/.cortex/.env",
    "~/.cortex/config",
    # Project-specific
    "./cortex.env",
]

# Key prefixes used to sanity-check a value before accepting it for a provider.
_ANTHROPIC_PREFIX = "sk-ant-"
_OPENAI_PREFIX = "sk-"


def _looks_like_provider_key(provider: Provider, value: str) -> bool:
    """Return True if *value* carries the key prefix expected for *provider*."""
    if provider == Provider.ANTHROPIC:
        return value.startswith(_ANTHROPIC_PREFIX)
    # Every Anthropic key also starts with "sk-"; never accept one as OpenAI.
    return value.startswith(_OPENAI_PREFIX) and not value.startswith(_ANTHROPIC_PREFIX)


def _path_exists(path: Path) -> bool:
    """Return whether *path* exists, treating OS errors (e.g. EACCES) as "no"."""
    try:
        return path.exists()
    except OSError:
        return False


class APIKeyDetector:
    """Detects API keys from environment variables and well-known files."""

    def __init__(self, additional_paths: Optional[List[str]] = None):
        """Initialize the detector.

        Args:
            additional_paths: Extra file paths to search, appended after the
                built-in ``SEARCH_LOCATIONS``. ``~`` is expanded.
        """
        self.search_paths = [Path(p).expanduser() for p in SEARCH_LOCATIONS]
        if additional_paths:
            self.search_paths.extend(Path(p).expanduser() for p in additional_paths)

    def _extract_key_from_content(
        self,
        content: str,
        provider: Provider
    ) -> Optional[str]:
        """Extract API key from file content.

        Patterns for the provider are tried in order; the first match wins.

        Args:
            content: File content to search
            provider: Provider to search for

        Returns:
            API key if found, None otherwise
        """
        for pattern in KEY_PATTERNS[provider]:
            match = re.search(pattern, content, re.MULTILINE)
            if match:
                return match.group(1)
        return None

    def _search_file(self, filepath: Path) -> List[DetectedKey]:
        """Search a single file for API keys.

        Best-effort: a missing, unreadable or otherwise inaccessible file
        yields an empty list and a debug log entry, never an exception.

        Args:
            filepath: Path to the file to search

        Returns:
            List of detected keys (at most one per provider)
        """
        detected: List[DetectedKey] = []

        try:
            # is_file() is False for missing paths, so no separate exists()
            # check is needed. It stays inside the try because stat() itself
            # can raise (e.g. a parent directory that is not searchable).
            if not filepath.is_file():
                return detected
            content = filepath.read_text(encoding='utf-8', errors='ignore')
        except PermissionError:
            logger.debug("Permission denied reading %s", filepath)
            return detected
        except (OSError, ValueError) as e:
            logger.debug("Error reading %s: %s", filepath, e)
            return detected

        for provider in Provider:
            key = self._extract_key_from_content(content, provider)
            if key:
                detected.append(DetectedKey(
                    provider=provider,
                    key=key,
                    source=str(filepath),
                    env_var=ENV_VAR_NAMES[provider]
                ))
                logger.debug("Found %s key in %s", provider.value, filepath)

        return detected

    def detect_from_environment(self) -> List[DetectedKey]:
        """Check environment variables for API keys.

        A variable is accepted only if its value has the prefix expected for
        that provider; an ``sk-ant-`` value under ``OPENAI_API_KEY`` is
        rejected rather than reported as an OpenAI key.

        Returns:
            List of detected keys from environment
        """
        detected = []

        for provider, env_var in ENV_VAR_NAMES.items():
            value = os.environ.get(env_var)
            if value and _looks_like_provider_key(provider, value):
                detected.append(DetectedKey(
                    provider=provider,
                    key=value,
                    source="environment variable",
                    env_var=env_var
                ))

        return detected

    def detect_from_files(self) -> List[DetectedKey]:
        """Search all configured paths for API keys.

        Returns:
            List of detected keys from files, in search-path order
        """
        detected = []
        for filepath in self.search_paths:
            detected.extend(self._search_file(filepath))
        return detected

    def detect_all(self) -> List[DetectedKey]:
        """Detect API keys from all sources.

        Checks environment variables first, then files.
        Returns unique keys (same key from multiple sources is deduplicated,
        keeping the first source that reported it).

        Returns:
            List of all detected keys
        """
        all_keys = []
        seen_keys = set()

        # Environment variables come first so they take priority over files
        for key in (*self.detect_from_environment(), *self.detect_from_files()):
            if key.key not in seen_keys:
                seen_keys.add(key.key)
                all_keys.append(key)

        return all_keys

    def get_best_key(self, preferred_provider: Optional[Provider] = None) -> Optional[DetectedKey]:
        """Get the best available API key.

        Args:
            preferred_provider: Preferred provider if multiple keys available

        Returns:
            Best detected key, or None if no keys found
        """
        keys = self.detect_all()
        if not keys:
            return None

        # Default priority: Anthropic > OpenAI (Cortex is optimized for Claude);
        # an explicit preference is tried before the defaults.
        priority = [Provider.ANTHROPIC, Provider.OPENAI]
        if preferred_provider is not None:
            priority.insert(0, preferred_provider)

        for provider in priority:
            for key in keys:
                if key.provider == provider:
                    return key

        # Only reachable if a provider outside the priority list is added.
        return keys[0]


def auto_configure_api_key(
    preferred_provider: Optional[str] = None,
    set_env: bool = True
) -> Optional[DetectedKey]:
    """Auto-detect and optionally configure an API key.

    This is the main entry point for API key auto-detection.
    It searches common locations and can set the environment variable.

    Args:
        preferred_provider: Preferred provider ('anthropic' or 'openai',
            case-insensitive). A ``Provider`` member is also accepted.
            An unknown name is logged as a warning and then ignored.
        set_env: Whether to set the environment variable if key is found

    Returns:
        DetectedKey if found, None otherwise

    Example:
        key = auto_configure_api_key()
        if key:
            print(f"Found {key.provider.value} key from {key.source}")
    """
    detector = APIKeyDetector()

    provider: Optional[Provider] = None
    if isinstance(preferred_provider, Provider):
        provider = preferred_provider
    elif preferred_provider:
        try:
            provider = Provider(preferred_provider.strip().lower())
        except ValueError:
            logger.warning("Unknown provider: %s", preferred_provider)

    key = detector.get_best_key(preferred_provider=provider)

    if key is not None and set_env:
        # Set the environment variable for the current process
        os.environ[key.env_var] = key.key
        logger.info("Auto-configured %s from %s", key.env_var, key.source)

    return key


def get_detection_summary() -> Dict[str, object]:
    """Get a summary of API key detection results.

    Returns:
        Dictionary with detection summary for display. Keys: ``found``
        (bool), ``count`` (int), ``keys`` (list of dicts with ``provider``,
        ``source``, ``masked_key``, ``env_var``) and ``searched_locations``
        (search paths that currently exist, as strings).
    """
    detector = APIKeyDetector()
    keys = detector.detect_all()

    return {
        "found": len(keys) > 0,
        "count": len(keys),
        "keys": [
            {
                "provider": key.provider.value,
                "source": key.source,
                "masked_key": key.masked_key,
                "env_var": key.env_var,
            }
            for key in keys
        ],
        "searched_locations": [
            str(p) for p in detector.search_paths if _path_exists(p)
        ],
    }


def validate_detected_key(key: DetectedKey) -> Tuple[bool, Optional[str]]:
    """Validate a detected API key format.

    Only the prefix and a minimum length of 20 characters are checked; the
    key is not verified against the provider.

    Args:
        key: The detected key to validate

    Returns:
        Tuple of (is_valid, error_message); error_message is None when valid
    """
    if key.provider == Provider.ANTHROPIC:
        if not key.key.startswith(_ANTHROPIC_PREFIX):
            return False, "Anthropic key should start with 'sk-ant-'"
        if len(key.key) < 20:
            return False, "Anthropic key appears too short"

    elif key.provider == Provider.OPENAI:
        if not key.key.startswith(_OPENAI_PREFIX):
            return False, "OpenAI key should start with 'sk-'"
        if key.key.startswith(_ANTHROPIC_PREFIX):
            # "sk-ant-" also satisfies the "sk-" check above
            return False, "OpenAI key looks like an Anthropic key (starts with 'sk-ant-')"
        if len(key.key) < 20:
            return False, "OpenAI key appears too short"

    return True, None


# ---------------------------------------------------------------------------
# File: tests/test_api_key_detector.py
# (a separate file in the original change; it imports the module above)
# ---------------------------------------------------------------------------
"""Tests for the API key auto-detection module.

Tests Issue #255: Auto-detect API keys from common locations
"""

import os
import re
import pytest
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock

from cortex.api_key_detector import (
    Provider,
    DetectedKey,
    APIKeyDetector,
    auto_configure_api_key,
    get_detection_summary,
    validate_detected_key,
    KEY_PATTERNS,
    ENV_VAR_NAMES,
)


class TestDetectedKey:
    """Tests for DetectedKey dataclass."""

    def test_masked_key_long(self):
        """Test key masking for long keys."""
        key = DetectedKey(
            provider=Provider.ANTHROPIC,
            key="sk-ant-api03-abcdefghijklmnop",
            source="~/.bashrc",
            env_var="ANTHROPIC_API_KEY"
        )
        masked = key.masked_key
        assert masked.startswith("sk-ant-a")
        assert masked.endswith("mnop")
        assert "..." in masked

    def test_masked_key_short(self):
        """Test key masking for short keys."""
        key = DetectedKey(
            provider=Provider.OPENAI,
            key="sk-short",
            source="test",
            env_var="OPENAI_API_KEY"
        )
        masked = key.masked_key
        assert masked == "********"


class TestKeyPatterns:
    """Tests for API key regex patterns."""

    def test_anthropic_export_pattern(self):
        """Test Anthropic key detection with export."""
        content = 'export ANTHROPIC_API_KEY="sk-ant-api03-test123"'
        for pattern in KEY_PATTERNS[Provider.ANTHROPIC]:
            match = re.search(pattern, content)
            if match:
                assert match.group(1) == "sk-ant-api03-test123"
                return
        pytest.fail("Pattern should match export statement")

    def test_anthropic_simple_pattern(self):
        """Test Anthropic key detection without export."""
        content = "ANTHROPIC_API_KEY=sk-ant-api03-simple"
        for pattern in KEY_PATTERNS[Provider.ANTHROPIC]:
            match = re.search(pattern, content)
            if match:
                assert match.group(1) == "sk-ant-api03-simple"
                return
        pytest.fail("Pattern should match simple assignment")

    def test_openai_export_pattern(self):
        """Test OpenAI key detection with export."""
        content = "export OPENAI_API_KEY='sk-proj-test456'"
        for pattern in KEY_PATTERNS[Provider.OPENAI]:
            match = re.search(pattern, content)
            if match:
                assert match.group(1) == "sk-proj-test456"
                return
        pytest.fail("Pattern should match export statement")

    def test_env_file_format(self):
        """Test key detection in .env file format."""
        content = 'ANTHROPIC_API_KEY="sk-ant-test-envfile"'
        for pattern in KEY_PATTERNS[Provider.ANTHROPIC]:
            match = re.search(pattern, content)
            if match:
                assert "sk-ant-test-envfile" in match.group(1)
                return
        pytest.fail("Pattern should match .env format")


class TestAPIKeyDetector:
    """Tests for APIKeyDetector class."""

    def test_detect_from_environment_anthropic(self):
        """Test detection from ANTHROPIC_API_KEY env var."""
        detector = APIKeyDetector()

        with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "sk-ant-test-env-key"}, clear=True):
            keys = detector.detect_from_environment()

        assert len(keys) == 1
        assert keys[0].provider == Provider.ANTHROPIC
        assert keys[0].key == "sk-ant-test-env-key"
        assert keys[0].source == "environment variable"

    def test_detect_from_environment_openai(self):
        """Test detection from OPENAI_API_KEY env var."""
        detector = APIKeyDetector()

        # clear=True already guarantees ANTHROPIC_API_KEY is absent
        with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-openai-test-key"}, clear=True):
            keys = detector.detect_from_environment()

        openai_keys = [k for k in keys if k.provider == Provider.OPENAI]
        assert len(openai_keys) == 1
        assert openai_keys[0].key == "sk-openai-test-key"

    def test_detect_from_environment_openai_rejects_anthropic_key(self):
        """Test that an Anthropic key under OPENAI_API_KEY is not detected."""
        detector = APIKeyDetector()

        with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-ant-misfiled-key"}, clear=True):
            keys = detector.detect_from_environment()

        assert keys == []

    def test_detect_from_environment_both(self):
        """Test detection when both keys are set."""
        detector = APIKeyDetector()

        with patch.dict(os.environ, {
            "ANTHROPIC_API_KEY": "sk-ant-both-test",
            "OPENAI_API_KEY": "sk-both-openai"
        }, clear=True):
            keys = detector.detect_from_environment()

        assert len(keys) == 2
        providers = {k.provider for k in keys}
        assert Provider.ANTHROPIC in providers
        assert Provider.OPENAI in providers

    def test_detect_from_environment_invalid_prefix(self):
        """Test that invalid prefixes are not detected."""
        detector = APIKeyDetector()

        with patch.dict(os.environ, {
            "ANTHROPIC_API_KEY": "invalid-key-no-prefix"
        }, clear=True):
            keys = detector.detect_from_environment()

        assert len(keys) == 0

    def test_detect_from_file(self):
        """Test detection from a file."""
        detector = APIKeyDetector()

        with tempfile.NamedTemporaryFile(mode='w', suffix='.env', delete=False) as f:
            f.write('export ANTHROPIC_API_KEY="sk-ant-file-test-key"\n')
            f.write('# Some comment\n')
            f.write('OTHER_VAR=value\n')
            temp_path = f.name

        try:
            keys = detector._search_file(Path(temp_path))
            assert len(keys) == 1
            assert keys[0].provider == Provider.ANTHROPIC
            assert keys[0].key == "sk-ant-file-test-key"
            assert temp_path in keys[0].source
        finally:
            os.unlink(temp_path)

    def test_detect_from_file_multiple_keys(self):
        """Test detection of multiple keys from one file."""
        detector = APIKeyDetector()

        with tempfile.NamedTemporaryFile(mode='w', suffix='.env', delete=False) as f:
            f.write('ANTHROPIC_API_KEY=sk-ant-multi-1\n')
            f.write('OPENAI_API_KEY=sk-multi-openai\n')
            temp_path = f.name

        try:
            keys = detector._search_file(Path(temp_path))
            assert len(keys) == 2
        finally:
            os.unlink(temp_path)

    def test_detect_from_nonexistent_file(self):
        """Test that nonexistent files return empty list."""
        detector = APIKeyDetector()
        keys = detector._search_file(Path("/nonexistent/path/file"))
        assert keys == []

    def test_additional_paths(self):
        """Test custom additional search paths."""
        with tempfile.NamedTemporaryFile(mode='w', suffix='.env', delete=False) as f:
            f.write('ANTHROPIC_API_KEY=sk-ant-custom-path\n')
            temp_path = f.name

        try:
            detector = APIKeyDetector(additional_paths=[temp_path])
            assert Path(temp_path) in detector.search_paths
        finally:
            os.unlink(temp_path)

    def test_detect_all_deduplicates(self):
        """Test that detect_all removes duplicate keys."""
        detector = APIKeyDetector()

        # Mock both methods to return the same key
        same_key = DetectedKey(
            provider=Provider.ANTHROPIC,
            key="sk-ant-duplicate-key",
            source="env",
            env_var="ANTHROPIC_API_KEY"
        )

        with patch.object(detector, 'detect_from_environment', return_value=[same_key]):
            with patch.object(detector, 'detect_from_files', return_value=[same_key]):
                keys = detector.detect_all()

        # Should only have one key despite being found twice
        assert len(keys) == 1

    def test_get_best_key_prefers_anthropic(self):
        """Test that Anthropic keys are preferred by default."""
        detector = APIKeyDetector()

        anthropic_key = DetectedKey(
            provider=Provider.ANTHROPIC,
            key="sk-ant-preferred",
            source="env",
            env_var="ANTHROPIC_API_KEY"
        )
        openai_key = DetectedKey(
            provider=Provider.OPENAI,
            key="sk-not-preferred",
            source="env",
            env_var="OPENAI_API_KEY"
        )

        with patch.object(detector, 'detect_all', return_value=[openai_key, anthropic_key]):
            best = detector.get_best_key()

        assert best.provider == Provider.ANTHROPIC

    def test_get_best_key_respects_preference(self):
        """Test that preferred provider is respected."""
        detector = APIKeyDetector()

        anthropic_key = DetectedKey(
            provider=Provider.ANTHROPIC,
            key="sk-ant-key",
            source="env",
            env_var="ANTHROPIC_API_KEY"
        )
        openai_key = DetectedKey(
            provider=Provider.OPENAI,
            key="sk-openai-key",
            source="env",
            env_var="OPENAI_API_KEY"
        )

        with patch.object(detector, 'detect_all', return_value=[anthropic_key, openai_key]):
            best = detector.get_best_key(preferred_provider=Provider.OPENAI)

        assert best.provider == Provider.OPENAI

    def test_get_best_key_no_keys(self):
        """Test get_best_key when no keys available."""
        detector = APIKeyDetector()

        with patch.object(detector, 'detect_all', return_value=[]):
            best = detector.get_best_key()

        assert best is None


class TestAutoConfigureApiKey:
    """Tests for auto_configure_api_key function."""

    def test_auto_configure_sets_env(self):
        """Test that auto_configure sets environment variable."""
        key = DetectedKey(
            provider=Provider.ANTHROPIC,
            key="sk-ant-auto-config",
            source="test",
            env_var="ANTHROPIC_API_KEY"
        )

        with patch('cortex.api_key_detector.APIKeyDetector') as MockDetector:
            mock_instance = MockDetector.return_value
            mock_instance.get_best_key.return_value = key

            # Clear the env var first. The assertions stay inside this
            # context: patch.dict restores os.environ on exit.
            with patch.dict(os.environ, {}, clear=True):
                result = auto_configure_api_key(set_env=True)

                assert result == key
                assert os.environ.get("ANTHROPIC_API_KEY") == "sk-ant-auto-config"

    def test_auto_configure_no_set_env(self):
        """Test auto_configure with set_env=False."""
        key = DetectedKey(
            provider=Provider.OPENAI,
            key="sk-no-set-env",
            source="test",
            env_var="OPENAI_API_KEY"
        )

        with patch('cortex.api_key_detector.APIKeyDetector') as MockDetector:
            mock_instance = MockDetector.return_value
            mock_instance.get_best_key.return_value = key

            # Checked inside the cleared environment so a real
            # OPENAI_API_KEY on the host cannot fail the test.
            with patch.dict(os.environ, {}, clear=True):
                result = auto_configure_api_key(set_env=False)

                assert result == key
                assert "OPENAI_API_KEY" not in os.environ

    def test_auto_configure_no_key_found(self):
        """Test auto_configure when no key is found."""
        with patch('cortex.api_key_detector.APIKeyDetector') as MockDetector:
            mock_instance = MockDetector.return_value
            mock_instance.get_best_key.return_value = None

            result = auto_configure_api_key()

        assert result is None

    def test_auto_configure_preferred_provider(self):
        """Test auto_configure with preferred provider."""
        with patch('cortex.api_key_detector.APIKeyDetector') as MockDetector:
            mock_instance = MockDetector.return_value
            mock_instance.get_best_key.return_value = None

            auto_configure_api_key(preferred_provider="openai")

            mock_instance.get_best_key.assert_called_with(
                preferred_provider=Provider.OPENAI
            )


class TestGetDetectionSummary:
    """Tests for get_detection_summary function."""

    def test_summary_with_keys(self):
        """Test summary when keys are found."""
        key = DetectedKey(
            provider=Provider.ANTHROPIC,
            key="sk-ant-summary-test",
            source="~/.bashrc",
            env_var="ANTHROPIC_API_KEY"
        )

        with patch('cortex.api_key_detector.APIKeyDetector') as MockDetector:
            mock_instance = MockDetector.return_value
            mock_instance.detect_all.return_value = [key]
            mock_instance.search_paths = []

            summary = get_detection_summary()

        assert summary["found"] is True
        assert summary["count"] == 1
        assert len(summary["keys"]) == 1
        assert summary["keys"][0]["provider"] == "anthropic"

    def test_summary_no_keys(self):
        """Test summary when no keys found."""
        with patch('cortex.api_key_detector.APIKeyDetector') as MockDetector:
            mock_instance = MockDetector.return_value
            mock_instance.detect_all.return_value = []
            mock_instance.search_paths = []

            summary = get_detection_summary()

        assert summary["found"] is False
        assert summary["count"] == 0


class TestValidateDetectedKey:
    """Tests for validate_detected_key function."""

    def test_valid_anthropic_key(self):
        """Test validation of valid Anthropic key."""
        key = DetectedKey(
            provider=Provider.ANTHROPIC,
            key="sk-ant-api03-validkey12345678",
            source="test",
            env_var="ANTHROPIC_API_KEY"
        )
        is_valid, error = validate_detected_key(key)
        assert is_valid is True
        assert error is None

    def test_valid_openai_key(self):
        """Test validation of valid OpenAI key."""
        key = DetectedKey(
            provider=Provider.OPENAI,
            key="sk-proj-validkey123456789012",
            source="test",
            env_var="OPENAI_API_KEY"
        )
        is_valid, error = validate_detected_key(key)
        assert is_valid is True
        assert error is None

    def test_invalid_anthropic_prefix(self):
        """Test validation fails for wrong Anthropic prefix."""
        key = DetectedKey(
            provider=Provider.ANTHROPIC,
            key="sk-wrong-prefix123456789",
            source="test",
            env_var="ANTHROPIC_API_KEY"
        )
        is_valid, error = validate_detected_key(key)
        assert is_valid is False
        assert "sk-ant-" in error

    def test_invalid_openai_prefix(self):
        """Test validation fails for wrong OpenAI prefix."""
        key = DetectedKey(
            provider=Provider.OPENAI,
            key="wrong-openai-key12345",
            source="test",
            env_var="OPENAI_API_KEY"
        )
        is_valid, error = validate_detected_key(key)
        assert is_valid is False
        assert "sk-" in error

    def test_openai_key_with_anthropic_prefix(self):
        """Test validation fails when an Anthropic key is labelled OpenAI."""
        key = DetectedKey(
            provider=Provider.OPENAI,
            key="sk-ant-api03-misfiled12345678",
            source="test",
            env_var="OPENAI_API_KEY"
        )
        is_valid, error = validate_detected_key(key)
        assert is_valid is False
        assert "Anthropic" in error

    def test_short_key(self):
        """Test validation fails for too short key."""
        key = DetectedKey(
            provider=Provider.ANTHROPIC,
            key="sk-ant-short",
            source="test",
            env_var="ANTHROPIC_API_KEY"
        )
        is_valid, error = validate_detected_key(key)
        assert is_valid is False
        assert "short" in error.lower()


class TestIntegration:
    """Integration tests for realistic scenarios."""

    def test_bashrc_detection(self):
        """Test detecting key from a realistic .bashrc file."""
        bashrc_content = """
# ~/.bashrc

# If not running interactively, don't do anything
case $- in
    *i*) ;;
      *) return;;
esac

# API Keys
export ANTHROPIC_API_KEY="sk-ant-api03-realkey123456789"

# Aliases
alias ll='ls -la'
"""
        detector = APIKeyDetector()

        with tempfile.NamedTemporaryFile(mode='w', suffix='.bashrc', delete=False) as f:
            f.write(bashrc_content)
            temp_path = f.name

        try:
            keys = detector._search_file(Path(temp_path))
            assert len(keys) == 1
            assert keys[0].key == "sk-ant-api03-realkey123456789"
        finally:
            os.unlink(temp_path)

    def test_env_file_detection(self):
        """Test detecting key from a realistic .env file."""
        env_content = """
# Environment variables for development
DATABASE_URL=postgres://localhost/dev
REDIS_URL=redis://localhost:6379

# LLM API Keys
ANTHROPIC_API_KEY=sk-ant-api03-envfilekey789
OPENAI_API_KEY=sk-proj-envfilekey123

# Feature flags
DEBUG=true
"""
        detector = APIKeyDetector()

        with tempfile.NamedTemporaryFile(mode='w', suffix='.env', delete=False) as f:
            f.write(env_content)
            temp_path = f.name

        try:
            keys = detector._search_file(Path(temp_path))
            assert len(keys) == 2

            providers = {k.provider for k in keys}
            assert Provider.ANTHROPIC in providers
            assert Provider.OPENAI in providers
        finally:
            os.unlink(temp_path)