# Agents

In [1]:
# Built-in library
from pathlib import Path
import re
import json
from typing import Any, Literal, Optional, Union
import logging
import warnings

# Standard imports
import numpy as np
import numpy.typing as npt
from pprint import pprint
import pandas as pd
import polars as pl
from rich.console import Console
from rich.theme import Theme

# Named colour styles for rich output; each key is a style name mapped to a
# hex colour.
custom_theme = Theme(
    {
        "white": "#FFFFFF",  # Bright white
        "info": "#00FF00",  # Bright green
        "warning": "#FFD700",  # Bright gold
        "error": "#FF1493",  # Deep pink
        "success": "#00FFFF",  # Cyan
        "highlight": "#FF4500",  # Orange-red
    }
)
# Console configured with the theme above.
console = Console(theme=custom_theme)

# Visualization
# import matplotlib.pyplot as plt

# NumPy settings: print arrays with 4 digits of precision
np.set_printoptions(precision=4)

# Pandas settings: raise the display limits so wide/long frames are not truncated
pd.options.display.max_rows = 1_000
pd.options.display.max_columns = 1_000
pd.options.display.max_colwidth = 600

# Polars settings: allow long strings and many columns in table output
pl.Config.set_fmt_str_lengths(1_000)
pl.Config.set_tbl_cols(n=1_000)

# NOTE(review): this silences *every* warning category for the rest of the
# session, including deprecation warnings from the libraries used below.
warnings.filterwarnings("ignore")

# Black code formatter (Optional)
%load_ext lab_black

# auto reload imports
%load_ext autoreload
%autoreload 2

In [2]:
def go_up_from_current_directory(*, go_up: int = 1) -> None:
    """Put an ancestor of the current working directory at the front of ``sys.path``.

    The ancestor is resolved relative to the current working directory, added
    as the first entry of ``sys.path`` and its absolute path is printed.

    Params:
    -------
    go_up: int, default=1
        Number of directory levels to climb from the current working
        directory. Values of 0 or less resolve to the current working
        directory itself.

    Returns:
    --------
    None
    """
    import os
    import sys

    # The previous implementation started from os.path.dirname(__name__).
    # `__name__` is a module name, not a path, so that expression was always ""
    # and the result was silently relative to the working directory. Make that
    # explicit instead of relying on the accident.
    levels: int = max(go_up, 0)
    abs_path_prev_directory: str = os.path.abspath(
        os.path.join(os.getcwd(), *([os.pardir] * levels))
    )

    # Re-running the cell must not pile up duplicate entries: drop any existing
    # occurrence, then put the directory first so it keeps import priority.
    sys.path[:] = [entry for entry in sys.path if entry != abs_path_prev_directory]
    sys.path.insert(0, abs_path_prev_directory)
    print(abs_path_prev_directory)

In [3]:
go_up_from_current_directory(go_up=2)

from QA_and_RAG import ROOT_PATH
from config import settings


ROOT_PATH

/Users/neidu/Desktop/Projects/Personal/My_Projects/Gen-AI-Projects


PosixPath('/Users/neidu/Desktop/Projects/Personal/My_Projects/Gen-AI-Projects/QA_and_RAG')

In [4]:
import os
import langchain
from langchain.chains import create_sql_query_chain
from langchain_community.utilities import SQLDatabase
from langchain_community.tools.sql_database.tool import QuerySQLDatabaseTool
from langchain_community.agent_toolkits import create_sql_agent
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import (
    RunnableLambda,
    RunnablePassthrough,
    RunnableSequence,
)
from langchain_openai import ChatOpenAI


langchain.debug = True

In [None]:
from omegaconf import OmegaConf, DictConfig


config: DictConfig = OmegaConf.load("../../config/config.yaml")

print(OmegaConf.to_yaml(config, resolve=True))

