In [None]:
#Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#SPDX-License-Identifier: MIT-0

# OpenSearch Serverless Index preparation

In this notebook, we prepare the index on the OpenSearch Serverless collection and load movie data into it. Once the data is loaded, we compare search results across OpenSearch keyword search and semantic search capabilities.

#### Install required libraries
The following cell installs the required Python libraries listed in the `requirements.txt` file. Run this cell if you are starting from this notebook.

In [None]:
#This cell installs the required libraries specified in the 'requirements.txt' file
!pip install -r requirements.txt --quiet

#### Load OpenSearch Collection details

We created an OpenSearch Serverless collection in the previous notebook. This notebook builds on it by preparing an index and loading movie metadata into it.

In [None]:
#loading variables from previous notebook that will be needed for this one
%store -r os_host
%store -r collection_name

The next cell provides an alternative way to manually set the values of the `os_host`, `collection_name` variables, if needed. This can be useful if the stored values are not available or need to be overridden for a specific use case.

In [None]:
#alternatively you can manually set those values if you need
#os_host = ""
#collection_name = ""

#### Load required libraries
In the next cell, we import the necessary Python libraries and modules. We also add the directory containing the `llm_utils` module (`../src/utils/`, as set in the cell) to the system path. This module contains utility functions that we will use later in the notebook.

In [None]:
import sagemaker
import boto3
import json
import pprint as pp
import os
import shutil
import csv
import time
import pprint

from opensearchpy import (
    AWSV4SignerAuth
)

#adding our utils library to sys path
import sys
sys.path.append("../src/utils/")
import llm_utils


Either set the region manually or use the default one.

In [None]:
#Manually set the region to deploy the openSearch serverless collection
#REGION = 'us-east-1'

#get default from boto3 session
session = boto3.session.Session()
# Get the default region from the session
REGION = session.region_name

print(f"Region: {REGION}")

## Pre-requisites

Before going through this notebook, you need to have created the OpenSearch Serverless collection and the required policies. We still need to add the identity that you are using to run this notebook (execution role or user) to the data access policy.

### Get current role/identity

First, we identify the current identity. It might be an execution role if you are using SageMaker, or a user if you are using Visual Studio with the AWS CLI plugin.

In [None]:
identity_arn = ""

try:
    # Get the execution role ARN
    identity_arn = sagemaker.get_execution_role()
    
except Exception as e:
    print("Not a sagemaker role, trying to retrieve the user identity")
    # Create an STS client
    sts_client = boto3.client('sts')

    # Get the caller identity
    caller_identity = sts_client.get_caller_identity()
    identity_arn = caller_identity['Arn']

print(f"Identity ARN:{identity_arn}")


### Create the policy to allow the user or role to add a data access policy to the OpenSearch collection

In the following code cell, an AWS Identity and Access Management (IAM) client is created using the boto3 library in Python. Then, a policy document is defined with a statement that allows all actions ("aoss:*") on all resources ("Resource": "*"). This policy document is used to create an IAM policy named "AOSSAccessPolicy" using the create_policy method of the IAM client. The created policy can be attached to IAM roles or users to grant them the specified permissions.

In [None]:
# Create an IAM client
iam = boto3.client('iam')

# Define the policy document
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "aoss:*",
            "Resource": "*"
        }
    ]
}

# Create the IAM policy
aossAccessPolicy = iam.create_policy(
    PolicyName='AOSSAccessPolicy',
    PolicyDocument=json.dumps(policy_document)
)


aossAccessPolicyArn = aossAccessPolicy["Policy"]["Arn"]

In the next cell, we wait for the policy to be created and then attach it to the current identity (user or role). This ensures that the current identity has the necessary permissions to add a data access policy to the OpenSearch Serverless collection.

In the following code cell, the script first waits for 10 seconds to ensure that the policy has been created. It then checks whether the provided identity ARN belongs to an IAM user or an IAM role. If it's a user, it attaches the newly created AOSS access policy to the user. If it's a role, it attaches the policy to the role instead. The code handles the NoSuchEntityException that could occur if the ARN doesn't correspond to a user, assuming it's a role in that case.

In [None]:
#wait for the policy to be created
time.sleep(10)

# Check if the identity ARN is for a user or a role
try:
    # Try to get the user information
    user = iam.get_user(UserName=identity_arn.split('/')[-1])
    print(f"The identity ARN '{identity_arn}' is for a user.")

    # Attach the policy to the user
    iam.attach_user_policy(
        UserName=user['User']['UserName'],
        PolicyArn=aossAccessPolicyArn
    )
except iam.exceptions.NoSuchEntityException:
    # If the identity ARN is not for a user, it must be for a role
    print(f"The identity ARN '{identity_arn}' is for a role.")

    # Attach the policy to the role
    iam.attach_role_policy(
        RoleName=identity_arn.split('/')[-1],
        PolicyArn=aossAccessPolicyArn
    )
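
The cell above takes the last path segment of the ARN as the user or role name. That works for IAM user and role ARNs, but for an STS assumed-role ARN (`arn:aws:sts::<account>:assumed-role/<role-name>/<session-name>`) the last segment is the session name, not the role name. The sketch below is one possible way to classify the ARN with plain string parsing before calling IAM; `classify_identity_arn` is a hypothetical helper, not part of boto3 or of this workshop's utilities.

```python
def classify_identity_arn(arn: str) -> tuple[str, str]:
    """Return (kind, name) where kind is 'user' or 'role'.

    Hypothetical helper: it only parses the ARN string and makes no AWS call.
    """
    # ARN layout: arn:partition:service:region:account-id:resource
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"Not an ARN: {arn!r}")

    kind, _, rest = parts[5].partition("/")
    if kind == "user":
        # user/<optional path>/<user-name>
        return "user", rest.split("/")[-1]
    if kind == "role":
        # role/<optional path>/<role-name>
        return "role", rest.split("/")[-1]
    if kind == "assumed-role":
        # assumed-role/<role-name>/<session-name>
        return "role", rest.split("/")[0]
    raise ValueError(f"Unsupported identity type in ARN: {arn!r}")


print(classify_identity_arn("arn:aws:sts::123456789012:assumed-role/MyRole/my-session"))
```

The returned name could then be passed to `attach_user_policy` or `attach_role_policy`, depending on the kind.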


### Data access policy creation for OpenSearch

We create a new data access policy that grants the notebook's execution role or user access to the collection.

The code creates an OpenSearch Serverless client using boto3 and defines a data access policy that grants permissions on the collection named by `collection_name` and on its indexes. The policy lists the current identity as its principal, and the `create_access_policy` method creates it with a given name, description, and policy document.

In [None]:
# Create an OpenSearch Serverless client
opss_client = boto3.client('opensearchserverless')

data_access_policy = json.dumps([
      {
        "Rules": [
          {
            "Resource": [
              f"collection/{collection_name}"
            ],
            "Permission": [
              "aoss:CreateCollectionItems",
              "aoss:DeleteCollectionItems",
              "aoss:UpdateCollectionItems",
              "aoss:DescribeCollectionItems"
            ],
            "ResourceType": "collection"
          },
          {
            "Resource": [
              f"index/{collection_name}/*"
            ],
            "Permission": [
              "aoss:CreateIndex",
              "aoss:DeleteIndex",
              "aoss:UpdateIndex",
              "aoss:DescribeIndex",
              "aoss:ReadDocument",
              "aoss:WriteDocument"
            ],
            "ResourceType": "index"
          }
        ],
        "Principal": [
          identity_arn
        ],
        "Description": "data-access-rule"
      }
    ], indent=2)

data_access_policy_name_nb = f"{collection_name}-policy-notebook"

# Create the data access policy
response = opss_client.create_access_policy(
    description='Data access policy for semantic search collection',
    name=data_access_policy_name_nb,
    policy=str(data_access_policy),
    type='data'
)

print(response)
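
If several identities need access (for example, both a SageMaker execution role and a local user), the same policy document can be generated for a list of principals. The following is a minimal sketch using only the standard library; `build_data_access_policy` is a hypothetical helper name, and the permissions are the ones used in the cell above.

```python
import json


def build_data_access_policy(collection_name, principals):
    """Build the JSON policy string for one collection and several principals."""
    rules = [
        {
            "ResourceType": "collection",
            "Resource": [f"collection/{collection_name}"],
            "Permission": [
                "aoss:CreateCollectionItems",
                "aoss:DeleteCollectionItems",
                "aoss:UpdateCollectionItems",
                "aoss:DescribeCollectionItems",
            ],
        },
        {
            "ResourceType": "index",
            "Resource": [f"index/{collection_name}/*"],
            "Permission": [
                "aoss:CreateIndex",
                "aoss:DeleteIndex",
                "aoss:UpdateIndex",
                "aoss:DescribeIndex",
                "aoss:ReadDocument",
                "aoss:WriteDocument",
            ],
        },
    ]
    policy = [{"Rules": rules, "Principal": list(principals), "Description": "data-access-rule"}]
    return json.dumps(policy, indent=2)


# Example with placeholder ARNs (not real identities)
print(build_data_access_policy("demo-collection", ["arn:aws:iam::123456789012:role/MyRole"]))
```

The resulting string can be passed as the `policy` argument of `create_access_policy`.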

## Create semantic search engine with Amazon OpenSearch Service Serverless

### OpenSearch data access policy update

To access our OpenSearch Serverless collection, its data access policy must include the current user or role, which is what the policy created above does.

The code cell sets up an Amazon OpenSearch Serverless client: it sets the service name, obtains AWS credentials, creates a SigV4 authentication object, and then uses the `llm_utils` module to create the client from that authentication object and the OpenSearch host.

In [None]:
#opensearch serverless service, aka aoss
service = 'aoss'

#get an Auth object to call aoss
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, REGION, service)

#llm_utils.connect_to_aoss() is defined in the utils folder that was added to sys.path above.
aoss_client = llm_utils.connect_to_aoss(auth, os_host)


### Create OpenSearch index

In the next few cells, we define the index name, data columns to be included, and the index configuration. We then create the index if it doesn't already exist


In the following code cell, an OpenSearch index named "movies-index" is defined. The code also specifies a list of data columns that will be added to this index, including information about movies such as their TMDB ID, original language, title, description, genres, release year, keywords, director, actors, popularity score, popularity score bins, average vote rating, and average vote rating bins.

In [None]:
#opensearch index name
index_name = "movies-index"

#data column to add to the index
data_columns = ['tmdb_id', 'original_language', 'original_title', 'description', 'genres', 'year', 'keywords', 'director', 'actors', 'popularity', 'popularity_bins',
                  'vote_average', 'vote_average_bins']

In [None]:
#index configuration. note that we're adding both text metadata as well as the vector_index property that will be storing our embedding for each title.
# For additional information on the K-NN index configuration, please read the below documentation.
#https://opensearch.org/docs/latest/field-types/supported-field-types/knn-vector/
#https://opensearch.org/docs/latest/search-plugins/knn/knn-index/

index_body = {
  "settings": {
    "index": {
      'number_of_shards': 4,
      "number_of_replicas": 0,
      "knn": True,
      "knn.algo_param.ef_search": 100
    }
  },
  "mappings": {
    "properties": {
      "tmdb_id": {"type": "integer"},
      "original_language": {"type": "text"},
      "original_title": {"type": "text"},
      "description": {"type": "text"},
      "genres": {"type": "text"},
      "year": {"type": "integer"},
      "keywords": {"type": "text"},
      "director": {"type": "text"},
      "actors": {"type": "text"},
      "popularity": {"type": "float"},
      "popularity_bins": {"type": "text"},
      "vote_average": {"type": "float"},
      "vote_average_bins": {"type": "text"},
      "vector_index": {
        "type": "knn_vector",
        "dimension": 1024, #if you use cohere: dimension of the embedding is 1024, for titan: 1536
        "method": {
          "name": "hnsw",
          "space_type": "l2",
          "engine": "nmslib",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      }
    }
  }
}
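
The `dimension` of the `knn_vector` field has to match the length of the vectors returned by the embedding model (1024 for Cohere according to the comment above), otherwise documents are expected to be rejected at indexing time. The sketch below shows one way to check this locally before sending anything to OpenSearch; both function names are hypothetical helpers and not part of `opensearchpy` or `llm_utils`.

```python
def knn_vector_dimensions(index_body):
    """Map each knn_vector field name in an index body to its configured dimension."""
    properties = index_body.get("mappings", {}).get("properties", {})
    return {
        name: spec["dimension"]
        for name, spec in properties.items()
        if spec.get("type") == "knn_vector"
    }


def check_embedding(index_body, field, embedding):
    """Raise ValueError if the embedding length differs from the mapped dimension."""
    expected = knn_vector_dimensions(index_body)[field]
    if len(embedding) != expected:
        raise ValueError(f"'{field}' expects {expected} dimensions, got {len(embedding)}")


# Tiny illustrative mapping (4 dimensions instead of 1024 to keep the example short)
example_body = {
    "mappings": {
        "properties": {
            "original_title": {"type": "text"},
            "vector_index": {"type": "knn_vector", "dimension": 4},
        }
    }
}
check_embedding(example_body, "vector_index", [0.1, 0.2, 0.3, 0.4])  # passes silently
```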

This section of the code checks if the specified index already exists in OpenSearch. If the index does not exist, it creates a new index with the provided index body. 

The following code cell first retrieves the names of the existing indexes in the OpenSearch collection using the `get_alias` method of the `indices` client. It then checks whether `index_name` is in that list. If it is not, it creates a new index with the given `index_name` and `index_body` configuration using the `create` method of the `indices` client. If the index already exists, it prints a message saying so.

In [None]:
#get a list of the indexes already existing
#(keyword arguments are used because recent opensearch-py versions no longer accept positional ones here)
indexes = aoss_client.indices.get_alias(index="*")
indexes_list = list(indexes.keys())

#check if index doesn't already exist and create it
if index_name not in indexes_list:
    print('Creating index:\n')
    create_response = aoss_client.indices.create(index=index_name, body=index_body)
    print(create_response)
else:
    print("index already exists")

Here, we display information about the newly created index

The code cell retrieves and displays details about a previously created OpenSearch index, including its mapping (fields and data types), settings (configuration options like shards and replicas), and any associated aliases

In [None]:
#display information on the index you just created

# Get index mapping
response = aoss_client.indices.get_mapping(index=index_name)
pp.pprint(response) 

# Get index settings
response = aoss_client.indices.get_settings(index=index_name)
pp.pprint(response)

# Get index aliases
response = aoss_client.indices.get_alias(index=index_name) 
pp.pprint(response)

## Create embeddings from CSV file
This section sets up the path to the CSV file containing movie metadata. It provides the option to use either a small dataset with 200 movies or a larger dataset with 45,000 movies.

In the following code cell, the path to a CSV file containing movie metadata is assigned to the variable `movies_data_path`. The commented line shows an alternative path for the larger dataset containing metadata for 45,000 movies. By default, the notebook uses the small dataset with 200 movies.

In [None]:
#small dataset with 200 movies
movies_data_path = "../dataset/movies_metadata_small.csv"

#full dataset with 45K movies
#movies_data_path = "../dataset/movies_metadata_45K.csv"

Here, we create the Amazon Bedrock clients using the boto3 library. They will be used for generating embeddings.

In [None]:
#create the bedrock clients with boto3
bedrock = boto3.client('bedrock')
bedrock_client = boto3.client('bedrock-runtime')

This code block creates a local folder where the embeddings will be generated. It checks if the folder already exists and cleans it up if necessary, or creates a new folder if it doesn't exist.

In the following code cell, the script checks if a directory named "../tmp/embeddings" exists. If it does, it deletes all files and subdirectories within that folder. After that, it recreates the "../tmp/embeddings" directory. This process ensures that the specified folder is empty before any new files are added to it, in preparation for generating and storing embeddings within that directory.

In [None]:
#local folder where embeddings will be generated
embeddings_folder_path = "../tmp/embeddings"

if os.path.exists(embeddings_folder_path):
    print("Folder already exists, deleting contents")
    #remove the folder and everything in it (files, links and subfolders)
    shutil.rmtree(embeddings_folder_path)

print("Recreating folder")
os.makedirs(embeddings_folder_path)

We're now ready to generate the embeddings. Expect it to take around 20 minutes.

Here, we generate the embeddings for each movie in the CSV file. This process involves reading the CSV file, generating embeddings using the Bedrock service, and writing the embeddings to JSON files in batches. The embeddings and movie metadata are combined into a single JSON document.

In the following code cell, the script reads a CSV file containing movie data, processes each row to create a dictionary with metadata, generates a vector embedding for the metadata using Amazon Bedrock, combines the embedding with the metadata, and writes the resulting JSON documents to separate files in batches of 100 records. If the total number of records is not divisible by the batch size, the remaining records are written to one additional file. This process is a key part of preparing data for a semantic search application.

In [None]:
# Limit the number of records to process in each block
block_size = 100

with open(movies_data_path) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    
    #get header
    header = next(csv_reader)
    
    #counter for line in csv file
    line_num = 1

    #document buffer
    documents = []

    for row in csv_reader:

        #create dict with title's metadata by pairing each header column with the row value
        title_metadata = dict(zip(header, row))

        #generate embedding with Bedrock
        vector_embedding = llm_utils.get_embeddings_from_text(json.dumps(title_metadata), "cohere", input_type="search_document")

        #merge vector and metadata
        request_body_dict = dict()
        request_body_dict['id'] = line_num
        request_body_dict['vector_index'] = vector_embedding
        request_body_dict = request_body_dict | title_metadata
        
        #dict to json string
        request_body = json.dumps(request_body_dict)

        #add to documents
        documents.append(request_body)

        #write the buffered documents to a json file every block_size lines
        if line_num % block_size == 0:
            print('writing file')
            block_num = line_num // block_size
            output_file_path = f"{embeddings_folder_path}/embeddings_block{block_num}.json"
            with open(output_file_path, 'w') as output_file:
                json.dump(documents, output_file, indent=2)
                print(f"Processed {len(documents)} records and saved to {output_file_path}")
                #reset documents buffer
                documents = []

        line_num += 1

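    if documents:
        # Write the remaining documents to a file.
        # line_num is one past the last processed row, so the block number is derived from it
        # (this also works when the file has fewer than block_size rows).
        block_num = (line_num - 1) // block_size + 1
        output_file_path = f"{embeddings_folder_path}/embeddings_block{block_num}.json"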
        with open(output_file_path, 'w') as output_file:
            json.dump(documents, output_file, indent=2)

        print(f"Processed {len(documents)} records and saved to {output_file_path}")
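
The batching logic above (flush every `block_size` rows, then flush the remainder) can also be expressed with a small generator. This is a sketch of an alternative structure, not the notebook's implementation; `chunked` is a hypothetical helper built on `itertools.islice`.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items; the last one may be shorter."""
    iterator = iter(iterable)
    while True:
        block = list(islice(iterator, size))
        if not block:
            return
        yield block


# 250 fake rows split into blocks of 100: two full blocks and one remainder
for block_num, block in enumerate(chunked(range(250), 100), start=1):
    print(f"block {block_num}: {len(block)} rows")
```

With `enumerate(..., start=1)` the block number comes directly from the loop, so no separate counter or special case for the remainder is needed.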

## Load embeddings in index

The code cell defines the `format_data_for_bulk_import` function, which takes a list of JSON-encoded documents and returns a list of actions compatible with the bulk ingestion process of an OpenSearch index; it iterates through the input data, converts each JSON string to a dictionary, and constructs the appropriate action dictionary

In [None]:
#format the data to match format expected by opensearch bulk ingest
def format_data_for_bulk_import(data):
    actions = []
    for doc in data:
        #str to dict
        doc_dict = json.loads(doc)

        #building the json format required for bulk index
        actions.append({
            "_op_type": "index",
            "_index": index_name,
            #"_id": doc_dict['id'],  #not allowed for index operation
            "_source": {
                "vector_index": doc_dict["vector_index"],
                "tmdb_id" : doc_dict['tmdb_id'],
                "original_language" : doc_dict['original_language'],
                "original_title" : doc_dict['original_title'],
                "description" : doc_dict['description'],
                "genres" : doc_dict['genres'],
                "year" : doc_dict['year'],
                "keywords" : doc_dict['keywords'],
                "director" : doc_dict['director'],
                "actors" : doc_dict['actors'],
                "popularity" : doc_dict['popularity'],
                "popularity_bins" : doc_dict['popularity_bins'],
                "vote_average" : doc_dict['vote_average'],
                "vote_average_bins" : doc_dict['vote_average_bins']
            }
        })
    return actions
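
Because the `_source` keys mirror the column names, the action can also be built from a list of fields instead of spelling out each key. The sketch below assumes the same input format as above (a list of JSON strings); `to_bulk_actions` is a hypothetical name, and the sample document is made up for illustration.

```python
import json


def to_bulk_actions(json_docs, index_name, fields):
    """Turn JSON-encoded documents into bulk 'index' actions keeping only `fields`."""
    actions = []
    for doc in json_docs:
        doc_dict = json.loads(doc)
        actions.append({
            "_op_type": "index",
            "_index": index_name,
            # copy only the requested fields; the local 'id' counter is left out
            "_source": {field: doc_dict[field] for field in fields},
        })
    return actions


sample_docs = [json.dumps({
    "id": 1,
    "vector_index": [0.1, 0.2],
    "tmdb_id": "42",
    "original_title": "Example Movie",
})]
actions = to_bulk_actions(sample_docs, "movies-index", ["vector_index", "tmdb_id", "original_title"])
print(actions[0]["_source"]["original_title"])
```

In the notebook, `fields` would be `["vector_index"] + data_columns`.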

Here, we iterate through the JSON files containing the embeddings and movie metadata, format the data for bulk import, and use the OpenSearch bulk API to index the documents


The code cell imports the `bulk` function from `opensearchpy.helpers`, reconnects to the OpenSearch service using authentication credentials and a host URL, iterates through JSON files in a directory, loads the JSON data, formats it for bulk import using `format_data_for_bulk_import`, prints which file is being indexed, uses the `bulk` function to insert the formatted documents into the specified OpenSearch index with `raise_on_exception=True`, and prints a summary of successfully indexed and failed documents for each file.

In [None]:
from opensearchpy.helpers import bulk

#reconnect to avoid potential timeout of the session
aoss_client = llm_utils.connect_to_aoss(auth, os_host)

# Iterate through each JSON file
for filename in os.listdir(embeddings_folder_path):
    
    file_path = os.path.join(embeddings_folder_path, filename)

    # Load JSON file
    with open(file_path, "r") as file:
        data = json.load(file)

        #format data into actions
        actions = format_data_for_bulk_import(data)

        print(f"Indexing {filename}")

        # Use the bulk API to insert documents from the file
        success, failed = bulk(
            aoss_client,
            actions,
            index=index_name, 
            raise_on_exception=True
        )

    #bulk() returns the number of successes and, by default, a list of errors
    print(f"Indexed {success} documents successfully, {len(failed)} documents failed for file: {filename}")

After indexing, we check the number of documents in the index to ensure that the indexing process was successful.

The following code cell first waits 40 seconds using `time.sleep(40)` to give the collection time to make the new documents searchable.

It then prints the number of documents present in the index named `index_name` using the `aoss_client.count()` method. The purpose of this code is to verify the number of documents indexed, which is expected to be 200 if the small dataset has been successfully indexed.

In [None]:
#check how many documents we have in the index (you might need to re-run this cell a couple of times; you should see 200 if you've indexed the small dataset)
time.sleep(40)
print(f"number of docs in index:{aoss_client.count(index=index_name)}")
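
Instead of a fixed sleep followed by manual re-runs, the count can be polled until it reaches the expected value or a timeout expires. This is a minimal sketch: `wait_for_count` is a hypothetical helper, and the commented usage assumes that the count response is a dictionary with a `count` key.

```python
import time


def wait_for_count(get_count, expected, timeout=120.0, interval=5.0):
    """Call get_count() until it returns at least `expected` or `timeout` seconds pass.

    Returns the last observed count either way, so the caller can compare it.
    """
    deadline = time.monotonic() + timeout
    while True:
        count = get_count()
        if count >= expected or time.monotonic() >= deadline:
            return count
        time.sleep(interval)


# Stand-in for the real index: the count grows on each call
fake_counts = iter([0, 50, 200])
print(wait_for_count(lambda: next(fake_counts), expected=200, timeout=5, interval=0.01))

# Possible usage in this notebook (assumption about the response shape):
# wait_for_count(lambda: aoss_client.count(index=index_name)["count"], expected=200)
```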

## Search the index

### Using vector embeddings and KNN

This section demonstrates how to search the index using vector embeddings and the k-nearest neighbors (KNN) algorithm. It generates an embedding for a given question, constructs a KNN query, and retrieves the top k most relevant documents from the index.

The code cell generates a vector embedding of the example question "list horror movies that take place in nature" using the Cohere embedding model, sets k=5 for the number of documents to retrieve, builds a query that searches the `vector_index` field for the k nearest neighbors of the question embedding, and runs the search.

In [None]:
#example question
question = "list horror movies that take place in nature"

#we generate the vectorised version of the question
question_embedding = llm_utils.get_embeddings_from_text(question, "cohere", input_type="search_query")

#number of documents to retrieve
k = 5

query = {
    "size": k,
    "query": {
        "knn": {
            "vector_index": {
                "vector": question_embedding,
                "k": k
            }
        }
    },
    "_source": data_columns
}

search_response = aoss_client.search(body=query, index=index_name)
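
To build intuition for what the `knn` query returns, the sketch below performs an exact nearest-neighbor scan in plain Python using the same L2 (Euclidean) distance that the index mapping configures. It is an illustration only: the index uses HNSW, which is an approximate method, so results on real data can differ slightly from an exact scan. The vectors and titles are made up.

```python
import math


def l2_distance(a, b):
    """Euclidean distance between two vectors of equal length."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def exact_knn(query_vector, documents, k):
    """Return the k documents whose 'vector_index' is closest to query_vector."""
    ranked = sorted(documents, key=lambda doc: l2_distance(query_vector, doc["vector_index"]))
    return ranked[:k]


docs = [
    {"original_title": "A", "vector_index": [0.0, 0.0]},
    {"original_title": "B", "vector_index": [1.0, 1.0]},
    {"original_title": "C", "vector_index": [0.2, 0.1]},
]
nearest = exact_knn([0.0, 0.1], docs, k=2)
print([doc["original_title"] for doc in nearest])  # → ['A', 'C']
```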

Here, we extract the relevant information from the OpenSearch response and display the search results.

The following code prints the embedding search response.

In [None]:
#extract object from os response
response = llm_utils.extract_response_from_os_response(search_response)
response

### Using a regular OpenSearch text query (and not the embeddings)

This section shows how to perform a regular text-based search on the OpenSearch index without using vector embeddings. It constructs a boolean query to search for movies with a specific title and an actor, sorts the results by popularity, and retrieves the top 10 documents.

In [None]:
query = {
    "query": {
        "bool": {
            "must": [
                {
                    "match": {
                        "original_title": "deadpool"
                    }
                },
                {
                    "match_phrase": {
                        "actors": "Ryan Reynolds"
                    }
                }
            ]
        }
    },
    "size": 10,
    "sort": [
        {
            "popularity": {
                "order": "desc"
            }
        }
    ]
}

search_response = aoss_client.search(body=query, index=index_name)


Here, we extract the relevant information from the OpenSearch response, remove the vector_index field (since it's not used in this scenario), and display the search results.

In [None]:
response_extracted = llm_utils.extract_response_from_os_response(search_response)
#removing the vector_index as we are not using it in that scenario
#(pop with a default so that a hit without this field does not raise a KeyError)
for hit in response_extracted:
    hit.pop("vector_index", None)

response_extracted

## Hybrid Search - the best of both worlds?

Hybrid search is a technique that combines traditional lexical search and semantic search to improve search relevance. 

The motivation behind this approach is that different queries perform better with either lexical or semantic search. By combining these two methods, search performance can be enhanced. The challenge in implementing hybrid search lies in normalizing the similarity scores produced by the two types of search, as they use different scales. 

<br/>
Various benchmarks show a significant performance improvement using Hybrid search versus lexical or semantic only. 
Example: https://opensearch.org/blog/hybrid-search/

<i>"hybrid search improves the result quality by 8–12% compared to keyword search and by 15% compared to natural language search"</i>

<br/>
Depending on the underlying Vector DB/store you are using, the solution will differ. 

<b>With non-serverless deployments of Amazon OpenSearch Service</b>, features released in September 2023 as part of OpenSearch 2.10 make this relatively easy: you define a "search pipeline" that includes a normalization processor. The pipeline runs at search time and normalizes the scores to a common scale so that they can be combined meaningfully. The process uses min-max normalization and harmonic mean as the normalization and combination techniques, respectively. Additionally, the weights assigned to the query clauses can be adjusted to balance the influence of lexical and semantic search.

<b>Amazon OpenSearch Serverless</b> does not support this feature yet, so for the moment you need to implement it outside of OpenSearch.

Same for other vectorDBs including Amazon RDS for <b>PostgreSQL using pgvector</b> for example.
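
To make the score normalization idea concrete, here is a minimal sketch of min-max normalization followed by a weighted combination, done outside of OpenSearch. It is not the OpenSearch normalization processor: for simplicity it combines scores with a weighted arithmetic mean rather than the harmonic mean mentioned above, it assumes that a higher score means a better match in both lists, and the function names and sample scores are made up.

```python
def min_max_normalize(scores):
    """Rescale a {doc_id: score} mapping to the [0, 1] range."""
    if not scores:
        return {}
    low, high = min(scores.values()), max(scores.values())
    if high == low:
        # all scores are equal: treat every document as a top match
        return {doc_id: 1.0 for doc_id in scores}
    return {doc_id: (score - low) / (high - low) for doc_id, score in scores.items()}


def combine_scores(lexical, semantic, lexical_weight=0.5):
    """Merge two score mappings into a list of (doc_id, score), best first."""
    lex = min_max_normalize(lexical)
    sem = min_max_normalize(semantic)
    combined = {}
    for doc_id in set(lex) | set(sem):
        # a document missing from one list contributes 0 for that list
        combined[doc_id] = (lexical_weight * lex.get(doc_id, 0.0)
                            + (1 - lexical_weight) * sem.get(doc_id, 0.0))
    return sorted(combined.items(), key=lambda item: item[1], reverse=True)


lexical_scores = {"a": 12.0, "b": 8.0, "c": 4.0}   # e.g. BM25-style scores
semantic_scores = {"b": 0.9, "c": 0.7, "d": 0.5}   # e.g. similarity scores
ranking = combine_scores(lexical_scores, semantic_scores)
print([doc_id for doc_id, _ in ranking])
```

Document `b` ends up first because it scores well in both lists, even though it is not the top lexical hit.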

### RRF - Reciprocal Rank Fusion

There are various strategies and algorithms to merge two lists of results.

The Reciprocal Rank Fusion (RRF) is a sophisticated algorithm that combines different sets of results, each with its own relevance scores, into a single unified set of results. One of the main benefits of RRF is that it can produce high-quality results without requiring any adjustments or tuning. Additionally, RRF does not require the relevance scores from different sources to be related or similar in nature.

How does it work?

The Reciprocal Rank Fusion (RRF) technique works by gathering search results from multiple different approaches, assigning each document in the results a score based on its reciprocal rank, and then combining these scores to produce a new ranking. The core idea behind this method is that documents that consistently appear at higher ranks across various search strategies are more likely to be relevant, and therefore, should receive a higher ranking in the consolidated search result.

Algorithm explained here: 
https://www.elastic.co/guide/en/elasticsearch/reference/current/rrf.html
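
As a worked example of the scoring step: each document receives, for every result list it appears in, a contribution of 1 / (k + rank), where rank starts at 1 and k is a smoothing constant (60 is a commonly used value). The function in the next cell uses the simplified form 1 / rank, which corresponds to k = 0. The sketch below is a standalone illustration with a hypothetical function name and made-up rankings.

```python
def rrf_scores(rankings, k=60):
    """Compute RRF scores for several ranked lists of document ids.

    rankings: list of lists, each ordered from best to worst.
    k: smoothing constant; k=0 gives the plain sum of reciprocal ranks.
    """
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return scores


lexical_ranking = ["168259", "460846", "294254"]
semantic_ranking = ["a", "b", "c", "d", "168259"]

# '168259' is ranked 1st and 5th: with k=0 its score is 1/1 + 1/5
plain = rrf_scores([lexical_ranking, semantic_ranking], k=0)
smoothed = rrf_scores([lexical_ranking, semantic_ranking], k=60)
print(round(plain["168259"], 4), round(smoothed["168259"], 4))
```

A larger k reduces the gap between top-ranked and lower-ranked documents, so appearing in both lists matters more than being first in one of them.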

In [None]:
#merge 2 lists of movies and remove duplicates
def merge_remove_duplicates(list1, list2):
    # Create an empty dictionary to store unique entries
    unique_entries = {}

    # Iterate over the first list
    for entry in list1:
        tmdb_id = entry['tmdb_id']
        unique_entries[tmdb_id] = entry

    # Iterate over the second list
    for entry in list2:
        tmdb_id = entry['tmdb_id']
        if tmdb_id not in unique_entries:
            unique_entries[tmdb_id] = entry

    # Convert the dictionary back to a list
    merged_list = list(unique_entries.values())

    return merged_list


def reciprocal_rank_fusion(list1, list2):
    # Create a dictionary to store ranks
    ranks = {}

    max_size = max(len(list1), len(list2))
    
    # Assign ranks to items in list1
    for rank, item in enumerate(list1, start=1):
        tmdb_id = item['tmdb_id']
        ranks.setdefault(tmdb_id, []).append(rank)
    
    # Assign ranks to items in list2
    for rank, item in enumerate(list2, start=1):
        tmdb_id = item['tmdb_id']
        ranks.setdefault(tmdb_id, []).append(rank)
    
    #ranks will look like: {'168259': [1, 5], '460846': [2, 12], '294254': [3]}
    
    # Calculate RRF scores
    # example: for '168259': [1, 5], the RRF score is 1/1 + 1/5 = 1.2
    rrf_scores = {}
    for tmdb_id, ranks_list in ranks.items():
        rrf_score = sum(1 / rank for rank in ranks_list)
        rrf_scores[tmdb_id] = rrf_score
    
    merged_list = merge_remove_duplicates(list1, list2)
    for item in merged_list:
        tmdb_id = item['tmdb_id']
        item['RFF_score'] = rrf_scores.get(tmdb_id, 0)

    sorted_merged_list = sorted(merged_list, key=lambda x: x['RFF_score'], reverse=True)

    return sorted_merged_list[:max_size]

### Generate a dataset of semantic and lexical queries about genre as an example.

Genres to be used as query for the API calls

In [None]:
genres = ["Mystery" , "Science Fiction", "Thriller", "Adventure", "Horror", "Comedy", "Drama", "Crime", "Western", "Romance", "Family", "Animation"]

For each genre, we run the lexical and semantic searches, generate the hybrid search results with RRF, and store all three result sets for later use.

In [None]:
aoss_client = llm_utils.connect_to_aoss(auth, os_host)

#number of items to be returned
k = 10

lexical_queries_results = dict()
semantic_queries_results = dict()
hybrid_queries_results = dict()

for genre in genres:

    #---------- lexical search -----------
    query_lexical = {
        "query": {
            "match": {
                "genres": genre
            }
        },
        "size": k,
    }
    search_response = aoss_client.search(body=query_lexical, index=index_name)
    lexical_response = llm_utils.extract_response_from_os_response(search_response)
    #removing the vector_index as we are not using it in that scenario
    #(pop with a default so that a hit without this field does not raise a KeyError)
    for hit in lexical_response:
        hit.pop("vector_index", None)
    
    lexical_queries_results[genre] = lexical_response

    #-------- semantic search -----------
    #we generate the vectorised version of the question
    question_embedding = llm_utils.get_embeddings_from_text(genre, "cohere", input_type="search_query")
    query_semantic = {
        "size": k,
        "query": {
            "knn": {
            "vector_index": {
                "vector": question_embedding,
                "k": k
            }
            }
        },
        "_source": data_columns
    }
    response = aoss_client.search(body=query_semantic, index=index_name)
    #extract object from os response
    semantic_response = llm_utils.extract_response_from_os_response(response)
    semantic_queries_results[genre] = semantic_response

    #---------- Hybrid search fusion with RRF -----------
    RFF_response  = reciprocal_rank_fusion(lexical_response, semantic_response)
    hybrid_queries_results[genre] = RFF_response

### Evaluation of our search result
Instead of manually building our evaluation dataset, let's try to get Claude 3 to do it for us.

In [None]:
import pandas as pd
small_dataset_df = pd.read_csv("../dataset/movies_metadata_small.csv")
print(f"dataset size:{small_dataset_df.shape}")

In [None]:
small_dataset_df.head(2)

We build a prompt for Claude 3 to select the "best" movies for us based on common criteria like awards, critics, or cultural impact.

In [None]:
preselection_size = k*2

#model_id = "anthropic.claude-3-haiku-20240307-v1:0"
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"

prompt = """
Your task is to extract the "best" <number>{k}</number> movies from the list provided in the <documents> tag in relation to this specific <genre>{genre}</genre>.

SUPER IMPORTANT: DO NOT COME UP WITH MOVIES FROM YOUR OWN KNOWLEDGE!! you need to extract the list from the provided documents in <documents> tag only.

Read carefully through the list of movies before responding. 

When evaluating which are the best movies consider those criteria: cultural impact and influence, critics, awards.

Extract from the list in <documents> tag a pre-selection of <number>{preselection_size}</number> movies and write them in <thinking> tag before responding in <answer> tag.

Output your final selection of <number>{k}</number> in a well JSON format as show in the <example> tag. Note that the tmdb_id is an integer, not a string.

<example>
    <thinking>[{"tmbd_id":2343,"original_title":"Rocky"}, {"tmbd_id":23434,"original_title":"Batman"}, {"tmbd_id":43232,"original_title":"Rambo"}]</thinking>
    <answer>[{"tmbd_id":2343,"original_title":"Rocky"}, {"tmbd_id":43232,"original_title":"Rambo"}]</answer>
<example>

<documents>
{movies}
</documents>
"""

We define few utils functions to help us format the dataframe into json that we can then include into the system prompt as part of the LLM API call.

In [None]:
import re

#Convert a DataFrame to a list of JSON-formatted objects.
def df_to_json(df):
    json_list = []
    
    for _, row in df.iterrows():
        json_dict = row.to_dict()
        json_list.append(json_dict)
    
    return json_list

#Format a list of JSON objects into a string.
def format_json_list(json_list):
    
    formatted_string = ""
    for json_obj in json_list:
        formatted_string += "{\n"
        for key, value in json_obj.items():
            formatted_string += f"  '{key}': '{value}',\n"
        formatted_string = formatted_string.rstrip(",\n") + "\n},\n"
    formatted_string = formatted_string.rstrip(",\n")
    return formatted_string

#Extract the string content from within <answer> tags in the given text.
def extract_answer_text(text):
    pattern = r'<answer>(.*?)</answer>'
    match = re.search(pattern, text, re.DOTALL)
    if match:
        return match.group(1)
    else:
        return ""


Finally, we call the API to generate our list of best movies per genre. It should take around 3min.

In [None]:
#we prepare a dictionary to check the llm outputs.
dataset_dict = dict()
for idx, row in enumerate(small_dataset_df[['tmdb_id', 'original_title']].iterrows(), start=1):
    dataset_dict[row[1]["tmdb_id"]] = row[1]["original_title"]

In [None]:
#llm generated relevant selection
llm_generated_relevant_data = dict()

for genre in genres:
    #get filtered df based on genre.
    genre_filtered_df = small_dataset_df[small_dataset_df["genres"].str.contains(genre)]
    
    #convert df to json format.
    json_docs = df_to_json(genre_filtered_df)
    str_json_docs = format_json_list(json_docs)
    
    #format prompt
    formated_system_prompt = prompt.replace("{movies}", str_json_docs).replace("{k}", str(k)).replace("{preselection_size}", str(preselection_size)).replace("{genre}", genre)

    response = llm_utils.invoke_anthropic_claude(formated_system_prompt, 
                            system_prompt = "",
                            max_tokens=4096, 
                            temperature=0, 
                            top_k=250, 
                            top_p=0.999,
                            modelId=model_id,
                            anthropic_version="bedrock-2023-05-31",
                            debug=False)
    try:
        json_response = json.loads(extract_answer_text(response))
    except Exception as e:
        print(f"Error while parsing json response: {e}")
        print(f"response:{response}")
        continue

    #check that the llm output is correct
    for item in json_response:
        try:
            is_valid = dataset_dict[item["tmdb_id"]] == item["original_title"]
            if not is_valid:
                print(f"Error for item:{item}")
        except Exception as e:
            print(f"Exception for item:{item}, {e}")

    llm_generated_relevant_data[genre] = json_response

    print(f"Data generated for {genre} genre")


Let's explore manually the suggestions made by the model

In [None]:
[title["original_title"] for title in llm_generated_relevant_data['Mystery']]

In [None]:
[title["original_title"] for title in llm_generated_relevant_data['Drama']]

### Precision
To calculate the precision of a search result, we need to determine the ratio of relevant items in the list to be evaluated compared to the total number of items in that list.

In [None]:
def calculate_precision(relevant_dict, evaluated_dict):
    precision_array = []
    for index, key in enumerate(evaluated_dict.keys()):
        relevant_count = 0
        relevant_ids = [item["tmdb_id"] for item in relevant_dict[key]]

        for item in evaluated_dict[key]:
            if int(item["tmdb_id"]) in relevant_ids:
                relevant_count += 1

        precision = relevant_count / len(evaluated_dict[key]) if evaluated_dict[key] else 0
        #print(f"{key}: {precision}")
        precision_array.append(precision)
    
    return sum(precision_array)/len(precision_array)

In [None]:
print(f"Precision for semantic search only:{calculate_precision(llm_generated_relevant_data, semantic_queries_results)}")
print(f"Precision for lexical search only:{calculate_precision(llm_generated_relevant_data, lexical_queries_results)}")
print(f"Precision for hybrid search:{calculate_precision(llm_generated_relevant_data, hybrid_queries_results)}")

### Mean Reciprocal Rank (MRR)

The Mean Reciprocal Rank (MRR) is a metric used to evaluate the performance of a ranking system, such as a search engine or a recommendation system. It measures the average reciprocal rank of the first relevant item in the ranked list.

Here's how the MRR algorithm works:

1. For each query (or search term), we have a list of relevant items and a list of items to be evaluated (the ranked list returned by the system).
2. We iterate through the ranked list and find the position (rank) of the first relevant item.
3. If no relevant item is found, the reciprocal rank is considered 0.
4. Otherwise, the reciprocal rank is calculated as 1 / rank, where rank is the position of the first relevant item.
5. The MRR is the average of the reciprocal ranks across all queries.

A higher MRR value indicates better ranking performance, with a maximum value of 1 when the first item in the ranked list is always relevant.

In [None]:
def calculate_mrr(relevant_dict, ranked_dict):
    total_queries = len(relevant_dict)
    reciprocal_ranks = []

    for _, key in enumerate(ranked_dict.keys()):
        #get relevant ids for the current genre
        relevant_ids = set(item["tmdb_id"] for item in relevant_dict[key])

        for rank, item in enumerate(ranked_dict[key], start=1):
            if int(item["tmdb_id"]) in relevant_ids:
                reciprocal_ranks.append(1 / rank)
                break

    if not reciprocal_ranks:
        return 0.0

    mrr = sum(reciprocal_ranks) / total_queries

    return mrr

In [None]:
print(f"MRR for semantic search only:{calculate_mrr(llm_generated_relevant_data, semantic_queries_results)}")
print(f"MRR for lexical search only:{calculate_mrr(llm_generated_relevant_data, lexical_queries_results)}")
print(f"MRR for hybrid search:{calculate_mrr(llm_generated_relevant_data, hybrid_queries_results)}")

### Conclusion

You would have noticed that we get better result for lexical search compared to semantic search results.
Is is expected considering the type of query we've been using. Assuming that you have accurate metadata which is the case for this dataset, a lexical search will be more efficient than semantic search.
If we use a more complex query like "scifi either in space or another planet movies", Semantic search will have the upper hand.

With that in mind, we will apply according strategy in the next part of the workshop and route queries to either semantic or lexical search to maximise performance.

Finally, we store some variables index_name for use in subsequent notebooks in the workshop

In [None]:
%store index_name
%store identity_arn
%store aossAccessPolicyArn
%store data_access_policy_name_nb