# Amazon Bedrock Recipe: Langfuse Integration with Amazon Bedrock Knowledge Bases

## Overview
This recipe implements an OpenTelemetry-based tracing and monitoring system for Amazon Bedrock Knowledge Bases through Langfuse integration. It creates trace structures to track Knowledge Base operation performance metrics including retrieval scores, result counts, and execution durations. It processes Knowledge Base retrieve operations, generating spans with operation attributes such as timing data, error states, and response content. The error handling and logging functions enable systematic debugging, performance monitoring, and audit trail maintenance.

### Context
Langfuse integration enables tracing, monitoring, and analyzing the performance and behavior of your Amazon Bedrock Knowledge Base operations. This helps in understanding KB interactions, debugging issues, and optimizing performance. When using Langfuse, you can utilize the cloud platform or a self-hosted option on a container.

#### Use Case
To demonstrate the integration between Langfuse and Amazon Bedrock Knowledge Bases, providing observability outside of AWS tooling.

#### Implementation
In this notebook, we show how to integrate Amazon Bedrock Knowledge Bases with the Langfuse cloud platform. We configure observability for KB operations, send traces to Langfuse, and validate the results using the Knowledge Base Retrieve and RetrieveAndGenerate operations.


## Prerequisites
You need an AWS account with appropriate IAM permissions for Amazon Bedrock Knowledge Bases and model access. If you use the Langfuse self-hosted option, you also need permission to deploy containers.

### Python Dependencies

To run this notebook, you'll need to install some libraries in your environment:


In [None]:
%pip install -qr requirements.txt

### AWS Credentials
Before using Amazon Bedrock, ensure that your AWS credentials are configured correctly. You can set them up using the AWS CLI or by setting environment variables. This notebook assumes that the credentials are already configured.


In [None]:
import boto3

# Create the client to invoke Knowledge Bases in Amazon Bedrock:
br_kb_runtime = boto3.client("bedrock-agent-runtime")

### Amazon Bedrock Knowledge Base

We assume you've already created an [Amazon Bedrock Knowledge Base](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base.html). If you don't have one already you can follow the **[instructions here](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-build.html)** to set up an example knowledge base.

Configure your knowledge base **ID** in the cell below. You can find it by looking up your knowledge base in the ["Knowledge Base" page on the AWS Console for Amazon Bedrock](https://console.aws.amazon.com/bedrock/home?#/knowledge-bases) or with the AWS CLI.

The Knowledge Base ID should be ten characters, uppercase, and alphanumeric.

In [None]:
kb_id = ""  # <- Configure your Bedrock Knowledge Base ID
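
The format rule above (ten uppercase alphanumeric characters) can be checked locally before any API call is made. The following is a minimal sketch; `is_valid_kb_id` is a hypothetical helper name and is not part of boto3 or of this recipe's `core` module.

```python
import re

# Ten characters, uppercase letters and digits only, as described above.
KB_ID_PATTERN = re.compile(r"[A-Z0-9]{10}")


def is_valid_kb_id(candidate: str) -> bool:
    """Return True when the string matches the expected Knowledge Base ID shape."""
    return KB_ID_PATTERN.fullmatch(candidate) is not None
```

Calling `is_valid_kb_id(kb_id)` right after setting `kb_id` catches an empty or mistyped ID early. It only checks the shape of the string, not whether the knowledge base exists.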

Before moving on, let's validate that the Retrieve API is working correctly. The content of the response is not important here; we are simply testing the API call.

In [None]:
print(f"Trying to retrieve from knowledge base {kb_id}...")
kb_resp = br_kb_runtime.retrieve(
    knowledgeBaseId=kb_id,
    retrievalQuery={
        "text": "Test query"
    },
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults": 3
        }
    }
)
if "retrievalResults" in kb_resp:
    print("✅ Got response")
else:
    raise ValueError(f"No 'retrievalResults' in knowledge base response:\n{kb_resp}")

### Langfuse API keys

There are multiple ways you can use Langfuse - and we'll first need to configure where your Langfuse is hosted:

### Langfuse Cloud

If you're directly using [Langfuse Cloud](https://langfuse.com/pricing), your langfuse_api_url will be either
- https://cloud.langfuse.com/
- https://us.cloud.langfuse.com/
- ...or similar.

Once your Langfuse environment is set up and you've signed in to the UI, you'll need to set up an **API key pair** for your particular Organization and Project (create a new project if you don't have one already).

For more information, see the [FAQ: Where are my Langfuse API keys](https://langfuse.com/faq/all/where-are-langfuse-api-keys) and Langfuse's [getting started documentation](https://langfuse.com/docs/get-started).

### Langfuse Configuration
Make sure you have updated the **config.json** file with your Langfuse API Keys and Knowledge Base data. 
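
The cells in this notebook read the `langfuse`, `kb`, and `user` sections of **config.json**. As a rough sketch, the check below lists any of those keys that are missing or empty. `REQUIRED_CONFIG_KEYS` and `missing_config_keys` are illustrative names, and the key list is inferred only from what this notebook reads, so the real file may contain more fields.

```python
from typing import Any, Dict, List

# Keys this notebook reads from config.json, grouped by section.
REQUIRED_CONFIG_KEYS: Dict[str, List[str]] = {
    "langfuse": [
        "project_name",
        "environment",
        "langfuse_public_key",
        "langfuse_secret_key",
        "langfuse_api_url",
    ],
    "kb": ["kbId"],
    "user": ["userId"],
}


def missing_config_keys(config: Dict[str, Any]) -> List[str]:
    """Return the sections or 'section.key' entries that are absent or empty."""
    missing: List[str] = []
    for section, keys in REQUIRED_CONFIG_KEYS.items():
        section_values = config.get(section)
        if not isinstance(section_values, dict):
            # The whole section is missing or has the wrong type.
            missing.append(section)
            continue
        missing.extend(
            f"{section}.{key}" for key in keys if not section_values.get(key)
        )
    return missing
```

Running `missing_config_keys(config)` after loading the file and stopping when the list is non-empty gives a clearer message than a `KeyError` raised later in the notebook.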

In [None]:
import os
import base64
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load configuration
with open('config.json', 'r') as config_file:
    config = json.load(config_file)

# Set environment variables
os.environ["OTEL_SERVICE_NAME"] = 'Langfuse'
os.environ["DEPLOYMENT_ENVIRONMENT"] = config["langfuse"]["environment"]
project_name = config["langfuse"]["project_name"]
environment = config["langfuse"]["environment"]
langfuse_public_key = config["langfuse"]["langfuse_public_key"]
langfuse_secret_key = config["langfuse"]["langfuse_secret_key"]
langfuse_api_url = config["langfuse"]["langfuse_api_url"]

# Create auth header
auth_token = base64.b64encode(
    f"{langfuse_public_key}:{langfuse_secret_key}".encode()
).decode()

# Set OpenTelemetry environment variables for Langfuse
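# Strip any trailing slash so the path joins correctly whether or not the configured URL ends with one
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = f"{langfuse_api_url.rstrip('/')}/api/public/otel/v1/traces"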
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {auth_token}"

# Print all the environment variables
print("Environment Variables:")
print("-" * 30)
print(f"OTEL_SERVICE_NAME: {os.environ.get('OTEL_SERVICE_NAME')}")
print(f"DEPLOYMENT_ENVIRONMENT: {os.environ.get('DEPLOYMENT_ENVIRONMENT')}")
print(f"OTEL_EXPORTER_OTLP_ENDPOINT: {os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT')}")

### Setting up Knowledge Base tracing

With all the prerequisites in place, we're ready to record traces from your Bedrock Knowledge Base in Langfuse.

First, let's load the libraries:

In [None]:
import time
import boto3
import uuid
import json
from core.timer_lib import timer
from core import instrument_kb_operation, flush_telemetry

#### Now let's define a wrapper function
Here we create a wrapper function that queries the Amazon Bedrock Knowledge Base through the runtime API, instrumented for Langfuse. The wrapper provides:

1. Instrumentation for monitoring
2. Trace enabling for debugging
3. Flexible parameter handling through kwargs
4. Proper logging of configuration states


In [None]:
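@instrument_kb_operation
def retrieve_from_kb(kbId: str, query: str, **kwargs):
    """Run a Knowledge Base retrieve call for the given query.

    The extra keyword arguments (trace_id, tags, userId, ...) are not used
    in this function body; they carry the tracing metadata for the decorator.
    """
    bedrock_kb_client = boto3.client("bedrock-agent-runtime")

    # Request the top three matches using vector search
    retrieve_params = {
        "knowledgeBaseId": kbId,
        "retrievalQuery": {
            "text": query
        },
        "retrievalConfiguration": {
            "vectorSearchConfiguration": {
                "numberOfResults": 3
            }
        }
    }

    response = bedrock_kb_client.retrieve(**retrieve_params)
    return response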
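
To illustrate what a decorator such as `instrument_kb_operation` does conceptually, here is a standard-library-only sketch. It is not the implementation in `core`: `instrument_sketch` and `RECORDED_SPANS` are hypothetical names, the in-memory list stands in for an OpenTelemetry exporter, and returning an `{"error": ...}` dict on failure is an assumption based on how the response is checked later in this notebook.

```python
import functools
import time
from typing import Any, Callable, Dict, List

# Hypothetical in-memory sink standing in for a real span exporter.
RECORDED_SPANS: List[Dict[str, Any]] = []


def instrument_sketch(func: Callable[..., Dict[str, Any]]) -> Callable[..., Dict[str, Any]]:
    """Illustrative decorator: time the call and record basic KB metrics."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        # Tracing metadata arrives through keyword arguments.
        span: Dict[str, Any] = {
            "name": func.__name__,
            "trace_id": kwargs.get("trace_id"),
            "tags": kwargs.get("tags", []),
        }
        start = time.perf_counter()
        try:
            response = func(*args, **kwargs)
            results = response.get("retrievalResults", [])
            scores = [r["score"] for r in results if "score" in r]
            span["result_count"] = len(results)
            span["max_score"] = max(scores) if scores else None
            span["error"] = None
            return response
        except Exception as exc:
            # Record the error state and hand back an error dict (assumed shape).
            span["error"] = str(exc)
            return {"error": str(exc)}
        finally:
            # Runs on both success and failure, so every call yields one span.
            span["duration_s"] = time.perf_counter() - start
            RECORDED_SPANS.append(span)

    return wrapper
```

The `finally` block is the key design choice here: the duration and the span record are captured even when the wrapped call raises.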

In [None]:
# Generate a custom trace ID
trace_id = str(uuid.uuid4())
tags = ["bedrock-kb", "example", "development"]
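
OpenTelemetry trace IDs are 128-bit values, conventionally written as 32 lowercase hexadecimal characters, whereas `str(uuid.uuid4())` yields a 36-character string with hyphens. Whether the `core` module accepts either form is not shown in this notebook. If a hex-only ID is needed, one possible sketch is the following, where `new_hex_trace_id` is a hypothetical helper name:

```python
import uuid


def new_hex_trace_id() -> str:
    """Return a random 128-bit identifier as 32 lowercase hex characters."""
    return uuid.uuid4().hex
```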

### Query

In [None]:
query = ""  # <- Enter your question for the knowledge base

### Invoke Knowledge Base Retrieve Function
Here we pass the retrieval parameters together with the tracing metadata (user ID, tags, trace ID, and Langfuse settings) used for the Langfuse integration.

In [None]:
# Knowledge Base retrieval
response = retrieve_from_kb(
    kbId=config["kb"]["kbId"],
    query=query,
    operation_type="retrieve",
    userId=config["user"]["userId"],
    tags=tags,
    trace_id=trace_id,
    project_name=config["langfuse"]["project_name"],
    environment=config["langfuse"]["environment"],
    langfuse_public_key=config["langfuse"]["langfuse_public_key"],
    langfuse_secret_key=config["langfuse"]["langfuse_secret_key"],
    langfuse_api_url=config["langfuse"]["langfuse_api_url"]
)
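
The implementation notes also mention the RetrieveAndGenerate operation, which the cells here do not call. As a hedged sketch, the request for boto3's `retrieve_and_generate` can be assembled as shown below. `build_retrieve_and_generate_params` is a hypothetical helper, the model ARN is a value you must supply, and this notebook does not show whether `instrument_kb_operation` supports the operation.

```python
from typing import Any, Dict


def build_retrieve_and_generate_params(
    kb_id: str, query: str, model_arn: str
) -> Dict[str, Any]:
    """Assemble keyword arguments for bedrock-agent-runtime retrieve_and_generate."""
    return {
        "input": {"text": query},
        "retrieveAndGenerateConfiguration": {
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                "modelArn": model_arn,  # ARN of a model you have access to
            },
        },
    }


# Usage (needs AWS credentials and model access, so it is left commented out):
# client = boto3.client("bedrock-agent-runtime")
# params = build_retrieve_and_generate_params(kb_id, query, model_arn)
# rag_response = client.retrieve_and_generate(**params)
# print(rag_response["output"]["text"])
```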

### Response Handling
Here we process and print the response from the Knowledge Base.

In [None]:
# Process and print the response
if isinstance(response, dict) and "error" in response:
    print(f"\nError: {response['error']}")
else:
    print("\n🔍 KB Retrieval Results:")
    for idx, result in enumerate(response.get('retrievalResults', []), 1):
        print(f"\nResult {idx}:")
        print(f"Score: {result.get('score', 'N/A')}")
        print(f"Content: {result.get('content', {}).get('text', 'N/A')[:100]}...")
        print(f"Source: {result.get('location', {}).get('s3Location', {}).get('uri', 'N/A')}")
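
To compare the printed results with the metrics recorded in Langfuse, it can help to condense a response into a few numbers. The sketch below does this with two hypothetical helpers, `source_of` and `summarize_retrieval`. Only `s3Location.uri` appears in the cell above; the `url` fallback for other data source types is an assumption.

```python
from typing import Any, Dict, Optional


def source_of(result: Dict[str, Any]) -> Optional[str]:
    """Return the first 'uri' or 'url' found in the result's location entry."""
    location = result.get("location", {})
    for value in location.values():
        # Skip plain fields such as "type"; look only at nested location dicts.
        if isinstance(value, dict):
            return value.get("uri") or value.get("url")
    return None


def summarize_retrieval(response: Dict[str, Any]) -> Dict[str, Any]:
    """Condense a retrieve response into a result count, top score, and sources."""
    results = response.get("retrievalResults", [])
    scores = [r["score"] for r in results if "score" in r]
    return {
        "result_count": len(results),
        "top_score": max(scores) if scores else None,
        "sources": [source_of(r) for r in results],
    }
```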

#### Clean up
Flush any buffered telemetry before exiting so that all traces are sent to Langfuse.

In [None]:
flush_telemetry()
#timer.reset_all()