In [None]:
# define config
import os  # local import: this cell runs before the main imports cell

# Each setting is read from an environment variable when it is set and otherwise falls back
# to the placeholder shown, so real credentials never have to be committed in the notebook.

# Specify the S3 bucket name
bucket_name = os.environ.get('S3_BUCKET_NAME', 'S3_bucket_name')

# OpenSearch domain information
region = os.environ.get('OPENSEARCH_REGION', 'region')  # Replace with your OpenSearch region
host = os.environ.get('OPENSEARCH_HOST', 'open-search-host')  # Replace with your OpenSearch hostname (client connects on port 443 over SSL)
username = os.environ.get('OPENSEARCH_USERNAME', 'username')  # Replace with your OpenSearch username
password = os.environ.get('OPENSEARCH_PASSWORD', 'password')  # Replace with your OpenSearch password; prefer the env var over editing this line

index_name = 'vector_search_index'

In [None]:
import json
import boto3
import os
from typing import Optional
from botocore.config import Config
import os
import sys
import numpy as np
from opensearchpy import OpenSearch, RequestsHttpConnection
import time
import torch

In [None]:
# Load the records to embed. The file is named .jsonl (one JSON value per line), which
# json.load() cannot parse once there is more than one line ("Extra data"). Try it as a
# single JSON document first (preserving the old behaviour), then fall back to JSON Lines.
with open('imdb_schema.jsonl', 'r', encoding='utf-8') as file:
    raw_schema = file.read()

try:
    schema = json.loads(raw_schema)
except json.JSONDecodeError:
    schema = [json.loads(line) for line in raw_schema.splitlines() if line.strip()]

In [None]:
# Initialize the S3 client
s3_client = boto3.client('s3')

# List every object in the bucket. A single list_objects_v2 call returns at most 1,000 keys,
# so walk all result pages with the paginator instead of reading only the first response.
paginator = s3_client.get_paginator('list_objects_v2')
object_count = 0
for page in paginator.paginate(Bucket=bucket_name):
    # 'Contents' is absent from a page that has no objects.
    for obj in page.get('Contents', []):
        key = obj['Key']
        print(f'Bucket: {bucket_name}, Key: {key}')
        object_count += 1

if object_count == 0:
    print("No objects found in the bucket.")

In [None]:
# Create the two Bedrock connections, in this order:
#   bedrock         -> control-plane client (used below to list foundation models)
#   bedrock_runtime -> model-invocation client
# NOTE(review): both are pinned to us-west-2, while get_bedrock_client() defaults to
# us-east-1; model availability differs by region, so confirm which region is intended.
bedrock, bedrock_runtime = (
    boto3.client(service_name=service, region_name='us-west-2')
    for service in ('bedrock', 'bedrock-runtime')
)

# Let's see all available Amazon Models
available_models = bedrock.list_foundation_models()

In [None]:
# Confirm available models: print the ID of each summary returned by list_foundation_models().
model_summaries = available_models['modelSummaries']
for model in model_summaries:
    model_id = model['modelId']
    print(model_id)

In [None]:
def get_bedrock_client(assumed_role: Optional[str] = None, region: Optional[str] = 'us-east-1', runtime: Optional[bool] = True, external_id=None, ep_url=None):
    """Create a boto3 client for Amazon Bedrock, with optional configuration overrides.

    Args:
        assumed_role: Optional IAM role ARN. When given, STS ``assume_role`` is called and the
            temporary credentials are used for the Bedrock client.
        region: AWS region for both the session and the client.
        runtime: If truthy, create a ``bedrock-runtime`` client; otherwise a ``bedrock`` client.
        external_id: Optional ``ExternalId`` forwarded to ``assume_role``.
        ep_url: Optional endpoint URL override; ``None`` uses the standard endpoint.

    Returns:
        The boto3 client for the selected Bedrock service.
    """
    target_region = region

    print(f"Create new client\n  Using region: {target_region}:external_id={external_id}: ")
    session_kwargs = {"region_name": target_region}
    client_kwargs = {**session_kwargs}

    # Honour a named profile from the environment (applies to the session only).
    profile_name = os.environ.get("AWS_PROFILE")
    if profile_name:
        print(f"  Using profile: {profile_name}")
        session_kwargs["profile_name"] = profile_name

    # Standard retry mode with up to 10 attempts.
    retry_config = Config(
        region_name=target_region,
        retries={"max_attempts": 10, "mode": "standard"},
    )
    session = boto3.Session(**session_kwargs)

    if assumed_role:
        print(f"  Using role: {assumed_role}", end='')
        assume_role_kwargs = {
            "RoleArn": str(assumed_role),
            "RoleSessionName": "langchain-llm-1",
        }
        if external_id:
            assume_role_kwargs["ExternalId"] = external_id
        credentials = session.client("sts").assume_role(**assume_role_kwargs)["Credentials"]
        # Completes the line started above (previously the role was printed twice).
        print(" ... sts::successful!")
        client_kwargs["aws_access_key_id"] = credentials["AccessKeyId"]
        client_kwargs["aws_secret_access_key"] = credentials["SecretAccessKey"]
        client_kwargs["aws_session_token"] = credentials["SessionToken"]

    service_name = 'bedrock-runtime' if runtime else 'bedrock'

    # endpoint_url=None is boto3's default, so one call covers both the override and no-override cases.
    bedrock_client = session.client(
        service_name=service_name,
        config=retry_config,
        endpoint_url=ep_url,
        **client_kwargs,
    )

    print("boto3 Bedrock client successfully created!")
    print(bedrock_client._endpoint)  # private botocore attribute; diagnostic output only
    return bedrock_client

