## Chat Models

In [47]:

import os
os.environ["GOOGLE_API_KEY"] = ""
from langchain.chat_models import init_chat_model

model = init_chat_model("gemini-2.0-flash", model_provider="google_genai")
model.invoke("Hello, world!")

AIMessage(content='Hello there! How can I help you today?', additional_kwargs={}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': []}, id='run--140ecbb5-04d1-4514-b90d-b49a5e91e8a7-0', usage_metadata={'input_tokens': 4, 'output_tokens': 11, 'total_tokens': 15, 'input_token_details': {'cache_read': 0}})

Chat models are language models that use a sequence of messages as inputs and return messages as outputs 

### Tool calling

In [48]:
# The function name, type hints, and docstring are all part of the tool
# schema that's passed to the model. Defining good, descriptive schemas
# is an extension of prompt engineering and is an important part of
# getting models to perform well.

# LangChain also implements a @tool decorator that allows for further control of the tool schema, such as tool names and argument descriptions.


from typing_extensions import Annotated, TypedDict


def add(a: int, b: int) -> int:
    """Add two integers.

    Args:
        a: First integer
        b: Second integer
    """
    return a + b


def multiply(a: int, b: int) -> int:
    """Multiply two integers.

    Args:
        a: First integer
        b: Second integer
    """
    return a * b


tools = [add, multiply]

llm_with_tools = model.bind_tools(tools)

query = "What is 3 * 12?"

print(llm_with_tools.invoke(query))

llm_forced_to_multiply = model.bind_tools(tools, tool_choice="multiply") # forced to use multiply
print(llm_forced_to_multiply.invoke("what is 2 + 4"))


query = "what is 2 + 4"
llm_forced_to_tool = model.bind_tools(tools, tool_choice="any") # forced to use any
result = llm_forced_to_tool.invoke(query)

content='' additional_kwargs={'function_call': {'name': 'multiply', 'arguments': '{"a": 3.0, "b": 12.0}'}} response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': []} id='run--822008c7-ebff-4be0-8415-bfc2223d30ce-0' tool_calls=[{'name': 'multiply', 'args': {'a': 3.0, 'b': 12.0}, 'id': '48fbe613-c598-427f-bf52-cf2465ef98a1', 'type': 'tool_call'}] usage_metadata={'input_tokens': 41, 'output_tokens': 5, 'total_tokens': 46, 'input_token_details': {'cache_read': 0}}
content='' additional_kwargs={'function_call': {'name': 'multiply', 'arguments': '{"a": 2.0, "b": 4.0}'}} response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': []} id='run--34a16a58-a11d-45f1-a673-218740668719-0' tool_calls=[{'name': 'multiply', 'args': {'a': 2.0, 'b': 4.0}, 'id': '8c9f3dff-779c-47c9-a784-60b9bd9fed44', 'type': 'tool_call'}] usage_metadata={'input_tokens': 39, 'output_tokens':

### Structured output from a chat model

In [49]:
from typing import Optional

from pydantic import BaseModel, Field


# Pydantic
class Joke(BaseModel):
    """Joke to tell user."""

    setup: str = Field(description="The setup of the joke")
    punchline: str = Field(description="The punchline to the joke")
    rating: Optional[int] = Field(
        default=None, description="How funny the joke is, from 1 to 10"
    )


structured_llm = model.with_structured_output(Joke)

structured_llm.invoke("Tell me a joke about cats")

Joke(setup='Why did the cat join the Red Cross?', punchline='They get claw-strophobic!', rating=None)

In [50]:
from typing import Union,Literal


class Joke(BaseModel):
    """Joke to tell user."""
    setup: str = Field(description="The setup of the joke")
    punchline: str = Field(description="The punchline to the joke")
    rating: Optional[int] = Field(
        default=None, description="How funny the joke is, from 1 to 10"
    )


class ConversationalResponse(BaseModel):
    """Respond in a conversational manner. Be kind and helpful."""
    response: str = Field(description="A conversational response to the user's query")


class FinalResponse(BaseModel):
    final_output: Union[Joke,ConversationalResponse]


structured_llm = model.with_structured_output(FinalResponse)
print(structured_llm.invoke("Tell me a joke."))

structured_llm = model.with_structured_output(FinalResponse)
print(structured_llm.invoke("Hello, How are you today?"))

# union is not working. Not sure why ?

final_output=ConversationalResponse(response="Why don't scientists trust atoms? Because they make up everything!")
final_output=ConversationalResponse(response='I am doing great, thank you for asking! How can I help you today?')


Not all models support .with_structured_output(). For such models you'll need to directly prompt the model to use a specific format, and use an output parser to extract the structured response from the raw model output. It can be done with PydenticOutputParser. 

In [51]:
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate

parser = PydanticOutputParser(pydantic_object= FinalResponse)


# Prompt
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "Answer the user query. Wrap the output in `json` tags\n{format_instructions}",
        ),
        ("human", "{query}"),
    ]
).partial(format_instructions=parser.get_format_instructions())

query = "Tell me a joke."
# print(prompt.invoke({"query": query}))

chain = prompt | model | parser

print(chain.invoke({"query": query}))

query = "How are you?"
# print(prompt.invoke({"query": query}))

chain = prompt | model | parser

print(chain.invoke({"query": query}))

final_output=Joke(setup="Why don't scientists trust atoms?", punchline='Because they make up everything!', rating=None)
final_output=ConversationalResponse(response='I am doing well, thank you for asking! How can I help you today?')


### How to cache chat model response

LangChain provides an optional caching layer for chat models. This is useful for two main reasons:

It can save you money by reducing the number of API calls you make to the LLM provider, if you're often requesting the same completion multiple times. This is especially useful during app development.
It can also speed up your application by reducing the number of API calls you make to the LLM provider.

In [52]:
from langchain_core.globals import set_llm_cache

In [53]:
# in memory cache
from langchain_core.caches import InMemoryCache

set_llm_cache(InMemoryCache())

# The first time, it is not yet in cache, so it should take longer, nex time onwards it is very fast
query = "tell me a cat joke."
chain.invoke({"query": query})

FinalResponse(final_output=Joke(setup='Why did the cat join the Red Cross?', punchline='Because he wanted to be a first aid kit!', rating=None))

In [54]:
query = "tell me a dog joke."
chain.invoke({"query": query})

FinalResponse(final_output=Joke(setup='Why are Dalmatians no good at hide and seek?', punchline="Because they're always spotted!", rating=None))

In [55]:
## sqlite cache 
# This cache implementation uses a SQLite database to store responses, and will last across process restarts.

from langchain_community.cache import SQLiteCache

set_llm_cache(SQLiteCache(database_path=".langchain.db"))

query = "tell me a mouse joke."
chain.invoke({"query": query})

