## Routing Queries to Different Data Sources

A routing system sends each user query to the **most appropriate data source** based on its content.

### Data Sources
- **Documents**  
  PDFs, Word files, manuals  
  → handled by **Semantic Search**

- **Database**  
  Structured tables (SQL)  
  → handled by **Text-to-SQL generation**

- **APIs**  
  External services (weather, search, internal tools)  
  → handled by **Function Calling / REST API calls**

---

### Routing Flow
```
             ┌──────────────────────────────┐
             │            QUERY             │
             └───────────────┬──────────────┘
                             ▼
                      ┌────────────┐
                      │   ROUTER   │
                      └─────┬──────┘
     ┌──────────────────────┼────────────────────────┐
     ▼                      ▼                        ▼
┌─────────────────┐   ┌────────────────┐ ┌────────────────┐
│    DOCUMENTS    │   │    DATABASE    │ │      APIs      │
│(Semantic Search)│   │  (Text-to-SQL) │ │(Function Calls)│
└─────────────────┘   └────────────────┘ └────────────────┘
```
---



### Summary
- Queries about **topics, explanations, paragraphs** → **Semantic Search**
- Queries about **rows, tables, numbers, filters** → **Text-to-SQL**
- Queries requiring **actions** or **external data** → **API Calls / Functions**
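
The rules of thumb above can be approximated without an LLM. The sketch below is a hypothetical keyword baseline: the `KEYWORDS` table, the `keyword_route` function, and the `general_llm` default are illustrative assumptions, not part of the notebook's router. A baseline like this is mainly useful as a sanity check or a cheap fallback.

```python
import re

# Hypothetical keyword lists, loosely based on the summary above.
KEYWORDS = {
    "documents": {"document", "policy", "manual", "pdf", "say", "says"},
    "database": {"many", "total", "count", "average", "orders", "customers", "revenue"},
    "api": {"salesforce", "stripe", "weather", "fetch", "leads"},
}


def keyword_route(question: str, default: str = "general_llm") -> str:
    """Pick the source whose keyword list overlaps most with the question."""
    words = set(re.findall(r"[a-z]+", question.lower()))
    scores = {source: len(words & vocab) for source, vocab in KEYWORDS.items()}
    # On a tie, max() keeps the first source in dictionary order.
    best = max(scores, key=scores.get)
    # With no keyword hit at all, fall back to the default route.
    return best if scores[best] > 0 else default


print(keyword_route("How many orders did we get last month?"))
print(keyword_route("What is machine learning?"))
```

Keyword matching breaks down quickly on paraphrased questions, which is the main reason to let an LLM make the routing decision.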


In [15]:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatMessagePromptTemplate, ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pydantic import BaseModel, Field
from typing import Literal
import os
from sk import my_gpt
from config import (
    llm, embeddings, setup_vectorstores,
    get_retriever, format_docs
)


### Routing Schema (Pydantic)

This schema defines the **structured output** of a router.  
The LLM must return:

- **datasource**: one of the allowed routes (`documents`, `database`, `api`, `general_llm`)  
- **reasoning**: short explanation of why that route was selected  
- **confidence**: a score from `0.0` to `1.0`

In [16]:
# ROUTING SCHEMA

class DataSourceRoute(BaseModel):
    datasource: Literal["documents", "database", "api", "general_llm"] = Field(
        description="The data source to route to"
    )
    reasoning: str = Field(description="Why this data source was chosen")
    confidence: float = Field(ge=0.0, le=1.0)
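
The `Literal` type and the `ge`/`le` bounds mean Pydantic rejects any route that falls outside the schema. A minimal sketch of that behaviour follows; it repeats the schema so it runs on its own, and `try_route` is a hypothetical helper added for illustration.

```python
from typing import Literal

from pydantic import BaseModel, Field, ValidationError


class DataSourceRoute(BaseModel):
    datasource: Literal["documents", "database", "api", "general_llm"] = Field(
        description="The data source to route to"
    )
    reasoning: str = Field(description="Why this data source was chosen")
    confidence: float = Field(ge=0.0, le=1.0)


def try_route(**fields):
    """Return a validated route, or None if the fields violate the schema."""
    try:
        return DataSourceRoute(**fields)
    except ValidationError:
        return None


ok = try_route(datasource="database", reasoning="Asks for a count", confidence=0.9)
bad_source = try_route(datasource="spreadsheet", reasoning="n/a", confidence=0.9)  # not an allowed route
bad_conf = try_route(datasource="api", reasoning="n/a", confidence=1.5)  # above the 1.0 bound
```

Only `ok` is a `DataSourceRoute`; the other two calls return `None` because validation fails.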

In [17]:
# ROUTER

def create_router():
    """Build a chain that classifies a question into a DataSourceRoute."""
    system = """You are an expert at routing queries to the appropriate data source.
    
    DATA SOURCES:
    - documents: Company docs, SOPs, policies, research papers, PDFs
    - database: Sales data, customer records, inventory, metrics (SQL queries)
    - api: CRM (Salesforce), payments (Stripe), accounting (Xero)
    - general_llm: General knowledge, definitions, concepts
    
    ROUTING RULES:
    - "What does the document/policy say..." → documents
    - "How many orders/customers...", "Total revenue..." → database
    - "Get leads from Salesforce...", "Check payment..." → api
    - "What is X?", "Explain..." → general_llm

    Choose the single most relevant data source. """

    router_prompt = ChatPromptTemplate.from_messages([
        ("system", system),
        ("human", "{question}")
    ])

    structured_llm = llm.with_structured_output(DataSourceRoute)
    
    router = router_prompt | structured_llm
    return router

router = create_router()

In [11]:
# DATA SOURCE HANDLERS

def handle_documents(question, vectorstores):
    """Semantic Search over documents."""
    all_docs = []

    for vs in vectorstores.values():
        docs = vs.similarity_search(question, k=2)
        all_docs.extend(docs)
   
    # Join the retrieved chunks into one context string for the prompt
    context = "\n\n".join(d.page_content for d in all_docs)
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Answer based on context:\n\n{context}"),
        ("human", "{question}")
    ])
    chain = prompt | llm | StrOutputParser()
    return chain.invoke({"question": question, 
                         "context": context})

def handle_database(question):
    """Handle database queries with Text-to-SQL."""
    schema = """Tables: 
    - customers(id, name, email, city, state, created_at)
    - orders(id, customer_id, product_id, total, order_date)
    - products(id, name, category, price, stock)"""
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", f"Generate SQL. Schema: {schema}\nReturn ONLY SQL."),
        ("human", "{question}")
    ])
    chain = prompt | llm | StrOutputParser()
    sql = chain.invoke({"question": question})
    return f"Generated SQL:\n```sql\n{sql}\n```\n(In production, this executes against database)"

def handle_api(question):
    """Handle API queries with function calling recommendation."""
    prompt = ChatPromptTemplate.from_messages([
        ("system", """Determine which API and function to call.
            
    Available APIs:
    - Salesforce: get_leads(), get_contacts(), get_opportunities()
    - Stripe: get_payments(), get_invoices(), get_subscriptions()
    - HubSpot: get_contacts(), get_campaigns(), get_deals()
    
    Return the recommended API call."""),
        ("human", "{question}")
    ])
    chain = prompt | llm | StrOutputParser()
    result = chain.invoke({"question": question})
    return f"API Recommendation:\n{result}\n(In production, this makes actual API calls)"

def handle_general(question):
    """Direct LLM response."""
    return llm.invoke(question).content
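
`handle_database` stops at generating SQL. Below is a minimal sketch of the missing execution step, assuming a SQLite database and a crude SELECT-only guard. The `run_readonly_sql` helper, the guard, and the in-memory demo rows are illustrative assumptions; a real deployment would also want a read-only database role.

```python
import sqlite3


def run_readonly_sql(conn: sqlite3.Connection, sql: str, max_rows: int = 50):
    """Execute a single SELECT statement and return up to max_rows rows."""
    statement = sql.strip().rstrip(";").strip()
    # Crude guard, not a SQL parser: allow one statement that starts with SELECT.
    if not statement.lower().startswith("select") or ";" in statement:
        raise ValueError("Only single SELECT statements are allowed")
    cursor = conn.execute(statement)
    return cursor.fetchmany(max_rows)


# Demo data for the orders table from the schema in handle_database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER, customer_id INTEGER, "
    "product_id INTEGER, total REAL, order_date TEXT)"
)
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?, ?)",
    [(1, 10, 100, 25.0, "2024-05-01"), (2, 11, 101, 75.0, "2024-05-03")],
)

rows = run_readonly_sql(conn, "SELECT COUNT(*), SUM(total) FROM orders;")
print(rows)
```

The guard rejects any statement containing a semicolon after the trailing one is stripped, so it also refuses legitimate queries with `;` inside string literals. That trade-off seems acceptable for a sketch.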

In [12]:
def query_rag(question, vectorstores=None):
    """Route a question to a data source, run its handler, and return the result."""

    # Route
    route = router.invoke({"question": question})
    print(f" Routing to: {route.datasource} (confidence: {route.confidence:.2f})")
    print(f"   {route.reasoning}")
    
    # Handle based on route (IF-ELSE LOGIC)
    if route.datasource == "documents":
        if vectorstores:
            answer = handle_documents(question, vectorstores)
        else:
            answer = "Documents not configured."
    elif route.datasource == "database":
        answer = handle_database(question)
    elif route.datasource == "api":
        answer = handle_api(question)
    else:
        answer = handle_general(question)
    
    print(f"\n ANSWER:\n{'-'*80}\n{answer}\n{'-'*80}\n")
    return {"datasource": route.datasource, "confidence": route.confidence, "answer": answer}
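
`query_rag` prints the router's confidence but never acts on it. One possible refinement is to fall back to `general_llm` when confidence is low, and to replace the if/else chain with a dispatch table. In the sketch below, the `Route` dataclass stands in for `DataSourceRoute`, and `pick_handler`, the 0.5 threshold, and the stub handlers are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Tuple


@dataclass
class Route:
    """Stand-in for DataSourceRoute so the sketch runs without an LLM."""
    datasource: str
    confidence: float


def pick_handler(
    route: Route,
    handlers: Dict[str, Callable[[str], str]],
    min_confidence: float = 0.5,
    fallback: str = "general_llm",
) -> Tuple[str, Callable[[str], str]]:
    """Return (name, handler); use the fallback for unknown or low-confidence routes."""
    name = route.datasource
    if route.confidence < min_confidence or name not in handlers:
        name = fallback
    return name, handlers[name]


# Stub handlers; in the notebook these would be handle_database, handle_api, etc.
handlers = {
    "database": lambda q: f"[sql] {q}",
    "api": lambda q: f"[api] {q}",
    "general_llm": lambda q: f"[llm] {q}",
}

name, handler = pick_handler(Route("database", 0.92), handlers)
low_name, _ = pick_handler(Route("api", 0.30), handlers)
print(name, low_name)
```

The same lookup covers the "Documents not configured" case: leaving `documents` out of `handlers` sends those questions to the fallback.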
    

In [None]:
vectorstores = setup_vectorstores()

test_questions = [
    "What does the CV say about education?",   # → documents
    "How many orders did we get last month?",  # → database
    "Get latest leads from Salesforce",        # → api
    "What is machine learning?",               # → general_llm
]

for q in test_questions:
    query_rag(q, vectorstores)
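
The expected routes noted in the comments can be turned into a small regression check. The sketch below assumes a `route_fn` callable that maps a question to a data source name; with the notebook's router this could be `lambda q: router.invoke({"question": q}).datasource`. A trivial stub stands in here so the example runs without an LLM. `routing_accuracy` and `stub_route` are hypothetical names.

```python
from typing import Callable, List, Tuple


def routing_accuracy(
    route_fn: Callable[[str], str],
    labeled: List[Tuple[str, str]],
) -> Tuple[float, List[Tuple[str, str, str]]]:
    """Return (accuracy, misses), where misses lists (question, expected, got)."""
    misses = []
    for question, expected in labeled:
        got = route_fn(question)
        if got != expected:
            misses.append((question, expected, got))
    accuracy = 1 - len(misses) / len(labeled) if labeled else 0.0
    return accuracy, misses


labeled_questions = [
    ("What does the CV say about education?", "documents"),
    ("How many orders did we get last month?", "database"),
    ("Get latest leads from Salesforce", "api"),
    ("What is machine learning?", "general_llm"),
]


def stub_route(question: str) -> str:
    """Deliberately naive stand-in for the LLM router."""
    q = question.lower()
    if "salesforce" in q:
        return "api"
    if "how many" in q:
        return "database"
    return "general_llm"


accuracy, misses = routing_accuracy(stub_route, labeled_questions)
print(f"accuracy={accuracy:.2f}, misses={misses}")
```

The stub misroutes the CV question, so the check reports one miss out of four.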