## Custom JSON Processing with Transformation Functions in Amazon Bedrock Knowledge Bases

In modern RAG applications, the ability to effectively process and transform data before it reaches your Foundation Models is crucial for optimal performance. While standard JSON processing works for many use cases, complex enterprise applications often require more nuanced control over how their data is structured and presented. 

Just as query reformulation helps break down complex queries for better retrieval, transformation functions allow you to reshape and refine your JSON data to better serve your specific use case. This capability is particularly valuable when working with varied data sources or when you need to standardize information across different formats. By customizing how your JSON data is processed, you can enhance the quality of responses from your RAG applications while maintaining efficiency and scalability.

This example will explore how to leverage transformation functions in Amazon Bedrock Knowledge Bases to optimize your JSON processing pipeline and achieve more precise and relevant results from your GenAI applications.

### 1 - Import the needed libraries

The first step is to install the prerequisite packages.

In [None]:
%pip install --upgrade pip --quiet
%pip install -r ../requirements.txt --no-deps --quiet
%pip install -r ../requirements.txt --upgrade --quiet

In [None]:
import IPython

IPython.Application.instance().kernel.do_shutdown(True)

In [18]:
import sys
import boto3
import logging


sys.path.insert(0, ".")
sys.path.insert(1, "..")


from utils.knowledge_base import BedrockKnowledgeBase

The following clients and variables are used throughout this example:

In [None]:
#Clients
s3_client = boto3.client('s3')
sts_client = boto3.client('sts')
session = boto3.session.Session()
region =  session.region_name
account_id = sts_client.get_caller_identity()["Account"]
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime') 


region, account_id

In [20]:
import time

# Get the current timestamp
current_time = time.time()

# Format the timestamp as a string
timestamp_str = time.strftime("%Y%m%d%H%M%S", time.localtime(current_time))[-7:]
# Create the suffix using the timestamp
suffix = f"{timestamp_str}"

knowledge_base_name_custom = 'custom-chunking-kb'
knowledge_base_description = "Knowledge Base containing complex JSON"
bucket_name = f'{knowledge_base_name_custom}-{suffix}'
intermediate_bucket_name = f'{knowledge_base_name_custom}-intermediate-{suffix}'
lambda_function_name = f'{knowledge_base_name_custom}-lambda-{suffix}'
foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"

# Define data sources
data_source=[{"type": "S3", "bucket_name": bucket_name}]

### 2 - Create Lambda Function

The following custom Lambda function acts as the transformation function: it reads the JSON elements from the input datasets and splits them into individual records before they are ingested into the vector database.


In [None]:
%%writefile lambda_function.py
import json
import logging
import boto3


logger = logging.getLogger()
logger.setLevel(logging.INFO)


def read_s3_file(s3_client, bucket, key):
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return json.loads(response['Body'].read().decode('utf-8'))

def write_to_s3(s3_client, bucket, key, content):
    s3_client.put_object(Bucket=bucket, Key=key, Body=json.dumps(content))


def lambda_handler(event, context):
    logger.info('input={}'.format(json.dumps(event)))
    s3 = boto3.client('s3')

    # Extract relevant information from the input event
    input_files = event.get('inputFiles')
    input_bucket = event.get('bucketName')

    if not all([input_files, input_bucket]):
        raise ValueError("Missing required input parameters")

    output_files = []

    for input_file in input_files:
        logger.info('input file ={}'.format(input_file))
        content_batches = input_file.get('contentBatches', [])
        original_file_location = input_file.get('originalFileLocation', {})

        processed_batches = []

        for batch in content_batches:
            input_key = batch.get('key')

            if not input_key:
                raise ValueError("Missing key in content batch")

            file_content = read_s3_file(s3, input_bucket, input_key)

            # Detect which dataset this batch holds by looking for its top-level key
            # in the raw JSON string ('cities' or 'ratings')
            content_body = file_content['fileContents'][0]['contentBody']
            if 'cities' in content_body:
                file_key = 'cities'
            elif 'ratings' in content_body:
                file_key = 'ratings'
            else:
                raise ValueError("Neither 'cities' nor 'ratings' found in file content")

            for i in json.loads(file_content['fileContents'][0]['contentBody'])[file_key]:
                output_key = "output/{}_{}.json".format(file_key, i['id'])

                processed_content = {'fileContents': []}
                processed_content['fileContents'].append({
                    'contentType': 'json',
                    'contentBody': json.dumps(i)
                })
                
                # Write processed content back to S3
                write_to_s3(s3, input_bucket, output_key, processed_content)

                # Add processed batch information
                processed_batches.append({
                    'key': output_key
                })
        
        output_file = {
            'originalFileLocation': original_file_location,
            'contentBatches': processed_batches
        }

        output_files.append(output_file)

    result = {'outputFiles': output_files}

    return result
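
Before deploying, the splitting step can be exercised locally without S3. The sketch below is not part of the Lambda: `split_records` is a hypothetical helper that mirrors the loop above, and the sample payload is made up for illustration. It assumes each input file wraps a top-level `cities` or `ratings` list whose items carry an `id`, and it checks the parsed keys rather than searching the raw string.

```python
import json


def split_records(content_body: str) -> dict:
    """Mirror the Lambda's splitting step in memory (hypothetical helper).

    Returns a mapping of output key -> processed content, one entry per record.
    """
    parsed = json.loads(content_body)
    # Same detection order as the Lambda: 'cities' first, then 'ratings'
    for file_key in ("cities", "ratings"):
        if file_key in parsed:
            break
    else:
        raise KeyError("Expected a top-level 'cities' or 'ratings' key")

    outputs = {}
    for record in parsed[file_key]:
        output_key = "output/{}_{}.json".format(file_key, record["id"])
        outputs[output_key] = {
            "fileContents": [
                {"contentType": "json", "contentBody": json.dumps(record)}
            ]
        }
    return outputs


# Illustrative payload, not taken from the real datasets
sample_body = json.dumps({"cities": [{"id": 1, "city": "A"}, {"id": 2, "city": "B"}]})
for key in split_records(sample_body):
    print(key)
```

Each key printed corresponds to one object the Lambda would write to the intermediate bucket, so one input file becomes one chunk per record.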


### 3 - Create Knowledge Base with custom chunking strategy

Let's start by creating an Amazon Bedrock Knowledge Base to store two datasets (in the `synthetic_dataset` folder):

- `destinations.json`: travel destination data, with country and city names and a short summary of places to visit.
- `ratings.json`: traveler ratings and reviews from previous visits to those cities.

**Note: Both datasets are synthetic and were generated using Amazon Bedrock.**

Knowledge Bases allow you to integrate with different vector databases, including Amazon OpenSearch Serverless, Amazon Aurora, Pinecone, Redis Enterprise and MongoDB Atlas. For this example, we will integrate the knowledge base with Amazon OpenSearch Serverless. To do so, we will use the helper class `BedrockKnowledgeBase`, which creates the knowledge base and all of its prerequisites:

1. IAM roles and policies
1. S3 bucket
1. Amazon OpenSearch Serverless encryption, network and data access policies
1. Amazon OpenSearch Serverless collection
1. Amazon OpenSearch Serverless vector index
1. Knowledge base
1. Knowledge base data source
1. CUSTOM chunking strategy configuration for the knowledge base


In [None]:
knowledge_base_custom = BedrockKnowledgeBase(
    kb_name=f'{knowledge_base_name_custom}-{suffix}',
    kb_description=knowledge_base_description,
    data_sources=data_source,
    lambda_function_name=lambda_function_name,
    intermediate_bucket_name=intermediate_bucket_name, 
    chunking_strategy = "CUSTOM", 
    suffix = f'{suffix}-c'
)
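
The internals of the helper class are not shown in this notebook. As a rough sketch of what a CUSTOM strategy could translate to, the function below builds a `vectorIngestionConfiguration` dictionary in the shape accepted by `create_data_source` on the `bedrock-agent` client. The function name, the `NONE` built-in chunking choice, and the placeholder ARN and bucket are assumptions for illustration, not the helper's confirmed behavior.

```python
def build_custom_ingestion_config(lambda_arn: str, intermediate_bucket: str) -> dict:
    """Build an ingestion config that delegates chunking to a Lambda (hypothetical helper)."""
    return {
        # Assumption: built-in chunking is disabled so the Lambda output is indexed as-is
        "chunkingConfiguration": {"chunkingStrategy": "NONE"},
        "customTransformationConfiguration": {
            # Bucket where Bedrock stages the batches the Lambda reads and writes
            "intermediateStorage": {
                "s3Location": {"uri": f"s3://{intermediate_bucket}/"}
            },
            "transformations": [
                {
                    "stepToApply": "POST_CHUNKING",
                    "transformationFunction": {
                        "transformationLambdaConfiguration": {"lambdaArn": lambda_arn}
                    },
                }
            ],
        },
    }


# Placeholder values for illustration only
config = build_custom_ingestion_config(
    "arn:aws:lambda:us-east-1:123456789012:function:example-transform",
    "example-intermediate-bucket",
)
print(config["customTransformationConfiguration"]["intermediateStorage"])
```

This makes the roles of `lambda_function_name` and `intermediate_bucket_name` in the constructor call easier to see: one identifies the transformation function, the other the staging location.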

### 4 - Upload datasets to S3 and start ingestion Job

After the Knowledge Base is created, let's upload both datasets to the S3 bucket.

In [23]:
file_name = 'destinations.json'
s3_client.upload_file(f'synthetic_dataset/{file_name}', bucket_name, file_name)

In [24]:
file_name = 'ratings.json'
s3_client.upload_file(f'synthetic_dataset/{file_name}', bucket_name, file_name)

Now, let's start the ingestion job to process those files.

If you want to check the processing logs, open the Lambda function attached to your Knowledge Base, go to the Monitor tab, and follow the CloudWatch Logs link.

In [None]:
# ensure that the kb is available
time.sleep(30)
# sync knowledge base
knowledge_base_custom.start_ingestion_job()
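
The helper may already wait for the job to finish. If you start ingestion jobs yourself, a polling loop is one way to avoid fixed sleeps. The following is a minimal sketch: `wait_for_ingestion` is a hypothetical function, and the data source ID and job ID it needs are assumed to be available from your own `start_ingestion_job` call, since this notebook does not expose them. It relies on `get_ingestion_job` from the `bedrock-agent` client.

```python
import time


def wait_for_ingestion(client, kb_id, data_source_id, job_id,
                       poll_seconds=10, timeout_seconds=1800):
    """Poll get_ingestion_job until the job reaches a terminal status."""
    terminal = {"COMPLETE", "FAILED", "STOPPED"}
    deadline = time.monotonic() + timeout_seconds
    while True:
        job = client.get_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=data_source_id,
            ingestionJobId=job_id,
        )["ingestionJob"]
        if job["status"] in terminal:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Ingestion job {job_id} is still {job['status']}")
        time.sleep(poll_seconds)
```

A call would look like `wait_for_ingestion(bedrock_agent_client, kb_id, data_source_id, job_id)`, after which the returned job's `status` tells you whether ingestion completed or failed.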

### 5 - Test Knowledge Base

Now that the Knowledge Base is available, we can test it using the [retrieve](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent-runtime/client/retrieve.html) and [retrieve_and_generate](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent-runtime/client/retrieve_and_generate.html) functions.

First, let's retrieve the Knowledge Base ID and store it.

In [None]:
kb_id_custom = knowledge_base_custom.get_knowledge_base_id()

#### 5.1 Testing Knowledge Base with Retrieve and Generate API

Let's start with a simple question about a place called Elephanta Caves and the languages spoken there.

The answer is in the record with `"id":1037` in the `destinations.json` file: the expected answer is Mumbai, where Marathi, Hindi, and English are spoken.

In [27]:
query = "Is there a place called Elephanta Caves? If so, what languages do they speak over there?" 
# Expected: Mumbai - India, Marathi, Hindi, English

In [None]:
response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={        
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id_custom,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults":5
                } 
            }
        }
    }
)

print(response['output']['text'],end='\n'*2)
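
The same request structure is reused for every question in this notebook. One possible way to avoid repeating it is a small builder function. This is a sketch only: `build_rag_request` is a hypothetical name, and it simply reproduces the arguments used in the call above.

```python
def build_rag_request(query, kb_id, region, model_id, number_of_results=5):
    """Return keyword arguments for retrieve_and_generate (hypothetical helper)."""
    return {
        "input": {"text": query},
        "retrieveAndGenerateConfiguration": {
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                # Foundation model ARNs have an empty account field
                "modelArn": f"arn:aws:bedrock:{region}::foundation-model/{model_id}",
                "retrievalConfiguration": {
                    "vectorSearchConfiguration": {
                        "numberOfResults": number_of_results
                    }
                },
            },
        },
    }
```

With this in place, a call reduces to `bedrock_agent_runtime_client.retrieve_and_generate(**build_rag_request(query, kb_id_custom, region, foundation_model))`.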

Now let's ask another question, this time about places where Japanese is spoken. We will also look at the output of both APIs to see the data returned from the knowledge base and how the model uses it in its answers.

In [29]:

query = "Can you suggest a good place to visit where they speak Japanese?" 

In [None]:
response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id_custom,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults":5
                } 
            }
        }
    }
)

print(response['output']['text'],end='\n'*2)

As you can see, with the RetrieveAndGenerate API we get the final response directly. Now let's look at the citations it returns.

Our primary focus in this notebook is to observe the retrieved chunks and the citations the model returns while generating the response. When we provide relevant context to the foundation model along with the query, it is more likely to generate a high-quality response.

In [31]:
def citations_rag_print(response_ret):
    # response_ret is a list of retrieved references; each one has content, location, and metadata
    for num,chunk in enumerate(response_ret,1):
        print(f'Chunk {num}: ',chunk['content']['text'],end='\n'*2)
        print(f'Chunk {num} Location: ',chunk['location'],end='\n'*2)
        print(f'Chunk {num} Metadata: ',chunk['metadata'],end='\n'*2)

In [None]:
response_custom = response['citations'][0]['retrievedReferences']
print("# of citations or chunks used to generate the response: ", len(response_custom))
citations_rag_print(response_custom)
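
The cell above reads only the first entry of `response['citations']`. A response can contain several citations, each with its own `retrievedReferences`, so a sketch like the following gathers them all. `collect_references` is a hypothetical helper, and de-duplicating on the S3 URI plus the chunk text is an assumption about what counts as the same chunk.

```python
def collect_references(rag_response):
    """Gather retrieved references from every citation, skipping repeated chunks."""
    seen = set()
    references = []
    for citation in rag_response.get("citations", []):
        for ref in citation.get("retrievedReferences", []):
            uri = ref.get("location", {}).get("s3Location", {}).get("uri")
            key = (uri, ref.get("content", {}).get("text"))
            if key not in seen:
                seen.add(key)
                references.append(ref)
    return references
```

The returned list has the same element shape as `response['citations'][0]['retrievedReferences']`, so it can be passed to `citations_rag_print` unchanged.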

#### 5.2 Testing Knowledge Base with Retrieve API

If you need an extra layer of control, you can retrieve the chunks that best match your query using the Retrieve API. In this setup, you configure the desired number of results and build the final answer with your own application logic. The API returns the matching content, its S3 location, the similarity score and the chunk metadata.
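
As an illustration of what such application logic could look like, the sketch below filters a Retrieve API response by score and joins the best chunks into a context string for a prompt. `build_context` is a hypothetical helper, and the `min_score` and `max_chunks` defaults are arbitrary values chosen for the example, not recommendations.

```python
def build_context(retrieve_response, min_score=0.4, max_chunks=3):
    """Keep the best-scoring chunks and join them into one context string."""
    results = retrieve_response.get("retrievalResults", [])
    # Drop weak matches, then order the rest from most to least similar
    kept = [r for r in results if r.get("score", 0.0) >= min_score]
    kept.sort(key=lambda r: r.get("score", 0.0), reverse=True)
    return "\n\n".join(r["content"]["text"] for r in kept[:max_chunks])
```

The resulting string can then be placed in a prompt of your own design and sent to any model, which is the main difference from the RetrieveAndGenerate API.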

In [33]:
def response_print(response_ret):
    # 'retrievalResults' is a list of results; each one has content, location, score, and metadata
    for num,chunk in enumerate(response_ret['retrievalResults'],1):
        print(f'Chunk {num}: ',chunk['content']['text'],end='\n'*2)
        print(f'Chunk {num} Location: ',chunk['location'],end='\n'*2)
        print(f'Chunk {num} Score: ',chunk['score'],end='\n'*2)
        print(f'Chunk {num} Metadata: ',chunk['metadata'],end='\n'*2)

In [None]:
response_custom_ret = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id_custom,
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults":5,
        } 
    },
    retrievalQuery={
        'text': query
    }
)
print("# of chunks retrieved: ", len(response_custom_ret['retrievalResults']))
response_print(response_custom_ret)

As you can see, with CUSTOM chunking we get the 5 results requested in the API call, retrieved using semantic similarity, which is the default for the Retrieve API.

Each of those references is stored as its own entry in the vector database, one per JSON record, even though the records come from the same source file. Retrieving whole records rather than arbitrary text fragments gives the model cleaner context, which helps it return better responses.
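
A note on pagination: the Retrieve API accepts a `nextToken` parameter, but a value should only be passed when a previous response returned one. A minimal sketch of following the token across pages is shown below. `retrieve_all` is a hypothetical helper, `max_pages` is an arbitrary safety limit, and whether a token is returned at all depends on the service response.

```python
def retrieve_all(client, kb_id, query, page_size=5, max_pages=3):
    """Follow nextToken across pages of the Retrieve API and return all results."""
    results = []
    kwargs = {
        "knowledgeBaseId": kb_id,
        "retrievalQuery": {"text": query},
        "retrievalConfiguration": {
            "vectorSearchConfiguration": {"numberOfResults": page_size}
        },
    }
    for _ in range(max_pages):
        page = client.retrieve(**kwargs)
        results.extend(page.get("retrievalResults", []))
        token = page.get("nextToken")
        if not token:
            break
        # Only set nextToken once the service has actually returned one
        kwargs["nextToken"] = token
    return results
```

It would be called as `retrieve_all(bedrock_agent_runtime_client, kb_id_custom, query)`.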

#### 5.3 Testing Knowledge Base with Both Files

Now, let's ask a question that forces the Knowledge Base to look at content from both files, such as the ratings for a specific place.

In the following example, we expect an answer that mentions Fez, Morocco, and the 8.6 rating stored under `id:261` in the `ratings.json` file.

These are the complete JSON records:
```
ratings.json: {"id":261,"rating":8.6,"review":"Desert capital with stunning mosque. Traditional markets offer authentic shopping experience.","visit_date":1676937600000,"traveler_type":"Couple","length_of_stay":7,"photos_shared":26,"helpful_votes":22,"destination_id":1064}

destinations.json: {"id":1064,"city":"Fez","country":"Morocco","latitude":34.0181,"longitude":-5.0078,"main_attractions":"Fez El Bali, Bou Inania Madrasa, Chouara Tannery, Al-Qarawiyyin Mosque","best_season":"Spring","local_transport":"Petit taxi, Bus, Walking, Donkey","languages":"Arabic, Berber, French","currency":"MAD","timezone":"Africa/Casablanca"},
```
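
The two records are linked through `destination_id`. The knowledge base relies on retrieval rather than an explicit join, but a plain-Python join is a simple way to check the expected answer against the local files. The sketch below uses only the two records shown above, trimmed to the relevant fields; `top_rating_for_country` is a hypothetical helper.

```python
# Trimmed copies of the two records shown above
destinations = [{"id": 1064, "city": "Fez", "country": "Morocco"}]
ratings = [{"id": 261, "rating": 8.6, "destination_id": 1064}]


def top_rating_for_country(country, destinations, ratings):
    """Return (city, rating record) for the best-rated review in a country, or None."""
    # Map destination id -> city for the requested country
    city_by_id = {d["id"]: d["city"] for d in destinations if d["country"] == country}
    candidates = [r for r in ratings if r["destination_id"] in city_by_id]
    if not candidates:
        return None
    best = max(candidates, key=lambda r: r["rating"])
    return city_by_id[best["destination_id"]], best


print(top_rating_for_country("Morocco", destinations, ratings))
```

Running the same function over the full lists loaded from the `cities` and `ratings` keys of the two files gives a reference answer to compare with the model's response.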

In [37]:
query = "What People say about Morocco? Give me highest rating"

In [None]:
response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id_custom,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults":5
                } 
            }
        }
    }
)

print(response['output']['text'],end='\n'*2)

### 6 - Clean Up

To clean up the resources, run the following method from the helper class:

In [None]:
knowledge_base_custom.delete_kb(delete_s3_bucket=True, delete_lambda_function=True)