# Chapter 7 - Open-source Frameworks: Contextual Text and Image Search Engine with Amazon Bedrock

## Overview
This notebook demonstrates how to build a multimodal search engine that can handle both text and images using Amazon Bedrock. We'll explore how to create embeddings for different content types and perform semantic search across multimedia content.

## Introduction
This notebook demonstrates how to build a product recommendation search engine that understands both text and images using Amazon Bedrock's Titan Multimodal Embedding model and FAISS vector database. The system enables powerful semantic search across product data, allowing users to find visually and contextually similar products.

## Prerequisites
- AWS account with Amazon Bedrock access
- Access to Titan Multimodal Embedding model 
- The Amazon Berkeley Objects dataset (sample product catalog)

## Setup

### Install Required Dependencies

In [None]:
!pip install opensearch-py
!pip install requests-aws4auth
!pip install -U boto3
!pip install -U botocore
!pip install -U awscli
!pip install s3fs
!pip install sns
!pip install seaborn
!pip install sagemaker

In [None]:
%pip install -U --no-cache-dir boto3
%pip install -U --no-cache-dir  \
    "langchain>=0.1.11" \
    sqlalchemy -U \
    "faiss-cpu>=1.7,<2" \
    "pypdf>=3.8,<4" \
    pinecone-client==2.2.4 \
    apache-beam==2.52. \
    tiktoken==0.5.2 \
    "ipywidgets>=7,<8" \
    matplotlib==3.8.2 \
    anthropic==0.9.0
%pip install -U --no-cache-dir transformers

In [None]:
# Upgrade SQLAlchemy to the newest release pip can resolve.
!pip install sqlalchemy --upgrade

In [None]:
# Restart the kernel by injecting a small script into the notebook page.
# HTML is imported from IPython.display, the public home of the display classes;
# importing it from IPython.core.display is deprecated.
# NOTE: the `Jupyter.notebook` JavaScript object exists only in the classic Notebook UI;
# in other front ends restart the kernel from the menu instead.
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

### Initialize AWS Services

In [None]:
import boto3
import pandas as pd
import os
import re
import boto3
import json
import time
import base64
import logging
import numpy as np
import seaborn as sns
from PIL import Image
from io import BytesIO


from tqdm import tqdm
from urllib.parse import urlparse
from multiprocessing.pool import ThreadPool
from sagemaker.s3 import S3Downloader as s3down

from PIL import Image
import matplotlib.pyplot as plt



In [None]:
import boto3
import os
from IPython.display import Markdown, display, Pretty

# Create boto3 clients for the AWS services used in this notebook.
sts_client = boto3.client("sts")
s3_client = boto3.client("s3")
#aoss_client = boto3.client('opensearchserverless')


# Bedrock runtime client; the region comes from the AWS_REGION environment
# variable and is None when that variable is unset.
region = os.environ.get("AWS_REGION")
boto3_bedrock = boto3.client("bedrock-runtime", region_name=region)

### Define Utility Functions

In [None]:
# Bedrock model identifiers.
# amazon.titan-embed-image-v1 is the embedding model used for multimodal indexing.
multimodal_embed_model = "amazon.titan-embed-image-v1"


""" 
Function to plot heatmap from embeddings
"""

def plot_similarity_heatmap(embeddings_a, embeddings_b):
    """Draw a heatmap of the pairwise inner products of two embedding sets.

    Args:
        embeddings_a: Embeddings passed as the first argument to ``np.inner``.
        embeddings_b: Embeddings passed as the second argument to ``np.inner``.

    The colour scale runs from the smallest inner product found up to a
    fixed maximum of 1.
    """
    similarity = np.inner(embeddings_a, embeddings_b)
    lowest = np.min(similarity)
    sns.set(font_scale=1.1)
    sns.heatmap(similarity, vmin=lowest, vmax=1, cmap="OrRd")

""" 
Function to fetch the image based on image id from dataset
"""
def get_image_from_item_id(item_id="0", dataset=None, return_image=True):
    """Look up a product by item id and return its image and description.

    Args:
        item_id: Value interpolated *unquoted* into the pandas query
            ``item_id == <value>``.
        dataset: DataFrame with ``item_id``, ``image_path`` and ``item_desc``
            columns.
        return_image: When True, open the image with PIL; otherwise return
            the image path.

    Returns:
        ``(image, item_desc)`` when ``return_image`` is True, otherwise
        ``(image_path, item_desc)``.

    Raises:
        IndexError: If no row of ``dataset`` matches ``item_id``.
    """
    matches = dataset.query(f"item_id == {item_id}")
    if matches.empty:
        raise IndexError(f"item_id {item_id!r} not found in dataset")

    # Read the row straight from the query result: an index *label* is not a
    # valid positional index for iloc unless the frame's index is 0..n-1.
    row = matches.iloc[0]

    if return_image:
        return Image.open(row.image_path), row.item_desc
    return row.image_path, row.item_desc