FinalResponse(final_output=Joke(setup='What do you call a mouse that can lift heavy things?', punchline='Muscles Mouse!', rating=7))

In [56]:
# 2nd time calling llm is super fast. just the first time its slow. look at the time of compilation. 
query = "tell me a cow joke."
chain.invoke({"query": query})

FinalResponse(final_output=Joke(setup='Why do cows wear bells?', punchline="Because their horns don't work!", rating=7))

### Custom chat model

Wrapping your LLM with the standard BaseChatModel interface allow you to use your LLM in existing LangChain programs with minimal code modifications!

As an bonus, your LLM will automatically become a LangChain Runnable and will benefit from some optimizations out of the box (e.g., batch via a threadpool), async support, the astream_events API, etc.

#### Messages

messages are used to represent the input and output of a chat model, as well as any additional context or metadata that may be associated with a conversation.

Each message has a role (e.g., "user", "assistant") , content (e.g., text, multimodal data), additional metadata(id, name, token usage and other model-specific metadata).

LangChain provides a unified message format that can be used across chat models, allowing users to work with different chat models without worrying about the specific details of the message format used by each model provider.


**roles:**

Roles are used to distinguish between different types of messages in a conversation and help the chat model understand how to respond to a given sequence of messages.

| **Role**              | **Description**                                                                                                                                                                                                 |
|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **system**            | Used to tell the chat model how to behave and provide additional context. Not supported by all chat model providers.                                                                                            |
| **user**              | Represents input from a user interacting with the model, usually in the form of text or other interactive input.                                                                                                |
| **assistant**         | Represents a response from the model, which can include text or a request to invoke tools.                                                                                                                      |
| **tool**              | A message used to pass the results of a tool invocation back to the model after external data or processing has been retrieved. Used with chat models that support [tool calling](/docs/concepts/tool_calling). |
| **function** (legacy) | This is a legacy role, corresponding to OpenAI's legacy function-calling API. **tool** role should be used instead.  




**Content:**

Currently, most chat models support text as the primary content type, with some models also supporting multimodal data.


Other Message Data: 

Depending on the chat model provider, messages can include other data such as

ID: An optional unique identifier for the message.

Name: An optional name property which allows differentiate between different entities/speakers with the same role. Not all models support this!

**Metadata:** Additional information about the message, such as timestamps, token usage, etc.

4 Important types of messages:
1. SystemMessage -- for content which should be passed to direct the conversation, corresponds to system role. instructing the model to adopt a specific persona or setting the tone of the conversation. Different chat providers may support system message in one of the following ways:

    Through a "system" message role: In this case, a system message is included as part of the message sequence with the role explicitly set as "system."

    Through a separate API parameter for system instructions: Instead of being included as a message, system instructions are passed via a dedicated API parameter.

    No support for system messages: Some models do not support system messages at all.

    LangChain will automatically adapt based on the provider’s capabilities. If the provider supports a separate API parameter for system instructions, LangChain will extract the content of a system message and pass it through that parameter.

2. HumanMessage -- for content in the input from the user. corresponds to user role

3. AIMessage -- for content in the response from the model. corresponds to assistant role. This is the response from the model, which can include text or a request to invoke tools. An AIMessage has the following attributes: content, tool_call, invalid_tool_calls, usage_metadata, id, response_metadata. standardized attributes(tool_call, invalid_tool_calls, usage_metadata, id)are the ones that LangChain attempts to standardize across different chat model providers. raw fields (content,response_metadata)are specific to the model provider and may vary.

4. ToolMessage -- corresponds to tool role. In addition to role and content, this message has: a tool_call_id field which conveys the id of the call to the tool that was called to produce this result, an artifact field which can be used to pass along arbitrary artifacts of the tool execution which are useful to track but which should not be sent to the model.

Conversation Structure:

The sequence of messages into a chat model should follow a specific structure to ensure that the chat model can generate a valid response.

For example, a typical conversation structure might look like this:

User Message: "Hello, how are you?"

Assistant Message: "I'm doing well, thank you for asking."

User Message: "Can you tell me a joke?"

Assistant Message: "Sure! Why did the scarecrow win an award? Because he was outstanding in his field!"


Most conversations start with a system message that sets the context for the conversation. This is followed by a user message containing the user's input, and then an assistant message containing the model's response.
The assistant may respond directly to the user or if configured with tools request that a tool be invoked to perform a specific task.


A full conversation often involves a combination of two patterns of alternating messages:

The user and the assistant representing a back-and-forth conversation.
The assistant and tool messages representing an "agentic" workflow where the assistant is invoking tools to perform specific tasks

Managing chat history:

Since chat models have a maximum limit on input size, it's important to manage chat history and trim it as needed to avoid exceeding the context window.

While processing chat history, it's essential to preserve a correct conversation structure.

Key guidelines for managing chat history:

The conversation should follow one of these structures:
The first message is either a "user" message or a "system" message, followed by a "user" and then an "assistant" message.
The last message should be either a "user" message or a "tool" message containing the result of a tool call.
When using tool calling, a "tool" message should only follow an "assistant" message that requested the tool invocation.

In [57]:
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    FunctionMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)

#### Streaming Variant

All the chat messages have a streaming variant that contains Chunk in the name.

These chunks are used when streaming output from chat models, and they all define an additive property!



In [58]:
from langchain_core.messages import (
    AIMessageChunk,
    FunctionMessageChunk,
    HumanMessageChunk,
    SystemMessageChunk,
    ToolMessageChunk,
)



#### BaseChatModel

 we will inherit from `BaseChatModel` and we'll need to implement the following:

 | Method/Property                | Description                                                        | Required/Optional |
|--------------------------------|--------------------------------------------------------------------|-------------------|
| `_generate`                    | Use to generate a chat result from a prompt                        | Required          |
| `_llm_type` (property)         | Used to uniquely identify the type of the model. Used for logging. | Required          |
| `_identifying_params` (property)| Represent model parameterization for tracing purposes.             | Optional          |
| `_stream`                      | Use to implement streaming.                                        | Optional          |
| `_agenerate`                   | Use to implement a native async method.                            | Optional          |
| `_astream`                     | Use to implement async version of `_stream`.                       | Optional          |

The _astream implementation uses run_in_executor to launch the sync _stream in a separate thread if _stream is implemented, otherwise it fallsback to use _agenerate.

You can use this trick if you want to reuse the _stream implementation, but if you're able to implement code that's natively async that's a better solution since that code will run with less overhead.

In [59]:
from typing import Any, Dict, Iterator, List, Optional

from langchain_core.callbacks import (
    CallbackManagerForLLMRun,
)
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    BaseMessage,
)
from langchain_core.messages.ai import UsageMetadata
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from pydantic import Field


