## Notebook: run_tests.ipynb

### Run a set of test cases
__Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.__
__SPDX-License-Identifier: MIT-0__

Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

In [1]:
import json
import logging
import os
import boto3
import uuid
import csv
import re
import sys
import io
import time
import datetime
import multiprocessing as mp
import pandas as pd
from tqdm.auto import tqdm

In [2]:
# Input parameters - update as needed.
# Any parameter whose value is still None after the assignments below is looked up in
# the environment under its upper-cased name (for example 'bot_id' -> BOT_ID).
parameters = {
  'bot_id':           {'value': None, 'message': 'Please specify a BOT_ID parameter for your Lex bot'},
  'bot_alias_id':     {'value': None, 'message': 'Please specify a BOT_ALIAS_ID parameter for your Lex bot'},
  'locale_id':        {'value': None, 'message': 'Please specify a LOCALE_ID parameter for your Lex bot'},
  'input_file':       {'value': None, 'message': 'Please specify an INPUT_FILE parameter, e.g. test_cases.csv'},
  'output_file':      {'value': None, 'message': 'Please specify an OUTPUT_FILE parameter, e.g. test_results.csv'},
  'test_description': {'value': '',   'message': 'Optional TEST_DESCRIPTION parameter'},
  'max_threads':      {'value': None, 'message': 'Please specify a MAX_THREADS parameter'}
}

parameters['bot_id']['value'] = 'AXFKEVM7XZ'
parameters['bot_alias_id']['value'] = 'OQYWQ8UAUV'
parameters['locale_id']['value'] = 'en_US'
parameters['input_file']['value'] = 'test-runs/test-cases-claude-haiku-2024-09-02.xlsx'
parameters['output_file']['value'] = 'test-runs/test-results-claude-haiku-2024-09-02.xlsx'
parameters['test_description']['value'] = '50 test questions using Claude Haiku'
parameters['max_threads']['value'] = '8'

# NOTE(review): presumably read by the bedrock_helpers module imported further down - confirm
os.environ['KB_ALFA'] = 'EH6MNGJYKT'
os.environ['S3_BUCKET_ALFA'] = 'contact-center-kb-010928211701'

lex_client = boto3.client('lexv2-runtime')
# True: send each test step to the Lex bot; False: call the RAG solution directly
USE_LEX = True

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
### uncomment below for detailed logging output
### logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))

logger.info('<<bedrock_helpers>> boto3 version={}'.format(boto3.__version__))

# fall back to environment variables for parameters that were not set above
for parameter in parameters.keys():
    if parameters[parameter]['value'] is None:
        parameter_value = os.environ.get(parameter.upper(), None)
        if parameter_value is None:
            logger.error(parameters[parameter]['message'])
        else:
            parameters[parameter]['value'] = parameter_value

if None in {v['value'] for k, v in parameters.items()}:
    logger.error('Missing some input parameters; exiting.')
    exit(1)

max_threads = parameters['max_threads'].get('value', None)
MAX_THREADS = 1 if max_threads is None else int(max_threads)
MULTIPLE_THREADS = MAX_THREADS > 1

# set a unique identifier for this test run (stored as Lex session attribute);
# the label says "UTC", so take the time in UTC rather than in the local time zone
test_run = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d at %H:%M:%S UTC')
test_description = parameters.get('test_description',{}).get('value','')
if len(test_description) > 0:
    test_run += f' ({test_description})'

# set request attribute for Lex test runs (stored as Lex request attribute)
channel_attribute = 'SageMaker Notebook'

In [3]:
# Optionally set a limit (int) on the number of questions to evaluate (for test development).
# When set, only the first `question_limit` rows of the input file are kept.
question_limit = None      # examples: None; 10

# Optionally set a limit (list[int]) to specific questions to evaluate (for test development).
# When set, only rows whose 'Test Case' value is in this list are kept.
target_test_cases = None   # examples: None; [14, 17]

