# Putting the R(etrieval) in RAG

In [None]:
# Install the Pinecone and OpenAI client libraries imported in the next cell
!pip install pinecone openai

In [None]:
from openai import OpenAI
from pinecone import Pinecone, ServerlessSpec
import hashlib
from datetime import datetime

from tqdm import tqdm
from google.colab import userdata

In [None]:
# --- Configuration -----------------------------------------------------------
INDEX_NAME = 'semantic-search-rag'  # Pinecone index used by this notebook
NAMESPACE = 'default'  # Namespace inside that index
ENGINE = 'text-embedding-3-small'  # Embedding model (vector size 1,536)

# --- Secrets -----------------------------------------------------------------
# Both API keys are read from Colab's `userdata` store
pinecone_key = userdata.get('PINECONE_API_KEY')

# --- Clients -----------------------------------------------------------------
client = OpenAI(api_key=userdata.get('OPENAI_API_KEY'))  # OpenAI client
pc = Pinecone(api_key=pinecone_key)  # Pinecone client


In [None]:
# Function to get embeddings for a list of texts using the OpenAI API
def get_embeddings(texts, engine=ENGINE):
    """Embed a batch of texts with the OpenAI embeddings endpoint.

    Args:
        texts: List of strings to embed.
        engine: Name of the embedding model; defaults to ``ENGINE``.

    Returns:
        A list with one embedding per item in the API response, in the order
        the API returned them. An empty ``texts`` yields an empty list
        without contacting the API.
    """
    # Nothing to embed: return early instead of sending an empty request
    if not texts:
        return []

    response = client.embeddings.create(
        input=texts,
        model=engine
    )

    # response.data is iterated directly; no intermediate list copy is needed
    return [item.embedding for item in response.data]

# Convenience wrapper around get_embeddings() for a single string
def get_embedding(text, engine=ENGINE):
    """Return the embedding of a single text using the given engine."""
    # Send the text as a one-element batch and take the only result back out
    vectors = get_embeddings([text], engine)
    return vectors[0]

# Test the functions: the first value is the length of one embedding vector,
# the second is the number of embeddings returned for a two-item batch
len(get_embedding('hi')), len(get_embeddings(['hi', 'hello']))

In [None]:
# Create the index only when it does not exist yet; otherwise reuse it
existing_indexes = pc.list_indexes().names()
if INDEX_NAME not in existing_indexes:
    print(f'Creating index {INDEX_NAME}')
    serverless_spec = ServerlessSpec(cloud='aws', region='us-east-1')
    pc.create_index(
        name=INDEX_NAME,
        dimension=1536,  # must match the vector size of the OpenAI embedder
        metric='cosine',  # similarity metric used when searching the index
        spec=serverless_spec,
    )

# Keep a handle to the index for the rest of the notebook and display it
index = pc.Index(name=INDEX_NAME)
index

In [None]:
# Inspect the index statistics right after connecting to the index
index.describe_index_stats()

In [None]:
def my_hash(s):
    """Return the hexadecimal MD5 digest of the string ``s``."""
    hasher = hashlib.md5()
    hasher.update(s.encode())
    return hasher.hexdigest()

# Example digest for a sample sentence
my_hash('I love to hash it')

In [None]:
def prepare_for_pinecone(texts, engine=ENGINE, urls=None):
    """Turn raw texts into ``(id, embedding, metadata)`` tuples for upserting.

    Args:
        texts: List of strings to embed.
        engine: Embedding model name passed on to ``get_embeddings``.
        urls: Optional list of source URLs, one per text. It is only used
            when its length matches ``texts``; otherwise it is ignored.

    Returns:
        A list of ``(id, embedding, metadata)`` tuples. ``id`` is
        ``my_hash(text)``; ``metadata`` holds ``text``, ``date_uploaded``
        (a timezone-aware UTC datetime shared by the whole batch) and, when
        URLs were attached, ``url``. An empty ``texts`` yields an empty list.
    """
    # Imported here so the function does not rely on a file-level import
    from datetime import timezone

    # Nothing to prepare: skip the embedding call entirely
    if not texts:
        return []

    # datetime.utcnow() is deprecated and returns a naive value, so request
    # an explicitly UTC-aware timestamp instead
    now = datetime.now(timezone.utc)

    # Generate vector embeddings for each string, using the specified engine
    embeddings = get_embeddings(texts, engine=engine)

    # URLs are attached only when there is exactly one per text
    attach_urls = bool(urls) and len(urls) == len(texts)

    records = []
    for position, (text, embedding) in enumerate(zip(texts, embeddings)):
        metadata = dict(text=text, date_uploaded=now)
        if attach_urls:
            metadata['url'] = urls[position]
        records.append((my_hash(text), embedding, metadata))

    return records