class ChatParrotLink(BaseChatModel):
    """A custom chat model that echoes the first `parrot_buffer_length` characters
    of the input.

    When contributing an implementation to LangChain, carefully document
    the model including the initialization parameters, include
    an example of how to initialize the model and include any relevant
    links to the underlying models documentation or API.

    Example:

        .. code-block:: python

            model = ChatParrotLink(parrot_buffer_length=2, model="bird-brain-001")
            result = model.invoke([HumanMessage(content="hello")])
            result = model.batch([[HumanMessage(content="hello")],
                                 [HumanMessage(content="world")]])
    """

    model_name: str = Field(alias="model")
    """The name of the model"""
    parrot_buffer_length: int
    """The number of characters from the last message of the prompt to be echoed."""
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    timeout: Optional[int] = None
    stop: Optional[List[str]] = None
    max_retries: int = 2

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Override the _generate method to implement the chat model logic.

        This can be a call to an API, a call to a local model, or any other
        implementation that generates a response to the input prompt.

        Args:
            messages: the prompt composed of a list of messages.
            stop: a list of strings on which the model should stop generating.
                  If generation stops due to a stop token, the stop token itself
                  SHOULD BE INCLUDED as part of the output. This is not enforced
                  across models right now, but it's a good practice to follow since
                  it makes it much easier to parse the output of the model
                  downstream and understand why generation stopped.
            run_manager: A run manager with callbacks for the LLM.
        """
        # Replace this with actual logic to generate a response from a list
        # of messages.
        last_message = messages[-1]
        tokens = last_message.content[: self.parrot_buffer_length]
        ct_input_tokens = sum(len(message.content) for message in messages)
        ct_output_tokens = len(tokens)
        message = AIMessage(
            content=tokens,
            additional_kwargs={},  # Used to add additional payload to the message
            response_metadata={  # Use for response metadata
                "time_in_seconds": 3,
                "model_name": self.model_name,
            },
            usage_metadata={
                "input_tokens": ct_input_tokens,
                "output_tokens": ct_output_tokens,
                "total_tokens": ct_input_tokens + ct_output_tokens,
            },
        )
        ##

        generation = ChatGeneration(message=message)
        return ChatResult(generations=[generation])

    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        """Stream the output of the model.

        This method should be implemented if the model can generate output
        in a streaming fashion. If the model does not support streaming,
        do not implement it. In that case streaming requests will be automatically
        handled by the _generate method.

        Args:
            messages: the prompt composed of a list of messages.
            stop: a list of strings on which the model should stop generating.
                  If generation stops due to a stop token, the stop token itself
                  SHOULD BE INCLUDED as part of the output. This is not enforced
                  across models right now, but it's a good practice to follow since
                  it makes it much easier to parse the output of the model
                  downstream and understand why generation stopped.
            run_manager: A run manager with callbacks for the LLM.
        """
        last_message = messages[-1]
        tokens = str(last_message.content[: self.parrot_buffer_length])
        ct_input_tokens = sum(len(message.content) for message in messages)

        for token in tokens:
            usage_metadata = UsageMetadata(
                {
                    "input_tokens": ct_input_tokens,
                    "output_tokens": 1,
                    "total_tokens": ct_input_tokens + 1,
                }
            )
            ct_input_tokens = 0
            chunk = ChatGenerationChunk(
                message=AIMessageChunk(content=token, usage_metadata=usage_metadata)
            )

            if run_manager:
                # This is optional in newer versions of LangChain
                # The on_llm_new_token will be called automatically
                run_manager.on_llm_new_token(token, chunk=chunk)

            yield chunk

        # Let's add some other information (e.g., response metadata)
        chunk = ChatGenerationChunk(
            message=AIMessageChunk(
                content="",
                response_metadata={"time_in_sec": 3, "model_name": self.model_name},
            )
        )
        if run_manager:
            # This is optional in newer versions of LangChain
            # The on_llm_new_token will be called automatically
            run_manager.on_llm_new_token(token, chunk=chunk)
        yield chunk

    @property
    def _llm_type(self) -> str:
        """Get the type of language model used by this chat model."""
        return "echoing-chat-model-advanced"

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Return a dictionary of identifying parameters.

        This information is used by the LangChain callback system, which
        is used for tracing purposes make it possible to monitor LLMs.
        """
        return {
            # The model name allows users to specify custom token counting
            # rules in LLM monitoring applications (e.g., in LangSmith users
            # can provide per token pricing for their model and monitor
            # costs for the given LLM.)
            "model_name": self.model_name,
        }

### How to stream chat model response 

The stream method is a way to retrieve output from a language/chat model in a piece-by-piece manner, rather than waiting for the entire response at once. It provides an iterator that yields data as it becomes available.

Why is stream useful ?
1. Responsiveness: Users can see the response as it’s being generated, improving the interactive experience (like watching words appear in a chat window).
2. Early Processing: Applications can start processing or displaying information before the model has finished its entire response.
3. Token-by-token control: If supported by the model provider, you can handle every word or token as it’s produced.


Behaviours : 

1. Default behavior (no real streaming, stream method not defined in custom model):
    The default implementation of stream yields a single value: the final output.
    Even though you call stream, you don’t get intermediate results—instead, you get the complete response at the end, just as if you had called a regular invoke method.
    This ensures API compatibility for all chat models, even those that don’t support real streaming.
    ```
    for chunk in model.stream("Tell me a joke."):
        print(chunk)
    # Output (after a pause):
    # Why did the chicken cross the road? To get to the other side!
    ```

2. With provider streaming (real streaming, stream method defined in custom model method):
If the chat model provider supports streaming, the stream method will yield multiple chunks as the model generates them (often token-by-token or sentence-by-sentence depending upon the implementation of stream method).
This means your app can display new content to the user as soon as it’s ready.

All chat models implement the Runnable interface, which comes with a default implementations of standard runnable methods (i.e. invoke, ainvoke, batch, abatch, stream, astream, astream_events).

#### Sync vs Async

stream: Synchronous Streaming

How it works:
You use a regular (blocking) for loop to iterate over the streamed output.

When to use:
In standard Python scripts or environments where asynchronous code (async/await) is not needed.


```
from langchain_anthropic.chat_models import ChatAnthropic

chat = ChatAnthropic(model="claude-3-haiku-20240307")
for chunk in chat.stream("Write me a 1 verse song about goldfish on the moon"):
    print(chunk.content, end="|", flush=True)
```

Here, each chunk.content is printed as soon as it is available.

This code blocks execution until each new chunk is received and printed.

astream: Asynchronous Streaming

How it works:
You use an async for loop, which allows other tasks to run while waiting for new pieces of the response.

When to use:
In asynchronous Python applications (using asyncio), such as web servers (FastAPI, etc.), GUIs, or whenever you want non-blocking behavior.

