# RagServer

This notebook is how the project began.
The initial version was built and (barely) tested right in the notebook.

## Current state

The actual project code has evolved considerably compared to the *All in one cell* version.
The *All in one* version is deprecated, buggy, and shouldn't be used, but it may be useful for following
the train of thought that brought it to life.

However, this notebook can be used as a playground to interact with the individual server components.

In [None]:
pip install fastapi langchain-community langchain-huggingface uvicorn

In [None]:
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from pathlib import Path
import sys
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))

In [None]:
from services.embedding import embedding_service
from services.llm import llm_service
from services.vector_store import vector_store
from services.snippet_cache import snippet_cache
from services.reranking import reranker
from core.retrieval import retrieve_documents, retrieve_documents_by_headers
embedding_service.initialize()
vector_store.initialize()
snippet_cache.initialize()
reranker.initialize()

In [None]:
from query.lang_detection import extract_code_from_query
explain_query = """
What the following code does?: ```
 var count = 0;

    var cs = self.beginParseAllowExotic();
    do {
        // these checks could be cheaper in terms of gas if we just try to parse cs;
        // but we want to throw exactly `ERROR_INVALID_C5` (not `9` "cell underflow")
        var (nBits, nRefs) = cs.remainingBitsAndRefsCount();
        assert (nRefs == 2)      throw ERROR_INVALID_C5;
        assert (nBits == 32 + 8) throw ERROR_INVALID_C5;

        val outAction = lazy OutActionWithSendMessageOnly.fromSlice(cs, {
            throwIfOpcodeDoesNotMatch: ERROR_INVALID_C5
        });

        if (isExternal) {
            assert (outAction.sendMode & SEND_MODE_IGNORE_ERRORS) throw ERROR_EXTERNAL_SEND_MESSAGE_MUST_HAVE_IGNORE_ERRORS_SEND_MODE;
        }

        cs = outAction.prev.beginParseAllowExotic();
        count += 1;
    } while (!cs.isEmpty());

    assert (count <= 255) throw ERROR_INVALID_C5;
    return self;```
"""

In [None]:
extract_code_from_query(explain_query)

In [None]:
extract_code_from_query("Let's deBuG something")

In [None]:
extract_code_from_query(" ".join(["many"] * 12))

In [None]:
extract_code_from_query("tolk current time")

In [None]:

extract_code_from_query("What implementations of jetton exist")
#extract_code_from_query("How to do something", settings.COMPLEX_QUERY_INDICATORS)

In [None]:
short_loop = """
do {
        var (nBits, nRefs) = cs.remainingBitsAndRefsCount();
        assert (nRefs == 2)      throw ERROR_INVALID_C5;
        assert (nBits == 32 + 8) throw ERROR_INVALID_C5;
    } while (!cs.isEmpty());
"""

In [None]:
loop_only = """
do {
        // these checks could be cheaper in terms of gas if we just try to parse cs;
        // but we want to throw exactly `ERROR_INVALID_C5` (not `9` "cell underflow")
        var (nBits, nRefs) = cs.remainingBitsAndRefsCount();
        assert (nRefs == 2)      throw ERROR_INVALID_C5;
        assert (nBits == 32 + 8) throw ERROR_INVALID_C5;

        val outAction = lazy OutActionWithSendMessageOnly.fromSlice(cs, {
            throwIfOpcodeDoesNotMatch: ERROR_INVALID_C5
        });

        if (isExternal) {
            assert (outAction.sendMode & SEND_MODE_IGNORE_ERRORS) throw ERROR_EXTERNAL_SEND_MESSAGE_MUST_HAVE_IGNORE_ERRORS_SEND_MODE;
        }

        cs = outAction.prev.beginParseAllowExotic();
        count += 1;
    } while (!cs.isEmpty());
"""

In [None]:
from core.retrieval import pull_context
test_concept = "do-while loop"
concepts = ['do-while loop',
   'cell parsing with beginParseAllowExotic and fromSlice',
   'assert-based error handling',
   'remainingBitsAndRefsCount usage',
   'send mode validation with SEND_MODE_IGNORE_ERRORS',
   'iteration counting with limit check']
for concept in concepts:
    initial_docs = await vector_store.search(concept)
    docs_ranked = await reranker.rerank(initial_docs, f"Explanation of {concept}")
    print(f"{concept}\n{docs_ranked}")

In [None]:
models_path = str(project_root.resolve() / ".models")
embedder = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        cache_folder=models_path
)
reranker_model = HuggingFaceCrossEncoder(model_name="cross-encoder/ms-marco-MiniLM-L6-v2",model_kwargs={"cache_folder": models_path})

In [None]:
from langchain_core.documents import Document
from langchain_community.vectorstores import FAISS
from langchain_community.vectorstores.utils import DistanceStrategy
from langchain_huggingface import HuggingFaceEmbeddings
from utils.json import load_json_dump
import faiss

# Load every dumped record as a Document, then deduplicate by id
# (when an id repeats, the last record seen wins).
docs = [
    Document(id=doc["id"], page_content=doc["page_content"], metadata=doc["metadata"])
    for doc in load_json_dump(str(project_root / "rag-data/latest_docs.jsonl"))
]
docs_uniq = {doc.id: doc for doc in docs}
clean_docs = list(docs_uniq.values())

