# Observability in Production

In this notebook, we demonstrate how to implement observability for the Amazon Bedrock Converse API in production using a custom observability solution. We use the `BedrockLogs` class from the `observability` module to track and log API calls, responses, and user feedback.

### Prerequisite
This notebook relies on backend resources defined in the provided CloudFormation template. These resources collect data on user requests along with your custom metadata: latency, time to first token, tags, model responses, citations, and any other identifiers you choose to add (e.g., user_id/customer_id). Once the stack is deployed, you can test whether your observability architecture works as expected and measure the latency this additional component adds to your application.

In [1]:
%pip install ruamel.yaml

Note: you may need to restart the kernel to use updated packages.


In [2]:
# Deploy prerequisites 
# IAM role, S3 bucket, Firehose

import boto3
import datetime

from ruamel.yaml import YAML
from io import StringIO

yaml = YAML()
with open('../cf/prod-observability.yaml', 'r') as file:
    yaml_data = yaml.load(file)

cloudformation_client = boto3.client('cloudformation')

stream = StringIO()
yaml.dump(yaml_data, stream)

template_body = stream.getvalue()

current_time = datetime.datetime.now()
date_string = current_time.strftime("%Y%m%d%H%M%S")
stack_name = 'observability-stack' + date_string

response = cloudformation_client.create_stack(
    StackName=stack_name,
    TemplateBody=template_body,
    Parameters=[
        {
            'ParameterKey': 'DateString',
            'ParameterValue': date_string
        }
    ],
    Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM']
)

cloudformation_client.get_waiter('stack_create_complete').wait(StackName=stack_name)

response = cloudformation_client.describe_stacks(StackName=stack_name)
output_dict = {}

for output in response['Stacks'][0]['Outputs']:
    output_dict[output['OutputKey']] = output['OutputValue']

output_dict_select = {"FIREHOSE_NAME": output_dict["FirehoseDeliveryStreamName"], 
                      "CRAWLER_NAME": output_dict["GlueCrawlerName"]}

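The cell above assumes the stack exposes both outputs it reads. As a minimal sketch, the output handling could be wrapped in a helper that reports missing outputs by name. The function name `stack_outputs` and the sample values are hypothetical; only the response shape (`Stacks[0]['Outputs']` with `OutputKey`/`OutputValue`) comes from the code above.

```python
from typing import Any, Dict, Iterable


def stack_outputs(describe_response: Dict[str, Any],
                  required: Iterable[str] = ()) -> Dict[str, str]:
    """Flatten the Outputs of the first stack in a describe_stacks response."""
    stacks = describe_response.get("Stacks", [])
    if not stacks:
        raise ValueError("describe_stacks returned no stacks")
    # 'Outputs' is absent when the template defines no outputs
    outputs = {o["OutputKey"]: o["OutputValue"] for o in stacks[0].get("Outputs", [])}
    missing = [key for key in required if key not in outputs]
    if missing:
        raise KeyError(f"Stack is missing expected outputs: {', '.join(missing)}")
    return outputs


# Illustrative response with made-up values (not real resource names)
sample = {
    "Stacks": [{
        "Outputs": [
            {"OutputKey": "FirehoseDeliveryStreamName", "OutputValue": "example-firehose"},
            {"OutputKey": "GlueCrawlerName", "OutputValue": "example-crawler"},
        ]
    }]
}
print(stack_outputs(sample, required=["FirehoseDeliveryStreamName", "GlueCrawlerName"]))
```

In the notebook, the `response` returned by `describe_stacks` would take the place of `sample`.
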
### Setup and Imports

First, let's import the necessary libraries and set up our environment.

In [3]:
import boto3
import json
import time
from datetime import datetime
import pytz
import string
import random
from uuid import uuid4

# Custom observability module
from observability import BedrockLogs

from dotenv import load_dotenv, find_dotenv, set_key
import os


# Load environment variables stored in a local file
local_env_filename = 'llm-system-eval.env'
load_dotenv(find_dotenv(local_env_filename), override=True)

# load_dotenv has already populated os.environ; a missing variable raises KeyError here
REGION = os.environ['REGION']
GUARDRAIL_ID = os.environ['GUARDRAIL_ID']
GUARDRAIL_VERSION = os.environ['GUARDRAIL_VERSION']
CUSTOM_TAG = os.environ['CUSTOM_TAG']
EXPERIMENT_ID = os.environ['EXPERIMENT_ID']

FIREHOSE_NAME = output_dict["FirehoseDeliveryStreamName"]
CRAWLER_NAME = output_dict["GlueCrawlerName"]

# Update environment variables with the resource references we created in the prerequisites
os.environ['FIREHOSE_NAME'] = FIREHOSE_NAME
os.environ['CRAWLER_NAME'] = CRAWLER_NAME

set_key(local_env_filename, 'FIREHOSE_NAME', FIREHOSE_NAME)
set_key(local_env_filename, 'CRAWLER_NAME', CRAWLER_NAME)

(True,
 'CRAWLER_NAME',
 'observability-026459568683-glue-crawler-20240828225635')
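
The setup cell depends on several variables being present in the `.env` file. A small sketch of a check that reports every missing variable at once is shown below. The helper name `require_env` is hypothetical, and the example mapping uses made-up values so the snippet runs without the real file.

```python
import os
from typing import Dict, Iterable, Mapping, Optional


def require_env(names: Iterable[str],
                env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return the requested variables, or raise listing every missing or empty one."""
    env = os.environ if env is None else env
    names = list(names)
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in names}


# Made-up values for illustration; omit `env` to read from os.environ instead
example_env = {"REGION": "us-east-1", "GUARDRAIL_ID": "example-id", "GUARDRAIL_VERSION": "1"}
settings = require_env(["REGION", "GUARDRAIL_ID", "GUARDRAIL_VERSION"], env=example_env)
print(settings["REGION"])  # us-east-1
```
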

In [1]:
# Observability and Evaluation Custom Solution for Amazon Bedrock Applications
import pytz
import json
import time
import boto3
from uuid import uuid4
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional

class BedrockLogs:
    VALID_FEATURE_NAMES = ["None", "Agent", "KB", "InvokeModel"]

    def __init__(self, delivery_stream_name: str = None, 
                 experiment_id: str = None, 
                 default_call_type: str = 'LLM', 
                 feature_name: str = None, 
                 feedback_variables: bool = False
                ):
        self.delivery_stream_name = delivery_stream_name
        self.experiment_id = experiment_id
        self.default_call_type = default_call_type
        self.feedback_variables = feedback_variables

        if feature_name is not None:
            if feature_name not in BedrockLogs.VALID_FEATURE_NAMES:
                raise ValueError(f"Invalid feature_name '{feature_name}'. Valid values are: {', '.join(BedrockLogs.VALID_FEATURE_NAMES)}")
        self.feature_name = feature_name
        self.step_counter = 0

        if self.delivery_stream_name is None:
            raise ValueError("delivery_stream_name must be provided, or set to 'local' for local testing (e.g., delivery_stream_name='local').")

        if self.delivery_stream_name == 'local':
            self.firehose_client = None
        else:
            self.firehose_client = boto3.client('firehose')

    @staticmethod
    def find_keys(dictionary, key, path=None):
        """
        Recursive function to find all keys in a nested dictionary and their paths.

        Args:
            dictionary (dict): The dictionary to search.
            key (str): The key to search for.
            path (list, optional): The path of keys to the current dictionary. Defaults to None.

        Returns:
            list: A list of tuples containing the key's path and value.
        """
        # Avoid a mutable default argument; start from an empty path on the first call
        if path is None:
            path = []
        results = []

        if isinstance(dictionary, dict):
            for k, v in dictionary.items():
                new_path = path + [k]
                if k == key:
                    results.append((new_path, v))
                else:
                    results.extend(BedrockLogs.find_keys(v, key, new_path))
        elif isinstance(dictionary, list):
            for i, item in enumerate(dictionary):
                new_path = path + [i]
                results.extend(BedrockLogs.find_keys(item, key, new_path))

        return results

    def extract_session_id(self, log_data: Dict[str, Any]) -> str:
        """
        Extracts the session ID from the log data. If the session ID is not available,
        it generates a new UUID for the run ID.

        Args:
            log_data (Dict[str, Any]): The log data dictionary.

        Returns:
            str: The session ID or a newly generated UUID if the session ID is not available.
        """
        if self.feature_name == "Agent":
            session_id_paths = self.find_keys(log_data, 'x-amz-bedrock-agent-session-id')
        else:
            session_id_paths = self.find_keys(log_data, 'sessionId')

        if session_id_paths:
            path, session_id = session_id_paths[0]
            return session_id
        else:
            return str(uuid4())

    def handle_agent_feature(self, output_data, request_start_time):
        """
        Handles the logic for the 'Agent' feature, including step counting and latency calculation.

        Args:
            output_data (Any): The output data from the function call.
            request_start_time (float): The start time of the request.

        Returns:
            Any: The updated output data with step numbers and latency information.
        """
        self.session_id = None
        prev_trace_time = None
        for data in output_data:
            if isinstance(data, dict) and 'trace' in data:
                trace = data['trace']
                if 'start_trace_time' in trace:
                    # Check if 'start_trace_time' is defined correctly
                    if not isinstance(trace['start_trace_time'], float):
                        raise ValueError("The key 'start_trace_time' should be present and should be a time.time() object.")

                    # Calculate the latency between traces
                    if prev_trace_time is None:
                        trace['latency'] = trace['start_trace_time'] - request_start_time
                    else:
                        trace['latency'] = trace['start_trace_time'] - prev_trace_time

                    prev_trace_time = trace['start_trace_time']
                    trace['step_number'] = self.step_counter
                    self.step_counter += 1
                    data['trace'] = trace  # Update the 'trace' dictionary in the original data

            elif isinstance(data, list):
                for item in data:
                    if isinstance(item, dict) and 'start_trace_time' in item:
                        # Check if 'start_trace_time' is defined correctly
                        if not isinstance(item['start_trace_time'], float):
                            raise ValueError("The key 'start_trace_time' should be present and should be a time.time() object.")

                        # Calculate the latency between traces
                        if prev_trace_time is None:
                            item['latency'] = item['start_trace_time'] - request_start_time
                        else:
                            item['latency'] = item['start_trace_time'] - prev_trace_time

                        prev_trace_time = item['start_trace_time']
                        item['step_number'] = self.step_counter
                        self.step_counter += 1

                    elif isinstance(item, dict) and 'trace' in item:
                        trace = item['trace']
                        if 'start_trace_time' in trace:
                            # Check if 'start_trace_time' is defined correctly
                            if not isinstance(trace['start_trace_time'], float):
                                raise ValueError("The key 'start_trace_time' should be present and should be a time.time() object.")

                            # Calculate the latency between traces
                            if prev_trace_time is None:
                                trace['latency'] = trace['start_trace_time'] - request_start_time
                            else:
                                trace['latency'] = trace['start_trace_time'] - prev_trace_time

                            prev_trace_time = trace['start_trace_time']
                            trace['step_number'] = self.step_counter
                            self.step_counter += 1
                            item['trace'] = trace  # Update the 'trace' dictionary in the original item

        return output_data

    def watch(self, capture_input: bool = True, capture_output: bool = True, call_type: Optional[str] = None):
        def wrapper(func):
            def inner(*args, **kwargs):
                # For Latency Calculation:
                self.request_start_time = time.time()

                # Get the function name
                function_name = func.__name__

                # Capture input if requested
                input_data = args if capture_input else None
                input_log = None
                if input_data:
                    input_log = input_data[0]
                    
                # Generate observation_id
                observation_id = str(uuid4())
                obs_timestamp = datetime.now(timezone.utc).isoformat()

                # Get the start time
                start_time = time.time()

                # Calls the function to be executed
                result = func(*args, **kwargs)
                
                # Capture output if requested
                output_data = result if capture_output else None

                # Get the end time
                end_time = time.time()

                # Calculate the duration
                duration = end_time - start_time
                
                # Begin Logging Time:
                logging_start_time = time.time()

                # Handle the 'Agent' feature case
                if self.feature_name == "Agent":
                    if output_data is not None:
                        output_data = self.handle_agent_feature(output_data, self.request_start_time)
                        run_id = self.extract_session_id(output_data[0])
                    else:
                        run_id = self.extract_session_id(input_log)
                else:
                    # Extract the session ID from the log or generate a new one
                    run_id = self.extract_session_id(input_log)

                # Prepare the metadata
                metadata = {
                    'experiment_id': self.experiment_id,
                    'run_id': run_id,
                    'observation_id': observation_id,
                    'obs_timestamp': obs_timestamp,
                    'start_time': datetime.fromtimestamp(start_time, tz=pytz.utc).isoformat(),
                    'end_time': datetime.fromtimestamp(end_time, tz=pytz.utc).isoformat(),
                    'duration': duration,
                    'input_log': input_log,
                    'output_log': output_data,
                    'call_type': call_type or self.default_call_type,
                    'feature_name': self.feature_name,
                    'feedback_enabled': self.feedback_variables
                }

                # Update the metadata with additional_metadata if provided
                additional_metadata = kwargs.get('additional_metadata', {})
                if additional_metadata:
                    metadata.update(additional_metadata)

                input_data = kwargs.get('user_prompt', {})
                if input_data:
                    metadata.update(input_data)
                    
                # Get the end time
                logging_end_time = time.time()

                # Calculate the duration
                logging_duration = logging_end_time - logging_start_time
                metadata['logging_duration'] = logging_duration

                # Send the metadata to Amazon Kinesis Data Firehose or return it locally for testing:
                if self.delivery_stream_name == 'local':
                    if self.feedback_variables:
                        print("Logs in local mode-with feedback:")
                        return result, metadata, run_id, observation_id
                    else:
                        print("Logs in local mode-without feedback:")
                        return result, metadata
                # log to firehose
                else:
                    firehose_response = self.firehose_client.put_record(
                        DeliveryStreamName=self.delivery_stream_name,
                        Record={
                            'Data': json.dumps(metadata)
                        }
                    )
                    if self.feedback_variables:
                        print("Logs in S3-with feedback:")
                        return result, run_id, observation_id
                    else:
                        print("Logs in S3-without feedback:")
                        return result

            return inner
        return wrapper
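
One consequence of the `watch` decorator is that a decorated function no longer returns just its own result: in local mode it returns a tuple that includes the metadata. The stripped-down sketch below illustrates only that pattern. `watch_local` is a hypothetical stand-in, not part of the `observability` module, and it omits Firehose delivery, session IDs, and feedback variables.

```python
import time
from functools import wraps
from uuid import uuid4


def watch_local(call_type="LLM"):
    """Simplified stand-in for a local-mode logging decorator."""
    def wrapper(func):
        @wraps(func)  # keep the wrapped function's name and docstring
        def inner(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            metadata = {
                "observation_id": str(uuid4()),
                "duration": time.time() - start,
                "input_log": args[0] if args else None,
                "output_log": result,
                "call_type": call_type,
            }
            # The caller receives a tuple, not the bare result
            return result, metadata
        return inner
    return wrapper


@watch_local(call_type="demo")
def echo(payload):
    return payload["question"].upper()


result, metadata = echo({"question": "hello"})
print(result)                 # HELLO
print(metadata["call_type"])  # demo
```
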


In [4]:
# Initialize BedrockLogs in Local mode with feedback variables
bedrock_logs = BedrockLogs(delivery_stream_name='local', feedback_variables=True)
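
In local mode the decorator returns the metadata instead of sending it. With a real delivery stream name, the class above serializes the metadata with `json.dumps` and passes it to `put_record`. The sketch below shows one possible variation of that serialization step, with two assumptions that are not in the original code: a trailing newline so that records concatenated by Firehose stay line-delimited, and `default=str` so values such as `datetime` objects do not raise during serialization. The helper name `to_firehose_record` is hypothetical.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def to_firehose_record(metadata: Dict[str, Any]) -> Dict[str, bytes]:
    """Build the Record argument for a Firehose put_record call."""
    # Assumption: newline-delimited JSON; default=str stringifies non-JSON types
    payload = json.dumps(metadata, default=str) + "\n"
    return {"Data": payload.encode("utf-8")}


# Made-up metadata for illustration
record = to_firehose_record({
    "call_type": "Converse-API",
    "duration": 1.25,
    "logged_at": datetime(2024, 1, 1, tzinfo=timezone.utc),
})
print(record["Data"])

# Usage with a real client would look like:
# firehose_client.put_record(DeliveryStreamName=FIREHOSE_NAME, Record=record)
```
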

In [5]:
# Create AWS clients
boto3_session = boto3.session.Session()
bedrock_runtime_client = boto3.client('bedrock-runtime')

In [6]:
# Helper function to generate a random session ID
def generate_web_session_id(length=16):
    characters = string.ascii_letters + string.digits
    return ''.join(random.choices(characters, k=length))

### Main Function with Observability

Let's create our main function that uses the Converse API and is decorated with our observability logger. 

In [7]:
@bedrock_logs.watch(call_type='Converse-API')
def converse_with_model(application_metadata):
    model_id = application_metadata['model_arn']
    
    system_prompts = [{"text": "You are a helpful AI Assistant for Amazon OpenSearch's documentation."}]
    
    messages = [
        {
            "role": "user",
            "content": [{"text": application_metadata['question']}]
        }
    ]
    
    inference_config = {
        "temperature": 0,
        "maxTokens": 500,
        "topP": 0.7
    }
    
    guardrail_config = {
        "guardrailIdentifier": GUARDRAIL_ID,
        "guardrailVersion": GUARDRAIL_VERSION,
        "trace": "enabled"
    }
    
    response = bedrock_runtime_client.converse(
        modelId=model_id,
        messages=messages,
        system=system_prompts,
        inferenceConfig=inference_config,
        guardrailConfig=guardrail_config
    )
    
    application_metadata['model_response'] = response
    return response['output']['message']['content'][0]['text']
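
The function above stores the full Converse response under `application_metadata['model_response']`, so it ends up in the logged input. If only a few fields matter for dashboards, they could be pulled out as in the sketch below. The helper name `summarize_converse_response` is hypothetical, and the sample response uses made-up values that follow the Converse response shape (`usage`, `metrics`, `stopReason`).

```python
from typing import Any, Dict


def summarize_converse_response(response: Dict[str, Any]) -> Dict[str, Any]:
    """Extract token usage, latency, and stop reason from a Converse response."""
    usage = response.get("usage", {})
    stop_reason = response.get("stopReason")
    return {
        "stop_reason": stop_reason,
        "input_tokens": usage.get("inputTokens"),
        "output_tokens": usage.get("outputTokens"),
        "total_tokens": usage.get("totalTokens"),
        "latency_ms": response.get("metrics", {}).get("latencyMs"),
        # True when a guardrail stopped or replaced the model output
        "guardrail_intervened": stop_reason == "guardrail_intervened",
    }


# Made-up response for illustration
sample_response = {
    "output": {"message": {"role": "assistant", "content": [{"text": "example"}]}},
    "stopReason": "end_turn",
    "usage": {"inputTokens": 40, "outputTokens": 120, "totalTokens": 160},
    "metrics": {"latencyMs": 2100},
}
print(summarize_converse_response(sample_response))
```
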

### Running the Conversation

Now, let's run a conversation with our model:

In [10]:
question = "I'm using version 2.1 of open search and trying to use zstd compression. Why isn't it working?"

In [13]:
application_metadata = {
    'webSessionId': generate_web_session_id(),
    'userID': 'User-1',
    'customTags': CUSTOM_TAG,
    'request_time': datetime.now(pytz.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
    'model_arn': 'anthropic.claude-3-sonnet-20240229-v1:0', #'anthropic.claude-3-haiku-20240307-v1:0',
    'question': question
}

response, log, run_id, observation_id = converse_with_model(application_metadata)

print(f"Model response: {response}")
print(f"run_id: {run_id}")
print(f"observation_id: {observation_id}")

Logs in local mode-with feedback:
Model response: OpenSearch 2.1 does not support zstd compression out of the box. The zstd compression codec was introduced in OpenSearch 2.2.

In OpenSearch versions prior to 2.2, the available compression codecs are:

- deflate
- lz4 
- none (no compression)

To use zstd compression, you need to upgrade to OpenSearch 2.2 or later. The zstd codec provides better compression ratios compared to deflate while maintaining high decompression speeds.

After upgrading to 2.2+, you can configure zstd compression for specific indices by setting the "codec" parameter when creating an index:

```
PUT my_index
{
  "settings": {
    "index.codec": "zstd" 
  }
}
```

Or update an existing index's codec:

```
PUT my_index/_settings
{
  "index.codec": "zstd"
}
```

The zstd compression level can also be configured from 1 (fastest) to 22 (maximum compression) using the "index.codec.zstd.level" setting.
run_id: 267f8eea-d221-4257-9135-c6fb8e309ddd
observation_id: 5f2630
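
The returned `log` carries both `duration` (time spent inside the wrapped function) and `logging_duration` (time spent preparing the metadata), which gives a rough view of the overhead added by the logger. Note that in the class above `logging_duration` is computed before the Firehose `put_record` call, so it does not include delivery time. The sketch below uses a hypothetical helper name and made-up numbers.

```python
from typing import Any, Dict, Optional


def logging_overhead(log: Dict[str, Any]) -> Dict[str, Optional[float]]:
    """Compare time spent preparing the log record with the wrapped call's duration."""
    duration = log["duration"]                  # seconds inside the wrapped function
    logging_duration = log["logging_duration"]  # seconds building the metadata
    overhead_pct = 100 * logging_duration / duration if duration > 0 else None
    return {
        "call_ms": duration * 1000,
        "logging_ms": logging_duration * 1000,
        "overhead_pct": overhead_pct,
    }


# Made-up values for illustration; in the notebook, pass the `log` returned above
sample_log = {"duration": 2.0, "logging_duration": 0.00005}
print(logging_overhead(sample_log))
```
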

### Collecting Feedback

We'll define two functions for collecting feedback at the observation and session levels:

In [14]:
# observation level feedback

@bedrock_logs.watch(call_type='observation-feedback')
def observation_level_feedback(feedback):
    pass



user_feedback = 'Thumbs-up'
observation_feedback_from_front_end = {
    'user_id': 'User-1',
    'f_run_id': run_id,
    'f_observation_id': observation_id,
    'actual_feedback': user_feedback
}
observation_level_feedback(observation_feedback_from_front_end)

Logs in local mode-with feedback:


(None,
 {'experiment_id': None,
  'run_id': '05f1dd74-96be-4b99-874c-21fc18aaf146',
  'observation_id': '6c34ae23-5cf1-46bd-a59e-56524bcf069b',
  'obs_timestamp': '2024-08-29T03:09:50.018834+00:00',
  'start_time': '2024-08-29T03:09:50.018843+00:00',
  'end_time': '2024-08-29T03:09:50.018844+00:00',
  'duration': 9.5367431640625e-07,
  'input_log': {'user_id': 'User-1',
   'f_run_id': '267f8eea-d221-4257-9135-c6fb8e309ddd',
   'f_observation_id': '5f263047-7b51-4428-9e72-13a22553b2cc',
   'actual_feedback': 'Thumbs-up'},
  'output_log': None,
  'call_type': 'observation-feedback',
  'feature_name': None,
  'feedback_enabled': True,
  'logging_duration': 2.4080276489257812e-05},
 '05f1dd74-96be-4b99-874c-21fc18aaf146',
 '6c34ae23-5cf1-46bd-a59e-56524bcf069b')

In [15]:
# session level feedback

@bedrock_logs.watch(call_type='session-feedback')
def session_level_feedback(feedback):
    pass

user_feedback = 'Amazing - this is fast and an awesome way to help the customers!'
session_feedback_from_front_end = {
    'user_id': 'User-1',
    'f_run_id': run_id,
    'actual_feedback': user_feedback
}
session_level_feedback(session_feedback_from_front_end)

Logs in local mode-with feedback:


(None,
 {'experiment_id': None,
  'run_id': '06196278-9196-4cea-bf0b-9ffeb0af92b5',
  'observation_id': '1819d959-0f34-4006-a57e-1476d61e5ad0',
  'obs_timestamp': '2024-08-29T03:09:53.727146+00:00',
  'start_time': '2024-08-29T03:09:53.727151+00:00',
  'end_time': '2024-08-29T03:09:53.727152+00:00',
  'duration': 9.5367431640625e-07,
  'input_log': {'user_id': 'User-1',
   'f_run_id': '267f8eea-d221-4257-9135-c6fb8e309ddd',
   'actual_feedback': 'Amazing - this is fast and an awesome way to help the customers!'},
  'output_log': None,
  'call_type': 'session-feedback',
  'feature_name': None,
  'feedback_enabled': True,
  'logging_duration': 2.384185791015625e-05},
 '06196278-9196-4cea-bf0b-9ffeb0af92b5',
 '1819d959-0f34-4006-a57e-1476d61e5ad0')
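
In the outputs above, each feedback record receives its own `run_id` and `observation_id`; the link back to the original call lives in `input_log` as `f_run_id` and `f_observation_id`. A minimal sketch of grouping observation-level feedback by the observation it refers to is shown below. The helper name and the sample records (with shortened, made-up IDs) are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List


def feedback_by_observation(records: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Group observation-level feedback under the observation it refers to."""
    grouped = defaultdict(list)
    for record in records:
        if record.get("call_type") != "observation-feedback":
            continue
        payload = record.get("input_log") or {}
        target = payload.get("f_observation_id")
        if target is not None:
            grouped[target].append(payload.get("actual_feedback"))
    return dict(grouped)


# Made-up records mirroring the structure of the logs shown above
records = [
    {"call_type": "Converse-API", "observation_id": "obs-1",
     "input_log": {"question": "example"}},
    {"call_type": "observation-feedback", "observation_id": "obs-2",
     "input_log": {"f_observation_id": "obs-1", "actual_feedback": "Thumbs-up"}},
    {"call_type": "session-feedback", "observation_id": "obs-3",
     "input_log": {"f_run_id": "run-1", "actual_feedback": "Great"}},
]
print(feedback_by_observation(records))  # {'obs-1': ['Thumbs-up']}
```
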

# Takeaways

This notebook demonstrated how to use the custom `observability` module with the Amazon Bedrock Converse API. We've shown how to:

1. Set up the observability environment
2. Initialize the `BedrockLogs` class
3. Create a conversation function with observability
4. Run a conversation and collect the response
5. Implement feedback collection at both the observation and session levels

The collected data can be used for analysis, troubleshooting, and improving your application's performance and user experience.

# Sources
* https://github.com/aws-samples/amazon-bedrock-samples/tree/main/evaluation-and-observability