""" 
Function to fetch the image based on image id from S3 bucket
"""
    
def get_image_from_item_id_s3(item_id = "B0896LJNLH", dataset = None, image_path = None,  return_image=True):
    """Look up a product by item id and return its image and English title.

    Args:
        item_id: String id matched against the ``item_id`` column.
        dataset: DataFrame with ``item_id``, ``img_full_path`` and
            ``item_name_in_en_us`` columns.
        image_path: Not read by this function; kept so existing callers that
            pass it continue to work.
        return_image: When True, open the image with PIL; otherwise return
            the local image path.

    Returns:
        ``(image, title)`` when ``return_image`` is True, otherwise
        ``(local_image_path, title)``.

    Raises:
        IndexError: If no row of ``dataset`` matches ``item_id``.
    """
    matches = dataset.query(f"item_id == '{item_id}'")
    if matches.empty:
        raise IndexError(f"item_id {item_id!r} not found in dataset")

    # Read the row straight from the query result: an index *label* is not a
    # valid positional index for iloc unless the frame's index is 0..n-1.
    row = matches.iloc[0]
    img_loc = row.img_full_path

    if img_loc.startswith('s3'):
        # Download the object next to the notebook and open the local copy.
        local_data_root = './data/images'
        local_file_name = img_loc.split('/')[-1]
        os.makedirs(local_data_root, exist_ok=True)
        s3down.download(img_loc, local_data_root)
        local_image_path = f"{local_data_root}/{local_file_name}"
    else:
        # Anything that is not an S3 location is used as a local path as-is.
        local_image_path = img_loc

    if return_image:
        return Image.open(local_image_path), row.item_name_in_en_us
    return local_image_path, row.item_name_in_en_us

""" 
Function to display the images.
"""
def display_images(images: list, columns=2, width=20, height=8, max_images=15, label_wrap_length=50, label_font_size=8):
    """Show a list of images in a matplotlib grid.

    Args:
        images: Images accepted by ``plt.imshow``. An image that has a
            ``name_and_score`` attribute gets that value as its title.
        columns: Number of grid columns.
        width: Figure width passed to ``plt.figure``.
        height: Height of one grid row; the figure is ``rows * height`` tall.
        max_images: Only the first ``max_images`` images are shown.
        label_wrap_length: Titles are wrapped to at most this many characters
            per line.
        label_font_size: Font size of the titles.

    Returns:
        None. Prints a message and draws nothing when ``images`` is empty.
    """
    import math
    import textwrap

    if not images:
        print("No images to display.")
        return

    if len(images) > max_images:
        print(f"Showing {max_images} images of {len(images)}:")
        images = images[0:max_images]

    # Exactly as many rows as needed; no surplus empty row when the image
    # count is a multiple of `columns`.
    rows = math.ceil(len(images) / columns)
    plt.figure(figsize=(width, max(height, rows * height)))

    for position, image in enumerate(images, start=1):
        plt.subplot(rows, columns, position)
        plt.imshow(image)

        if hasattr(image, 'name_and_score'):
            title = textwrap.fill(str(image.name_and_score), label_wrap_length)
            plt.title(title, fontsize=label_font_size)
            

## Data Preparation

### Load Product Dataset

In [None]:
# Load product metadata: a gzip-compressed JSON Lines file (one record per line,
# hence lines=True) read directly from its s3:// location.
meta = pd.read_json("s3://amazon-berkeley-objects/listings/metadata/listings_0.json.gz", lines=True)
# Extract English product titles
def func_(x):
    """Return the first ``en_US`` value from a list of localized entries.

    Args:
        x: Iterable of dicts with ``language_tag`` and ``value`` keys.
            ``None`` or a float (e.g. ``NaN`` for a listing without a name)
            is treated as having no title.

    Returns:
        The ``value`` of the first entry tagged ``en_US``, or ``None`` when
        there is no such entry.
    """
    if x is None or isinstance(x, float):
        return None
    return next(
        (item["value"] for item in x if item.get("language_tag") == "en_US"),
        None,
    )

