# Building RAG for Anyscale docs

## Initialization

In [None]:
# Imports
import os
import ray
import sys; sys.path.append("..")
import warnings; warnings.filterwarnings("ignore")
from dotenv import load_dotenv; load_dotenv()
%load_ext autoreload
%autoreload 2
from rag.config import ROOT_DIR

In [None]:
# Connect to the Ray cluster and forward the credentials the workers need;
# we're not using Anyscale Endpoints.
forwarded_env_names = ("OPENAI_API_BASE", "OPENAI_API_KEY", "DB_CONNECTION_STRING")
runtime_env = {
    "env_vars": {name: os.environ[name] for name in forwarded_env_names},
    "working_dir": str(ROOT_DIR),
}
ray.init(runtime_env=runtime_env)

2024-08-29 10:30:30,666	INFO worker.py:1598 -- Connecting to existing Ray cluster at address: 10.0.62.88:6379...
2024-08-29 10:30:30,675	INFO worker.py:1774 -- Connected to Ray cluster. View the dashboard at [1m[32mhttps://session-vig1su7dbnvzqbs3l1tw35fhk3.i.anyscaleuserdata-staging.com [39m[22m
2024-08-29 10:30:30,831	INFO packaging.py:530 -- Creating a file package for local directory '/home/ray/default/anyscale-ragbot/notebooks/..'.
2024-08-29 10:30:31,058	INFO packaging.py:358 -- Pushing file package 'gcs://_ray_pkg_fad13b6aaaea39b1.zip' (46.98MiB) to Ray cluster...
2024-08-29 10:30:31,589	INFO packaging.py:371 -- Successfully pushed file package 'gcs://_ray_pkg_fad13b6aaaea39b1.zip'.


0,1
Python version:,3.12.4
Ray version:,2.35.0
Dashboard:,http://session-vig1su7dbnvzqbs3l1tw35fhk3.i.anyscaleuserdata-staging.com


[36m(autoscaler +1h5m43s)[0m Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.


In [None]:
# Only the text-embedding-3-large dimension and the gpt-4o context length are relevant for now.
from rag.config import EMBEDDING_DIMENSIONS, MAX_CONTEXT_LENGTHS

## Data

I've pre-loaded the data into `/mnt/shared_storage/emmy`: the Ray docs are under `/mnt/shared_storage/emmy/docs.ray.io/en/master` and the Anyscale docs are under `/mnt/shared_storage/emmy/docs.anyscale.com/docs`. So in this section, we'll clean and chunk the data.

In [None]:
from pathlib import Path
from rag.config import EFS_DIR

# Local directory that holds the pre-loaded Anyscale docs (markdown files).
ANYSCALE_DOCS_DIR = Path(EFS_DIR, "docs.anyscale.com/docs")
# Base URL used to turn a local file path into the public link of that page.
ANYSCALE_DOCS_URL = "https://docs.anyscale.com"

In [None]:
# Build one {"source", "text"} record per markdown file found under the docs directory.
data = []
for md_path in ANYSCALE_DOCS_DIR.rglob("*.md"):
    if md_path.is_dir():
        continue  # skip directories whose name happens to end in ".md"
    text = md_path.read_text(encoding="utf-8")
    # The public URL mirrors the path relative to the docs root, minus the ".md" suffix.
    url_path = md_path.relative_to(ANYSCALE_DOCS_DIR).with_suffix("").as_posix()
    data.append({"source": f"{ANYSCALE_DOCS_URL}/{url_path}", "text": text})

In [None]:
# Wrap the in-memory records in a Ray Dataset so the following steps can use Ray Data transforms.
anyscale_sections_ds = ray.data.from_items(data)

## Chunking

In [None]:
from functools import partial
from langchain_text_splitters import MarkdownHeaderTextSplitter

In [None]:
# Pull a single record to experiment with the splitter before running it on the whole dataset.
sample = anyscale_sections_ds.take(1)

In [None]:
# Inspect the raw record ("source" URL and markdown "text").
sample[0]

In [None]:
# Split on markdown heading levels 1-4 and keep the heading line in each chunk's text.
headers_to_split_on = [("#" * level, f"Header {level}") for level in range(1, 5)]
markdown_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=headers_to_split_on,
    strip_headers=False,
)

In [None]:
# Split the sample document's markdown text into header-delimited chunks.
chunks = markdown_splitter.split_text(sample[0]['text'])

In [None]:
# Inspect the chunks produced for the sample document.
chunks

In [None]:
def chunk_md(md_doc):
    """Split one markdown document into header-delimited chunks.

    Args:
        md_doc: Record with a "text" key (markdown content) and a "source" key.

    Returns:
        A list of {"text", "source"} dicts, one per chunk; every chunk carries
        the source of the document it was cut from.
    """
    # Heading levels 1-4 act as split points; the heading stays in the chunk text.
    header_levels = [("#" * depth, f"Header {depth}") for depth in range(1, 5)]
    splitter = MarkdownHeaderTextSplitter(
        headers_to_split_on=header_levels,
        strip_headers=False,
    )

    records = []
    for piece in splitter.split_text(md_doc["text"]):
        records.append({"text": piece.page_content, "source": md_doc["source"]})
    return records



In [None]:
# Record that is fed to `chunk_md` in the next cell.
sample[0]

In [None]:
# Try the chunking function on the sample record.
temp = chunk_md(sample[0])

In [None]:
# First chunk of the sample document, as a {"text", "source"} dict.
temp[0]

In [None]:
# Apply the chunker to every document; each input row expands into one row per chunk.
chunks_ds = anyscale_sections_ds.flat_map(chunk_md)

In [None]:
# Show one chunk row as a quick sanity check.
chunks_ds.show(1)

## Embed

To keep things simple, I'm using OpenAI models across the board (for both embeddings and generation).

In [None]:
from langchain_openai import OpenAIEmbeddings

In [None]:
# Driver-side embedding client; the API endpoint and key are read from the environment.
openai_settings = {
    "openai_api_base": os.environ["OPENAI_API_BASE"],
    "openai_api_key": os.environ["OPENAI_API_KEY"],
}
embedding_model = OpenAIEmbeddings(model="text-embedding-3-large", **openai_settings)

In [None]:
class EmbedChunks:
    """Callable for `map_batches` that adds OpenAI embeddings to a batch of chunks."""

    def __init__(self, model_name="text-embedding-3-large"):
        """Create the embedding client once per instance.

        Args:
            model_name: OpenAI embedding model to use. The default keeps the
                behavior of constructing the class without arguments.
        """
        # The API endpoint and key are read from the environment.
        self.embedding_model = OpenAIEmbeddings(
            model=model_name,
            openai_api_base=os.environ["OPENAI_API_BASE"],
            openai_api_key=os.environ["OPENAI_API_KEY"],
        )

    def __call__(self, batch):
        """Embed every text in the batch.

        Args:
            batch: Mapping with "text" and "source" columns.

        Returns:
            Dict with the unchanged "text" and "source" columns plus an
            "embeddings" column holding the result of `embed_documents`.
        """
        embeddings = self.embedding_model.embed_documents(batch["text"])
        return {"text": batch["text"], "source": batch["source"], "embeddings": embeddings}

In [None]:
# Embeddings are computed through the OpenAI API, so the actor needs no GPU;
# reserving one would only hold (or wait for) a GPU that is never used.
embedded_chunks = chunks_ds.map_batches(
    EmbedChunks,
    batch_size=100,
    concurrency=1,
)

In [None]:
# Fetch one embedded record to sanity-check the vector size and the chunk text.
sample = embedded_chunks.take(1)
first_record = sample[0]
print("embedding size:", len(first_record["embeddings"]))
print(first_record["text"])

## Store vectors

In [None]:
import psycopg
from pgvector.psycopg import register_vector

# Embedding model name; it selects both the migration file and the dump filename below.
embedding_model_name = "text-embedding-3-large"

# Exported through the environment so the %%bash cells can read them.
# The migration file is chosen by the model's embedding dimension.
os.environ["MIGRATION_FP"] = f"../migrations/vector-{EMBEDDING_DIMENSIONS[embedding_model_name]}.sql"
# The dump path keeps only the part of the model name after the last "/".
os.environ["SQL_DUMP_FP"] = f"{EFS_DIR}/sql_dumps/{embedding_model_name.split('/')[-1]}.sql"

### We're back in business!

In [None]:
%%bash
# Set up: drop any existing `document` table and recreate it from the migration file.
psql "$DB_CONNECTION_STRING" -c "DROP TABLE IF EXISTS document;"
echo "$MIGRATION_FP"
# NOTE(review): the migration runs as the local `postgres` user rather than through
# DB_CONNECTION_STRING — confirm both target the same database.
sudo -u postgres psql -f "$MIGRATION_FP"
echo "$SQL_DUMP_FP"

NOTICE:  table "document" does not exist, skipping


DROP TABLE
../migrations/vector-3072.sql
CREATE TABLE
/mnt/shared_storage/emmy/sql_dumps/text-embedding-3-large.sql


In [None]:
%%bash
# Drop the existing `document` table and create a new one with the schema to store embeddings.
psql "$DB_CONNECTION_STRING" -c "DROP TABLE IF EXISTS document;"  # drop
sudo -u postgres psql -f "$MIGRATION_FP"  # create (path quoted so it survives spaces)
psql "$DB_CONNECTION_STRING" -c "SELECT count(*) FROM document;"  # num rows

# Expected output:
# DROP TABLE
# CREATE TABLE
#  count
# -------
#      0
# (1 row)

DROP TABLE
CREATE TABLE
 count 
-------
     0
(1 row)



In [None]:
class StoreResults:
    """Callable for `map_batches` that writes embedded chunks to the `document` table."""

    def __call__(self, batch):
        """Insert every (text, source, embedding) row of the batch into Postgres.

        Args:
            batch: Mapping with "text", "source" and "embeddings" columns.

        Returns:
            An empty dict; this step is run only for its side effect.
        """
        rows = list(zip(batch["text"], batch["source"], batch["embeddings"]))
        if not rows:
            return {}  # nothing to write, so don't open a connection

        # Leaving the `with` block without an error commits the inserts.
        with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
            register_vector(conn)  # registers the vector type on this connection
            with conn.cursor() as cur:
                # One batched call instead of a Python-level execute per row.
                cur.executemany(
                    "INSERT INTO document (text, source, embedding) VALUES (%s, %s, %s)",
                    rows,
                )
        return {}

In [None]:
# Index data
# Write the embedded chunks to Postgres with a pool of 6 `StoreResults` workers;
# `.materialize()` forces the whole pipeline to execute now.
embedded_chunks.map_batches(
    StoreResults,
    batch_size=128,
    num_cpus=1,
    concurrency=6,
).materialize()

# Verify whether the embedding was stored successfully in Postgres or not.
# sudo -u postgres psql
# SELECT * FROM document LIMIT 10;

2024-08-29 10:40:15,736	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-29_10-21-03_072216_2778/logs/ray-data
2024-08-29 10:40:15,737	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(chunk_md)] -> ActorPoolMapOperator[MapBatches(EmbedChunks)] -> ActorPoolMapOperator[MapBatches(StoreResults)]


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(chunk_md) 1: 0.00 row [00:00, ? row/s]

- MapBatches(EmbedChunks) 2: 0.00 row [00:00, ? row/s]

- MapBatches(StoreResults) 3: 0.00 row [00:00, ? row/s]



MaterializedDataset(num_blocks=24, num_rows=0, schema=Unknown schema)

In [None]:
%%bash
# Save index: dump the database to $SQL_DUMP_FP, replacing any previous dump.
# Every expansion is quoted so an empty value or a path with spaces cannot
# change what `rm -rf`, `dirname` or the redirect operate on.
rm -rf "$SQL_DUMP_FP"
mkdir -p "$(dirname "$SQL_DUMP_FP")" && touch "$SQL_DUMP_FP"
sudo -u postgres pg_dump -c > "$SQL_DUMP_FP"  # save

## Retrieval

In [None]:
import json
import numpy as np

In [None]:
# Embed query
embedding_model = OpenAIEmbeddings(model=embedding_model_name)
query = "What are the different kinds of storage, and how do I use them?"
embedding = np.array(embedding_model.embed_query(query))
len(embedding)

3072

In [None]:
# Get context
num_chunks = 10
with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
    register_vector(conn)
    with conn.cursor() as cur:
        # `<=>` is pgvector's cosine *distance* operator: lower means closer,
        # so ascending order returns the best matches first.
        cur.execute(
            "SELECT *, (embedding <=> %s) AS distance FROM document ORDER BY distance LIMIT %s",
            (embedding, num_chunks),
        )
        rows = cur.fetchall()
        # Row layout used here: id, text, source, ..., distance (appended last).
        ids = [row[0] for row in rows]
        context = [{"text": row[1]} for row in rows]
        sources = [row[2] for row in rows]
        scores = [row[-1] for row in rows]  # cosine distances, smallest first

In [None]:
# Print every retrieved chunk with its id, score and source.
for chunk_id, score, source, item in zip(ids, scores, sources, context):
    print(chunk_id)
    print(score)
    print(source)
    print(item["text"])
    print()

764
0.5057324918816153
https://docs.anyscale.com/platform/workspaces/workspaces-storage
# Storage  
[File storage](./workspaces-files.md) covered in the previous section is optimal for storing your code. However, AI workloads often need access to large amounts of data, whether it's data for training and fine tuning, or a common storage to store model checkpoints. The options below cover different use cases.

765
0.5094706190954934
https://docs.anyscale.com/platform/workspaces/workspaces-storage
## Types of storage options  
### NVMe support
`/mnt/local_storage` - Non-Volatile Memory Express (NVMe) interface to access Solid State Drive (SSD) storage volumes. It provides additional temporary storage to the Node's root disk/volume. This enables higher performance, lower latency, scalability, and support for versatile use cases across a variety of workloads. For instance types that don't have NVMe, **/mnt/local_storage** falls back to the root disk/volume. [Learn more](https://docs.anyscal

In [None]:
def semantic_search(query, embedding_model, k):
    """Return up to `k` stored chunks ordered by `embedding <=> query vector`.

    Args:
        query: Text to embed and search for.
        embedding_model: Object exposing `embed_query(text)`.
        k: Value passed to the SQL `LIMIT`.

    Returns:
        List of {"id", "text", "source"} dicts in the order returned by the database.
    """
    query_vector = np.array(embedding_model.embed_query(query))
    sql = "SELECT * FROM document ORDER BY embedding <=> %s LIMIT %s"
    with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
        register_vector(conn)
        with conn.cursor() as cur:
            cur.execute(sql, (query_vector, k))
            matches = cur.fetchall()
    return [{"id": match[0], "text": match[1], "source": match[2]} for match in matches]

## Generation

In [None]:
import openai
import time

In [None]:
from rag.generate import prepare_response


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

In [None]:
from rag.utils import get_client

In [None]:
def generate_response(
    llm, temperature=0.0, stream=True,
    system_content="", assistant_content="", user_content="", 
    max_retries=1, retry_interval=60):
    """Generate response from an LLM.

    Args:
        llm: Model name; passed to `get_client` and used as the `model` argument.
        temperature: Sampling temperature forwarded to the chat completion call.
        stream: Forwarded to the completion call and to `prepare_response`.
        system_content: Content of the system message; omitted when empty.
        assistant_content: Content of the assistant message; omitted when empty.
        user_content: Content of the user message; omitted when empty.
        max_retries: Number of retries after the first failed attempt.
        retry_interval: Seconds to wait between attempts.

    Returns:
        Whatever `prepare_response` returns for the completion, or "" if every
        attempt raised an exception.
    """
    client = get_client(llm=llm)
    # Only send the roles that actually have content.
    messages = [{"role": role, "content": content} for role, content in [
        ("system", system_content), 
        ("assistant", assistant_content), 
        ("user", user_content)] if content]
    total_attempts = max_retries + 1
    for attempt in range(total_attempts):
        try:
            chat_completion = client.chat.completions.create(
                model=llm,
                temperature=temperature,
                stream=stream,
                messages=messages,
            )
            return prepare_response(chat_completion, stream=stream)

        except Exception as e:
            print(f"Exception: {e}")
            # Wait before the next attempt, but don't stall the caller after the last one.
            if attempt < total_attempts - 1:
                time.sleep(retry_interval)  # default is per-minute rate limits
    return ""

In [None]:
# Retrieve the 5 closest chunks for the query and keep only their text as LLM context.
context_results = semantic_search(query=query, embedding_model=embedding_model, k=5)
context = [item["text"] for item in context_results]
print(context)

["# Storage  \n[File storage](./workspaces-files.md) covered in the previous section is optimal for storing your code. However, AI workloads often need access to large amounts of data, whether it's data for training and fine tuning, or a common storage to store model checkpoints. The options below cover different use cases.", "## Types of storage options  \n### NVMe support\n`/mnt/local_storage` - Non-Volatile Memory Express (NVMe) interface to access Solid State Drive (SSD) storage volumes. It provides additional temporary storage to the Node's root disk/volume. This enables higher performance, lower latency, scalability, and support for versatile use cases across a variety of workloads. For instance types that don't have NVMe, **/mnt/local_storage** falls back to the root disk/volume. [Learn more](https://docs.anyscale.com/configure/compute-configs/storage#nvme-configuration) about how to configure NVMe support.", '## How to choose the storage\nThe choice depends on the expected perf

In [None]:
# Generate response
# The retrieved context (a list of chunk texts) is interpolated into the user message as-is.
query = "What are the different kinds of storage, and how do I use them?"
response = generate_response(
    llm="gpt-4o",
    temperature=0.0,
    stream=True,
    system_content="Answer the query using the context provided. Be succinct.",
    user_content=f"query: {query}, context: {context}")
# Stream response
# With stream=True the result is iterated piece by piece and printed without newlines.
for content in response:
    print(content, end='', flush=True)

There are several types of storage options available, each suited for different use cases:

1. **NVMe Storage**: Accessed via `/mnt/local_storage`, it provides high performance and low latency SSD storage. Ideal for workloads requiring fast temporary storage.

2. **NFS Storage**: Suitable for sharing small files across different workspaces, jobs, or services. Not recommended for large files (terabyte scale).

3. **Cloud Storage**: Limited to 10 GB in the persisted project directory (`/home/ray/default`). Best for storing git repos and smaller files. For larger files, use object storage like Amazon S3 or Google Cloud Storage.

### How to Use Them:
- **NVMe Storage**: Configure it based on your instance type. Refer to the [NVMe configuration documentation](https://docs.anyscale.com/configure/compute-configs/storage#nvme-configuration).
- **NFS Storage**: Use for small file sharing, but be mindful of connection limits and avoid storing large datasets.
- **Cloud Storage**: Access using the

## Agent

In [None]:
from rag.embed import get_embedding_model
from rag.utils import get_num_tokens, trim

In [None]:
class QueryAgent:
    """Answer a question by retrieving stored chunks and passing them to an LLM."""

    def __init__(self, embedding_model_name="text-embedding-3-large",
                 llm="gpt-4o", temperature=0.0, 
                 max_context_length=4096, system_content="", assistant_content=""):
        """Set up the embedding client and the generation settings.

        Args:
            embedding_model_name: OpenAI embedding model used to embed queries.
            llm: Model name passed to `generate_response`.
            temperature: Sampling temperature passed to `generate_response`.
            max_context_length: Total context length of the LLM; only half of
                it is budgeted for input.
            system_content: System message sent with every request.
            assistant_content: Assistant message sent with every request.
        """
        # Embedding model (uses the requested name rather than a hard-coded one)
        self.embedding_model = OpenAIEmbeddings(
            model=embedding_model_name,
            openai_api_base=os.environ["OPENAI_API_BASE"],
            openai_api_key=os.environ["OPENAI_API_KEY"],
        )

        # Context length (restrict input length to 50% of total context length)
        max_context_length = int(0.5*max_context_length)

        # LLM
        self.llm = llm
        self.temperature = temperature
        # Budget left for the user message after the fixed system/assistant content.
        self.context_length = max_context_length - get_num_tokens(system_content + assistant_content)
        self.system_content = system_content
        self.assistant_content = assistant_content

    def __call__(self, query, num_chunks=5, stream=True):
        """Retrieve context for `query` and generate an answer.

        Args:
            query: Question to answer.
            num_chunks: Number of chunks to retrieve with `semantic_search`.
            stream: Forwarded to `generate_response`.

        Returns:
            Dict with "question", "sources" (one per retrieved chunk), "answer"
            (the value returned by `generate_response`) and "llm".
        """
        # Get sources and context
        context_results = semantic_search(
            query=query, 
            embedding_model=self.embedding_model, 
            k=num_chunks)

        # Generate response
        context = [item["text"] for item in context_results]
        sources = [item["source"] for item in context_results]
        user_content = f"query: {query}, context: {context}"
        answer = generate_response(
            llm=self.llm,
            temperature=self.temperature,
            stream=stream,
            system_content=self.system_content,
            assistant_content=self.assistant_content,
            # presumably `trim` cuts the message down to the token budget — confirm in rag.utils
            user_content=trim(user_content, self.context_length))

        # Result
        result = {
            "question": query,
            "sources": sources,
            "answer": answer,
            "llm": self.llm,
        }
        return result

In [None]:
# Models used by the agent below: one for embeddings, one for generation.
embedding_model_name = "text-embedding-3-large"
llm = "gpt-4o"

In [None]:
# Run the agent end to end on a sample question.
query = "What does auto-select worker nodes do?"
system_content = "Answer the query using the context provided. Be succinct."
agent = QueryAgent(
    embedding_model_name=embedding_model_name,
    llm=llm,
    max_context_length=MAX_CONTEXT_LENGTHS[llm],
    system_content=system_content)
# stream=False here; the result is serialized with json.dumps right below.
result = agent(query=query, stream=False)
print(json.dumps(result, indent=2))

{
  "question": "What does auto-select worker nodes do?",
  "sources": [
    "https://docs.anyscale.com/examples/intro-workspaces",
    "https://docs.anyscale.com/examples/intro-workspaces",
    "https://docs.anyscale.com/platform/workspaces/workspaces",
    "https://docs.anyscale.com/platform/jobs/index",
    "https://docs.anyscale.com/platform/services/scale-a-service"
  ],
  "answer": "Auto-select worker nodes allows Ray to automatically add worker nodes to the cluster as needed to run submitted tasks and actors. This mode does not allow for manual configuration of the worker nodes, but it shows which nodes have been launched in the resources panel. It is recommended for users without specific cluster requirements who are okay with waiting for the autoscaler to add nodes on-demand.",
  "llm": "gpt-4o"
}
