# Self-managed vector store using OpenSearch provisioned domain

This notebook demonstrates a self-managed approach to building a vector store on an Amazon OpenSearch Service provisioned domain. It uses OpenSearch features to create a remote model with a connector to Amazon Bedrock, define an ingest pipeline, create an index that uses the pipeline, and implement multi-tenancy using a tenant attribute on each document.

Prerequisites before you run the cells in this notebook:
1. Deploy an Amazon OpenSearch Service domain. (If you are at an AWS event, these are pre-provisioned in your account.)
2. Note the VPC domain endpoint of the OpenSearch cluster. You will need it to access the domain from within the VPC.


### Install boto3 & dependencies

In [None]:
%pip install -U boto3==1.34.84
%pip install -U opensearch-py requests_aws4auth
%pip install pypdf
%pip install langchain==0.2.7
%pip install langchain-community==0.2.3


### Restart the Kernel

In [None]:
# restart kernel
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

### Import required libraries

In [4]:
import boto3
import json
import os
import warnings
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import requests
from pypdf import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader

### SSH Tunnel to EC2 Bastion Host
- If you are running this notebook from outside the VPC where the OpenSearch cluster is deployed, you can reach the cluster through an SSH tunnel via an EC2 bastion host.
- If you provisioned the resources with the CloudFormation template, you may need to obtain the key-pair contents from the Systems Manager Parameter Store.
- Copy the contents from Parameter Store into a file named key-pair.pem.
- Restrict the file's permissions so that only you can read it by running `chmod 400 key-pair.pem` in a terminal.
- Run the following command from a terminal after replacing the key-pair file path, the bastion host's public IP address, and the VPC domain endpoint of the OpenSearch cluster with your own values.
```bash
ssh -i ~/.ssh/your-key.pem ec2-user@your-ec2-instance-public-ip -N -L 9200:vpc-domain-name.region.es.amazonaws.com:443
```
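Before running the remaining cells from outside the VPC, it can help to confirm that the tunnel is actually listening on local port 9200. The sketch below is a hypothetical helper (`is_port_open` is not part of any library) that uses only the Python standard library `socket` module; it checks TCP reachability only, not TLS or authentication.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    This only shows that something is listening on the port (for example the SSH
    tunnel); it does not validate TLS or OpenSearch credentials.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, unreachable host, and timeouts are all OSError subclasses.
        return False


# Hypothetical usage: check the local end of the tunnel before running other cells.
# is_port_open("localhost", 9200)
```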

### Set up the AWS SigV4 and basic authentication credentials

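- If running this notebook from outside the VPC, set host = "localhost" after enabling the SSH tunnel to the EC2 bastion host.
- If running this notebook from within the VPC (for example, the VS Code Server provisioned by Workshop Studio), set host = "vpc-domain-name.region.es.amazonaws.com". The domain endpoint listens on HTTPS port 443, whereas the cells below use port 9200 (the local end of the SSH tunnel), so update the port in the requests and the client configuration as well.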

In [None]:
warnings.filterwarnings('ignore')
host = "localhost"  # Update to vpc-domain-name when running from within the VPC

host = host.replace("https://", "")
service = 'es'
region = os.environ.get('AWS_REGION', 'us-west-2')
session_name = "opensearch-admin-session"

# 1. Create an AWS SigV4 auth object (AWS4Auth) from the assumed OpenSearch admin role
account_id = boto3.client("sts").get_caller_identity().get("Account")
admin_role_arn = f"arn:aws:iam::{account_id}:role/opensearch-admin-role"
assumed_role = boto3.client('sts').assume_role(RoleArn=admin_role_arn,RoleSessionName=session_name)
credentials = assumed_role['Credentials']
master_secret_name = "development-opensearch-master-user" # If using the CloudFormation template, this is created as {environment}-opensearch-master-user. Update as appropriate.

admin_role_auth = AWS4Auth(
    credentials['AccessKeyId'],
    credentials['SecretAccessKey'],
    region,
    service,
    session_token=credentials['SessionToken']
)

# 2. Create a basic auth using the admin user/password from Secrets Manager
client = boto3.session.Session().client(service_name='secretsmanager',region_name=region)
get_secret_value_response = client.get_secret_value(SecretId=master_secret_name)

secret_dict = json.loads(get_secret_value_response['SecretString'])
admin_user_auth = (secret_dict['username'], secret_dict['password'])

print(f"Successfully created the authentication credentials")

### Test Connection using OpenSearch client

In [None]:
opensearch_client = OpenSearch(
    hosts=[{'host': host, 'port': 9200}],
    http_auth=admin_user_auth,
    use_ssl=True,
    verify_certs=False,
    connection_class=RequestsHttpConnection,
    timeout=300,
    retry_on_timeout=True,
    max_retries=3,
    region_name=region
)

# Test the connection
cluster_info = opensearch_client.info() 
print(f"Successfully connected to OpenSearch cluster: {cluster_info['cluster_name']}")

bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name=region)
embedding_model_id = "amazon.titan-embed-text-v1"
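Every request cell in this notebook builds its URL as `'https://' + host + ':9200/' + path`. Port 9200 is the local end of the SSH tunnel; when the notebook calls the VPC domain endpoint directly, Amazon OpenSearch Service listens on HTTPS port 443 instead. A small hypothetical helper such as `opensearch_url` keeps that choice in one place; it is a sketch, and the existing cells would need to call it instead of concatenating strings.

```python
def opensearch_url(host: str, path: str, via_tunnel: bool = True) -> str:
    """Build a full OpenSearch REST URL (hypothetical helper).

    Assumes port 9200 when going through the local SSH tunnel and port 443
    when calling the VPC domain endpoint directly.
    """
    host = host.replace("https://", "").rstrip("/")
    port = 9200 if via_tunnel else 443
    # Strip leading slashes so "/surveys" and "surveys" produce the same URL.
    return f"https://{host}:{port}/{path.lstrip('/')}"


# Hypothetical usage:
# url = opensearch_url(host, "surveys/_search")
```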

### Define the function to ingest documents to OpenSearch

In [7]:
def write_to_opensearch(content, tenant_id, metadata=None):
    path = 'surveys/_doc'
    url = 'https://' + host + ':9200/' + path 

    document = {
        "content": content,
        "tenant_id": tenant_id
    }
    if metadata:
        document["metadata"] = metadata

    headers = {"Content-Type": "application/json"}

    response = requests.post(url, auth=admin_user_auth, json=document, headers=headers, verify=False)
    print(response.text)
    return response
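`write_to_opensearch` sends one HTTP request per chunk. For larger documents, the OpenSearch `_bulk` API accepts many documents in a single newline-delimited JSON (NDJSON) body, and the index's `default_pipeline` still applies to bulk index operations. The hypothetical `build_bulk_body` below only builds that body; sending it (for example as a `POST` to `https://<host>:9200/_bulk` with `Content-Type: application/x-ndjson`) is an assumption about how you might wire it in, not something this notebook does.

```python
import json
from typing import Iterable, Optional


def build_bulk_body(index: str, chunks: Iterable[str], tenant_id: str,
                    metadata: Optional[dict] = None) -> str:
    """Return an NDJSON body for the OpenSearch _bulk API (hypothetical helper).

    Each document becomes two lines: an action line and a source line.
    """
    lines = []
    for content in chunks:
        # Action line: index the following document into `index`.
        lines.append(json.dumps({"index": {"_index": index}}))
        doc = {"content": content, "tenant_id": tenant_id}
        if metadata:
            doc["metadata"] = metadata
        lines.append(json.dumps(doc))
    # The _bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"
```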

### Define the function to read a Tenant PDF and convert it to chunks

In [8]:
def insert_tenant_document(file_name, tenantid):
    # Load the document
    loader = PyPDFLoader(file_name)
    doc = loader.load()

    # split documents into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=10000,
        chunk_overlap=150
    )
    chunks = text_splitter.split_documents(doc)

    # write each chunk into opensearch vector store
    for chunk in chunks:
        write_to_opensearch(chunk.page_content, tenantid)

    return "Embeddings generated successfully!"
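`RecursiveCharacterTextSplitter` tries a list of separators (paragraphs, lines, words) before falling back to individual characters. The effect of `chunk_size` and `chunk_overlap` is easier to see with a much simpler fixed-width splitter. The function below is a hypothetical stand-in for illustration only; it is not LangChain's algorithm and ignores separators entirely.

```python
from typing import List


def split_fixed(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split text into windows of at most chunk_size characters.

    Consecutive windows share chunk_overlap characters, so text near a
    boundary appears in both neighbouring chunks.
    """
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a window reaches the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks
```

With the notebook's settings (10,000 characters, 150 overlap), a 25,000-character document yields three chunks under this simplified scheme.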

### Define the function to invoke Anthropic Claude on Amazon Bedrock

In [9]:
def generate_message(bedrock_runtime, model_id, system_prompt, messages, max_tokens):

    body=json.dumps(
        {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "system": system_prompt,
            "messages": messages
        }  
    )  

    response = bedrock_runtime.invoke_model(body=body, modelId=model_id)
    response_body = json.loads(response.get('body').read())
   
    return response_body

def invoke_llm_with_rag(messages):
    model_id = 'anthropic.claude-3-sonnet-20240229-v1:0'
    
    response = generate_message(bedrock_runtime, model_id, "", messages, 300)

    return response

### Define the function to generate vector embeddings

In [10]:
def generate_vector_embeddings(data):
    body = json.dumps(
        {
            "inputText": data,
        }
    )

    # Invoke model
    response = bedrock_runtime.invoke_model(
        body=body,
        modelId=embedding_model_id,
        accept="application/json",
        contentType="application/json",
    )

    response_body = json.loads(response["body"].read())
    embedding = response_body.get("embedding")

    return embedding

### Step 1: Configure the IAM Roles and Permissions

#### 1.1 Create the IAM roles required for OpenSearch to call Bedrock

In [None]:
# Inject these variable values into the policy templates and generate policies using sed. 

%env region_name = $region
%env account_id = $account_id
%env embedding_model_id = $embedding_model_id

!sed -e "s/\#embedding_model_id\#/$embedding_model_id/" -e "s/\#region_name\#/$region_name/" policy-templates/bedrock_model_permissions_policy.json > bedrock_model_permissions_policy.json
!sed -e "s/\#account_id\#/$account_id/" -e "s/\#region_name\#/$region_name/" policy-templates/bedrock_trust_relationship_policy.json > bedrock_trust_relationship_policy.json
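The `sed` commands above replace `#placeholder#` tokens in the policy templates. If `sed` is unavailable (for example on Windows), the same substitution can be done in Python. This sketch assumes the templates use exactly the `#name#` token form targeted by the `sed` expressions; unlike those commands, it replaces every occurrence on a line and fails on unknown tokens. The template string in the example is made up for illustration and is not the contents of `policy-templates/`.

```python
import re
from typing import Dict


def render_template(template: str, values: Dict[str, str]) -> str:
    """Replace every #name# token with values[name] (hypothetical helper).

    Raises KeyError if the template contains a token with no supplied value,
    so a missing variable fails loudly instead of leaving '#name#' in a policy.
    """
    def substitute(match):
        return values[match.group(1)]

    return re.sub(r"#(\w+)#", substitute, template)


# Made-up template for illustration only.
example = '{"Resource": "arn:aws:bedrock:#region_name#::foundation-model/#embedding_model_id#"}'
print(render_template(example, {"region_name": "us-west-2",
                                "embedding_model_id": "amazon.titan-embed-text-v1"}))
```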



In [None]:
# opensearch-bedrock-role
!aws iam create-role \
    --role-name opensearch-bedrock-role \
    --assume-role-policy-document file://bedrock_trust_relationship_policy.json

# opensearch_permissions_policy
!aws iam create-policy \
    --policy-name opensearch-bedrock-role-policy \
    --policy-document file://bedrock_model_permissions_policy.json

!aws iam attach-role-policy \
    --role-name opensearch-bedrock-role \
    --policy-arn "arn:aws:iam::$account_id:policy/opensearch-bedrock-role-policy"


In [None]:
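# Remove only the two generated policy JSON files rather than every *.json file in the directory.
!rm bedrock_model_permissions_policy.json bedrock_trust_relationship_policy.json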
role_arn = f"arn:aws:iam::{account_id}:role/opensearch-bedrock-role"
role_arn

#### 1.2 Map the ML role in OpenSearch Dashboards 

Fine-grained access control introduces an additional step when setting up a connector. Even if you use HTTP basic authentication for all other purposes, you need to map the ml_full_access role to your IAM role that has iam:PassRole permissions to pass opensearch-bedrock-role.


In [None]:
path = '_plugins/_security/api/rolesmapping/ml_full_access'
url = 'https://' + host + ':9200/' + path 

role_arn1 = f"arn:aws:iam::{account_id}:role/opensearch-bedrock-role"
role_arn2 = f"arn:aws:iam::{account_id}:role/opensearch-admin-role"

# Role mapping payload
payload = {
    "backend_roles": [role_arn1, role_arn2],
    "hosts": [],
    "users": []
}

# Send PUT request to update role mapping
headers = {"Content-Type": "application/json"}
r = requests.put(url, auth=admin_user_auth, json=payload, headers=headers, verify=False)
print(r.status_code)
print(r.text)

# Send GET request to verify role mapping
r = requests.get(url, auth=admin_user_auth, headers=headers, verify=False)
print(r.status_code)
print(r.text)
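The `PUT` request above creates or replaces the entire `ml_full_access` mapping, so any backend roles, users, or hosts that were already mapped are dropped. One way to avoid that is to read the current mapping first and merge. The helper below is a hypothetical sketch that assumes the `GET` response is keyed by role name with `backend_roles`, `hosts`, and `users` lists, as the verification request above prints.

```python
from typing import List


def merge_role_mapping(get_response: dict, role_name: str, backend_roles: List[str]) -> dict:
    """Build a PUT payload that keeps the existing mapping and adds backend_roles.

    get_response is the parsed JSON of GET _plugins/_security/api/rolesmapping/<role_name>.
    If the role has no mapping yet the response may be an error body, so a
    missing key is treated as an empty mapping.
    """
    current = get_response.get(role_name, {})
    merged_roles = list(current.get("backend_roles", []))
    for role in backend_roles:
        # Preserve order and avoid duplicate ARNs.
        if role not in merged_roles:
            merged_roles.append(role)
    return {
        "backend_roles": merged_roles,
        "hosts": list(current.get("hosts", [])),
        "users": list(current.get("users", [])),
    }
```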

### Step 2: Configure the OpenSearch ML Connectors

#### 2.1 Create the remote model connector in OpenSearch

This example uses the Amazon Titan Embeddings model, but you could use any embeddings model. It consumes the model through Amazon Bedrock, but you could also use Amazon SageMaker or an external API that hosts the model.

In [None]:
path = '_plugins/_ml/connectors/_create'
url = 'https://' + host + ':9200/' + path 

payload = {
  "name": "Amazon Bedrock Connector: embedding",
  "description": "The connector to bedrock Titan embedding model",
  "version": 1,
  "protocol": "aws_sigv4",
  "parameters": {
    "region": region,
    "service_name": "bedrock"
  },
  "credential": {
    "roleArn": role_arn
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": "https://bedrock-runtime.${parameters.region}.amazonaws.com/model/" + embedding_model_id + "/invoke",
      "headers": {
        "content-type": "application/json",
        "x-amz-content-sha256": "required"
      },
      "request_body": "{ \"inputText\": \"${parameters.inputText}\" }",
      "pre_process_function": "connector.pre_process.bedrock.embedding",
      "post_process_function": "connector.post_process.bedrock.embedding"
    }
  ]
}

headers = {"Content-Type": "application/json"}

r = requests.post(url, auth=admin_role_auth, json=payload, headers=headers, verify=False)
print(r.status_code)
print(r.text)
connector_id = json.loads(r.text)['connector_id']

#### 2.2 Create the model group in OpenSearch and register the remote model

In [None]:
# Create remote model group
path = '_plugins/_ml/model_groups/_register'
url = 'https://' + host + ':9200/' + path 

payload = {
  "name": "remote_model_group",
  "description": "A model group for external models"
}

headers = {"Content-Type": "application/json"}

r = requests.post(url, auth=admin_role_auth, json=payload, headers=headers, verify=False)
print(r.status_code)
print(r.text)
model_group_id = json.loads(r.text)['model_group_id']

# Register the remote model
path = '_plugins/_ml/models/_register'
url = 'https://' + host + ':9200/' + path 

payload = {
    "name": "Amazon Titan Embeddings",
    "function_name": "remote",
    "model_group_id": model_group_id,
    "description": "Titan remote model",
    "connector_id": connector_id
}

headers = {"Content-Type": "application/json"}

r = requests.post(url, auth=admin_role_auth, json=payload, headers=headers, verify=False)
print(r.status_code)
print(r.text)
task_id = json.loads(r.text)['task_id']

#### 2.3 Check the status of the model registration task

Note: if this does not show "COMPLETED" after running the first time, wait a couple of seconds and run it again.

In [None]:
path = '_plugins/_ml/tasks/' + task_id
url = 'https://' + host + ':9200/' + path 
r = requests.get(url, auth=admin_role_auth, headers=headers, verify=False)
print(r.status_code)
print(r.text)
model_id = json.loads(r.text)['model_id']
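Instead of re-running the cell by hand, the status check can be polled. `wait_for_task` below is a hypothetical helper: it takes any zero-argument function that returns the parsed task JSON (here that would wrap the `requests.get` call above) and assumes the task document carries a `state` field with values such as `COMPLETED` or `FAILED`, as returned by the ML Commons tasks API.

```python
import time
from typing import Callable


def wait_for_task(fetch_task: Callable[[], dict], timeout_s: float = 60.0,
                  interval_s: float = 2.0) -> dict:
    """Poll fetch_task() until the task reaches a terminal state.

    Returns the task document when state is COMPLETED; raises RuntimeError on a
    failure state and TimeoutError if timeout_s elapses first.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        task = fetch_task()
        state = task.get("state")
        if state == "COMPLETED":
            return task
        if state in ("FAILED", "CANCELLED", "COMPLETED_WITH_ERROR"):
            raise RuntimeError(f"Task ended in state {state}: {task}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task still in state {state} after {timeout_s}s")
        time.sleep(interval_s)


# Hypothetical usage:
# task = wait_for_task(lambda: requests.get(url, auth=admin_role_auth, verify=False).json())
# model_id = task["model_id"]
```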

#### 2.4 Test running remote inference using the model
- Since we are testing the Titan embedding model, the inference response should contain the vector embeddings corresponding to the inputText.

In [None]:
print(model_id)
path = f'_plugins/_ml/models/{model_id}/_predict'
url = 'https://' + host + ':9200/' + path 

payload = {
  "parameters": {
    "inputText": "What is the meaning of life?"
  }
}
headers = {"Content-Type": "application/json"}

r = requests.post(url, auth=admin_role_auth, json=payload, headers=headers, verify=False)
print(r.status_code)

print(json.loads(r.text)['inference_results'][0]['output'][0]['data'][:10]) # printing only first 10 elements of the embedding vector
print(len(json.loads(r.text)['inference_results'][0]['output'][0]['data'])) # printing the length of the embedding vector
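The two `print` calls above index into the predict response by hand. A small hypothetical helper can do the same walk once and check the vector length against the `dimension` used later in the index mapping (1536 for `amazon.titan-embed-text-v1`). The response path `inference_results[0].output[0].data` is taken from the cell above.

```python
from typing import List


def extract_embedding(predict_response: dict, expected_dim: int = 1536) -> List[float]:
    """Return the embedding vector from an ML Commons _predict response.

    Raises ValueError if the vector length does not match expected_dim, a
    mismatch that would likely surface later as an indexing error against the
    knn_vector field.
    """
    vector = predict_response["inference_results"][0]["output"][0]["data"]
    if len(vector) != expected_dim:
        raise ValueError(f"expected {expected_dim} dimensions, got {len(vector)}")
    return vector
```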

### Step 3: Provision the ingestion pipeline and vector store index

#### 3.1 Create an Ingestion Pipeline in OpenSearch

The ingest pipeline runs whenever documents are written to the index it is associated with. It calls the specified model and writes the result to the mapped field. In this case, it maps the `content` field of the original document to a generated `embedding` field that we will use for vector search.

In [None]:
path = '_ingest/pipeline/bedrock_pipeline'
url = 'https://' + host + ':9200/' + path 

pipeline_config = {
    "description": "Ingestion pipeline for Bedrock model",
    "processors": [
        {
            "text_embedding": {
                "model_id": model_id,
                "field_map": {
                    "content": "embedding"
                }
            }
        }
    ]
}

headers = {"Content-Type": "application/json"}

r = requests.put(url, auth=admin_user_auth, json=pipeline_config, headers=headers, verify=False)
print(r.status_code)
print(r.text)
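Conceptually, the `text_embedding` processor configured above reads each source field named in `field_map`, sends its text to the model, and writes the returned vector to the target field before the document is stored. The local simulation below only illustrates that data flow: `apply_field_map` and the toy `fake_embed` function are hypothetical, and the real processor runs inside OpenSearch and calls the remote model through the connector.

```python
from typing import Callable, Dict, List


def apply_field_map(doc: dict, field_map: Dict[str, str],
                    embed: Callable[[str], List[float]]) -> dict:
    """Return a copy of doc with an embedding added for each source->target pair.

    Source fields missing from the document are skipped in this sketch.
    """
    enriched = dict(doc)
    for source_field, target_field in field_map.items():
        if source_field in doc:
            enriched[target_field] = embed(doc[source_field])
    return enriched


def fake_embed(text: str) -> List[float]:
    # Toy stand-in for Titan: a 2-dimensional "vector" of text length and vowel count.
    return [float(len(text)), float(sum(ch in "aeiou" for ch in text.lower()))]
```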

#### 3.2 Create an index 

This index will contain the documents and the generated embeddings we will use for performing a neural search. The index specifies which ingestion pipeline to use as documents are written to it. 


In [None]:
path = 'surveys'
url = 'https://' + host + ':9200/' + path 

index_config = {
  "settings": {
    "index.knn": True,
    "default_pipeline": "bedrock_pipeline"
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "text"
      },
      "tenant_id": {
        "type": "keyword"
      },
      "embedding": {
        "type": "knn_vector",
        "dimension": 1536,
        "method": {
          "engine": "lucene",
          "space_type": "l2",
          "name": "hnsw",
          "parameters": {}
        },
      },
      "content": {
        "type": "text"
      }
    }
  }
}

headers = {"Content-Type": "application/json"}

r = requests.put(url, auth=admin_user_auth, json=index_config, headers=headers, verify=False)
print(r.status_code)
print(r.text)
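With `space_type` set to `l2`, the k-NN plugin ranks neighbours by squared Euclidean distance d (the sum of squared differences) and, per the OpenSearch k-NN documentation, reports a score of 1 / (1 + d), so closer vectors get higher scores. The function below is a plain-Python sketch of that formula, not the Lucene implementation.

```python
from typing import List


def l2_score(a: List[float], b: List[float]) -> float:
    """Score for space_type 'l2': 1 / (1 + squared Euclidean distance)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    squared_distance = sum((x - y) ** 2 for x, y in zip(a, b))
    return 1.0 / (1.0 + squared_distance)
```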

### Step 4: Generate vector embeddings and insert them into the vector store

#### 4.1 Insert Tenant1 data

In [None]:
insert_tenant_document(
    file_name="../multi_tenant_survey_reports/Home_Survey_Tenant1.pdf",
    tenantid="tenant1"
)

#### 4.2 Search and validate that tenant1 data and the generated embeddings are returned

In [None]:
path = 'surveys/_search'
url = 'https://' + host + ':9200/' + path 

search_config = {
  "query": {
    "bool": {
      "should": [
        {
          "script_score": {
            "query": {
              "neural": {
                "embedding": {
                  "query_text": "What is the state of the roof?",
                  "model_id": model_id,
                  "k": 100
                }
              }
            },
            "script": {
              "source": "_score * 1.5"
            }
          }
        },
        {
          "script_score": {
            "query": {
              "match": {
                "content": "What is the state of the roof?"
              }
            },
            "script": {
              "source": "_score * 1.7"
            }
          }
        }
      ]
    }
  }
}

headers = {"Content-Type": "application/json"}

r = requests.get(url, auth=admin_user_auth, json=search_config, headers=headers, verify=False)
print(r.status_code)

hits = json.loads(r.text)['hits']['hits']
for hit in hits:
    print(hit['_source']['tenant_id'])
    print(hit['_source']['embedding'][:10]) # Printing only the first 10 elements of the embedding
    print(hit['_source']['content'][:100]) # Printing only the first 100 characters of the content
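The same hybrid query body is repeated, with small variations, in the cells that follow. In a `bool` query the scores of matching `should` clauses are added, so each hit scores 1.5 × its neural score plus 1.7 × its BM25 `match` score, counting only the clauses it matches; a clause in `filter` restricts results without contributing to the score. Note that once a `filter` is present, `should` clauses become optional by default, so every document passing the filter can be returned. `build_hybrid_query` is a hypothetical helper that generates the body once, with an optional tenant filter; the 1.5 and 1.7 weights are the notebook's values, and it always excludes the `embedding` field from `_source`.

```python
from typing import Optional


def build_hybrid_query(query_text: str, model_id: str, tenant_id: Optional[str] = None,
                       k: int = 100, neural_weight: float = 1.5,
                       match_weight: float = 1.7) -> dict:
    """Build the neural + BM25 bool query used in this notebook (hypothetical helper)."""

    def weighted(inner_query: dict, weight: float) -> dict:
        # Wrap a query in script_score so its score is multiplied by a constant.
        return {"script_score": {"query": inner_query,
                                 "script": {"source": f"_score * {weight}"}}}

    bool_query = {
        "should": [
            weighted({"neural": {"embedding": {"query_text": query_text,
                                               "model_id": model_id, "k": k}}},
                     neural_weight),
            weighted({"match": {"content": query_text}}, match_weight),
        ]
    }
    if tenant_id is not None:
        # Filter context: restricts hits to one tenant without affecting the score.
        bool_query["filter"] = {"term": {"tenant_id": tenant_id}}
    return {"_source": {"excludes": ["embedding"]}, "query": {"bool": bool_query}}
```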


#### 4.3 Insert Tenant2 data

In [None]:
insert_tenant_document(
    file_name="../multi_tenant_survey_reports/Home_Survey_Tenant2.pdf",
    tenantid="tenant2"
)

#### 4.4 Verify that data for both tenants is returned

In [None]:
path = 'surveys/_search'
url = 'https://' + host + ':9200/' + path 

search_config = {
  "_source": {
    "excludes": [
      "embedding"
    ]
  },
  "query": {
    "bool": {
      "should": [
        {
          "script_score": {
            "query": {
              "neural": {
                "embedding": {
                  "query_text": "What is the state of the roof ?",
                  "model_id": model_id,
                  "k": 100
                }
              }
            },
            "script": {
              "source": "_score * 1.5"
            }
          }
        },
        {
          "script_score": {
            "query": {
              "match": {
                "content": "What is the state of the roof ?"
              }
            },
            "script": {
              "source": "_score * 1.7"
            }
          }
        }
      ]
    }
  }
}

headers = {"Content-Type": "application/json"}

r = requests.get(url, auth=admin_user_auth, json=search_config, headers=headers, verify=False)
print(r.status_code)

hits = json.loads(r.text)['hits']['hits']
for hit in hits:
    print(hit['_source']['tenant_id'])
    print(hit['_source']['content'][:100])

### Step 5: Enforce multi-tenant data isolation

#### 5.1 Apply a filter to the query so that only documents where the tenant_id is tenant1 are returned

In [None]:
path = 'surveys/_search'
url = 'https://' + host + ':9200/' + path 

search_config = {
  "_source": {
    "excludes": [
      "embedding"
    ]
  },
  "query": {
    "bool": {
      "filter": {
        "term": { "tenant_id": "tenant1" }
      },
      "should": [
        {
          "script_score": {
            "query": {
              "neural": {
                "embedding": {
                  "query_text": "Property contents and state",
                  "model_id": model_id,
                  "k": 100
                }
              }
            },
            "script": {
              "source": "_score * 1.5"
            }
          }
        },
        {
          "script_score": {
            "query": {
              "match": {
                "content": "Property contents and state"
              }
            },
            "script": {
              "source": "_score * 1.7"
            }
          }
        }
      ]
    }
  }
}

headers = {"Content-Type": "application/json"}

r = requests.get(url, auth=admin_user_auth, json=search_config, headers=headers, verify=False)
print(r.status_code)

hits = json.loads(r.text)['hits']['hits']
for hit in hits:
    print(hit['_source']['tenant_id'])
    print(hit['_source']['content'][:100]) # Printing only the first 100 characters of the content


#### 5.2 Enforce document-level security by creating a role for tenant_1

In [None]:
# Create role for tenant1
path = '_plugins/_security/api/roles/tenant_1'
url = 'https://' + host + ':9200/' + path 

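payload = {
  # NOTE: "*" grants every cluster-level permission to this tenant role, which is much
  # broader than a read-only tenant needs; narrow it to the specific permissions
  # required (such as running ML model predictions) before using this pattern in production.
  "cluster_permissions": [
    "*"
  ],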
  "index_permissions": [{
    "index_patterns": [
      "surveys*"
    ],
    "dls": "{\"term\": { \"tenant_id\": \"tenant1\"}}",
    "allowed_actions": [
      "read"
    ]
  }]
}

r = requests.put(url, auth=admin_user_auth, json=payload, headers=headers, verify=False)
print(r.status_code)
print(r.text)

# create user for tenant1
path = '_plugins/_security/api/internalusers/tenant_1'
url = 'https://' + host + ':9200/' + path 

payload = {
  "password": "t3n4Nt1!?",
}

r = requests.put(url, auth=admin_user_auth, json=payload, headers=headers, verify=False)
print(r.status_code)
print(r.text)

# map user to role
path = '_plugins/_security/api/rolesmapping/tenant_1'
url = 'https://' + host + ':9200/' + path 

payload = {
  "users" : [ "tenant_1" ]
}

r = requests.put(url, auth=admin_user_auth, json=payload, headers=headers, verify=False)
print(r.status_code)
print(r.text)
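The `dls` value must be a string containing a JSON query, which is why the cell above escapes the inner quotes by hand. Generating it with `json.dumps` avoids escaping mistakes when repeating the setup for more tenants. `build_tenant_role` is a hypothetical helper that keeps the cell's `surveys*` index pattern and `read` action; it takes the cluster permissions as a required argument rather than hard-coding `"*"` (pass `["*"]` to match the cell above).

```python
import json
from typing import List


def build_tenant_role(tenant_id: str, cluster_permissions: List[str],
                      index_pattern: str = "surveys*") -> dict:
    """Return a security-plugin role body with a document-level security filter.

    The DLS query is serialized with json.dumps because the security API expects
    it as a string, not as a nested JSON object.
    """
    dls_query = {"term": {"tenant_id": tenant_id}}
    return {
        "cluster_permissions": list(cluster_permissions),
        "index_permissions": [{
            "index_patterns": [index_pattern],
            "dls": json.dumps(dls_query),
            "allowed_actions": ["read"],
        }],
    }
```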

#### 5.3 Use the newly created tenant_1 user to query the index

Note that even though this query omits the tenant filter, only tenant1 data is returned, because the document-level security policy filters the results.

In [None]:
path = 'surveys/_search'
url = 'https://' + host + ':9200/' + path 

search_config = {
  "_source": {
    "excludes": [
      "embedding"
    ]
  },
  "query": {
    "bool": {
      "should": [
        {
          "script_score": {
            "query": {
              "neural": {
                "embedding": {
                  "query_text": "What is the state of the roof?",
                  "model_id": model_id,
                  "k": 100
                }
              }
            },
            "script": {
              "source": "_score * 1.5"
            }
          }
        },
        {
          "script_score": {
            "query": {
              "match": {
                "content": "What is the state of the roof?"
              }
            },
            "script": {
              "source": "_score * 1.7"
            }
          }
        }
      ]
    }
  }
}

headers = {"Content-Type": "application/json"}
tenant_scoped_auth = ("tenant_1", "t3n4Nt1!?")

r = requests.get(url, auth=tenant_scoped_auth, json=search_config, headers=headers, verify=False)
print(r.status_code)

hits = json.loads(r.text)['hits']['hits']
for hit in hits:
    print(hit['_source']['tenant_id'])
    print(hit['_source']['content'][:100])
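Printing `tenant_id` for each hit makes the isolation visible, but a check that fails loudly is easier to rely on when re-running the notebook. `assert_single_tenant` is a hypothetical helper that inspects the parsed `hits` list; it relies on `tenant_id` being present in `_source`, which holds as long as the query does not exclude that field.

```python
from typing import List


def assert_single_tenant(hits: List[dict], tenant_id: str) -> int:
    """Raise AssertionError if any hit belongs to a different tenant.

    Returns the number of hits checked, so an empty result (which passes
    trivially) can be spotted by the caller.
    """
    leaked = [h["_id"] for h in hits if h["_source"].get("tenant_id") != tenant_id]
    if leaked:
        raise AssertionError(f"documents from other tenants returned: {leaked}")
    return len(hits)


# Hypothetical usage after the cell above:
# assert_single_tenant(hits, "tenant1")
```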

### Step 6: Define a function to query vector embeddings from the vector store


In [32]:
def query_vector_store(query, tenant_scoped_auth):
    # Build the URL here so the function does not depend on whichever cell last set `url`.
    url = 'https://' + host + ':9200/surveys/_search'

    search_config = {
        "_source": {
            "excludes": ["embedding"]
        },
        "query": {
            "bool": {
                "should": [
                    {
                        "script_score": {
                            "query": {
                                "neural": {
                                    "embedding": {
                                        "query_text": query,
                                        "model_id": model_id,
                                        "k": 100
                                    }
                                }
                            },
                            "script": {
                                "source": "_score * 1.5"
                            }
                        }
                    },
                    {
                        "script_score": {
                            "query": {
                                "match": {
                                    "content": query
                                }
                            },
                            "script": {
                                "source": "_score * 1.7"
                            }
                        }
                    }
                ]
            }
        }
    }

    headers = {"Content-Type": "application/json"}

    # Tenant-scoped credentials mean document-level security filters the results.
    r = requests.get(url, auth=tenant_scoped_auth, json=search_config, headers=headers, verify=False)
    return r.text

### Step 7: Run a user query against the vector store
Review the results returned by the query.

In [None]:
# Define the question; the neural query converts it to a vector embedding inside OpenSearch
question = "What is the condition of the roof in my survey report?"
tenant_scoped_auth = ("tenant_1", "t3n4Nt1!?")
query_response = query_vector_store(question, tenant_scoped_auth)

hits = json.loads(query_response)['hits']['hits']
for hit in hits:
    print(hit['_source']['tenant_id'])
    print(hit['_source']['content'][:100]) # print the first 100 characters of the document
    print("\n")

### Step 8: Augment the prompt with the context data from the vector store

In [34]:
def get_contexts(retrievalResults):
    contexts = []
    for retrievedResult in retrievalResults: 
        contexts.append(retrievedResult['_source']['content'])
    return contexts

contexts = get_contexts(hits)
# Join the chunks into plain text so the prompt does not contain Python list syntax.
context_text = "\n\n".join(contexts)

prompt = f"""
Human: Use the following pieces of context to provide a concise answer to the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer.
<context>
{context_text}
</context>
Question: {question}
Assistant:
"""
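The prompt above inserts every retrieved chunk, however many there are. A hypothetical `build_rag_prompt` helper could also separate chunks with numbered tags and cap the total context size. The 20,000-character default limit is an arbitrary assumption for illustration, not a Claude or Bedrock limit.

```python
from typing import List


def build_rag_prompt(contexts: List[str], question: str, max_context_chars: int = 20000) -> str:
    """Assemble a RAG prompt from retrieved chunks (hypothetical helper).

    Chunks are kept in retrieval order and wrapped in numbered <document> tags;
    assembly stops at the first chunk that would push the total past
    max_context_chars.
    """
    parts = []
    used = 0
    for i, chunk in enumerate(contexts, start=1):
        if used + len(chunk) > max_context_chars:
            break
        parts.append(f"<document index=\"{i}\">\n{chunk}\n</document>")
        used += len(chunk)
    context_block = "\n".join(parts)
    return (
        "Use the following pieces of context to provide a concise answer to the "
        "question at the end. If you don't know the answer, just say that you don't "
        "know; don't try to make up an answer.\n"
        f"<context>\n{context_block}\n</context>\n"
        f"Question: {question}"
    )
```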

### Step 9: Invoke the LLM with the augmented prompt

In [None]:
messages = [{"role": "user", "content": [{"type": "text", "text": prompt}]}]
llm_response = invoke_llm_with_rag(messages)
print(llm_response['content'][0]['text'])
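`llm_response['content'][0]['text']` assumes the first content block is text. An Anthropic Messages response body on Bedrock contains a list of content blocks, each with a `type`, plus a `stop_reason`. The hypothetical helper below concatenates all text blocks and flags truncation when `stop_reason` is `max_tokens`, which is worth checking here because `invoke_llm_with_rag` caps the answer at 300 tokens.

```python
from typing import Tuple


def response_text(response_body: dict) -> Tuple[str, bool]:
    """Return (text, truncated) from a parsed Anthropic Messages response body.

    text joins every block whose type is "text"; truncated is True when the
    model stopped because it hit max_tokens.
    """
    text = "".join(block.get("text", "") for block in response_body.get("content", [])
                   if block.get("type") == "text")
    truncated = response_body.get("stop_reason") == "max_tokens"
    return text, truncated


# Hypothetical usage:
# answer, truncated = response_text(llm_response)
```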

### Clean up

In [None]:
# Detach the policy from the role

!aws iam detach-role-policy --role-name opensearch-bedrock-role --policy-arn "arn:aws:iam::$account_id:policy/opensearch-bedrock-role-policy"

# Delete policy
!aws iam delete-policy --policy-arn "arn:aws:iam::$account_id:policy/opensearch-bedrock-role-policy"

# Delete role
!aws iam delete-role --role-name opensearch-bedrock-role


print("Cleanup completed.")

### Conclusion 

We have now implemented a self-managed vector store using Amazon OpenSearch Service. We also learned how to enforce tenant isolation using query filters and document-level security. Overall, we used the following features:

- OpenSearch ML Commons connectors: to call the Amazon Bedrock Titan embeddings model from OpenSearch.
- OpenSearch ingest pipelines: to transform the input data chunks into embeddings automatically before indexing.
- OpenSearch query filters: to achieve tenant isolation at query time.
- OpenSearch document-level security: to enforce tenant isolation for tenant-scoped users.
- Amazon OpenSearch Service domain: to index and store vector embeddings.
- Amazon Bedrock Titan Embeddings model: to generate vector embeddings.
- Amazon Bedrock Anthropic Claude foundation model: to generate the response to the user.