# Latency Measurement Test for Amazon Bedrock Knowledge Bases with Amazon S3 Vectors as the Vector Store
### Measures latency for Retrieve and Retrieve & Generate Operations
### Credit - Test Data Source used - https://github.com/docugami/KG-RAG-datasets/blob/main/sec-10-q/README.md

## Install Packages and Import Libraries

In [None]:
#!pip install --upgrade boto3
#!pip install --upgrade botocore
#!pip install pandas

In [None]:
import boto3, json, os
import pandas as pd  
import time

## Initiate Clients

In [None]:
# Read the region from the default boto3 session and the caller's AWS account ID from STS.
# Both values are used later to build the model ARNs and to create the service clients.
my_session = boto3.session.Session()
region = my_session.region_name
aws_account_number = boto3.client('sts').get_caller_identity().get('Account')

In [None]:
# bedrock_client - used to create the Knowledge Base, its data source and the ingestion job
bedrock_client = boto3.client('bedrock-agent',region_name=region)
# s3vector_client - used to create the S3 Vectors bucket and index
s3vector_client = boto3.client("s3vectors", region_name=region)
# iam_client - used to create the IAM role and policy
iam_client = boto3.client('iam',region_name=region)
# s3_client - used to create the source S3 bucket and upload files for the Bedrock Knowledge Base
s3_client = boto3.client('s3', region_name=region)
# bedrock_agent_runtime_client - used to call the Retrieve and Retrieve & Generate operations on the Bedrock Knowledge Base
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime', region_name=region)

## Define the parameters
### Update as per specific requirement

In [None]:
# Bedrock Knowledge Base name; the other resource names below are derived from it
kb_name = "kb-benchmark"

# S3 Vectors bucket and index names
vector_bucket_name = f"{kb_name}-vector-bucket"
vector_index_name = f"{kb_name}-vector-index"

# IAM role and policy names for the Bedrock Knowledge Base
role_name = f"{kb_name}-role"
role_policy_name = f"{role_name}-policy"

# Dimension of the embedding vectors stored in the vector index
vector_dimension = 1024

# ARN of the embedding model (foundation model in the current region)
embeddingModelArn = "".join(
    ["arn:aws:bedrock:", region, "::foundation-model/cohere.embed-english-v3"]
)

# ARN of the LLM inference profile used for Retrieve & Generate
generationModelArn = "".join(
    [
        "arn:aws:bedrock:",
        region,
        ":",
        aws_account_number,
        ":inference-profile/us.anthropic.claude-haiku-4-5-20251001-v1:0",
    ]
)

# Name of the source S3 bucket for the Bedrock Knowledge Base
s3_source_bucket = f"{kb_name}-source-data-bucket"

# Questions / query file: a CSV with (at minimum) a "Question" column holding
# the queries that are sent to the Knowledge Base during the latency test
qnafile = "qna_data.csv"

## Create IAM Policy and Role for Amazon Bedrock Knowledge Base
### Update as per specific requirement

In [None]:
# Trust policy: lets the Bedrock service assume the role
role_trust_doc = json.dumps(
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AmazonBedrockKnowledgeBaseTrustPolicy",
                "Effect": "Allow",
                "Principal": {"Service": "bedrock.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }
)

# Permission policy for the role: model invocation, marketplace operations
# called via Bedrock, read access to S3 and vector operations on S3 Vectors.
# NOTE(review): every statement uses Resource "*"; narrow it to the specific
# resources if least privilege is required.
role_policy_doc = json.dumps(
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "BedrockInvokeModelStatement",
                "Effect": "Allow",
                "Action": ["bedrock:InvokeModel"],
                "Resource": "*",
            },
            {
                "Sid": "MarketplaceOperationsFromBedrockFor3pModels",
                "Effect": "Allow",
                "Action": [
                    "aws-marketplace:Subscribe",
                    "aws-marketplace:ViewSubscriptions",
                    "aws-marketplace:Unsubscribe",
                ],
                "Resource": "*",
                "Condition": {
                    "StringEquals": {
                        "aws:CalledViaLast": "bedrock.amazonaws.com"
                    }
                },
            },
            {
                "Sid": "S3ListBucketStatement",
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": "*",
            },
            {
                "Sid": "S3GetObjectStatement",
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": "*",
            },
            {
                "Sid": "S3VectorsPermissions",
                "Effect": "Allow",
                "Action": [
                    "s3vectors:GetIndex",
                    "s3vectors:QueryVectors",
                    "s3vectors:PutVectors",
                    "s3vectors:GetVectors",
                    "s3vectors:DeleteVectors",
                ],
                "Resource": "*",
            },
        ],
    }
)

