# Example of End-To-End Retrieval Augmented Generation Workflow

**Authors**:
- Henry Wicaksono (henry.wicaksono@gdplabs.id)

**Reviewers**:
- Kevin Yauris (kevin.yauris@gdplabs.id)
- Timotius Nugraha Chandra (timotius.n.chandra@gdplabs.id)

## References

[1] [GDP Labs Gen AI SDK - Document Processing Orchestrator: Parser Chunker](https://docs.glair.ai/generative-internal/modules/document-processing-orchestrator/parser-chunker)  
[2] [GDP Labs Gen AI SDK - Document Processing Orchestrator: Metadata Generator](https://docs.glair.ai/generative-internal/modules/document-processing-orchestrator/metadata-generator)  
[3] [GDP Labs Gen AI SDK - Document Processing Orchestrator: Indexer](https://docs.glair.ai/generative-internal/modules/document-processing-orchestrator/indexer)  
[4] [GDP Labs Gen AI SDK - Retrieval: Retriever](https://docs.glair.ai/generative-internal/modules/retrieval/retriever)  
[5] [GDP Labs Gen AI SDK - Inference Orchestrator: LLM](https://docs.glair.ai/generative-internal/modules/inference-orchestrator/model-io/llm)  
[6] [GDP Labs Gen AI SDK - Inference Orchestrator: Prompt Builder](https://docs.glair.ai/generative-internal/modules/inference-orchestrator/model-io/prompt-builder)  
[7] [GDP Labs Gen AI SDK - Inference Orchestrator: Use Case Handler](https://docs.glair.ai/generative-internal/modules/inference-orchestrator/use-case-handler)  
[8] [GDP Labs Gen AI SDK - Inference Orchestrator: Flow Executor](https://docs.glair.ai/generative-internal/modules/inference-orchestrator/flow-executor)

# Overview

In this example, we're going to use the following GDP Labs Gen AI SDK modules to perform end-to-end Retrieval Augmented Generation (RAG):
- [Document Processing Orchestrator](https://docs.glair.ai/generative-internal/modules/document-processing-orchestrator): To process documents and index them into a vector database.
- [Retrieval](https://docs.glair.ai/generative-internal/modules/retrieval): To retrieve knowledge from a certain source to be used in the RAG flow.
- [Inference Orchestrator](https://docs.glair.ai/generative-internal/modules/inference-orchestrator): To perform the inference in the RAG flow.

# Prepare Environment

Before we start, ensure you have a GitHub account with access to the GDP Labs GenAI SDK GitHub repository. Then, follow these steps to create a personal access token:
1. Log in to your [GitHub](https://github.com/) account.
2. Navigate to the [Personal Access Tokens](https://github.com/settings/tokens) page.
3. Select the `Generate new token` option. You can use the classic version instead of the beta version.
4. Fill in the required information, ensuring that you've checked the `repo` option to grant access to private repositories.
5. Save the newly generated token.

In [1]:
import getpass
import subprocess
import sys

def install_sdk_library() -> None:
    """Installs the `gdplabs_gen_ai` library from a private GitHub repository using a Personal Access Token.

    This function prompts the user to input their Personal Access Token for GitHub authentication. It then constructs
    the repository URL with the provided token and executes a subprocess to install the library via pip from the
    specified repository.

    Raises:
        subprocess.CalledProcessError: If the installation process returns a non-zero exit code.

    Note:
        The function utilizes `getpass.getpass()` to securely receive the Personal Access Token without echoing it.
    """
    token = getpass.getpass("Input Your Personal Access Token: ")
    repo_url_with_token = f"https://{token}@github.com/GDP-ADMIN/gen-ai-internal.git"
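    # Invoke pip through the current interpreter so the package lands in this notebook's environment.
    cmd = [sys.executable, "-m", "pip", "install", f"gdplabs_gen_ai[eval] @ git+{repo_url_with_token}", "-q"]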

    # Stream pip's output line by line so installation progress is visible.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1) as process:
        for line in process.stdout:
            sys.stdout.write(line)

        process.wait()  # Wait for the process to complete.

    if process.returncode != 0:
        # Report a redacted command: the real one embeds the token, which must not be printed.
        raise subprocess.CalledProcessError(returncode=process.returncode, cmd="pip install gdplabs_gen_ai[eval]")

install_sdk_library()

Input Your Personal Access Token:  ········


**Warning:**
After running the command above, you need to restart the runtime in Google Colab for the changes to take effect. Not doing so might lead to the newly installed libraries not being recognized.

To restart the runtime in Google Colab:
- Click on the `Runtime` menu.
- Select `Restart runtime`.

Once you have completed the previous step, you are ready to start using the SDK.
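
Before moving on, it can help to confirm that the restarted runtime actually sees the package. The following is a minimal sketch that relies only on the standard library; the helper name `is_installed` is ours and is not part of the SDK.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be found without importing it."""
    return importlib.util.find_spec(module_name) is not None


# The result depends on your environment; False suggests the install or the restart did not take effect.
print(is_installed("gdplabs_gen_ai"))
```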

# Set Up OpenAI API Key

Since we're going to use an OpenAI model in this example, we need to set up an OpenAI API key:

In [2]:
import getpass  # Re-imported because the runtime was restarted after installation.
import os

os.environ["OPENAI_API_KEY"] = getpass.getpass("Input Your OpenAI API Key: ")

Input Your OpenAI API Key:  ········


# Processing Document Using Document Processing Orchestrator Modules

First, we're going to use the interfaces in the `Document Processing Orchestrator` modules to process a document and index it into a vector database. In this example, we'll use [LangChain's UnstructuredPDFLoader](https://api.python.langchain.com/en/latest/document_loaders/langchain_community.document_loaders.pdf.UnstructuredPDFLoader.html#) to load the document. Let's start by installing the required packages:

In [5]:
!pip install unstructured==0.10.16 -q
!pip install pdfminer.six -q
!pip install pdf2image -q

Next, let's prepare the document. In this example, we use a PDF file called `gdplabs.pdf` that contains information about GDP Labs. You can find it in the [data/rag](https://github.com/glair-ai/glair-gen-ai-examples/tree/main/notebooks/data/rag) folder. Let's also define the paths we'll need later:

In [3]:
DOCUMENT_PATH = "data/rag/gdplabs.pdf"
CSV_PATH = "data/rag/gdplabs.csv"
CHROMA_PATH = "data/rag/gdplabs"
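
Since the later cells fail if the PDF has not been downloaded to the expected location, a quick pre-flight check can save a confusing traceback. This is a small sketch using only `pathlib`; the helper `missing_paths` is hypothetical and not part of the SDK.

```python
from pathlib import Path
from typing import List


def missing_paths(paths: List[str]) -> List[str]:
    """Return the subset of paths that do not point to an existing file."""
    return [path for path in paths if not Path(path).is_file()]


# An empty list means the input document is in place.
print(missing_paths(["data/rag/gdplabs.pdf"]))
```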

### Process Document Into Chunks With ParserChunker

Let's start working with the `Document Processing Orchestrator`. Here, we're creating an implementation of the [ParserChunker](https://docs.glair.ai/generative-internal/modules/document-processing-orchestrator/parser-chunker) interface that will load the document, parse the contents, split them into chunks, and store them as a CSV file.

In [4]:
from typing import List

from gdplabs_gen_ai.document_processing_orchestrator.parser_chunker import BaseParserChunker
from langchain.document_loaders import UnstructuredPDFLoader
import pandas as pd

class ParserChunker(BaseParserChunker):
    def parse_chunk(self, path_input: str, path_output: str) -> None:
        parsed_text = self._parse(path_input)
        chunk_list = self._chunk(parsed_text)

        df = pd.DataFrame(data={"chunk": chunk_list})
        df.to_csv(path_output, index=False)
        print("Successfully parsed and chunked document!")

    def _parse(self, path_input: str) -> str:
        # Load the PDF and join the loaded documents with a separator line.
        loader = UnstructuredPDFLoader(path_input)
        page_content_list = [doc.page_content for doc in loader.load()]
        return "\n---\n".join(page_content_list)

    def _chunk(self, parsed_text: str) -> List[str]:
        # Treat the first segment as the title and prepend it to every remaining chunk.
        chunk_list = parsed_text.split("\n---\n")
        title = chunk_list[0].strip()
        return [f"{title}\n{chunk.strip()}" for chunk in chunk_list[1:]]

parser_chunker = ParserChunker()
parser_chunker.parse_chunk(DOCUMENT_PATH, CSV_PATH)

Successfully parsed and chunked document!
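
To see what the chunking step produces without loading a PDF, the same splitting logic can be run on a toy string. The sketch below mirrors `_chunk` as a standalone function; the name `chunk_with_title` and the sample text are ours.

```python
from typing import List

SEPARATOR = "\n---\n"


def chunk_with_title(parsed_text: str) -> List[str]:
    """Split on the separator and prepend the first segment (the title) to each chunk."""
    segments = parsed_text.split(SEPARATOR)
    title = segments[0].strip()
    return [f"{title}\n{segment.strip()}" for segment in segments[1:]]


toy_text = "GDP Labs" + SEPARATOR + "About us" + SEPARATOR + "Part of Us"
toy_chunks = chunk_with_title(toy_text)
print(toy_chunks)
```

Note that a text containing no separator yields an empty list, because the only segment is consumed as the title.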


### Add Metadata With MetadataGenerator

Next, we're utilizing an implementation of the [MetadataGenerator](https://docs.glair.ai/generative-internal/modules/document-processing-orchestrator/metadata-generator) interface that will add metadata to the chunks. Here, we're adding `chunk_id`, `chunk_size`, `prev_chunk_id`, and `next_chunk_id` as the metadata.

In [5]:
from gdplabs_gen_ai.document_processing_orchestrator.metadata_generator import BaseMetadataGenerator
from uuid import uuid4
import tiktoken

class MetadataGenerator(BaseMetadataGenerator):
    encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")

    def generate_metadata(self, path_input: str, path_output: str) -> None:
        chunk_list = pd.read_csv(path_input).chunk.tolist()
        chunk_size_list = [self._count_token(chunk) for chunk in chunk_list]
        chunk_id_list = [str(uuid4()) for _ in chunk_list]
        prev_chunk_id_list = ["-"] + chunk_id_list[:-1]
        next_chunk_id_list = chunk_id_list[1:] + ["-"]

        df = pd.DataFrame(data={
            "chunk_id": chunk_id_list,
            "chunk": chunk_list,
            "chunk_size": chunk_size_list,
            "prev_chunk_id": prev_chunk_id_list,
            "next_chunk_id": next_chunk_id_list,
        })
        df.to_csv(path_output, index=False)
        print("Successfully generated metadata!")

    def _count_token(self, text: str) -> int:
        return len(self.encoding.encode(text))

metadata_generator = MetadataGenerator()
metadata_generator.generate_metadata(CSV_PATH, CSV_PATH)

Successfully generated metadata!
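
The `prev_chunk_id` and `next_chunk_id` columns form a simple doubly linked list over the chunks, with `"-"` marking the two ends. The sketch below reproduces that linking on plain Python data. As an assumption for illustration, it counts whitespace-separated words instead of `tiktoken` tokens, so the sizes will differ from the real metadata.

```python
from typing import Dict, List
from uuid import uuid4


def link_chunks(chunks: List[str]) -> List[Dict[str, object]]:
    """Attach an ID, a rough size, and prev/next pointers to each chunk."""
    ids = [str(uuid4()) for _ in chunks]
    prev_ids = ["-"] + ids[:-1]  # The first chunk has no predecessor.
    next_ids = ids[1:] + ["-"]   # The last chunk has no successor.
    return [
        {
            "chunk_id": chunk_id,
            "chunk": chunk,
            "chunk_size": len(chunk.split()),  # Assumption: word count stands in for token count.
            "prev_chunk_id": prev_id,
            "next_chunk_id": next_id,
        }
        for chunk_id, chunk, prev_id, next_id in zip(ids, chunks, prev_ids, next_ids)
    ]


rows = link_chunks(["first chunk", "second chunk here", "third"])
for row in rows:
    print(row["prev_chunk_id"][:8], "<-", row["chunk_id"][:8], "->", row["next_chunk_id"][:8])
```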


### Index Into Vector Database With Indexer

Finally, we're going to insert the chunks along with the generated metadata into a vector database. We can do this by creating an implementation of the [Indexer](https://docs.glair.ai/generative-internal/modules/document-processing-orchestrator/indexer) interface. For simplicity's sake, let's use the [Chroma](https://python.langchain.com/docs/integrations/vectorstores/chroma) vector database in this example.

In [6]:
from gdplabs_gen_ai.document_processing_orchestrator.indexer import BaseIndexer
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.schema import Document

class Indexer(BaseIndexer):
    def index(self, path_input: str, db_url: str, index_name: str) -> None:
        df = pd.read_csv(path_input)
        documents = []
        for _, row in df.iterrows():
            documents.append(Document(
                page_content=row["chunk"],
                metadata={
                    "chunk_id": row["chunk_id"],
                    "chunk_size": row["chunk_size"],
                    "prev_chunk_id": row["prev_chunk_id"],
                    "next_chunk_id": row["next_chunk_id"],
                }
            ))
        embeddings = OpenAIEmbeddings()
        chroma = Chroma.from_documents(
            documents,
            embeddings,
            collection_name=index_name,
            persist_directory=db_url,
        )
        print("Successfully indexed data to vector database!")

    def delete_document(self, db_url: str, index_name: str, document_ids: List[str]) -> None:
        pass

indexer = Indexer()
indexer.index(CSV_PATH, CHROMA_PATH, "gdplabs")

Successfully indexed data to vector database!


# Retrieving Data Using Retrieval Modules

Now that the data is indexed in the vector database, we can use the [VectorDBSimilaritySearchRetriever](https://docs.glair.ai/generative-internal/modules/retrieval/retriever#vector-db-similarity-search-retriever) class to retrieve it. This class retrieves chunks from a vector database based on their semantic similarity to the given query.

In [19]:
from gdplabs_gen_ai.retrieval.retriever.vector_db_similarity_search_retriever import VectorDBSimilaritySearchRetriever

embeddings = OpenAIEmbeddings()
chroma = Chroma(
    persist_directory=CHROMA_PATH,
    embedding_function=embeddings,
    collection_name="gdplabs"
)
retriever = VectorDBSimilaritySearchRetriever(chroma)

docs = retriever.get_relevant_documents("What does GLAIR specialize in?")
print(docs[0].page_content)

GDP Labs
Part of Us

1. Visit CATAPA

CATAPA Intelligent Payroll Platform CATAPA 3S Payroll Platform is a Swift, Simple, and Secure Payroll Solution that is specifically designed to help you run your company payroll to be 15x more efficient. Say goodbye to time-wasting payroll process and save up to 12.000 minutes of your time in a year with a fast 1 minute payroll process for 300 employees*. Focus on things that truly matter for your business with CATAPA as your company payroll platform choice.

2. Visit GLAIR

GLAIR - Accelerate Digital Transformation One-stop technology consulting services in Indonesia. GLAIR specialise in Artificial Intelligence, Blockchain, Cloud, Data, Mobile, Web, and Security and ready to help your company embrace the next wave of technologies

a. GLAIR Analytics

A simple & intelligent way to turn your data into insights. Using Artificial Intelligence combined with our expertise in big data to discover valuable insights from your data b. GLAIR Consulting

Work
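
To make "semantic similarity" concrete, here is a toy version of the ranking step on hand-written three-dimensional vectors. It is only a sketch: real embeddings have far more dimensions, the vectors and chunk names below are invented, and cosine similarity is just one common measure (the metric actually used depends on how the vector store is configured).

```python
import math
from typing import Dict, List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query: List[float], index: Dict[str, List[float]], k: int = 2) -> List[Tuple[str, float]]:
    """Score every stored vector against the query and keep the k best."""
    scored = [(chunk_id, cosine_similarity(query, vector)) for chunk_id, vector in index.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Invented vectors standing in for chunk embeddings.
toy_index = {
    "chunk-glair": [0.9, 0.1, 0.0],
    "chunk-catapa": [0.1, 0.9, 0.0],
    "chunk-other": [0.0, 0.2, 0.9],
}
query_vector = [1.0, 0.0, 0.0]  # Invented vector standing in for the embedded question.
results = top_k(query_vector, toy_index, k=2)
print([chunk_id for chunk_id, _ in results])
```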

# Performing Inference Using Inference Orchestrator

Finally, to perform a RAG inference, we can use the retriever object we've just created along with the following [Inference Orchestrator](https://docs.glair.ai/generative-internal/modules/inference-orchestrator) modules:
1. [ChatOpenAILLM](https://docs.glair.ai/generative-internal/modules/inference-orchestrator/model-io/llm#chat-open-aillm): To use OpenAI's chat models to generate responses. The default model is `gpt-3.5-turbo`.
2. [PromptBuilder](https://docs.glair.ai/generative-internal/modules/inference-orchestrator/model-io/prompt-builder#prompt-builder): To manage prompt templates and format them before sending them to the LLM.
3. [QAUseCaseHandler](https://docs.glair.ai/generative-internal/modules/inference-orchestrator/use-case-handler#qa-use-case-handler): To handle a RAG-like flow that retrieves knowledge from a source and uses it as additional context.
4. [FlowExecutor](https://docs.glair.ai/generative-internal/modules/inference-orchestrator/flow-executor): To wrap the other components and execute the whole flow. It makes streaming easier to manage.
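
The streaming hand-off mentioned in the last item can be pictured as a producer thread pushing tokens while the caller iterates over them. The sketch below shows that general pattern with a standard-library queue. `TokenStream` and `fake_flow` are hypothetical stand-ins written for this illustration; the SDK's `Generator` and `FlowExecutor` may work differently internally.

```python
import queue
import threading
from typing import Iterator

_DONE = object()  # Sentinel marking the end of the stream.


class TokenStream:
    """Hypothetical stand-in for a streaming generator; not the SDK's Generator."""

    def __init__(self) -> None:
        self._queue: queue.Queue = queue.Queue()

    def put(self, token: str) -> None:
        self._queue.put(token)

    def close(self) -> None:
        self._queue.put(_DONE)

    def __iter__(self) -> Iterator[str]:
        while True:
            item = self._queue.get()  # Blocks until the producer sends something.
            if item is _DONE:
                return
            yield item


def fake_flow(stream: TokenStream, message: str) -> None:
    """Pretend flow that emits the message back one word at a time."""
    for word in message.split():
        stream.put(word + " ")
    stream.close()


stream = TokenStream()
worker = threading.Thread(target=fake_flow, args=(stream, "GLAIR specializes in AI"))
worker.start()
streamed = "".join(stream)  # Tokens are consumed while the worker is still producing.
worker.join()
print(streamed)
```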

In [21]:
from threading import Thread

from gdplabs_gen_ai.inference_orchestrator.llm import ChatOpenAILLM, Generator
from gdplabs_gen_ai.inference_orchestrator.prompt import PromptBuilder
from gdplabs_gen_ai.inference_orchestrator.use_case import QAUseCaseHandler
from gdplabs_gen_ai.inference_orchestrator import FlowExecutor

llm = ChatOpenAILLM()
prompt_builder = PromptBuilder.from_template("answer_question")
retriever_map = {"topic_1": retriever}
configs = {"model_max_context_token": 400}

use_case_handler = QAUseCaseHandler(retriever_map, llm, prompt_builder, configs=configs)
flow_executor = FlowExecutor(use_case_handler)

generator = Generator()
message = "What does GLAIR specialize in?"
thread = Thread(target=flow_executor.run_flow, args=(generator, message, "topic_1"))
thread.start()
thread.join()

for token in generator:
    print(token, end="", flush=True)

[2024-01-05 10:29:29 +0700] [13759] [INFO] Processing the message into the use case flow...
[2024-01-05 10:29:29 +0700] [13759] [INFO] Searching for relevant documents in the topic_1 collection.
[2024-01-05 10:29:29 +0700] [13759] [INFO] Found 4 most relevant documents.
[2024-01-05 10:29:29 +0700] [13759] [INFO] Enough documents are retrieved. Using 1 of the most relevant documents to answer the question.


GLAIR specializes in Artificial Intelligence, Blockchain, Cloud, Data, Mobile, Web, and Security.
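
The log above reports that four documents were found but only one was used, which is consistent with a context budget such as `model_max_context_token` limiting how much retrieved text fits into the prompt. The sketch below illustrates that idea in plain Python. It is an assumption about the general technique, not the `QAUseCaseHandler` implementation: the helper names, the prompt wording, and the whitespace-based token count are all ours.

```python
from typing import List


def count_tokens(text: str) -> int:
    # Assumption: a whitespace word count stands in for a real tokenizer.
    return len(text.split())


def select_context(chunks: List[str], max_tokens: int) -> List[str]:
    """Keep chunks, in relevance order, until the next one would exceed the budget."""
    selected: List[str] = []
    used = 0
    for chunk in chunks:
        cost = count_tokens(chunk)
        if used + cost > max_tokens:
            break
        selected.append(chunk)
        used += cost
    return selected


def build_prompt(question: str, context_chunks: List[str]) -> str:
    """Format the selected context and the question into a single prompt string."""
    context = "\n\n".join(context_chunks)
    return f"Answer the question using only the context below.\n\nContext:\n{context}\n\nQuestion: {question}"


# Invented chunks, assumed to be sorted from most to least relevant.
ranked_chunks = [
    "GLAIR specialises in AI and Cloud",
    "CATAPA is a payroll platform",
    "GDP Labs builds software",
]
context = select_context(ranked_chunks, max_tokens=8)
prompt = build_prompt("What does GLAIR specialize in?", context)
print(prompt)
```

With a budget of 8 "tokens", only the first chunk (6 words) fits, because adding the second (5 words) would exceed the limit.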

# Conclusion

In this example, we've learned how to use the GDP Labs Gen AI SDK's `Document Processing Orchestrator`, `Retrieval`, and `Inference Orchestrator` modules to perform an end-to-end Retrieval Augmented Generation flow.