In [1]:
#make sure to run pip install requirements.txt to install libraries if you haven't already done so.
!pip install -r requirements.txt --quiet

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
awscli 1.32.113 requires botocore==1.34.113, but you have botocore 1.34.63 which is incompatible.
sparkmagic 0.21.0 requires pandas<2.0.0,>=0.17.1, but you have pandas 2.2.1 which is incompatible.
sphinx 7.2.6 requires docutils<0.21,>=0.18.1, but you have docutils 0.16 which is incompatible.[0m[31m
[0m

In [2]:
import boto3
import sagemaker
import json
import pprint as pp
import os
import shutil
import csv
import time

from opensearchpy import (
    AWSV4SignerAuth
)

#adding our utils library to sys path
import sys
sys.path.append("../lib/src/utils/")
import llm_utils

#to uncomment if you've modified llm_utils and need to refresh its content
#from importlib import reload
#reload(llm_utils)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


## Pre-requisites

Before going through this workshop, you need to deploy the OpenSearch CDK stack, which creates the collection and the required policies.

However, we still need to add the execution role (or user) that you're using to run this notebook to the collection's data access policy.

In [3]:
# Serverless collection endpoint, WITHOUT the https:// scheme.
# It is created by the CDK scripts in the os_cdk folder. Set the AOSS_HOST
# environment variable to override the default below without editing the notebook.
os_host = os.environ.get("AOSS_HOST", "7cw6yxi2593yos7odti7.us-east-1.aoss.amazonaws.com")
# Tolerate a pasted URL: the client expects a bare host name.
os_host = os_host.removeprefix("https://").rstrip("/")

# Region of the collection, e.g. us-east-1 (override with AOSS_REGION).
REGION = os.environ.get("AOSS_REGION", 'us-east-1')

### Get current role/identity

First, we identify the current identity. It might be an execution role if you're using SageMaker, or an IAM user if you're using Visual Studio with the AWS CLI plugin.

In [7]:
sts_client = boto3.client('sts')

# Identify the AWS account and the principal running this notebook.
caller_identity = sts_client.get_caller_identity()
account_id = caller_identity['Account']

try:
    # On SageMaker this resolves the notebook's IAM execution role ARN.
    identity_arn = sagemaker.get_execution_role()
except ValueError:
    # Outside SageMaker (e.g. an IAM user from a local IDE) there may be no
    # execution role; fall back to the caller's own ARN.
    # NOTE(review): for assumed-role sessions this is an sts assumed-role ARN,
    # and the role-policy cell below only works for roles — confirm for your setup.
    identity_arn = caller_identity['Arn']

print(f"Identity ARN:{identity_arn}")
print(f"Account ID:{account_id}")

Identity ARN:arn:aws:iam::327216439222:role/Sagemaker
Account ID:327216439222


In [20]:
# Create an IAM client
iam = boto3.client('iam')

policy_name = 'SageMakerAOSSAccessPolicy'

# Identity-based policy allowing the notebook identity to call OpenSearch Serverless.
# NOTE(review): "aoss:*" on "*" is very broad; scope Action/Resource down to this
# collection for anything beyond a workshop account.
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "aoss:*",
            "Resource": "*"
        }
    ]
}

try:
    response = iam.create_policy(
        PolicyName=policy_name,
        PolicyDocument=json.dumps(policy_document)
    )
    policy_arn = response['Policy']['Arn']
except iam.exceptions.EntityAlreadyExistsException:
    # Re-run of the notebook: the policy already exists. No Path is passed to
    # create_policy, so its ARN is arn:<partition>:iam::<account>:policy/<name>.
    partition = identity_arn.split(":")[1]
    policy_arn = f"arn:{partition}:iam::{account_id}:policy/{policy_name}"

# Attach the policy to the SageMaker role. The role name is the last "/"-separated
# segment of the role ARN (this call only applies when identity_arn is a role).
iam.attach_role_policy(
    RoleName=identity_arn.split("/")[-1],
    PolicyArn=policy_arn
)


