# File Name: simple_multimodal_knwl_bases_building.ipynb
### Location: Chapter 19
### Purpose: 
#####             1. Create a collection on Amazon OpenSearch Serverless
#####             2. Create a network policy for the collection
#####             3. Create a security policy for encryption using an AWS-owned key
#####             4. Create an access policy for the collection to define permissions for the collection and index
#####             5. Call the create_access_policy method to define permissions for the collection and index
#####             6. Create a vector search collection in OpenSearch Serverless
#####             7. Wait until the collection status is "ACTIVE" before continuing with the next steps
#####             8. Create an index on the collection
#####             9. Search with a simple text prompt
#####             10. Search with a combination of text and image prompts

##### Dependency: simple_multimodal_data_prep.ipynb in Chapter 19 must have run successfully before this notebook.

# <ins>-----------------------------------------------------------------------------------</ins>


# <ins>Amazon SageMaker Classic</ins>
#### If you are new to Amazon SageMaker Classic, follow this link for details: https://docs.aws.amazon.com/sagemaker/latest/dg/studio.html

# <ins>Environment setup of Kernel</ins>
##### Fill "Image" as "Data Science"
##### Fill "Kernel" as "Python 3"
##### Fill "Instance type" as "ml.t3.medium"
##### Fill "Start-up script" as "No Scripts"
##### Click "Select"

###### Refer https://docs.aws.amazon.com/sagemaker/latest/dg/notebooks-create-open.html for details.

# <ins>Mandatory installation on the kernel through pip</ins>

##### This lab works with the software versions listed below. If you use newer versions of boto3, awscli, or botocore, this code may fail, and you might need to update the corresponding API calls.

##### You will see pip dependency errors. You can safely ignore these errors and continue executing the rest of the cells.

In [None]:
%%time

%pip install --no-build-isolation --force-reinstall -q \
    "boto3>=1.34.84" \
    "opensearch-py>=2.7.1" \
    "retrying>=1.3.4" \
    "ragas" \
    "ipywidgets>=7.6.5" \
    "iprogress>=0.4" \
    "langchain>=0.2.16" \
    "langchain_community>=0.2.17" \
    "awscli>=1.32.84" \
    "botocore>=1.34.84" \
    "langchain-aws>=0.1.7"    

# <ins>Restart the kernel</ins>

In [None]:
# restart kernel
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

# <ins>Python package import</ins>

##### boto3 offers various clients for Amazon Bedrock to execute various actions.
##### botocore is a low-level interface to AWS tools, while boto3 is built on top of botocore and provides additional features

In [None]:
import json
import os
import boto3
import botocore
import pprint
import random
from retrying import retry
import warnings
import time
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, RequestError
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
import pprint as pp
from botocore.exceptions import BotoCoreError, ClientError
import pandas as pd
import tqdm.notebook as tq
from IPython.display import display, Image, HTML
import base64

### Ignore warnings

In [None]:
warnings.filterwarnings('ignore')

### Restore all stored variables from the previous notebook (simple_multimodal_data_prep.ipynb)

In [None]:
%store -r

## Define important environment variables

In [None]:
# Try-except block to handle potential errors
try:
    # Create a new Boto3 session to interact with AWS services
    boto3_session_name = boto3.session.Session()


    # Create a Bedrock Agent client using the current session and region
    bedrock_agent_client = boto3_session_name.client('bedrock-agent', region_name=aws_region_name)
    
    # Initialize Bedrock and Bedrock Runtime clients using Boto3
    # These clients will allow interactions with Bedrock-related AWS services
    boto3_bedrock_client = boto3.client('bedrock', region_name=aws_region_name)
    boto3_bedrock_runtime_client = boto3.client('bedrock-runtime', region_name=aws_region_name)

    # Define the service name for Amazon OpenSearch Serverless (AOSS)
    opensearch_service_name = 'aoss'

    # Create an S3 client to interact with Amazon S3
    s3_client = boto3.client('s3')

    # Create an STS client to interact with AWS Security Token Service (STS)
    sts_client = boto3.client('sts')

    # Generate a random suffix number between 200 and 900
    random_suffix = random.randrange(200, 900)

    # Get the AWS account ID of the caller
    aws_account_id = sts_client.get_caller_identity()["Account"]

    # Generate a suffix using the region and account ID for the S3 bucket name
    s3_suffix = f"{aws_region_name}-{aws_account_id}"

    # Define the name of the S3 bucket (you can replace this with your actual bucket name)
    s3_bucket_name = f'bedrock-kb-{s3_suffix}-{random_suffix}'

    # PrettyPrinter instance for formatted output
    pretty_printer = pprint.PrettyPrinter(indent=2)

    # Create an OpenSearch Serverless (AOSS) client using the current session
    aoss_client = boto3_session_name.client('opensearchserverless')

    # Generate unique names for the vector store and index based on the suffix
    vector_store_name = f'multimodal-sample-rag-{random_suffix}'
    index_name = f"multimodal-sample-rag-index-{random_suffix}"

    # Create an IAM client to interact with Identity and Access Management (IAM) service
    iam_client = boto3_session_name.client('iam')

    # Retrieve the ARN of the caller (reuses the STS client created above)
    identity_arn = sts_client.get_caller_identity().get('Arn')
    
    # Create security policy name for aoss collection
    security_policy_name = f'multimodal-col-sec-policy-{random_suffix}'
    network_policy_name = f'multimodal-col-net-policy-{random_suffix}'
    access_policy_name = f'multimodal-col-acs-policy-{random_suffix}'
    
    # Embedding model ARN for Bedrock
    embeddingModelArn = f"arn:aws:bedrock:{aws_region_name}::foundation-model/amazon.titan-embed-text-v1"
    
    # Amazon Bedrock Knowledge Bases variables
    bedrock_knowledge_bases_name = f"multimodal-knowl-bases-{random_suffix}"
    description = "Bedrock multimodal sample knowledge bases."

    # Store all variables in a dictionary
    variables_store = {
        "aws_region_name": aws_region_name,
        "bedrock_agent_client": bedrock_agent_client,
        "opensearch_service_name": opensearch_service_name,
        "s3_client": s3_client,
        "sts_client": sts_client,
        "aws_account_id": aws_account_id,
        "s3_suffix": s3_suffix,
        "s3_bucket_name": s3_bucket_name,
        "random_suffix": random_suffix,
        "aoss_client": aoss_client,
        "vector_store_name": vector_store_name,
        "index_name": index_name,
        "iam_client": iam_client,
        "identity_arn": identity_arn,
        "security_policy_name": security_policy_name,
        "network_policy_name": network_policy_name,
        "access_policy_name": access_policy_name,
        "embeddingModelArn": embeddingModelArn,
        "bedrock_knowledge_bases_name": bedrock_knowledge_bases_name,
        "description": description
    }

    # Print all variables
    for var_name, value in variables_store.items():
        print(f"{var_name}: {value}")

except Exception as e:
    print(f"An unexpected error occurred: {e}")


### Use the %store magic command to save variables for use in other notebooks

In [None]:
%store bucket_name aws_region_name opensearch_service_name embeddingModelArn description
%store aws_account_id s3_suffix s3_bucket_name random_suffix bedrock_knowledge_bases_name
%store vector_store_name index_name identity_arn security_policy_name network_policy_name access_policy_name

# Create a collection on OpenSearch Serverless

### Create a network policy for the collection

##### This code creates a network security policy for an Amazon OpenSearch Serverless (AOSS) collection using the Boto3 aoss_client. The policy name comes from network_policy_name, and the policy rules are passed as JSON targeting the resource collection/<vector_store_name>. The policy type is set to network, and AllowFromPublic is set to True, which makes the collection endpoint reachable from public networks (requests must still be authorized by the data access policy). Adjust this setting to your use case.

In [None]:
%%time
# Create a network policy for collection

try:
    # Creating a network security policy
    network_policy_name_res = aoss_client.create_security_policy(
        name=network_policy_name,  # Name of the security policy
        policy=json.dumps(  # JSON-formatted policy rules
            [
                {
                    'Rules': [
                        {
                            'Resource': ['collection/' + vector_store_name],  # Define the resource
                            'ResourceType': 'collection'  # Specify that it's a collection resource
                        }
                    ],
                    'AllowFromPublic': True  # Allow public access (may need to change based on your use case)
                }
            ]
        ),
        type='network'  # Define the type of security policy as 'network'
    )

    # If the security policy is created successfully, print the success message
    print(f"Security policy '{network_policy_name}' created successfully.")

# Handle the case where the security policy already exists
except aoss_client.exceptions.ConflictException:
    print(f"Security policy '{network_policy_name}' already exists.")

# Handle validation errors such as incorrect policy structure
except aoss_client.exceptions.ValidationException as e:
    print(f"Validation error when creating security policy: {str(e)}")

# Catch any other general exceptions
except Exception as e:
    print(f"An error occurred while creating the security policy: {str(e)}")
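
##### The AllowFromPublic flag is the setting most likely to change outside a lab. As a hedged sketch, the helper below builds the same policy document and, optionally, a variant restricted to VPC endpoints through the SourceVPCEs field. The function name build_network_policy, the collection name, and the endpoint ID are hypothetical placeholders and not part of this notebook.

```python
import json

def build_network_policy(collection_name, vpc_endpoint_ids=None):
    """Return the JSON string for an AOSS network policy (illustrative helper)."""
    statement = {
        "Rules": [
            {
                "Resource": [f"collection/{collection_name}"],
                "ResourceType": "collection",
            }
        ],
        # Public access is only enabled when no VPC endpoints are supplied
        "AllowFromPublic": not vpc_endpoint_ids,
    }
    if vpc_endpoint_ids:
        statement["SourceVPCEs"] = list(vpc_endpoint_ids)
    return json.dumps([statement])

# Placeholder values for illustration only
public_policy = build_network_policy("multimodal-sample-rag-123")
private_policy = build_network_policy("multimodal-sample-rag-123", ["vpce-0123456789abcdef0"])
print(public_policy)
print(private_policy)
```

##### Either string could be passed as the policy argument of aoss_client.create_security_policy with type='network'.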


### Create a security policy for encryption using an AWS-owned key

##### This code snippet creates an encryption security policy for an Amazon OpenSearch Serverless (AOSS) collection using the AWS Boto3 aoss_client. The policy, named security_policy_name, is configured to use an AWS-owned key for encryption (AWSOwnedKey: True). The target resource is specified as collection/<vector_store_name>, ensuring encryption rules apply specifically to the intended collection.

In [None]:
%%time
# Create a security policy for encryption using an AWS-owned key

try:
    security_policy_response = aoss_client.create_security_policy(
        name=security_policy_name,
        policy=json.dumps(
            {
                'Rules': [{'Resource': ['collection/' + vector_store_name],
                           'ResourceType': 'collection'}],
                'AWSOwnedKey': True
            }),
        type='encryption'
    )
    
    print(f"Security policy '{security_policy_name}' created successfully.")
except aoss_client.exceptions.ConflictException:
    print(f"Security policy '{security_policy_name}' already exists.")
except aoss_client.exceptions.ValidationException as e:
    print(f"Validation error when creating security policy: {str(e)}")
except Exception as e:
    print(f"An error occurred while creating security policy: {str(e)}")

### Create an access policy for the collection to define permissions for the collection and index

##### This code defines a function, find_iam_role_by_name_substring, to locate an IAM role and retrieve its ARN based on a specified substring within the role name. Using the AWS Boto3 iam_client, it lists all IAM roles and filters them for names containing the substring "GenAIBookBedrockSageMakerExecutionR".

In [None]:
%%time
# Find out IAM role and ARN for this session

def find_iam_role_by_name_substring(substring):
    """Return the ARN of the last IAM role whose name contains `substring`, or None."""
    role_arn = None
    try:
        # list_roles is paginated, so walk every page rather than only the first
        paginator = iam_client.get_paginator('list_roles')
        for page in paginator.paginate():
            for role in page['Roles']:
                # Filter roles by name that contains the substring
                if substring in role['RoleName']:
                    print(f"Found Role: {role['RoleName']} | ARN: {role['Arn']}")
                    role_arn = role['Arn']

        if role_arn is None:
            print(f"No roles found with name containing '{substring}'.")

    except Exception as e:
        print(f"An error occurred: {str(e)}")

    # None is returned when no role matched or the lookup failed
    return role_arn

# Call the function with the desired substring
genaibookedbedrocksagemakerexecutionrolearn = find_iam_role_by_name_substring("GenAIBookBedrockSageMakerExecutionR")

### Use the %store magic command to save variables for use in other notebooks

In [None]:
%store genaibookedbedrocksagemakerexecutionrolearn

### Call the create_access_policy method to define permissions for the collection and index

##### This code creates an access policy for managing permissions on an Amazon OpenSearch Serverless (AOSS) collection and index using the AWS Boto3 aoss_client. The policy, named access_policy_name, defines detailed rules for both the collection and index resources, specifying actions allowed for each.
##### Policy Rules:
#####               a) For collection resources (collection/<vector_store_name>)
#####               b) For index resources (index/<vector_store_name>/*)

In [None]:
%%time
try:
    
    access_policy_res = aoss_client.create_access_policy(
        name=access_policy_name,  # The name of the access policy being created
        policy=json.dumps(  # The access policy body, provided in JSON format
            [
                {
                    'Rules': [  # Define the access rules for the resources
                        {
                            'Resource': ['collection/' + vector_store_name],  # Specify the resource collection
                            'Permission': [  # Define allowed actions for the collection
                                'aoss:CreateCollectionItems',  # Allows creating items in the collection
                                'aoss:DeleteCollectionItems',  # Allows deleting items from the collection
                                'aoss:UpdateCollectionItems',  # Allows updating items in the collection
                                'aoss:DescribeCollectionItems'  # Allows describing items in the collection
                            ],
                            'ResourceType': 'collection'  # Define resource type as collection
                        },
                        {
                            'Resource': ['index/' + vector_store_name + '/*'],  # Specify the index resource path
                            'Permission': [  # Define allowed actions for the index
                                'aoss:CreateIndex',  # Allows creating an index
                                'aoss:DeleteIndex',  # Allows deleting an index
                                'aoss:UpdateIndex',  # Allows updating an index
                                'aoss:DescribeIndex',  # Allows describing an index
                                'aoss:ReadDocument',  # Allows reading documents from the index
                                'aoss:WriteDocument'  # Allows writing documents to the index
                            ],
                            'ResourceType': 'index'  # Define resource type as index
                        }
                    ],
                    'Principal': [  # Define who has access to this policy
                        identity_arn,  # The primary ARN to which the policy applies
                        genaibookedbedrocksagemakerexecutionrolearn  # Example of an additional ARN
                    ],
                    'Description': 'Easy data policy'  # Description of the policy
                }
            ]
        ),
        type='data'  
    )
    
    # If the policy is created successfully, print a success message
    print(f"Access policy '{access_policy_name}' created successfully.")

# Handle case where a policy with the same name already exists
except aoss_client.exceptions.ConflictException:
    print(f"Access policy '{access_policy_name}' already exists.")

# Handle validation errors during policy creation
except aoss_client.exceptions.ValidationException as e:
    print(f"Validation error when creating access policy: {str(e)}")

# Handle any other exceptions that occur during the process
except Exception as e:
    print(f"An error occurred while creating access policy: {str(e)}")
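
##### The Principal list in the policy includes the role ARN returned by find_iam_role_by_name_substring, which may be missing if no role matched. A minimal sketch, assuming a hypothetical helper named build_principals, that filters out empty values and duplicates before the policy is serialized:

```python
def build_principals(*arns):
    """Return unique, non-empty ARNs in their original order (hypothetical helper)."""
    principals = []
    for arn in arns:
        # Skip None or empty strings, and ARNs that were already added
        if arn and arn not in principals:
            principals.append(arn)
    if not principals:
        raise ValueError("At least one principal ARN is required.")
    return principals

# Placeholder ARN for illustration only
example_user_arn = "arn:aws:iam::111122223333:user/example-user"
example_principals = build_principals(example_user_arn, None, example_user_arn)
print(example_principals)
```

##### In this notebook, the result of build_principals(identity_arn, genaibookedbedrocksagemakerexecutionrolearn) could be used as the value of 'Principal'.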

### Create a vector search collection in OpenSearch Serverless

##### This code attempts to create a vector search collection in Amazon OpenSearch Serverless using the AWS Boto3 aoss_client. The collection, named vector_store_name, is configured for VECTORSEARCH, a specialized type of collection used for vector-based information retrieval.

##### The create_collection method is called with the specified name and type (VECTORSEARCH). The collection endpoint host and ARN are then extracted from the response.

In [None]:
%%time
# Try to create a vector search collection in OpenSearch Serverless

try:
    response = aoss_client.create_collection(
        name=vector_store_name,
        type='VECTORSEARCH'
    )
    print(f"Collection '{vector_store_name}' creation is in progress.")
    print("Response:", response)
    
    aoss_collection_host = response['createCollectionDetail']['id'] + '.' + aws_region_name + '.aoss.amazonaws.com'
    aoss_collectionarn = response['createCollectionDetail']['arn']
    
    print(f"aoss_collection_host: '{aoss_collection_host}'")
    print(f"aoss_collectionarn: '{aoss_collectionarn}'")
    
except aoss_client.exceptions.ConflictException:
    print(f"Collection '{vector_store_name}' already exists.")
except aoss_client.exceptions.ValidationException as e:
    print(f"Validation error: {str(e)}")
except aoss_client.exceptions.ServiceQuotaExceededException as e:
    print(f"Service quota exceeded: {str(e)}")
except aoss_client.exceptions.OcuLimitExceededException as e:
    print(f"OCU limit exceeded: {str(e)}")
except aoss_client.exceptions.InternalServerException as e:
    print(f"Internal server error: {str(e)}")
except aoss_client.exceptions.ResourceNotFoundException as e:
    print(f"Resource not found: {str(e)}")
except Exception as e:
    print(f"An error occurred: {str(e)}")

### Use the %store magic command to save variables for use in other notebooks

In [None]:
%store aoss_collection_host aoss_collectionarn

### The collection takes some time to become "ACTIVE", so check its status before moving on to the next steps.

##### This code provides a mechanism to wait for an Amazon OpenSearch Serverless vector search collection to transition to the "ACTIVE" state, which is necessary before proceeding with subsequent operations.

In [None]:
%%time
# The collection takes some time to become "ACTIVE", so poll its status before the next steps.

def interactive_sleep(seconds):
    """A simple sleep function that could be replaced with more complex logic."""
    time.sleep(seconds)

def wait_for_collection_creation(aoss_client, vector_store_name):
    try:
        # Initial call to batch_get_collection
        response = aoss_client.batch_get_collection(names=[vector_store_name])
        status = response['collectionDetails'][0]['status']

        # Periodically check collection status
        while status == 'CREATING':
            print('Creating collection...')
            interactive_sleep(30)
            response = aoss_client.batch_get_collection(names=[vector_store_name])
            status = response['collectionDetails'][0]['status']

        # Only ACTIVE means success; any other final status (for example FAILED) is reported
        if status == 'ACTIVE':
            print(f'\nCollection successfully created: {vector_store_name}')
        else:
            print(f'\nCollection {vector_store_name} ended in unexpected status: {status}')

    except ClientError as e:
        print(f"An error occurred: {e.response['Error']['Message']}")
    except IndexError:
        print("No collection details found. Please check the collection name.")
    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")

# Example usage
wait_for_collection_creation(aoss_client, vector_store_name)
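
##### The polling loop has no upper time limit. As a hedged sketch, the generic helper below adds a timeout and treats any status that is neither pending nor done as an error. The name wait_until and its parameters are hypothetical, and the demo uses a fake status source instead of a real AOSS call.

```python
import time

def wait_until(get_status, done_states, pending_states, timeout_s=600, poll_s=30, sleep=time.sleep):
    """Poll get_status() until it returns a state in done_states.

    Raises RuntimeError on a state that is neither pending nor done,
    and TimeoutError once timeout_s seconds of waiting have elapsed.
    """
    waited = 0
    while True:
        status = get_status()
        if status in done_states:
            return status
        if status not in pending_states:
            raise RuntimeError(f"Unexpected status: {status}")
        if waited >= timeout_s:
            raise TimeoutError(f"Still {status} after {waited} seconds")
        sleep(poll_s)
        waited += poll_s

# Demo with a fake status source; sleeping is skipped so the demo runs instantly
fake_statuses = iter(["CREATING", "CREATING", "ACTIVE"])
final_status = wait_until(
    get_status=lambda: next(fake_statuses),
    done_states={"ACTIVE"},
    pending_states={"CREATING"},
    sleep=lambda seconds: None,
)
print(final_status)
```

##### In this notebook, get_status could be a lambda that calls aoss_client.batch_get_collection(names=[vector_store_name]) and returns response['collectionDetails'][0]['status'].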

# Index Creation on the collection

##### This cell creates a k-NN (k-nearest neighbor) vector index in the Amazon OpenSearch Serverless collection using Python, Boto3, and opensearch-py. It defines separate functions for authentication, OpenSearch client creation, and index creation, each with its own error handling.

##### 1. AWS Authentication (get_aws_auth): Retrieves AWS credentials using boto3.Session() and constructs an AWSV4SignerAuth object.
##### 2. OpenSearch Client Creation (create_opensearch_client): Establishes a connection to OpenSearch using the provided host and AWS authentication.
##### 3. Vector Index Creation (create_vector_index): Checks whether the specified index already exists using client.indices.exists() and creates it only if it does not.
##### 4. Orchestration (create_index): Defines the index mapping, builds the client, creates the index, and returns the client for use in later cells.

In [None]:
%%time 
def get_aws_auth(region_name, service):
    """Retrieve AWS authentication credentials."""
    try:
        credentials = boto3.Session().get_credentials()
        awsauth = AWSV4SignerAuth(credentials, region_name, service)
        return awsauth
    except (NoCredentialsError, PartialCredentialsError) as e:
        print(f"Error retrieving AWS credentials: {e}")
        raise

def create_opensearch_client(host, awsauth):
    """Build the OpenSearch client."""
    try:
        client = OpenSearch(
            hosts=[{'host': host, 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            timeout=300
        )
        return client
    except Exception as e:
        print(f"Error creating OpenSearch client: {e}")
        raise

def create_vector_index(client, index_name, index_body):
    """Create the vector index in OpenSearch."""
    try:
        if not client.indices.exists(index=index_name):
            client.indices.create(index=index_name, body=json.dumps(index_body))
            print(f"Index '{index_name}' created successfully.")
        else:
            print(f"Index '{index_name}' already exists.")
    except RequestError as e:
        print(f"Error creating index '{index_name}': {e}")
        raise

def create_index():

    index_body = {
           "settings": {
              "index.knn": "true"
           },
           "mappings": {
              "properties": {
                 "image_vector": {
                    "type": "knn_vector",
                    "dimension": 1024 # Embedding size for the Amazon Titan Multimodal Embeddings G1 model: 1,024 (default), 384, or 256
                 },
                 "description": {"type": "text"},
                 "item_id": {"type": "text"},
                 "image_url": {"type": "text"}
              }
           }
    }

    try:
        # Get AWS authentication
        awsauth = get_aws_auth(aws_region_name, opensearch_service_name)

        # Create OpenSearch client
        oss_client = create_opensearch_client(aoss_collection_host, awsauth)

        # Create index
        try:
            response = oss_client.indices.create(index=index_name, body=json.dumps(index_body))
            print('\nCreating index:')

            # index creation can take up to a minute
            interactive_sleep(60)
            
            print('\nIndex creation completed.')
            
            return oss_client 
        
        except RequestError as e:
            # You can delete the index if it already exists
            # oss_client.indices.delete(index=index_name)
            print(f'Error while trying to create the index, with error {e.error}\nYou may uncomment the delete call above to delete and recreate the index')


    except Exception as e:
        print(f"An error occurred during the process: {e}")

oss_client = create_index()
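
##### The index body in create_index relies on the default k-NN method settings. As a hedged sketch, the helper below builds an equivalent body with the method spelled out and checks the dimension against the sizes listed for the Titan multimodal model (1024, 384, 256). The name build_knn_index_body and the default engine and space_type are assumptions; which combinations are accepted depends on the OpenSearch version.

```python
import json

# Embedding sizes listed for the Titan multimodal model in this notebook
SUPPORTED_DIMENSIONS = (1024, 384, 256)

def build_knn_index_body(dimension=1024, engine="faiss", space_type="l2"):
    """Build a k-NN index body with an explicit HNSW method (illustrative helper)."""
    if dimension not in SUPPORTED_DIMENSIONS:
        raise ValueError(f"dimension must be one of {SUPPORTED_DIMENSIONS}, got {dimension}")
    return {
        "settings": {"index.knn": True},
        "mappings": {
            "properties": {
                "image_vector": {
                    "type": "knn_vector",
                    "dimension": dimension,
                    "method": {
                        "name": "hnsw",
                        "engine": engine,
                        "space_type": space_type,
                    },
                },
                "description": {"type": "text"},
                "item_id": {"type": "text"},
                "image_url": {"type": "text"},
            }
        },
    }

example_index_body = build_knn_index_body(dimension=384)
print(json.dumps(example_index_body, indent=2))
```

##### The index dimension must match the dimension requested from the embedding model, otherwise indexing the vectors fails.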

# Start an ingestion job

##### This code indexes metadata from a DataFrame into the OpenSearch index. It iterates through the DataFrame rows with a tqdm progress bar, builds a document containing the embedding vector, description, item ID, and image path for each record, and indexes the documents one at a time.

## In a live project, store the image files in Amazon S3.

In [None]:
%%time
try:
    # Iterate over the DataFrame rows with a progress bar
    for idx, record in tq.tqdm(image_metadata_list_df.iterrows(), total=len(image_metadata_list_df)):
        try:
            # Construct the document to be indexed
            document = {
                'image_vector': record['embedding_img'],  # Embedding vector for the image
                'description': record['Description'],    # Text description of the image
                'item_id': record['ID'],                 # Unique identifier for the item
                'image_url': record['Image_path'],       # URL or path to the image
            }

            # Index the document into the specified OpenSearch index
            response = oss_client.index(
                index=index_name,  # Target OpenSearch index
                body=document      # Document to be indexed
            )
            # Optionally log success or response details for debugging
            print(f"Document indexed successfully for item_id: {document['item_id']}")

        except Exception as e:
            # Handle errors related to indexing a specific document
            print(f"Error indexing document for item_id: {record['ID']}. Error: {e}")

except Exception as e:
    # Handle broader errors (e.g., issues with the DataFrame or OpenSearch client)
    print(f"Unexpected error during the indexing process: {e}")
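
##### The loop sends one request per document. For larger datasets, batching the documents is a common alternative. A minimal sketch, assuming a hypothetical generator named build_bulk_actions and made-up sample records that use the same column names as the DataFrame:

```python
def build_bulk_actions(records, index_name):
    """Yield one bulk action per record; records are dicts keyed by the notebook's column names."""
    for record in records:
        yield {
            "_index": index_name,
            "_source": {
                "image_vector": list(record["embedding_img"]),
                "description": record["Description"],
                "item_id": record["ID"],
                "image_url": record["Image_path"],
            },
        }

# Made-up records for illustration; real vectors have 1024 dimensions
sample_records = [
    {"embedding_img": [0.1, 0.2], "Description": "White t-shirt", "ID": "1", "Image_path": "images/1.jpg"},
    {"embedding_img": [0.3, 0.4], "Description": "Blue jeans", "ID": "2", "Image_path": "images/2.jpg"},
]
bulk_actions = list(build_bulk_actions(sample_records, "demo-index"))
print(len(bulk_actions))

# With a real client, the actions could be sent in batches, for example:
# from opensearchpy import helpers
# success_count, errors = helpers.bulk(oss_client, bulk_actions)
```

##### In this notebook, image_metadata_list_df.to_dict("records") would provide the records and index_name the target index.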


# Use Case 1

### Search capability with a simple text prompt

##### The function get_titan_multimodal_embedding generates multimodal embeddings with an Amazon Bedrock model from an image, a text description, or both. It accepts the image path, the text description, the desired embedding dimension (default 1024), and the model ID. If an image path is given, the function checks that the file exists and encodes it in base64; a text description is added to the request payload as is. It requires at least one input, then invokes the model through the Bedrock runtime client with the input and the embedding configuration. On success it returns the model's response as a parsed JSON object; if the invocation fails, it prints the error and returns None.

Architecture 

<img src="./usecase1_arc_diagram.png" style="width: 600px; height: 400px;">

In [None]:
%%time
def get_titan_multimodal_embedding(
    image_path: str = None,  # Maximum image dimensions: 2048 x 2048 pixels
    description: str = None,  # Text description in English (max 128 tokens)
    dimension: int = 1024,  # Desired embedding dimension (default 1024, other options: 384, 256)
    model_id: str = multimodal_embed_model_id  # Predefined model ID for the multimodal embedding
):
    """
    Function to obtain multimodal embeddings by providing either an image or a text description.
    
    Args:
        image_path (str): Path to the image file (optional).
        description (str): Text description for embedding (optional).
        dimension (int): The dimensionality of the embedding output (default is 1024).
        model_id (str): Model identifier for the multimodal embedding model.
    
    Returns:
        dict: The response from the Bedrock model containing the multimodal embeddings.
    
    Raises:
        FileNotFoundError: If the image file does not exist at the given path.
        AssertionError: If neither image nor description is provided.
    """
    
    # Initialize the payload to send to the model
    payload_body = {}

    # Embedding configuration with the specified output dimension
    embedding_config = {
        "embeddingConfig": { 
            "outputEmbeddingLength": dimension
        }
    }
    
    # Process image input if provided
    if image_path:
        # Check if the provided image path exists locally
        if os.path.exists(image_path):
            # Open the image file in binary mode and encode it in base64
            with open(image_path, "rb") as image_file:
                encoded_image = base64.b64encode(image_file.read()).decode('utf8')
            # Add the base64 encoded image to the payload
            payload_body["inputImage"] = encoded_image
        else:
            # Raise an error if the image file does not exist
            raise FileNotFoundError(f"The image file at {image_path} does not exist.")
    
    # Process text description input if provided
    if description:
        payload_body["inputText"] = description

    # Ensure that either image or text is provided for the request
    assert payload_body, "Please provide either an image and/or a text description."

    try:
        # Invoke the model using the Bedrock runtime client to get multimodal embeddings
        response = boto3_bedrock_runtime_client.invoke_model(
            body=json.dumps({**payload_body, **embedding_config}), 
            modelId=model_id,
            accept="application/json", 
            contentType="application/json"
        )
        # Return the parsed JSON response from the model
        return json.loads(response.get("body").read())

    except Exception as e:
        # Handle any exceptions that might occur during the model invocation
        print(f"An error occurred while invoking the model: {e}")
        return None

In [None]:
prompt = "Men, Apparel, Topwear, Tshirts, White"

##### The function find_similar_items_from_query performs a semantic search using OpenSearch and the Titan multimodal embedding model to find items similar to a text query. It first generates a query embedding with get_titan_multimodal_embedding, then builds a k-NN query in which top_k sets the number of nearest neighbors to retrieve and num_results caps the number of hits returned. The search runs through the OpenSearch client with image vectors excluded from the response, and each hit is returned with its score, item ID, image URL, and description. Errors from embedding generation and from the OpenSearch query are caught separately, and in either case the function returns an empty list.

In [None]:
%%time
def find_similar_items_from_query(
    query_prompt: str, 
    top_k: int, 
    num_results: int, 
    index_name: str, 
    dataset, 
    open_search_client
) -> list:
    """
    Perform a semantic search using KNN on an input query prompt to find similar items.
    
    Args:
        query_prompt (str): Input text query for generating embeddings.
        top_k (int): Number of top-k similar vectors to retrieve.
        num_results (int): Number of search results to return.
        index_name (str): Name of the OpenSearch index.
        dataset: Metadata dataset for additional lookups.
        open_search_client: OpenSearch client for executing queries.
    
    Returns:
        list: A list of dictionaries containing similar item details (score, item_id, image_url, description).
    """
    try:
        # Obtain the query embedding for the given text prompt
        query_embedding = get_titan_multimodal_embedding(description=query_prompt, dimension=1024)["embedding"]

        # Define the OpenSearch KNN query body
        query_body = {
            "size": num_results,
            "_source": {
                "exclude": ["image_vector"],  # Exclude the image_vector field from the search results
            },
            "query": {
                "knn": {
                    "image_vector": {
                        "vector": query_embedding,  # Query embedding for similarity search
                        "k": top_k,                 # Number of top-k similar vectors to retrieve
                    }
                }
            },
        }

        try:
            # Execute the search in OpenSearch
            search_response = open_search_client.search(index=index_name, body=query_body)

            # Process the search results
            similar_items = []
            for hit in search_response["hits"]["hits"]:
                similar_item = {
                    "score": hit["_score"],                           # Similarity score
                    "item_id": hit["_source"]["item_id"],             # Unique identifier for the item
                    "image_url": hit["_source"]["image_url"],         # URL of the image
                    "description": hit["_source"]["description"],     # Description of the item
                }
                similar_items.append(similar_item)

            return similar_items

        except Exception as search_error:
            # Handle errors related to OpenSearch queries
            print(f"Error executing OpenSearch query: {search_error}")
            return []

    except Exception as embedding_error:
        # Handle errors related to embedding generation
        print(f"Error generating query embedding: {embedding_error}")
        return []

# Example usage
try:
    similar_items_results = find_similar_items_from_query(
        query_prompt=prompt,
        top_k=5,
        num_results=3,
        index_name=index_name,
        dataset=image_metadata_list,
        open_search_client=oss_client,
    )

    # Print the results or take further actions
    print(similar_items_results)

except Exception as general_error:
    # Handle unexpected errors in the overall process
    print(f"Unexpected error during semantic search: {general_error}")
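
Both search functions in this notebook build the same KNN request body and unpack the hits in the same way. The sketch below shows one way that shared logic could be factored out. The helper names `build_knn_query` and `parse_hits` are hypothetical and not part of the notebook, and the sample response is made-up data that only mirrors the shape of an OpenSearch search response, using the field names from the code above.

```python
from typing import Any, Dict, List


def build_knn_query(vector: List[float], k: int, size: int,
                    field: str = "image_vector") -> Dict[str, Any]:
    """Build a KNN search body that leaves the stored vector out of each hit."""
    return {
        "size": size,                      # number of hits to return
        "_source": {"exclude": [field]},   # same exclusion as the notebook code
        "query": {"knn": {field: {"vector": vector, "k": k}}},
    }


def parse_hits(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten search hits into score/item_id/image_url/description dictionaries."""
    return [
        {
            "score": hit["_score"],
            "item_id": hit["_source"]["item_id"],
            "image_url": hit["_source"]["image_url"],
            "description": hit["_source"]["description"],
        }
        for hit in response.get("hits", {}).get("hits", [])
    ]


# Made-up response for illustration only; a real one comes from client.search(...)
sample_response = {
    "hits": {
        "hits": [
            {"_score": 0.91, "_source": {"item_id": "img_001",
                                         "image_url": "images/img_001.jpg",
                                         "description": "sample item one"}},
            {"_score": 0.87, "_source": {"item_id": "img_002",
                                         "image_url": "images/img_002.jpg",
                                         "description": "sample item two"}},
        ]
    }
}

body = build_knn_query([0.1, 0.2, 0.3], k=5, size=3)
items = parse_hits(sample_response)
print(body["query"]["knn"]["image_vector"]["k"], [item["item_id"] for item in items])
```

With helpers like these, each search function would only need to compute its embedding and call the client, which keeps the text and image variants consistent.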

    The function display_similar_items displays the search results one by one. It iterates through the results list and prints each item's score and description. If the image file referenced by the result exists locally (checked with os.path.exists), it renders the image with the Image class from IPython.display at a width of 300 pixels; otherwise it prints a "not found" message.

In [None]:
%%time
# Function to display items one by one

def display_similar_items(results):
    for idx, item in enumerate(results):
        print()
        print()
        print()
        print(f"Item {idx + 1}:")
        print(f"Score: {item['score']:.4f}")
        print(f"Description: {item['description']}")
        if os.path.exists(item['image_url']):  # Check if the image file exists
            display(Image(filename=item['image_url'], width=300))  # Display image
        else:
            print("Image file not found.")
        print("-" * 80)

# Call the function
display_similar_items(similar_items_results)

# Use Case 2

### Search capability features a combination of text and image prompts

Architecture 

<img src="./usecase2_arc_diagram.png" style="width: 600px; height: 400px;">

In [None]:
%%time
# Assuming image_metadata_list is a list of dictionaries that each have an 'ID' key
def get_random_id(metadata_list):
    """
    Selects a random ID from the metadata list.

    Args:
        metadata_list (list): List of dictionaries containing the 'ID' field.

    Returns:
        str: Randomly selected ID.
    """
    if not metadata_list:
        raise ValueError("The metadata list is empty.")
    
    # Randomly select an item from the list and return its 'ID'
    selected_item = random.choice(metadata_list)
    return selected_item["ID"]

# Get a random ID
item_id = get_random_id(image_metadata_list)
print(f"Randomly selected ID: {item_id}")
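
Because the query item is picked at random, every run of the notebook searches with a different image. If repeatable results are useful while experimenting, a seeded variant could look like the sketch below. The function name `get_random_id_seeded`, its `seed` parameter, and the sample metadata are hypothetical additions for illustration.

```python
import random
from typing import Dict, List, Optional


def get_random_id_seeded(metadata_list: List[Dict[str, str]],
                         seed: Optional[int] = None) -> str:
    """Select an 'ID' at random; passing the same seed yields the same choice."""
    if not metadata_list:
        raise ValueError("The metadata list is empty.")

    # A private Random instance avoids changing the global random state
    rng = random.Random(seed)
    return rng.choice(metadata_list)["ID"]


# Made-up metadata for illustration only
sample_metadata = [{"ID": "a1"}, {"ID": "b2"}, {"ID": "c3"}]

first = get_random_id_seeded(sample_metadata, seed=42)
second = get_random_id_seeded(sample_metadata, seed=42)
print(first == second)
```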

    The code performs a semantic search to find visually similar images using a query image and an OpenSearch index. It first filters the list of metadata dictionaries to locate the file path of the query image. It then calls find_similar_items_from_image, which uses the Titan multimodal embedding function to generate a query embedding for the image. The function builds and executes a KNN (k-nearest neighbors) query on the index, where k sets how many nearest vectors to retrieve and num_results sets how many hits to return. It processes the search hits into a list of results containing the score, item ID, image URL, and description of each similar item. Finally, the list of similar items is returned, printed, and displayed.

In [None]:
%%time
# Filter the list of dictionaries where the 'ID' matches the item_id
matching_items = [item for item in image_metadata_list if item["ID"] == item_id]

# Take the image path of the first matching item to use as the query image
search_image_path = matching_items[0]["Image_path"]

# Assuming image_metadata_list is a list of dictionaries and search_image_path is extracted correctly

# Function for semantic search capability using knn on input image prompt
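def find_similar_items_from_image(image_path: str, k: int, num_results: int, index_name: str, dataset, open_search_client) -> list:
    """
    Perform a semantic search using KNN on an input image prompt to find similar items.

    Args:
        image_path: path of the query image used to generate the embedding
        k: number of top-k similar vectors to retrieve from the OpenSearch index
        num_results: number of search results to return
        index_name: index name in OpenSearch
        dataset: metadata dataset (not used by this function)
        open_search_client: OpenSearch client used to execute the query

    Returns:
        list: dictionaries containing score, item_id, image_url, and description
    """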
    # Assuming the get_titan_multimodal_embedding function will work for image paths
    query_emb = get_titan_multimodal_embedding(image_path=image_path, dimension=1024)["embedding"]
    
    body = {
        "size": num_results,
        "_source": {
            "exclude": ["image_vector"],
        },
        "query": {
            "knn": {
                "image_vector": {
                    "vector": query_emb,
                    "k": k,
                }
            }
        },
    }
    

    # Execute search query
    res = open_search_client.search(index=index_name, body=body)
    
    # Initialize an empty list to store the results
    results_list = []

    # Loop over the hits once and collect the details of each similar item
    for hit in res["hits"]["hits"]:
        similar_item = {
            "score": hit["_score"],
            "item_id": hit["_source"]["item_id"],
            "image_url": hit["_source"]["image_url"],
            "description": hit["_source"]["description"],
        }

        # Optionally, you can retrieve the image and item name as well
        # image, item_name = get_image_from_item_id_s3(item_id=similar_item["item_id"], dataset=dataset)

        # Attach the score and item name as a label to the image if needed
        # image.name_and_score = f'{similar_item["score"]}:{item_name}'

        # Append the result to the list
        results_list.append(similar_item)

    return results_list
    

# Set the image_path from the matching item (e.g., from your previous search)
search_image_path = matching_items[0]["Image_path"]

# Example usage for similar item search
similar_items = find_similar_items_from_image(
    image_path=search_image_path,
    k=5,
    num_results=3,
    index_name=index_name,
    dataset=image_metadata_list,
    open_search_client=oss_client
)

# Display the retrieved similar images
#results_list = display_images(similar_items)

print(similar_items)

# Call the function
display_similar_items(similar_items)
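
The search above embeds only the query image, while this use case is about combining text and image prompts. A minimal sketch of a combined query follows. It assumes that the notebook's get_titan_multimodal_embedding helper accepts `image_path` and `description` in the same call; both keyword arguments appear separately in this notebook, but passing them together has not been verified here. The function name `find_similar_items_multimodal`, the `embed_fn` parameter, and the stub client and embedder are hypothetical and exist only so the example runs without AWS access.

```python
from typing import Any, Callable, Dict, List


def find_similar_items_multimodal(
    image_path: str,
    text: str,
    k: int,
    num_results: int,
    index_name: str,
    open_search_client: Any,
    embed_fn: Callable[..., Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Search with a single embedding computed from an image and a text prompt."""
    # Assumption: embed_fn accepts image_path and description together
    query_emb = embed_fn(image_path=image_path, description=text, dimension=1024)["embedding"]

    body = {
        "size": num_results,
        "_source": {"exclude": ["image_vector"]},
        "query": {"knn": {"image_vector": {"vector": query_emb, "k": k}}},
    }
    res = open_search_client.search(index=index_name, body=body)

    return [
        {
            "score": hit["_score"],
            "item_id": hit["_source"]["item_id"],
            "image_url": hit["_source"]["image_url"],
            "description": hit["_source"]["description"],
        }
        for hit in res["hits"]["hits"]
    ]


class FakeClient:
    """Stand-in for the OpenSearch client; returns one made-up hit."""

    def search(self, index: str, body: Dict[str, Any]) -> Dict[str, Any]:
        self.last_body = body
        return {"hits": {"hits": [{"_score": 0.9, "_source": {
            "item_id": "img_001",
            "image_url": "images/img_001.jpg",
            "description": "sample item",
        }}]}}


def fake_embed(image_path=None, description=None, dimension=1024):
    """Stand-in embedder; returns a zero vector of the requested size."""
    return {"embedding": [0.0] * dimension}


fake_client = FakeClient()
results = find_similar_items_multimodal(
    image_path="sample.jpg",
    text="sample text prompt",
    k=5,
    num_results=3,
    index_name="demo-index",
    open_search_client=fake_client,
    embed_fn=fake_embed,
)
print(results)
```

In the notebook itself, the stubs would be replaced by `embed_fn=get_titan_multimodal_embedding` and `open_search_client=oss_client`, provided the assumption about the helper holds.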

# End of Notebook

#### <ins>Step 1</ins> 

##### Shut down the kernel after using this notebook to avoid unnecessary charges to your account.

##### Process: Open the "Kernel" menu at the top and choose "Shut Down Kernel".
##### Refer to https://docs.aws.amazon.com/sagemaker/latest/dg/studio-ui.html for details.


#### <ins>Step 2</ins> 

#### If you are not running any further labs in Chapter 19
##### Uncomment and execute the steps below.

    This script defines two functions to manage OpenSearch Serverless collections: 
    
    extract_collection_id_from_arn(arn) extracts the collection ID from an ARN by splitting the string to retrieve the last segment.
    delete_opensearch_collection(aoss_client, collection_arn) uses the extracted ID to delete the collection by calling delete_collection on the OpenSearch Serverless client.

In [None]:
'''%%time
def extract_collection_id_from_arn(arn):
    """Extracts the collection ID from the OpenSearch Serverless ARN."""
    try:
        # The collection ID is the last segment after the "/" in the ARN
        collection_id = arn.split("/")[-1]
        return collection_id
    except Exception as e:
        print(f"Error extracting collection ID from ARN: {e}")
        return None

def delete_opensearch_collection(aoss_client, collection_arn):
    """Deletes an OpenSearch Serverless collection using its ARN."""
    # Step 1: Extract the collection ID from the ARN
    collection_id = extract_collection_id_from_arn(collection_arn)
    
    if collection_id is None:
        print("Invalid collection ARN. Cannot proceed with deletion.")
        return
    
    # Step 2: Use the OpenSearch client to delete the collection by ID
    try:
        response = aoss_client.delete_collection(id=collection_id)
        print(f"Collection {collection_id} deleted successfully.")
    except Exception as e:
        print(f"Error deleting collection: {e}")

# Call the function to delete the collection
delete_opensearch_collection(aoss_client, aoss_collectionarn) '''
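
The extraction above takes whatever follows the last "/" in the string, so a malformed ARN could still produce an ID that is then passed to the delete call. A stricter variant is sketched below, assuming collection ARNs have the form `arn:<partition>:aoss:<region>:<account>:collection/<id>`. The function name `parse_collection_id` and the example ARN are hypothetical.

```python
def parse_collection_id(arn: str) -> str:
    """Return the collection ID from an OpenSearch Serverless collection ARN.

    Raises ValueError if the string does not look like a collection ARN.
    """
    # Assumed layout: arn:<partition>:aoss:<region>:<account>:collection/<id>
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "aoss":
        raise ValueError(f"Not an OpenSearch Serverless ARN: {arn!r}")

    resource_type, separator, collection_id = parts[5].partition("/")
    if resource_type != "collection" or not separator or not collection_id:
        raise ValueError(f"ARN does not reference a collection: {arn!r}")

    return collection_id


# Made-up ARN for illustration only
example_arn = "arn:aws:aoss:us-east-1:123456789012:collection/abc123example"
collection_id = parse_collection_id(example_arn)
print(collection_id)
```

Raising an error instead of returning None makes it harder to call the delete step with an ID that was never validated.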