# pipelines.rag

> A pipeline module for Retrieval-Augmented Generation (RAG)

In [None]:
# | default_exp pipelines.rag

In [None]:
# | hide
from nbdev.showdoc import *

In [None]:
# | export
from typing import Optional, Dict, List, Any
from langchain_core.documents import Document
from onprem.utils import format_string
from onprem.llm import helpers

In [None]:
# | export

# Default question-answering prompt template.
# `{context}` is replaced with the source passages (wrapped in triple backticks)
# and `{question}` with the question being asked.
DEFAULT_QA_PROMPT = (
    "Use the following pieces of context delimited by three backticks to answer "
    "the question at the end. If you don't know the answer, just say that you "
    "don't know, don't try to make up an answer.\n"
    "\n"
    "```{context}```\n"
    "\n"
    "Question: {question}\n"
    "Helpful Answer:"
)


class RAGPipeline:
    """
    Retrieval-Augmented Generation pipeline for answering questions based on source documents.

    Source passages are fetched through `llm.semantic_search`, inserted into a
    question-answering prompt template, and the resulting prompt is sent to
    `llm.prompt`.
    """

    def __init__(self, llm, qa_template: str = DEFAULT_QA_PROMPT):
        """
        Initialize RAG pipeline.

        Args:
            llm: The language model instance (LLM object). This class calls its
                `semantic_search` and `prompt` methods and reads its
                `rag_num_source_docs` and `rag_score_threshold` attributes.
            qa_template: Question-answering prompt template used whenever a
                call does not supply its own template.
        """
        self.llm = llm
        self.qa_template = qa_template

    def _retrieve_documents(self,
                          question: str,
                          filters: Optional[Dict[str, str]] = None,
                          where_document = None,
                          folders: Optional[list] = None,
                          limit: int = 4,
                          score_threshold: float = 0.0,
                          table_k: int = 1,
                          table_score_threshold: float = 0.35) -> List[Document]:
        """
        Retrieve relevant documents from vector database.

        Args:
            question: Query text for the semantic search.
            filters: Metadata filters forwarded to `llm.semantic_search`.
            where_document: Document-content filter forwarded to the search.
            folders: Folders to restrict the search to.
            limit: Maximum number of regular sources to retrieve.
            score_threshold: Minimum similarity score for regular sources.
            table_k: Maximum number of table sources to add. Values <= 0
                disable the extra table search.
            table_score_threshold: Minimum similarity score for table sources.

        Returns:
            The regular search hits followed by up to `table_k` table hits.
        """
        # Copy so that appending table hits never mutates a list owned by the LLM object.
        docs = list(self.llm.semantic_search(
            question,
            filters=filters,
            where_document=where_document,
            folders=folders,
            limit=limit,
            score_threshold=score_threshold
        ))

        # Add table documents if requested
        if table_k > 0:
            # Build a new dict so the caller's `filters` is left untouched.
            table_filters = dict(filters or {}, table=True)
            table_docs = self.llm.semantic_search(
                f'{question} (table)',
                filters=table_filters,
                where_document=where_document,
                folders=folders,
                limit=table_k,
                score_threshold=table_score_threshold
            )
            if table_docs:
                # Cap by `table_k` (not `limit`): the number of tables is
                # controlled independently of the number of regular sources.
                docs.extend(table_docs[:table_k])

        return docs

    def _generate_answer(self,
                         question: str,
                         context: str,
                         qa_template: Optional[str] = None,
                         **kwargs) -> str:
        """
        Generate answer using the language model.

        Args:
            question: Question inserted into the template's `question` field.
            context: Text inserted into the template's `context` field.
            qa_template: Prompt template for this call. Falls back to
                `self.qa_template` when None or empty.
            **kwargs: Additional arguments passed to `llm.prompt`.

        Returns:
            Whatever `llm.prompt` returns for the formatted prompt.
        """
        prompt = format_string(
            qa_template or self.qa_template,
            question=question,
            context=context
        )
        return self.llm.prompt(prompt, **kwargs)

    def ask(self,
            question: str, # question as string
            contexts: Optional[list] = None, # optional list of contexts to answer question. If None, retrieve from vectordb.
            qa_template: Optional[str] = None, # question-answering prompt template to use
            filters: Optional[Dict[str, str]] = None, # filter sources by metadata values using Chroma metadata syntax (e.g., {'table':True})
            where_document = None, # filter sources by document content (syntax varies by store type)
            folders: Optional[list] = None, # folders to search (needed because LangChain does not forward "where" parameter)
            limit: Optional[int] = None, # Number of sources to consider. If None, use `LLM.rag_num_source_docs`.
            score_threshold: Optional[float] = None, # minimum similarity score of source. If None, use `LLM.rag_score_threshold`.
            table_k: int = 1, # maximum number of tables to consider when generating answer
            table_score_threshold: float = 0.35, # minimum similarity score for table to be considered in answer
            selfask: bool = False, # If True, use an agentic Self-Ask prompting strategy.
            **kwargs) -> Dict[str, Any]:
        """
        Answer a question using RAG approach.

        Args:
            question: Question to answer
            contexts: Optional list of contexts. If None, retrieve from vectordb.
                Not used when Self-Ask decomposition is triggered (sub-questions
                always retrieve from the vector database).
            qa_template: Optional custom QA prompt template. If None, the
                template given at construction time is used.
            filters: Filter sources by metadata values
            where_document: Filter sources by document content
            folders: Folders to search
            limit: Number of sources to consider
            score_threshold: Minimum similarity score
            table_k: Maximum number of tables to consider
            table_score_threshold: Minimum similarity score for tables
            selfask: Use agentic Self-Ask prompting strategy
            **kwargs: Additional arguments passed to LLM.prompt

        Returns:
            Dictionary with keys: answer, source_documents, question
        """
        template = qa_template or self.qa_template
        limit = limit if limit is not None else self.llm.rag_num_source_docs
        score_threshold = score_threshold if score_threshold is not None else self.llm.rag_score_threshold

        # Decompose only when Self-Ask is enabled AND the helper judges that the
        # question needs follow-up questions; otherwise answer in one pass.
        if selfask and helpers.needs_followup(question, self.llm):
            return self._ask_with_decomposition(
                question,
                qa_template=template,
                filters=filters,
                where_document=where_document,
                folders=folders,
                limit=limit,
                score_threshold=score_threshold,
                table_k=table_k,
                table_score_threshold=table_score_threshold,
                **kwargs
            )
        return self._ask_direct(
            question,
            contexts=contexts,
            qa_template=template,
            filters=filters,
            where_document=where_document,
            folders=folders,
            limit=limit,
            score_threshold=score_threshold,
            table_k=table_k,
            table_score_threshold=table_score_threshold,
            **kwargs
        )

    def _ask_direct(self,
                   question: str,
                   contexts: Optional[list],
                   qa_template: str,
                   filters: Optional[Dict[str, str]],
                   where_document,
                   folders: Optional[list],
                   limit: int,
                   score_threshold: float,
                   table_k: int,
                   table_score_threshold: float,
                   **kwargs) -> Dict[str, Any]:
        """
        Direct RAG without decomposition.

        When `contexts` is None, sources are retrieved from the vector database;
        otherwise each supplied context string is wrapped in a `Document` whose
        `source` metadata is '<SUBANSWER>' and no retrieval takes place.

        Returns:
            Dictionary with keys: question, answer, source_documents
        """
        if contexts is None:
            docs = self._retrieve_documents(
                question,
                filters=filters,
                where_document=where_document,
                folders=folders,
                limit=limit,
                score_threshold=score_threshold,
                table_k=table_k,
                table_score_threshold=table_score_threshold
            )
            context = '\n\n'.join(d.page_content for d in docs)
        else:
            docs = [Document(page_content=c, metadata={'source': '<SUBANSWER>'}) for c in contexts]
            context = "\n\n".join(contexts)

        # Forward the resolved template so a per-call `qa_template` is honored
        # instead of always falling back to the instance default.
        answer = self._generate_answer(question, context, qa_template=qa_template, **kwargs)

        return {
            'question': question,
            'answer': answer,
            'source_documents': docs
        }

    def _ask_with_decomposition(self,
                               question: str,
                               qa_template: str,
                               filters: Optional[Dict[str, str]],
                               where_document,
                               folders: Optional[list],
                               limit: int,
                               score_threshold: float,
                               table_k: int,
                               table_score_threshold: float,
                               **kwargs) -> Dict[str, Any]:
        """
        RAG with question decomposition (Self-Ask).

        Each sub-question produced by `helpers.decompose_question` is answered
        with its own retrieval; the sub-answers then serve as the context for
        answering the original question.

        Returns:
            Dictionary with keys: question, answer, source_documents, where
            `source_documents` holds the sources retrieved for the
            sub-questions, each tagged with a `subquestion` metadata entry.
        """
        subquestions = helpers.decompose_question(question, self.llm)
        subanswers = []
        sources = []

        for q in subquestions:
            res = self._ask_direct(
                q,
                contexts=None,
                qa_template=qa_template,
                filters=filters,
                where_document=where_document,
                folders=folders,
                limit=limit,
                score_threshold=score_threshold,
                table_k=table_k,
                table_score_threshold=table_score_threshold,
                **kwargs
            )
            subanswers.append(res['answer'])
            for doc in res['source_documents']:
                # Rebind to a new dict rather than updating the existing
                # metadata mapping in place.
                doc.metadata = dict(doc.metadata, subquestion=q)
            sources.extend(res['source_documents'])

        # Generate final answer based on subanswers
        res = self._ask_direct(
            question,
            contexts=subanswers,
            qa_template=qa_template,
            filters=filters,
            where_document=where_document,
            folders=folders,
            limit=limit,
            score_threshold=score_threshold,
            table_k=table_k,
            table_score_threshold=table_score_threshold,
            **kwargs
        )
        # Report the documents retrieved for the sub-questions rather than the
        # synthetic '<SUBANSWER>' documents built for the final pass.
        res['source_documents'] = sources

        return res

