# How to handle multiple retrievers when doing query analysis

When using LangChain for complex information retrieval, our query analysis might determine that different types of information sources (retrievers) are needed to answer a user's question. For example, one part of the question might require searching a database, while another part might require searching a document repository.

In these cases, our LangChain workflow needs to be able to dynamically choose the appropriate retriever based on the query analysis results. This requires adding logic to our chain that can:

1. Examine the output of the query analysis.
2. Select the correct retriever based on the identified information needs.
3. Execute the retrieval using the chosen retriever.

We'll demonstrate how to implement this using simulated data: two small indexes, one per person, and a chain that routes each question to the index that query analysis selects, so that information from different sources can be retrieved in a unified way.
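
The three steps listed above can be sketched without any LangChain dependencies. This is only an illustration of the routing idea, not the implementation used later in this guide: `AnalyzedQuery`, `search_database`, `search_documents`, and `RETRIEVERS` are hypothetical stand-ins invented for this sketch.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class AnalyzedQuery:
    """Hypothetical stand-in for the structured output of query analysis."""

    query: str   # the text to search for
    source: str  # which retriever should handle the query


def search_database(query: str) -> List[str]:
    # Stand-in retriever: a real one would query a database.
    return [f"database result for: {query}"]


def search_documents(query: str) -> List[str]:
    # Stand-in retriever: a real one would search a document repository.
    return [f"document result for: {query}"]


# Lookup table from the analysis output to the retriever to run.
RETRIEVERS: Dict[str, Callable[[str], List[str]]] = {
    "database": search_database,
    "documents": search_documents,
}


def route(analysis: AnalyzedQuery) -> List[str]:
    # Steps 1 and 2: examine the analysis output and select the retriever.
    retriever = RETRIEVERS[analysis.source]
    # Step 3: execute the retrieval with the chosen retriever.
    return retriever(analysis.query)


results = route(AnalyzedQuery(query="quarterly revenue", source="database"))
print(results)
```

A dictionary lookup keeps the routing logic in one place: adding a new source only means adding one entry to the table.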

# 1. Install Dependencies

In [38]:
%pip install -qU langchain langchain-community langchain-openai langchain-chroma

In [39]:
import getpass
import os

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass.getpass()

# Optional, uncomment to trace runs with LangSmith. Sign up here: https://smith.langchain.com.
# os.environ["LANGSMITH_TRACING"] = "true"
# os.environ["LANGSMITH_API_KEY"] = getpass.getpass()

# 2. Create Index

This code creates two separate knowledge bases, one about Harrison and one about Ankush, each stored in its own Chroma collection. This is useful for:

Isolating information: Each person's information is kept separate, which can be important for privacy or organization.

Targeted retrieval: You can use the appropriate retriever to find information about a specific person.

It also demonstrates the search_kwargs parameter of as_retriever, which customizes the search. Here search_kwargs={"k": 1} limits each retriever to returning the single most similar document.

In [40]:
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings

# One embedding model is shared by both collections.
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# Collection and retriever for Harrison; k=1 returns only the top match.
texts = ["Harrison worked at Kensho"]
vectorstore = Chroma.from_texts(texts, embeddings, collection_name="harrison")
retriever_harrison = vectorstore.as_retriever(search_kwargs={"k": 1})

# Collection and retriever for Ankush.
texts = ["Ankush worked at Facebook"]
vectorstore = Chroma.from_texts(texts, embeddings, collection_name="ankush")
retriever_ankush = vectorstore.as_retriever(search_kwargs={"k": 1})
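
To make the effect of `k` concrete, here is a toy sketch of top-k retrieval. It is an assumption-laden simplification: a real vector store ranks by embedding similarity, while this hypothetical `top_k` helper ranks by word overlap, and the second sample text is invented purely for illustration.

```python
from typing import List


def top_k(query: str, texts: List[str], k: int = 1) -> List[str]:
    """Toy ranking by shared words; a stand-in for embedding similarity."""
    query_words = set(query.lower().split())
    ranked = sorted(
        texts,
        key=lambda text: len(query_words & set(text.lower().split())),
        reverse=True,
    )
    # Keep only the k best-scoring texts.
    return ranked[:k]


# Invented sample data; only the second text appears in this guide.
sample_texts = ["Harrison likes hiking", "Harrison worked at Kensho"]

best_one = top_k("harrison worked where", sample_texts, k=1)
best_two = top_k("harrison worked where", sample_texts, k=2)
print(best_one)
print(best_two)
```

With `k=1` only the best match is returned, which is what the retrievers above are configured to do; a larger `k` returns more candidates in ranked order.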

# 3. Query analysis

1. Import necessary dependencies

2. Search Class Definition:

class Search(BaseModel):: Defines a new class named Search that inherits from BaseModel, making it a Pydantic model.

3. query Field Definition:

query: str = Field(..., description="Query to look up"): Defines a field named query of type string.

...: The ellipsis indicates that this field is required.
description="Query to look up": Provides a description of the field, useful for documentation and for the schema passed to the language model.

4. person Field Definition:

person: str = Field(..., description="Person to look things up for. Should be `HARRISON` or `ANKUSH`."): Defines a field named person of type string.

...: The ellipsis indicates that this field is required.

description="Person to look things up for. Should be `HARRISON` or `ANKUSH`.": Provides a description of the field, including the instruction that the value should be either "HARRISON" or "ANKUSH". Pydantic does not enforce this: because the field is typed as str, any string passes validation. The description only informs the language model, so code that uses the person value should be prepared for unexpected values.



In [41]:
from pydantic import BaseModel, Field


class Search(BaseModel):
    """Search for information about a person."""

    query: str = Field(
        ...,
        description="Query to look up",
    )
    person: str = Field(
        ...,
        description="Person to look things up for. Should be `HARRISON` or `ANKUSH`.",
    )
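
Because the allowed values for person live only in the field description, it can help to check the value before using it. The sketch below shows one way to do that in plain Python; `VALID_PEOPLE` and `normalize_person` are hypothetical names introduced for this example. Another option, not shown here, is to type the field as `Literal["HARRISON", "ANKUSH"]` so that Pydantic itself rejects other values.

```python
VALID_PEOPLE = ("HARRISON", "ANKUSH")


def normalize_person(raw: str) -> str:
    """Return the canonical person key, or raise if it is not recognized."""
    # Tolerate stray whitespace and lowercase output from the model.
    candidate = raw.strip().upper()
    if candidate not in VALID_PEOPLE:
        raise ValueError(f"Unknown person {raw!r}; expected one of {VALID_PEOPLE}")
    return candidate


print(normalize_person(" harrison "))
```

Failing early with a clear message is usually easier to debug than a lookup error raised later in the chain.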

### Design the LangChain chain

1. Import necessary dependencies

2. Output Parser Creation:

output_parser = PydanticToolsParser(tools=[Search]): Creates a PydanticToolsParser object.

tools=[Search]: Specifies that the parser should parse tool calls that match the Search Pydantic model (a query plus a person name). Note that the chain built below never uses output_parser: with_structured_output(Search) already returns a parsed Search object.

3. System Prompt Definition:

system = """...""": Defines a system prompt for the language model.
It instructs the model that it can issue search queries.

4. Chat Prompt Template Creation:

prompt = ChatPromptTemplate.from_messages([...]): Creates a chat prompt template.

("system", system): Adds the system prompt.

("human", "{question}"): Adds a placeholder for the user's question.

5. Language Model Initialization:

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0): Initializes a ChatOpenAI object.

model="gpt-4o-mini": Specifies the OpenAI chat model.

temperature=0: Sets the temperature to 0, making the model's responses more deterministic.

6. Structured LLM Creation:

structured_llm = llm.with_structured_output(Search): Creates a version of the LLM that is configured to return structured output matching the Search Pydantic model.

with_structured_output(Search): This LangChain method makes the LLM return a Pydantic object directly, in this case a Search object.

7. Query Analyzer Chain Creation:

query_analyzer = {"question": RunnablePassthrough()} | prompt | structured_llm: Creates a runnable chain.

{"question": RunnablePassthrough()}: Wraps the raw input string in a dictionary under the key question, so the prompt can fill its {question} placeholder.

| prompt: Combines the system prompt and the question into chat messages.

| structured_llm: Sends the messages to the language model, which returns a structured Search object as output.

In essence:

This code sets up a pipeline that:

1. Takes a user question as input.
2. Formats the question with a system prompt that tells the language model it can issue search queries.
3. Sends the formatted prompt to the language model, which is configured to return a structured Search object (containing a query and a person name).
4. Returns that Search object.

This pipeline is designed to handle user questions that require searches about a specific person, ensuring that the generated search includes both the search text and the target person's name.

In [42]:
from langchain_core.output_parsers.openai_tools import PydanticToolsParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

output_parser = PydanticToolsParser(tools=[Search])

system = """You have the ability to issue search queries to get information to help answer user information."""
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm = llm.with_structured_output(Search)
query_analyzer = {"question": RunnablePassthrough()} | prompt | structured_llm

In [43]:
query_analyzer.invoke("where did Harrison Work")

Search(query='Harrison work history', person='HARRISON')

In [44]:
query_analyzer.invoke("where did ankush Work")

Search(query='Ankush work history', person='ANKUSH')

# 4. Retrieval with query analysis

1. Import necessary dependencies

2. Retriever Lookup Table:

retrievers = {"HARRISON": retriever_harrison, "ANKUSH": retriever_ankush}: Maps each value the person field is expected to take to the retriever built for that person in section 2.

3. @chain Decorator:

@chain: This decorator turns the custom_chain function into a LangChain runnable, so it can be called with invoke() and composed with other runnables.

4. custom_chain Function Definition:

def custom_chain(question):: Defines a function that takes the user's question (a string) as input.

5. Query Analysis:

response = query_analyzer.invoke(question): Runs the query_analyzer chain defined in section 3 on the question.
The result is a Search object with two fields: query (the search text) and person (the person the question is about).

6. Retriever Selection:

retriever = retrievers[response.person]: Uses the person field to pick the matching retriever from the dictionary.
Because the allowed values are only stated in the field description, the model could return something else; in that case this lookup raises a KeyError.

7. Retrieval:

return retriever.invoke(response.query): Runs the selected retriever on the generated query and returns the list of matching documents.

In essence:

The custom_chain function does the following:

1. It takes a user question as input.
2. It uses the query_analyzer to produce a Search object containing a query and a person.
3. It selects the retriever registered for that person.
4. It runs the query against that retriever and returns the retrieved documents.

This chain routes each question to exactly one retriever. Questions that involve both people, or a person with no registered retriever, are outside the scope of this example.

In [45]:
from langchain_core.runnables import chain

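# Map each expected value of Search.person to its retriever.
retrievers = {
    "HARRISON": retriever_harrison,
    "ANKUSH": retriever_ankush,
}

@chain
def custom_chain(question):
    # Run query analysis to get a Search object (query + person).
    response = query_analyzer.invoke(question)
    # Pick the retriever that matches the person named by the analysis.
    retriever = retrievers[response.person]
    # Search only that person's collection.
    return retriever.invoke(response.query)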

In [46]:
custom_chain.invoke("where did Harrison Work")

[Document(id='1f0c09b3-77af-482e-8c7e-9ca3ed7530b3', metadata={}, page_content='Harrison worked at Kensho')]

In [47]:
custom_chain.invoke("where did ankush Work")

[Document(id='a64dfaa7-b228-4976-86df-659b07999fb0', metadata={}, page_content='Ankush worked at Facebook')]
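
The same routing logic can also be written asynchronously. The sketch below is a self-contained approximation that runs without an API key: `FakeSearch`, `FakeRetriever`, `fake_query_analyzer`, `fake_retrievers`, and `custom_chain_async` are hypothetical stand-ins, and keyword matching replaces the LLM call. In a real chain, the awaited calls would be `query_analyzer.ainvoke(question)` and `retriever.ainvoke(response.query)`, since LangChain runnables expose `ainvoke` as the asynchronous counterpart of `invoke`.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeSearch:
    """Stand-in for the Search object returned by query analysis."""

    query: str
    person: str


class FakeRetriever:
    """Stand-in retriever with an async ainvoke method."""

    def __init__(self, texts: List[str]) -> None:
        self._texts = texts

    async def ainvoke(self, query: str) -> List[str]:
        await asyncio.sleep(0)  # simulate waiting on I/O
        return list(self._texts)


async def fake_query_analyzer(question: str) -> FakeSearch:
    # Keyword matching replaces the LLM call purely for illustration.
    person = "HARRISON" if "harrison" in question.lower() else "ANKUSH"
    return FakeSearch(query=question, person=person)


fake_retrievers: Dict[str, FakeRetriever] = {
    "HARRISON": FakeRetriever(["Harrison worked at Kensho"]),
    "ANKUSH": FakeRetriever(["Ankush worked at Facebook"]),
}


async def custom_chain_async(question: str) -> List[str]:
    response = await fake_query_analyzer(question)
    # .get avoids a KeyError if the analysis names an unregistered person.
    retriever = fake_retrievers.get(response.person)
    if retriever is None:
        return []
    return await retriever.ainvoke(response.query)


docs = asyncio.run(custom_chain_async("where did Harrison work"))
print(docs)
```

Returning an empty list for an unknown person is one possible policy; raising an error or searching every retriever are equally reasonable alternatives depending on the application.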