Hybrid Retrival (Keyword & Semantic Search)

In [22]:
from llama_index.core import StorageContext
from llama_index.core import load_index_from_storage
storage_context = StorageContext.from_defaults(persist_dir="./infoStorage")
vector_index = load_index_from_storage(storage_context, index_id="1")
keyword_index= load_index_from_storage(storage_context, index_id="2")

In [18]:
from llama_index.llms.google_genai import GoogleGenAI
Settings.llm = GoogleGenAI(model="gemini-2.0-flash", api_key="AIzaSyB7rjodkd1sxe_EJ5lSi_cb6ro7anTi3XQ")

In [23]:
from llama_index.core import QueryBundle
from llama_index.core.schema import NodeWithScore
from llama_index.core.retrievers import (
    BaseRetriever,
    VectorIndexRetriever,
    KeywordTableSimpleRetriever,
)
from typing import List
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.core import get_response_synthesizer
from llama_index.core.query_engine import RetrieverQueryEngine


class CustomRetriever(BaseRetriever):
    """Custom retriever that performs both semantic search and hybrid search."""

    def __init__(
        self,
        vector_retriever: VectorIndexRetriever,
        keyword_retriever: KeywordTableSimpleRetriever,
        llm = GoogleGenAI(model="gemini-2.0-flash", api_key="AIzaSyB7rjodkd1sxe_EJ5lSi_cb6ro7anTi3XQ"),
        mode: str = "AND",
    ) -> None:
        """Init params."""

        self._vector_retriever = vector_retriever
        self._keyword_retriever = keyword_retriever
        self._llm = llm 
        if mode not in ("AND", "OR"):
            raise ValueError("Invalid mode.")
        self._mode = mode
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Retrieve nodes given query."""

        vector_nodes = self._vector_retriever.retrieve(query_bundle)
        keyword_nodes = self._keyword_retriever.retrieve(query_bundle)

        vector_ids = {n.node.node_id for n in vector_nodes}
        keyword_ids = {n.node.node_id for n in keyword_nodes}

        combined_dict = {n.node.node_id: n for n in vector_nodes}
        combined_dict.update({n.node.node_id: n for n in keyword_nodes})

        if self._mode == "AND":
            retrieve_ids = vector_ids.intersection(keyword_ids)
        else:
            retrieve_ids = vector_ids.union(keyword_ids)

        retrieve_nodes = [combined_dict[rid] for rid in retrieve_ids]
        return retrieve_nodes


# define custom retriever
vector_retriever = VectorIndexRetriever(index=vector_index, similarity_top_k=2)
keyword_retriever = KeywordTableSimpleRetriever(index=keyword_index)
custom_retriever_info = CustomRetriever(vector_retriever, keyword_retriever)

# define response synthesizer
response_synthesizer = get_response_synthesizer(llm=custom_retriever_info._llm)

# assemble query engine
custom_query_engine_info= RetrieverQueryEngine(
    retriever=custom_retriever_info,
    response_synthesizer=response_synthesizer,
)

# vector query engine
vector_query_engine = RetrieverQueryEngine(
    retriever=vector_retriever,
    response_synthesizer=response_synthesizer,
)
# keyword query engine
keyword_query_engine = RetrieverQueryEngine(
    retriever=keyword_retriever,
    response_synthesizer=response_synthesizer,
)

In [24]:
response = custom_query_engine_info.query(
    "How does the Finance Act, 2024 amend the calculation of income tax when there is net agricultural income?"
)
response

Response(response='When an assessee has net agricultural income exceeding five thousand rupees, in addition to total income exceeding two lakh fifty thousand rupees, the net agricultural income is considered when charging income-tax. It is treated as if it were part of the total income after the first two lakh fifty thousand rupees, but without being liable to tax. The income-tax is calculated by aggregating the total income and the net agricultural income, determining the income-tax on the aggregate income, increasing the net agricultural income by two lakh fifty thousand rupees, and determining the income-tax on the increased net agricultural income. The income-tax on the total income is then reduced by the income-tax on the increased net agricultural income to arrive at the final income-tax amount. For resident individuals in India aged sixty years or more but less than eighty years, the "two lakh fifty thousand rupees" is substituted with "three lakh rupees." For those aged eighty 

<h3> Similar Arguments Retrival - Pydantic Extractor </h3>

A pydantic model is defined for extracting from the argument documents

In [25]:
from pydantic import BaseModel, Field
from typing import List
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.llms.google_genai import GoogleGenAI


class ArgumentData(BaseModel):
    """Node metadata."""
    
    caseType: str = Field(
        ..., description="What was the case about"
    )
    response: str = Field(
        ..., description="How was the argument made to handle the particular case type."
    )
    legalPrinciples: List[str] = Field(
        ...,description = "the legal principles that were involved in the case"
    )


prompt_template_str = """\
You are a legal expert analyzing case documents. Given the following case description, extract the metadata in the specified format.