In [4]:
def read_evaluation_file(filepath):
    """Load the test cases from a spreadsheet or CSV file into a DataFrame.

    The reader is chosen from the file extension, compared case-insensitively:
    ``.xlsx`` / ``.xls`` use ``pd.read_excel`` and ``.csv`` uses ``pd.read_csv``.

    Args:
        filepath: path of the file to read (a string or path-like object).

    Returns:
        pd.DataFrame: the file contents as returned by pandas.

    Raises:
        ValueError: if the extension is not one of the supported formats.
    """
    # normalise once so that e.g. 'CASES.XLSX' is accepted as well
    normalized = str(filepath).lower()

    if normalized.endswith(('.xlsx', '.xls')):
        return pd.read_excel(filepath)
    if normalized.endswith('.csv'):
        return pd.read_csv(filepath)

    raise ValueError(f'Unsupported file format: {filepath!r} (expected .xlsx, .xls or .csv)')

In [5]:
def write_evaluation_result(df, filepath):
    """Write the results DataFrame to a spreadsheet or CSV file.

    The writer is chosen from the file extension, compared case-insensitively:
    ``.xlsx`` / ``.xls`` use ``DataFrame.to_excel`` and ``.csv`` uses
    ``DataFrame.to_csv``. Both are called with pandas defaults, so the
    DataFrame index is written as the first column.

    Args:
        df: the DataFrame to store.
        filepath: destination path (a string or path-like object).

    Raises:
        ValueError: if the extension is not one of the supported formats.
    """
    # normalise once so that e.g. 'RESULTS.XLSX' is accepted as well
    normalized = str(filepath).lower()

    if normalized.endswith(('.xlsx', '.xls')):
        df.to_excel(filepath)
    elif normalized.endswith('.csv'):
        df.to_csv(filepath)
    else:
        raise ValueError(f'Unsupported file format: {filepath!r} (expected .xlsx, .xls or .csv)')

In [6]:
# load the test steps (one row per step) from the configured input file
qa_pairs = read_evaluation_file(parameters['input_file']['value'])

In [7]:
# apply the optional development filters: first keep only the leading rows ...
if question_limit is not None:
    qa_pairs = qa_pairs.iloc[:question_limit]

# ... then keep only the rows that belong to the requested test cases
if target_test_cases is not None:
    qa_pairs = qa_pairs[qa_pairs['Test Case'].isin(target_test_cases)]

qa_pairs.head()

Unnamed: 0,Test Case,Step,Utterance,Session Attributes,Ground Truth Answer,Expected Intent,Expected State
0,1,1,What is the mission or philosophy of Example C...,"knowledgeBase=Alfa,ragLLM=Claude V3 Haiku,eval...","At Example Corp Hospitality Group, our mission...",.*,Fulfilled
1,2,1,How many hotel brands does Example Corp Hospit...,"knowledgeBase=Alfa,ragLLM=Claude V3 Haiku,eval...",Example Corp Hospitality Group has five distin...,.*,Fulfilled
2,3,1,What type of accommodations does Example Corp ...,"brand=/seaside-resorts,knowledgeBase=Alfa,ragL...",Example Corp Seaside Resorts offers premium ac...,.*,Fulfilled
3,4,1,Where are some of the exclusive locations for ...,"brand=/seaside-resorts,knowledgeBase=Alfa,ragL...",Example Corp Seaside Resorts are located in Ma...,.*,Fulfilled
4,5,1,What types of room options are available at Ex...,"brand=/seaside-resorts,knowledgeBase=Alfa,ragL...",Example Corp Seaside Resorts offers a range of...,.*,Fulfilled


In [8]:
# add the (initially empty) output columns that the test run fills in;
# the list order is the column order in the results file
output_columns = [
    # answer and Lex outcome
    'RAG LLM', 'Response', 'Actual Intent', 'Actual State',
    # latency of the retrieval and generation steps
    'Retrieval Latency', 'LLM Latency',
    # evaluation against the ground truth answer
    'Test Result', 'Test Explanation', 'Test Latency', 'Test LLM',
    # hallucination detection
    'Hallucination', 'Hallucination Explanation', 'Detection Latency', 'Detection LLM',
]

for column_name in output_columns:
    qa_pairs[column_name] = [''] * len(qa_pairs)

qa_pairs.head()