In [None]:
# Render the API documentation for `RAGPipeline.ask` (signature, docstring and
# per-parameter comments) into the generated docs page.
show_doc(RAGPipeline.ask)

---

### RAGPipeline.ask

>      RAGPipeline.ask (question:str, contexts:Optional[list]=None,
>                       qa_template:Optional[str]=None,
>                       filters:Optional[Dict[str,str]]=None,
>                       where_document=None, folders:Optional[list]=None,
>                       limit:Optional[int]=None,
>                       score_threshold:Optional[float]=None, table_k:int=1,
>                       table_score_threshold:float=0.35, selfask:bool=False,
>                       **kwargs)

*Answer a question using RAG approach.

Args:
    question: Question to answer
    contexts: Optional list of contexts. If None, retrieve from vectordb
    qa_template: Optional custom QA prompt template
    filters: Filter sources by metadata values
    where_document: Filter sources by document content
    folders: Folders to search
    limit: Number of sources to consider
    score_threshold: Minimum similarity score
    table_k: Maximum number of tables to consider
    table_score_threshold: Minimum similarity score for tables
    selfask: Use agentic Self-Ask prompting strategy
    **kwargs: Additional arguments passed to LLM.prompt

Returns:
    Dictionary with keys: answer, source_documents, question*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| question | str |  | question as string |
| contexts | Optional | None | optional list of contexts to answer question. If None, retrieve from vectordb. |
| qa_template | Optional | None | question-answering prompt template to use |
| filters | Optional | None | filter sources by metadata values using Chroma metadata syntax (e.g., {'table':True}) |
| where_document | NoneType | None | filter sources by document content (syntax varies by store type) |
| folders | Optional | None | folders to search (needed because LangChain does not forward "where" parameter) |
| limit | Optional | None | Number of sources to consider. If None, use `LLM.rag_num_source_docs`. |
| score_threshold | Optional | None | minimum similarity score of source. If None, use `LLM.rag_score_threshold`. |
| table_k | int | 1 | maximum number of tables to consider when generating answer |
| table_score_threshold | float | 0.35 | minimum similarity score for table to be considered in answer |
| selfask | bool | False | If True, use an agentic Self-Ask prompting strategy. |
| kwargs | VAR_KEYWORD |  |  |
| **Returns** | **Dict** |  |  |

In [None]:
# | hide
import nbdev

# Run nbdev's export step, which writes cells marked `# | export` out to the
# Python module named by the `default_exp` directive.
nbdev.nbdev_export()