[1]Let’s build a powerful hybrid system that combines Retrieval-Augmented Generation (RAG) and Text-to-SQL into a single query engine!

We’ll leverage @llama_index, @OpenAI, and @arizeAI Phoenix for observability. Let's dive in!

[2] Setting Up the Environment

First, we install the required packages:

In [None]:
!pip install -U llama-index-callbacks-arize-phoenix arize-phoenix
!pip install llama-index-llms-openai llama-index-embeddings-openai
!pip install llama-index-indices-managed-llama-cloud
!pip install llama-index-viz


[3]Securely Setting Up OpenAI API Key for this

In [None]:
import os
os.environ["OPENAI_API_KEY"] = "Add here your  openai_api_key"

In [None]:
import nest_asyncio

nest_asyncio.apply()

[4] Integrating Arize Phoenix for RAG and text to sql Model Logging & Observability

In [None]:
# setup Arize Phoenix for logging/observability
import llama_index.core
import os

PHOENIX_API_KEY = "Add here your phoenix_api_key"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
llama_index.core.set_global_handler(
    "arize_phoenix", endpoint="https://llamatrace.com/v1/traces"
)

[5] Creating and Querying an In-Memory SQL Database with LlamaIndex & OpenAI"


This code snippet sets up an in-memory SQLite database and integrates it with LlamaIndex and OpenAI’s GPT-3.5-turbo for natural language querying. It:

Configures LlamaIndex to use OpenAI’s LLM for text-based SQL queries.
Creates a SQLite database in memory using SQLAlchemy.
Defines a city_stats table with columns for city name, population, and state.
Initializes the database schema to store structured data for future AI-driven SQL interactions

In [None]:
from llama_index.core import SQLDatabase, Settings
from llama_index.llms.openai import OpenAI
from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    Column,
    String,
    Integer,
)

Settings.llm = OpenAI("gpt-3.5-turbo")


engine = create_engine("sqlite:///:memory:", future=True)
metadata_obj = MetaData()

# create city SQL table
table_name = "city_stats"
city_stats_table = Table(
    table_name,
    metadata_obj,
    Column("city_name", String(16), primary_key=True),
    Column("population", Integer),
    Column("state", String(16), nullable=False),
)

metadata_obj.create_all(engine)

[6] Populating and Querying an In-Memory SQL Database with SQLAlchemy"


This code snippet inserts city population data into an in-memory SQLite database using SQLAlchemy and then queries the data to retrieve all records.

Defines a list of cities with their population and state information.
Uses SQLAlchemy’s insert() function to add rows to the city_stats table.
Executes an SQL query (SELECT * FROM city_stats) to retrieve and print all stored records.
Utilizes database transactions (engine.begin()) for efficient and safe data insertion

In [None]:
from sqlalchemy import insert

rows = [
    {"city_name": "New York City", "population": 8336000, "state": "New York"},
    {"city_name": "Los Angeles", "population": 3822000, "state": "California"},
    {"city_name": "Chicago", "population": 2665000, "state": "Illinois"},
    {"city_name": "Houston", "population": 2303000, "state": "Texas"},
    {"city_name": "Miami", "population": 449514, "state": "Florida"},
    {"city_name": "Seattle", "population": 749256, "state": "Washington"},
]
for row in rows:
    stmt = insert(city_stats_table).values(**row)
    with engine.begin() as connection:
        cursor = connection.execute(stmt)

with engine.connect() as connection:
    cursor = connection.exec_driver_sql("SELECT * FROM city_stats")
    print(cursor.fetchall())

[7]AI-Powered Natural Language to SQL Querying with LlamaIndex & OpenAI Embeddings"


This code sets up an AI-driven Text-to-SQL query engine using LlamaIndex and OpenAI Embeddings, enabling users to query structured SQL data using natural language.

Initializes an SQLDatabase with an existing SQLite engine and includes the city_stats table.
Creates an NLSQLTableQueryEngine that converts natural language queries into SQL queries.
Utilizes OpenAI Embeddings to improve query understanding and accuracy

In [None]:
from llama_index.core.query_engine import NLSQLTableQueryEngine
from llama_index.embeddings.openai import OpenAIEmbedding # Import OpenAIEmbedding

sql_database = SQLDatabase(engine, include_tables=["city_stats"])
sql_query_engine = NLSQLTableQueryEngine(
    sql_database=sql_database,
    tables=["city_stats"],
    embed_model=OpenAIEmbedding()
)

[8] Deploying RAG-Enabled Querying with LlamaCloud Index
This code integrates LlamaIndex’s LlamaCloud to enable Retrieval-Augmented Generation (RAG) over structured and unstructured data, allowing for scalable AI-driven querying.

