25 changes: 21 additions & 4 deletions README.md
@@ -84,7 +84,24 @@ Using Docker is highly recommended for a streamlined setup. For instructions on
SIMILARITY_MEASURE="cosine"
```

3. **Configure LangSmith (Optional but Recommended)**
3. **Configure Agents Package (`packages/agents/config.toml`)**
The ingester requires a configuration file in the `packages/agents` directory. Create `packages/agents/config.toml` with the following content:

```toml
[API_KEYS]
OPENAI = "your-openai-api-key-here"

[VECTOR_DB]
POSTGRES_USER = "cairocoder"
POSTGRES_HOST = "postgres"
POSTGRES_DB = "cairocoder"
POSTGRES_PASSWORD = "cairocoder"
POSTGRES_PORT = "5432"
```

Replace `"your-openai-api-key-here"` with your actual OpenAI API key. The database credentials should match those configured in your `.env` file.

4. **Configure LangSmith (Optional but Recommended)**
To monitor and debug LLM calls, configure LangSmith.

- Create an account at [LangSmith](https://smith.langchain.com/) and create a project.
@@ -95,7 +112,7 @@ Using Docker is highly recommended for a streamlined setup. For instructions on
LANGSMITH_API_KEY="lsv2..."
```

4. **Add your API keys to `python/.env`: (mandatory)**
5. **Add your API keys to `python/.env`: (mandatory)**

```yaml
OPENAI_API_KEY="sk-..."
@@ -105,7 +122,7 @@ Using Docker is highly recommended for a streamlined setup. For instructions on

Add the API keys required for the LLMs you want to use.

5. **Run the ingesters (mandatory)**
6. **Run the ingesters (mandatory)**

   The ingesters are responsible for populating the vector database with the documentation sources. They need to be run once first, in isolation, so that the database is created.

@@ -115,7 +132,7 @@ Using Docker is highly recommended for a streamlined setup. For instructions on

Once the ingester completes, the database will be populated with embeddings from all supported documentation sources, making them available for the RAG pipeline. Stop the database when you no longer need it.

6. **Run the Application**
7. **Run the Application**
Once the ingesters are done, start the database and the Python backend service using Docker Compose:
```bash
docker compose up postgres backend --build
18 changes: 16 additions & 2 deletions python/src/cairo_coder/core/rag_pipeline.py
@@ -264,8 +264,22 @@ def get_lm_usage(self) -> dict[str, int]:
"""
generation_usage = self.generation_program.get_lm_usage()
query_usage = self.query_processor.get_lm_usage()
# merge both dictionaries
return {**generation_usage, **query_usage}

# Additive merge strategy
merged_usage = {}

# Helper function to merge usage dictionaries
def merge_usage_dict(target: dict, source: dict) -> None:
for model_name, metrics in source.items():
if model_name not in target:
target[model_name] = {}
for metric_name, value in metrics.items():
target[model_name][metric_name] = target[model_name].get(metric_name, 0) + value

merge_usage_dict(merged_usage, generation_usage)
merge_usage_dict(merged_usage, query_usage)

return merged_usage

def _format_chat_history(self, chat_history: list[Message]) -> str:
"""
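The additive merge above sums per-model token counts instead of letting one component's entry overwrite the other's. A minimal standalone sketch of the intended behaviour, using the same helper and illustrative usage figures:

```python
# Standalone sketch of the additive merge used in get_lm_usage(); the numbers are made up.
def merge_usage_dict(target: dict, source: dict) -> None:
    for model_name, metrics in source.items():
        if model_name not in target:
            target[model_name] = {}
        for metric_name, value in metrics.items():
            target[model_name][metric_name] = target[model_name].get(metric_name, 0) + value

query_usage = {"gpt-4o-mini": {"prompt_tokens": 200, "completion_tokens": 100, "total_tokens": 300}}
generation_usage = {"gpt-4o-mini": {"prompt_tokens": 1000, "completion_tokens": 500, "total_tokens": 1500}}

merged: dict = {}
merge_usage_dict(merged, generation_usage)
merge_usage_dict(merged, query_usage)
# Both programs used the same model, so their counts are summed:
# {"gpt-4o-mini": {"prompt_tokens": 1200, "completion_tokens": 600, "total_tokens": 1800}}
```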
37 changes: 37 additions & 0 deletions python/src/cairo_coder/dspy/document_retriever.py
@@ -703,3 +703,40 @@ def _enhance_context(self, query: str, context: list[Document]) -> list[Document
)
)
return context

def get_lm_usage(self) -> dict[str, int]:
"""
Get the total number of tokens used by the LLM.
Note: Document retrieval doesn't use LLM tokens directly, but embedding tokens might be tracked.
"""
# Document retrieval doesn't use LLM tokens, but we return empty dict for consistency
return {}


def create_document_retriever(
vector_store_config: VectorStoreConfig,
vector_db: SourceFilteredPgVectorRM | None = None,
max_source_count: int = 5,
similarity_threshold: float = 0.4,
embedding_model: str = "text-embedding-3-large",
) -> DocumentRetrieverProgram:
"""
Factory function to create a DocumentRetrieverProgram instance.

Args:
vector_store_config: VectorStoreConfig for document retrieval
vector_db: Optional pre-initialized vector database instance
max_source_count: Maximum number of documents to retrieve
similarity_threshold: Minimum similarity score for document inclusion
embedding_model: OpenAI embedding model to use for reranking

Returns:
Configured DocumentRetrieverProgram instance
"""
return DocumentRetrieverProgram(
vector_store_config=vector_store_config,
vector_db=vector_db,
max_source_count=max_source_count,
similarity_threshold=similarity_threshold,
embedding_model=embedding_model,
)
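For illustration, a hypothetical call to this factory might look like the sketch below. The `VectorStoreConfig` instance is assumed to be constructed elsewhere, and the keyword values are examples rather than recommended settings.

```python
from cairo_coder.dspy.document_retriever import create_document_retriever

def build_retriever(config):  # `config` is a VectorStoreConfig (construction not shown here)
    retriever = create_document_retriever(
        vector_store_config=config,
        max_source_count=10,
        similarity_threshold=0.5,
    )
    # Retrieval itself does not call an LLM, so its usage report is empty.
    assert retriever.get_lm_usage() == {}
    return retriever
```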
7 changes: 7 additions & 0 deletions python/src/cairo_coder/dspy/generation_program.py
@@ -269,6 +269,13 @@ def forward(self, documents: list[Document]) -> dspy.Prediction:

return dspy.Prediction(answer='\n'.join(formatted_docs))

def get_lm_usage(self) -> dict[str, int]:
"""
Get the total number of tokens used by the LLM.
Note: MCP mode doesn't use LLM generation, so no tokens are consumed.
"""
# MCP mode doesn't use LLM generation, return empty dict
return {}


def create_generation_program(program_type: str = "general") -> GenerationProgram:
6 changes: 6 additions & 0 deletions python/src/cairo_coder/dspy/query_processor.py
@@ -226,6 +226,12 @@ def _is_test_query(self, query: str) -> bool:
query_lower = query.lower()
return any(keyword in query_lower for keyword in self.test_keywords)

def get_lm_usage(self) -> dict[str, int]:
"""
Get the total number of tokens used by the LLM.
"""
return self.retrieval_program.get_lm_usage()


def create_query_processor() -> QueryProcessorProgram:
"""
4 changes: 4 additions & 0 deletions python/src/cairo_coder/server/app.py
@@ -456,7 +456,10 @@ async def _generate_chat_completion(
# Somehow this does not always return a value (it can be None). In that case, we cannot retrieve the
# tracked usage.
lm_usage = response.get_lm_usage()
logger.info(f"LM usage from response: {lm_usage}")

if not lm_usage:
logger.warning("No LM usage data available, setting defaults to 0")
total_prompt_tokens = 0
total_completion_tokens = 0
total_tokens = 0
@@ -467,6 +470,7 @@
entry.get("completion_tokens", 0) for entry in lm_usage.values()
)
total_tokens = sum(entry.get("total_tokens", 0) for entry in lm_usage.values())
logger.info(f"Token usage - prompt: {total_prompt_tokens}, completion: {total_completion_tokens}, total: {total_tokens}")

return ChatCompletionResponse(
id=response_id,
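The aggregation in `_generate_chat_completion` simply sums each metric across all models present in the usage dictionary. A small standalone sketch with illustrative figures:

```python
# Example lm_usage as returned by response.get_lm_usage(); the figures are made up.
lm_usage = {
    "gpt-4o-mini": {"prompt_tokens": 200, "completion_tokens": 100, "total_tokens": 300},
    "gpt-4o": {"prompt_tokens": 1000, "completion_tokens": 500, "total_tokens": 1500},
}

total_prompt_tokens = sum(entry.get("prompt_tokens", 0) for entry in lm_usage.values())
total_completion_tokens = sum(entry.get("completion_tokens", 0) for entry in lm_usage.values())
total_tokens = sum(entry.get("total_tokens", 0) for entry in lm_usage.values())
# total_prompt_tokens == 1200, total_completion_tokens == 600, total_tokens == 1800
```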
106 changes: 104 additions & 2 deletions python/tests/unit/test_rag_pipeline.py
@@ -7,8 +7,8 @@

from unittest.mock import AsyncMock, Mock, patch

import pytest
import dspy
import pytest

from cairo_coder.core.rag_pipeline import (
RagPipeline,
@@ -22,6 +22,18 @@
from cairo_coder.dspy.query_processor import QueryProcessorProgram


# Helper function to merge usage dictionaries
def merge_usage_dict(sources: list[dict]) -> dict:
"""Merge usage dictionaries."""
merged_usage = {}
for source in sources:
for model_name, metrics in source.items():
if model_name not in merged_usage:
merged_usage[model_name] = {}
for metric_name, value in metrics.items():
merged_usage[model_name][metric_name] = merged_usage[model_name].get(metric_name, 0) + value
return merged_usage

@pytest.fixture(scope='function')
def mock_pgvector_rm():
"""Patch the vector database for the document retriever."""
@@ -57,7 +69,8 @@ def mock_query_processor(self):
resources=[DocumentSource.CAIRO_BOOK, DocumentSource.STARKNET_DOCS],
)
processor.forward.return_value = mock_res
processor.aforward.return_value = mock_res
processor.aforward = AsyncMock(return_value=mock_res)
processor.get_lm_usage.return_value = {}
return processor

@pytest.fixture
@@ -84,6 +97,7 @@ def mock_document_retriever(self):
]
retriever.aforward = AsyncMock(return_value=mock_return_value)
retriever.forward = Mock(return_value=mock_return_value)
retriever.get_lm_usage.return_value = {}
return retriever

@pytest.fixture
@@ -101,6 +115,7 @@ async def mock_streaming(*args, **kwargs):
yield chunk

program.forward_streaming = mock_streaming
program.get_lm_usage.return_value = {}
return program

@pytest.fixture
Expand All @@ -125,6 +140,7 @@ def mock_mcp_generation_program(self):
Storage variables use #[storage] attribute.
"""
program.forward.return_value = dspy.Prediction(answer=mock_res)
program.get_lm_usage.return_value = {}
return program

@pytest.fixture
Expand Down Expand Up @@ -409,6 +425,92 @@ def test_get_current_state(self, pipeline):
assert state["config"]["max_source_count"] == 10
assert state["config"]["similarity_threshold"] == 0.4

# Define reusable usage constants to keep tests DRY
_QUERY_USAGE_MINI = {
"gpt-4o-mini": {"prompt_tokens": 200, "completion_tokens": 100, "total_tokens": 300}
}
_GEN_USAGE_MINI = {
"gpt-4o-mini": {"prompt_tokens": 1000, "completion_tokens": 500, "total_tokens": 1500}
}
_GEN_USAGE_FULL = {
"gpt-4o": {"prompt_tokens": 1000, "completion_tokens": 500, "total_tokens": 1500}
}


@pytest.mark.parametrize(
"query_usage, generation_usage, expected_usage",
[
pytest.param(
_QUERY_USAGE_MINI,
_GEN_USAGE_MINI,
merge_usage_dict([_QUERY_USAGE_MINI, _GEN_USAGE_MINI]),
id="same_model_aggregation",
),
pytest.param(
_QUERY_USAGE_MINI,
_GEN_USAGE_FULL,
merge_usage_dict([_QUERY_USAGE_MINI, _GEN_USAGE_FULL]),
id="different_model_aggregation",
),
pytest.param({}, {}, {}, id="empty_usage"),
pytest.param(
_QUERY_USAGE_MINI, {}, _QUERY_USAGE_MINI, id="partial_empty_usage"
),
],
)
def test_get_lm_usage_aggregation(
self, pipeline, query_usage, generation_usage, expected_usage
):
"""Tests that get_lm_usage correctly aggregates token usage from its components."""
        # The RAG pipeline additively merges usage from the query processor and generation program
pipeline.query_processor.get_lm_usage.return_value = query_usage
pipeline.generation_program.get_lm_usage.return_value = generation_usage

result = pipeline.get_lm_usage()

pipeline.query_processor.get_lm_usage.assert_called_once()
pipeline.generation_program.get_lm_usage.assert_called_once()

assert result == expected_usage

@pytest.mark.asyncio
@pytest.mark.parametrize(
"mcp_mode, expected_usage",
[
pytest.param(True, _QUERY_USAGE_MINI, id="mcp_mode"),
pytest.param(
False, merge_usage_dict([_QUERY_USAGE_MINI, _GEN_USAGE_FULL]), id="normal_mode"
),
],
)
async def test_get_lm_usage_after_streaming(
self, pipeline, mcp_mode, expected_usage
):
"""Tests that get_lm_usage works correctly after a streaming execution."""
# To test token aggregation, we mock the return values of sub-components'
# get_lm_usage methods. The test logic simulates which components would
# be "active" in each mode by setting others to return empty usage.
pipeline.query_processor.get_lm_usage.return_value = self._QUERY_USAGE_MINI
if mcp_mode:
pipeline.generation_program.get_lm_usage.return_value = {}
# MCP program doesn't use an LM, so its usage is empty
pipeline.mcp_generation_program.get_lm_usage.return_value = {}
else:
pipeline.generation_program.get_lm_usage.return_value = self._GEN_USAGE_FULL
pipeline.mcp_generation_program.get_lm_usage.return_value = {}

# Execute the pipeline to ensure the full flow is invoked.
async for _ in pipeline.forward_streaming(
query="How do I create a Cairo contract?", mcp_mode=mcp_mode
):
pass

result = pipeline.get_lm_usage()

assert result == expected_usage
pipeline.query_processor.get_lm_usage.assert_called()
pipeline.generation_program.get_lm_usage.assert_called()


class TestRagPipelineFactory:
"""Test suite for RagPipelineFactory."""