```
from langchain_anthropic.chat_models import ChatAnthropic

chat = ChatAnthropic(model="claude-3-haiku-20240307")
async for chunk in chat.astream("Write me a 1 verse song about goldfish on the moon"):
    print(chunk.content, end="|", flush=True)
```

As soon as each piece (chunk) of text is ready, it’s printed out immediately.
If the model streams token by token, you’ll see the answer appear gradually, like typing but not with astream. 

Here, the output is streamed asynchronously, so your program can perform other operations(other operations in the for loop) while waiting for each chunk.

This requires your code to be inside an async function and run within an event loop.

astream_events:


Purpose:
It streams not only the output chunks (tokens, messages, etc.) but also detailed events that occur during the execution of a runnable/chat model.

Usage:
You use it with an async for loop, just like astream, but instead of only getting content chunks, you receive event dictionaries (with event types, metadata, etc.).


```
async for event in chat.astream_events("Write me a 1 verse song about goldfish on the moon"):
    print(event)
```
Typical events include:

on_chat_model_start

on_chat_model_stream (one for each chunk/token)

on_chat_model_end

...and more, with accompanying metadata



Use astream_events if you want:

1. To monitor the full lifecycle of an LLM call (start, each chunk, end, errors, etc.)
2. To build complex, interactive UIs or logs that react to more than just the model's text output
3. To integrate with pipelines/chains where you need to track progress, errors, or intermediate steps

### Response Metadata

In [3]:
from langchain_openai import ChatOpenAI
import os
os.environ["GOOGLE_API_KEY"] = "AIzaSyBuTnZ2CpdF3bfZsuyomwWJqbQOIf3hU8o"
from langchain.chat_models import init_chat_model

llm = init_chat_model("gemini-2.0-flash", model_provider="google_genai")
msg = llm.invoke("What's the oldest known example of cuneiform")
msg.response_metadata

# different chat model providers have different response_metadata 

{'prompt_feedback': {'block_reason': 0, 'safety_ratings': []},
 'finish_reason': 'STOP',
 'safety_ratings': []}

### How to use chat models to call tools 

The chat model can only generate the arguments to a tool, and actually running the tool (or not) is up to the user.

In [2]:
import os
os.environ["GOOGLE_API_KEY"] = "AIzaSyBuTnZ2CpdF3bfZsuyomwWJqbQOIf3hU8o"
from langchain.chat_models import init_chat_model

model = init_chat_model("gemini-2.0-flash", model_provider="google_genai")
model.invoke("Hello, world!")

