## Building a Q&A application using Knowledge Bases for Amazon Bedrock - RetrieveAndGenerate API
### Context

With knowledge bases, you can securely connect foundation models (FMs) in Amazon Bedrock to your company
data for Retrieval Augmented Generation (RAG). Access to additional data helps the model generate more relevant,
context-specific, and accurate responses without continuously retraining the FM. All information retrieved from
knowledge bases comes with source attribution to improve transparency and minimize hallucinations. For more information on creating a knowledge base using the console, please refer to this [post](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base.html).

In this notebook, we will dive deep into building a Q&A application using the `RetrieveAndGenerate` API provided by Knowledge Bases for Amazon Bedrock. This API queries the knowledge base to get the desired number of document chunks based on similarity search, then passes them to a Large Language Model (LLM) to answer questions.
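
To make "similarity search" concrete, here is a minimal sketch of ranking chunks by cosine similarity. It is an illustration only: the service computes embeddings with an embedding model and searches a vector index, whereas the three-dimensional vectors, the chunk labels, and the helper names `cosine_similarity` and `top_k_chunks` below are hypothetical.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k_chunks(query_vector, indexed_chunks, k=2):
    """Return the k chunk texts whose vectors are most similar to the query."""
    scored = [(cosine_similarity(query_vector, vec), text)
              for text, vec in indexed_chunks]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [text for _, text in scored[:k]]


# Toy "embeddings" (hypothetical values, for illustration only).
toy_index = [
    ("chunk about revenue", [0.9, 0.1, 0.0]),
    ("chunk about logistics", [0.1, 0.9, 0.0]),
    ("chunk about operating income", [0.8, 0.2, 0.1]),
]

print(top_k_chunks([1.0, 0.0, 0.0], toy_index, k=2))
# ['chunk about revenue', 'chunk about operating income']
```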


### Pattern

We can implement the solution using the Retrieval Augmented Generation (RAG) pattern. RAG retrieves data from outside the language model (non-parametric) and augments the prompt by adding the relevant retrieved data as context. Here, we perform RAG on the knowledge base created in the previous notebook or through the console.
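
The "augment" step can be sketched as plain string assembly. The prompt that `RetrieveAndGenerate` builds internally is not shown in this notebook, so the template wording and the helper name `build_augmented_prompt` below are assumptions made for illustration.

```python
def build_augmented_prompt(question, retrieved_chunks):
    """Place retrieved passages in the prompt ahead of the user's question."""
    context = "\n\n".join(
        f"[{i}] {chunk}" for i, chunk in enumerate(retrieved_chunks, start=1)
    )
    # Hypothetical template; the managed API uses its own prompt.
    return (
        "Use only the context below to answer the question. "
        "If the answer is not in the context, say you do not know.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\n"
        "Answer:"
    )


prompt = build_augmented_prompt(
    "What does the first chunk say?",
    ["first retrieved chunk", "second retrieved chunk"],
)
print(prompt)
```

Numbering the passages makes it easier to ask the model to cite which passage supports each statement.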

### Pre-requisite

Before being able to answer the questions, the documents must be processed and stored in knowledge base.

1. Load the documents into the knowledge base by connecting your S3 bucket (data source).
2. Ingestion - The knowledge base splits the documents into smaller chunks (based on the strategy selected), generates embeddings, and stores them in the associated vector store.

![data_ingestion.png](./images/data_ingestion.png)
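
Ingestion runs asynchronously, so queries issued before it finishes may return no results. The sketch below polls the `get_ingestion_job` operation of the `bedrock-agent` client until the job reaches a terminal status. The helper name `wait_for_ingestion` and the polling interval and budget are assumptions, and the client is passed in as a parameter so the function can be exercised without AWS access.

```python
import time


def wait_for_ingestion(agent_client, knowledge_base_id, data_source_id,
                       ingestion_job_id, poll_seconds=10, max_polls=60):
    """Poll get_ingestion_job until the job completes, fails, or times out."""
    for _ in range(max_polls):
        job = agent_client.get_ingestion_job(
            knowledgeBaseId=knowledge_base_id,
            dataSourceId=data_source_id,
            ingestionJobId=ingestion_job_id,
        )["ingestionJob"]
        status = job["status"]
        if status == "COMPLETE":
            return job
        if status == "FAILED":
            raise RuntimeError(f"Ingestion failed: {job.get('failureReasons')}")
        # Any other status (for example STARTING or IN_PROGRESS): keep waiting.
        time.sleep(poll_seconds)
    raise TimeoutError("Ingestion job did not finish within the polling budget")
```

The job id comes from the `start_ingestion_job` response used later in this notebook, for example `response["ingestionJob"]["ingestionJobId"]`.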


#### Notebook Walkthrough

For our notebook we will use the `RetrieveAndGenerate` API provided by Knowledge Bases for Amazon Bedrock, which converts user queries into
embeddings, searches the knowledge base, gets the relevant results, augments the prompt, and then invokes an LLM to generate the response.

We will use the following workflow for this notebook. 

![retrieveAndGenerate.png](./images/retrieveAndGenerate.png)


### USE CASE:

#### Dataset

In this example, you will use several years of Amazon's Letters to Shareholders as a text corpus for Q&A. This data is already ingested into the knowledge base. You will need the `knowledge base id` and `model ARN` to run this example. We are using the `Anthropic Claude Instant` model to generate responses to user questions.
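
The model ARN follows the pattern used later in this notebook, where it is written out for `us-east-1`. A small helper (the name `foundation_model_arn` is hypothetical) makes the region explicit, which may help if your knowledge base lives in a different region:

```python
def foundation_model_arn(region, model_id):
    """Build a Bedrock foundation-model ARN from a region and a model id."""
    return f"arn:aws:bedrock:{region}::foundation-model/{model_id}"


print(foundation_model_arn("us-east-1", "anthropic.claude-instant-v1"))
# arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-instant-v1
```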

### Python 3.10

⚠  For this lab we need to run the notebook based on a Python 3.10 runtime. ⚠

### Setup

Install the following packages.

In [None]:
%pip install --upgrade pip
%pip install boto3==1.33.2 --force-reinstall --quiet
%pip install botocore==1.33.2 --force-reinstall --quiet
%pip install opensearch-py
%pip install requests-aws4auth

In [None]:
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [None]:
# helper functions for bedrock agent & knowledge base creation
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
import botocore
import time
import json
import os

opensearch_serverless_client = boto3.client('opensearchserverless')
agent_client = boto3.client("bedrock-agent")
lambda_client = boto3.client('lambda')
iam = boto3.client('iam')
s3 = boto3.client('s3')

def get_agent_id_and_s3_bucket_name_from_payload(props):
    
    print(f'Properties: {props}')
    agent_id = ''
    s3_bucket_name = ''
    
    for prop in props:
        print(prop)
        if prop['name'] == 'agentId':
            agent_id = prop['value']
        elif prop['name'] == 's3KnowledgeBaseBucketName':
            s3_bucket_name = prop['value']
            
    return s3_bucket_name, agent_id


def create_encryption_policy(agent_id):
    """Creates an encryption policy that matches all collections beginning with collection-<agent_id>."""
    try:
        response = opensearch_serverless_client.create_security_policy(
            description=f'Encryption policy created by an agent with id {agent_id}.',
            name=f'agent-{agent_id}-policy',
            policy="""
                {
                    \"Rules\":[
                        {
                            \"ResourceType\":\"collection\",
                            \"Resource\":[
                                \"collection\/collection-""" + agent_id + """*\"
                            ]
                        }
                    ],
                    \"AWSOwnedKey\":true
                }
                """,
            type='encryption'
        )
        print('\nEncryption policy created:')
        print(response)
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'ConflictException':
            print(
                '[ConflictException] The policy name or rules conflict with an existing policy.')
        else:
            raise error


def create_network_policy(agent_id):
    """Creates a network policy that matches all collections beginning with collection-<agent_id>."""
    try:
        response = opensearch_serverless_client.create_security_policy(
            description=f'Network policy created by an agent with id {agent_id}.',
            name=f'agent-{agent_id}-policy',
            policy="""
                [{
                    \"Description\":\"Public access for the collection created by an agent with id """ + agent_id +""".\",
                    \"Rules\":[
                        {
                            \"ResourceType\":\"dashboard\",
                            \"Resource\":[\"collection\/collection-""" + agent_id + """*\"]
                        },
                        {
                            \"ResourceType\":\"collection\",
                            \"Resource\":[\"collection\/collection-""" + agent_id + """*\"]
                        }
                    ],
                    \"AllowFromPublic\":true
                }]
                """,
            type='network'
        )
        print('\nNetwork policy created:')
        print(response)
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'ConflictException':
            print(
                '[ConflictException] A network policy with this name already exists.')
        else:
            raise error


def create_access_policy(account_id,
                         agent_id,
                         lambda_role_arn, 
                         knowledge_base_role_arn,
                         account_iam_role):
    """Creates a data access policy that matches all collections beginning with collection-<agent_id>."""
    try:
        response = opensearch_serverless_client.create_access_policy(
            description=f'Data access policy for collections created by an agent with id {agent_id}.',
            name=f'agent-{agent_id}-policy',
            policy="""
                [{
                    \"Rules\":[
                        {
                            \"Resource\":[
                                \"index\/collection-""" + agent_id + """*\/*\"
                            ],
                            \"Permission\":[
                                \"aoss:CreateIndex\",
                                \"aoss:DeleteIndex\",
                                \"aoss:UpdateIndex\",
                                \"aoss:DescribeIndex\",
                                \"aoss:ReadDocument\",
                                \"aoss:WriteDocument\"
                            ],
                            \"ResourceType\": \"index\"
                        },
                        {
                            \"Resource\":[
                                \"collection\/collection-""" + agent_id + """*\"
                            ],
                            \"Permission\":[
                                \"aoss:CreateCollectionItems\",
                                \"aoss:DescribeCollectionItems\",
                                \"aoss:DeleteCollectionItems\",
                                \"aoss:UpdateCollectionItems\"
                            ],
                            \"ResourceType\": \"collection\"
                        }
                    ],
                    \"Principal\": [
                        \"""" + knowledge_base_role_arn + """\", 
                        \"""" + account_iam_role + """\"
                    ]
                }]
                """,
            type='data'
        )
        print('\nAccess policy created:')
        print(response)
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'ConflictException':
            print(
                '[ConflictException] An access policy with this name already exists.')
        else:
            raise error


def create_collection(agent_id):
    """Creates a collection"""
    try:
        response = opensearch_serverless_client.create_collection(
            name=f'collection-{agent_id}',
            description=f'Collection created by an agent with id {agent_id}.',
            type='VECTORSEARCH'
        )
        return(response['createCollectionDetail']['arn'])
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'ConflictException':
            print(
                '[ConflictException] A collection with this name already exists. Try another name.')
        else:
            raise error


def wait_for_collection_creation(awsauth,
                                 agent_id,
                                 vector_index_name, 
                                 text_field, 
                                 bedrock_metadata_field,
                                 vector_field_name):
    """Waits for the collection to become active"""
    response = opensearch_serverless_client.batch_get_collection(
        names=[f'collection-{agent_id}'])
    # Periodically check collection status
    while (response['collectionDetails'][0]['status']) == 'CREATING':
        print('Creating collection...')
        time.sleep(30)
        response = opensearch_serverless_client.batch_get_collection(
            names=[f'collection-{agent_id}'])
    print('\nCollection successfully created:')
    print(response["collectionDetails"])
    # Extract the collection endpoint from the response
    host = (response['collectionDetails'][0]['collectionEndpoint'])
    final_host = host.replace("https://", "")
    index_data(host=final_host, 
               awsauth=awsauth, 
               vector_index_name=vector_index_name,
               bedrock_metadata_field=bedrock_metadata_field,
               text_field=text_field,
               vector_field_name=vector_field_name)


def index_data(host, awsauth, vector_index_name, text_field, 
               bedrock_metadata_field, vector_field_name):
    """Create an index"""
    # Build the OpenSearch client
    client = OpenSearch(
        hosts=[{'host': host, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        timeout=300
    )
    # It can take up to a minute for data access rules to be enforced
    time.sleep(45)
    
    # Create index
    body = {
      "mappings": {
        "properties": {
          f"{bedrock_metadata_field}": {
            "type": "text",
            "index": False
          },
          "id": {
            "type": "text",
            "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
              }
            }
          },
          f"{text_field}": {
            "type": "text",
            "index": False
          },
          f"{vector_field_name}": {
            "type": "knn_vector",
            "dimension": 1536,
            "method": {
              "engine": "nmslib",
              "space_type": "cosinesimil",
              "name": "hnsw"
            }
          }
        }
      },
      "settings": {
        "index": {
          "number_of_shards": 2,
          "knn.algo_param": {
            "ef_search": 512
          },
          "knn": True,
        }
      }
    }

    response = client.indices.create(index=vector_index_name, body=body)
    print('\nCreating index:')
    print(response)
    

def create_allow_bedrock_iam_policy(policy_name, agent_id):
    
    bedrock_allow_models_policy = """{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "bedrock:InvokeModel",
          "bedrock:ListCustomModels",
          "bedrock:ListFoundationModels"
        ],
        "Resource": "*"
      }
    ]
    }"""
    
    policy = iam.create_policy(
            PolicyName=policy_name,
            Description=f"Policy for Bedrock Invoke Model, List Models and ListFoundationModels created by an agent with id {agent_id}.",
            PolicyDocument=bedrock_allow_models_policy,
        )
        
    return policy['Policy']['Arn']
    
    
def create_allow_collection_access(policy_name, collection_arn, agent_id):
    
    collection_allow_access_policy = """{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "aoss:APIAccessAll"
         ],
         "Resource": [
           \"""" + collection_arn + """\"
         ]
      }
    ]
    }"""
    
    policy = iam.create_policy(
            PolicyName=policy_name,
            Description=f"Policy for access to the Opensearch by the Knowledge Base created by an agent with id {agent_id}.",
            PolicyDocument=collection_allow_access_policy,
        )
        
    return policy['Policy']['Arn']

    
def create_knowledge_base_iam_role(role_name, account_id, region):
  
    basic_role = """{
    "Version": "2012-10-17",
    "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "bedrock.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": \"""" + account_id + """\"
        },
        "ArnLike": {
          "AWS:SourceArn": \"arn:aws:bedrock:""" + region + """:""" + account_id + """:knowledge-base/*\"
        }
      }
    }
    ]
    }"""
  
    iam.create_role(RoleName=role_name, 
      AssumeRolePolicyDocument=basic_role)

    # This role has the AmazonOpenSearchServiceReadOnlyAccess managed policy.
    iam.attach_role_policy(RoleName=role_name, 
      PolicyArn='arn:aws:iam::aws:policy/AmazonOpenSearchServiceReadOnlyAccess')
    # This role has the AmazonS3ReadOnlyAccess managed policy.
    iam.attach_role_policy(RoleName=role_name,
      PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess')
      
    return iam.get_role(RoleName=role_name)['Role']['Arn']
    
    
def attach_bedrock_and_collection_policies(role_name,
                                           bedrock_policy_arn,
                                           collection_policy_arn):
    
    iam.attach_role_policy(RoleName=role_name,
      PolicyArn=bedrock_policy_arn)
    iam.attach_role_policy(RoleName=role_name,
      PolicyArn=collection_policy_arn)
      
    return
    
    
def create_knowledge_base(collection_arn, 
                          vector_field_name,
                          vector_index_name,
                          knowledge_base_role_arn,
                          text_field,
                          bedrock_metadata_field,
                          agent_id):
    
    knowledge_base_config = {
      "type": "VECTOR",
      "vectorKnowledgeBaseConfiguration": {
        "embeddingModelArn": "arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1"
      }
    }

    storage_config = {
      "opensearchServerlessConfiguration": {
        "collectionArn": collection_arn, 
        "fieldMapping": {
          "metadataField": bedrock_metadata_field,
          "textField": text_field,  
          "vectorField": vector_field_name
        },
        "vectorIndexName": vector_index_name
      },
      "type": "OPENSEARCH_SERVERLESS" 
    }

    response = agent_client.create_knowledge_base(
        name=f'Agent-{agent_id}-KnowledgeBase-Opensearch',
        description=f'Knowledge base created by an agent with id {agent_id}.',
        roleArn=knowledge_base_role_arn,
        knowledgeBaseConfiguration=knowledge_base_config,
        storageConfiguration=storage_config)
        
    return(response['knowledgeBase']['knowledgeBaseId'])


def create_data_source(knowledge_base_id, s3_bucket_name, agent_id):
    
    # Set up bucket arn from user's 's3_bucket_name'
    s3_bucket_arn = f'arn:aws:s3:::{s3_bucket_name}'
    
    data_source_configuration = {
      "s3Configuration": {
        "bucketArn": s3_bucket_arn
      },
        "type": "S3"
    }
    
    response = agent_client.create_data_source(
        knowledgeBaseId=knowledge_base_id,
        name=f'Agent-{agent_id}-DataSource',
        dataSourceConfiguration=data_source_configuration)
        
    return response['dataSource']['dataSourceId']
    
    
def associate_knowledge_base(agent_id, knowledge_base_id):
    
    agent_kb_description = agent_client.associate_agent_knowledge_base(
        agentId=agent_id,
        agentVersion='DRAFT',
        description='Modify this instruction as needed.',
        knowledgeBaseId=knowledge_base_id
    )
    
    return agent_kb_description

def start_ingestion_job(knowledge_base_id, dataSourceId):
    response = agent_client.start_ingestion_job(
        knowledgeBaseId=knowledge_base_id,
        dataSourceId=dataSourceId
    )
    return response

In [None]:
import random
random_id = random.randint(0,1000)

# Get role to attach to Opensearch allow list
from sagemaker import get_execution_role

role = get_execution_role()
print(role)
# Get account id and current region
account_id = role.split(":")[4]
print(account_id)
region = os.environ['AWS_REGION']
print(region)

# specify bucket with data to index for RAG
s3_bucket_name = 'sagemaker-studio-' + str(account_id) # "sagemaker-studio-XXX" 

# Set up auth for Opensearch client
service = 'aoss'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
               region, service, session_token=credentials.token)

agent_id_lowercase = str(random_id) #.lower() # To satisfy collection policy name constraints
bedrock_policy_name = f"bedrock-policy-name-created-by-agent-{random_id}"
collection_policy_name = f"collection-policy-name-created-by-agent-{random_id}"
knowledge_base_role_name = f"AmazonBedrockExecutionRoleForKnowledgeBase_{random_id}"
vector_field_name = f"embeddings-{agent_id_lowercase}"
vector_index_name = f"vector-{agent_id_lowercase}"
text_field = "text-field"
bedrock_metadata_field = "bedrock-managed-metadata-field"

knowledge_base_role_arn = create_knowledge_base_iam_role(knowledge_base_role_name, account_id, region)
print(knowledge_base_role_arn)
create_encryption_policy(agent_id=agent_id_lowercase)
create_network_policy(agent_id=agent_id_lowercase)
create_access_policy(account_id=account_id,
                     agent_id=agent_id_lowercase,
                     lambda_role_arn=role, 
                     knowledge_base_role_arn=knowledge_base_role_arn,
                     account_iam_role=role)

collection_arn = create_collection(agent_id=agent_id_lowercase)
wait_for_collection_creation(awsauth=awsauth, 
                             agent_id=agent_id_lowercase,
                             vector_index_name=vector_index_name,
                             text_field=text_field,
                             bedrock_metadata_field=bedrock_metadata_field,
                             vector_field_name=vector_field_name)

bedrock_policy_arn = create_allow_bedrock_iam_policy(policy_name=bedrock_policy_name,
                                                     agent_id=random_id)
collection_policy_arn = create_allow_collection_access(policy_name=collection_policy_name, 
                                                       collection_arn=collection_arn,
                                                       agent_id=random_id)

# Pause to make sure iam policies are created                           
time.sleep(10)                                                       

attach_bedrock_and_collection_policies(role_name=knowledge_base_role_name,
                                       collection_policy_arn=collection_policy_arn,
                                       bedrock_policy_arn=bedrock_policy_arn)

# Pause to make sure iam policies are attached                           
time.sleep(10)

knowledge_base_id = create_knowledge_base(collection_arn=collection_arn, 
                                          vector_field_name=vector_field_name,
                                          vector_index_name=vector_index_name,
                                          knowledge_base_role_arn=knowledge_base_role_arn,
                                          text_field=text_field,
                                          bedrock_metadata_field=bedrock_metadata_field,
                                          agent_id=random_id)
print(knowledge_base_id)
datasource_id = create_data_source(knowledge_base_id=knowledge_base_id,
                   s3_bucket_name=s3_bucket_name,
                   agent_id=random_id)

print(datasource_id)

response = start_ingestion_job(knowledge_base_id, datasource_id)
print(response)

In [None]:
import boto3
import pprint
from botocore.client import Config

pp = pprint.PrettyPrinter(indent=2)

bedrock_config = Config(connect_timeout=120, read_timeout=120, retries={'max_attempts': 0})
bedrock_client = boto3.client('bedrock-runtime')
bedrock_agent_client = boto3.client("bedrock-agent-runtime",
                              config=bedrock_config)

model_id = "anthropic.claude-instant-v1" # Try both Claude Instant and Claude v2; for Claude v2 use "anthropic.claude-v2"
kb_id = knowledge_base_id
%store kb_id

## RetrieveAndGenerate API
Behind the scenes, `RetrieveAndGenerate` API converts queries into embeddings, searches the knowledge base, and then augments the foundation model prompt with the search results as context information and returns the FM-generated response to the question. For multi-turn conversations, Knowledge Bases manage short-term memory of the conversation to provide more contextual results. 
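
For multi-turn conversations, the `sessionId` returned by one call can be passed to the next call. The sketch below shows that round trip. The helper name `ask` is hypothetical, and the client is passed in as a parameter so the function can be exercised with a stand-in object.

```python
def ask(client, question, kb_id, model_arn, session_id=None):
    """Call RetrieveAndGenerate, reusing a session when one is supplied."""
    request = {
        "input": {"text": question},
        "retrieveAndGenerateConfiguration": {
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                "modelArn": model_arn,
            },
        },
    }
    # Only send sessionId when continuing an existing conversation.
    if session_id:
        request["sessionId"] = session_id
    response = client.retrieve_and_generate(**request)
    return response["output"]["text"], response["sessionId"]


# Usage with a real "bedrock-agent-runtime" client (not executed here):
# answer, session_id = ask(bedrock_agent_client, "First question", kb_id, model_arn)
# follow_up, _ = ask(bedrock_agent_client, "Follow-up question", kb_id, model_arn, session_id)
```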

The output of the `RetrieveAndGenerate` API includes the `generated response`, the `source attribution`, and the `retrieved text chunks`.
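
As a sketch of how these three parts can be pulled out of the response, the function below walks the `citations` list. The helper name `summarize_response` is hypothetical, and the sample response is hand-written placeholder data rather than real API output.

```python
def summarize_response(response):
    """Return the answer text and a flat list of cited sources."""
    answer = response["output"]["text"]
    sources = []
    for citation in response.get("citations", []):
        # The part of the generated answer that this citation supports.
        part = citation.get("generatedResponsePart", {}).get("textResponsePart", {})
        for ref in citation.get("retrievedReferences", []):
            sources.append({
                "cited_text": part.get("text", ""),
                "chunk": ref["content"]["text"],
                "uri": ref.get("location", {}).get("s3Location", {}).get("uri"),
            })
    return answer, sources


# Hand-written placeholder response (hypothetical values, for illustration only).
sample_response = {
    "output": {"text": "Example answer sentence."},
    "citations": [
        {
            "generatedResponsePart": {
                "textResponsePart": {"text": "Example answer sentence."}
            },
            "retrievedReferences": [
                {
                    "content": {"text": "Example retrieved chunk."},
                    "location": {
                        "type": "S3",
                        "s3Location": {"uri": "s3://example-bucket/example.pdf"},
                    },
                }
            ],
        }
    ],
}

answer, sources = summarize_response(sample_response)
print(answer)
for source in sources:
    print(source["uri"], "->", source["chunk"])
```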

In [None]:
def retrieveAndGenerate(input, kbId, sessionId=None, model_id="anthropic.claude-instant-v1"):
    """Query the knowledge base and return the full RetrieveAndGenerate response."""
    # NOTE: the model ARN is pinned to us-east-1; change the region if yours differs.
    model_arn = f'arn:aws:bedrock:us-east-1::foundation-model/{model_id}'
    request = {
        'input': {
            'text': input
        },
        'retrieveAndGenerateConfiguration': {
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': kbId,
                'modelArn': model_arn
            }
        }
    }
    # Pass sessionId only when continuing an existing conversation.
    if sessionId:
        request['sessionId'] = sessionId
    return bedrock_agent_client.retrieve_and_generate(**request)

In [None]:
query = "Are Amazon's latest financial results good?"
response = retrieveAndGenerate(query, kb_id, model_id=model_id)
generated_text = response['output']['text']
pp.pprint(generated_text)

In [None]:
citations = response["citations"]
contexts = []
for citation in citations:
    retrievedReferences = citation["retrievedReferences"]
    for reference in retrievedReferences:
        contexts.append(reference["content"]["text"])

pp.pprint(contexts)

## Next Steps

If you want a more customized experience, you can use the `Retrieve` API. This API converts user queries into embeddings, searches the knowledge base, and returns the relevant results, giving you more control to build custom workflows on top of the semantic search results.
For sample code, try the following notebooks:
- `customized-rag-retrieve-api-claude-v2.ipynb` calls the `Retrieve` API to get relevant contexts and adds them to the prompt, which you can provide as input to any text-to-text model available in Amazon Bedrock.

- `customized-rag-retrieve-api-langchain-claude.ipynb` uses the RetrievalQA chain from LangChain with the knowledge base as the retriever.

- `customized-rag-retrieve-api-titan-lite-evaluation` evaluates a RAG application, using the `Amazon Titan Lite` model to generate responses and `Anthropic Claude V2` to evaluate them.
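
As a rough preview of the `Retrieve` API mentioned above, the sketch below calls the `retrieve` operation of the `bedrock-agent-runtime` client and returns scored chunks. The helper name `retrieve_chunks` and the default of three results are assumptions. The client is passed in as a parameter so the function can be tried with a stand-in object.

```python
def retrieve_chunks(client, query, kb_id, number_of_results=3):
    """Return (score, text) pairs for the chunks most relevant to the query."""
    response = client.retrieve(
        knowledgeBaseId=kb_id,
        retrievalQuery={"text": query},
        retrievalConfiguration={
            "vectorSearchConfiguration": {"numberOfResults": number_of_results}
        },
    )
    return [
        (result.get("score"), result["content"]["text"])
        for result in response["retrievalResults"]
    ]


# Usage with a real client (not executed here):
# for score, text in retrieve_chunks(bedrock_agent_client, query, kb_id):
#     print(score, text[:80])
```

The returned chunks can then be placed into a prompt of your own design before invoking a model, which is the main difference from `RetrieveAndGenerate`.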
