# Using the Amazon OpenSearch Service ML Commons REST API for language detection

Amazon OpenSearch Service 2.15 includes the new ML inference processor (`ml_inference`), which lets you enrich ingest pipelines with inferences from machine learning (ML) models. The processor invokes models registered in the OpenSearch ML Commons plugin and adds the model outputs to the ingested documents as new fields.

ML Commons for OpenSearch makes it easier to build machine learning features in OpenSearch. The plugin lets machine learning engineers and developers use existing open-source ML algorithms, and it streamlines the work of building new ML features.

We deploy two models in this notebook. The first uses Amazon Comprehend. It examines the input text, detects the dominant language with the Amazon Comprehend DetectDominantLanguage API, and returns the corresponding language code.

The second is Amazon Titan Text Embeddings V2, which is available on Amazon Bedrock. We use it to create vectors for text in three languages (English, French, and German). These vectors are stored in Amazon OpenSearch Service, where they support semantic search across all three languages.

This notebook walks through creating an ML Commons connector, registering and deploying a model, building an ingest pipeline, testing it, and running a vector semantic search.

Note: This functionality is available in **Amazon OpenSearch Service** 2.15 or later (Amazon OpenSearch Service releases odd-numbered versions) and in open-source OpenSearch 2.14.0 or later.


#### Step 1. Install the dependencies needed for this notebook

You can ignore any ERROR message about pip's dependency resolver.

In [None]:
%pip install sagemaker requests-aws4auth GitPython opensearch-py --upgrade --quiet

#### Step 2. Install git-lfs so that we can clone the model repos to our notebook

In [None]:
!sudo yum install -y amazon-linux-extras
!sudo amazon-linux-extras install epel -y 
!sudo yum-config-manager --enable epel
!sudo yum install git-lfs -y
!git lfs install

#### Step 3. Store the CloudFormation output values as variables

This code block stores the CloudFormation stack outputs S3BucketName, SageMakerExecutionRoleArn, SageMakerOpenSearchRoleArn, and OpenSearchDomainEndpoint as variables. It also prints OpenSearchDashboardsURL if the stack provides it.

In [None]:
import boto3

cf_client = boto3.client('cloudformation')

# Replace with your CloudFormation stack name if you deployed it under a different name
stack_name = 'opensearch-lang-detect-demo'

# Get stack outputs
response = cf_client.describe_stacks(StackName=stack_name)
outputs = response['Stacks'][0]['Outputs']

# Create a dictionary to store the outputs
output_dict = {output['OutputKey']: output['OutputValue'] for output in outputs}

# Retrieve values from the output dictionary
s3BucketName = output_dict.get('S3BucketName', 'Not found')
sageMakerExecutionRoleArn = output_dict.get('SageMakerExecutionRoleArn', 'Not found')
sageMakerOpenSearchRoleArn = output_dict.get('SageMakerOpenSearchRoleArn', 'Not found')
opensearchDomainEndpoint = output_dict.get('OpenSearchDomainEndpoint', 'Not found')

print('S3 Bucket Name: ' + s3BucketName)
print('SageMaker Execution Role Arn: ' + sageMakerExecutionRoleArn)
print('SageMaker OpenSearch Role Arn: ' + sageMakerOpenSearchRoleArn)
print('OpenSearch Domain Endpoint: ' + opensearchDomainEndpoint)

if 'OpenSearchDashboardsURL' in output_dict:
    print('OpenSearch Dashboards URL: ' + output_dict['OpenSearchDashboardsURL'])
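
The cell above falls back to the string 'Not found' when an output is missing. That mistake only shows up later, as a confusing HTTP error. Below is a stricter variant that fails immediately. `outputs_to_dict` and `REQUIRED_OUTPUTS` are hypothetical helpers, the output key names are the ones the cell above reads, and the sample values are made up.

```python
from typing import Iterable, Mapping

# Output keys that later cells read (taken from the cell above).
REQUIRED_OUTPUTS = (
    "S3BucketName",
    "SageMakerExecutionRoleArn",
    "SageMakerOpenSearchRoleArn",
    "OpenSearchDomainEndpoint",
)


def outputs_to_dict(outputs: Iterable[Mapping[str, str]],
                    required: Iterable[str] = REQUIRED_OUTPUTS) -> dict:
    """Turn a describe_stacks 'Outputs' list into a dict, failing fast on missing keys."""
    result = {o["OutputKey"]: o["OutputValue"] for o in outputs}
    missing = [key for key in required if key not in result]
    if missing:
        raise KeyError("Missing CloudFormation outputs: " + ", ".join(missing))
    return result


# Made-up sample; in the notebook, pass response['Stacks'][0]['Outputs'] instead.
sample_outputs = [
    {"OutputKey": "S3BucketName", "OutputValue": "my-demo-bucket"},
    {"OutputKey": "SageMakerExecutionRoleArn", "OutputValue": "arn:aws:iam::111122223333:role/exec-role"},
    {"OutputKey": "SageMakerOpenSearchRoleArn", "OutputValue": "arn:aws:iam::111122223333:role/os-role"},
    {"OutputKey": "OpenSearchDomainEndpoint", "OutputValue": "search-demo.us-east-1.es.amazonaws.com"},
]
print(outputs_to_dict(sample_outputs)["OpenSearchDomainEndpoint"])
```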

#### Step 4. Add the SageMaker Execution role to OpenSearch

To interact with OpenSearch from the notebook, we need to allow the SageMaker execution role created by the CloudFormation template to perform actions in OpenSearch.

Navigate to OpenSearch Dashboards (using the Dashboards URL printed in Step 3) and log in with the username and password you provided when deploying your CloudFormation stack.
![Dashboard](images/1_dashboard.png)

Then navigate to **Security** using the left-hand menu.
![Security](images/2_security.png)

Next, select **Roles** from the Security left-hand menu.
![Roles](images/3_roles.png)

From the **Roles** screen, select **all_access**.
![all access](images/4_all_access.png)

Select the **Mapped users** tab and then click on the **Manage mapping** button.
![mapped users](images/5_mapped_users.png)

Enter the **SageMaker Execution Role Arn** (printed in Step 3) as a backend role.
![backend roles](images/6_backend_roles.png)

Click on the **Map** button.

Navigate back to the **Roles** screen by using the breadcrumb at the top of the dashboard.

Search for the **ml_full_access** role and select it.
![mapped users](images/7_ml_full_access.png)

Select the **Mapped users** tab and then click on the **Manage mapping** button.
![mapped users](images/8_ml_full_access_tabs.png)

Enter the **SageMaker OpenSearch Role Arn** (printed in Step 3) as a backend role.
![add role](images/9_add_role.png)

Click on the **Map** button.

#### Step 5. Set up the ML Commons trusted connector endpoints

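ML Commons lets connectors call only those endpoints that match its trusted connector endpoint patterns. So first, we add the SageMaker, Bedrock, and Comprehend runtime endpoints to the cluster settings.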

In [None]:
import requests
import boto3
from requests_aws4auth import AWS4Auth

service = 'es'
session = boto3.Session()
region = session.region_name  # assumes the OpenSearch domain is in the same Region as this notebook
credentials = session.get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, service, session_token=credentials.token)

host = 'https://' + opensearchDomainEndpoint

# Allow ML Commons connectors to call the SageMaker, Bedrock, and Comprehend runtime endpoints
path = '/_cluster/settings'
url = host + path

payload = {
    "persistent": {
        "plugins.ml_commons.trusted_connector_endpoints_regex": [
            # Raw strings keep the regex backslashes intact
            r"^https://runtime\.sagemaker\..*[a-z0-9-]\.amazonaws\.com/.*$",
            r"^https://bedrock-runtime\..*[a-z0-9-]\.amazonaws\.com/.*$",
            r"^https://comprehend\..*[a-z0-9-]\.amazonaws\.com$"
        ]
    }
}
headers = {"Content-Type": "application/json"}

response = requests.put(url, auth=awsauth, json=payload, headers=headers)
print(response.json())
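
Before creating the connectors, it can help to check locally that each connector URL used later in this notebook matches one of the trusted patterns. As we understand it, ML Commons rejects a connector whose URL matches none of them. The sketch reuses the three patterns from the cell above. The sample URLs assume the us-east-1 Region, and the SageMaker endpoint name is hypothetical.

```python
import re

# The same patterns configured in plugins.ml_commons.trusted_connector_endpoints_regex above.
TRUSTED_PATTERNS = [
    r"^https://runtime\.sagemaker\..*[a-z0-9-]\.amazonaws\.com/.*$",
    r"^https://bedrock-runtime\..*[a-z0-9-]\.amazonaws\.com/.*$",
    r"^https://comprehend\..*[a-z0-9-]\.amazonaws\.com$",
]


def is_trusted(url: str, patterns=TRUSTED_PATTERNS) -> bool:
    """Return True if the URL matches at least one trusted-endpoint pattern."""
    return any(re.match(p, url) for p in patterns)


# Connector URLs as they look once the ${parameters.*} placeholders are filled in (us-east-1 assumed).
comprehend_url = "https://comprehend.us-east-1.amazonaws.com"
bedrock_url = "https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-embed-text-v2:0/invoke"

for u in (comprehend_url, bedrock_url, "https://example.com/api"):
    print(u, "->", is_trusted(u))
```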

## Comprehend Language Classification Model

The first model is backed by the Amazon Comprehend service. Comprehend examines the input text, detects the dominant language with the DetectDominantLanguage API, and returns the corresponding language code.

#### Step 6. Create the connector for the Comprehend Language Classification model

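Now we create the ML Commons connector for Amazon Comprehend. The connector signs its requests with AWS SigV4 using the SageMaker OpenSearch role and calls the DetectDominantLanguage API.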

In [None]:
path = '/_plugins/_ml/connectors/_create'
url = host + path
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

payload = {
  "name": "Comprehend lang identification",
  "description": "comprehend model",
  "version": 1,
  "protocol": "aws_sigv4",
  "credential": {
    "roleArn": sageMakerOpenSearchRoleArn
  },
  "parameters": {
    "region": "us-east-1",
    "service_name": "comprehend",
    "api_version": "20171127",
    "api_name": "DetectDominantLanguage",
    "api": "Comprehend_${parameters.api_version}.${parameters.api_name}",
    "response_filter": "$"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": "https://${parameters.service_name}.${parameters.region}.amazonaws.com",
      "headers": {
        "content-type": "application/x-amz-json-1.1",
        "X-Amz-Target": "${parameters.api}"
      },
      "request_body": "{\"Text\": \"${parameters.Text}\"}" 
    }
  ]
}
headers = {"Content-Type": "application/json"}

comprehend_connector_response = requests.post(url, auth=awsauth, json=payload, headers=headers)
comprehend_connector = comprehend_connector_response.json()["connector_id"]
print('Connector id: ' + comprehend_connector)

#### Step 7. Register the Comprehend model

We now register a remote model (`"function_name": "remote"`) that uses the connector we just created.

In [None]:
path = '/_plugins/_ml/models/_register'
url = host + path
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

payload = {
    "name": "comprehend lang id model",
    "function_name": "remote",
    "description": "test model",
    "connector_id": comprehend_connector
}
headers = {"Content-Type": "application/json"}

response = requests.post(url, auth=awsauth, json=payload, headers=headers)
# print(response.json())
comprehend_model_id = response.json()['model_id']
print('Model id: ' + comprehend_model_id)

#### Step 8. Deploy the Comprehend model

In [None]:
path = '/_plugins/_ml/models/'+ comprehend_model_id + '/_deploy'
url = host + path

headers = {"Content-Type": "application/json"}

response = requests.post(url, auth=awsauth, headers=headers)
print(response.json())
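
For remote models, the deploy call usually finishes quickly. Its response includes a `task_id`, and `GET /_plugins/_ml/tasks/{task_id}` reports that task's `state`. The sketch below polls until the task reaches a terminal state. `wait_for_task` is a hypothetical helper. The fetch function is injected so that the polling logic runs without a cluster. The set of terminal state names is our assumption based on ML Commons task states.

```python
import time
from typing import Callable

# Assumed terminal ML Commons task states.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "COMPLETED_WITH_ERROR"}


def wait_for_task(fetch_task: Callable[[str], dict], task_id: str,
                  timeout_s: float = 60.0, interval_s: float = 2.0) -> dict:
    """Poll fetch_task(task_id) until its 'state' is terminal or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        task = fetch_task(task_id)
        if task.get("state") in TERMINAL_STATES:
            return task
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task {task_id} still {task.get('state')} after {timeout_s}s")
        time.sleep(interval_s)


# In the notebook, fetch_task could be:
#   lambda tid: requests.get(f"{host}/_plugins/_ml/tasks/{tid}", auth=awsauth).json()
# Here a fake fetcher reports RUNNING twice, then COMPLETED.
fake_states = iter(["RUNNING", "RUNNING", "COMPLETED"])
result = wait_for_task(lambda tid: {"task_id": tid, "state": next(fake_states)},
                       "demo-task", interval_s=0.01)
print(result["state"])
```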

#### Step 9. Test the Comprehend model through OpenSearch

In [None]:
path = '/_plugins/_ml/models/'+ comprehend_model_id + '/_predict'
url = host + path

headers = {"Content-Type": "application/json"}
payload = {
    "parameters": {
        "Text": "你知道厕所在哪里吗"  # Chinese: "Do you know where the toilet is?"
    }
}

response = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(response.json())
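
To turn the prediction into a single language code, we can pick the highest-scoring entry in Comprehend's `Languages` list. The sketch assumes the remote model's output arrives under `inference_results[0].output[0].dataAsMap`. The sample response is illustrative, not real output, so compare it with what the cell above prints. `top_language` is a hypothetical helper.

```python
def top_language(predict_json: dict) -> tuple:
    """Return (language_code, score) for the highest-scoring language in a _predict response."""
    tensor = predict_json["inference_results"][0]["output"][0]
    languages = tensor["dataAsMap"]["Languages"]
    best = max(languages, key=lambda lang: lang["Score"])
    return best["LanguageCode"], best["Score"]


# Illustrative response (made-up scores); compare its structure with the printout above.
sample_response = {
    "inference_results": [{
        "output": [{
            "name": "response",
            "dataAsMap": {"Languages": [{"LanguageCode": "zh", "Score": 0.98},
                                        {"LanguageCode": "ja", "Score": 0.01}]},
        }],
        "status_code": 200,
    }]
}
print(top_language(sample_response))
```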


#### Step 10. Create the Comprehend ingest pipeline

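Now we create the ingest pipeline. Its `ml_inference` processor tells OpenSearch to send each document's `Text` field to the Comprehend model. It then stores the detected language code in `detected_language` and its confidence score in `language_score`.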

In [None]:
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

path = '/_ingest/pipeline/comprehend_language_identification_pipeline'
url = host + path

headers = {"Content-Type": "application/json"}
payload = {
  "description": "ingest reviews and identify lang with the comprehend model",
  "processors":[
    {
      "ml_inference": {
        "model_id": comprehend_model_id,
        "input_map": [
            {
               "Text": "Text"
            }
        ],
        "output_map": [
            {
                "detected_language": "response.Languages[0].LanguageCode",
                "language_score": "response.Languages[0].Score"
            }
        ]
      }
    }
  ]
}
response = requests.put(url, auth=awsauth, json=payload, headers=headers)
print(response.json())
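
Before attaching the pipeline to an index, you can use the ingest `_simulate` API (`POST /_ingest/pipeline/{pipeline_id}/_simulate`) to run sample documents through it without indexing anything. The sketch builds the request body. The commented `requests` call follows the earlier cells and assumes `host` and `awsauth` are still defined. Whether `ml_inference` behaves identically during simulation is worth confirming on your cluster. `build_simulate_body` is a hypothetical helper.

```python
import json


def build_simulate_body(texts, field="Text"):
    """Wrap each text under _source, the document shape the _simulate API expects."""
    return {"docs": [{"_source": {field: t}} for t in texts]}


# French ("do you speak French?") and German ("Where is the train station?") sample inputs
body = build_simulate_body(["parlez vous francais?", "Wo ist der Bahnhof?"])
print(json.dumps(body, ensure_ascii=False, indent=2))

# To run it against the domain (host and awsauth from earlier cells):
# resp = requests.post(f"{host}/_ingest/pipeline/comprehend_language_identification_pipeline/_simulate",
#                      auth=awsauth, json=body, headers={"Content-Type": "application/json"})
# for doc in resp.json()["docs"]:
#     print(doc["doc"]["_source"].get("detected_language"))
```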

#### Step 11. Create the Comprehend index & test

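Next, we create an index that uses this pipeline as its default pipeline, so every document indexed into it is sent through the Comprehend model.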

In [None]:
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

index_name = 'comprehend_lang_ident_test01'
path = '/' + index_name

headers = {"Content-Type": "application/json"}

index_settings = {
    "settings": {
        "index": {
            "default_pipeline": "comprehend_language_identification_pipeline"
        }
    }
}

# Send the PUT request to create the index
url = host + path
response = requests.put(url, auth=awsauth, json=index_settings, headers=headers)

# Print the response
print(response.json())

In [None]:
index_name = 'comprehend_lang_ident_test01'
path = '/' + index_name + '/_doc/'

headers = {"Content-Type": "application/json"}

# Define the document to index
document = {
    "Text": "parlez vous francais?"
}

url = host + path
response = requests.post(url, auth=awsauth, json=document, headers=headers)


print(response.json())

doc_id = response.json()['_id']

# Retrieve the indexed document
get_path = '/' + index_name + '/_doc/' + doc_id
get_url = host + get_path
get_response = requests.get(get_url, auth=awsauth, headers=headers)

# Print the retrieved document
print(get_response.json())

## Amazon Bedrock: Multilingual Vector Semantic Search

On the SageMaker notebook instance deployed for you, the files english.json, french.json, and german.json sit next to the notebook you worked on in part 1. Each file contains sentences in its language that use the term “spring” in different senses: as a verb (to move suddenly), as a noun for the season, and as a mechanical part. In this section, we connect Amazon Titan Text Embeddings V2 on Amazon Bedrock to OpenSearch through an ML Commons connector. We then use the model to create vectors for the sentences in all three files. Finally, we store these vectors in Amazon OpenSearch Service, where a single semantic search can match sentences across the three languages.

#### Step 1. Load sentences from the JSON files into DataFrames


First, we load the sentences from each JSON file into a pandas DataFrame. Each row holds the original sentence (`sentence`) and its English translation (`sentence_english`). The embeddings are generated later by the ingest pipeline.

In [None]:
import json
import pandas as pd

def load_sentences(file_name):
    sentences = []
    with open(file_name, 'r', encoding='utf-8') as file:
        for line in file:
            try:
                data = json.loads(line)
                if 'sentence' in data and 'sentence_english' in data:
                    sentences.append({
                        'sentence': data['sentence'],
                        'sentence_english': data['sentence_english']
                    })
            except json.JSONDecodeError:
                # Skip lines that are not valid JSON; bulk action lines ({"index": ...}) are valid JSON and are skipped by the key check above
                continue
    
    return pd.DataFrame(sentences)

# Usage
german_df = load_sentences('german.json')
english_df = load_sentences('english.json')
french_df = load_sentences('french.json')
# print(french_df.head())
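
The comment in `load_sentences` suggests that the language files use bulk (NDJSON) format, with an action line before each document. The sketch below applies the same filtering to an in-memory sample. Action lines are dropped because they lack the sentence keys, and blank lines are dropped because they are not valid JSON. `parse_sentence_lines` is a hypothetical helper, and the sample lines are made up.

```python
import io
import json


def parse_sentence_lines(lines):
    """Apply load_sentences' filtering to any iterable of lines; returns a list of dicts."""
    rows = []
    for line in lines:
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            continue  # blank or malformed line
        # Bulk action lines such as {"index": {...}} parse fine but lack the sentence keys.
        if isinstance(data, dict) and "sentence" in data and "sentence_english" in data:
            rows.append({"sentence": data["sentence"],
                         "sentence_english": data["sentence_english"]})
    return rows  # pd.DataFrame(rows) gives the same shape as load_sentences


sample_file = io.StringIO(
    '{"index": {"_index": "demo"}}\n'
    '{"sentence": "Le printemps arrive.", "sentence_english": "Spring is coming."}\n'
    '\n'
    '{"index": {"_index": "demo"}}\n'
    '{"sentence": "Le ressort est cassé.", "sentence_english": "The spring is broken."}\n'
)
rows = parse_sentence_lines(sample_file)
for row in rows:
    print(row)
```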

#### Step 2. Create the ML Commons connector to Amazon Bedrock

In [None]:
#create bedrock connector

path = '/_plugins/_ml/connectors/_create'
url = host + path
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

payload = {
  "name": "Amazon Bedrock Connector: embedding",
  "description": "The connector to bedrock Titan embedding model",
  "version": 1,
  "protocol": "aws_sigv4",
  "parameters": {
    "region": "us-east-1",
    "service_name": "bedrock",
    "model": "amazon.titan-embed-text-v2:0",
    "dimensions": 1024,
    "normalize": True,
    "embeddingTypes": ["float"]
  },
  "credential": {
    "roleArn": sageMakerOpenSearchRoleArn
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": "https://bedrock-runtime.${parameters.region}.amazonaws.com/model/${parameters.model}/invoke",
      "headers": {
        "content-type": "application/json",
        "x-amz-content-sha256": "required"
      },
      "request_body": "{ \"inputText\": \"${parameters.inputText}\", \"dimensions\": ${parameters.dimensions}, \"normalize\": ${parameters.normalize}, \"embeddingTypes\": ${parameters.embeddingTypes} }",
      "pre_process_function": "connector.pre_process.bedrock.embedding",
      "post_process_function": "connector.post_process.bedrock.embedding"
    }
  ]
}
headers = {"Content-Type": "application/json"}

bedrock_connector_response = requests.post(url, auth=awsauth, json=payload, headers=headers)
bedrock_connector_3 = bedrock_connector_response.json()["connector_id"]
print('Connector id: ' + bedrock_connector_3)
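
ML Commons builds the Bedrock request by substituting each `${parameters.<name>}` placeholder in `request_body`. When the connector is called from an ingest pipeline, we understand that the `pre_process_function` supplies `inputText` from the document text. The sketch below is a simplified local emulation, not the plugin's code. It shows what the resolved body for the connector above should look like. It assumes that non-string parameters are rendered as JSON text, so Python `True` becomes `true`. It also does no escaping, so in this sketch an input containing double quotes would produce invalid JSON.

```python
import json
import re

PLACEHOLDER = re.compile(r"\$\{parameters\.(\w+)\}")


def render(template: str, params: dict) -> str:
    """Replace ${parameters.name} with the parameter value (simplified emulation)."""
    def substitute(match):
        value = params[match.group(1)]
        # Strings go in as-is (the template supplies the quotes); other values as JSON text.
        return value if isinstance(value, str) else json.dumps(value)
    return PLACEHOLDER.sub(substitute, template)


request_body = ('{ "inputText": "${parameters.inputText}", "dimensions": ${parameters.dimensions}, '
                '"normalize": ${parameters.normalize}, "embeddingTypes": ${parameters.embeddingTypes} }')
params = {"dimensions": 1024, "normalize": True, "embeddingTypes": ["float"],
          "inputText": "spring is in the air"}

resolved = render(request_body, params)
print(resolved)
print(json.loads(resolved))  # parses cleanly, so the template yields well-formed JSON
```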

#### Step 3. Register the Titan embeddings model 

In [None]:
path = '/_plugins/_ml/models/_register'
url = host + path

payload = {
    "name": "bedrock titan-text-embedding-v2",
    "function_name": "remote",
    "description": "test model",
    "connector_id": bedrock_connector_3
}
headers = {"Content-Type": "application/json"}

response = requests.post(url, auth=awsauth, json=payload, headers=headers)
bedrock_model_id = response.json()['model_id']
print('Model id: ' + bedrock_model_id)

#### Step 4. Deploy the model 

In [None]:
path = '/_plugins/_ml/models/'+ bedrock_model_id + '/_deploy'
url = host + path

headers = {"Content-Type": "application/json"}

response = requests.post(url, auth=awsauth, headers=headers)
print(response.json())

#### Step 5. Test the model through OpenSearch

In [None]:
path = '/_plugins/_ml/models/'+ bedrock_model_id + '/_predict'
url = host + path

headers = {"Content-Type": "application/json"}
payload = {
  "parameters": {
    "inputText": "It's nice to see the flowers bloom and hear the birds sing in the spring"
  }
}
response = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(response.json())
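
The connector asks Titan for 1024-dimensional normalized vectors, so a quick sanity check on the returned embedding is that its length is 1024 and its L2 norm is close to 1. The sketch assumes the vector sits at `inference_results[0].output[0].data`, so compare that with the printout above. `extract_embedding` and `check_embedding` are hypothetical helpers. The toy response is two-dimensional so the check can run offline.

```python
import math


def extract_embedding(predict_json: dict) -> list:
    """Return the vector from a _predict response (assumed layout; compare with the output above)."""
    return predict_json["inference_results"][0]["output"][0]["data"]


def check_embedding(vector, expected_dim=1024, tol=1e-3):
    """Return (is_valid, l2_norm): valid means the expected length and a norm close to 1."""
    norm = math.sqrt(sum(x * x for x in vector))
    is_valid = len(vector) == expected_dim and abs(norm - 1.0) <= tol
    return is_valid, norm


# Toy two-dimensional response so this runs offline; in the notebook, pass response.json().
toy_response = {"inference_results": [{"output": [{"name": "sentence_embedding",
                                                   "data": [0.6, 0.8]}]}]}
ok, norm = check_embedding(extract_embedding(toy_response), expected_dim=2)
print(ok, round(norm, 6))
```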

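#### Step 6. Create the ingest pipeline for the Titan embeddings model

The `text_embedding` processor sends each document's `sentence` field to the Titan model and stores the returned vector in `sentence_vector`.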

In [None]:
import requests
from requests_aws4auth import AWS4Auth
import json

region = boto3.Session().region_name  # Region of the OpenSearch domain (assumed to match this notebook's Region)
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, service, session_token=credentials.token)

pipeline_name = "titan_embedding_pipeline_v2"
url = f"{host}/_ingest/pipeline/{pipeline_name}"

pipeline_body = {
    "description": "Titan embedding pipeline",
    "processors": [
        {
            "text_embedding": {
                "model_id": bedrock_model_id,
                "field_map": {
                    "sentence": "sentence_vector"
                }
            }
        }
    ]
}

response = requests.put(url, auth=awsauth, json=pipeline_body, headers={"Content-Type": "application/json"})
print(response.text)

#### Step 7. Create Bedrock Titan embeddings model index 

In [None]:
import boto3
import requests
from requests_aws4auth import AWS4Auth

# Set up your AWS credentials and region
region = boto3.Session().region_name  # Region of the OpenSearch domain (assumed to match this notebook's Region)
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, service, session_token=credentials.token)

# Create the index
index_name = 'bedrock-knn-index-v2'
url = f'{host}/{index_name}'
mapping = {
    "mappings": {
        "properties": {
            "sentence_vector": {
                "type": "knn_vector",
                "dimension": 1024,  
                "method": {
                    "name": "hnsw",
                    "space_type": "l2",
                    "engine": "nmslib"
                },
                "store":True
            },
            "sentence":{
                "type": "text",
                "store": True
            },
            "sentence_english":{
                "type": "text",
                "store": True
            }
        }
    },
    "settings": {
        "index": {
            "knn": True,
            # The distance metric comes from the field's method ("space_type": "l2" above)
            "default_pipeline": pipeline_name
        }
    }
}

response = requests.put(url, auth=awsauth, json=mapping, headers={"Content-Type": "application/json"})
print(f"Index creation response: {response.text}")
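
The mapping above sets the field's `space_type` to `l2`. Because the connector asks Titan for normalized vectors, choosing L2 rather than cosine similarity does not change the ranking. For unit vectors, |a - b|^2 = 2 - 2 cos(a, b), so ordering by smallest L2 distance is the same as ordering by largest cosine similarity. The sketch checks this numerically on synthetic random unit vectors, in pure Python.

```python
import math
import random


def normalize(v):
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]


def l2_sq(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def cosine(a, b):
    # For unit vectors, the dot product equals the cosine similarity.
    return sum(x * y for x, y in zip(a, b))


random.seed(0)
dim = 16
query = normalize([random.gauss(0, 1) for _ in range(dim)])
docs = [normalize([random.gauss(0, 1) for _ in range(dim)]) for _ in range(20)]

by_l2 = sorted(range(len(docs)), key=lambda i: l2_sq(query, docs[i]))
by_cos = sorted(range(len(docs)), key=lambda i: -cosine(query, docs[i]))
print("same ranking:", by_l2 == by_cos)
print("max identity error:", max(abs(l2_sq(query, d) - (2 - 2 * cosine(query, d))) for d in docs))
```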

#### Step 8. Ingest a test doc to ensure that the pipeline is working properly

In [None]:
test_doc = {
    "sentence": "Shwetha Test - This is a test sentence for embedding generation.",
    "sentence_english": "Shwetha Test - This is a test sentence for embedding generation."
}

index_url = f"{host}/{index_name}/_doc"  # the index's default pipeline generates sentence_vector
index_response = requests.post(index_url, auth=awsauth, json=test_doc, headers={"Content-Type": "application/json"})
print("Manual indexing response:")
print(json.dumps(index_response.json(), indent=2))

if index_response.status_code == 201:
    doc_id = index_response.json()['_id']
    # Retrieve the document
    get_url = f"{host}/{index_name}/_doc/{doc_id}"
    get_response = requests.get(get_url, auth=awsauth)
    print("Retrieved document:")
    print(json.dumps(get_response.json(), indent=2))

#### Step 9. Load dataframes into index

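Bulk-index the sentences from all three DataFrames. Because `titan_embedding_pipeline_v2` is the index's default pipeline, OpenSearch generates a Titan Text Embeddings V2 vector for each sentence during ingestion and stores it in the `sentence_vector` field.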

In [None]:
import json
import requests
from requests_aws4auth import AWS4Auth
import boto3
import time

# Set up your AWS credentials and region
region = boto3.Session().region_name  # Region of the OpenSearch domain (assumed to match this notebook's Region)
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

# Define your OpenSearch index name
index_name = "bedrock-knn-index-v2"

def index_documents(df, batch_size=100):
    total = len(df)
    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        batch = df.iloc[start:end]

        bulk_data = []
        for _, row in batch.iterrows():
            # Prepare the action metadata
            action = {
                "index": {
                    "_index": index_name
                }
            }
            # Prepare the document data
            doc = {
                "sentence": row['sentence'],
                "sentence_english": row['sentence_english']
            }
            
            # Add the action and document to the bulk data
            bulk_data.append(json.dumps(action))
            bulk_data.append(json.dumps(doc))

        # Join the bulk data with newlines
        bulk_body = "\n".join(bulk_data) + "\n"

        # Send the bulk request
        bulk_url = f"{host}/_bulk"
        response = requests.post(bulk_url, auth=awsauth, data=bulk_body, headers={"Content-Type": "application/x-ndjson"})

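        # _bulk can return 200 even when individual items fail, so also check the "errors" flag
        if response.status_code == 200 and not response.json().get("errors"):
            print(f"Successfully indexed batch {start}-{end} of {total}")
        else:
            print(f"Error indexing batch {start}-{end} of {total}: {response.text}")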

        # Optional: add a small delay to avoid overwhelming the cluster
        time.sleep(1)

# Index your documents
print("Indexing German documents:")
index_documents(german_df)
print("\nIndexing English documents:")
index_documents(english_df)
print("\nIndexing French documents:")
index_documents(french_df)
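
A bulk request can return HTTP 200 even when individual documents fail, for example when the embedding call for one sentence is rejected. The response has a top-level `errors` flag and one entry per action in `items`. `bulk_failures` is a hypothetical helper that lists the failed items. The sample response is made up but follows the bulk response layout.

```python
def bulk_failures(bulk_json: dict) -> list:
    """Return (position, status, reason) for every failed item in a _bulk response."""
    if not bulk_json.get("errors"):
        return []
    failures = []
    for pos, item in enumerate(bulk_json.get("items", [])):
        # Each item is keyed by its action type: index, create, update, or delete.
        result = next(iter(item.values()))
        if "error" in result:
            err = result["error"]
            reason = err.get("reason") if isinstance(err, dict) else str(err)
            failures.append((pos, result.get("status"), reason))
    return failures


sample_bulk = {
    "took": 12,
    "errors": True,
    "items": [
        {"index": {"_index": "bedrock-knn-index-v2", "_id": "a1", "status": 201}},
        {"index": {"_index": "bedrock-knn-index-v2", "status": 400,
                   "error": {"type": "illegal_argument_exception", "reason": "example failure"}}},
    ],
}
print(bulk_failures(sample_bulk))
```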

#### Step 10. Verify that documents are indexed properly by searching for a document in each language

In [None]:
search_url = f"{host}/{index_name}/_search"

for df, language in [(german_df, "German"), (english_df, "English"), (french_df, "French")]:
    search_query = {
        "query": {
            "match": {
                "sentence": df.iloc[0]['sentence']
            }
        },
        "_source": ["sentence", "sentence_english", "sentence_vector"]
    }

    search_response = requests.get(search_url, auth=awsauth, json=search_query, headers={"Content-Type": "application/json"})
    print(f"\nSearch result for {language}:")
    print(json.dumps(search_response.json(), indent=2))

#### Step 11. Perform a semantic k-NN search across all documents

In [None]:
import json
import requests
from requests_aws4auth import AWS4Auth
import boto3

# Set up your AWS credentials and region
region = boto3.Session().region_name  # Region of the OpenSearch domain (assumed to match this notebook's Region)
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

# host comes from the earlier cluster-settings cell; set the index name here

index_name = "bedrock-knn-index-v2"

def semantic_search(query_text, k=5):
    """Return the k sentences whose embeddings are closest to query_text."""
    search_url = f"{host}/{index_name}/_search"
    headers = {"Content-Type": "application/json"}

    # Index the query as a temporary document so the default pipeline generates its embedding
    index_doc = {
        "sentence": query_text,
        "sentence_english": query_text  # placeholder: the query's English translation is not known
    }
    index_url = f"{host}/{index_name}/_doc"
    index_response = requests.post(index_url, auth=awsauth, json=index_doc, headers=headers)

    if index_response.status_code != 201:
        print(f"Failed to index query document: {index_response.text}")
        return []

    doc_id = index_response.json()['_id']
    doc_url = f"{host}/{index_name}/_doc/{doc_id}"
    try:
        # Retrieve the temporary document to get its vector
        get_response = requests.get(doc_url, auth=awsauth)
        query_vector = get_response.json()['_source']['sentence_vector']

        # k-NN search; ask for one extra hit in case the temporary document itself is returned
        search_query = {
            "size": k + 1,
            "query": {
                "knn": {
                    "sentence_vector": {
                        "vector": query_vector,
                        "k": k + 1
                    }
                }
            },
            "_source": ["sentence", "sentence_english"]
        }
        search_response = requests.post(search_url, auth=awsauth, json=search_query, headers=headers)

        if search_response.status_code != 200:
            print(f"Search failed with status code {search_response.status_code}")
            print(search_response.text)
            return []

        hits = [hit for hit in search_response.json()['hits']['hits'] if hit['_id'] != doc_id]
        return hits[:k]
    finally:
        # Clean up: delete the temporary query document even if the search fails
        requests.delete(doc_url, auth=awsauth)

# Example usage
query = "le soleil brille"
results = semantic_search(query)

if results:
    print(f"Search results for: '{query}'")
    for result in results:
        print(f"Score: {result['_score']}")
        print(f"Sentence: {result['_source']['sentence']}")
        print(f"English: {result['_source']['sentence_english']}")
        print()
else:
    print("No results found or search failed.")
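
The function above indexes a temporary document just to obtain the query embedding. The neural-search plugin offers a `neural` query that embeds `query_text` with a given model at search time, which avoids the temporary document. Below is a sketch of the request body. It assumes the `neural` query (with `query_text`, `model_id`, and `k`) is available on your domain version. `build_neural_query` is a hypothetical helper, and `bedrock_model_id` comes from Step 3 of this section.

```python
import json


def build_neural_query(query_text: str, model_id: str, k: int = 5,
                       field: str = "sentence_vector") -> dict:
    """Request body for a neural query: OpenSearch embeds query_text with model_id, then runs k-NN."""
    return {
        "size": k,
        "_source": ["sentence", "sentence_english"],
        "query": {
            "neural": {
                field: {"query_text": query_text, "model_id": model_id, "k": k}
            }
        },
    }


body = build_neural_query("le soleil brille", model_id="YOUR_BEDROCK_MODEL_ID")
print(json.dumps(body, indent=2, ensure_ascii=False))

# In the notebook (host, awsauth, index_name, and bedrock_model_id from earlier cells):
# body = build_neural_query("le soleil brille", bedrock_model_id)
# hits = requests.post(f"{host}/{index_name}/_search", auth=awsauth, json=body).json()["hits"]["hits"]
```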


#### Step 12. Cleanup
Go to the CloudFormation console and delete the stack you deployed. This terminates the resources you created, so you do not incur unnecessary charges after you finish experimenting.
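
Deleting the stack should remove the OpenSearch domain it created, along with everything in it. If you want to keep the domain and remove only what this notebook created, you can call the index and ML Commons APIs in roughly reverse order of creation: indexes, then pipelines, then undeploy and delete each model, then the connectors. As we understand it, a connector that a model still uses cannot be deleted. The sketch only builds the plan. The model and connector IDs are placeholders, `cleanup_plan` is a hypothetical helper, and the commented loop assumes `host` and `awsauth` from earlier cells.

```python
def cleanup_plan(indexes, pipelines, models, connectors):
    """Return (HTTP method, path) pairs that remove notebook resources in a safe order."""
    plan = [("DELETE", f"/{name}") for name in indexes]
    plan += [("DELETE", f"/_ingest/pipeline/{name}") for name in pipelines]
    for model_id in models:
        plan.append(("POST", f"/_plugins/_ml/models/{model_id}/_undeploy"))
        plan.append(("DELETE", f"/_plugins/_ml/models/{model_id}"))
    plan += [("DELETE", f"/_plugins/_ml/connectors/{cid}") for cid in connectors]
    return plan


plan = cleanup_plan(
    indexes=["comprehend_lang_ident_test01", "bedrock-knn-index-v2"],
    pipelines=["comprehend_language_identification_pipeline", "titan_embedding_pipeline_v2"],
    models=["COMPREHEND_MODEL_ID", "BEDROCK_MODEL_ID"],
    connectors=["COMPREHEND_CONNECTOR_ID", "BEDROCK_CONNECTOR_ID"],
)
for method, path in plan:
    print(method, path)

# To execute (host and awsauth from earlier cells):
# for method, path in plan:
#     print(method, path, requests.request(method, host + path, auth=awsauth).status_code)
```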