# RAG 핸즈온 (API Gateway 사용 시)

## Step 1. Prepare large language model (LLM) and embedding model 
---

In [None]:
# Base URL of your API Gateway stage, for example:
#   "https://1b2lfnpk5b.execute-api.us-east-1.amazonaws.com/api/"
URL = "[YOUR-URL]"
# Strip any trailing slash so the joined routes never contain "//".
LLM_URL = f"{URL.rstrip('/')}/llm/llama2_13b"
EMB_URL = f"{URL.rstrip('/')}/emb/gptj_6b"
# JSON request/response headers shared by every HTTP call in this notebook.
HEADERS = {
    'Content-Type': 'application/json',
    'Accept': 'application/json',
}

In [None]:
!pip install --upgrade pip --quiet --disable-pip-version-check --root-user-action=ignore

In [None]:
!pip install -qU nltk faiss-cpu langchain wikipedia pdfplumber

In [None]:
# Ignore the dependency-conflict error that pip may print for this install.
!pip install unstructured --quiet --disable-pip-version-check --root-user-action=ignore

In [None]:
import nltk
nltk.download('punkt')
import time
import sagemaker, boto3, json
import glob
import os
import pandas as pd
import requests
import json
from sagemaker.session import Session
from sagemaker.model import Model
from sagemaker import image_uris, model_uris, script_uris, hyperparameters
from sagemaker.predictor import Predictor
from sagemaker.utils import name_from_base
from typing import Any, Dict, List, Optional

# AWS context for the SageMaker SDK: region, session and caller role.
# NOTE(review): none of these three names is referenced again in the visible
# notebook, which talks to the models through API Gateway instead.
aws_region = boto3.Session().region_name
sagemaker_session = Session()
aws_role = sagemaker_session.get_caller_identity_arn()

<br>

## Step 2. Ask a question to LLM without RAG
---


In [None]:
def query_endpoint_with_json_payload(encoded_json, endpoint_name, content_type="application/json"):
    """Invoke a SageMaker runtime endpoint with an already-encoded JSON body.

    Args:
        encoded_json: Request body passed through unchanged as ``Body``.
        endpoint_name: Name of the SageMaker endpoint to invoke.
        content_type: MIME type sent as ``ContentType``.

    Returns:
        The raw ``invoke_endpoint`` response dict.
    """
    runtime = boto3.client("runtime.sagemaker")
    request = dict(
        EndpointName=endpoint_name,
        ContentType=content_type,
        Body=encoded_json,
    )
    return runtime.invoke_endpoint(**request)


def parse_response_model_flan_t5(query_response):
    """Return the ``generated_texts`` field of an ``invoke_endpoint``-style response.

    ``query_response["Body"]`` must be a readable stream containing a JSON object.
    """
    payload_bytes = query_response["Body"].read()
    return json.loads(payload_bytes)["generated_texts"]


def parse_response_multiple_texts_bloomz(query_response):
    """Collect ``generated_text`` from every candidate in the first prediction list.

    ``query_response["Body"]`` must be a readable stream whose JSON is a list
    whose first element is a list of ``{"generated_text": ...}`` objects.
    """
    predictions = json.loads(query_response["Body"].read())
    return [candidate["generated_text"] for candidate in predictions[0]]

from typing import Dict, Any
from langchain.llms import AmazonAPIGateway
from langchain.agents import load_tools
from langchain.agents import initialize_agent
from langchain.agents import AgentType

