# Prepare-Rewrite-Retrieve knowledge base data ingestion

In this notebook we will explore the data ingestion process of the **prepare-then-rewrite-then-retrieve-then-read** framework proposed by the authors of ["Meta Knowledge for Retrieval Augmented Large Language Models"](https://www.amazon.science/publications/meta-knowledge-for-retrieval-augmented-large-language-models) for creating more accurate and enriched RAG workflows.

## Pre-requisites

To run this notebook, the role executing it needs:

* Permissions to invoke Bedrock
* Access to the Amazon Nova Pro model
* Write permissions to the DynamoDB table created with the CDK stack in this PoC
* Write permissions to the OpenSearch Serverless host created with the CDK stack in this PoC

Additionally, we need the following Python packages:

In [None]:
!pip install -U boto3 langchain langchain-community langchain-aws opensearch-py PyPDF2 python-dotenv faiss-cpu

In [2]:
import os
import re
import logging
import json
import secrets
import time
import boto3
import faiss
import langchain_core

from dotenv import load_dotenv

from pydantic import BaseModel, Field
from typing import Literal

from enum import Enum
from PyPDF2 import PdfReader
from botocore.exceptions import ClientError
from langchain_aws import ChatBedrockConverse

from prompts.dataIngestion.generate_metadata_prompt import get_metadata_prompt_selector
from prompts.dataIngestion.generate_qa_prompt import get_qa_prompt_selector, get_structured_qa_prompt_selector
from prompts.dataIngestion.generate_meta_kb_prompt import get_summary_prompt_selector
from structured_output.metadata import DocumentMetadata
from structured_output.question_answers import QA_pairs

from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_core.documents import Document

from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore

from langchain_aws.embeddings import BedrockEmbeddings

from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth


In [None]:
logger = logging.getLogger()
langchain_core.globals.set_debug(False)
load_dotenv()

In [4]:
BEDROCK_MODEL_ID = "us.amazon.nova-pro-v1:0"
EMBEDDINGS_MODEL_ID="amazon.titan-embed-text-v2:0"

### Type definitions

In [5]:
# Customize according to the types of document to be processed by the application
class DocumentTypes(Enum):
    SYSTEM_ARCHITECTURE = "systems architecture"
    SECURITY = "information technology security"
    DATA_GOVERNANCE = "data governance"
    TECH_STRATEGY = "tech strategy"
    MANAGEMENT = "management"

class AnalysisPerspectives(Enum):
    SECURITY = "software security engineer"
    DATA_GOVERNANCE = "data governance"
    RESILIENCY = "systems resiliency"
    SYS_OPS = "systems operations"

# Persona definition for generating and answering QA
AnalysisPersonas = {
    "software security engineer": {
        "description": "It is responsible for ensuring that workloads have the necessary security controls in place",
        "perspectives": [AnalysisPerspectives.SECURITY.value, AnalysisPerspectives.DATA_GOVERNANCE.value]
    },
    "solutions architect": {
        "description": "It is responsible for designing scalable and cost-efficient software solutions",
        "perspectives": [AnalysisPerspectives.RESILIENCY.value, AnalysisPerspectives.DATA_GOVERNANCE.value, AnalysisPerspectives.SECURITY.value]
    },
    "software developer": {
        "description":"Implements the system functionalities",
        "perspectives": [AnalysisPerspectives.SYS_OPS.value, AnalysisPerspectives.RESILIENCY.value, AnalysisPerspectives.SECURITY.value]
    }
}

### AWS Resources

In [6]:
OPENSEARCH_HOST = <OPENSEARCH_HOST> #URL (without the protocol) of the OpenSearch Serverless Host
OPENSEARCH_PORT = 443 #Port of the OpenSearch Serverless Host

OPERNSEARCH_INDEX_NAME = <OPENSEARCH_INDEX_NAME> #Name of the OpenSearch Serverless Index

METAKB_DYNAMODB_TABLE_NAME = <METAKB_DYNAMODB_TABLE_NAME> #Name of the DynamoDB table created with the CDK stack in this PoC

In [None]:
PDF_FILE = "./docs/overallReadme.pdf"
DOCUMENT_KEY = "overallReadme"
DOCUMENT_TITLE = "Overall readme"

ANALYSIS_PERSONNA = "solutions architect"
ANALYSIS_PERSPECTIVE = AnalysisPersonas[ANALYSIS_PERSONNA]["perspectives"][0]

print(f"Using persona: {ANALYSIS_PERSONNA}")
print(f"Using perspective: {ANALYSIS_PERSPECTIVE}")

## Create clients for AWS services

This knowledge base is made up of two components:

* A vector store: As in any other knowledge base, the main component is a vector store where the embeddings are persisted. This implementation uses [Amazon OpenSearch Serverless](https://aws.amazon.com/opensearch-service/features/serverless/) as the vector store.
* A meta-knowledge base: It consists of the summaries of each partition of the knowledge base, where a partition is a persona and perspective combination. This implementation stores it in an [Amazon DynamoDB](https://aws.amazon.com/dynamodb/) table.
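
The two stores are linked only by the persona and perspective values: each Q&A document in the vector store carries both fields, and the meta-knowledge base holds one summary per combination under a `summary_key` of the form `<persona>-<perspective>`, which is the key format used by the cells in this notebook. The following is a minimal sketch of that layout with embeddings left out. `build_summary_key` and `build_records` are hypothetical helper names used only for illustration and are not part of the PoC code.

```python
def build_summary_key(persona: str, perspective: str) -> str:
    """Compose the key under which a partition's summary is stored."""
    return f"{persona}-{perspective}"


def build_records(persona, perspective, qa_pairs, summary):
    """Return (vector_store_docs, meta_kb_item) for one persona/perspective partition.

    qa_pairs is a list of (question, answer) tuples; embeddings are omitted here.
    """
    vector_docs = [
        {"persona": persona, "perspective": perspective, "question": q, "answer": a}
        for q, a in qa_pairs
    ]
    meta_kb_item = {
        "summary_key": build_summary_key(persona, perspective),
        "summary": summary,
    }
    return vector_docs, meta_kb_item


docs, item = build_records(
    "solutions architect",
    "systems resiliency",
    [("What happens if a node fails?", "Traffic is rerouted to healthy nodes.")],
    "Questions focus on failure handling.",
)
print(item["summary_key"])  # solutions architect-systems resiliency
```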

In [8]:
botoSession = boto3.Session()

meta_kb_table = botoSession.resource("dynamodb").Table(METAKB_DYNAMODB_TABLE_NAME)
bedrock_runtime = boto3.client('bedrock-runtime')

## Helper functions

In [9]:
def extract_text_from_pdf(
    pdf_path
):
    "Extract text from a PDF file"

    reader = PdfReader(pdf_path)
    text = ""
    for page in reader.pages:
        # Pages without extractable text (e.g. scanned images) may yield nothing; fall back to ""
        text += (page.extract_text() or "") + "\n"  # newline separates pages
    return text

def encode_text(
        text: str = None,  # the text to be encoded
        dimension: int = 1024,  # Titan Text Embeddings V2 accepts 1024 (default), 512, or 256
):
    "Get text embedding using embeddings model"

    payload_body = {
        "inputText": text,
        "dimensions": dimension,
        "normalize": True
    }

    #print("embedding text")
    #print(payload_body)

    response = bedrock_runtime.invoke_model(
        body=json.dumps(payload_body),
        modelId=EMBEDDINGS_MODEL_ID,
        accept="application/json",
        contentType="application/json"
    )

    feature_vector = json.loads(response.get("body").read())["embedding"]

    #print("text embedding")
    #print(feature_vector)

    return feature_vector

def index_document(
        oss_client,
        oss_index_name,
        embedding,
        persona,
        perspective,
        question,
        answer
):
    "Index a document into Amazon OpenSearch Serverless"

    document = {
        "persona": persona,
        "perspective": perspective,
        "question": question,
        "answer": answer,
        "embedding": embedding
    }

    #print(document)

    oss_response = oss_client.index(
        index=oss_index_name,
        body=document,
    )

    return oss_response


def get_opensearch_connection(
        host: str,
        port: int,
) -> OpenSearch:
    "Establishes a connection to an OpenSearch cluster using AWSV4SignerAuth for authentication."

    # Create an AWSV4SignerAuth instance for authentication
    auth = AWSV4SignerAuth(
        boto3.Session(
            region_name=os.getenv("AWS_REGION")
        ).get_credentials(),
        os.getenv("AWS_REGION"),
        "aoss"
    )

    # Create an OpenSearch client instance
    client = OpenSearch(
        hosts=[{"host": host, "port": port}],
        http_auth=auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        timeout=30,
    )

    # Return the OpenSearch client instance
    return client

def get_existing_summary(
        user,
        perspective
):
    "Get the existing summary, if one exists, for a combination of user and perspective."

    print("Trying to get summary")
    print(f"summary key: {user}-{perspective}")

    try:
        response = meta_kb_table.get_item(
            Key={
                "summary_key": f"{user}-{perspective}",
            }
        )
        if "Item" in response:
            item = response["Item"]
            return item["summary"]
        else:
            return ""
    except ClientError as ex:
        # A missing item is handled above; a ClientError means the request itself failed
        print(f"Could not read summary for {user}-{perspective}: {ex}")
        raise
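
`index_document` assumes the target index already exists (in this PoC it is created by the CDK stack) and that it has a vector field named `embedding` whose dimension matches the output of `encode_text`. The mapping the stack actually uses is not shown in this notebook, so the sketch below is only an assumption of what a compatible definition could look like, using the standard OpenSearch `knn_vector` field type. The `keyword`/`text` choices and the helper name `build_index_body` are illustrative.

```python
def build_index_body(dimension: int = 1024) -> dict:
    """Build an index definition compatible with the documents sent by index_document.

    Assumption: the real index is provisioned elsewhere; this only mirrors the field names.
    """
    return {
        "settings": {"index": {"knn": True}},  # enable k-NN search on the index
        "mappings": {
            "properties": {
                # Must match the `dimensions` value requested from the embeddings model
                "embedding": {"type": "knn_vector", "dimension": dimension},
                "persona": {"type": "keyword"},
                "perspective": {"type": "keyword"},
                "question": {"type": "text"},
                "answer": {"type": "text"},
            }
        },
    }


index_body = build_index_body()
print(index_body["mappings"]["properties"]["embedding"])

# With an opensearch-py client, the body could be applied with:
# oss_client.indices.create(index=oss_index_name, body=index_body)
```

If `encode_text` is called with a non-default `dimension`, the index would need the same value, since the vector length is fixed by the field mapping.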

## Load document

In [None]:
document_text = extract_text_from_pdf(PDF_FILE)
print(document_text)

## Metadata generation

In this section we will extract metadata from the document and store it in a Pydantic **DocumentMetadata** object.

In [11]:
metadata_llm = ChatBedrockConverse(
    model=BEDROCK_MODEL_ID,
    temperature=0.4,
    max_tokens=2000,
    # other params...
)

LLM_GENERATE_METADATA_PROMPT_SELECTOR = get_metadata_prompt_selector(lang="en")

gen_metadata_prompt = LLM_GENERATE_METADATA_PROMPT_SELECTOR.get_prompt(BEDROCK_MODEL_ID)
structured_metadata = metadata_llm.with_structured_output(DocumentMetadata)

structured_metadata_prompt = gen_metadata_prompt | structured_metadata

In [12]:
document_metadata = structured_metadata_prompt.invoke(
    {
        "document_types": ", ".join([e.value for e in DocumentTypes]),
        "user_type": ANALYSIS_PERSONNA,
        "doc_title": DOCUMENT_TITLE,
        "text": document_text,
        "medata_object_definition": DocumentMetadata.model_json_schema()
    }
)

Let's take a look at the results:

In [None]:
print("Generated metadata\n")

for k, v in document_metadata.model_dump().items():
    print(f"{k}: {v}")

## Q&A Generation

We now generate question-answer pairs from the document.

In [14]:
questions_llm = ChatBedrockConverse(
    model=BEDROCK_MODEL_ID,
    temperature=0.1,
    max_tokens=2000,
    # other params...
)

LLM_GENERATE_QUESTIONS_PROMPT_SELECTOR = get_qa_prompt_selector(lang="en")

gen_questions_prompt = LLM_GENERATE_QUESTIONS_PROMPT_SELECTOR.get_prompt(BEDROCK_MODEL_ID)

questions_prompt = gen_questions_prompt | questions_llm

In [15]:
n_qa_pairs = 10

qa_completion = questions_prompt.invoke(
    {
        "topic_perspective": ANALYSIS_PERSPECTIVE,
        "user_type": ANALYSIS_PERSONNA,
        "n_pairs": n_qa_pairs,
        "doc_title": DOCUMENT_TITLE,
        "text": document_text
    }
)

Next, we extract only the question-answer pairs into a Pydantic **QA_pairs** object.

In [16]:
question_extraction_llm = ChatBedrockConverse(
    model=BEDROCK_MODEL_ID,
    temperature=0.1,
    max_tokens=2000,
    # other params...
)

LLM_GENERATE_STRUCTURED_QUESTIONS_PROMPT_SELECTOR = get_structured_qa_prompt_selector(lang="en")

gen_structured_questions_prompt = LLM_GENERATE_STRUCTURED_QUESTIONS_PROMPT_SELECTOR.get_prompt(BEDROCK_MODEL_ID)
structured_questions = question_extraction_llm.with_structured_output(QA_pairs)

structured_questions_prompt = gen_structured_questions_prompt | structured_questions

In [17]:
document_qa = structured_questions_prompt.invoke(
    {
        # Pass only the generated text, not the whole AIMessage object
        "qa_text": qa_completion.content
    }
)

Let's visualize the results:

In [None]:
for qa in document_qa.qa_pairs:
    print(f"Q: {qa.question}")
    print(f"A: {qa.answer}")
    print("\n")

## Meta-Knowledge summary generation

Now we can use the question-answer pairs, together with any existing summary for this persona and perspective, to generate the meta-knowledge summary.

In [None]:
previous_summary = get_existing_summary(ANALYSIS_PERSONNA, ANALYSIS_PERSPECTIVE)

print("Existing summary:\n\n")
print(previous_summary)

In [20]:
document_qa_str = ""

for i, qa_pair in enumerate(document_qa.qa_pairs):
    document_qa_str += f"{i+1}.- {qa_pair.question}\n{qa_pair.answer}\n\n"

In [21]:
mk_summary_llm = ChatBedrockConverse(
    model=BEDROCK_MODEL_ID,
    temperature=0.4,
    max_tokens=5000,
    top_p=0.9
    # other params...
)

In [22]:
LLM_GENERATE_SUMMARY_PROMPT_SELECTOR = get_summary_prompt_selector(lang="en", with_context=bool(previous_summary), for_chunks=False)

gen_summary_prompt = LLM_GENERATE_SUMMARY_PROMPT_SELECTOR.get_prompt(BEDROCK_MODEL_ID)

summary_prompt = gen_summary_prompt | mk_summary_llm

In [None]:
# Inputs shared by both prompt variants
summary_inputs = {
    "document_types": ", ".join([e.value for e in DocumentTypes]),
    "topic_perspective": ANALYSIS_PERSPECTIVE,
    "users_types": ANALYSIS_PERSONNA,
    "qa_pairs": document_qa_str
}

if previous_summary:
    print("Generating with previous summary")
    # The with-context prompt additionally expects the existing summary
    summary_inputs["summary"] = previous_summary
else:
    print("Generating without previous summary")

mk_summary = summary_prompt.invoke(summary_inputs)

The generated summary is:

In [None]:
mk_summary.content

## Indexing into knowledge base

In [25]:
documents = [
    Document(
        page_content=qa_pair.question, 
        metadata={
            "persona": ANALYSIS_PERSONNA,
            "perspective": ANALYSIS_PERSPECTIVE,
            "question": qa_pair.question,
            "answer": qa_pair.answer,
        }
    ) for qa_pair in document_qa.qa_pairs
]

We are now ready to index the information into the knowledge base. We will store the meta-knowledge summary in an Amazon DynamoDB table and the Q&A pairs in an Amazon OpenSearch Serverless vector store.

In [None]:
meta_kb_table.put_item(
    Item={
        "summary_key": f"{ANALYSIS_PERSONNA}-{ANALYSIS_PERSPECTIVE}",
        "summary": mk_summary.content,
    }
)

Embed and store the question-answer pairs in OpenSearch Serverless:

In [27]:
opensearch_client = get_opensearch_connection(OPENSEARCH_HOST, OPENSEARCH_PORT)

In [None]:
for qa_pair in document_qa.qa_pairs:

    question = qa_pair.question
    answer = qa_pair.answer

    print(f"Q&A pair:\n\n{question}\n{answer}")
    question_vector = encode_text(question)

    #Index Q&A pair
    oss_response = index_document(
        oss_client=opensearch_client,
        oss_index_name=OPERNSEARCH_INDEX_NAME,
        embedding=question_vector,
        persona=ANALYSIS_PERSONNA,
        perspective=ANALYSIS_PERSPECTIVE,
        question=question,
        answer=answer
    )

    print(f"Q&A pair indexed: {oss_response}")
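
To check that the pairs are retrievable, a k-NN query can be issued against the same `embedding` field, restricted to the persona and perspective used during ingestion. The sketch below only builds the request body. `build_knn_query` is a hypothetical helper, and the `match_phrase` filters assume nothing about the index beyond the field names written by `index_document`. Newly indexed documents in OpenSearch Serverless can take a short time to become searchable, so a query issued immediately after ingestion may return fewer hits than expected.

```python
def build_knn_query(query_vector, persona, perspective, k=3):
    """Build a k-NN search body limited to one persona/perspective partition."""
    return {
        "size": k,
        "_source": ["question", "answer"],  # skip returning the stored vectors
        "query": {
            "bool": {
                # Nearest neighbours of the query vector on the `embedding` field
                "must": [{"knn": {"embedding": {"vector": query_vector, "k": k}}}],
                # Keep only documents ingested for this persona and perspective
                "filter": [
                    {"match_phrase": {"persona": persona}},
                    {"match_phrase": {"perspective": perspective}},
                ],
            }
        },
    }


# Toy two-dimensional vector for illustration; a real query would use encode_text(...)
query_body = build_knn_query([0.1, 0.2], "solutions architect", "systems resiliency", k=2)
print(query_body["query"]["bool"]["filter"])

# With the client from this notebook, the search could be run as:
# hits = opensearch_client.search(index=OPERNSEARCH_INDEX_NAME, body=query_body)["hits"]["hits"]
```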

If you made it to this point, you now have a knowledge base built with the **prepare-then-rewrite-then-retrieve-then-read** framework.