19 changes: 19 additions & 0 deletions API_DOCUMENTATION.md
@@ -193,6 +193,24 @@ data: {"id":"...","object":"chat.completion.chunk","created":1718123456,"model":
data: [DONE]
```

#### Sources Events (streaming-only)

In addition to the OpenAI-compatible chunks above, Cairo Coder emits a custom SSE frame early in the stream with the documentation sources used for the answer. This enables frontends to display sources while the model is generating the response.

- The frame shape is: `data: {"type": "sources", "data": [{"metadata": {"title": string, "url": string}}, ...]}`
- Clients that expect only OpenAI-compatible frames should filter out payloads whose `type` is `"sources"` before parsing them as chat completion chunks.

Example snippet:

```json
data: {"type":"sources","data":[{"metadata":{"title":"Introduction to Cairo","url":"https://book.cairo-lang.org/ch01-00-getting-started.html"}}]}
```

Notes:

- A single sources event is typically emitted per request, shortly after retrieval completes.
- The `url` field maps to the ingester `sourceLink`; documents without a `sourceLink` are omitted from the event. A client-side handling sketch follows below.
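
As a rough illustration only (the fetch wrapper, handler names, and error handling below are assumptions, not part of the API; the endpoint path and frame shapes follow the examples above), a frontend consuming the stream might separate the sources frame from the OpenAI-compatible chunks like this:

```typescript
// Sketch: split the custom sources frame from standard chat.completion.chunk frames.
type SourceEntry = { metadata: { title: string; url: string } };

async function streamCompletion(
  body: unknown,
  onSources: (sources: SourceEntry[]) => void,
  onDelta: (text: string) => void,
): Promise<void> {
  const response = await fetch('/v1/chat/completions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE frames are separated by blank lines; each data frame starts with "data: ".
    const frames = buffer.split('\n\n');
    buffer = frames.pop() ?? '';

    for (const frame of frames) {
      const payload = frame.replace(/^data: /, '').trim();
      if (!payload || payload === '[DONE]') continue;

      const parsed = JSON.parse(payload);
      if (parsed.type === 'sources') {
        onSources(parsed.data); // custom sources frame
      } else {
        const delta = parsed.choices?.[0]?.delta?.content;
        if (delta) onDelta(delta); // standard OpenAI-compatible chunk
      }
    }
  }
}
```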

### Agent Selection

`POST /v1/agents/{agent_id}/chat/completions` validates that `{agent_id}` exists. Unknown IDs return `404 Not Found` with an OpenAI-style error payload. When the `agent_id` is omitted (`/v1/chat/completions` or `/chat/completions`) the server falls back to `cairo-coder`.
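
As an illustrative sketch (the helper name, request body, and error handling are assumptions; only the route and the 404 behavior come from the paragraph above), a client selecting an agent explicitly might look like:

```typescript
// Sketch: call a specific agent and surface the OpenAI-style 404 error for unknown ids.
async function askAgent(agentId: string, question: string): Promise<unknown> {
  const response = await fetch(`/v1/agents/${agentId}/chat/completions`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      messages: [{ role: 'user', content: question }],
      stream: false,
    }),
  });

  if (response.status === 404) {
    // Unknown agent id: the server returns an OpenAI-style error payload.
    const err = await response.json();
    throw new Error(err.error?.message ?? `Unknown agent: ${agentId}`);
  }
  return response.json();
}

// Omitting the agent id (POST /v1/chat/completions) falls back to the `cairo-coder` agent.
```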
@@ -203,6 +221,7 @@ Setting either `mcp` or `x-mcp-mode` headers triggers **Model Context Protocol mode**

- Non-streaming responses still use the standard `chat.completion` envelope, but `choices[0].message.content` contains curated documentation blocks instead of prose answers.
- Streaming responses emit the same SSE wrapper; the payloads contain the formatted documentation as incremental `delta.content` strings.
- A streaming request in MCP mode also includes the same `{"type": "sources"}` event described above.
- MCP mode does not consume generation tokens (`usage.completion_tokens` reflects only retrieval/query processing).

Example non-streaming request:
5 changes: 1 addition & 4 deletions packages/ingester/src/ingesters/MarkdownIngester.ts
@@ -124,17 +124,14 @@ export abstract class MarkdownIngester extends BaseIngester {
sections.forEach((section: ParsedSection, index: number) => {
const hash: string = calculateHash(section.content);

// If a baseUrl is provided in the config, build a source link.
// If useUrlMapping is true, map to specific page URLs with anchors.
// If useUrlMapping is false, only use the baseUrl.
const hasBase = !!this.config.baseUrl;
let sourceLink = '';

if (this.config.useUrlMapping) {
// Map to specific page URLs with anchors
const anchor = section.anchor || createAnchor(section.title);
const urlSuffix = this.config.urlSuffix ?? '';
sourceLink = `${this.config.baseUrl}/${page_name}${urlSuffix}${anchor ? `#${anchor}` : ''}`;
sourceLink = `${this.config.baseUrl}/${page_name}${this.config.urlSuffix}${anchor ? `#${anchor}` : ''}`;
} else {
// Only use the baseUrl
sourceLink = this.config.baseUrl;
23 changes: 10 additions & 13 deletions packages/ingester/src/ingesters/StarknetDocsIngester.ts
@@ -36,7 +36,7 @@ export class StarknetDocsIngester extends MarkdownIngester {
fileExtension: '.mdx',
chunkSize: 4096,
chunkOverlap: 512,
baseUrl: 'https://docs.starknet.io',
baseUrl: StarknetDocsIngester.BASE_URL,
urlSuffix: '',
useUrlMapping: true,
};
@@ -68,7 +68,7 @@ export class StarknetDocsIngester extends MarkdownIngester {
const exec = promisify(execCallback);
try {
// remove extractDir if it exists
await fs.rm(extractDir, { recursive: true, force: true });
await fs.rm(extractDir, { recursive: true, force: true }).catch(() => {});
await exec(`git clone ${repoUrl} ${extractDir}`);
} catch (error) {
logger.error('Error cloning repository:', error);
@@ -83,7 +83,7 @@ export class StarknetDocsIngester extends MarkdownIngester {
for (const folder of StarknetDocsIngester.DOCS_FOLDERS) {
const docsDir = path.join(extractDir, folder);
try {
const folderPages = await this.processDocFiles(this.config, docsDir);
const folderPages = await this.processDocFiles(docsDir);
pages.push(...folderPages);
logger.info(`Processed ${folderPages.length} pages from ${folder}/`);
} catch (error) {
@@ -101,16 +101,13 @@ export class StarknetDocsIngester extends MarkdownIngester {
/**
* Process documentation files from a directory
*
* @param config - The book configuration
* @param directory - The directory to process
* @returns Promise<BookPageDto[]> - Array of book pages
*/
private async processDocFiles(
config: BookConfig,
directory: string,
): Promise<BookPageDto[]> {
private async processDocFiles(directory: string): Promise<BookPageDto[]> {
const pages: BookPageDto[] = [];

async function processDirectory(dir: string) {
const processDirectory = async (dir: string) => {
const entries = await fs.readdir(dir, { withFileTypes: true });

for (const entry of entries) {
@@ -121,13 +118,13 @@ export class StarknetDocsIngester extends MarkdownIngester {
await processDirectory(fullPath);
} else if (
entry.isFile() &&
path.extname(entry.name).toLowerCase() === config.fileExtension
path.extname(entry.name).toLowerCase() === this.config.fileExtension
) {
// Process MDX files
const content = await fs.readFile(fullPath, 'utf8');

// Remove the repository path to get relative path
const relativePath = path.relative(directory, fullPath);
const relativePath = path.relative(this.getExtractDir(), fullPath);
const pageName = relativePath.replace('.mdx', '');

pages.push({
@@ -136,7 +133,7 @@ export class StarknetDocsIngester extends MarkdownIngester {
});
}
}
}
};

await processDirectory(directory);
return pages;
@@ -280,7 +277,7 @@ export class StarknetDocsIngester extends MarkdownIngester {
// Create a document for each section
sections.forEach((section, index: number) => {
logger.debug(
`Processed a section with title: ${section.title} and content length: ${section.content.length} from page: ${page_name}`,
`Processed a section with title: ${section.title} and content length: ${section.content.length} from page: ${page_name} with sourceUrl: ${sourceUrl}`,
);
const hash: string = calculateHash(section.content);
localChunks.push(
2 changes: 1 addition & 1 deletion python/optimizers/results/optimized_rag.json

Large diffs are not rendered by default.

56 changes: 35 additions & 21 deletions python/src/cairo_coder/core/rag_pipeline.py
@@ -13,7 +13,6 @@
import dspy
import structlog
from dspy.adapters import XMLAdapter
from dspy.adapters.baml_adapter import BAMLAdapter
from dspy.utils.callback import BaseCallback
from langsmith import traceable

@@ -47,7 +46,9 @@ def on_module_start(
logger.debug("Starting module", call_id=call_id, inputs=inputs)

# 2. Implement on_module_end handler to run a custom logging code.
def on_module_end(self, call_id: str, outputs: dict[str, Any], exception: Exception | None) -> None:
def on_module_end(
self, call_id: str, outputs: dict[str, Any], exception: Exception | None
) -> None:
step = "Reasoning" if self._is_reasoning_output(outputs) else "Acting"
logger.debug(f"== {step} Step ===")
for k, v in outputs.items():
@@ -109,7 +110,9 @@ async def _aprocess_query_and_retrieve_docs(
sources: list[DocumentSource] | None = None,
) -> tuple[ProcessedQuery, list[Document]]:
"""Process query and retrieve documents - shared async logic."""
processed_query = await self.query_processor.aforward(query=query, chat_history=chat_history_str)
processed_query = await self.query_processor.aforward(
query=query, chat_history=chat_history_str
)
self._current_processed_query = processed_query

# Use provided sources or fall back to processed query sources
@@ -119,7 +122,10 @@ async def _aprocess_query_and_retrieve_docs(
)

try:
with dspy.context(lm=dspy.LM("gemini/gemini-flash-lite-latest", max_tokens=10000, temperature=0.5), adapter=BAMLAdapter()):
with dspy.context(
lm=dspy.LM("gemini/gemini-flash-lite-latest", max_tokens=10000, temperature=0.5),
adapter=XMLAdapter(),
):
documents = await self.retrieval_judge.aforward(query=query, documents=documents)
except Exception as e:
logger.warning(
@@ -146,7 +152,9 @@ async def aforward(
processed_query, documents = await self._aprocess_query_and_retrieve_docs(
query, chat_history_str, sources
)
logger.info(f"Processed query: {processed_query.original[:100]}... and retrieved {len(documents)} doc titles: {[doc.metadata.get('title') for doc in documents]}")
logger.info(
f"Processed query: {processed_query.original[:100]}... and retrieved {len(documents)} doc titles: {[doc.metadata.get('title') for doc in documents]}"
)

if mcp_mode:
return await self.mcp_generation_program.aforward(documents)
@@ -183,7 +191,9 @@ async def aforward_streaming(
chat_history_str = self._format_chat_history(chat_history or [])

# Stage 2: Retrieve documents
yield StreamEvent(type=StreamEventType.PROCESSING, data="Retrieving relevant documents...")
yield StreamEvent(
type=StreamEventType.PROCESSING, data="Retrieving relevant documents..."
)

processed_query, documents = await self._aprocess_query_and_retrieve_docs(
query, chat_history_str, sources
@@ -194,7 +204,9 @@ async def aforward_streaming(

if mcp_mode:
# MCP mode: Return raw documents
yield StreamEvent(type=StreamEventType.PROCESSING, data="Formatting documentation...")
yield StreamEvent(
type=StreamEventType.PROCESSING, data="Formatting documentation..."
)

mcp_prediction = self.mcp_generation_program.forward(documents)
yield StreamEvent(type=StreamEventType.RESPONSE, data=mcp_prediction.answer)
@@ -205,9 +217,12 @@ async def aforward_streaming(
# Prepare context for generation
context = self._prepare_context(documents)

# Stream response generation. BAMLAdapter is not available for streaming, thus we swap it with the default adapter.
with dspy.context(lm=dspy.LM("gemini/gemini-flash-lite-latest", max_tokens=10000), adapter=XMLAdapter()):
async for chunk in self.generation_program.forward_streaming(
# Stream response generation. Use ChatAdapter for streaming, which performs better.
with dspy.context(
lm=dspy.LM("gemini/gemini-flash-lite-latest", max_tokens=10000),
adapter=dspy.adapters.ChatAdapter(),
):
async for chunk in self.generation_program.aforward_streaming(
query=query, context=context, chat_history=chat_history_str
):
yield StreamEvent(type=StreamEventType.RESPONSE, data=chunk)
@@ -218,6 +233,7 @@ async def aforward_streaming(
except Exception as e:
# Handle pipeline errors
import traceback

traceback.print_exc()
logger.error("Pipeline error", error=e)
yield StreamEvent(StreamEventType.ERROR, data=f"Pipeline error: {str(e)}")
@@ -269,24 +285,22 @@ def _format_chat_history(self, chat_history: list[Message]) -> str:

def _format_sources(self, documents: list[Document]) -> list[dict[str, Any]]:
"""
Format documents for sources event.
Format documents for the frontend-friendly sources event.

Produces one entry per document, each with a nested `metadata` object holding
`title` and `url`, where `url` is taken from the ingester `sourceLink`;
documents without a `sourceLink` are skipped.

Args:
documents: List of retrieved documents

Returns:
List of formatted source information
List of dicts: [{"metadata": {"title": str, "url": str}}, ...]
"""
sources = []
sources: list[dict[str, Any]] = []
for doc in documents:
source_info = {
"title": doc.metadata.get("title", "Untitled"),
"url": doc.metadata.get("url", "#"),
"source_display": doc.metadata.get("source_display", "Unknown Source"),
"content_preview": doc.page_content[:SOURCE_PREVIEW_MAX_LEN]
+ ("..." if len(doc.page_content) > SOURCE_PREVIEW_MAX_LEN else ""),
}
sources.append(source_info)
if doc.source_link is None:
continue
sources.append({"metadata": {"title": doc.title, "url": doc.source_link}})

return sources

42 changes: 35 additions & 7 deletions python/src/cairo_coder/core/types.py
@@ -3,7 +3,7 @@
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any
from typing import Any, Optional, TypedDict

from pydantic import BaseModel

@@ -40,6 +40,29 @@ class DocumentSource(str, Enum):
STARKNET_JS = "starknet_js"


class DocumentMetadata(TypedDict, total=False):
"""
Metadata structure for documents, matching the TypeScript ingester format.

All fields are optional (total=False) to maintain backward compatibility
with existing code that may not provide all fields.
"""

# Core identification fields
name: str # Page name (e.g., "ch01-01-installation")
title: str # Section title
uniqueId: str # Unique identifier (format: "{page_name}-{chunkNumber}")
contentHash: str # Hash of the content for change detection
chunkNumber: int # Index of this chunk within the page

# Source fields
source: DocumentSource # DocumentSource value (e.g., "cairo_book")
sourceLink: str # Full URL to the source documentation

# Additional metadata fields that may be present
similarity: Optional[float] # Similarity score from retrieval (if include_similarity=True)


@dataclass
class ProcessedQuery:
"""Processed query with extracted information."""
@@ -53,10 +76,15 @@ class ProcessedQuery:

@dataclass(frozen=True)
class Document:
"""Document with content and metadata."""
"""
Document with content and metadata.

The metadata field follows the DocumentMetadata structure defined by the TypeScript
ingester, ensuring consistency across the Python and TypeScript codebases.
"""

page_content: str
metadata: dict[str, Any] = field(default_factory=dict)
metadata: DocumentMetadata = field(default_factory=dict) # type: ignore[assignment]

@property
def source(self) -> str | None:
@@ -66,12 +94,12 @@ def source(self) -> str | None:
@property
def title(self) -> str | None:
"""Get document title from metadata."""
return self.metadata.get("title")
return self.metadata.get("title", self.page_content[:20])

@property
def url(self) -> str | None:
"""Get document URL from metadata."""
return self.metadata.get("url")
def source_link(self) -> str | None:
"""Get document source link from metadata."""
return self.metadata.get("sourceLink")

def __hash__(self) -> int:
"""Make Document hashable by using page_content and a frozen representation of metadata."""
17 changes: 9 additions & 8 deletions python/src/cairo_coder/dspy/generation_program.py
@@ -117,7 +117,7 @@ async def aforward(self, query: str, context: str, chat_history: Optional[str] =
raise e
return None

async def forward_streaming(
async def aforward_streaming(
self, query: str, context: str, chat_history: Optional[str] = None
) -> AsyncGenerator[str, None]:
"""
@@ -134,22 +134,23 @@ def forward_streaming(
if chat_history is None:
chat_history = ""


# Create a streamified version of the generation program
stream_generation = dspy.streamify(
self.generation_program,
stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")], # type: ignore
stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
)

# Execute the streaming generation. Do not swallow exceptions here;
# let them propagate so callers can emit structured error events.
output_stream = stream_generation( # type: ignore
query=query, context=context, chat_history=chat_history # type: ignore
output_stream = stream_generation(
query=query, context=context, chat_history=chat_history
)

# Process the stream and yield tokens
is_cached = True
async for chunk in output_stream:
if isinstance(chunk, dspy.streaming.StreamResponse): # type: ignore
if isinstance(chunk, dspy.streaming.StreamResponse):
# No streaming if cached
is_cached = False
# Yield the actual token content
@@ -215,9 +216,9 @@ def forward(self, documents: list[Document]) -> dspy.Prediction:

formatted_docs = []
for i, doc in enumerate(documents, 1):
source = doc.metadata.get("source_display", "Unknown Source")
url = doc.metadata.get("url", "#")
title = doc.metadata.get("title", f"Document {i}")
source = doc.source
url = doc.source_link
title = doc.title

formatted_doc = f"""
## {i}. {title}