AIMessage(content='Hello there! How can I help you today?', additional_kwargs={}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': []}, id='run--23f03b53-7f47-4189-ab4d-879c2b3787fa-0', usage_metadata={'input_tokens': 4, 'output_tokens': 11, 'total_tokens': 15, 'input_token_details': {'cache_read': 0}})

In [10]:
from typing_extensions import Annotated, TypedDict
from langchain.tools import tool



@tool
def add(a: int, b: int) -> int:
    """Add two integers.

    Args:
        a: First integer
        b: Second integer
    """
    return a + b


@tool
def multiply(a: int, b: int) -> int:
    """Multiply two integers.

    Args:
        a: First integer
        b: Second integer
    """
    return a * b


tools = [add, multiply]

llm_with_tools = model.bind_tools(tools)

In [11]:
llm_with_tools = model.bind_tools(tools)

query = "What is 3 * 12?"

llm_with_tools.invoke(query)

# llm generates arugment to a tool

AIMessage(content='', additional_kwargs={'function_call': {'name': 'multiply', 'arguments': '{"a": 3.0, "b": 12.0}'}}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': []}, id='run--fe56a27c-5430-48b3-b1d5-e202acb0f4e9-0', tool_calls=[{'name': 'multiply', 'args': {'a': 3.0, 'b': 12.0}, 'id': '24ac9e80-cf73-4c08-bb6b-5053f25c0c37', 'type': 'tool_call'}], usage_metadata={'input_tokens': 63, 'output_tokens': 5, 'total_tokens': 68, 'input_token_details': {'cache_read': 0}})

In [12]:
# Note that chat models can call multiple tools at once.

query = "What is 3 * 12? Also, what is 11 + 49?"

llm_with_tools.invoke(query)

AIMessage(content='', additional_kwargs={'function_call': {'name': 'add', 'arguments': '{"a": 11.0, "b": 49.0}'}}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': []}, id='run--b4e81bbc-1749-40c8-8d62-0cbcbf3a95b2-0', tool_calls=[{'name': 'multiply', 'args': {'a': 3.0, 'b': 12.0}, 'id': 'b11eed05-eef1-4e4d-9038-656a73c58184', 'type': 'tool_call'}, {'name': 'add', 'args': {'a': 11.0, 'b': 49.0}, 'id': 'a87f9b79-f82c-4822-a044-b9ed1b77299c', 'type': 'tool_call'}], usage_metadata={'input_tokens': 75, 'output_tokens': 10, 'total_tokens': 85, 'input_token_details': {'cache_read': 0}})

In [13]:
# call multiple tools at once
llm_with_tools.invoke(query).tool_calls

'''
The .tool_calls attribute should contain valid tool calls. 
Note that on occasion, model providers may output malformed tool calls (e.g., arguments that are not valid JSON).
 When parsing fails in these cases, instances of InvalidToolCall are populated in the .invalid_tool_calls attribute. 
 An InvalidToolCall can have a name, string arguments, identifier, and error message.
'''

'\nThe .tool_calls attribute should contain valid tool calls. \nNote that on occasion, model providers may output malformed tool calls (e.g., arguments that are not valid JSON).\n When parsing fails in these cases, instances of InvalidToolCall are populated in the .invalid_tool_calls attribute. \n An InvalidToolCall can have a name, string arguments, identifier, and error message.\n'

In [14]:
llm_with_tools.invoke(query).invalid_tool_calls

[]

In [None]:
# parsing 
from langchain_core.output_parsers import PydanticToolsParser
from pydantic import BaseModel, Field

query = "What is 3 * 12? Also, what is 11 + 49?"
chain = llm_with_tools | PydanticToolsParser(tools=[add, multiply])
chain

AttributeError: 'StructuredTool' object has no attribute '__name__'

In [8]:
# actually using the tool by invoking the function and passing the results back to the model

query = "What is 3 * 12? Also, what is 11 + 49?"

messages = [HumanMessage(query)]

tool_dict = {"add": add, "multiply": multiply}

ai_msg = llm_with_tools.invoke(query)

for tool_call in ai_msg.tool_calls:
    selected_tool = tool_dict[tool_call["name"].lower()]
    print(selected_tool)
    # tool_msg = selected_tool.invoke(tool_call)
    # messages.append(tool_msg)

NameError: name 'HumanMessage' is not defined

In [68]:
tool_call

{'name': 'multiply',
 'args': {'a': 3.0, 'b': 12.0},
 'id': '3d032434-b9b6-4915-bd21-5c6aadbb416d',
 'type': 'tool_call'}

## Vector Store

A vector store stores embedded data and performs similarity search.

"In-memory" vector stores in LangChain (and similar frameworks) are simple vector databases that keep all data (vectors and documents) directly in your computer’s memory (RAM), rather than saving them to disk, a cloud service, or a dedicated database.


Other vector store integrated with langchain - astradb, chroma, faiss, milvus, mangodb, pinecone,  pgvector, qdrant

In [None]:
import os
os.environ["GOOGLE_API_KEY"] = "AIzaSyBvRvc3NjHH3B1_3DJO1dKiRcEJR4G64A8"
from langchain_google_genai import GoogleGenerativeAIEmbeddings

embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
# Embedding models create a vector representation of a piece of text.

# in memory 
from langchain_core.vectorstores import InMemoryVectorStore
vector_store = InMemoryVectorStore(embeddings)

# astradb
from langchain_astradb import AstraDBVectorStore
vector_store = AstraDBVectorStore(
    embedding=embeddings,
    api_endpoint=ASTRA_DB_API_ENDPOINT,
    collection_name="astra_vector_langchain",
    token=ASTRA_DB_APPLICATION_TOKEN,
    namespace=ASTRA_DB_NAMESPACE,
)

# chroma
from langchain_chroma import Chroma
vector_store = Chroma(
    collection_name="example_collection",
    embedding_function=embeddings,
    persist_directory="./chroma_langchain_db",  # Where to save data locally, remove if not necessary
)

# pinecone
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone
pc = Pinecone(api_key=...)
index = pc.Index(index_name)
vector_store = PineconeVectorStore(embedding=embeddings, index=index)

## Dcoument Loader

DocumentLoaders load data into the standard LangChain Document format.
Each DocumentLoader has its own specific parameters, but they can all be invoked in the same way with the .load method. 

It can load below with multiple loader

1. webpages
2. pdf
3. cloud provider
4. social platform(twitter,reddit)
5. messaging services
6. Productivity tool(github,slack)
7. csv loader, directory loader, json loader, bshtml loader , unstructured 

In [None]:
from langchain_community.document_loaders.csv_loader import CSVLoader

loader = CSVLoader(
    ...  # <-- Integration specific parameters here
)
data = loader.load()

vector store classes do implement a class method called from_documents (or similar) for data ingestion.

## Retrievers

A Retriever is an object in LangChain that finds and returns relevant documents from a collection, given a user query.
Think of it like a search engine: you give it a question or some text, and it returns the most relevant pieces of information. A retriever is a high-level abstraction for fetching relevant documents 

Purpose: Abstracts the way documents are fetched, so you can swap out different retrieval methods (like keyword search, vector similarity, hybrid, etc.).
Interface: Typically, it has a .get_relevant_documents(query) method. It Can use vector store internally.

Retrievers can be created from vector stores. and  all vector stores can be cast to retrievers. Though vector store might have some search method but retriever can also use other methods (keyword search, hybrid search, filtering, etc.). It provides a consistent interface (get_relevant_documents(query)) regardless of the retrieval method used.

Retrievers accept a string query as input and return a list of Documents as output.

### How to use vector store as retriever ?


You can build a retriever from a vectorstore using its .as_retriever method. 

In [None]:
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import CharacterTextSplitter

loader = TextLoader("state_of_the_union.txt")

documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
texts = text_splitter.split_documents(documents)
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_documents(texts, embeddings)

In [None]:
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"score_threshold": 0.5,"k": 1})
docs = retriever.invoke("what did the president say about ketanji brown jackson?")

# search_type = similarity_search, max_marginal_relevance_search,etc
# Similarity score threshold retrieval by score_threshold
# limit the number of document with parameter k - top k doc retrieval 

# different vector stores will have different "??_kwargs" arguments.

### MultiQueryRetriever

Distance-based vector database retrieval embeds (represents) queries in high-dimensional space and finds similar embedded documents based on a distance metric. But, retrieval may produce different results with subtle changes in query wording, or if the embeddings do not capture the semantics of the data well. Prompt engineering / tuning is sometimes done to manually address these problems, but can be tedious.

The MultiQueryRetriever automates the process of prompt tuning by using an LLM to generate multiple queries from different perspectives for a given user input query. For each query, it retrieves a set of relevant documents and takes the unique union across all queries to get a larger set of potentially relevant documents. By generating multiple perspectives on the same question, the MultiQueryRetriever can mitigate some of the limitations of the distance-based retrieval and get a richer set of results.

In [None]:
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_openai import ChatOpenAI

question = "What are the approaches to Task Decomposition?"
llm = ChatOpenAI(temperature=0)
retriever_from_llm = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(), llm=llm
) # instead of llm , you can provide llm_chain also

unique_docs = retriever_from_llm.invoke(question)

### Contextual compression

Sometimes the information most relevant to a query may be buried in a document with a lot of irrelevant text. Passing that full document through your application can lead to more expensive LLM calls and poorer responses.

Contextual compression is meant to fix this. The idea is simple: instead of immediately returning retrieved documents as-is, you can compress them using the context of the given query, so that only the relevant information is returned. “Compressing” here refers to both compressing the contents of an individual document and filtering out documents wholesale.


To use the Contextual Compression Retriever, you'll need:

1. a base retriever
2. a Document Compressor

The Contextual Compression Retriever passes queries to the base retriever, takes the initial documents and passes them through the Document Compressor. The Document Compressor takes a list of documents and shortens it by reducing the contents of documents or dropping documents altogether.

In [None]:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor # there are more built in compressors
from langchain_openai import OpenAI

# base retriever
retriever = FAISS.from_documents(texts, OpenAIEmbeddings()).as_retriever()

# compressor
llm = OpenAI(temperature=0)
compressor = LLMChainExtractor.from_llm(llm)

# compressed retriever
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=retriever
)

compressed_docs = compression_retriever.invoke(
    "What did the president say about Ketanji Jackson Brown"
)

In [None]:
# chaining multiple compressors and document transform togather 

from langchain.retrievers.document_compressors import DocumentCompressorPipeline

from langchain.retrievers.document_compressors import EmbeddingsFilter
from langchain_community.document_transformers import EmbeddingsRedundantFilter
from langchain_text_splitters import CharacterTextSplitter

splitter = CharacterTextSplitter(chunk_size=300, chunk_overlap=0, separator=". ") # first compressor
redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings) # 2nd compressor # Remove redundant (duplicate or near-duplicate) documents
relevant_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.76) # 3rd compressor # Select only documents similar to a reference/query embedding