# Attach the en_US title to every listing, then keep only the listings that
# have one, restricted to the three columns used later.
meta = meta.assign(item_name_in_en_us=meta.item_name.apply(func_))
meta = meta.loc[
    meta.item_name_in_en_us.notna(),
    ["item_id", "item_name_in_en_us", "main_image_id"],
]
print(f"#products with US English title: {len(meta)}")
meta.head()

In [None]:
# Read the image metadata table and join it to the products on the main image id.
image_meta = pd.read_csv("s3://amazon-berkeley-objects/images/metadata/images.csv.gz")
dataset = pd.merge(meta, image_meta, left_on="main_image_id", right_on="image_id")

In [None]:
# Add an img_full_path column: the S3 prefix of the small images followed by each row's path.
dataset = dataset.assign(
    img_full_path='s3://amazon-berkeley-objects/images/small/' + dataset["path"].astype(str)
)
dataset

In [None]:
# Sanity check: fetch one product image by item id, print its title and show the image.
image, item_name = get_image_from_item_id_s3(item_id = "B07JQX8S2X", dataset = dataset, image_path = f's3://amazon-berkeley-objects/images/small/' )
print(item_name)
image

In [None]:
# Keep only the first `batch_size` rows so the demonstration stays small.
batch_size = 10
dataset = dataset.head(batch_size)
dataset

In [None]:
# Print each row number followed by its (image path, title) pair.
for row_number, path_and_title in enumerate(zip(dataset['img_full_path'], dataset['item_name_in_en_us'])):
    print(row_number, path_and_title)

## Generate Multimodal Embeddings

In [None]:
%%time


def get_titan_multimodal_embedding_fix(
    image_path:str=None,  # maximum 2048 x 2048 pixels
    description:str=None, # English only and max input tokens 128
    dimension:int=1024,   # 1,024 (default), 384, 256
    model_id:str=multimodal_embed_model
):
    """Request an embedding for an image, a text, or both from Bedrock.

    Args:
        image_path: Local file path or ``s3://bucket/key`` URI of the image.
        description: Text to embed, alone or together with the image.
        dimension: Requested length of the returned embedding vector.
        model_id: Bedrock model id to invoke.

    Returns:
        The decoded JSON response body; callers read its ``"embedding"`` key.

    Raises:
        ValueError: If neither ``image_path`` nor ``description`` is given.
    """
    if not image_path and not description:
        # Fail before calling the service with an empty request.
        raise ValueError("Provide image_path, description, or both.")

    payload_body = {}
    embedding_config = {
        "embeddingConfig": {
             "outputEmbeddingLength": dimension
         }
    }
    # You can specify either text or image or both
    if image_path:
        if image_path.startswith('s3://'):
            # Reuse the notebook-level S3 client instead of building one per call.
            bucket_name, key = image_path[len('s3://'):].split("/", 1)
            image_bytes = s3_client.get_object(Bucket=bucket_name, Key=key)['Body'].read()
        else:
            with open(image_path, "rb") as image_file:
                image_bytes = image_file.read()
        # The image is sent base64-encoded inside the JSON request.
        payload_body["inputImage"] = base64.b64encode(image_bytes).decode('utf-8')
    if description:
        payload_body["inputText"] = description

    print(f" get_titan_multimodal_embedding_fix()::payload:keys={payload_body.keys()}::")
    response = boto3_bedrock.invoke_model(
        body=json.dumps({**payload_body, **embedding_config}),
        modelId=model_id,
        accept="application/json",
        contentType="application/json"
    )

    return json.loads(response.get("body").read())

# Embed every product (image and title together) and collect the vectors in row order.
multimodal_embeddings_img = []
for img_path, item_name in zip(dataset['img_full_path'], dataset['item_name_in_en_us']):
    embedding = get_titan_multimodal_embedding_fix(description=item_name, image_path=img_path, dimension=1024)["embedding"]
    print(np.array(embedding).shape)
    multimodal_embeddings_img.append(embedding)

# Store the vectors on the dataset as a new column.
dataset = dataset.assign(embedding_img=multimodal_embeddings_img)

In [None]:
dataset.head()

In [None]:
dataset['item_name_in_en_us'].to_list()

In [None]:

# Compare the first `batch_size` embeddings against themselves.
plot_similarity_heatmap(multimodal_embeddings_img[:batch_size], multimodal_embeddings_img[:batch_size])

## Create Vector Store with FAISS

### Prepare Metadata for Vector Store

In [None]:
from langchain.document_loaders import CSVLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.schema import Document

