# Creating a vector store with Amazon Bedrock multimodal embeddings

This notebook is a step-by-step tutorial for populating a vector database in [Amazon OpenSearch Serverless](https://aws.amazon.com/opensearch-service/features/serverless/) with image embeddings. The Bedrock Agent uses these vector embeddings to search the vector store for similar images.

This notebook is required only if you want the agent to be able to take the `/image_look_up` action; otherwise, you can go directly to the `Create_Fashion_Agent.ipynb` notebook.

#### Download the dataset locally

For demo purposes, we use an external [dataset](https://github.com/orbitalsonic/Fashion-Dataset-Images-Western-Dress) of western-dress images from GitHub.

In [None]:
# Clone the Western-Dress image dataset; its WesternDress_Images folder is ingested further below.
!git clone https://github.com/orbitalsonic/Fashion-Dataset-Images-Western-Dress.git

### Add all the dependencies/imports

In [None]:
# Install the Python dependencies listed in requirements.txt.
!pip install -r requirements.txt

In [None]:
import os
import boto3
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection
from pathlib import Path
import yaml
import json
from tqdm.auto import tqdm
from pathlib import Path

In [2]:
# Load the notebook settings (embedding size, OpenSearch index name, ...) from
# config.yml in the current working directory, using YAML's safe loader.
with (Path(os.path.abspath("")) / "config.yml").open("r") as ymlfile:
    config = yaml.safe_load(ymlfile)

In [3]:
# Deployment outputs keyed by stack name (e.g. "FashionAgentStack"); the
# OpenSearch Serverless endpoint is looked up from here in a later cell.
variables = json.loads(Path("variables.json").read_text())

In [4]:
# AWS session shared by every cell below. No named profile is hard-coded:
# boto3 resolves credentials through its default chain (which honours the
# AWS_PROFILE environment variable), so the notebook runs for any user.
boto3_session = boto3.Session(region_name="us-east-1")

In [5]:
import io
import json
import boto3
import base64
from PIL import Image


# Output vector size for Titan multimodal embeddings: 1024 (default), 384 or 256.
# The value comes from config.yml ("embeddingSize"), which also sets the k-NN
# field dimension of the index mapping, so both stay in agreement.
_SUPPORTED_EMBEDDING_LENGTHS = (256, 384, 1024)
_embedding_length = int(config["embeddingSize"])
if _embedding_length not in _SUPPORTED_EMBEDDING_LENGTHS:
    # Fail fast here: otherwise every Bedrock call in the ingestion loop would
    # fail individually, and those errors are only printed per image.
    raise ValueError(
        f"embeddingSize must be one of {_SUPPORTED_EMBEDDING_LENGTHS}, "
        f"got {_embedding_length}"
    )

EMBEDDING_CONFIG = {
    "embeddingConfig": {"outputEmbeddingLength": _embedding_length}
}


class OpensearchIngestion:
    """Helpers to create a k-NN index in OpenSearch and ingest Titan embeddings.

    Relies on the notebook globals ``config`` (for ``embeddingSize``) and
    ``EMBEDDING_CONFIG`` (sent with every Bedrock embedding request).
    """

    def __init__(self, client, session=None):
        """Store the OpenSearch client and the boto3 session used for Bedrock.

        Args:
            client: An ``opensearchpy.OpenSearch`` client (or compatible object).
            session: Optional ``boto3.Session``; a default one is created if omitted.
        """
        self.client = client
        self.session = session if session else boto3.Session()
        self.region = self.session.region_name
        # Created lazily by get_bedrock_client() and then reused.
        self._bedrock_client = None

    def put_bulk_in_opensearch(self, docs):
        """Send ``docs`` as one bulk request and summarise the outcome.

        Args:
            docs: Bulk request body passed unchanged to ``client.bulk``
                (presumably alternating action/source entries — confirm with callers).

        Returns:
            ``(success, failed)``: the number of response items without an
            ``"error"`` entry, and the list of response items that carry one.
        """
        print(f"Putting {len(docs)} documents in OpenSearch")
        response = self.client.bulk(docs)
        # client.bulk returns the raw response dict ({"took", "errors", "items"}),
        # not a (success, failed) pair, so derive both from the per-item results.
        items = response.get("items", [])
        failed = [
            item
            for item in items
            if any("error" in result for result in item.values())
        ]
        return len(items) - len(failed), failed

    def check_index_exists(self, index_name):
        """Return whether ``index_name`` exists in the cluster."""
        return self.client.indices.exists(index=index_name)

    def create_index(self, index_name):
        """Create ``index_name`` with k-NN enabled if it does not exist yet.

        Returns:
            True if the index was created and acknowledged, False if it already
            existed or creation was not acknowledged.
        """
        if not self.check_index_exists(index_name):
            settings = {
                "settings": {
                    "index.knn": True,
                }
            }
            response = self.client.indices.create(index=index_name, body=settings)
            return bool(response["acknowledged"])
        return False

    def create_index_mapping(self, index_name):
        """Put the mapping for the embedding vector and the base64 image text.

        The vector dimension is ``config["embeddingSize"]``, converted to int
        exactly as EMBEDDING_CONFIG does, so index and embeddings agree.

        Returns:
            True if OpenSearch acknowledged the mapping update.
        """
        response = self.client.indices.put_mapping(
            index=index_name,
            body={
                "properties": {
                    "vector_field": {
                        "type": "knn_vector",
                        "dimension": int(config["embeddingSize"]),
                        "method": {
                            "name": "hnsw",
                            "engine": "nmslib",
                        },
                    },
                    "image_b64": {"type": "text"},
                }
            },
        )
        return bool(response["acknowledged"])

    def get_bedrock_client(self):
        """Return a Bedrock Runtime client for ``self.region``.

        The client is created on first use and cached on the instance, so
        embedding many images does not build a new boto3 client per request.
        """
        if getattr(self, "_bedrock_client", None) is None:
            self._bedrock_client = self.session.client(
                "bedrock-runtime", region_name=self.region
            )
        return self._bedrock_client

    def create_titan_multimodal_embeddings(
        self,
        image_path: str = "None",
        text: str = "None",
    ):
        """Create Titan multimodal embeddings for an image and/or a text.

        Args:
            image_path: Path of the image to embed (a ``pathlib.Path`` works too).
                ``None``, ``""`` or the string ``"None"`` mean "no image".
            text: Text to embed. ``None``, ``""`` or ``"None"`` mean "no text".

        Returns:
            ``(payload_body, vector)``: the request payload (``inputImage`` is the
            base64-encoded image, ``inputText`` the text) and the decoded JSON
            response from Bedrock.

        Raises:
            ValueError: if neither an image nor a text is provided.
        """
        payload_body = {}

        # The string "None" is the default "not provided" sentinel; treat it like
        # an actual None or an empty value.
        if image_path and image_path != "None":
            payload_body["inputImage"] = self.get_encoded_image(image_path)
        if text and text != "None":
            payload_body["inputText"] = text
        if not payload_body:
            # Checked on the built payload so that None/"" are rejected as well,
            # instead of sending an empty request to Bedrock.
            raise ValueError(
                "please provide either an image and/or a text description"
            )

        bedrock_client = self.get_bedrock_client()

        response = bedrock_client.invoke_model(
            body=json.dumps({**payload_body, **EMBEDDING_CONFIG}),
            modelId="amazon.titan-embed-image-v1",
            accept="application/json",
            contentType="application/json",
        )
        vector = json.loads(response["body"].read())
        return (payload_body, vector)

    def get_encoded_image(self, image_path: str):
        """Return the image at ``image_path`` as a base64 string.

        Images larger than 1024*1024 pixels in area are downscaled (keeping the
        aspect ratio) to fit in a 1024x1024 box. They are then re-encoded in
        their original file format.
        """
        max_height, max_width = 1024, 1024  # Conservative limit. Can increase to 2048.
        with Image.open(image_path) as image:
            image_format = image.format
            if image.width * image.height > max_height * max_width:
                # thumbnail() resizes in place and takes (width, height).
                image.thumbnail((max_width, max_height))
            buffer = io.BytesIO()
            image.save(buffer, format=image_format)
            img_bytes = buffer.getvalue()

        return base64.b64encode(img_bytes).decode("utf8")

In [6]:
# Control-plane client for OpenSearch Serverless (not referenced by the later cells).
client = boto3_session.client("opensearchserverless")
service = "aoss"
region = boto3_session.region_name
credentials = boto3_session.get_credentials()
# SigV4 request signing for the OpenSearch Serverless service.
AWSAUTH = AWSV4SignerAuth(credentials, region, service)

# Find the collection endpoint among the stack outputs. If several keys match,
# the last one wins, as with the original overwrite-in-a-loop lookup.
endpoint_prefix = "OpenSearchServerlessConstructsFashionAgentStackOSSEndpoint"
endpoint_urls = [
    value
    for key, value in variables["FashionAgentStack"].items()
    if key.startswith(endpoint_prefix)
]
if not endpoint_urls:
    # Fail here with a clear message instead of a NameError on `host` later.
    raise KeyError(
        f"No output starting with {endpoint_prefix!r} in variables['FashionAgentStack']"
    )
host = endpoint_urls[-1].removeprefix("https://")

#### Initialize an OpenSearch client

In [7]:
# OpenSearch client for the serverless collection: TLS on port 443 with
# certificate verification, and requests signed through AWSAUTH.
OSSclient = OpenSearch(
    connection_class=RequestsHttpConnection,
    hosts=[dict(host=host, port=443)],
    http_auth=AWSAUTH,
    use_ssl=True,
    verify_certs=True,
    pool_maxsize=20,
    # NOTE(review): opensearch-py timeouts are in seconds, so this is 50 minutes — confirm intent.
    timeout=3000,
)

### Create an index for the OpenSearch ingestion
The `OpensearchIngestion` class defined earlier in this notebook contains helper functions for processing the documents and ingesting them into the index.

In [8]:
# Pair the signed OpenSearch client with the boto3 session used for Bedrock calls.
oss_instance = OpensearchIngestion(session=boto3_session, client=OSSclient)

In [None]:
# Create the k-NN enabled index (skipped if it already exists), then apply the
# mapping for the vector field and the base64 image field.
index_name = config["opensearch"]["opensearch_index_name"]
oss_instance.create_index(index_name)
oss_instance.create_index_mapping(index_name)

### Ingest the images

In [17]:
dataset_path = Path("Fashion-Dataset-Images-Western-Dress/WesternDress_Images")
# Number of visible regular files in the dataset folder (hidden entries such as
# dot-files and sub-directories are ignored); used as a progress-bar total.
image_count = len(
    [
        entry
        for entry in dataset_path.iterdir()
        if entry.is_file() and not entry.name.startswith(".")
    ]
)


In [None]:
# Embed every visible image file of the dataset and index one document per image.
index_name = config["opensearch"]["opensearch_index_name"]
# Apply the same filter as image_count: hidden files (e.g. .DS_Store) and
# sub-directories are not images and would only produce Bedrock errors.
image_paths = sorted(
    item
    for item in dataset_path.iterdir()
    if item.is_file() and not item.name.startswith(".")
)

failed = []
for image_path in tqdm(image_paths, total=len(image_paths)):
    try:
        data, embedding = oss_instance.create_titan_multimodal_embeddings(
            image_path=image_path
        )
        body = {
            "vector_field": embedding["embedding"],
            "image_b64": data["inputImage"],
        }
    except Exception as e:
        # Best effort: report the image, record it as failed and continue.
        print(f"Exception thrown in image {image_path}: {e}")
        failed.append(image_path)
        continue
    # Ingest the images one by one.
    status = oss_instance.client.index(index=index_name, body=body)
    if status["result"] != "created":
        failed.append(image_path)

print(f"Ingestion Complete. Failed ingestion for the following: {failed}")

##### Clean-up is done all at once by destroying the CDK stack

Run the `cdk destroy` command in a terminal.