diff --git a/dir_content_diff/__init__.py b/dir_content_diff/__init__.py
index 92ec0e0..036d658 100644
--- a/dir_content_diff/__init__.py
+++ b/dir_content_diff/__init__.py
@@ -14,12 +14,17 @@
 import copy
 import importlib.metadata
+import os
 import re
 from collections.abc import Callable
+from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
 from typing import Any
 from typing import Dict
 from typing import Iterable
+from typing import List
+from typing import Literal
 from typing import Optional
 from typing import Pattern
 from typing import Tuple
@@ -212,6 +217,11 @@ class ComparisonConfig:
             new directory with formatted compared data files. If a string is passed, this
             string is used as suffix for the new directory. If `True` is passed, the suffix
             is ``_FORMATTED``.
+        max_workers: Maximum number of worker threads/processes for parallel execution. If None,
+            defaults to min(32, (os.cpu_count() or 1) + 4) as per executor default.
+        executor_type: Type of executor to use for parallel execution. 'thread' uses
+            ThreadPoolExecutor (better for I/O-bound tasks), 'process' uses ProcessPoolExecutor
+            (better for CPU-bound tasks), 'sequential' disables parallel execution.
     """
 
     include_patterns: Optional[Iterable[str]] = attrs.field(
@@ -230,6 +240,10 @@ class ComparisonConfig:
     export_formatted_files: Union[bool, str] = attrs.field(
         default=False, validator=_validate_export_formatted_files
     )
+    executor_type: Literal["sequential", "thread", "process"] = attrs.field(
+        default="sequential"
+    )
+    max_workers: Optional[int] = attrs.field(default=None)
 
     # Compiled patterns - computed once, no caching complexity needed
     compiled_include_patterns: Tuple[Pattern[str], ...] = attrs.field(init=False)
@@ -315,8 +329,8 @@ def should_ignore_file(self, relative_path: str) -> bool:
 
 
 def compare_files(
-    ref_file: str,
-    comp_file: str,
+    ref_file: Union[str, Path],
+    comp_file: Union[str, Path],
     comparator: ComparatorType,
     *args,
     return_raw_diffs: bool = False,
@@ -343,7 +357,11 @@ def compare_files(
     try:
         return comparator(
-            ref_file, comp_file, *args, return_raw_diffs=return_raw_diffs, **kwargs
+            Path(ref_file),
+            Path(comp_file),
+            *args,
+            return_raw_diffs=return_raw_diffs,
+            **kwargs,
         )
     except Exception as exception:  # pylint: disable=broad-except
         load_kwargs = kwargs.pop("load_kwargs", None)
@@ -377,8 +395,8 @@ def compare_files(
 
 
 def export_formatted_file(
-    file: str,
-    formatted_file: str,
+    file: Union[str, Path],
+    formatted_file: Union[str, Path],
     comparator: ComparatorType,
     **kwargs,
 ) -> None:
@@ -455,29 +473,175 @@ def pick_comparator(comparator=None, suffix=None, comparators=None):
         )
     if suffix is not None:
         if suffix in comparators:
-            return comparators.get(suffix)
+            suffix_comparator = comparators.get(suffix)
+            if suffix_comparator is not None:
+                return suffix_comparator
         LOGGER.debug("Could not find the comparator for the '%s' suffix", suffix)
     LOGGER.debug("Returning the default comparator")
-    return _COMPARATORS.get(None)
+    default_comparator = _COMPARATORS.get(None)
+    if default_comparator is None:
+        raise RuntimeError("No default comparator available")
+    return default_comparator
 
 
 def _check_config(config=None, **kwargs):
+    """Process configuration."""
     if config is not None:
         if kwargs:
             # Override config attributes with kwargs
             config = attrs.evolve(config, **kwargs)
     else:
-        config = ComparisonConfig(
-            **kwargs,
-        )
+        config = ComparisonConfig(**kwargs)
+
     return config
 
 
+def _compare_single_file(
+    ref_file: Path,
+    comp_path: Path,
+    relative_path: str,
+    config: ComparisonConfig,
+    formatted_data_path: Path,
+) -> Tuple[str, Union[str, bool]]:
+    """Compare a single file and optionally export formatted version.
+
+    Args:
+        ref_file: Path to the reference file.
+        comp_path: Path to the comparison directory.
+        relative_path: Relative path of the file from the reference directory.
+        config: Comparison configuration.
+        formatted_data_path: Path where formatted files should be exported.
+
+    Returns:
+        A tuple containing the relative path and the comparison result.
+        The result is False if files are equal, or a string describing differences.
+    """
+    comp_file = comp_path / relative_path
+    if not comp_file.exists():
+        msg = f"The file '{relative_path}' does not exist in '{comp_path}'."
+        return relative_path, msg
+
+    # Get specific arguments for this file
+    specific_file_args = (config.specific_args or {}).get(relative_path, None)
+    if specific_file_args is None:
+        for pattern, pattern_args in config.pattern_specific_args.items():
+            if pattern.match(relative_path):
+                specific_file_args = copy.deepcopy(pattern_args)
+                break
+    if specific_file_args is None:
+        specific_file_args = {}
+
+    # Pick the appropriate comparator
+    comparator = pick_comparator(
+        comparator=specific_file_args.pop("comparator", None),
+        suffix=ref_file.suffix,
+        comparators=config.comparators,
+    )
+
+    # Get comparator arguments
+    comparator_args = specific_file_args.pop("args", [])
+
+    # Compare files
+    comparison_result = compare_files(
+        ref_file,
+        comp_file,
+        comparator,
+        *comparator_args,
+        return_raw_diffs=config.return_raw_diffs,
+        **specific_file_args,
+    )
+
+    # Export formatted file if requested
+    if config.export_formatted_files is not False:
+        export_formatted_file(
+            comp_file,
+            formatted_data_path / relative_path,
+            comparator,
+            **specific_file_args,
+        )
+
+    return relative_path, comparison_result
+
+
+def _collect_files_to_compare(ref_path: Path, config: ComparisonConfig):
+    """Collect all files that need to be compared.
+
+    Args:
+        ref_path: Path to the reference directory.
+        config: Comparison configuration.
+
+    Yields:
+        Tuples of (ref_file, relative_path) for files to compare.
+    """
+    for ref_file in ref_path.glob("**/*"):
+        if ref_file.is_dir():
+            continue
+
+        relative_path = ref_file.relative_to(ref_path).as_posix()
+
+        if config.should_ignore_file(relative_path):
+            LOGGER.debug("Ignore file: %s", relative_path)
+            continue
+
+        yield ref_file, relative_path
+
+
+def _compare_file_chunk(
+    file_chunk: List[Tuple[Path, str]],
+    config: ComparisonConfig,
+    comp_path: Path,
+    formatted_data_path: Path,
+) -> List[Tuple[str, Union[str, bool]]]:  # pragma: no cover
+    """Compare a chunk of files.
+
+    Args:
+        file_chunk: List of (ref_file, relative_path) tuples to compare.
+        config: Comparison configuration.
+        comp_path: Path to the comparison directory.
+        formatted_data_path: Path where formatted files should be exported.
+
+    Returns:
+        List of comparison results for the chunk.
+    """
+    results = []
+    for ref_file, relative_path in file_chunk:
+        try:
+            result = _compare_single_file(
+                ref_file, comp_path, relative_path, config, formatted_data_path
+            )
+            results.append(result)
+        except Exception as e:  # pylint: disable=broad-except
+            LOGGER.error("Error comparing file %s: %s", relative_path, e)
+            results.append((relative_path, f"Error comparing file: {e}"))
+    return results
+
+
+def _split_into_chunks(items: List[Any], num_chunks: int) -> List[List[Any]]:
+    """Split a list of items into approximately equal chunks.
+
+    Args:
+        items: List of items to split.
+        num_chunks: Desired number of chunks.
+
+    Returns:
+        List of chunks.
+    """
+    if num_chunks <= 0:
+        return [items]
+
+    chunk_size = max(1, len(items) // num_chunks)
+    chunks = []
+
+    for i in range(0, len(items), chunk_size):
+        chunks.append(items[i : i + chunk_size])
+
+    return chunks
+
+
 def compare_trees(
     ref_path: Union[str, Path],
     comp_path: Union[str, Path],
     *,
-    config: ComparisonConfig = None,
+    config: Optional[ComparisonConfig] = None,
     **kwargs,
 ):
     """Compare all files from 2 different directory trees and return the differences.
@@ -519,54 +683,99 @@
         )
     )
 
-    # Loop over all files and call the correct comparator
+    # Collect all files to compare
+    files_to_compare = list(_collect_files_to_compare(ref_path, config))
+
     different_files = {}
-    for ref_file in ref_path.glob("**/*"):
-        if ref_file.is_dir():
-            continue
-
-        relative_path = ref_file.relative_to(ref_path).as_posix()
-        comp_file = comp_path / relative_path
+
+    if config.executor_type != "sequential" and len(files_to_compare) > 1:
+        # Parallel execution
+        executor_class = (
+            ThreadPoolExecutor
+            if config.executor_type == "thread"
+            else ProcessPoolExecutor
+        )
+        LOGGER.debug(
+            "Starting parallel comparison of %d files with %s(max_workers=%s)",
+            len(files_to_compare),
+            executor_class.__name__,
+            config.max_workers,
+        )
 
-        if config.should_ignore_file(relative_path):
-            LOGGER.debug("Ignore file: %s", relative_path)
-            continue
+        # Determine max_workers with default fallback
+        actual_max_workers = config.max_workers
+        if actual_max_workers is None:
+            actual_max_workers = min(32, (os.cpu_count() or 1) + 4)
+
+        with executor_class(max_workers=actual_max_workers) as executor:
+            if config.executor_type == "process":
+                # For ProcessPoolExecutor, use chunk-based approach for better performance
+                file_chunks = _split_into_chunks(files_to_compare, actual_max_workers)
+
+                future_to_chunk = {
+                    executor.submit(
+                        _compare_file_chunk,
+                        chunk,
+                        config,
+                        comp_path,
+                        formatted_data_path,
+                    ): chunk
+                    for chunk in file_chunks
+                    if chunk  # Skip empty chunks
+                }
 
-        if comp_file.exists():
-            specific_file_args = (config.specific_args or {}).get(relative_path, None)
-            if specific_file_args is None:
-                for pattern, pattern_args in config.pattern_specific_args.items():
-                    if pattern.match(relative_path):
-                        specific_file_args = copy.deepcopy(pattern_args)
-                        break
-            if specific_file_args is None:
-                specific_file_args = {}
-            comparator = pick_comparator(
-                comparator=specific_file_args.pop("comparator", None),
-                suffix=ref_file.suffix,
-                comparators=config.comparators,
-            )
-            comparator_args = specific_file_args.pop("args", [])
-            res = compare_files(
-                ref_file,
-                comp_file,
-                comparator,
-                *comparator_args,
-                return_raw_diffs=config.return_raw_diffs,
-                **specific_file_args,
-            )
-            if res is not False:
-                different_files[relative_path] = res
-                if config.export_formatted_files is not False:
-                    export_formatted_file(
-                        comp_file,
-                        formatted_data_path / relative_path,
-                        comparator,
-                        **specific_file_args,
+                # Collect results from chunks
+                for future in future_to_chunk:
+                    try:
+                        chunk_results = future.result()
+                        for relative_path, result in chunk_results:
+                            if result:
+                                different_files[relative_path] = result
+                    except Exception as e:  # pragma: no cover
+                        LOGGER.error("Error in chunk processing: %s", e)
+                        raise
+
+            else:
+                # For ThreadPoolExecutor, submit individual files (better load balancing)
+                future_to_file = {
+                    executor.submit(
+                        _compare_single_file,
+                        ref_file,
+                        comp_path,
+                        relative_path,
+                        config,
+                        formatted_data_path,
+                    ): relative_path
+                    for ref_file, relative_path in files_to_compare
+                }
+
+                # Collect the results from the submitted futures
+                for future in future_to_file:
+                    try:
+                        relative_path, result = future.result()
+                        if result:
+                            different_files[relative_path] = result
+                    except Exception as e:  # pragma: no cover
+                        LOGGER.error(
+                            "Error comparing file %s: %s", future_to_file[future], e
+                        )
+                        raise
+    else:
+        # Sequential execution (original behavior)
+        LOGGER.debug(
+            "Starting sequential comparison of %d files", len(files_to_compare)
+        )
+
+        for ref_file, relative_path in files_to_compare:
+            try:
+                _, result = _compare_single_file(
+                    ref_file, comp_path, relative_path, config, formatted_data_path
                 )
-        else:
-            msg = f"The file '{relative_path}' does not exist in '{comp_path}'."
-            different_files[relative_path] = msg
+                if result is not False:
+                    different_files[relative_path] = result
+            except Exception as exc:  # pragma: no cover
+                LOGGER.error("File comparison failed for %s: %s", relative_path, exc)
+                raise
 
     return different_files
diff --git a/dir_content_diff/cli/__init__.py b/dir_content_diff/cli/__init__.py
index f5b0de9..77a4b57 100644
--- a/dir_content_diff/cli/__init__.py
+++ b/dir_content_diff/cli/__init__.py
@@ -13,6 +13,8 @@
 import logging
 import sys
 from pathlib import Path
+from typing import Optional
+from typing import Union
 
 import click
 from yaml import safe_load
@@ -102,6 +104,19 @@ def load_config(ctx, param, value):  # pylint: disable=unused-argument
     default=False,
     help="Sort the differences by file name.",
 )
+@click.option(
+    "--executor-type",
+    type=click.Choice(["sequential", "thread", "process"]),
+    default="sequential",
+    help="Type of executor to use for the comparison. 'sequential' for single-threaded, "
+    "'thread' for I/O-bound tasks, 'process' for CPU-bound tasks.",
+)
+@click.option(
+    "--max-workers",
+    type=int,
+    help="Maximum number of worker threads/processes for parallel execution. "
+    "Only used with 'thread' or 'process' executor types.",
+)
 @click.version_option()
 @click.pass_context
 def main(ctx, *args, **kwargs):
@@ -120,16 +135,31 @@ def main(ctx, *args, **kwargs):
 
     ref = Path(kwargs.pop("reference_input"))
     comp = Path(kwargs.pop("compared_input"))
+
+    # Extract execution options
+    executor_type = kwargs.pop("executor_type", "sequential")
+    max_workers = kwargs.pop("max_workers", None)
+
     input_diff(
         ref,
         comp,
         ctx.config,
         kwargs.pop("export_formatted_files", False),
         kwargs.pop("sort_diffs", False),
+        executor_type,
+        max_workers,
     )
 
 
-def input_diff(ref, comp, config, export_formatted_files=False, sort_diffs=False):
+def input_diff(
+    ref: Union[str, Path],
+    comp: Union[str, Path],
+    config,
+    export_formatted_files: Union[bool, str] = False,
+    sort_diffs: bool = False,
+    executor_type: str = "sequential",
+    max_workers: Optional[int] = None,
+):
     """Compute and display differences from given inputs."""
     ref = Path(ref)
     comp = Path(comp)
@@ -145,6 +175,8 @@
             comp,
             specific_args=config,
             export_formatted_files=export_formatted_files,
+            executor_type=executor_type,
+            max_workers=max_workers,
         )
     else:
         comparator_name = config.pop("comparator", None)
diff --git a/tests/test_base.py b/tests/test_base.py
index 0e6d81f..af75188 100644
--- a/tests/test_base.py
+++ b/tests/test_base.py
@@ -16,6 +16,7 @@
 import configparser
 import copy
 import json
+import logging
 import re
 import shutil
 
@@ -1370,7 +1371,7 @@ def test_diff_tree(self, ref_tree, res_tree_diff, pdf_diff, dict_diff):
 class TestBaseFunctions:
     """Test some base functions."""
 
-    def test_pick_comparator(self):
+    def test_pick_comparator(self, caplog):
         """Test the pick_comparator() function"""
         for ext, comparator in dir_content_diff.get_comparators().items():
             assert dir_content_diff.pick_comparator(comparator) == comparator
@@ -1389,3 +1390,43 @@ def test_pick_comparator(self):
                 )
                 == comparator
             )
+
+        caplog.clear()
+        caplog.set_level(logging.DEBUG)
+        comparators = dir_content_diff.get_comparators()
+        dir_content_diff.pick_comparator("UnknownComparator", "UnknownExt", comparators)
+
+        assert caplog.messages == [
+            "Could not find the comparator named 'UnknownComparator' in the given comparators",
+            "Could not find the comparator for the 'UnknownExt' suffix",
+            "Returning the default comparator",
+        ]
+
+    def test_pick_comparator_suffix(self, caplog):
+        """Test the pick_comparator() function with only suffix."""
+        caplog.clear()
+        caplog.set_level(logging.DEBUG)
+
+        assert (
+            dir_content_diff.pick_comparator(suffix=".pdf")
+            == dir_content_diff.PdfComparator()
+        )
+        assert not caplog.messages
+
+        caplog.clear()
+        comparators = {".pdf": None}
+        assert (
+            dir_content_diff.pick_comparator(suffix=".pdf", comparators=comparators)
+            == dir_content_diff.DefaultComparator()
+        )
+
+        assert caplog.messages == [
+            "Could not find the comparator for the '.pdf' suffix",
+            "Returning the default comparator",
+        ]
+
+    def test_pick_comparator_no_default(self, registry_reseter):
+        """Test the pick_comparator() function when no default comparator is registered."""
+        dir_content_diff._COMPARATORS.pop(None)  # pylint: disable=protected-access
+
+        with pytest.raises(RuntimeError, match="No default comparator available"):
+            dir_content_diff.pick_comparator()
diff --git a/tests/test_parallel_execution.py b/tests/test_parallel_execution.py
new file mode 100644
index 0000000..da637fc
--- /dev/null
+++ b/tests/test_parallel_execution.py
@@ -0,0 +1,371 @@
+"""Test the parallel execution features of the dir-content-diff package."""
+
+# LICENSE HEADER MANAGED BY add-license-header
+# Copyright (c) 2023-2025 Blue Brain Project, EPFL.
+#
+# This file is part of dir-content-diff.
+# See https://github.com/BlueBrain/dir-content-diff for further info.
+#
+# SPDX-License-Identifier: Apache-2.0
+# LICENSE HEADER MANAGED BY add-license-header
+
+import logging
+import time
+from unittest.mock import patch
+
+import attrs
+
+from dir_content_diff import ComparisonConfig
+from dir_content_diff import _split_into_chunks
+from dir_content_diff import assert_equal_trees
+from dir_content_diff import compare_trees
+
+from . import generate_test_files
+
+
+class TestParallelExecution:
+    """Test parallel execution functionality."""
+
+    def test_parallel_config_default(self):
+        """Test default execution configuration."""
+        config = ComparisonConfig()
+        assert config.executor_type == "sequential"
+        assert config.max_workers is None
+
+    def test_parallel_config_custom(self):
+        """Test custom execution configuration."""
+        config = ComparisonConfig(executor_type="process", max_workers=4)
+        assert config.executor_type == "process"
+        assert config.max_workers == 4
+
+    def test_parallel_config_validation(self):
+        """Test execution configuration validation."""
+        # Valid executor types
+        config_sequential = ComparisonConfig(executor_type="sequential")
+        config_thread = ComparisonConfig(executor_type="thread")
+        config_process = ComparisonConfig(executor_type="process")
+        assert config_sequential.executor_type == "sequential"
+        assert config_thread.executor_type == "thread"
+        assert config_process.executor_type == "process"
+
+        # Invalid executor types are prevented by type checking, so we test valid types only
+
+    def test_sequential_vs_parallel_thread_equal_trees(self, ref_tree, res_tree_equal):
+        """Test that sequential and thread execution give the same results for equal trees."""
+        # Sequential execution
+        sequential_result = compare_trees(ref_tree, res_tree_equal)
+
+        # Parallel execution with threads
+        parallel_result = compare_trees(
+            ref_tree, res_tree_equal, executor_type="thread", max_workers=2
+        )
+
+        assert sequential_result == parallel_result == {}
+
+    def test_sequential_vs_parallel_thread_diff_trees(self, ref_tree, res_tree_diff):
+        """Test that sequential and thread execution give the same results for different trees."""
+        # Sequential execution
+        sequential_result = compare_trees(ref_tree, res_tree_diff)
+
+        # Parallel execution with threads
+        parallel_result = compare_trees(
+            ref_tree, res_tree_diff, executor_type="thread", max_workers=2
+        )
+
+        # Results should have the same keys
+        assert set(sequential_result.keys()) == set(parallel_result.keys())
+
+        # Results should be the same (order might differ but content should match)
+        for key, value in sequential_result.items():
+            assert key in parallel_result
+            # For deterministic comparison, we check that both have content
+            assert len(value) > 0
+            assert len(parallel_result[key]) > 0
+
+    def test_sequential_vs_parallel_process_equal_trees(self, ref_tree, res_tree_equal):
+        """Test that sequential and process execution give the same results for equal trees."""
+        # Sequential execution
+        sequential_result = compare_trees(ref_tree, res_tree_equal)
+
+        # Parallel execution with processes
+        parallel_result = compare_trees(
+            ref_tree, res_tree_equal, executor_type="process", max_workers=2
+        )
+
+        assert sequential_result == parallel_result == {}
+
+    def test_sequential_vs_parallel_process_diff_trees(self, ref_tree, res_tree_diff):
+        """Test that sequential and process execution give the same results for different trees."""
+        # Sequential execution
+        sequential_result = compare_trees(ref_tree, res_tree_diff)
+
+        # Parallel execution with processes
+        parallel_result = compare_trees(
+            ref_tree, res_tree_diff, executor_type="process", max_workers=2
+        )
+
+        # Results should have the same keys
+        assert set(sequential_result.keys()) == set(parallel_result.keys())
+
+        # Results should be the same (order might differ but content should match)
+        for key, value in sequential_result.items():
+            assert key in parallel_result
+            # For deterministic comparison, we check that both have content
+            assert len(value) > 0
+            assert len(parallel_result[key]) > 0
+
+    def test_parallel_with_config_object(self, ref_tree, res_tree_equal):
+        """Test parallel execution using config object."""
+        config = ComparisonConfig(executor_type="thread", max_workers=3)
+
+        result = compare_trees(ref_tree, res_tree_equal, config=config)
+        assert not result
+
+    def test_parallel_with_kwargs(self, ref_tree, res_tree_equal):
+        """Test parallel execution using kwargs."""
+        result = compare_trees(
+            ref_tree, res_tree_equal, executor_type="process", max_workers=2
+        )
+        assert not result
+
+    def test_parallel_assert_equal_trees(self, ref_tree, res_tree_equal):
+        """Test parallel execution with assert_equal_trees."""
+        # Should not raise any exception
+        assert_equal_trees(
+            ref_tree, res_tree_equal, executor_type="thread", max_workers=2
+        )
+
+    def test_parallel_with_specific_args(self, ref_tree, res_tree_equal):
+        """Test parallel execution with specific args."""
+        specific_args = {
+            "file.yaml": {"args": [None, None, None, False, 0, False]},
+            "file.json": {"tolerance": 0},
+        }
+
+        result = compare_trees(
+            ref_tree,
+            res_tree_equal,
+            executor_type="thread",
+            max_workers=2,
+            specific_args=specific_args,
+        )
+        assert not result
+
+    def test_parallel_with_patterns(self, ref_tree, res_tree_equal):
+        """Test parallel execution with include/exclude patterns."""
+        result = compare_trees(
+            ref_tree,
+            res_tree_equal,
+            executor_type="thread",
+            max_workers=2,
+            include_patterns=[r".*\.[jy].*"],
+            exclude_patterns=[r".*\.yaml"],
+        )
+        assert not result
+
+    def test_parallel_with_export_formatted(self, ref_tree, res_tree_equal):
+        """Test parallel execution with export formatted files."""
+        result = compare_trees(
+            ref_tree,
+            res_tree_equal,
+            executor_type="thread",
+            max_workers=2,
+            export_formatted_files=True,
+        )
+        assert not result
+
+        # Check that formatted files are created
+        formatted_path = res_tree_equal.with_name(res_tree_equal.name + "_FORMATTED")
+        assert formatted_path.exists()
+
+    def test_parallel_single_file_fallback(
+        self, empty_ref_tree, empty_res_tree, caplog
+    ):
+        """Test that parallel execution falls back to sequential for a single file."""
+        # Create a reference tree with only one file
+        generate_test_files.create_json(empty_ref_tree / "file.json")
+        generate_test_files.create_json(empty_res_tree / "file.json")
+
+        # Mock the parallel executors where they are used and configure logs
+        with patch(
+            "dir_content_diff.ThreadPoolExecutor"
+        ) as mock_thread_executor, patch(
+            "dir_content_diff.ProcessPoolExecutor"
+        ) as mock_process_executor, caplog.at_level(logging.DEBUG):
+            result = compare_trees(
+                empty_ref_tree, empty_res_tree, executor_type="thread", max_workers=4
+            )
+            assert not result
+
+        # Check that the parallel executors have not been instantiated
+        mock_thread_executor.assert_not_called()
+        mock_process_executor.assert_not_called()
+
+        # Check that logs indicate sequential execution
+        sequential_logs = [
+            record
+            for record in caplog.records
+            if "Starting sequential comparison of 1 files" in record.message
+        ]
+        assert len(sequential_logs) == 1
+
+        # Check that no parallel execution logs are present
+        parallel_logs = [
+            record
+            for record in caplog.records
+            if "Starting parallel comparison" in record.message
+        ]
+        assert len(parallel_logs) == 0
+
+    def test_parallel_multiple_files_uses_threads(
+        self, empty_ref_tree, empty_res_tree, caplog
+    ):
+        """Check that parallel execution is used with multiple files."""
+        # Create a reference tree with multiple files
+        generate_test_files.create_json(empty_ref_tree / "file1.json")
+        generate_test_files.create_json(empty_ref_tree / "file2.json")
+        generate_test_files.create_json(empty_res_tree / "file1.json")
+        generate_test_files.create_json(empty_res_tree / "file2.json")
+
+        # Configure log level to capture DEBUG messages
+        with caplog.at_level(logging.DEBUG):
+            result = compare_trees(
+                empty_ref_tree, empty_res_tree, executor_type="thread", max_workers=2
+            )
+            assert not result
+
+        # Check that logs indicate parallel execution
+        parallel_logs = [
+            record
+            for record in caplog.records
+            if "Starting parallel comparison of 2 files with ThreadPoolExecutor"
+            in record.message
+        ]
+        assert len(parallel_logs) == 1
+
+        # Check that no sequential execution logs are present
+        sequential_logs = [
+            record
+            for record in caplog.records
+            if "Starting sequential comparison" in record.message
+        ]
+        assert len(sequential_logs) == 0
+
+    def test_parallel_error_handling(self, ref_tree, res_tree_diff):
+        """Test error handling in parallel execution."""
+        # This should work normally even with some differences
+        result = compare_trees(
+            ref_tree, res_tree_diff, executor_type="thread", max_workers=2
+        )
+
+        # Should have differences but no exceptions
+        assert len(result) > 0
+        for value in result.values():
+            assert isinstance(value, str)
+            assert len(value) > 0
+
+    def test_parallel_missing_files(self, ref_tree, empty_res_tree):
+        """Test parallel execution when comparison files are missing."""
+        result = compare_trees(
+            ref_tree, empty_res_tree, executor_type="thread", max_workers=2
+        )
+
+        # Should report missing files
+        assert len(result) > 0
+        for value in result.values():
+            assert "does not exist" in value
+
+    def test_config_evolution_with_parallel(self):
+        """Test that config evolution works with executor_type parameters."""
+        base_config = ComparisonConfig(executor_type="thread", max_workers=2)
+
+        # Evolve config with new parallel parameters using attrs.evolve
+        new_config = attrs.evolve(base_config, executor_type="process", max_workers=4)
+
+        # Check that new values are applied
+        assert new_config.executor_type == "process"
+        assert new_config.max_workers == 4
+
+        # Test sequential execution config
+        config_seq = ComparisonConfig(executor_type="sequential")
+        assert config_seq.executor_type == "sequential"
+
+
+class TestParallelPerformance:
+    """Test performance aspects of parallel execution."""
+
+    def test_parallel_vs_sequential_timing(self, ref_tree, res_tree_equal):
+        """Basic timing test to ensure parallel execution doesn't break."""
+        # This is not a strict performance test, just ensuring both modes work
+        start_time = time.time()
+        sequential_result = compare_trees(ref_tree, res_tree_equal)
+        sequential_time = time.time() - start_time
+
+        start_time = time.time()
+        parallel_result = compare_trees(
+            ref_tree, res_tree_equal, executor_type="thread", max_workers=2
+        )
+        parallel_time = time.time() - start_time
+
+        # Both should give the same result
+        assert sequential_result == parallel_result == {}
+
+        # Both should complete in reasonable time (not testing speed, just that they complete)
+        assert sequential_time < 30  # 30 seconds max
+        assert parallel_time < 30  # 30 seconds max
+
+
+class TestParallelEdgeCases:
+    """Test edge cases for parallel execution."""
+
+    def test_parallel_no_files(self, empty_ref_tree, empty_res_tree):
+        """Test parallel execution with no files to compare."""
+        result = compare_trees(
+            empty_ref_tree, empty_res_tree, executor_type="thread", max_workers=4
+        )
+        assert not result
+
+    def test_parallel_disabled_with_single_file(self, empty_ref_tree, empty_res_tree):
+        """Test that parallel execution is automatically disabled for a single file."""
+        # This tests the logic: if len(files_to_compare) > 1
+        # Create a scenario with exactly one file
+        generate_test_files.create_json(empty_ref_tree / "file.json")
+        generate_test_files.create_json(empty_res_tree / "file.json")
+
+        result = compare_trees(
+            empty_ref_tree, empty_res_tree, executor_type="thread", max_workers=4
+        )
+        assert not result
+
+    def test_parallel_max_workers_none(self, ref_tree, res_tree_equal):
+        """Test parallel execution with max_workers=None (default)."""
+        result = compare_trees(
+            ref_tree,
+            res_tree_equal,
+            executor_type="thread",
+            max_workers=None,  # Should use executor default
+        )
+        assert not result
+
+    def test_parallel_max_workers_large(self, ref_tree, res_tree_equal):
+        """Test parallel execution with large max_workers."""
+        result = compare_trees(
+            ref_tree,
+            res_tree_equal,
+            executor_type="thread",
+            max_workers=100,  # Larger than needed
+        )
+        assert not result
+
+    def test_chunking_edge_cases(self):
+        """Test edge cases for chunking."""
+        # Test with num_chunks <= 0
+        items = [1, 2, 3, 4, 5]
+
+        # num_chunks = 0
+        result = _split_into_chunks(items, 0)
+        assert result == [items]
+
+        # num_chunks < 0
+        result = _split_into_chunks(items, -1)
+        assert result == [items]
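
For reference, a minimal usage sketch of the options this patch adds. It only uses names introduced in the diff above (`compare_trees`, `executor_type`, `max_workers`); the directory paths are hypothetical placeholders, not values from the patch.

    from pathlib import Path

    from dir_content_diff import compare_trees

    # Thread-based parallel comparison of two directory trees. With fewer than
    # two files to compare, compare_trees falls back to sequential execution.
    differences = compare_trees(
        Path("/path/to/reference_tree"),  # hypothetical reference tree
        Path("/path/to/compared_tree"),   # hypothetical compared tree
        executor_type="thread",           # or "process" for CPU-bound comparators
        max_workers=4,                    # None uses the executor's default worker count
    )

    # The result maps each differing relative path to a diff description string.
    for relative_path, diff in differences.items():
        print(relative_path, diff)

The same options are exposed on the command line through the new `--executor-type` and `--max-workers` flags added in the CLI hunk above.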