# Observability with Langfuse and Strands Evaluation

In the Strands Agents SDK, observability refers to your ability to measure system behavior and performance. Observability combines instrumentation, data collection, and analysis techniques. These techniques provide insights into an agent's behavior and performance, helping you effectively build, debug, and maintain agents that better serve your unique needs and reliably complete tasks.

This notebook demonstrates how to build an agent with observability and evaluation capabilities. 

We use [Langfuse](https://langfuse.com/) to process the Strands Agent traces and an LLM acting as a judge to evaluate agent performance. The primary focus is on agent evaluation: the quality of the agent's responses, assessed using the traces produced by the SDK.

### What are Observability and Evaluation?

**Observability** means being able to see what your AI agent is doing "behind the scenes" - like watching its thought process. It helps you understand why your agent makes certain decisions or gives particular responses.

**Evaluation** is how we measure if our agent is doing a good job. Instead of just guessing if responses are good, we use specific metrics to score the agent's performance.

### Observability Components

All observability APIs are embedded directly within the Strands Agents SDK. The following are key observability data points:

[**Metrics**](https://strandsagents.com/latest/user-guide/observability-evaluation/metrics/) - Essential for understanding agent performance, optimizing behavior, and monitoring resource usage.

[**Traces**](https://strandsagents.com/latest/user-guide/observability-evaluation/traces/) - A fundamental component of the Strands SDK's observability framework, providing detailed insights into your agent's execution.

[**Logs**](https://strandsagents.com/latest/user-guide/observability-evaluation/logs/) - Strands SDK uses Python's standard logging module to provide visibility into operations.

[**Evaluation**](https://strandsagents.com/latest/user-guide/observability-evaluation/evaluation/) - Essential for measuring agent performance, tracking improvements, and ensuring your agents meet quality standards. With Strands SDK, you can perform Manual Evaluation, Structured Testing, LLM Judge Evaluation, and Tool-Specific Evaluation.
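As a small illustration of the Logs entry above, the sketch below raises the verbosity of the SDK's logger through Python's standard `logging` module. The logger name `"strands"` is an assumption based on the package name; check the linked Logs page for the exact name used by your SDK version.

```python
import logging

# Assumption: the SDK logs under the "strands" logger namespace.
strands_logger = logging.getLogger("strands")

# DEBUG is verbose; switch back to INFO or WARNING once the agent behaves as expected.
strands_logger.setLevel(logging.DEBUG)
```

Records from this logger propagate to whatever handlers the root logger has, so the `logging.basicConfig(...)` call in the setup cell is what makes them visible in Jupyter.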

### OpenTelemetry Integration

Strands natively integrates with OpenTelemetry, an industry standard for distributed tracing. You can visualize and analyze traces using any OpenTelemetry-compatible tool. This integration provides:

- **Compatibility with existing observability tools:** Send traces to platforms such as Jaeger, Grafana Tempo, AWS X-Ray, Datadog, and more
- **Standardized attribute naming:** Uses OpenTelemetry semantic conventions
- **Flexible export options:** Console output for development, OTLP endpoint for production
- **Auto-instrumentation:** Trace creation is handled automatically when you turn on tracing
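To make the OTLP export option concrete, here is a minimal sketch that builds the standard OpenTelemetry exporter environment variables for a Langfuse project. The helper name `langfuse_otlp_env`, the placeholder keys, and the `/api/public/otel` path are assumptions for illustration; confirm the endpoint and authentication scheme against the Langfuse documentation for your deployment.

```python
import base64
import os


def langfuse_otlp_env(public_key: str, secret_key: str, host: str) -> dict:
    """Build OTLP exporter settings that use HTTP Basic auth (hypothetical helper)."""
    # Basic auth token is base64("<public_key>:<secret_key>")
    token = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
    return {
        "OTEL_EXPORTER_OTLP_ENDPOINT": f"{host.rstrip('/')}/api/public/otel",
        "OTEL_EXPORTER_OTLP_HEADERS": f"Authorization=Basic {token}",
    }


# Placeholder credentials; replace with the keys from your own project.
otlp_env = langfuse_otlp_env("pk-lf-example", "sk-lf-example", "https://cloud.langfuse.com")
os.environ.update(otlp_env)
```

Setting these variables before the agent is created lets any OTLP exporter that honors the standard environment variables pick them up without further code changes.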

### Use Case 1

Evaluate how well an LLM can query a Knowledge Base containing 10-K documents and provide financial analytical insights.

In this use case we will build a knowledge base with 10-K documents from multiple organisations. We will then test our LLM against pre-built test cases that determine how well it retrieves information and generates responses to the questions asked. We will perform the following steps:

1. Build a knowledge base in Amazon Bedrock.

2. Build a Strands Agent to query the Knowledge Base and provide financial analysis.

3. Build a simple evaluation framework to evaluate the agent's performance.
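To give a feel for step 3, the sketch below shows one possible shape for a test-case-driven evaluation loop. Everything here is hypothetical: `EvalCase`, `keyword_judge`, and `run_eval` are illustrative names, the keyword check is a deliberately crude stand-in for an LLM judge, and the stub agent replaces the real Strands Agent.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class EvalCase:
    question: str
    expected_facts: List[str]  # phrases a good answer should mention


def keyword_judge(answer: str, case: EvalCase) -> float:
    """Fraction of expected phrases found in the answer (stand-in for an LLM judge)."""
    if not case.expected_facts:
        return 0.0
    answer_lower = answer.lower()
    hits = sum(1 for fact in case.expected_facts if fact.lower() in answer_lower)
    return hits / len(case.expected_facts)


def run_eval(
    agent_fn: Callable[[str], str],
    cases: List[EvalCase],
    judge: Callable[[str, EvalCase], float] = keyword_judge,
) -> float:
    """Average judge score across all test cases."""
    scores = [judge(agent_fn(case.question), case) for case in cases]
    return sum(scores) / len(scores) if scores else 0.0


# Stub agent and placeholder test case, for illustration only.
def stub_agent(question: str) -> str:
    return "The filing discusses AWS."


cases = [EvalCase("Which segments are discussed?", ["aws", "advertising"])]
average_score = run_eval(stub_agent, cases)
```

Because the judge is passed in as a function, the keyword check can later be swapped for a call to a judging model without changing the loop.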



### ✅ Install Required Packages

First, we need to install all the necessary packages for our notebook. Each package has a specific purpose:

- **langfuse**: Provides observability for our agent
- **boto3**: AWS SDK for Python, used to access AWS services, including Amazon Bedrock models
- **strands**: Framework for building AI agents


In [1]:

%pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://plugin.us-east-1.prod.workshops.aws
Note: you may need to restart the kernel to use updated packages.


#### Set Up And Configuration 

In [2]:
import logging
import sys
import time

import boto3
from botocore.client import Config

# Set to True to delete the created resources in the cleanup cell at the end
clean_resources = False

# Configure logging to show in Jupyter
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
    force=True
)

from utils import (
    generate_short_code, 
    create_bedrock_execution_role, 
    create_knowledge_base, 
    create_s3_bucket, 
    create_vector_bucket, 
    create_and_get_index_arn,
    process_companies,
    create_s3_data_source,
    upload_companies
)

# Create boto3 session and get account information
boto3_session = boto3.session.Session()
region_name = boto3_session.region_name
iam_client = boto3.client('iam')
sts_client = boto3.client('sts')
account_id = sts_client.get_caller_identity()['Account']

# Create s3vectors client
s3vectors = boto3.client('s3vectors', region_name=region_name)

# Runtime client (used for queries) gets extended timeouts and no automatic retries
bedrock_config = Config(connect_timeout=120, read_timeout=120, retries={'max_attempts': 0}, region_name=region_name)
bedrock_agent_runtime_client = boto3_session.client("bedrock-agent-runtime", config=bedrock_config)
# Control-plane client; `bedrock` is a shorter alias used in later cells
bedrock_agent_client = bedrock = boto3.client('bedrock-agent', region_name=region_name)

# Generate unique identifier for resource names to avoid conflicts
unique_id = generate_short_code()

# Define resource names with unique identifiers
bucket_name = f"my-data-source-{unique_id}"
vector_store_name = f"my-s3-vector-store-{unique_id}"
vector_index_name = f"my-s3-vector-index-{unique_id}"
knowledge_base_name = f"my-10k-knowledge-base-{unique_id}"

print(f"Using unique identifier: {unique_id}")
print(f"AWS Region: {region_name}")
print(f"Account ID: {account_id}")

Found credentials in shared credentials file: ~/.aws/credentials
Found credentials in shared credentials file: ~/.aws/credentials
Using unique identifier: fd95
AWS Region: us-east-1
Account ID: 522814709709


## Create an Amazon S3 Vector store

Amazon S3 Vectors is a managed vector storage capability built directly into Amazon S3.

In this section, we'll create a vector bucket that will serve as our vector database. The vector bucket will store the vector embeddings of our documents, enabling semantic search capabilities.
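As a rough intuition for what semantic search over embeddings means, the sketch below ranks a few toy vectors by cosine similarity to a query vector. The three-dimensional vectors and chunk names are made up for illustration; real embeddings have many more dimensions, and the distance metric the index actually uses depends on how it is configured in `utils`.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: Sequence[float], index: Dict[str, Sequence[float]], k: int = 2) -> List[Tuple[str, float]]:
    """Return the k stored vectors most similar to the query, best first."""
    scored = [(key, cosine_similarity(query, vec)) for key, vec in index.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Toy index: keys play the role of document chunks.
toy_index = {
    "chunk-revenue": [1.0, 0.0, 0.0],
    "chunk-risk": [0.0, 1.0, 0.0],
    "chunk-mixed": [0.7, 0.7, 0.0],
}
results = top_k([0.9, 0.1, 0.0], toy_index, k=2)
```

A managed vector index performs the same kind of nearest-neighbour lookup, but at scale and without comparing the query against every stored vector by hand.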

In [3]:
# Create the vector bucket if it does not already exist
vector_store_arn = create_vector_bucket(vector_store_name, s3vectors)

# Define the dimensionality of our embedding vectors
# This should match the output dimension of the embedding model we'll use (Titan Embed Text v2)
vector_dimension = 1024

# Create the vector index
vector_index_arn = create_and_get_index_arn(
    s3vectors,
    vector_store_name,
    vector_index_name,
    vector_dimension)

print(f"\nVector index created with ARN: {vector_index_arn}")

# Create IAM role for Bedrock Knowledge Base
create_role = create_bedrock_execution_role(unique_id, region_name, bucket_name, vector_store_name, vector_index_name, account_id)
roleArn = create_role["Role"]["Arn"]
roleName = create_role["Role"]["RoleName"]

print(f"Created IAM role: {roleName}")
print(f"Role ARN: {roleArn}")

# Create the knowledge base backed by the vector index
knowledge_base_id = create_knowledge_base(knowledge_base_name, bedrock_agent_client, roleArn, vector_store_name, vector_index_name)



✅ Vector bucket 'my-s3-vector-store-fd95' created successfully
Vector bucket ARN: arn:aws:s3vectors:us-east-1:522814709709:bucket/my-s3-vector-store-fd95
✅ Vector index 'my-s3-vector-index-fd95' created successfully
Vector index ARN: arn:aws:s3vectors:us-east-1:522814709709:bucket/my-s3-vector-store-fd95/index/my-s3-vector-index-fd95



Vector index created with ARN: arn:aws:s3vectors:us-east-1:522814709709:bucket/my-s3-vector-store-fd95/index/my-s3-vector-index-fd95


Waiting for IAM role propagation (60 seconds)...


Created IAM role: kb_execution_role_s3_vector_fd95
Role ARN: arn:aws:iam::522814709709:role/kb_execution_role_s3_vector_fd95


Knowledge base ID: NYMRCJD30V

Waiting for knowledge base NYMRCJD30V to finish creating...
Current status: CREATING (elapsed time: 0s)
Still creating, checking again in 30 seconds...
Current status: ACTIVE (elapsed time: 30s)

✅ Knowledge base creation completed with status: ACTIVE


## Create the Data Source

Now we need to create a data source that the Knowledge Base will use. The data source contains the documents that will be processed, embedded, and indexed into our vector store. 

We'll follow these steps:
1. Create an S3 bucket for our documents
2. Download 10-K reports (optional, requires an API key from sec-api.io) or use the preloaded reports, then upload the documents to S3
3. Create a Bedrock data source pointing to our S3 bucket

### Creating an S3 Bucket for Data Source

First, let's create the S3 Bucket that will hold our documents.

In [4]:
# Create an S3 bucket for our data source
create_s3_bucket(bucket_name, region=region_name)

✅ S3 bucket 'my-data-source-fd95' created successfully


True

### Populate S3 with 10-k Documents

We are now going to load the data for our knowledge base: 10-K reports from companies over the past 5 years. To download the reports yourself you will need an API key, which you can get by signing up for free at https://sec-api.io/.

Once you have your API key, replace `YOUR_API_KEY` in the string below with your key, set `download_company_10k` to `True`, and run the cell. If you leave the flag set to `False`, the cell uploads the preloaded reports instead.

In [5]:
COMPANY_SYMBOLS = [
    # Tech companies
    #'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA', 'META', 'NVDA',
    'AMZN'
]

# To download reports instead of using the preloaded ones, set this to True and enter your API key from https://sec-api.io/
download_company_10k = False
API_KEY = "YOUR_API_KEY"

if download_company_10k:
    process_companies(bucket_name, COMPANY_SYMBOLS, API_KEY)
else:
    upload_companies(bucket_name)



🚀 Starting upload of preloaded 10K documents from ./preloaded_10k
Uploading to S3: s3://my-data-source-fd95/10k-reports/2022/AMZN/AMZN_2022_2022-12-31_10K.html
Successfully uploaded: 10k-reports/2022/AMZN/AMZN_2022_2022-12-31_10K.html
Uploading to S3: s3://my-data-source-fd95/10k-reports/2024/AMZN/AMZN_2024_2024-12-31_10K.html
Successfully uploaded: 10k-reports/2024/AMZN/AMZN_2024_2024-12-31_10K.html
Uploading to S3: s3://my-data-source-fd95/10k-reports/2023/AMZN/AMZN_2023_2023-12-31_10K.html
Successfully uploaded: 10k-reports/2023/AMZN/AMZN_2023_2023-12-31_10K.html
Uploading to S3: s3://my-data-source-fd95/10k-reports/2021/AMZN/AMZN_2021_2021-12-31_10K.html
Successfully uploaded: 10k-reports/2021/AMZN/AMZN_2021_2021-12-31_10K.html
Uploading to S3: s3://my-data-source-fd95/10k-reports/2020/AMZN/AMZN_2020_2020-12-31_10K.html
Successfully uploaded: 10k-reports/2020/AMZN/AMZN_2020_2020-12-31_10K.html
✅ Upload complete! 5 companies, 5 files uploaded


### Creating the Bedrock Data Source

With our documents now stored in S3, we can create a data source in our Knowledge Base that points to this bucket. The data source configuration includes:

1. The S3 bucket location
2. Chunking strategy (how documents are split into manageable pieces)
3. Data deletion policy

The chunking strategy is particularly important because it affects how your documents are processed for retrieval.
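To illustrate what a chunking strategy does, here is a minimal sketch of fixed-size chunking with overlap. The function name `chunk_fixed_size` and the parameter names are hypothetical, whitespace splitting stands in for real tokenization, and the settings that `create_s3_data_source` actually applies are not shown in this notebook.

```python
from typing import List


def chunk_fixed_size(tokens: List[str], max_tokens: int, overlap_percentage: int) -> List[List[str]]:
    """Split tokens into windows of at most max_tokens, each sharing a tail with the previous one."""
    if max_tokens <= 0:
        raise ValueError("max_tokens must be positive")
    if not 0 <= overlap_percentage < 100:
        raise ValueError("overlap_percentage must be in [0, 100)")

    overlap = max_tokens * overlap_percentage // 100
    step = max_tokens - overlap  # always >= 1 because overlap < max_tokens

    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + max_tokens])
        if start + max_tokens >= len(tokens):
            break  # the last window already reaches the end of the document
    return chunks


# Whitespace "tokens" purely for illustration.
tokens = "a b c d e f g h i j".split()
chunks = chunk_fixed_size(tokens, max_tokens=4, overlap_percentage=50)
```

Larger chunks keep more context together but make each retrieved passage less focused; overlap reduces the chance that a relevant sentence is cut in half at a chunk boundary.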

In [6]:
datasource_id = create_s3_data_source(bedrock_agent_client, knowledge_base_id, bucket_name)

print(f"✅ Data source created with ID: {datasource_id}")

✅ Data source created with ID: MBUWCQ4DGE




## Sync the data source

Now that we have created the data source, we need to start the ingestion job that will:

1. Read documents from our S3 bucket
2. Chunk them according to our configuration
3. Generate embeddings for each chunk using the Titan Embed model
4. Store both the embeddings and text in our S3 Vector Store

This process may take several minutes depending on the size of your data.
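Since ingestion can run for a while, one option is to bound the wait with a timeout so the notebook cannot poll forever. The helper below is a sketch of that pattern, not part of the notebook's `utils`; the name `wait_for_terminal_status` and its parameters are hypothetical, and the status source is passed in as a function so the demo can run against a fake.

```python
import time
from typing import Callable, Iterable


def wait_for_terminal_status(
    get_status: Callable[[], str],
    in_progress: Iterable[str] = ("STARTING", "IN_PROGRESS"),
    poll_seconds: float = 30.0,
    timeout_seconds: float = 1800.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it leaves the in-progress states or the timeout expires."""
    in_progress = tuple(in_progress)
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status not in in_progress:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {status} after {timeout_seconds} seconds")
        sleep(poll_seconds)


# Demo against a fake status source; no AWS calls and no real sleeping.
fake_statuses = iter(["STARTING", "IN_PROGRESS", "COMPLETE"])
final_status = wait_for_terminal_status(lambda: next(fake_statuses), sleep=lambda seconds: None)
```

With the clients from the setup cell, `get_status` could be a lambda that calls `bedrock.get_ingestion_job(...)` and returns `response['ingestionJob']['status']`.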

In [7]:
# Start the ingestion job
response_ingestion = bedrock.start_ingestion_job(
    dataSourceId=datasource_id,
    description='First sync',
    knowledgeBaseId=knowledge_base_id
)

print(f"Started ingestion job: {response_ingestion['ingestionJob']['ingestionJobId']}")

# Monitor the ingestion job progress
status = "STARTING"
ingestion_job_id = response_ingestion['ingestionJob']['ingestionJobId']
start_time = time.time()

print("Monitoring ingestion job progress:")
print("-" * 50)

while status in ["STARTING", "IN_PROGRESS"]:
    # Get current status
    response = bedrock.get_ingestion_job(
        dataSourceId=datasource_id,
        knowledgeBaseId=knowledge_base_id,
        ingestionJobId=ingestion_job_id
    )
    
    status = response['ingestionJob']['status']
    elapsed_time = int(time.time() - start_time)
    
    # Get current statistics
    stats = response['ingestionJob']['statistics']
    
    # Print the current status and ingestion statistics
    print(f"Status: {status} (elapsed time: {elapsed_time}s)")
    print(f"Documents scanned: {stats['numberOfDocumentsScanned']}")
    print(f"Documents indexed: {stats['numberOfNewDocumentsIndexed']}")
    print(f"Documents failed: {stats['numberOfDocumentsFailed']}")
    
    if status in ["STARTING", "IN_PROGRESS"]:
        print("Checking again in 30 seconds...\n")
        time.sleep(30)
    else:
        break

print("-" * 50)
if status == "COMPLETE":
    print(f"✅ Ingestion job completed successfully")
else:
    print(f"⚠️ Ingestion job ended with status: {status}")
    
# Print final statistics
print(f"\nFinal statistics:")
print(f"  • Documents scanned: {stats['numberOfDocumentsScanned']}")
print(f"  • Documents indexed: {stats['numberOfNewDocumentsIndexed']}")
print(f"  • Documents failed: {stats['numberOfDocumentsFailed']}")
print(f"  • Total elapsed time: {elapsed_time} seconds")

Started ingestion job: PVZRQFGV0C
Monitoring ingestion job progress:
--------------------------------------------------
Status: STARTING (elapsed time: 0s)
Documents scanned: 0
Documents indexed: 0
Documents failed: 0
Checking again in 30 seconds...

Status: IN_PROGRESS (elapsed time: 30s)
Documents scanned: 5
Documents indexed: 0
Documents failed: 0
Checking again in 30 seconds...

Status: IN_PROGRESS (elapsed time: 60s)
Documents scanned: 5
Documents indexed: 0
Documents failed: 0
Checking again in 30 seconds...

Status: IN_PROGRESS (elapsed time: 90s)
Documents scanned: 5
Documents indexed: 0
Documents failed: 0
Checking again in 30 seconds...

Status: IN_PROGRESS (elapsed time: 120s)
Documents scanned: 5
Documents indexed: 0
Documents failed: 0
Checking again in 30 seconds...

Status: IN_PROGRESS (elapsed time: 151s)
Documents scanned: 5
Documents indexed: 0
Documents failed: 0
Checking again in 30 seconds...

Status: IN_PROGRESS (elapsed time: 181s)
Documents scanned: 5
Documents 

Now let's test the Knowledge Base before moving on to the next worksheet.
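No test cell appears at this point, so the following is only a sketch of how such a check could look, using the `retrieve` operation of the `bedrock-agent-runtime` client. The helper name `retrieve_chunks`, the result count, and the example question are assumptions for illustration.

```python
from typing import Any, Dict, List


def retrieve_chunks(runtime_client: Any, knowledge_base_id: str, query: str,
                    number_of_results: int = 3) -> List[Dict[str, Any]]:
    """Run a semantic query against the knowledge base and flatten the results."""
    response = runtime_client.retrieve(
        knowledgeBaseId=knowledge_base_id,
        retrievalQuery={"text": query},
        retrievalConfiguration={
            "vectorSearchConfiguration": {"numberOfResults": number_of_results}
        },
    )
    return [
        {
            "score": result.get("score"),
            "text": result.get("content", {}).get("text", ""),
            "source": result.get("location", {}).get("s3Location", {}).get("uri"),
        }
        for result in response.get("retrievalResults", [])
    ]


# Example usage with the objects created earlier in this notebook (requires AWS access):
# for chunk in retrieve_chunks(bedrock_agent_runtime_client, knowledge_base_id,
#                              "What risk factors does the company describe?"):
#     print(chunk["score"], chunk["source"])
#     print(chunk["text"][:300], "\n")
```

If the call returns no results, check that the ingestion job finished with status `COMPLETE` and that the documents-failed count is zero.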

### Clean Up Resources

In [9]:
print("Starting cleanup process...\n")

#Uncomment line below and re-run to clean resources. 
#clean_resources = True

if clean_resources:
    # Delete Knowledge Base
    try:
        print(f"Deleting Knowledge Base: {knowledge_base_id}")
        bedrock.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)
        print("✅ Knowledge Base deleted successfully")
    except Exception as e:
        print(f"❌ Error deleting Knowledge Base: {str(e)}")

    # Delete the S3 vector bucket policy
    print(f"\nDeleting policy for S3 Vector Store: {vector_store_name}")
    try:
        s3vectors.delete_vector_bucket_policy(vectorBucketName=vector_store_name)
        print("✅ S3 Vector Store policy deleted successfully")
    except Exception as e:
        print(f"❌ Error deleting Vector Store policy: {str(e)}")

    # Delete the S3 vector index (must be removed before its bucket)
    print(f"\nDeleting S3 Vector Index: {vector_index_name}")
    try:
        s3vectors.delete_index(indexArn=vector_index_arn)
        print("✅ S3 Vector Store Index deleted successfully")
    except Exception as e:
        print(f"❌ Error deleting Vector Store Index: {str(e)}")

    # Delete the S3 vector bucket itself
    print(f"\nDeleting S3 Vector Store: {vector_store_name}")
    try:
        s3vectors.delete_vector_bucket(vectorBucketName=vector_store_name)
        print("✅ S3 Vector Store deleted successfully")
    except Exception as e:
        print(f"❌ Error deleting Vector Store: {str(e)}")

    # Empty and delete the data source S3 bucket (a bucket must be empty before deletion)
    print(f"\nEmptying and deleting S3 Bucket: {bucket_name}")
    try:
        data_bucket = boto3.resource('s3').Bucket(bucket_name)
        data_bucket.objects.all().delete()
        data_bucket.delete()
        print("✅ S3 Bucket emptied and deleted successfully")
    except Exception as e:
        print(f"❌ Error emptying and deleting S3 Bucket: {str(e)}")


    # Delete IAM Role and detach policies
    print(f"\nDeleting IAM Role: {roleName}")
    try:
        # List and detach all attached policies
        attached_policies = iam_client.list_attached_role_policies(RoleName=roleName).get('AttachedPolicies', [])
        for policy in attached_policies:
            print(f"Detaching policy: {policy['PolicyArn']}")
            iam_client.detach_role_policy(RoleName=roleName, PolicyArn=policy['PolicyArn'])
            iam_client.delete_policy(PolicyArn=policy['PolicyArn'])
        
        # Delete the role
        iam_client.delete_role(RoleName=roleName)
        print("✅ IAM Role deleted successfully")
    except Exception as e:
        print(f"❌ Error deleting IAM Role: {str(e)}")


    print("\n✅ Cleanup process finished. Review any errors reported above.")
else:
    print("Skipping cleanup process.")

Starting cleanup process...

Skipping cleanup process.
