diff --git a/src/codegen/extensions/index/file_index.py b/src/codegen/extensions/index/file_index.py
index 6672221c4..a76e62d5e 100644
--- a/src/codegen/extensions/index/file_index.py
+++ b/src/codegen/extensions/index/file_index.py
@@ -2,7 +2,9 @@
 import pickle
 from pathlib import Path
+from typing import Optional
 
+import modal
 import numpy as np
 import tiktoken
 from openai import OpenAI
 
@@ -26,6 +28,7 @@ class FileIndex(CodeIndex):
     EMBEDDING_MODEL = "text-embedding-3-small"
     MAX_TOKENS = 8000
     BATCH_SIZE = 100
+    USE_MODAL_DICT = True  # Flag to control whether to use Modal Dict

     def __init__(self, codebase: Codebase):
         """Initialize the file index.
@@ -37,10 +40,87 @@ def __init__(self, codebase: Codebase):
         self.client = OpenAI()
         self.encoding = tiktoken.get_encoding("cl100k_base")
 
+    def set_use_modal_dict(self, use_modal: bool) -> None:
+        """Set whether to use Modal Dict for storage.
+
+        Args:
+            use_modal: Whether to use Modal Dict for storage
+        """
+        self.USE_MODAL_DICT = use_modal
+        logger.info(f"Modal Dict storage {'enabled' if use_modal else 'disabled'}")
+
     @property
     def save_file_name(self) -> str:
         return "file_index_{commit}.pkl"
 
+    @property
+    def modal_dict_id(self) -> str:
+        """Get the Modal Dict ID, following the same naming convention as the pickle file."""
+        if not self.commit_hash:
+            return "file_index_latest"
+        return f"file_index_{self.commit_hash}"
+
+    def delete_modal_dict(self) -> bool:
+        """Delete the Modal Dict storage for this index.
+
+        Returns:
+            bool: True if successfully deleted, False otherwise
+        """
+        if not self.USE_MODAL_DICT:
+            logger.warning("Modal Dict storage is disabled")
+            return False
+
+        dict_id = self.modal_dict_id
+        try:
+            logger.info(f"Deleting Modal Dict: {dict_id}")
+            # modal.Dict.delete raises if the dict does not exist, so one
+            # handler covers both "missing" and "could not be deleted"
+            modal.Dict.delete(dict_id)
+            logger.info(f"Successfully deleted Modal Dict: {dict_id}")
+            return True
+        except Exception as e:
+            logger.info(f"Modal Dict {dict_id} does not exist or cannot be deleted: {e}")
+            return False
+
+    def modal_dict_exists(self, commit_hash: Optional[str] = None) -> bool:
+        """Check if a Modal Dict exists for a specific commit.
+
+        Args:
+            commit_hash: The commit hash to check, or None to use the current commit
+
+        Returns:
+            bool: True if the Modal Dict exists, False otherwise
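+
+        Example (illustrative; assumes an index was previously saved for
+        commit "abc123" and Modal credentials are configured):
+            >>> index = FileIndex(codebase)
+            >>> index.modal_dict_exists("abc123")  # looks up file_index_abc123
+            True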
+        """
+        if not self.USE_MODAL_DICT:
+            return False
+
+        # Temporarily swap in the requested commit hash so modal_dict_id
+        # resolves to the right dict name, then restore the original
+        old_commit = self.commit_hash
+        if commit_hash is not None:
+            self.commit_hash = commit_hash
+        try:
+            dict_id = self.modal_dict_id
+        finally:
+            self.commit_hash = old_commit
+
+        try:
+            # from_name raises when the dict does not exist and
+            # create_if_missing=False, so a failure here means "missing"
+            modal_dict = modal.Dict.from_name(dict_id, create_if_missing=False)
+            # Check if our data is in the dict
+            return "index_data" in modal_dict
+        except Exception:
+            return False
+
     def _split_by_tokens(self, text: str) -> list[str]:
         """Split text into chunks that fit within token limit."""
         tokens = self.encoding.encode(text)
@@ -135,17 +215,69 @@ def _get_changed_items(self) -> set[File]:
         return changed_files
 
     def _save_index(self, path: Path) -> None:
-        """Save index data to disk."""
+        """Save index data to disk and optionally to Modal Dict."""
+        # Save to local pickle file
         with open(path, "wb") as f:
             pickle.dump({"E": self.E, "items": self.items, "commit_hash": self.commit_hash}, f)
 
+        # Save to Modal Dict if enabled
+        if self.USE_MODAL_DICT:
+            try:
+                dict_id = self.modal_dict_id
+                logger.info(f"Saving index to Modal Dict: {dict_id}")
+
+                # Convert numpy arrays to plain lists so the payload serializes cleanly
+                modal_data = {
+                    "E": self.E.tolist() if self.E is not None else None,
+                    "items": self.items.tolist() if self.items is not None else None,
+                    "commit_hash": self.commit_hash,
+                }
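+                # NOTE (illustrative): _load_index rebuilds these lists with
+                # np.array(), so the embeddings round-trip; float32 arrays may
+                # come back widened to float64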
+
+                # Create or update Modal Dict
+                # Note: from_name is lazy, so we must explicitly set the data
+                modal_dict = modal.Dict.from_name(dict_id, create_if_missing=True)
+                modal_dict["index_data"] = modal_data
+
+                logger.info(f"Successfully saved index to Modal Dict: {dict_id}")
+            except Exception as e:
+                logger.exception(f"Failed to save index to Modal Dict: {e}")
+
     def _load_index(self, path: Path) -> None:
-        """Load index data from disk."""
-        with open(path, "rb") as f:
-            data = pickle.load(f)
-            self.E = data["E"]
-            self.items = data["items"]
-            self.commit_hash = data["commit_hash"]
+        """Load index data from disk or Modal Dict."""
+        # Try loading from Modal Dict first if enabled
+        if self.USE_MODAL_DICT:
+            try:
+                dict_id = self.modal_dict_id
+                logger.info(f"Attempting to load index from Modal Dict: {dict_id}")
+
+                # from_name raises when the dict does not exist and
+                # create_if_missing=False, so treat that as a cache miss
+                try:
+                    modal_dict = modal.Dict.from_name(dict_id, create_if_missing=False)
+                    # Check if the dict contains our data
+                    if "index_data" in modal_dict:
+                        data = modal_dict["index_data"]
+
+                        # Convert lists back to numpy arrays
+                        self.E = np.array(data["E"]) if data["E"] is not None else None
+                        self.items = np.array(data["items"]) if data["items"] is not None else None
+                        self.commit_hash = data["commit_hash"]
+
+                        logger.info(f"Successfully loaded index from Modal Dict: {dict_id}")
+                        return
+                    else:
+                        logger.info(f"No index data found in Modal Dict: {dict_id}")
+                except Exception as e:
+                    logger.warning(f"Modal Dict {dict_id} not found or error accessing it: {e}")
+            except Exception as e:
+                logger.warning(f"Failed to load index from Modal Dict, falling back to local file: {e}")
+
+        # Fall back to loading from local file
+        try:
+            with open(path, "rb") as f:
+                data = pickle.load(f)
+                self.E = data["E"]
+                self.items = data["items"]
+                self.commit_hash = data["commit_hash"]
+
+            logger.info(f"Loaded index from local file: {path}")
+        except Exception as e:
+            logger.exception(f"Failed to load index from local file: {e}")
+            raise
 
     def similarity_search(self, query: str, k: int = 5) -> list[tuple[File, float]]:
         """Find the k most similar files to a query.
@@ -216,3 +348,20 @@ def update(self) -> None:
 
         # Update commit hash
         self.commit_hash = self._get_current_commit()
+
+        # Save updated index to Modal Dict if enabled
+        if self.USE_MODAL_DICT and (num_updated > 0 or num_added > 0):
+            try:
+                dict_id = self.modal_dict_id
+                logger.info(f"Updating index in Modal Dict: {dict_id}")
+
+                # Convert numpy arrays to plain lists so the payload serializes cleanly
+                modal_data = {
+                    "E": self.E.tolist() if self.E is not None else None,
+                    "items": self.items.tolist() if self.items is not None else None,
+                    "commit_hash": self.commit_hash,
+                }
+
+                # Create or update Modal Dict
+                modal_dict = modal.Dict.from_name(dict_id, create_if_missing=True)
+                modal_dict["index_data"] = modal_data
+
+                logger.info(f"Successfully updated index in Modal Dict: {dict_id}")
+            except Exception as e:
+                logger.exception(f"Failed to update index in Modal Dict: {e}")
diff --git a/src/codegen/extensions/langchain/tools.py b/src/codegen/extensions/langchain/tools.py
index f4fc68471..acfbdaf59 100644
--- a/src/codegen/extensions/langchain/tools.py
+++ b/src/codegen/extensions/langchain/tools.py
@@ -110,25 +110,111 @@ def _run(self, dirpath: str = "./", depth: int = 1) -> str:
 
 
 class SearchInput(BaseModel):
     """Input for searching the codebase."""
 
     query: str = Field(
         ...,
-        description="The search query to find in the codebase. When ripgrep is available, this will be passed as a ripgrep pattern. "
-        "For regex searches, set use_regex=True. Ripgrep is the preferred method.",
+        description="""The text or pattern to search for in the codebase.
+
+    For simple text search (use_regex=False):
+    - Uses ripgrep's fixed-strings mode (--fixed-strings)
+    - Case-insensitive matching (--ignore-case)
+    - All characters are treated literally, including special regex characters
+    - Exact string matching (no regex interpretation)
+
+    For regex search (use_regex=True):
+    - Full regex pattern support
+    - Case-sensitive by default
+    - Special characters have regex meaning and need proper escaping
+    - Uses ripgrep's default regex mode
+
+    If no exact matches are found, automatically falls back to semantic search
+    to find relevant code even without exact text matches.""",
+    )
+
+    target_directories: Optional[list[str]] = Field(
+        default=None,
+        description="""Optional list of directories to limit the search scope.
+
+    - Paths should be relative to the workspace root
+    - Multiple directories are searched in parallel
+    - If None, searches the entire codebase
+
+    Example: ["src/frontend", "tests/unit"]""",
+    )
+
+    file_extensions: Optional[list[str]] = Field(
+        default=None,
+        description="""Optional list of file extensions to filter the search.
+
+    - Include the dot in extensions (e.g. ['.py', '.ts'])
+    - Multiple extensions are combined with OR logic
+    - If None, searches all file types
+    - Binary files are automatically excluded
+
+    Example: [".py", ".tsx", ".md"]""",
+    )
+
+    page: int = Field(
+        default=1,
+        description="""Page number for paginated results (1-based indexing).
+
+    - Use with files_per_page to navigate large result sets
+    - If page exceeds the available pages, an empty result set is returned
+    - Note: When falling back to semantic search, pagination is not supported
+
+    Example: page=2 with files_per_page=10 shows files 11-20""",
+    )
+
+    files_per_page: int = Field(
+        default=10,
+        description="""Number of files to show per page.
+
+    - Each file can contain multiple matching lines
+    - Reasonable values are between 5 and 50
+    - Larger values may impact performance
+    - When falling back to semantic search, this becomes the number of semantic results
+
+    Example: files_per_page=20 shows up to 20 files with matches""",
+    )
+
+    use_regex: bool = Field(
+        default=False,
+        description="""Whether to treat the query as a regex pattern.
+
+    - False (default): Simple text search, case-insensitive
+    - True: Full regex syntax, case-sensitive
+    - Invalid regex patterns will return an error
+    - Note: Semantic fallback is used regardless of this setting when no matches are found
+
+    Example: Set to True to use patterns like "test_.*_func.*" """,
     )
-    target_directories: Optional[list[str]] = Field(default=None, description="Optional list of directories to search in")
-    file_extensions: Optional[list[str]] = Field(default=None, description="Optional list of file extensions to search (e.g. ['.py', '.ts'])")
-    page: int = Field(default=1, description="Page number to return (1-based, default: 1)")
-    files_per_page: int = Field(default=10, description="Number of files to return per page (default: 10)")
-    use_regex: bool = Field(default=False, description="Whether to treat query as a regex pattern (default: False)")
 
 
 class SearchTool(BaseTool):
     """Tool for searching the codebase."""
 
     name: ClassVar[str] = "search"
-    description: ClassVar[str] = "Search the codebase using text search or regex pattern matching"
+    description: ClassVar[str] = r"""Search the codebase using text search or regex pattern matching.
+
+    This tool provides powerful text-based search capabilities across your codebase,
+    with support for both simple text matching and regular expressions. It uses ripgrep
+    when available for high-performance searches.
+
+    If no exact matches are found, automatically falls back to semantic search to find
+    relevant code even without exact text matches.
+
+    Features:
+    - Plain text or regex pattern matching
+    - Directory and file type filtering
+    - Paginated results for large codebases
+    - Case-insensitive by default for simple text searches
+    - Semantic fallback for finding related code
+
+    Example queries:
+    1. Simple text: "function calculateTotal" (matches exactly, case-insensitive)
+    2. Regex: "def.*calculate.*\(.*\)" (with use_regex=True)
+    3. File-specific: "TODO" with file_extensions=[".py", ".ts"]
+    4. Directory-specific: "api" with target_directories=["src/backend"]
+    """
     args_schema: ClassVar[type[BaseModel]] = SearchInput
     codebase: Codebase = Field(exclude=True)
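
To make the input semantics concrete, here is a sketch of a direct invocation
(hypothetical: the tool is normally driven by the agent framework, and the
_run signature is assumed to mirror SearchInput's fields):

    tool = SearchTool(codebase)
    result = tool._run(
        query="TODO",
        target_directories=["src/backend"],
        file_extensions=[".py", ".ts"],
        page=1,
        files_per_page=20,
        use_regex=False,  # literal, case-insensitive matching
    )
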
@@ -151,7 +237,27 @@ class EditFileTool(BaseTool):
     """Tool for editing files."""
 
     name: ClassVar[str] = "edit_file"
-    description: ClassVar[str] = "Edit a file by replacing its entire content. This tool should only be used for replacing entire file contents."
+    description: ClassVar[str] = r"""
+Edit a file by replacing its entire content. This tool should only be used for replacing entire file contents.
+
+    The supplied content becomes the file's new content, so include every line
+    that should remain. For small, targeted changes, prefer a replacement-based
+    edit tool instead of rewriting the whole file.
+    """
     args_schema: ClassVar[type[BaseModel]] = EditFileInput
     codebase: Codebase = Field(exclude=True)
@@ -741,7 +847,7 @@ def get_workspace_tools(codebase: Codebase) -> list["BaseTool"]:
         RunBashCommandTool(),  # Note: This tool doesn't need the codebase
         SearchTool(codebase),
         # SemanticEditTool(codebase),
-        SemanticSearchTool(codebase),
+        # SemanticSearchTool(codebase),  # superseded by SearchTool's semantic fallback
         ViewFileTool(codebase),
         RelaceEditTool(codebase),
         ReflectionTool(codebase),
@@ -761,14 +867,26 @@ def get_workspace_tools(codebase: Codebase) -> list["BaseTool"]:
 
 
 class ReplacementEditInput(BaseModel):
     """Input for regex-based replacement editing."""
 
-    filepath: str = Field(..., description="Path to the file to edit")
-    pattern: str = Field(..., description="Regex pattern to match")
-    replacement: str = Field(..., description="Replacement text (can include regex groups)")
-    start: int = Field(default=1, description="Starting line number (1-indexed, inclusive). Default is 1.")
-    end: int = Field(default=-1, description="Ending line number (1-indexed, inclusive). Default is -1 (end of file).")
-    count: Optional[int] = Field(default=None, description="Maximum number of replacements. Default is None (replace all).")
+    filepath: str = Field(..., description="Path to the file to edit relative to the workspace root. The file must exist and be a text file.")
+    pattern: str = Field(
+        ...,
+        description=r"Regular expression pattern to match text that should be replaced. Supports all Python regex syntax including capture groups (\1, \2, etc). The pattern is compiled with the re.MULTILINE flag by default.",
+    )
+    replacement: str = Field(
+        ...,
+        description=r"Text to replace matched patterns with. Can reference regex capture groups using \1, \2, etc. If using regex groups in the pattern, make sure to preserve them in the replacement if needed.",
+    )
+    start: int = Field(
+        default=1,
+        description="Starting line number (1-indexed, inclusive) to begin replacements from. Use this with 'end' to limit changes to a specific region. Default is 1 (start of file).",
+    )
+    end: int = Field(
+        default=-1,
+        description="Ending line number (1-indexed, inclusive) to stop replacements at. Use -1 to indicate end of file. Use this with 'start' to limit changes to a specific region. Default is -1 (end of file).",
+    )
+    count: Optional[int] = Field(
+        default=None,
+        description="Maximum number of replacements to make. Use None to replace all occurrences (default), or specify a number to limit replacements. Useful when you only want to replace the first N occurrences.",
+    )
 
 
 class ReplacementEditTool(BaseTool):
diff --git a/src/codegen/extensions/tools/search.py b/src/codegen/extensions/tools/search.py
index 4bcdfb74e..8083e7db8 100644
--- a/src/codegen/extensions/tools/search.py
+++ b/src/codegen/extensions/tools/search.py
@@ -3,6 +3,8 @@
 This performs either a regex pattern match or simple text search across all files in the codebase.
Each matching line will be returned with its line number. Results are paginated with a default of 10 files per page. + +If no exact matches are found, falls back to semantic search to find relevant code. """ import os @@ -15,6 +17,7 @@ from codegen.sdk.core.codebase import Codebase from .observation import Observation +from .semantic_search import SearchResult, semantic_search class SearchMatch(Observation): @@ -125,7 +128,7 @@ def _search_with_ripgrep( This is faster than the Python implementation, especially for large codebases. """ # Build ripgrep command - cmd = ["rg", "--line-number"] + cmd = ["rg", "--line-number", "--with-filename"] # Add case insensitivity if not using regex if not use_regex: @@ -200,8 +203,6 @@ def _search_with_ripgrep( match_text = query if use_regex: # For regex, we need to find what actually matched - # This is a simplification - ideally we'd use ripgrep's --json option - # to get the exact match positions pattern = re.compile(query) match_obj = pattern.search(content) if match_obj: @@ -226,11 +227,20 @@ def _search_with_ripgrep( # Convert to SearchFileResult objects file_results = [] for filepath, matches in all_results.items(): + # Sort matches by line number and deduplicate + unique_matches = [] + seen = set() + for match in sorted(matches, key=lambda x: x.line_number): + key = (match.line_number, match.match) + if key not in seen: + seen.add(key) + unique_matches.append(match) + file_results.append( SearchFileResult( status="success", filepath=filepath, - matches=sorted(matches, key=lambda x: x.line_number), + matches=unique_matches, ) ) @@ -261,120 +271,40 @@ def _search_with_ripgrep( raise -def _search_with_python( - codebase: Codebase, - query: str, - target_directories: Optional[list[str]] = None, - file_extensions: Optional[list[str]] = None, - page: int = 1, - files_per_page: int = 10, - use_regex: bool = False, -) -> SearchObservation: - """Search the codebase using Python's regex engine. - - This is a fallback for when ripgrep is not available. 
- """ - # Validate pagination parameters - if page < 1: - page = 1 - if files_per_page < 1: - files_per_page = 10 - - # Prepare the search pattern - if use_regex: - try: - pattern = re.compile(query) - except re.error as e: - return SearchObservation( - status="error", - error=f"Invalid regex pattern: {e!s}", - query=query, - page=page, - total_pages=0, - total_files=0, - files_per_page=files_per_page, - results=[], - ) - else: - # For non-regex searches, escape special characters and make case-insensitive - pattern = re.compile(re.escape(query), re.IGNORECASE) - - # Handle file extensions - extensions = file_extensions if file_extensions is not None else "*" - - all_results = [] - for file in codebase.files(extensions=extensions): - # Skip if file doesn't match target directories - if target_directories and not any(file.filepath.startswith(d) for d in target_directories): - continue - - # Skip binary files - try: - content = file.content - except ValueError: # File is binary - continue - - file_matches = [] - # Split content into lines and store with line numbers (1-based) - lines = enumerate(content.splitlines(), 1) - - # Search each line for the pattern - for line_number, line in lines: - match = pattern.search(line) - if match: - file_matches.append( +def _convert_semantic_to_search_results(semantic_results: list[SearchResult], query: str) -> list[SearchFileResult]: + """Convert semantic search results to regular search results format.""" + file_results = [] + for result in semantic_results: + file_results.append( + SearchFileResult( + status="success", + filepath=result.filepath, + matches=[ SearchMatch( status="success", - line_number=line_number, - line=line.strip(), - match=match.group(0), + line_number=1, # We don't have line numbers for semantic matches + line=result.preview, + match=query, ) - ) - - if file_matches: - all_results.append( - SearchFileResult( - status="success", - filepath=file.filepath, - matches=sorted(file_matches, key=lambda x: x.line_number), - ) + ], ) - - # Sort all results by filepath - all_results.sort(key=lambda x: x.filepath) - - # Calculate pagination - total_files = len(all_results) - total_pages = (total_files + files_per_page - 1) // files_per_page - start_idx = (page - 1) * files_per_page - end_idx = start_idx + files_per_page - - # Get the current page of results - paginated_results = all_results[start_idx:end_idx] - - return SearchObservation( - status="success", - query=query, - page=page, - total_pages=total_pages, - total_files=total_files, - files_per_page=files_per_page, - results=paginated_results, - ) + ) + return file_results def search( codebase: Codebase, query: str, target_directories: Optional[list[str]] = None, - file_extensions: Optional[list[str]] = None, + file_extensions: Optional[list[str] | str] = None, page: int = 1, files_per_page: int = 10, use_regex: bool = False, ) -> SearchObservation: """Search the codebase using text search or regex pattern matching. - Uses ripgrep for performance when available, with fallback to Python's regex engine. + Uses ripgrep for performance when available. If no exact matches are found, + falls back to semantic search to find relevant code. If use_regex is True, performs a regex pattern match on each line. Otherwise, performs a case-insensitive text search. Returns matching lines with their line numbers, grouped by file. 
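
The hunk below wires this fallback into search(): exact ripgrep hits are
returned unchanged, while an empty result set (or a missing ripgrep binary)
triggers a single semantic pass whose hits come back as one-match files with
line_number fixed at 1. A minimal consumer-side sketch (illustrative; the
field names are those of SearchObservation and SearchMatch defined earlier):

    obs = search(codebase, "calculateTotal", files_per_page=10)
    for file_result in obs.results:
        for m in file_result.matches:
            # heuristic: semantic hits echo the query and report line 1
            origin = "semantic" if (m.line_number == 1 and m.match == obs.query) else "exact"
            print(origin, file_result.filepath, m.line_number, m.line)
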
@@ -393,9 +323,52 @@ def search( Returns: SearchObservation containing search results with matches and their sources """ - # Try to use ripgrep first try: - return _search_with_ripgrep(codebase, query, target_directories, file_extensions, page, files_per_page, use_regex) + # Try ripgrep first + result = _search_with_ripgrep(codebase, query, target_directories, file_extensions, page, files_per_page, use_regex) + + # If no results found, try semantic search + if not result.results: + semantic_results = semantic_search(codebase, query, k=files_per_page) + if semantic_results.status == "success" and semantic_results.results: + # Convert semantic results to regular search results format + file_results = _convert_semantic_to_search_results(semantic_results.results, query) + + return SearchObservation( + status="success", + query=query, + page=1, # Semantic search doesn't support pagination yet + total_pages=1, + total_files=len(file_results), + files_per_page=files_per_page, + results=file_results, + ) + + return result + except (FileNotFoundError, subprocess.SubprocessError): - # Fall back to Python implementation if ripgrep fails or isn't available - return _search_with_python(codebase, query, target_directories, file_extensions, page, files_per_page, use_regex) + # If ripgrep fails, try semantic search directly + semantic_results = semantic_search(codebase, query, k=files_per_page) + if semantic_results.status == "success": + file_results = _convert_semantic_to_search_results(semantic_results.results, query) + + return SearchObservation( + status="success", + query=query, + page=1, + total_pages=1, + total_files=len(file_results), + files_per_page=files_per_page, + results=file_results, + ) + else: + return SearchObservation( + status="error", + error=f"Both text search and semantic search failed: {semantic_results.error}", + query=query, + page=page, + total_pages=0, + total_files=0, + files_per_page=files_per_page, + results=[], + ) diff --git a/tests/unit/codegen/extensions/test_tools.py b/tests/unit/codegen/extensions/test_tools.py index ec394312e..23c1ebe00 100644 --- a/tests/unit/codegen/extensions/test_tools.py +++ b/tests/unit/codegen/extensions/test_tools.py @@ -260,48 +260,6 @@ def test_search_regex(codebase): assert any("def greet" in match for match in matches) -def test_search_target_directories(codebase): - """Test searching with target directory filtering.""" - # First search without filter to ensure we have results - result_all = search(codebase, "hello") - assert result_all.status == "success" - assert len(result_all.results) > 0 - - # Now search with correct target directory - result_filtered = search(codebase, "hello", target_directories=["src"]) - assert result_filtered.status == "success" - assert len(result_filtered.results) > 0 - - # Search with non-existent directory - result_none = search(codebase, "hello", target_directories=["nonexistent"]) - assert result_none.status == "success" - assert len(result_none.results) == 0 - - -def test_search_file_extensions(codebase, tmpdir): - """Test searching with file extension filtering.""" - # Add a non-Python file - js_content = "function hello() { console.log('Hello from JS!'); }" - js_file = tmpdir / "src" / "script.js" - js_file.write_text(js_content, encoding="utf-8") - - # Search all files - result_all = search(codebase, "hello") - assert result_all.status == "success" - assert len(result_all.results) > 0 - - # Search only Python files - result_py = search(codebase, "hello", file_extensions=[".py"]) - assert 
result_py.status == "success" - assert all(file_result.filepath.endswith(".py") for file_result in result_py.results) - - # Search only JS files - result_js = search(codebase, "hello", file_extensions=[".js"]) - assert result_js.status == "success" - if len(result_js.results) > 0: # Only if JS file was properly added to codebase - assert all(file_result.filepath.endswith(".js") for file_result in result_js.results) - - def test_search_pagination(codebase, tmpdir): """Test search pagination.""" # Create multiple files to test pagination @@ -332,21 +290,6 @@ def test_search_pagination(codebase, tmpdir): assert not page1_files.intersection(page2_files) -def test_search_invalid_regex(codebase): - """Test search with invalid regex pattern.""" - result = search(codebase, "(unclosed", use_regex=True) - assert result.status == "error" - # Check for either Python's error message or ripgrep's error message - assert any( - error_msg in result.error - for error_msg in [ - "Invalid regex pattern", # Python error message - "regex parse error", # ripgrep error message - "unclosed group", # Common error description - ] - ) - - def test_search_fallback(codebase, monkeypatch): """Test fallback to Python implementation when ripgrep fails.""" @@ -409,52 +352,6 @@ def mock_subprocess_run(*args, **kwargs): assert ripgrep_called, "Ripgrep was not used for the search" -def test_search_implementation_consistency(codebase, monkeypatch): - """Test that ripgrep and Python implementations produce consistent results.""" - from codegen.extensions.tools.search import _search_with_python, _search_with_ripgrep - - # Skip test if ripgrep is not available - try: - subprocess.run(["rg", "--version"], capture_output=True, check=False) - except FileNotFoundError: - pytest.skip("Ripgrep not available, skipping consistency test") - - # Simple search that should work in both implementations - query = "hello" - - # Get results from both implementations - ripgrep_result = _search_with_ripgrep(codebase, query) - python_result = _search_with_python(codebase, query) - - # Compare basic metadata - assert ripgrep_result.status == python_result.status - assert ripgrep_result.query == python_result.query - - # Compare file paths found (order might differ) - ripgrep_files = {r.filepath for r in ripgrep_result.results} - python_files = {r.filepath for r in python_result.results} - - # There might be slight differences in which files are found due to how ripgrep handles - # certain files, so we'll check for substantial overlap rather than exact equality - common_files = ripgrep_files.intersection(python_files) - assert len(common_files) > 0, "No common files found between ripgrep and Python implementations" - - # For common files, compare the line numbers found - for filepath in common_files: - # Find the corresponding file results - ripgrep_file_result = next(r for r in ripgrep_result.results if r.filepath == filepath) - python_file_result = next(r for r in python_result.results if r.filepath == filepath) - - # Compare line numbers - there might be slight differences in how matches are found - ripgrep_lines = {m.line_number for m in ripgrep_file_result.matches} - python_lines = {m.line_number for m in python_file_result.matches} - - # Check for substantial overlap in line numbers - common_lines = ripgrep_lines.intersection(python_lines) - if ripgrep_lines and python_lines: # Only check if both found matches - assert len(common_lines) > 0, f"No common line matches found in {filepath}" - - def test_edit_file(codebase): """Test editing a 
file.""" result = edit_file(codebase, "src/main.py", "print('edited')")
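
Taken together, the FileIndex changes give the embedding index a commit-scoped
remote cache, and the search changes make that index a silent fallback for the
text search tool. An end-to-end sketch of the cache lifecycle (illustrative;
assumes OPENAI_API_KEY and Modal credentials are configured, and that the
CodeIndex base class exposes create()/save() as in the SDK):

    from codegen import Codebase
    from codegen.extensions.index.file_index import FileIndex

    codebase = Codebase("path/to/repo")
    index = FileIndex(codebase)
    index.create()  # embed files into E/items
    index.save()    # writes the pickle and, with USE_MODAL_DICT, the Modal Dict

    # A later process on the same commit can hydrate from the remote copy
    if index.modal_dict_exists():
        print(f"remote cache present: {index.modal_dict_id}")

    for f, score in index.similarity_search("where is auth handled?", k=5):
        print(f.filepath, round(score, 3))

    index.delete_modal_dict()  # drop the remote copy when no longer needed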