## Create a Knowledge Base with a Custom Chunking Strategy

#### Custom Chunking Logic with Lambda Functions in Amazon Bedrock

When you create a Knowledge Base (KB) in Amazon Bedrock, you can connect a Lambda function that implements your own chunking logic. During ingestion, if a Lambda function is configured, the Knowledge Base invokes it and stores the function's input and output in the intermediate S3 bucket you specify.

#### Use Cases for Lambda Functions in KBs

- **Custom Chunking Logic:** Lambda functions can be used to implement custom logic for chunking documents during ingestion, enabling more control over how documents are divided into meaningful chunks.
- **Chunk-level Metadata Processing:** Lambda functions can also process chunked data, for example, by adding custom metadata at the chunk level, enriching the data for more advanced retrieval or analysis.

Together, these give you finer control over how document data is handled in the Knowledge Base: you can apply your own chunking strategy and enrich each chunk with metadata that improves search and retrieval.
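To make the chunk-level metadata use case concrete, here is a minimal sketch of a pure function that enriches one content batch. It assumes, based on our reading of the Bedrock custom-transformation documentation, that each batch file in the intermediate bucket is JSON shaped like `{"fileContents": [{"contentBody": ..., "contentType": ..., "contentMetadata": {...}}]}`; verify that shape before relying on it. The function name `add_chunk_metadata` and the metadata keys it writes are hypothetical, not part of the workshop.

```python
import copy
from typing import Any, Dict


def add_chunk_metadata(batch: Dict[str, Any], source_key: str) -> Dict[str, Any]:
    """Return a copy of a content batch with chunk-level metadata added.

    Assumed batch shape: {"fileContents": [{"contentBody": str,
    "contentMetadata": dict, ...}, ...]}. The keys written here
    (source_key, chunk_index, word_count) are illustrative only.
    """
    enriched = copy.deepcopy(batch)  # never mutate the caller's batch
    for index, item in enumerate(enriched.get("fileContents", [])):
        metadata = item.setdefault("contentMetadata", {})
        metadata["source_key"] = source_key
        metadata["chunk_index"] = index
        metadata["word_count"] = len(item.get("contentBody", "").split())
    return enriched


# Usage with a hypothetical two-chunk batch
batch = {"fileContents": [
    {"contentBody": "Net income was reported in millions.", "contentMetadata": {}},
    {"contentBody": "Operating income rose.", "contentMetadata": {}},
]}
print(add_chunk_metadata(batch, "data/example.pdf")["fileContents"][1]["contentMetadata"])
```

Returning a deep copy keeps the function side-effect free, which makes it easy to test outside Lambda before wiring it into a handler.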


In [1]:
import json
with open("variables.json", "r") as f:
    variables = json.load(f)

variables

{'accountNumber': '307297743176',
 'regionName': 'us-west-2',
 'collectionArn': 'arn:aws:aoss:us-west-2:307297743176:collection/h7cmj732p9d3v91spkhd',
 'collectionId': 'h7cmj732p9d3v91spkhd',
 'vectorIndexName': 'ws-index-',
 'bedrockExecutionRoleArn': 'arn:aws:iam::307297743176:role/advanced-rag-workshop-bedrock_execution_role-us-west-2',
 's3Bucket': '307297743176-us-west-2-advanced-rag-workshop',
 'kbFixedChunk': '4P6PBDDEGL',
 'kbSemanticChunk': 'IC3ZCBORXT'}
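Every later cell reads keys from `variables`, so a missing key only surfaces as a `KeyError` partway through the notebook. A small fail-fast check such as the hypothetical `missing_variables` helper below (not part of the workshop) can catch that up front; its key list reflects the keys this notebook actually reads.

```python
from typing import Iterable, List, Mapping

# Keys that later cells in this notebook read from variables.json
REQUIRED_KEYS = (
    "accountNumber", "regionName", "collectionArn",
    "vectorIndexName", "bedrockExecutionRoleArn", "s3Bucket",
)


def missing_variables(variables: Mapping[str, object],
                      required: Iterable[str] = REQUIRED_KEYS) -> List[str]:
    """Return the required keys that are absent or empty, in the order given."""
    return [key for key in required if not variables.get(key)]


# Usage: a deliberately incomplete, hypothetical configuration
sample = {"accountNumber": "123456789012", "regionName": "us-west-2", "s3Bucket": ""}
print(missing_variables(sample))
```

In the notebook you could run `if missing_variables(variables): raise ValueError(...)` right after loading the file.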

### 0. Create a Lambda function with custom chunking logic
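The cell below zips a local `lambda_function.py` that this notebook does not show. As a hedged illustration only (not the workshop's actual file), a handler for a `POST_CHUNKING` transformation might read each content batch from the intermediate bucket, split every chunk into overlapping word windows, write the result back, and return the new batch keys. The event and response field names (`bucketName`, `inputFiles`, `contentBatches`, `outputFiles`, ...) follow our reading of the Bedrock custom-transformation documentation and should be checked there; `split_words`, the window sizes, and the `output/` key prefix are hypothetical. The optional `s3_client` parameter exists only so the handler can be tested without AWS.

```python
import json
from typing import Any, Dict, List, Optional


def split_words(text: str, max_words: int = 200, overlap: int = 20) -> List[str]:
    """Split text into windows of at most max_words words, overlapping by `overlap` words."""
    if overlap >= max_words:
        raise ValueError("overlap must be smaller than max_words")
    words = text.split()
    step = max_words - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_words]))
        if start + max_words >= len(words):
            break  # this window already reached the end of the text
    return chunks


def lambda_handler(event: Dict[str, Any], context: Any,
                   s3_client: Optional[Any] = None) -> Dict[str, Any]:
    """Re-chunk every content batch in the event and return the new batch keys."""
    if s3_client is None:
        import boto3  # bundled with the Lambda Python runtime
        s3_client = boto3.client("s3")
    bucket = event["bucketName"]  # the intermediate bucket configured on the data source
    output_files = []
    for input_file in event.get("inputFiles", []):
        output_batches = []
        for batch in input_file.get("contentBatches", []):
            body = s3_client.get_object(Bucket=bucket, Key=batch["key"])["Body"].read()
            new_contents = []
            for item in json.loads(body).get("fileContents", []):
                for chunk in split_words(item.get("contentBody", "")):
                    new_contents.append({
                        "contentBody": chunk,
                        "contentType": item.get("contentType"),
                        "contentMetadata": dict(item.get("contentMetadata", {})),
                    })
            out_key = "output/" + batch["key"]  # hypothetical output prefix
            s3_client.put_object(Bucket=bucket, Key=out_key,
                                 Body=json.dumps({"fileContents": new_contents}))
            output_batches.append({"key": out_key})
        output_files.append({
            "originalFileLocation": input_file.get("originalFileLocation"),
            "fileMetadata": input_file.get("fileMetadata", {}),
            "contentBatches": output_batches,
        })
    return {"outputFiles": output_files}
```

Overlapping windows trade some duplicated text for context that survives a chunk boundary; tune `max_words` and `overlap` to your documents and embedding model.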

In [2]:
from io import BytesIO
import zipfile
import boto3
import time
import json
import botocore

# Create IAM and Lambda clients in the workshop region
iam = boto3.client("iam", region_name=variables["regionName"])
lambda_client = boto3.client("lambda", region_name=variables["regionName"])

# Names for the Lambda execution role and the Lambda function
role_name = f"advanced-rag-custom-chunk-{variables['regionName']}-role"
function_name = "advanced-rag-custom-chunk"

# Try to get the IAM role if it exists
try:
    # Check if the role already exists
    get_role_response = iam.get_role(RoleName=role_name)
    lambda_iam_role = get_role_response  # Store the entire response
    print(f"IAM role '{role_name}' already exists. Using the existing role.")
except iam.exceptions.NoSuchEntityException:
    # Define the IAM assume role policy for the Lambda function
    assume_role_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "lambda.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
    
    # Convert the IAM assume role policy into JSON format
    assume_role_policy_document_json = json.dumps(assume_role_policy_document)
    
    # Create the IAM role for the Lambda function with the assume role policy
    lambda_iam_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=assume_role_policy_document_json
    )
    print(f"Created new IAM role: {role_name}")

# Always put the policy (it will update if it exists or create if it doesn't)
iam.put_role_policy(
    RoleName=role_name,  # Use role name directly instead of lambda_iam_role["Role"]["RoleName"]
    PolicyName="s3policy",
    PolicyDocument=json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "s3:GetObject",
                        "s3:ListBucket", 
                        "s3:PutObject"
                    ],
                    "Resource": [
                        f"arn:aws:s3:::{variables['s3Bucket']}-custom-chunk",
                        f"arn:aws:s3:::{variables['s3Bucket']}-custom-chunk/*"
                    ],
                    "Condition": {
                        "StringEquals": {
                            "aws:ResourceAccount": f"{variables['accountNumber']}"
                        }
                    }
                }
            ]
        }
    )
)

# Package lambda_function.py (read from the current working directory) into an in-memory ZIP archive
s = BytesIO()
with zipfile.ZipFile(s, 'w') as z:
    z.write("lambda_function.py")
zip_content = s.getvalue()

# Wait 10 seconds so a newly created IAM role can propagate before Lambda tries to assume it
time.sleep(10)

# Get the role ARN
role_arn = lambda_iam_role["Role"]["Arn"]

# Check if the Lambda function already exists
try:
    lambda_client.get_function(FunctionName=function_name)
    print(f"Lambda function '{function_name}' already exists. Updating code...")
    
    # Update existing function code
    lambda_function = lambda_client.update_function_code(
        FunctionName=function_name,
        ZipFile=zip_content
    )
    print("Lambda function code updated successfully")
except lambda_client.exceptions.ResourceNotFoundException:
    print(f"Creating new Lambda function: {function_name}")
    
    # Create the Lambda function
    lambda_function = lambda_client.create_function(
        FunctionName=function_name,
        Runtime='python3.12',
        Timeout=60,
        Role=role_arn,
        Code={'ZipFile': zip_content},
        Handler='lambda_function.lambda_handler'
    )
    print("Lambda function created successfully")

Created new IAM role: advanced-rag-custom-chunk-us-west-2-role
Creating new Lambda function: advanced-rag-custom-chunk
Lambda function created successfully


In [3]:
from botocore.exceptions import ClientError

# Create an S3 client in the workshop region
s3 = boto3.client("s3", region_name=variables["regionName"])
intermediate_bucket = variables["s3Bucket"] + "-custom-chunk"

try:
    # HEAD the bucket: this succeeds only if it exists and is accessible
    s3.head_bucket(Bucket=intermediate_bucket)
    print(f"Bucket '{intermediate_bucket}' already exists.")
except ClientError:
    # The bucket does not exist (or is not accessible), so create it.
    # us-east-1 rejects an explicit LocationConstraint; other regions need one.
    if variables["regionName"] == "us-east-1":
        s3.create_bucket(Bucket=intermediate_bucket)
    else:
        s3.create_bucket(
            Bucket=intermediate_bucket,
            CreateBucketConfiguration={"LocationConstraint": variables["regionName"]},
        )
    print(f"Bucket '{intermediate_bucket}' created.")

Bucket '307297743176-us-west-2-advanced-rag-workshop-custom-chunk' created.


### 1. Create a Knowledge Base
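The knowledge base created below points at the vector index `ws-index-custom` in the collection from `variables.json`, and this notebook does not create that index (presumably an earlier workshop step did). As a hedged sketch, an index body compatible with the field mapping used below (`vector`, `text`, `text-metadata`) and with Titan Text Embeddings V2 at its default 1024 dimensions could look like this. The HNSW/FAISS/L2 method settings are assumptions, and `build_index_body` is a hypothetical helper; you would send the body with an OpenSearch client such as `opensearch-py`.

```python
from typing import Any, Dict


def build_index_body(dimension: int = 1024) -> Dict[str, Any]:
    """Build an OpenSearch k-NN index body matching this notebook's KB field mapping."""
    return {
        "settings": {"index": {"knn": True}},  # enable k-NN search on the index
        "mappings": {
            "properties": {
                # Embeddings written by Bedrock; dimension must match the embedding model
                "vector": {
                    "type": "knn_vector",
                    "dimension": dimension,
                    "method": {"name": "hnsw", "engine": "faiss", "space_type": "l2"},
                },
                "text": {"type": "text"},           # raw chunk text
                "text-metadata": {"type": "text"},  # metadata stored alongside each chunk
            }
        },
    }
```

If you configure Titan V2 for 256 or 512 dimensions instead, pass that value so the index and the embeddings agree.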

In [4]:
# Helper function definition
from retrying import retry  # Import retrying module to add retry logic
import boto3  # Import boto3 for AWS SDK to interact with AWS services

# Create a Bedrock agent client to interact with Amazon Bedrock service
bedrock_agent = boto3.client("bedrock-agent", region_name=variables["regionName"])

# Retry policy: at most 3 attempts in total, waiting a random 1-2 seconds between attempts
@retry(wait_random_min=1000, wait_random_max=2000, stop_max_attempt_number=3)
def create_knowledge_base_func(name, description, chunking_type):
    # Define the embedding model ARN that will be used by Bedrock for embedding ingested documents
    embedding_model_arn = f"arn:aws:bedrock:{variables['regionName']}::foundation-model/amazon.titan-embed-text-v2:0"
    
    # Define OpenSearch Serverless configuration that includes the collection and vector index names
    opensearch_serverless_configuration = {
        "collectionArn": variables["collectionArn"],  # ARN of the OpenSearch Serverless collection
        "vectorIndexName": variables["vectorIndexName"] + chunking_type,  # e.g. "ws-index-custom"
        "fieldMapping": {
            "vectorField": "vector",  # Field name for the embedding vector
            "textField": "text",      # Field name for the chunk text
            "metadataField": "text-metadata"  # Field name for the metadata
        }
    }
    
    # Print the OpenSearch configuration for debugging purposes
    print(opensearch_serverless_configuration)
    
    # Call the Bedrock API to create the knowledge base with the specified configurations
    create_kb_response = bedrock_agent.create_knowledge_base(
        name=name,  # Name of the knowledge base
        description=description,  # Description of the knowledge base
        roleArn=variables["bedrockExecutionRoleArn"],  # ARN of the IAM role that Bedrock will use for execution
        knowledgeBaseConfiguration={
            "type": "VECTOR",  # Type of the knowledge base (VECTOR in this case)
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embedding_model_arn  # ARN of the embedding model used for the knowledge base
            }
        },
        storageConfiguration={
            "type": "OPENSEARCH_SERVERLESS",  # Type of the storage (using OpenSearch Serverless)
            "opensearchServerlessConfiguration": opensearch_serverless_configuration  # OpenSearch configuration
        }
    )
    
    # Return the created knowledge base details
    return create_kb_response["knowledgeBase"]
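With the `retrying` package, `stop_max_attempt_number=3` caps the total number of calls at three (the first try plus up to two retries), and `wait_random_min`/`wait_random_max` are given in milliseconds. The stdlib-only decorator below mirrors that policy so its behavior is easy to inspect and test; `retry_with_random_wait` is a hypothetical name, not part of `retrying`.

```python
import functools
import random
import time
from typing import Any, Callable, TypeVar

T = TypeVar("T")


def retry_with_random_wait(max_attempts: int = 3, wait_min_ms: int = 1000,
                           wait_max_ms: int = 2000) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Call the wrapped function up to max_attempts times, sleeping a random interval between failures."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    time.sleep(random.uniform(wait_min_ms, wait_max_ms) / 1000)
            raise AssertionError("unreachable")  # keeps type checkers satisfied
        return wrapper
    return decorator
```

Like `retrying` with default settings, this retries on any exception; for AWS calls you may prefer to retry only throttling or transient `ClientError` codes.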

In [5]:
import boto3
import json

try:
    # Create a knowledge base using the predefined function
    kb = create_knowledge_base_func(
        name="advanced-rag-workshop-custom-chunking",
        description="Knowledge base using Amazon OpenSearch Service as a vector store",
        chunking_type="custom"
    )

    # Retrieve details of the newly created knowledge base
    get_kb_response = bedrock_agent.get_knowledge_base(knowledgeBaseId=kb['knowledgeBaseId'])

    # Update the variables dictionary with the new knowledge base ID
    variables["kbCustomChunk"] = kb['knowledgeBaseId']

    # Save updated variables to a JSON file, handling datetime serialization
    with open("variables.json", "w") as f:
        json.dump(variables, f, indent=4, default=str)  # Convert datetime to string

    # Print the retrieved knowledge base response in a readable format
    print(f'OpenSearch Knowledge Response: {json.dumps(get_kb_response, indent=4, default=str)}')
    
except Exception as e:
    # Check if error message indicates the knowledge base already exists
    error_message = str(e).lower()
    if any(phrase in error_message for phrase in ["already exist", "duplicate", "already been created"]):
        print("Knowledge Base already exists. Retrieving its ID...")
        
        # List all knowledge bases to find the one that already exists
        list_kb_response = bedrock_agent.list_knowledge_bases()
        
        # Look for a knowledge base with the desired name
        for existing_kb in list_kb_response.get('knowledgeBaseSummaries', []):
            if existing_kb['name'] == "advanced-rag-workshop-custom-chunking":
                kb_id = existing_kb['knowledgeBaseId']
                print(f"Found existing knowledge base with ID: {kb_id}")
                
                # Get the details of the existing knowledge base
                get_kb_response = bedrock_agent.get_knowledge_base(knowledgeBaseId=kb_id)
                
                # Read existing variables to preserve other fields
                try:
                    # Read existing variables
                    with open("variables.json", "r") as f:
                        existing_variables = json.load(f)
                except (FileNotFoundError, json.JSONDecodeError):
                    # If file doesn't exist or is invalid JSON
                    existing_variables = {}
                
                # Update only the custom chunking value
                existing_variables["kbCustomChunk"] = kb_id
                                
                # Write back all variables
                with open("variables.json", "w") as f:
                    json.dump(existing_variables, f, indent=4, default=str)
                
                # Print the retrieved knowledge base response
                print(f'OpenSearch Knowledge Response: {json.dumps(get_kb_response, indent=4, default=str)}')
                break        
        else:
            print("Could not find a knowledge base with the specified name.")
    else:
        # If it's a different error, re-raise it
        raise

{'collectionArn': 'arn:aws:aoss:us-west-2:307297743176:collection/h7cmj732p9d3v91spkhd', 'vectorIndexName': 'ws-index-custom', 'fieldMapping': {'vectorField': 'vector', 'textField': 'text', 'metadataField': 'text-metadata'}}
OpenSearch Knowledge Response: {
    "ResponseMetadata": {
        "RequestId": "c67f0032-7acb-4af8-9f49-37682d1d4728",
        "HTTPStatusCode": 200,
        "HTTPHeaders": {
            "date": "Mon, 07 Apr 2025 15:44:25 GMT",
            "content-type": "application/json",
            "content-length": "958",
            "connection": "keep-alive",
            "x-amzn-requestid": "c67f0032-7acb-4af8-9f49-37682d1d4728",
            "x-amz-apigw-id": "IqLh-ESrvHcEc_w=",
            "x-amzn-trace-id": "Root=1-67f3f2d9-491f6f3b5b9eb1440d720c82"
        },
        "RetryAttempts": 0
    },
    "knowledgeBase": {
        "createdAt": "2025-04-07 15:44:24.721731+00:00",
        "description": "Knowledge base using Amazon OpenSearch Service as a vector store",
        "

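The recovery path in the cell above scans only the first page returned by `list_knowledge_bases`, so an account with many knowledge bases could miss the match. `list_knowledge_bases` accepts a `nextToken` and returns one when more results exist; a hedged sketch of a paginated lookup follows. `find_knowledge_base_id` is a hypothetical helper that takes the `bedrock_agent` client as a parameter.

```python
from typing import Any, Dict, Optional


def find_knowledge_base_id(client: Any, name: str) -> Optional[str]:
    """Return the ID of the knowledge base named `name`, following nextToken across pages."""
    kwargs: Dict[str, str] = {}
    while True:
        response = client.list_knowledge_bases(**kwargs)
        for summary in response.get("knowledgeBaseSummaries", []):
            if summary["name"] == name:
                return summary["knowledgeBaseId"]
        token = response.get("nextToken")
        if not token:
            return None  # no more pages and no match
        kwargs = {"nextToken": token}
```

Usage in this notebook would be `find_knowledge_base_id(bedrock_agent, "advanced-rag-workshop-custom-chunking")`.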
### 2. Create a Data Source for the Knowledge Base

In [6]:
import time
import json
import boto3
from botocore.exceptions import ClientError

# Create clients
bedrock_agent = boto3.client("bedrock-agent", region_name=variables["regionName"])
bedrock_agent_runtime = boto3.client("bedrock-agent-runtime", region_name=variables["regionName"])

# Load variables to get the correct knowledge base ID
with open("variables.json", "r") as f:
    variables = json.load(f)

# Use the ID of the custom-chunking knowledge base (kbCustomChunk)
kb_id = variables.get("kbCustomChunk")
if not kb_id:
    print("Error: No knowledge base ID found for custom chunking!")
    raise ValueError("Knowledge base ID missing")
print(f"Using knowledge base ID: {kb_id}")

# Define configurations
custom_transformation_configuration = {
    "intermediateStorage": {
        "s3Location": {
            "uri": f"s3://{variables['s3Bucket']}-custom-chunk/"
        }
    },
    "transformations": [
        {
            "transformationFunction": {
                "transformationLambdaConfiguration": {
                    "lambdaArn": f"arn:aws:lambda:{variables['regionName']}:{variables['accountNumber']}:function:advanced-rag-custom-chunk"
                }
            },
            "stepToApply": "POST_CHUNKING"
        }
    ]
}
s3_configuration = {
    "bucketArn": f"arn:aws:s3:::{variables['s3Bucket']}",
    "inclusionPrefixes": ["data"]
}
data_source_name = "advanced-rag-example"

# Check if data source already exists and delete if needed
try:
    print(f"Checking for existing data sources in knowledge base {kb_id}...")
    list_ds_response = bedrock_agent.list_data_sources(knowledgeBaseId=kb_id)
    
    existing_ds = None
    for ds in list_ds_response.get('dataSourceSummaries', []):
        if ds['name'] == data_source_name:
            existing_ds = ds
            break
    
    if existing_ds:
        print(f"Found existing data source '{data_source_name}'. Deleting it...")
        bedrock_agent.delete_data_source(
            knowledgeBaseId=kb_id,
            dataSourceId=existing_ds["dataSourceId"]
        )
        print("Waiting for data source deletion to complete...")
        time.sleep(20)
        print("Data source deleted.")
        
except Exception as e:
    print(f"Error while checking or deleting data source: {e}")

# Create the new data source
try:
    print(f"Creating new data source '{data_source_name}' with custom chunking...")
    create_ds_response = bedrock_agent.create_data_source(
        name=data_source_name,
        description="A data source for Advanced RAG workshop",
        knowledgeBaseId=kb_id,
        dataSourceConfiguration={
            "type": "S3",
            "s3Configuration": s3_configuration
        },
        vectorIngestionConfiguration={
            "chunkingConfiguration": {"chunkingStrategy": "NONE"},
            "customTransformationConfiguration": custom_transformation_configuration
        }
    )
    
    ds_custom_chunk = create_ds_response["dataSource"]
    ds_id = ds_custom_chunk["dataSourceId"]
    print(f"Custom chunking data source created successfully with ID: {ds_id}")
    
except ClientError as e:
    error_code = e.response.get('Error', {}).get('Code', '')
    if error_code == 'ConflictException':
        print(f"Data source '{data_source_name}' already exists. Retrieving it...")
        list_ds_response = bedrock_agent.list_data_sources(knowledgeBaseId=kb_id)
        for ds in list_ds_response.get('dataSourceSummaries', []):
            if ds['name'] == data_source_name:
                ds_custom_chunk = ds
                ds_id = ds['dataSourceId']
                print(f"Retrieved existing data source with ID: {ds_id}")
                break
    else:
        print(f"Error creating data source: {e}")
        raise

Using knowledge base ID: Q2T9CZ5VFA
Checking for existing data sources in knowledge base Q2T9CZ5VFA...
Creating new data source 'advanced-rag-example' with custom chunking...
Custom chunking data source created successfully with ID: OLMFDC65VK
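The `chunkingConfiguration` and `customTransformationConfiguration` passed to `create_data_source` above can be produced by one helper, which makes it easier to reuse the same Lambda wiring for other data sources. `build_vector_ingestion_configuration` and its parameter names are hypothetical; the dictionary keys are the ones this notebook already sends. Setting `chunkingStrategy` to `NONE` makes Bedrock treat each file as a single chunk, leaving the real splitting to the `POST_CHUNKING` Lambda.

```python
from typing import Any, Dict


def build_vector_ingestion_configuration(region: str, account: str, function_name: str,
                                         intermediate_bucket: str) -> Dict[str, Any]:
    """Build a vectorIngestionConfiguration that defers chunking to a POST_CHUNKING Lambda."""
    lambda_arn = f"arn:aws:lambda:{region}:{account}:function:{function_name}"
    return {
        # NONE: each file becomes one chunk, so the Lambda performs the actual chunking
        "chunkingConfiguration": {"chunkingStrategy": "NONE"},
        "customTransformationConfiguration": {
            # Where Bedrock stages the Lambda's input and output batches
            "intermediateStorage": {"s3Location": {"uri": f"s3://{intermediate_bucket}/"}},
            "transformations": [{
                "transformationFunction": {
                    "transformationLambdaConfiguration": {"lambdaArn": lambda_arn}
                },
                "stepToApply": "POST_CHUNKING",
            }],
        },
    }
```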


### 3. Start an Ingestion Job for the Knowledge Base Backed by Amazon OpenSearch Serverless

> **Note**: The ingestion process takes approximately 2-3 minutes to complete. During this time, Amazon Bedrock processes your documents by:
> 1. Extracting text from the source files
> 2. Chunking the content according to the configured strategy (Fixed / Semantic / Hierarchical / Custom); in this notebook, the Lambda function applies the custom chunking
> 3. Generating embeddings for each chunk
> 4. Storing the embeddings and associated metadata in the OpenSearch Serverless vector index
>
> You'll see status updates as the process progresses. Please wait for the "Ingestion job completed successfully" message before proceeding to the next step.

In [7]:
try:
    print(f"Starting ingestion job for data source {ds_id}...")
    
    start_job_response = bedrock_agent.start_ingestion_job(
        knowledgeBaseId=kb_id,
        dataSourceId=ds_id
    )
    
    job = start_job_response["ingestionJob"]
    print(f"Ingestion job started with ID: {job['ingestionJobId']}")

    print("Monitoring ingestion job status...")
    while job['status'] not in ['COMPLETE', 'FAILED', 'STOPPED']:
        print(f"Current status: {job['status']} - waiting...")
        time.sleep(10)
        
        get_job_response = bedrock_agent.get_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=ds_id,
            ingestionJobId=job["ingestionJobId"]
        )
        
        job = get_job_response["ingestionJob"]

    if job['status'] == 'COMPLETE':
        print(f"Ingestion job completed successfully!")
        print(f"Statistics: {job.get('statistics', 'No statistics available')}")
    else:
        print(f"Ingestion job ended with status: {job['status']}")
        print(f"Failure reason: {job.get('failureReasons', 'No failure reason provided')}")

except Exception as e:
    print(f"Error with ingestion job: {str(e)}")
    raise

Starting ingestion job for data source OLMFDC65VK...
Ingestion job started with ID: UYRDRJCLD1
Monitoring ingestion job status...
Current status: STARTING - waiting...
Current status: IN_PROGRESS - waiting...
Current status: IN_PROGRESS - waiting...
Current status: IN_PROGRESS - waiting...
Current status: IN_PROGRESS - waiting...
Current status: IN_PROGRESS - waiting...
Current status: IN_PROGRESS - waiting...
Current status: IN_PROGRESS - waiting...
Current status: IN_PROGRESS - waiting...
Current status: IN_PROGRESS - waiting...
Ingestion job completed successfully!
Statistics: {'numberOfDocumentsDeleted': 0, 'numberOfDocumentsFailed': 0, 'numberOfDocumentsScanned': 7, 'numberOfMetadataDocumentsModified': 0, 'numberOfMetadataDocumentsScanned': 7, 'numberOfModifiedDocumentsIndexed': 0, 'numberOfNewDocumentsIndexed': 7}
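The monitoring loop above polls until the job reaches a terminal status, with no upper bound on how long it waits. A hedged variant with a deadline is sketched below; `wait_for_ingestion_job` is hypothetical, and it takes a zero-argument `get_status` callable so it can wrap `bedrock_agent.get_ingestion_job` or be tested without AWS.

```python
import time
from typing import Callable

# Statuses after which the notebook's loop stops polling
TERMINAL_STATUSES = {"COMPLETE", "FAILED", "STOPPED"}


def wait_for_ingestion_job(get_status: Callable[[], str], poll_seconds: float = 10,
                           timeout_seconds: float = 900) -> str:
    """Poll get_status() until it returns a terminal status or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"ingestion job still {status} after {timeout_seconds} s")
        time.sleep(poll_seconds)


# With the notebook's variables, get_status could be:
# lambda: bedrock_agent.get_ingestion_job(
#     knowledgeBaseId=kb_id, dataSourceId=ds_id, ingestionJobId=job["ingestionJobId"]
# )["ingestionJob"]["status"]
```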


### 4. Retrieve

In [8]:
import boto3

# Initialize the Bedrock agent runtime client
bedrock_agent_runtime = boto3.client("bedrock-agent-runtime", region_name=variables["regionName"])

kb_id = variables.get("kbCustomChunk")

# Query for relevant documents
query = "What were net incomes of Amazon in 2022, 2023 and 2024?" 

# Retrieve relevant documents based on the query from the knowledge base
relevant_documents_os = bedrock_agent_runtime.retrieve(
    retrievalQuery={
        'text': query  # Specify the query text to search for relevant documents
    },
    knowledgeBaseId=kb_id,  # Provide the knowledge base ID to search in
    retrievalConfiguration={
        'vectorSearchConfiguration': {
            'numberOfResults': 3  # Return the 3 chunks that most closely match the query
        }
    }
)

# Print the text of each retrieved chunk
print(json.dumps([i["content"]["text"] for i in relevant_documents_os["retrievalResults"]], indent=2))

[
  "Company\u2019s calculations and data used to determine the amount of tax benefits to recognize. We evaluated the Company\u2019s income tax disclosures in relation to these matters. /s/ Ernst & Young LLP We have served as the Company\u2019s auditor since 1996. Seattle, Washington February 6, 2025 35Table of Contents AMAZON.COM, INC. CONSOLIDATED STATEMENTS OF CASH FLOWS (in millions) Year Ended December 31, 2022 2023 2024 CASH, CASH EQUIVALENTS, AND RESTRICTED CASH, BEGINNING OF PERIOD $ 36,477 $ 54,253 $ 73,890 OPERATING ACTIVITIES: Net income (loss) (2,722) 30,425 59,248 Adjustments to reconcile net income (loss) to net cash from operating activities: Depreciation",
  "computation of earnings per share: Basic 10,005 10,117 10,189 Diluted 10,198 10,296 10,189 See accompanying notes to consolidated financial statements. 37Table of Contents AMAZON.COM, INC. CONSOLIDATED STATEMENTS OF COMPREHENSIVE INCOME (LOSS) (in millions) Year Ended December 31, 2020 2021 2022 Net income (loss) $

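Printing only `content.text` hides which file each chunk came from and how strongly it matched. Each entry in `retrievalResults` also carries a `score` and, for S3 data sources, a `location` with an `s3Location.uri`. The hypothetical `summarize_results` helper below pulls those fields out and tolerates any that are missing; in this notebook you would call it as `summarize_results(relevant_documents_os)`.

```python
from typing import Any, Dict, List


def summarize_results(response: Dict[str, Any], max_chars: int = 80) -> List[str]:
    """Return one line per retrieved chunk: score, source URI, and a text preview."""
    lines = []
    for result in response.get("retrievalResults", []):
        score = result.get("score")
        uri = result.get("location", {}).get("s3Location", {}).get("uri", "unknown source")
        text = result.get("content", {}).get("text", "")
        preview = text[:max_chars] + ("..." if len(text) > max_chars else "")
        score_text = f"{score:.3f}" if isinstance(score, (int, float)) else "n/a"
        lines.append(f"[{score_text}] {uri}: {preview}")
    return lines
```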
> **Note**: After creating the knowledge base, you can explore its details and settings in the Amazon Bedrock console. This gives you a more visual interface to understand how the knowledge base is structured.
> 
> **[➡️ View your Knowledge Bases in the AWS Console](https://us-west-2.console.aws.amazon.com/bedrock/home?region=us-west-2#/knowledge-bases)**
>
> In the console, you can:
> - See all your knowledge bases in one place
> - View ingestion status and statistics
> - Test queries through the built-in chat interface
> - Modify settings and configurations