In [None]:
# Minimal one-document corpus used to try out the helpers in the following cells
texts = ['hi']

In [None]:
# Prepare the sample texts and unpack the first (id, embedding, metadata) record
first_record = prepare_for_pinecone(texts)[0]
_id, embedding, metadata = first_record

print('ID:  ',_id, '\nLEN: ', len(embedding), '\nMETA:', metadata)

In [None]:
# Repeat the preparation, this time passing a URL so it lands in the metadata
urls = ['fake.url']
prepared = prepare_for_pinecone(texts, urls=urls)
_id, embedding, metadata = prepared[0]

print('ID:  ',_id, '\nLEN: ', len(embedding), '\nMETA:', metadata)

In [None]:
def upload_texts_to_pinecone(texts, namespace=NAMESPACE, batch_size=None, show_progress_bar=False, urls=None):
    """Embed texts and upsert them into the Pinecone index in batches.

    Args:
        texts: List of strings to upload.
        namespace: Pinecone namespace to upsert into.
        batch_size: Number of texts per upsert call. A falsy value
            (``None`` or ``0``) uploads everything in a single batch.
        show_progress_bar: When true, wrap the batch loop in ``tqdm``.
        urls: Optional list of URLs aligned with ``texts``; it is sliced
            alongside each text batch.

    Returns:
        The sum of the ``upserted_count`` values reported by the upsert
        calls, or ``0`` when ``texts`` is empty.
    """
    # Empty input: nothing to upload. Without this guard batch_size would
    # fall back to len(texts) == 0 and range() would raise on a zero step.
    if not texts:
        return 0

    if not batch_size:
        batch_size = len(texts)

    total_upserted = 0
    batch_starts = range(0, len(texts), batch_size)
    if show_progress_bar:
        batch_starts = tqdm(batch_starts)

    for start in batch_starts:
        stop = start + batch_size
        text_batch = texts[start:stop]
        # Passing urls=None is the same as omitting the argument
        url_batch = urls[start:stop] if urls else None
        prepared_texts = prepare_for_pinecone(text_batch, urls=url_batch)

        # Use the upsert() method of the index object to upload the batch
        response = index.upsert(
            vectors=prepared_texts,
            namespace=namespace
        )
        total_upserted += response['upserted_count']

    return total_upserted

In [None]:
# Call the upload_texts_to_pinecone() function with the input texts
upload_texts_to_pinecone(texts)

# Check the index statistics after the upload
# NOTE(review): the reported counts may lag briefly behind a fresh upsert — confirm
index.describe_index_stats()

In [None]:
# Show the texts that were just uploaded
texts

In [None]:
def query_from_pinecone(query, top_k=3, include_metadata=True, namespace=None):
    """Embed a query and return its nearest matches from the Pinecone index.

    Args:
        query: Query text.
        top_k: Number of matches to request.
        include_metadata: Whether to ask for the stored metadata
            (dates, text, etc.) with each match.
        namespace: Namespace to search. ``None`` (the default) falls back to
            the module-level ``NAMESPACE``, the same default the upload and
            delete helpers use.

    Returns:
        The ``matches`` entry of the query response.
    """
    if namespace is None:
        namespace = NAMESPACE

    # get embedding from THE SAME embedder as the documents
    query_embedding = get_embedding(query, engine=ENGINE)

    response = index.query(
        vector=query_embedding,
        top_k=top_k,
        namespace=namespace,
        include_metadata=include_metadata,
    )
    return response.get('matches')

In [None]:
# Sanity-check retrieval: 'hi' was uploaded above and is only deleted in a later
# cell, so this query is expected to return it rather than an empty result
# NOTE(review): assumes the cells are run top to bottom — confirm
query_from_pinecone('hello')