Unnamed: 0,Test Case,Step,Utterance,Session Attributes,Ground Truth Answer,Expected Intent,Expected State,RAG LLM,Response,Actual Intent,...,Retrieval Latency,LLM Latency,Test Result,Test Explanation,Test Latency,Test LLM,Hallucination,Hallucination Explanation,Detection Latency,Detection LLM
0,1,1,What is the mission or philosophy of Example C...,"knowledgeBase=Alfa,ragLLM=Claude V3 Haiku,eval...","At Example Corp Hospitality Group, our mission...",.*,Fulfilled,,,,...,,,,,,,,,,
1,2,1,How many hotel brands does Example Corp Hospit...,"knowledgeBase=Alfa,ragLLM=Claude V3 Haiku,eval...",Example Corp Hospitality Group has five distin...,.*,Fulfilled,,,,...,,,,,,,,,,
2,3,1,What type of accommodations does Example Corp ...,"brand=/seaside-resorts,knowledgeBase=Alfa,ragL...",Example Corp Seaside Resorts offers premium ac...,.*,Fulfilled,,,,...,,,,,,,,,,
3,4,1,Where are some of the exclusive locations for ...,"brand=/seaside-resorts,knowledgeBase=Alfa,ragL...",Example Corp Seaside Resorts are located in Ma...,.*,Fulfilled,,,,...,,,,,,,,,,
4,5,1,What types of room options are available at Ex...,"brand=/seaside-resorts,knowledgeBase=Alfa,ragL...",Example Corp Seaside Resorts offers a range of...,.*,Fulfilled,,,,...,,,,,,,,,,


In [9]:
#
# parse the list of test cases into separate cases each with one or more steps
#
def parse_test_cases(df) -> tuple[int, list[pd.DataFrame]]:
    """Split the table of test steps into one DataFrame per test case.

    A new test case starts at every row whose 'Step' value equals 1 and runs up
    to (but not including) the next such row. The slices keep the original
    index labels of ``df``.

    Args:
        df: table with one row per test step and a 'Step' column.

    Returns:
        A ``(num_steps, test_cases)`` tuple: the total number of rows in ``df``
        and the list of per-test-case slices. An empty ``df`` yields ``(0, [])``.
    """
    num_steps = len(df)
    if num_steps == 0:
        # nothing to run: do not report a spurious, empty test case
        return 0, []

    # row positions (not index labels) at which a new test case starts
    starts = [position for position, step in enumerate(df['Step']) if step == 1]

    if not starts:
        # no row is marked as a first step: treat all rows as a single test case
        starts = [0]
    elif starts[0] > 0:
        # rows before the first 'Step == 1' row do not belong to any test case
        logger.warning(f'ignoring {starts[0]} leading row(s) that precede the first Step 1')

    boundaries = starts + [num_steps]
    test_cases = []
    for begin, end in zip(boundaries, boundaries[1:]):
        logger.debug(f'appending df[{begin}:{end}]')
        # iloc makes the positional slicing explicit
        test_cases.append(df.iloc[begin:end])

    return num_steps, test_cases

In [10]:
# count = total number of test steps (rows); test_cases = one DataFrame slice per test case
count, test_cases = parse_test_cases(qa_pairs)

In [11]:
# NOTE(review): this summary is presumably only displayed when a log handler is attached
# (see the commented-out StreamHandler in the parameters cell) - confirm
logger.info(f'Found {len(test_cases)} test cases, with a total of {count} test steps.')

In [12]:
# preview the steps of the first test case (requires at least one test case)
test_cases[0].head()

Unnamed: 0,Test Case,Step,Utterance,Session Attributes,Ground Truth Answer,Expected Intent,Expected State,RAG LLM,Response,Actual Intent,...,Retrieval Latency,LLM Latency,Test Result,Test Explanation,Test Latency,Test LLM,Hallucination,Hallucination Explanation,Detection Latency,Detection LLM
0,1,1,What is the mission or philosophy of Example C...,"knowledgeBase=Alfa,ragLLM=Claude V3 Haiku,eval...","At Example Corp Hospitality Group, our mission...",.*,Fulfilled,,,,...,,,,,,,,,,