# combine all compressors 
pipeline_compressor = DocumentCompressorPipeline(
    transformers=[splitter, redundant_filter, relevant_filter]
)


compression_retriever = ContextualCompressionRetriever(
    base_compressor=pipeline_compressor, base_retriever=retriever
)

compressed_docs = compression_retriever.invoke(
    "What did the president say about Ketanji Jackson Brown"
)

### custom Retriever

To create your own retriever, you need to extend the BaseRetriever class and implement the following methods:
1. _get_relevant_documents	
2. _aget_relevant_documents	(optional)

By inherting from BaseRetriever, your retriever automatically becomes a LangChain Runnable and will gain the standard Runnable functionality out of the box!

In [None]:
from typing import List

from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever


class ToyRetriever(BaseRetriever):
    """A toy retriever that contains the top k documents that contain the user query.

    This retriever only implements the sync method _get_relevant_documents.

    If the retriever were to involve file access or network access, it could benefit
    from a native async implementation of `_aget_relevant_documents`.

    As usual, with Runnables, there's a default async implementation that's provided
    that delegates to the sync implementation running on another thread.
    """

    documents: List[Document]
    """List of documents to retrieve from."""
    k: int
    """Number of top results to return"""

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Sync implementations for retriever."""
        matching_documents = []
        for document in self.documents:
            if len(matching_documents) > self.k:
                return matching_documents

            if query.lower() in document.page_content.lower():
                matching_documents.append(document)
        return matching_documents

    # Optional: Provide a more efficient native implementation by overriding
    # _aget_relevant_documents
    # async def _aget_relevant_documents(
    #     self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
    # ) -> List[Document]:
    #     """Asynchronously get documents relevant to a query.

    #     Args:
    #         query: String to find relevant documents for
    #         run_manager: The callbacks handler to use

    #     Returns:
    #         List of relevant documents
    #     """

### How to add score to retriever result

how to add retrieval scores to the .metadata of documents from vector store retriever and from higher order LangChain retrievers, such as SelfQueryRetriever or MultiVectorRetriever.

For vector store retriever ,  we will implement a short wrapper function around the corresponding vector store. 

For higher order langchain retriever, we will update a method of the corresponding class.

To obtain scores from a vector store retriever, we wrap the underlying vector store's .similarity_search_with_score method in a short function that packages scores into the associated document's metadata.


Steps:
- A custom retriever function is defined, decorated with `@chain` to make it a Runnable (usable in LangChain pipelines).
- It calls `vectorstore.similarity_search_with_score(query)`, which returns pairs of (`Document`, `score`).
- The score is inserted into each document's `.metadata`.
- The function returns the list of enriched `Document` objects.

In [None]:
from typing import List

from langchain_core.documents import Document
from langchain_core.runnables import chain


@chain
def retriever(query: str) -> List[Document]:
    docs, scores = zip(*vectorstore.similarity_search_with_score(query))
    for doc, score in zip(docs, scores):
        doc.metadata["score"] = score

    return docs

result = retriever.invoke("dinosaur")

#### SelfQueryRetriever

SelfQueryRetriever is a class that  will use a LLM to generate a query that is potentially structured.
It can generate structured queries that go beyond simple text search—like filtering documents based on metadata (attributes like year, genre, or rating).


SelfQueryRetriever includes a short (1 - 2 line) method _get_docs_with_query that executes the vectorstore search. We can subclass SelfQueryRetriever and override this method to propagate similarity scores.

**SelfQueryRetriever** uses an LLM to build more complex (structured) queries, possibly including filters on metadata.

Steps:
- **The self-query retriever requires you to have lark package installed.**
- Set up attribute metadata (e.g., `genre`, `year`, etc.) for filtering.
- Instantiate a language model (`ChatOpenAI`).
- Subclass `SelfQueryRetriever` and override its `_get_docs_with_query` method:
    - Use `vectorstore.similarity_search_with_score` to get both docs and scores.
    - Insert scores into each document's metadata.

In [None]:
from langchain.chains.query_constructor.base import AttributeInfo
from langchain_openai import ChatOpenAI


# defining the structure of metadata field on your document. 
# tell the LLM what fields are available for filtering and how to use them.
metadata_field_info = [
    AttributeInfo(
        name="genre",
        description="The genre of the movie. One of ['science fiction', 'comedy', 'drama', 'thriller', 'romance', 'action', 'animated']",
        type="string",
    ),
    AttributeInfo(
        name="year",
        description="The year the movie was released",
        type="integer",
    ),
    AttributeInfo(
        name="director",
        description="The name of the movie director",
        type="string",
    ),
    AttributeInfo(
        name="rating", description="A 1-10 rating for the movie", type="float"
    ),
]
document_content_description = "Brief summary of a movie" # A short string describing what the main content of each document 
llm = ChatOpenAI(temperature=0)

In [None]:
from typing import Any, Dict
from langchain.retrievers.self_query.base import SelfQueryRetriever


class CustomSelfQueryRetriever(SelfQueryRetriever):
    def _get_docs_with_query(self, query: str, search_kwargs: Dict[str, Any]) -> List[Document]:
        """Get docs, adding score information."""
        docs, scores = zip(
            *self.vectorstore.similarity_search_with_score(query, **search_kwargs)
        )
        for doc, score in zip(docs, scores):
            doc.metadata["score"] = score

        return docs
    

retriever = CustomSelfQueryRetriever.from_llm(
    llm,
    vectorstore,
    document_content_description,
    metadata_field_info,
    enable_limit=True
)




result = retriever.invoke("dinosaur movie with rating less than 8")


# under the hood:
# the retriever constructs a prompt for llm using user's query, document_content_description, metadata_field_info (with allowed filters/fields).
'''
Prompt ex: 
User wants to find: dinosaur movie with rating less than 8
Each document is: Brief summary of a movie
You can filter on:
  - genre: string, "The genre of the movie..."
  - year: integer, "The year the movie was released"
  - rating: float, "A 1-10 rating for the movie"
''' 
# llm receives the prompt and interprets the query and output a structured query
'''
prompt ex: 
Text search: "dinosaur"
Metadata filter: rating < 8
'''

# The retriever takes the structured query: Runs a semantic search in the vectorstore using the text ("dinosaur") ,Applies the metadata filter (rating < 8)
# The vectorstore executes a filtered similarity search and returns the top-matching documents with scores added 
# final output : The original content(Brief summary of a movie) ,The original metadata, The added "score" in metadata.
# both the filterable fields and the similarity score are stored in the same metadata dictionary of each document.
'''
(
    Document(
        page_content='A bunch of scientists bring back dinosaurs and mayhem breaks loose',
        metadata={'genre': 'science fiction', 'rating': 7.7, 'year': 1993.0, 'score': 0.84429127}
    ),
)
'''


