## Imports

In [1]:
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
import os
import time
from langchain import hub
from langchain.tools import tool
from collections import defaultdict
from langchain_openai import ChatOpenAI
from langchain_core.tools import BaseTool
from langchain.agents import AgentExecutor
from langchain_openai import OpenAIEmbeddings
from pydantic.v1 import BaseModel, Field, EmailStr
from langchain_pinecone import PineconeVectorStore
from langchain.memory import ConversationBufferMemory
from langchain_core.runnables import RunnablePassthrough
from langchain_core.utils.input import get_color_mapping
from langchain_core.callbacks import CallbackManagerForChainRun
from typing import Optional, Dict, Any, List, Tuple, Iterator, Union
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain.agents.format_scratchpad import format_to_openai_functions
from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
from langchain_core.utils.function_calling import convert_to_openai_function
from langchain_core.callbacks import AsyncCallbackManagerForToolRun, CallbackManagerForToolRun

## Define Langchain Utilitites

In [3]:
class DuplicateTool(BaseTool):
    """Tool that is run when a tool gets called repeatedly."""

    name: str = "duplicate_tool"
    description: str = "Called when a tool gets called repeatedly."
        
    def _run(
        self,
        tool_name: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Use the tool."""
        return (
            f"`{tool_name}` called too many times."
        )

    async def _arun(
        self,
        tool_name: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        """Use the tool asynchronously."""
        return (
            f"`{tool_name}` called too many times."
        )


class CustomAgentExecutor(AgentExecutor):

    tool_counter = defaultdict(int)

    def _perform_agent_action(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        agent_action: AgentAction,
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> AgentStep:
        if run_manager:
            run_manager.on_agent_action(agent_action, color="green")
        # Otherwise we lookup the tool
        if agent_action.tool in name_to_tool_map:
            tool = name_to_tool_map[agent_action.tool]
            return_direct = tool.return_direct
            color = color_mapping[agent_action.tool]
            tool_run_kwargs = self.agent.tool_run_logging_kwargs()
            if return_direct:
                tool_run_kwargs["llm_prefix"] = ""
            # We then call the tool on the tool input to get an observation
            if agent_action.tool == "send_email" and self.tool_counter[agent_action.tool] > 1:
                observation = DuplicateTool().run(
                    {
                        "tool_name": agent_action.tool
                    },
                    verbose=self.verbose,
                    color=None,
                    callbacks=run_manager.get_child() if run_manager else None,
                    **tool_run_kwargs,
                )
            else:
                observation = tool.run(
                    agent_action.tool_input,
                    verbose=self.verbose,
                    color=color,
                    callbacks=run_manager.get_child() if run_manager else None,
                    **tool_run_kwargs,
                )
        else:
            tool_run_kwargs = self.agent.tool_run_logging_kwargs()
            observation = InvalidTool().run(
                {
                    "requested_tool_name": agent_action.tool,
                    "available_tool_names": list(name_to_tool_map.keys()),
                },
                verbose=self.verbose,
                color=None,
                callbacks=run_manager.get_child() if run_manager else None,
                **tool_run_kwargs,
            )
        return AgentStep(action=agent_action, observation=observation)

    def _iter_next_step(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        inputs: Dict[str, str],
        intermediate_steps: List[Tuple[AgentAction, str]],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]:
        """Take a single step in the thought-action-observation loop.

        Override this to take control of how the agent makes and acts on choices.
        """
        try:
            intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)

            # Call the LLM to see what to do.
            output = self.agent.plan(
                intermediate_steps,
                callbacks=run_manager.get_child() if run_manager else None,
                **inputs,
            )
        except OutputParserException as e:
            if isinstance(self.handle_parsing_errors, bool):
                raise_error = not self.handle_parsing_errors
            else:
                raise_error = False
            if raise_error:
                raise ValueError(
                    "An output parsing error occurred. "
                    "In order to pass this error back to the agent and have it try "
                    "again, pass `handle_parsing_errors=True` to the AgentExecutor. "
                    f"This is the error: {str(e)}"
                )
            text = str(e)
            if isinstance(self.handle_parsing_errors, bool):
                if e.send_to_llm:
                    observation = str(e.observation)
                    text = str(e.llm_output)
                else:
                    observation = "Invalid or incomplete response"
            elif isinstance(self.handle_parsing_errors, str):
                observation = self.handle_parsing_errors
            elif callable(self.handle_parsing_errors):
                observation = self.handle_parsing_errors(e)
            else:
                raise ValueError("Got unexpected type of `handle_parsing_errors`")
            output = AgentAction("_Exception", observation, text)
            if run_manager:
                run_manager.on_agent_action(output, color="green")
            tool_run_kwargs = self.agent.tool_run_logging_kwargs()
            observation = ExceptionTool().run(
                output.tool_input,
                verbose=self.verbose,
                color=None,
                callbacks=run_manager.get_child() if run_manager else None,
                **tool_run_kwargs,
            )
            yield AgentStep(action=output, observation=observation)
            return

        # If the tool chosen is the finishing tool, then we end and return.
        if isinstance(output, AgentFinish):
            yield output
            return

        actions: List[AgentAction]
        if isinstance(output, AgentAction):
            actions = [output]
        else:
            actions = output
        for agent_action in actions:
            self.tool_counter[agent_action.tool] += 1
        for agent_action in actions:
            yield agent_action
        for agent_action in actions:
            yield self._perform_agent_action(
                name_to_tool_map, color_mapping, agent_action, run_manager
            )
            
    def _call(
        self,
        inputs: Dict[str, str],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        """Run text through and get agent response."""
        # Construct a mapping of tool name to tool for easy lookup
        name_to_tool_map = {tool.name: tool for tool in self.tools}
        # We construct a mapping from each tool to a color, used for logging.
        color_mapping = get_color_mapping(
            [tool.name for tool in self.tools], excluded_colors=["green", "red"]
        )
        intermediate_steps: List[Tuple[AgentAction, str]] = []
        # Let's start tracking the number of iterations and time elapsed
        iterations = 0
        time_elapsed = 0.0
        start_time = time.time()
        # We now enter the agent loop (until it returns something).
        while self._should_continue(iterations, time_elapsed):
            next_step_output = self._take_next_step(
                name_to_tool_map,
                color_mapping,
                inputs,
                intermediate_steps,
                run_manager=run_manager,
            )
            if isinstance(next_step_output, AgentFinish):
                self.tool_counter = defaultdict(int)
                return self._return(
                    next_step_output, intermediate_steps, run_manager=run_manager
                )

            intermediate_steps.extend(next_step_output)
            if len(next_step_output) == 1:
                next_step_action = next_step_output[0]
                # See if tool should return directly
                tool_return = self._get_tool_return(next_step_action)
                if tool_return is not None:
                    self.tool_counter = defaultdict(int)
                    return self._return(
                        tool_return, intermediate_steps, run_manager=run_manager
                    )
            iterations += 1
            time_elapsed = time.time() - start_time
        self.tool_counter = defaultdict(int)
        output = self.agent.return_stopped_response(
            self.early_stopping_method, intermediate_steps, **inputs
        )
        return self._return(output, intermediate_steps, run_manager=run_manager)
        
class EmailInput(BaseModel):
    email: EmailStr = Field(..., description="The email of the human")
    message: str = Field(..., description="The message content.")

@tool(args_schema=EmailInput)
def send_email(email: str, message: str) -> str:
    """
    Send a message or a text to Nikhil.
    """
    print(f"TODO: Send email to {email} | message: {message}")
    return f"`send_email` ran successfully."

## Chatbot

In [4]:
EMBEDDING_FUNCTION = OpenAIEmbeddings(model=os.getenv("EMBEDDING_TYPE"))
EMBEDDING_FUNCTION

OpenAIEmbeddings(client=<openai.resources.embeddings.Embeddings object at 0x114853eb0>, async_client=<openai.resources.embeddings.AsyncEmbeddings object at 0x114851240>, model='text-embedding-3-large', dimensions=None, deployment='text-embedding-ada-002', openai_api_version='', openai_api_base=None, openai_api_type='', openai_proxy='', embedding_ctx_length=8191, openai_api_key=SecretStr('**********'), openai_organization=None, allowed_special=None, disallowed_special=None, chunk_size=1000, max_retries=2, request_timeout=None, headers=None, tiktoken_enabled=True, tiktoken_model_name=None, show_progress_bar=False, model_kwargs={}, skip_empty=False, default_headers=None, default_query=None, retry_min_seconds=4, retry_max_seconds=20, http_client=None, http_async_client=None, check_embedding_ctx_length=True)

In [5]:
PINECONE_VS = PineconeVectorStore(index_name=os.getenv("PINECONE_INDEX_NAME"), embedding=EMBEDDING_FUNCTION)
PINECONE_VS

<langchain_pinecone.vectorstores.PineconeVectorStore at 0x11484d090>

In [6]:
search_kwargs_vs = {
    "k": int(os.getenv("TOP_K")),
}
if os.getenv("SEARCH_TYPE") == "mmr":
    search_kwargs_vs["fetch_k"] = int(os.getenv("FETCH_K"))
    search_kwargs_vs["lambda_multiplier"] = os.getenv("LAMBDA_MULTIPLIER")
RETRIEVER = PINECONE_VS.as_retriever(search_type=os.getenv("SEARCH_TYPE"), search_kwargs=search_kwargs_vs)
RETRIEVER

VectorStoreRetriever(tags=['PineconeVectorStore', 'OpenAIEmbeddings'], vectorstore=<langchain_pinecone.vectorstores.PineconeVectorStore object at 0x11484d090>, search_kwargs={'k': 4})

In [7]:
PROMPT = hub.pull(os.getenv("LLM_RAG_PROMPT_NAME"))
PROMPT



In [8]:
TOOLS = [send_email,]
FUNCTIONS = [convert_to_openai_function(t) for t in TOOLS]
FUNCTIONS

[{'name': 'send_email',
  'description': 'Send a message or a text to Nikhil.',
  'parameters': {'type': 'object',
   'properties': {'email': {'description': 'The email of the human',
     'type': 'string',
     'format': 'email'},
    'message': {'description': 'The message content.', 'type': 'string'}},
   'required': ['email', 'message']}}]

In [9]:
model_kwargs = {
    "top_p": float(os.getenv("LLM_TOP_P")),
    "frequency_penalty": float(os.getenv("LLM_FREQUENCY_PENALTY")),
    "presence_penalty": float(os.getenv("LLM_PRESENCE_PENALTY")),
}
LLM = ChatOpenAI(
    model=os.getenv("LLM_MODEL_NAME"),
    temperature=float(os.getenv("LLM_TEMPERATURE")),
    model_kwargs=model_kwargs
).bind(functions=FUNCTIONS)
LLM

RunnableBinding(bound=ChatOpenAI(client=<openai.resources.chat.completions.Completions object at 0x114868c40>, async_client=<openai.resources.chat.completions.AsyncCompletions object at 0x11486baf0>, temperature=1.0, model_kwargs={'top_p': 1.0, 'frequency_penalty': 0.0, 'presence_penalty': 0.0}, openai_api_key=SecretStr('**********'), openai_proxy=''), kwargs={'functions': [{'name': 'send_email', 'description': 'Send a message or a text to Nikhil.', 'parameters': {'type': 'object', 'properties': {'email': {'description': 'The email of the human', 'type': 'string', 'format': 'email'}, 'message': {'description': 'The message content.', 'type': 'string'}}, 'required': ['email', 'message']}}]})

In [10]:
CHAIN = (
    RunnablePassthrough.assign(agent_scratchpad=(lambda x: format_to_openai_functions(x["intermediate_steps"])))
    | RunnablePassthrough.assign(context=(lambda x: x["question"]) | RETRIEVER | (lambda docs: "\n\n".join(doc.page_content for doc in docs)))
    | RunnablePassthrough.assign(question=(lambda x: x["question"]))
    | PROMPT
    | LLM
    | OpenAIFunctionsAgentOutputParser()
)
CHAIN

RunnableAssign(mapper={
  agent_scratchpad: RunnableLambda(lambda x: format_to_openai_functions(x['intermediate_steps']))
})
| RunnableAssign(mapper={
    context: RunnableLambda(...)
             | VectorStoreRetriever(tags=['PineconeVectorStore', 'OpenAIEmbeddings'], vectorstore=<langchain_pinecone.vectorstores.PineconeVectorStore object at 0x11484d090>, search_kwargs={'k': 4})
             | RunnableLambda(...)
  })
| RunnableAssign(mapper={
    question: RunnableLambda(...)
  })
| RunnableBinding(bound=ChatOpenAI(client=<openai.resources.chat.completions.Completions object at 0x114868c40>, async_client=<openai.resources.chat.completions.AsyncCompletions object at 0x11486baf0>, temperature=1.0, model_kwargs={'top_p': 1.0, 'frequency_penalty': 0.0, 'presence_penalty': 0.0}, openai_api_key=SecretStr('**********'), openai_proxy=''), kwargs={'functions': [{'name': 'send_email', 'description': 'Send a message or a text to Nikhil.', 'parameters': {'type': 'object', 'properties': {'email':

### Test 1

In [14]:
AGENT = CustomAgentExecutor(agent=CHAIN, tools=TOOLS, max_iterations=os.getenv("LLM_AGENT_MAX_ITERATIONS"), verbose=True, memory=ConversationBufferMemory(return_messages=True,memory_key="chat_history", ))
print('='*100)
print('='*100)
AGENT.invoke(
    {
        "question": "Hi"
    }
)
print('='*100)
print('='*100)
print("\n\n")
print('='*100)
print('='*100)
AGENT.invoke(
    {
        "question": "Message nikhil asking for his indetail work experience?"
    }
)
print('='*100)
print('='*100)
print("\n\n")
print('='*100)
print('='*100)
AGENT.invoke(
    {
        "question": "my email is groove@gmail.com"
    }
)
print('='*100)
print('='*100)



[1m> Entering new CustomAgentExecutor chain...[0m
[32;1m[1;3mHey! I am Harpy, your chat assistant. Please ask questions about Nikhil. I can answer them for you :-)[0m

[1m> Finished chain.[0m





[1m> Entering new CustomAgentExecutor chain...[0m
[32;1m[1;3mNikhil has a total work experience of two-plus years and almost six years in the Computer Science domain. He has worked as a CTO, Software Development Engineer (SDE), ML Engineer, and research assistant. Nikhil has experience in coding vastly in AI and has worked as a Machine Learning Engineer at Insureka for almost two years. He also worked as the CTO of a startup called BhavamAI and as a research assistant at the University of Southern California. Additionally, he interned at Dragonfruit AI as a software engineer.[0m

[1m> Finished chain.[0m





[1m> Entering new CustomAgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `send_email` with `{'email': 'groove@gmail.com', 'message': 'Nikhil has a total work experience

### Test 2

In [12]:
AGENT = CustomAgentExecutor(agent=CHAIN, tools=TOOLS, max_iterations=os.getenv("LLM_AGENT_MAX_ITERATIONS"), verbose=True, memory=ConversationBufferMemory(return_messages=True,memory_key="chat_history", ))
print('='*100)
print('='*100)
AGENT.invoke(
    {
        "question": "Message nikhil asking for available time"
    }
)
print('='*100)
print('='*100)
print("\n\n")
print('='*100)
print('='*100)
AGENT.invoke(
    {
        "question": "my email is groove@yahoo.com"
    }
)
print('='*100)
print('='*100)
print("\n\n")
print('='*100)
print('='*100)
AGENT.invoke(
    {
        "question": "Message nikhil asking for his whereabouts"
    }
)
print('='*100)
print('='*100)



[1m> Entering new CustomAgentExecutor chain...[0m
[32;1m[1;3mIn order to proceed, I will be needing your email ID so as to send a copy of this message to you.[0m

[1m> Finished chain.[0m





[1m> Entering new CustomAgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `send_email` with `{'email': 'user@example.com', 'message': 'Hello Nikhil, Could you please provide your available time slots for a meeting? Thank you!'}`


[0mTODO: Send email to user@example.com | message: Hello Nikhil, Could you please provide your available time slots for a meeting? Thank you!
[36;1m[1;3m`send_email` ran successfully.[0m[32;1m[1;3m
Invoking: `send_email` with `{'email': 'user@example.com', 'message': 'Hello Nikhil, Could you please provide your available time slots for a meeting? Thank you!'}`


[0m`send_email` called too many times.[32;1m[1;3m
Invoking: `send_email` with `{'email': 'user@example.com', 'message': 'Hello Nikhil, Could you please provide your available time slots for a 