Creates a LlamaCloudIndex named "rag_sql" within the "llamacloud_demo" project.
Connects to an organization using API credentials for cloud-based indexing and query execution.
Initializes llama_cloud_query_engine to allow AI-powered natural language querying over indexed data

In [None]:
from llama_index.indices.managed.llama_cloud import LlamaCloudIndex

index = LlamaCloudIndex(
    name="rag_sql",
    project_name="llamacloud_demo",
    organization_id="07cff857-92f7-49cb-8716-46b683b8d997",
    api_key="Add here your Llama-index_api_key"
)

llama_cloud_query_engine = index.as_query_engine()

[9] Configuring AI-Powered Query Tools for SQL and RAG-Based Retrieval

In [None]:
from llama_index.core.tools import QueryEngineTool

sql_tool = QueryEngineTool.from_defaults(
    query_engine=sql_query_engine,
    description=(
        "Useful for translating a natural language query into a SQL query over"
        " a table containing: city_stats, containing the population/state of"
        " each city located in the USA."
    ),
    name="sql_tool"
)

cities = ["New York City", "Los Angeles", "Chicago", "Houston", "Miami", "Seattle"]
llama_cloud_tool = QueryEngineTool.from_defaults(
    query_engine=llama_cloud_query_engine,
    description=(
        f"Useful for answering semantic questions about certain cities in the US."
    ),
    name="llama_cloud_tool"
)

[10] Building a Multi-Tool AI Agent for Intelligent Query Routing with LlamaIndex"


This code defines a custom AI agent workflow using LlamaIndex that dynamically routes user queries to the appropriate tool, enabling intelligent, multi-modal interactions.

Defines Custom Events & Workflows:

InputEvent: Handles user messages.
GatherToolsEvent: Identifies relevant tools for the query.
ToolCallEvent: Calls the selected tool.
ToolCallEventResult: Returns the tool’s response.
Implements RouterOutputAgentWorkflow:

Stores a chat history to maintain conversational context.
Uses OpenAI’s GPT-3.5-Turbo to decide which tool to use.
Routes natural language queries to either a SQL-based structured data retriever or an LLM-powered unstructured data retriever (RAG).
Supports parallel tool calls for efficiency.
Processes Queries in Steps:

Prepares chat history.
Decides if a tool is needed or responds directly.
Dispatches tool calls if necessary.
Gathers and returns results to the user

In [None]:
from typing import Dict, List, Any, Optional

from llama_index.core.tools import BaseTool
from llama_index.core.llms import ChatMessage
from llama_index.core.llms.llm import ToolSelection, LLM
from llama_index.core.workflow import (
    Workflow,
    Event,
    StartEvent,
    StopEvent,
    step,
     Context,
)
from llama_index.core.base.response.schema import Response
from llama_index.core.tools import FunctionTool


class InputEvent(Event):
    """Input event."""

class GatherToolsEvent(Event):
    """Gather Tools Event"""

    tool_calls: Any

class ToolCallEvent(Event):
    """Tool Call event"""

    tool_call: ToolSelection

class ToolCallEventResult(Event):
    """Tool call event result."""

    msg: ChatMessage