In [None]:
# NOTE(review): the right-hand operand starts with "/", so pathlib discards
# ROOT_PATH and this evaluates to "/data/sql/chinook.db" on POSIX. Later code
# in this notebook prepends "../" to `file_path` (os.path.exists checks and the
# sqlite URI), so changing this expression alone would alter the paths built there.
file_path: str = str(ROOT_PATH / "/data/sql/chinook.db")
file_path

In [None]:
llm = ChatOpenAI(model="gpt-4o-mini", api_key=settings.OPENAI_API_KEY, temperature=0)
llm

### Old Approach (Previous Versions of LangChain)

In [8]:
# Answer-synthesis prompt: `{question}` and `{result}` are filled in by
# `PromptTemplate` before the text is sent to the model.
prompt_1: str = """Based on the database query result, provide a complete sentence
    answer to the user's question.

    Question: {question}
    SQL Result: {result}
    Answer:
    """

In [9]:
def _clean_sql_query(raw_sql: str) -> str:
    """Strip LLM formatting artefacts so only the executable SQL remains."""
    # Prefer the contents of a markdown fence (```sql ... ```) when present;
    # an unterminated fence is tolerated by matching up to the end of the text.
    fenced = re.search(
        r"```(?:sqlite|sql)?\s*(.*?)(?:```|\Z)",
        raw_sql,
        flags=re.DOTALL | re.IGNORECASE,
    )
    cleaned: str = fenced.group(1) if fenced else raw_sql
    # Remove only a *leading* "SQLQuery:" / "sql:" label. The pattern is
    # anchored so identifiers that merely contain "sql" are left untouched.
    cleaned = re.sub(
        r"^\s*(?:SQLQuery|sql)\s*:\s*", "", cleaned, flags=re.IGNORECASE
    )
    return cleaned.strip()


def _qa_sql_agent(db_path: str, prompt: str, message: str, llm: Any) -> str:
    """Execute SQL queries and format answers using a language model.

    The model first writes a SQL query for ``message``; the query is cleaned
    of formatting artefacts, executed against the database, and the result is
    passed back to the model to phrase the final answer.

    Parameters
    ----------
    db_path : str
        Path to the SQLite database file. It is appended to ``sqlite:///../``
        to build the connection URI.
    prompt : str
        Template string for formatting the answer. It must expose the
        ``question`` and ``result`` placeholders.
    message : str
        User question to be answered.
    llm : Any
        Language model instance for generating SQL queries and answers.

    Returns
    -------
    str
        Formatted response to the user's question.
    """
    db: SQLDatabase = SQLDatabase.from_uri(
        f"sqlite:///../{db_path}"
    )  # for jupyter notebook
    print(f"DB Dialect: {db.dialect}\nTable Names: {db.get_usable_table_names()}")

    exec_query: QuerySQLDatabaseTool = QuerySQLDatabaseTool(db=db)
    write_query: RunnableSequence = create_sql_query_chain(llm, db=db)

    answer_prompt: PromptTemplate = PromptTemplate.from_template(template=prompt)

    answer: RunnableSequence = answer_prompt | llm | StrOutputParser()

    def execute_and_format(data: dict[str, str]) -> str:
        """Execute SQL query and format the answer.

        Parameters
        ----------
        data : dict[str, str]
            Dictionary containing 'query' and 'question' keys with string values.
            The 'query' value may carry a leading 'SQLQuery:' / 'sql:' label
            and/or be wrapped in a markdown code fence.

        Returns
        -------
        str
            Formatted answer based on the question and query result.
        """
        raw_sql: str = data["query"]
        print("===" * 20)
        print("sql: ", raw_sql)
        # The earlier clean-up ran unconditionally (its guard tested for the
        # empty string, which every string contains) and deleted every "sql"
        # substring, corrupting identifiers containing those letters.
        sql: str = _clean_sql_query(raw_sql)
        result: str = exec_query.invoke(sql)

        return answer.invoke({"question": data["question"], "result": result})

    chain: RunnableSequence = (
        RunnablePassthrough()
        # Add the key `query` to the input dict
        .assign(query=write_query)
        # Chain the output of the prev step with the next step
        .pipe(RunnableLambda(execute_and_format))
    )
    response: str = chain.invoke({"question": message})
    return response

