# Engineering Retrieval Augmented Generation Applications for the Enterprise with Pinecone

## Setup

### Dependencies

In [None]:
!pip install pandas marko openai pyarrow fastparquet pinecone-client ragas torch "rerankers[flashrank]"

: 

In [2]:
import os
import pandas as pd
from IPython.display import Markdown, display

### API Keys

In [None]:
# Load environment variables from .env file
import dotenv
dotenv.load_dotenv()

# API keys are now loaded from .env file
# os.environ["OPENAI_API_KEY"] is automatically set
# os.environ["PINECONE_API_KEY"] is automatically set

print("✅ API keys loaded from .env file")

******


### Read the data

In [5]:
import warnings
warnings.filterwarnings('ignore')

data = pd.read_parquet("https://storage.googleapis.com/pinecone-datasets-dev/pinecone_docs_ada-002/raw/file1.parquet")
data.head()

Unnamed: 0,id,text,source,metadata
0,728aeea1-1dcf-5d0a-91f2-ecccd4dd4272,# Scale indexes\n\n[Suggest Edits](/edit/scali...,https://docs.pinecone.io/docs/scaling-indexes,"{'created_at': '2023_10_25', 'title': 'scaling..."
1,2f19f269-171f-5556-93f3-a2d7eabbe50f,# Understanding organizations\n\n[Suggest Edit...,https://docs.pinecone.io/docs/organizations,"{'created_at': '2023_10_25', 'title': 'organiz..."
2,b2a71cb3-5148-5090-86d5-7f4156edd7cf,# Manage datasets\n\n[Suggest Edits](/edit/dat...,https://docs.pinecone.io/docs/datasets,"{'created_at': '2023_10_25', 'title': 'datasets'}"
3,1dafe68a-2e78-57f7-a97a-93e043462196,# Architecture\n\n[Suggest Edits](/edit/archit...,https://docs.pinecone.io/docs/architecture,"{'created_at': '2023_10_25', 'title': 'archite..."
4,8b07b24d-4ec2-58a1-ac91-c8e6267b9ffd,# Moving to production\n\n[Suggest Edits](/edi...,https://docs.pinecone.io/docs/moving-to-produc...,"{'created_at': '2023_10_25', 'title': 'moving-..."


In [6]:
display(Markdown(data.iloc[0]['text']))

# Scale indexes

[Suggest Edits](/edit/scaling-indexes)In this topic, we explain how you can scale your indexes horizontally and vertically.


Projects in the `gcp-starter` environment do not support the features referred to here, including pods, replicas, and collections.


## Vertical vs. horizontal scaling


If you need to scale your environment to accommodate more vectors, you can modify your existing index to scale it vertically or create a new index and scale horizontally. This article will describe both methods and how to scale your index effectively. 


## Vertical scaling


Scaling vertically is fast and involves no downtime. This is a good choice when you can't pause upserts and must continue serving traffic. It also allows you to double your capacity instantly. However, there are some factors to consider.