In [13]:
# log every step of every test case (test number, step number and utterance) before running them
for test_index, test_case in enumerate(test_cases):
    for step_index, step in test_case.iterrows():
        logger.info(f'Test #{step["Test Case"]} step #{step["Step"]}: {step["Utterance"]}')

In [14]:
import bedrock_helpers

def run_rag_solution(test_step: pd.Series, session_attributes: dict):
    """Run one test step directly against the RAG solution and record the results.

    The step's utterance is used to retrieve context from the selected knowledge
    base, an answer is generated with the selected LLM, the answer is evaluated
    against the ground truth (when one is given) and finally checked for
    hallucinations. All results are written into ``test_step`` in place; nothing
    is returned.

    Args:
        test_step: one row of the test table. 'Utterance' and
            'Ground Truth Answer' are read; the output columns ('Response',
            'RAG LLM', the latency, test and hallucination columns) are written.
        session_attributes: dict of settings for this test case. The keys
            'knowledgeBase', 'brand', 'ragLLM', 'evaluationLLM' and
            'detectionLLM' are read.
    """
    ### TODO utterance = 'QUESTION: ' + test_step.get('Utterance', 'ERROR')
    utterance = test_step.get('Utterance', 'ERROR')

    # get Knowledge Base instance
    knowledge_base = session_attributes.get('knowledgeBase', 'Default')
    bedrock_kb = bedrock_helpers.select_knowledge_base(knowledge_base)

    logger.info(f'S3 BUCKET FOR KB = {bedrock_kb.s3_bucket}')

    # set a query filter - in this case based on the S3 folder structure
    brand = session_attributes.get('brand', '')
    logger.debug(f'BRAND FILTER = {brand}')
    query_filter = {
        'startsWith': {
            'key': 'x-amz-bedrock-kb-source-uri',
            'value': 's3://' + bedrock_kb.s3_bucket + brand
        }
    }
    bedrock_kb.metadata_filter = query_filter

    logger.info(f'KB QUERY FILTER = {json.dumps(query_filter)}')

    # retrieve the context from knowledge base
    response = bedrock_kb.retrieve_context(query=utterance)

    retrieval_time = response.get("invocation_time", -1)
    context = response.get('context', 'None')
    logger.debug(f'context = {context}')

    # select agent based on session attributes
    llm_name = session_attributes.get('ragLLM')
    agent = bedrock_helpers.select_conversational_agent(llm_name)

    agent.context = True
    agent.guardrails = True

    agent_response = agent.generate_response(context, utterance)
    time_in_ms = agent_response.get('invocation_time')
    prompt = agent_response.get('prompt')
    rag_response = agent_response.get('response')

    logger.info('LLM PROMPT = {}'.format(prompt))
    logger.info('AGENT RESPONSE = {}'.format(rag_response))

    test_step['Response'] = rag_response
    test_step['RAG LLM'] = agent.model_instance.model_id
    test_step['Retrieval Latency'] = retrieval_time
    test_step['LLM Latency'] = time_in_ms

    logger.info(f'Request: {utterance}')
    logger.info(f'Response: {rag_response}')
    logger.info('')

    # if a ground truth answer is available, evaluate the answer against it;
    # an empty spreadsheet cell is read by pandas as NaN (a float), which cannot be
    # compared with a string, so missing values are normalised to '' first
    ground_truth = test_step.get('Ground Truth Answer', '')
    ground_truth = '' if pd.isna(ground_truth) else str(ground_truth)

    if ground_truth:
        # select LLM based on session attributes
        evaluation_llm_name = session_attributes.get('evaluationLLM')
        evaluation_agent = bedrock_helpers.select_conversational_agent(evaluation_llm_name)

        evaluation_result = evaluation_agent.evaluate_response(utterance, rag_response, ground_truth)
        time_in_ms = evaluation_result.get('invocation_time')
        result = evaluation_result.get('result')
        rationale = evaluation_result.get('rationale')

        test_step['Test Result'] = result
        test_step['Test Explanation'] = rationale
        test_step['Test Latency'] = time_in_ms
        test_step['Test LLM'] = evaluation_agent.model_instance.model_id

        logger.info(f'Test Result: {result}')
        logger.info(f'Rationale: {rationale}')
        logger.info(f'Duration = {time_in_ms}')

    # check for hallucinations
    detection_llm_name = session_attributes.get('detectionLLM')
    detection_agent = bedrock_helpers.select_conversational_agent(detection_llm_name)

    hallucination_result = detection_agent.detect_hallucinations(utterance, rag_response, context)
    time_in_ms = hallucination_result.get('invocation_time')
    result = hallucination_result.get('result')
    rationale = hallucination_result.get('rationale')

    test_step['Hallucination'] = result
    test_step['Hallucination Explanation'] = rationale
    test_step['Detection Latency'] = time_in_ms
    test_step['Detection LLM'] = detection_agent.model_instance.model_id

    logger.info(f'Hallucination: {result}')
    logger.info(f'Rationale: {rationale}')
    logger.info(f'Duration = {time_in_ms}')
    logger.info('')
    

