# Ingest massive amounts of data into a Vector DB (Amazon OpenSearch)
**_Use of Amazon OpenSearch as a vector database for storing embeddings_**

This notebook works well with the `conda_python3` kernel on a SageMaker Notebook `ml.t3.xlarge` instance.

We build the RAG solution following [Build a powerful question answering bot with Amazon SageMaker, Amazon OpenSearch Service, Streamlit, and LangChain](https://aws.amazon.com/blogs/machine-learning/build-a-powerful-question-answering-bot-with-amazon-sagemaker-amazon-opensearch-service-streamlit-and-langchain/). Our contributions are below:
- Use text files stored in S3 as the data source
- Return references for the generated content, including the timestamps in the video and the links to the relevant videos

Prerequisites
- Complete [data preparation](data_preparation.ipynb)
- Deploy solution stack in [Build a powerful question answering bot with Amazon SageMaker, Amazon OpenSearch Service, Streamlit, and LangChain](https://aws.amazon.com/blogs/machine-learning/build-a-powerful-question-answering-bot-with-amazon-sagemaker-amazon-opensearch-service-streamlit-and-langchain/) 

---

## Step 1: Setup
Install the required packages.

In [None]:
!pip install --upgrade sagemaker --quiet
!pip install ipywidgets==7.0.0 langchain==0.0.201 opensearch-py==2.2.0 faiss_cpu==1.7.4 --quiet

In [None]:
import os
import sys
import time
import json
import glob
import logging
from typing import List

import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker.processing import ProcessingInput, ScriptProcessor

from langchain.document_loaders import TextLoader
from langchain.vectorstores import OpenSearchVectorSearch
from langchain.embeddings import SagemakerEndpointEmbeddings
from langchain.llms.sagemaker_endpoint import ContentHandlerBase
from langchain.text_splitter import RecursiveCharacterTextSplitter

Change the parameters below if you would like to customize the chunk size, the chunk overlap, the container image name, etc.

In [None]:
# global constants
MAX_OS_DOCS_PER_PUT = 500
IMAGE = "load-data-opensearch-custom"
IMAGE_TAG = "latest"
CHUNK_SIZE_FOR_DOC_SPLIT = 600
CHUNK_OVERLAP_FOR_DOC_SPLIT = 20
CREATE_OS_INDEX_HINT_FILE = "_create_index_hint"
FAISS_INDEX_DIR = "faiss_faq_index"

In [None]:
logger = logging.getLogger()
logging.basicConfig(format='%(asctime)s,%(module)s,%(processName)s,%(levelname)s,%(message)s', level=logging.INFO, stream=sys.stderr)

### Read parameters from the CloudFormation stack

Some of the resources needed for this notebook, such as the embeddings model endpoint and the Amazon OpenSearch cluster, are created outside of this notebook, typically through a CloudFormation template. We now read the outputs and parameters of the CloudFormation stack created from that template to get the values of these parameters.

The stack name here should match the stack name you used when creating the CloudFormation stack.

In [None]:
# if you used a different name when creating the CloudFormation stack, change this to match it
CFN_STACK_NAME = "llm-apps-blog-rag"

**If you did not use a CloudFormation template to create these resources, set their names manually in the code below.**

In [None]:
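# look for an active stack with this name (list_stacks is paginated and also returns deleted stacks)
active_statuses = ['CREATE_COMPLETE', 'UPDATE_COMPLETE', 'UPDATE_ROLLBACK_COMPLETE']
paginator = boto3.client('cloudformation').get_paginator('list_stacks')
stack_found = any(stack['StackName'] == CFN_STACK_NAME
                  for page in paginator.paginate(StackStatusFilter=active_statuses)
                  for stack in page['StackSummaries'])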

In [None]:
def get_cfn_outputs(stackname: str) -> dict:
    cfn = boto3.client('cloudformation')
    outputs = {}
    for output in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Outputs']:
        outputs[output['OutputKey']] = output['OutputValue']
    return outputs

def get_cfn_parameters(stackname: str) -> dict:
    cfn = boto3.client('cloudformation')
    params = {}
    for param in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Parameters']:
        params[param['ParameterKey']] = param['ParameterValue']
    return params

if stack_found:
    outputs = get_cfn_outputs(CFN_STACK_NAME)
    params = get_cfn_parameters(CFN_STACK_NAME)
    logger.info(f"cfn outputs={outputs}\nparams={params}")

    embeddings_model_endpoint_name = outputs['EmbeddingEndpointName']
    opensearch_domain_endpoint = f"https://{outputs['OpenSearchDomainEndpoint']}"
    opensearch_index = params['OpenSearchIndexName']
    app_name = params['AppName']
    # ARN of the secret is of the following format arn:aws:secretsmanager:region:account_id:secret:my_path/my_secret_name-autoid
    os_creds_secretid_in_secrets_manager = "-".join(outputs['OpenSearchSecret'].split(":")[-1].split('-')[:-1])
else:
    logger.info(f"cloud formation stack {CFN_STACK_NAME} not found, set parameters manually here")
    # REPLACE THE "placeholder" WITH ACTUAL VALUES IF YOU CREATED THESE RESOURCES WITHOUT USING A CLOUD FORMATION TEMPLATE
    embeddings_model_endpoint_name = "placeholder"
    opensearch_domain_endpoint = "placeholder"
    opensearch_index = "placeholder"
    os_creds_secretid_in_secrets_manager = "placeholder"
    app_name = "llm-apps-blogs"

The embeddings model endpoint name, the OpenSearch domain endpoint and the identifier for the OpenSearch credentials stored in Secrets Manager are all available as `Outputs` from the CloudFormation stack.
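The secret ID in the cell above is derived from the secret ARN, relying on the fact that Secrets Manager appends a hyphen and six random characters to the secret name in the ARN. The stand-alone sketch below restates that parsing as a function so it can be checked in isolation; the function name and the example ARN are hypothetical.

```python
def secret_name_from_arn(secret_arn: str) -> str:
    """Return the secret name from a Secrets Manager ARN (hypothetical helper).

    Assumes the format arn:aws:secretsmanager:region:account_id:secret:name-suffix,
    where Secrets Manager appends "-" plus a random suffix to the secret name.
    """
    # everything after the last ':' is "<name>-<suffix>"
    name_with_suffix = secret_arn.split(":")[-1]
    # drop only the trailing "-<suffix>"; hyphens inside the name are kept
    return name_with_suffix.rsplit("-", 1)[0]


arn = "arn:aws:secretsmanager:us-east-1:123456789012:secret:my_path/os-creds-a1B2c3"
print(secret_name_from_arn(arn))  # my_path/os-creds
```

This gives the same result as the `"-".join(...split('-')[:-1])` expression in the notebook. Note that Secrets Manager's `GetSecretValue` also accepts the full ARN as `SecretId`, so the parsing is only needed where a secret name is expected.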

In [None]:
logger.info(f"embeddings_model_endpoint_name={embeddings_model_endpoint_name},\nopensearch_domain_endpoint={opensearch_domain_endpoint},\n"
            f"os_creds_secretid_in_secrets_manager={os_creds_secretid_in_secrets_manager},opensearch_index={opensearch_index}")

In [None]:
sagemaker_session = Session()
aws_role = sagemaker_session.get_caller_identity_arn()
aws_region = boto3.Session().region_name
bucket = sagemaker_session.default_bucket()
logger.info(f"aws_role={aws_role}, aws_region={aws_region}, bucket={bucket}")

---

## Step 2: Data preparation

By running [data_preparation.ipynb](data_preparation.ipynb), we extract transcripts from the video files.

---

## Load the data in a [FAISS](https://github.com/facebookresearch/faiss) index (Local mode)

We now create a FAISS index to store the embeddings. FAISS is an in-memory alternative to OpenSearch for storing embeddings. We write the FAISS index locally and then upload the files to an S3 bucket. A Lambda function can then download these files from S3 and load the FAISS index into memory to perform a similarity search.
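Conceptually, a flat (exact) FAISS index answers a query by comparing the query embedding with every stored embedding and returning the closest ones. The pure-Python sketch below shows that brute-force L2 search on toy vectors. It only illustrates the idea and is not the FAISS API; the function name is hypothetical. To our knowledge LangChain's `FAISS.from_documents` builds a flat L2 index by default, but check the version you use.

```python
import heapq
import math
from typing import List, Sequence, Tuple


def exact_l2_search(index: Sequence[Sequence[float]], query: Sequence[float],
                    k: int) -> List[Tuple[int, float]]:
    """Return (position, L2 distance) for the k stored vectors closest to query."""
    distances = (
        (i, math.dist(vector, query))  # Euclidean distance (Python 3.8+)
        for i, vector in enumerate(index)
    )
    # nsmallest keeps only the k best candidates, ordered by distance
    return heapq.nsmallest(k, distances, key=lambda pair: pair[1])


stored = [[0.0, 0.0], [1.0, 0.0], [5.0, 5.0]]
for position, distance in exact_l2_search(stored, [0.9, 0.0], k=2):
    print(position, round(distance, 3))
```

The ranking is the same whether plain or squared L2 distance is used, so an exact index may report either; approximate indexes trade some of this exactness for speed on large collections.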

In [None]:
%store -r s3_output_transcript

DATA_DIR = 'data'

!aws s3 cp --recursive {s3_output_transcript} {DATA_DIR}
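The `!aws s3 cp` cell above requires the AWS CLI. If you prefer to stay in Python, the same recursive download can be done with boto3. The sketch below assumes `s3_output_transcript` is an `s3://bucket/prefix` URI; the helper names are hypothetical, and the S3 client is passed in (for example `boto3.client("s3")`) so the logic can be exercised without AWS.

```python
import os
from typing import Tuple


def parse_s3_uri(s3_uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/some/prefix' into ('bucket', 'some/prefix')."""
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {s3_uri}")
    bucket, _, prefix = s3_uri[len("s3://"):].partition("/")
    return bucket, prefix


def download_prefix(s3_client, s3_uri: str, local_dir: str) -> int:
    """Download every object under the prefix into local_dir; return the count."""
    bucket, prefix = parse_s3_uri(s3_uri)
    count = 0
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):  # skip "folder" placeholder objects
                continue
            # keep the path relative to the prefix, like `aws s3 cp --recursive`
            relative = key[len(prefix):].lstrip("/")
            target = os.path.join(local_dir, relative)
            os.makedirs(os.path.dirname(target) or ".", exist_ok=True)
            s3_client.download_file(bucket, key, target)
            count += 1
    return count
```

Usage in this notebook would look like `download_prefix(boto3.client("s3"), s3_output_transcript, DATA_DIR)`.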

In [None]:
from langchain.vectorstores import FAISS
from data_ingestion.sm_helper import create_sagemaker_embeddings_from_js_model
import glob

embeddings = create_sagemaker_embeddings_from_js_model(embeddings_model_endpoint_name, aws_region)

# read all the docs; each transcript file is kept as a single chunk (no further splitting)
st = time.time() 
logger.info('Loading documents ...')

file_list = glob.glob(DATA_DIR + "/*.txt")
docs = []
for file_path in file_list:
    loader = TextLoader(file_path)
    doc = loader.load()[0]
    doc.metadata['timestamp'] = time.time()
    doc.metadata['embeddings_model'] = embeddings_model_endpoint_name
    docs.append(doc)

chunks = docs 

et = time.time() - st
logger.info(f'Time taken: {et} seconds. {len(chunks)} chunks generated') 
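The cell above keeps every transcript file as one chunk, so `CHUNK_SIZE_FOR_DOC_SPLIT` and `CHUNK_OVERLAP_FOR_DOC_SPLIT` are not applied in local mode. If your transcripts are long, the imported `RecursiveCharacterTextSplitter` can split them using those constants. To show what the two parameters mean, here is a deliberately simplified fixed-window splitter. It is not LangChain's recursive algorithm (which prefers to break on paragraph, line and word boundaries), and the function name is hypothetical.

```python
from typing import List


def split_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Cut text into windows of chunk_size characters; consecutive windows
    share chunk_overlap characters (simplified, hypothetical splitter)."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap  # each window starts this far after the previous one
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):  # the last window reached the end
            break
    return chunks


print(split_with_overlap("abcdefghij", chunk_size=4, chunk_overlap=1))
# ['abcd', 'defg', 'ghij']
```

With the notebook's values (600 and 20), a 1,000-character transcript would yield two chunks: characters 0 to 599 and 580 to 999. The overlap keeps a sentence that straddles a boundary partly visible in both chunks.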

Load the chunks into FAISS. We provide the embeddings object so that LangChain can first convert the text chunks into embeddings and then store those embeddings in a FAISS index.

In [None]:
vector_db = FAISS.from_documents(chunks, embeddings)

Save to a local directory and upload to S3.

In [None]:
vector_db_path = FAISS_INDEX_DIR
vector_db.save_local(vector_db_path)

In [None]:
# upload the FAISS index files to S3 so that they can later be downloaded, e.g. by a Lambda function
!aws s3 cp --recursive $vector_db_path/ s3://$bucket/$app_name/$vector_db_path
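The upload can also be done from Python with boto3 instead of the AWS CLI. A minimal sketch, assuming the S3 client is passed in (for example `boto3.client("s3")`); the helper name is hypothetical. It mirrors `aws s3 cp --recursive` by keeping each file's path relative to the index directory under the given prefix. For LangChain's FAISS wrapper, the saved files are typically `index.faiss` and `index.pkl`.

```python
import os
from typing import List


def upload_dir(s3_client, local_dir: str, bucket: str, prefix: str) -> List[str]:
    """Upload every file under local_dir to s3://bucket/prefix/...; return the keys."""
    keys = []
    for root, _dirs, files in os.walk(local_dir):
        for name in sorted(files):
            path = os.path.join(root, name)
            # S3 keys always use '/', whatever the local path separator is
            relative = os.path.relpath(path, local_dir).replace(os.sep, "/")
            key = f"{prefix.rstrip('/')}/{relative}"
            s3_client.upload_file(path, bucket, key)
            keys.append(key)
    return keys
```

In this notebook the call would be `upload_dir(boto3.client("s3"), vector_db_path, bucket, f"{app_name}/{vector_db_path}")`.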

---

## Load the data into an `OpenSearch` index via a SageMaker Processing Job (Distributed mode)

We now have a working script that can ingest data into an OpenSearch index. To make this work for massive amounts of data, we need to scale up the processing by running this code in a distributed fashion. We do this using a SageMaker Processing Job, which involves the following steps:

1. Create a custom container in which we install the `langchain` and `opensearch-py` packages, and then upload this container image to Amazon Elastic Container Registry (ECR).
2. Use the SageMaker `ScriptProcessor` class to create a SageMaker Processing Job that runs on multiple nodes.
    - The data files in S3 are automatically distributed across the SageMaker Processing Job instances by setting `s3_data_distribution_type='ShardedByS3Key'` in the `ProcessingInput` provided to the processing job.
    - Each node processes a subset of the files, which reduces the overall time required to ingest the data into OpenSearch.
    - Each node also uses Python `multiprocessing` to parallelize the file processing internally. Thus, **there are two levels of parallelization: one at the cluster level, where individual nodes distribute the work (files) among themselves, and another at the node level, where the files on a node are split between multiple processes running on that node**.
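The two levels of parallelism described above can be sketched in plain Python. SageMaker does the cluster-level split for you with `ShardedByS3Key` (each instance receives roughly 1/N of the S3 objects); the round-robin `shard` function below only imitates that so the idea can be tried locally. The node-level split uses a `multiprocessing.Pool`. All function names are hypothetical, and `ingest_files` is a placeholder for the real load, split, embed and index work done by `load_data_into_opensearch.py`.

```python
import multiprocessing as mp
from typing import List


def shard(items: List[str], num_shards: int) -> List[List[str]]:
    """Round-robin partition: shard i gets items i, i+n, i+2n, ..."""
    return [items[i::num_shards] for i in range(num_shards)]


def ingest_files(file_paths: List[str]) -> int:
    """Hypothetical placeholder worker: the real script would load, split,
    embed and index each file here. This sketch only counts the files."""
    return len(file_paths)


def ingest_on_node(node_files: List[str], process_count: int) -> int:
    """Node-level parallelism: spread this node's files over worker processes."""
    groups = [g for g in shard(node_files, process_count) if g]
    with mp.Pool(processes=process_count) as pool:
        return sum(pool.map(ingest_files, groups))
```

For example, `shard([f"transcript_{i}.txt" for i in range(10)], 2)` gives each of two instances five files, and `ingest_on_node(node_files, process_count=2)` would then spread one instance's five files over two processes (three and two files). When running this as a script, call it under an `if __name__ == "__main__":` guard, as `multiprocessing` requires on platforms that spawn new processes.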

### Create custom container

We will now create a container locally and push the container image to ECR. **The container creation process takes about 1 minute**.

1. The container includes all the Python packages we need, i.e. `langchain`, `opensearch-py`, `sagemaker` and `beautifulsoup4`.
1. The container also includes the `credentials.py` script for retrieving credentials from Secrets Manager and `sm_helper.py` for helping to create SageMaker endpoint classes that langchain uses.
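The `credentials.py` helper is not shown in this notebook. A minimal sketch of what such a helper could look like, assuming the secret is stored as a JSON string with `username` and `password` keys (the keys the similarity-search step reads from the returned dict). The function name is hypothetical; the Secrets Manager client is passed in, for example `boto3.client("secretsmanager", region_name=aws_region)`.

```python
import json
from typing import Dict


def get_credentials_from_client(secrets_client, secret_id: str) -> Dict[str, str]:
    """Fetch a secret and parse it as JSON (hypothetical stand-in for credentials.py)."""
    # GetSecretValue accepts either the secret name or its full ARN as SecretId
    response = secrets_client.get_secret_value(SecretId=secret_id)
    secret = json.loads(response["SecretString"])
    # fail early with a clear message if the expected keys are missing
    missing = {"username", "password"} - set(secret)
    if missing:
        raise KeyError(f"secret {secret_id} is missing keys: {sorted(missing)}")
    return secret
```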

In [None]:
# Run a script to build the custom Docker container image and push it to ECR.
# Look up the AWS account ID, which is part of the ECR image URI.
client = boto3.client("sts")
account_id = client.get_caller_identity()["Account"]
logger.info(f"region={aws_region}, account_id={account_id}")
!bash scripts/build_and_push.sh $(pwd)/container $IMAGE $IMAGE_TAG $aws_region

### Create and run the SageMaker Processing Job

Now we run the SageMaker Processing Job to ingest the data into OpenSearch.

In [None]:
# setup the parameters for the job
base_job_name = f"{app_name}-esfaq-job"
tags = [{"Key": "data", "Value": "embeddings-for-llm-apps"}]

# use the custom container we just created
image_uri = f"{account_id}.dkr.ecr.{aws_region}.amazonaws.com/{IMAGE}:{IMAGE_TAG}"

# instance type and count determined via trial and error: how much overall processing time
# and what compute cost works best for your use-case
instance_type = "ml.m5.xlarge"
instance_count = 1
logger.info(f"base_job_name={base_job_name}, tags={tags}, image_uri={image_uri}, instance_type={instance_type}, instance_count={instance_count}")

# setup the ScriptProcessor with the above parameters
processor = ScriptProcessor(base_job_name=base_job_name,
                            image_uri=image_uri,
                            role=aws_role,
                            instance_type=instance_type,
                            instance_count=instance_count,
                            command=["python3"],
                            tags=tags)

# set up input from S3; ShardedByS3Key gives each instance
# approximately 1/N of the files under the S3 prefix (N = instance_count).
inputs = [ProcessingInput(source=s3_output_transcript,
                          destination='/opt/ml/processing/input_data',
                          s3_data_distribution_type='ShardedByS3Key',
                          s3_data_type='S3Prefix')]


logger.info(f"creating an opensearch index with name={opensearch_index}")
# ready to run the processing job
st = time.time()
processor.run(code="data_ingestion/load_data_into_opensearch.py",
              inputs=inputs,
              outputs=[],
              arguments=["--opensearch-cluster-domain", opensearch_domain_endpoint,
                         "--opensearch-secretid", os_creds_secretid_in_secrets_manager,
                         "--opensearch-index-name", opensearch_index,
                         "--aws-region", aws_region,
                         "--embeddings-model-endpoint-name", embeddings_model_endpoint_name,
                         "--chunk-size-for-doc-split", str(CHUNK_SIZE_FOR_DOC_SPLIT),
                         "--chunk-overlap-for-doc-split", str(CHUNK_OVERLAP_FOR_DOC_SPLIT),
                         "--input-data-dir", "/opt/ml/processing/input_data",
                         "--create-index-hint-file", CREATE_OS_INDEX_HINT_FILE,
                         "--process-count", "2"])
time_taken = time.time() - st
logger.info(f"processing job completed, total time taken={time_taken}s")
preprocessing_job_description = processor.jobs[-1].describe()
logger.info(preprocessing_job_description)
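`processor.run` passes the `arguments` list to `load_data_into_opensearch.py` as command-line flags. The script itself is not shown here; the sketch below is a hypothetical `argparse` parser that accepts exactly the flags used above, which can help when adding a flag or spotting a typo in the argument list. The real script's option types and validation may differ.

```python
import argparse
from typing import List, Optional


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the flags passed by processor.run above."""
    parser = argparse.ArgumentParser(description="Ingest documents into OpenSearch")
    parser.add_argument("--opensearch-cluster-domain", required=True)
    parser.add_argument("--opensearch-secretid", required=True)
    parser.add_argument("--opensearch-index-name", required=True)
    parser.add_argument("--aws-region", required=True)
    parser.add_argument("--embeddings-model-endpoint-name", required=True)
    parser.add_argument("--chunk-size-for-doc-split", type=int, required=True)
    parser.add_argument("--chunk-overlap-for-doc-split", type=int, required=True)
    parser.add_argument("--input-data-dir", required=True)
    parser.add_argument("--create-index-hint-file", required=True)
    parser.add_argument("--process-count", type=int, required=True)
    return parser


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    # argparse maps "--chunk-size-for-doc-split" to args.chunk_size_for_doc_split
    return build_parser().parse_args(argv)
```

Every value in the job's `arguments` list must be a string, which is why the notebook wraps the chunk constants in `str(...)`; on the receiving side, `type=int` converts them back.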

## Step 4: Do a similarity search for user input against the documents (embeddings) in OpenSearch

In [None]:
from data_ingestion.credentials import get_credentials
from data_ingestion.sm_helper import create_sagemaker_embeddings_from_js_model

creds = get_credentials(os_creds_secretid_in_secrets_manager, aws_region)
http_auth = (creds['username'], creds['password'])
docsearch = OpenSearchVectorSearch(index_name=opensearch_index,
                                   embedding_function=create_sagemaker_embeddings_from_js_model(embeddings_model_endpoint_name,
                                                                                                aws_region),
                                   opensearch_url=opensearch_domain_endpoint,
                                   http_auth=http_auth)
q = "what are the best practices for the sagemaker notebook?"
docs = docsearch.similarity_search(q, k=3) #, search_type="script_scoring", space_type="cosinesimil"
for doc in docs:
    logger.info("----------")
    logger.info(f"content=\"{doc.page_content}\",\nmetadata=\"{doc.metadata}\"")
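Under the hood, `similarity_search` embeds the question and sends an OpenSearch k-NN query. A minimal sketch of such a query body, assuming the index stores embeddings in a `knn_vector` field named `vector_field` (LangChain's default field name, to our knowledge; check your index mapping). The helper name is hypothetical; the body follows the OpenSearch k-NN plugin's approximate `knn` query syntax.

```python
from typing import Any, Dict, List


def build_knn_query(query_vector: List[float], k: int = 3,
                    vector_field: str = "vector_field") -> Dict[str, Any]:
    """Build an approximate k-NN search body for the OpenSearch k-NN plugin."""
    return {
        "size": k,  # number of hits returned to the caller
        "query": {
            "knn": {
                vector_field: {
                    "vector": query_vector,  # the embedded user question
                    "k": k,  # number of nearest neighbours to retrieve
                }
            }
        },
    }
```

With an `opensearchpy.OpenSearch` client you could then run `client.search(index=opensearch_index, body=build_knn_query(embeddings.embed_query(q)))`. The `script_scoring` option commented out in the cell above instead performs exact (brute-force) scoring, which is slower on large indexes but not approximate.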
    

In [None]:
opensearch_domain_endpoint

---

## Cleanup

To avoid incurring future charges, delete the resources. You can do this by deleting the CloudFormation stack created from the template, which removes the resources it created, such as the IAM role and the SageMaker notebook.
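Besides the CloudFormation stack, this notebook creates a few resources of its own: the FAISS index files uploaded under `s3://{bucket}/{app_name}/{FAISS_INDEX_DIR}` and the container image pushed to ECR. A minimal cleanup sketch, assuming `build_and_push.sh` created an ECR repository named after `IMAGE` (check the script before running this). The boto3 clients are passed in and the function name is hypothetical; `force=True` deletes the repository even if it still contains images.

```python
from typing import List


def cleanup(s3_client, ecr_client, cfn_client, bucket: str, s3_prefix: str,
            ecr_repository: str, stack_name: str) -> List[str]:
    """Delete the S3 objects, ECR repository and CloudFormation stack; return deleted keys."""
    deleted = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=s3_prefix):
        objects = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if objects:
            # list_objects_v2 pages hold at most 1000 keys, the delete_objects limit
            s3_client.delete_objects(Bucket=bucket, Delete={"Objects": objects})
            deleted.extend(o["Key"] for o in objects)
    ecr_client.delete_repository(repositoryName=ecr_repository, force=True)
    # stack deletion is asynchronous; it removes the resources the stack created
    cfn_client.delete_stack(StackName=stack_name)
    return deleted
```

For this notebook the call would be `cleanup(boto3.client("s3"), boto3.client("ecr"), boto3.client("cloudformation"), bucket, f"{app_name}/{FAISS_INDEX_DIR}/", IMAGE, CFN_STACK_NAME)`; the trailing slash keeps the prefix from matching similarly named folders.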