# Create the output directory if needed; safe to re-run.
indexes_path = project_root / "indexes"
indexes_path.mkdir(parents=True, exist_ok=True)
full_storage = FAISS.from_documents(clean_docs, embedding=embedder)
full_storage.save_local(str(indexes_path / "full_separate_snip"))


In [None]:
from core.reranking import CrossEncoderRerankerWithScores
test_reranker = CrossEncoderRerankerWithScores(model=reranker_model, score_threshold=1.0)
test_query = "do-while loop over OutAction cells"
test_set = full_storage.similarity_search_with_relevance_scores(test_query, k=10)
ranked_set = test_reranker.compress_documents(list(map(lambda doc: doc[0], test_set)), test_query)
#test_set[6][0].id == ranked_set[0].id
test_set[6]
ranked_set[1]
#test_set

In [None]:
doc_crumbs = map(lambda doc: Document(page_content=doc.metadata["crumbs"], id=doc.id), clean_docs)
top_lvl_idx = FAISS.from_documents(list(doc_crumbs), embedding=embedder)
top_lvl_idx.save_local(indexes_path / "top_level_index")

In [None]:
%%writefile libs/reranker.py
from langchain_classic.retrievers.document_compressors import CrossEncoderReranker
from langchain_core.documents import Document
from typing import Sequence, Optional
from concurrent.futures import ThreadPoolExecutor
import asyncio
import logging

log = logging.getLogger(__name__)
class CrossEncoderRerankerWithScores(CrossEncoderReranker):
    """Cross-encoder re-ranker that drops documents scoring below a threshold.

    Documents are scored against the query with ``self.model``, sorted by
    score (best first) and returned up to the first one whose score falls
    below ``score_threshold``.

    Attributes:
        score_threshold: Minimum score a document needs to be kept.
        executor: Thread pool used by :meth:`acompress_documents`. When the
            caller does not supply one, a private pool is created.
    """
    score_threshold: float = 0
    executor: Optional[ThreadPoolExecutor] = None
    # True only when this instance created `executor` itself and is therefore
    # responsible for shutting it down.
    _owns_executor: bool = False

    def __init__(self, *args, max_workers: int = 4, **kwargs):
        """Initialise the re-ranker.

        Args:
            max_workers: Size of the private thread pool; only used when no
                ``executor`` is passed in ``kwargs``.
        """
        super().__init__(*args, **kwargs)
        if self.executor is None:
            # Dedicated thread pool for CPU-intensive reranking
            log.debug("Creating separate executor for re-ranker")
            self.executor = ThreadPoolExecutor(
                max_workers=max_workers,
                thread_name_prefix="reranker"
            )
            self._owns_executor = True
        # else: the pool belongs to the caller and must not be shut down here.

    def compress_documents(
        self,
        documents: Sequence[Document],
        query: str,
        callbacks=None,
    ) -> Sequence[Document]:
        """Return ``documents`` ordered by relevance, cut at ``score_threshold``.

        Args:
            documents: Candidate documents.
            query: Query the documents are scored against.
            callbacks: Accepted for signature compatibility with the base
                compressor interface; not used.

        Returns:
            Documents with score >= ``score_threshold``, best first.
        """
        if len(documents) == 0:
            return []

        scores = self.model.score(
            [(query, doc.page_content) for doc in documents]
        )

        docs_with_scores = list(zip(documents, scores))
        docs_with_scores.sort(key=lambda pair: pair[1], reverse=True)

        result_docs = []
        for (doc, score) in docs_with_scores:
            # The list is sorted, so everything after the first miss is below
            # the threshold as well.
            if score < self.score_threshold:
                break
            result_docs.append(doc)

        log.debug(f"Reranked {len(documents)} -> {len(result_docs)} documents")
        return result_docs

    async def acompress_documents(
        self,
        documents: Sequence[Document],
        query: str,
        callbacks=None,
    ) -> Sequence[Document]:
        """Async variant: runs :meth:`compress_documents` on ``self.executor``."""
        if len(documents) == 0:
            return []

        # Run blocking scoring in the thread pool so the event loop stays free.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: self.compress_documents(documents, query)
        )

    def __del__(self):
        """Only shutdown if we own the executor."""
        # getattr guards against a partially initialised instance.
        executor = getattr(self, "executor", None)
        if getattr(self, "_owns_executor", False) and executor is not None:
            executor.shutdown(wait=False)

In [None]:
from langchain_classic.retrievers.document_compressors import CrossEncoderReranker, EmbeddingsFilter, DocumentCompressorPipeline
from langchain_classic.retrievers import ContextualCompressionRetriever
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from reranker import CrossEncoderRerankerWithScores

test_query = "Implement jetton contract using tolk language"

orig_search = full_storage.similarity_search(test_query, k=20)
# Use a distinct name: `reranker` is already bound to the service object imported
# from `services.reranking` earlier in this notebook; rebinding it here would break
# cells that call `reranker.rerank(...)` when they are run after this one.
scored_reranker = CrossEncoderRerankerWithScores(model=reranker_model, top_n=5)
#print(orig_search[:5])
print(f"From reranker {scored_reranker.compress_documents(documents=orig_search, query=test_query)}")

