# Retrieval and Generation with Bedrock Foundation Models

### Overview  
This notebook demonstrates how to perform retrieval-augmented generation (RAG) using Amazon Bedrock foundation models. It covers retrieving relevant documents from a knowledge base and generating responses grounded in the retrieved context.

# 🔍 Retrieval in Flotorch

[Flotorch](https://www.flotorch.ai/) is a real-time Retrieval-Augmented Generation (RAG) orchestration engine designed to reduce operational complexity and improve observability when deploying AI workflows.

In Flotorch, **retrieval** refers to the process of fetching relevant information from external knowledge bases to augment the responses generated by language models. This ensures that the AI system provides accurate, timely, and context-aware answers by combining its pre-trained knowledge with up-to-date external data.

---

## 🔧 Key Components of Retrieval in Flotorch

1. **Retriever**  
   Searches external databases or knowledge sources to find relevant information based on the user's query.

2. **Augmentation**  
   Incorporates the retrieved data into the model's input to enhance the quality and relevance of the generated response.

3. **Generator**  
   Synthesizes a response by integrating the retrieved information with the model's existing knowledge.

---

This retrieval mechanism is integral to Flotorch's ability to deliver precise and context-aware AI solutions across various industries.
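The three components above can be sketched end to end in a few lines of plain Python. This is a toy illustration only, not Flotorch's implementation: the `retrieve`, `augment`, and `generate` helpers, the word-overlap scoring, and the stub model are all hypothetical names and simplifications introduced here.

```python
from typing import Callable, List


def retrieve(query: str, corpus: List[str], top_k: int = 2) -> List[str]:
    """Toy retriever: rank passages by how many query words they share."""
    query_words = set(query.lower().split())
    scored = [(len(query_words & set(p.lower().split())), p) for p in corpus]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    # Keep only passages that share at least one word with the query
    return [passage for score, passage in scored[:top_k] if score > 0]


def augment(query: str, passages: List[str]) -> str:
    """Place the retrieved passages ahead of the question in the prompt."""
    context = "\n".join(f"- {p}" for p in passages)
    return f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"


def generate(prompt_text: str, llm: Callable[[str], str]) -> str:
    """Delegate to whatever model callable is supplied."""
    return llm(prompt_text)


demo_corpus = [
    "Amazon Bedrock provides access to foundation models through an API.",
    "OpenSearch Serverless can store vector embeddings.",
    "Bananas are rich in potassium.",
]
demo_query = "What does Amazon Bedrock provide?"
demo_passages = retrieve(demo_query, demo_corpus)
demo_prompt = augment(demo_query, demo_passages)


def stub_llm(prompt_text: str) -> str:
    # Stand-in for a real model call
    return f"(stub answer based on {len(demo_passages)} passage(s))"


demo_answer = generate(demo_prompt, stub_llm)
print(demo_answer)
```

In a real system the word-overlap scoring would be replaced by a vector search, and the stub by a call to a hosted model.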


### Build your own Retrieval Augmented Generation (RAG) system
When building your own retrieval-augmented generation (RAG) system, you combine a retriever with a generator. The retriever is typically an embedding model that identifies the most relevant chunks in the vector database by similarity score. The generator is typically a Large Language Model (LLM) that answers the question using the retrieved results (also known as chunks) as context. The following sections also provide tips on how to optimize the prompts for your RAG system.
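To make "similarity scores" concrete, here is a minimal sketch of cosine-similarity ranking over a handful of vectors. The three-dimensional vectors and the `top_k_chunks` helper are made up for illustration; real embeddings (for example, the 1024-dimensional vectors configured later in this notebook) come from an embedding model, and a vector database performs this search for you.

```python
import math
from typing import Dict, List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k_chunks(
    query_vec: List[float], chunk_vecs: Dict[str, List[float]], k: int
) -> List[Tuple[str, float]]:
    """Return the k chunk ids with the highest similarity to the query."""
    scored = [(cid, cosine_similarity(query_vec, vec)) for cid, vec in chunk_vecs.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Hypothetical toy embeddings
chunk_vectors = {
    "chunk-a": [1.0, 0.0, 0.0],
    "chunk-b": [0.7, 0.7, 0.0],
    "chunk-c": [0.0, 0.0, 1.0],
}
query_vector = [1.0, 0.1, 0.0]

nearest = top_k_chunks(query_vector, chunk_vectors, k=2)
for chunk_id, score in nearest:
    print(f"{chunk_id}: {score:.3f}")
```

The `k` argument plays the same role as `knn_num` in the experiment configuration: it caps how many chunks are passed on to the generator.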

## 🔧 Step 1: Load the AWS variables created earlier

In [1]:
import json
with open("variables.json", "r") as f:
    variables = json.load(f)

variables

{'accountNumber': '746074413210',
 'regionName': 'us-west-2',
 'collectionArn': 'arn:aws:aoss:us-west-2:746074413210:collection/3f35uv3lze9bdothrm0c',
 'collectionId': '3f35uv3lze9bdothrm0c',
 'vectorIndexName': 'ws-index-',
 'bedrockExecutionRoleArn': 'arn:aws:iam::746074413210:role/advanced-rag-workshop-bedrock_execution_role-us-west-2',
 's3Bucket': '746074413210-us-west-2-advanced-rag-workshop',
 'kbFixedChunk': 'WO4U6AWAU1',
 'kbSemanticChunk': 'OUFEWBGEES',
 'kbHierarchicalChunk': 'IHWIS6EP0H',
 'ground_truth_path': 's3://746074413210-us-west-2-advanced-rag-workshop/ground_truth_data_files/kbqa_questions_answers.json'}

## Load the prompt JSON

In [2]:
prompt_file_path = './dataset/prompt.json'
with open(prompt_file_path, 'r') as f:
    prompt = json.load(f)

In [3]:
# Chunking strategy
chunking_strategies = ['fixed', 'hierarchical', 'semantic']

In [4]:
# Function to get kb id for a chunking strategy
def get_kb_id(chunking_strategy):
    if chunking_strategy == 'fixed':
        return variables['kbFixedChunk']
    elif chunking_strategy == 'hierarchical':
        return variables['kbHierarchicalChunk']
    elif chunking_strategy == 'semantic':
        return variables['kbSemanticChunk']
    else:
        return None

## Sample experiment JSON

In [5]:
exp_config_data = {
            "temp_retrieval_llm": "0.1",
            "gt_data": variables["ground_truth_path"],
            "rerank_model_id": "none",
            "embedding_model": "amazon.titan-embed-text-v2:0",
            "bedrock_knowledge_base": True,
            # "kb_data": variables['kbFixedChunk'],
            "retrieval_service": "bedrock",
            "knn_num": "3",
            "knowledge_base": True,
            "retrieval_model": "us.amazon.nova-pro-v1:0",
            "gateway_api_key": "",
            "vector_dimension": "1024",
            "gateway_enabled": False,
            "gateway_url": "",
            # "chunking_strategy": "Fixed",
            "aws_region": "us-west-2",
            "n_shot_prompt_guide_obj": prompt,
            "n_shot_prompts": 1
        }
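Several values in the configuration above are stored as strings (`"0.1"`, `"3"`, `"1024"`) and are cast with `int(...)` or `float(...)` at the point of use later in the notebook. As an optional alternative, the casts can be done once up front. The sketch below is an assumption-laden illustration: `RetrievalSettings`, `parse_settings`, and the fallback defaults are hypothetical and not part of `flotorch_core`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RetrievalSettings:
    """Typed view of the string-valued experiment settings (hypothetical helper)."""
    knn_num: int
    temperature: float
    vector_dimension: int
    rerank_enabled: bool


def parse_settings(config: dict) -> RetrievalSettings:
    # The fallback defaults below are illustrative assumptions
    rerank_id = str(config.get("rerank_model_id", "none"))
    return RetrievalSettings(
        knn_num=int(config.get("knn_num", 3)),
        temperature=float(config.get("temp_retrieval_llm", 0)),
        vector_dimension=int(config.get("vector_dimension", 1024)),
        rerank_enabled=rerank_id.lower() != "none",
    )


# A reduced copy of the configuration keys used in this notebook
sample_config = {
    "temp_retrieval_llm": "0.1",
    "knn_num": "3",
    "vector_dimension": "1024",
    "rerank_model_id": "none",
}
settings = parse_settings(sample_config)
print(settings)
```

Parsing once makes a malformed value (for example `"knn_num": "three"`) fail immediately rather than midway through the vector search loop.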

### Load Retriever function and other dependencies

In [6]:
from flotorch_core.storage.storage_provider_factory import StorageProviderFactory
from flotorch_core.reader.json_reader import JSONReader
from flotorch_core.storage.db.vector.vector_storage_factory import VectorStorageFactory
from flotorch_core.inferencer.inferencer_provider_factory import InferencerProviderFactory
from flotorch_core.embedding.embedding_registry import embedding_registry

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:numexpr.utils:NumExpr defaulting to 2 threads.
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


### Initialize storage provider

In [7]:
gt_data = exp_config_data['gt_data']
storage = StorageProviderFactory.create_storage_provider(gt_data)
gt_data_path = storage.get_path(gt_data)
json_reader = JSONReader(storage)

### Set embedding to None when a Bedrock KB is used

In [8]:
embedding = None

## 🗃️ Vector Storage Initialization

This section initializes the `VectorStorage` component using a factory method that dynamically selects the appropriate vector storage backend (e.g., OpenSearch, Bedrock Knowledge Base) based on the experimental configuration.

---

### 🛠️ `VectorStorageFactory.create_vector_storage(...)`

Creates an instance of vector storage using configuration flags and credentials.

- **Parameters:**
  - `knowledge_base`: *(bool)* – Whether a knowledge base is used as a backend.
  - `use_bedrock_kb`: *(bool)* – If set, uses AWS Bedrock Knowledge Base.
  - `embedding`: *(BaseEmbedding)* – Embedding generator to use for vector creation.
  - `knowledge_base_id`: *(str | None)* – ID of the Bedrock knowledge base.
  - `aws_region`: *(str | None)* – AWS region for Bedrock and related services.

---

### ⚙️ Dynamic Backend Selection

The factory method chooses the backend as follows:

- If `use_bedrock_kb` is enabled (set from the `bedrock_knowledge_base` config key) → connects to **Bedrock KB**.
- Else if `knowledge_base` is enabled → connects to **custom knowledge base**.

---

### 📝 Result

Returns a configured `VectorStorage` instance ready for:
- KNN-based vector search
- Bedrock KB search
- Integration into QA or retrieval pipelines



### Initialize vector storage with configuration for embedding and optional OpenSearch/Bedrock KB


In [9]:
vector_storage = {}
for each_chunking_strategy in chunking_strategies:
    vector_storage[each_chunking_strategy] = VectorStorageFactory.create_vector_storage(
                    knowledge_base=exp_config_data.get("knowledge_base", False),
                    use_bedrock_kb=exp_config_data.get("bedrock_knowledge_base", False),
                    embedding=embedding,
                    knowledge_base_id=get_kb_id(each_chunking_strategy),
                    aws_region=exp_config_data.get("aws_region")
                )

## 🤖 Inferencer Initialization

This block initializes the **Inferencer** using a factory method that configures the inference engine for text generation or question answering based on the experimental setup.

---

### 🏗️ `InferencerProviderFactory.create_inferencer_provider(...)`

Creates and returns an appropriate `Inferencer` instance depending on configuration such as API gateway usage, model settings, region, and credentials.

---

### 🔧 Parameters

- `gateway_enabled`: *(bool)* – Enables API gateway-based invocation if set to `True`.
- `base_url`: *(str)* – URL endpoint for the API Gateway (e.g., `/api/openai/v1`).
- `api_key`: *(str)* – API key for authenticating requests to the gateway.
- `service`: *(str)* – Name of the retrieval service (e.g., `bedrock`, `sagemaker`).
- `model_id`: *(str)* – The model to use for inference (e.g., `anthropic.claude-v2`).
- `region`: *(str)* – AWS region for service provisioning (e.g., `us-east-1`).
- `arn_role`: *(str)* – IAM role ARN for Bedrock invocation permissions.
- `n_shot_prompts`: *(int)* – Number of few-shot examples to include in prompt.
- `temperature`: *(float)* – Temperature setting for the language model (read from the `temp_retrieval_llm` config key).
- `n_shot_prompt_guide_obj`: *(Any)* – Few-shot guide object for prompt engineering.

---

### ⚙️ Behavior

- If `gateway_enabled` is `True`, connects to the specified API Gateway using credentials.
- If disabled, falls back to direct model invocation through supported services like AWS Bedrock.
- Supports dynamic few-shot prompting and custom temperature configuration.
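To show what few-shot prompting means in practice, the sketch below assembles a prompt from a guide object and a requested number of examples. The structure of the guide (`system_prompt` and `examples` keys) and the `build_n_shot_prompt` helper are assumptions for illustration; the actual layout of `prompt.json` and the prompt assembly inside `flotorch_core` may differ.

```python
from typing import Dict, List


def build_n_shot_prompt(guide: Dict, n_shot: int, context: List[str], question: str) -> str:
    """Combine instructions, up to n_shot examples, retrieved context, and the question."""
    examples = guide.get("examples", [])[: max(n_shot, 0)]
    parts = [guide.get("system_prompt", "")]
    for example in examples:
        parts.append(
            f"Example question: {example['question']}\n"
            f"Example answer: {example['answer']}"
        )
    parts.append("Context:\n" + "\n".join(context))
    parts.append(f"Question: {question}")
    # Skip empty sections so the prompt has no stray blank blocks
    return "\n\n".join(part for part in parts if part)


# Hypothetical guide object
demo_guide = {
    "system_prompt": "Answer using only the provided context.",
    "examples": [
        {"question": "What is RAG?", "answer": "Retrieval-augmented generation."},
        {"question": "What is a chunk?", "answer": "A passage of a source document."},
    ],
}

one_shot_prompt = build_n_shot_prompt(
    demo_guide, 1, ["Bedrock hosts foundation models."], "What does Bedrock host?"
)
print(one_shot_prompt)
```

With `n_shot=1` only the first example is included, which mirrors the `"n_shot_prompts": 1` setting used in this notebook's configuration.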

---

### 🎯 Outcome

Returns a fully configured `Inferencer` object capable of generating answers or completions for queries using the selected language model.



### Initialize inferencer provider with configuration for gateway, retrieval service, and AWS integration


In [10]:
inferencer = InferencerProviderFactory.create_inferencer_provider(
                gateway_enabled = False,
                base_url = "",
                api_key = "",
                service = exp_config_data.get("retrieval_service"),
                model_id = exp_config_data.get("retrieval_model"), 
                region = exp_config_data.get("aws_region"), 
                arn_role = variables.get('bedrockExecutionRoleArn', 'arn:aws:iam::677276078734:role/flotorch-bedrock-role-qamain'),
                n_shot_prompts = int(exp_config_data.get("n_shot_prompts", 0)), 
                temperature = float(exp_config_data.get("temp_retrieval_llm", 0)), 
                n_shot_prompt_guide_obj = exp_config_data.get("n_shot_prompt_guide_obj")
            )

## 🔁 Reranker Initialization

This code conditionally initializes the **`BedrockReranker`**, which reorders retrieved documents based on relevance using a reranking model.

---

### 🏗️ `BedrockReranker(...)` Initialization

The reranker is only instantiated if a valid rerank model ID is provided in the experiment configuration.

---

### 🔧 Parameters

- `aws_region`: *(str)* – AWS region where the Bedrock reranking model is hosted.
- `rerank_model_id`: *(str)* – ID of the Bedrock reranking model to be used.

---

### ⚙️ Behavior

- If `rerank_model_id` is **not** `"none"` (case-insensitive), a `BedrockReranker` is created.
- If the value is `"none"`, no reranker is used and the value is set to `None`.

---

### 🎯 Outcome

- A `BedrockReranker` object if reranking is enabled.
- Otherwise, `reranker = None`.
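Conceptually, reranking rescores each retrieved document against the query and reorders the list. The sketch below uses a simple word-overlap score as a stand-in for a reranking model. The `rerank` and `overlap_score` functions are hypothetical and say nothing about how `BedrockReranker` works internally.

```python
from typing import Callable, Dict, List


def overlap_score(query: str, text: str) -> float:
    """Fraction of query words that also appear in the text (toy relevance score)."""
    query_words = set(query.lower().split())
    text_words = set(text.lower().split())
    return len(query_words & text_words) / len(query_words) if query_words else 0.0


def rerank(
    query: str, documents: List[Dict], score_fn: Callable[[str, str], float]
) -> List[Dict]:
    """Return copies of the documents sorted by descending relevance score."""
    scored = [dict(doc, relevance_score=score_fn(query, doc["text"])) for doc in documents]
    return sorted(scored, key=lambda doc: doc["relevance_score"], reverse=True)


retrieved_docs = [
    {"text": "Pricing for storage tiers"},
    {"text": "How chunking strategy affects retrieval quality"},
]
reranked_docs = rerank("chunking strategy for retrieval", retrieved_docs, overlap_score)
for doc in reranked_docs:
    print(f"{doc['relevance_score']:.2f}  {doc['text']}")
```

The input list is left unmodified, so the original retrieval order remains available for comparison.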



### Initialize reranker if a valid rerank model ID is provided in the configuration


In [11]:
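rerank_model_id = exp_config_data.get("rerank_model_id", "none")
reranker = None
if rerank_model_id.lower() != "none":
    # BedrockReranker is not imported earlier in this notebook. The import path below is an assumption; verify it against your flotorch_core version.
    from flotorch_core.rerank.rerank import BedrockReranker
    reranker = BedrockReranker(exp_config_data.get("aws_region"), rerank_model_id)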

### Load ground truth data in JSON reader

In [12]:
## Read ground truth json
from pydantic import BaseModel
from flotorch_core.chunking.chunking import Chunk
class Question(BaseModel):
    question: str
    answer: str

    def get_chunk(self) -> Chunk:
        return Chunk(data=self.question)

questions_list = json_reader.read_as_model(gt_data_path, Question)

INFO:flotorch_core.storage.s3_storage:Reading data from S3 storage


In [13]:
vector_storage

{'fixed': <flotorch_core.storage.db.vector.bedrock_knowledgebase_storage.BedrockKnowledgeBaseStorage at 0x7faffd31ab90>,
 'hierarchical': <flotorch_core.storage.db.vector.bedrock_knowledgebase_storage.BedrockKnowledgeBaseStorage at 0x7fafc12ccfa0>,
 'semantic': <flotorch_core.storage.db.vector.bedrock_knowledgebase_storage.BedrockKnowledgeBaseStorage at 0x7fafc12cc3d0>}

### 🤖 Perform vector search for each question chunk

In [17]:
chunking_strategy_vector_esponses_dict = {}
for each_chunking_strategy in chunking_strategies:
    responses_list = []
    for question in questions_list:
        question_chunk = question.get_chunk()
        vector_response = vector_storage[each_chunking_strategy].search(question_chunk, int(exp_config_data.get("knn_num")), False)
        vector_response_result = vector_response.to_json()['result']
        responses_list.append({'question':question, 'question_chunk':question_chunk, 'vector_response':vector_response, 'vector_response_result':vector_response_result, 'response_status':vector_response.status})
    print(f"Vector search with {each_chunking_strategy} completed")
    chunking_strategy_vector_esponses_dict[each_chunking_strategy] = responses_list

Vector search with fixed completed
Vector search with hierarchical completed
Vector search with semantic completed


### 🔁 Rerank vector responses using the reranker if enabled and response is valid

In [21]:
for chunking_strategy, vector_response_list in chunking_strategy_vector_esponses_dict.items():
    for each_response in vector_response_list:
        response_status = each_response['response_status']
        vector_response_result = each_response['vector_response_result']
        # Rerank only when a reranker is configured and the vector search succeeded
        if reranker and response_status:
            vector_response = reranker.rerank_documents(each_response['question_chunk'].data, vector_response_result)
            each_response['vector_response'] = vector_response
    # Report this loop's own variable; `each_chunking_strategy` is left over from the search cell
    print(f"Reranking with {chunking_strategy} completed")

### 🧠 Generate answers and extract metadata for each response, applying guardrail checks if needed


In [22]:
for chunking_strategy, vector_response_list in chunking_strategy_vector_esponses_dict.items():
    for each_response in vector_response_list:
        response_status = each_response['response_status']
        question = each_response['question']
        vector_response = each_response['vector_response']
        if response_status:
            vector_response_result = each_response['vector_response_result']
            # Generate an answer from the question and the retrieved chunks
            metadata, answer = inferencer.generate_text(question.question, vector_response_result)
            guardrail_blocked = metadata.get('guardrail_blocked', False)
            # Drop the generation metadata when a guardrail blocked the answer
            answer_metadata = {} if guardrail_blocked else metadata
        else:
            # Retrieval did not succeed, so no answer is generated. The guardrail details are
            # read from this response's own metadata (assumed to hold 'guardrail_output')
            # rather than from `metadata` left over from a previous iteration.
            answer = vector_response.metadata.get('guardrail_output')
            guardrail_blocked = vector_response.metadata.get('guardrail_blocked', False)
            metadata = {}
            answer_metadata = {}
        each_response['metadata'] = metadata
        each_response['answer'] = answer
        each_response['answer_metadata'] = answer_metadata
        each_response['guardrail_blocked'] = guardrail_blocked
    # Report this loop's own variable; `each_chunking_strategy` is left over from the search cell
    print(f"Inferencing with {chunking_strategy} completed")

2025-04-20 12:37:02,544 - INFO - Using 1 shot prompt with 1 examples
INFO:default:Using 1 shot prompt with 1 examples
2025-04-20 12:37:03,879 - INFO - Using 1 shot prompt with 1 examples
INFO:default:Using 1 shot prompt with 1 examples
2025-04-20 12:37:05,911 - INFO - Using 1 shot prompt with 1 examples
INFO:default:Using 1 shot prompt with 1 examples
2025-04-20 12:37:08,732 - INFO - Using 1 shot prompt with 1 examples
INFO:default:Using 1 shot prompt with 1 examples
2025-04-20 12:37:11,856 - INFO - Using 1 shot prompt with 1 examples
INFO:default:Using 1 shot prompt with 1 examples
2025-04-20 12:37:12,620 - INFO - Using 1 shot prompt with 1 examples
INFO:default:Using 1 shot prompt with 1 examples
2025-04-20 12:37:14,097 - INFO - Using 1 shot prompt with 1 examples
INFO:default:Using 1 shot prompt with 1 examples
2025-04-20 12:37:15,027 - INFO - Using 1 shot prompt with 1 examples
INFO:default:Using 1 shot prompt with 1 examples
2025-04-20 12:37:16,458 - INFO - Using 1 shot prompt wit

### 📦 Aggregate final results with question, answer, guardrail assessments, and reference context


In [23]:
inference_dict = {}
for chunking_strategy, vector_response_list in chunking_strategy_vector_esponses_dict.items():
    result = []
    for each_response in vector_response_list:
        metadata = each_response['metadata']
        vector_response = each_response['vector_response']
        vector_response_result = each_response['vector_response_result']
        result.append(
                    {'question':each_response['question'].question,
                    'answer':each_response['answer'],
                    'guardrails_output_assessment':metadata['guardrail_output_assessment'] if 'guardrail_output_assessment' in metadata else None,
                    'guardrails_context_assessment':vector_response.metadata['guardrail_context_assessment'] if 'guardrail_context_assessment' in vector_response.metadata else None,
                    'guardrails_input_assessment':vector_response.metadata['guardrail_input_assessment'] if 'guardrail_input_assessment' in vector_response.metadata else None,
                    'guardrails_blocked':each_response['guardrail_blocked'],
                    'guardrails_block_level':vector_response.metadata['block_level'] if 'block_level' in vector_response.metadata else "",
                    'answer_metadata':each_response['answer_metadata'],
                    'reference_contexts':[res['text'] for res in vector_response_result] if vector_response_result else [],
                    'gt_answer':each_response['question'].answer,
                    'query_metadata':vector_response.metadata['embedding_metadata'].to_json() if 'embedding_metadata' in vector_response.metadata else None
                    })
    inference_dict[chunking_strategy] = result

### 💾 Save the aggregated results to a JSON file for inference metrics


In [25]:
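import os
os.makedirs("results", exist_ok=True)  # create the output directory if it does not exist yet
with open(f"results/{exp_config_data['retrieval_service']}_inference_metrics.json", "w") as json_file:
    json.dump(inference_dict, json_file, indent=4)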
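Once the results are saved, a quick per-strategy summary helps confirm that every chunking strategy produced output before moving on to evaluation. The sketch below runs on a small hand-written sample shaped like the records aggregated above. The `summarize` helper and the sample values are hypothetical, and no real results are shown.

```python
from typing import Dict, List


def summarize(inference: Dict[str, List[dict]]) -> Dict[str, dict]:
    """Count questions, guardrail blocks, and average retrieved contexts per strategy."""
    summary = {}
    for strategy, records in inference.items():
        total = len(records)
        blocked = sum(1 for record in records if record.get("guardrails_blocked"))
        contexts = sum(len(record.get("reference_contexts") or []) for record in records)
        summary[strategy] = {
            "questions": total,
            "guardrails_blocked": blocked,
            "avg_reference_contexts": contexts / total if total else 0.0,
        }
    return summary


# Hand-written sample, not actual notebook output
sample_inference = {
    "fixed": [
        {"question": "q1", "answer": "a1", "guardrails_blocked": False,
         "reference_contexts": ["c1", "c2", "c3"]},
        {"question": "q2", "answer": "a2", "guardrails_blocked": True,
         "reference_contexts": []},
    ],
    "semantic": [],
}
strategy_summary = summarize(sample_inference)
for strategy, stats in strategy_summary.items():
    print(strategy, stats)
```

To run it on the real output, load the saved JSON file with `json.load` and pass the resulting dictionary to `summarize`.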