# 3 - Search OpenSearch indices

This notebook shows how to search the knowledge base and use the search results as context for an LLM-generated answer.

#### Install dependencies

In [None]:
!pip install --upgrade --force-reinstall boto3

In [None]:
!pip install opensearch-py

#### Import dependencies

In [None]:
import os
import boto3
import json
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, helpers
from urllib.parse import quote
from datetime import datetime

## Set search parameters - maximum length of RAG context text, hit score thresholds, and flags to use document summary and date in determining ranking of search results

The max_length_rag_text parameter sets the maximum length of context provided to the LLM

In [None]:
# Upper bound, in characters, on the combined length of the retrieved text that is passed to the LLM as context
max_length_rag_text = 15000

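Each OpenSearch full text hit must meet or exceed a minimum score derived from full_text_hit_score_threshold to be considered in the result. The threshold is a multiplier applied to a score taken from the full text hits, not an absolute score.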

In [None]:
# Multiplier used to derive the minimum score a full text hit needs in order to be kept
full_text_hit_score_threshold = 0.5

The use_summary and summary_weight_over_full_text parameters determine whether the document summary is used to determine the ranking of search results and the extent to which the summary is weighted over full text results. The summary_hit_score_threshold is relative: a summary hit is considered only if its score is at least summary_hit_score_threshold times the highest summary hit score.

In [None]:
# When True, the summary index is searched and its scores contribute to the ranking
use_summary = True
# Factor applied to a document's best summary score before it is added to the full text hit score
summary_weight_over_full_text = 1.5
# Fraction of the highest summary hit score that a summary hit must reach to be kept
summary_hit_score_threshold = 0.9

The use_date and years_until_no_value parameters determine whether the document date is used to determine the ranking of search results, and the extent to which the score is impacted by the age of the document. The parameter points_deduct_per_day_old is calculated from the number of years until the document has no value (maximum score is 1).

In [None]:
# When True, full text hit scores are reduced according to the age of the document
use_date = True
# Number of years over which the per-day deductions add up to 1 point
years_until_no_value = 20
# Score points removed for each day of document age (a year is counted as 365 days)
points_deduct_per_day_old = 1/(365 * years_until_no_value)

#### Get the Bedrock Guardrail ID, version, and the block message given in the stack parameters

In [None]:
# CloudFormation stack whose outputs and parameters hold the Guardrail settings
stack_name = "chatbot-demo"

cf_client = boto3.client('cloudformation')
response = cf_client.describe_stacks(StackName=stack_name)

# Guardrail ID and version are published as stack outputs
outputs = response["Stacks"][0]["Outputs"]
guardrail_id_outputs = [entry for entry in outputs if entry['OutputKey'] == 'BedrockGuardrailId']
bedrock_guardrail_id = guardrail_id_outputs[0]["OutputValue"]
print("The Bedrock Guardrail ID is:", bedrock_guardrail_id)
guardrail_version_outputs = [entry for entry in outputs if entry['OutputKey'] == 'BedrockGuardrailVersion']
bedrock_guardrail_version = guardrail_version_outputs[0]["OutputValue"]
print("The Bedrock Guardrail version is:", bedrock_guardrail_version)

# The block message is read from the stack parameters
stack_parameters = response["Stacks"][0]["Parameters"]
block_message_parameters = [entry for entry in stack_parameters if entry['ParameterKey'] == 'BedrockGuardrailsBlockMessage']
bedrock_guardrails_block_message = block_message_parameters[0]["ParameterValue"]
print("The Bedrock Guardrail block message is:", bedrock_guardrails_block_message)

#### Set the values for S3 key to weblink conversion

In [None]:
# When True, document keys in the reference list are rewritten into web links
use_s3_key_to_weblink_conversion = True
# Leading part of the document key that is swapped for weblink_prefix
s3_key_prefix_to_remove = "md/website"
weblink_prefix = "https://internal-site.us"
# Trailing part of the document key that is swapped for weblink_suffix
s3_key_suffix_to_remove = ".md"
weblink_suffix = ".html"

#### Retrieve stored values

In [None]:
# Restore variables persisted with %store (presumably by an earlier notebook in this series — confirm they exist before running)
%store -r region_name
%store -r host
%store -r summary_index_name
%store -r full_text_index_name
%store -r date_index_name
%store -r pipeline_id
%store -r model_id
# Echo the restored values so a missing or stale value is noticed before any search is run
print("Region is:", region_name)
print("OpenSearch endpoint", host)
print("Summary index name", summary_index_name)
print("Full Text index name", full_text_index_name)
print("Date index name", date_index_name)
print("Semantic search pipeline ID", pipeline_id)
print("Model ID", model_id)

#### Get OpenSearch client

In [None]:
# Sign requests with SigV4 using the credentials of the current boto3 session.
# NOTE(review): AWSV4SignerAuth is called without a service name — confirm the library default
# matches the kind of OpenSearch endpoint in use.
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region_name)

opensearch_client = OpenSearch(
    hosts = [{'host': host, 'port': 443}],
    # The Python client names this option `timeout` (seconds). `requestTimeout` is not an
    # option it recognises, so the intended 20 second limit was never applied.
    timeout = 20,
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)

#### Create a Bedrock runtime object

In [None]:
# Bedrock runtime client used later to invoke the text generation model, in the same region as the restored settings
bedrock_runtime = boto3.client(
    service_name='bedrock-runtime',
    region_name=region_name, 
)


#### Set the text gen config and content type for Bedrock

In [None]:
# Generation settings sent as "textGenerationConfig" in the model request body
text_gen_config = {
    "maxTokenCount": 350,
    "stopSequences": [], 
    "temperature": 0,
    "topP": 1
}
# Model that is invoked, and the accept / content type values passed with the invocation
bedrock_model_id = 'amazon.titan-text-express-v1'
accept = 'application/json' 
content_type = 'application/json'


## Set the question to be asked

In [None]:
#### Your question below
# query_text drives both semantic searches and is inserted into the LLM prompt.
# Uncomment one of the alternatives (and comment out the active line) to try a different question.
query_text = "What is the impact of wind on solar panel performance?"
#query_text = "What are the reasons why consumers do not choose electric vehicles?"
#query_text = "What are the benefits of electric vehicles?"

#### Ask an unsafe question to test Bedrock Guardrail
#query_text = "Why do idiots drive hybrid cars?"

#### Do a semantic search for the search term on the summary index

In [None]:
if not use_summary:
    # Summary ranking is switched off, so no summary search is made
    summary_response = []
else:
    # Neural (semantic) query against the embedding field of the summary index
    neural_clause = {
        "query_text": query_text,
        "model_id": model_id,
        "k": 30
    }
    query = {
        # The embedding vectors are not needed in the response
        "_source": {"excludes": ["text_embedding"]},
        "size": 30,
        "query": {"neural": {"text_embedding": neural_clause}}
    }

    summary_response = opensearch_client.search(
        index=summary_index_name,
        body=query,
        stored_fields=["text"],
    )

    summary_hit_count = len(summary_response["hits"]["hits"])
    print("Got",summary_hit_count,"hits.")

In [None]:
#summary_response

#### Build a dictionary mapping each document key to its highest summary hit score

In [None]:
# Highest summary hit score per document key; stays empty when summary ranking is off
document_summary_high_scores = {}

if use_summary:

    summary_hits = summary_response['hits']['hits']
    top_summary_score = summary_response['hits']['max_score']

    # Guard against an empty result, where there is no top score to scale the threshold from
    if summary_hits and top_summary_score is not None:
        min_summary_hit_score = summary_hit_score_threshold * top_summary_score

        for summary_hit in summary_hits:
            # Keep this hit only if it's within the hit score threshold
            if summary_hit['_score'] >= min_summary_hit_score:
                document_key = summary_hit['_source']['document']
                document_summary_high_scores[document_key] = max(
                    summary_hit['_score'],
                    document_summary_high_scores.get(document_key, float('-inf'))
                )

# Kept at top level: an expression nested inside the `if` block is not displayed by the notebook
document_summary_high_scores

#### Do a semantic search for the search term on the full text index

In [None]:
# Neural (semantic) query against the embedding field of the full text index
neural_clause = {
    "query_text": query_text,
    "model_id": model_id,
    "k": 30
}
query = {
    # The embedding vectors are not needed in the response
    "_source": {"excludes": ["text_embedding"]},
    "size": 20,
    "query": {"neural": {"text_embedding": neural_clause}}
}

full_text_response = opensearch_client.search(
    index=full_text_index_name,
    body=query,
    stored_fields=["text"],
)

full_text_hit_count = len(full_text_response["hits"]["hits"])
print("Got",full_text_hit_count,"hits.")

In [None]:
#full_text_response["hits"]["hits"]

#### If use_date parameter is true, get the date of each document with a search hit and calculate its age in days

In [None]:
# One entry per distinct document among the full text hits: its key, its date and its age in days
document_age_list = []

if use_date:

    # Take "now" once so every document is aged against the same reference point
    reference_time = datetime.now()
    # Set membership replaces a rescan of document_age_list for every hit
    documents_already_aged = set()

    for full_text_hit in full_text_response["hits"]["hits"]:
        document_key = full_text_hit["_source"]["document"]
        if document_key in documents_already_aged:
            continue
        documents_already_aged.add(document_key)

        query={
            "query": {
                "match_phrase": {
                    "document": document_key
                }
            }
        }
        date_response = opensearch_client.search(index=date_index_name, body=query)
        date_hits = date_response["hits"]["hits"]

        if date_hits:
            # Only the leading YYYY-MM-DD part of the stored date is used
            document_date = date_hits[0]["_source"]["document_date"][:10]
            days_old = (reference_time - datetime.strptime(document_date, "%Y-%m-%d")).days
        else:
            # No date record for this document: report it and apply no age penalty instead of failing
            print("No date found for", document_key, "- treating it as 0 days old")
            document_date = None
            days_old = 0

        document_age_list.append(
            {
                "document": document_key,
                "date": document_date,
                "days_old": days_old
            }
        )

document_age_list

#### If use_date parameter is true, deduct points from the score of each full text hit based on the age of the document and the points_deduct_per_day_old parameter

In [None]:
if use_date:

    # Index the ages by document key once instead of filtering the list for every hit
    days_old_by_document = {entry["document"]: entry["days_old"] for entry in document_age_list}

    for hit in full_text_response["hits"]["hits"]:
        # Remember the score as returned by the search the first time this cell runs, and always
        # deduct from that value, so re-running the cell does not apply the penalty twice
        score_before_penalty = hit.setdefault("_score_before_date_penalty", hit["_score"])
        # A document dated in the future has a negative age; treat it as new rather than boosting it
        days_old = max(days_old_by_document[hit["_source"]["document"]], 0)
        points_to_deduct = points_deduct_per_day_old * days_old
        # Scores never drop below zero
        hit["_score"] = max(score_before_penalty - points_to_deduct, 0)

In [None]:
#full_text_response

#### Make a list of full text hits with associated summary document scores
To ensure continuity of context, the sections of the document immediately before and after each hit will be added to the hit list

In [None]:
# Candidate sections (each hit plus its neighbouring sections) with the score used for ranking
hit_sections = []
full_text_hits = full_text_response["hits"]["hits"]

# The threshold is a fraction of the best full text score. Scaling it from the lowest score
# (as before) meant every hit passed and the threshold had no effect. default=0 covers an
# empty result set.
best_hit_score = max((hit["_score"] for hit in full_text_hits), default=0)
min_hit_score = full_text_hit_score_threshold * best_hit_score

# Number of neighbouring sections added on each side of a hit for continuity of context:
# 1 when summary scores are used for ranking, 3 otherwise
sections_either_side = 1 if use_summary else 3

for hit in full_text_hits:
    if hit["_score"] < min_hit_score:
        continue

    source = hit["_source"]
    document_score = hit["_score"]

    if use_summary:
        # If a summary doc score exists for this full text hit then add it, else do not add this item
        if source["document"] not in document_summary_high_scores:
            continue
        document_score += summary_weight_over_full_text * document_summary_high_scores[source["document"]]

    # Page and section heading are optional fields; the hit's values are reused for its neighbours
    page = source.get("page")
    section_heading = source.get("section_heading")

    first_section = source["section"] - sections_either_side
    last_section = source["section"] + sections_either_side
    for section_number in range(first_section, last_section + 1):
        # Only section numbers above zero are kept
        if section_number > 0:
            hit_sections.append(
                {
                    "document": source["document"],
                    "page": page,
                    "section_heading": section_heading,
                    "section": section_number,
                    "document_score": document_score
                }
            )

In [None]:
# Display the candidate sections collected from the full text hits
hit_sections

#### Sort the hit list by document score high to low

In [None]:
def _none_first(value):
    """Return a sort key that places None before any real value without comparing the two."""
    return (value is not None, value if value is not None else 0)


# Highest score first; ties are broken by document, page, section heading and section number.
# page and section_heading may be None, and Python cannot order None against a string or
# number, so both are wrapped in a None-safe key.
sorted_list = sorted(
    hit_sections,
    key=lambda x: (
        -x['document_score'],
        x['document'],
        _none_first(x['page']),
        _none_first(x['section_heading']),
        x['section'],
    ),
)

In [None]:
# Display the candidate sections in ranked order
sorted_list

In [None]:
# Number of candidate sections, duplicates included
len(sorted_list)

#### Remove the scores and eliminate duplicates in the sorted hit list

In [None]:
# Keep only the fields that identify a section, in ranked order
sorted_list_without_scores = [
    {
        "document": ranked_section['document'],
        "section": ranked_section['section']
    }
    for ranked_section in sorted_list
]

# Key each entry by its contents: a repeated (document, section) pair lands on the same key,
# so a single entry per pair survives, at the position where the pair first appeared
unique_sections = {}
for section_entry in sorted_list_without_scores:
    unique_sections[frozenset(section_entry.items())] = section_entry

deduplicated_list = unique_sections.values()
deduplicated_list

In [None]:
# Number of distinct (document, section) pairs
len(deduplicated_list)

#### Retrieve the text of each section in the hit list and concatenate into a single string as RAG context for the LLM

In [None]:
# Context text handed to the LLM, and the references for the sections that went into it
rag_text = ""
reference_list = []

# sorted_list holds the same (document, section) pair more than once when the neighbour windows
# of nearby hits overlap. Each pair is fetched and added at most once, so the context budget is
# not spent on repeated text.
sections_already_handled = set()

for ranked_section in sorted_list:
    section_key = (ranked_section["document"], ranked_section["section"])
    if section_key in sections_already_handled:
        continue
    sections_already_handled.add(section_key)

    # NOTE(review): "match" on the document field is an analysed text match — confirm it cannot
    # return a section of a different document with a similar key
    query = {
        # Only the text is used below, so leave the embedding vectors out of the response
        "_source": {
            "excludes": [ "text_embedding" ]
        },
        'size': 1,
        "query": {
            "bool": {
                "must": [
                    {
                        "match": {
                            "document": ranked_section["document"]
                        }
                    },
                    {
                        "match": {
                            "section": ranked_section["section"]
                        }
                    }
                ]
            }
        }
    }
    section_response = opensearch_client.search(
        body = query,
        index = full_text_index_name
    )
    section_hits = section_response["hits"]["hits"]

    # Neighbouring section numbers are generated without checking that they exist
    if not section_hits:
        continue

    section_text = section_hits[0]["_source"]["text"]
    # Add the section only if the RAG text stays below the maximum length; a section that does
    # not fit is skipped and later, shorter sections may still be added
    if len(rag_text) + len(section_text) < max_length_rag_text:
        rag_text += section_text
        reference_list.append(
            {
                "document": ranked_section['document'],
                "page": ranked_section['page'],
                "section_heading": ranked_section['section_heading']
            }
        )

#### Check the length of the RAG text

In [None]:
# Length of the assembled context in characters; stays below max_length_rag_text
len(rag_text)

In [None]:
# Display the full context text that will be sent to the LLM
rag_text

#### Set the prompt for the LLM

In [None]:
# Prompt layout: the retrieved context first, then the instruction and the question
prompt_template = '''Context - {context}\n\n\n\n
Based only on the above context, answer this question - {query_text}'''
# Fill both placeholders in a single pass. Chained .replace() calls would also rewrite a literal
# "{query_text}" that happens to occur inside the retrieved context.
prompt_data = prompt_template.format(context=rag_text, query_text=query_text)
# Request body for the model: the prompt plus the generation settings
body = json.dumps({
    "inputText": prompt_data,
    "textGenerationConfig": text_gen_config
})

#### Make the request to Bedrock and show the response

In [None]:
# Invoke the model with the Guardrail attached
invoke_arguments = {
    "body": body,
    "modelId": bedrock_model_id,
    "accept": accept,
    "contentType": content_type,
    "guardrailIdentifier": bedrock_guardrail_id,
    "guardrailVersion": bedrock_guardrail_version,
}
bedrock_response = bedrock_runtime.invoke_model(**invoke_arguments)

# The response body is a stream holding JSON; the generated text is in the first result
raw_response_body = bedrock_response.get('body').read()
response_body = json.loads(raw_response_body)
first_result = response_body.get('results')[0]
output_text = first_result.get('outputText')
output_text

#### Show the references used to create the RAG text

In [None]:
# One formatted line per distinct reference; joined into reference_text at the end
reference_lines = []

# If Guardrails did not block, then get references
if output_text != bedrock_guardrails_block_message:

    # Identical references (same document, page and section heading) are listed once
    reference_list_dedupe = {frozenset(item.items()) : item for item in reference_list}.values()
    for item in reference_list_dedupe:

        # Default document reference is the S3 key
        document = item['document']
        # If use_s3_key_to_weblink_conversion is set then convert document to a weblink
        if use_s3_key_to_weblink_conversion:
            if document.startswith(s3_key_prefix_to_remove):
                document = document.replace(s3_key_prefix_to_remove, weblink_prefix, 1)
            if document.endswith(s3_key_suffix_to_remove):
                # Slice rather than replace so only the trailing occurrence is swapped
                document = document[:-len(s3_key_suffix_to_remove)] + weblink_suffix

        # If there is a page reference, then include it
        if item["page"] is not None:
            reference_lines.append("\n- " + document + " page: " + str(item['page']))
        elif item["section_heading"] is not None:
            # Branch on truthiness of the flag: the former `is False` identity check dropped the
            # heading altogether when the flag was falsy without being the bool False
            if use_s3_key_to_weblink_conversion:
                # Weblink conversion selected: add the heading to the URL as an anchor
                reference_lines.append("\n- " + document + "#" + quote(str(item['section_heading'])) + " ")
            else:
                # Weblink conversion not selected: add the heading as text
                reference_lines.append("\n- " + document + " heading: " + str(item['section_heading']))
        else:
            reference_lines.append("\n- " + document)

reference_text = "".join(reference_lines)
reference_text

In [None]:
# URL-encoded form of each referenced section heading. Built from reference_list itself rather
# than from loop variables left behind by other cells, and skipping references with no heading.
[quote(str(reference['section_heading'])) for reference in reference_list if reference['section_heading'] is not None]

#### Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#### SPDX-License-Identifier: MIT-0