# Create the managed policy and remember its ARN for attach / clean-up
response = iam_client.create_policy(
    PolicyName=role_policy_name,
    PolicyDocument=role_policy_doc,
    Description=role_policy_name,
)
policy_arn = response["Policy"]["Arn"]

# Create the role with the Bedrock trust policy and remember its ARN
response = iam_client.create_role(
    RoleName=role_name,
    AssumeRolePolicyDocument=role_trust_doc,
    Description=role_name,
)
role_arn = response["Role"]["Arn"]

# Attach the permission policy to the role
iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

In [None]:
# Display the ARN of the role created above as the cell output
role_arn

## Create vector store for Knowledge Base
### Update as per specific requirement

In [None]:
# Create the S3 Vectors bucket and keep its ARN for the Knowledge Base storage configuration
response = s3vector_client.create_vector_bucket(vectorBucketName=vector_bucket_name)
s3_vector_bucket_arn = response["vectorBucketArn"]

# Settings of the vector index; the two Bedrock keys are declared as
# non-filterable metadata
index_settings = {
    "vectorBucketName": vector_bucket_name,
    "indexName": vector_index_name,
    "dimension": vector_dimension,
    "distanceMetric": "euclidean",
    "dataType": "float32",
    "metadataConfiguration": {
        "nonFilterableMetadataKeys": [
            "AMAZON_BEDROCK_TEXT",
            "AMAZON_BEDROCK_METADATA",
        ]
    },
}

# Create the vector index and keep its ARN
response = s3vector_client.create_index(**index_settings)
s3_vector_bucket_index_arn = response["indexArn"]

# Show both ARNs, bucket first
for created_arn in (s3_vector_bucket_arn, s3_vector_bucket_index_arn):
    print(created_arn)

## Create Amazon Bedrock Knowledge Base with S3 Vectors Store
### Update as per specific requirement

In [None]:
# Embedding settings of the vector Knowledge Base
embedding_settings = {'embeddingModelArn': embeddingModelArn}

# Storage settings: the S3 Vectors bucket and index created earlier
s3_vectors_settings = {
    'vectorBucketArn': s3_vector_bucket_arn,
    'indexArn': s3_vector_bucket_index_arn,
}

# Full Knowledge Base definition; the description reuses the name
kb_config = dict(
    name=kb_name,
    description=kb_name,
    roleArn=role_arn,
    knowledgeBaseConfiguration={
        'type': 'VECTOR',
        'vectorKnowledgeBaseConfiguration': embedding_settings,
    },
    storageConfiguration={
        'type': 'S3_VECTORS',
        's3VectorsConfiguration': s3_vectors_settings,
    },
)

# Create the Knowledge Base and keep its ID for all later calls
response = bedrock_client.create_knowledge_base(**kb_config)
kb_id_s3vector = response['knowledgeBase']['knowledgeBaseId']
kb_id_s3vector

## Configure Source S3 Bucket for Knowledge Base
### Update as per specific requirement (Change folder location for KB files, upload your KB files)

In [None]:
# Create source S3 bucket.
# Outside us-east-1, S3 rejects CreateBucket unless a LocationConstraint
# matching the client's region is supplied, so add it only when needed.
create_bucket_args = {"Bucket": s3_source_bucket}
if region != "us-east-1":
    create_bucket_args["CreateBucketConfiguration"] = {"LocationConstraint": region}
response = s3_client.create_bucket(**create_bucket_args)

