In [1]:
import nest_asyncio

nest_asyncio.apply()

In [2]:
!pip install \
llama-index \
llama-index-tools-tavily-research \
llama-index-embeddings-huggingface \
    llama-index-llms-huggingface \
llama-index-llms-huggingface-api \
llama-index-vector-stores-pinecone \
transformers \
torch \
sentence-transformers \
pinecone-client \
tavily-python \
llama-index-readers-file \
accelerate bitsandbytes

Collecting llama-index
  Downloading llama_index-0.12.31-py3-none-any.whl.metadata (12 kB)
Collecting llama-index-tools-tavily-research
  Downloading llama_index_tools_tavily_research-0.3.0-py3-none-any.whl.metadata (3.0 kB)
Collecting llama-index-embeddings-huggingface
  Downloading llama_index_embeddings_huggingface-0.5.3-py3-none-any.whl.metadata (767 bytes)
Collecting llama-index-llms-huggingface
  Downloading llama_index_llms_huggingface-0.5.0-py3-none-any.whl.metadata (2.8 kB)
Collecting llama-index-llms-huggingface-api
  Downloading llama_index_llms_huggingface_api-0.4.1-py3-none-any.whl.metadata (1.3 kB)
Collecting llama-index-vector-stores-pinecone
  Downloading llama_index_vector_stores_pinecone-0.4.5-py3-none-any.whl.metadata (709 bytes)
Collecting pinecone-client
  Downloading pinecone_client-6.0.0-py3-none-any.whl.metadata (3.4 kB)