class Llama2ContentHandlerAmazonAPIGateway:
    """Translate between LangChain calls and the Llama 2 API Gateway JSON format.

    ``transform_input`` builds the request body from a prompt plus generation
    parameters; ``transform_output`` pulls the generated text out of the HTTP
    response.
    """

    @classmethod
    def transform_input(
        cls, prompt: str, model_kwargs: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Wrap ``prompt`` and the generation ``model_kwargs`` into a request body."""
        body: Dict[str, Any] = dict(inputs=prompt, parameters=model_kwargs)
        return body

    @classmethod
    def transform_output(cls, response: Any) -> str:
        """Return the ``generation`` field of the first element of the JSON response."""
        first_result = response.json()[0]
        return first_result["generation"]

# LangChain LLM wrapper that POSTs prompts to the Llama 2 route of the API Gateway.
llm = AmazonAPIGateway(api_url=LLM_URL, headers=HEADERS)
# Swap in the Llama 2 request/response adapter defined above.
# NOTE(review): it is assigned after construction rather than passed to the
# constructor — presumably because this class does not subclass LangChain's
# ContentHandlerAmazonAPIGateway and would fail pydantic field validation;
# confirm before moving it into the constructor call.
llm.content_handler = Llama2ContentHandlerAmazonAPIGateway()

### Without providing the context
- 컨텍스트 없이 질의응답 수행 (모델 환각 확인) 

In [None]:
question = "Which instances can I use with Managed Spot Training in Amazon SageMaker?"

# Generation settings forwarded to the Llama 2 endpoint; later cells reuse them.
parameters = {
    'max_new_tokens': 64,
    'top_p': 0.9,
    'temperature': 0.6,
    'return_full_text': False
}

payload = {
    'inputs': question,
    'parameters': parameters
}

# Bound the wait: without a timeout, requests can block indefinitely when the
# endpoint never answers. The raw JSON (including any error body) is printed
# so failures stay visible in this exploratory cell.
response = requests.post(url=LLM_URL, headers=HEADERS, json=payload, timeout=60)
print(json.dumps(response.json(), indent=2))

### With Context
- 추가 컨텍스트 or few-shot 제공

In [None]:
context = """Managed Spot Training can be used with all instances supported in Amazon SageMaker. 
Managed Spot Training is supported in all AWS Regions where Amazon SageMaker is currently available."""

prompt = """Answer based on context:\n\n{context}\n\n{question}"""

# str.format fills both placeholders in a single pass, so a literal
# "{question}" appearing inside the context text is not substituted
# (chained str.replace calls would substitute it).
text_input = prompt.format(context=context, question=question)

payload = {"inputs": text_input, "parameters": parameters}
response = requests.post(url=LLM_URL, headers=HEADERS, json=payload, timeout=60)
# Fail with a clear HTTP error instead of a KeyError/TypeError when the
# gateway returns an error payload rather than a list of generations.
response.raise_for_status()
print(response.json()[0]["generation"])

### Apply LangChain


In [None]:
# Reuse the generation settings from the raw HTTP calls above.
llm.model_kwargs = parameters
prompt = "What is Amazon SageMaker's advantages for Data Scientists? Please summarize in 100 words"
# Send the context-augmented prompt through LangChain so the answer can be
# compared with the direct request above; call llm(prompt) instead to try the
# open-ended question defined on the previous line.
result = llm(text_input)

print(result)

<br>

## Step 3. Use RAG based approach with [LangChain](https://python.langchain.com/en/latest/index.html) 
---

In [None]:
import requests
from typing import Any, Dict, List
from pydantic import BaseModel, root_validator
from langchain.embeddings.base import Embeddings

# this is an example handler
class ContentHandlerEmbeddingAmazonAPIGateway:
    """Adapter between LangChain and the embedding model behind API Gateway.

    ``transform_input`` builds the JSON request body for a batch of texts and
    ``transform_output`` extracts the embedding vectors from the HTTP response.
    """

    @classmethod
    def transform_input(
        cls, prompt: List[str], model_kwargs: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build the request body for a batch of texts.

        Args:
            prompt: Texts to embed (``EmbeddingAmazonApiGateway`` passes a list).
            model_kwargs: Accepted for interface symmetry; not sent to the endpoint.

        Returns:
            ``{"text_inputs": prompt}``.
        """
        return {"text_inputs": prompt}

    @classmethod
    def transform_output(cls, response: Any) -> List[List[float]]:
        """Return the ``embedding`` field of the JSON response.

        Callers treat it as one embedding vector per input text.
        """
        return response.json()["embedding"]
        

class EmbeddingAmazonApiGateway(BaseModel, Embeddings):
    """LangChain ``Embeddings`` backed by an embedding model exposed through Amazon API Gateway.

    Texts are POSTed as JSON to ``api_url`` with ``requests``; building the
    request body and decoding the response are delegated to ``content_handler``.
    """

    api_url: str
    """API Gateway URL"""

    headers: Optional[Dict] = None
    """API Gateway HTTP Headers to send, e.g. for authentication"""

    model_kwargs: Optional[Dict] = None
    """Key word arguments to pass to the model."""

    content_handler: ContentHandlerEmbeddingAmazonAPIGateway = ContentHandlerEmbeddingAmazonAPIGateway()
    """The content handler class that provides input and
    output transform functions to handle formats between the
    embedding model and the endpoint.
    """

    # Seconds to wait for the endpoint (forwarded to requests.post). None keeps
    # the previous behaviour of waiting without a limit.
    timeout: Optional[float] = None

    # skip_on_failure is an argument of root_validator, not a Config option:
    # placed in Config it had no effect, and pydantic 2's compatibility API
    # refuses a post (pre=False) root_validator that does not set it.
    @root_validator(skip_on_failure=True)
    def validate_environment(cls, values: Dict) -> Dict:
        """Default ``headers`` to JSON content-type/accept headers when not supplied."""
        if values.get("headers") is None:
            values["headers"] = {
                'Content-Type': 'application/json',
                'Accept': 'application/json'
            }
        return values

    class Config:
        """Configuration for this pydantic object."""
        # Required because content_handler is a plain (non-pydantic) class instance.
        arbitrary_types_allowed = True

    def _embedding_func(self, texts: List[str]) -> List[List[float]]:
        """POST one batch of texts to the API Gateway embedding route.

        Args:
            texts: Batch of texts to embed.

        Returns:
            Whatever ``content_handler.transform_output`` extracts from the
            response, used as one embedding vector per text.

        Raises:
            ValueError: if the request fails, returns an HTTP error status, or
                the response cannot be decoded by the content handler.
        """
        # replace newlines, which can negatively affect performance.
        texts = [text.replace("\n", " ") for text in texts]

        _model_kwargs = self.model_kwargs or {}
        payload = self.content_handler.transform_input(texts, _model_kwargs)

        try:
            response = requests.post(
                self.api_url,
                headers=self.headers,
                json=payload,
                timeout=self.timeout,
            )
            # Report 4xx/5xx responses as such instead of as a KeyError raised
            # while decoding an error body.
            response.raise_for_status()
            embeddings = self.content_handler.transform_output(response)
        except Exception as error:
            raise ValueError(f"Error raised by the service: {error}") from error

        return embeddings

    def embed_documents(
        self, texts: List[str], chunk_size: int = 64
    ) -> List[List[float]]:
        """Compute document embeddings through the API Gateway embedding route.

        Args:
            texts: The list of texts to embed.
            chunk_size: Maximum number of texts sent together in one request.
                Must be at least 1.

        Returns:
            List of embeddings, one for each text (empty for empty input).

        Raises:
            ValueError: if ``chunk_size`` is smaller than 1 or a request fails.
        """
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")
        # Previously an empty list produced a range() step of 0 and crashed.
        if not texts:
            return []

        results: List[List[float]] = []
        for start in range(0, len(texts), chunk_size):
            results.extend(self._embedding_func(texts[start : start + chunk_size]))
        return results

    def embed_query(self, text: str) -> List[float]:
        """Compute the embedding of a single query through the API Gateway route.

        Args:
            text: The text to embed.

        Returns:
            Embeddings for the text.
        """
        return self._embedding_func([text])[0]
    
    

In [None]:
emb = EmbeddingAmazonApiGateway(api_url=EMB_URL)

In [None]:
prompt = "What is Amazon SageMaker's advantages for Data Scientists? Please summarize in 100 words"
result = emb.embed_query(prompt)
print(result[0:5])

In [None]:
#testing our embeddings model
print(emb.embed_documents(["Hello World"])[0][:5])

Next, we reuse the API Gateway–backed LLM wrapper created above (`langchain.llms.AmazonAPIGateway`) under the name `sm_llm`; the following cells pass it to LangChain.

In [None]:
sm_llm = llm

### Question and Answering with RAG and LangChain

LangChain uses different "components" to implement a RAG architecture:

- Document loaders: Load documents from many different sources
- Document transformers: Split documents, drop redundant documents, and more
- Text embedding models: Take unstructured text and turn it into a list of floating point numbers
- Vector stores: Store and search over embedded data
- Retrievers: Query your data

### Document Loaders

Document loader를 사용하여 원본 소스에서 데이터를 문서로 로드합니다. 문서는 텍스트와 관련 메타데이터를 의미합니다. 예를 들어 간단한 텍스트 파일을 로드하거나 웹페이지의 텍스트 콘텐츠를 로드하거나 YouTube 동영상의 스크립트를 로드하기 위한 Document loader가 있습니다. Document loader는 기본적으로 `load` 메서드를 사용하며, 상황에 따라 `lazy_load`도 사용할 수 있습니다.

pdf, html, json, txt, csv와 같은 다양한 파일 유형에 사용할 수 있는 다양한 'loader'는 물론 Slack, Twitter 등과 같은 타사 플랫폼과의 통합도 지원합니다. 전체 목록은 여기에서 확인해 주세요. https://python.langchain.com/docs/modules/data_connection/document_loaders


In [None]:
from langchain.document_loaders import TextLoader
from langchain.indexes import VectorstoreIndexCreator
from langchain.vectorstores import Chroma, AtlasDB, FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain import PromptTemplate
from langchain.chains.question_answering import load_qa_chain
from langchain.document_loaders.csv_loader import CSVLoader
from langchain.chains import RetrievalQA
from langchain.chains.question_answering import load_qa_chain

이제 예제 데이터를 다운로드합니다. Amazon SageMaker FAQ (https://aws.amazon.com/sagemaker/faqs/) 를 지식 라이브러리로 사용하겠습니다. 데이터는 질문과 답변의 두 열이 있는 CSV 파일로 구성되며, 이 중에서 답변 열만 지식 라이브러리의 문서로 사용하여 쿼리 기반으로 관련 문서를 검색합니다.

**필요에 따라 예제 데이터 세트를 여러분의 QnA 데이터 세트로 대체하여 구축할 수 있습니다.**

In [None]:
tmp_folder = "rag_data"

sagemaker_faq = "s3://jumpstart-cache-prod-us-east-2/training-datasets/Amazon_SageMaker_FAQs/"

!rm -rf $tmp_folder
!mkdir -p $tmp_folder
!aws s3 cp --recursive $sagemaker_faq rag_data

In [None]:
import os, glob
import pandas as pd
# Collect the FAQ CSV files from the download folder.
# - sorted(): glob's result order depends on the file system, so sort for a
#   deterministic concatenation order.
# - Skip processed_data.csv, which a later cell writes into the same folder;
#   otherwise re-running this cell would ingest the notebook's own output.
all_files = sorted(
    path
    for path in glob.glob(os.path.join(tmp_folder, "*.csv"))
    if os.path.basename(path) != "processed_data.csv"
)

In [None]:
# pd.concat on an empty iterable fails with the unhelpful
# "No objects to concatenate"; report the actual cause instead.
if not all_files:
    raise FileNotFoundError(
        "No FAQ CSV files were found; run the S3 download cell first."
    )

# Each FAQ file has no header row; name its two columns explicitly.
df_knowledge = pd.concat(
    [pd.read_csv(f, header=None, names=["Question", "Answer"]) for f in all_files],
    axis=0,
    ignore_index=True,
)

In [None]:
#drop the question column as we're not using it for the exercise.
df_knowledge.drop(["Question"], axis=1, inplace=True)

#saving the modified df 
df_knowledge.to_csv("rag_data/processed_data.csv", header=False, index=False)

df_knowledge.head(5)

Use langchain to read the `csv` data. There are multiple built-in functions in LangChain to read different format of files such as `txt`, `html`, and `pdf`. For details, see [LangChain document loaders](https://python.langchain.com/en/latest/modules/indexes/document_loaders.html).

In [None]:
csv_loader = CSVLoader(file_path="rag_data/processed_data.csv")

### Create the vectorstore index

VectorstoreIndexCreator 로 빠르게 구현하기
- FAISS: https://github.com/facebookresearch/faiss
- LangChain document: https://python.langchain.com/docs/modules/data_connection/vectorstores/

In [None]:
# NOTE(review): RecursiveCharacterTextSplitter tries separators in list order,
# so with " " first it splits at word boundaries, and the later separators are
# only reached for space-free pieces. Consider a coarse-to-fine order such as
# the library default ["\n\n", "\n", " ", ""]. Check how your LangChain version
# treats separators (literal vs regex) before adding ".".
faq_splitter = RecursiveCharacterTextSplitter(
    chunk_size=300,
    chunk_overlap=0,
    separators=[" ", ",", ".", "\n"],
)
index_creator = VectorstoreIndexCreator(
    vectorstore_cls=FAISS,
    embedding=emb,
    text_splitter=faq_splitter,
)

In [None]:
index = index_creator.from_loaders([csv_loader])

In [None]:
print(question)

In [None]:
print(index.query(question=question, llm=sm_llm))

<br>

## Step 4. Customize the QA application above with different prompt.
---

### Use the vectorstore index as a retriever within a RetrievalQA chain

위의 예시처럼 RAG를 매우 편리하고 빠르게 구현할 수 있지만, VectorstoreIndex는 "블랙박스"처럼 동작하여 사용 중인 프롬프트를 완전히 제어할 수 있는 옵션이 제공되지 않는다는 것을 알 수 있습니다. 이를 해결하는 한 가지 방법은 vectorstore index를 "retriever(검색기)" 객체로 래핑한 뒤, 사용자 지정 프롬프트 템플릿을 사용하는 RetrievalQA 체인에 전달하는 것입니다.

In [None]:
retriever = index.vectorstore.as_retriever()

In [None]:
from langchain.prompts import PromptTemplate
prompt_template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer.

{context}

Question: {question}

Answer:"""
PROMPT = PromptTemplate(
    template=prompt_template, input_variables=["context", "question"]
)

In [None]:
chain_type_kwargs = {"prompt": PROMPT}
qa = RetrievalQA.from_chain_type(llm=sm_llm, chain_type="stuff", retriever=retriever, chain_type_kwargs=chain_type_kwargs)

In [None]:
print(qa.run(question))

### Customise prompt and text transformers

Let's go even further and break down the above `VectorstoreIndexCreator` and see what's happening under the hood. Beside customising the default prompt, we'll also use a specific text transformer to split our documents.

To integrate with OpenSearch you'd typically break it down and manage the documents retrieval independently from the question & answering with LLM.

한 걸음 더 나아가 위의 `VectorstoreIndexCreator`를 분해하여 내부에서 어떤 일이 일어나는지 살펴봅시다. 기본 프롬프트를 사용자 정의하는 것 외에도 특정 텍스트 변환기를 사용하여 문서를 청킹(분할)할 것입니다. OpenSearch와 통합할 때는 일반적으로 이처럼 과정을 분해하여, 문서 검색을 LLM 기반 질의응답과 독립적으로 관리합니다.

In [None]:
#using the same loader
documents = csv_loader.load()

#looking into the first docs
print(documents[:3])

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Get your splitter ready
text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=5)

# Split your docs into texts
texts = text_splitter.split_documents(documents)

# generate embeddings and load that into FAISS
docsearch = FAISS.from_documents(texts, emb)

In [None]:
#feel free to change the question
print(f'question:{question}')

Based on the question above, we then **identify top K most relevant documents based on user query, where K = 3 in this setup**.

In [None]:
docs = docsearch.similarity_search(question, k=3)
print(docs)

Finally, we **combine the retrieved documents with the prompt and question and send them to the LLM (served behind API Gateway).**

We define a customized prompt as below.

In [None]:
prompt_template = """{context}\n\nGiven the above context, answer the following question:\n{question}\n\nAnswer:"""

PROMPT = PromptTemplate(template=prompt_template, input_variables=["context", "question"])

In [None]:
chain = load_qa_chain(llm=sm_llm, prompt=PROMPT)

가장 관련성이 높은 상위 3개의 문서를 LLM의 컨텍스트로 전송합니다. 특히 입력 토큰 크기가 제한되어 있기에, 상위 k개의 문서만 전달함으로써 LLM으로 전달되는 컨텍스트의 양을 제어합니다.

In [None]:
result = chain({"input_documents": docs, "question": question})[
    "output_text"
]
print(result)

<br>

## Step 5. Additional exercises
---

- https://python.langchain.com/en/latest/modules/indexes/document_loaders.html

In [None]:
from langchain.document_loaders import WikipediaLoader
from langchain.document_loaders import UnstructuredURLLoader
from langchain.document_loaders import PDFPlumberLoader

### Wikipedia as source

위키피디아에서 자료 가져오기

In [None]:
wikipedia_loader = WikipediaLoader(query="AWS", load_max_docs=2)
wikipedia_texts = wikipedia_loader.load_and_split(text_splitter=text_splitter)

### URLs as source

인터넷 웹페이지 크롤링 - Amazon Rekognition 온라인 문서 추출

In [None]:
# !pip install unstructured

In [None]:
urls = ["https://docs.aws.amazon.com/rekognition/latest/dg/labels.html", 
        "https://docs.aws.amazon.com/rekognition/latest/dg/faces.html",
       "https://docs.aws.amazon.com/rekognition/latest/dg/collections.html",
       "https://docs.aws.amazon.com/rekognition/latest/dg/celebrities.html"]
url_loader = UnstructuredURLLoader(urls=urls)
url_texts = url_loader.load_and_split(text_splitter=text_splitter)

### PDF source

PDF 소스 활용 - RAG 논문 

In [None]:
import requests
sagemaker_pdf_url = "https://arxiv.org/pdf/2005.11401"
# Download the RAG paper (arXiv 2005.11401) into the local data folder.
# The timeout keeps the cell from hanging forever on a stalled connection.
response = requests.get(sagemaker_pdf_url, timeout=60)
# Stop on an HTTP error instead of saving an error page as the PDF.
response.raise_for_status()
# The context manager closes the file even if the write fails.
with open(f"./{tmp_folder}/rag_paper.pdf", "wb") as file:
    file.write(response.content)

In [None]:
#possible free options: PyPDFLoader, PDFPlumberLoader, PyMuPDFLoader, PDFMinerLoader, PyPDFium2Loader
pdf_loader = PDFPlumberLoader(f"./{tmp_folder}/rag_paper.pdf")
pdf_texts = pdf_loader.load_and_split(text_splitter=text_splitter)

### Build vector index

위키피디아 + PDF + 웹크롤링 정보로 벡터 인덱스 구축

In [None]:
all_texts = wikipedia_texts + pdf_texts + url_texts

In [None]:
print(f'total texts size:{len(all_texts)}')

In [None]:
# NOTE(review): faiss-cpu was installed at the top of this notebook and FAISS
# has already been used above; installing faiss-gpu at this point is presumably
# not picked up by the running kernel without a restart and may conflict with
# faiss-cpu — confirm whether this cell is needed.
!pip install faiss-gpu

In [None]:
# Embed your texts
agg_docsearch = FAISS.from_documents(all_texts, emb)

In [None]:
rekognition_question = "what kind of information does Amazon Rekognition Image returns about image quality?"
aws_question = "what is AWS market share for cloud infrastructure?"
rag_question = "what datasets were used for experiments with RAG?"
questions_list = [rekognition_question, aws_question, rag_question]

In [None]:
for q in questions_list:
    res_docs = agg_docsearch.similarity_search(q, k=15)
    out = chain({"input_documents": res_docs, "question": q})["output_text"]
    print(f'{out}\n')