diff --git a/codeflash/discovery/discover_unit_tests.py b/codeflash/discovery/discover_unit_tests.py index d4bc9abd7..bc0e2fd67 100644 --- a/codeflash/discovery/discover_unit_tests.py +++ b/codeflash/discovery/discover_unit_tests.py @@ -868,6 +868,58 @@ def process_test_files( continue try: if not definition or definition[0].type != "function": + # Fallback: Try to match against functions_to_optimize when Jedi can't resolve + # This handles cases where Jedi fails with pytest fixtures + if functions_to_optimize and name.name: + for func_to_opt in functions_to_optimize: + # Check if this unresolved name matches a function we're looking for + if func_to_opt.function_name == name.name: + # Check if the test file imports the class/module containing this function + qualified_name_with_modules = func_to_opt.qualified_name_with_modules_from_root( + project_root_path + ) + + # Only add if this test actually tests the function we're optimizing + for test_func in test_functions_by_name[scope]: + if test_func.parameters is not None: + if test_framework == "pytest": + scope_test_function = ( + f"{test_func.function_name}[{test_func.parameters}]" + ) + else: # unittest + scope_test_function = ( + f"{test_func.function_name}_{test_func.parameters}" + ) + else: + scope_test_function = test_func.function_name + + function_to_test_map[qualified_name_with_modules].add( + FunctionCalledInTest( + tests_in_file=TestsInFile( + test_file=test_file, + test_class=test_func.test_class, + test_function=scope_test_function, + test_type=test_func.test_type, + ), + position=CodePosition(line_no=name.line, col_no=name.column), + ) + ) + tests_cache.insert_test( + file_path=str(test_file), + file_hash=file_hash, + qualified_name_with_modules_from_root=qualified_name_with_modules, + function_name=scope, + test_class=test_func.test_class or "", + test_function=scope_test_function, + test_type=test_func.test_type, + line_number=name.line, + col_number=name.column, + ) + + if test_func.test_type == TestType.REPLAY_TEST: + num_discovered_replay_tests += 1 + + num_discovered_tests += 1 continue definition_obj = definition[0] definition_path = str(definition_obj.module_path) diff --git a/tests/test_unit_test_discovery.py b/tests/test_unit_test_discovery.py index 1a0c73ffa..60facb17b 100644 --- a/tests/test_unit_test_discovery.py +++ b/tests/test_unit_test_discovery.py @@ -8,7 +8,7 @@ filter_test_files_by_imports, ) from codeflash.discovery.functions_to_optimize import FunctionToOptimize -from codeflash.models.models import TestsInFile, TestType +from codeflash.models.models import TestsInFile, TestType, FunctionParent from codeflash.verification.verification_utils import TestConfig @@ -714,6 +714,210 @@ def test_add_with_parameters(self): assert calculator_test.tests_in_file.test_file.resolve() == test_file_path.resolve() assert calculator_test.tests_in_file.test_function == "test_add_with_parameters" +def test_unittest_discovery_with_pytest_fixture(): + with tempfile.TemporaryDirectory() as tmpdirname: + path_obj_tmpdirname = Path(tmpdirname) + + # Create a simple code file + code_file_path = path_obj_tmpdirname / "topological_sort.py" + code_file_content = """ +import uuid +from collections import defaultdict + + +class Graph: + def __init__(self, vertices: int): + self.vertices=vertices + + def dummy_fn(self): + return 1 + + def topologicalSort(self): + return self.vertices + +""" + code_file_path.write_text(code_file_content) + + # Create a unittest test file with parameterized tests + test_file_path = path_obj_tmpdirname / "test_topological_sort.py" + test_file_content = """ +from topological_sort import Graph +import pytest + +@pytest.fixture +def g(): + return Graph(6) + +def test_topological_sort(g): + assert g.dummy_fn() == 1 + assert g.topologicalSort() == 6 +""" + test_file_path.write_text(test_file_content) + + # Configure test discovery + test_config = TestConfig( + tests_root=path_obj_tmpdirname, + project_root_path=path_obj_tmpdirname, + test_framework="pytest", # Using pytest framework to discover unittest tests + tests_project_rootdir=path_obj_tmpdirname.parent, + ) + fto = FunctionToOptimize(function_name="topologicalSort", file_path=code_file_path, parents=[FunctionParent(name="Graph", type="ClassDef")]) + # Discover tests + discovered_tests, _, _ = discover_unit_tests(test_config, file_to_funcs_to_optimize={code_file_path: [fto]}) + + # Verify the unittest was discovered + assert len(discovered_tests) == 2 + assert "topological_sort.Graph.topologicalSort" in discovered_tests + assert len(discovered_tests["topological_sort.Graph.topologicalSort"]) == 1 + tpsort_test = next(iter(discovered_tests["topological_sort.Graph.topologicalSort"])) + assert tpsort_test.tests_in_file.test_file.resolve() == test_file_path.resolve() + assert tpsort_test.tests_in_file.test_function == "test_topological_sort" + +def test_unittest_discovery_with_pytest_class_fixture(): + with tempfile.TemporaryDirectory() as tmpdirname: + path_obj_tmpdirname = Path(tmpdirname) + + # Create a simple code file + code_file_path = path_obj_tmpdirname / "router_file.py" + code_file_content = """ +from __future__ import annotations + +import hashlib +import json + +class Router: + model_names: list + cache_responses = False + tenacity = None + + def __init__( # noqa: PLR0915 + self, + model_list = None, + ) -> None: + self.model_list = model_list + self.model_id_to_deployment_index_map = {} + self.model_name_to_deployment_indices = {} + def _generate_model_id(self, model_group, litellm_params): + # Optimized: Use list and join instead of string concatenation in loop + # This avoids creating many temporary string objects (O(n) vs O(n²) complexity) + parts = [model_group] + for k, v in litellm_params.items(): + if isinstance(k, str): + parts.append(k) + elif isinstance(k, dict): + parts.append(json.dumps(k)) + else: + parts.append(str(k)) + + if isinstance(v, str): + parts.append(v) + elif isinstance(v, dict): + parts.append(json.dumps(v)) + else: + parts.append(str(v)) + + concat_str = "".join(parts) + hash_object = hashlib.sha256(concat_str.encode()) + + return hash_object.hexdigest() + def _add_model_to_list_and_index_map( + self, model, model_id = None + ) -> None: + idx = len(self.model_list) + self.model_list.append(model) + + # Update model_id index for O(1) lookup + if model_id is not None: + self.model_id_to_deployment_index_map[model_id] = idx + elif model.get("model_info", {}).get("id") is not None: + self.model_id_to_deployment_index_map[model["model_info"]["id"]] = idx + + # Update model_name index for O(1) lookup + model_name = model.get("model_name") + if model_name: + if model_name not in self.model_name_to_deployment_indices: + self.model_name_to_deployment_indices[model_name] = [] + self.model_name_to_deployment_indices[model_name].append(idx) + + def _build_model_id_to_deployment_index_map(self, model_list): + # First populate the model_list + self.model_list = [] + for _, model in enumerate(model_list): + # Extract model_info from the model dict + model_info = model.get("model_info", {}) + model_id = model_info.get("id") + + # If no ID exists, generate one using the same logic as set_model_list + if model_id is None: + model_name = model.get("model_name", "") + litellm_params = model.get("litellm_params", {}) + model_id = self._generate_model_id(model_name, litellm_params) + # Update the model_info in the original list + if "model_info" not in model: + model["model_info"] = {} + model["model_info"]["id"] = model_id + + self._add_model_to_list_and_index_map(model=model, model_id=model_id) + +""" + code_file_path.write_text(code_file_content) + + # Create a unittest test file with parameterized tests + test_file_path = path_obj_tmpdirname / "test_router_file.py" + test_file_content = """ +import pytest + +from router_file import Router + + +class TestRouterIndexManagement: + @pytest.fixture + def router(self): + return Router(model_list=[]) + def test_build_model_id_to_deployment_index_map(self, router): + model_list = [ + { + "model_name": "gpt-3.5-turbo", + "litellm_params": {"model": "gpt-3.5-turbo"}, + "model_info": {"id": "model-1"}, + }, + { + "model_name": "gpt-4", + "litellm_params": {"model": "gpt-4"}, + "model_info": {"id": "model-2"}, + }, + ] + + # Test: Build index from model list + router._build_model_id_to_deployment_index_map(model_list) + + # Verify: model_list is populated + assert len(router.model_list) == 2 + # Verify: model_id_to_deployment_index_map is correctly built + assert router.model_id_to_deployment_index_map["model-1"] == 0 + assert router.model_id_to_deployment_index_map["model-2"] == 1 +""" + test_file_path.write_text(test_file_content) + + # Configure test discovery + test_config = TestConfig( + tests_root=path_obj_tmpdirname, + project_root_path=path_obj_tmpdirname, + test_framework="pytest", # Using pytest framework to discover unittest tests + tests_project_rootdir=path_obj_tmpdirname.parent, + ) + fto = FunctionToOptimize(function_name="_build_model_id_to_deployment_index_map", file_path=code_file_path, parents=[FunctionParent(name="Router", type="ClassDef")]) + # Discover tests + discovered_tests, _, _ = discover_unit_tests(test_config, file_to_funcs_to_optimize={code_file_path: [fto]}) + + # Verify the unittest was discovered + assert len(discovered_tests) == 1 + assert "router_file.Router._build_model_id_to_deployment_index_map" in discovered_tests + assert len(discovered_tests["router_file.Router._build_model_id_to_deployment_index_map"]) == 1 + router_test = next(iter(discovered_tests["router_file.Router._build_model_id_to_deployment_index_map"])) + assert router_test.tests_in_file.test_file.resolve() == test_file_path.resolve() + assert router_test.tests_in_file.test_function == "test_build_model_id_to_deployment_index_map" + def test_unittest_discovery_with_pytest_parameterized(): with tempfile.TemporaryDirectory() as tmpdirname: @@ -1335,6 +1539,77 @@ def test_topological_sort(): assert should_process is True +def test_analyze_imports_fixture(): + with tempfile.TemporaryDirectory() as tmpdirname: + test_file = Path(tmpdirname) / "test_example.py" + test_content = """ +from code_to_optimize.topological_sort import Graph +import pytest + +@pytest.fixture +def g(): + return Graph(6) + +def test_topological_sort(g): + g.addEdge(5, 2) + g.addEdge(5, 0) + g.addEdge(4, 0) + g.addEdge(4, 1) + g.addEdge(2, 3) + g.addEdge(3, 1) + + assert g.topologicalSort()[0] == [5, 4, 2, 3, 1, 0] +""" + test_file.write_text(test_content) + + target_functions = {"Graph.topologicalSort"} + should_process = analyze_imports_in_test_file(test_file, target_functions) + + assert should_process is True + +def test_analyze_imports_class_fixture(): + with tempfile.TemporaryDirectory() as tmpdirname: + test_file = Path(tmpdirname) / "test_example.py" + test_content = """ +import pytest + +from router_file import Router + + +class TestRouterIndexManagement: + @pytest.fixture + def router(self): + return Router(model_list=[]) + def test_build_model_id_to_deployment_index_map(self, router): + model_list = [ + { + "model_name": "gpt-3.5-turbo", + "litellm_params": {"model": "gpt-3.5-turbo"}, + "model_info": {"id": "model-1"}, + }, + { + "model_name": "gpt-4", + "litellm_params": {"model": "gpt-4"}, + "model_info": {"id": "model-2"}, + }, + ] + + # Test: Build index from model list + router._build_model_id_to_deployment_index_map(model_list) + + # Verify: model_list is populated + assert len(router.model_list) == 2 + # Verify: model_id_to_deployment_index_map is correctly built + assert router.model_id_to_deployment_index_map["model-1"] == 0 + assert router.model_id_to_deployment_index_map["model-2"] == 1 +""" + test_file.write_text(test_content) + + target_functions = {"Router._build_model_id_to_deployment_index_map"} + should_process = analyze_imports_in_test_file(test_file, target_functions) + + assert should_process is True + def test_analyze_imports_aliased_class_method_negative(): with tempfile.TemporaryDirectory() as tmpdirname: test_file = Path(tmpdirname) / "test_example.py"