In [None]:
import hashlib

def delete_texts_from_pinecone(texts, namespace=NAMESPACE):
    """Delete previously uploaded texts from the Pinecone index.

    Args:
        texts: The original strings whose vectors should be removed.
        namespace: Namespace to delete from.

    Returns:
        Whatever ``index.delete`` returns.
    """
    # IDs must be derived exactly as they were at upload time, so reuse
    # my_hash() (the helper prepare_for_pinecone uses) instead of repeating
    # the hashing logic here
    hashes = [my_hash(text) for text in texts]

    # The ids parameter is used to specify the list of IDs (hashes) to delete
    return index.delete(ids=hashes, namespace=namespace)

# delete our text
delete_texts_from_pinecone(texts)


In [None]:
# Check the index statistics again after the deletion
index.describe_index_stats()

In [None]:
base_url = 'https://developer.mozilla.org'
mdn_web_docs_url = base_url + '/en-US/docs/Web'
print(mdn_web_docs_url)

from bs4 import BeautifulSoup
import requests

# Browser-like request headers
# NOTE(review): advertising 'br' only works if a brotli decoder is available to
# requests in this environment — confirm
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'Referer': 'https://www.google.com/'
}

r = requests.get(mdn_web_docs_url, headers=headers, timeout=10)
# Fail loudly on an HTTP error instead of silently scraping an error page
r.raise_for_status()
soup = BeautifulSoup(r.content, 'html.parser')

# Only documentation pages nested under /en-US/docs/Web/ are kept. Matching
# this prefix already excludes the landing page itself and guarantees a
# deeper path, so no separate equality or slash-count checks are needed.
docs_prefix = mdn_web_docs_url + '/'
# Non-HTML file extensions to skip (a tuple so str.endswith can take it directly)
excluded_extensions = ('.txt', '.xml', '.json', '.rss', '.atom', '.pdf', '.zip', '.tar.gz', '.csv')

filtered_urls = []
seen_urls = set()  # O(1) duplicate check; filtered_urls keeps first-seen order

for link in soup.find_all('a'):
    href = link.attrs.get('href')
    if not href:
        continue

    if href.startswith('/'):
        # Relative URL: resolve against the site root
        full_url_candidate = base_url + href
    elif href.startswith(base_url):
        # Absolute URL that is still within the base domain
        full_url_candidate = href
    else:
        continue

    if not full_url_candidate.startswith(docs_prefix):
        continue
    # Skip anchor links and non-HTML files
    if '#' in full_url_candidate or full_url_candidate.endswith(excluded_extensions):
        continue
    if full_url_candidate in seen_urls:
        continue

    seen_urls.add(full_url_candidate)
    filtered_urls.append(full_url_candidate)

urls = filtered_urls  # Expose the filtered list under the name later cells use

urls

In [None]:
texts = []
for url in tqdm(urls):
    # Reuse the browser-like headers and the timeout from the index-page request
    # so page fetches are sent the same way and a stalled server cannot hang the loop
    r = requests.get(url, headers=headers, timeout=10)
    # Stop on an HTTP error rather than indexing an error page; texts must stay
    # aligned one-to-one with urls for the upload step
    r.raise_for_status()
    soup = BeautifulSoup(r.content, 'html.parser')
    # Fall back to the whole document when the parser finds no <body> element
    body_tag = soup.find('body')
    body = (body_tag if body_tag is not None else soup).get_text()
    # CLEAN YOUR DATA HERE :)
    texts.append(body)

texts[0]

In [None]:
# Embed and upsert the scraped pages four at a time, passing each page's URL
# along so it is stored in the metadata, and show a progress bar over the batches
BATCH_SIZE = 4
upload_texts_to_pinecone(texts, batch_size=BATCH_SIZE, urls=urls, show_progress_bar=True)

In [None]:
# Check the index statistics after the bulk upload
index.describe_index_stats()

In [None]:
 # Request the top three matches and print each one's URL, score and the
 # first 50 characters of its stored text
 results = query_from_pinecone('I want to write HTML', top_k=3)
 for result in results:
     meta = result['metadata']
     print(meta['url'], result['score'], meta['text'][:50])