# Your First RAG Application

In this notebook, we'll walk you through each of the components that are involved in a simple RAG application.

We won't be leveraging any fancy tools, just the OpenAI Python SDK, Numpy, and some classic Python.

> NOTE: This was done with Python 3.12.3.

> NOTE: There might be [compatibility issues](https://github.com/wandb/wandb/issues/7683) if you're on an NVIDIA driver newer than 552.44. As an interim solution, you can roll back your drivers to 552.44.

## Table of Contents:

- Task 1: Imports and Utilities
- Task 2: Documents
- Task 3: Embeddings and Vectors
- Task 4: Prompts
- Task 5: Retrieval Augmented Generation
  - 🚧 Activity #1: Augment RAG

Let's look at a rather complicated looking visual representation of a basic RAG application.

<img src="https://i.imgur.com/vD8b016.png" />

## Task 1: Imports and Utilities

We're just doing some imports and enabling `async` to work within the Jupyter environment here, nothing too crazy!

In [32]:
from aimakerspace.text_utils import TextFileLoader, CharacterTextSplitter
from aimakerspace.vectordatabase import VectorDatabase
import asyncio

In [33]:
import nest_asyncio
nest_asyncio.apply()

## Task 2: Documents

We'll be concerning ourselves with this part of the flow in the following section:

<img src="https://i.imgur.com/jTm9gjk.png" />

### Loading Source Documents

So, first things first, we need some documents to work with.

While we could work directly with the `.txt` files (or whatever file-types you wanted to extend this to) we can instead do some batch processing of those documents at the beginning in order to store them in a more machine compatible format.

In this case, we're going to parse our text file into a single document in memory.

Let's look at the relevant bits of the `TextFileLoader` class:

```python
def load_file(self):
        with open(self.path, "r", encoding=self.encoding) as f:
            self.documents.append(f.read())
```

We're simply loading the document using the built-in `open` function and storing its contents in our `self.documents` list.

> NOTE: We're using blogs from PMarca (Marc Andreessen) as our sample data. The specific content is largely irrelevant: we want to focus on the mechanisms of RAG, which depend on our data's shape and quality rather than on what the data actually says.


In [34]:
text_loader = TextFileLoader("data/PMarcaBlogs.txt")
documents = text_loader.load_documents()
len(documents)

1

In [35]:
print(documents[0][:100])


The Pmarca Blog Archives
(select posts from 2007-2009)
Marc Andreessen
copyright: Andreessen Horow


### Splitting Text Into Chunks

As we can see, there is one massive document.

We'll want to chunk the document into smaller parts so it's easier to pass the most relevant snippets to the LLM.

There is no fixed way to split/chunk documents - and you'll need to rely on some intuition as well as knowing your data *very* well in order to build the most robust system.

For this toy example, we'll just split blindly on length.

> There's an opportunity to clear up some terminology here. For this course, we will stick to the following:
>
>- "source documents" : The `.txt`, `.pdf`, `.html`, ..., files that make up the files and information we start with in its raw format
>- "document(s)" : single (or more) text object(s)
>- "corpus" : the combination of all of our documents

As you can imagine (though it's not specifically true in this toy example), the idea of splitting documents is to break them into manageably sized chunks that retain the most relevant local context.
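
The notebook relies on `CharacterTextSplitter` for this step. As a rough sketch of what blind, length-based splitting looks like, the hypothetical `split_text` helper below cuts a string into fixed-size character windows with some overlap, so text near a boundary shows up in two neighbouring chunks. The function name, the default sizes, and the overlap behaviour are assumptions made for illustration and may not match the `aimakerspace` implementation.

```python
def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    """Split text into fixed-size character windows that overlap their neighbours."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    # Each new window starts this many characters after the previous one
    step = chunk_size - chunk_overlap
    return [text[i : i + chunk_size] for i in range(0, len(text), step)]


# Tiny sizes so the overlap is easy to see (hypothetical values)
sample = "abcdefghijklmnopqrstuvwxyz"
chunks = split_text(sample, chunk_size=10, chunk_overlap=4)
print(chunks)
```

The overlap is a simple hedge against cutting a sentence in half: whatever falls near the end of one chunk is repeated at the start of the next, at the cost of embedding some text twice.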

In [36]:
text_splitter = CharacterTextSplitter()
split_documents = text_splitter.split_texts(documents)
len(split_documents)

373

Let's take a look at some of the documents we've managed to split.

In [37]:
split_documents[0:1]

['\ufeff\nThe Pmarca Blog Archives\n(select posts from 2007-2009)\nMarc Andreessen\ncopyright: Andreessen Horowitz\ncover design: Jessica Hagy\nproduced using: Pressbooks\nContents\nTHE PMARCA GUIDE TO STARTUPS\nPart 1: Why not to do a startup 2\nPart 2: When the VCs say "no" 10\nPart 3: "But I don\'t know any VCs!" 18\nPart 4: The only thing that matters 25\nPart 5: The Moby Dick theory of big companies 33\nPart 6: How much funding is too little? Too much? 41\nPart 7: Why a startup\'s initial business plan doesn\'t\nmatter that much\n49\nTHE PMARCA GUIDE TO HIRING\nPart 8: Hiring, managing, promoting, and Dring\nexecutives\n54\nPart 9: How to hire a professional CEO 68\nHow to hire the best people you\'ve ever worked\nwith\n69\nTHE PMARCA GUIDE TO BIG COMPANIES\nPart 1: Turnaround! 82\nPart 2: Retaining great people 86\nTHE PMARCA GUIDE TO CAREER, PRODUCTIVITY,\nAND SOME OTHER THINGS\nIntroduction 97\nPart 1: Opportunity 99\nPart 2: Skills and education 107\nPart 3: Where to go and wh

## Task 3: Embeddings and Vectors

Next, we have to convert our corpus into a "machine readable" format as we explored in the Embedding Primer notebook.

Today, we're going to talk about the actual process of creating, and then storing, these embeddings, and how we can leverage that to intelligently add context to our queries.

### OpenAI API Key

In order to access OpenAI's APIs, we'll need to provide our OpenAI API Key!

You can work through the folder "OpenAI API Key Setup" for more information on this process if you don't already have an API Key!

In [38]:
import os
import openai
from dotenv import load_dotenv


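# Load environment variables from .env file
load_dotenv()

# Get API key from environment variables
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    # Fail early: assigning None to os.environ below would raise a TypeError
    raise ValueError("OPENAI_API_KEY is not set; add it to your .env file")

# Set the API key for OpenAI
openai.api_key = api_key
os.environ["OPENAI_API_KEY"] = api_key
print("✅ OpenAI API key set successfully")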


✅ OpenAI API key set successfully


In [39]:
# Test that the API key is working
import os
print("🔑 API Key Status:")
print(f"Environment variable set: {'OPENAI_API_KEY' in os.environ}")
print(f"OpenAI API key set: {hasattr(openai, 'api_key') and openai.api_key is not None}")

if 'OPENAI_API_KEY' in os.environ:
    print(f"Key starts with: {os.environ['OPENAI_API_KEY'][:10]}...")
    print("✅ Ready to use OpenAI APIs!")
else:
    print("❌ API key not set properly")


🔑 API Key Status:
Environment variable set: True
OpenAI API key set: True
Key starts with: sk-proj-di...
✅ Ready to use OpenAI APIs!


### Vector Database

Let's set up our vector database to hold all our documents and their embeddings!

While this is all baked into 1 call - we can look at some of the code that powers this process to get a better understanding:

Let's look at our `VectorDatabase().__init__()`:

```python
def __init__(self, embedding_model: EmbeddingModel = None):
        self.vectors = defaultdict(np.array)
        self.embedding_model = embedding_model or EmbeddingModel()
```

As you can see - our vectors are merely stored as a dictionary of `np.array` objects.

Secondly, our `VectorDatabase()` has a default `EmbeddingModel()` which is a wrapper for OpenAI's `text-embedding-3-small` model.

> **Quick Info About `text-embedding-3-small`**:
> - It has a context window of **8191** tokens
> - It returns vectors with dimension **1536**

#### ❓ Question #1:

The default embedding dimension of `text-embedding-3-small` is 1536, as noted above. 

1. Is there any way to modify this dimension? 
2. What technique does OpenAI use to achieve this? 

> NOTE: Check out this [API documentation](https://platform.openai.com/docs/api-reference/embeddings/create) for the answer to question #1.1, and [this documentation](https://platform.openai.com/docs/guides/embeddings/use-cases) for an answer to question #1.2!


##### ✅ Answer:
1. Yes. You can pass the `dimensions` parameter in the embeddings API request to get a shorter vector back.
2. The embedding can be shortened without losing its concept-representing properties. For example, on the MTEB benchmark, a `text-embedding-3-large` embedding shortened to 256 dimensions still outperforms an unshortened `text-embedding-ada-002` embedding of size 1536. Shortening trades some accuracy for a smaller vector and lower storage cost. OpenAI uses Matryoshka Representation Learning (MRL) to enable this: concept information is encoded at several granularities, so a shorter prefix of the vector still carries most of the semantic meaning.
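
To make the shortening idea concrete, here is a minimal sketch of doing it by hand: keep the leading values of a vector and rescale the result back to unit length, which is the approach the OpenAI embeddings guide describes as we understand it. The `shorten_embedding` name and the toy 4-dimensional vector are assumptions for illustration; with the real API you would normally just pass `dimensions` in the request.

```python
import math


def shorten_embedding(vector: list[float], dimensions: int) -> list[float]:
    """Keep the first `dimensions` values and rescale the result to unit length."""
    if not 0 < dimensions <= len(vector):
        raise ValueError("dimensions must be between 1 and len(vector)")
    truncated = vector[:dimensions]
    norm = math.sqrt(sum(x * x for x in truncated))
    if norm == 0.0:
        return truncated  # an all-zero vector cannot be rescaled
    return [x / norm for x in truncated]


full = [0.5, 0.5, 0.5, 0.5]  # toy unit vector, not a real embedding
short = shorten_embedding(full, 2)
print(short)
```

Rescaling matters because similarity measures such as the dot product assume comparable vector lengths; a truncated vector is shorter than the original until it is normalized again.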

We can call the `async_get_embeddings` method of our `EmbeddingModel()` on a list of `str` and receive a list of embeddings (each one a list of `float`) back!

```python
async def async_get_embeddings(self, list_of_text: List[str]) -> List[List[float]]:
        return await aget_embeddings(
            list_of_text=list_of_text, engine=self.embeddings_model_name
        )
```

We cast those to `np.array` when we build our `VectorDatabase()`:

```python
async def abuild_from_list(self, list_of_text: List[str]) -> "VectorDatabase":
        embeddings = await self.embedding_model.async_get_embeddings(list_of_text)
        for text, embedding in zip(list_of_text, embeddings):
            self.insert(text, np.array(embedding))
        return self
```

And that's all we need to do!

In [40]:
vector_db = VectorDatabase()
vector_db = asyncio.run(vector_db.abuild_from_list(split_documents))

#### ❓ Question #2:

What are the benefits of using an `async` approach to collecting our embeddings?

> NOTE: Determining the core difference between `async` and `sync` will be useful! If you get stuck - ask ChatGPT!

##### ✅ Answer:
A synchronous approach waits for each embedding request to finish before starting the next: it is blocking and simple, but throughput is lower. An `async` approach launches multiple requests and processes the results as they complete: it is non-blocking and gives higher throughput. This suits I/O-bound, high-latency APIs, ideally with bounded concurrency and retries.
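
The difference can be sketched without touching a real API. In the example below, `fake_embed` is a hypothetical stand-in for a network call (it just sleeps and returns a toy one-number "embedding"), and a semaphore caps how many calls are in flight at once. None of these names come from `aimakerspace`; this is only an illustration of bounded concurrency with `asyncio.gather`.

```python
import asyncio


async def fake_embed(text: str, limiter: asyncio.Semaphore, stats: dict) -> list[float]:
    """Stand-in for a network call; returns a toy one-dimensional 'embedding'."""
    async with limiter:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.05)  # simulated API latency
        stats["in_flight"] -= 1
        return [float(len(text))]


async def embed_all(texts: list[str], max_concurrency: int = 4):
    limiter = asyncio.Semaphore(max_concurrency)
    stats = {"in_flight": 0, "peak": 0}
    # gather schedules every request up front and returns results in input order
    vectors = await asyncio.gather(*(fake_embed(t, limiter, stats) for t in texts))
    return vectors, stats["peak"]


texts = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"]
vectors, peak = asyncio.run(embed_all(texts))
print(vectors, peak)
```

Run sequentially, six calls of 0.05 s each would take about 0.3 s; with four in flight at a time they finish in roughly two rounds. The cap is there because real APIs enforce rate limits, so unbounded concurrency tends to trade waiting time for rejected requests.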


So, to review what we've done so far in natural language:

1. We load source documents
2. We split those source documents into smaller chunks (documents)
3. We send each of those documents to the `text-embedding-3-small` OpenAI API endpoint
4. We store each of the text representations with the vector representations as keys/values in a dictionary

### Semantic Similarity

The next step is to query our `VectorDatabase()` with a `str` and have it return the vectors and text from our corpus that are most relevant.

We're going to use the following process to achieve this in our toy example:

1. We need to embed our query with the same `EmbeddingModel()` as we used to construct our `VectorDatabase()`
2. We loop through every vector in our `VectorDatabase()` and use a distance measure to compare how related they are
3. We return a list of the top `k` closest vectors, with their text representations

There's some very heavy optimization that can be done at each of these steps - but let's just focus on the basic pattern in this notebook.

> We are using [cosine similarity](https://www.engati.com/glossary/cosine-similarity) as our similarity measure in this example, but there are many other similarity and distance measures you could use, like [these](https://flavien-vidal.medium.com/similarity-distances-for-natural-language-processing-16f63cd5ba55).

> We are using a rather inefficient way of calculating relative distance between the query vector and all other vectors - there are more advanced approaches that are much more efficient, like [ANN](https://towardsdatascience.com/comprehensive-guide-to-approximate-nearest-neighbors-algorithms-8b94f057d6b6)
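
The three-step process above can be sketched as a brute-force search in plain Python. The `cosine_sim` and `search` helpers and the toy 2-dimensional vectors are assumptions for illustration; the notebook's `VectorDatabase` uses NumPy arrays and real embeddings, and its internals may differ.

```python
import math


def cosine_sim(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors: dot(a, b) / (|a| * |b|)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def search(query: list[float], store: dict[str, list[float]], k: int) -> list[tuple[str, float]]:
    """Score every stored vector against the query and keep the k best."""
    scored = [(text, cosine_sim(query, vec)) for text, vec in store.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Toy 2-dimensional vectors standing in for real embeddings
toy_store = {
    "hiring executives": [0.9, 0.1],
    "raising venture capital": [0.1, 0.9],
    "managing a team": [0.7, 0.3],
}
top = search([1.0, 0.0], toy_store, k=2)
print(top)
```

Every query touches every stored vector, so the cost grows linearly with the size of the corpus. That is fine for a few hundred chunks and is exactly what approximate nearest neighbor indexes are designed to avoid at larger scales.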

In [41]:
vector_db.search_by_text("What is the Michael Eisner Memorial Weak Executive Problem?", k=3)

[('ordingly.\nSeventh, when hiring the executive to run your former specialty, be\ncareful you don’t hire someone weak on purpose.\nThis sounds silly, but you wouldn’t believe how oaen it happens.\nThe CEO who used to be a product manager who has a weak\nproduct management executive. The CEO who used to be in\nsales who has a weak sales executive. The CEO who used to be\nin marketing who has a weak marketing executive.\nI call this the “Michael Eisner Memorial Weak Executive Problem” — aaer the CEO of Disney who had previously been a brilliant TV network executive. When he bought ABC at Disney, it\npromptly fell to fourth place. His response? “If I had an extra\ntwo days a week, I could turn around ABC myself.” Well, guess\nwhat, he didn’t have an extra two days a week.\nA CEO — or a startup founder — oaen has a hard time letting\ngo of the function that brought him to the party. The result: you\nhire someone weak into the executive role for that function so\nthat y

## Task 4: Prompts

In the following section, we'll be looking at the role of prompts - and how they help us to guide our application in the right direction.

In this notebook, we're going to rely on the idea of "zero-shot in-context learning".

This is a lot of words to say: "We will ask it to perform our desired task in the prompt, and provide no examples."

### XYZRolePrompt

Before we do that, let's stop and think a bit about how OpenAI's chat models work.

We know they have roles - as is indicated in the following API [documentation](https://platform.openai.com/docs/api-reference/chat/create#chat/create-messages)

There are three roles, and they function as follows (taken directly from [OpenAI](https://platform.openai.com/docs/guides/gpt/chat-completions-api)):

- `{"role" : "system"}` : The system message helps set the behavior of the assistant. For example, you can modify the personality of the assistant or provide specific instructions about how it should behave throughout the conversation. However note that the system message is optional and the model’s behavior without a system message is likely to be similar to using a generic message such as "You are a helpful assistant."
- `{"role" : "user"}` : The user messages provide requests or comments for the assistant to respond to.
- `{"role" : "assistant"}` : Assistant messages store previous assistant responses, but can also be written by you to give examples of desired behavior.

The main idea is this:

1. You start with a system message that outlines how the LLM should respond, what kind of behaviours you can expect from it, and more
2. Then, you can provide a few examples in the form of "assistant"/"user" pairs
3. Then, you prompt the model with the true "user" message.

In this example, we'll be forgoing the 2nd step for simplicity's sake.
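
For reference, here is a small sketch of what the full three-step pattern looks like as a message list, including the example pairs that this notebook skips. The `build_messages` helper and the sample strings are hypothetical; only the `role`/`content` dictionary shape comes from the API description above.

```python
def build_messages(system: str, examples: list[tuple[str, str]], user: str) -> list[dict]:
    """Assemble chat messages: system, then example pairs, then the real question."""
    messages = [{"role": "system", "content": system}]
    for example_question, example_answer in examples:
        messages.append({"role": "user", "content": example_question})
        messages.append({"role": "assistant", "content": example_answer})
    messages.append({"role": "user", "content": user})
    return messages


few_shot_messages = build_messages(
    system="You are a terse Python tutor.",
    examples=[("How do I reverse a list?", "Use `my_list[::-1]`.")],
    user="How do I sort a list?",
)
for message in few_shot_messages:
    print(message["role"], "->", message["content"])
```

Passing an empty `examples` list gives the zero-shot layout used in the rest of this notebook: one system message followed by one user message.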

#### Utility Functions

You'll notice that we're using some utility functions from the `aimakerspace` module - let's take a peek at these and see what they're doing!

##### XYZRolePrompt

Here we have our `system`, `user`, and `assistant` role prompts.

Let's take a peek at what they look like:

```python
class BasePrompt:
    def __init__(self, prompt):
        """
        Initializes the BasePrompt object with a prompt template.

        :param prompt: A string that can contain placeholders within curly braces
        """
        self.prompt = prompt
        self._pattern = re.compile(r"\{([^}]+)\}")

    def format_prompt(self, **kwargs):
        """
        Formats the prompt string using the keyword arguments provided.

        :param kwargs: The values to substitute into the prompt string
        :return: The formatted prompt string
        """
        matches = self._pattern.findall(self.prompt)
        return self.prompt.format(**{match: kwargs.get(match, "") for match in matches})

    def get_input_variables(self):
        """
        Gets the list of input variable names from the prompt string.

        :return: List of input variable names
        """
        return self._pattern.findall(self.prompt)
```

Then we have our `RolePrompt` which laser focuses us on the role pattern found in most API endpoints for LLMs.

```python
class RolePrompt(BasePrompt):
    def __init__(self, prompt, role: str):
        """
        Initializes the RolePrompt object with a prompt template and a role.

        :param prompt: A string that can contain placeholders within curly braces
        :param role: The role for the message ('system', 'user', or 'assistant')
        """
        super().__init__(prompt)
        self.role = role

    def create_message(self, **kwargs):
        """
        Creates a message dictionary with a role and a formatted message.

        :param kwargs: The values to substitute into the prompt string
        :return: Dictionary containing the role and the formatted message
        """
        return {"role": self.role, "content": self.format_prompt(**kwargs)}
```

We'll look at how the `SystemRolePrompt` is constructed to get a better idea of how that extension works:

```python
class SystemRolePrompt(RolePrompt):
    def __init__(self, prompt: str):
        super().__init__(prompt, "system")
```

That pattern is repeated for our `UserRolePrompt` and our `AssistantRolePrompt` as well.

##### ChatOpenAI

Next we have our model, which is wrapped in an interface analogous to libraries like LangChain and LlamaIndex.

Let's take a peek at how that is constructed:

```python
class ChatOpenAI:
    def __init__(self, model_name: str = "gpt-4.1-mini"):
        self.model_name = model_name
        self.openai_api_key = os.getenv("OPENAI_API_KEY")
        if self.openai_api_key is None:
            raise ValueError("OPENAI_API_KEY is not set")

    def run(self, messages, text_only: bool = True):
        if not isinstance(messages, list):
            raise ValueError("messages must be a list")

        openai.api_key = self.openai_api_key
        response = openai.ChatCompletion.create(
            model=self.model_name, messages=messages
        )

        if text_only:
            return response.choices[0].message.content

        return response
```

#### ❓ Question #3:

When calling the OpenAI API - are there any ways we can achieve more reproducible outputs?

> NOTE: Check out [this section](https://platform.openai.com/docs/guides/text-generation/) of the OpenAI documentation for the answer!

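##### ✅ Answer:
Yes. Pinning production applications to a specific model snapshot ID keeps the underlying model from changing between runs, which lets us monitor prompt behavior as we continue to iterate. For more repeatable sampling, we can also lower `temperature` (for example to 0) and pass a fixed `seed`, which plays a role similar to a random seed in simulations. Even then, outputs are mostly, not perfectly, deterministic.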


### Creating and Prompting OpenAI's `gpt-4.1-mini`!

Let's tie all these together and use it to prompt `gpt-4.1-mini`!

In [42]:
from aimakerspace.openai_utils.prompts import (
    UserRolePrompt,
    SystemRolePrompt,
    AssistantRolePrompt,
)

from aimakerspace.openai_utils.chatmodel import ChatOpenAI

chat_openai = ChatOpenAI()
user_prompt_template = "{content}"
user_role_prompt = UserRolePrompt(user_prompt_template)
system_prompt_template = (
    "You are an expert in {expertise}, you always answer in a kind way."
)
system_role_prompt = SystemRolePrompt(system_prompt_template)

messages = [
    system_role_prompt.create_message(expertise="Python"),
    user_role_prompt.create_message(
        content="What is the best way to write a loop?"
    ),
]

response = chat_openai.run(messages)

In [43]:
print(response)

Hello! I'd be happy to help with that. 

The "best" way to write a loop in Python depends on what you're trying to accomplish, but generally, Python's `for` loops are very idiomatic and readable for iterating over sequences like lists, strings, or ranges.

Here's a simple example of a `for` loop:

```python
for i in range(5):
    print(i)
```

This will print numbers 0 to 4.

If you want to loop over items in a list:

```python
fruits = ["apple", "banana", "cherry"]
for fruit in fruits:
    print(fruit)
```

Some tips for writing good loops:

- Use `for` loops when iterating over a sequence or collection.
- Use `while` loops if you need to loop until a certain condition is met and the number of iterations isn't predetermined.
- Keep the loop body simple and avoid unnecessary complexity.
- Use break and continue sparingly to keep your code easier to read.

If you'd like, feel free to share a specific example or task, and I can help tailor the best loop for that! 😊


## Task 5: Retrieval Augmented Generation

Now we can create a RAG prompt - which will help our system behave in a way that makes sense!

There is much you could do here, many tweaks and improvements to be made!

In [60]:
RAG_SYSTEM_TEMPLATE = """You are a knowledgeable assistant that answers questions based strictly on provided context.

Instructions:
- Only answer questions using information from the provided context
- If the context doesn't contain relevant information, respond with "I don't know"
- Be accurate and cite specific parts of the context when possible
- Keep responses {response_style} and {response_length}
- Only use the provided context. Do not use external knowledge.
- Only provide answers when you are confident the context supports your response."""

RAG_USER_TEMPLATE = """Context Information:
{context}

Number of relevant sources found: {context_count}
{similarity_scores}

Question: {user_query}

Please provide your answer based solely on the context above."""

rag_system_prompt = SystemRolePrompt(
    RAG_SYSTEM_TEMPLATE,
    strict=True,
    defaults={
        "response_style": "concise",
        "response_length": "brief"
    }
)

rag_user_prompt = UserRolePrompt(
    RAG_USER_TEMPLATE,
    strict=True,
    defaults={
        "context_count": "",
        "similarity_scores": ""
    }
)

Now we can create our pipeline!

In [61]:
class RetrievalAugmentedQAPipeline:
    def __init__(self, llm: ChatOpenAI, vector_db_retriever: VectorDatabase, 
                 response_style: str = "detailed", include_scores: bool = False,
                 distance_metric: str = "cosine") -> None:
        self.llm = llm
        self.vector_db_retriever = vector_db_retriever
        self.response_style = response_style
        self.include_scores = include_scores
        self.distance_metric = distance_metric

    def run_pipeline(self, user_query: str, k: int = 4, distance_metric: str = None, **system_kwargs) -> dict:
        # Use provided distance metric or default to instance metric
        metric_to_use = distance_metric or self.distance_metric
        
        # Retrieve relevant contexts using specified distance metric
        context_list = self.vector_db_retriever.search_by_text(
            user_query, k=k, distance_measure=metric_to_use
        )
        
        context_prompt = ""
        similarity_scores = []
        
        for i, (context, score) in enumerate(context_list, 1):
            context_prompt += f"[Source {i}]: {context}\n\n"
            similarity_scores.append(f"Source {i}: {score:.3f}")
        
        # Create system message with parameters
        system_params = {
            "response_style": self.response_style,
            "response_length": system_kwargs.get("response_length", "detailed")
        }
        
        formatted_system_prompt = rag_system_prompt.create_message(**system_params)
        
        user_params = {
            "user_query": user_query,
            "context": context_prompt.strip(),
            "context_count": len(context_list),
            "similarity_scores": f"Relevance scores: {', '.join(similarity_scores)}" if self.include_scores else ""
        }
        
        formatted_user_prompt = rag_user_prompt.create_message(**user_params)

        return {
            "response": self.llm.run([formatted_system_prompt, formatted_user_prompt]), 
            "context": context_list,
            "context_count": len(context_list),
            "similarity_scores": similarity_scores if self.include_scores else None,
            "distance_metric_used": metric_to_use,
            "prompts_used": {
                "system": formatted_system_prompt,
                "user": formatted_user_prompt
            }
        }

In [62]:
rag_pipeline = RetrievalAugmentedQAPipeline(
    vector_db_retriever=vector_db,
    llm=chat_openai,
    response_style="detailed",
    include_scores=True
)

result = rag_pipeline.run_pipeline(
    # "What is the 'Michael Eisner Memorial Weak Executive Problem'?",
    "What ways of investing are beneficial?",
    k=3,
    response_length="comprehensive",
    # The two flags below are absorbed by **system_kwargs; run_pipeline only reads response_length
    include_warnings=True,
    confidence_required=True
)

print(f"Response: {result['response']}")
print(f"\nContext Count: {result['context_count']}")
print(f"Similarity Scores: {result['similarity_scores']}")

Response: Based on the provided context, beneficial ways of investing include managing investments as part of an overall portfolio rather than focusing on individual investments in isolation. In this portfolio approach, each investment's potential return (benefits) and risk profile (things that can go wrong) are considered together to understand the overall risk-return balance of the portfolio (Sources 1 and 2). 

It is logical and often beneficial to include individual investments within a portfolio that may be far riskier than one would normally be comfortable with if the potential return is sufficiently attractive. Conversely, it can also be beneficial to include investments that are far less risky and offer lower return potential to protect against downside risks. The key point is that the significance lies not in the risk of any single investment but in how all investments' risks and returns combine in the portfolio as a whole (Source 1).

Additionally, gaining exposure to younger

#### ❓ Question #4:

What prompting strategies could you use to make the LLM have a more thoughtful, detailed response?

What is that strategy called?

> NOTE: You can look through our [OpenAI Responses API](https://colab.research.google.com/drive/14SCfRnp39N7aoOx8ZxadWb0hAqk4lQdL?usp=sharing) notebook for an answer to this question if you get stuck!

##### ✅ Answer:
- Chain-of-Thought (CoT) prompting: ask the model to "think step by step" or "show your reasoning".
- Few-shot learning: provide examples of the desired response format.
- Role-based prompting: give the model a specific role or expertise.
- Iterative prompting: ask follow-up questions to get more detail.

One can also use the Responses API to control the reasoning effort and to provide developer instructions. The reasoning effort controls how much computation is put into an answer, and the instructions help guide the model output.


### 🏗️ Activity #1:

Enhance your RAG application in some way! 

Suggestions are: 

- Allow it to work with PDF files
- Implement a new distance metric
- Add metadata support to the vector database
- Use a different embedding model
- Add the capability to ingest a YouTube link

While these are suggestions, you should feel free to make whatever augmentations you desire! If you shared an idea during Session 1, think about features you might need to incorporate for your use case! 

When you're finished making the augments to your RAG application - vibe check it against the old one - see if you can "feel the improvement"!

> NOTE: These additions might require you to work within the `aimakerspace` library - that's expected!

> NOTE: If you're not sure where to start - ask Cursor (CMD/CTRL+L) to guide you through the changes!

## 🔍 Distance Metrics Comparison

Now let's explore the different distance metrics available in our enhanced RAG system! We've implemented several distance metrics that can affect how similar documents are retrieved:

### Available Distance Metrics:

1. **Cosine Similarity** (default) - Measures the angle between vectors, good for text similarity
2. **Euclidean Distance** - Measures straight-line distance between vectors
3. **Manhattan Distance** - Measures sum of absolute differences (L1 norm)
4. **Dot Product** - Measures vector alignment and magnitude
5. **Pearson Correlation** - Measures linear relationship between vectors
6. **Jaccard Similarity** - Measures overlap of binary features

Let's test these different metrics and see how they affect retrieval results!
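
Before running them, here is a rough pure-Python sketch of what five of these measures compute (cosine is omitted because it is covered earlier). The function names, the toy vectors, and the way Jaccard turns real-valued vectors into binary features are all assumptions for illustration; the versions in `aimakerspace.vectordatabase` may be implemented differently.

```python
import math


def dot(a, b):
    """Sum of element-wise products; grows with both alignment and magnitude."""
    return sum(x * y for x, y in zip(a, b))


def euclidean(a, b):
    """Straight-line (L2) distance; smaller means closer."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def manhattan(a, b):
    """Sum of absolute differences (L1); smaller means closer."""
    return sum(abs(x - y) for x, y in zip(a, b))


def pearson(a, b):
    """Cosine similarity of the mean-centred vectors."""
    mean_a, mean_b = sum(a) / len(a), sum(b) / len(b)
    centred_a = [x - mean_a for x in a]
    centred_b = [y - mean_b for y in b]
    return dot(centred_a, centred_b) / math.sqrt(
        dot(centred_a, centred_a) * dot(centred_b, centred_b)
    )


def jaccard(a, b, threshold=0.0):
    """Overlap of 'active' positions, treating values above threshold as present."""
    active_a = {i for i, x in enumerate(a) if x > threshold}
    active_b = {i for i, y in enumerate(b) if y > threshold}
    union = active_a | active_b
    return len(active_a & active_b) / len(union) if union else 0.0


a, b = [1.0, 0.0, 2.0], [0.0, 1.0, 2.0]
for name, fn in [("dot", dot), ("euclidean", euclidean), ("manhattan", manhattan),
                 ("pearson", pearson), ("jaccard", jaccard)]:
    print(f"{name}: {fn(a, b):.3f}")
```

Note that Euclidean and Manhattan are distances (lower is better) while the others are similarities (higher is better). A search routine that always sorts in descending order therefore needs to negate the distances first, which would be consistent with the negative Euclidean scores in the output further down.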


In [56]:
# Quick setup: Create vector database if it doesn't exist
print("🔧 Setting up vector database for distance metrics demo...")

# Check if we need to create a vector database
if 'vector_db' not in locals() and 'youtube_vector_db' not in locals():
    print("📄 Creating vector database from PMarca blogs...")
    
    # Load and split documents
    from aimakerspace.text_utils import TextFileLoader, CharacterTextSplitter
    from aimakerspace.vectordatabase import VectorDatabase
    import asyncio
    
    # Load documents
    text_loader = TextFileLoader("data/PMarcaBlogs.txt")
    documents = text_loader.load_documents()
    
    # Split documents
    text_splitter = CharacterTextSplitter()
    split_documents = text_splitter.split_texts(documents)
    
    # Create vector database
    vector_db = VectorDatabase()
    vector_db = asyncio.run(vector_db.abuild_from_list(split_documents))
    
    print(f"✅ Created vector_db with {len(split_documents)} documents")
else:
    if 'vector_db' in locals():
        print("✅ vector_db already exists")
    if 'youtube_vector_db' in locals():
        print("✅ youtube_vector_db already exists")

print("🚀 Ready to test distance metrics!")


🔧 Setting up vector database for distance metrics demo...
✅ vector_db already exists
🚀 Ready to test distance metrics!


In [57]:
# Reload the vectordatabase module to get the new distance metrics
import importlib
import sys

# Force reload the vectordatabase module
if 'aimakerspace.vectordatabase' in sys.modules:
    importlib.reload(sys.modules['aimakerspace.vectordatabase'])

# Import the new distance metrics
from aimakerspace.vectordatabase import (
    cosine_similarity, euclidean_distance, manhattan_distance,
    dot_product_similarity, pearson_correlation, jaccard_similarity
)

# Check which vector database is available
if 'vector_db' in locals():
    print("✅ Using vector_db (PMarca blogs)")
    current_vector_db = vector_db
elif 'youtube_vector_db' in locals():
    print("✅ Using youtube_vector_db (YouTube transcripts)")
    current_vector_db = youtube_vector_db
else:
    print("❌ No vector database found. Please run the earlier cells to create one.")
    print("Available variables:", [var for var in locals() if 'vector' in var.lower()])

# Check available metrics in our vector database
if 'current_vector_db' in locals():
    print("\n🎯 Available Distance Metrics:")
    print("=" * 40)
    available_metrics = current_vector_db.get_available_metrics()
    for i, metric in enumerate(available_metrics, 1):
        print(f"{i}. {metric}")

    print(f"\n✅ Total metrics available: {len(available_metrics)}")
else:
    print("⚠️ Cannot proceed without a vector database. Please run the document loading and vector database creation cells first.")


✅ Using vector_db (PMarca blogs)

🎯 Available Distance Metrics:
1. cosine
2. euclidean
3. manhattan
4. dot_product
5. pearson
6. jaccard

✅ Total metrics available: 6


In [58]:
# Advanced: Create pipelines with different default distance metrics
if 'current_vector_db' in locals() and 'chat_openai' in locals():
    print("🚀 Creating Specialized RAG Pipelines with Different Default Metrics")
    print("=" * 70)

    # Create specialized pipelines
    cosine_pipeline = RetrievalAugmentedQAPipeline(
        vector_db_retriever=current_vector_db,
        llm=chat_openai,
        response_style="detailed",
        include_scores=True,
        distance_metric="cosine"
    )

    euclidean_pipeline = RetrievalAugmentedQAPipeline(
        vector_db_retriever=current_vector_db,
        llm=chat_openai,
        response_style="detailed", 
        include_scores=True,
        distance_metric="euclidean"
    )

    dot_product_pipeline = RetrievalAugmentedQAPipeline(
        vector_db_retriever=current_vector_db,
        llm=chat_openai,
        response_style="detailed",
        include_scores=True,
        distance_metric="dot_product"
    )

    # Test with the same question
    comparison_question = "What is the 'Michael Eisner Memorial Weak Executive Problem'?"
    pipelines = [
        ("Cosine Similarity", cosine_pipeline),
        ("Euclidean Distance", euclidean_pipeline), 
        ("Dot Product", dot_product_pipeline)
    ]

    print(f"🔍 Question: '{comparison_question}'")
    print("=" * 50)

    for name, pipeline in pipelines:
        print(f"\n📏 {name} Pipeline:")
        print("-" * 30)

        try:
            result = pipeline.run_pipeline(comparison_question, k=2)
            print(f"🎯 Metric: {result['distance_metric_used']}")
            print(f"📊 Scores: {result['similarity_scores']}")
            print(f"💬 Response: {result['response'][:150]}...")

        except Exception as e:
            print(f"❌ Error: {e}")

    print("\n🎉 All specialized pipelines tested successfully!")
else:
    print("⚠️ Cannot create specialized pipelines without vector database and chat model. Please run the earlier cells first.")


🚀 Creating Specialized RAG Pipelines with Different Default Metrics
🔍 Question: 'What is the 'Michael Eisner Memorial Weak Executive Problem'?'

📏 Cosine Similarity Pipeline:
------------------------------
🎯 Metric: cosine
📊 Scores: ['Source 1: 0.658', 'Source 2: 0.509']
💬 Response: The "Michael Eisner Memorial Weak Executive Problem" refers to a situation in which a CEO or startup founder has difficulty letting go of the function...

📏 Euclidean Distance Pipeline:
------------------------------
🎯 Metric: euclidean
📊 Scores: ['Source 1: -0.827', 'Source 2: -0.991']
💬 Response: The "Michael Eisner Memorial Weak Executive Problem" refers to a situation where a CEO, who previously excelled in a specific functional area (such as...

📏 Dot Product Pipeline:
------------------------------
🎯 Metric: dot_product
📊 Scores: ['Source 1: 0.658', 'Source 2: 0.509']
💬 Response: The "Michael Eisner Memorial Weak Executive Problem" refers to the tendency of a 
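
The cosine and dot-product pipelines report identical scores, and the Euclidean scores are negative. Both observations are consistent with unit-length embeddings and a Euclidean metric reported as a negated distance so that higher still means closer. These are assumptions about this setup; the output alone does not prove them. For unit vectors, ‖a - b‖² = 2 - 2·cos(a, b). Here is a minimal NumPy sketch that checks this identity and reproduces the Euclidean scores from the cosine scores printed above. The helper name `negated_euclidean_from_cosine` is hypothetical.

```python
import numpy as np


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def negated_euclidean_from_cosine(cos_score):
    """For unit vectors: -||a - b|| = -sqrt(2 - 2 * cos(a, b))."""
    return -float(np.sqrt(2.0 - 2.0 * cos_score))


# Two random unit-length vectors (stand-ins for normalized embeddings)
rng = np.random.default_rng(0)
a = rng.normal(size=8)
a /= np.linalg.norm(a)
b = rng.normal(size=8)
b /= np.linalg.norm(b)

cos = cosine_similarity(a, b)
dot = float(np.dot(a, b))
dist = float(np.linalg.norm(a - b))

# On unit vectors, cosine equals dot product, and distance follows from cosine
assert np.isclose(cos, dot)
assert np.isclose(-dist, negated_euclidean_from_cosine(cos))

# Reproduce the Euclidean scores from the cosine scores in the output above
print(round(negated_euclidean_from_cosine(0.658), 3))  # -0.827
print(round(negated_euclidean_from_cosine(0.509), 3))  # -0.991
```

Under these assumptions, all three metrics would rank the chunks in the same order, so switching between them changes the reported numbers but not which sources are retrieved.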

In [59]:
# Test RAG pipeline with different distance metrics
if 'current_vector_db' in locals() and 'chat_openai' in locals():
    print("🤖 Testing RAG Pipeline with Different Distance Metrics")
    print("=" * 60)

    # Create a test pipeline
    test_pipeline = RetrievalAugmentedQAPipeline(
        vector_db_retriever=current_vector_db,
        llm=chat_openai,
        response_style="concise",
        include_scores=True
    )

    # Test with different metrics
    test_metrics = ["cosine", "euclidean", "dot_product"]
    test_question = "What are the key principles of valuation?"

    for metric in test_metrics:
        print(f"\n🎯 Testing with {metric.upper()} metric:")
        print("-" * 40)
        
        try:
            result = test_pipeline.run_pipeline(
                test_question,
                k=2,
                distance_metric=metric,
                response_length="brief"
            )
            
            print(f"📊 Distance metric used: {result['distance_metric_used']}")
            print(f"📄 Context count: {result['context_count']}")
            print(f"🎯 Similarity scores: {result['similarity_scores']}")
            print(f"💬 Response: {result['response'][:200]}...")

        except Exception as e:
            print(f"❌ Error with {metric}: {e}")

    print("\n✅ RAG pipeline testing completed!")
else:
    print("⚠️ Cannot test RAG pipeline without vector database and chat model. Please run the earlier cells first.")


🤖 Testing RAG Pipeline with Different Distance Metrics

🎯 Testing with COSINE metric:
----------------------------------------
📊 Distance metric used: cosine
📄 Context count: 2
🎯 Similarity scores: ['Source 1: 0.378', 'Source 2: 0.365']
💬 Response: I don't know. The provided context discusses risks in startups and fundraising strategies but does not explicitly address key principles of valuation....

🎯 Testing with EUCLIDEAN metric:
----------------------------------------
📊 Distance metric used: euclidean
📄 Context count: 2
🎯 Similarity scores: ['Source 1: -1.115', 'Source 2: -1.127']
💬 Response: I don't know. The provided context does not discuss the key principles of valuation....

🎯 Testing with DOT_PRODUCT metric:
----------------------------------------
📊 Distance metric used: dot_product
📄 Context count: 2
🎯 Similarity scores: ['Source 1: 0.378', 'Source 2: 0.365']
💬 Response: I don't know. The provided context does not contain inf

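## 🎥 YouTube Integration Demo

Here's how to use the new YouTube functionality in your RAG pipeline!

### 📦 Install Required Packages

First, let's make sure the required packages are installed. The next cell uses `requests` and `python-dotenv` and reads a `YOUTUBE_API_KEY` from your `.env` file; the cell after it checks for `youtube-transcript-api`.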


In [None]:


# YouTube Data API scraper for Professor Damodaran videos
import os
import time

import requests
from dotenv import load_dotenv


# Load environment variables from .env file
load_dotenv()

# YouTube Data API configuration (the key is read from the environment, never hard-coded)
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
if not YOUTUBE_API_KEY:
    print("⚠️ YOUTUBE_API_KEY is not set. Add it to your .env file before running the scraper.")
BASE_URL = "https://www.googleapis.com/youtube/v3"

# Professor Damodaran's known channels and search terms
DAMODARAN_CHANNELS = [
    "UCKn6CPi8h4IG3g4ga5FgKfQ",  # Aswath Damodaran's main channel
]

SEARCH_TERMS = [
    "Aswath Damodaran",
    "Professor Damodaran",
    "Damodaran valuation",
    "Damodaran corporate finance",
    "Damodaran investment",
    "Damodaran DCF",
    "Damodaran risk",
    "Damodaran NYU",
    "Damodaran financial modeling",
    "Damodaran cost of capital"
]

def search_youtube_videos(search_term, max_results=10):
    """Search YouTube for videos using the API"""
    url = f"{BASE_URL}/search"
    params = {
        'part': 'snippet',
        'q': search_term,
        'maxResults': max_results,
        'order': 'relevance',
        'type': 'video',
        'key': YOUTUBE_API_KEY
    }

    try:
        # A timeout keeps a stalled connection from hanging the notebook
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"❌ Error searching for '{search_term}': {e}")
        return None

def get_video_details(video_id):
    """Get detailed information about a video"""
    url = f"{BASE_URL}/videos"
    params = {
        'part': 'snippet,statistics,contentDetails',
        'id': video_id,
        'key': YOUTUBE_API_KEY
    }

    try:
        # A timeout keeps a stalled connection from hanging the notebook
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"❌ Error getting details for video {video_id}: {e}")
        return None

def filter_damodaran_videos(search_results):
    """Filter videos to ensure they're from Professor Damodaran"""
    damodaran_videos = []
    
    for item in search_results.get('items', []):
        # Handle different response formats
        if 'id' in item:
            if isinstance(item['id'], dict) and 'videoId' in item['id']:
                video_id = item['id']['videoId']
            elif isinstance(item['id'], str):
                video_id = item['id']
            else:
                print(f"⚠️ Unexpected video ID format: {item['id']}")
                continue
        else:
            print(f"⚠️ No video ID found in item: {item}")
            continue
            
        title = item['snippet']['title']
        channel_title = item['snippet']['channelTitle']
        description = item['snippet']['description']
        
        # Check if it's Damodaran-related content
        damodaran_keywords = ['damodaran', 'aswath', 'professor damodaran']
        is_damodaran = any(keyword.lower() in title.lower() or 
                          keyword.lower() in channel_title.lower() or 
                          keyword.lower() in description.lower() 
                          for keyword in damodaran_keywords)
        
        if is_damodaran:
            # Get video details for filtering
            details = get_video_details(video_id)
            if details and details.get('items'):
                video_info = details['items'][0]
                duration = video_info['contentDetails']['duration']
                view_count = int(video_info['statistics'].get('viewCount', 0))
                
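                # Keep videos that have a time-based ISO 8601 duration (e.g. "PT12M30S") and more than 1,000 views.
                # Note: the 'PT' check does not filter by length; parse the duration to enforce a minimum.
                if 'PT' in duration and view_count > 1000: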
                    damodaran_videos.append({
                        'video_id': video_id,
                        'title': title,
                        'channel': channel_title,
                        'url': f"https://www.youtube.com/watch?v={video_id}",
                        'duration': duration,
                        'views': view_count,
                        'description': description[:200] + "..." if len(description) > 200 else description
                    })
            
            time.sleep(0.1)  # Rate limiting
    
    return damodaran_videos

def scrape_damodaran_videos():
    """Main function to scrape Professor Damodaran's videos"""
    print("🎓 Scraping Professor Damodaran's Videos for RAG Training")
    print("=" * 60)

    all_videos = []

    for term in SEARCH_TERMS:
        print(f"🔍 Searching for: '{term}'")

        # Search for videos
        search_results = search_youtube_videos(term, max_results=15)
        if not search_results:
            continue

        # Filter for Damodaran videos
        damodaran_videos = filter_damodaran_videos(search_results)
        print(f"  📹 Found {len(damodaran_videos)} relevant videos")
        
        all_videos.extend(damodaran_videos)
        time.sleep(0.5)  # Rate limiting between searches
    
    # Remove duplicates based on video_id
    unique_videos = {}
    for video in all_videos:
        if video['video_id'] not in unique_videos:
            unique_videos[video['video_id']] = video
    
    final_videos = list(unique_videos.values())
    
    print("\n📊 Scraping Summary:")
    print(f"  Total unique videos found: {len(final_videos)}")

    # Sort by view count (most popular first)
    final_videos.sort(key=lambda x: x['views'], reverse=True)

    # Display top videos
    print("\n🏆 Top Professor Damodaran Videos for RAG Training:")
    for i, video in enumerate(final_videos[:10], 1):
        print(f"  {i}. {video['title']}")
        print(f"     📺 {video['url']}")
        print(f"     👀 {video['views']:,} views | ⏱️ {video['duration']}")
        print(f"     📝 {video['description']}")
        print()
    
    return final_videos

# Run the scraper
damodaran_videos = scrape_damodaran_videos()

# Extract URLs for the RAG system
if damodaran_videos:
    video_urls = [video['url'] for video in damodaran_videos]
    print(f"✅ Ready to process {len(video_urls)} Professor Damodaran videos!")
    print("📋 Video URLs extracted for RAG training")

    # Show first few URLs
    print("\n🔗 First 5 video URLs:")
    for i, url in enumerate(video_urls[:5], 1):
        print(f"  {i}. {url}")

    if len(video_urls) > 5:
        print(f"  ... and {len(video_urls) - 5} more videos")
else:
    print("❌ No Professor Damodaran videos found")
    video_urls = []
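
The `duration` field in the YouTube Data API response is an ISO 8601 duration string such as `PT1H2M3S`, so checking for `'PT'` alone does not separate long videos from short ones. A minimal sketch of converting that string to seconds is below. The helper names `iso8601_duration_to_seconds` and `is_long_video` and the 10-minute threshold are illustrative assumptions, not part of the scraper above, and week-based durations (e.g. `P1W`) are deliberately not handled.

```python
import re

# Matches durations such as "PT45S", "PT12M30S", "PT1H2M3S", or "P1DT2H"
_DURATION_RE = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def iso8601_duration_to_seconds(duration: str) -> int:
    """Convert an ISO 8601 duration (days/hours/minutes/seconds) to seconds."""
    match = _DURATION_RE.match(duration)
    if match is None:
        raise ValueError(f"Unsupported duration format: {duration!r}")
    # Missing components (e.g. no hours) come back as None and count as zero
    parts = {name: int(value) if value else 0
             for name, value in match.groupdict().items()}
    return (parts["days"] * 86400 + parts["hours"] * 3600
            + parts["minutes"] * 60 + parts["seconds"])


def is_long_video(duration: str, min_seconds: int = 600) -> bool:
    """True if the video is at least `min_seconds` long (default: 10 minutes)."""
    return iso8601_duration_to_seconds(duration) >= min_seconds


print(iso8601_duration_to_seconds("PT1H2M3S"))  # 3723
print(is_long_video("PT4M13S"))                 # False
```

A check such as `is_long_video(duration) and view_count > 1000` could then stand in for the `'PT' in duration` test if a minimum length is actually wanted.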


In [None]:
# Check if youtube_transcript_api is available
try:
    import youtube_transcript_api
    print("✅ youtube_transcript_api is available!")
except ImportError:
    print("❌ youtube_transcript_api not found. Please install it:")
    print("   Run in terminal: uv add youtube-transcript-api")
    print("   Or: pip install youtube-transcript-api")
    print("   Then restart your Jupyter kernel")

# Reload modules in place so recent edits are picked up (this does not restart the kernel)
import importlib
import sys

# Force reload the text_utils module
if 'aimakerspace.text_utils' in sys.modules:
    importlib.reload(sys.modules['aimakerspace.text_utils'])

try:
    from aimakerspace.text_utils import YouTubeLoader, MultiYouTubeLoader, CharacterTextSplitter
    from aimakerspace.vectordatabase import VectorDatabase
    import asyncio
    print("✅ All imports successful!")

    # YouTube URLs to load (replace with your own videos; add as many as you want)
    youtube_urls = [
        "https://www.youtube.com/watch?v=fmlPJBxzpMo",
        "https://www.youtube.com/watch?v=GzKq_p8IKqI",
        "https://www.youtube.com/watch?v=JMlAi0B-rlE",
        "https://www.youtube.com/watch?v=c20_S-QgvsA",
        "https://www.youtube.com/watch?v=znmQ7oMiQrM",
        "https://www.youtube.com/watch?v=Z5chrxMuBoo",
        "https://www.youtube.com/watch?v=8vYQpWXQ5hE",
    ]
    
    # Load YouTube transcripts from multiple videos
    print(f"Loading transcripts from {len(youtube_urls)} YouTube video(s)...")
    
    if len(youtube_urls) == 1:
        # Single video (original behavior)
        youtube_loader = YouTubeLoader(youtube_urls[0])
        youtube_documents = youtube_loader.load_documents()
        print(f"✅ Loaded {len(youtube_documents)} document(s) from YouTube")
        print(f"Video ID: {youtube_loader.get_video_info()['video_id']}")
        print(f"First 200 characters: {youtube_documents[0][:200]}...")
    else:
        # Multiple videos (new functionality)
        multi_loader = MultiYouTubeLoader(youtube_urls)
        youtube_documents = multi_loader.load_documents()
        
        # Get summary
        summary = multi_loader.get_summary()
        print(f"✅ Successfully loaded: {summary['successful_videos']}/{summary['total_urls']} videos")
        print(f"📄 Total documents: {summary['total_documents']}")

        if summary['successful_videos'] > 0:
            print(f"First 200 characters: {youtube_documents[0][:200]}...")

        # Show which videos were successfully loaded
        successful_videos = multi_loader.get_successful_videos()
        print("\n📺 Successfully loaded videos:")
        for i, video_info in enumerate(successful_videos, 1):
            print(f"  {i}. {video_info['video_id']} - {video_info['url']}")

        # Show failed videos if any
        failed_videos = multi_loader.get_failed_videos()
        if failed_videos:
            print("\n❌ Failed to load videos:")
            for failed in failed_videos:
                print(f"  - {failed['url']}: {failed['error']}")
    
except ImportError as e:
    print(f"❌ Import error: {e}")
    print("Please restart your Jupyter kernel: Kernel -> Restart")
    print("If the error persists, the package might not be installed in the right environment.")
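
When a URL list is assembled by hand or merged from several searches, the same video can appear more than once, and its transcript would then be chunked and embedded twice. A minimal sketch of removing such duplicates before loading is below. The helpers `extract_video_id` and `dedupe_by_video_id` are hypothetical, use only the standard library, and are not part of `aimakerspace`; they cover only `watch?v=` and `youtu.be` style URLs.

```python
from typing import List, Optional
from urllib.parse import parse_qs, urlparse


def extract_video_id(url: str) -> Optional[str]:
    """Return the video ID from a watch or youtu.be URL, or None if not found."""
    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()
    if host == "youtu.be":
        # Short links carry the ID in the path: https://youtu.be/<id>
        return parsed.path.lstrip("/") or None
    if host == "youtube.com" or host.endswith(".youtube.com"):
        if parsed.path == "/watch":
            # Standard links carry the ID in the "v" query parameter
            return parse_qs(parsed.query).get("v", [None])[0]
    return None


def dedupe_by_video_id(urls: List[str]) -> List[str]:
    """Keep the first URL for each video ID, preserving the original order."""
    seen = set()
    unique = []
    for url in urls:
        key = extract_video_id(url) or url  # fall back to the raw URL
        if key not in seen:
            seen.add(key)
            unique.append(url)
    return unique


urls = [
    "https://www.youtube.com/watch?v=c20_S-QgvsA",
    "https://www.youtube.com/watch?v=znmQ7oMiQrM",
    "https://www.youtube.com/watch?v=c20_S-QgvsA",
]
print(dedupe_by_video_id(urls))  # two URLs remain
```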

In [None]:
# Split the YouTube transcript into chunks
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
split_youtube_documents = text_splitter.split_texts(youtube_documents)

print(f"📄 Split into {len(split_youtube_documents)} chunks")
print(f"First chunk: {split_youtube_documents[0][:300]}...")
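
With `chunk_size=1000` and `chunk_overlap=200`, consecutive chunks start 800 characters apart and share 200 characters, so a sentence cut at one boundary still appears whole in a neighbouring chunk. A minimal sketch of that sliding-window idea follows. `split_with_overlap` is a hypothetical stand-in, and the actual `CharacterTextSplitter` in `aimakerspace` may differ in its details.

```python
def split_with_overlap(text, chunk_size=1000, chunk_overlap=200):
    """Split text into fixed-size character windows that overlap."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # distance between chunk starts
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]


# 2,500 characters of sample text
sample = "".join(str(i % 10) for i in range(2500))
chunks = split_with_overlap(sample)

print(len(chunks))               # 4
print([len(c) for c in chunks])  # [1000, 1000, 900, 100]

# The last 200 characters of one chunk are the first 200 of the next
assert chunks[0][-200:] == chunks[1][:200]
```

Note that the final chunk can be much shorter than the rest. Whether to keep, drop, or merge such a tail is a design choice this sketch leaves open.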


In [None]:
# Create vector database from YouTube content
print("Creating vector database from YouTube content...")
youtube_vector_db = VectorDatabase()
youtube_vector_db = asyncio.run(youtube_vector_db.abuild_from_list(split_youtube_documents))

print("✅ YouTube vector database created!")
print(f"Number of vectors: {len(youtube_vector_db.vectors)}")


In [None]:
rag_pipeline_youtube = RetrievalAugmentedQAPipeline(
    vector_db_retriever=youtube_vector_db,
    llm=chat_openai,
    response_style="detailed",
    include_scores=True
)

# Query the YouTube-backed pipeline created above
result = rag_pipeline_youtube.run_pipeline(
    "What ways of investing are beneficial?",
    k=3,
    response_length="comprehensive", 
    include_warnings=True,
    confidence_required=True
)

print(f"Response: {result['response']}")
print(f"\nContext Count: {result['context_count']}")
print(f"Similarity Scores: {result['similarity_scores']}")          

In [None]:
# Test the YouTube RAG system
print("🔍 Testing YouTube RAG system...")

# Example query about the video content
query = "What is the main topic discussed in this video?"
results = youtube_vector_db.search_by_text(query, k=3)

print(f"\nQuery: {query}")
print(f"Found {len(results)} relevant chunks:")
for i, (text, similarity) in enumerate(results):
    print(f"\n--- Result {i+1} (similarity: {similarity:.4f}) ---")
    print(text[:200] + "..." if len(text) > 200 else text)
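
The search above returns `(text, similarity)` pairs for the `k` best-matching chunks. The ranking step behind such a search can be sketched with plain NumPy as a brute-force cosine comparison against every stored vector. `top_k_by_cosine` and the toy 2-D vectors are hypothetical, and this is not the `VectorDatabase` implementation, which also has to embed the query text first.

```python
import numpy as np


def top_k_by_cosine(query_vector, vectors, k=3):
    """Return the k (key, score) pairs with the highest cosine similarity."""
    keys = list(vectors)
    matrix = np.array([vectors[key] for key in keys], dtype=float)
    query = np.asarray(query_vector, dtype=float)
    # Cosine similarity of the query against every row at once
    scores = matrix @ query / (np.linalg.norm(matrix, axis=1) * np.linalg.norm(query))
    # Sort descending and keep the first k
    order = np.argsort(scores)[::-1][:k]
    return [(keys[i], float(scores[i])) for i in order]


# Toy "database": chunk text mapped to a 2-D vector
toy_vectors = {
    "chunk about valuation": [1.0, 0.0],
    "chunk about cooking": [0.0, 1.0],
    "chunk about both": [1.0, 1.0],
}

for text, score in top_k_by_cosine([1.0, 0.1], toy_vectors, k=2):
    print(f"{score:.4f}  {text}")
```

Brute-force search is linear in the number of chunks, which is fine at the scale of a handful of transcripts.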


## 🎓 Professor Damodaran Finance Questions Test

Let's test our RAG system with comprehensive finance and investment questions inspired by Professor Damodaran's expertise:


In [None]:
# Quick Test: Core Professor Damodaran Questions
print("🎯 Quick Test: Core Finance Questions")
print("=" * 50)

# Core questions for quick testing
core_questions = [
    "What ways of investing are beneficial?",  # Your original question
    "What is the DCF model and how do you use it?",
    "How do you calculate the cost of capital?",
    "What are the key principles of valuation?",
    "How do you assess risk in investments?"
]

for i, question in enumerate(core_questions, 1):
    print(f"\n📝 Question {i}: {question}")
    print("─" * 50)
    
    try:
        # Use the YouTube-backed pipeline so answers draw on the Damodaran transcripts
        result = rag_pipeline_youtube.run_pipeline(
            question,
            k=2,  # Fewer results for quicker testing
            response_length="concise",  # Shorter responses
            include_warnings=False,
            confidence_required=False
        )
        
        print(f"✅ {result['response'][:200]}...")  # Show first 200 chars
        print(f"📊 Sources: {result['context_count']}")
        
    except Exception as e:
        print(f"❌ Error: {e}")

print("\n✅ Quick test completed!")