# "BucketArn" is not guaranteed to be present in the response; fall back to the
# standard S3 bucket ARN format instead of failing with a KeyError.
# NOTE(review): the fallback assumes the standard "aws" partition - adjust for
# other partitions (e.g. aws-cn, aws-us-gov).
s3_source_bucket_arn = response.get("BucketArn") or "arn:aws:s3:::" + s3_source_bucket
print(s3_source_bucket_arn)

In [None]:
# Upload the PDF files in the "docs" folder to the source S3 bucket.
# Each object key is the bare file name (no folder prefix).
docs_dir = "./docs"

for filename in os.listdir(docs_dir):
    # Match on the extension only, so names such as "report.pdf.bak" are skipped
    if filename.endswith(".pdf"):
        s3_client.upload_file(os.path.join(docs_dir, filename), s3_source_bucket, filename)
print("Files upload to S3 bucket completed")

## Create Data Source for the Knowledge Base and Sync Data Source (ingest data from data source to vector store)
### Update as per specific requirement 

In [None]:
# Name (also used as description) of the data source, derived from the Knowledge Base ID
datasource_label = kb_id_s3vector + "-s3-datasource"

# Fixed-size chunking: at most 300 tokens per chunk with 20% overlap
chunking_settings = {
    'chunkingStrategy': 'FIXED_SIZE',
    'fixedSizeChunkingConfiguration': {
        'maxTokens': 300,
        'overlapPercentage': 20,
    },
}

# Create the S3 data source on the Knowledge Base, pointing at the source bucket
response = bedrock_client.create_data_source(
    knowledgeBaseId=kb_id_s3vector,
    name=datasource_label,
    description=datasource_label,
    dataSourceConfiguration={
        'type': 'S3',
        's3Configuration': {'bucketArn': s3_source_bucket_arn},
    },
    dataDeletionPolicy='RETAIN',
    vectorIngestionConfiguration={'chunkingConfiguration': chunking_settings},
)

# Keep the data source ID for the status check and the ingestion job
data_source_id = response["dataSource"]["dataSourceId"]

In [None]:
# Fetch the data source and report its current status
response = bedrock_client.get_data_source(knowledgeBaseId=kb_id_s3vector, dataSourceId=data_source_id)
data_source_status = response["dataSource"]["status"]
print(data_source_status)

# Only an AVAILABLE data source is treated as ready; anything else gets a warning
data_source_ready = data_source_status == 'AVAILABLE'
if not data_source_ready:
    print("Data source could not be created. Kinldy fix it before you continue")

In [None]:
# Start an ingestion job for the data source of the Knowledge Base
response = bedrock_client.start_ingestion_job(
    knowledgeBaseId=kb_id_s3vector,
    dataSourceId=data_source_id
)
# Keep the job ID so that the job status can be polled afterwards
ingestion_job_id = response["ingestionJob"]["ingestionJobId"]

In [None]:
# Poll the ingestion job every 10 seconds until it reaches a terminal status.
# STOPPED is terminal as well as COMPLETE and FAILED; without it the loop
# would never end for a job that was stopped.
terminal_statuses = {'COMPLETE', 'FAILED', 'STOPPED'}

while True:
    response = bedrock_client.get_ingestion_job(
        knowledgeBaseId=kb_id_s3vector,
        dataSourceId=data_source_id,
        ingestionJobId=ingestion_job_id
    )
    ingestion_job_status = response["ingestionJob"]["status"]
    print(ingestion_job_status)
    if ingestion_job_status in terminal_statuses:
        break
    # Sleep only between polls, so a job that has already finished is reported immediately
    time.sleep(10)

if ingestion_job_status != 'COMPLETE':
    print("Ingestion Job has not complete. Kinldy fix it before you continue")

## Measure and Record Retrieval and Generation Latency
### Update as per specific requirement 