class RouterOutputAgentWorkflow(Workflow):
    """Custom router output agent workflow."""

    def __init__(self,
        tools: List[BaseTool],
        timeout: Optional[float] = 10.0,
        disable_validation: bool = False,
        verbose: bool = False,
        llm: Optional[LLM] = None,
        chat_history: Optional[List[ChatMessage]] = None,
    ):
        """Constructor."""

        super().__init__(timeout=timeout, disable_validation=disable_validation, verbose=verbose)

        self.tools: List[BaseTool] = tools
        self.tools_dict: Optional[Dict[str, BaseTool]] = {tool.metadata.name: tool for tool in self.tools}
        self.llm: LLM = llm or OpenAI(temperature=0, model="gpt-3.5-turbo")
        self.chat_history: List[ChatMessage] = chat_history or []


    def reset(self) -> None:
        """Resets Chat History"""

        self.chat_history = []

    @step()
    async def prepare_chat(self, ev: StartEvent) -> InputEvent:
        message = ev.get("message")
        if message is None:
            raise ValueError("'message' field is required.")

        # add msg to chat history
        chat_history = self.chat_history
        chat_history.append(ChatMessage(role="user", content=message))
        return InputEvent()

    @step()
    async def chat(self, ev: InputEvent) -> GatherToolsEvent | StopEvent:
        """Appends msg to chat history, then gets tool calls."""

        # Put msg into LLM with tools included
        chat_res = await self.llm.achat_with_tools(
            self.tools,
            chat_history=self.chat_history,
            verbose=self._verbose,
            allow_parallel_tool_calls=True
        )
        tool_calls = self.llm.get_tool_calls_from_response(chat_res, error_on_no_tool_call=False)

        ai_message = chat_res.message
        self.chat_history.append(ai_message)
        if self._verbose:
            print(f"Chat message: {ai_message.content}")

        # no tool calls, return chat message.
        if not tool_calls:
            return StopEvent(result=ai_message.content)

        return GatherToolsEvent(tool_calls=tool_calls)

    @step(pass_context=True)
    async def dispatch_calls(self, ctx: Context, ev: GatherToolsEvent) -> ToolCallEvent:
        """Dispatches calls."""

        tool_calls = ev.tool_calls
        await ctx.set("num_tool_calls", len(tool_calls))

        # trigger tool call events
        for tool_call in tool_calls:
            ctx.send_event(ToolCallEvent(tool_call=tool_call))

        return None

    @step()
    async def call_tool(self, ev: ToolCallEvent) -> ToolCallEventResult:
        """Calls tool."""

        tool_call = ev.tool_call

        # get tool ID and function call
        id_ = tool_call.tool_id

        if self._verbose:
            print(f"Calling function {tool_call.tool_name} with msg {tool_call.tool_kwargs}")

        # call function and put result into a chat message
        tool = self.tools_dict[tool_call.tool_name]
        output = await tool.acall(**tool_call.tool_kwargs)
        msg = ChatMessage(
            name=tool_call.tool_name,
            content=str(output),
            role="tool",
            additional_kwargs={
                "tool_call_id": id_,
                "name": tool_call.tool_name
            }
        )

        return ToolCallEventResult(msg=msg)

    @step(pass_context=True)
    async def gather(self, ctx: Context, ev: ToolCallEventResult) -> StopEvent | None:
        """Gathers tool calls."""
        # wait for all tool call events to finish.
        tool_events = ctx.collect_events(ev, [ToolCallEventResult] * await ctx.get("num_tool_calls"))
        if not tool_events:
            return None

        for tool_event in tool_events:
            # append tool call chat messages to history
            self.chat_history.append(tool_event.msg)

        # # after all tool calls finish, pass input event back, restart agent loop
        return InputEvent()

[11]Initializing an AI-Powered Query Routing Workflow with LlamaIndex

In [None]:
wf = RouterOutputAgentWorkflow(tools=[sql_tool, llama_cloud_tool], verbose=True, timeout=120)

[12]Checking the Installed Version of LlamaIndex

In [None]:
from importlib.metadata import version
print(version("llama-index"))

[13]Visualizing AI Workflow Execution with NetworkX"


This code creates and visualizes a directed graph representing the execution flow of the RouterOutputAgentWorkflow using NetworkX and Matplotlib.

Defines a directed graph (DiGraph) to illustrate the step-by-step process.
Adds edges to represent workflow transitions, from "Start" to "End" via multiple steps.
Uses nx.draw() to render the graph, with labeled nodes, custom colors, and an organized layout

In [None]:
import networkx as nx
import matplotlib.pyplot as plt

# Example graph creation
G = nx.DiGraph()
G.add_edges_from([
    ("Start", "RouterOutputAgentWorkflow"),
    ("RouterOutputAgentWorkflow", "Step 1"),
    ("Step 1", "Step 2"),
    ("Step 2", "End"),
])

# Draw the graph
plt.figure(figsize=(8, 5))
nx.draw(G, with_labels=True, node_color="lightblue", edge_color="gray", node_size=3000, font_size=10)
plt.show()

[14]Executing AI-Powered Natural Language Queries and Displaying Results in Markdown

This code runs a natural language query through the AI-powered workflow (wf) and formats the output in Markdown for better readability in Jupyter notebooks.

Executes the query using await wf.run(message="Which city has the highest population?"), which processes the request using the LlamaIndex-based agent workflow.
Displays the result in Markdown format using display(Markdown(result)), improving the output presentation in Jupyter notebooks

In [None]:
from IPython.display import display, Markdown

result = await wf.run(message="Which city has the highest population?")
display(Markdown(result))

In [None]:
result = await wf.run(message="What state is Houston located in?")
display(Markdown(result))

In [None]:
result = await wf.run(message="Where is the Space Needle located?")
display(Markdown(result))

In [None]:
result = await wf.run(message="List all of the places to visit in Miami.")
display(Markdown(result))

In [None]:
result = await wf.run(message="How do people in Chicago get around?")
display(Markdown(result))

In [None]:
result = await wf.run(message="What is the historical name of Los Angeles?")
display(Markdown(result))