'''
When you use SelfQueryRetriever with enable_limit=True,
 you do not pass k as a separate parameter in your code. 
 Instead, you specify the desired number of results in your natural language query.
   The retriever uses an LLM to parse your query, extract the limit, and apply it.
'''
results = retriever.invoke("Give me two movies about science fiction")

#### MultiVectorRetriever

MultiVectorRetriever is a special retriever in LangChain that allows each logical document to be associated with multiple vectors (embeddings).
This is different from the traditional retriever where one document = one vector.


Sometimes, a single document is too large or too complex to be represented by a single embedding vector.

Solution:
Break the document into meaningful chunks (sub-documents), embed each chunk separately, and store them all in the vectorstore.
Each chunk is linked to the parent document via a unique ID.


Benefits:
More accurate retrieval (retrieves based on relevant chunk, but presents the whole document)
Supports highlighting or surfacing why a document was retrieved

Chunk and Embed:
Split each large document into smaller sub-documents (chunks).
Each chunk is embedded and stored in the vectorstore, with a metadata field like "doc_id": "parent_doc_id".

Store Parents:
Store the full parent documents in a docstore (can be in-memory or persistent).

Query:
At retrieval time, the user query is embedded, and the vectorstore is searched among all sub-documents(chunks).
The retriever finds the most relevant chunks.
For each relevant chunk, the retriever fetches its parent document using doc_id.
Optionally, the retriever can include the actual sub-documents (and their scores) in the parent document’s metadata to show why the parent was retrieved.

In [None]:
# The storage layer for the parent documents

from langchain.storage import InMemoryStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
docstore = InMemoryStore()
fake_whole_documents = [
    ("fake_id_1", Document(page_content="fake whole document 1")),
    ("fake_id_2", Document(page_content="fake whole document 2")),
]
docstore.mset(fake_whole_documents)  # can store in a vector store also
# docstore.mset(list(zip(doc_ids, docs)))

In [None]:
# Add metadata so each chunk knows its parent 
sub_docs = [
    Document(
        page_content="A snippet from a larger document discussing cats.",
        metadata={"doc_id": "fake_id_1"},
    ),
    Document(
        page_content="A snippet from a larger document discussing discourse.",
        metadata={"doc_id": "fake_id_1"},
    ),
    Document(
        page_content="A snippet from a larger document discussing chocolate.",
        metadata={"doc_id": "fake_id_2"},
    ),
]
vectorstore.add_documents(sub_docs)

To propagate the scores, we subclass MultiVectorRetriever and override its _get_relevant_documents method. Here we will make two changes:

We will add similarity scores to the metadata of the corresponding "sub-documents" using the similarity_search_with_score method of the underlying vector store as above;
We will include a list of these sub-documents in the metadata of the retrieved parent document. This surfaces what snippets of text were identified by the retrieval, together with their corresponding similarity scores.

In [None]:
from collections import defaultdict

from langchain.retrievers import MultiVectorRetriever
from langchain_core.callbacks import CallbackManagerForRetrieverRun


class CustomMultiVectorRetriever(MultiVectorRetriever):
    def _get_relevant_documents(self, query: str, *, run_manager: CallbackManagerForRetrieverRun) -> List[Document]:
        """Get documents relevant to a query.
        Args:
            query: String to find relevant documents for
            run_manager: The callbacks handler to use
        Returns:
            List of relevant documents
        """
        results = self.vectorstore.similarity_search_with_score(
            query, **self.search_kwargs
        )

        #docs, scores = zip(*self.vectorstore.similarity_search_with_score(query, **search_kwargs))

        # Map doc_ids to list of sub-documents, adding scores to metadata
        id_to_doc = defaultdict(list) # key - doc_id , value - list of sub_dcos
        for sub_doc, score in results:
            sub_doc.metadata["score"] = score
            doc_id = sub_doc.metadata.get("doc_id")
            if doc_id:
                id_to_doc[doc_id].append(sub_doc)

        # Fetch documents corresponding to doc_ids, retaining sub_docs in metadata
        docs = []
        for _id, sub_docs in id_to_doc.items(): # Iterate through the dictionary id_to_doc, _id is the parent document ID,
            # sub_docs is a list of sub-documents (chunks) from the vectorstore that matched the query and are linked to this parent.

            docstore_docs = self.docstore.mget([_id]) # Retrieve the parent document(s) from the docstore (could be a database, in-memory store, etc.) using the parent ID.
            # mget([_id]) returns a list (even if only one doc matches).

            if docstore_docs: # Check if any parent documents were found for this ID (the list is not empty).
                if doc := docstore_docs[0]: # check if the first (and usually only) parent document in the list is None or not 
                    doc.metadata["sub_docs"] = sub_docs # Attach the list of matching sub-documents (chunks) to the parent document’s metadata under the key "sub_docs".
                    docs.append(doc) # Add the parent document (now with its relevant sub-documents in its metadata) to the results list.

        return docs
    

retriever = CustomMultiVectorRetriever(vectorstore=vectorstore, docstore=docstore) # need both vectorstore (for chuncks) and docstore (for parent docs)
retriever.invoke("cat")

Differences between "CharacterTextSplitter + Simple Retriever" vs "MultiVectorRetriever":

1. Document Storage & Retrieval Granularity
- Simple Retriever:
  - Each chunk from CharacterTextSplitter is stored as a separate document in the vector store.
  - Retrieval returns the individual matching chunk(s) only.
- MultiVectorRetriever:
  - Each chunk is stored in the vector store with a parent document ID.
  - The full parent document is also stored in a docstore.
  - Retrieval returns the **whole parent document**, not just the matching chunk.

2. Output Context
- Simple Retriever:
  - User receives only the small snippet/chunk that matched.
  - No direct information about the context or full document.
- MultiVectorRetriever:
  - User receives the entire parent document.
  - Can also receive info about which chunk(s) triggered the match (in metadata).



### EnsembleRetriever

The EnsembleRetriever is a tool in LangChain that lets you combine results from multiple retrievers (search algorithms).
This is useful because different retrievers have different strengths:

Sparse retrievers (like BM25) are good at keyword matching.

Dense retrievers (like those using embeddings) are good at understanding meaning.

Ensembling (combining) them gives you better, more robust results—this is often called hybrid search.

In [None]:

from langchain_community.vectorstores import FAISS
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
from langchain.retrievers import SimpleRetriever
from langchain_openai import OpenAIEmbeddings

# Sample documents
docs = [
    "Banana is yellow",
    "Apple is red",
    "Grapes are green",
    "Banana and apple are fruits",
]

# Create a FAISS vector store with OpenAI embeddings
embedding_fn = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(docs, embedding_fn)

# Create BM25Retriever (uses the same docs, but doesn't use embeddings)
bm25_retriever = BM25Retriever.from_texts(docs)
bm25_retriever.k = 2