In [None]:
class TitanEmbeddings:
    """Callable wrapper that requests text embeddings from an Amazon Titan model on Bedrock."""

    # MIME types sent with every invoke_model request.
    accept = "application/json"
    content_type = "application/json"

    def __init__(self, model_id="amazon.titan-embed-text-v2:0", boto3_client=None, region_name='us-east-1'):
        """Bind the wrapper to a Bedrock runtime client.

        Args:
            model_id: Bedrock model identifier to invoke.
            boto3_client: Existing 'bedrock-runtime' client to reuse; when falsy a new client
                is created in ``region_name``.
            region_name: Region used only when a new client has to be created.
        """
        self.bedrock_boto3 = boto3_client if boto3_client else boto3.client(
            service_name='bedrock-runtime',
            region_name=region_name,
        )
        self.model_id = model_id

    def __call__(self, text, dimensions, normalize=True):
        """Embed one piece of text.

        Args:
            text (str): Text to embed.
            dimensions (int): Number of output dimensions requested from the model.
            normalize (bool): Whether the model should return a normalized vector.

        Returns:
            List[float]: The ``embedding`` field of the model's JSON response.
        """
        request = {"inputText": text, "dimensions": dimensions, "normalize": normalize}

        reply = self.bedrock_boto3.invoke_model(
            body=json.dumps(request),
            modelId=self.model_id,
            accept=self.accept,
            contentType=self.content_type,
        )

        # The response body is a stream; read it fully before decoding the JSON payload.
        payload = json.loads(reply.get('body').read())
        return payload['embedding']

In [None]:
# Build a bedrock-runtime client (get_bedrock_client defaults) and wrap it in the Titan v2 helper.
boto3_bedrock_runtime = get_bedrock_client()
bedrock_embeddings = TitanEmbeddings(
    boto3_client=boto3_bedrock_runtime,
    model_id="amazon.titan-embed-text-v2:0",
)
bedrock_embeddings

In [None]:
class TitanV2Model():
    def __init__(self) -> None:

        self.br_embeddings = None     
        self._init_connection()
        
        print(f"TitanV2Model:__init__::ready:to:Invoke:::successful::") 
    
    def _init_connection(self, dim=256):
        boto3_bedrock_runtime = get_bedrock_client() #boto3.client('bedrock')

        self.br_embeddings = TitanEmbeddings(model_id="amazon.titan-embed-text-v2:0", boto3_client=boto3_bedrock_runtime)
        self.dim = dim
        
    def process_dict_text(self, single_text_dict):
        """ **IMPORTANT** CHANGE this Code to be tuned to your data set and use this -- DO NOT USE THIS AS IS. refer to  https://github.com/embeddings-benchmark/mteb/blob/main/mteb/abstasks/AbsTaskRetrieval.py as an example. Please goover this Git hub in detail"""
        single_text = [str(key).strip() + " " + str(val).strip() if 'title' in key.lower() else str(val).strip() for key, val in single_text_dict.items()]
        return " ".join(single_text)[:30000]
        
    def reorg_text(self, single_text):
        
        """ **IMPORTANT** CHANGE this Code to be tuned to your data set and use this -- DO NOT USE THIS AS IS. refer to  https://github.com/embeddings-benchmark/mteb/blob/main/mteb/abstasks/AbsTaskRetrieval.py as an example. Please goover this Git hub in detail"""
        if isinstance(single_text, dict):
            single_text = self.process_dict_text(single_text)
        single_text = "0" if not single_text else single_text 
        # check for json -- 
        try:
            single_text_dict = json.loads(single_text)
            single_text = self.process_dict_text(single_text_dict)
        except:
            pass
        return single_text
    
    def invoke_model(self, text_list: list[str]):
        """ **IMPORTANT** CHANGE this Code to be tuned to your data set and use this -- DO NOT USE THIS AS IS. refer to  https://github.com/embeddings-benchmark/mteb/blob/main/mteb/abstasks/AbsTaskRetrieval.py as an example. Please goover this Git hub in detail"""
        list_embeddings = []
        
        for single_text in text_list:
            single_text = self.reorg_text(single_text)
            single_embed = bedrock_embeddings(text=single_text, dimensions=self.dim, normalize=True)
            list_embeddings.append(single_embed)

        return list_embeddings

    def reshape_titan_embeddings(self, query_embeddings: np.ndarray, **kwargs) -> list[np.ndarray]:
        # - use this to re shape your embeddings as needed
        return query_embeddings # 
        
        
    def encode(self, queries: list[str], **kwargs) -> list[np.ndarray] | list[torch.Tensor] : # - | list[torch.Tensor] 
        """
        Returns a list of embeddings for the given sentences.
        Args:
            queries: List of sentences to encode

        Returns:
            List of embeddings for the given sentences
        """
        
        embedding_list = self.invoke_model(queries)
        return self.reshape_titan_embeddings(np.array(embedding_list))

    
    def encode_queries(self, queries: list[str], **kwargs) -> list[np.ndarray] | list[torch.Tensor] : # - | list[torch.Tensor] 
        """
        Returns a list of embeddings for the given sentences.
        Args:
            queries: List of sentences to encode

        Returns:
            List of embeddings for the given sentences
        """
        
        embedding_list = self.invoke_model(queries)
        return self.reshape_titan_embeddings(np.array(embedding_list))


    def encode_corpus(self, corpus: list[str] | list[dict[str, str]], **kwargs) -> list[np.ndarray] | list[torch.Tensor] : #- | list[torch.Tensor]:
        """
        Returns a list of embeddings for the given sentences.
        Args:
            corpus: List of sentences to encode
                or list of dictionaries with keys "title" and "text"

        Returns:
            List of embeddings for the given sentences
        """
        
        embedding_list = self.invoke_model(corpus)
        return self.reshape_titan_embeddings(np.array(embedding_list))