In [None]:
irrelevant_query = "Why is the sky is blue"
test_encodder = CrossEncoderRerankerWithScores(model=reranker_model,
                                               top_n=5,
                                               score_threshold=1)
test_encodder.compress_documents(query=irrelevant_query, documents=full_storage.similarity_search(irrelevant_query))

In [None]:
# `orig_search` may hold bare Documents (similarity_search) or (Document, score)
# tuples (the *_with_score variants); normalise to Documents in both cases.
orig_res = [hit[0] if isinstance(hit, tuple) else hit for hit in orig_search]
test_encodder.compress_documents(documents=orig_res, query=test_query)

In [None]:

scores = reranker_model.score(
            [(test_query, doc.page_content) for doc in orig_res]
        )
with_rerank_scores = list(zip(orig_res, scores))
with_rerank_scores.sort(key=lambda doc: doc[1], reverse=True)



In [None]:
# NOTE(review): `compressor` is not assigned in any cell visible in this notebook —
# it must be defined before this cell can run on a fresh kernel (TODO confirm source).
compressor.compress_documents(list(map(lambda doc: doc[0], orig_search)), "TOLK language syntax")

In [None]:
# NOTE(review): `compression_retriever` is not assigned in any cell visible in this
# notebook — it must be defined before this cell can run on a fresh kernel (TODO confirm source).
result_docs = compression_retriever.invoke("TOLK language syntax")
print(len(result_docs))
print(result_docs[1])
#compressor.compress_documents(list(map(lambda doc: doc[0], orig_search)), "TOLK language syntax")

In [None]:
compression_retriever.invoke("Why is the sky is blue")

In [None]:
top_lvl_idx.save_local("top_level_index")

# Initial all in one version

