diff --git a/src/mcp_optimizer/cli.py b/src/mcp_optimizer/cli.py
index 59a4a33..2fbe97e 100644
--- a/src/mcp_optimizer/cli.py
+++ b/src/mcp_optimizer/cli.py
@@ -221,7 +221,9 @@ def main(**kwargs: Any) -> None:
     # Pass config values to components instead of using get_config()
     db_config = DatabaseConfig(database_url=config.async_db_url)
     embedding_manager = EmbeddingManager(
-        model_name=config.embedding_model_name, enable_cache=config.enable_embedding_cache
+        model_name=config.embedding_model_name,
+        enable_cache=config.enable_embedding_cache,
+        threads=config.embedding_threads,
     )
     ingestion_service = IngestionService(
         db_config,
diff --git a/src/mcp_optimizer/config.py b/src/mcp_optimizer/config.py
index fdd0056..236838a 100644
--- a/src/mcp_optimizer/config.py
+++ b/src/mcp_optimizer/config.py
@@ -152,6 +152,14 @@ def normalize_runtime_mode(cls, v) -> str:
         description="Name of the embedding model to use",
     )
 
+    embedding_threads: int | None = Field(
+        default=2,
+        ge=1,
+        le=16,
+        description="Number of threads for embedding generation (1-16). "
+        "Lower values reduce CPU usage. Set to None to use all CPU cores. "
+    )
+
     # Token counting configuration
     encoding: Literal["o200k_base", "cl100k_base", "p50k_base", "r50k_base"] = Field(
         default="cl100k_base",
@@ -468,6 +476,7 @@ def _populate_config_from_env() -> dict[str, Any]:
         "REGISTRY_POLLING_INTERVAL": "registry_polling_interval",
         "MCP_TIMEOUT": "mcp_timeout",
         "EMBEDDING_MODEL_NAME": "embedding_model_name",
+        "EMBEDDING_THREADS": "embedding_threads",
         "ENCODING": "encoding",
         "MAX_TOOLS_TO_RETURN": "max_tools_to_return",
         "TOOL_DISTANCE_THRESHOLD": "tool_distance_threshold",
diff --git a/src/mcp_optimizer/embeddings.py b/src/mcp_optimizer/embeddings.py
index 43be9b3..e8a76b5 100644
--- a/src/mcp_optimizer/embeddings.py
+++ b/src/mcp_optimizer/embeddings.py
@@ -34,7 +34,7 @@ class EmbeddingManager:
     See database migration file for the configured dimension in vector tables.
     """
 
-    def __init__(self, model_name: str, enable_cache: bool):
+    def __init__(self, model_name: str, enable_cache: bool, threads: int | None = None):
         """Initialize with specified embedding model.
 
         Args:
@@ -43,16 +43,20 @@ def __init__(self, model_name: str, enable_cache: bool):
                 WARNING: Changing models may require database migration if the new
                 model has a different embedding dimension.
             enable_cache: Whether to enable embedding caching.
+            threads: Number of threads to use for embedding generation.
+                None = use all available CPU cores (default FastEmbed behavior).
+                Set to 1-4 to limit CPU usage in production.
         """
         self.model_name = model_name
         self._model: TextEmbedding | None = None
         self.enable_cache = enable_cache
+        self.threads = threads
 
     @property
     def model(self) -> TextEmbedding:
         """Lazy load the embedding model."""
         if self._model is None:
-            self._model = TextEmbedding(model_name=self.model_name)
+            self._model = TextEmbedding(model_name=self.model_name, threads=self.threads)
         return self._model
 
     def _generate_single_cached_embedding(self, text: str) -> np.ndarray:
diff --git a/src/mcp_optimizer/polling_manager.py b/src/mcp_optimizer/polling_manager.py
index 981dc30..0bb9ec4 100644
--- a/src/mcp_optimizer/polling_manager.py
+++ b/src/mcp_optimizer/polling_manager.py
@@ -157,7 +157,9 @@ def configure_polling(toolhive_client: ToolhiveClient, config: MCPOptimizerConfi
     # Create database and embedding manager with config values
     db_config = DatabaseConfig(database_url=config.async_db_url)
     embedding_manager_local = EmbeddingManager(
-        model_name=config.embedding_model_name, enable_cache=config.enable_embedding_cache
+        model_name=config.embedding_model_name,
+        enable_cache=config.enable_embedding_cache,
+        threads=config.embedding_threads,
     )
 
     _polling_state.polling_manager = PollingManager(
diff --git a/src/mcp_optimizer/server.py b/src/mcp_optimizer/server.py
index 915573c..05d6ac0 100644
--- a/src/mcp_optimizer/server.py
+++ b/src/mcp_optimizer/server.py
@@ -146,7 +146,9 @@ def initialize_server_components(config: MCPOptimizerConfig) -> None:
     workload_server_ops = WorkloadServerOps(db)
     registry_server_ops = RegistryServerOps(db)
     embedding_manager = EmbeddingManager(
-        model_name=config.embedding_model_name, enable_cache=config.enable_embedding_cache
+        model_name=config.embedding_model_name,
+        enable_cache=config.enable_embedding_cache,
+        threads=config.embedding_threads,
     )
     mcp.settings.port = config.mcp_port
     toolhive_client = ToolhiveClient(
diff --git a/tests/test_config_basic.py b/tests/test_config_basic.py
index 6cb24a4..fad8fc3 100644
--- a/tests/test_config_basic.py
+++ b/tests/test_config_basic.py
@@ -107,3 +107,61 @@ def test_runtime_mode_invalid():
 
     with pytest.raises(ValidationError, match="Input should be 'docker' or 'k8s'"):
         MCPOptimizerConfig(runtime_mode="kubernetes")
+
+
+def test_embedding_threads_default():
+    """Test that embedding_threads defaults to 2."""
+    config = MCPOptimizerConfig()
+    assert config.embedding_threads == 2
+
+
+def test_embedding_threads_none():
+    """Test that embedding_threads can be set to None (use all CPU cores)."""
+    config = MCPOptimizerConfig(embedding_threads=None)
+    assert config.embedding_threads is None
+
+
+def test_embedding_threads_boundaries():
+    """Test embedding_threads validation with boundary values (1, 16)."""
+    # Test lower boundary (1)
+    config_min = MCPOptimizerConfig(embedding_threads=1)
+    assert config_min.embedding_threads == 1
+
+    # Test upper boundary (16)
+    config_max = MCPOptimizerConfig(embedding_threads=16)
+    assert config_max.embedding_threads == 16
+
+    # Test valid middle value
+    config_mid = MCPOptimizerConfig(embedding_threads=8)
+    assert config_mid.embedding_threads == 8
+
+
+def test_embedding_threads_invalid_values():
+    """Test that invalid embedding_threads values are rejected."""
+    # Test below lower boundary (0)
+    with pytest.raises(ValidationError, match="greater than or equal to 1"):
+        MCPOptimizerConfig(embedding_threads=0)
+
+    # Test negative value
+    with pytest.raises(ValidationError, match="greater than or equal to 1"):
+        MCPOptimizerConfig(embedding_threads=-1)
+
+    # Test above upper boundary (17)
+    with pytest.raises(ValidationError, match="less than or equal to 16"):
+        MCPOptimizerConfig(embedding_threads=17)
+
+    # Test far above upper boundary
+    with pytest.raises(ValidationError, match="less than or equal to 16"):
+        MCPOptimizerConfig(embedding_threads=100)
+
+
+def test_embedding_threads_string_conversion():
+    """Test that embedding_threads handles string-to-int conversion."""
+    # Test string to int conversion
+    config = MCPOptimizerConfig(embedding_threads="4")
+    assert config.embedding_threads == 4
+    assert isinstance(config.embedding_threads, int)
+
+    # Test invalid string conversion
+    with pytest.raises(ValidationError):
+        MCPOptimizerConfig(embedding_threads="invalid")
diff --git a/tests/test_embeddings.py b/tests/test_embeddings.py
index ccf49c7..8472f23 100644
--- a/tests/test_embeddings.py
+++ b/tests/test_embeddings.py
@@ -4,6 +4,7 @@
 
 import numpy as np
 
+from mcp_optimizer.config import MCPOptimizerConfig
 from mcp_optimizer.embeddings import EmbeddingManager
 
 
@@ -36,7 +37,9 @@ def test_lazy_model_loading(self, mock_text_embedding):
         model = manager.model
         assert model == mock_model_instance
         assert manager._model == mock_model_instance
-        mock_text_embedding.assert_called_once_with(model_name="BAAI/bge-small-en-v1.5")
+        mock_text_embedding.assert_called_once_with(
+            model_name="BAAI/bge-small-en-v1.5", threads=None
+        )
 
         # Second access should reuse existing model
         model2 = manager.model
@@ -145,3 +148,94 @@ def test_switch_model_same_model(self, mock_text_embedding):
         model2 = manager.model
         assert model2 == mock_model
         assert mock_text_embedding.call_count == 1  # Should not create new model
+
+    @patch("mcp_optimizer.embeddings.TextEmbedding")
+    def test_initialization_with_explicit_threads(self, mock_text_embedding):
+        """Test EmbeddingManager initialization with explicit thread count."""
+        mock_model_instance = Mock()
+        mock_text_embedding.return_value = mock_model_instance
+
+        # Test with threads=2
+        manager = EmbeddingManager(
+            model_name="BAAI/bge-small-en-v1.5", enable_cache=True, threads=2
+        )
+        assert manager.threads == 2
+
+        # Access model to trigger loading
+        model = manager.model
+        assert model == mock_model_instance
+        mock_text_embedding.assert_called_once_with(
+            model_name="BAAI/bge-small-en-v1.5", threads=2
+        )
+
+    @patch("mcp_optimizer.embeddings.TextEmbedding")
+    def test_initialization_with_boundary_threads(self, mock_text_embedding):
+        """Test EmbeddingManager initialization with boundary thread values."""
+        mock_model_instance = Mock()
+        mock_text_embedding.return_value = mock_model_instance
+
+        # Test with threads=1 (lower boundary)
+        manager_min = EmbeddingManager(
+            model_name="BAAI/bge-small-en-v1.5", enable_cache=True, threads=1
+        )
+        assert manager_min.threads == 1
+        _ = manager_min.model  # Trigger lazy loading
+        assert mock_text_embedding.call_count == 1
+        mock_text_embedding.assert_called_with(model_name="BAAI/bge-small-en-v1.5", threads=1)
+
+        # Test with threads=16 (upper boundary)
+        mock_text_embedding.reset_mock()
+        manager_max = EmbeddingManager(
+            model_name="BAAI/bge-small-en-v1.5", enable_cache=True, threads=16
+        )
+        assert manager_max.threads == 16
+        _ = manager_max.model  # Trigger lazy loading
+        assert mock_text_embedding.call_count == 1
+        mock_text_embedding.assert_called_with(model_name="BAAI/bge-small-en-v1.5", threads=16)
+
+    @patch("mcp_optimizer.embeddings.TextEmbedding")
+    def test_config_integration_with_threads(self, mock_text_embedding):
+        """Test that config properly passes threading to EmbeddingManager."""
+        mock_model_instance = Mock()
+        mock_text_embedding.return_value = mock_model_instance
+
+        # Test with config default (threads=2)
+        config_default = MCPOptimizerConfig()
+        manager_default = EmbeddingManager(
+            model_name=config_default.embedding_model_name,
+            enable_cache=config_default.enable_embedding_cache,
+            threads=config_default.embedding_threads,
+        )
+        assert manager_default.threads == 2
+        _ = manager_default.model  # Trigger lazy loading
+        mock_text_embedding.assert_called_once_with(
+            model_name="BAAI/bge-small-en-v1.5", threads=2
+        )
+
+        # Test with config set to None
+        mock_text_embedding.reset_mock()
+        config_none = MCPOptimizerConfig(embedding_threads=None)
+        manager_none = EmbeddingManager(
+            model_name=config_none.embedding_model_name,
+            enable_cache=config_none.enable_embedding_cache,
+            threads=config_none.embedding_threads,
+        )
+        assert manager_none.threads is None
+        _ = manager_none.model  # Trigger lazy loading
+        mock_text_embedding.assert_called_once_with(
+            model_name="BAAI/bge-small-en-v1.5", threads=None
+        )
+
+        # Test with config set to custom value (8)
+        mock_text_embedding.reset_mock()
+        config_custom = MCPOptimizerConfig(embedding_threads=8)
+        manager_custom = EmbeddingManager(
+            model_name=config_custom.embedding_model_name,
+            enable_cache=config_custom.enable_embedding_cache,
+            threads=config_custom.embedding_threads,
+        )
+        assert manager_custom.threads == 8
+        _ = manager_custom.model  # Trigger lazy loading
+        mock_text_embedding.assert_called_once_with(
+            model_name="BAAI/bge-small-en-v1.5", threads=8
+        )