# Knowledge Bases for Amazon Bedrock with Web URL Data Connector 

## Setup 
Before running the rest of this notebook, run the cells below to install the required libraries (if needed) and connect to Amazon Bedrock.

In [None]:
# %pip install -U opensearch-py==2.3.1
# %pip install -U boto3==1.34.143
# %pip install -U retrying==1.3.4

In [1]:
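# Restart the kernel so newly installed packages are picked up
# (this JavaScript hook only works in the classic Jupyter Notebook UI)
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")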

In [2]:
import warnings
warnings.filterwarnings('ignore')

## Create a vector store - OpenSearch Serverless index

### Step 1 - Create OSS policies and collection
First, we need to create a vector store. In this section we use *Amazon OpenSearch Serverless*.

Amazon OpenSearch Serverless is a serverless option in Amazon OpenSearch Service. As a developer, you can use OpenSearch Serverless to run petabyte-scale workloads without configuring, managing, or scaling OpenSearch clusters. You get the same interactive, millisecond response times as OpenSearch Service with the simplicity of a serverless environment. OpenSearch Serverless automatically scales resources to provide the right amount of capacity for your application without impacting data ingestion, and you pay only for what you use.

In [3]:
import json
import os
import boto3
from utility import create_bedrock_execution_role, create_oss_policy_attach_bedrock_execution_role, create_policies_in_oss
import random
from retrying import retry
suffix = random.randrange(200, 900)

boto3_session = boto3.session.Session()
region_name = boto3_session.region_name
bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)
service = 'aoss'

bucket_name = "<PUT YOUR BUCKET NAME>"  # Name of an existing S3 bucket; replace before running

In [4]:
import time

vector_store_name = f'bedrock-sample-rag-{suffix}'
index_name = f"bedrock-sample-rag-index-{suffix}"
aoss_client = boto3_session.client('opensearchserverless')
bedrock_kb_execution_role = create_bedrock_execution_role(bucket_name)
bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']

In [5]:
# Create the encryption, network, and data access policies for the collection
encryption_policy, network_policy, access_policy = create_policies_in_oss(
    vector_store_name=vector_store_name,
    aoss_client=aoss_client,
    bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn,
)
collection = aoss_client.create_collection(name=vector_store_name, type='VECTORSEARCH')

In [6]:
print(collection)
time.sleep(10)

{'createCollectionDetail': {'arn': 'arn:aws:aoss:us-east-1:507922848584:collection/n7bvyeb0mdj42h0spssj', 'createdDate': 1721171086378, 'id': 'n7bvyeb0mdj42h0spssj', 'kmsKeyArn': 'auto', 'lastModifiedDate': 1721171086378, 'name': 'bedrock-sample-rag-683', 'standbyReplicas': 'ENABLED', 'status': 'CREATING', 'type': 'VECTORSEARCH'}, 'ResponseMetadata': {'RequestId': '79d69662-6df5-4b6c-9232-87d0db99b0c4', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '79d69662-6df5-4b6c-9232-87d0db99b0c4', 'date': 'Tue, 16 Jul 2024 23:04:46 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '314', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
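The output shows the collection still in `'status': 'CREATING'`, and the fixed `time.sleep(10)` above simply hopes it becomes usable in time. A more robust option is to poll until the collection reports `ACTIVE`. The sketch below assumes the boto3 `opensearchserverless` client's `batch_get_collection(ids=[...])` call, which returns a `collectionDetails` list; the helper name `wait_for_collection_active` and its polling defaults are hypothetical, and `sleep` is injectable only to make the helper easy to test.

```python
import time


def wait_for_collection_active(aoss_client, collection_id, poll_seconds=10, max_polls=60, sleep=time.sleep):
    """Poll an OpenSearch Serverless collection until its status is ACTIVE.

    Hypothetical helper: raises RuntimeError if the collection reports FAILED
    or does not become ACTIVE within `max_polls` polls.
    """
    for _ in range(max_polls):
        details = aoss_client.batch_get_collection(ids=[collection_id])["collectionDetails"]
        status = details[0]["status"] if details else "UNKNOWN"
        if status == "ACTIVE":
            return details[0]
        if status == "FAILED":
            raise RuntimeError(f"Collection {collection_id} failed to create")
        sleep(poll_seconds)  # still CREATING (or not yet visible): wait and poll again
    raise RuntimeError(f"Collection {collection_id} not ACTIVE after {max_polls} polls")
```

In this notebook you could call `wait_for_collection_active(aoss_client, collection['createCollectionDetail']['id'])` in place of the fixed sleep.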


In [7]:
collection_id = collection['createCollectionDetail']['id']
host = collection_id + '.' + region_name + '.aoss.amazonaws.com'
print(host)

n7bvyeb0mdj42h0spssj.us-east-1.aoss.amazonaws.com


In [8]:
# create oss policy and attach it to Bedrock execution role
create_oss_policy_attach_bedrock_execution_role(collection_id=collection_id,
                                                bedrock_kb_execution_role=bedrock_kb_execution_role)

Opensearch serverless arn:  arn:aws:iam::507922848584:policy/AmazonBedrockOSSPolicyForKnowledgeBase_797


### Step 2 - Create vector index

In [9]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

# Sign requests to OpenSearch Serverless ('aoss') with the session's credentials
credentials = boto3.Session().get_credentials()
awsauth = AWSV4SignerAuth(credentials, region_name, service)

index_name = f"bedrock-sample-index-{suffix}"
body_json = {
    "settings": {
        "index.knn": "true"
    },
    "mappings": {
        "properties": {
            # Embedding vector; dimension must match the embedding model (Titan Text Embeddings v1: 1536)
            "vector": {
                "type": "knn_vector",
                "dimension": 1536,
                "method": {
                    "name": "hnsw",
                    "engine": "faiss",
                    "space_type": "l2",
                    "parameters": {
                        "ef_construction": 200,
                        "m": 16
                    }
                }
            },
            # Raw chunk text
            "text": {
                "type": "text"
            },
            # Chunk metadata written by the knowledge base
            "text-metadata": {
                "type": "text"
            }
        }
    }
}

# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)
# It can take up to a minute for data access rules to be enforced
time.sleep(60)

In [10]:
# Create index
response = oss_client.indices.create(index=index_name, body=json.dumps(body_json))
print('\nCreating index:')
print(response)


Creating index:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'bedrock-sample-index-683'}
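The index mapping declares `"space_type": "l2"` with an HNSW graph (`m` is the maximum number of graph neighbors per node, `ef_construction` the candidate-list size while building), and `"dimension": 1536`, which matches the output size of Titan Text Embeddings v1. HNSW is an *approximate* nearest-neighbor method: it tries to return the same neighbors as an exhaustive L2 search without comparing the query against every stored vector. The sketch below shows that exact search in plain Python, purely to make `l2` concrete; it is not OpenSearch's implementation, and the helper names are hypothetical.

```python
import math


def l2_distance(a, b):
    """Euclidean (L2) distance between two equal-length vectors."""
    if len(a) != len(b):
        # A query embedding with the wrong dimension cannot be compared at all
        raise ValueError(f"dimension mismatch: {len(a)} vs {len(b)}")
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def exact_knn(query, vectors, k):
    """Return (index, distance) pairs for the k stored vectors closest to `query`.

    Brute force: score every vector, then keep the k smallest distances.
    An HNSW index approximates this result while visiting far fewer vectors.
    """
    scored = [(i, l2_distance(query, v)) for i, v in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[1])
    return scored[:k]


stored = [[0.0, 0.0], [1.0, 1.0], [3.0, 4.0]]
print(exact_knn([0.0, 0.5], stored, k=2))
```

The same dimension check explains why `dimension` in the mapping must equal the embedding model's output size: vectors of different lengths cannot be compared.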


## Create Knowledge Base
Steps:
- Initialize the OpenSearch Serverless configuration, which includes the collection ARN, index name, vector field, text field, and metadata field.
- Initialize the chunking strategy, which determines how the KB splits documents into chunks. With `FIXED_SIZE`, each chunk holds at most `maxTokens` tokens, and consecutive chunks overlap by `overlapPercentage` percent (see `chunkingStrategyConfiguration`).
- Initialize the web URL configuration, which is used to create the data source object later.
- Initialize the Titan embeddings model ARN, which is used to create an embedding for each text chunk.
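To make the `FIXED_SIZE` settings in the next cell (`maxTokens: 512`, `overlapPercentage: 20`) concrete, here is a rough sketch of fixed-size chunking with overlap over a pre-tokenized list. It is an approximation for intuition only: it assumes the overlap is a percentage of `maxTokens`, and it does not reproduce Bedrock's tokenizer or its exact chunk-boundary rules. The function name `fixed_size_chunks` is hypothetical.

```python
def fixed_size_chunks(tokens, max_tokens=512, overlap_percentage=20):
    """Split a token list into windows of at most `max_tokens` tokens.

    Consecutive windows share about `overlap_percentage` percent of `max_tokens`
    tokens. Illustrative approximation, not Bedrock's implementation.
    """
    if not 0 <= overlap_percentage < 100:
        raise ValueError("overlap_percentage must be in [0, 100)")
    overlap = (max_tokens * overlap_percentage) // 100
    step = max_tokens - overlap  # how far each new window starts after the previous one
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + max_tokens])
        if start + max_tokens >= len(tokens):
            break  # this window already reaches the end of the document
    return chunks


words = "a b c d e f g h i j".split()
print(fixed_size_chunks(words, max_tokens=4, overlap_percentage=50))
```

With the notebook's settings, the overlap is 102 tokens, so each new chunk starts 410 tokens after the previous one; a 1,000-token page would yield three chunks under this model.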

In [11]:
opensearchServerlessConfiguration = {
            "collectionArn": collection["createCollectionDetail"]['arn'],
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata"
            }
        }

chunkingStrategyConfiguration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {
        "maxTokens": 512,
        "overlapPercentage": 20
    }
}

webConfiguration = {
    "sourceConfiguration": {
        "urlConfiguration": {
            "seedUrls": [{
                "url": "https://www.datascienceportfol.io/suman"  # <<<<<------ Change this to your web URL
            }]
        }
    },
    "crawlerConfiguration": {
        "crawlerLimits": {
            "rateLimit": 50  # maximum pages crawled per host per minute
        },
        "scope": "HOST_ONLY"  # only crawl pages on the same host as the seed URL
    }
}


embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v1"

name = f"bedrock-sample-knowledge-base-{suffix}"
description = "Bedrock Knowledge Bases for Web URL Connector"
roleArn = bedrock_kb_execution_role_arn
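The `scope` field in `crawlerConfiguration` controls which discovered links the crawler may follow. As a simplified mental model, `HOST_ONLY` keeps the crawl on the seed URL's exact host, while `SUBDOMAINS` also admits subdomains of that host. The helper below is a hypothetical sketch of that rule using only the standard library; the Bedrock web crawler's real matching logic may differ.

```python
from urllib.parse import urlparse


def in_crawl_scope(seed_url, candidate_url, scope="HOST_ONLY"):
    """Approximate whether `candidate_url` falls inside the crawl scope of `seed_url`.

    HOST_ONLY: the candidate must be on exactly the same host as the seed.
    SUBDOMAINS: the candidate may also be on a subdomain of the seed's host.
    Simplified illustration only.
    """
    seed_host = (urlparse(seed_url).hostname or "").lower()
    host = (urlparse(candidate_url).hostname or "").lower()
    if scope == "HOST_ONLY":
        return host == seed_host
    if scope == "SUBDOMAINS":
        return host == seed_host or host.endswith("." + seed_host)
    raise ValueError(f"unknown scope: {scope}")


seed = "https://www.datascienceportfol.io/suman"
print(in_crawl_scope(seed, "https://www.datascienceportfol.io/suman/some-project"))
print(in_crawl_scope(seed, "https://github.com/debnsuma/masters-ml"))
```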


Pass the configurations above to the `create_knowledge_base` method to create the knowledge base.

In [12]:
# Create a KnowledgeBase
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000,stop_max_attempt_number=7)
def create_knowledge_base_func():
    create_kb_response = bedrock_agent_client.create_knowledge_base(
        name = name,
        description = description,
        roleArn = roleArn,
        knowledgeBaseConfiguration = {
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embeddingModelArn
            }
        },
        storageConfiguration = {
            "type": "OPENSEARCH_SERVERLESS",
            "opensearchServerlessConfiguration":opensearchServerlessConfiguration
        }
    )
    return create_kb_response["knowledgeBase"]
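`create_knowledge_base_func` is wrapped in `retrying.retry`, presumably so that transient failures (for example, IAM role and policy changes that have not propagated yet) do not abort the notebook. For readers who would rather not depend on `retrying`, here is a standard-library sketch that roughly mirrors the same settings: up to 7 attempts, retrying on any exception, with a random 1 to 2 second wait between attempts. The decorator name `retry_with_random_wait` is hypothetical.

```python
import functools
import random
import time


def retry_with_random_wait(max_attempts=7, wait_min_s=1.0, wait_max_s=2.0, sleep=time.sleep):
    """Retry the wrapped function on any exception, sleeping a random interval between tries.

    Roughly mirrors retrying.retry(wait_random_min=1000, wait_random_max=2000,
    stop_max_attempt_number=7). `sleep` is injectable for testing.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    sleep(random.uniform(wait_min_s, wait_max_s))
        return wrapper
    return decorator
```

Applied as `@retry_with_random_wait()` on `create_knowledge_base_func`, it would behave much like the original decorator; the random wait (jitter) avoids retrying in lockstep with other callers.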

In [13]:
try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")
    raise  # later cells depend on `kb`, so stop here instead of continuing without it

In [14]:
print(kb)

{'createdAt': datetime.datetime(2024, 7, 16, 23, 7, 54, 679021, tzinfo=tzutc()), 'description': 'Bedrock Knowledge Bases for Web URL Connector', 'knowledgeBaseArn': 'arn:aws:bedrock:us-east-1:507922848584:knowledge-base/1YIB0D2DOM', 'knowledgeBaseConfiguration': {'type': 'VECTOR', 'vectorKnowledgeBaseConfiguration': {'embeddingModelArn': 'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1'}}, 'knowledgeBaseId': '1YIB0D2DOM', 'name': 'bedrock-sample-knowledge-base-683', 'roleArn': 'arn:aws:iam::507922848584:role/AmazonBedrockExecutionRoleForKnowledgeBase_797', 'status': 'CREATING', 'storageConfiguration': {'opensearchServerlessConfiguration': {'collectionArn': 'arn:aws:aoss:us-east-1:507922848584:collection/n7bvyeb0mdj42h0spssj', 'fieldMapping': {'metadataField': 'text-metadata', 'textField': 'text', 'vectorField': 'vector'}, 'vectorIndexName': 'bedrock-sample-index-683'}, 'type': 'OPENSEARCH_SERVERLESS'}, 'updatedAt': datetime.datetime(2024, 7, 16, 23, 7, 54, 679021

In [15]:
# Get KnowledgeBase 
get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId = kb['knowledgeBaseId'])

Next, we create a data source and associate it with the knowledge base created above. Once the data source is ready, we can start ingesting documents.

In [None]:
# Just for documentation :) 
# help(bedrock_agent_client.create_data_source)

In [16]:
# Create a DataSource in KnowledgeBase 
create_ds_response = bedrock_agent_client.create_data_source(
    name = name,
    description = description,
    knowledgeBaseId = kb['knowledgeBaseId'],
    dataDeletionPolicy = 'DELETE',
    dataSourceConfiguration = {
        "type": "WEB",
        "webConfiguration":webConfiguration
    },
    vectorIngestionConfiguration = {
        "chunkingConfiguration": chunkingStrategyConfiguration
    }
)
ds = create_ds_response["dataSource"]
print(ds)

{'createdAt': datetime.datetime(2024, 7, 16, 23, 8, 5, 454853, tzinfo=tzutc()), 'dataDeletionPolicy': 'DELETE', 'dataSourceConfiguration': {'type': 'WEB', 'webConfiguration': {'crawlerConfiguration': {'crawlerLimits': {'rateLimit': 50}, 'scope': 'HOST_ONLY'}, 'sourceConfiguration': {'urlConfiguration': {'seedUrls': [{'url': 'https://www.datascienceportfol.io/suman'}]}}}}, 'dataSourceId': 'ZQHAKAWVXV', 'description': 'Bedrock Knowledge Bases for Web URL Connector', 'knowledgeBaseId': '1YIB0D2DOM', 'name': 'bedrock-sample-knowledge-base-683', 'status': 'AVAILABLE', 'updatedAt': datetime.datetime(2024, 7, 16, 23, 8, 5, 454853, tzinfo=tzutc()), 'vectorIngestionConfiguration': {'chunkingConfiguration': {'chunkingStrategy': 'FIXED_SIZE', 'fixedSizeChunkingConfiguration': {'maxTokens': 512, 'overlapPercentage': 20}}}}


In [17]:
# Get DataSource 
bedrock_agent_client.get_data_source(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

{'ResponseMetadata': {'RequestId': 'c572864d-d8d9-443a-8c5d-33456095e7a5',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Tue, 16 Jul 2024 23:08:29 GMT',
   'content-type': 'application/json',
   'content-length': '734',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'c572864d-d8d9-443a-8c5d-33456095e7a5',
   'x-amz-apigw-id': 'bByJFEBroAMEkjA=',
   'x-amzn-trace-id': 'Root=1-6696fd6d-651c5ab273c33e26510d2e8e'},
  'RetryAttempts': 0},
 'dataSource': {'createdAt': datetime.datetime(2024, 7, 16, 23, 8, 5, 454853, tzinfo=tzutc()),
  'dataDeletionPolicy': 'DELETE',
  'dataSourceConfiguration': {'type': 'WEB',
   'webConfiguration': {'crawlerConfiguration': {'crawlerLimits': {'rateLimit': 50},
     'scope': 'HOST_ONLY'},
    'sourceConfiguration': {'urlConfiguration': {'seedUrls': [{'url': 'https://www.datascienceportfol.io/suman'}]}}}},
  'dataSourceId': 'ZQHAKAWVXV',
  'description': 'Bedrock Knowledge Bases for Web URL Connector',
  'knowledgeBaseId': '1YIB0D2DOM',
  'name': 

### Start ingestion job
Once the KB and data source are created, we can start the ingestion job.
During the ingestion job, the KB fetches the documents from the data source, pre-processes them to extract text, splits the text into chunks according to the chunking configuration, creates an embedding for each chunk, and writes the chunks and embeddings to the vector database (in this case, OpenSearch Serverless).

In [18]:
# Start an ingestion job
start_job_response = bedrock_agent_client.start_ingestion_job(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

In [19]:
job = start_job_response["ingestionJob"]
print(job)

{'dataSourceId': 'ZQHAKAWVXV', 'ingestionJobId': 'LWC9QQUIVA', 'knowledgeBaseId': '1YIB0D2DOM', 'startedAt': datetime.datetime(2024, 7, 16, 23, 8, 33, 484474, tzinfo=tzutc()), 'statistics': {'numberOfDocumentsDeleted': 0, 'numberOfDocumentsFailed': 0, 'numberOfDocumentsScanned': 0, 'numberOfMetadataDocumentsModified': 0, 'numberOfMetadataDocumentsScanned': 0, 'numberOfModifiedDocumentsIndexed': 0, 'numberOfNewDocumentsIndexed': 0}, 'status': 'STARTING', 'updatedAt': datetime.datetime(2024, 7, 16, 23, 8, 33, 484474, tzinfo=tzutc())}


In [20]:
# Poll the ingestion job until it reaches a terminal state
while job['status'] not in ('COMPLETE', 'FAILED', 'STOPPED'):
    time.sleep(40)  # wait between polls instead of calling the API in a tight loop
    get_job_response = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId=kb['knowledgeBaseId'],
        dataSourceId=ds["dataSourceId"],
        ingestionJobId=job["ingestionJobId"]
    )
    job = get_job_response["ingestionJob"]
print(job)

{'dataSourceId': 'ZQHAKAWVXV', 'failureReasons': ['["Skipped document: https://github.com/debnsuma/masters-ml/blob/main/Case_study/01_SQL_Target/Business%20Case_%20Target%20SQL.pdf. The resource you are requesting doesn\'t exist."]'], 'ingestionJobId': 'LWC9QQUIVA', 'knowledgeBaseId': '1YIB0D2DOM', 'startedAt': datetime.datetime(2024, 7, 16, 23, 8, 33, 484474, tzinfo=tzutc()), 'statistics': {'numberOfDocumentsDeleted': 0, 'numberOfDocumentsFailed': 0, 'numberOfDocumentsScanned': 12, 'numberOfMetadataDocumentsModified': 0, 'numberOfMetadataDocumentsScanned': 0, 'numberOfModifiedDocumentsIndexed': 0, 'numberOfNewDocumentsIndexed': 11}, 'status': 'COMPLETE', 'updatedAt': datetime.datetime(2024, 7, 16, 23, 14, 12, 871316, tzinfo=tzutc())}
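In the run above, the job finished `COMPLETE` with 12 documents scanned and 11 newly indexed, while one linked PDF was skipped: it shows up under `failureReasons` even though `numberOfDocumentsFailed` is 0. It is therefore worth checking both. The hypothetical helper below formats only the keys visible in that output.

```python
def summarize_ingestion_job(job):
    """Return a short, human-readable summary of an ingestion job dict.

    Reads the `status`, `statistics`, and optional `failureReasons` keys seen in
    the get_ingestion_job output above; missing counters default to 0.
    """
    stats = job.get("statistics", {})
    lines = [
        f"status: {job.get('status')}",
        f"scanned: {stats.get('numberOfDocumentsScanned', 0)}",
        f"new indexed: {stats.get('numberOfNewDocumentsIndexed', 0)}",
        f"modified indexed: {stats.get('numberOfModifiedDocumentsIndexed', 0)}",
        f"deleted: {stats.get('numberOfDocumentsDeleted', 0)}",
        f"failed: {stats.get('numberOfDocumentsFailed', 0)}",
    ]
    # Skipped documents are reported here, not in numberOfDocumentsFailed
    for reason in job.get("failureReasons", []):
        lines.append(f"failure reason: {reason}")
    return "\n".join(lines)
```

Calling `print(summarize_ingestion_job(job))` after the polling loop gives a compact view of what was indexed and what was skipped.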


In [21]:
kb_id = kb["knowledgeBaseId"]
print(kb_id)

1YIB0D2DOM


In [22]:
%store kb_id

Stored 'kb_id' (str)


## Test the knowledge base
### Using RetrieveAndGenerate API
Behind the scenes, the RetrieveAndGenerate API converts the query into an embedding, searches the knowledge base, augments the foundation model (FM) prompt with the search results as context, and returns the FM-generated response to the question. For multi-turn conversations, Knowledge Bases manage short-term memory of the conversation to provide more contextual results.

The output of the RetrieveAndGenerate API includes the generated response, source attribution, and the retrieved text chunks.

In [23]:
# Try out the KB using the RetrieveAndGenerate API
bedrock_agent_runtime_client = boto3.client("bedrock-agent-runtime", region_name=region_name)
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"  # <Change this to any model supported by Knowledge Bases in your Region>
model_arn = f'arn:aws:bedrock:{region_name}::foundation-model/{model_id}'

In [24]:
query = "What are the projects Suman has worked on?"
response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        'text': query
    },
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': kb_id,
            'modelArn': model_arn
        }
    },
)

generated_text = response['output']['text']

print(generated_text)

Based on the information provided, some of the projects Suman Debnath has worked on include:

- Comprehensive e-commerce analytics for a leading retailer in Brazil, involving data analysis, visualization, and identifying trends and seasonal variances in order data.
- Strategic media content analysis for a leading video streaming platform, using data analysis and visualization to drive content development and market expansion decisions.
- Customer segmentation analysis for a major fitness equipment brand, using statistical analysis to define target customer profiles for treadmill products.
- Statistical customer behavior analysis for a major retail corporation, analyzing Black Friday transaction data to compare spending behaviors across demographics. Additionally, Suman has worked on:

- Efficient feature engineering for a leading logistics organization, cleaning and engineering features from raw data to enhance forecasting models and improve operational efficiency.
- Creating a Tableau
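The short-term conversation memory mentioned above is keyed by a session: the RetrieveAndGenerate response includes a `sessionId`, and passing it back as `sessionId` on the next call continues the same conversation. The sketch below wraps that pattern; the helper name `ask_kb` is hypothetical, and the request mirrors the `retrieve_and_generate` call used in this notebook.

```python
def ask_kb(client, question, kb_id, model_arn, session_id=None):
    """Query a knowledge base with RetrieveAndGenerate, optionally continuing a session.

    Returns (answer_text, session_id) so the session id can be passed to the next call.
    """
    request = {
        "input": {"text": question},
        "retrieveAndGenerateConfiguration": {
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                "modelArn": model_arn,
            },
        },
    }
    if session_id is not None:
        request["sessionId"] = session_id  # reuse the conversation context from earlier turns
    response = client.retrieve_and_generate(**request)
    return response["output"]["text"], response.get("sessionId")
```

For example, `answer, sid = ask_kb(bedrock_agent_runtime_client, query, kb_id, model_arn)` followed by `ask_kb(bedrock_agent_runtime_client, "Which of those projects used Tableau?", kb_id, model_arn, sid)` would let the second question refer back to the first answer.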

In [25]:
# Print the source attribution (citations) to check whether the generated response is grounded in the retrieved context
citations = response["citations"]
contexts = []
for citation in citations:
    retrievedReferences = citation["retrievedReferences"]
    for reference in retrievedReferences:
        contexts.append(reference["content"]["text"])

print(contexts)

['Suman Debnath | Principal Developer Advocate (AI/ML) | Data portfolio Suman Debnath Principal Developer Advocate (AI/ML) Based in Boston, US LinkedIn](https://www.linkedin.com/in/suman-d/)[ email me About Suman Debnath is a Principal Machine Learning Advocate at Amazon Web Services. He transitioned to machine learning in 2020 after a career in systems, storage and performance engineering. Currently, his focus is on Supervised Learning, Natural Language Processing (NLP), Large Language Models (LLMs), and Retrieval Augmented Generation (RAG). Suman is committed to leveraging open-source tools like TensorFlow, PyTorch, Numpy, Spark, and Pandas for advancing machine learning. He has developed performance benchmarking and monitoring tools for distributed storage systems. Suman has spoken at over 250 global events, including PyCon, PyData, ODSC, and meetups across multiple countries. At present, he leads the Global Developer Advocate team for Machine Learning and GenAI at AWS. Skills Pytho
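The list above holds only the chunk text, and the same chunk can appear more than once when several parts of the answer cite it. The hypothetical helper below also pulls out the source URL and removes duplicates. It assumes that web sources are reported under `location` → `webLocation` → `url` in each retrieved reference; references without that field are returned with `url=None`.

```python
def cited_sources(response):
    """Collect unique (url, snippet) pairs from a RetrieveAndGenerate response's citations.

    Assumes web references carry location -> webLocation -> url; others get url=None.
    Snippets are truncated to 80 characters for display.
    """
    seen = set()
    sources = []
    for citation in response.get("citations", []):
        for ref in citation.get("retrievedReferences", []):
            url = ref.get("location", {}).get("webLocation", {}).get("url")
            text = ref.get("content", {}).get("text", "")
            key = (url, text)
            if key in seen:
                continue  # the same chunk can be cited by several parts of the answer
            seen.add(key)
            sources.append((url, text[:80]))
    return sources
```

Usage: `for url, snippet in cited_sources(response): print(url, "->", snippet)`.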

### Retrieve API
The Retrieve API converts user queries into embeddings, searches the knowledge base, and returns the relevant results, giving you more control to build custom workflows on top of the semantic search results. The output of the Retrieve API includes the retrieved text chunks, the location type and URI of the source data, and the relevance scores of the retrievals.

In [26]:
# Retrieve API: fetch only the relevant context, without generating a response
relevant_documents = bedrock_agent_runtime_client.retrieve(
    retrievalQuery={
        'text': query
    },
    knowledgeBaseId=kb_id,
    retrievalConfiguration={
        'vectorSearchConfiguration': {
            'numberOfResults': 3  # fetch the 3 chunks that most closely match the query
        }
    }
)

In [27]:
print(relevant_documents["retrievalResults"])

[{'content': {'text': 'Suman Debnath | StackOverflow AWS Engagement Insights Dashboard Suman Debnath Principal Developer Advocate (AI/ML) Based in Boston, US LinkedIn](https://www.linkedin.com/in/suman-d/)[ email me About Suman Debnath is a Principal Machine Learning Advocate at Amazon Web Services. He transitioned to machine learning in 2020 after a career in systems, storage and performance engineering. Currently, his focus is on Supervised Learning, Natural Language Processing (NLP), Large Language Models (LLMs), and Retrieval Augmented Generation (RAG). Suman is committed to leveraging open-source tools like TensorFlow, PyTorch, Numpy, Spark, and Pandas for advancing machine learning. He has developed performance benchmarking and monitoring tools for distributed storage systems. Suman has spoken at over 250 global events, including PyCon, PyData, ODSC, and meetups across multiple countries. At present, he leads the Global Developer Advocate team for Machine Learning and GenAI at AW
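Because each retrieval result carries a relevance `score`, a custom workflow can rank and filter results before using them. The sketch below sorts results by score and drops those under a threshold; the `min_score` cut-off is a hypothetical parameter (pick a value by inspecting scores on your own data), and the URL lookup assumes the same `location` → `webLocation` → `url` layout for web sources.

```python
def top_results(retrieval_results, min_score=0.0):
    """Return (score, url, text) tuples sorted by descending relevance score.

    Results below the hypothetical `min_score` threshold are dropped; results
    without a web URL get url=None.
    """
    rows = []
    for result in retrieval_results:
        score = result.get("score", 0.0)
        if score < min_score:
            continue
        url = result.get("location", {}).get("webLocation", {}).get("url")
        rows.append((score, url, result["content"]["text"]))
    rows.sort(key=lambda row: row[0], reverse=True)  # most relevant first
    return rows
```

Usage: `for score, url, text in top_results(relevant_documents["retrievalResults"]): print(f"{score:.3f}", url, text[:80])`.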

## Clean up
Keep the cells below commented out if you plan to use the knowledge base you created above to build your RAG application.
If you only wanted to try out creating the KB with the SDK, uncomment and run them to delete all the resources created in this notebook, because storing documents in the OpenSearch Serverless index incurs costs.

In [None]:
# # Delete KnowledgeBase
# bedrock_agent_client.delete_data_source(dataSourceId = ds["dataSourceId"], knowledgeBaseId=kb['knowledgeBaseId'])
# bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=kb['knowledgeBaseId'])
# oss_client.indices.delete(index=index_name)
# aoss_client.delete_collection(id=collection_id)
# aoss_client.delete_access_policy(type="data", name=access_policy['accessPolicyDetail']['name'])
# aoss_client.delete_security_policy(type="network", name=network_policy['securityPolicyDetail']['name'])
# aoss_client.delete_security_policy(type="encryption", name=encryption_policy['securityPolicyDetail']['name'])
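If one delete call in the cell above fails (for example, because a resource was already removed), the remaining calls never run and resources can be left behind. A hedged alternative is to run each step independently and report failures at the end. The helper `run_cleanup` is hypothetical; the commented-out usage reuses the same delete calls as the cell above and stays commented for the same reason.

```python
def run_cleanup(steps):
    """Run (description, callable) cleanup steps in order, continuing past failures.

    Returns a list of (description, error) pairs for the steps that raised, so a
    single failed delete does not stop the remaining resources from being removed.
    """
    failures = []
    for description, action in steps:
        try:
            action()
            print(f"ok: {description}")
        except Exception as err:  # keep going; report all failures at the end
            print(f"FAILED: {description}: {err}")
            failures.append((description, err))
    return failures


# failures = run_cleanup([
#     ("delete data source", lambda: bedrock_agent_client.delete_data_source(dataSourceId=ds["dataSourceId"], knowledgeBaseId=kb["knowledgeBaseId"])),
#     ("delete knowledge base", lambda: bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=kb["knowledgeBaseId"])),
#     ("delete index", lambda: oss_client.indices.delete(index=index_name)),
#     ("delete collection", lambda: aoss_client.delete_collection(id=collection_id)),
# ])
```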

In [None]:
# # delete role and policies
# from utility import delete_iam_role_and_policies
# delete_iam_role_and_policies()