The initial version was developed right in the notebook:
``` python

%%writefile ../rag_backend.py
from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional, List, Union, Dict, Any, Literal, Iterable, Callable, Optional, Tuple, Sequence
import httpx
import asyncio
import json
import os
import re
import numpy as np
from datetime import datetime
import requests
import logging
import hashlib

from langchain_community.vectorstores import FAISS
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain_core.documents import Document
from langchain_huggingface import HuggingFaceEmbeddings
from libs.reranker import CrossEncoderRerankerWithScores
from libs.query_api import chat_completion
from utils.json import load_json_dump
from libs.openai_proxy import Message, ChatCompletionRequest, forward_to_openai, stream_openai_response
from concurrent.futures import ThreadPoolExecutor


app = FastAPI()

# Model names: CHEAP_MODEL is used for intent extraction, TOP_MODEL for the
# proxied chat completion.
CHEAP_MODEL =  "gpt-4o-mini"#"GPT-5.1-Codex-Mini"# "gpt-4o-mini"
TOP_MODEL = "claude-sonnet-4.5"

# Everything is logged to a file in the current working directory.
logging.basicConfig(
    filename='tolkative-rag.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

log = logging.getLogger("rag_backend")
log.setLevel(logging.DEBUG)

# Scores for the ms-marco-MiniLM-L6-v2 model.
# It produces scores from -10 to 10
HIGHLY_RELEVANT = 5.0
SOMEWHAT_RELEVANT = 1.0
# Score cut-off applied to hits from the headers (crumbs) index.
HEADERS_THRESHOLD = 0.45
# Global thread pool, created lazily by get_thread_pool()
rag_thread_pool: Optional[ThreadPoolExecutor] = None

@app.on_event("startup")
async def load_models_and_index() -> None:
    """
    Load the embedding model, the re-ranker model, both FAISS indexes and the
    snippet lookup table into module-level globals.
    This runs only once, when the server boots.

    Raises:
        RuntimeError: when one of the FAISS index folders is missing.
    """
    global embedder, reranker_model, reranker, storage, headers_storage
    global snippets_index
    # global tolk_ctx, tolk_grammar
    # with open("docs-data/languages/tolk/grammar/grammar.js", encoding="utf8") as grammar_file:
    #    tolk_grammar = grammar_file.read()

    #tolk_ctx = [f"Here is the TOLK language grammar {tolk_grammar}"]
    log.info("Loading HuggingFace embedding model …")
    embedder = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        cache_folder=".models"
    )
    log.info("Loading HuggingFace re-ranker model …")
    reranker_model = HuggingFaceCrossEncoder(
        model_name="cross-encoder/ms-marco-MiniLM-L6-v2",
        model_kwargs={"cache_folder": ".models"}
    )
    log.info("Loading FAISS index …")
    # Index directories must exist; raise a clear error otherwise.
    workdir = os.path.realpath(os.path.dirname(__file__))
    index_path = os.path.join(workdir, "indexes")
    doc_idx_path = os.path.join(index_path, "full_separate_snip")
    headers_idx_path = os.path.join(index_path, "top_level_index")
    for required_path in (doc_idx_path, headers_idx_path):
        if not os.path.isdir(required_path):
            raise RuntimeError(f"FAISS index folder not found: {required_path}")
    # NOTE: allow_dangerous_deserialization unpickles the index files; only
    # load indexes that were produced by a trusted source.
    # Documents text index
    storage = FAISS.load_local(
        doc_idx_path,
        embedder,
        allow_dangerous_deserialization=True,
    )
    # Crumbs index
    headers_storage = FAISS.load_local(
        headers_idx_path,
        embedder,
        allow_dangerous_deserialization=True
    )

    # Snippets hash table for demo purposes: snippet id -> snippet record
    snippets = load_json_dump(os.path.join(workdir,"rag-data", "latest_snippets.jsonl"))
    snippets_index = {snip["id"]: snip for snip in snippets}
    log.debug("Instantiating reranker")
    reranker = CrossEncoderRerankerWithScores(model=reranker_model, score_threshold=SOMEWHAT_RELEVANT, executor=get_thread_pool())
    log.info("Startup finished – model & index ready.")

@app.on_event("shutdown")
def shutdown_rag():
    """Shutdown hook: releases the shared thread pool via `_shutdown_thread_pool`."""
    _shutdown_thread_pool()

def get_thread_pool():
    """Return the shared thread pool, creating it on first use.

    The pool is sized to the CPUs available to this process minus two
    (never fewer than one worker).
    """
    global rag_thread_pool
    if rag_thread_pool is None:
        log.info("Starting thread pool")
        try:
            cpu_total = len(os.sched_getaffinity(0))
        except AttributeError:
            # sched_getaffinity is not available on every platform.
            cpu_total = os.cpu_count() or 1
        rag_thread_pool = ThreadPoolExecutor(
            max_workers=max(1, cpu_total - 2),
            thread_name_prefix="rag_worker"
        )
    return rag_thread_pool
    
def _shutdown_thread_pool():
    """Shut down the shared thread pool (waiting for running tasks) and forget it."""
    # Without this declaration the assignment below makes the name local and
    # the `if` check raises UnboundLocalError.
    global rag_thread_pool
    if rag_thread_pool is not None:
        log.info("Shutting down thread pool")
        rag_thread_pool.shutdown(wait=True)
        rag_thread_pool = None
        
async def execute_async(operation: Callable):
    """Run the blocking zero-argument callable `operation` on the shared thread pool.

    Returns:
        Whatever `operation` returns.
    """
    # We are inside a coroutine, so a loop is guaranteed to be running.
    event_loop = asyncio.get_running_loop()
    return await event_loop.run_in_executor(get_thread_pool(), operation)
        
def get_system_context(user_id: Optional[str] = None) -> "Message":
    """Build the default system message for the assistant.

    Args:
        user_id: Currently unused.

    Returns:
        A `Message` with role "system" carrying the instruction prompt.
    """
    context = """
You are an experienced TON developer.
Context is wrapped in <context></context> tags.
First cite the used context "id","doc-url","concept" for every context provided.
Keep close attention to the context provided and don't mix the context from different programming
languages when generating code.

Provide comprehensive answer to the user request using the context supplied.
If no context provided, explicitly indicate that fact and do not respond anything else.

When code examples provided in the context satisfy user request:
- Return code snippets EXACTLY as they appear in the context
- Don't merge snippets comming from different files into a single one, unless explicitly told so.
- Do NOT modify, improve, or fix the code without explicit request.
- If you must reference code, use EXACT copy-paste
    """
    return Message(role="system", content=context)

def embed_query(query: str):
    """Embed a single query string with the module-level `embedder`."""
    return embedder.embed_query(query)
    
def all_of(predicates: Iterable[Callable]) -> Callable:
    """Combine predicates into one that holds only when every one of them holds."""
    def _every_predicate_holds(x):
        # Stop at the first failing predicate.
        for check in predicates:
            if not check(x):
                return False
        return True
    return _every_predicate_holds

async def retrieve_documents_by_headers(query: str, query_vector: Sequence[float], threshold: float, filter_lambda = None, top_k=5) -> List[Document]:
    """Find documents whose header (crumbs) entry matches the query vector.

    Args:
        query: Original query text; only used for logging.
        query_vector: Embedding of the query.
        threshold: Header hits with a score below this value are dropped.
        filter_lambda: Optional predicate applied to the resolved documents.
        top_k: Number of header hits to request from the index.

    Returns:
        Full documents (looked up in `storage` by id) for the accepted header hits.
    """
    log.debug(f"Headers query: {query}")
    doc_batch = await execute_async(
        # FAISS takes the result count as `k`; an unknown `top_k` keyword is ignored.
        lambda: headers_storage.similarity_search_with_score_by_vector(query_vector, k=top_k)
    )
    # No re-ranking with headers, the raw index score is compared directly.
    # NOTE(review): depending on the index distance strategy the score may be a
    # distance (lower is better) — confirm `>=` is the intended comparison.
    header_ids = [doc.id for doc, score in doc_batch if score >= threshold]
    if not header_ids:
        return []
    result = storage.get_by_ids(header_ids)
    log.debug(f"Headers result {result}")
    if filter_lambda is not None:
        return list(filter(filter_lambda, result))
    # Previously an empty list was returned here, discarding every hit
    # whenever no filter was supplied.
    return list(result)

async def retrieve_documents(cur_index: FAISS, query: str, query_vector: Sequence[float], threshold: float, filter_lambda = None, top_k=10):
    """Vector-search `cur_index`, optionally filter the hits, then re-rank them.

    Args:
        cur_index: Index to search.
        query: Query text used for re-ranking.
        query_vector: Embedding of the query used for the vector search.
        threshold: Currently unused; the re-ranker applies its own score threshold.
        filter_lambda: Optional predicate applied before re-ranking.
        top_k: Base result count; four times as many candidates are fetched.

    Returns:
        Documents accepted by the module-level `reranker`.
    """
    # We retrieve more to re-rank later
    initial_docs = await execute_async(
        # FAISS takes the result count as `k`; an unknown `top_k` keyword is ignored.
        lambda: cur_index.similarity_search_by_vector(query_vector, k=top_k * 4)
    )
    # Filter out by tags prior to re-rank
    filtered_docs = initial_docs
    if filter_lambda is not None:
        filtered_docs = list(filter(filter_lambda, filtered_docs))

    if len(filtered_docs) == 0:
        return filtered_docs

    # We might want to re-rank the result based on intent, instead of a full query
    log.debug(f"Ranking query {query}")
    docs_ranked = await reranker.acompress_documents(query=query, documents=filtered_docs)
    log.debug(f"Ranked docs: {docs_ranked}")
    return docs_ranked

def pack_assintent_context(content: str):
    """Wrap `content` into a chat message dict with the assistant role."""
    message = dict(role="assistant", content=content)
    return message
class IntentInfo(BaseModel):
    """Result of intent extraction: the request intent plus related concepts."""
    # Intent text; `extract_intent` falls back to the raw prompt when extraction fails.
    intent: str
    # Concept strings; `pull_context` embeds and searches each one separately.
    concepts: List[str]
async def extract_intent(prompt: str) -> IntentInfo:
    """Ask the cheap model to split a user request into intent and concepts.

    Best-effort: if the completion call fails or its output is not the
    expected JSON, the whole prompt is returned as the intent with no concepts.
    """
    intent_prompt = """
Separate the request intent from the user supplied code.
Populate the list of explicitly named programming, execution environment concepts or operations necessary for the engineer to know to satisfy the following request.
Respond in a following structure {"intent": "<user_intent>", "concepts": [<json list of concepts>]}

DONT include into the concepts:
- Generic terms like algorithms, blockchain, TON blockchain, etc
- Anything related to blockchains other than TON like Ethereum, Solana, Solidity, etc

If user is requesting to implement certain functionality, add example request to the concepts. (Jetton contract example, Wrappers example, Unit tests example, etc)

Provide response in the raw json format without any markdown.
""" + f"Request: {prompt}"
    try:
        completion_res = await execute_async(lambda: chat_completion([{"role":"user", "content":intent_prompt}], CHEAP_MODEL))
        log.debug(f"Completion result:{completion_res}")
        return IntentInfo(**json.loads(completion_res))
    except Exception:
        # Deliberate fallback, but make the failure visible with a traceback
        # instead of hiding it at DEBUG level.
        log.warning("Intent extraction failed, falling back to the raw prompt", exc_info=True)
        return IntentInfo(intent=prompt, concepts=[])

def fence_code(snip: Dict[str, Any]):
    """Render a snippet record as a fenced code block.

    Args:
        snip: Snippet record (a mapping, not a `Document`) with a
            "page_content" entry and a "metadata" mapping whose optional
            "lang" entry becomes the fence language tag.

    Returns:
        The fenced block, starting with a newline.
    """
    lang = snip["metadata"].get("lang", "")
    return f"""
```{lang}
{snip["page_content"]}
```
    """

def filter_doc_path(doc, filter_path:str):
    """Tell whether a document's source path is free of `filter_path`.

    `doc` is either a document or a tuple whose first element is the
    document. Returns True when `filter_path` does not occur in the
    document's "from" metadata entry.
    """
    if isinstance(doc, tuple):
        target = doc[0]
    else:
        target = doc
    source_path = target.metadata["from"]
    return not (filter_path in source_path)

async def render_docs_batch(
    docs: List[Document],
    skip_languages: Optional[List[str]] = None
) -> List[str]:
    """Render documents into context strings, resolving their snippets in one pass.

    Args:
        docs: Documents to render; snippet references are read from the
            "snippets" metadata entry.
        skip_languages: Snippets whose "lang" metadata is in this list are left out.

    Returns:
        One rendered string per document, in input order.
    """
    if not docs:
        return []

    doc_snippet_mapping = {}
    all_snippet_ids = set()

    for doc in docs:
        snippet_refs = doc.metadata.get("snippets", [])
        doc_snippet_mapping[doc.id] = snippet_refs
        all_snippet_ids.update(snip_ref["id"] for snip_ref in snippet_refs)

    log.debug(f"Batch rendering {len(docs)} docs with {len(all_snippet_ids)} unique snippets")

    # Resolve every referenced snippet once (plain dict lookups, done inline).
    snippets_cache = {}
    missing_snippets = []
    for snip_id in all_snippet_ids:
        snippet_obj = snippets_index.get(snip_id)
        if snippet_obj is None:
            missing_snippets.append(snip_id)
            continue
        # "lang" is optional (fence_code tolerates its absence), so read it with .get
        snippet_lang = snippet_obj["metadata"].get("lang")
        if skip_languages is None or snippet_lang not in skip_languages:
            snippets_cache[snip_id] = snippet_obj

    if missing_snippets:
        log.warning(f"Missing {len(missing_snippets)} snippets")

    # Render synchronously (fast string operations)
    return [
        _render_single_doc(doc, doc_snippet_mapping[doc.id], snippets_cache)
        for doc in docs
    ]
def _build_doc_url(doc_path: str) -> str:
    """Extract documentation URL from file path."""
    if "docs-data" not in doc_path:
        return ""
    
    parsed_path = doc_path.split('/')[1:]
    file_path = parsed_path[-1].split(".")[0]
    parsed_path[-1] = file_path
    
    return f'doc-url="https://docs.ton.org/{"/".join(parsed_path)}"'
def _render_single_doc(
    doc: Document,
    snippet_refs: List[Dict],
    snippets_cache: Dict[str, Any]
) -> str:
    """Render a single document using pre-fetched snippets.

    Each referenced snippet found in `snippets_cache` is inserted into the
    page content as a fenced block at its recorded position.

    Returns:
        A `<context ...>` element, or "" when the resulting content is blank.
    """
    # Insertion offset: snippet positions are shifted by the concept length + 2.
    # NOTE(review): presumably the page content is prefixed with the concept and a
    # two-character separator — confirm against the indexing code.
    total_delta = len(doc.metadata.get("concept", "")) + 2
    doc_content = doc.page_content

    for snip_ref in snippet_refs:
        # Pre-fetched data only; filtered-out or missing snippets are skipped.
        snippet_obj = snippets_cache.get(snip_ref["id"])
        if not snippet_obj:
            continue

        fenced_block = fence_code(snippet_obj)
        start_idx = total_delta + snip_ref["pos"]
        doc_content = doc_content[:start_idx] + fenced_block + doc_content[start_idx:]
        # Later positions shift by the length of what was just inserted.
        total_delta += len(fenced_block)

    # Don't pollute context with the empty tags
    if len(doc_content.strip()) == 0:
        return ""

    # Read optional metadata tolerantly in both places it is used.
    doc_from = doc.metadata.get("from", "")
    doc_crumbs = doc.metadata.get("crumbs", "")
    doc_url = _build_doc_url(doc_from)
    return f'<context id="{doc.id}" orig-doc="{doc_from}" concept="{doc_crumbs}" {doc_url}>{doc_content}</context>'

# Going one hop recursive, to guarantee the context order.
def add_uniq_context(chunks:List[Document], watch_set: set, cur_depth = 0, max_depth = 1):
    """Return the not-yet-seen chunks, each followed by its unseen linked documents.

    Args:
        chunks: Candidate documents, in priority order.
        watch_set: Ids already emitted; updated in place.
        cur_depth: Current recursion depth.
        max_depth: Maximum depth to follow child/reference links.

    Returns:
        New documents in order: every chunk is directly followed by the
        documents pulled in through its "child_nodes" and "references".
    """
    uniq_chunks = []
    for chunk in chunks:
        if chunk.id in watch_set:
            continue
        watch_set.add(chunk.id)
        uniq_chunks.append(chunk)
        child_ids = chunk.metadata["child_nodes"]
        reference_ids = chunk.metadata["references"]
        ref_ids = child_ids + reference_ids
        if len(ref_ids) > 0 and cur_depth + 1 <= max_depth:
            log.debug(f"Fetching child docs from {chunk}")
            log.debug(f"Direct children {len(child_ids)}")
            log.debug(f"References {len(reference_ids)}")
            # Pass max_depth along so a caller-supplied limit is respected
            # at every level, not only the first.
            child_docs = add_uniq_context(storage.get_by_ids(ref_ids), watch_set, cur_depth + 1, max_depth)
            log.debug(f"Fetched {child_docs}")
            uniq_chunks.extend(child_docs)
    return uniq_chunks


class ContextResponse(BaseModel):
    """Response of the context lookup: the enriched user message plus extras."""
    # User message with the rendered context prepended.
    context: Message
    # System message to insert, or None when the conversation already has one.
    # (A trailing comma here used to turn the default into the tuple `(None,)`.)
    system: Optional[Message] = None
    # Intent and concepts extracted from the user message.
    intent: IntentInfo
    
'''
def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Calculate cosine similarity."""
    a = np.array(a)
    b = np.array(b)
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
'''

def detect_language_from_keywords(intent: str, concepts: List[str]) -> Tuple[Optional[Callable], Optional[Sequence[str]]]:
    """
    Simple language detection from keywords.

    Looks for the standalone words "tolk" and "func" (case-insensitive,
    punctuation ignored) in the intent and the concepts.

    Returns:
        `(filter_lambda, skip_languages)`. When exactly one of the two
        languages is mentioned, `filter_lambda` rejects documents coming from
        the other language's path and `skip_languages` lists snippet languages
        to leave out. Otherwise both are None.
    """
    skip_languages: Optional[Sequence] = None
    filter_lambda: Optional[Callable] = None

    text = f"{intent} {' '.join(concepts)}".lower()
    # Tokenise on word characters so "FunC?" or "tolk," still match.
    tokens = set(re.findall(r"[a-z0-9_]+", text))
    has_tolk = "tolk" in tokens
    has_func = "func" in tokens

    if has_tolk and not has_func:
        log.debug("Excluding FunC context!")
        filter_lambda = lambda doc: filter_doc_path(doc, "languages/func")
        skip_languages   = ["func","fift","tact", "typescript", "ts"]
    elif has_func and not has_tolk:
        log.debug("Excluding TOLK context!")
        filter_lambda = lambda doc: filter_doc_path(doc, "languages/tolk")
        skip_languages   = ["tolk","fift","tact", "typescript", "ts"]
    return filter_lambda, skip_languages

async def pull_context(orig_msgs: List[Message]) -> ContextResponse:
    """Build the retrieval context for the last message of a conversation.

    Steps: extract intent and concepts, detect the target language, embed all
    queries in one batch, collect documents from the headers index, the
    full-text index and per-concept searches (deduplicated, in that order),
    then render them in front of the user message.

    Args:
        orig_msgs: Conversation messages; the last one is treated as the user request.

    Returns:
        ContextResponse with the enriched user message, a system message
        (only when the conversation has none) and the extracted intent.
    """
    user_msg = orig_msgs[-1].content
    has_system = any(msg.role == "system" for msg in orig_msgs)
    system_msg = None
    # If there is no system message, provide one for the caller to insert
    if not has_system:
        log.debug("Adding system message...")
        system_msg =  get_system_context()

    log.debug(f"Received user message: {user_msg}")
    intent_topics = await extract_intent(user_msg)
    log.debug(f"Topics from intent: {intent_topics}")

    # Exclude the other language if the request belongs to a single one,
    # otherwise the LLM will produce code with a mix of languages.
    filter_lambda, skip_languages = detect_language_from_keywords(intent_topics.intent, intent_topics.concepts)

    queries_to_embed = [
        intent_topics.intent,    # For header search
        user_msg,                # For text search
        *intent_topics.concepts  # For concept searches
    ]

    # One batched embedding call for all queries
    embeddings = await execute_async(lambda: embedder.embed_documents(queries_to_embed))

    # Unpack embeddings in the order they were queued
    intent_emb = embeddings[0]
    user_msg_emb = embeddings[1]
    concept_embs = zip(intent_topics.concepts, embeddings[2:])

    # We can further parallelize all the retrieval, but that's a problem for another day
    # id_set is shared across all three sources so a document is emitted once.
    id_set = set()
    context = add_uniq_context(
        await retrieve_documents_by_headers(intent_topics.intent, intent_emb, HEADERS_THRESHOLD, filter_lambda),
        id_set
    )
    log.debug(f"Context from headers: {context}")

    context_from_texts = add_uniq_context(
        await retrieve_documents(storage, user_msg, user_msg_emb, SOMEWHAT_RELEVANT, filter_lambda),
        id_set
    )
    log.debug(f"Context from texts: {context_from_texts}")
    context.extend(context_from_texts)

    for topic, topic_emb in concept_embs:
        additional_context = await retrieve_documents(storage, topic, topic_emb, SOMEWHAT_RELEVANT, filter_lambda)
        log.debug(f"From topic {topic} added context {additional_context}")
        context.extend(add_uniq_context(additional_context, id_set))

    rendered_docs = await render_docs_batch(context, skip_languages)
    rendered_ctx = "\n".join(rendered_docs).strip()

    return ContextResponse(
        context=Message(
        role="user", content=f"{rendered_ctx}\n\n{user_msg}"
        ),
        system = system_msg,
        intent = intent_topics
    )

@app.post("/context", response_model=ContextResponse)
async def rag_chat(request: Request):
    """Return the retrieval context for the messages posted as JSON."""
    payload = await request.json()
    # Turn the raw message dicts into Message objects
    messages = [Message(**raw_msg) for raw_msg in payload["messages"]]
    res_ctx = await pull_context(messages)
    log.debug(res_ctx)
    return res_ctx

# Models endpoint (as above)
class Model(BaseModel):
    """One entry of the `/v1/models` listing."""
    id: str
    object: str = "model"
    created: int = 0
    owned_by: str = "my-organization"

class ModelsResponse(BaseModel):
    """Body returned by `/v1/models`: a list wrapper around `Model` entries."""
    object: str = "list"
    data: List[Model]

@app.get("/v1/models")
async def list_models():
    """List the single model exposed by this server."""
    tolkative = Model(id="TOLKative model", owned_by="TON Core")
    return ModelsResponse(object="list", data=[tolkative])

@app.post("/v1/chat/completions")
async def proxy_chat_completion(
    request: ChatCompletionRequest,
    authorization: Optional[str] = Header(None),
):
    """
    Proxy endpoint that adds context before forwarding to OpenAI

    The last message is replaced with its context-enriched version, the model
    is forced to TOP_MODEL, and a system message is prepended when the
    conversation has none.

    Raises:
        HTTPException: 500 with the error text for unexpected failures;
            HTTPExceptions raised while handling are passed through unchanged.
    """
    try:
        # Inject context into messages
        request_ctx = await pull_context(request.messages)
        log.debug(request_ctx)
        # Prepare the request for OpenAI
        openai_request = request.dict(exclude_unset=True)
        openai_request["messages"][-1] = request_ctx.context.dict()
        openai_request["model"] = TOP_MODEL
        if request_ctx.system is not None:
            openai_request["messages"].insert(0, request_ctx.system.dict())

        # Determine if streaming
        if request.stream:
            return StreamingResponse(
                stream_openai_response(openai_request, authorization),
                media_type="text/event-stream"
            )
        # NOTE(review): unlike the streaming path, `authorization` is not
        # forwarded here — confirm against forward_to_openai's signature.
        return await forward_to_openai(openai_request)

    except HTTPException:
        # Keep the original status code instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        log.exception("Chat completion proxy failed")
        raise HTTPException(status_code=500, detail=str(e)) from e
#
# Health check
@app.get("/health")
async def health_check():
    """Liveness probe: always returns a static "healthy" payload."""
    return {"status": "healthy", "service": "openai-proxy"}

# Middleware for logging
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log method, path and body of every request, then pass it on.

    Requests without a body (e.g. GET) or with a non-JSON body are logged
    too instead of failing inside the middleware.
    """
    raw_body = await request.body()
    try:
        payload = json.loads(raw_body) if raw_body else None
    except ValueError:
        # Not JSON (or not valid UTF-8): log the raw text instead.
        payload = raw_body.decode("utf-8", errors="replace")
    log.debug(f"Request: {request.method} {request.url.path} {payload}")
    response = await call_next(request)
    return response


# Script entry point: serve the app on all interfaces, port 8000.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```