In [None]:
# Load the Titanic CSV with polars and show summary statistics for the rows
# whose `sex` column equals "male".
fp: str = "../data/flat_files/titanic-data.csv"
df: pl.DataFrame = pl.read_csv(fp)

# print(df["BloodPressure"].mean())
df.filter(pl.col("sex").eq("male")).describe()
# df.head()

In [None]:
from enum import Enum

from QA_and_RAG.src.db_utils import SQLFromTabularData


class ChatType(Enum):
    """Chat modes that the SQL question-answering flow can be switched between.

    Each member's value is the string identifier that gets compared against
    the selected chat type.
    """

    # Questions answered from the stored SQL database.
    qa_with_stored_sql_db = "q&a-with-stored-sql-db"

    # Questions answered from a SQL database built from stored flat files.
    qa_with_stored_flat_file_sql_db = "q&a-with-stored-flat-file-sql-db"

    # Questions answered from a SQL database built from an uploaded flat file.
    qa_with_uploaded_flat_file_sql_db = "q&a-with-uploaded-flat-file-sql-db"


# Notebook stand-ins for the inputs the chat UI would provide.
# `chat_type` stores the enum member's *string value*, hence the `str` type.
chat_type: str = ChatType.qa_with_stored_sql_db.value
app_functionality: str = "chat"
message: str = "How many employees are there"
# Chat history as (user message, bot reply) pairs.
chatbot: list[tuple[str, str]] = []
# NOTE(review): the right-hand operands start with "/", so pathlib discards
# ROOT_PATH and these evaluate to "/data/..." on POSIX. The checks below and
# `_qa_sql_agent` both prepend "../", so the values are kept as they are.
file_path: str = str(ROOT_PATH / "/data/sql/chinook.db")
flat_file_path: str = str(ROOT_PATH / "/data/flat_files/stored_data.db")

if app_functionality == "chat":
    # Every branch assigns `response` exactly once, so the single append at the
    # end can never hit an undefined name or record the same turn twice.
    if chat_type == ChatType.qa_with_stored_sql_db.value:
        if os.path.exists(f"../{file_path}"):  # for jupyter notebook
            response = _qa_sql_agent(
                db_path=file_path, prompt=prompt_1, message=message, llm=llm
            )
        else:
            response = "SQL DB does not exist. Please first create the `chinook` db"

    elif chat_type == ChatType.qa_with_uploaded_flat_file_sql_db.value:
        _db_path: str = "../data/for_upload/uploaded_data.db"
        create_db: SQLFromTabularData = SQLFromTabularData(
            file_path="uploaded_data.csv",
            db_path=_db_path,
            table_name="your_table_name",
        )
        # Check the database this branch actually queries (previously the
        # chinook path was tested here). `_qa_sql_agent` prepends "../" to the
        # path it is given, so the same prefix is used for the check.
        # NOTE(review): confirm SQLFromTabularData writes to this same location.
        if os.path.exists(f"../{_db_path}"):  # for jupyter notebook
            response = _qa_sql_agent(
                db_path=_db_path, prompt=prompt_1, message=message, llm=llm
            )
        else:
            response = (
                "SQL DB from the uploaded file does not exist. "
                "Please upload a valid `CSV`/`PARQUET` file."
            )

    elif chat_type == ChatType.qa_with_stored_flat_file_sql_db.value:
        # Use the flat-file database rather than the chinook one.
        if os.path.exists(f"../{flat_file_path}"):  # for jupyter notebook
            response = _qa_sql_agent(
                db_path=flat_file_path, prompt=prompt_1, message=message, llm=llm
            )
        else:
            response = (
                "SQL DB from the stored flat files does not exist. "
                "Please first create the `stored_data` db"
            )

    else:
        response = f"Unsupported chat type: {chat_type}"

    chatbot.append((message, response))