In [None]:
# One single-entry dict per product, mapping its title to its image path.
metadata_dict = [
    {title: path}
    for title, path in zip(dataset['item_name_in_en_us'].to_list(), dataset['img_full_path'].to_list())
]
metadata_dict

In [None]:
# create vector store
from langchain.document_loaders import CSVLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.schema import Document

# Import from langchain_aws instead of langchain.embeddings
from langchain_aws import BedrockEmbeddings
from langchain.vectorstores import FAISS

multimodal_embed_model = 'amazon.titan-embed-image-v1'
# Embedding client handed to the vector store; it is also used to embed query text.
embedding_model = BedrockEmbeddings(
    client=boto3_bedrock,
    model_id=multimodal_embed_model
)

# Pair each title with its precomputed embedding. Materialised as a list because a
# bare zip object is exhausted after a single pass.
text_embedding_pairs = list(zip(dataset['item_name_in_en_us'].to_list(), multimodal_embeddings_img))

# Per-document metadata in the same order as text_embedding_pairs. The 'img_path'
# entry is what get_image_from_faiss_results reads to fetch each result's image.
metadata_dict = [{"item_name": name, "img_path": path} for name, path in zip(dataset['item_name_in_en_us'].to_list(), dataset['img_full_path'].to_list())]

db = FAISS.from_embeddings(text_embedding_pairs, embedding_model, metadatas=metadata_dict)

## Perform Semantic Search

### Test Search Functionality with Text Query

In [None]:
# Example text query
query_prompt = "drinkware glass"

# Embed the query text, print the first ten components of the vector, then
# retrieve the 2 nearest entries from the FAISS store.
v = embedding_model.embed_query(query_prompt)
print(v[0:10])
results = db.similarity_search_by_vector(v, k=2)
display(Markdown('Let us look at the documents which had the relevant information pertaining to our query'))
# Show each hit's text followed by its metadata, with a rule between hits.
for r in results:
    display(Markdown(f'{r.page_content}'), Markdown(f'{r.metadata}'))
    display(Markdown(f'------------------------------------'))

In [None]:
print(results[0].metadata.values())
print(results[0].metadata.keys())

In [None]:
def get_image_from_faiss_results(results=None):
    """Download and open the images referenced by FAISS search results.

    Args:
        results: Iterable of search hits; each hit's ``metadata['img_path']``
            is read.

    Returns:
        List of opened PIL images, one per hit whose path starts with ``s3``
        and whose downloaded file exists locally. Other hits are reported
        with a printed message and left out.
    """
    import os

    local_data_root = f'./data/images'
    opened_images = []

    for hit in results:
        # Only the image path is needed from the metadata.
        img_path = hit.metadata['img_path']
        print(f"Processing image: {img_path}")

        if not img_path.startswith('s3'):
            print(f"Skipping non-S3 path: {img_path}")
            continue

        # Ensure the target directory exists, then fetch the object into it.
        os.makedirs(local_data_root, exist_ok=True)
        s3down.download(img_path, local_data_root)

        local_image_path = f"{local_data_root}/{img_path.split('/')[-1]}"
        if not os.path.exists(local_image_path):
            print(f"Warning: Could not find downloaded image at {local_image_path}")
            continue

        opened_images.append(Image.open(local_image_path))

    return opened_images


### Perform Image Search Based on Text Input

Let’s take a look at the results of a simple query. In the example below, we receive a text input (for example, "drinkware glass") from the user and send it to the search engine to find similar items.

Find similar items based on user queries. You can see that the search returned glass drinkware from our dataset for the input query, which is exactly what we want to achieve.


In [None]:
# Embed the text query and retrieve the 2 nearest entries from the FAISS store.
query_prompt = "drinkware glass"
v = embedding_model.embed_query(query_prompt)
results = db.similarity_search_by_vector(v, k=2)

# Download and open the image behind each hit.
all_images = get_image_from_faiss_results(results)

# Display the images when at least one was retrieved
if all_images:
    display_images(all_images)
else:
    print("No images were retrieved")

### Perform Image Search Based on Image Input

In [None]:
# Fetch the product that will serve as the query image, print its title and show it.
# NOTE: the lookup only sees the rows currently held in `dataset`.
item_id = "B0896LJNLH"

image, item_name = get_image_from_item_id_s3(item_id = item_id, dataset = dataset, image_path = f's3://amazon-berkeley-objects/images/small/' )
print(item_name)
image

