<div style="background-color: #FFDDDD; border-left: 5px solid red; padding: 10px; color: black;">
    <strong>Kernel: PySpark</strong>
</div>

## Lab 03. Build a Retrieval Augmented Generation System using Amazon EMR Spark Distributed Processing and OpenSearch Vector Database
---

## Contents

- [Overview](#overview)
- [Connect to an Existing EMR Cluster](#connect-to-an-existing-emr-cluster)
- [Upload Files from Local to S3](#upload-files-from-local-to-s3)
- [Convert PDF to Text](#convert-pdf-to-text)
- [Run Parallelized Embeddings using Amazon EMR (EC2 Spark based Processing)](#run-parallelized-embeddings-using-amazon-emr-ec2-spark-based-processing)
- [Putting it All Together](#putting-it-all-together)


In this notebook we demonstrate how you can build a Retrieval Augmented Generation (RAG) system using the following components:
1. Embedding Model: `BAAI/bge-base-en-v1.5`
2. Text Generation Model: `meta-/llama2-7b-chat`
3. Vector Database: OpenSearch, used to store the embeddings
4. Streamlit UI: a chat interface to talk to your documents

## Connect to an Existing EMR Cluster

### Why an empty cell, you ask?

Let's connect to an EMR cluster from this cell. Click the `Cluster` button at the top right of this JupyterLab window > select a cluster > click `Connect` > select `No Credentials`, and voilà!

<div style="background-color: #FFDDDD; border-left: 5px solid red; padding: 10px; color: black;"> Stop! Please read this!
</div>


Running `%%help` lists all the magic commands supported by the SageMaker PySpark kernel.

In [None]:
%%help

## Upload Files from Local to S3

Before we build a Retrieval Augmented Generation system, we need data! For this lab we're going to use the following sample PDFs:
1. Amazon SageMaker Dev Guide (1047 pages)
2. EC2 Developer Guide (688 pages)
3. S3 Developer Guide (546 pages)

We're going to build a helper RAG bot that can look up information in these developer guides and answer a user's questions. You can extend this notebook to build a RAG bot over your own developer guides, internal wiki pages, legacy system guides, and much more. In total, we're going to process ~2,300 pages in this example.

---
Running `%%local` ensures that a cell's code is executed on your Space's compute instance (e.g., ml.t3.medium, ml.t3.large) rather than on the EMR cluster.

In [None]:
%%local
import os
import json
import glob
import boto3
import sagemaker
from tqdm import tqdm

In [None]:
%%local
REGION = "us-west-2"
sess = sagemaker.Session()
default_bucket = sess.default_bucket()
s3_client = boto3.client("s3")

print(f"Using default bucket ---> {default_bucket}")

The sample PDF files are placed under the `./AWSGuides` folder of this lab. We're going to upload them to an S3 bucket, where the EMR cluster can access them for reading and processing!

In [None]:
%%local
def upload_raw_pdf_files_to_bucket(destination_bucket, destination_prefix, raw_pdf_files):
    
    print(f"Uploading ---> {len(raw_pdf_files)} files!")
    
    uploaded_file_s3uris = []
    for pdf_file in tqdm(raw_pdf_files, total=len(raw_pdf_files)):
        pdf_fname = os.path.basename(pdf_file).replace(",", "").replace(" ", "-")
        
        pdf_dest_prefix = os.path.join(destination_prefix, pdf_fname)
        
        s3_client.upload_file(
            pdf_file, 
            destination_bucket, 
            pdf_dest_prefix
        )
        uploaded_file_s3uris.append(f"s3://{destination_bucket}/{pdf_dest_prefix}")
    
    return uploaded_file_s3uris

pdf_files_to_upload = glob.glob("./AWSGuides/*.pdf")

destination_prefix = "Lab03/raw-pdfs"

files_paths_in_s3 = upload_raw_pdf_files_to_bucket(
    destination_bucket=default_bucket, 
    destination_prefix=destination_prefix,
    raw_pdf_files=pdf_files_to_upload
)

print(f"Uploaded files to ---> {files_paths_in_s3}")

We need to keep certain global variables in sync between our local SageMaker instance and the remote EMR cluster, for example S3 paths, region information, and service names. The `%%send_to_spark` magic command makes this easy: it copies a local variable (`-i`) into the Spark session under the name given by `-n`. The cells below send:
1. Workflow region information
2. S3 bucket name
3. S3 documents prefix path

In [None]:
%%send_to_spark -i REGION -t str -n REGION

In [None]:
%%send_to_spark -i destination_prefix -t str -n SRC_FILE_PREFIX

In [None]:
%%send_to_spark -i default_bucket -t str -n SRC_BUCKET_NAME

## Convert PDF to Text

---
A PDF (Portable Document Format) file is a versatile file format that preserves document formatting across various platforms. Converting PDF files into text is necessary for processing because it allows for the extraction, manipulation, and analysis of the content programmatically, which is not directly feasible with the fixed-format nature of PDFs. This conversion enables Python programmers to apply text processing techniques such as data mining, searching, or natural language processing.


**Note:** The cells below don't use any magic commands, which means they run remotely on the EMR cluster through PySpark rather than on our local SageMaker compute instance.

In [None]:
import os
import boto3
import json
from PyPDF2 import PdfReader
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain.document_loaders import PyPDFLoader, PyPDFDirectoryLoader
import io

In [None]:
print(f"Source bucket and prefix to read pdf files ---> {SRC_BUCKET_NAME} {SRC_FILE_PREFIX}")

We need to ensure that our sample PDF files in S3 are actually discoverable from inside our EMR cluster. The simplest way to validate this is to list the files under our S3 bucket and prefix.

In [None]:
def list_files_in_s3_bucket_prefix(bucket_name, prefix):
    
    s3 = boto3.client('s3')

    # Paginate through the objects in the specified bucket and prefix, and collect all keys (file paths)
    paginator = s3.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

    file_paths = []
    for page in page_iterator:
        if "Contents" in page:
            for obj in page["Contents"]:
                if os.path.basename(obj["Key"]):
                    file_paths.append(obj["Key"])

    return file_paths

all_pdf_files = list_files_in_s3_bucket_prefix(
    bucket_name=SRC_BUCKET_NAME, 
    prefix=SRC_FILE_PREFIX
)
print(f"Found {len(all_pdf_files)} files ---> {all_pdf_files}")

Success!

Our files are discoverable from inside an EMR Cluster. Next, we're going to build a list of file references. Why? So we can parallelize the read operation across EMR nodes. Each EMR node will be responsible for reading a file from S3 and loading it into memory.

In [None]:
all_pdf_files = [(SRC_BUCKET_NAME, fpath) for fpath in all_pdf_files]
type(all_pdf_files)

Note that `all_pdf_files` above is a plain Python `list`. Spark doesn't distribute a Python `list` across the cluster on its own, so we convert it into a Spark RDD with `sparkContext.parallelize`. PySpark then parallelizes any downstream task we run on that RDD.
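As a rough mental model (not PySpark's exact internals, which also batch elements for serialization), `parallelize` cuts a local list into contiguous, near-equal partitions, and Spark schedules one task per partition. The hypothetical helper `slice_like_parallelize` below mimics that position arithmetic in plain Python, so you can see which file references would be processed together:

```python
def slice_like_parallelize(items, num_slices):
    """Split `items` into `num_slices` contiguous, near-equal partitions.

    Hypothetical helper: it mirrors the start/end arithmetic Spark uses when
    slicing a local collection, as a rough mental model only.
    """
    if num_slices < 1:
        raise ValueError("num_slices must be >= 1")
    n = len(items)
    partitions = []
    for i in range(num_slices):
        start = (i * n) // num_slices
        end = ((i + 1) * n) // num_slices
        partitions.append(items[start:end])
    return partitions


# Example: three (bucket, key) references spread over two partitions.
refs = [("my-bucket", "Lab03/raw-pdfs/a.pdf"),
        ("my-bucket", "Lab03/raw-pdfs/b.pdf"),
        ("my-bucket", "Lab03/raw-pdfs/c.pdf")]
for idx, part in enumerate(slice_like_parallelize(refs, 2)):
    print(idx, [key for _, key in part])
```

On the cluster itself, you can inspect the real layout with `pdfs_rdd.getNumPartitions()` or `pdfs_rdd.glom().collect()`.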

In [None]:
pdfs_rdd = spark.sparkContext.parallelize(all_pdf_files)
type(pdfs_rdd)

In the cell below, each core node picks up a PDF file reference from our list, downloads that file into memory, and returns a PyPDF2 `PdfReader` object for downstream workloads.

---

![EMR Read PDFs into Memory](./media/EMR-Doc-Read.jpg)

In [None]:
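def load_pdf_from_s3_into_memory(row):
    """
    Load a PDF file from an S3 bucket directly into memory.

    Returns (s3_key, PdfReader, page_count). On failure, returns
    (s3_key, None, 0) so the tuple shape stays the same for downstream code.
    """
    src_bucket_name, src_file_key = row
    try:
        s3 = boto3.client('s3')
        pdf_file = io.BytesIO()
        s3.download_fileobj(src_bucket_name, src_file_key, pdf_file)
        pdf_file.seek(0)  # rewind the buffer before PyPDF2 reads it
        pdf_reader = PdfReader(pdf_file)
        return (src_file_key, pdf_reader, len(pdf_reader.pages))
    
    except Exception as e:
        # Printed to the executor log; a failed file contributes 0 pages
        # instead of breaking the 3-tuple unpacking in later cells.
        print(f"Failed to load {src_file_key}: {e}")
        return (src_file_key, None, 0)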

Our plain list of file references has been converted into a Spark RDD, so now we can `map` our loader function over each entry in parallel and `collect` the results back to the primary node.
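If you want to dry-run a per-item function such as `load_pdf_from_s3_into_memory` before submitting it to the cluster, a thread pool gives a rough local analogue of `rdd.map(fn).collect()`: the function runs concurrently and the results come back as a list in input order. The helper `local_map_collect` and the `fake_loader` stand-in are hypothetical testing aids, not part of this lab, and this is not how Spark executes tasks (Spark ships the function to executors on other machines).

```python
from concurrent.futures import ThreadPoolExecutor


def local_map_collect(fn, items, max_workers=4):
    """Hypothetical local stand-in for `rdd.map(fn).collect()`.

    Runs `fn` over `items` on a thread pool and returns a list whose order
    matches `items`, like collecting a mapped RDD.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order, even if tasks finish out of order.
        return list(pool.map(fn, items))


def fake_loader(row):
    # Stand-in for load_pdf_from_s3_into_memory: returns (key, reader, page_count).
    bucket, key = row
    return (key, None, len(key))


print(local_map_collect(fake_loader, [("b", "a.pdf"), ("b", "bb.pdf")]))
```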

In [None]:
pdfs_in_memory = pdfs_rdd.map(load_pdf_from_s3_into_memory).collect()

OK, great! All PDF files are now loaded into memory as `PdfReader` objects. Let's verify that every page of every PDF file was loaded and that there were no issues during the read/load operations.

PySpark cells let users generate static visualizations as well! The example below shows how to generate charts with `matplotlib`, but you can make them look fancier with more modern static visualization libraries like `seaborn`.

In [None]:
import numpy as np
import matplotlib.pyplot as plt

x_labels = [pdfx.split('/')[-1] for pdfx, _, _ in pdfs_in_memory]
y_values = [pages_count for _, _, pages_count in pdfs_in_memory]
x = range(len(y_values))

# Create a figure and a set of subplots
fig, axs = plt.subplots(2, 1, figsize=(10, 10))

# First Subplot: Bar Chart
axs[0].bar(x, y_values, color=['red', 'green', 'blue'])
axs[0].set_title('Bar Chart')
axs[0].set_xticks(x)
axs[0].set_xticklabels(x_labels, rotation=45, ha="right")
axs[0].set_ylabel('Pdf Pages Count --->')

_bottom = 0
for (pdf_name, page_count, color) in zip(x_labels, y_values, ['red', 'green', 'blue']):
    axs[1].bar([0], [page_count], bottom=_bottom, color=color, label=pdf_name)
    _bottom += page_count
axs[1].set_title('Stacked Bar Chart')
axs[1].set_xticks([0])
axs[1].set_xticklabels(['Documents'], rotation=45, ha="right")
axs[1].set_ylabel('Stacked Pages Count --->')

# Add a legend to the second subplot
axs[1].legend()

# Adjust the layout
plt.tight_layout()

# Show the plot
plt.show()

%matplot plt

Every PDF document has 'n' pages to process, and this work can be parallelized with Spark.

Each document is split page by page, and each page is read from a global reference to the in-memory PDFs.

![PageLevelProcessingEMRPDFtoTxt](./media/PageLevelProcessingEMRPDFtoTxt.jpg)

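We're going to create a global PDF dictionary, keyed by S3 path, that all EMR core nodes can use to look up a PDF when they need it. Because our page-extraction function references this dictionary, PySpark serializes it along with the function and ships it to the core nodes. This comes in handy when you need to maintain a global frame of reference while parallelizing tasks.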

---

We achieve this page-level parallelism by creating a list (later converted into a PySpark RDD) in the format below:
``` 
[
    (/my/file1, page#1),
    (/my/file1, page#2),
    (/my/file1, page#3),
    
    (/my/file2, page#1),
    (/my/file2, page#2),
]
```

In [None]:
global_pdfs_in_mem_dict = {_key: pdf_reader for _key, pdf_reader, _ in pdfs_in_memory}

docs_instances = []
for (file_src, _, page_count) in pdfs_in_memory:
    for pg_num in range(page_count):
        docs_instances.append((file_src, pg_num))
print(f"Created {len(docs_instances)} parallel instances to process!")
# converts our list into an RDD
docs_instances_rdd = spark.sparkContext.parallelize(docs_instances)

We parallelize the conversion of our documents from PDF to text, page by page, across our EMR core nodes. This step,

`documents = docs_instances_rdd.map(extract_text_from_pdf_reader).collect()`

runs our custom function over every `(document, page)` entry with `map` and then `collect`s the extracted text back on the primary node.

In [None]:
def extract_text_from_pdf_reader(row):
    """ 
    Extract text from a page of the document 
    """
    try:
        doc_path, page_num = row
        page_text = global_pdfs_in_mem_dict[doc_path].pages[page_num].extract_text()
        return page_text, doc_path, page_num
    except Exception as e:
        return str(e), doc_path, page_num
    
documents = docs_instances_rdd.map(extract_text_from_pdf_reader).collect()

We need to convert the text blobs from the step above into objects that LangChain recognizes as its built-in `Document` class.

The `CustomDocument` class below is a lightweight custom implementation of that class: it exposes the same `page_content` and `metadata` attributes, which is what LangChain's text splitter reads from each document.

In [None]:
class CustomDocument:
    def __init__(self, text, path, number):
        self.page_content = text
        self.metadata = {
            'source': path, 
            'page': number  
        }

    def __repr__(self):
        # This method represents the object in a way that's clear to a user (also useful for debugging)
        return f"Document(page_content='{self.page_content}', metadata={self.metadata})"

    # Optionally, if you need a string representation of the instance that is more user-friendly, 
    # you can implement the __str__ method
    def __str__(self):
        return f"Page Content: {self.page_content}\nSource: {self.metadata['source']}\nPage Number: {self.metadata['page']}"
    
documents_custom = [
    CustomDocument(text=text, path=doc_source, number=page_num) 
    for text, doc_source, page_num in documents
]
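In the LangChain versions this lab targets, `split_documents` reads only `page_content` and `metadata` from each input document, which is why a lightweight class like `CustomDocument` is enough. The hypothetical `split_into_windows` helper below illustrates that duck-typed contract in plain Python: it accepts any object exposing those two attributes and copies the page's metadata onto every chunk, so each chunk can still be traced back to its source file and page. It uses naive fixed-size windows, not LangChain's splitting logic, and `PageDoc` is just a local copy of the `CustomDocument` shape.

```python
from dataclasses import dataclass, field


@dataclass
class Chunk:
    page_content: str
    metadata: dict = field(default_factory=dict)


def split_into_windows(docs, size=500):
    """Hypothetical helper: split duck-typed docs into fixed-size chunks.

    Any object with `page_content` (str) and `metadata` (dict) works,
    mirroring the two attributes LangChain's splitters read.
    """
    chunks = []
    for doc in docs:
        text = doc.page_content
        for start in range(0, len(text), size):
            # Copy metadata so chunks do not share (and mutate) one dict.
            chunks.append(Chunk(text[start:start + size], dict(doc.metadata)))
    return chunks


class PageDoc:
    # Same shape as CustomDocument in the notebook.
    def __init__(self, text, path, number):
        self.page_content = text
        self.metadata = {"source": path, "page": number}


pieces = split_into_windows([PageDoc("x" * 1200, "Lab03/raw-pdfs/guide.pdf", 7)])
print(len(pieces), pieces[-1].metadata)
```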

In [None]:
documents_custom[121]


Chunking text in a Retrieval-Augmented Generation (RAG) system is essential because it breaks large text inputs into smaller, more manageable pieces. Smaller chunks let the retriever return just the passages relevant to a question instead of whole pages, which improves the quality of the generated response. Without chunking, long pages could exceed the embedding model's input limit (512 tokens for `BAAI/bge-base-en-v1.5`), and key information could be diluted in a single, overly broad embedding. Chunking therefore makes the RAG system both more effective and more efficient when working with extensive text sources.


`RecursiveCharacterTextSplitter` is a text splitter commonly used in RAG implementations. It splits text on a prioritized list of separators (by default paragraph breaks `"\n\n"`, then line breaks `"\n"`, then spaces `" "`, then individual characters), moving to the next separator only for pieces that are still longer than `chunk_size`. Because it prefers natural boundaries such as paragraphs and lines, chunks tend to stay coherent, which helps the retrieval component find relevant information across large datasets.

To learn more --> https://python.langchain.com/docs/modules/data_connection/document_transformers/text_splitters/recursive_text_splitter
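To make the recursion concrete, here is a simplified, hypothetical re-implementation of the idea in plain Python (`recursive_split` is not a LangChain function). It tries the separators in LangChain's default order, recurses into any piece that is still longer than `chunk_size`, and greedily merges small neighboring pieces back together. It deliberately ignores `chunk_overlap` and the other options of the real class, so use it only to build intuition.

```python
DEFAULT_SEPARATORS = ("\n\n", "\n", " ", "")


def recursive_split(text, chunk_size=500, separators=DEFAULT_SEPARATORS):
    """Simplified sketch of recursive character splitting (no overlap)."""
    if len(text) <= chunk_size:
        return [text] if text else []
    if not separators:
        # No separators left: fall back to hard character cuts.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    # Use the first separator that occurs in the text; "" always matches.
    for i, sep in enumerate(separators):
        if sep == "" or sep in text:
            break
    finer = separators[i + 1:]
    pieces = list(text) if sep == "" else text.split(sep)

    chunks, current = [], ""
    for piece in pieces:
        if len(piece) > chunk_size:
            # Still too long: flush what we have, then recurse with finer separators.
            if current:
                chunks.append(current)
                current = ""
            chunks.extend(recursive_split(piece, chunk_size, finer))
            continue
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= chunk_size:
            current = candidate
        else:
            chunks.append(current)
            current = piece
    if current:
        chunks.append(current)
    return chunks


sample = "First paragraph about EMR.\n\nSecond paragraph about OpenSearch and embeddings."
for chunk in recursive_split(sample, chunk_size=40):
    print(repr(chunk))
```

Here the first paragraph fits as-is, while the second is too long and gets re-split on spaces, which is the same "coarse boundaries first" behavior the real splitter aims for.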

In [None]:
global_text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50
)
docs = global_text_splitter.split_documents(documents_custom)
print(f"Total number of docs pre-split {len(documents_custom)} | after split {len(docs)}")

Chunking increases the number of text blobs we have to process. The number of blobs we end up with is a function of `chunk_size` and `chunk_overlap`, two crucial parameters that govern the quality of your RAG system. Let's quickly visualize with `matplotlib` how many extra blobs we now need to process as a result of text chunking.
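For intuition about how `chunk_size` and `chunk_overlap` drive the chunk count, consider the simplified case of fixed-size character windows: each new chunk starts `chunk_size - chunk_overlap` characters after the previous one, so a text of length L > `chunk_size` yields `ceil((L - chunk_overlap) / (chunk_size - chunk_overlap))` chunks. The hypothetical helpers below compute that estimate and check it against an explicit sliding window. `RecursiveCharacterTextSplitter` cuts on separators instead, so its real counts will differ; treat this as a back-of-the-envelope estimate.

```python
def estimate_chunk_count(text_length, chunk_size=500, chunk_overlap=50):
    """Chunks produced by fixed-size windows with the given overlap."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need 0 <= chunk_overlap < chunk_size")
    if text_length == 0:
        return 0
    if text_length <= chunk_size:
        return 1
    stride = chunk_size - chunk_overlap  # distance between chunk starts
    # Integer ceiling of (text_length - chunk_overlap) / stride
    return (text_length - chunk_overlap + stride - 1) // stride


def sliding_windows(text, chunk_size=500, chunk_overlap=50):
    """Explicit fixed-size windows, used to sanity-check the estimate."""
    if not text:
        return []
    stride = chunk_size - chunk_overlap
    windows, start = [], 0
    while True:
        windows.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            return windows
        start += stride


print(estimate_chunk_count(2000))  # a 2,000-character page -> 5
```

With the notebook's settings (500/50), raising the overlap or lowering the chunk size shrinks the stride, which is what drives the post-split count up.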

In [None]:
# Clear any previous figure state
plt.clf()

x_labels = ["Pre-Split", "Post-Split"]
y = [len(documents_custom), len(docs)]
x = range(len(x_labels))

fig, axs = plt.subplots(1, 1, figsize=(7, 5))

# Bar chart comparing text counts before and after splitting
axs.bar(x, y, color=["red", "blue"])
axs.set_title('Pre/Post RecursiveCharacterTextSplitter Split')
axs.set_xticks(x)
axs.set_xticklabels(x_labels, rotation=45, ha="right")
axs.set_ylabel('Text # to Process -->')

# Adjust the layout
plt.tight_layout()

# Show the plot
plt.show()

%matplot plt

In [None]:
print(docs[1001])

## Run Parallelized Embeddings using Amazon EMR (EC2 Spark based Processing)

---
The last part of our workflow that we're going to parallelize consists of:
1. Tokenizing our text chunks
2. Generating vector embeddings from the tokenized text chunks
3. Ingesting the vector embeddings into a vector database (here, OpenSearch)


Steps 1-3 above are implemented in the code below by the function `generate_embeddings` and the class `EmbeddingsGenerator`.

The function `generate_embeddings` takes a chunk of text and generates its embedding with an embedding model, via the Lambda function `invokeEmbeddingEndpoint`. That Lambda handles tokenizing the text chunk, parsing the model's results, and returning the embedding vector to the core node.
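The Lambda's source isn't shown in this lab, but `generate_embeddings` tells us the contract it relies on: the request payload is `{"input": <text>}`, and the response payload carries a `statusCode` plus a `body` that is a JSON-encoded embedding vector. The hypothetical `fake_embedding_handler` and `parse_embedding_response` below mimic that contract locally with a fake, deterministic vector (a hash of the text, not a real embedding), which can be handy for unit-testing the parsing logic without calling AWS.

```python
import hashlib
import json


def fake_embedding_handler(event, context=None, dim=8):
    """Hypothetical stand-in for `invokeEmbeddingEndpoint`.

    Returns the response shape `generate_embeddings` parses; the vector is a
    deterministic hash of the text, not a real embedding (dim must be <= 32).
    """
    text = event["input"]
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    vector = [b / 255.0 for b in digest[:dim]]
    return {"statusCode": 200, "body": json.dumps(vector)}


def parse_embedding_response(raw_payload: bytes):
    """Same parsing steps `generate_embeddings` applies to response['Payload'].read()."""
    response_payload = json.loads(raw_payload.decode("utf-8"))
    status_code = int(response_payload["statusCode"])
    embeddings = json.loads(response_payload["body"])
    return status_code, embeddings


# Simulate the round trip: JSON request in, response bytes out.
request = json.loads(json.dumps({"input": "What is Amazon SageMaker?"}))
raw = json.dumps(fake_embedding_handler(request)).encode("utf-8")
status, vector = parse_embedding_response(raw)
print(status, len(vector))
```

The real `BAAI/bge-base-en-v1.5` model returns 768-dimensional vectors, so a length check like `len(vector)` is a cheap sanity test on the live Lambda too.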

The class `EmbeddingsGenerator` follows the interface LangChain expects from an embeddings object for its vector stores (`embed_documents` and `embed_query`). At the end of the parallel operation, all embeddings are collected on the EMR primary node (`embeddings_generated = input_text_rdd.map(generate_embeddings).collect()`) and pushed to an OpenSearch vector database index.
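LangChain vector stores call `embed_documents(texts)` for a batch and `embed_query(text)` for a single query, and (in LangChain's OpenSearch integration, for example) they pair the i-th vector with the i-th text. That pairing is why a batch embedder must return exactly one vector per input, in order: silently dropping a failed item shifts every later vector onto the wrong chunk. The hypothetical `StrictEmbeddings` class below sketches that contract with a local placeholder `toy_embed` instead of the Lambda, and fails loudly on any failed item.

```python
from typing import Callable, List, Tuple

EmbedFn = Callable[[str], Tuple[int, List[float]]]


class StrictEmbeddings:
    """Hypothetical embeddings object following LangChain's two-method contract.

    `embed_one` mirrors generate_embeddings: it returns (status_code, vector).
    """

    def __init__(self, embed_one: EmbedFn):
        self.embed_one = embed_one

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        results = [self.embed_one(t) for t in texts]
        failed = [i for i, (status, _) in enumerate(results) if status != 200]
        if failed:
            # Dropping these would shift later vectors onto the wrong texts.
            raise RuntimeError(f"embedding failed for inputs at positions {failed}")
        return [vector for _, vector in results]

    def embed_query(self, text: str) -> List[float]:
        status, vector = self.embed_one(text)
        if status != 200:
            raise RuntimeError(f"query embedding failed with status {status}")
        return vector


def toy_embed(text: str) -> Tuple[int, List[float]]:
    # Placeholder: empty strings "fail"; others get [char count, word count].
    if not text:
        return 500, []
    return 200, [float(len(text)), float(len(text.split()))]


emb = StrictEmbeddings(toy_embed)
print(emb.embed_documents(["a b", "c"]))
```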

In [None]:
def generate_embeddings(input_text_sample):
    """Embed a single text chunk by invoking the invokeEmbeddingEndpoint Lambda."""
    assert isinstance(input_text_sample, str), f"Input must be a single string but found {type(input_text_sample)}"
    
    # REGION was sent to the Spark session earlier with %%send_to_spark
    lambda_client = boto3.client('lambda', region_name=REGION) 

    # Prepare the data to send to the Lambda function
    data = {
        "input": input_text_sample
    }

    # Invoke the Lambda function synchronously
    response = lambda_client.invoke(
        FunctionName="invokeEmbeddingEndpoint",
        InvocationType="RequestResponse",
        Payload=json.dumps(data)
    )

    # Decode and load the response payload
    response_payload = json.loads(response['Payload'].read().decode("utf-8"))

    # Extract status and embeddings from the response
    status_code, embeddings = int(response_payload['statusCode']), json.loads(response_payload['body'])

    return status_code, embeddings
    
class EmbeddingsGenerator:
    
    @staticmethod
    def embed_documents(input_text, normalize=True):
        """
        Generate embeddings for the provided texts in parallel, invoking a Lambda function per text.
        """
        assert isinstance(input_text, list), "Input type must be a list for the embed_documents function"
        
        input_text_rdd = spark.sparkContext.parallelize(input_text)
        
        embeddings_generated = input_text_rdd.map(generate_embeddings).collect()
        
        embedding_response = []
        failed_positions = []
        for position, (s_code, embeddings) in enumerate(embeddings_generated):
            if s_code == 200:
                embedding_response.append(embeddings)
            else:
                failed_positions.append(position)
        
        # Vector stores pair the i-th embedding with the i-th text, so silently
        # dropping failures would attach later embeddings to the wrong chunks.
        if failed_positions:
            raise RuntimeError(f"Embedding failed for {len(failed_positions)} of {len(input_text)} texts")
        
        return embedding_response
    
    @staticmethod
    def embed_query(input_text):
        status_code, embedding = generate_embeddings(input_text)
        if status_code == 200:
            return embedding
        else: 
            return None

In [None]:
response_code, sample_sentence_embedding = generate_embeddings(docs[1000].page_content)
print(f"Status {response_code}, Embedding size of the document --->", len(sample_sentence_embedding))

Choose a name for your index. The RAG bot uses this name to query the embeddings we generate.

In [None]:
%%local
INDEX_NAME_OSE = "amz-guides-index"
# Save the index name to a local file in ../studio-local-ui
with open("../studio-local-ui/indexname.txt", "w") as f:
    f.write(INDEX_NAME_OSE)

In [None]:
%%send_to_spark -i INDEX_NAME_OSE -t str -n INDEX_NAME_OSE

To access our OpenSearch cluster securely, we retrieve the OpenSearch username and password from AWS Secrets Manager.

In [None]:
%%local
def get_secret(secret_name, region_name="us-west-2"):
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    get_secret_value_response = client.get_secret_value(
        SecretId=secret_name
    )
    secrets = json.loads(get_secret_value_response['SecretString'])
    user = secrets['username']
    pwd = secrets['password']
    return user, pwd

# Use the function
my_secret_name = "OpenSearchDBSecret"  
user, pwd = get_secret(my_secret_name, REGION)
# Avoid echoing the password in notebook output
print(f"Session user ---> {user} | password ---> {'*' * len(pwd)}")

# Write the credentials to a local file in ../studio-local-ui
with open("../studio-local-ui/opensearchlogin.txt", "w") as f:
    f.write(f"{user}|||{pwd}")

In [None]:
%%send_to_spark -i user -t str -n user

In [None]:
%%send_to_spark -i pwd -t str -n pwd

In the next cell, we use the AWS CLI inside a `%%local` cell to look up the OpenSearch domain endpoint, which we then use for downstream `langchain` tasks. The command assumes there is exactly one OpenSearch domain in your account and region.
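If you prefer Python over the CLI, the same lookup can be done with boto3's `opensearch` client (`list_domain_names` and `describe_domain`). The hypothetical helper `get_opensearch_url` below takes the client as a parameter so it can be exercised with a stub (`StubClient` and the `rag-demo` domain are made up for illustration). Like the CLI command, it assumes exactly one domain in the region and raises otherwise. Public domains report their address under `Endpoint`, while VPC domains report it under `Endpoints["vpc"]`, so the helper checks both.

```python
def get_opensearch_url(client):
    """Return 'https://<endpoint>' for the only OpenSearch domain visible to `client`.

    `client` is expected to behave like boto3.client("opensearch").
    """
    names = [d["DomainName"] for d in client.list_domain_names()["DomainNames"]]
    if len(names) != 1:
        raise RuntimeError(f"expected exactly one domain, found {names}")
    status = client.describe_domain(DomainName=names[0])["DomainStatus"]
    # Public domains expose "Endpoint"; VPC domains expose "Endpoints"["vpc"].
    endpoint = status.get("Endpoint") or status.get("Endpoints", {}).get("vpc")
    if not endpoint:
        raise RuntimeError(f"domain {names[0]} has no endpoint yet")
    return f"https://{endpoint}"


class StubClient:
    # Minimal stand-in returning the response shapes used above.
    def list_domain_names(self):
        return {"DomainNames": [{"DomainName": "rag-demo"}]}

    def describe_domain(self, DomainName):
        return {"DomainStatus": {"DomainName": DomainName,
                                 "Endpoint": "search-rag-demo-abc123.us-west-2.es.amazonaws.com"}}


print(get_opensearch_url(StubClient()))
```

In a `%%local` cell you would pass a real client instead, e.g. `get_opensearch_url(boto3.client("opensearch", region_name=REGION))`.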

In [None]:
%%local
!echo -n "https://$(aws es describe-elasticsearch-domain \
--domain-name "$(aws es list-domain-names --query 'DomainNames[*].DomainName' --output text)" \
--query 'DomainStatus.Endpoint' --output text)" > ../studio-local-ui/opesearchurl.txt

In [None]:
%%local
with open("../studio-local-ui/opesearchurl.txt", "r") as f:
    OPENSEARCH_DOMAIN_URL = f.read()
print(f"Your OpenSearch Domain URL ---> {OPENSEARCH_DOMAIN_URL}")

In [None]:
%%send_to_spark -i OPENSEARCH_DOMAIN_URL -t str -n OPENSEARCH_DOMAIN_URL

##### Now, we put it all together and run the embedding generation and ingestion processing in PySpark using Amazon EMR.
<div style="background-color: #FFFF00; border-left: 5px solid yellow; padding: 10px; color: black;">
    Please be patient, this can take between 4-10 minutes to complete!
</div>

In [None]:
import time
from langchain.vectorstores import OpenSearchVectorSearch

start = time.time()
docsearch = OpenSearchVectorSearch.from_documents(
    docs, 
    EmbeddingsGenerator, 
    opensearch_url=OPENSEARCH_DOMAIN_URL,
    bulk_size=len(docs),
    http_auth=(user, pwd),
    index_name=INDEX_NAME_OSE,
    engine="faiss"
)

end = time.time()
print(f"Total Time for ingestion: {round(end - start, 2)} secs")

In [None]:
query = "What is Amazon SageMaker?"
sample_responses = docsearch.similarity_search(
    query, 
    k=5, 
    space_type="cosineSimilarity", 
    search_type="painless_scripting"
)
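With `search_type="painless_scripting"`, OpenSearch scores candidate documents exactly with a script rather than through the approximate k-NN index, and `space_type="cosineSimilarity"` makes cosine similarity the ranking signal. As a small local illustration of that ranking (not OpenSearch's implementation), here is a hypothetical brute-force top-k by cosine similarity over toy vectors; `k` plays the same role as `k=5` in the cell above.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat zero vectors as unrelated
    return dot / (norm_a * norm_b)


def exact_top_k(query: Sequence[float], vectors: List[Sequence[float]],
                k: int = 5) -> List[Tuple[int, float]]:
    """Score every vector (brute force) and return the k best (index, score) pairs."""
    scored = [(i, cosine_similarity(query, v)) for i, v in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


toy_vectors = [[1.0, 0.0], [0.7, 0.7], [0.0, 1.0], [-1.0, 0.0]]
print(exact_top_k([1.0, 0.1], toy_vectors, k=2))
```

Exact scoring touches every document, which is fine for a lab-sized index but is the reason approximate k-NN exists for larger corpora.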

In [None]:
sample_responses[4].page_content

## Putting it All Together

To recap,

1. We connected to an EMR cluster to leverage PySpark for distributed data processing at scale!
2. We pushed some raw data into S3 (in reality, this data can be housed anywhere: Redshift, S3, RDS, DynamoDB, Snowflake, etc.)
3. We parallelized our document extraction from S3 using PySpark: our PySpark core nodes reached out to the document store (S3) and read each file into memory for downstream processing
4. We then split each document into pages and further parallelized our PDF-to-text processing at the page level using PySpark
5. We chunked our document corpus using `LangChain`'s `RecursiveCharacterTextSplitter`, converted the chunks into embeddings with the `BAAI/bge-base-en-v1.5` embedding model, and ingested these embeddings into an OpenSearch index, all using PySpark parallel processing
6. Now we use `Streamlit` to interact with the text generation model and the document embeddings through a UI

Run the following `%%bash` cell to spin up a new Streamlit UI that's accessible from the URL described below.

In [None]:
%%bash
cd ../studio-local-ui
streamlit run rag_app.py --server.runOnSave true --server.port 8502 > /dev/null

<div style="background-color: #6bb07e; border-left: 5px solid #6bb07e; padding: 10px; color: black;">
    - Navigate to: https://example.studio.us-west-2.sagemaker.aws/jupyterlab/default/proxy/8502/
</div>

<div style="background-color: #6bb07e; border-left: 5px solid #6bb07e; padding: 10px; color: black;">
    <i>- Replace "example" with your current URL host `https://use_this_host.studio.us-west-2...`</i>
</div>