In [None]:
# Enable IPython's autoreload extension in mode 2 so edited modules are re-imported
# before each cell executes.
%load_ext autoreload
%autoreload 2

# Install the OpenSearch client and the SigV4 helper, then upgrade the AWS SDK stack
# (the -U installs are not version-pinned, so results depend on the install date).
%pip install opensearch-py
%pip install requests-aws4auth
%pip install -U aiobotocore
%pip install -U boto3
%pip install -U botocore
%pip install -U awscli
%pip install -U s3fs

# restart kernel
# NOTE(review): this relies on the `Jupyter` JavaScript object being available in the
# front end, and the HTML object only renders when it is the last expression of its
# cell — confirm both in your environment, or restart the kernel manually.
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

import boto3
import pandas as pd
import os
import re
import boto3
import json
import time
import base64
import logging
import numpy as np
import seaborn as sns
from PIL import Image
from io import BytesIO


from tqdm import tqdm
from urllib.parse import urlparse
from multiprocessing.pool import ThreadPool

import sagemaker
from utils import *
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, helpers

# getting boto3 clients for required AWS services
sts_client = boto3.client('sts')
s3_client = boto3.client('s3')
aoss_client = boto3.client('opensearchserverless')

# The Bedrock runtime client is pinned to us-east-1 (region and endpoint),
# independently of the region configured for the default session.
bedrock_client = boto3.client(
    "bedrock-runtime", 
    "us-east-1", 
    endpoint_url="https://bedrock-runtime.us-east-1.amazonaws.com"
)

# Region of the default session and the account of the calling identity.
session = boto3.session.Session()
region_name = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
# Fixed: the original displayed `region`, which is not defined in this notebook.
region_name, account_id

# Look up the SageMaker execution role ARN (later added as a principal in the
# OpenSearch Serverless data access policy).
sagemaker_role_arn = sagemaker.get_execution_role()
sagemaker_role_arn

Load the metadata

You can use pandas to load metadata, then select products which have titles in US English from the data frame. You will use a column called main_image_id to merge item name with item image later.

In [None]:
# Read one shard of the listings metadata (gzip-compressed JSON Lines) directly from S3.
meta = pd.read_json(
    "s3://amazon-berkeley-objects/listings/metadata/listings_0.json.gz",
    lines=True,
)
def func_(x):
    """Return the first US-English value from a list of localized entries.

    Args:
        x: A list (or tuple) of dicts, each carrying a ``"language_tag"`` and a
            ``"value"`` key. Any other input — for example ``None`` or the NaN
            that pandas stores for a missing field — is treated as "no entries".

    Returns:
        The ``"value"`` of the first entry whose ``"language_tag"`` is
        ``"en_US"``, or ``None`` when there is no such entry.
    """
    # Missing fields arrive as NaN/None, which cannot be iterated.
    if not isinstance(x, (list, tuple)):
        return None
    return next(
        (item["value"] for item in x if item.get("language_tag") == "en_US"),
        None,
    )

# Derive the US-English title per product, drop products without one, and keep
# only the columns needed downstream.
meta = (
    meta
    .assign(item_name_in_en_us=lambda frame: frame["item_name"].apply(func_))
    .dropna(subset=["item_name_in_en_us"])
    [["item_id", "item_name_in_en_us", "main_image_id"]]
)
print(f"#products with US English title: {len(meta)}")
meta.head()

You should see over 1,600 products in the data frame. Next, you can link the item names with the item images. `images/metadata/images.csv.gz` contains the image metadata. This file is a gzip-compressed comma-separated values (CSV) file with the following columns: `image_id`, `height`, `width`, and `path`. You can read the image metadata file and then merge it with the item metadata.

In [None]:
image_meta = pd.read_csv("s3://amazon-berkeley-objects/images/metadata/images.csv.gz")

# Join each product to its main image, then add a column with the FULL S3 PATH of that image.
dataset = (
    meta
    .merge(image_meta, left_on="main_image_id", right_on="image_id")
    .assign(
        img_full_path=lambda frame: "s3://amazon-berkeley-objects/images/small/"
        + frame["path"].astype(str)
    )
)
dataset

You can have a look at one sample image from the dataset by running the following code

# Look up one product by item id (helper is defined outside this section of the
# notebook), print its title, and render the image as the cell output.
image, item_name = get_image_from_item_id_s3(
    item_id="B0896LJNLH",
    dataset=dataset,
    image_path="s3://amazon-berkeley-objects/images/small/",
)
print(item_name)
image