In [15]:
#
# Helper: turn a 'key=value,key=value' cell into a dict of session attributes
#
def _parse_session_attributes(raw) -> dict:
    """Parse the 'Session Attributes' cell of a test step.

    Args:
        raw: the cell value; expected to be a string of comma-separated
            ``key=value`` items. Anything that is not a string (for example
            the NaN pandas uses for an empty cell) yields no attributes.

    Returns:
        dict: attribute name -> attribute value (both strings).
    """
    if not isinstance(raw, str):
        return {}

    parsed = {}
    for item in raw.split(','):
        if not item:
            # tolerate trailing or doubled commas
            continue
        # split on the first '=' only, so that values may themselves contain '='
        key, separator, value = item.partition('=')
        if not separator:
            logger.warning(f'ignoring malformed session attribute: {item!r}')
            continue
        parsed[key] = value
    return parsed


#
# Worker function to process a test case (which may have multiple steps)
#
def execute_test_case(test_case: pd.DataFrame) -> pd.DataFrame:
    """Run all steps of one test case and return the rows with results filled in.

    A new session (fresh session id and session attributes parsed from the
    'Session Attributes' column) is started at every row whose 'Step' is 1, and
    also at the first row if the case does not begin with step 1. Each step is
    sent either to the Lex bot (``USE_LEX``) or to ``run_rag_solution``. If the
    Lex call fails, the error is logged and the remaining steps of this test
    case are skipped.

    Args:
        test_case: the rows (steps) of a single test case.

    Returns:
        pd.DataFrame: a copy of ``test_case`` with the output columns filled in
        for every step that was executed. The input frame is not modified.
    """
    # work on a private copy: test_case is usually a slice of the full table
    test_case = test_case.copy()

    session_id = None
    session_attributes = {}
    request_attributes = {'channel': channel_attribute}

    for index, row in test_case.iterrows():
        logger.debug(f'Evaluating index={index}, Test={row["Test Case"]}, Step={row["Step"]}')

        if int(row['Step']) == 1 or session_id is None:
            # start a new session
            session_id = str(uuid.uuid4())
            logger.debug('            new session: {}'.format(session_id))

            # reset the session attributes from the test case
            session_attributes = _parse_session_attributes(row['Session Attributes'])

        # increase Lex's timeout limit for the Lambda codehook
        session_attributes['x-amz-lex:codehook-timeout-ms'] = '90000'

        # add or update a session attribute to track this test run (for analytics)
        session_attributes['test-run'] = '{}'.format(test_run)
        session_attributes['test-case'] = '{:0>3}'.format(row['Test Case'])
        session_attributes['test-step'] = '{:0>3}'.format(row['Step'])
        # NOTE(review): an empty ground-truth cell (NaN) is sent as the text 'nan' - confirm
        # that the bot's code hook handles this as intended
        session_attributes['ground-truth'] = '{}'.format(row['Ground Truth Answer'])

        logger.debug('session attributes: {}'.format(json.dumps(session_attributes, indent=4)))

        session_state = {'sessionAttributes': session_attributes}
        user_input = row['Utterance']

        if USE_LEX:
            logger.info(f'Session: {session_id}: calling Lex for test step [{row["Test Case"]}.{row["Step"]}]')

            bot_response = None
            try:
                bot_response = lex_client.recognize_text(
                    botId=parameters['bot_id']['value'],
                    botAliasId=parameters['bot_alias_id']['value'],
                    localeId=parameters['locale_id']['value'],
                    sessionId=session_id,
                    text=user_input,
                    sessionState=session_state,
                    requestAttributes=request_attributes
                )
            except Exception as e:
                logger.error('Exception calling Lex for test step [{}.{}]'.format(row['Test Case'], row['Step']))
                logger.error('Exception = {}'.format(str(e)))
                break

            if bot_response is None:
                logger.error('No response from Lex for test step [{}.{}]'.format(row['Test Case'], row['Step']))
                break

            logger.info('-- called Lex for test step [{}.{}]'.format(row['Test Case'], row['Step']))

            # default=str keeps logging from failing on values json cannot serialise
            logger.info(json.dumps(bot_response, indent=4, default=str))

            # the response may carry no messages, no intent or no session attributes;
            # read each part defensively so one odd response cannot abort the run
            messages = bot_response.get('messages') or [{}]
            row['Response'] = messages[0].get('content', '[no Response>')

            response_state = bot_response.get('sessionState') or {}
            intent = response_state.get('intent') or {}
            row['Actual Intent'] = intent.get('name')
            row['Actual State'] = intent.get('state')

            result_attributes = response_state.get('sessionAttributes') or {}
            row['Retrieval Latency'] = result_attributes.get('retrieval_latency')

            row['RAG LLM'] = result_attributes.get('rag_llm')
            row['LLM Latency'] = result_attributes.get('rag_latency')

            row['Test Result'] = result_attributes.get('evaluation_result')
            row['Test Explanation'] = result_attributes.get('evaluation_details')
            row['Test Latency'] = result_attributes.get('evaluation_latency')
            row['Test LLM'] = result_attributes.get('evaluation_llm')

            row['Hallucination'] = result_attributes.get('detection_result')
            row['Hallucination Explanation'] = result_attributes.get('detection_details')
            row['Detection Latency'] = result_attributes.get('detection_latency')
            row['Detection LLM'] = result_attributes.get('detection_llm')

        else:
            logger.info(f'Session: {session_id}: calling RAG solution for test step [{row["Test Case"]}.{row["Step"]}]')
            run_rag_solution(row, session_attributes)

        # iterrows() yields copies, so store the updated row back into the frame
        test_case.loc[index] = row

        logger.debug(f'Session: {session_id}: Answer test step [{row["Test Case"]}.{row["Step"]}] is {row["Response"]}')

    return test_case