In [22]:
def get_database(db_path: str) -> SQLDatabase:
    """Open the SQLite database at ``db_path`` and print its dialect.

    Parameters
    ----------
    db_path : str
        Database location; it is appended to ``sqlite:///../`` to form the
        connection URI.

    Returns
    -------
    SQLDatabase
        Database wrapper created from that URI.
    """
    # The "../" prefix is there for running from the jupyter notebook.
    connection_uri: str = f"sqlite:///../{db_path}"
    database: SQLDatabase = SQLDatabase.from_uri(connection_uri)
    print(database.dialect)
    return database

In [None]:
db_path: str = "/data/flat_files/stored_data.db"
# Only one question is asked per run; the previous version assigned `message`
# twice in a row, so the first value was never used.
# Another question to try: "What is the average blood pressure?"
message: str = "How many men survived in the Titanic?"

_qa_sql_agent(db_path=db_path, prompt=prompt_1, message=message, llm=llm)

<br>

### Using Agent Executor (Old Approach)

In [None]:
# Connect to the stored flat-file database. `db` is a notebook-level variable
# that later cells (`write_query`, `execute_query`) also read.
db_path: str = "/data/flat_files/stored_data.db"
db: SQLDatabase = get_database(db_path=db_path)

# Build a SQL agent over `db` using the "openai-tools" agent type.
agent_executor = create_sql_agent(
    llm=llm, db=db, agent_type="openai-tools", verbose=False
)
# NOTE: `message` is not set in this cell; it is whatever question an earlier
# cell assigned last.
response = agent_executor.invoke(message)["output"]
response

### Modern Approach

In [None]:
from langchain import hub
from langchain_core.prompts.chat import ChatPromptTemplate


# Pull the SQL query-writing prompt from the LangChain hub; `write_query`
# fills it with `dialect`, `top_k`, `table_info` and `input`.
query_prompt_template: ChatPromptTemplate = hub.pull(
    "langchain-ai/sql-query-system-prompt"
)
# console.print(query_prompt_template.messages[0])
# Show the first message of the template for inspection.
query_prompt_template.messages[0].pretty_print()

In [23]:
from typing import Annotated, TypedDict


chat_llm: ChatOpenAI = ChatOpenAI(
    model="gpt-4o-mini", api_key=settings.OPENAI_API_KEY, temperature=0
)


class State(TypedDict):
    """Values carried through the question -> SQL -> answer steps."""

    # The user's natural-language question.
    question: str

    # SQL text returned by `write_query`.
    query: str

    # Output returned by `execute_query`.
    result: str

    # Final answer text returned by `generate_answer`.
    answer: str


# Structured-output schema passed to `chat_llm.with_structured_output(...)`
# inside `write_query`.
# NOTE(review): the class docstring and the description string inside
# `Annotated` are presumably forwarded to the model as part of the schema, so
# treat both as runtime text rather than documentation; confirm before editing.
class QueryOutput(TypedDict):
    """Generated SQL query."""

    query: Annotated[str, ..., "Syntactically valid SQL query."]


def write_query(state: State) -> dict[str, str]:
    """Turn the question held in ``state`` into a SQL query.

    Reads the notebook-level ``db``, ``query_prompt_template`` and
    ``chat_llm`` objects.

    Parameters
    ----------
    state : State
        Current state; only its ``question`` entry is read.

    Returns
    -------
    dict[str, str]
        A one-key mapping ``{"query": <generated SQL>}``.
    """
    # Fill the prompt with the database context and the user's question.
    prompt_inputs = {
        "dialect": db.dialect,
        "top_k": 10,
        "table_info": db.get_table_info(),
        "input": state["question"],
    }
    filled_prompt = query_prompt_template.invoke(prompt_inputs)

    # Ask the model for output shaped like `QueryOutput`.
    query_writer = chat_llm.with_structured_output(QueryOutput)
    generated = query_writer.invoke(filled_prompt)
    return {"query": generated["query"]}


