# Amazon Bedrock Knowledge Bases with S3 Vectors
This notebook provides sample code for building an end-to-end Retrieval Augmented Generation (RAG) application with Amazon Bedrock Knowledge Bases, using Amazon S3 Vectors as the vector store. It covers:

1. Overview
2. Prerequisites
3. Creating an Amazon S3 vector store and index
4. Creating an Amazon Bedrock Knowledge Base
5. Creating the data source
6. Syncing the data source
7. Testing the Knowledge Base
8. Cleaning up

## Overview
Amazon Bedrock Knowledge Bases allows you to integrate proprietary information into your generative-AI applications. Using the Retrieval Augmented Generation (RAG) technique, a knowledge base searches your data to find the most relevant information and then uses it to answer natural language questions with context-specific responses.

In this notebook, you will create an Amazon S3 vector store and integrate it with Amazon Bedrock Knowledge Bases. Together they provide a scalable, managed system for storing, retrieving, and using embeddings in AI-powered applications.

### What are Vector Stores?
Vector stores are specialized databases designed to store and efficiently search through vector embeddings. These embeddings represent text or other data in a high-dimensional space where semantic similarity can be measured as distance between vectors.

By combining the scalability and cost-effectiveness of Amazon S3 Vectors with the managed retrieval capabilities of Amazon Bedrock Knowledge Bases, we can build knowledge retrieval systems that handle large volumes of data while returning accurate, context-aware responses.
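To make "semantic similarity measured as distance" concrete, here is a minimal, self-contained sketch that scores toy vectors with cosine similarity and runs a brute-force top-k search. The three-dimensional vectors and document names are made up for illustration; real Titan Text Embeddings V2 vectors have 1,024 dimensions, and vector stores typically use specialized indexes rather than scanning every vector.

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors: 1.0 = same direction, 0.0 = orthogonal."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def cosine_distance(a: list[float], b: list[float]) -> float:
    """Distance form of cosine similarity: smaller means more similar."""
    return 1.0 - cosine_similarity(a, b)

def top_k(query: list[float], docs: dict[str, list[float]], k: int = 2) -> list[tuple[str, float]]:
    """Brute-force nearest-neighbour search: rank every document by cosine distance."""
    scored = [(name, cosine_distance(query, vec)) for name, vec in docs.items()]
    return sorted(scored, key=lambda item: item[1])[:k]

# Hypothetical toy "embeddings" (not produced by any real model)
docs = {
    "cardiology_note": [0.9, 0.1, 0.0],
    "dermatology_note": [0.1, 0.9, 0.1],
    "billing_faq": [0.0, 0.2, 0.9],
}
query = [0.8, 0.2, 0.0]
print(top_k(query, docs))
```

The query points in nearly the same direction as `cardiology_note`, so that document has the smallest cosine distance and ranks first.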

## Prerequisites

To complete this notebook you should have:

1. An AWS account with appropriate permissions
2. A role with access to the following services: Amazon S3, AWS STS, and Amazon Bedrock
3. Access to Amazon Bedrock models (specifically Anthropic Claude 3.5 Sonnet and Amazon Titan Text Embeddings V2)

In this notebook, we use a synthetic dataset of medical transcripts to populate the Knowledge Base. To use your own documents instead:

1. Upload your documents (the data source) to an Amazon S3 bucket
2. Set `bucket_name` to that bucket, skip the cells that create the bucket and download the sample data, and remove the bucket deletion from the clean-up step if you want to keep it

### Setup

Let's first install the required dependencies and initialize the boto3 clients we'll need throughout this notebook.

In [None]:
# Install or update boto3 (recent versions include the s3vectors client) and requests
!pip install -qU boto3 requests

In [None]:
import os
import sys
import json
import time
import uuid
import boto3
import pprint
import requests
from botocore.client import Config
from botocore.exceptions import ClientError
from utils import generate_short_code, create_bedrock_execution_role, empty_and_delete_bucket, create_s3_bucket

# Create boto3 session and get account information
boto3_session = boto3.session.Session()
region_name = boto3_session.region_name
iam_client = boto3.client('iam')
sts_client = boto3.client('sts')
account_id = sts_client.get_caller_identity()['Account']

# Create s3vectors client
s3vectors = boto3.client('s3vectors', region_name=region_name)

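# Bedrock Agent runtime client (Retrieve / RetrieveAndGenerate) with extended timeouts for long-running calls
bedrock_config = Config(connect_timeout=120, read_timeout=120, retries={'max_attempts': 0}, region_name=region_name)
bedrock_agent_runtime_client = boto3_session.client("bedrock-agent-runtime", config=bedrock_config)

# Bedrock Agent control-plane client for Knowledge Bases, data sources, and ingestion jobs;
# `bedrock` is a short alias used throughout this notebook
bedrock_agent_client = boto3.client('bedrock-agent', region_name=region_name)
bedrock = bedrock_agent_client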

# Generate unique identifier for resource names to avoid conflicts
unique_id = generate_short_code()

# Define resource names with unique identifiers
bucket_name = f"my-data-source-{unique_id}"
vector_store_name = f"my-s3-vector-store-{unique_id}"
vector_index_name = f"my-s3-vector-index-{unique_id}"

print(f"Using unique identifier: {unique_id}")
print(f"AWS Region: {region_name}")
print(f"Account ID: {account_id}")

## Create an Amazon S3 Vector store

Amazon S3 Vectors provides purpose-built vector storage and similarity search directly in Amazon S3. Vectors are stored in vector buckets and organized into vector indexes.

In this section, we'll create a vector bucket that will serve as our vector database. The vector bucket will store the vector embeddings of our documents, enabling semantic search capabilities.

In [None]:
def create_vector_bucket(vector_bucket_name):
    """Create an S3 Vector bucket and return its ARN"""
    try:
        # Create the vector bucket
        s3vectors.create_vector_bucket(vectorBucketName=vector_bucket_name)
        print(f"✅ Vector bucket '{vector_bucket_name}' created successfully")
        
        # Get the vector bucket details
        response = s3vectors.get_vector_bucket(vectorBucketName=vector_bucket_name)
        bucket_info = response.get("vectorBucket", {})
        vector_store_arn = bucket_info.get("vectorBucketArn")
        
        if not vector_store_arn:
            raise ValueError("Vector bucket ARN not found in response")
            
        print(f"Vector bucket ARN: {vector_store_arn}")
        return vector_store_arn
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        error_message = e.response.get('Error', {}).get('Message', 'Unknown error')
        print(f"❌ Error creating vector bucket: {error_code} - {error_message}")
        raise

# Create the vector bucket
vector_store_arn = create_vector_bucket(vector_store_name)

### Creating a Vector Index

Now that we have created the vector store, we need to create a vector index. The vector index is where:

1. Vector embeddings are stored and organized
2. Similarity searches are performed
3. Metadata about our documents is maintained

We'll specify parameters like dimension (the size of our embedding vectors), distance metric (how similarity is calculated), and data type.
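Because the dimension, distance metric, and data type describe how every vector in the index is stored and compared, it can help to validate them before calling `create_index`. The helper below is a hypothetical sketch, not part of any SDK. The allowed values it checks (dimension from 1 to 4,096, distance metric `cosine` or `euclidean`, data type `float32`) reflect our understanding of the current S3 Vectors limits and should be confirmed against the service documentation.

```python
# Assumed S3 Vectors limits; verify against the current service documentation
ALLOWED_DISTANCE_METRICS = {"cosine", "euclidean"}
ALLOWED_DATA_TYPES = {"float32"}
MAX_DIMENSION = 4096

def build_index_config(bucket: str, index: str, dimension: int,
                       distance_metric: str = "cosine", data_type: str = "float32") -> dict:
    """Return keyword arguments for s3vectors.create_index after basic validation (hypothetical helper)."""
    if not 1 <= dimension <= MAX_DIMENSION:
        raise ValueError(f"dimension must be between 1 and {MAX_DIMENSION}, got {dimension}")
    if distance_metric not in ALLOWED_DISTANCE_METRICS:
        raise ValueError(f"unsupported distance metric: {distance_metric}")
    if data_type not in ALLOWED_DATA_TYPES:
        raise ValueError(f"unsupported data type: {data_type}")
    return {
        "vectorBucketName": bucket,
        "indexName": index,
        "dimension": dimension,
        "distanceMetric": distance_metric,
        "dataType": data_type,
        # Chunk text is stored with each vector but not used for filtering
        "metadataConfiguration": {"nonFilterableMetadataKeys": ["AMAZON_BEDROCK_TEXT"]},
    }

# Possible usage (not executed here):
# s3vectors.create_index(**build_index_config(vector_store_name, vector_index_name, 1024))
```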

In [None]:
# Define the dimensionality of our embedding vectors
# This should match the output dimension of the embedding model we'll use (Titan Embed Text v2)
vector_dimension = 1024

In [None]:
def create_and_get_index_arn(s3vectors_client, vector_store_name, vector_index_name, vector_dimension):
    """
    Create a vector index in the specified vector store and return its ARN
    
    Args:
        s3vectors_client: Boto3 client for S3 Vectors
        vector_store_name: Name of the vector store
        vector_index_name: Name for the new index
        vector_dimension: Dimension of the vectors (e.g., 1024 for Titan Embed)
        
    Returns:
        str: ARN of the created index
    """
    # Define index configuration
    index_config = {
        "vectorBucketName": vector_store_name,
        "indexName": vector_index_name,
        "dimension": vector_dimension,
        "distanceMetric": "cosine",  # Using cosine similarity as our metric
        "dataType": "float32",       # Standard for most embedding models
        "metadataConfiguration": {
            "nonFilterableMetadataKeys": ["AMAZON_BEDROCK_TEXT"]  # Text content won't be used for filtering
        }
    }
    
    try:
        # Create the index
        s3vectors_client.create_index(**index_config)
        print(f"✅ Vector index '{vector_index_name}' created successfully")

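        # Look up the ARN of the index we just created, matching by name
        # rather than assuming it is the first index in the bucket
        response = s3vectors_client.list_indexes(vectorBucketName=vector_store_name)
        index_arn = next(
            (idx.get("indexArn") for idx in response.get("indexes", [])
             if idx.get("indexName") == vector_index_name),
            None)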
        
        if not index_arn:
            raise ValueError("Index ARN not found in response")
            
        print(f"Vector index ARN: {index_arn}")
        return index_arn

    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        error_message = e.response.get('Error', {}).get('Message', 'Unknown error')
        print(f"❌ Failed to create or retrieve index: {error_code} - {error_message}")
        raise

In [None]:
# Create the vector index
vector_index_arn = create_and_get_index_arn(
    s3vectors,
    vector_store_name,
    vector_index_name,
    vector_dimension)

print(f"\nVector index created with ARN: {vector_index_arn}")

## Create an Amazon Bedrock Knowledge Base 

Before we create a Knowledge Base, we need to establish the appropriate IAM permissions. The Knowledge Base needs permissions to:

1. Access and read documents from our S3 bucket
2. Create and manage embeddings in our S3 Vector Store
3. Interact with the Bedrock embedding models

We'll create an IAM role with the necessary permissions.
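For reference, a role like this needs a trust policy that lets Bedrock assume it and a permissions policy covering the three tasks listed above. The sketch below is an assumption-based illustration, not the contents of `create_bedrock_execution_role` in `utils.py`, whose exact actions, ARNs, and conditions may differ. The `s3vectors:*` action names reflect our understanding of the S3 Vectors IAM actions.

```python
import json

def kb_trust_policy(account_id: str, region: str) -> dict:
    """Trust policy allowing Bedrock Knowledge Bases in this account to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "bedrock.amazonaws.com"},
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {"aws:SourceAccount": account_id},
                "ArnLike": {"aws:SourceArn": f"arn:aws:bedrock:{region}:{account_id}:knowledge-base/*"},
            },
        }],
    }

def kb_permissions_policy(account_id: str, region: str, bucket: str,
                          vector_bucket: str, index: str) -> dict:
    """Hypothetical least-privilege permissions for the Knowledge Base role."""
    index_arn = f"arn:aws:s3vectors:{region}:{account_id}:bucket/{vector_bucket}/index/{index}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # 1. Read source documents from the data source bucket
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"],
            },
            {   # 2. Write, read, and query embeddings in the vector index (assumed action names)
                "Effect": "Allow",
                "Action": ["s3vectors:PutVectors", "s3vectors:GetVectors", "s3vectors:DeleteVectors",
                           "s3vectors:QueryVectors", "s3vectors:GetIndex"],
                "Resource": index_arn,
            },
            {   # 3. Invoke the embedding model
                "Effect": "Allow",
                "Action": ["bedrock:InvokeModel"],
                "Resource": f"arn:aws:bedrock:{region}::foundation-model/amazon.titan-embed-text-v2:0",
            },
        ],
    }

# Placeholder account, region, and resource names
print(json.dumps(kb_permissions_policy("111122223333", "us-east-1",
                                       "my-data-source", "my-vectors", "my-index"), indent=2))
```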

In [None]:
# Create IAM role for Bedrock Knowledge Base
create_role = create_bedrock_execution_role(unique_id, region_name, bucket_name, vector_store_name, vector_index_name, account_id)
roleArn = create_role["Role"]["Arn"]
roleName = create_role["Role"]["RoleName"]

print(f"Created IAM role: {roleName}")
print(f"Role ARN: {roleArn}")

In [None]:
# Wait for IAM role propagation
print("Waiting for IAM role propagation (60 seconds)...")
time.sleep(60)  # Wait for all policies and resources to be fully propagated

# Define Knowledge Base name with unique identifier
kb_name = f"kb-s3-vectors-{unique_id}"

# Create the Knowledge Base
create_kb_response = bedrock.create_knowledge_base(
    name=kb_name,
    description='Amazon Bedrock Knowledge Bases with S3 Vector Store',
    roleArn=roleArn,
    knowledgeBaseConfiguration={
        'type': 'VECTOR',
        'vectorKnowledgeBaseConfiguration': {
            # Specify the embedding model to use
            'embeddingModelArn': f'arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v2:0',
            'embeddingModelConfiguration': {
                'bedrockEmbeddingModelConfiguration': {
                    'dimensions': vector_dimension,  # Must match the dimension of the vector index
                    'embeddingDataType': 'FLOAT32'
                }
            },
        },
    },
    storageConfiguration={
        'type': 'S3_VECTORS',
        's3VectorsConfiguration': {
            'indexArn': vector_index_arn,  # ARN returned when the vector index was created
        },
    }
)

knowledge_base_id = create_kb_response["knowledgeBase"]["knowledgeBaseId"]
print(f"Knowledge base ID: {knowledge_base_id}")

print(f"\nWaiting for knowledge base {knowledge_base_id} to finish creating...")

# Poll for KB creation status
status = "CREATING"
start_time = time.time()

while status == "CREATING":
    # Get current status
    response = bedrock.get_knowledge_base(
        knowledgeBaseId=knowledge_base_id
    )
    
    status = response['knowledgeBase']['status']
    elapsed_time = int(time.time() - start_time)
    
    print(f"Current status: {status} (elapsed time: {elapsed_time}s)")
    
    if status == "CREATING":
        print("Still creating, checking again in 30 seconds...")
        time.sleep(30)
    else:
        break

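if status == "ACTIVE":
    print(f"\n✅ Knowledge base {knowledge_base_id} is ACTIVE")
else:
    print(f"\n⚠️ Knowledge base creation ended with status: {status}")
    print(response['knowledgeBase'].get('failureReasons', []))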
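
The polling loop above has no upper bound on how long it waits. A small reusable helper with a timeout, sketched below, could replace both this loop and the ingestion-job loop later in the notebook. `wait_for_status` is a hypothetical name, and the status values in the usage comments are the ones this notebook already polls for.

```python
import time
from typing import Callable, Iterable

def wait_for_status(get_status: Callable[[], str], pending: Iterable[str],
                    interval: float = 30, timeout: float = 1800) -> str:
    """Poll get_status() until it returns a status outside `pending`.

    Returns the final status, or raises TimeoutError if `timeout` seconds pass first.
    """
    pending = set(pending)
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status not in pending:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {status} after {timeout} seconds")
        print(f"Status: {status}, checking again in {interval} seconds...")
        time.sleep(interval)

# Possible usage with the clients and IDs defined in this notebook:
# final = wait_for_status(
#     lambda: bedrock.get_knowledge_base(knowledgeBaseId=knowledge_base_id)["knowledgeBase"]["status"],
#     pending={"CREATING"})
# final = wait_for_status(
#     lambda: bedrock.get_ingestion_job(knowledgeBaseId=knowledge_base_id, dataSourceId=datasource_id,
#                                       ingestionJobId=ingestion_job_id)["ingestionJob"]["status"],
#     pending={"STARTING", "IN_PROGRESS"})
```

Passing the status lookup in as a function keeps the helper independent of any particular Bedrock API and easy to test.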

## Create the Data Source

Now we need to create a data source that the Knowledge Base will use. The data source contains the documents that will be processed, embedded, and indexed into our vector store. 

We'll follow these steps:
1. Create an S3 bucket for our documents
2. Download and upload synthetic medical transcript data
3. Create a Bedrock data source pointing to our S3 bucket

### Creating an S3 Bucket for Data Source

First, let's create the S3 Bucket that will hold our documents.

In [None]:
# Create an S3 bucket for our data source
create_s3_bucket(bucket_name, region=region_name)

### Downloading and Preparing Sample Data

For this example, we'll use a synthetic dataset of medical transcriptions. The dataset consists of PDF format transcriptions of simulated medical conversations and can be found in [this GitHub repository](https://github.com/nazmulkazi/dataset_automated_medical_transcription).

The following code will download these files for us.

In [None]:
# Define folder for downloaded files
dataset_folder = "source_transcripts"

# Create folder if it doesn't exist
if not os.path.exists(dataset_folder):
    os.makedirs(dataset_folder)

abs_path = os.path.abspath(dataset_folder)

# GitHub API URL for the repository content
repo_url = 'https://api.github.com/repos/nazmulkazi/dataset_automated_medical_transcription/contents/transcripts/source'
headers = {'Accept': 'application/vnd.github.v3+json'}

try:
    # Get list of files from GitHub
    response = requests.get(repo_url, headers=headers, timeout=20)
    response.raise_for_status()  # Raise exception for non-200 responses
    json_data = response.json()
    
    # Filter for files only
    list_of_pdfs = [item for item in json_data if item['type'] == 'file']
    query_parameters = {"downloadformat": "pdf"}
    
    # Display how many files will be downloaded
    print(f"Downloading {len(list_of_pdfs)} files from GitHub repository...")
    
    # List of filenames for reference
    transcripts = [pdf_dict['name'] for pdf_dict in list_of_pdfs]
    
    # Download counter for progress tracking
    downloaded = 0
    
    # Download each file with progress indicator
    for pdf_dict in list_of_pdfs:
        pdf_name = pdf_dict['name']
        file_url = pdf_dict['download_url']
        
        # Update progress
        downloaded += 1
        if downloaded % 5 == 0 or downloaded == len(list_of_pdfs):
            print(f"Progress: {downloaded}/{len(list_of_pdfs)} files")
        
        # Download and save the file
        r = requests.get(file_url, params=query_parameters, timeout=20)
        r.raise_for_status()
        
        with open(os.path.join(dataset_folder, pdf_name), 'wb') as pdf_file:
            pdf_file.write(r.content)
    
    print(f"\n✅ All {len(list_of_pdfs)} files have been downloaded to {abs_path}")

except requests.RequestException as e:
    print(f"❌ Error downloading files: {e}")
    raise


In [None]:
def upload_folder_to_s3(folder_path, bucket_name, prefix=''):
    """
    Upload all files from a folder to an S3 bucket
    
    Args:
        folder_path: Path to the folder containing files to upload
        bucket_name: Name of the S3 bucket
        prefix: Prefix to add to the object names in S3 (optional)
    """
    s3_client = boto3.client('s3')
    upload_count = 0
    total_files = 0
    
    # Count total files first
    for _, _, files in os.walk(folder_path):
        total_files += len(files)
    
    print(f"Uploading {total_files} files to S3 bucket '{bucket_name}'...")
    
    # Upload files
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            local_path = os.path.join(root, file)
            relative_path = os.path.relpath(local_path, folder_path)
            s3_path = os.path.join(prefix, relative_path).replace("\\", "/")
            
            try:
                s3_client.upload_file(local_path, bucket_name, s3_path)
                upload_count += 1
                
                # Show progress periodically
                if upload_count % 5 == 0 or upload_count == total_files:
                    print(f"Progress: {upload_count}/{total_files} files uploaded")
                    
            except ClientError as e:
                print(f"❌ Error uploading {local_path}: {e}")
    
    print(f"\n✅ Uploaded {upload_count} files to S3 bucket '{bucket_name}'")

# Upload our downloaded transcripts to S3
upload_folder_to_s3(abs_path, bucket_name)

### Creating the Bedrock Data Source

With our documents now stored in S3, we can create a data source in our Knowledge Base that points to this bucket. The data source configuration includes:

1. The S3 bucket location
2. Chunking strategy (how documents are split into manageable pieces)
3. Data deletion policy

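The chunking strategy matters because it determines what the retriever can return. This example uses fixed-size chunks of at most 300 tokens, with a 20% overlap between consecutive chunks so that context spanning a chunk boundary is not lost.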

In [None]:
# Create the data source
data_source_response = bedrock.create_data_source(
    knowledgeBaseId=knowledge_base_id,
    name='AmazonS3DataSource',
    description='Amazon S3 Data Source',
    dataDeletionPolicy='DELETE',  # Delete this source's vectors from the vector store when the data source is deleted (S3 documents are kept)
    dataSourceConfiguration={
        'type': 'S3',
        's3Configuration': {
            'bucketArn': f'arn:aws:s3:::{bucket_name}',
        },
    },
    vectorIngestionConfiguration={
        'chunkingConfiguration': {
            'chunkingStrategy': 'FIXED_SIZE',  # Split documents into chunks of fixed size
            'fixedSizeChunkingConfiguration': {
                "maxTokens": 300,           # Maximum tokens per chunk
                "overlapPercentage": 20     # Overlap between chunks to maintain context
            }
        }
    }
)

# Extract the data source ID
datasource_id = data_source_response["dataSource"]["dataSourceId"]
print(f"✅ Data source created with ID: {datasource_id}")
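
To build intuition for `maxTokens: 300` and `overlapPercentage: 20`, the sketch below splits text into fixed-size windows in which each window starts `300 * (1 - 0.20) = 240` tokens after the previous one, so consecutive chunks share 60 tokens. It is only an approximation of the idea, not Bedrock's chunker: it splits on whitespace as a stand-in for the embedding model's tokenizer, and `fixed_size_chunks` is a hypothetical name.

```python
def fixed_size_chunks(text: str, max_tokens: int = 300, overlap_percentage: int = 20) -> list[str]:
    """Split text into overlapping windows of at most max_tokens whitespace-separated tokens."""
    tokens = text.split()  # crude stand-in for a real tokenizer
    overlap = max_tokens * overlap_percentage // 100
    step = max_tokens - overlap  # how far each new chunk advances
    if step <= 0:
        raise ValueError("overlap_percentage must be below 100")
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(" ".join(tokens[start:start + max_tokens]))
        if start + max_tokens >= len(tokens):
            break  # this window already reaches the end of the text
    return chunks

sample = " ".join(f"w{i}" for i in range(700))
chunks = fixed_size_chunks(sample)
print(len(chunks), [len(c.split()) for c in chunks])  # → 3 [300, 300, 220]
```

A larger overlap improves the chance that a fact split across a boundary appears whole in some chunk, at the cost of storing and embedding more duplicated text.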

## Sync the data source

Now that we have created the data source, we need to start the ingestion job that will:

1. Read documents from our S3 bucket
2. Chunk them according to our configuration
3. Generate embeddings for each chunk using the Titan Embed model
4. Store both the embeddings and text in our S3 Vector Store

This process may take several minutes depending on the size of your data.

In [None]:
# Start the ingestion job
response_ingestion = bedrock.start_ingestion_job(
    dataSourceId=datasource_id,
    description='First sync',
    knowledgeBaseId=knowledge_base_id
)

print(f"Started ingestion job: {response_ingestion['ingestionJob']['ingestionJobId']}")

In [None]:
# Monitor the ingestion job progress
status = "STARTING"
ingestion_job_id = response_ingestion['ingestionJob']['ingestionJobId']
start_time = time.time()

print("Monitoring ingestion job progress:")
print("-" * 50)

while status in ["STARTING", "IN_PROGRESS"]:
    # Get current status
    response = bedrock.get_ingestion_job(
        dataSourceId=datasource_id,
        knowledgeBaseId=knowledge_base_id,
        ingestionJobId=ingestion_job_id
    )
    
    status = response['ingestionJob']['status']
    elapsed_time = int(time.time() - start_time)
    
    # Get current statistics
    stats = response['ingestionJob']['statistics']
    
    # Print the updated status and statistics
    print(f"Status: {status} (elapsed time: {elapsed_time}s)")
    print(f"Documents scanned: {stats['numberOfDocumentsScanned']}")
    print(f"New documents indexed: {stats['numberOfNewDocumentsIndexed']}")
    print(f"Documents failed: {stats['numberOfDocumentsFailed']}")
    
    if status in ["STARTING", "IN_PROGRESS"]:
        print("Checking again in 30 seconds...\n")
        time.sleep(30)
    else:
        break

print("-" * 50)
if status == "COMPLETE":
    print("✅ Ingestion job completed successfully")
else:
    print(f"⚠️ Ingestion job ended with status: {status}")

# Print final statistics
print("\nFinal statistics:")
print(f"  • Documents scanned: {stats['numberOfDocumentsScanned']}")
print(f"  • New documents indexed: {stats['numberOfNewDocumentsIndexed']}")
print(f"  • Documents failed: {stats['numberOfDocumentsFailed']}")
print(f"  • Total elapsed time: {elapsed_time} seconds")
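
The ingestion job reports more counters than the three printed above, plus a `failureReasons` list when something goes wrong. A hypothetical helper for summarizing them might look like the sketch below. The statistics key names reflect our understanding of the `get_ingestion_job` response, and they are read with `.get()` so that a missing field prints as 0 instead of raising.

```python
def summarize_ingestion_job(job: dict) -> str:
    """Build a readable summary from the 'ingestionJob' dict returned by get_ingestion_job."""
    stats = job.get("statistics", {})
    fields = {
        "Documents scanned": "numberOfDocumentsScanned",
        "New documents indexed": "numberOfNewDocumentsIndexed",
        "Modified documents indexed": "numberOfModifiedDocumentsIndexed",
        "Documents deleted": "numberOfDocumentsDeleted",
        "Documents failed": "numberOfDocumentsFailed",
    }
    lines = [f"Status: {job.get('status', 'UNKNOWN')}"]
    lines += [f"  • {label}: {stats.get(key, 0)}" for label, key in fields.items()]
    # Failure reasons, if any, explain why documents were skipped or the job failed
    for reason in job.get("failureReasons", []):
        lines.append(f"  ! {reason}")
    return "\n".join(lines)

# Possible usage after the monitoring loop above:
# print(summarize_ingestion_job(response["ingestionJob"]))
```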

## Testing the Knowledge Base

Now that our Knowledge Base is fully set up and populated with data, we can test it using two different approaches:

1. **Retrieve and Generate API**: This is a single-step approach where Bedrock retrieves relevant information and generates a complete answer.
2. **Retrieve API**: This gives you more control by returning only the retrieved chunks, allowing you to process them further in your application.
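Both APIs ultimately search the S3 vector index created earlier. For debugging, it can help to query that index directly and bypass the Knowledge Base. The sketch below embeds a question with Titan Text Embeddings V2 through `bedrock-runtime` `invoke_model` and passes the vector to `s3vectors.query_vectors`. The request and response fields used (`inputText`, `dimensions`, `normalize`, `embedding`, `queryVector`, `topK`, `returnMetadata`, `returnDistance`) reflect our understanding of these APIs, the function names are hypothetical, and the metadata keys Bedrock writes into the index (such as `AMAZON_BEDROCK_TEXT`) should be treated as an implementation detail.

```python
import json

TITAN_V2_MODEL_ID = "amazon.titan-embed-text-v2:0"

def embed_text(runtime_client, text: str, dimensions: int = 1024) -> list[float]:
    """Embed text with Titan Text Embeddings V2; dimensions must match the index."""
    response = runtime_client.invoke_model(
        modelId=TITAN_V2_MODEL_ID,
        # normalize has no effect on cosine ranking but keeps vectors unit-length
        body=json.dumps({"inputText": text, "dimensions": dimensions, "normalize": True}),
        contentType="application/json",
        accept="application/json",
    )
    return json.loads(response["body"].read())["embedding"]

def query_index_directly(runtime_client, s3vectors_client, bucket: str, index: str,
                         question: str, top_k: int = 5) -> list[dict]:
    """Return the top_k nearest vectors (with metadata and distance) for a question."""
    vector = embed_text(runtime_client, question)
    response = s3vectors_client.query_vectors(
        vectorBucketName=bucket,
        indexName=index,
        queryVector={"float32": vector},
        topK=top_k,
        returnMetadata=True,
        returnDistance=True,
    )
    return response.get("vectors", [])

# Possible usage with the names defined earlier in this notebook:
# runtime = boto3.client("bedrock-runtime", region_name=region_name)
# for hit in query_index_directly(runtime, s3vectors, vector_store_name, vector_index_name, "Who is Kitty?"):
#     print(round(hit["distance"], 4), str(hit.get("metadata", {}).get("AMAZON_BEDROCK_TEXT", ""))[:120])
```

Passing the clients in as arguments keeps the functions testable with stand-in objects.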

### Testing the Knowledge Base with Retrieve and Generate API

With this API, Bedrock takes care of retrieving the necessary references from the knowledge base and generating the final answer using a foundation model from Bedrock.
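`retrieve_and_generate` also returns a `sessionId`; passing it back on the next call lets Bedrock treat the new question as a follow-up in the same conversation. The wrapper below is a hypothetical convenience function whose request structure mirrors the call in the cell below. `model_arn` in the usage comment is a placeholder for the Claude 3.5 Sonnet ARN used there.

```python
from typing import Optional, Tuple

def ask_kb(agent_runtime_client, kb_id: str, model_arn: str, question: str,
           session_id: Optional[str] = None, number_of_results: int = 5) -> Tuple[str, str]:
    """Ask the knowledge base a question; return (answer_text, session_id)."""
    request = {
        "input": {"text": question},
        "retrieveAndGenerateConfiguration": {
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                "modelArn": model_arn,
                "retrievalConfiguration": {
                    "vectorSearchConfiguration": {"numberOfResults": number_of_results}
                },
            },
        },
    }
    if session_id:
        request["sessionId"] = session_id  # continue an existing conversation
    response = agent_runtime_client.retrieve_and_generate(**request)
    return response["output"]["text"], response["sessionId"]

# Possible usage (model_arn is a placeholder):
# answer, sid = ask_kb(bedrock_agent_runtime_client, knowledge_base_id, model_arn, "Who is Kitty?")
# follow_up, sid = ask_kb(bedrock_agent_runtime_client, knowledge_base_id, model_arn,
#                         "Can you summarize that in one sentence?", session_id=sid)
```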

<div class="alert alert-block alert-warning">
<b>⚠️ Warning:</b> Make sure you have enabled access to Anthropic Claude 3.5 Sonnet in the Amazon Bedrock console (Model access). If you don't have access to this model, substitute another model you can access.
</div>

In [None]:
# Define our test query
test_query = "Who is Kitty?"

print(f"Query: {test_query}\n")
print("Retrieving information and generating answer...")

try:
    # Call the retrieve and generate API
    response = bedrock_agent_runtime_client.retrieve_and_generate(
        input={
            "text": test_query
        },
        retrieveAndGenerateConfiguration={
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                'knowledgeBaseId': knowledge_base_id,
                "modelArn": f"arn:aws:bedrock:{region_name}::foundation-model/anthropic.claude-3-5-sonnet-20240620-v1:0",
                "retrievalConfiguration": {
                    "vectorSearchConfiguration": {
                        "numberOfResults": 5,  # Number of chunks to retrieve
                    } 
                }
            }
        }
    )
    
    print("\n----- Answer -----")
    print(response['output']['text'])
    
    # Citations are returned at the top level of the response, not under 'output'
    citations = response.get('citations', [])
    if citations:
        print("\n----- Citations -----")
        for citation in citations:
            print(f"- {citation['generatedResponsePart']['textResponsePart']['text']}")
            for reference in citation.get('retrievedReferences', []):
                print(f"  Source: {reference['location']}")
    
except ClientError as e:
    error_code = e.response.get('Error', {}).get('Code', 'Unknown')
    error_message = e.response.get('Error', {}).get('Message', 'Unknown error')
    print(f"❌ Error: {error_code} - {error_message}")
    
    if error_code in ("AccessDeniedException", "ForbiddenException"):
        print("\nYou might not have access to the Claude 3.5 Sonnet model.")
        print("Please verify your model access in the Amazon Bedrock Console or try a different model.")

### Testing Knowledge Base with Retrieve API

The retrieve API gives you more control by returning only the retrieved chunks. This is useful when you want to:

- See exactly what content was retrieved
- Process the retrieved information yourself
- Implement your own custom RAG pipeline
- Use a different model for generating the final answer
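As an example of the last two points, the sketch below formats the chunks returned by `retrieve` into a prompt and sends it to a model of your choice through the Bedrock Runtime Converse API. The prompt wording, the function names, and the model ID in the usage comment are illustrative assumptions; any model you can access that supports Converse should work.

```python
def build_context(retrieve_response: dict, max_chunks: int = 5) -> str:
    """Number the retrieved chunks so the model can refer to them."""
    results = retrieve_response.get("retrievalResults", [])[:max_chunks]
    return "\n\n".join(f"[{i}] {r['content']['text']}" for i, r in enumerate(results, 1))

def generate_answer(runtime_client, model_id: str, question: str, retrieve_response: dict) -> str:
    """Answer a question using only the retrieved chunks as context."""
    prompt = (
        "Answer the question using only the numbered context passages below. "
        "If they do not contain the answer, say so.\n\n"
        f"Context:\n{build_context(retrieve_response)}\n\nQuestion: {question}"
    )
    response = runtime_client.converse(
        modelId=model_id,
        messages=[{"role": "user", "content": [{"text": prompt}]}],
        inferenceConfig={"maxTokens": 512, "temperature": 0.0},
    )
    return response["output"]["message"]["content"][0]["text"]

# Possible usage after running the retrieve cell below:
# runtime = boto3.client("bedrock-runtime", region_name=region_name)
# print(generate_answer(runtime, "anthropic.claude-3-5-sonnet-20240620-v1:0", test_query, response_ret))
```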

In [None]:
# Define our test query
test_query = "Who is Kitty?"

print(f"Query: {test_query}\n")
print("Retrieving information...")

try:
    # Call the retrieve API
    response_ret = bedrock_agent_runtime_client.retrieve(
        knowledgeBaseId=knowledge_base_id,
        retrievalQuery={
            "text": test_query
        },
        retrievalConfiguration={
            "vectorSearchConfiguration": {
                "numberOfResults": 5,  # Number of chunks to retrieve
            } 
        }
    )

    # Helper function to print the retrieved results in a readable format
    def response_print(retrieve_resp):
        print(f"\nFound {len(retrieve_resp['retrievalResults'])} relevant chunks:")
        print("-" * 80)
        
        for num, chunk in enumerate(retrieve_resp['retrievalResults'], 1):
            print(f"\n----- Chunk {num} -----")
            print(f"Text: {chunk['content']['text']}")
            print(f"\nLocation: {chunk['location']}")
            print(f"Score: {chunk['score']:.4f}")  # Similarity score (higher is better)
            print(f"Metadata: {chunk['metadata']}")
            print("-" * 80)

    # Print the retrieved results
    response_print(response_ret)
    
except ClientError as e:
    error_code = e.response.get('Error', {}).get('Code', 'Unknown')
    error_message = e.response.get('Error', {}).get('Message', 'Unknown error')
    print(f"❌ Error: {error_code} - {error_message}")

## Clean up

To avoid ongoing charges, clean up the resources created in this notebook. This includes:

1. Deleting the Knowledge Base
2. Deleting the S3 vector index and vector bucket
3. Emptying and deleting the S3 bucket that holds the source documents
4. Removing the IAM role and its policies

Run the cell below to clean up all resources.

In [None]:
print("Starting cleanup process...\n")

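# Delete the Knowledge Base; deletion is asynchronous, so wait until it is gone
# before removing the vector index it writes to
print(f"Deleting Knowledge Base: {knowledge_base_id}")
try:
    bedrock.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)
    while True:
        try:
            kb_status = bedrock.get_knowledge_base(knowledgeBaseId=knowledge_base_id)['knowledgeBase']['status']
        except bedrock.exceptions.ResourceNotFoundException:
            break  # The Knowledge Base no longer exists
        if kb_status == 'DELETE_UNSUCCESSFUL':
            raise RuntimeError("Knowledge Base deletion was unsuccessful")
        time.sleep(10)
    print("✅ Knowledge Base deleted successfully")
except Exception as e:
    print(f"❌ Error deleting Knowledge Base: {str(e)}")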

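# Delete the S3 vector index, then the now-empty vector bucket
print(f"\nDeleting S3 vector index '{vector_index_name}' and vector bucket '{vector_store_name}'")
try:
    s3vectors.delete_index(vectorBucketName=vector_store_name, indexName=vector_index_name)
    s3vectors.delete_vector_bucket(vectorBucketName=vector_store_name)
    print("✅ S3 vector index and vector bucket deleted successfully")
except Exception as e:
    print(f"❌ Error deleting S3 vector store: {str(e)}")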

# Empty and delete S3 Bucket
print(f"\nEmptying and deleting S3 Bucket: {bucket_name}")
try:
    empty_and_delete_bucket(bucket_name)
    print("✅ S3 Bucket emptied and deleted successfully")
except Exception as e:
    print(f"❌ Error emptying and deleting S3 Bucket: {str(e)}")

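# Delete IAM Role and its policies
print(f"\nDeleting IAM Role: {roleName}")
try:
    # Detach managed policies; delete customer-managed ones, which are assumed to have
    # been created for this role by create_bedrock_execution_role
    attached_policies = iam_client.list_attached_role_policies(RoleName=roleName).get('AttachedPolicies', [])
    for policy in attached_policies:
        print(f"Detaching policy: {policy['PolicyArn']}")
        iam_client.detach_role_policy(RoleName=roleName, PolicyArn=policy['PolicyArn'])
        if not policy['PolicyArn'].startswith('arn:aws:iam::aws:policy/'):
            iam_client.delete_policy(PolicyArn=policy['PolicyArn'])

    # Delete any inline policies (a role cannot be deleted while it still has them)
    for policy_name in iam_client.list_role_policies(RoleName=roleName).get('PolicyNames', []):
        iam_client.delete_role_policy(RoleName=roleName, PolicyName=policy_name)

    # Delete the role
    iam_client.delete_role(RoleName=roleName)
    print("✅ IAM Role deleted successfully")
except Exception as e:
    print(f"❌ Error deleting IAM Role: {str(e)}")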

print("\nCleanup finished. Review any ❌ messages above for resources that may need manual deletion.")