In [16]:
#
# ### Process the test cases
#
def process_test_cases(test_cases: list):
    """Execute all test cases, sequentially or in a pool of worker processes.

    When ``MULTIPLE_THREADS`` is false the test cases are run one after another
    in this process; otherwise they are distributed over a
    ``multiprocessing.Pool`` with at most ``MAX_THREADS`` workers (capped at the
    number of CPUs). Results are returned in the same order as ``test_cases``.

    Args:
        test_cases: list of per-test-case DataFrames.

    Returns:
        A ``(duration, test_results)`` tuple: the elapsed wall-clock time in
        seconds and the list of DataFrames returned by ``execute_test_case``.
    """
    available_cpus = mp.cpu_count()
    cpu_count = min(available_cpus, MAX_THREADS)
    logger.info('CPU count is {}; using max processes = {}'.format(available_cpus, cpu_count))

    start_time = time.perf_counter()

    if not MULTIPLE_THREADS:
        logger.info(f'SINGLE-THREADED MODE: {cpu_count} PROCESS')
        test_results = [
            execute_test_case(test_case)
            for test_case in tqdm(test_cases, total=len(test_cases), desc="Running test cases")
        ]
    else:
        logger.info(f'MULTI-THREADED MODE: {cpu_count} PROCESSES')
        # the context manager tears the pool down even if a worker raises
        with mp.Pool(cpu_count) as pool:
            # imap preserves input order and lets tqdm advance as results arrive
            test_results = list(tqdm(pool.imap(execute_test_case, test_cases),
                                     total=len(test_cases), desc="Running test cases"))
            pool.close()
            pool.join()

    duration = time.perf_counter() - start_time

    return duration, test_results