{'ResponseMetadata': {'RequestId': 'fd34495a-d3fd-4d24-8a2c-0b180ecd4c64',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 30 May 2024 10:06:37 GMT',
   'x-amzn-requestid': 'fd34495a-d3fd-4d24-8a2c-0b180ecd4c64',
   'content-type': 'text/xml',
   'content-length': '212'},
  'RetryAttempts': 0}}

### Data Access Policy creation for opensearch

Note that a data access policy has already been created by the CDK stack. We create an additional one that grants access to the notebook's execution role or user, in case it differs from the identity that was used to deploy the CDK.

In [23]:
# Wait for the IAM policy attached in the previous cell to propagate.
time.sleep(10)

collection_name = "semantic-search"

# Create an OpenSearch Serverless client
opss_client = boto3.client('opensearchserverless')

# Grant the notebook identity collection-level and index-level permissions on this collection.
data_access_policy = json.dumps([
    {
        "Rules": [
            {
                "Resource": [f"collection/{collection_name}"],
                "Permission": [
                    "aoss:CreateCollectionItems",
                    "aoss:DeleteCollectionItems",
                    "aoss:UpdateCollectionItems",
                    "aoss:DescribeCollectionItems"
                ],
                "ResourceType": "collection"
            },
            {
                "Resource": [f"index/{collection_name}/*"],
                "Permission": [
                    "aoss:CreateIndex",
                    "aoss:DeleteIndex",
                    "aoss:UpdateIndex",
                    "aoss:DescribeIndex",
                    "aoss:ReadDocument",
                    "aoss:WriteDocument"
                ],
                "ResourceType": "index"
            }
        ],
        "Principal": [identity_arn],
        "Description": "data-access-rule"
    }
], indent=2)

data_access_policy_name = f"{collection_name}-policy-notebook"

try:
    # clientToken is omitted on purpose: boto3 autopopulates a unique idempotency
    # token per call instead of reusing one hard-coded value across runs/accounts.
    # data_access_policy is already a JSON string, so it is passed as-is.
    response = opss_client.create_access_policy(
        description='Data access policy for semantic search collection',
        name=data_access_policy_name,
        policy=data_access_policy,
        type='data'
    )
    print(response)
except opss_client.exceptions.ConflictException:
    # Re-run of the notebook: a policy with this name already exists.
    # NOTE(review): an existing policy is not updated here; if the notebook identity
    # changed, update or delete the old policy manually.
    print(f"Data access policy '{data_access_policy_name}' already exists; skipping creation")

