# Building a RAG chatbot with LangChain, Hugging Face, Amazon SageMaker and Amazon OpenSearch Serverless

In [1]:
import boto3, json, sagemaker

from sagemaker.huggingface import HuggingFaceModel, get_huggingface_llm_image_uri
from transformers import AutoConfig
from typing import Dict

from opensearchpy import RequestsHttpConnection, AWSV4SignerAuth

from langchain import LLMChain
from langchain.chains import RetrievalQA
from langchain.document_loaders import HuggingFaceDatasetLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import SagemakerEndpoint
from langchain.llms.sagemaker_endpoint import LLMContentHandler
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import OpenSearchVectorSearch

sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /Users/rich/Library/Application Support/sagemaker/config.yaml


  from .autonotebook import tqdm as notebook_tqdm
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


## Deploy our LLM on a SageMaker Endpoint

In [2]:
# role = sagemaker.get_execution_role()
role = 'arn:aws:iam::070576557102:role/service-role/AmazonSageMaker-ExecutionRole-20240512T164029'

hub = {
	'HF_MODEL_ID':'taide/Llama3-TAIDE-LX-8B-Chat-Alpha1-4bit',
	'SM_NUM_GPUS': '1'
}

huggingface_model = HuggingFaceModel(
	image_uri=get_huggingface_llm_image_uri("huggingface",version="1.1.0"),
	env=hub,
	role=role 
)

predictor = huggingface_model.deploy(
	initial_instance_count=1,
	instance_type="ml.g5.2xlarge",
	container_startup_health_check_timeout=300,
    wait=False,
  )

## Configure the LangChain input and output handlers for our LLM

In [3]:
model_kwargs = {"max_new_tokens": 512, "top_p": 0.8, "temperature": 0.8}

class ContentHandler(LLMContentHandler):
    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, prompt: str, model_kwargs: Dict) -> bytes:
        input_str = json.dumps(
            # Mistral prompt, see https://huggingface.co/mistralai/Mistral-7B-Instruct-v0.1
            {"inputs": f"<s>[INST] {prompt} [/INST]", "parameters": {**model_kwargs}}
        )
        return input_str.encode("utf-8")

    def transform_output(self, output: bytes) -> str:
        response_json = json.loads(output.read().decode("utf-8"))
        splits = response_json[0]["generated_text"].split("[/INST] ")
        return splits[1]

content_handler = ContentHandler()

In [4]:
sm_client = boto3.client('sagemaker') # needed later to check that endpoint is up
smrt_client = boto3.client("sagemaker-runtime") # needed for AWS credentials

llm = SagemakerEndpoint(
    endpoint_name=predictor.endpoint_name,
    model_kwargs=model_kwargs,
    content_handler=content_handler,
    client=smrt_client,
)

## Load the Reuters news dataset from the Hugging Face hub

## Configure our embedding model

In [5]:
# import json
# import logging
# import boto3
# from botocore.exceptions import ClientError
# from langchain_community.document_loaders import PyPDFDirectoryLoader

# logger = logging.getLogger(__name__)
# logging.basicConfig(level=logging.INFO)

# def generate_text_embeddings(model_id, docs):
#     """
#     Generate text embeddings by using the Cohere Embed model.
#     Args:
#         model_id (str): The model ID to use.
#         docs (list): A list of Document objects to generate embeddings for.
#     Returns:
#         dict: The response from the model.
#     """
#     logger.info("Generating text embeddings with the Cohere Embed model %s", model_id)
#     accept = '*/*'
#     content_type = 'application/json'
#     bedrock = boto3.client(service_name='bedrock-runtime')
#     texts = [doc.page_content for doc in docs]  
#     body = json.dumps({"texts": texts, "input_type": "search_document"})
#     response = bedrock.invoke_model(
#         body=body,
#         modelId=model_id,
#         accept=accept,
#         contentType=content_type
#     )
#     logger.info("Successfully generated text embeddings with Cohere model %s", model_id)
#     return response


In [6]:
# logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
# model_id = 'cohere.embed-english-v3'

# # Load documents from PDF files
# loader = PyPDFDirectoryLoader("/Users/rich/Desktop/gen-ai-hackton/data")
# docs = loader.load()

# try:
#     response = generate_text_embeddings(model_id=model_id, docs=docs)
#     response_body = json.loads(response.get('body').read())
#     print(f"ID: {response_body.get('id')}")
#     print(f"Response type: {response_body.get('response_type')}")
#     print("Embeddings")
#     for i, embedding in enumerate(response_body.get('embeddings')):
#         print(f"\tEmbedding {i}")
#         print(*embedding)
#     print("Texts")
#     for i, text in enumerate(response_body.get('texts')):
#         print(f"\tText {i}: {text}")
# except ClientError as err:
#     message = err.response["Error"]["Message"]
#     logger.error("A client error occurred: %s", message)
#     print("A client error occurred: " + format(message))
# else:
#     print(f"Finished generating text embeddings with Cohere model {model_id}.")