In [17]:
# NOTE(review): num_tests is not referenced by any of the later cells shown here
num_tests = len(test_cases)

# process the test cases - with one or more threads
# (duration is the elapsed time in seconds, measured with time.perf_counter)
duration, test_results = process_test_cases(test_cases)

print(f'duration = {duration:.0f} seconds')


  0%|          | 0/50 [00:00<?, ?it/s]

duration = 66 seconds


In [18]:
# update the list of qa_pairs with the results
# (each result row is written back by its index label, which the per-test-case
# frames share with qa_pairs)
for test_case in test_results:
    for index, row in test_case.iterrows():
        qa_pairs.loc[index] = row

qa_pairs.head()

Unnamed: 0,Test Case,Step,Utterance,Session Attributes,Ground Truth Answer,Expected Intent,Expected State,RAG LLM,Response,Actual Intent,...,Retrieval Latency,LLM Latency,Test Result,Test Explanation,Test Latency,Test LLM,Hallucination,Hallucination Explanation,Detection Latency,Detection LLM
0,1,1,What is the mission or philosophy of Example C...,"knowledgeBase=Alfa,ragLLM=Claude V3 Haiku,eval...","At Example Corp Hospitality Group, our mission...",.*,Fulfilled,anthropic.claude-3-haiku-20240307-v1:0,Example Corp Hospitality Group's mission is to...,CorporateOverview,...,476,1251,PASSED,The actual answer sentence conveys the same me...,2593,anthropic.claude-3-sonnet-20240229-v1:0,CORRECT,The actual answer sentence accurately reflects...,5401,anthropic.claude-3-sonnet-20240229-v1:0
1,2,1,How many hotel brands does Example Corp Hospit...,"knowledgeBase=Alfa,ragLLM=Claude V3 Haiku,eval...",Example Corp Hospitality Group has five distin...,.*,Fulfilled,anthropic.claude-3-haiku-20240307-v1:0,Example Corp Hospitality Group has five distin...,BrandPortfolio,...,499,1536,PASSED,The actual answer sentence provides the same c...,2453,anthropic.claude-3-sonnet-20240229-v1:0,CORRECT,The document states that Example Corp Hospital...,5483,anthropic.claude-3-sonnet-20240229-v1:0
2,3,1,What type of accommodations does Example Corp ...,"brand=/seaside-resorts,knowledgeBase=Alfa,ragL...",Example Corp Seaside Resorts offers premium ac...,.*,Fulfilled,anthropic.claude-3-haiku-20240307-v1:0,Example Corp Seaside Resorts offers a variety ...,Accommodations,...,570,1586,FAILED,The actual answer sentence does not mention th...,2370,anthropic.claude-3-sonnet-20240229-v1:0,CORRECT,The actual answer sentence accurately summariz...,3116,anthropic.claude-3-sonnet-20240229-v1:0
3,4,1,Where are some of the exclusive locations for ...,"brand=/seaside-resorts,knowledgeBase=Alfa,ragL...",Example Corp Seaside Resorts are located in Ma...,.*,Fulfilled,anthropic.claude-3-haiku-20240307-v1:0,Example Corp Seaside Resorts has exclusive loc...,Locations,...,571,1142,PASSED,The actual answer sentence accurately lists al...,4168,anthropic.claude-3-sonnet-20240229-v1:0,CORRECT,The actual answer sentence accurately lists th...,2913,anthropic.claude-3-sonnet-20240229-v1:0
4,5,1,What types of room options are available at Ex...,"brand=/seaside-resorts,knowledgeBase=Alfa,ragL...",Example Corp Seaside Resorts offers a range of...,.*,Fulfilled,anthropic.claude-3-haiku-20240307-v1:0,Example Corp Seaside Resorts offers a range of...,Accommodations,...,483,1271,PASSED,The actual answer sentence covers all the acco...,2061,anthropic.claude-3-sonnet-20240229-v1:0,CORRECT,The actual answer sentence accurately summariz...,3728,anthropic.claude-3-sonnet-20240229-v1:0


In [19]:
# store the output in the configured output file (format chosen from the file extension)
write_evaluation_result(qa_pairs, parameters['output_file']['value'])