# Create SimpleRetriever (wraps the FAISS vectorstore)
simple_retriever = vectorstore.as_retriever(search_kwargs={"k": 2})

# Ensemble the two retrievers (equal weight)
ensemble = EnsembleRetriever(
    retrievers=[bm25_retriever, simple_retriever],
    weights=[0.5, 0.5]
)

# Run a query
results = ensemble.invoke("banana")

### Long Context Reorder

Usually queries against vector stores will typically return documents in descending order of relevance (e.g., as measured by cosine similarity of embeddings) and they are fed to llm in that particular sequence. 
But, LLM models are liable to miss relevant information in the middle of long contexts.

To mitigate the "lost in the middle" effect, you can re-order documents after retrieval such that the most relevant documents are positioned at extrema (e.g., the first and last pieces of context), and the least relevant documents are positioned in the middle. In some cases this can help surface the most relevant information to LLMs.


The LongContextReorder document transformer implements this re-ordering procedure. 

In [None]:
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings

# Get embeddings.
embeddings = OpenAIEmbeddings()

texts = [
    "Basquetball is a great sport.",
    "Fly me to the moon is one of my favourite songs.",
    "The Celtics are my favourite team.",
    "This is a document about the Boston Celtics",
    "I simply love going to the movies",
    "The Boston Celtics won the game by 20 points",
    "This is just a random text.",
    "Elden Ring is one of the best games in the last 15 years.",
    "L. Kornet is one of the best Celtics players.",
    "Larry Bird was an iconic NBA player.",
]

# Create a retriever
retriever = InMemoryVectorStore.from_texts(texts, embedding=embeddings).as_retriever(
    search_kwargs={"k": 10}
)
query = "What can you tell me about the Celtics?"

# Get relevant documents ordered by relevance score
docs = retriever.invoke(query)
for doc in docs:
    print(f"- {doc.page_content}")



from langchain_community.document_transformers import LongContextReorder

# Reorder the documents: Less relevant document will be at the middle of the list and more relevant elements at beginning / end.
reordering = LongContextReorder()
reordered_docs = reordering.transform_documents(docs)

# Confirm that the 4 relevant documents are at beginning and end.
for doc in reordered_docs:
    print(f"- {doc.page_content}")

### ParentVectorRetriever

Sometimes, the full documents can be too big to want to retrieve them as is. In that case, what we really want to do is to first split the raw documents into larger chunks, and then split each larger chunk it into smaller chunks. We then index the smaller chunks, but on retrieval we retrieve the larger chunks (but still not the full documents). 

As retriever is just a wrapper around vector store, you can add data after you define retriever.

If you don't provide "parent_splitter" arg during instatiating retriever, then you can just retrieve the whole parent document rather than larger chunk of parent document.

In [None]:
# Example: Two-level chunking with ParentDocumentRetriever in LangChain

from langchain.retrievers import ParentDocumentRetriever
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

# 1. Example raw document
raw_docs = [
    Document(page_content="""
    LangChain is a framework for developing applications powered by language models.
    It enables the composition of LLMs with external data, APIs, and computation.
    LangChain supports retrieval-augmented generation (RAG) pipelines.
    This document describes chunking strategies for efficient retrieval.
    Splitting documents helps balance retrieval accuracy and context size.
    """),
    Document(page_content="""
    ParentDocumentRetriever enables two-level chunking.
    First, split documents into large parent chunks (e.g., 200 chars).
    Then, split parents into smaller child chunks (e.g., 80 chars).
    Index only the child chunks for retrieval.
    Upon retrieval, return the parent chunk for additional context.
    """)
]

# 2. Define chunkers
# Parent splitter: large chunks (~200 chars)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=0)
# Child splitter: smaller chunks (~80 chars)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=80, chunk_overlap=0)

# 3. Embedding function and vectorstore for child chunks
embedding = OpenAIEmbeddings()
vectorstore = FAISS(embedding_function=embedding)

# 4. Create ParentDocumentRetriever
retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=None,  # Uses in-memory docstore by default
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
)

# 5. Add the documents (handles two-level chunking internally)
retriever.add_documents(raw_docs)

# 6. Query with a relevant term
results = retriever.invoke("retrieval-augmented generation")

print("Retrieved parent chunk(s):")
for doc in results:
    print("="*20)
    print(doc.page_content)

### Time Weighted Vector Store Retriever

In many applications (like chatbots, personal assistants, or memory-augmented LLMs), you want your system to:

1. Recall relevant information (semantic similarity)
2. Prefer information that is recent or frequently used (recency/freshness)

Problem:
If you only retrieve by semantic similarity, you might get outdated or rarely-used information—even if something more recent is just as relevant!

Time-weighted retrievers solve this by boosting the importance of recent (or recently accessed) information.

Each document gets a score based on:

1. semantic_similarity (how much the content matches your query)
2. recency_score (how recent or “fresh” the document is)

score = semantic_similarity + (1.0 - decay_rate) ** hours_passed

decay_rate: how quickly recency fades (0 = never fades, 1 = always faded)

hours_passed: time since the document was last accessed (not just created!)

if decay rate is close to 0, means old info never fades. and (1.0 - decay_rate) ** hours_passed = 1 and score ~ semantic_score.

if decay rate is close to 1, means old info fades very fast. and (1.0 - decay_rate) ** hours_passed = 0 and score ~ recency.

In [None]:
from datetime import datetime, timedelta
import faiss
from langchain.retrievers import TimeWeightedVectorStoreRetriever
from langchain_community.docstore import InMemoryDocstore
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings

# 1. Set up embeddings and vector store
embeddings_model = OpenAIEmbeddings()
embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
vectorstore = FAISS(embeddings_model, index, InMemoryDocstore({}), {})

# 2. Create the time-weighted retriever
retriever = TimeWeightedVectorStoreRetriever(
    vectorstore=vectorstore,
    decay_rate=0.05,  # Try 0.000...1 for persistent, 0.05 for moderate decay, or 0.99 for fast decay
    k=1
)

# 3. Add documents (one 'last_accessed_at' set to yesterday)
yesterday = datetime.now() - timedelta(days=1)
retriever.add_documents([
    Document(page_content="hello world", metadata={"last_accessed_at": yesterday}),
    Document(page_content="hello foo")  # 'last_accessed_at' is now
])

# 4. Retrieve for a query
results = retriever.invoke("hello world")
print("Most relevant & fresh result:", results[0].page_content)

When a query comes in, the retriever:

1. Looks at each document's "last_accessed_at" timestamp.
2. Calculates how many hours (or other time units) have passed since it was last accessed.
3. Applies the time decay formula to adjust the score.
4. Returns the most relevant and “fresh” document(s).