# Your First RAG Application

In this notebook, we'll walk you through each of the components that are involved in a simple RAG application.

We won't be leveraging any fancy tools, just the OpenAI Python SDK, Numpy, and some classic Python.

> NOTE: This was done with Python 3.12.3.

> NOTE: There might be [compatibility issues](https://github.com/wandb/wandb/issues/7683) if you're on NVIDIA driver >552.44 As an interim solution - you can rollback your drivers to the 552.44.

## Table of Contents:

- Task 1: Imports and Utilities
- Task 2: Documents
- Task 3: Embeddings and Vectors
- Task 4: Prompts
- Task 5: Retrieval Augmented Generation
  - 🚧 Activity #1: Augment RAG

Let's look at a rather complicated looking visual representation of a basic RAG application.

<img src="https://i.imgur.com/vD8b016.png" />

## Task 1: Imports and Utility

We're just doing some imports and enabling `async` to work within the Jupyter environment here, nothing too crazy!

In [25]:
%pip install numpy

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [26]:
%pip install dotenv

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [27]:
%pip install openai

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [28]:
from aimakerspace.text_utils import TextFileLoader, CharacterTextSplitter
from aimakerspace.vectordatabase import VectorDatabase
import asyncio

In [29]:
import nest_asyncio
nest_asyncio.apply()

## Task 2: Documents

We'll be concerning ourselves with this part of the flow in the following section:

<img src="https://i.imgur.com/jTm9gjk.png" />

### Loading Source Documents

So, first things first, we need some documents to work with.

While we could work directly with the `.txt` files (or whatever file-types you wanted to extend this to) we can instead do some batch processing of those documents at the beginning in order to store them in a more machine compatible format.

In this case, we're going to parse our text file into a single document in memory.

Let's look at the relevant bits of the `TextFileLoader` class:

```python
def load_file(self):
        with open(self.path, "r", encoding=self.encoding) as f:
            self.documents.append(f.read())
```

We're simply loading the document using the built in `open` method, and storing that output in our `self.documents` list.

> NOTE: We're using blogs from PMarca (Marc Andreessen) as our sample data. This data is largely irrelevant as we want to focus on the mechanisms of RAG, which includes out data's shape and quality - but not specifically what the contents of the data are. 


In [30]:
text_loader = TextFileLoader("data/PMarcaBlogs.txt")
documents = text_loader.load_documents()
len(documents)

1

In [31]:
print(documents[0][:100])


The Pmarca Blog Archives
(select posts from 2007-2009)
Marc Andreessen
copyright: Andreessen Horow


### Splitting Text Into Chunks

As we can see, there is one massive document.

We'll want to chunk the document into smaller parts so it's easier to pass the most relevant snippets to the LLM.

There is no fixed way to split/chunk documents - and you'll need to rely on some intuition as well as knowing your data *very* well in order to build the most robust system.

For this toy example, we'll just split blindly on length.

>There's an opportunity to clear up some terminology here, for this course we will be stick to the following:
>
>- "source documents" : The `.txt`, `.pdf`, `.html`, ..., files that make up the files and information we start with in its raw format
>- "document(s)" : single (or more) text object(s)
>- "corpus" : the combination of all of our documents

As you can imagine (though it's not specifically true in this toy example) the idea of splitting documents is to break them into managable sized chunks that retain the most relevant local context.

In [32]:
text_splitter = CharacterTextSplitter()
split_documents = text_splitter.split_texts(documents)
len(split_documents)

373

Let's take a look at some of the documents we've managed to split.

In [33]:
split_documents[0:1]

['\ufeff\nThe Pmarca Blog Archives\n(select posts from 2007-2009)\nMarc Andreessen\ncopyright: Andreessen Horowitz\ncover design: Jessica Hagy\nproduced using: Pressbooks\nContents\nTHE PMARCA GUIDE TO STARTUPS\nPart 1: Why not to do a startup 2\nPart 2: When the VCs say "no" 10\nPart 3: "But I don\'t know any VCs!" 18\nPart 4: The only thing that matters 25\nPart 5: The Moby Dick theory of big companies 33\nPart 6: How much funding is too little? Too much? 41\nPart 7: Why a startup\'s initial business plan doesn\'t\nmatter that much\n49\nTHE PMARCA GUIDE TO HIRING\nPart 8: Hiring, managing, promoting, and Dring\nexecutives\n54\nPart 9: How to hire a professional CEO 68\nHow to hire the best people you\'ve ever worked\nwith\n69\nTHE PMARCA GUIDE TO BIG COMPANIES\nPart 1: Turnaround! 82\nPart 2: Retaining great people 86\nTHE PMARCA GUIDE TO CAREER, PRODUCTIVITY,\nAND SOME OTHER THINGS\nIntroduction 97\nPart 1: Opportunity 99\nPart 2: Skills and education 107\nPart 3: Where to go and wh

## Task 3: Embeddings and Vectors

Next, we have to convert our corpus into a "machine readable" format as we explored in the Embedding Primer notebook.

Today, we're going to talk about the actual process of creating, and then storing, these embeddings, and how we can leverage that to intelligently add context to our queries.

### OpenAI API Key

In order to access OpenAI's APIs, we'll need to provide our OpenAI API Key!

You can work through the folder "OpenAI API Key Setup" for more information on this process if you don't already have an API Key!

In [34]:
import os
import openai
from getpass import getpass

openai.api_key = getpass("OpenAI API Key: ")
os.environ["OPENAI_API_KEY"] = openai.api_key

### Vector Database

Let's set up our vector database to hold all our documents and their embeddings!

While this is all baked into 1 call - we can look at some of the code that powers this process to get a better understanding:

Let's look at our `VectorDatabase().__init__()`:

```python
def __init__(self, embedding_model: EmbeddingModel = None):
        self.vectors = defaultdict(np.array)
        self.embedding_model = embedding_model or EmbeddingModel()
```

As you can see - our vectors are merely stored as a dictionary of `np.array` objects.

Secondly, our `VectorDatabase()` has a default `EmbeddingModel()` which is a wrapper for OpenAI's `text-embedding-3-small` model.

> **Quick Info About `text-embedding-3-small`**:
> - It has a context window of **8191** tokens
> - It returns vectors with dimension **1536**

#### ❓Question #1:

The default embedding dimension of `text-embedding-3-small` is 1536, as noted above. 

1. Is there any way to modify this dimension?
2. What technique does OpenAI use to achieve this?

> NOTE: Check out this [API documentation](https://platform.openai.com/docs/api-reference/embeddings/create) for the answer to question #1.1, and [this documentation](https://platform.openai.com/docs/guides/embeddings/use-cases) for an answer to question #1.2!
 


##### ✅ Answer:
1. **Yes** — with the new embedding models (`text-embedding-3-small` and `text-embedding-3-large`), you can change the embedding size by using the `dimensions` parameter in the embeddings API request. By default, `text-embedding-3-small` outputs vectors of length 1536 and `text-embedding-3-large` outputs 3072, but you can request a smaller size (e.g., 256, 512, 1024) to reduce cost and storage requirements. 
2. OpenAI achieves this flexibility through a **specialized training technique**:Matryoshka Representation Learning (see [arXiv:2205.13147](https://arxiv.org/abs/2205.13147)) that allows embeddings to be shortened without losing their semantic meaning. Essentially, the embeddings are trained so that truncating dimensions still preserves most of their representational power. This lets developers trade off between performance and efficiency depending on their application needs.  

We can call the `async_get_embeddings` method of our `EmbeddingModel()` on a list of `str` and receive a list of `float` back!

```python
async def async_get_embeddings(self, list_of_text: List[str]) -> List[List[float]]:
        return await aget_embeddings(
            list_of_text=list_of_text, engine=self.embeddings_model_name
        )
```

We cast those to `np.array` when we build our `VectorDatabase()`:

```python
async def abuild_from_list(self, list_of_text: List[str]) -> "VectorDatabase":
        embeddings = await self.embedding_model.async_get_embeddings(list_of_text)
        for text, embedding in zip(list_of_text, embeddings):
            self.insert(text, np.array(embedding))
        return self
```

And that's all we need to do!

In [35]:
vector_db = VectorDatabase()
vector_db = asyncio.run(vector_db.abuild_from_list(split_documents))

#### ❓Question #2:

What are the benefits of using an `async` approach to collecting our embeddings?

> NOTE: Determining the core difference between `async` and `sync` will be useful! If you get stuck - ask ChatGPT!

##### ✅ Answer: 
Using an **async approach** to collect embeddings means requests don’t block each other — your program can send many requests at once and keep working while waiting for responses.  

**Benefits:**  
- 🚀 **Faster throughput:** Multiple embedding requests can run in parallel instead of waiting one by one.  
- 💻 **Better resource usage:** CPU and network are used more efficiently since idle “waiting time” is avoided.  
- 📈 **Scales better:** Async code handles larger batches of text (e.g., thousands of documents) without slowing down as much as sync code.  

In contrast, a **sync approach** processes one request at a time and waits for it to finish before starting the next, which can be significantly slower for large workloads.  



So, to review what we've done so far in natural language:

1. We load source documents
2. We split those source documents into smaller chunks (documents)
3. We send each of those documents to the `text-embedding-3-small` OpenAI API endpoint
4. We store each of the text representations with the vector representations as keys/values in a dictionary

### Semantic Similarity

The next step is to be able to query our `VectorDatabase()` with a `str` and have it return to us vectors and text that is most relevant from our corpus.

We're going to use the following process to achieve this in our toy example:

1. We need to embed our query with the same `EmbeddingModel()` as we used to construct our `VectorDatabase()`
2. We loop through every vector in our `VectorDatabase()` and use a distance measure to compare how related they are
3. We return a list of the top `k` closest vectors, with their text representations

There's some very heavy optimization that can be done at each of these steps - but let's just focus on the basic pattern in this notebook.

> We are using [cosine similarity](https://www.engati.com/glossary/cosine-similarity) as a distance metric in this example - but there are many many distance metrics you could use - like [these](https://flavien-vidal.medium.com/similarity-distances-for-natural-language-processing-16f63cd5ba55)

> We are using a rather inefficient way of calculating relative distance between the query vector and all other vectors - there are more advanced approaches that are much more efficient, like [ANN](https://towardsdatascience.com/comprehensive-guide-to-approximate-nearest-neighbors-algorithms-8b94f057d6b6)

In [36]:
vector_db.search_by_text("What is the Michael Eisner Memorial Weak Executive Problem?", k=3)

[('ordingly.\nSeventh, when hiring the executive to run your former specialty, be\ncareful you don’t hire someone weak on purpose.\nThis sounds silly, but you wouldn’t believe how oaen it happens.\nThe CEO who used to be a product manager who has a weak\nproduct management executive. The CEO who used to be in\nsales who has a weak sales executive. The CEO who used to be\nin marketing who has a weak marketing executive.\nI call this the “Michael Eisner Memorial Weak Executive Problem” — aaer the CEO of Disney who had previously been a brilliant TV network executive. When he bought ABC at Disney, it\npromptly fell to fourth place. His response? “If I had an extra\ntwo days a week, I could turn around ABC myself.” Well, guess\nwhat, he didn’t have an extra two days a week.\nA CEO — or a startup founder — oaen has a hard time letting\ngo of the function that brought him to the party. The result: you\nhire someone weak into the executive role for that function so\nthat you can continue to b

## Task 4: Prompts

In the following section, we'll be looking at the role of prompts - and how they help us to guide our application in the right direction.

In this notebook, we're going to rely on the idea of "zero-shot in-context learning".

This is a lot of words to say: "We will ask it to perform our desired task in the prompt, and provide no examples."

### XYZRolePrompt

Before we do that, let's stop and think a bit about how OpenAI's chat models work.

We know they have roles - as is indicated in the following API [documentation](https://platform.openai.com/docs/api-reference/chat/create#chat/create-messages)

There are three roles, and they function as follows (taken directly from [OpenAI](https://platform.openai.com/docs/guides/gpt/chat-completions-api)):

- `{"role" : "system"}` : The system message helps set the behavior of the assistant. For example, you can modify the personality of the assistant or provide specific instructions about how it should behave throughout the conversation. However note that the system message is optional and the model’s behavior without a system message is likely to be similar to using a generic message such as "You are a helpful assistant."
- `{"role" : "user"}` : The user messages provide requests or comments for the assistant to respond to.
- `{"role" : "assistant"}` : Assistant messages store previous assistant responses, but can also be written by you to give examples of desired behavior.

The main idea is this:

1. You start with a system message that outlines how the LLM should respond, what kind of behaviours you can expect from it, and more
2. Then, you can provide a few examples in the form of "assistant"/"user" pairs
3. Then, you prompt the model with the true "user" message.

In this example, we'll be forgoing the 2nd step for simplicities sake.

#### Utility Functions

You'll notice that we're using some utility functions from the `aimakerspace` module - let's take a peek at these and see what they're doing!

##### XYZRolePrompt

Here we have our `system`, `user`, and `assistant` role prompts.

Let's take a peek at what they look like:

```python
class BasePrompt:
    def __init__(self, prompt):
        """
        Initializes the BasePrompt object with a prompt template.

        :param prompt: A string that can contain placeholders within curly braces
        """
        self.prompt = prompt
        self._pattern = re.compile(r"\{([^}]+)\}")

    def format_prompt(self, **kwargs):
        """
        Formats the prompt string using the keyword arguments provided.

        :param kwargs: The values to substitute into the prompt string
        :return: The formatted prompt string
        """
        matches = self._pattern.findall(self.prompt)
        return self.prompt.format(**{match: kwargs.get(match, "") for match in matches})

    def get_input_variables(self):
        """
        Gets the list of input variable names from the prompt string.

        :return: List of input variable names
        """
        return self._pattern.findall(self.prompt)
```

Then we have our `RolePrompt` which laser focuses us on the role pattern found in most API endpoints for LLMs.

```python
class RolePrompt(BasePrompt):
    def __init__(self, prompt, role: str):
        """
        Initializes the RolePrompt object with a prompt template and a role.

        :param prompt: A string that can contain placeholders within curly braces
        :param role: The role for the message ('system', 'user', or 'assistant')
        """
        super().__init__(prompt)
        self.role = role

    def create_message(self, **kwargs):
        """
        Creates a message dictionary with a role and a formatted message.

        :param kwargs: The values to substitute into the prompt string
        :return: Dictionary containing the role and the formatted message
        """
        return {"role": self.role, "content": self.format_prompt(**kwargs)}
```

We'll look at how the `SystemRolePrompt` is constructed to get a better idea of how that extension works:

```python
class SystemRolePrompt(RolePrompt):
    def __init__(self, prompt: str):
        super().__init__(prompt, "system")
```

That pattern is repeated for our `UserRolePrompt` and our `AssistantRolePrompt` as well.

##### ChatOpenAI

Next we have our model, which is converted to a format analagous to libraries like LangChain and LlamaIndex.

Let's take a peek at how that is constructed:

```python
class ChatOpenAI:
    def __init__(self, model_name: str = "gpt-4.1-mini"):
        self.model_name = model_name
        self.openai_api_key = os.getenv("OPENAI_API_KEY")
        if self.openai_api_key is None:
            raise ValueError("OPENAI_API_KEY is not set")

    def run(self, messages, text_only: bool = True):
        if not isinstance(messages, list):
            raise ValueError("messages must be a list")

        openai.api_key = self.openai_api_key
        response = openai.ChatCompletion.create(
            model=self.model_name, messages=messages
        )

        if text_only:
            return response.choices[0].message.content

        return response
```

#### ❓ Question #3:

When calling the OpenAI API - are there any ways we can achieve more reproducible outputs?

> NOTE: Check out [this section](https://platform.openai.com/docs/guides/text-generation/) of the OpenAI documentation for the answer!

##### ✅ Answer:
According to the docs the main way to get reproducibility is to lock your app to a specific model snapshot and track prompt performance with evals.


### Creating and Prompting OpenAI's `gpt-4.1-mini`!

Let's tie all these together and use it to prompt `gpt-4.1-mini`!

In [37]:
from aimakerspace.openai_utils.prompts import (
    UserRolePrompt,
    SystemRolePrompt,
    AssistantRolePrompt,
)

from aimakerspace.openai_utils.chatmodel import ChatOpenAI

chat_openai = ChatOpenAI()
user_prompt_template = "{content}"
user_role_prompt = UserRolePrompt(user_prompt_template)
system_prompt_template = (
    "You are an expert in {expertise}, you always answer in a kind way."
)
system_role_prompt = SystemRolePrompt(system_prompt_template)

messages = [
    system_role_prompt.create_message(expertise="Python"),
    user_role_prompt.create_message(
        content="What is the best way to write a loop?"
    ),
]

response = chat_openai.run(messages)

In [38]:
print(response)

Hello! The "best" way to write a loop in Python often depends on what you need to accomplish. However, Python’s most common and clean looping construct is the `for` loop, especially when iterating over sequences (like lists, tuples, strings, or ranges).

Here’s a simple and clear example:

```python
for item in iterable:
    # do something with item
```

For example, if you want to print numbers from 1 to 5:

```python
for i in range(1, 6):
    print(i)
```

If you need to write a loop that runs while a certain condition is true, the `while` loop is appropriate:

```python
count = 1
while count <= 5:
    print(count)
    count += 1
```

If you give me more context about your specific task, I’d be happy to help you write the most suitable loop!


## Task 5: Retrieval Augmented Generation

Now we can create a RAG prompt - which will help our system behave in a way that makes sense!

There is much you could do here, many tweaks and improvements to be made!

In [39]:
RAG_SYSTEM_TEMPLATE = """You are a knowledgeable assistant that answers questions based strictly on provided context.

Instructions:
- Only answer questions using information from the provided context
- If the context doesn't contain relevant information, respond with "I don't know"
- Be accurate and cite specific parts of the context when possible
- Keep responses {response_style} and {response_length}
- Only use the provided context. Do not use external knowledge.
- Only provide answers when you are confident the context supports your response."""

RAG_USER_TEMPLATE = """Context Information:
{context}

Number of relevant sources found: {context_count}
{similarity_scores}

Question: {user_query}

Please provide your answer based solely on the context above."""

rag_system_prompt = SystemRolePrompt(
    RAG_SYSTEM_TEMPLATE,
    strict=True,
    defaults={
        "response_style": "concise",
        "response_length": "brief"
    }
)

rag_user_prompt = UserRolePrompt(
    RAG_USER_TEMPLATE,
    strict=True,
    defaults={
        "context_count": "",
        "similarity_scores": ""
    }
)

Now we can create our pipeline!

In [40]:
class RetrievalAugmentedQAPipeline:
    def __init__(self, llm: ChatOpenAI, vector_db_retriever: VectorDatabase, 
                 response_style: str = "detailed", include_scores: bool = False) -> None:
        self.llm = llm
        self.vector_db_retriever = vector_db_retriever
        self.response_style = response_style
        self.include_scores = include_scores

    def run_pipeline(self, user_query: str, k: int = 4, **system_kwargs) -> dict:
        # Retrieve relevant contexts
        context_list = self.vector_db_retriever.search_by_text(user_query, k=k)
        
        context_prompt = ""
        similarity_scores = []
        
        for i, (context, score) in enumerate(context_list, 1):
            context_prompt += f"[Source {i}]: {context}\n\n"
            similarity_scores.append(f"Source {i}: {score:.3f}")
        
        # Create system message with parameters
        system_params = {
            "response_style": self.response_style,
            "response_length": system_kwargs.get("response_length", "detailed")
        }
        
        formatted_system_prompt = rag_system_prompt.create_message(**system_params)
        
        user_params = {
            "user_query": user_query,
            "context": context_prompt.strip(),
            "context_count": len(context_list),
            "similarity_scores": f"Relevance scores: {', '.join(similarity_scores)}" if self.include_scores else ""
        }
        
        formatted_user_prompt = rag_user_prompt.create_message(**user_params)

        return {
            "response": self.llm.run([formatted_system_prompt, formatted_user_prompt]), 
            "context": context_list,
            "context_count": len(context_list),
            "similarity_scores": similarity_scores if self.include_scores else None,
            "prompts_used": {
                "system": formatted_system_prompt,
                "user": formatted_user_prompt
            }
        }

In [41]:
rag_pipeline = RetrievalAugmentedQAPipeline(
    vector_db_retriever=vector_db,
    llm=chat_openai,
    response_style="detailed",
    include_scores=True
)

result = rag_pipeline.run_pipeline(
    "What is the 'Michael Eisner Memorial Weak Executive Problem'?",
    k=3,
    response_length="comprehensive", 
    include_warnings=True,
    confidence_required=True
)

print(f"Response: {result['response']}")
print(f"\nContext Count: {result['context_count']}")
print(f"Similarity Scores: {result['similarity_scores']}")

Response: The "Michael Eisner Memorial Weak Executive Problem" refers to a situation where a CEO or startup founder, who used to lead a particular functional area (such as product management, sales, or marketing), ends up hiring a weak executive to run that same function after they have moved on to the CEO role. This happens because the CEO has difficulty letting go of the function that originally brought them success and wants to remain "the man" in that area, so they deliberately or unconsciously hire someone weak for that executive role to maintain control. The name comes from Michael Eisner, the former CEO of Disney, who had been a brilliant TV network executive but, after acquiring ABC, the network fell to fourth place. Eisner’s response was to say that if he had extra time, he could personally turn around ABC instead of relying on the executives he hired. This illustrates the problem of not empowering strong executives in the CEO’s former specialty. (Source 1)

Context Count: 3
S

#### ❓ Question #4:

What prompting strategies could you use to make the LLM have a more thoughtful, detailed response?

What is that strategy called?

> NOTE: You can look through our [OpenAI Responses API](https://colab.research.google.com/drive/14SCfRnp39N7aoOx8ZxadWb0hAqk4lQdL?usp=sharing) notebook for an answer to this question if you get stuck!

##### ✅ Answer:


From the notebook, one clear strategy to get **more thoughtful, detailed responses** is to adjust the **`reasoning` parameter**.

- **Example from the notebook:**

```python
response = client.responses.create(
    model="gpt-5",
    reasoning={"effort": "low"},
    instructions="Talk like a wizard.",
    input="How to write an efficient loop with NumPy?",
)

Here, reasoning={"effort": "low"} directly controls how much computational effort the model uses when generating its reasoning.
By increasing the effort (e.g., "medium" or "high"), you prompt the model to deliver more detailed, step-by-step explanations instead of quick, shallow answers.

✨ This prompting strategy is called Reasoning Control (related to Chain-of-Thought prompting), since it guides how much effort the model invests in reasoning before producing its output.


### 🏗️ Activity #1:

Enhance your RAG application in some way! 

Suggestions are: 

- Allow it to work with PDF files
- Implement a new distance metric
- Add metadata support to the vector database
- Use a different embedding model
- Add the capability to ingest a YouTube link

While these are suggestions, you should feel free to make whatever augmentations you desire! If you shared an idea during Session 1, think about features you might need to incorporate for your use case! 

When you're finished making the augments to your RAG application - vibe check it against the old one - see if you can "feel the improvement"!

> NOTE: These additions might require you to work within the `aimakerspace` library - that's expected!

> NOTE: If you're not sure where to start - ask Cursor (CMD/CTRL+L) to guide you through the changes!

In [43]:
### YOUR CODE HERE
# 🚀 ANN-ENHANCED RAG APPLICATION - APPROXIMATE NEAREST NEIGHBORS! 🚀

print("🎉 Welcome to the ANN-Enhanced RAG Application! 🎉")
print("=" * 60)

# =============================================================================
# ANN IMPLEMENTATION: FAISS + ANNOY
# =============================================================================
print("\n🔍 ANN ENHANCEMENT: Approximate Nearest Neighbors")
print("-" * 50)

# Install ANN libraries
import subprocess
import sys

def install_package(package):
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        print(f"✅ Successfully installed {package}")
    except:
        print(f"⚠️  Could not install {package} - continuing anyway")

# Install ANN packages
install_package("faiss-cpu")
install_package("annoy")

# Import ANN libraries
import numpy as np
import time
from typing import List, Dict, Any, Optional, Tuple
import faiss
from annoy import AnnoyIndex
import pickle
import os
from aimakerspace.openai_utils.embedding import EmbeddingModel

print("✅ ANN libraries installed and imported!")

# =============================================================================
# ANN-BASED VECTOR DATABASE
# =============================================================================
print("\n🗄️ ANN-BASED VECTOR DATABASE")
print("-" * 50)

class ANNVectorDatabase:
    """Vector database using Approximate Nearest Neighbors for fast similarity search"""
    
    def __init__(self, embedding_model=None, ann_method: str = "faiss", dimension: int = 1536):
        self.embedding_model = embedding_model or EmbeddingModel()
        self.ann_method = ann_method.lower()
        self.dimension = dimension
        self.vectors = []
        self.texts = []
        self.metadata = []
        self.index = None
        self.is_built = False
        self.created_at = time.time()
        
        # Initialize ANN index based on method
        if self.ann_method == "faiss":
            self._init_faiss_index()
        elif self.ann_method == "annoy":
            self._init_annoy_index()
        else:
            raise ValueError("ANN method must be 'faiss' or 'annoy'")
    
    def _init_faiss_index(self):
        """Initialize FAISS index for fast similarity search"""
        # Use IndexFlatIP for inner product (cosine similarity with normalized vectors)
        self.index = faiss.IndexFlatIP(self.dimension)
        print("✅ FAISS IndexFlatIP initialized for cosine similarity")
    
    def _init_annoy_index(self):
        """Initialize Annoy index for fast similarity search"""
        # Use cosine distance for similarity
        self.index = AnnoyIndex(self.dimension, 'angular')  # angular = cosine distance
        print("✅ Annoy index initialized for cosine similarity")
    
    def insert(self, text: str, vector: np.array, metadata: Dict[str, Any] = None):
        """Insert a vector with text and metadata"""
        if len(vector) != self.dimension:
            raise ValueError(f"Vector dimension {len(vector)} doesn't match expected {self.dimension}")
        
        self.vectors.append(vector)
        self.texts.append(text)
        self.metadata.append(metadata or {})
        
        # Add to ANN index
        if self.ann_method == "faiss":
            # Normalize vector for cosine similarity
            normalized_vector = vector / np.linalg.norm(vector)
            self.index.add(normalized_vector.reshape(1, -1))
        elif self.ann_method == "annoy":
            self.index.add_item(len(self.vectors) - 1, vector)
    
    def build_index(self):
        """Build the ANN index for fast search"""
        if self.ann_method == "annoy":
            # Annoy requires building the index after adding all items
            self.index.build(10)  # 10 trees for good accuracy/speed tradeoff
            print(f"✅ Annoy index built with {len(self.vectors)} vectors")
        self.is_built = True
    
    def search(self, query_vector: np.array, k: int = 5, include_metadata: bool = False) -> List[Tuple[str, float, Dict]]:
        """Search for k nearest neighbors using ANN"""
        if not self.is_built and self.ann_method == "annoy":
            self.build_index()
        
        if len(self.vectors) == 0:
            return []
        
        # Prepare query vector
        if self.ann_method == "faiss":
            # Normalize query vector for cosine similarity
            normalized_query = query_vector / np.linalg.norm(query_vector)
            query_vector = normalized_query.reshape(1, -1)
        
        # Perform ANN search
        if self.ann_method == "faiss":
            scores, indices = self.index.search(query_vector, min(k, len(self.vectors)))
            scores = scores[0]  # Remove batch dimension
            indices = indices[0]
        elif self.ann_method == "annoy":
            indices, distances = self.index.get_nns_by_vector(
                query_vector, k, include_distances=True
            )
            # Convert distances to similarities (1 - distance for cosine)
            scores = [1 - d for d in distances]
        
        # Prepare results
        results = []
        for i, (idx, score) in enumerate(zip(indices, scores)):
            if idx < len(self.texts):  # Valid index
                text = self.texts[idx]
                metadata = self.metadata[idx] if include_metadata else {}
                results.append((text, float(score), metadata))
        
        return results
    
    def search_by_text(self, query_text: str, k: int = 5, include_metadata: bool = False) -> List[Tuple[str, float, Dict]]:
        """Search by text using ANN"""
        query_vector = self.embedding_model.get_embedding(query_text)
        return self.search(query_vector, k, include_metadata)
    
    async def abuild_from_list(self, list_of_text: List[str], metadata_list: List[Dict] = None) -> "ANNVectorDatabase":
        """Build ANN database from text list"""
        print(f"🔄 Building ANN database with {len(list_of_text)} texts...")
        
        # Get embeddings
        embeddings = await self.embedding_model.async_get_embeddings(list_of_text)
        
        # Insert all vectors
        for i, (text, embedding) in enumerate(zip(list_of_text, embeddings)):
            metadata = metadata_list[i] if metadata_list and i < len(metadata_list) else {}
            self.insert(text, np.array(embedding), metadata)
        
        # Build index
        self.build_index()
        
        print(f"✅ ANN database built with {len(self.vectors)} vectors using {self.ann_method.upper()}")
        return self
    
    def get_stats(self) -> Dict[str, Any]:
        """Get database statistics"""
        return {
            "total_vectors": len(self.vectors),
            "dimension": self.dimension,
            "ann_method": self.ann_method,
            "is_built": self.is_built,
            "created_at": self.created_at,
            "average_vector_norm": np.mean([np.linalg.norm(v) for v in self.vectors]) if self.vectors else 0
        }
    
    def save_index(self, filepath: str):
        """Save the ANN index to disk"""
        if self.ann_method == "faiss":
            faiss.write_index(self.index, f"{filepath}.faiss")
            # Save vectors and metadata separately
            with open(f"{filepath}_data.pkl", "wb") as f:
                pickle.dump({
                    "texts": self.texts,
                    "vectors": self.vectors,
                    "metadata": self.metadata
                }, f)
        elif self.ann_method == "annoy":
            self.index.save(f"{filepath}.annoy")
            with open(f"{filepath}_data.pkl", "wb") as f:
                pickle.dump({
                    "texts": self.texts,
                    "vectors": self.vectors,
                    "metadata": self.metadata
                }, f)
        print(f"✅ ANN index saved to {filepath}")
    
    def load_index(self, filepath: str):
        """Load the ANN index from disk"""
        if self.ann_method == "faiss":
            self.index = faiss.read_index(f"{filepath}.faiss")
            with open(f"{filepath}_data.pkl", "rb") as f:
                data = pickle.load(f)
                self.texts = data["texts"]
                self.vectors = data["vectors"]
                self.metadata = data["metadata"]
        elif self.ann_method == "annoy":
            self.index = AnnoyIndex(self.dimension, 'angular')
            self.index.load(f"{filepath}.annoy")
            with open(f"{filepath}_data.pkl", "rb") as f:
                data = pickle.load(f)
                self.texts = data["texts"]
                self.vectors = data["vectors"]
                self.metadata = data["metadata"]
        self.is_built = True
        print(f"✅ ANN index loaded from {filepath}")

print("✅ ANN Vector Database class ready!")

# =============================================================================
# ANN PERFORMANCE COMPARISON
# =============================================================================
print("\n⚡ ANN PERFORMANCE COMPARISON")
print("-" * 50)

class ANNPerformanceTester:
    """Test and compare different ANN methods"""
    
    def __init__(self, embedding_model=None):
        self.embedding_model = embedding_model or EmbeddingModel()
    
    async def compare_methods(self, texts: List[str], query_text: str, k: int = 5):
        """Compare FAISS vs Annoy performance"""
        print(f"🔍 Comparing ANN methods with {len(texts)} vectors...")
        
        results = {}
        
        # Test FAISS
        print("\n📊 Testing FAISS...")
        start_time = time.time()
        faiss_db = ANNVectorDatabase(ann_method="faiss")
        await faiss_db.abuild_from_list(texts)
        faiss_build_time = time.time() - start_time
        
        start_time = time.time()
        faiss_results = faiss_db.search_by_text(query_text, k)
        faiss_search_time = time.time() - start_time
        
        results["faiss"] = {
            "build_time": faiss_build_time,
            "search_time": faiss_search_time,
            "results": faiss_results,
            "stats": faiss_db.get_stats()
        }
        
        # Test Annoy
        print("\n📊 Testing Annoy...")
        start_time = time.time()
        annoy_db = ANNVectorDatabase(ann_method="annoy")
        await annoy_db.abuild_from_list(texts)
        annoy_build_time = time.time() - start_time
        
        start_time = time.time()
        annoy_results = annoy_db.search_by_text(query_text, k)
        annoy_search_time = time.time() - start_time
        
        results["annoy"] = {
            "build_time": annoy_build_time,
            "search_time": annoy_search_time,
            "results": annoy_results,
            "stats": annoy_db.get_stats()
        }
        
        # Print comparison
        print(f"\n📈 PERFORMANCE COMPARISON:")
        print(f"{'Method':<10} {'Build Time':<12} {'Search Time':<12} {'Vectors':<8}")
        print("-" * 50)
        print(f"{'FAISS':<10} {faiss_build_time:.3f}s{'':<5} {faiss_search_time:.6f}s{'':<5} {len(faiss_db.vectors):<8}")
        print(f"{'Annoy':<10} {annoy_build_time:.3f}s{'':<5} {annoy_search_time:.6f}s{'':<5} {len(annoy_db.vectors):<8}")
        
        return results

print("✅ ANN Performance Tester ready!")

# =============================================================================
# DEMONSTRATION: ANN-ENHANCED RAG
# =============================================================================
print("\n🚀 DEMONSTRATION: ANN-Enhanced RAG Pipeline")
print("=" * 60)

# Load existing documents
print("\n1. Loading documents...")
loader = TextFileLoader("data/PMarcaBlogs.txt")
documents = loader.load_documents()
print(f"   ✅ Loaded {len(documents)} documents")

# Create text chunks
print("\n2. Creating text chunks...")
text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=100)
chunks = text_splitter.split_texts(documents)
print(f"   ✅ Created {len(chunks)} chunks")

# Test ANN methods
print("\n3. Testing ANN methods...")
tester = ANNPerformanceTester()
test_query = "What is the Michael Eisner problem?"

# Run comparison
comparison_results = await tester.compare_methods(chunks, test_query, k=3)

# Show results
print(f"\n🔍 Query: '{test_query}'")
print("\n📊 FAISS Results:")
for i, (text, score, metadata) in enumerate(comparison_results["faiss"]["results"]):
    print(f"   {i+1}. Score: {score:.4f} | Text: {text[:100]}...")

print("\n📊 Annoy Results:")
for i, (text, score, metadata) in enumerate(comparison_results["annoy"]["results"]):
    print(f"   {i+1}. Score: {score:.4f} | Text: {text[:100]}...")

# =============================================================================
# ANN-ENHANCED RAG PIPELINE
# =============================================================================
print("\n🎯 ANN-ENHANCED RAG PIPELINE")
print("=" * 60)

class ANNEnhancedRAGPipeline:
    """RAG pipeline using ANN for fast similarity search"""
    
    def __init__(self, ann_method: str = "faiss", response_style: str = "detailed"):
        self.ann_method = ann_method
        self.response_style = response_style
        self.vector_db = None
        self.llm = ChatOpenAI()
    
    async def build_pipeline(self, texts: List[str], metadata_list: List[Dict] = None):
        """Build the ANN-enhanced pipeline"""
        print(f"🔄 Building ANN-enhanced pipeline with {self.ann_method.upper()}...")
        
        self.vector_db = ANNVectorDatabase(ann_method=self.ann_method)
        await self.vector_db.abuild_from_list(texts, metadata_list)
        
        print(f"✅ Pipeline built with {len(self.vector_db.vectors)} vectors")
        return self
    
    def query(self, question: str, k: int = 3, include_metadata: bool = True):
        """Query the ANN-enhanced RAG pipeline"""
        if not self.vector_db:
            raise ValueError("Pipeline not built. Call build_pipeline() first.")
        
        # Search using ANN
        results = self.vector_db.search_by_text(question, k, include_metadata)
        
        # Prepare context
        context_prompt = ""
        for i, (text, score, metadata) in enumerate(results, 1):
            context_prompt += f"[Source {i}] (Score: {score:.3f}): {text}\n\n"
        
        # Create prompts
        system_prompt = f"""You are a knowledgeable assistant that answers questions based on the provided context.
        
Response style: {self.response_style}
Use only the information from the provided context to answer the question.
If the context doesn't contain enough information, say so clearly."""
        
        user_prompt = f"""Context:
{context_prompt}

Question: {question}

Please provide a {self.response_style} answer based on the context above."""
        
        # Get LLM response
        messages = [
            SystemRolePrompt("{content}").create_message(content=system_prompt),
            UserRolePrompt("{content}").create_message(content=user_prompt)
        ]
        
        response = self.llm.run(messages)
        
        return {
            "response": response,
            "context_sources": results,
            "context_count": len(results),
            "ann_method": self.ann_method,
            "query_time": time.time()
        }

# Test the ANN-enhanced pipeline
print("\n4. Testing ANN-Enhanced RAG Pipeline...")
ann_pipeline = ANNEnhancedRAGPipeline(ann_method="faiss", response_style="detailed")
await ann_pipeline.build_pipeline(chunks)

# Query the pipeline
result = ann_pipeline.query("What is the Michael Eisner problem?", k=3)

print(f"\n🤖 ANN-Enhanced Response:")
print(f"Response: {result['response']}")
print(f"\nContext Sources: {result['context_count']}")
print(f"ANN Method: {result['ann_method'].upper()}")

print("\n" + "="*60)
print("🎉 ANN-ENHANCED RAG APPLICATION COMPLETE! 🎉")
print("="*60)
print("\n✨ ANN ENHANCEMENTS IMPLEMENTED:")
print("   🔍 FAISS-based approximate nearest neighbors")
print("   🌳 Annoy-based tree search")
print("   ⚡ Fast similarity search for large datasets")
print("   📊 Performance comparison between methods")
print("   💾 Index persistence (save/load)")
print("   🚀 Scalable vector database")
print("\n🚀 Your RAG application now uses state-of-the-art ANN for lightning-fast search!")
print("   Perfect for production systems with millions of vectors!")
print("   Try different ANN methods to find the best performance for your use case!")


🎉 Welcome to the ANN-Enhanced RAG Application! 🎉

🔍 ANN ENHANCEMENT: Approximate Nearest Neighbors
--------------------------------------------------
✅ Successfully installed faiss-cpu
✅ Successfully installed annoy
✅ ANN libraries installed and imported!

🗄️ ANN-BASED VECTOR DATABASE
--------------------------------------------------
✅ ANN Vector Database class ready!

⚡ ANN PERFORMANCE COMPARISON
--------------------------------------------------
✅ ANN Performance Tester ready!

🚀 DEMONSTRATION: ANN-Enhanced RAG Pipeline

1. Loading documents...
   ✅ Loaded 1 documents

2. Creating text chunks...
   ✅ Created 745 chunks

3. Testing ANN methods...
🔍 Comparing ANN methods with 745 vectors...

📊 Testing FAISS...
✅ FAISS IndexFlatIP initialized for cosine similarity
🔄 Building ANN database with 745 texts...
✅ ANN database built with 745 vectors using FAISS

📊 Testing Annoy...
✅ Annoy index initialized for cosine similarity
🔄 Building ANN database with 745 texts...
✅ Annoy index built wit