Case description:
----------------
{context_str}
----------------

Extract the following details:
1. **Case Type**: What was the case about?
2. **Response**: How was the argument made to handle the case?
3. **Legal Principles**: What legal principles were involved in this case? List them.

Return the extracted details as a structured `NodeMetadata` object.
"""

llm = GoogleGenAI(model="gemini-2.0-flash", api_key="AIzaSyB7rjodkd1sxe_EJ5lSi_cb6ro7anTi3XQ")
programArgumentDocs = LLMTextCompletionProgram.from_defaults(
    output_cls=ArgumentData,
    prompt_template_str=prompt_template_str,
    llm=llm,
    verbose=True,
)

In [26]:
import nest_asyncio
from llama_index.core import SimpleDirectoryReader
ArgumentDocs = SimpleDirectoryReader("./ArgumentDocs").load_data()
nest_asyncio.apply()
from llama_index.core.schema import TextNode


text_files = []
file_name = ArgumentDocs[0].metadata['file_name'] 
text = ""
for docs in ArgumentDocs:
    if docs.metadata['file_name'] != file_name:
        text_files.append(text)
        file_name= docs.metadata['file_name']
        text = ""
    text = text + f"{docs.text_resource.text}"

if text:
    text_files.append(text)

nodeCaseType = []
nodeLegalPrinciples = []
nodeResponse = []
i=0
for t in text_files:
    nodeResponse.append(TextNode(text=programArgumentDocs(context_str=t).response, id_=str(i)))
    nodeLegalPrinciples.append(TextNode(text="|".join(t for t in programArgumentDocs(context_str=t).legalPrinciples), id_=str(i)))
    nodeCaseType.append(TextNode(text=programArgumentDocs(context_str=t).caseType, id_=str(i)))
    i= i+1

Overwriting cache for 0 154


The pydantic model is extracted from the user query

In [29]:
class QueryData(BaseModel):
    """User Query data."""
    
    caseType: str = Field(
        ..., description="What case type is the query"
    )
    legalPrinciples: List[str] = Field(
        ...,description = "what are the legal principles that seem to be involved in the query"
    )

from llama_index.core.program import LLMTextCompletionProgram
from llama_index.llms.google_genai import GoogleGenAI

prompt_template_str = """\
You are a legal expert analyzing legal arguments. Given the following case description, extract the metadata in the specified format.

### Case Description:
----------------
{context_str}
----------------

### Extract the following details:
1. **Case Type**: Identify the category of this case. Return a single case type as a string.
2. **Legal Principles**: Identify key legal principles involved in this case. Return a list of strings.

