# Real-Time RAG and Agentic Analytics

<img src="./web/pinot.png" alt="drawing" style="width:800px;"/>

***Getting started with vectors, similarity search, RAG, and agents in real-time analytics.***


# Hubert Dulay - Developer Advocate @StarTree
<img src="./web/sdb.jpg" alt="drawing" style="width:300px;"/>
<img src="./web/sdm.jpg" alt="drawing" style="width:300px;"/>


## What is a Vector?

> A vector is an array of numbers that represents unstructured data like text and images. For example, let’s represent these sentences as vectors:


s1 = “I love data”

s2 = “I love candy”

We can take all the terms and create what is called a bag of words:

|ID|candy|data|I|love|
|-|-|-|-|-|
|s1|0|1|1|1|
|s2|1|0|1|1|
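You can rebuild the table in a few lines of plain Python. This is a minimal sketch. It assumes a simple regex tokenizer and sorts the vocabulary case-insensitively so the columns come out in the same order as the table. Both are our choices, not a standard library API.

```python
import re

def tokenize(sentence: str) -> list[str]:
    # Keep runs of letters, digits, and apostrophes; case is preserved so "I" stays "I"
    return re.findall(r"[A-Za-z0-9']+", sentence)

def bag_of_words(sentences: dict[str, str]) -> tuple[list[str], dict[str, list[int]]]:
    # Vocabulary: every distinct term across all sentences, sorted case-insensitively
    vocab = sorted({term for s in sentences.values() for term in tokenize(s)}, key=str.lower)
    vectors = {}
    for sid, s in sentences.items():
        terms = tokenize(s)
        # One dimension per vocabulary term; the value is how often that term occurs
        vectors[sid] = [terms.count(term) for term in vocab]
    return vocab, vectors

vocab, vectors = bag_of_words({"s1": "I love data", "s2": "I love candy"})
print(vocab)    # ['candy', 'data', 'I', 'love']
print(vectors)  # {'s1': [0, 1, 1, 1], 's2': [1, 0, 1, 1]}
```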

## What is an Embedding?

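> An embedding is a dense vector, often with hundreds or thousands of dimensions, produced by a neural network. In word-embedding models such as word2vec, the network learns these vectors by predicting, for each word, which words are likely to appear near it. As a result, words used in similar contexts end up close together in the vector space.

How is it different from a Bag of Words (BoW)?
> BoW relies on word frequencies and treats each word as if it occurred independently of all others, so it discards word order and context. For example, “Good” and “Not good” share the term “good”, so their BoW vectors look similar even though the meanings are opposite.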
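Here is a minimal sketch of the word-independence problem. It compares BoW vectors with cosine similarity and assumes a plain lowercase, whitespace-split tokenizer (our simplification). The helper names are illustrative.

```python
import math
from collections import Counter

def bow(sentence: str) -> Counter:
    # Lowercase and split on whitespace: a deliberately simple tokenizer
    return Counter(sentence.lower().split())

def cosine_similarity(a: Counter, b: Counter) -> float:
    # Dot product over shared terms divided by the product of the vector lengths
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b)

# Opposite meanings still score high because they share the word "good"
sim = cosine_similarity(bow("Good"), bow("Not good"))
# Word order is invisible to BoW: these two sentences get identical vectors
order_sim = cosine_similarity(bow("dog bites man"), bow("man bites dog"))
print(round(sim, 3), round(order_sim, 3))  # 0.707 1.0
```

An embedding model is trained on context, so it can place these pairs far apart. BoW cannot.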

## What is a Vector Index?

> An "index" is a data structure that improves the speed of data retrieval operations on a database table.

> A “vector index” is a data structure that organizes vectors so that the ones most similar to a query vector can be found quickly, typically without comparing the query against every stored vector.
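Without an index, similarity search has to scan every stored vector. The sketch below shows that exact, brute-force baseline with toy 2-dimensional vectors and illustrative names. Approximate indexes such as HNSW, covered later in this notebook, exist to avoid this linear scan.

```python
import heapq
import math

def exact_knn(query, vectors, k):
    # Compare the query against every stored vector: O(n * d) work per query
    return heapq.nsmallest(k, vectors.items(), key=lambda item: math.dist(query, item[1]))

vectors = {"a": [0.0, 0.0], "b": [1.0, 1.0], "c": [5.0, 5.0], "d": [0.9, 1.2]}
nearest = [key for key, _ in exact_knn([1.0, 1.0], vectors, k=2)]
print(nearest)  # ['b', 'd']
```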

## What is a Vector Database?

A vector database is a database designed to store, retrieve, and query vector data. It typically relies on vector indexes to make similarity search efficient.

## Image Search Demo with pgvector

What is `pgvector`?
> Open-source vector similarity search for Postgres.

```python
import os

import psycopg
from matplotlib import image as mpimg
from matplotlib import pyplot as plt
from pgvector.psycopg import register_vector
from PIL import Image
from sentence_transformers import SentenceTransformer

# Connect to a local Postgres instance
conn = psycopg.connect(dbname="postgres", autocommit=True)

# CLIP model: embeds both images and text into the same 512-dimensional space
model = SentenceTransformer('clip-ViT-B-32')
```

## Multimodal Models

`SentenceTransformer('clip-ViT-B-32')` is a multimodal model.

Multimodal models process and relate information from more than one modality, such as text, images, audio, and video. They enable applications like image captioning, speech-to-text, and cross-modal retrieval.

CLIP, the model used here, maps images and text into the same vector space. This means a text query such as "Van Gogh paintings" can be compared directly against image embeddings, and that is what makes text-to-image search possible in this demo.

## Setting Up The Database

We load all the images in the `mixed_wiki` directory and convert them into embeddings. Then we insert them into the vector database.

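```python
# Enable pgvector, register the vector type with psycopg, and (re)create the table
conn.execute('CREATE EXTENSION IF NOT EXISTS vector')
register_vector(conn)
conn.execute('DROP TABLE IF EXISTS images')
# clip-ViT-B-32 produces 512-dimensional embeddings, so the column is vector(512)
conn.execute('CREATE TABLE images (id bigserial PRIMARY KEY, path text, embedding vector(512))')

cur = conn.cursor()

# Load each image, embed it with CLIP, and insert the embedding with its file path
for f in os.listdir("./mixed_wiki"):
    file = f'./mixed_wiki/{f}'
    with Image.open(file) as img:
        img_emb = model.encode(img)
    # register_vector lets psycopg pass the NumPy array directly as a vector value
    cur.execute('INSERT INTO images (embedding, path) VALUES (%s, %s)', (img_emb, file))

# HNSW index for L2 distance (<->); requires pgvector 0.5.0 or later
conn.execute('CREATE INDEX ON images USING hnsw (embedding vector_l2_ops)')
```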


## Let's ask for Van Gogh paintings from the vector database. 

> This only works if the text query is embedded with the same model as the images, so that both vectors live in the same vector space. That requires a multimodal model that can create embeddings for both text and images.

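> Notice the `ORDER BY distance LIMIT 3` pattern below. This is the query shape pgvector can serve from an approximate vector index such as HNSW, when one exists. The `WHERE` clause only filters out results beyond a distance threshold.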

```python
def show(path, distance):
    # Display an image with its path and distance in the title
    plt.title(f'{path} {distance:.2f}')
    plt.imshow(mpimg.imread(path))
    plt.show()


query_string = "Van Gogh paintings"
# Embed the text with the same CLIP model used for the images
text_emb = model.encode(query_string)

cur = conn.cursor()
cur.execute("""
    SELECT
        id,
        path,
        embedding <-> %s AS distance
    FROM images
    WHERE embedding <-> %s < 15
    ORDER BY distance
    LIMIT 3
    """,
    (text_emb, text_emb)
)

# Each row is (id, path, distance)
for row_id, path, distance in cur.fetchall():
    print(path, distance)
    show(path, distance)
```

# Distance Algorithms

A distance function measures how close two vectors are. pgvector supports these distance operators:

- `<->` L2 distance (Euclidean)
- `<#>` negative inner product
- `<=>` cosine distance (angle-based)
- `<+>` L1 distance (Manhattan), available in pgvector 0.7.0 and later

![alt](./web/distance.png)
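To make the operators concrete, here is a plain-Python sketch of what each one computes. The function names are ours, not pgvector's. pgvector returns the *negative* inner product for `<#>` so that, as with the other operators, a smaller value means more similar and ascending `ORDER BY` works the same way for all of them.

```python
import math

def l2_distance(a, b):             # like pgvector <->
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def negative_inner_product(a, b):  # like pgvector <#>
    return -sum(x * y for x, y in zip(a, b))

def cosine_distance(a, b):         # like pgvector <=>
    dot = sum(x * y for x, y in zip(a, b))
    return 1 - dot / (math.hypot(*a) * math.hypot(*b))

def l1_distance(a, b):             # like pgvector <+>
    return sum(abs(x - y) for x, y in zip(a, b))

a, b = [1.0, 2.0, 2.0], [2.0, 0.0, 1.0]
results = (
    round(l2_distance(a, b), 3),
    negative_inner_product(a, b),
    round(cosine_distance(a, b), 3),
    l1_distance(a, b),
)
print(results)  # (2.449, -4.0, 0.404, 4.0)
```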


# What is RAG vs Real-Time RAG

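Retrieval-Augmented Generation (RAG) gives an LLM relevant context at question time. Documents are split into chunks, embedded, and stored in a vector database. The chunks most similar to a user's question are then retrieved and added to the prompt. This example uses Apache Pinot, LangChain, and OpenAI to load documentation so that an LLM can answer questions about it.

What makes it *real-time* RAG is the streaming path. Embeddings flow through Kafka and are upserted into the vector database as soon as they are produced, rather than only in periodic batches, so answers can reflect fresh data. The Mermaid diagram below shows the data flow.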

```mermaid
flowchart LR

Documents-->Chunk-->e[Embedding Model]


e-.->|streaming|k[Kafka/Confluent Cloud]
k-.->|upsert|db[(Vector Database)]
e-->|batching|db


```
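The `Chunk` step in the diagram can be as simple as fixed-size windows with some overlap, so that a sentence cut at one boundary still appears whole in a neighboring chunk. This is a minimal sketch that assumes character-based chunking. Real pipelines often split on tokens or sentences instead, and the sizes shown are illustrative. Each chunk would then be passed to the embedding model.

```python
def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> list[str]:
    # Fixed-size character windows; consecutive windows share `overlap` characters
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks

chunks = chunk_text("abcdefghij", chunk_size=4, overlap=2)
print(chunks)  # ['abcd', 'cdef', 'efgh', 'ghij']
```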

## Retrieval

We use the same embedding model to convert the question into an embedding, so the question and the stored content share one vector space. A distance function then finds the stored vectors closest to the question, and the matching content is added to the prompt as context. With a multimodal embedding model, the same approach works when the stored content is images.

```mermaid
flowchart LR

Query-->e[Embedding Model]-->|retrieve|db[(Vector Database)]-->|context|Prompt-->LLM-->Response
Query-->Prompt
```
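The retrieval flow above can be sketched in a few lines. This is a minimal illustration that assumes toy 2-dimensional embeddings and a hypothetical prompt template. In practice, the embeddings come from the embedding model and the ranking happens inside the vector database. Note that the question reaches the prompt twice: once as an embedding used for retrieval, and once as text.

```python
import math

def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))

def build_prompt(question, question_emb, chunks, k=2):
    # chunks: (text, embedding) pairs, standing in for rows in the vector database
    ranked = sorted(chunks, key=lambda c: cosine_similarity(question_emb, c[1]), reverse=True)
    context = "\n".join(f"- {text}" for text, _ in ranked[:k])
    # The retrieved context and the original question are both passed to the LLM
    return f"Answer using only this context:\n{context}\n\nQuestion: {question}"

chunks = [
    ("Pinot supports HNSW vector indexes.", [0.9, 0.1]),
    ("Kafka is a streaming platform.", [0.1, 0.9]),
    ("Vector indexes speed up similarity search.", [0.8, 0.3]),
]
prompt = build_prompt("How does Pinot search vectors?", [1.0, 0.0], chunks)
print(prompt)
```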

# Apache Pinot Vector Index

Apache Pinot also supports vector indexes and similarity search. By streaming our embeddings through Kafka into StarTree Cloud (Pinot), we can serve them from a real-time OLAP database that is built for low-latency similarity queries and many concurrent end users.

```mermaid
flowchart LR

subgraph ie[Pinot]
Images
end

k[Kafka]
frame

video-->frame-->|embedding stream|k-->Images

Search-->Images

```

## Similarity Search with HNSW: Apache Pinot Vector Index

Apache Pinot's vector index, specifically the HNSW (Hierarchical Navigable Small World) vector index, enables efficient similarity search over high-dimensional data. It performs approximate nearest neighbor (ANN) search, which is crucial for applications like recommendation engines, anomaly detection, and real-time machine learning inference. Because the search is approximate, Pinot can retrieve the most similar items in a large dataset quickly, at the cost of occasionally missing an exact nearest neighbor. This makes it well suited for real-time analytics and AI-driven use cases.

The HNSW algorithm builds a multi-layer graph where each vector is connected to its nearest neighbors. During search, it navigates from a high-level sparse graph to a more detailed one, using a greedy approach to find approximate nearest neighbors efficiently.

<img src="web/hnsw.png" alt="drawing" style="width:800px;"/>
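Here is a simplified sketch of the layered greedy search described above, using a hand-built two-layer graph with illustrative names and coordinates. Real HNSW builds its layers by inserting each vector at a randomly chosen top layer, and it keeps a candidate list of size `ef` at each layer. This sketch keeps only the single best candidate, which is roughly `ef = 1`.

```python
import math

def greedy_search(graph, vectors, query, entry):
    # Hop to whichever neighbor is closer to the query until no neighbor improves
    current = entry
    current_dist = math.dist(vectors[current], query)
    while True:
        best, best_dist = current, current_dist
        for neighbor in graph.get(current, []):
            d = math.dist(vectors[neighbor], query)
            if d < best_dist:
                best, best_dist = neighbor, d
        if best == current:
            return current, current_dist  # local minimum on this layer
        current, current_dist = best, best_dist

def layered_search(layers, vectors, query, entry):
    # Layers run from the sparse top layer to the dense bottom layer;
    # the result on each layer becomes the entry point for the next one
    dist = math.dist(vectors[entry], query)
    for graph in layers:
        entry, dist = greedy_search(graph, vectors, query, entry)
    return entry, dist

vectors = {"A": (0, 0), "B": (10, 0), "C": (10, 10), "D": (2, 1), "E": (9, 1), "F": (11, 9)}
top_layer = {"A": ["C"], "C": ["A"]}
bottom_layer = {
    "A": ["D", "B"], "B": ["A", "E", "C"], "C": ["B", "F"],
    "D": ["A", "E"], "E": ["D", "B"], "F": ["C"],
}
node, dist = layered_search([top_layer, bottom_layer], vectors, query=(9, 2), entry="A")
print(node, dist)  # E 1.0
```

The sparse top layer takes a long jump from `A` to `C`, and the dense bottom layer then refines the result locally through `B` to `E`.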

## When to use OLAP Vector Database vs OLTP Vector Database
- AI application needs millisecond latency for exact and similarity retrieval.
- AI application needs to serve many concurrent users.
- AI application needs to aggregate historical data that exceeds the capacity of an OLTP database.

## When to use OLTP Vector Database vs OLAP Vector Database
- AI application needs strong transactional consistency, whereas real-time OLAP ingestion is eventually consistent.
- AI application only needs to search small amounts of data.


## NYC Cameras

In NYC, there are roughly 50,000 street cameras across all five boroughs (an estimate according to llama3). If each camera captures one frame every 10 seconds, that adds up to 157,680,000,000 (about 158 billion) images per year.

<img src="./web/cameras.png" alt="drawing" style="width:800px;"/>
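The yearly image count works out as follows, assuming a 365-day year and the camera estimate above:

```python
cameras = 50_000                          # estimated number of street cameras
seconds_per_year = 365 * 24 * 60 * 60     # 31,536,000 (ignores leap years)
frames_per_camera = seconds_per_year // 10  # one frame every 10 seconds: 3,153,600
total_frames = cameras * frames_per_camera
print(f"{total_frames:,}")  # 157,680,000,000
```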

# Traditional Rigid Dashboard


```python
import os

import pandas as pd
import plotly.express as px
from dotenv import load_dotenv
from pinotdb import connect

load_dotenv()

# StarTree Cloud broker host, credentials, and workspace, read from environment variables
PINOT_BROKER_HOST = os.getenv("PINOT_BROKER_HOST")
PINOT_USER = os.getenv("PINOT_USER")
PINOT_PASSWORD = os.getenv("PINOT_PASSWORD")
DATABASE = os.getenv('PINOT_WORKSPACE')

conn = connect(host=f'{PINOT_USER}:{PINOT_PASSWORD}@{PINOT_BROKER_HOST}',
    port=443,
    path='/query/sql',
    scheme='https',
    database=DATABASE,
    use_multistage_engine=False)

# Count events per event type
curs = conn.cursor()
sql = """
    select
        event_type,
        count(event_type) as event
    from clickstream
    group by event_type
    order by event desc
    """
curs.execute(sql)
df = pd.DataFrame(curs, columns=[item[0] for item in curs.description])

# Use the returned event types as funnel stages so each label always matches its count
fig = px.funnel(df, x='event', y='event_type')
fig.show()
```

# Next-Generation Agentic Analytics

<img src="./web/agent.jpg" alt="drawing" style="width:400px;"/>

An agentic AI system works toward set goals or tasks. It can

- make decisions
- take actions
- engage with its surroundings independently

This idea of agency in AI marks a transition from `passive systems` that follow preset instructions to `active entities` that can think, act, and react independently within defined parameters. That flexibility stands in contrast to rigid, inflexible analytical dashboards.

Agents execute `tasks` using `tools`. 


```mermaid
flowchart LR

query-->Agent-->Tools-->Response

```
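The query → agent → tools flow can be sketched without an LLM by hard-coding the plan an LLM would normally produce. This is a hypothetical illustration: the tool names, the returned values, and the plan format are ours. In a framework such as LlamaIndex, the LLM chooses which tool to call, and with which arguments, from each tool's name and description.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical tools; in this notebook the real tools query Pinot
def top_buyer() -> str:
    return "user_42"

def suggest_incentive(user_id: str) -> str:
    return f"send a discount coupon to {user_id}"

TOOLS: Dict[str, Callable[..., str]] = {
    "top_buyer": top_buyer,
    "suggest_incentive": suggest_incentive,
}

def run_plan(plan: List[Tuple[str, Callable]]) -> str:
    # Each step names a tool and builds its arguments from the previous step's result
    result = None
    for tool_name, build_args in plan:
        result = TOOLS[tool_name](*build_args(result))
    return result

plan = [
    ("top_buyer", lambda _: ()),
    ("suggest_incentive", lambda previous: (previous,)),
]
print(run_plan(plan))  # send a discount coupon to user_42
```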

In next-generation real-time analytics, we can build tools on top of `Apache Pinot`, a real-time OLAP database that can query streaming data with `low latency` and `high concurrency`.

## Let's Build an Agent Using Pinot and LlamaIndex

```python
import os
from typing import Dict, List, Optional

from dotenv import load_dotenv
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from pinotdb import connect

load_dotenv()

# StarTree Cloud broker host, credentials, and workspace, read from environment variables
PINOT_BROKER_HOST = os.getenv("PINOT_BROKER_HOST")
PINOT_USER = os.getenv("PINOT_USER")
PINOT_PASSWORD = os.getenv("PINOT_PASSWORD")
DATABASE = os.getenv('PINOT_WORKSPACE')

# The multi-stage engine is required for the JOIN in get_recent_purchases
conn2 = connect(host=f'{PINOT_USER}:{PINOT_PASSWORD}@{PINOT_BROKER_HOST}',
    port=443,
    path='/query/sql',
    scheme='https',
    database=DATABASE,
    use_multistage_engine=True)

# Must match the model and dimensions used to embed the Product table
embed_model = OpenAIEmbedding(model='text-embedding-3-large', dimensions=2048)


def quote(value: str) -> str:
    # Escape single quotes so LLM-supplied values cannot break out of the SQL string literal
    return "'" + str(value).replace("'", "''") + "'"


def run_query(sql: str) -> List[Dict]:
    # Execute a query and return every row as a dict keyed by column name
    curs = conn2.cursor()
    curs.execute(sql)
    columns = [item[0] for item in curs.description]
    return [dict(zip(columns, row)) for row in curs]


def first_column(sql: str) -> List[str]:
    # Execute a query and return the first column of every row
    curs = conn2.cursor()
    curs.execute(sql)
    return [row[0] for row in curs]


def get_all_users() -> List[str]:
    """Return the IDs of all users."""
    return first_column("SELECT distinct ID from User")


def get_user_details(user_id: str) -> Optional[Dict]:
    """Return address, city, name, phone, state, and ZIP for one user, or None if not found."""
    rows = run_query(f"""
        SELECT
            Address, City, ID, Name, Phone, State, ZIP
        from User
        where ID = {quote(user_id)}
        """)
    return rows[0] if rows else None


def get_recent_purchases(user_id: str) -> List[Dict]:
    """Return the name and description of every product the user purchased."""
    return run_query(f"""
        SELECT
            Name, Description
        from Purchase p1
        join Product p2 on p1.product_id = p2.ID
        where p1.user_id = {quote(user_id)}
        """)


def get_similar_products(product_description: str) -> List[Dict]:
    """Return the three products whose embeddings are closest to the description."""
    embedding = embed_model.get_text_embedding(product_description)
    return run_query(f"""
        SELECT
            Name,
            Description,
            cosine_distance(embedding, ARRAY{embedding}) AS cosine,
            l2_distance(embedding, ARRAY{embedding}) AS l2,
            l1_distance(embedding, ARRAY{embedding}) AS l1
        from Product
        where
            VECTOR_SIMILARITY(embedding, ARRAY{embedding}, 3)
        order by cosine asc
        limit 3
        """)


def top_three() -> List[str]:
    """Return the IDs of the three users with the most purchase events."""
    return first_column("""
        SELECT
            user_id,
            count(event_type) as purchases
        from Clickstream
        where event_type = 'purchase'
        group by user_id
        order by purchases desc
        limit 3
        """)


def most_time_spent() -> List[str]:
    """Return the IDs of the three users with the largest total time spent."""
    # Sum duration per user so that one long event does not outrank many shorter ones
    return first_column("""
        SELECT
            user_id,
            sum(duration) as total_duration
        from Clickstream
        group by user_id
        order by total_duration desc
        limit 3
        """)
```



## Create Tools

```python
# Wrap each function as a tool; the LLM picks tools based on their names and descriptions
top_three_tool = FunctionTool.from_defaults(
    fn=top_three,
    name="top_three",
    description="gets the top three purchasers in the application",
)

most_time_spent_tool = FunctionTool.from_defaults(
    fn=most_time_spent,
    name="most_time",
    description="gets the top 3 users that have spent the most time on an application"
)

get_all_users_tool = FunctionTool.from_defaults(
    fn=get_all_users,
    name="get_all_users",
    description="gets all the users of the application"
)

user_info_tool = FunctionTool.from_defaults(
    fn=get_user_details,
    name="get_user_details",
    description="gets the details about a user"
)

get_recent_purchases_tool = FunctionTool.from_defaults(
    fn=get_recent_purchases,
    name="get_recent_purchases",
    description="gets recent purchases for a user"
)

get_similar_products_tool = FunctionTool.from_defaults(
    fn=get_similar_products,
    name="get_similar_products",
    description="gets similar products based on another product description"
)
```


## Create the Agent

```python
agent = OpenAIAgent.from_tools(
    [
        top_three_tool,
        most_time_spent_tool,
        get_all_users_tool,
        user_info_tool,
        get_recent_purchases_tool,
        get_similar_products_tool
    ],
    llm=OpenAI(temperature=0, model="gpt-4o-mini"),
    verbose=True,
    system_prompt="""
You assist a sales agent who wants users to spend more time in an application \
and ultimately offers them incentives. Help her identify users of the \
application so that she can personally contact them with additional incentives.
    """,
)
```


### Ask the Agent Questions

- What product should I send to the top user?


```python
# Chat with the agent until the user types "quit" or enters an empty line
while True:
    ask = input("ask:").strip()
    if ask in ("quit", ""):
        break
    print(agent.chat(ask))
```

# More Information

|Try StarTree Cloud - Free Forever|Follow Me|
|-|-|
|<img src="./web/stqr.png" alt="drawing" style="width:300px;"/>| <img src="./web/hubertqr.png" alt="drawing" style="width:300px;"/>|