{'accessPolicyDetail': {'createdDate': 1717063744973, 'description': 'Data access policy for semantic search collection', 'lastModifiedDate': 1717063744973, 'name': 'semantic-search-policy-notebook', 'policy': [{'Rules': [{'Resource': ['collection/semantic-search'], 'Permission': ['aoss:CreateCollectionItems', 'aoss:DeleteCollectionItems', 'aoss:UpdateCollectionItems', 'aoss:DescribeCollectionItems'], 'ResourceType': 'collection'}, {'Resource': ['index/semantic-search/*'], 'Permission': ['aoss:CreateIndex', 'aoss:DeleteIndex', 'aoss:UpdateIndex', 'aoss:DescribeIndex', 'aoss:ReadDocument', 'aoss:WriteDocument'], 'ResourceType': 'index'}], 'Principal': ['arn:aws:iam::327216439222:role/Sagemaker'], 'Description': 'data-access-rule'}], 'policyVersion': 'MTcxNzA2Mzc0NDk3M18x', 'type': 'data'}, 'ResponseMetadata': {'RequestId': '0b54e271-091c-478c-a405-2e13b52945ad', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '0b54e271-091c-478c-a405-2e13b52945ad', 'date': 'Thu, 30 May 2024 1

## Create semantic search engine with Amazon OpenSearch Service Serverless

### Opensearch data access policy update

To access our OpenSearch Serverless collection, the current identity must be included in its data access policy (done above). We now connect to the collection using SigV4-signed requests.

In [24]:
# OpenSearch Serverless ("aoss") requests are SigV4-signed with the
# credentials of the current boto3 session.
service = 'aoss'
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, REGION, service)

# Build the OpenSearch client via the project helper from ../lib/src/utils/.
aoss_client = llm_utils.connect_to_aoss(auth, os_host)


2024-05-30 10:09:13,393 - botocore.credentials - Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


### Create OpenSearch index



In [25]:
# Name of the OpenSearch index that holds the movie documents.
index_name = "movies-index"

# Metadata columns stored with each embedding and returned by the KNN search.
data_columns = [
    'tmdb_id', 'original_language', 'original_title', 'description',
    'genres', 'year', 'keywords', 'director', 'actors',
    'popularity', 'popularity_bins', 'vote_average', 'vote_average_bins',
]

In [26]:
# Set to True to drop the index before it is (re)created below — useful when
# re-running the notebook several times. Defaults to False so a normal run
# never deletes data.
RESET_INDEX = False

if RESET_INDEX:
    # ignore=[404]: tolerate the index not existing yet instead of raising.
    aoss_client.indices.delete(index=index_name, ignore=[404])

In [27]:
# Index definition: text/numeric metadata fields plus a k-NN vector field
# ("vector_index") that stores one embedding per title.
# K-NN configuration references:
#   https://opensearch.org/docs/latest/field-types/supported-field-types/knn-vector/
#   https://opensearch.org/docs/latest/search-plugins/knn/knn-index/

# Must equal the embedding size of the model used below ("cohere").
# Per the original note: Cohere = 1024, Titan = 1536 — confirm for your exact model version.
EMBEDDING_DIMENSION = 1024

knn_vector_field = {
    "type": "knn_vector",
    "dimension": EMBEDDING_DIMENSION,
    "method": {
        "name": "hnsw",
        "space_type": "l2",
        "engine": "nmslib",
        "parameters": {"ef_construction": 512, "m": 16},
    },
}

# Metadata field name -> OpenSearch field type.
field_types = {
    "tmdb_id": "integer",
    "original_language": "text",
    "original_title": "text",
    "description": "text",
    "genres": "text",
    "year": "integer",
    "keywords": "text",
    "director": "text",
    "actors": "text",
    "popularity": "float",
    "popularity_bins": "text",
    "vote_average": "float",
    "vote_average_bins": "text",
}

index_body = {
    "settings": {
        "index": {
            "number_of_shards": 4,
            "number_of_replicas": 0,
            "knn": True,
            "knn.algo_param.ef_search": 100,
        }
    },
    "mappings": {
        "properties": {
            **{name: {"type": ftype} for name, ftype in field_types.items()},
            "vector_index": knn_vector_field,
        }
    },
}

In [28]:
# Collect the names of every index already in the collection; get_alias("*")
# returns a dict keyed by index name.
indexes = aoss_client.indices.get_alias("*")
indexes_list = [existing_name for existing_name in indexes]

2024-05-30 10:09:30,662 - opensearch - GET https://7cw6yxi2593yos7odti7.us-east-1.aoss.amazonaws.com:443/*/_alias [status:200 request:0.142s]


In [29]:
# Create the index only when it is missing, so re-running this cell is safe.
if index_name in indexes_list:
    print("index already exists")
else:
    print('Creating index:\n')
    create_response = aoss_client.indices.create(index_name, body=index_body)
    print(create_response)

Creating index:



2024-05-30 10:09:35,428 - opensearch - PUT https://7cw6yxi2593yos7odti7.us-east-1.aoss.amazonaws.com:443/movies-index [status:200 request:0.619s]


{'acknowledged': True, 'shards_acknowledged': True, 'index': 'movies-index'}


In [30]:
# Show the mapping, settings and aliases of the index just created.
for describe in (aoss_client.indices.get_mapping,
                 aoss_client.indices.get_settings,
                 aoss_client.indices.get_alias):
    response = describe(index=index_name)
    pp.pprint(response)

2024-05-30 10:09:45,268 - opensearch - GET https://7cw6yxi2593yos7odti7.us-east-1.aoss.amazonaws.com:443/movies-index/_mapping [status:200 request:0.034s]
2024-05-30 10:09:45,306 - opensearch - GET https://7cw6yxi2593yos7odti7.us-east-1.aoss.amazonaws.com:443/movies-index/_settings [status:200 request:0.034s]
2024-05-30 10:09:45,332 - opensearch - GET https://7cw6yxi2593yos7odti7.us-east-1.aoss.amazonaws.com:443/movies-index/_alias [status:200 request:0.023s]


{'movies-index': {'mappings': {'properties': {'actors': {'type': 'text'},
                                              'description': {'type': 'text'},
                                              'director': {'type': 'text'},
                                              'genres': {'type': 'text'},
                                              'keywords': {'type': 'text'},
                                              'original_language': {'type': 'text'},
                                              'original_title': {'type': 'text'},
                                              'popularity': {'type': 'float'},
                                              'popularity_bins': {'type': 'text'},
                                              'tmdb_id': {'type': 'integer'},
                                              'vector_index': {'dimension': 1024,
                                                               'method': {'engine': 'nmslib',
                                      

## Create embeddings from CSV file

In [31]:
# Which CSV to embed: the small sample dataset (default, quick to process)
# or the full ~45K-movie dataset (much longer to embed).
USE_FULL_DATASET = False

movies_data_path = (
    "../dataset/movies_metadata_45K.csv" if USE_FULL_DATASET
    else "../dataset/movies_metadata_small.csv"
)

In [32]:
# Bedrock runtime client (model invocation) and control-plane client.
# NOTE(review): neither is referenced by the visible cells — embeddings are
# requested through llm_utils — confirm they are still needed.
bedrock_client = boto3.client('bedrock-runtime')
bedrock = boto3.client('bedrock')

In [33]:
# Local folder where the embedding JSON blocks are written.
embeddings_folder_path = "../tmp/embeddings"

# Start from an empty folder: the indexing cell loads every file in it, so
# stale blocks from a previous run would otherwise be indexed again.
if os.path.exists(embeddings_folder_path):
    print("Folder already exists, deleting contents")
    shutil.rmtree(embeddings_folder_path)

print("Recreating folder")
os.makedirs(embeddings_folder_path)

Recreating folder


We're now ready to generate the embeddings. Expect this to take around 20 minutes.

In [34]:
# Number of records written to each embeddings JSON file.
block_size = 100

# newline="" is what the csv module expects; UTF-8 because titles include
# non-Latin scripts (e.g. Japanese).
with open(movies_data_path, newline="", encoding="utf-8") as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')

    # First row holds the column names.
    header = next(csv_reader)

    # 1-based number of the current data row; also stored as a sequential "id".
    line_num = 1

    # Number of the last block flushed to disk; 0 until the first full block.
    # Initialised here so the final flush works even for CSVs with fewer than
    # block_size rows (previously a NameError).
    block_num = 0

    # Serialized documents waiting to be written to the next block file.
    documents = []

    for row in csv_reader:
        # Map each column name to this row's value (raises IndexError on short rows).
        title_metadata = {col: row[i] for i, col in enumerate(header)}

        # ensure_ascii=False sends the real characters to the embedding model
        # instead of \uXXXX escape sequences for non-ASCII titles.
        vector_embedding = llm_utils.get_embeddings_from_text(
            json.dumps(title_metadata, ensure_ascii=False), "cohere", input_type="search_document"
        )

        # Merge id + vector with the metadata (metadata keys win on collision).
        request_body_dict = {'id': line_num, 'vector_index': vector_embedding} | title_metadata
        documents.append(json.dumps(request_body_dict))

        # Flush a full block every block_size rows.
        if line_num % block_size == 0:
            print('writing file')
            block_num = line_num // block_size
            output_file_path = f"{embeddings_folder_path}/embeddings_block{block_num}.json"
            with open(output_file_path, 'w') as output_file:
                json.dump(documents, output_file, indent=2)
            print(f"Processed {len(documents)} records and saved to {output_file_path}")
            documents = []

        line_num += 1

    if documents:
        # Write the remaining (partial) block after the last full one.
        output_file_path = f"{embeddings_folder_path}/embeddings_block{block_num + 1}.json"
        with open(output_file_path, 'w') as output_file:
            json.dump(documents, output_file, indent=2)

        print(f"Processed {len(documents)} records and saved to {output_file_path}")

writing file
Processed 100 records and saved to ../tmp/embeddings/embeddings_block1.json
writing file
Processed 100 records and saved to ../tmp/embeddings/embeddings_block2.json


## Load embeddings in index

In [35]:
def format_data_for_bulk_import(data, index=None):
    """Convert serialized movie documents into OpenSearch bulk "index" actions.

    Args:
        data: iterable of JSON strings, each encoding one movie document with a
            "vector_index" embedding plus the movie metadata columns.
        index: target index name. Defaults to the notebook-level ``index_name``
            when omitted, so existing calls keep working.

    Returns:
        list of action dicts suitable for ``opensearchpy.helpers.bulk``. Only the
        fields listed below are copied into ``_source``; e.g. the sequential
        "id" written by the embedding step is dropped.

    Raises:
        json.JSONDecodeError: if an element of ``data`` is not valid JSON.
        KeyError: if a document lacks one of the expected fields.
    """
    target_index = index_name if index is None else index

    source_fields = (
        "vector_index", "tmdb_id", "original_language", "original_title",
        "description", "genres", "year", "keywords", "director", "actors",
        "popularity", "popularity_bins", "vote_average", "vote_average_bins",
    )

    actions = []
    for doc in data:
        doc_dict = json.loads(doc)
        actions.append({
            "_op_type": "index",
            "_index": target_index,
            # "_id" intentionally not set: per the original note, custom ids are
            # not allowed for this index operation.
            "_source": {field: doc_dict[field] for field in source_fields},
        })
    return actions

In [36]:
from opensearchpy.helpers import bulk

# Reconnect to avoid a potentially expired session after the long embedding step.
aoss_client = llm_utils.connect_to_aoss(auth, os_host)

# Index every embeddings block file, in a deterministic (sorted) order.
for filename in sorted(os.listdir(embeddings_folder_path)):
    if not filename.endswith(".json"):
        continue

    file_path = os.path.join(embeddings_folder_path, filename)

    # Read the whole block, releasing the file before the network call.
    with open(file_path, "r") as file:
        data = json.load(file)

    # Format data into bulk actions.
    actions = format_data_for_bulk_import(data)

    print(f"Indexing {filename}")

    # With the default stats_only=False, `failed` is a list of error items, not a count.
    success, failed = bulk(
        aoss_client,
        actions,
        index=index_name,
        raise_on_exception=True
    )

    print(f"Indexed {success} documents successfully, {len(failed)} documents failed for file: {filename}")

Indexing embeddings_block1.json


2024-05-30 10:15:13,270 - opensearch - POST https://7cw6yxi2593yos7odti7.us-east-1.aoss.amazonaws.com:443/movies-index/_bulk [status:200 request:3.071s]


Indexed 100 documents successfully, [] documents failed for file: embeddings_block1.json
Indexing embeddings_block2.json


2024-05-30 10:15:14,976 - opensearch - POST https://7cw6yxi2593yos7odti7.us-east-1.aoss.amazonaws.com:443/movies-index/_bulk [status:200 request:1.592s]


Indexed 100 documents successfully, [] documents failed for file: embeddings_block2.json


In [37]:
# The document count can lag behind recent bulk writes (the index may need a
# refresh), so pause first. It should eventually match the rows embedded above.
time.sleep(30)
doc_count = aoss_client.count(index=index_name)
print(f"number of docs in index:{doc_count}")

2024-05-30 10:15:47,223 - opensearch - POST https://7cw6yxi2593yos7odti7.us-east-1.aoss.amazonaws.com:443/movies-index/_count [status:200 request:1.635s]


number of docs in index:{'count': 102, '_shards': {'total': 0, 'successful': 0, 'skipped': 0, 'failed': 0}}


## Search the index

### Using vector embeddings and KNN

In [38]:
# Example natural-language question to search for.
question = "list horror movies that take place in nature"

# Embed the question; input_type="search_query" pairs with the
# "search_document" type used when the movies were embedded.
question_embedding = llm_utils.get_embeddings_from_text(question, "cohere", input_type="search_query")

# Number of nearest neighbours to retrieve.
k = 5

knn_clause = {"vector_index": {"vector": question_embedding, "k": k}}

# Return only the metadata columns (not the stored vectors).
query = {
    "size": k,
    "query": {"knn": knn_clause},
    "_source": data_columns,
}

search_response = aoss_client.search(body=query, index=index_name)

2024-05-30 10:16:09,966 - opensearch - POST https://7cw6yxi2593yos7odti7.us-east-1.aoss.amazonaws.com:443/movies-index/_search [status:200 request:0.291s]


In [39]:
# Extract the matching documents from the raw OpenSearch response via the
# project helper in llm_utils. The output below shows a list of dicts holding
# each hit's stored fields.
response = llm_utils.extract_response_from_os_response(search_response)
response

[{'tmdb_id': '281957',
  'original_language': 'en',
  'original_title': 'The Revenant',
  'keywords': 'father son relationship,rape,based on novel,mountain,winter,grizzly bear,wilderness,frontier,revenge,murder,native american,survival,bear,snow,violence,animal death,bear attack,death of son,based on true events,fur trapper',
  'year': '2015',
  'director': 'Alejandro González Iñárritu',
  'description': 'In the 1820s, a frontiersman, Hugh Glass, sets out on a path of vengeance against those who left him for dead after a bear mauling.',
  'popularity_bins': 'Very High',
  'actors': 'Leonardo DiCaprio,Tom Hardy,Will Poulter',
  'genres': 'Western,Drama,Adventure,Thriller',
  'popularity': '23.5',
  'vote_average': '7.3',
  'vote_average_bins': 'Very High'},
 {'tmdb_id': '129',
  'original_language': 'ja',
  'original_title': '千と千尋の神隠し',
  'keywords': 'witch,parent child relationship,magic,darkness,bath house,ghost world,parallel world,amusement park,youkai,japanese mythology,anime,spiri

### Using the normal opensearch index (and not the embeddings)

In [40]:
# Classic (non-vector) query: the title must match "deadpool" AND the actors
# field must contain the exact phrase "Ryan Reynolds"; most popular first.
must_clauses = [
    {"match": {"original_title": "deadpool"}},
    {"match_phrase": {"actors": "Ryan Reynolds"}},
]

query = {
    "query": {"bool": {"must": must_clauses}},
    "size": 10,
    "sort": [{"popularity": {"order": "desc"}}],
}

search_response = aoss_client.search(body=query, index=index_name)


2024-05-30 10:16:31,614 - opensearch - POST https://7cw6yxi2593yos7odti7.us-east-1.aoss.amazonaws.com:443/movies-index/_search [status:200 request:0.145s]


In [41]:
response = llm_utils.extract_response_from_os_response(search_response)

# This query does not restrict "_source", so every hit carries its embedding.
# Drop it from ALL hits (not only the first) since it is not used here; the
# default avoids a KeyError for hits without the field.
for hit in response:
    hit.pop("vector_index", None)

response

[{'tmdb_id': '293660',
  'original_language': 'en',
  'original_title': 'Deadpool',
  'description': 'Deadpool tells the origin story of former Special Forces operative turned mercenary Wade Wilson, who after being subjected to a rogue experiment that leaves him with accelerated healing powers, adopts the alter ego Deadpool. Armed with his new abilities and a dark, twisted sense of humor, Deadpool hunts down the man who nearly destroyed his life.',
  'genres': 'Action,Adventure,Comedy',
  'year': '2016',
  'keywords': 'anti hero,mercenary,marvel comic,superhero,based on comic,breaking the fourth wall,aftercreditsstinger,duringcreditsstinger,self healing',
  'director': 'Tim Miller',
  'actors': 'Ryan Reynolds,Morena Baccarin,Ed Skrein',
  'popularity': '187.9',
  'popularity_bins': 'Very High',
  'vote_average': '7.4',
  'vote_average_bins': 'Very High'}]

In [43]:
# Persist these values with IPython's %store so other notebooks/kernels can
# reload them with `%store -r`.
%store index_name
%store os_host
%store collection_name

Stored 'index_name' (str)
Stored 'os_host' (str)
Stored 'collection_name' (str)