### Expected Output Format:
Return the extracted data as a structured JSON object with the following fields:
```json
{
  "caseType": "A single case type",
  "legalPrinciples": ["Principle1", "Principle2", "Principle3"]
}
"""

llm = GoogleGenAI(model="gemini-2.0-flash", api_key="AIzaSyB7rjodkd1sxe_EJ5lSi_cb6ro7anTi3XQ")
programQuery = LLMTextCompletionProgram.from_defaults(
    output_cls=QueryData,
    prompt_template_str=prompt_template_str,
    llm=llm,
    verbose=True,
)

In [30]:
print(programQuery(context_str="Can a parrot be sued for defamation?"))

caseType='Defamation' legalPrinciples=['Capacity to be sued', 'Defamation', 'Intent', 'Legal personhood']


A semantic search is done for the caseType and legalPrinciples between the query and the documents. The appropriate court response is mapped

In [31]:
from llama_index.core import VectorStoreIndex
vector_index_caseType= VectorStoreIndex(nodeCaseType)
vector_index_LegalPrinciples= VectorStoreIndex(nodeLegalPrinciples)
from llama_index.core import QueryBundle
from llama_index.core.schema import NodeWithScore
from llama_index.core.retrievers import (
    BaseRetriever,
    VectorIndexRetriever,
)
from typing import List

class CombinedRetriver(BaseRetriever):

    def __init__(
        self,
        vector_index_caseType: VectorIndexRetriever,
        vector_index_LegalPrinciples: VectorIndexRetriever,
        llm = GoogleGenAI(model="gemini-2.0-flash", api_key="AIzaSyB7rjodkd1sxe_EJ5lSi_cb6ro7anTi3XQ"),
        mode: str = "AND",
    ) -> None:
        """Init params."""

        self.vector_index_caseType = vector_index_caseType
        self.vector_index_LegalPrinciples = vector_index_LegalPrinciples
        self._llm = llm 
        if mode not in ("AND", "OR"):
            raise ValueError("Invalid mode.")
        self._mode = mode
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Retrieve nodes given query."""

        query_data = programQuery(context_str=query_bundle.query_str)

        vector_nodes_caseType = self.vector_index_caseType.retrieve(QueryBundle(query_str=query_data.caseType))
        vector_nodes_LegalPrinciples= self.vector_index_LegalPrinciples.retrieve(QueryBundle(query_str="|".join(t for t in query_data.legalPrinciples)))

        vector_index_caseType_ids = {n.node.node_id for n in vector_nodes_caseType}
        vector_index_LegalPrinciples_ids = {n.node.node_id for n in vector_nodes_LegalPrinciples}

        if self._mode == "AND":
            retrieve_ids = vector_index_caseType_ids.intersection(vector_index_LegalPrinciples_ids)
        else:
            retrieve_ids = vector_index_caseType_ids.union(vector_index_LegalPrinciples_ids)
        
        nodes = []
        for i in retrieve_ids:
            case_text = nodeCaseType[int(i)].text
            response_text = nodeResponse[int(i)].text

            text_content = f"""
            For the case: {case_text}
            The way the case was handled in court earlier: {response_text}
            """

            text_node = TextNode(text=text_content, id_=str(i))  
            nodes.append(NodeWithScore(node=text_node, score=1.0))  

        return nodes

In [32]:
from llama_index.core.response_synthesizers import BaseSynthesizer
from llama_index.core.query_engine import RetrieverQueryEngine

class CustomSynthesizer(BaseSynthesizer):
    def get_response(self,query_str: str,text_chunks):
        response = ''.join(text_chunks)
        return response
    
    async def aget_response(self,query_str: str,text_chunks):
        response = {''.join(text_chunks)}
        return response
    
    def _get_prompts(self):
        # Dummy implementation
        return {}

    def _update_prompts(self, prompts):
        # Dummy implementation
        pass


# custom retriever
vector_retriever_caseType = VectorIndexRetriever(index=vector_index_caseType, similarity_top_k=3)
vector_retriever_LegalPrinciples = VectorIndexRetriever(index=vector_index_LegalPrinciples, similarity_top_k=3)
custom_retriever = CombinedRetriver(vector_retriever_caseType, vector_retriever_LegalPrinciples)

# response synthesizer
response_synthesizer = CustomSynthesizer(llm=custom_retriever._llm)

# assemble query engine
custom_query_engine_argument = RetrieverQueryEngine(
    retriever=custom_retriever,
    response_synthesizer=response_synthesizer,
)

# vector query engine
vector_query_engine_caseType = RetrieverQueryEngine(
    retriever=vector_retriever_caseType,
    response_synthesizer=response_synthesizer,
)
# vector query engine
vector_query_engine_LegalPrinciples = RetrieverQueryEngine(
    retriever=vector_index_LegalPrinciples,
    response_synthesizer=response_synthesizer,
)
        

In [33]:
response = custom_query_engine_argument.query(
    "If a lender in Mumbai files a loan recovery suit in a Delhi court against a borrower who has moved there, and the court returns the plaint citing lack of territorial jurisdiction, what legal options does the lender have to challenge this decision?"
)
response

Response(response="\n            For the case: Appeal against the order of returning a plaint due to lack of territorial jurisdiction in a loan recovery case.\n            The way the case was handled in court earlier: The appeal argued that the Trial Court's decision to return the plaint was erroneous because the loan documents were executed and the loan was disbursed in Delhi, establishing territorial jurisdiction. The court agreed, set aside the Trial Court's order, and appointed a receiver to take possession of the hypothecated vehicle.\n            \n            For the case: The case involves writ petitions filed by individuals who were dismissed from service following a criminal case and subsequent departmental proceedings, but were later acquitted in the criminal case. The central issue is whether the respondents should reinstate the petitioners given their acquittal.\n            The way the case was handled in court earlier: The court allowed the writ petitions, setting aside

<h4>The RAG Agent </h4>

In [34]:
from llama_index.core.agent.workflow import FunctionAgent
from llama_index.llms.google_genai import GoogleGenAI

async def find_similar_arguments(query: str) -> str:
    """Useful for understanding how similar legal cases were handled previously in the actual court"""
    response = await custom_query_engine_argument.query(query)
    return str(response)

async def search_documents(query: str) -> str:
    """Useful for getting legal information like laws,acts etc...that were passed"""
    response = await custom_query_engine_info.query(query)
    return str(response)


# Create an enhanced workflow with both tools
agent = FunctionAgent(
    name="Agent",
    tools=[find_similar_arguments, search_documents],
    description = "Check system_prompt below",
    llm = GoogleGenAI(model="gemini-2.0-flash", api_key="AIzaSyB7rjodkd1sxe_EJ5lSi_cb6ro7anTi3XQ"),
    system_prompt="""You are a legal expert assistant with access to tools for retrieving legal information. 
        1. **Try to answer based on your internal knowledge first.**
        2. If the question requires specific case precedents or laws, use the `search_documents` tool.
        3. If the question is about how similar cases were handled, use the `find_similar_arguments` tool.
    """
)

In [36]:
async def ragLawyerResponse():
    final = await agent.run("A man sues a parrot for defamation. What should be done to him?")
    return final

response = await ragLawyerResponse()  
print(response)  

This sounds like a frivolous lawsuit! Defamation requires the communication of a false statement to a third party that harms someone's reputation. While parrots can certainly "speak," they lack the intent and understanding necessary to form a defamatory statement.

The man's lawsuit is highly unlikely to succeed. The court would likely dismiss the case.



<h2> The Final Workflow </h2>

In [37]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
from pydantic import BaseModel,Field
from typing import List
import json

bot = GoogleGenAI(model="gemini-2.0-flash", api_key="AIzaSyB7rjodkd1sxe_EJ5lSi_cb6ro7anTi3XQ")

class Case(BaseModel):
    title: str = Field(..., example="A man sues a parrot for defamation.")
    #category: str = Field(example="Environmental Violation/Cybercrime/Fraud")
    evidence: List[str] = [] #empty brackets is used to set a default empty list if no data is provided for these fields

class ragEvent(Event):
    pass

class llmEvent(Event):
    pass

class ProgressEvent(Event):
    msg:str


In [68]:
import asyncio

In [75]:
class MyWorkflow(Workflow):

    @step
    async def start(self,ctx:Context,ev:StartEvent)->ragEvent:
        case = await ctx.set("initialCase",ev.input) 
        arguments = await ctx.set("arguments",{"ragLawyer":[],"bot":[]})
        return ragEvent()

    @step
    async def ragAgent(self,ctx:Context,ev:ragEvent)->llmEvent:

        #call the RAG lawyer to generate a response
        prevArg = await ctx.get("arguments",default={"ragLawyer": [], "bot": []})

        if prevArg["ragLawyer"]:
            response = await agent.run(
                f"""
                There is a legal case as follows: {await ctx.get("initialCase",default=None)}
                Below are certain arguments regarding the case. Assume that the arguments labelled as 'ragLawyer' were made by you.
                {prevArg}
                Continue your argument and make sure you counter the 'bot' by leveraging your tools.
                """
            )
        
        else:
            response = await agent.run(
                f"""
                There is a legal case as follows: {await ctx.get("initialCase",default=None)}
                Begin your argument regarding the above case. Use your tools effectively and give me exactly one arguement for now.
                """
            )

        print(f"RAG Lawyer: {response}")

        #add the response to the arguments list
        prevArg["ragLawyer"].append(response)
        await ctx.set("arguments", prevArg)
        return llmEvent()
    
    @step
    async def llmAgent(self,ctx:Context,ev:llmEvent)->StopEvent|ragEvent:
        #call the normal lawyer to generate a response
        prevArg = await ctx.get("arguments",default={"ragLawyer": [], "bot": []})

        if prevArg["bot"]:
            response = bot.complete(
                f"""
                There is a legal case as follows: {await ctx.get("initialCase",default=None)}
                Below are certain arguments regarding the case. Assume that the arguments labelled as 'bot' were made by you.
                {prevArg}
                Continue your argument by countering the 'ragLawyer'.
                """
            )
        
        else:
            response = bot.complete(
                f"""
                There is a legal case as follows: {await ctx.get("initialCase",default=None)}
                The below argument was made by another bot.
                {prevArg["ragLawyer"]}
                Begin your counter-argument regarding the above case. Give me exactly one arguement for now.
                """
            )
        

        print(f"BOT Laywer: {response}")

        #add the response to the arguments list
        prevArg["bot"].append(response)
        await ctx.set("arguments", prevArg)

        cont = input("Continue? y/n: ")
        if cont=='y':
             #call the RAG lawyer to continue generating a response
             return ragEvent()
        
        else:
            winner = input("Who do you think won the argument?")
            stop_event = StopEvent(result=f"Argument stopped by the user. According to the user {winner} won the argument!")
            print(stop_event.result)
            return stop_event


In [None]:
#this is how you should enter the input prompt
{"title": "Should Cryptocurrency Be Regulated??"}

In [76]:
w = MyWorkflow(timeout=1000,verbose=False)
caseDetails =  input("Enter the case details: ")
caseDetails_dict = json.loads(caseDetails)
try:
    case = Case(**caseDetails_dict) # dictionary unpacking operator

except:
    print("Enter the case with the required fields")
    
result = await w.run(input=caseDetails_dict)

RAG Lawyer: Banks should not be held responsible for loan defaults because the responsibility for repaying a loan primarily lies with the borrower. When a bank approves a loan, it assesses the borrower's creditworthiness and ability to repay the loan based on the information available at the time. However, the bank cannot predict unforeseen circumstances such as job loss, illness, or economic downturns that may affect the borrower's ability to repay. Holding banks responsible for loan defaults would create a moral hazard, encouraging borrowers to take on more debt than they can handle, knowing that the bank will bear the ultimate responsibility.

BOT Laywer: While the borrower bears primary responsibility, banks have a crucial role in responsible lending. Banks possess expertise in assessing risk and should be held accountable for negligent lending practices. If a bank knowingly approves a loan to a borrower who clearly lacks the capacity to repay, or if they fail to adequately verify 

In [65]:
w = MyWorkflow(timeout=1000,verbose=False)
caseDetails =  input("Enter the case details: ")
caseDetails_dict = json.loads(caseDetails)
try:
    case = Case(**caseDetails_dict) # dictionary unpacking operator

except:
    print("Enter the case with the required fields")
    
result = await w.run(input=caseDetails_dict)

RAG Lawyer: Cryptocurrency regulation is a complex issue with arguments for and against it. One argument in favor of regulation is investor protection. The cryptocurrency market is highly volatile and susceptible to fraud and manipulation. Regulation could provide a framework for protecting investors from these risks, ensuring fair trading practices, and promoting market stability. This could involve measures such as licensing requirements for cryptocurrency exchanges, disclosure requirements for issuers of digital assets, and anti-money laundering regulations.

BOT Laywer: While investor protection is a valid concern, excessive cryptocurrency regulation could stifle innovation and hinder the growth of the industry. Overly strict rules could make it difficult for new projects to launch and for existing businesses to operate, potentially driving innovation overseas and limiting the potential benefits of cryptocurrency technology. A balanced approach is needed that protects investors wit