# Multimodal Search with Amazon OpenSearch Service

**Welcome to the Multi-Modal Search notebook. Use this notebook to build a Multi-Modal Search application powered by Amazon OpenSearch Service.**

In this notebook, you will perform the following steps in sequence:
1. [Step 1: Get the Cloudformation outputs](#Step-1:-Get-the-Cloudformation-outputs)
2. [Step 2: Create the OpenSearch-Bedrock ML connector](#Step-2:-Create-the-OpenSearch-Bedrock-ML-connector)
3. [Step 3: Register and deploy the embedding model in OpenSearch](#Step-3:-Register-and-deploy-the-embedding-model-in-OpenSearch)
4. [Step 4: Create the OpenSearch ingest pipeline with text_image_embedding processor](#Step-4:-Create-the-OpenSearch-ingest-pipeline-with-text_image_embedding-processor)
5. [Step 5: Create the k-NN index](#Step-5:-Create-the-k-NN-index)
6. [Step 6: Prepare the dataset](#Step-6:-Prepare-the-dataset)
7. [Step 7: Ingest the prepared data into OpenSearch](#Step-7:-Ingest-the-prepared-data-into-OpenSearch)
8. [Step 8: Update the environment variables of lambda](#Step-8:-Update-the-environment-variables-of-lambda)
9. [Step 9: Create the Lambda URL](#Step-9:-Create-the-Lambda-URL)
10. [Step 10: Host the Multi-Modal Search application in EC2](#Step-10:-Host-the-Hybrid-Search-application-in-EC2)

In [None]:
#Install dependencies
#Implement header-based authentication and request authentication for AWS services that support AWS auth v4
%pip install requests_aws4auth
#OpenSearch Python SDK
%pip install opensearch_py
#Progress bar for for loop
%pip install alive-progress


In [98]:
#Import dependencies
# Standard library
import base64
import io
import json
import os
import subprocess
import time
import urllib.request  # urlretrieve lives in the urllib.request submodule
from time import sleep

# Third-party packages
import boto3
import requests
import sagemaker
from alive_progress import alive_bar
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from PIL import Image
from requests_aws4auth import AWS4Auth
from ruamel.yaml import YAML
from sagemaker.session import Session
from tqdm import tqdm

## Step 1: Get the Cloudformation outputs

Here, we retrieve the resources that were already deployed by the CloudFormation template and that we use to build the application:
1. **SageMaker** endpoint name
2. **OpenSearch** domain endpoint
3. **S3** bucket name
4. **Lambda** function name
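
The lookup in this step can be sketched as a small pure function. This is only an illustration: `pick_outputs` and the sample values are hypothetical, and the one thing taken from the notebook is the `OutputKey`/`OutputValue` shape of the stack outputs.

```python
def pick_outputs(outputs, wanted):
    """Map friendly names to CloudFormation stack output values.

    outputs: list of {"OutputKey": ..., "OutputValue": ...} dicts.
    wanted:  {friendly_name: substring to look for in OutputKey}, matched
             case-insensitively.
    """
    found = {}
    for name, needle in wanted.items():
        matches = [o["OutputValue"] for o in outputs
                   if needle.lower() in o["OutputKey"].lower()]
        if not matches:
            # Fail early instead of hitting a NameError several cells later
            raise KeyError(f"No stack output matching {needle!r}")
        found[name] = matches[-1]  # last match wins
    return found


# Hypothetical sample data, for illustration only
sample_outputs = [
    {"OutputKey": "OpenSearchDomainEndpoint", "OutputValue": "search-demo.example.com"},
    {"OutputKey": "S3BucketHosting", "OutputValue": "demo-bucket"},
]
resolved = pick_outputs(sample_outputs, {"domain": "OpenSearchDomainEndpoint", "bucket": "s3"})
print(resolved)
```

Raising on a missing output makes a misnamed or incomplete stack visible at this step rather than later in the notebook.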

In [None]:
cfn = boto3.client('cloudformation')

response = cfn.list_stacks(StackStatusFilter=['CREATE_COMPLETE','UPDATE_COMPLETE'])

for cfns in response['StackSummaries']:
    if('TemplateDescription' in cfns.keys()):
        if('hybrid search' in cfns['TemplateDescription']):
            stackname = cfns['StackName']
stackname

response = cfn.describe_stack_resources(
    StackName=stackname
)
# for resource in response['StackResources']:
#     if(resource['ResourceType'] == "AWS::SageMaker::Endpoint"):
#         SagemakerEmbeddingEndpoint = resource['PhysicalResourceId']

cfn_outputs = cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Outputs']

for output in cfn_outputs:
    if('OpenSearchDomainEndpoint' in output['OutputKey']):
        OpenSearchDomainEndpoint = output['OutputValue']
        
    if('EmbeddingEndpointName' in output['OutputKey']):
        SagemakerEmbeddingEndpoint = output['OutputValue']
        
    if('s3' in output['OutputKey'].lower()):
        s3_bucket = output['OutputValue']
        
    if('lambdafunction' in output['OutputKey'].lower()):
        lambdaFunction = output['OutputValue']

region = boto3.Session().region_name  
        

account_id = boto3.client('sts').get_caller_identity().get('Account')



print("stackname: "+stackname)
print("account_id: "+account_id)  
print("region: "+region)
print("SagemakerEmbeddingEndpoint: "+SagemakerEmbeddingEndpoint)
print("OpenSearchDomainEndpoint: "+OpenSearchDomainEndpoint)
print("S3 Bucket: "+s3_bucket)
print("lambda Function : "+lambdaFunction)

## Step 2: Create the OpenSearch-Bedrock ML connector 

Amazon OpenSearch Service AI connectors allow you to create a connector from OpenSearch Service to Bedrock Runtime.
To create the connector, we use the Amazon OpenSearch Service domain endpoint, the Bedrock Runtime endpoint for the [amazon.titan-embed-image-v1](https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-titan-multimodal-embeddings-model-bedrock/) model, and an IAM role that grants OpenSearch Service permission to invoke the Bedrock model (this role is already created as part of the CloudFormation template).
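
The connector payload in the next cell carries a `pre_process_function` written as an escaped Painless script, which is hard to read. The sketch below restates its logic in plain Python for illustration only; it does not run inside OpenSearch, and `to_connector_parameters` is a hypothetical name.

```python
import json


def to_connector_parameters(text_docs):
    """Python rendering of what the connector's pre_process_function builds.

    text_docs[0] is treated as the text input and text_docs[1] as the
    base64-encoded image; either one may be missing or None.
    """
    params = {}
    if len(text_docs) > 0 and text_docs[0] is not None:
        params["inputText"] = text_docs[0]
    if len(text_docs) > 1 and text_docs[1] is not None:
        params["inputImage"] = text_docs[1]
    return {"parameters": params}


# Text plus image, then text only ("aGVsbG8=" is a placeholder, not a real image)
print(json.dumps(to_connector_parameters(["black sneakers", "aGVsbG8="])))
print(json.dumps(to_connector_parameters(["black sneakers"])))
```

The resulting `parameters` are what the `${parameters.inputText}` and `${parameters.inputImage}` placeholders in `request_body` are filled from.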

In [101]:
#Setup OpenSearch connection
host = 'https://'+OpenSearchDomainEndpoint+'/'
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

In [None]:
# Create the connector
path = '_plugins/_ml/connectors/_create'
url = host + path

payload = {
   "name": "bedrock: multimodal embedding",
   "description": "Connector for the Amazon Titan Multimodal Embeddings model on Bedrock",
   "version": 1,
   "protocol": "aws_sigv4",
   "credential": {
      "roleArn": "arn:aws:iam::"+account_id+":role/opensearch-sagemaker-role"
   },
   "parameters": {
      "region": region,
      "service_name": "bedrock"
   },
   "actions": [
      {
         "action_type": "predict",
         "method": "POST",
         "headers": {
            "content-type": "application/json",
            "x-amz-content-sha256": "required"
         },
         "url": "https://bedrock-runtime."+region+".amazonaws.com/model/amazon.titan-embed-image-v1/invoke",
         "request_body": "{ \"inputText\": \"${parameters.inputText:-null}\", \"inputImage\": \"${parameters.inputImage:-null}\" }",
      "pre_process_function": "\n    StringBuilder parametersBuilder = new StringBuilder(\"{\");\n    if (params.text_docs.length > 0 && params.text_docs[0] != null) {\n      parametersBuilder.append(\"\\\"inputText\\\":\");\n      parametersBuilder.append(\"\\\"\");\n      parametersBuilder.append(params.text_docs[0]);\n      parametersBuilder.append(\"\\\"\");\n      \n      if (params.text_docs.length > 1 && params.text_docs[1] != null) {\n        parametersBuilder.append(\",\");\n      }\n    }\n    \n    \n    if (params.text_docs.length > 1 && params.text_docs[1] != null) {\n      parametersBuilder.append(\"\\\"inputImage\\\":\");\n      parametersBuilder.append(\"\\\"\");\n      parametersBuilder.append(params.text_docs[1]);\n      parametersBuilder.append(\"\\\"\");\n    }\n    parametersBuilder.append(\"}\");\n    \n    return  \"{\" +\"\\\"parameters\\\":\" + parametersBuilder + \"}\";",
      "post_process_function": "\n      def name = \"sentence_embedding\";\n      def dataType = \"FLOAT32\";\n      if (params.embedding == null || params.embedding.length == 0) {\n          return null;\n      }\n      def shape = [params.embedding.length];\n      def json = \"{\" +\n                 \"\\\"name\\\":\\\"\" + name + \"\\\",\" +\n                 \"\\\"data_type\\\":\\\"\" + dataType + \"\\\",\" +\n                 \"\\\"shape\\\":\" + shape + \",\" +\n                 \"\\\"data\\\":\" + params.embedding +\n                 \"}\";\n      return json;\n    "
      }
   ]
}
headers = {"Content-Type": "application/json"}

r = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(r.status_code)
print(r.text)
connector_id = json.loads(r.text)["connector_id"]
connector_id

## Step 3: Register and deploy the embedding model in OpenSearch

Using the connector_id obtained in the previous step, we register and deploy the model in OpenSearch and get a model identifier (model_id).
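
Deployment may not be finished when the deploy request returns, so it can help to poll until the model is ready. The helper below is a minimal sketch: `wait_for_state` is a hypothetical name, and the state strings `DEPLOYED` and `DEPLOY_FAILED` are assumptions about what the ML Commons model API reports in its `model_state` field.

```python
import time


def wait_for_state(fetch_state, target="DEPLOYED", failed=("DEPLOY_FAILED",),
                   timeout_s=120, interval_s=2, sleep=time.sleep):
    """Poll fetch_state() until it returns `target`.

    fetch_state: zero-argument callable returning the current state string.
    Raises RuntimeError on a failed state and TimeoutError when time runs out.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        state = fetch_state()
        if state == target:
            return state
        if state in failed:
            raise RuntimeError(f"Model entered state {state}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {state} after {timeout_s}s")
        sleep(interval_s)


# Simulated run with canned states and no real sleeping
states = iter(["DEPLOYING", "DEPLOYING", "DEPLOYED"])
final_state = wait_for_state(lambda: next(states), sleep=lambda s: None)
print(final_state)
```

In the notebook, `fetch_state` could be a lambda that sends a GET to `host + '_plugins/_ml/models/' + model_id` with `awsauth` and returns the `model_state` value from the JSON response.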

In [None]:
# Register the model
path = '_plugins/_ml/models/_register'
url = host + path

payload = { "name": "Bedrock Multimodal embeddings model",
    "function_name": "remote",
    "description": "Bedrock Multimodal embeddings model",
    "connector_id": connector_id}

r = requests.post(url, auth=awsauth, json=payload, headers=headers)
model_id = json.loads(r.text)["model_id"]
print("Model registered under model_id: "+model_id)

# Deploy the model

path = '_plugins/_ml/models/'+model_id+'/_deploy'
url = host + path

r = requests.post(url, auth=awsauth, headers=headers)
deploy_status = json.loads(r.text)["status"]

print("Deployment status of the model, "+model_id+" : "+deploy_status)




## (Optional) Test the embedding model 

Optional: Run this snippet to check that the OpenSearch-Bedrock connection works and that you can generate an embedding for text and images. The embeddings produced by the Titan Multimodal Embeddings model have 1024 dimensions. Here, we print the first 10 values of the embedding for the sample text and image below:

**Image_text**: Sleek, stylish black sneakers made for urban exploration. With fashionable looks and comfortable design, these sneakers keep your feet looking great while you walk the city streets in style.

**Image**:

![Sleek, stylish black sneakers made for urban exploration. With fashionable looks and comfortable design, these sneakers keep your feet looking great while you walk the city streets in style](https://retail-demo-store-us-east-1.s3.amazonaws.com/images/footwear/2d2d8ec8-4806-42a7-b8ba-ceb15c1c7e84.jpg)
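
The predict response nests the vector a few levels deep. The sketch below shows one way to pull it out and sanity-check it; `extract_embedding` is a hypothetical helper, and the sample response is made up (4 values instead of 1024) but follows the `inference_results` / `output` / `data` / `shape` layout that the cell below reads.

```python
def extract_embedding(predict_response):
    """Return the embedding vector from an ML Commons predict response."""
    output = predict_response["inference_results"][0]["output"][0]
    vector = output["data"]
    dims = output["shape"][0]
    if len(vector) != dims:
        # The declared shape and the actual payload should always agree
        raise ValueError(f"shape says {dims} values but got {len(vector)}")
    return vector


# Made-up response for illustration only
sample_response = {
    "inference_results": [
        {"output": [{"name": "sentence_embedding", "data_type": "FLOAT32",
                     "shape": [4], "data": [0.1, 0.2, 0.3, 0.4]}]}
    ]
}
vector = extract_embedding(sample_response)
print(len(vector), "dimensions")
```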

In [102]:
path = '_plugins/_ml/models/'+model_id+'/_predict'
url = host + path

img_url = "https://retail-demo-store-us-east-1.s3.amazonaws.com/images/footwear/2d2d8ec8-4806-42a7-b8ba-ceb15c1c7e84.jpg"
img = img_url.split("/")[-1]
# Make sure the local download directory exists before fetching the image
os.makedirs("images_retail_amazon", exist_ok=True)
urllib.request.urlretrieve(img_url, "images_retail_amazon/"+img) #use this for fetching image from url
with open("images_retail_amazon/"+img, "rb") as image_file:
    input_image_binary = base64.b64encode(image_file.read()).decode("utf8")

payload = {
                  "parameters": {
                    "inputText": "Sleek, stylish black sneakers made for urban exploration. With fashionable looks and comfortable design, these sneakers keep your feet looking great while you walk the city streets in style",
                      "inputImage":input_image_binary
                              }
                        }
r = requests.post(url, auth=awsauth, json=payload, headers=headers)
embed = json.loads(r.text)['inference_results'][0]['output'][0]['data'][0:10]
shape = json.loads(r.text)['inference_results'][0]['output'][0]['shape'][0]

print(str(embed))
print("\n")
print(str(shape)+" dimensions")

[0.0025672666, 0.009171144, 0.012386386, -0.020567901, 0.008987624, 0.039484773, 0.051271744, 0.04154323, 0.0230735, -0.031851035]


1024 dimensions


## Step 4: Create the OpenSearch ingest pipeline with text_image_embedding processor

In the ingest pipeline, the "text_image_embedding" processor generates vector embeddings from the "image_description" field and/or the "image_binary" field and stores them in the "vector_embedding" field, which is of type knn_vector.
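
Once the pipeline exists, it can be tried on a single document with the ingest `_simulate` API before any index is created. The sketch below only builds the request body; `build_simulate_body` is a hypothetical helper, and the field names are the ones used in this notebook.

```python
import json


def build_simulate_body(description=None, image_b64=None,
                        index="bedrock-multimodal-demostore-search-index"):
    """Build a body for POST _ingest/pipeline/<pipeline-name>/_simulate."""
    source = {}
    if description is not None:
        source["image_description"] = description
    if image_b64 is not None:
        source["image_binary"] = image_b64
    if not source:
        raise ValueError("Provide a description, an image, or both")
    return {"docs": [{"_index": index, "_source": source}]}


simulate_body = build_simulate_body(description="Sleek, stylish black sneakers")
print(json.dumps(simulate_body, indent=2))
```

Posting this body to `host + "_ingest/pipeline/bedrock-multimodal-ingest-pipeline/_simulate"` with the same `awsauth` and `headers` should return the document with a `vector_embedding` field added, assuming the model is deployed.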

In [None]:
path = "_ingest/pipeline/bedrock-multimodal-ingest-pipeline"
url = host + path
payload = {
  "description": "A text/image embedding pipeline",
  "processors": [
    {
      "text_image_embedding": {
        "model_id": model_id,  # ID returned when the model was registered in Step 3
        "embedding": "vector_embedding",
        "field_map": {
          "text": "image_description",
          "image": "image_binary"
        }
      }
    }
  ]
}

r = requests.put(url, auth=awsauth, json=payload, headers=headers)
print(r.status_code)
print(r.text)



## Step 5: Create the k-NN index

Create the k-NN index and set the pipeline created in the previous step, "bedrock-multimodal-ingest-pipeline", as its default pipeline. The vector_embedding field must be mapped as a k-NN vector with 1024 dimensions, matching the output dimension of the Bedrock model.

For the k-NN index, we use the **lucene** engine with the **hnsw** algorithm. The mapping does not set a space type, so the default, **l2**, applies.
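
To make the `l2` space type concrete, the sketch below computes the squared Euclidean distance between two vectors and turns it into a relevance score. The score formula `1 / (1 + d)`, with `d` the squared distance, is how the OpenSearch k-NN documentation describes scoring for `l2`; treat it here as background rather than as something this notebook verifies.

```python
def l2_squared(a, b):
    """Squared Euclidean distance between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("Vectors must have the same number of dimensions")
    return sum((x - y) ** 2 for x, y in zip(a, b))


def knn_score(a, b):
    """Map distance to a score in (0, 1]; identical vectors score 1.0."""
    return 1.0 / (1.0 + l2_squared(a, b))


query = [0.0, 0.0]
close_doc = [0.1, 0.0]
far_doc = [3.0, 4.0]
print(knn_score(query, close_doc) > knn_score(query, far_doc))
```

Smaller distances give higher scores, which is why the nearest neighbours come back first.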

In [None]:
path = "bedrock-multimodal-demostore-search-index"
url = host + path

#this will delete the index if it already exists
requests.delete(url, auth=awsauth, headers=headers)

payload = {
  "settings": {
    "index.knn": True,
    "default_pipeline": "bedrock-multimodal-ingest-pipeline",
    "number_of_shards": 4,
    "number_of_replicas": "0"
  },
  "mappings": {
    "properties": {
      "vector_embedding": {
        "type": "knn_vector",
        # Output size of amazon.titan-embed-image-v1; hard-coded so that this
        # cell does not depend on the optional test cell having been run
        "dimension": 1024,
        "method": {
          "name": "hnsw",
          "engine": "lucene",
          "parameters": {}
        }
      },
      "caption": {
        "type": "text"
      },
      "category": {
        "type": "text"
      },
      "style": {
        "type": "text"
      },
      "price": {
        "type": "double"
      },
      "gender_affinity": {
        "type": "text"
      },
      # Named current_stock to match the field written during ingestion
      "current_stock": {
        "type": "integer"
      },
      "image_description": {
        "type": "text"
      },
      "image_s3_url": {
        "type": "text"
      },
      "image_binary": {
        "type": "binary"
      }
    }
  }
}
r = requests.put(url, auth=awsauth, json=payload, headers=headers)
print(r.status_code)
print(r.text)

## Step 6: Prepare the dataset

Download the retail demo store metadata file (products.yaml) from S3 and load it.

In [None]:
!aws s3 cp s3://retail-demo-store-us-east-1/data/products.yaml .

yaml = YAML()
input_file = 'products.yaml'

# Use a context manager so the file handle is closed after loading
with open(input_file) as f:
    items_ = yaml.load(f)

print(json.dumps(items_[0], indent=4))

## Step 7: Ingest the prepared data into OpenSearch

We ingest the product metadata together with each base64-encoded product image into the OpenSearch index using bulk requests.

This step takes approximately 10 minutes to load the data into OpenSearch.

We also resize each image to fit within a maximum width and height, preserving its aspect ratio, using the function below.
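
The batching itself is independent of the image handling, so it can be sketched on its own. The generator below is a minimal illustration, assuming documents are plain dicts; `bulk_bodies` is a hypothetical helper, and each body it yields has the action-line/document-line layout that the bulk API expects.

```python
import json


def bulk_bodies(docs, index, batch_size=100):
    """Yield newline-delimited bulk bodies holding at most batch_size docs each."""
    action = json.dumps({"index": {"_index": index}})
    lines = []
    for n, doc in enumerate(docs, start=1):
        lines.append(action)           # action line
        lines.append(json.dumps(doc))  # document line
        if n % batch_size == 0:
            yield "\n".join(lines) + "\n"
            lines = []                 # start a fresh batch
    if lines:
        # Remaining documents that did not fill a whole batch
        yield "\n".join(lines) + "\n"


sample_docs = [{"caption": f"item {i}"} for i in range(5)]
bodies = list(bulk_bodies(sample_docs, "demo-index", batch_size=2))
print(len(bodies), "bulk requests")
```

Because the buffer is reset after every yielded batch, each document is sent exactly once, and nothing is yielded when there are no leftover rows.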

In [None]:
def resize_image(photo, bucket, width, height):
    
    Image.MAX_IMAGE_PIXELS = 100000000
    
    with Image.open(photo) as image:
        image.verify()
    with Image.open(photo) as image:    
        
        if image.format in ["JPEG", "PNG"]:
            file_type = image.format.lower()
            path = image.filename.rsplit(".", 1)[0]

            image.thumbnail((width, height))
            image.save(f"{path}-resized.{file_type}")

            # Use only the file name (no directory) for the S3 object key
            fileshort = os.path.basename(path)

            s3.upload_file(
                f"{path}-resized.{file_type}",
                bucket,
                f"resized/{fileshort}-resized.{file_type}",
                ExtraArgs={"ContentType": f"image/{file_type}"},
            )
            
        else:
            raise Exception("Unsupported image format")
        
    return file_type, path


In [None]:
s3 = boto3.client('s3')

host = 'https://'+OpenSearchDomainEndpoint+'/'
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
headers = { "Content-Type": "application/json"}
client = OpenSearch(
    hosts = [{'host': OpenSearchDomainEndpoint, 'port': 443}],
    http_auth = awsauth,
    use_ssl = True,
    connection_class = RequestsHttpConnection
)

# Make sure the local download directory exists
os.makedirs("images_retail_amazon", exist_ok=True)

#restrict the data to 50 images
#items_ = items_[0:50]

cnt = 0
batch = 0
action = json.dumps({ "index": { "_index": "bedrock-multimodal-demostore-search-index" } })
body_ = ''
batch_size = 100
last_batch = int(len(items_)/batch_size)
width = 2048
height = 2048


with alive_bar(len(items_), force_tty = True) as bar:
    for item in items_:
        cnt = cnt +1
          
        payload = {}
        payload['image_s3_url'] = "https://retail-demo-store-us-east-1.s3.amazonaws.com/images/"+item["category"]+"/"+item["image"]
        payload['image_description'] = item['description']
        payload['price'] = item['price']
        
        if('style' in item):
            payload['style'] = item['style']
        payload['category'] = item['category']
        if('current_stock' in item):
            payload['current_stock'] = item['current_stock']
        if('gender_affinity' in item):
            payload['gender_affinity'] = item['gender_affinity']
        payload['caption'] = item['name']
        
        #generate image binary
        
        fileshort = "images_retail_amazon/"+item["image"]

        s3.download_file('retail-demo-store-us-east-1', "images/"+item["category"]+"/"+item["image"], fileshort)



        file_type, path = resize_image(fileshort, s3_bucket, width, height)

        with open(fileshort.split(".")[0]+"-resized."+file_type, "rb") as image_file:
            input_image = base64.b64encode(image_file.read()).decode("utf8")
        
        os.remove(fileshort.split(".")[0]+"-resized."+file_type)
        os.remove(fileshort)

        payload['image_binary'] = input_image
        
        body_ = body_ + action + "\n" + json.dumps(payload) + "\n"
        
        if(cnt == batch_size):
            
            response = client.bulk(
            index = 'bedrock-multimodal-demostore-search-index',
            body = body_
            )
            
            # Reset the buffer after every batch so that no document is sent twice
            cnt = 0
            batch += 1
            body_ = ''
        
        
        bar()
    #ingest the remaining rows, if any
    if body_:
        response = client.bulk(
            index = 'bedrock-multimodal-demostore-search-index',
            body = body_
            )
    


## Step 8: Update the environment variables of lambda

Here, we pass the OpenSearch model identifier of the Bedrock model to Lambda so that incoming queries are embedded with the same model that we used to embed the documents.
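
The Lambda code is not part of this notebook, so the following is only a sketch of how a query could use the model identifier: an OpenSearch `neural` query that embeds the query text and/or image with `model_id` and searches the `vector_embedding` field. `build_neural_query` is a hypothetical helper, not the function the Lambda actually runs.

```python
import json


def build_neural_query(model_id, query_text=None, query_image_b64=None,
                       k=5, field="vector_embedding"):
    """Build a search body with a neural query on a k-NN vector field."""
    if query_text is None and query_image_b64 is None:
        raise ValueError("Provide query_text, query_image_b64, or both")
    clause = {"model_id": model_id, "k": k}
    if query_text is not None:
        clause["query_text"] = query_text
    if query_image_b64 is not None:
        clause["query_image"] = query_image_b64
    return {
        "size": k,
        # Leave the large binary and vector fields out of the returned hits
        "_source": {"excludes": ["image_binary", field]},
        "query": {"neural": {field: clause}},
    }


search_body = build_neural_query("example-model-id", query_text="black sneakers", k=3)
print(json.dumps(search_body, indent=2))
```

Using the same `model_id` at query time keeps query vectors and document vectors in the same embedding space, which is what makes their distances meaningful.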

In [None]:
lambda_client = boto3.client('lambda')

# Read the current variables (empty if none are set), then add or overwrite the model ID
exist_env = lambda_client.get_function_configuration(FunctionName=lambdaFunction).get('Environment', {}).get('Variables', {})
exist_env['BEDROCK_MULTIMODAL_MODEL_ID'] = model_id


response = lambda_client.update_function_configuration(
            FunctionName=lambdaFunction,
            Environment={
                'Variables': exist_env
            }
        )

## Step 9: Create the Lambda URL

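Here, we create a function URL so that the Lambda function can be called over HTTPS from outside AWS. Because the URL uses the AWS_IAM auth type, callers must sign their requests with AWS credentials.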

In [115]:
lambda_ = boto3.client('lambda')

try:
    response = lambda_.get_function_url_config(FunctionName=lambdaFunction)

except lambda_.exceptions.ResourceNotFoundException:
    response_ = lambda_.add_permission(
        FunctionName=lambdaFunction,
        StatementId=lambdaFunction+'_permissions',
        Action="lambda:InvokeFunctionUrl",
        Principal=account_id,
        FunctionUrlAuthType='AWS_IAM')

    response = lambda_.create_function_url_config(
        FunctionName=lambdaFunction,
        AuthType='AWS_IAM',
        Cors={
            'AllowCredentials': True,
            'AllowMethods': ["*"],
            'AllowOrigins': ["*"]
        },
        InvokeMode='RESPONSE_STREAM'
    )

query_invoke_URL = response['FunctionUrl']
print(query_invoke_URL)

https://bwwj5hqbym7w245urmrijqrifa0gzmoc.lambda-url.us-east-1.on.aws/


## Step 10: Host the Hybrid Search application in EC2

## Notice

To secure access to the provisioned resources, we use an EC2 security group to limit who can reach them. Before you go to the final step, add your current **PUBLIC IP** address to the EC2 security group so that you can access the web application that you host in the next step.

<h3 style="color:red;"><U>Warning</U></h3>
<h4>Without doing the below steps, you will not be able to proceed further.</h4>

<div>
    <h3 style="color:red;"><U>Enter your IP address </U></h3>
    <h4> STEP 1. Get your IP address <span style="display:inline;color:blue"><a href="https://ipinfo.io/ip">HERE</a></span>. If you are connected through a VPN, we recommend that you disconnect from it first.</h4>
</div>

<h4>STEP 2. Run the below cell </h4>
<h4>STEP 3. Paste the IP address in the input box that prompts you to enter your IP</h4>
<h4>STEP 4. Press ENTER</h4>
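
The cell below widens the address you enter to its /24 range before adding it to the security group. The standard-library `ipaddress` module can do the same conversion while also validating the input. This is a minimal sketch; `to_cidr` is a hypothetical helper and the address comes from a documentation-only range.

```python
import ipaddress


def to_cidr(ip, prefix=24):
    """Return the network that contains `ip` in CIDR notation.

    prefix=24 covers the 256 addresses sharing the first three octets;
    prefix=32 restricts the rule to the single address.
    Raises ValueError if `ip` is not a valid IPv4 address.
    """
    network = ipaddress.ip_network(f"{ip.strip()}/{prefix}", strict=False)
    return str(network)


print(to_cidr("203.0.113.57"))      # 203.0.113.0/24
print(to_cidr("203.0.113.57", 32))  # 203.0.113.57/32
```

A /24 rule tolerates small changes to a dynamic address, while a /32 rule is the tightest option when the address is stable.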

In [None]:
my_ip = (input("Enter your IP : ")).split(".")
my_ip.pop()
IP = ".".join(my_ip)+".0/24"

port_protocol = {443:'HTTPS',80:'HTTP',8501:'streamlit'}

IpPermissions = []

for port in port_protocol.keys():
     IpPermissions.append({
            'FromPort': port,
            'IpProtocol': 'tcp',
            'IpRanges': [
                {
                    'CidrIp': IP,
                    'Description': port_protocol[port]+' access',
                },
            ],
            'ToPort': port,
        })

IpPermissions

for output in cfn_outputs:
    if('securitygroupid' in output['OutputKey'].lower()):
        sg_id = output['OutputValue']
        
#sg_id = 'sg-0e0d72baa90696638'

ec2_ = boto3.client('ec2')        

response = ec2_.authorize_security_group_ingress(
    GroupId=sg_id,
    IpPermissions=IpPermissions,
)

print("\nIngress rules added for the security group, ports:protocol - "+json.dumps(port_protocol)+" with my ip - "+IP)

Finally, we are ready to host our search application. We perform the following steps. Steps 3-5 run as terminal commands on the EC2 instance through an SSM client.
1. Update the web application code files with the Lambda URL (in [api.py](https://github.com/aws-samples/semantic-search-with-amazon-opensearch/blob/main/generative-ai/Module_1_Build_Conversational_Search/webapp/api.py)) and the S3 bucket name (in [app.py](https://github.com/aws-samples/semantic-search-with-amazon-opensearch/blob/main/generative-ai/Module_1_Build_Conversational_Search/webapp/app.py)).
2. Archive the application files and push them to the configured S3 bucket.
3. Download the application (.zip) from the S3 bucket to the EC2 instance (/home/ec2-user/) and uncompress it.
4. Install the streamlit and boto3 dependencies in a virtual environment on the EC2 instance.
5. Start the streamlit application.
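
Because the last command starts Streamlit in the foreground, the SSM command is expected to stay `InProgress` for as long as the application runs. An early `Failed` status therefore usually points to a problem in one of the setup commands. The sketch below watches the command for a short while and returns its status and error output. `watch_command` and `FakeSSM` are hypothetical names; `get_command_invocation` is the boto3 SSM call being wrapped.

```python
import time

TERMINAL = {"Success", "Failed", "Cancelled", "TimedOut"}


def watch_command(ssm, command_id, instance_id, watch_s=60, interval_s=5, sleep=time.sleep):
    """Poll an SSM command for up to watch_s seconds.

    Returns (status, stderr). A terminal status is returned as soon as it is
    seen; otherwise the last observed status is returned, typically "InProgress".
    """
    waited = 0
    while True:
        inv = ssm.get_command_invocation(CommandId=command_id, InstanceId=instance_id)
        status = inv["Status"]
        if status in TERMINAL or waited >= watch_s:
            return status, inv.get("StandardErrorContent", "")
        sleep(interval_s)
        waited += interval_s


class FakeSSM:
    """Hypothetical stand-in for boto3.client('ssm'), used only to demo the helper."""

    def __init__(self, statuses):
        self._statuses = list(statuses)

    def get_command_invocation(self, CommandId, InstanceId):
        # Walk through the canned statuses, then keep repeating the last one
        status = self._statuses.pop(0) if len(self._statuses) > 1 else self._statuses[0]
        stderr = "pip: command not found" if status == "Failed" else ""
        return {"Status": status, "StandardErrorContent": stderr}


status, stderr = watch_command(FakeSSM(["Pending", "InProgress", "Failed"]),
                               "cmd-123", "i-abc", sleep=lambda s: None)
print(status, "-", stderr)
```

With the real client, the call would be `watch_command(ssm_client, command_id, ec2_instance_id)`. Note that `get_command_invocation` can raise an error if it is called before the invocation has been registered, so a short pause after `send_command` may be needed.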

In [124]:
#modify the code files with lambda url and s3 bucket names
# Escape the slashes so that the URL can be used inside the sed expression
query_invoke_URL_cmd = query_invoke_URL.replace("/","\\/")

#Update the webapp files to include the s3 bucket name and the LambdaURL
!sed -i 's/API_URL_TO_BE_REPLACED/{query_invoke_URL_cmd}/g' webapp/api.py
#Push the WebAPP code artefacts to s3
!cd webapp && zip -r webapp.zip *
!aws s3 cp webapp/webapp.zip s3://$s3_bucket
        
#Get the Ec2 instance ID which is already deployed
response = cfn.describe_stack_resources(
    StackName=stackname
)
for resource in response['StackResources']:
    if(resource['ResourceType'] == 'AWS::EC2::Instance'):
        ec2_instance_id = resource['PhysicalResourceId']
   
ec2_instance_id

updating: api.py (deflated 58%)
updating: app.py (deflated 75%)
upload: webapp/webapp.zip to s3://hybridsearch-opensearch-app-s3buckethosting-b7nfsjknlc83/webapp.zip


'i-05bbd71645c0b0990'

Copy the URL that will be generated after running the next cell and open the URL in your web browser to start using the application.

In [125]:
# function to execute commands in ec2 terminal
def execute_commands_on_linux_instances(client, commands):
    resp = client.send_command(
        DocumentName="AWS-RunShellScript", # One of AWS' preconfigured documents
        Parameters={'commands': commands},
        InstanceIds=[ec2_instance_id],
    )
    return resp['Command']['CommandId']

ssm_client = boto3.client('ssm') 

commands = [
            'aws s3 cp s3://'+s3_bucket+'/webapp.zip /home/ec2-user/',
            'unzip -o /home/ec2-user/webapp.zip -d /home/ec2-user/'  ,  
            'sudo chmod -R 0777 /home/ec2-user/',
            'python3 -m venv /home/ec2-user/.myenv',
            'source /home/ec2-user/.myenv/bin/activate',
            'pip install streamlit',
            'pip install boto3',
    
            #start the web application
            'streamlit run /home/ec2-user/app.py',
            ]

command_id = execute_commands_on_linux_instances(ssm_client, commands)

ec2_ = boto3.client('ec2')
response = ec2_.describe_instances(
    InstanceIds=[ec2_instance_id]
)
public_ip = response['Reservations'][0]['Instances'][0]['PublicIpAddress']
print("Please wait while the application is being hosted . . .")
time.sleep(10)
print("\nApplication hosted successfully")
print("\nClick the below URL to open the application. It may take up to a minute or two to start the application, Please keep refreshing the page if you are seeing connection error.\n")
print('http://'+public_ip+":8501")
#print("\nCheck the below video on how to interact with the application")

Please wait while the application is being hosted . . .

Application hosted successfully

Click the below URL to open the application. It may take up to a minute or two to start the application, Please keep refreshing the page if you are seeing connection error.

http://44.204.151.62:8501