In [None]:
import uvicorn
import sys
sys.path.insert(0, "../")
def _run_uvicorn():
    """Serve `main:app` with uvicorn on 0.0.0.0:4321 at debug log level."""
    uvicorn.run("main:app", host="0.0.0.0", port=4321, log_level="debug")

In [None]:
import threading

fastapi_thread = threading.Thread(target=_run_uvicorn, name="fastapi_thread", daemon=True)
fastapi_thread.start()
print("Fastapi running...")

In [None]:
import requests
def get_models():
    """Fetch `/v1/models` from the server and return the raw response text."""
    response = requests.get("http://tolkative-notebook:8000/v1/models")
    return response.text
def get_context(prompt: str, **additional_params):
    """POST a single-message request to `/context` and return the decoded JSON.

    Extra keyword arguments are merged into the request body and override
    the default "model" / "messages" entries when they collide.
    """
    payload: dict = {"model": "whatever", "messages": [{"role": "user", "content": prompt}]}
    payload.update(additional_params)
    response = requests.post("http://tolkative-notebook:8000/context", json=payload)
    return response.json()
def proxy_request(prompt: str, stream: bool = False):
    """Send a single-message chat completion request through the proxy.

    Args:
        prompt: User message content.
        stream: Request a streamed response.

    Returns:
        The decoded JSON body when not streaming. When streaming, a list with
        one item per `data:` line of the response: the parsed JSON payload, or
        the raw text when the payload is not JSON.
    """
    import json  # local import: this notebook does not import json elsewhere

    resp = requests.post(
        "http://tolkative-notebook:8000/v1/chat/completions",
        json={"stream": stream, "model": "whatever", "messages": [{"role": "user", "content": prompt}]},
        stream=stream
    )
    if not stream:
        return resp.json()

    # This is stream implementation for testing purposes only: the whole
    # stream is read first, then split into events.
    # Assumes the server answers with `data: ...` event-stream lines — TODO confirm.
    body = b"".join(chunk for chunk in resp).decode("utf-8", errors="replace")
    events = []
    for line in body.splitlines():
        line = line.strip()
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        if not payload or payload == "[DONE]":
            continue
        try:
            events.append(json.loads(payload))
        except ValueError:
            events.append(payload)
    return events