In [None]:
""" 
Function for semantic search capability using knn on input image prompt.
"""
def find_similar_items_from_image(image_path: str, k_nn: int) -> list:
    """Find the stored products most similar to a query image.

    Args:
        image_path: Local path or S3 URI of the query image.
        k_nn: Number of nearest neighbours to retrieve from the FAISS store.

    Returns:
        The list of opened images produced by
        ``get_image_from_faiss_results`` for the matches.
    """
    # Embed the image passed in by the caller, not a notebook-level global.
    query_emb = get_titan_multimodal_embedding_fix(image_path=image_path, dimension=1024)["embedding"]
    results = db.similarity_search_by_vector(query_emb, k=k_nn)
    print(results)
    return get_image_from_faiss_results(results)

In [None]:
# Resolve the S3 path of the query product, then search for its nearest neighbours.
item_id = "B0896LJNLH"
search_image_path = dataset.loc[dataset['item_id'] == item_id, 'img_full_path'].iloc[0]
print(search_image_path)

image_list = find_similar_items_from_image(search_image_path, 2)
display_images(image_list)


### Describe a Product Image with Claude on Amazon Bedrock



In [None]:
import json 
import boto3
import os
import base64
from PIL import Image
from IPython.display import Markdown, display

# Setup S3 client for downloading the image
s3 = boto3.client('s3')

# Define the S3 path and extract bucket and key
# (splitting "s3://bucket/key..." on "/" puts the bucket at index 2 and the key parts after it)
s3_path = "s3://amazon-berkeley-objects/images/small/07/075e5d67.jpg"
bucket_name = s3_path.split('/')[2]
object_key = '/'.join(s3_path.split('/')[3:])

# Create local directory for the image if it doesn't exist
local_data_root = './data/images'
os.makedirs(local_data_root, exist_ok=True)

# Define the local path for the downloaded image
local_file_name = s3_path.split('/')[-1]
local_image_path = f"{local_data_root}/{local_file_name}"

# Download the image from S3
# Best effort: any failure is printed and the cell continues with a fallback path.
try:
    s3.download_file(bucket_name, object_key, local_image_path)
    print(f"Successfully downloaded image to {local_image_path}")
except Exception as e:
    print(f"Error downloading image: {e}")
    # Use a fallback image path if download fails
    # NOTE(review): nothing in this cell creates or checks this file, so the
    # open() below fails if it is missing - confirm the file is provided.
    local_image_path = "data/images/departure_rate.jpg"

# Setup Bedrock client
# The region comes from the AWS_REGION environment variable (None when unset).
region = os.environ.get("AWS_REGION")
bedrock = boto3.client(
    service_name='bedrock-runtime',
    region_name=region,
)

# Read and encode the image
# The base64 text is what gets embedded in the JSON request sent to the model.
with open(local_image_path, "rb") as image_file:
    content_image = base64.b64encode(image_file.read()).decode('utf8')

# Display the image we're about to send
img = Image.open(local_image_path)
display(img)


In [None]:
# Prepare the request body
# A single user message carrying two content parts: the base64-encoded JPEG
# followed by the text instruction.
body = json.dumps(
    {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 300,
        "messages": [{
            "role": "user",
            "content": [{
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/jpeg",
                    "data": content_image,
                }
            },
            {
                "type": "text",
                "text": "Describe this product in detail. What is it used for? What are its key features?"
            }
            ]
        }],
        "temperature": 0.5,
        "top_p": 0.9
    }  
)  

# Define model and invoke it
modelId = "anthropic.claude-3-sonnet-20240229-v1:0"
accept = "application/json"
contentType = "application/json"

# Any failure (invocation or response parsing) is printed rather than raised.
try:
    response = bedrock.invoke_model(
        body=body, modelId=modelId, accept=accept, contentType=contentType
    )
    response_body = json.loads(response.get("body").read())
    
    # Display the response
    # The generated text is read from the first element of the 'content' list.
    print("\nClaude's description:")
    print(response_body['content'][0]['text'])
except Exception as e:
    print(f"Error invoking model: {e}")

## Conclusion

This notebook demonstrates how to build a contextual text and image search engine using Amazon Bedrock's Titan Multimodal Embedding model and FAISS vector database. The system enables powerful semantic search capabilities that understand both visual and textual context, making it ideal for product recommendation systems.

The approach can be extended to larger datasets and more complex search requirements by:
- Scaling the vector database with more products
- Adding filtering capabilities based on metadata
- Integrating with recommendation systems
- Adding user personalization features