In [None]:
def retrieve_time(query):
    """Measure the latency of one Retrieve call on the Knowledge Base.

    Sends `query` as a text query to the Knowledge Base identified by
    `kb_id_s3vector`, asking for 5 results with semantic search.

    Args:
        query: Text of the question sent to the Knowledge Base.

    Returns:
        The elapsed time of the call in seconds, rounded to two decimals,
        when the HTTP status code is 200; otherwise the string "ERR".
    """
    # perf_counter is monotonic, so the measured interval is not affected by
    # system clock adjustments
    start_time = time.perf_counter()

    response = bedrock_agent_runtime_client.retrieve(
        knowledgeBaseId=kb_id_s3vector,
        retrievalConfiguration={
            'vectorSearchConfiguration': {
                'numberOfResults': 5,
                'overrideSearchType': 'SEMANTIC',
            }
        },
        retrievalQuery={
            'text': query,
            'type': 'TEXT'
        }
    )

    end_time = time.perf_counter()

    responsecode = response["ResponseMetadata"]["HTTPStatusCode"]

    if responsecode != 200:
        # Return the marker unrounded: round("ERR", 2) would raise TypeError
        return "ERR"
    return round(end_time - start_time, 2)

def retrieve_generate_time(query):
    """Measure the latency of one Retrieve & Generate call on the Knowledge Base.

    Sends `query` to the Knowledge Base identified by `kb_id_s3vector`,
    retrieving 5 results with semantic search and generating the answer with
    the model given by `generationModelArn`.

    Args:
        query: Text of the question sent to the Knowledge Base.

    Returns:
        The elapsed time of the call in seconds, rounded to two decimals,
        when the HTTP status code is 200; otherwise the string "ERR".
    """
    # perf_counter is monotonic, so the measured interval is not affected by
    # system clock adjustments
    start_time = time.perf_counter()

    response = bedrock_agent_runtime_client.retrieve_and_generate(
        input={
            'text': query
        },
        retrieveAndGenerateConfiguration={
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': kb_id_s3vector,
                'modelArn': generationModelArn,
                'retrievalConfiguration': {
                    'vectorSearchConfiguration': {
                        'numberOfResults': 5,
                        'overrideSearchType': 'SEMANTIC',
                    }
                }
            },
            'type': 'KNOWLEDGE_BASE'
        }
    )

    end_time = time.perf_counter()

    responsecode = response["ResponseMetadata"]["HTTPStatusCode"]

    if responsecode != 200:
        # Return the marker unrounded: round("ERR", 2) would raise TypeError
        return "ERR"
    return round(end_time - start_time, 2)

In [None]:
import csv

latency_output_file = "latency_measurement.csv"

# load questions / query file
data = pd.read_csv(qnafile)

# For each query, measure both latencies and write one "#"-delimited record.
# csv.writer quotes a question that itself contains "#", a quote or a line
# break, so the output file stays parseable; plain questions are written
# exactly as before. The header names are kept unchanged for existing consumers.
with open(latency_output_file, "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f, delimiter="#", lineterminator="\n")
    writer.writerow(["question", "retrive_time", "retrive_generate_time"])
    # Blank cells are read by pandas as NaN, which is not a valid query text; skip them
    for q in data['Question'].dropna():
        rt = retrieve_time(q)
        rgt = retrieve_generate_time(q)
        writer.writerow([q, rt, rgt])

print("Latency measurement completed")

## Clean-up all the resources created

In [None]:
# Delete the Knowledge Base created by this notebook
bedrock_client.delete_knowledge_base(knowledgeBaseId=kb_id_s3vector)

In [None]:
# Delete the vector index first, then the S3 Vectors bucket that contained it
s3vector_client.delete_index(vectorBucketName=vector_bucket_name, indexName=vector_index_name)
s3vector_client.delete_vector_bucket(vectorBucketName=vector_bucket_name)

In [None]:
# Delete source S3 Bucket: remove all objects first, then the bucket itself.
# The listing is paginated so buckets holding more than one page of objects are
# emptied completely; each page is deleted with a single delete_objects request.
paginator = s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=s3_source_bucket):
    # "Contents" is absent when the bucket (or page) holds no objects
    objects = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
    if objects:
        s3_client.delete_objects(Bucket=s3_source_bucket, Delete={"Objects": objects})
s3_client.delete_bucket(Bucket=s3_source_bucket)

In [None]:
# Delete IAM Role and Policy: detach the policy from the role, then delete
# the policy and finally the role
iam_client.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
iam_client.delete_policy(PolicyArn=policy_arn)
iam_client.delete_role(RoleName=role_name)