def execute_query(state: State) -> dict[str, Any]:
    """Run the SQL stored in ``state`` against the notebook-level ``db``.

    Parameters
    ----------
    state : State
        Current state; only its ``query`` entry is read.

    Returns
    -------
    dict[str, Any]
        A one-key mapping ``{"result": <tool output>}``.
    """
    sql_runner = QuerySQLDatabaseTool(db=db)
    query_output = sql_runner.invoke(state["query"])
    return {"result": query_output}


def generate_answer(state: State) -> dict[str, Any]:
    """Ask ``chat_llm`` to answer the question from the query and its result.

    Parameters
    ----------
    state : State
        Current state; its ``question``, ``query`` and ``result`` entries
        are interpolated into the prompt.

    Returns
    -------
    dict[str, Any]
        A one-key mapping ``{"answer": <model response content>}``.
    """
    instructions = (
        "Given the following user question, corresponding SQL query, "
        "and SQL result, answer the user question.\n\n"
    )
    context = (
        f'Question: {state["question"]}\n'
        f'SQL Query: {state["query"]}\n'
        f'SQL Result: {state["result"]}'
    )
    llm_reply = chat_llm.invoke(instructions + context)
    return {"answer": llm_reply.content}

In [None]:
# Run the three steps by hand, storing each step's output in the state before
# calling the next step.
state_data: State = State(
    # {"question": "How many men were there in the Titanic that survived?"}
    {"question": "What is the ratio of men to women that survived the Titanic?"}
)
query: str = write_query(state_data)["query"]
state_data["query"] = query

# `State.result` is declared as `str`; the earlier `dict[str, Any]` annotation
# contradicted the field this value is stored into.
query_result: str = execute_query(state_data)["result"]
state_data["result"] = query_result

generate_answer(state=state_data)

### Using LangGraph

In [26]:
from langgraph.graph import START, StateGraph

In [27]:
# Register the three step functions as graph nodes, in list order.
# NOTE(review): the edge below refers to the first node by the string
# "write_query", so nodes are presumably named after their functions; confirm
# against the LangGraph `add_sequence` documentation.
graph_builder = StateGraph(State).add_sequence(
    [write_query, execute_query, generate_answer]
)
# Entry point: START leads into the "write_query" node.
graph_builder.add_edge(START, "write_query")
graph = graph_builder.compile()

In [None]:
from IPython.display import display, Image


display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# Switch off the LangChain debug flag that was set to True earlier in the
# notebook.
langchain.debug = False

# Stream the compiled graph for one question and print each item it yields.
for step in graph.stream({"question": "How many men did NOT survive the Titanic?"}):
    print(step)

In [None]:
"How many employees are there?"

In [None]:
class UploadFile:
    """
    A class that acts as a controller to run various file processing pipelines
    based on the chatbot's current functionality when handling uploaded files.
    """

    @staticmethod
    def run_pipeline(
        files_dir: list, chatbot: list, chatbot_functionality: str
    ) -> Optional[tuple[str, list]]:
        """
        Run the appropriate pipeline based on chatbot functionality.

        Args:
            files_dir (list): List of paths to uploaded files.
            chatbot (list): The current state of the chatbot's dialogue.
            chatbot_functionality (str): A string specifying the chatbot's current functionality.

        Returns:
            Optional[tuple]: A tuple of an empty string and the updated chatbot list,
            or None if the functionality is not matched.
        """
        # The annotations use the builtin `list`: `typing.List` is not imported
        # in this notebook, so the previous `List` annotations raised NameError
        # as soon as the class body was executed.
        if chatbot_functionality == "Process files":
            # NOTE(review): `ProcessFiles` is not defined or imported in the
            # visible part of this notebook; confirm it is in scope.
            pipeline_instance = ProcessFiles(files_dir=files_dir, chatbot=chatbot)
            input_txt, chatbot = pipeline_instance.run()
            return input_txt, chatbot

        # Other functionalities can be implemented here.
        return None


