# Amazon Bedrock RAG Application with CloudFormation

This notebook demonstrates how to build a Q&A application using Amazon Bedrock with Retrieval-Augmented Generation (RAG).
Resources such as S3 buckets, IAM roles, and other infrastructure are provisioned using a CloudFormation template.

## Setup
Let's start by importing the required libraries and configuring Bedrock.

In [None]:
from dotenv import load_dotenv
import botocore.exceptions
import ipywidgets as widgets
import os
import json
import sys
import boto3
import pprint as pp
import time

aws_region = "us-east-1"
load_dotenv(".env")


def interactive_sleep(seconds):
    """Sleep interactively for the given number of seconds with progress."""
    for i in range(seconds):
        print(f"Sleeping... {i + 1}/{seconds} seconds", end="\r")
        time.sleep(1)


bedrock_runtime_client = boto3.client("bedrock-runtime", region_name=aws_region)
bedrock_management_client = boto3.client('bedrock', region_name=aws_region)
bedrock_agent_client = boto3.client('bedrock-agent', region_name=aws_region)
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime', region_name=aws_region)
cloudformation_client = boto3.client('cloudformation', region_name=aws_region)

boto3.__version__

### Load CloudFormation Template

The CloudFormation template defines resources such as S3 buckets for data storage, IAM roles for access control, and Amazon Bedrock configurations.

In [None]:
import boto3
import json
import yaml
from pygments import highlight, lexers, formatters

cloudformation_client = boto3.client('cloudformation', region_name=aws_region)
template_file_path = 'bedrock_rag_template.yaml'


def get_template():
    try:
        with open(template_file_path, 'r') as template_file:
            cloudformation_template = template_file.read()
        print("CloudFormation template loaded successfully.")
        return cloudformation_template
    except FileNotFoundError:
        print(f"Error: The file '{template_file_path}' was not found.")
    except yaml.YAMLError as e:
        print(f"Error parsing YAML file: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


cloudformation_template = get_template()
colorful_yaml = highlight(
    cloudformation_template,
    lexers.YamlLexer(),
    formatters.TerminalFormatter()
)

print(colorful_yaml)

### Deploy CloudFormation Stack

Deploy the stack using the loaded template. This creates all required resources for the RAG application.

In [None]:
import uuid

unique_id = str(uuid.uuid4())

stack_name = f'BedrockRAGStack-{unique_id}'
caller_identity = !aws sts get-caller-identity
caller_identity = json.loads(''.join(caller_identity))

try:
    cloudformation_template = get_template()
    response = cloudformation_client.create_stack(
        StackName=stack_name,
        TemplateBody=cloudformation_template,
        Capabilities=['CAPABILITY_NAMED_IAM'],
        Parameters=[
            {
                'ParameterKey': 'CreateKnowledgeBase',
                'ParameterValue': 'false'
            },
            {
                'ParameterKey': 'UUID',
                'ParameterValue': unique_id[:8]
            },
            {
                'ParameterKey': 'CallerIdentity',
                'ParameterValue': caller_identity["Arn"]
            }
        ]
    )

    print("Stack creation initiated. Stack ID:", response['StackId'])

except FileNotFoundError:
    print(f"Error: The file '{template_file_path}' was not found.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
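
The stack is created here and updated later with the same three parameters, each in CloudFormation's `ParameterKey`/`ParameterValue` shape. As a minimal sketch, a small helper can build that list from a plain dictionary. The helper name `to_cfn_parameters` and the sample values are illustrative and not part of the original notebook.

```python
def to_cfn_parameters(params: dict) -> list:
    """Convert {"Key": value} into the list shape expected by create_stack/update_stack."""
    return [
        {"ParameterKey": key, "ParameterValue": str(value)}
        for key, value in params.items()
    ]


# Placeholder values for illustration only.
example_parameters = to_cfn_parameters({
    "CreateKnowledgeBase": "false",
    "UUID": "1a2b3c4d",
    "CallerIdentity": "arn:aws:iam::123456789012:user/example",
})
print(example_parameters)
```

The resulting list could be passed as `Parameters=...`, which keeps the create and update calls in sync when only `CreateKnowledgeBase` changes.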

### Wait for Stack Completion

Wait until the stack creation is complete before proceeding to use the resources.

In [None]:
def delete_stack(stack_name):
    """
    Delete the specified CloudFormation stack.
    """
    try:
        print(f"Deleting stack {stack_name}...")
        cloudformation_client.delete_stack(StackName=stack_name)
        print(f"Delete request sent for stack {stack_name}.")
        wait_for_stack(stack_name, 'DELETE_COMPLETE')
    except Exception as e:
        print(f"An error occurred while deleting the stack: {e}")


def wait_for_stack(stack_name, expected_status):
    """
    Wait for the stack to reach an expected status.
    """
    while True:
        try:
            response = cloudformation_client.describe_stacks(StackName=stack_name)
            stack_status = response['Stacks'][0]['StackStatus']
            print(f"Stack status: {stack_status}. Waiting...")
            if stack_status == expected_status:
                print(f"Stack {stack_name} reached expected status: {expected_status}.")
                break
            elif stack_status in ['CREATE_FAILED', 'ROLLBACK_COMPLETE', 'ROLLBACK_FAILED', 'DELETE_FAILED',
                                  'UPDATE_FAILED', 'UPDATE_ROLLBACK_COMPLETE', 'UPDATE_ROLLBACK_FAILED']:
                raise Exception(f"Stack operation failed with status: {stack_status}")
        except cloudformation_client.exceptions.ClientError as e:
            if 'does not exist' in str(e) and expected_status == 'DELETE_COMPLETE':
                print(f"Stack {stack_name} deleted successfully.")
                break
            elif 'does not exist' in str(e):
                raise Exception(f"Stack {stack_name} does not exist.")
            else:
                raise
        time.sleep(30)


def wait_for_stack_or_delete(status='CREATE_COMPLETE'):
    try:
        wait_for_stack(stack_name, status)
    except Exception as e:
        print(e)
        delete_stack(stack_name)


wait_for_stack_or_delete(status='CREATE_COMPLETE')
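
As an alternative to the hand-written polling loop, boto3 ships CloudFormation waiters that poll `describe_stacks` internally. The sketch below wraps one of them. The function name `wait_with_waiter` and its default values are assumptions made for illustration; the client is passed in as an argument, so the block itself makes no AWS calls.

```python
def wait_with_waiter(cfn_client, stack_name, waiter_name="stack_create_complete",
                     delay=30, max_attempts=120):
    """Block until the stack reaches the state that the named waiter checks for.

    The waiter raises botocore.exceptions.WaiterError if the stack ends up in a
    failure state or the attempts run out.
    """
    waiter = cfn_client.get_waiter(waiter_name)
    waiter.wait(
        StackName=stack_name,
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )


# Example call (requires AWS credentials, so it is left commented out):
# wait_with_waiter(cloudformation_client, stack_name)
```

For the update step later in this notebook, `stack_update_complete` is the corresponding waiter name.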

### Retrieve Stack Outputs

Get the outputs from the stack, such as the S3 bucket name and IAM role ARN, to use them in subsequent steps.

In [None]:
response = cloudformation_client.describe_stacks(StackName=stack_name)
outputs = response['Stacks'][0]['Outputs']

results = {}
for output in outputs:
    results[output['OutputKey']] = output['OutputValue']
    print(f"{output['OutputKey']}: {output['OutputValue']}")
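
Later cells read specific keys from `results`, such as `OpenSearchServerlessCollectionEndpoint` and `S3BucketName`. A small check can fail early with a clear message if an expected output is missing. This is a sketch: the helper name `require_outputs` and the sample endpoint are hypothetical.

```python
def require_outputs(results: dict, required_keys: list) -> dict:
    """Return the requested outputs, or raise KeyError naming every missing key."""
    missing = [key for key in required_keys if key not in results]
    if missing:
        raise KeyError(f"Stack outputs missing: {', '.join(missing)}")
    return {key: results[key] for key in required_keys}


# Placeholder output for illustration only.
example_results = {
    "OpenSearchServerlessCollectionEndpoint": "https://example.us-east-1.aoss.amazonaws.com",
}
checked = require_outputs(example_results, ["OpenSearchServerlessCollectionEndpoint"])
print(checked)
```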

### Create Vector Index

Create an empty vector index to store the embeddings of text data and wait for its stabilization before proceeding to the next steps.


In [None]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, RequestError


INDEX_DIMENSION = int(os.getenv("INDEX_DIMENSION", 256))
AWS_REGION = os.getenv("AWS_REGION", aws_region)
HOST = results["OpenSearchServerlessCollectionEndpoint"].replace('https://', '')
SERVICE = "aoss"
INDEX_NAME = os.getenv("VECTOR_INDEX_NAME", f"kb-index-{unique_id[:8]}")
VECTOR_FIELD_NAME = os.getenv("VECTOR_FIELD_NAME", "vector")
TEXT_FIELD_NAME = os.getenv("TEXT_FIELD_NAME", "AMAZON_BEDROCK_TEXT_CHUNK")
METADATA_FIELD_NAME = os.getenv("METADATA_FIELD_NAME", "AMAZON_BEDROCK_METADATA")
TIMEOUT_SECONDS = 60

INDEX_BODY = {
    "settings": {
        "index.knn": "true",
        "number_of_shards": 1,
        "knn.algo_param.ef_search": 512,
        "number_of_replicas": 0,
    },
    "mappings": {
        "properties": {
            VECTOR_FIELD_NAME: {
                "type": "knn_vector",
                "dimension": INDEX_DIMENSION,
                "method": {
                    "name": "hnsw",
                    "engine": "faiss",
                    "space_type": "l2",
                },
            },
            TEXT_FIELD_NAME: {"type": "text"},
            METADATA_FIELD_NAME: {"type": "text"},
        }
    },
}


def get_opensearch_client(host: str, region: str, service: str) -> OpenSearch:
    """Build and return an OpenSearch client."""
    try:
        print(f"Initializing OpenSearch client for host: {host}, region: {region}")
        credentials = boto3.Session().get_credentials()
        awsauth = AWSV4SignerAuth(credentials, region, service)
        client = OpenSearch(
            hosts=[{'host': host, 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            timeout=300,
        )
        print("OpenSearch client initialized successfully.")
        return client
    except Exception as e:
        print(f"Error initializing OpenSearch client: {str(e)}")
        raise


def create_index(client: OpenSearch, index_name: str, body: dict) -> None:
    """Create a vector index in OpenSearch Serverless."""
    try:
        print(f"Creating index: {index_name}")
        response = client.indices.create(index=index_name, body=body)
        print("Index created successfully:")
        print(json.dumps(response, indent=2))
        print(f"Waiting {TIMEOUT_SECONDS} seconds for index stabilization...")
        time.sleep(TIMEOUT_SECONDS)
    except RequestError as e:
        if e.info.get("status") == 400 and "already exists" in str(e.error):
            print(f"Index {index_name} already exists. Skipping creation.")
        else:
            print(f"RequestError while creating index: {str(e)}")
            raise
    except Exception as e:
        print(f"An unexpected error occurred while creating the index: {str(e)}")
        raise


try:
    print("Starting OpenSearch index creation process...")
    client = get_opensearch_client(HOST, AWS_REGION, SERVICE)
    create_index(client, INDEX_NAME, INDEX_BODY)
    print("OpenSearch index creation completed successfully.")
except Exception as e:
    print(f"An unexpected error occurred during the process: {str(e)}")
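
The `dimension` of the `knn_vector` field has to match the output size of the embedding model that the knowledge base uses; that model is defined in the CloudFormation template, which is not shown here. The sketch below checks an index body before it is sent to OpenSearch. The helper name `check_vector_mapping` and the sample body are assumptions for illustration.

```python
def check_vector_mapping(index_body: dict, vector_field: str, embedding_dimension: int) -> None:
    """Raise ValueError if the vector field is missing or has the wrong type or dimension."""
    properties = index_body["mappings"]["properties"]
    if vector_field not in properties:
        raise ValueError(f"Field '{vector_field}' is not defined in the index mappings.")
    mapping = properties[vector_field]
    if mapping.get("type") != "knn_vector":
        raise ValueError(f"Field '{vector_field}' must be of type 'knn_vector'.")
    if mapping.get("dimension") != embedding_dimension:
        raise ValueError(
            f"Index dimension {mapping.get('dimension')} does not match "
            f"embedding dimension {embedding_dimension}."
        )


# Reduced sample body; the real INDEX_BODY above has more settings.
example_index_body = {
    "mappings": {"properties": {"vector": {"type": "knn_vector", "dimension": 256}}}
}
check_vector_mapping(example_index_body, "vector", 256)
print("Vector mapping looks consistent.")
```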

### Update Stack to Create the Bedrock Knowledge Base

Once the vector index is stable, update the CloudFormation stack with `CreateKnowledgeBase` set to `true` so that it creates the knowledge base in Amazon Bedrock.

In [None]:
try:
    cloudformation_template = get_template()
    response = cloudformation_client.update_stack(
        StackName=stack_name,
        TemplateBody=cloudformation_template,
        Capabilities=['CAPABILITY_NAMED_IAM'],
        Parameters=[
            {
                'ParameterKey': 'CreateKnowledgeBase',
                'ParameterValue': 'true'
            },
            {
                'ParameterKey': 'UUID',
                'ParameterValue': unique_id[:8]
            },
            {
                'ParameterKey': 'CallerIdentity',
                'ParameterValue': caller_identity["Arn"]
            }
        ]
    )

    print("Stack update initiated. Stack ID:", response['StackId'])

except cloudformation_client.exceptions.ClientError as e:
    if 'No updates are to be performed' in str(e):
        print("No changes detected. The stack is already up-to-date.")
    else:
        print(f"An error occurred during stack update: {e}")
except FileNotFoundError:
    print(f"Error: The file '{template_file_path}' was not found.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

wait_for_stack_or_delete(status='UPDATE_COMPLETE')

### Update Stack Outputs

Get the updated outputs from the stack after creating the knowledge base.

In [None]:
%store -r stack_name

In [None]:
response = cloudformation_client.describe_stacks(StackName=stack_name)
outputs = response['Stacks'][0]['Outputs']

results = {}
for output in outputs:
    results[output['OutputKey']] = output['OutputValue']
    print(f"{output['OutputKey']}: {output['OutputValue']}")

### Ingest Data into Knowledge Base

Load documents into the Knowledge Base using the S3 bucket provisioned by the CloudFormation stack.

In [None]:
import os
import boto3
from urllib.request import urlretrieve

s3_client = boto3.client('s3')


def download_and_upload_to_s3(data_root, files_to_download, bucket_name):
    os.makedirs(data_root, exist_ok=True)
    print("Downloading shareholder letters...")
    for url, filename in files_to_download.items():
        file_path = os.path.join(data_root, filename)
        urlretrieve(url, file_path)
        print(f"Downloaded: {file_path}")
    print("\nUploading files to S3...")
    for filename in files_to_download.values():
        local_file_path = os.path.join(data_root, filename)
        s3_key = f"{data_root}/{filename}"
        s3_client.upload_file(local_file_path, bucket_name, s3_key)
        print(f"Uploaded: s3://{bucket_name}/{s3_key}")

    print("\nAll files uploaded successfully!")

Use the `download_and_upload_to_s3` function to download the documents and upload them to the S3 bucket.

In [None]:
DATA_ROOT = "./kb_financials/"
FILES_TO_DOWNLOAD = {
    'https://s2.q4cdn.com/299287126/files/doc_financials/2023/ar/2022-Shareholder-Letter.pdf': 'AMZN-2022-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2022/ar/2021-Shareholder-Letter.pdf': 'AMZN-2021-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2021/ar/Amazon-2020-Shareholder-Letter-and-1997-Shareholder-Letter.pdf': 'AMZN-2020-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2020/ar/2019-Shareholder-Letter.pdf': 'AMZN-2019-Shareholder-Letter.pdf',
}

bucket_name = results["S3BucketName"]

download_and_upload_to_s3(DATA_ROOT, FILES_TO_DOWNLOAD, bucket_name)

### Start ingestion job
Once the knowledge base and data source are created, we can start the ingestion job.
During the ingestion job, the knowledge base fetches the documents from the data source, pre-processes them to extract text, splits the text into chunks based on the configured chunk size, creates an embedding for each chunk, and writes the embeddings to the vector database, in this case Amazon OpenSearch Serverless.

```python
knowledge_base_id = results['BedrockKnowledgeBaseId']
data_source_id = results["BedrockDataSourceId"].split("|")[1]
```

Use the example code below to start the ingestion job and display the result. Replace each `[...]` placeholder with the corresponding variable defined above.

```python
start_job_response = bedrock_agent_client.start_ingestion_job(
    knowledgeBaseId = [...], 
    dataSourceId = [...]
)
job = start_job_response["ingestionJob"]
pp.pprint(job)
```

Wait until the sync job is complete. Use the example code below to check when the ingestion job has finished:

In [None]:
ingestion_job_id = job["ingestionJobId"]

while job['status'] not in ('COMPLETE', 'FAILED', 'STOPPED'):
    get_job_response = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId=[...],
        dataSourceId=[...],
        ingestionJobId=[...]
    )
    job = get_job_response["ingestionJob"]
    interactive_sleep(30)

pp.pprint(job)
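
An ingestion job can also end in a state other than `COMPLETE`, so a polling loop benefits from a set of terminal statuses and an upper time limit. The sketch below shows one way to structure this. The helper `poll_until_terminal` and its `get_status` callback are hypothetical, and the demo uses a canned status sequence instead of calling Bedrock.

```python
import time

# Statuses after which an ingestion job no longer changes.
TERMINAL_STATUSES = {"COMPLETE", "FAILED", "STOPPED"}


def poll_until_terminal(get_status, interval_seconds=30, timeout_seconds=1800, sleep=time.sleep):
    """Call get_status() until it returns a terminal status or the timeout is reached."""
    waited = 0
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Job still {status} after {waited} seconds")
        sleep(interval_seconds)
        waited += interval_seconds


# Demo with a canned sequence of statuses and a no-op sleep.
_statuses = iter(["STARTING", "IN_PROGRESS", "COMPLETE"])
final_status = poll_until_terminal(lambda: next(_statuses), sleep=lambda _: None)
print(final_status)
```

In the notebook, `get_status` could wrap `bedrock_agent_client.get_ingestion_job(...)` and return `["ingestionJob"]["status"]`.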

Print the ID of the Bedrock knowledge base, which is backed by the OpenSearch index we created earlier in the collection. We will use it for the invocations later.

```python
knowledge_base_id = results['BedrockKnowledgeBaseId']
pp.pprint(knowledge_base_id)
```

In [None]:
%store knowledge_base_id

### Query Knowledge Base Using Amazon Bedrock

Use Amazon Bedrock's Retrieve API to query the knowledge base and get relevant results.

<details>
<summary>Click here for the solution</summary>
    
```python
response = bedrock_agent_runtime_client.retrieve(
    retrievalQuery={"text": query},
    knowledgeBaseId=knowledge_base_id,
    retrievalConfiguration={"vectorSearchConfiguration": {"numberOfResults": 5}}
)

retrieved_chunks = response['retrievalResults']
for chunk in retrieved_chunks:
    print(chunk['content']['text'])
```
    
</details>

In [None]:
query = "What is Amazon's strategy for Generative AI?"


### Extract the text chunks from the Retrieve API response

In the cell below, we will fetch the context from the retrieval results.

In [None]:
def get_contexts(retrieved_chunks):
    contexts = []
    for retrieved_chunk in retrieved_chunks: 
        contexts.append(retrieved_chunk['content']['text'])
    return contexts

Use the `get_contexts` function to visualize the context.

<details>
<summary>Click here for the solution</summary>
    
```python
contexts = get_contexts(retrieved_chunks)
pp.pprint(contexts)
```
    
</details>
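
Besides the chunk text, each retrieval result carries a relevance score and the location of the source document. The following sketch summarizes those fields. The helper name `summarize_results` and the sample results are made up for illustration and only mirror the shape of a Retrieve API response with S3 sources.

```python
def summarize_results(retrieval_results: list) -> list:
    """Return score, source URI, and a short text preview per result, best score first."""
    summary = []
    for result in retrieval_results:
        location = result.get("location", {})
        summary.append({
            "score": result.get("score"),
            "source": location.get("s3Location", {}).get("uri"),
            "preview": result["content"]["text"][:80],
        })
    return sorted(summary, key=lambda item: item["score"] or 0.0, reverse=True)


# Made-up sample in the shape of response['retrievalResults'].
example_results = [
    {
        "content": {"text": "First example chunk."},
        "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/doc-a.pdf"}},
        "score": 0.41,
    },
    {
        "content": {"text": "Second example chunk."},
        "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/doc-b.pdf"}},
        "score": 0.73,
    },
]
example_summary = summarize_results(example_results)
for item in example_summary:
    print(item)
```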

### Using RetrieveAndGenerate API
Behind the scenes, the RetrieveAndGenerate API converts queries into embeddings, searches the knowledge base, augments the foundation model prompt with the search results as context, and returns the FM-generated response to the question. For multi-turn conversations, Knowledge Bases manages short-term memory of the conversation to provide more contextual results.

The output of the RetrieveAndGenerate API includes the generated response, source attribution as well as the retrieved text chunks.

In [None]:
def ask_bedrock_llm_with_knowledge_base(query: str, model_arn: str, knowledge_base_id: str) -> dict:
    response = bedrock_agent_runtime_client.retrieve_and_generate(
        input={
            'text': query
        },
        retrieveAndGenerateConfiguration={
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': knowledge_base_id,
                'modelArn': model_arn
            }
        },
    )

    return response


def get_model_arn(model_id):
    return f'arn:aws:bedrock:{aws_region}::foundation-model/{model_id[-1]}'


def visualize_citations(generated_text, citations):
    contexts = []
    for citation in citations:
        retrievedReferences = citation["retrievedReferences"]
        for reference in retrievedReferences:
            contexts.append(reference["content"]["text"])
    print(f"---------- Generated using {model_id[0]}:")
    pp.pprint(generated_text)
    print(f'---------- The citations for the response generated by {model_id[0]}:')
    pp.pprint(contexts)
    print()

In [None]:
claude_model_ids = [ 
    ["Claude 3 Sonnet", "anthropic.claude-3-sonnet-20240229-v1:0"], 
    ["Claude 3 Haiku", "anthropic.claude-3-haiku-20240307-v1:0"]
]

query = "What is Amazon doing in the field of generative AI?"

Iterate over the `claude_model_ids`, retrieve the `model_arn` using the `get_model_arn` function, and apply the query for each model using the `ask_bedrock_llm_with_knowledge_base` function. Then, extract both the `generated_text` and the `citations`, and use the `visualize_citations` function to display the result.

<details>
<summary>Click here for the solution</summary>
    
```python
for model_id in claude_model_ids:
    model_arn = get_model_arn(model_id)
    response = ask_bedrock_llm_with_knowledge_base(query, model_arn, knowledge_base_id)
    generated_text = response['output']['text']
    citations = response["citations"]
    visualize_citations(generated_text, citations)
```
    
</details>
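
The `citations` list also records where each retrieved reference came from. As a sketch, the helper below collects the distinct source URIs in the order they first appear. The name `collect_citation_sources` and the sample data are illustrative; the sample only mimics the response shape for S3-backed sources.

```python
def collect_citation_sources(citations: list) -> list:
    """Return the distinct S3 URIs referenced by the citations, in first-seen order."""
    sources = []
    for citation in citations:
        for reference in citation.get("retrievedReferences", []):
            uri = reference.get("location", {}).get("s3Location", {}).get("uri")
            if uri and uri not in sources:
                sources.append(uri)
    return sources


# Made-up sample in the shape of response["citations"].
example_citations = [
    {
        "generatedResponsePart": {"textResponsePart": {"text": "Example sentence one."}},
        "retrievedReferences": [
            {"content": {"text": "chunk a"},
             "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/doc-a.pdf"}}},
            {"content": {"text": "chunk b"},
             "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/doc-b.pdf"}}},
        ],
    },
    {
        "generatedResponsePart": {"textResponsePart": {"text": "Example sentence two."}},
        "retrievedReferences": [
            {"content": {"text": "chunk c"},
             "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/doc-a.pdf"}}},
        ],
    },
]
example_sources = collect_citation_sources(example_citations)
print(example_sources)
```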

### Generate Response Using Amazon Bedrock

Use the retrieved context to generate a response with an Amazon Bedrock foundation model.

In [None]:
agent_foundation_model_selector = widgets.Dropdown(
    options=[
        (model['modelName'], model['modelId']) 
        for model in bedrock_management_client.list_foundation_models(
            byProvider="Amazon",
            byOutputModality="TEXT",
            byInferenceType="ON_DEMAND"
        ).get('modelSummaries', []) if "Nova" not in model['modelName'] 
    ],
    value='amazon.titan-text-express-v1',
    description='FM:',
    disabled=False,
)
agent_foundation_model_selector

In [None]:
query = None  # Insert your query text here

if query:
    retrieved_chunks = bedrock_agent_runtime_client.retrieve(
        retrievalQuery={"text": query},
        knowledgeBaseId=knowledge_base_id,
        retrievalConfiguration={"vectorSearchConfiguration": {"numberOfResults": 5}}
    )['retrievalResults']

    prompt = f"""
    Human: Use the following context to answer the question:

    {retrieved_chunks}

    Question: {query}
    Assistant:
    Answer:
    """

    response = bedrock_runtime_client.invoke_model(
        body=json.dumps({"inputText": prompt}),
        modelId=agent_foundation_model_selector.value,
        accept='application/json',
        contentType='application/json'
    )

    response_body = json.loads(response['body'].read())
    print(query)
    print("Generated Response:", response_body['results'][0]['outputText'])
else:
    raise ValueError("No query provided. Please define the 'query' variable with a valid input.")

### Prompt specific to the model to personalize responses

Here, we use a model-specific prompt that makes the model act as a financial advisor AI system, answering questions with fact-based and statistical information when possible. The `Retrieve API` results from above are passed in the prompt's `<context>` section for the model to refer to, along with the user `query`.

In [None]:
query = None  # Insert your query text here

if query:
    retrieved_chunks = bedrock_agent_runtime_client.retrieve(
        retrievalQuery={"text": query},
        knowledgeBaseId=knowledge_base_id,
        retrievalConfiguration={"vectorSearchConfiguration": {"numberOfResults": 5}}
    )['retrievalResults']

    prompt = f"""
    Human: You are a financial advisor AI system that provides answers to questions by using fact-based and statistical information when possible.
    Use the following pieces of information to provide a concise answer to the question enclosed in <question> tags. 
    If you don't know the answer, just say that you don't know, don't try to make up an answer.
    <context>
    {retrieved_chunks}
    </context>

    <question>
    {query}
    </question>

    The response should be specific and use statistics or numbers when possible.

    Assistant:"""
else:
    raise ValueError("No query provided. Please define the 'query' variable with a valid input.")
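
The prompts above interpolate `retrieved_chunks` directly, which places the full list of result dictionaries (including scores and locations) into the prompt. One possible refinement, shown here as a sketch, is to pass only the chunk text as numbered passages. The helper name `format_context` is hypothetical.

```python
def format_context(retrieved_chunks: list, separator: str = "\n\n") -> str:
    """Join the text of each retrieved chunk into numbered passages."""
    texts = [chunk["content"]["text"].strip() for chunk in retrieved_chunks]
    return separator.join(f"[{i}] {text}" for i, text in enumerate(texts, start=1))


# Made-up chunks in the shape returned by the Retrieve API.
example_chunks = [
    {"content": {"text": " First example passage. "}},
    {"content": {"text": "Second example passage."}},
]
example_context = format_context(example_chunks)
print(example_context)
```

The returned string could then replace `{retrieved_chunks}` inside the `<context>` tags.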

### Invoke Foundation Model from Amazon Bedrock

Prepare the JSON payload to be used with the `mistral.mistral-7b` foundation model from Amazon Bedrock.

This model is a 7B dense Transformer that is fast to deploy and easy to customize. It is small yet powerful for a variety of use cases:

- **Maximum tokens**: 8K
- **Languages**: English
- **Supported use cases**: Text summarization, structuration, question answering, and code completion

<details>
<summary>Click here for the solution</summary>
    
```python
mistral_payload = json.dumps({
    "prompt": prompt,
    "max_tokens": 512,
    "temperature": 0.5,
    "top_k": 50,
    "top_p": 0.9
})
```
    
</details>
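
Mistral's instruct models are generally prompted with an `<s>[INST] ... [/INST]` wrapper, and the response body carries the generated text under `outputs`. The sketch below wraps a prompt accordingly and parses a response body. The helper names and the sample response are assumptions made for illustration, so check the model's documentation for the exact format it expects.

```python
import json


def build_mistral_payload(prompt: str, max_tokens: int = 512, temperature: float = 0.5,
                          top_p: float = 0.9, top_k: int = 50) -> str:
    """Wrap the prompt in the instruct template and serialize the request body."""
    instruct_prompt = f"<s>[INST] {prompt.strip()} [/INST]"
    return json.dumps({
        "prompt": instruct_prompt,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
    })


def parse_mistral_response(raw_body: str) -> str:
    """Extract the generated text from a Mistral response body."""
    body = json.loads(raw_body)
    return body["outputs"][0]["text"].strip()


example_payload = build_mistral_payload("Summarize the context in one sentence.")
# Made-up response body for illustration.
example_text = parse_mistral_response('{"outputs": [{"text": " Example answer. ", "stop_reason": "stop"}]}')
print(example_payload)
print(example_text)
```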

In [None]:
agent_foundation_model_selector = widgets.Dropdown(
    options=[
        (model['modelName'], model['modelId']) 
        for model in bedrock_management_client.list_foundation_models(
            byProvider="Mistral AI",
            byOutputModality="TEXT",
            byInferenceType="ON_DEMAND"
        ).get('modelSummaries', [])
    ],
    description='FM:',
    disabled=False,
)
agent_foundation_model_selector

Change `modelId` to use a different version from the model provider.

In [None]:
accept = 'application/json'
contentType = 'application/json'

response = bedrock_runtime_client.invoke_model(
    body=mistral_payload,
    modelId=agent_foundation_model_selector.value,
    accept=accept,
    contentType=contentType
)
response_body = json.loads(response.get('body').read())
response_text = response_body.get('outputs')[0]['text']

print(query)
print(response_text.strip())

### LangChain Integration for Q&A Applications

Building on the previous sections, we now integrate LangChain to enhance the Q&A application using the Retrieve API provided by Knowledge Bases for Amazon Bedrock. LangChain allows seamless orchestration of retrieval-augmented generation workflows, enabling advanced applications to leverage the knowledge base efficiently.

In this phase, we focus on querying the knowledge base for document chunks based on similarity search, utilizing LangChain's retriever integration. These retrieved results are then passed to an **Anthropic Claude** model (Claude 3 Haiku is preselected in the dropdown below), which processes the query and context to provide precise and context-aware answers.

This integration demonstrates how LangChain simplifies the pipeline by managing retrieval, chunking, and contextual query enhancements, ensuring a streamlined workflow for leveraging Bedrock's powerful capabilities in Q&A scenarios.

In [None]:
agent_foundation_model_selector = widgets.Dropdown(
    options=[
        (model['modelName'], model['modelId']) 
        for model in bedrock_management_client.list_foundation_models(
            byProvider="Anthropic",
            byOutputModality="TEXT",
            byInferenceType="ON_DEMAND"
        ).get('modelSummaries', [])
    ],
    value="anthropic.claude-3-haiku-20240307-v1:0",
    description='FM:',
    disabled=False,
)
agent_foundation_model_selector

In [None]:
from langchain_aws import ChatBedrock
from langchain_aws import AmazonKnowledgeBasesRetriever

llm = ChatBedrock(
    model_id=agent_foundation_model_selector.value,
    client=bedrock_runtime_client,
)

Create an `AmazonKnowledgeBasesRetriever` object from LangChain. It calls the `Retrieve API` provided by Knowledge Bases for Amazon Bedrock, which converts user queries into embeddings, searches the knowledge base, and returns the relevant results, giving you more control to build custom workflows on top of the semantic search results. The output of the `Retrieve API` includes the `retrieved text chunks`, the `location type` and `URI` of the source data, and the relevance `scores` of the retrievals.

In [None]:
query = "By what percentage did AWS revenue grow year-over-year in 2022?"

<details>
<summary>Click here for the solution</summary>
    
```python
retriever = AmazonKnowledgeBasesRetriever(
    knowledge_base_id=knowledge_base_id,
    retrieval_config={
        "vectorSearchConfiguration": {
            "numberOfResults": 4,
            "overrideSearchType": "SEMANTIC",
        }
    },
)
```
    
</details>

In [None]:
docs = retriever.invoke(input=query)
pp.pprint(docs)

### Prompt specific to the model to personalize responses
Here, we will use the specific prompt below for the model to act as a financial advisor AI system that will provide answers to questions by using fact based and statistical information when possible. We will provide the Retrieve API responses from above as a part of the `{context}` in the prompt for the model to refer to, along with the user `query`.

In [None]:
from langchain.prompts import PromptTemplate

PROMPT_TEMPLATE = """
Human: You are a financial advisor AI system that provides answers to questions by using fact-based and statistical information when possible.
Use the following pieces of information to provide a concise answer to the question enclosed in <question> tags. 
If you don't know the answer, just say that you don't know, don't try to make up an answer.

<context>
{context}
</context>

<question>
{question}
</question>

The response should be specific and use statistics or numbers when possible.

Assistant:"""

claude_prompt = PromptTemplate(
    template=PROMPT_TEMPLATE,
    input_variables=["context", "question"]
)

### Building a Q&A Application Using RetrievalQA Chain

Integrate the retriever and the LLM (defined earlier) with the `RetrievalQA` chain to create a Question & Answer (Q&A) application.

```python
from langchain.chains import RetrievalQA
```

For more information on the `RetrievalQA` chain, refer to the official documentation:

- [RetrievalQA API Reference](https://python.langchain.com/api_reference/langchain/chains/langchain.chains.retrieval_qa.base.RetrievalQA.html)  
- [Chain Types Documentation](https://python.langchain.com/v0.1/docs/modules/chains/)

<details>
<summary>Click here for the solution</summary>
    
```python
from langchain.chains import RetrievalQA

qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=retriever,
    return_source_documents=True,
    chain_type_kwargs={"prompt": claude_prompt}
)
```
    
</details>

In [None]:
answer = qa.invoke(query)
pp.pprint(answer)

### Extracting and Formatting the Response with References

Extract the response text from the `answer` variable (the chain returns it under the `result` key) and format it together with its references in the syntax below:

#### Desired Output Syntax:

The output should present the response followed by the references in the format:

```
Response:
<extracted-response-text>

References:
1. <reference-1>
2. <reference-2>
...
```
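
One way to produce this format is sketched below. It assumes that `answer` has the `result` and `source_documents` keys returned by `RetrievalQA` with `return_source_documents=True`, and that each document's metadata exposes the source under `location.s3Location.uri`. The helper name `format_answer` is hypothetical, and `SimpleNamespace` objects stand in for LangChain documents so the block runs on its own.

```python
from types import SimpleNamespace


def format_answer(answer: dict) -> str:
    """Format the chain output as a response followed by numbered, de-duplicated references."""
    references = []
    for doc in answer.get("source_documents", []):
        # Assumed metadata layout; adjust if your retriever returns a different shape.
        uri = doc.metadata.get("location", {}).get("s3Location", {}).get("uri", "unknown source")
        if uri not in references:
            references.append(uri)
    lines = ["Response:", answer["result"].strip(), "", "References:"]
    lines += [f"{i}. {uri}" for i, uri in enumerate(references, start=1)]
    return "\n".join(lines)


def _doc(uri: str) -> SimpleNamespace:
    """Stand-in for a LangChain Document, for illustration only."""
    return SimpleNamespace(page_content="...", metadata={"location": {"s3Location": {"uri": uri}}})


example_answer = {
    "result": " Example answer text. ",
    "source_documents": [
        _doc("s3://example-bucket/doc-a.pdf"),
        _doc("s3://example-bucket/doc-a.pdf"),
        _doc("s3://example-bucket/doc-b.pdf"),
    ],
}
formatted = format_answer(example_answer)
print(formatted)
```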

## Challenges

### **Challenge 1: Build a Knowledge Base for AWS Certification**
**Objective:** Create a knowledge base specifically for AWS Certification materials.

**Scenario:**
You are tasked with building a knowledge base to assist candidates preparing for the AWS Certification. The KB should store relevant materials, including certification guides, example questions, and printed web pages containing related content.

Collect at least three PDFs for one of the certifications, such as:
- AWS Cloud Practitioner
- AWS Machine Learning Associate
- AWS Machine Learning Specialty

**Tasks:**
1. **Data Preparation:**
   - Download at least three PDFs relevant to the AWS Certification, such as:
     - Certification guides.
     - Practice exam questions.
     - Study tips or FAQs from reputable sources.
   - Save these files locally in a folder named `certifications_materials`.

2. **Create OpenSearch Vector Index:**
   - Write a Python script to:
     - Set up an OpenSearch client using the **opensearch-py** library, with **boto3** credentials for request signing.
     - Create a vector index with at least one vector field for embeddings and one field for storing metadata (e.g., title or section).
   - Use a unique name for the index, such as `certifications-index`.

3. **Upload PDFs to S3 Bucket:**
   - Write a Python script to:
     - Create an S3 bucket programmatically.
     - Upload the PDFs into the bucket under the `certifications-documents/` prefix.

4. **Create the Knowledge Base in Amazon Bedrock:**
   - Write Python code to:
     - Set up the KB using Amazon Bedrock and link it to the OpenSearch collection.
     - Ensure that embeddings are stored in the vector index created earlier.

5. **Perform a Query Using the Retrieve API:**
   - Write a Python script to query the KB for the following questions:
     - *"What are the key domains covered in the AWS Cloud Practitioner certification?"*
     - *"What are the prerequisites for taking the AWS Machine Learning Specialty certification?"*
     - *"Can you provide an example of a practice question for the AWS Machine Learning Associate certification?"*

**Deliverable:**
- Python code that performs all tasks: creating the KB, indexing the data, and performing a retrieval query.
- JSON file with the results of the three queries, including retrieved chunks and relevance scores.

### **Challenge 2: Build a Q&A System for AWS Solutions Architect Certification**
**Objective:** Create a Q&A system that generates answers based on the knowledge base created in Challenge 1.

**Scenario:**
You are building an intelligent assistant for AWS Certification candidates. The assistant should generate precise, contextually accurate answers using the RetrieveAndGenerate API.

**Tasks:**
1. **Use the RetrieveAndGenerate API:**
   - Write Python code to:
     - Query the KB for context related to the user's question.
     - Generate a response using the **Claude 3** model (or any other Bedrock model).
     - Include the retrieved context in the model's prompt.

2. **Create a Custom Prompt:**
   - Write a custom prompt for the model to:
     - Provide concise answers that incorporate numerical or statistical data from the retrieved context.
     - Clearly cite the retrieved chunks as evidence in the response.
   - Example prompt format:
     ```
     You are an AI assistant specializing in AWS Solutions Architect Certification. Use the following context to answer the question:
     Context:
     {retrieved_context}
     
     Question:
     {user_query}
     
     If the context does not provide enough information, respond with "I don't have enough information to answer this question."
     ```

3. **Answer Sample Questions:**
   - Generate answers for the following questions:
     - *"What are the key domains covered in the AWS Cloud Practitioner certification?"*
     - *"What are the prerequisites for taking the AWS Machine Learning Specialty certification?"*
     - *"Can you provide an example of a practice question for the AWS Machine Learning Associate certification?"*

4. **Handle Errors:**
   - Implement error handling in Python for:
     - Missing or insufficient retrieved context.
     - API invocation failures.

**Deliverable:**
- Python script to query the KB and generate responses.
- JSON file with the generated answers and retrieved evidence for the three sample questions.

**Bonus:**
- Use LangChain to generate the answers.

### Conclusion and Next Steps

The **Retrieve API** offers a powerful mechanism for customizing your **Retrieval-Augmented Generation (RAG)** applications. You can pass its results to the `InvokeModel` API from Amazon Bedrock, or use it through LangChain with the `AmazonKnowledgeBasesRetriever`. This gives you the flexibility to choose the right foundation model from Amazon Bedrock and the most suitable search type, **HYBRID** or **SEMANTIC**, for your specific use case.

For more information on the Hybrid Search feature, refer to the [official blog post](https://aws.amazon.com/blogs/machine-learning/knowledge-bases-for-amazon-bedrock-now-supports-hybrid-search/).
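
The search type is selected through the `overrideSearchType` field of the retrieval configuration, as in the retriever example earlier. The sketch below builds that configuration and rejects unknown values. The helper name `build_retrieval_config` is hypothetical.

```python
from typing import Optional

VALID_SEARCH_TYPES = {"HYBRID", "SEMANTIC"}


def build_retrieval_config(number_of_results: int = 5, search_type: Optional[str] = None) -> dict:
    """Build a retrieval configuration, optionally overriding the search type."""
    vector_config = {"numberOfResults": number_of_results}
    if search_type is not None:
        if search_type not in VALID_SEARCH_TYPES:
            raise ValueError(f"search_type must be one of {sorted(VALID_SEARCH_TYPES)}")
        vector_config["overrideSearchType"] = search_type
    return {"vectorSearchConfiguration": vector_config}


hybrid_config = build_retrieval_config(number_of_results=4, search_type="HYBRID")
print(hybrid_config)
```

The result could be passed as `retrievalConfiguration` to `retrieve`, or as `retrieval_config` to the LangChain retriever. Leaving `search_type` unset omits the override so the service chooses the search strategy.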

#### Note on Resources

It is essential to keep the CloudFormation stack and its associated resources intact for the next lab. The stack includes critical components such as the S3 bucket, OpenSearch collection, and other related resources required for subsequent exercises. 

If you are concerned about ongoing costs, ensure the resources are used exclusively for the labs and are not unnecessarily accessed outside of this environment.

#### Reminder for Future Clean-Up

Once all labs are completed, clean up provisioned resources to avoid incurring additional costs. At that time, you can delete the CloudFormation stack using the following code:

```python
cloudformation_client.delete_stack(StackName=stack_name)
print("Stack deletion initiated.")
```

Ensure you only perform this step after confirming that the resources are no longer required.

In [None]:
%store stack_name

In [None]:
%store unique_id