## 3. Generate embedding from item images
Amazon Titan Multimodal Embeddings G1 projects both images and text into the same latent space, so you only need to encode either the item images or the item texts into that embedding space. In this practice, you can use batch inference to encode item images. Before creating the job, you need to copy the item images from the Amazon Berkeley Objects Dataset public S3 bucket to your own S3 bucket. The operation should take less than 10 minutes.

For this notebook, however, we'll use the real-time API rather than batch inference.

In [None]:
%%time
multimodal_embeddings_img = []

for idx, path in enumerate(dataset['img_full_path']):
    embedding = get_titan_multimodal_embedding(image_path=path, dimension=1024)["embedding"]
    multimodal_embeddings_img.append(embedding)

dataset = dataset.assign(embedding_img=multimodal_embeddings_img)
dataset.head()
# Store dataset
 dataset.to_csv('dataset.csv', index = False)

## 4. Create a vector store - OpenSearch Serverless index


In [None]:
import random

# Random integer in [200, 900) used as a suffix in the policy and collection names.
suffix = random.randrange(200, 900)

# ARN of the identity running this notebook.
identity = (
    boto3.client('sts')
    .get_caller_identity()['Arn']
)

def create_policies_in_oss(vector_store_name, aoss_client, role_arn,
                           policy_suffix=None, principal_identity=None):
    """Create the encryption, network and data access policies for a collection.

    Each policy is created independently on a best-effort basis: if a call
    fails, the error is printed and ``None`` is returned in that policy's
    slot. (Previously a failed call left its variable unbound, so the final
    ``return`` raised ``UnboundLocalError``.)

    Args:
        vector_store_name: Name of the OpenSearch Serverless collection the
            policies apply to.
        aoss_client: Client exposing ``create_security_policy`` and
            ``create_access_policy``.
        role_arn: ARN granted access in the data access policy, alongside
            ``principal_identity``.
        policy_suffix: Suffix appended to the three policy names. Defaults to
            the notebook-level ``suffix``.
        principal_identity: ARN of the caller to grant access to. Defaults to
            the notebook-level ``identity``.

    Returns:
        Tuple ``(encryption_policy, network_policy, access_policy)`` holding
        the raw responses of the three create calls, with ``None`` for any
        call that failed.
    """
    # Fall back to the notebook globals only when no explicit value is given.
    if policy_suffix is None:
        policy_suffix = suffix
    if principal_identity is None:
        principal_identity = identity

    collection_resource = 'collection/' + vector_store_name
    index_resource = 'index/' + vector_store_name + '/*'

    encryption_policy = _create_or_report(
        aoss_client.create_security_policy,
        name=f"titan-mm-sample-sp-{policy_suffix}",
        policy=json.dumps(
            {
                'Rules': [{'Resource': [collection_resource],
                           'ResourceType': 'collection'}],
                'AWSOwnedKey': True
            }),
        type='encryption',
    )

    network_policy = _create_or_report(
        aoss_client.create_security_policy,
        name=f"titan-mm-sample-np-{policy_suffix}",
        policy=json.dumps(
            [
                {'Rules': [{'Resource': [collection_resource],
                            'ResourceType': 'collection'}],
                 'AllowFromPublic': True}
            ]),
        type='network',
    )

    access_policy = _create_or_report(
        aoss_client.create_access_policy,
        name=f'titan-mm-sample-ap-{policy_suffix}',
        policy=json.dumps(
            [
                {
                    'Rules': [
                        {
                            'Resource': [collection_resource],
                            'Permission': [
                                'aoss:CreateCollectionItems',
                                'aoss:DeleteCollectionItems',
                                'aoss:UpdateCollectionItems',
                                'aoss:DescribeCollectionItems'],
                            'ResourceType': 'collection'
                        },
                        {
                            'Resource': [index_resource],
                            'Permission': [
                                'aoss:CreateIndex',
                                'aoss:DeleteIndex',
                                'aoss:UpdateIndex',
                                'aoss:DescribeIndex',
                                'aoss:ReadDocument',
                                'aoss:WriteDocument'],
                            'ResourceType': 'index'
                        }],
                    'Principal': [principal_identity, role_arn],
                    'Description': 'Easy data policy'}
            ]),
        type='data',
    )

    return encryption_policy, network_policy, access_policy