Collecting tavily-python
  Downloading tavily_python-0.5.4-py3-none-any.whl.metadata (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [3]:
!mkdir -p 'data/'

In [4]:
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore


class PrepEvent(Event):
    """Prep event (prepares for retrieval)."""

    pass


class RetrieveEvent(Event):
    """Retrieve event (gets retrieved nodes)."""

    retrieved_nodes: list[NodeWithScore]


class RelevanceEvalEvent(Event):
    """Relevance evaluation event (gets results of relevance evaluation)."""

    relevant_results: list[str]


class TextExtractEvent(Event):
    """Text extract event. Extracts relevant text and concatenates."""

    relevant_text: str


class QueryEvent(Event):
    """Query event. Queries given relevant text and search text."""

    relevant_text: str
    search_text: str

In [5]:
from google.colab import userdata
pc_api_key = userdata.get('PINECONE_API_KEY')
tavily_ai_apikey = userdata.get('TAVILY_API_KEY')
HF_TOKEN = userdata.get("HF_TOKEN")

In [8]:
from llama_index.vector_stores.pinecone import PineconeVectorStore
from llama_index.core import StorageContext
import os
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(
        api_key=f"{pc_api_key}",
        environment="us-east-1-aws",
        index_name="crag"
    )
index_name = "crag"


# Now do stuff
if index_name not in pc.list_indexes().names():
        pc.create_index(
            name=index_name,
            dimension=768,
            metric='cosine',
            spec=ServerlessSpec(
                cloud='aws',
                region='us-east-1'
            )
        )

# Connect to your existing index
index = pc.Index(index_name)  # replace with your actual index name

# Get index stats
stats = index.describe_index_stats()
totVectorCount = stats['total_vector_count']
print(totVectorCount)

0


In [9]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.huggingface import HuggingFaceLLM
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI
from llama_index.core.settings import Settings
from tavily import TavilyClient
from llama_index.core.workflow import (
    Workflow,
    step,
    Context,
    StartEvent,
    StopEvent,
)
from llama_index.core import (
    VectorStoreIndex,
    Document,
    PromptTemplate,
    SummaryIndex,
)
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.tools.tavily_research.base import TavilyToolSpec
from llama_index.core.base.base_retriever import BaseRetriever

# Initialize the embedding model
Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")
# Settings.embed_model = HuggingFaceEmbedding(model_name="thenlper/gte-base")
# Settings.embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")


DEFAULT_RELEVANCY_PROMPT_TEMPLATE = PromptTemplate(
    template="""As a grader, your task is to evaluate the relevance of a document retrieved in response to a user's question.

    Retrieved Document:
    -------------------
    {context_str}

    User Question:
    --------------
    {query_str}

    Evaluation Criteria:
    - Consider whether the document contains keywords or topics related to the user's question.
    - The evaluation should not be overly stringent; the primary objective is to identify and filter out clearly irrelevant retrievals.

    Decision:
    - Assign a binary score to indicate the document's relevance.
    - Use 'yes' if the document is relevant to the question, or 'no' if it is not.

    Please provide your binary score ('yes' or 'no') below to indicate the document's relevance to the user question."""
)

# DEFAULT_TRANSFORM_QUERY_TEMPLATE = PromptTemplate(
#     template="""Your task is to refine a query to ensure it is highly effective for retrieving relevant search results. \n
#     Analyze the given input to grasp the core semantic intent or meaning. \n
#     Original Query:
#     \n ------- \n
#     {query_str}
#     \n ------- \n
#     Your goal is to rephrase or enhance this query to improve its search performance. Ensure the revised query is concise and directly aligned with the intended search objective. \n
#     Respond with the optimized query only:"""
# )

DEFAULT_TRANSFORM_QUERY_TEMPLATE = PromptTemplate(
    template="""Rephrase the following question into a concise and natural search engine query that would work well for web search:

    Original Question:
    {query_str}

    Revised Query:"""
)

class CorrectiveRAGWorkflow(Workflow):
    @step
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
      """Ingest step (for ingesting docs and initializing Pinecone index)."""
      documents: list[Document] | None = ev.get("documents")
      if documents is None:
          return None

      # Use the Pinecone vector store
      pinecone_vs = PineconeVectorStore(index_name=index_name,api_key=f"{pc_api_key}", environment="us-east-1-aws")

      # Check if documents are provided (for new index creation)
      if documents:
        # Create new index from documents
        storage_context = StorageContext.from_defaults(vector_store=pinecone_vs)
        index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)
      else:
        # Use existing index
        index = VectorStoreIndex.from_vector_store(pinecone_vs)

      return StopEvent(result=index)


    @step
    async def prepare_for_retrieval(
        self, ctx: Context, ev: StartEvent
    ) -> PrepEvent | None:
        """Prepare for retrieval."""

        query_str: str | None = ev.get("query_str")
        retriever_kwargs: dict | None = ev.get("retriever_kwargs", {})

        if query_str is None:
            return None

        tavily_ai_apikey: str | None = ev.get("tavily_ai_apikey")
        index = ev.get("index")

        # llm = HuggingFaceLLM(model_name="meta-llama/Meta-Llama-3-8B-Instruct",tokenizer_name="meta-llama/Meta-Llama-3-8B-Instruct") no gpu hence cant do the 4 bit quantization.
        # llm = HuggingFaceLLM(model_name="mistralai/Mistral-7B-Instruct-v0.1",tokenizer_name="mistralai/Mistral-7B-Instruct-v0.1")
        # llm = HuggingFaceInferenceAPI(model_name="mistralai/Mistral-7B-Instruct-v0.1",tokenizer_name="mistralai/Mistral-7B-Instruct-v0.1",token=HF_TOKEN) not working because
        # Hugging Face Inference API only supports models < 10GB unless you’re on paid tiers
        llm = HuggingFaceInferenceAPI(model_name="microsoft/Phi-3-mini-4k-instruct",tokenizer_name="microsoft/Phi-3-mini-4k-instruct",token= HF_TOKEN)
        Settings.llm = llm
        await ctx.set(
            "relevancy_pipeline",
            QueryPipeline(chain=[DEFAULT_RELEVANCY_PROMPT_TEMPLATE, llm]),
        )
        await ctx.set(
            "transform_query_pipeline",
            QueryPipeline(chain=[DEFAULT_TRANSFORM_QUERY_TEMPLATE, llm]),
        )

        await ctx.set("llm", llm)
        await ctx.set("index", index)
        await ctx.set("tavily_tool", TavilyToolSpec(api_key=tavily_ai_apikey))

        await ctx.set("query_str", query_str)
        await ctx.set("retriever_kwargs", retriever_kwargs)

        return PrepEvent()

    @step
    async def retrieve(
        self, ctx: Context, ev: PrepEvent
    ) -> RetrieveEvent | None:
        """Retrieve the relevant nodes for the query."""
        query_str = await ctx.get("query_str")
        retriever_kwargs = await ctx.get("retriever_kwargs")

        if query_str is None:
            return None

        index = await ctx.get("index", default=None)
        tavily_tool = await ctx.get("tavily_tool", default=None)
        if not (index or tavily_tool):
            raise ValueError(
                "Index and tavily tool must be constructed. Run with 'documents' and 'tavily_ai_apikey' params first."
            )

        retriever: BaseRetriever = index.as_retriever(**retriever_kwargs)
        result = retriever.retrieve(query_str)
        await ctx.set("retrieved_nodes", result)
        await ctx.set("query_str", query_str)
        return RetrieveEvent(retrieved_nodes=result)

    @step
    async def eval_relevance(
        self, ctx: Context, ev: RetrieveEvent
    ) -> RelevanceEvalEvent:
        """Evaluate relevancy of retrieved documents with the query."""
        retrieved_nodes = ev.retrieved_nodes
        query_str = await ctx.get("query_str")

        relevancy_results = []
        for node in retrieved_nodes:
            relevancy_pipeline = await ctx.get("relevancy_pipeline")
            relevancy = relevancy_pipeline.run(
                context_str=node.text, query_str=query_str
            )
            # relevancy_results.append(relevancy.message.content.lower().strip())
            score = relevancy.message.content.lower().strip()
            print(f"[Relevance Evaluation] Doc snippet: {node.text[:100]}... → Score: {score}")
            relevancy_results.append(score)

        await ctx.set("relevancy_results", relevancy_results)
        return RelevanceEvalEvent(relevant_results=relevancy_results)

    @step
    async def extract_relevant_texts(
        self, ctx: Context, ev: RelevanceEvalEvent
    ) -> TextExtractEvent:
        """Extract relevant texts from retrieved documents."""
        retrieved_nodes = await ctx.get("retrieved_nodes")
        relevancy_results = ev.relevant_results

        relevant_texts = [
            retrieved_nodes[i].text
            for i, result in enumerate(relevancy_results)
            if result == "yes"
        ]

        result = "\n".join(relevant_texts)
        return TextExtractEvent(relevant_text=result)

    @step
    async def transform_query_pipeline(
        self, ctx: Context, ev: TextExtractEvent
    ) -> QueryEvent:
        """Search the transformed query with Tavily API."""
        relevant_text = ev.relevant_text
        relevancy_results = await ctx.get("relevancy_results")
        query_str = await ctx.get("query_str")

        # If any document is found irrelevant, transform the query string for better search results.
        if "no" in relevancy_results:
            print("[Corrective RAG] At least one irrelevant result found. Using Tavily to enhance query...")  #Added

            qp = await ctx.get("transform_query_pipeline")
            transformed_query_str = qp.run(query_str=query_str).message.content
            print(f"[Corrective RAG] Transformed query: {transformed_query_str}") #Added
            # Conduct a search with the transformed query string and collect the results.
            client = TavilyClient(f"{tavily_ai_apikey}")
            response = client.search(query=transformed_query_str)
            search_results = response['results']
            # tavily_tool = await ctx.get("tavily_tool")
            # search_results = tavily_tool.search(
            #     transformed_query_str, max_results=5
            # )
            print(f"[Corrective RAG] Tavily returned {len(search_results)} search results.") #Added

            if search_results:
              print(f"[Corrective RAG] Tavily best search result: {search_results[0]['content']}")
            else:
              print("[Corrective RAG] Tavily returned no usable results.")

            search_text = "\n".join([result.get("content", "") for result in search_results])
        else:
            print("[Corrective RAG] All results are relevant. Tavily not used.") #Added
            print("[Corrective RAG] Relevant text when Tavily is skipped:\n", relevant_text)
            search_text = ""

        return QueryEvent(relevant_text=relevant_text, search_text=search_text)

    @step
    async def query_result(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        """Get result with relevant text."""
        relevant_text = ev.relevant_text
        search_text = ev.search_text
        query_str = await ctx.get("query_str")

        documents = [Document(text=relevant_text + "\n" + search_text)]
        index = SummaryIndex.from_documents(documents)
        query_engine = index.as_query_engine()
        result = query_engine.query(query_str)
        return StopEvent(result=result)

In [12]:
from llama_index.vector_stores.pinecone import PineconeVectorStore
from llama_index.core import VectorStoreIndex
from llama_index.core import SimpleDirectoryReader

# Initialize PineconeVectorStore
pinecone_vs = PineconeVectorStore(index_name=index_name, api_key=pc_api_key, environment="us-east-1-aws")

if totVectorCount > 0:
    print("Using existing index...")
    # Load existing index
    index = VectorStoreIndex.from_vector_store(pinecone_vs)
    workflow = CorrectiveRAGWorkflow()
else:
    print("Upserting vectors...")
    documents = SimpleDirectoryReader("./data").load_data()
    workflow = CorrectiveRAGWorkflow()
    index = await workflow.run(documents=documents)

Upserting vectors...


Upserted vectors:   0%|          | 0/2048 [00:00<?, ?it/s]

Upserted vectors:   0%|          | 0/2048 [00:00<?, ?it/s]

Upserted vectors:   0%|          | 0/1457 [00:00<?, ?it/s]

In [14]:
from IPython.display import Markdown, display

response = await workflow.run(
    query_str="Who launched the scheme “Financial Assistance to Disabled Students Pursuing (10th, 11th, 12th Equivalent Exams)”?",
    index=index,
    tavily_ai_apikey=tavily_ai_apikey,
)
display(Markdown(str(response)))

[Relevance Evaluation] Doc snippet: Title:Financial Assistance To Disabled Students Pursuing (10th, 11th, 12th Equivalent Exams)

Deta... → Score: yes
[Relevance Evaluation] Doc snippet: The objective of the scheme is to provide financial assistance to children whose parents/guardians b... → Score: no
[Corrective RAG] At least one irrelevant result found. Using Tavily to enhance query...
[Corrective RAG] Transformed query: "Launcher of Financial Assistance Scheme for Disabled Students in 10th, 11th, 12th Exams"
[Corrective RAG] Tavily returned 0 search results.
[Corrective RAG] Tavily returned no usable results.


The scheme “Financial Assistance to Disabled Students Pursuing (10th, 11th, 12th Equivalent Exams)” was launched by the Department of Social Justice, Government of Kerala.