# 🚀 LLMOps for Production RAG

<a target="_blank" href="https://colab.research.google.com/github/unionai-oss/llmops-production-rag/blob/main/workshop.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

Welcome to the LLMOps for Production RAG workshop! In this workshop, we will cover:

1. Creating a baseline RAG pipeline
2. Bootstrapping an evaluation dataset
3. RAG Hyperparameter Optimization

## 📦 Install Dependencies

In [None]:
%pip install gradio

try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    !git clone https://github.com/unionai-oss/llmops-production-rag.git
    %cd /content/llmops-production-rag
    !pip install .

While dependencies are being installed, create an account on Union Serverless:

👉 https://signup.union.ai/

Then go to the Union Serverless dashboard and make sure you can access the UI:

👉 https://serverless.union.ai/

Then, log in to Union from this notebook session:

In [None]:
%cd /content/llmops-production-rag
!union create login --auth device-flow --serverless

## 🔑 Create OpenAI API Key Secret on Union

First, go to https://platform.openai.com/account/api-keys and create an OpenAI API key.

Then, run the following command to make the secret accessible on Union:

In [None]:
!union create secret openai_api_key

In [None]:
!union get secret

If something goes wrong with the secret, delete it by uncommenting and running the code cell below, then create it again:

In [None]:
#!union delete secret openai_api_key

## 🗂️ Creating a Baseline RAG Pipeline

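First, define a workflow that creates the vector store. It builds a knowledge base of documents with `create_knowledge_base`, then chunks and embeds them with `chunk_and_embed_documents`: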

In [None]:
import pandas as pd
import union
from typing import Optional
from llmops_rag.vector_store import create_knowledge_base, chunk_and_embed_documents


@union.workflow
def create_vector_store(
    root_url_tags_mapping: Optional[dict] = None,
    splitter: str = "character",
    chunk_size: int = 2048,
    limit: Optional[int | float] = None,
    embedding_model: Optional[str] = "text-embedding-ada-002",
    exclude_patterns: Optional[list[str]] = None,
) -> union.FlyteDirectory:
    """
    Workflow for creating the vector store knowledge base.
    """
    docs = create_knowledge_base(
        root_url_tags_mapping=root_url_tags_mapping,
        limit=limit,
        exclude_patterns=exclude_patterns,
    )
    vector_store = chunk_and_embed_documents(
        documents=docs,
        splitter=splitter,
        chunk_size=chunk_size,
        embedding_model=embedding_model,
    )
    return vector_store
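
The `chunk_size` and `splitter` inputs control how documents are cut up before embedding. As a rough illustration, here is a minimal sketch of fixed-size character chunking. `chunk_by_characters` is a hypothetical helper, and the real `chunk_and_embed_documents` task may split on separators and add overlap between chunks.

```python
def chunk_by_characters(text: str, chunk_size: int) -> list[str]:
    """Split text into consecutive, non-overlapping chunks of at most chunk_size characters.

    Hypothetical helper for illustration; not part of llmops_rag.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    # Step through the text in strides of chunk_size; the last chunk may be shorter.
    return [text[i : i + chunk_size] for i in range(0, len(text), chunk_size)]


doc = "x" * 1200
chunks = chunk_by_characters(doc, chunk_size=512)
print([len(c) for c in chunks])  # [512, 512, 176]
```

Smaller chunks give more precise retrieval hits but less context per hit, which is why `chunk_size` is one of the hyperparameters tuned later in this workshop.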

To execute this workflow, let's create a `UnionRemote` object:

In [None]:
remote = union.UnionRemote()

Then, we execute the `create_vector_store` workflow with the `.execute()` method:

In [None]:
vector_store_execution = remote.execute(
    create_vector_store,
    inputs=dict(
        limit=10,
        chunk_size=512,
        splitter="character",
    ),
)
vector_store_execution

⚠️ Note: The above command will take a few minutes to complete since we're building our container for the first time.

Next, implement a basic RAG pipeline that retrieves relevant chunks from the vector store and generates an answer with an LLM:

In [None]:
import union
from typing import Optional
from llmops_rag.vector_store import VectorStore
from llmops_rag.rag_basic import retrieve, generate


@union.workflow
def rag_basic(
    questions: list[str],
    vector_store: union.FlyteDirectory = VectorStore.query(),  # 👈 this uses the vector store artifact by default
    embedding_model: str = "text-embedding-ada-002",
    generation_model: str = "gpt-4o-mini",
    search_type: str = "similarity",
    rerank: bool = False,
    num_retrieved_docs: int = 20,
    num_docs_final: int = 5,
    prompt_template: Optional[str] = None,
) -> list[str]:
    contexts = retrieve(
        questions=questions,
        vector_store=vector_store,
        embedding_model=embedding_model,
        search_type=search_type,
        rerank=rerank,
        num_retrieved_docs=num_retrieved_docs,
        num_docs_final=num_docs_final,
    )
    return generate(
        questions=questions,
        contexts=contexts,
        generation_model=generation_model,
        prompt_template=prompt_template,
    )
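
Judging by its inputs, `retrieve` presumably embeds each question, fetches the `num_retrieved_docs` most similar chunks, optionally reranks them, and keeps the top `num_docs_final` for generation. The sketch below illustrates that two-stage flow with cosine similarity over toy vectors. `retrieve_top_k` and `rerank_score` are hypothetical names; the actual task delegates the search to the vector store (controlled by `search_type`).

```python
import math
from typing import Callable, Optional


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two vectors (0.0 if either has zero norm)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def retrieve_top_k(
    query_vec: list[float],
    doc_vecs: list[list[float]],
    num_retrieved_docs: int = 20,
    num_docs_final: int = 5,
    rerank_score: Optional[Callable[[int], float]] = None,
) -> list[int]:
    """Return indices of the documents that would be passed to the generator."""
    # Stage 1: rank every document by similarity to the query and keep the top candidates.
    ranked = sorted(
        range(len(doc_vecs)),
        key=lambda i: cosine_similarity(query_vec, doc_vecs[i]),
        reverse=True,
    )
    candidates = ranked[:num_retrieved_docs]
    # Stage 2 (optional): reorder the candidates with a more expensive reranker.
    if rerank_score is not None:
        candidates = sorted(candidates, key=rerank_score, reverse=True)
    return candidates[:num_docs_final]


docs = [[1.0, 0.0], [0.0, 1.0], [0.7, 0.7], [-1.0, 0.0]]
print(retrieve_top_k([1.0, 0.0], docs, num_retrieved_docs=3, num_docs_final=2))  # [0, 2]
```

In this sketch, retrieving more candidates than you keep (20 versus 5 by default) only matters when reranking is enabled; otherwise the final list is simply the top `num_docs_final` by similarity.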

In [None]:
remote = union.UnionRemote()
rag_basic_execution = remote.execute(
    rag_basic,
    inputs=dict(
        questions=["How do I read and write a pandas dataframe to csv format?"],
    ),
)
rag_basic_execution

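Wait for the execution to finish and print the answer. Because `rag_basic` returns a single unnamed output, it's available under Flyte's default output name `o0`: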

In [None]:
rag_basic_execution = remote.sync(remote.wait(rag_basic_execution))
print(rag_basic_execution.outputs["o0"][0])
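
The printed answer comes from the `generate` task, which combines each question with its retrieved contexts into a prompt, and the `prompt_template` input of `rag_basic` lets you override that prompt. The sketch below shows one common way to assemble such a prompt. `build_prompt`, `DEFAULT_TEMPLATE`, and the `{context}`/`{question}` placeholder names are assumptions, so check `llmops_rag.rag_basic` for the format the real task expects.

```python
from typing import Optional

# Hypothetical default template; the real one lives in llmops_rag.rag_basic.
DEFAULT_TEMPLATE = (
    "Answer the question using only the context below.\n\n"
    "Context:\n{context}\n\n"
    "Question: {question}\n"
    "Answer:"
)


def build_prompt(question: str, contexts: list[str], template: Optional[str] = None) -> str:
    """Fill a prompt template with the question and the retrieved contexts."""
    template = template or DEFAULT_TEMPLATE
    # Separate retrieved chunks with blank lines so the model can tell them apart.
    context = "\n\n".join(contexts)
    return template.format(context=context, question=question)


prompt = build_prompt(
    "How do I write a dataframe to csv?",
    ["DataFrame.to_csv writes a DataFrame to a CSV file.", "pandas.read_csv reads a CSV file."],
)
print(prompt)
```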

### ✨ Maintaining a Fresh Vector Store

Let's use a launch plan with a schedule to keep the vector store fresh by rerunning `create_vector_store` on a fixed interval:

In [None]:
from datetime import timedelta
import flytekit as fl
import union


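# Reset flytekit's in-memory launch plan cache so that re-running this cell
# doesn't clash with a launch plan of the same name created earlier
union.LaunchPlan.CACHE = {}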
schedule_vector_store_lp = union.LaunchPlan.get_or_create(
    name="schedule_vector_store_lp",
    workflow=create_vector_store,
    default_inputs=dict(
        limit=10,
        chunk_size=512,
        splitter="character",
    ),
    schedule=fl.FixedRate(
        duration=timedelta(minutes=2)
    )
)
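
Before activating the schedule, it helps to check how often it will fire. `fl.FixedRate(duration=timedelta(minutes=2))` triggers a new `create_vector_store` execution at a fixed two-minute interval, and a quick `timedelta` calculation shows the load that creates:

```python
from datetime import timedelta

interval = timedelta(minutes=2)  # same duration passed to fl.FixedRate above

# Floor-dividing one timedelta by another returns an int.
runs_per_hour = timedelta(hours=1) // interval
runs_per_day = timedelta(days=1) // interval
print(runs_per_hour, runs_per_day)  # 30 720
```

Two minutes is convenient for watching the schedule during a workshop, but each run re-executes the whole workflow, so a production schedule would likely use a much longer interval.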

In [None]:
version = "workshop-v0"
remote = union.UnionRemote()

registered_schedule_vector_store_lp = remote.register_launch_plan(
    schedule_vector_store_lp,
    version=version,
)
url = remote.generate_console_url(registered_schedule_vector_store_lp)
remote.activate_launchplan(registered_schedule_vector_store_lp.id)
print(f"🚀 Launch plan activated: {url}")

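Go to the Serverless dashboard to see the schedule in action.

When you're done, deactivate the launch plan so it stops triggering a new execution every two minutes. Run the cell below or deactivate it in the UI:

In [None]:
remote.deactivate_launchplan(registered_schedule_vector_store_lp.id)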

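### 💻 Run the RAG Pipeline with a Gradio App

The app below sends each chat message to the `rag_basic` workflow on Union Serverless and displays the answer in the chat window.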

In [None]:
import union
import gradio as gr
from datetime import timedelta


def add_message(history, message):
    if message is not None:
        history.append({"role": "user", "content": message})
    return history, gr.Textbox(value=None, interactive=False)


def bot(history: list):
    remote = union.UnionRemote()
    last_user_message = [msg for msg in history if msg["role"] == "user"][-1]["content"]
    execution = remote.execute(rag_basic, inputs={"questions": [last_user_message]})
    url = remote.generate_console_url(execution)
    print(f"🚀 Union Serverless execution url: {url}")

    # Block until the execution completes, polling every 2 seconds
    execution = remote.wait(execution, poll_interval=timedelta(seconds=2))
    answers = execution.outputs["o0"]

    if not answers:
        raise RuntimeError("Failed to get answer")

    history.append({"role": "assistant", "content": answers[0]})
    yield history


with gr.Blocks() as demo:
    chatbot = gr.Chatbot(elem_id="chatbot", type="messages")

    chat_input = gr.Textbox(
        interactive=True,
        placeholder="How do I write a dataframe to csv?",
        show_label=False,
    )
    chat_msg = chat_input.submit(
        add_message, [chatbot, chat_input], [chatbot, chat_input]
    )
    bot_msg = chat_msg.then(bot, chatbot, chatbot, api_name="bot_response")
    bot_msg.then(lambda: gr.Textbox(interactive=True), None, [chat_input])


demo.launch(debug=True, share=True)
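
Because the chatbot is created with `type="messages"`, Gradio passes the conversation history as a list of dictionaries with `role` and `content` keys. The `bot` function only sends the latest user turn to `rag_basic`. The standalone sketch below isolates that step so you can test it without launching the app; `last_user_message` is a hypothetical helper, not part of the workshop code.

```python
def last_user_message(history: list[dict]) -> str:
    """Return the content of the most recent user turn in a messages-format history."""
    # Walk the history backwards and stop at the first user message.
    for msg in reversed(history):
        if msg["role"] == "user":
            return msg["content"]
    raise ValueError("history contains no user messages")


history = [
    {"role": "user", "content": "How do I write a dataframe to csv?"},
    {"role": "assistant", "content": "Use DataFrame.to_csv()."},
    {"role": "user", "content": "How do I read it back?"},
]
print(last_user_message(history))  # How do I read it back?
```

Note that earlier turns are not sent to the pipeline, so each question is answered independently of the conversation so far.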

## 🥾 Bootstrapping an Evaluation Dataset

Next, generate a question-and-answer dataset. This uses the raw knowledge base that was created along with the vector store in the previous section.

In [None]:
import functools
from typing import Annotated

import union
from llmops_rag.create_qa_dataset import generate_qa_datapoints, create_dataset, QuestionAndAnswerDataset
from llmops_rag.document import CustomDocument
from llmops_rag.vector_store import KnowledgeBase


@union.workflow
def create_qa_dataset(
    documents: list[CustomDocument] = KnowledgeBase.query(),
    n_questions_per_doc: int = 1,
    n_answers_per_question: int = 5,
) -> Annotated[union.FlyteFile, QuestionAndAnswerDataset]:
    partial_task = functools.partial(
        generate_qa_datapoints,
        n_questions_per_doc=n_questions_per_doc,
        n_answers_per_question=n_answers_per_question,
    )
    questions_and_answers = union.map_task(partial_task)(flyte_doc=documents)
    return create_dataset(questions_and_answers, n_answers_per_question)
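
`union.map_task` fans a task out over list inputs (here just `flyte_doc=documents`), running one task instance per document. Inputs that should stay the same for every instance are bound beforehand with `functools.partial`. In plain Python, the pattern is roughly equivalent to the sketch below, where `fake_generate_qa_datapoints` is a hypothetical stand-in for the real LLM-backed task.

```python
import functools


def fake_generate_qa_datapoints(
    flyte_doc: str, n_questions_per_doc: int, n_answers_per_question: int
) -> dict:
    # Hypothetical stand-in: the real task prompts an LLM to write questions and answers.
    return {
        "doc": flyte_doc,
        "n_questions": n_questions_per_doc,
        "n_answers": n_answers_per_question,
    }


# Bind the per-run constants once, as the workflow does with functools.partial.
partial_task = functools.partial(
    fake_generate_qa_datapoints, n_questions_per_doc=3, n_answers_per_question=5
)

# map_task runs these calls as parallel task instances; here they run sequentially.
documents = ["doc-a", "doc-b", "doc-c"]
results = [partial_task(flyte_doc=doc) for doc in documents]
print(len(results))  # 3
```

On Union, each mapped instance runs as its own task execution, which is what makes the map worthwhile when every document requires LLM calls.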

In [None]:
remote = union.UnionRemote()
qa_dataset_execution = remote.execute(
    create_qa_dataset,
    inputs={"n_questions_per_doc": 3, "n_answers_per_question": 5}
)
qa_dataset_execution

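Then filter the dataset with an LLM critic, which scores each generated question-and-answer pair so that low-quality pairs can be dropped: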

In [None]:
import pandas as pd
import union
from llmops_rag.create_llm_filtered_dataset import apply_llm_critic, filter_dataset, prepare_dataset
from llmops_rag.create_qa_dataset import QuestionAndAnswerDataset


@union.workflow
def create_llm_filtered_dataset(
    dataset: union.FlyteFile = QuestionAndAnswerDataset.query(),
) -> pd.DataFrame:
    scores = apply_llm_critic(dataset)
    reference_answers = filter_dataset(dataset, scores)
    return prepare_dataset(reference_answers)
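
The critic step follows a common LLM-as-a-judge pattern: an LLM scores each generated question-and-answer pair, and only pairs that clear a quality bar are kept as reference answers. The sketch below shows the filtering half with a hypothetical 1 to 5 score scale and a cutoff of 4; the actual scale, threshold, and data layout used by `apply_llm_critic` and `filter_dataset` may differ.

```python
def filter_by_critic_score(
    rows: list[dict], scores: list[int], min_score: int = 4
) -> list[dict]:
    """Keep rows whose critic score is at least min_score (hypothetical 1-5 scale)."""
    if len(rows) != len(scores):
        raise ValueError("rows and scores must have the same length")
    return [row for row, score in zip(rows, scores) if score >= min_score]


# Made-up rows and scores, for illustration only.
rows = [
    {"question": "How do I write a DataFrame to CSV?", "answer": "Call df.to_csv(path)."},
    {"question": "What is pandas?", "answer": "A database."},
    {"question": "How do I read a CSV file?", "answer": "Use pd.read_csv(path)."},
]
scores = [5, 1, 4]
kept = filter_by_critic_score(rows, scores)
print([row["question"] for row in kept])
```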

In [None]:
remote = union.UnionRemote()
filtered_dataset_execution = remote.execute(create_llm_filtered_dataset, inputs={})
filtered_dataset_execution

## 📊 RAG Hyperparameter Optimization

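The `optimize_rag` workflow below runs a grid search over RAG hyperparameters and evaluates each configuration against the filtered evaluation dataset. Start by experimenting with different embedding models: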

In [None]:
import union
from typing import Optional, Annotated
from llmops_rag.optimize_rag import (
    GridSearchConfig,
    prepare_hpo_configs,
    prepare_questions,
    gridsearch,
    combine_answers,
    evaluate,
    report,
)
from llmops_rag.config import RAGConfig
from llmops_rag.create_llm_filtered_dataset import EvalDatasetArtifact
import pandas as pd


@union.workflow
def optimize_rag(
    gridsearch_config: GridSearchConfig,
    root_url_tags_mapping: Optional[dict] = None,
    exclude_patterns: Optional[list[str]] = None,
    limit: Optional[int] = 10,
    eval_dataset: Annotated[pd.DataFrame, EvalDatasetArtifact] = EvalDatasetArtifact.query(dataset_type="llm_filtered"),
    eval_prompt_template: Optional[str] = None,
    n_answers: int = 1,
) -> RAGConfig:
    hpo_configs = prepare_hpo_configs(gridsearch_config)
    questions = prepare_questions(eval_dataset, n_answers)
    answers = gridsearch(questions, hpo_configs, root_url_tags_mapping, exclude_patterns, limit)
    answers_dataset = combine_answers(answers, hpo_configs, questions)
    best_config, evaluation, evaluation_summary = evaluate(
        answers_dataset, eval_prompt_template
    )
    report(evaluation, evaluation_summary)
    return best_config
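
`prepare_hpo_configs` turns the `GridSearchConfig` into the list of configurations that `gridsearch` evaluates. Assuming it performs a standard exhaustive grid search (every combination of the listed values), the expansion looks like the sketch below. `expand_grid` is a hypothetical helper; the example grid mirrors the `GridSearchConfig` used in the reactive pipeline at the end of this notebook.

```python
import itertools


def expand_grid(grid: dict[str, list]) -> list[dict]:
    """Return one config dict per combination of values in the grid."""
    keys = list(grid)
    # itertools.product yields every combination, one value per key, in key order.
    return [dict(zip(keys, values)) for values in itertools.product(*grid.values())]


grid = {
    "embedding_model": [
        "text-embedding-ada-002",
        "text-embedding-3-small",
        "text-embedding-3-large",
    ],
    "chunk_size": [256],
    "splitter": ["recursive"],
}
configs = expand_grid(grid)
print(len(configs))  # 3
print(configs[0])
```

The number of configurations is the product of the list lengths, so adding a second chunk size here would double the run to 6 configurations, each of which answers every evaluation question. That is presumably why each experiment config in this notebook varies one kind of hyperparameter at a time.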

In [None]:
import yaml

def run_experiment(config_path: str):
    with open(config_path, "r") as f:
        gridsearch_config = GridSearchConfig(**yaml.safe_load(f))

    remote = union.UnionRemote()
    execution = remote.execute(optimize_rag, inputs={"gridsearch_config": gridsearch_config})
    return execution


run_experiment("config/embedding_model_experiment.yaml")
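
Once the execution finishes, `optimize_rag` returns the best `RAGConfig` chosen by the `evaluate` step. As a rough sketch of that final selection, assuming each configuration receives a list of numeric LLM-judge scores and the winner is the one with the highest mean (the real aggregation in `llmops_rag.optimize_rag.evaluate` may differ):

```python
from statistics import mean


def pick_best_config(scores_by_config: dict[str, list[float]]) -> tuple[str, float]:
    """Return the config name with the highest mean score, and that mean."""
    summary = {name: mean(scores) for name, scores in scores_by_config.items()}
    best = max(summary, key=summary.get)
    return best, summary[best]


# Made-up scores for two hypothetical configs, for illustration only.
best, score = pick_best_config({"config-a": [3, 4, 5], "config-b": [5, 5, 4]})
print(best, round(score, 2))  # config-b 4.67
```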

### 🧪 More experiments to run (optional)

Uncomment the code cells below to run different experiments.

Experiment with different prompts:

In [None]:
# run_experiment("config/prompt_experiment.yaml")

Experiment with different chunk sizes:

In [None]:
# run_experiment("config/chunksize_experiment.yaml")

Experiment with different splitters:

In [None]:
# run_experiment("config/splitter_experiment.yaml")

Experiment with reranking:

In [None]:
# run_experiment("config/reranking_experiment.yaml")

Experiment with document retrieval (search) parameters:

In [None]:
# run_experiment("config/search_params_experiment.yaml")

## 🔄 Putting It All Together into a Reactive Pipeline

In [None]:
from typing import Optional, Annotated
from datetime import timedelta

import pandas as pd

import union
import flytekit as fl
from union.artifacts import OnArtifact

from llmops_rag.document import CustomDocument
from llmops_rag.create_qa_dataset import create_qa_dataset
from llmops_rag.create_llm_filtered_dataset import create_llm_filtered_dataset, EvalDatasetArtifact
from llmops_rag.image import image
from llmops_rag.optimize_rag import optimize_rag, GridSearchConfig
from llmops_rag.vector_store import create_knowledge_base as _create_knowledge_base


# Clear the launch plan cache
union.LaunchPlan.CACHE = {}


KnowledgeBaseHPO = union.Artifact(name="knowledge-base-hpo")


@union.task(
    container_image=image,
    requests=union.Resources(cpu="2", mem="8Gi"),
    enable_deck=True,
)
def create_knowledge_base_hpo(
    root_url_tags_mapping: Optional[dict] = None,
    limit: Optional[int | float] = None,
    exclude_patterns: Optional[list[str]] = None,
) -> Annotated[list[CustomDocument], KnowledgeBaseHPO]:
    return _create_knowledge_base.task_function(
        root_url_tags_mapping=root_url_tags_mapping,
        limit=limit,
        exclude_patterns=exclude_patterns,
    )


@union.workflow
def knowledge_base_workflow(
    root_url_tags_mapping: Optional[dict] = None,
    limit: Optional[int | float] = None,
    exclude_patterns: Optional[list[str]] = None,
) -> list[CustomDocument]:
    return create_knowledge_base_hpo(
        root_url_tags_mapping=root_url_tags_mapping,
        limit=limit,
        exclude_patterns=exclude_patterns,
    )


@union.workflow
def create_eval_dataset(
    documents: list[CustomDocument],
    n_questions_per_doc: int = 1,
    n_answers_per_question: int = 5,
) -> pd.DataFrame:
    qa_dataset = create_qa_dataset(
        documents=documents,
        n_questions_per_doc=n_questions_per_doc,
        n_answers_per_question=n_answers_per_question,
    )
    return create_llm_filtered_dataset(dataset=qa_dataset)


knowledge_base_lp = union.LaunchPlan.get_or_create(
    knowledge_base_workflow,
    name="knowledge_base_lp",
    default_inputs={"limit": 10},
    schedule=fl.FixedRate(duration=timedelta(minutes=2))
)

create_eval_dataset_lp = union.LaunchPlan.get_or_create(
    create_eval_dataset,
    name="create_eval_dataset_lp",
    trigger=OnArtifact(
        trigger_on=KnowledgeBaseHPO,
        inputs={"documents": KnowledgeBaseHPO.query()},
    )
)

optimize_rag_lp = union.LaunchPlan.get_or_create(
    optimize_rag,
    name="optimize_rag_lp",
    default_inputs={
        "gridsearch_config": GridSearchConfig(
            embedding_model=[
                "text-embedding-ada-002",
                "text-embedding-3-small",
                "text-embedding-3-large",
            ],
            chunk_size=[256],
            splitter=["recursive"],
        ),
    },
    trigger=OnArtifact(
        trigger_on=EvalDatasetArtifact,
        inputs={"eval_dataset": EvalDatasetArtifact.query()},
    )
)
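
The three launch plans form an event-driven chain: `knowledge_base_lp` runs on a schedule and emits a `KnowledgeBaseHPO` artifact, `OnArtifact` starts `create_eval_dataset_lp` whenever a new version of that artifact appears, and the filtered dataset it produces (presumably emitted as an `EvalDatasetArtifact`) in turn starts `optimize_rag_lp`. The toy, in-memory simulation below illustrates that control flow only; `ToyArtifactBus` and the `"eval-dataset"` name are hypothetical and are not part of the Union API.

```python
from collections import defaultdict
from typing import Any, Callable


class ToyArtifactBus:
    """In-memory stand-in for artifact-triggered launch plans (not the Union API)."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[Any], None]]] = defaultdict(list)
        self.log: list[str] = []

    def on_artifact(self, name: str, handler: Callable[[Any], None]) -> None:
        # Rough analog of OnArtifact(trigger_on=...): run handler on each new version.
        self._subscribers[name].append(handler)

    def emit(self, name: str, value: Any) -> None:
        # Record the new artifact version, then start every subscribed "launch plan".
        self.log.append(name)
        for handler in self._subscribers[name]:
            handler(value)


bus = ToyArtifactBus()
best_configs: list[str] = []

# create_eval_dataset_lp: knowledge base -> eval dataset artifact
bus.on_artifact(
    "knowledge-base-hpo",
    lambda docs: bus.emit("eval-dataset", f"create_eval_dataset({docs})"),
)
# optimize_rag_lp: eval dataset -> best RAG config
bus.on_artifact(
    "eval-dataset",
    lambda dataset: best_configs.append(f"optimize_rag({dataset})"),
)

# A scheduled knowledge_base_lp run produces a new knowledge base version.
bus.emit("knowledge-base-hpo", "docs-v1")
print(bus.log)  # ['knowledge-base-hpo', 'eval-dataset']
print(best_configs)  # ['optimize_rag(create_eval_dataset(docs-v1))']
```

Because each stage only reacts to the artifact it consumes, you can rerun or replace any stage without editing the others.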

In [None]:
remote = union.UnionRemote()

version = "workshop-v0"
registered_lps = []
for lp in [
    knowledge_base_lp,
    create_eval_dataset_lp,
    optimize_rag_lp,
]:
    registered_lp = remote.register_launch_plan(lp, version=version)
    registered_lps.append(registered_lp)
    remote.activate_launchplan(registered_lp.id)
    url = remote.generate_console_url(registered_lp)
    print(f"🚀 Launch plan {lp.name} activated: {url}")

Deactivate the reactive pipeline:

In [None]:
for registered_lp in registered_lps:
    remote.deactivate_launchplan(registered_lp.id)

## 🎉 Congrats!

You've completed the LLMOps for Production RAG workshop! To recap, you:
- Built a simple baseline RAG pipeline
- Scheduled a job to maintain a fresh vector store
- Bootstrapped an evaluation dataset
- Optimized the RAG pipeline with HPO and LLM-as-a-judge