Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/mcp_optimizer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ def main(**kwargs: Any) -> None:
# Pass config values to components instead of using get_config()
db_config = DatabaseConfig(database_url=config.async_db_url)
embedding_manager = EmbeddingManager(
model_name=config.embedding_model_name, enable_cache=config.enable_embedding_cache
model_name=config.embedding_model_name,
enable_cache=config.enable_embedding_cache,
threads=config.embedding_threads,
)
ingestion_service = IngestionService(
db_config,
Expand Down
9 changes: 9 additions & 0 deletions src/mcp_optimizer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ def normalize_runtime_mode(cls, v) -> str:
description="Name of the embedding model to use",
)

# Thread count handed to the FastEmbed/ONNX runtime when generating
# embeddings. Bounded to 1-16 so a misconfiguration cannot oversubscribe
# the host; None defers to FastEmbed's default (all available cores).
embedding_threads: int | None = Field(
    default=2,
    ge=1,
    le=16,
    description="Number of threads for embedding generation (1-16). "
    # Fixed: the description previously ended with a dangling trailing
    # space ("…all CPU cores. "), left over from a truncated sentence.
    "Lower values reduce CPU usage. Set to None to use all CPU cores.",
)

# Token counting configuration
encoding: Literal["o200k_base", "cl100k_base", "p50k_base", "r50k_base"] = Field(
default="cl100k_base",
Expand Down Expand Up @@ -468,6 +476,7 @@ def _populate_config_from_env() -> dict[str, Any]:
"REGISTRY_POLLING_INTERVAL": "registry_polling_interval",
"MCP_TIMEOUT": "mcp_timeout",
"EMBEDDING_MODEL_NAME": "embedding_model_name",
"EMBEDDING_THREADS": "embedding_threads",
"ENCODING": "encoding",
"MAX_TOOLS_TO_RETURN": "max_tools_to_return",
"TOOL_DISTANCE_THRESHOLD": "tool_distance_threshold",
Expand Down
8 changes: 6 additions & 2 deletions src/mcp_optimizer/embeddings.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class EmbeddingManager:
See database migration file for the configured dimension in vector tables.
"""

def __init__(self, model_name: str, enable_cache: bool, threads: int | None = None):
    """Initialize with specified embedding model.

    Args:
        model_name: Name of the embedding model to load.
            WARNING: Changing models may require database migration if
            the new model has a different embedding dimension.
        enable_cache: Whether to enable embedding caching.
        threads: Number of threads to use for embedding generation.
            None = use all available CPU cores (default FastEmbed behavior).
            Set to 1-4 to limit CPU usage in production.
    """
    # NOTE(review): the source here was a diff with both the old and new
    # signature lines present; this is the reconstructed post-merge method.
    self.model_name = model_name
    # Backing field for the lazily-constructed model (see `model` property).
    self._model: TextEmbedding | None = None
    self.enable_cache = enable_cache
    self.threads = threads

@property
def model(self) -> TextEmbedding:
    """Lazy load the embedding model.

    The TextEmbedding backend is constructed on first access only, and is
    given the configured thread limit so embedding generation cannot
    consume every CPU core by default.
    """
    # NOTE(review): the diff residue contained both the pre-change call
    # (no `threads`) and the post-change call; only the post-change call
    # that forwards `self.threads` is kept.
    if self._model is None:
        self._model = TextEmbedding(model_name=self.model_name, threads=self.threads)
    return self._model

def _generate_single_cached_embedding(self, text: str) -> np.ndarray:
Expand Down
4 changes: 3 additions & 1 deletion src/mcp_optimizer/polling_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ def configure_polling(toolhive_client: ToolhiveClient, config: MCPOptimizerConfi
# Create database and embedding manager with config values
db_config = DatabaseConfig(database_url=config.async_db_url)
embedding_manager_local = EmbeddingManager(
model_name=config.embedding_model_name, enable_cache=config.enable_embedding_cache
model_name=config.embedding_model_name,
enable_cache=config.enable_embedding_cache,
threads=config.embedding_threads,
)

_polling_state.polling_manager = PollingManager(
Expand Down
4 changes: 3 additions & 1 deletion src/mcp_optimizer/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ def initialize_server_components(config: MCPOptimizerConfig) -> None:
workload_server_ops = WorkloadServerOps(db)
registry_server_ops = RegistryServerOps(db)
embedding_manager = EmbeddingManager(
model_name=config.embedding_model_name, enable_cache=config.enable_embedding_cache
model_name=config.embedding_model_name,
enable_cache=config.enable_embedding_cache,
threads=config.embedding_threads,
)
mcp.settings.port = config.mcp_port
toolhive_client = ToolhiveClient(
Expand Down
58 changes: 58 additions & 0 deletions tests/test_config_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,61 @@ def test_runtime_mode_invalid():

with pytest.raises(ValidationError, match="Input should be 'docker' or 'k8s'"):
MCPOptimizerConfig(runtime_mode="kubernetes")


def test_embedding_threads_default():
    """A freshly-built config should carry the conservative two-thread default."""
    cfg = MCPOptimizerConfig()
    assert cfg.embedding_threads == 2


def test_embedding_threads_none():
    """Passing None explicitly (use all CPU cores) must be accepted as-is."""
    cfg = MCPOptimizerConfig(embedding_threads=None)
    assert cfg.embedding_threads is None


def test_embedding_threads_boundaries():
    """Thread counts at and inside the 1-16 bounds should validate unchanged."""
    # Lower bound, upper bound, and a representative mid-range value.
    for threads in (1, 16, 8):
        cfg = MCPOptimizerConfig(embedding_threads=threads)
        assert cfg.embedding_threads == threads


def test_embedding_threads_invalid_values():
    """Thread counts outside the 1-16 range must fail validation."""
    # Zero and negatives fall below the lower bound.
    for too_low in (0, -1):
        with pytest.raises(ValidationError, match="greater than or equal to 1"):
            MCPOptimizerConfig(embedding_threads=too_low)

    # Anything past 16 exceeds the upper bound.
    for too_high in (17, 100):
        with pytest.raises(ValidationError, match="less than or equal to 16"):
            MCPOptimizerConfig(embedding_threads=too_high)


def test_embedding_threads_string_conversion():
    """Numeric strings coerce to int; non-numeric strings are rejected."""
    cfg = MCPOptimizerConfig(embedding_threads="4")
    assert cfg.embedding_threads == 4
    assert isinstance(cfg.embedding_threads, int)

    # A string that cannot be parsed as an integer must raise.
    with pytest.raises(ValidationError):
        MCPOptimizerConfig(embedding_threads="invalid")
96 changes: 95 additions & 1 deletion tests/test_embeddings.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import numpy as np

from mcp_optimizer.config import MCPOptimizerConfig
from mcp_optimizer.embeddings import EmbeddingManager


Expand Down Expand Up @@ -36,7 +37,9 @@ def test_lazy_model_loading(self, mock_text_embedding):
model = manager.model
assert model == mock_model_instance
assert manager._model == mock_model_instance
mock_text_embedding.assert_called_once_with(model_name="BAAI/bge-small-en-v1.5")
mock_text_embedding.assert_called_once_with(
model_name="BAAI/bge-small-en-v1.5", threads=None
)

# Second access should reuse existing model
model2 = manager.model
Expand Down Expand Up @@ -145,3 +148,94 @@ def test_switch_model_same_model(self, mock_text_embedding):
model2 = manager.model
assert model2 == mock_model
assert mock_text_embedding.call_count == 1 # Should not create new model

@patch("mcp_optimizer.embeddings.TextEmbedding")
def test_initialization_with_explicit_threads(self, mock_text_embedding):
    """Test EmbeddingManager initialization with explicit thread count."""
    fake_backend = Mock()
    mock_text_embedding.return_value = fake_backend

    manager = EmbeddingManager(
        model_name="BAAI/bge-small-en-v1.5", enable_cache=True, threads=2
    )
    assert manager.threads == 2

    # First property access performs the lazy construction with the
    # configured thread count forwarded to the backend.
    assert manager.model == fake_backend
    mock_text_embedding.assert_called_once_with(
        model_name="BAAI/bge-small-en-v1.5", threads=2
    )

@patch("mcp_optimizer.embeddings.TextEmbedding")
def test_initialization_with_boundary_threads(self, mock_text_embedding):
    """Test EmbeddingManager initialization with boundary thread values."""
    mock_text_embedding.return_value = Mock()

    # Exercise both ends of the permitted 1-16 range with one code path.
    for boundary in (1, 16):
        mock_text_embedding.reset_mock()
        manager = EmbeddingManager(
            model_name="BAAI/bge-small-en-v1.5", enable_cache=True, threads=boundary
        )
        assert manager.threads == boundary

        _ = manager.model  # trigger lazy construction
        assert mock_text_embedding.call_count == 1
        mock_text_embedding.assert_called_with(
            model_name="BAAI/bge-small-en-v1.5", threads=boundary
        )

@patch("mcp_optimizer.embeddings.TextEmbedding")
def test_config_integration_with_threads(self, mock_text_embedding):
    """Test that config properly passes threading to EmbeddingManager."""
    mock_text_embedding.return_value = Mock()

    # (config kwargs, expected thread value): the 2-thread default,
    # an explicit None (all cores), and a custom value of 8.
    cases = [
        ({}, 2),
        ({"embedding_threads": None}, None),
        ({"embedding_threads": 8}, 8),
    ]
    for config_kwargs, expected_threads in cases:
        mock_text_embedding.reset_mock()
        config = MCPOptimizerConfig(**config_kwargs)
        manager = EmbeddingManager(
            model_name=config.embedding_model_name,
            enable_cache=config.enable_embedding_cache,
            threads=config.embedding_threads,
        )
        assert manager.threads == expected_threads

        _ = manager.model  # trigger lazy loading
        mock_text_embedding.assert_called_once_with(
            model_name="BAAI/bge-small-en-v1.5", threads=expected_threads
        )