In [None]:
# Embed every schema record with Titan V2 (TitanV2Model default: 256 dimensions).
final_output = TitanV2Model().encode_queries(schema)

print("Embeddings Generated ::")
print(f"shape:of:embeddings -- > length of embeddings={len(final_output)}::")
# Guard the per-vector length so an empty schema does not raise IndexError.
if len(final_output):
    print(f"shape:of:embeddings -- > {len(final_output[0])}::")

final_output

In [None]:
# Initialize OpenSearch client with basic authentication
client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=(username, password),
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

# Index settings. Replicas are not set here, so the index default applies.
num_shards = 4
# Must equal the dimension the embeddings were generated with (TitanV2Model default: 256).
embedding_dim = 256

index_body = {
    'settings': {
        'index': {
            'number_of_shards': num_shards,
            'knn': True,
        }
    },
    'mappings': {
        'properties': {
            'embedding': {
                'type': 'knn_vector',
                'dimension': embedding_dim,
                'method': {
                    'name': 'hnsw',
                    'space_type': 'l2',
                    'engine': 'nmslib',
                    'parameters': {
                        'ef_construction': 128,
                        'm': 16
                    }
                }
            }
        }
    }
}

# Create the index only when it is missing, so re-running this cell does not report an error.
if client.indices.exists(index=index_name):
    print(f"Index '{index_name}' already exists; skipping creation.")
else:
    try:
        response = client.indices.create(index=index_name, body=index_body)
        print(f"Index creation response: {response}")
    except Exception as e:
        print(f"Error creating index: {str(e)}")

# Verify index existence
response = client.indices.exists(index=index_name)
print(f"Index exists: {response}")

In [None]:
# Store one document per generated embedding, with ids doc_0, doc_1, ...
for i, embedding in enumerate(final_output):
    document = {
        # np.asarray accepts both numpy rows and plain lists.
        "embedding": np.asarray(embedding).tolist()
    }
    response = client.index(index=index_name, id=f"doc_{i}", body=document)
    print(f"Document {i} response: {response}")

# Make the newly indexed documents visible to search immediately instead of waiting for
# the periodic refresh.
client.indices.refresh(index=index_name)

In [None]:
# Re-check that the vector index is present after indexing.
index_present = client.indices.exists(index=index_name)
response = index_present
print(f"Index exists: {response}")

# Test


In [None]:
# Synthetic sanity check: index a random 256-dim vector under a dedicated id.
# Using "doc_1" here would overwrite the real schema embedding stored by the indexing cell.
# A fixed seed keeps the test vector (and the search below) reproducible across re-runs.
embeddings = np.random.default_rng(42).random(256).tolist()

# Index a sample document
document = {
    "embedding": embeddings
}

try:
    # refresh=True makes the document searchable right away for the k-NN query that follows.
    response = client.index(index=index_name, id="doc_test_random", body=document, refresh=True)
    print(f"Document indexing response: {response}")
except Exception as e:
    print(f"Error indexing document: {str(e)}")


In [None]:
# k-NN search: request the 5 stored vectors nearest to the test embedding.
knn_clause = {"embedding": {"vector": embeddings, "k": 5}}
query = {"size": 5, "query": {"knn": knn_clause}}

try:
    response = client.search(index=index_name, body=query)
    print(f"Search response: {response}")
except Exception as e:
    print(f"Error performing search: {str(e)}")