In [7]:
import json
import logging
import boto3
from botocore.exceptions import ClientError
from langchain_community.document_loaders import PyPDFLoader
from opensearchpy import OpenSearch, RequestsHttpConnection

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

def generate_text_embeddings(model_id, docs):
    """Generate text embeddings by using the Cohere Embed model."""
    logger.info("Generating text embeddings with the Cohere Embed model %s", model_id)
    accept = '*/*'
    content_type = 'application/json'
    bedrock = boto3.client(service_name='bedrock-runtime')
    texts = [doc.page_content for doc in docs]
    
    # Split texts into batches of 128
    batched_texts = [texts[i:i+128] for i in range(0, len(texts), 128)]
    
    all_embeddings = []
    for batch in batched_texts:
        body = json.dumps({"texts": batch, "input_type": "search_document"})
        response = bedrock.invoke_model(
            body=body, modelId=model_id, accept=accept, contentType=content_type
        )
        response_body = json.loads(response.get('body').read())
        all_embeddings.extend(response_body.get('embeddings'))
    
    logger.info("Successfully generated text embeddings with Cohere model %s", model_id)
    return all_embeddings

In [8]:
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
model_id = 'cohere.embed-english-v3'

# Load a single PDF file
loader = PyPDFLoader("/Users/rich/Desktop/gen-ai-hackton/data/6nrhqkvp47vz7hnrzyl7oy7pjdmv.pdf")
docs = loader.load()

In [10]:
host = 'vpc-mydomain-zzliggvd46i3slgldu6mk55gmu.ap-northeast-1.es.amazonaws.com'
index_name = 'index'
region = 'ap-northeast-1'
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region, "aoss")

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials


In [16]:
import boto3

# 創建 OpenSearch 客戶端
session = boto3.Session()
client = session.client(
    "opensearch",
)

try:
    # 生成文檔嵌入向量
    embeddings = generate_text_embeddings(model_id=model_id, docs=docs)

    # 按批次處理文檔
    docs_100 = [docs[x:x+100] for x in range(0, len(docs), 100)]
    for doc_batch in docs_100:
        # 構建要索引的文檔
        documents = []
        for doc, embedding in zip(doc_batch, embeddings):
            document = {
                "id": doc.metadata.get("id", ""),
                "text": doc.page_content,
                "embedding": embedding,
                # 添加其他元數據字段...
            }
            documents.append(document)

        # 使用 boto3 客戶端將文檔索引到 OpenSearch
        response = client.bulk(
            body=documents,
            index_name=index_name,
            domain_name=domain_name,
        )

        print(f"Created vector search index for {len(doc_batch)} documents.")
except ClientError as err:
    message = err.response["Error"]["Message"]
    logger.error("A client error occurred: %s", message)
    print("A client error occurred: " + format(message))

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:__main__:Generating text embeddings with the Cohere Embed model cohere.embed-english-v3
INFO:__main__:Successfully generated text embeddings with Cohere model cohere.embed-english-v3


AttributeError: 'OpenSearchService' object has no attribute 'bulk'

## Define credentials for Amazon OpenSearch Serverless

## Embed and index chunks

In [None]:
docs_100 = [docs[x:x+100] for x in range(0, len(docs), 100)]

In [None]:
%%time

for docs in docs_100:
    oss = OpenSearchVectorSearch.from_documents(
        docs,
        embeddings=model_id,
        opensearch_url=f'https://{host}:443',
        http_auth=auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        index_name=index_name,
        timeout=60,
    )
    print(".", end="")

## Configure RAG chain

In [None]:
retriever = oss.as_retriever(search_kwargs={"k": 10})

In [None]:
# Define prompt template

prompt_template = """
As a helpful news agent, please answer the question using only the context below.
If you don't know, say you don't know.
Cite the title of the articles you used to build your answer.

question: {question}

context: {context}
"""

prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"])

In [None]:
chain = RetrievalQA.from_chain_type(
    llm=llm, 
    chain_type="stuff",
    retriever=retriever, 
    chain_type_kwargs = {"prompt": prompt})

In [None]:
# Make sure that our LLM has been deployed

waiter = sm_client.get_waiter('endpoint_in_service')
waiter.wait(EndpointName=predictor.endpoint_name)

## Ask a question

In [None]:
question = "What are the worst storms in recent news?"
answer = chain.run({"query": question})
print(answer)

In [None]:
predictor.delete_model()
predictor.delete_endpoint()