def _create_or_report(create_call, **kwargs):
    """Run one policy-creation call; on any error print it and return None."""
    try:
        return create_call(**kwargs)
    except Exception as ex:
        print(ex)
        return None

### 4.1 create a new collection of type VECTORSEARCH


In [None]:
# Create Collection
vector_store_name = f'titan-mm-image-collection-{suffix}'

# The encryption, network and data access policies are requested first,
# then the VECTORSEARCH collection itself.
encryption_policy, network_policy, access_policy = create_policies_in_oss(
    vector_store_name=vector_store_name,
    aoss_client=aoss_client,
    role_arn=sagemaker_role_arn,
)
collection = aoss_client.create_collection(
    name=vector_store_name,
    type='VECTORSEARCH',
)

### 4.2 Setting up the Amazon OpenSearch Serverless index using KNN settings
Once the OpenSearch collection is created, create an index to store the item meta data and the embeddings. The index settings must be configured beforehand to enable the KNN functionality using the following configuration:

In [None]:
# The collection endpoint is derived from the collection id and the session region.
collection_id = collection['createCollectionDetail']['id']
host = collection_id + '.' + region_name + '.aoss.amazonaws.com'

# SigV4 signing for the OpenSearch Serverless ('aoss') service.
service = 'aoss'
credentials = boto3.Session().get_credentials()
awsauth = AWSV4SignerAuth(credentials, region_name, service)

index_name = "titam-mm-index"
index_body = {
   "settings": {
      "index.knn": "true"
   },
   "mappings": {
      "properties": {
         "image_vector": {
            "type": "knn_vector",
            "dimension": 1024 # Embedding size for Amazon Titan Multimodal Embeddings G1: 1,024 (default), 384 or 256
         },
         "description": {"type": "text"},
         "item_id" : {"type": "text"},
         "image_url": {"type": "text"}
      }
   }
}
# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)

# Collection creation is asynchronous, so a fixed sleep alone does not guarantee
# the endpoint is usable. Poll until the collection leaves CREATING (15 min cap).
wait_deadline = time.time() + 15 * 60
while True:
    collection_details = aoss_client.batch_get_collection(ids=[collection_id])['collectionDetails']
    collection_status = collection_details[0]['status'] if collection_details else 'CREATING'
    if collection_status != 'CREATING':
        break
    if time.time() > wait_deadline:
        raise TimeoutError(f"collection {collection_id} is still being created after 15 minutes")
    time.sleep(15)
if collection_status != 'ACTIVE':
    raise RuntimeError(f"collection {collection_id} ended in status {collection_status}")

# It can take up to a minute for data access rules to be enforced
time.sleep(60)

Now create the vector index using the settings defined above. The response printed by the call confirms that the index was created.

In [None]:
# Creating an index that already exists raises an exception; that case is
# acceptable here, so any error is reported and the notebook continues.
try:
    response = oss_client.indices.create(index_name, body=index_body)
except Exception as e:
    print(f"error in creating index={index_name}, exception={e}")
else:
    print(f"response received for the create index -> {response}")

%%Optional
### 4.3 Ingest the image embeddings
Next, you need to loop through your dataset and ingest the item data into the index. A more robust and scalable solution for embedding ingestion can be found in [Ingesting enriched data into Amazon ES](https://aws.amazon.com/blogs/industries/novartis-ag-uses-amazon-elasticsearch-k-nearest-neighbor-knn-and-amazon-sagemaker-to-power-search-and-recommendation/). The data ingestion for this POC should finish within 60 seconds. It also executes a simple query to verify that the data has been ingested into the index.

In [None]:
%%time
import tqdm.notebook as tq

for idx, record in tq.tqdm(dataset.iterrows(), total=len(dataset)):
    document = {
                'image_vector': dataset['embedding_img'][idx],
                "description":   dataset['item_name_in_en_us'][idx],
                "item_id" : dataset['item_id'][idx],
                "image_url": dataset['img_full_path'][idx],  
                }
    response = oss_client.index(
    index = index_name,
    body = document
    )

In [None]:
## 5. Perform a real-time Multimodal Search

In [None]:
# Free-text query (Spanish); the search helper is defined outside this section of the notebook.
query_prompt = "articulos de limpieza para mascota"
similar_items = find_similar_items_from_query(
    query_prompt=query_prompt,
    k=2,
    num_results=10,
    index_name=index_name,
    dataset=dataset,
    open_search_client=oss_client,
    image_root_path="s3://amazon-berkeley-objects/images/small/",
)

display_images(similar_items)