In [None]:
get_models()

In [None]:
from core.retrieval import retrieve_documents_by_headers
simple_test_query = "What are the know issues with FunC?"
#vector_res = await vector_store.search(simple_test_query)
#await reranker.rerank(vector_res, simple_test_query)

In [None]:
get_context(simple_test_query)

In [None]:
get_context("FunC language design issues")

In [None]:
get_models()

In [None]:
res_ctx = get_context(explain_query)
res_ctx

In [None]:
get_context("write a tolk snippet that checks that current time is within range of last 5 minutes of any hour")

In [None]:
get_context("current time tolk")

In [None]:
get_context("Provide a basic jetton using tolk")

In [None]:
get_context("Provie jetton wallet implementation  in FunC with sharding and unit tests")

In [None]:
get_context("write a tolk snippet that checks that current time is within range of last 5 minutes of any hour", **{"max_tokens": 300})

In [None]:
get_context("write a tolk snippet that checks that current time is within range of last 5 minutes of any hour", **{"max_tokens": 5000})

In [None]:
get_context("How to implement sharded contract")

In [None]:
get_context("tolk get current time")

In [None]:
get_context(explain_query)

In [None]:
proxy_request("Why is the sky is blue?")

In [None]:
proxy_request("Implement a jetton contract using tolk. Modify it in such a way that minter admin is able to lock arbitrary wallet transfers. After update wrappers and unit tests for this scenario")

In [None]:
proxy_request("What PFXDICT instruction actually does?")

In [None]:
import socket
def is_port_open(host="127.0.0.1", port=4321):
    """Check whether a TCP connection to `host:port` succeeds within 0.5 seconds.

    Returns:
        True when the connection attempt succeeds, False otherwise.
    """
    # The context manager closes the socket even if the attempt raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.settimeout(0.5)
        return probe.connect_ex((host, port)) == 0

print("RAG port open?", is_port_open())