From 870b311fb1cb0df20489f01158d9a910782cb8d0 Mon Sep 17 00:00:00 2001
From: alvinouille
Date: Tue, 29 Jul 2025 19:56:34 +0200
Subject: [PATCH 1/3] enable token tracking in the response body and update README to add packages/agents/config.toml info

---
 README.md                                    | 25 +++++++++++--
 .../cairo_coder/dspy/document_retriever.py   | 37 +++++++++++++++++++
 .../cairo_coder/dspy/generation_program.py   |  7 ++++
 .../src/cairo_coder/dspy/query_processor.py  |  6 +++
 python/src/cairo_coder/server/app.py         |  4 ++
 5 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index f1aa70fa..c4e4da50 100644
--- a/README.md
+++ b/README.md
@@ -84,7 +84,24 @@ Using Docker is highly recommended for a streamlined setup. For instructions on
    SIMILARITY_MEASURE="cosine"
    ```

-3. **Configure LangSmith (Optional but Recommended)**
+3. **Configure Agents Package (`packages/agents/config.toml`)**
+   The ingester requires a configuration file in the `packages/agents` directory. Create `packages/agents/config.toml` with the following content:
+
+   ```toml
+   [API_KEYS]
+   OPENAI = "your-openai-api-key-here"
+
+   [VECTOR_DB]
+   POSTGRES_USER = "cairocoder"
+   POSTGRES_HOST = "postgres"
+   POSTGRES_DB = "cairocoder"
+   POSTGRES_PASSWORD = "cairocoder"
+   POSTGRES_PORT = "5432"
+   ```
+
+   Replace `"your-openai-api-key-here"` with your actual OpenAI API key. The database credentials should match those configured in your `.env` file.
+
+4. **Configure LangSmith (Optional but Recommended)**
    To monitor and debug LLM calls, configure LangSmith.

    - Create an account at [LangSmith](https://smith.langchain.com/) and create a project.
@@ -95,7 +112,7 @@ Using Docker is highly recommended for a streamlined setup. For instructions on
    LANGSMITH_API_KEY="lsv2..."
    ```

-4. **Add your API keys to `python/.env`: (mandatory)**
+5. **Add your API keys to `python/.env` (mandatory)**

    ```yaml
    OPENAI_API_KEY="sk-..."
@@ -105,7 +122,7 @@ Using Docker is highly recommended for a streamlined setup. For instructions on

    Add the API keys required for the LLMs you want to use.

-5. **Run the ingesters (mandatory)**
+6. **Run the ingesters (mandatory)**

    The ingesters are responsible for populating the vector database with the documentation sources. They need to be run once, in isolation, so that the database is created.

@@ -115,7 +132,7 @@ Using Docker is highly recommended for a streamlined setup. For instructions on

    Once the ingester completes, the database will be populated with embeddings from all supported documentation sources, making them available for the RAG pipeline. Stop the database when you no longer need it.

-6. **Run the Application**
+7. **Run the Application**
    Once the ingesters are done, start the database and the Python backend service using Docker Compose:
    ```bash
    docker compose up postgres backend --build
diff --git a/python/src/cairo_coder/dspy/document_retriever.py b/python/src/cairo_coder/dspy/document_retriever.py
index 118fe9ff..e7a1e929 100644
--- a/python/src/cairo_coder/dspy/document_retriever.py
+++ b/python/src/cairo_coder/dspy/document_retriever.py
@@ -703,3 +703,40 @@ def _enhance_context(self, query: str, context: list[Document]) -> list[Document
             )
         )
         return context
+
+    def get_lm_usage(self) -> dict[str, int]:
+        """
+        Get the total number of tokens used by the LLM.
+        Note: Document retrieval doesn't use LLM tokens directly, but embedding tokens might be tracked.
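+        Returns an empty mapping for now; embedding-token usage is not yet surfaced.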
+        """
+        # Document retrieval doesn't use LLM tokens, but we return an empty dict for consistency
+        return {}
+
+
+def create_document_retriever(
+    vector_store_config: VectorStoreConfig,
+    vector_db: SourceFilteredPgVectorRM | None = None,
+    max_source_count: int = 5,
+    similarity_threshold: float = 0.4,
+    embedding_model: str = "text-embedding-3-large",
+) -> DocumentRetrieverProgram:
+    """
+    Factory function to create a DocumentRetrieverProgram instance.
+
+    Args:
+        vector_store_config: VectorStoreConfig for document retrieval
+        vector_db: Optional pre-initialized vector database instance
+        max_source_count: Maximum number of documents to retrieve
+        similarity_threshold: Minimum similarity score for document inclusion
+        embedding_model: OpenAI embedding model to use for reranking
+
+    Returns:
+        Configured DocumentRetrieverProgram instance
+    """
+    return DocumentRetrieverProgram(
+        vector_store_config=vector_store_config,
+        vector_db=vector_db,
+        max_source_count=max_source_count,
+        similarity_threshold=similarity_threshold,
+        embedding_model=embedding_model,
+    )
\ No newline at end of file
diff --git a/python/src/cairo_coder/dspy/generation_program.py b/python/src/cairo_coder/dspy/generation_program.py
index 9ac34f97..481d9b78 100644
--- a/python/src/cairo_coder/dspy/generation_program.py
+++ b/python/src/cairo_coder/dspy/generation_program.py
@@ -269,6 +269,13 @@ def forward(self, documents: list[Document]) -> dspy.Prediction:

         return dspy.Prediction(answer='\n'.join(formatted_docs))

+    def get_lm_usage(self) -> dict[str, int]:
+        """
+        Get the total number of tokens used by the LLM.
+        Note: MCP mode doesn't use LLM generation, so no tokens are consumed.
+        """
+        # MCP mode doesn't use LLM generation, return an empty dict
+        return {}

 def create_generation_program(program_type: str = "general") -> GenerationProgram:
diff --git a/python/src/cairo_coder/dspy/query_processor.py b/python/src/cairo_coder/dspy/query_processor.py
index d72b3e28..916f664b 100644
--- a/python/src/cairo_coder/dspy/query_processor.py
+++ b/python/src/cairo_coder/dspy/query_processor.py
@@ -226,6 +226,12 @@ def _is_test_query(self, query: str) -> bool:
         query_lower = query.lower()
         return any(keyword in query_lower for keyword in self.test_keywords)

+    def get_lm_usage(self) -> dict[str, int]:
+        """
+        Get the total number of tokens used by the LLM.
+        """
+        return self.retrieval_program.get_lm_usage()
+

 def create_query_processor() -> QueryProcessorProgram:
     """
diff --git a/python/src/cairo_coder/server/app.py b/python/src/cairo_coder/server/app.py
index 30f2546e..68032b65 100644
--- a/python/src/cairo_coder/server/app.py
+++ b/python/src/cairo_coder/server/app.py
@@ -456,7 +456,10 @@ async def _generate_chat_completion(
         # Somehow this does not always return a value (it can be None). In that
         # case, we cannot recover the tracked usage.
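         # When present, lm_usage maps each model name to a metrics dict
         # ({"prompt_tokens", "completion_tokens", "total_tokens"}), which the
         # sums below rely on.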
         lm_usage = response.get_lm_usage()
+        logger.info(f"LM usage from response: {lm_usage}")
+
         if not lm_usage:
+            logger.warning("No LM usage data available, setting defaults to 0")
             total_prompt_tokens = 0
             total_completion_tokens = 0
             total_tokens = 0
@@ -467,6 +470,7 @@ async def _generate_chat_completion(
                 entry.get("completion_tokens", 0) for entry in lm_usage.values()
             )
             total_tokens = sum(entry.get("total_tokens", 0) for entry in lm_usage.values())
+            logger.info(f"Token usage - prompt: {total_prompt_tokens}, completion: {total_completion_tokens}, total: {total_tokens}")

         return ChatCompletionResponse(
             id=response_id,

From 407fb4da861f6f20d8eb3bc6045470866ff8781d Mon Sep 17 00:00:00 2001
From: enitrat
Date: Wed, 30 Jul 2025 22:00:55 +0200
Subject: [PATCH 2/3] use sum merge strategy for LM usage + add tests

fmt
---
 python/src/cairo_coder/core/rag_pipeline.py |  18 +++-
 python/tests/unit/test_rag_pipeline.py      | 104 +++++++++++++++++++-
 2 files changed, 118 insertions(+), 4 deletions(-)

diff --git a/python/src/cairo_coder/core/rag_pipeline.py b/python/src/cairo_coder/core/rag_pipeline.py
index 4316f86c..b4ea986d 100644
--- a/python/src/cairo_coder/core/rag_pipeline.py
+++ b/python/src/cairo_coder/core/rag_pipeline.py
@@ -264,8 +264,22 @@ def get_lm_usage(self) -> dict[str, int]:
         """
         generation_usage = self.generation_program.get_lm_usage()
         query_usage = self.query_processor.get_lm_usage()
-        # merge both dictionaries
-        return {**generation_usage, **query_usage}
+
+        # Additive merge strategy: sum metrics per model instead of letting one
+        # component's entry overwrite the other's.
+        merged_usage = {}
+
+        def merge_usage_dict(target: dict, source: dict) -> None:
+            for model_name, metrics in source.items():
+                if model_name not in target:
+                    target[model_name] = {}
+                for metric_name, value in metrics.items():
+                    target[model_name][metric_name] = target[model_name].get(metric_name, 0) + value
+
+        merge_usage_dict(merged_usage, generation_usage)
+        merge_usage_dict(merged_usage, query_usage)
+
+        return merged_usage

     def _format_chat_history(self, chat_history: list[Message]) -> str:
         """
diff --git a/python/tests/unit/test_rag_pipeline.py b/python/tests/unit/test_rag_pipeline.py
index b1c878e8..ad931140 100644
--- a/python/tests/unit/test_rag_pipeline.py
+++ b/python/tests/unit/test_rag_pipeline.py
@@ -9,7 +9,6 @@

 import pytest
 import dspy
-
 from cairo_coder.core.rag_pipeline import (
     RagPipeline,
     RagPipelineConfig,
@@ -21,6 +20,17 @@
 from cairo_coder.dspy.generation_program import GenerationProgram, McpGenerationProgram
 from cairo_coder.dspy.query_processor import QueryProcessorProgram

+# Helper function to merge usage dictionaries
+def merge_usage_dict(sources: list[dict]) -> dict:
+    """Merge usage dictionaries."""
+    merged_usage = {}
+    for source in sources:
+        for model_name, metrics in source.items():
+            if model_name not in merged_usage:
+                merged_usage[model_name] = {}
+            for metric_name, value in metrics.items():
+                merged_usage[model_name][metric_name] = merged_usage[model_name].get(metric_name, 0) + value
+    return merged_usage

 @pytest.fixture(scope='function')
 def mock_pgvector_rm():
@@ -57,7 +67,8 @@ def mock_query_processor(self):
             resources=[DocumentSource.CAIRO_BOOK, DocumentSource.STARKNET_DOCS],
         )
         processor.forward.return_value = mock_res
-        processor.aforward.return_value = mock_res
+        processor.aforward = AsyncMock(return_value=mock_res)
+        processor.get_lm_usage.return_value = {}
         return processor

     @pytest.fixture
@@ -84,6 +95,7 @@ def mock_document_retriever(self):
         ]
         retriever.aforward = AsyncMock(return_value=mock_return_value)
retriever.forward = Mock(return_value=mock_return_value) + retriever.get_lm_usage.return_value = {} return retriever @pytest.fixture @@ -101,6 +113,7 @@ async def mock_streaming(*args, **kwargs): yield chunk program.forward_streaming = mock_streaming + program.get_lm_usage.return_value = {} return program @pytest.fixture @@ -125,6 +138,7 @@ def mock_mcp_generation_program(self): Storage variables use #[storage] attribute. """ program.forward.return_value = dspy.Prediction(answer=mock_res) + program.get_lm_usage.return_value = {} return program @pytest.fixture @@ -409,6 +423,92 @@ def test_get_current_state(self, pipeline): assert state["config"]["max_source_count"] == 10 assert state["config"]["similarity_threshold"] == 0.4 + # Define reusable usage constants to keep tests DRY + _QUERY_USAGE_MINI = { + "gpt-4o-mini": {"prompt_tokens": 200, "completion_tokens": 100, "total_tokens": 300} + } + _GEN_USAGE_MINI = { + "gpt-4o-mini": {"prompt_tokens": 1000, "completion_tokens": 500, "total_tokens": 1500} + } + _GEN_USAGE_FULL = { + "gpt-4o": {"prompt_tokens": 1000, "completion_tokens": 500, "total_tokens": 1500} + } + + + @pytest.mark.parametrize( + "query_usage, generation_usage, expected_usage", + [ + pytest.param( + _QUERY_USAGE_MINI, + _GEN_USAGE_MINI, + merge_usage_dict([_QUERY_USAGE_MINI, _GEN_USAGE_MINI]), + id="same_model_aggregation", + ), + pytest.param( + _QUERY_USAGE_MINI, + _GEN_USAGE_FULL, + merge_usage_dict([_QUERY_USAGE_MINI, _GEN_USAGE_FULL]), + id="different_model_aggregation", + ), + pytest.param({}, {}, {}, id="empty_usage"), + pytest.param( + _QUERY_USAGE_MINI, {}, _QUERY_USAGE_MINI, id="partial_empty_usage" + ), + ], + ) + def test_get_lm_usage_aggregation( + self, pipeline, query_usage, generation_usage, expected_usage + ): + """Tests that get_lm_usage correctly aggregates token usage from its components.""" + # The RAG pipeline implementation merges dictionaries with query_usage taking precedence + pipeline.query_processor.get_lm_usage.return_value = query_usage + pipeline.generation_program.get_lm_usage.return_value = generation_usage + + result = pipeline.get_lm_usage() + + pipeline.query_processor.get_lm_usage.assert_called_once() + pipeline.generation_program.get_lm_usage.assert_called_once() + + assert result == expected_usage + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mcp_mode, expected_usage", + [ + pytest.param(True, _QUERY_USAGE_MINI, id="mcp_mode"), + pytest.param( + False, merge_usage_dict([_QUERY_USAGE_MINI, _GEN_USAGE_FULL]), id="normal_mode" + ), + ], + ) + async def test_get_lm_usage_after_streaming( + self, pipeline, mcp_mode, expected_usage + ): + """Tests that get_lm_usage works correctly after a streaming execution.""" + # To test token aggregation, we mock the return values of sub-components' + # get_lm_usage methods. The test logic simulates which components would + # be "active" in each mode by setting others to return empty usage. + pipeline.query_processor.get_lm_usage.return_value = self._QUERY_USAGE_MINI + if mcp_mode: + pipeline.generation_program.get_lm_usage.return_value = {} + # MCP program doesn't use an LM, so its usage is empty + pipeline.mcp_generation_program.get_lm_usage.return_value = {} + else: + pipeline.generation_program.get_lm_usage.return_value = self._GEN_USAGE_FULL + pipeline.mcp_generation_program.get_lm_usage.return_value = {} + + # Execute the pipeline to ensure the full flow is invoked. 
+ async for _ in pipeline.forward_streaming( + query="How do I create a Cairo contract?", mcp_mode=mcp_mode + ): + pass + + result = pipeline.get_lm_usage() + + assert result == expected_usage + pipeline.query_processor.get_lm_usage.assert_called() + pipeline.generation_program.get_lm_usage.assert_called() + class TestRagPipelineFactory: """Test suite for RagPipelineFactory.""" From cd36b064275dd62ee9e18614a4ae0e8e347d5326 Mon Sep 17 00:00:00 2001 From: enitrat Date: Wed, 30 Jul 2025 23:17:05 +0200 Subject: [PATCH 3/3] fmt --- python/tests/unit/test_rag_pipeline.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/tests/unit/test_rag_pipeline.py b/python/tests/unit/test_rag_pipeline.py index ad931140..58e3d34c 100644 --- a/python/tests/unit/test_rag_pipeline.py +++ b/python/tests/unit/test_rag_pipeline.py @@ -7,8 +7,9 @@ from unittest.mock import AsyncMock, Mock, patch -import pytest import dspy +import pytest + from cairo_coder.core.rag_pipeline import ( RagPipeline, RagPipelineConfig, @@ -20,6 +21,7 @@ from cairo_coder.dspy.generation_program import GenerationProgram, McpGenerationProgram from cairo_coder.dspy.query_processor import QueryProcessorProgram + # Helper function to merge usage dictionaries def merge_usage_dict(sources: list[dict]) -> dict: """Merge usage dictionaries."""
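
For illustration, a minimal runnable sketch of the sum-merge semantics that PATCH 2/3 introduces. The token counts below are hypothetical (mirroring the test fixtures), the helper is behaviorally equivalent to the one in `RagPipeline.get_lm_usage()` (written with `setdefault` for brevity), and the final flattening mirrors `_generate_chat_completion()` in `server/app.py`.

    def merge_usage_dict(target: dict, source: dict) -> None:
        # Sum each metric per model, creating a model bucket on first sight.
        for model_name, metrics in source.items():
            bucket = target.setdefault(model_name, {})
            for metric_name, value in metrics.items():
                bucket[metric_name] = bucket.get(metric_name, 0) + value

    query_usage = {
        "gpt-4o-mini": {"prompt_tokens": 200, "completion_tokens": 100, "total_tokens": 300}
    }
    generation_usage = {
        "gpt-4o-mini": {"prompt_tokens": 1000, "completion_tokens": 500, "total_tokens": 1500}
    }

    merged: dict = {}
    merge_usage_dict(merged, generation_usage)
    merge_usage_dict(merged, query_usage)

    # The old `{**generation_usage, **query_usage}` spread would have kept only
    # the query side (total 300) for the shared model; the sum merge keeps both.
    assert merged == {
        "gpt-4o-mini": {"prompt_tokens": 1200, "completion_tokens": 600, "total_tokens": 1800}
    }

    # The server then flattens the per-model breakdown into response totals:
    total_tokens = sum(entry.get("total_tokens", 0) for entry in merged.values())
    assert total_tokens == 1800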