By [changing the pod size](manage-indexes#changing-pod-sizes), you can scale to x2, x4, and x8 pod sizes, which means you are doubling your capacity at each step. Moving up to a new capacity will effectively double the number of pods used at each step. If you need to scale by smaller increments, then consider horizontal scaling. 


The number of base pods you specify when you initially create the index is static and cannot be changed. For example, if you start with 10 pods of `p1.x1` and vertically scale to `p1.x2`, this equates to 20 pods worth of usage. Neither can you change pod types with vertical scaling. If you want to change your pod type while scaling, then horizontal scaling is the better option. 


You can only scale index sizes up and cannot scale them back down.


See our learning center for more information on [vertical scaling](https://www.pinecone.io/learn/testing-p2-collections-scaling/#vertical-scaling-on-p1-and-s1).


## Horizontal scaling


There are two approaches to horizontal scaling in Pinecone: adding pods and adding replicas. Adding pods increases all resources but requires a pause in upserts; adding replicas only increases throughput and requires no pause in upserts.


### Adding pods


Adding pods to an index increases all resources, including available capacity. Adding pods to an existing index is possible using our <collections> feature. A collection is an immutable snapshot of your index in time: a collection stores the data but not the original index definition.


When you [create an index from a collection](manage-indexes#create-an-index-from-a-collection), you define the new index configuration. This allows you to scale the base pod count horizontally without scaling vertically. The main advantage of this approach is that you can scale incrementally instead of doubling capacity as with vertical scaling. Also, you can redefine pod types if you are experimenting or if you need to use a different pod type, such asperformance-optimized pods or storage-optimized pods. Another advantage of this method is that you can change your [metadata configuration](manage-indexes#selective-metadata-indexing) to redefine metadata fields as indexed or stored-only. This is important when [tuning your index](performance-tuning) for the best throughput. 


Here are the general steps to make a copy of your index and create a new index while changing the pod type, pod count, metadata configuration, replicas, and all typical parameters when creating a new collection: 


1. Pause upserts.
2. Create a collection from the current index.
3. Create an index from the collection with new parameters.
4. Continue upserts to the newly created index. Note: the URL has likely changed.
5. Delete the old index if desired.


### Adding replicas


Each replica duplicates the resources and data in an index. This means that adding additional replicas increases the throughput of the index but not its capacity. However, adding replicas does not require downtime.


Throughput in terms of queries per second (QPS) scales linearly with the number of replicas per index.


To add replicas, use the `configure_index` operation to [increase the number of replicas for your index](manage-indexes#replicas).


## Next steps


* See our learning center for more information on [vertical scaling](https://www.pinecone.io/learn/testing-p2-collections-scaling/#vertical-scaling-on-p1-and-s1).
* Learn more about <collections>.
Updated 29 days ago 



---

* [Table of Contents](#)
* + [Vertical vs. horizontal scaling](#vertical-vs-horizontal-scaling)
	+ [Vertical scaling](#vertical-scaling)
	+ [Horizontal scaling](#horizontal-scaling)
		- [Adding pods](#adding-pods)
		- [Adding replicas](#adding-replicas)
	+ [Next steps](#next-steps)


## Chunking

### Markdown Splitting

In [7]:
from typing import List, Tuple
import marko
from marko.md_renderer import MarkdownRenderer

# Custom renderer class to add a newline after each paragraph
class CustomMarkdownRenderer(MarkdownRenderer):
    def render_paragraph(self, element):
        return super().render_paragraph(element) + "\n"

# Function to create chunks from markdown text
def markdown_chunker(markdown_text: str, max_chunk_size: int = 1000) -> List[str]:
    # Parse the markdown text
    parsed = marko.parse(markdown_text)
    # Initialize the custom markdown renderer
    md_renderer = CustomMarkdownRenderer()

    # Function to process each element in the parsed markdown
    def process_element(element) -> List[Tuple[str, bool]]:
        if isinstance(element, str):
            return [(element, False)]
        
        # Render the element to markdown text
        rendered_text = md_renderer.render(element) + "\n"
        # Check if the element is a heading
        is_heading = isinstance(element, marko.block.Heading)
        
        result = [(rendered_text, is_heading)]
        
        # Recursively process child elements if they exist
        if hasattr(element, 'children'):
            for child in element.children:
                result.extend(process_element(child))
        
        return result

    # Function to combine processed elements into chunks
    def combine_chunks(elements: List[Tuple[str, bool]]) -> List[str]:
        chunks = []
        current_chunk = ""
        
        for text, is_heading in elements:
            # If the current element is a heading and there's content in the current chunk, finalize the current chunk
            if is_heading and current_chunk:
                chunks.append(current_chunk.strip())
                current_chunk = ""
            
            # If adding the current text exceeds the max chunk size, finalize the current chunk
            if len(current_chunk) + len(text) > max_chunk_size and current_chunk:
                chunks.append(current_chunk.strip())
                current_chunk = ""
            
            # Add the current text to the current chunk
            current_chunk += text

        # Add any remaining content as the last chunk
        if current_chunk:
            chunks.append(current_chunk.strip())
        
        # Remove duplicate chunks
        unique_chunks = []
        seen_chunks = set()
        for chunk in chunks:
            if chunk not in seen_chunks:
                unique_chunks.append(chunk)
                seen_chunks.add(chunk)
        
        return unique_chunks

    # Process the parsed markdown elements
    processed_elements = process_element(parsed)
    # Combine the processed elements into chunks
    return combine_chunks(processed_elements[1:])

### Recursive Chunking

In [1]:
from typing import List
import re

def recursive_chunker(text: str, min_chunk_size: int = 100, max_chunk_size: int = 500) -> List[str]:
    # Function to split text into sentences using regular expressions
    def split_into_sentences(text: str) -> List[str]:
        # Regular expression to identify sentence endings
        sentence_endings = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s')
        return sentence_endings.split(text)

    # Function to split text into paragraphs based on double newline characters
    def split_into_paragraphs(text: str) -> List[str]:
        return text.split('\n\n')

    # Function to chunk text into smaller pieces based on min and max size constraints
    def chunk_text(text: str, min_size: int, max_size: int) -> List[str]:
        paragraphs = split_into_paragraphs(text)  # Split text into paragraphs
        chunks = []  # List to store the resulting chunks
        current_chunk = ""  # Variable to accumulate the current chunk

        for paragraph in paragraphs:
            # If adding the paragraph to the current chunk doesn't exceed max size, add it
            if len(current_chunk) + len(paragraph) <= max_size:
                current_chunk += paragraph + "\n\n"
            else:
                # If the current chunk is not empty, add it to the chunks list
                if current_chunk:
                    chunks.append(current_chunk.strip())
                
                # If the paragraph itself is larger than max size, split it into sentences
                if len(paragraph) > max_size:
                    sentences = split_into_sentences(paragraph)
                    for sentence in sentences:
                        # If the sentence fits within max size, add it to the current chunk
                        if len(sentence) <= max_size:
                            if len(current_chunk) + len(sentence) > max_size:
                                if current_chunk:
                                    chunks.append(current_chunk.strip())
                                current_chunk = sentence + " "
                            else:
                                current_chunk += sentence + " "
                        else:
                            # If the sentence is too large, split it into smaller pieces
                            if current_chunk:
                                chunks.append(current_chunk.strip())
                            chunks.extend([sentence[i:i+max_size] for i in range(0, len(sentence), max_size)])
                            current_chunk = ""
                else:
                    current_chunk = paragraph + "\n\n"

        # Add any remaining text in the current chunk to the chunks list
        if current_chunk:
            chunks.append(current_chunk.strip())

        # Filter out chunks that are smaller than the minimum size
        return [chunk for chunk in chunks if len(chunk) >= min_size]

    # Initial chunking of the text
    chunks = chunk_text(text, min_chunk_size, max_chunk_size)
    
    # Additional pass to ensure all chunks are within size limits
    final_chunks = []
    seen = set()  # Set to keep track of seen chunks to avoid duplicates
    for chunk in chunks:
        if len(chunk) <= max_chunk_size:
            if chunk not in seen:
                final_chunks.append(chunk)
                seen.add(chunk)
        else:
            # If a chunk is too large, re-chunk it
            for sub_chunk in chunk_text(chunk, min_chunk_size, max_chunk_size):
                if sub_chunk not in seen:
                    final_chunks.append(sub_chunk)
                    seen.add(sub_chunk)

    return final_chunks

## Embedding and upserting

### Setup

Initalize clients

In [9]:
from openai import OpenAI
from pinecone import Pinecone, ServerlessSpec

# Initialize OpenAI client
openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# Initialize Pinecone client
pinecone_client = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))

Create index

In [11]:
index_name = "markdown-chunks"

In [12]:
# Create or get existing index
if index_name not in [index['name'] for index in pinecone_client.list_indexes()]:
    pinecone_client.create_index(
        name=index_name,
        dimension=1536,  # OpenAI embeddings are 1536 dimensions
        metric='cosine',
        spec=ServerlessSpec(cloud='aws', region='us-west-2')
    )

index = pinecone_client.Index(index_name)

### Embedding

In [1]:
def embed_and_upsert_chunk_with_metadata(chunk, metadata):
    # Generate embedding
    response = openai_client.embeddings.create(
        model="text-embedding-ada-002",
        input=chunk,
    )
    embedding = response.data[0].embedding

    # Upsert to Pinecone
    index.upsert(
        vectors=[(f"chunk_{metadata['id']}", embedding, metadata)],
    )

### Upserts

In [21]:
chunks_with_metadata = []

for idx, row in data.iterrows():
    text_chunks = markdown_chunker(row['text'])
    for cidx, chunk in enumerate(text_chunks):
        metadata = row['metadata'].copy()
        metadata['id'] = f"{row['id']}_{cidx}"
        metadata['chunk_text'] = chunk
        chunks_with_metadata.append((chunk, metadata))
        

for chunk, metadata in chunks_with_metadata:
    embed_and_upsert_chunk_with_metadata(chunk, metadata)

## Retrieval

### Naive Retrieval

In [14]:
def retrieve_chunks(query_text, num_chunks=3):
    query_embedding = openai_client.embeddings.create(
        model="text-embedding-ada-002",
        input=query_text
    ).data[0].embedding
    
    results = index.query(vector=query_embedding, top_k=num_chunks, include_metadata=True)

    return [chunk['metadata']['chunk_text'] for chunk in results.matches]
    

### Retrieval with Reranking

In [40]:
from rerankers import Reranker, Document

def retrieve_chunks(query_text, num_chunks=3):
    query_embedding = openai_client.embeddings.create(
        model="text-embedding-ada-002",
        input=query_text
    ).data[0].embedding
    
    # Increase the number of chunks to 2 times the number of chunks requested
    results = index.query(vector=query_embedding, top_k=num_chunks * 2, include_metadata=True)
    
    try:
        # Initialize reranker with a specific model
        ranker = Reranker('flashrank')
        # Prepare documents for reranking
        docs = [Document(text=chunk['metadata']['chunk_text'], doc_id=i) for i, chunk in enumerate(results.matches)]
        # Rerank the documents
        reranked_results = ranker.rank(query=query_text, docs=docs)
        # Return the top num_chunks reranked results
        return [result.document.text for result in reranked_results.top_k(num_chunks)]
    except Exception as e:
        print(f"Error in retrieve_chunks: {e}")
        # Fallback to original results if reranking fails
        return [chunk['metadata']['chunk_text'] for chunk in results.matches[:num_chunks]]

## Augmentation

In [28]:
def augmented_query(query_text, context, num_chunks=5):
  prompt = f"""
  Given the following context, answer the following question.
  Context: {context}
  Query: {query_text}
  """
  response = openai_client.chat.completions.create(
    model="gpt-4o",
    messages=[
      {"role": "system", "content": prompt}
    ]
  )

  return response.choices[0].message.content

### E2E RAG Flow

In [18]:
def rag_flow():
    user_input = input("Please enter your query: ")
    context = retrieve_chunks(user_input)    
    augmented_query_result = augmented_query(user_input, "\n".join(context))
    display(Markdown(f"**Answer:** {augmented_query_result}"))
  

In [19]:
rag_flow()

Please enter your query: How do you create an index in Pinecone using Python?


**Answer:** To create an index in Pinecone using Python, you can use the `pinecone.create_index()` method. In the provided context, the index is created with the name `langchain-retrieval-augmentation`, a cosine similarity metric, and a dimension of 1536 (taken from the length of `res[0]`).

The specific code snippet for creating the index would look like this:

```python
index_name = 'langchain-retrieval-augmentation'
pinecone.create_index(
    name=index_name,
    metric='cosine',
    dimension=len(res[0])  # 1536 dim of text-embedding-ada-002
)
```

After creating the index, you can access it using `index = pinecone.Index(index_name)`.

# Evaluation

In [20]:
from langchain.docstore.document import Document


### Load the docs

In [21]:
documents = []
for _, row in data.iterrows():
    text = row['text']
    metadata = {
        'id': row['id'],
        'metadata': row['metadata'],
        'filename': row['source']
    }
    doc = Document(page_content=text, metadata=metadata)
    documents.append(doc)

### Test set generation

In [22]:
from ragas.testset.generator import TestsetGenerator
from ragas.testset.evolutions import simple, reasoning, multi_context
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# generator with openai models
generator_llm = ChatOpenAI(model="gpt-4o-mini")
critic_llm = ChatOpenAI(model="gpt-4")
embeddings = OpenAIEmbeddings()

generator = TestsetGenerator.from_langchain(
    generator_llm,
    critic_llm,
    embeddings
)

# generate testset
testset = generator.generate_with_langchain_docs(documents, test_size=10, distributions={simple: 0.5, reasoning: 0.25, multi_context: 0.25})

embedding nodes:   0%|          | 0/284 [00:00<?, ?it/s]

Generating:   0%|          | 0/10 [00:00<?, ?it/s]

In [23]:
testset.to_pandas().head()


Unnamed: 0,question,contexts,ground_truth,evolution_type,metadata,episode_done
0,What steps must be taken to shut down the infe...,"[s"": batch}\n )\n emb = res.json()['embe...","To shut down the inference endpoint, navigate ...",simple,[{'id': '73f924b2-7b1d-5fb8-826a-9a4cd71f8a49'...,True
1,What is the purpose of the upsert operation in...,[# Insert data\n\n[Suggest Edits](/edit/insert...,The purpose of the upsert operation in the con...,simple,[{'id': '8199e897-1a6e-58be-8d2c-eed69c360275'...,True
2,What is the purpose of genre filtering in the ...,[ 'namespace': ''}\n\n```\n\ncurl\n```python\n...,The purpose of genre filtering in the context ...,simple,[{'id': 'c4fc0451-bfca-5ab8-a2b4-19c5bafb59b3'...,True
3,What is the recommended method for upserting l...,"[4]\n },\n {\n ""id"": ""E"",\n ""values"": ...",The recommended method for upserting larger am...,simple,[{'id': '808d96b1-2045-508d-8c2b-6ad4610649bf'...,True
4,What are the two methods for updating records ...,"[# 'id-2': {'id': 'id-2',\n# ...",The two methods for updating records in Pineco...,simple,[{'id': '2335f977-beb6-5ea2-b524-ba0fb7fdbe3f'...,True


In [29]:
def generate_test_results(testset):
    results = []
    for row in testset.itertuples():
        query = row.question
        contexts = retrieve_chunks(query)
        context_str = "\n".join(contexts)
        answer = augmented_query(query, context_str)
        results.append({
            "question": query,
            "answer": answer,
            "contexts": contexts,
            "ground_truth": row.ground_truth
        })
    return results

# Generate results for the test set
test_results = generate_test_results(testset.to_pandas())

In [25]:
pd.DataFrame(test_results, columns=['question', 'answer', 'contexts', 'ground_truth'])

Unnamed: 0,question,answer,contexts,ground_truth
0,What steps must be taken to shut down the infe...,To shut down the inference endpoint in the con...,[All of these results look excellent. If you a...,"To shut down the inference endpoint, navigate ..."
1,What is the purpose of the upsert operation in...,The purpose of the upsert operation in the con...,[import { Pinecone } from '@pinecone-database/...,The purpose of the upsert operation in the con...
2,What is the purpose of genre filtering in the ...,The purpose of genre filtering in the context ...,[### Advantages\n\nAdvantages\nAdvantages\n\n*...,The purpose of genre filtering in the context ...
3,What is the recommended method for upserting l...,The recommended method for upserting larger am...,"[Pinecone lets us efficiently ingest, update a...",The recommended method for upserting larger am...
4,What are the two methods for updating records ...,The two methods for updating records in Pineco...,[Access the [Pinecone Console](https://app.pin...,The two methods for updating records in Pineco...
5,What should an index have for effective queryi...,"To effectively query sparse-dense vectors, an ...",[## Querying sparse-dense vectors\n\nQuerying ...,For effective querying of sparse-dense vectors...
6,Which URL part shows the project ID in Pinecone?,The portion of the index URL after the index n...,[## Project ID\n\nProject ID\nProject ID\n\nEa...,The project ID in Pinecone is shown in the ind...
7,What filters are needed for retrieving docs fr...,To retrieve documents from a specific year in ...,"[This example deletes all vectors with genre ""...",To retrieve documents from a specific year in ...
8,What steps to set up a secret in AWS Secrets M...,To set up a secret in AWS Secrets Manager for ...,[### Setting up secrets\n\nSetting up secrets\...,To set up a secret in AWS Secrets Manager for ...
9,"What ops can modify records in Pinecone API, a...",The operations that can modify records in the ...,[import { Pinecone } from '@pinecone-database/...,The operations that can modify records in the ...


In [30]:

# Convert results to a Hugging Face Dataset
from datasets import Dataset
evaluation_dataset = Dataset.from_list(test_results)

# Now you can use this dataset for evaluation
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
    context_entity_recall,
    answer_similarity,
    answer_correctness    
)

result = evaluate(
    evaluation_dataset,
    metrics=[
        context_precision,
        faithfulness,
        answer_relevancy,
        context_recall,        
        context_entity_recall,
        answer_similarity,
        answer_correctness
    ],
)



Evaluating:   0%|          | 0/70 [00:00<?, ?it/s]

In [31]:
result_df = pd.DataFrame(result.items(), columns=['Metric', 'Value'])
result_df

Unnamed: 0,Metric,Value
0,context_precision,0.9
1,faithfulness,0.703333
2,answer_relevancy,0.95217
3,context_recall,0.75
4,context_entity_recall,0.365
5,answer_similarity,0.941639
6,answer_correctness,0.590436


In [27]:
result_df = pd.DataFrame(result.items(), columns=['Metric', 'Value'])
result_df

Unnamed: 0,Metric,Value
0,context_precision,0.941667
1,faithfulness,0.707143
2,answer_relevancy,0.973118
3,context_recall,0.65
4,context_entity_recall,0.3025
5,answer_similarity,0.952995
6,answer_correctness,0.563795


# Pinecone Assistant

## Enterprise RAG - Easy mode

* Simplicity: You can easily add their files and start building AI applications using the API. 
* High-quality results: It provides relevant answers grounded in the user's data, along with references. 
* Comprehensive infrastructure: It handles all system operations, including chunking, embedding, file storage, query planning, vector search, model orchestration, and reranking. 
* Accuracy and reliability: Pinecone Assistant focuses on delivering high-quality and dependable answers 
* Enterprise-grade security: Even in beta, it's powered by components that meet strict compliance requirements. 
* Data protection and control: User data is encrypted, isolated, and used only as context for answers in real-time, without permanently fine-tuning or training the underlying language model.

In [22]:
!pip install --upgrade "pinecone-client[grpc]" pinecone-plugin-assistant

In [25]:
metadata = {"author": "Roie Schwaber-Cohen", "version": "1.0"}

assistant = pinecone_client.assistant.create_assistant(
    assistant_name="datacamp-demo", 
    metadata=metadata, 
    timeout=30 # Wait 30 seconds for assistant operation to complete.
)

List assistants

In [28]:
assistants = pinecone_client.assistant.list_assistants()
assistants

In [38]:
import os

# Create a directory to save the text files if it doesn't exist
output_dir = "text_files"
os.makedirs(output_dir, exist_ok=True)

# Save the first five rows of the "text" field into txt files
for idx, row in data.iterrows():
    file_path = os.path.join(output_dir, f"text_{idx}.txt")
    with open(file_path, "w") as file:
        file.write(row["text"])

# Return the path where the files are saved
output_dir

In [39]:
import os

# List the contents of the text_files directory
output_dir = "text_files"
files = os.listdir(output_dir)
files

In [40]:
# Iterate over each file in the text_files directory and upload it using the assistant
for file_name in files:
    file_path = os.path.join(output_dir, file_name)
    response = assistant.upload_file(
        file_path=file_path,
        timeout=None
    )

## Chat with the Assistnat

In [35]:
from pinecone_plugins.assistant.models.chat import Message

In [41]:
chat_context = [Message(content='How do I create an index in Python?')]
response = assistant.chat_completions(messages=chat_context)

In [43]:
display(Markdown(response.choices[0].message.content))