# NPR MC1: Cleantech Retrieval Augmented Generation

**Dominik Filliger, Nils Fahrni, Noah Leuenberger**

> The topic of Mini-Challenge 1 is retrieval augmented generation (RAG) incorporating a combination of unsupervised learning, pre-training and in-context learning techniques.

- [Description of the task](https://spaces.technik.fhnw.ch/storage/uploads/spaces/81/exercises/NPR-Mini-Challenge-1-Cleantech-RAG-1708982891.pdf)
- [Introduction to RAG](https://spaces.technik.fhnw.ch/storage/uploads/spaces/81/exercises/Retrieval-Augmented-Generation-Intro-1709021241.pdf)

This notebook serves as the main entry point for our solution to the NPR Mini-Challenge 1. We explain our approach and the code we used to solve the task in detail. The code for the evaluation, the LangChain LLM creation and the vector store interaction has been moved into modules in the `src` directory.

Additionally, the scripts for creating the development subset and its evaluation set are located in the `scripts` directory and are referenced in their respective sections.


# Setup


In [None]:
import os

from dotenv import load_dotenv

load_dotenv()

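DEV_MODE = False  # True: use the development subset instead of the full dataset
USE_GPU = False  # True: compute the embeddings on the first CUDA device
USE_CACHE = True  # True: reuse preprocessed CSVs and cached evaluation results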

In [None]:
def get_path(dev_mode=DEV_MODE):
    if dev_mode:
        # Paths used in development mode
        return {
            'df_path': 'data/Cleantech Media Dataset/cleantech_media_dataset_v2_2024-02-23_subset.csv',
            'df_eval_path': 'data/Cleantech Media Dataset/cleantech_media_dataset_v2_2024-02-23_subset_eval.csv',
            'df_preprocessed_path': get_preprocessed_path(
                'data/Cleantech Media Dataset/cleantech_media_dataset_v2_2024-02-23_subset.csv'),
            'df_eval_preprocessed_path': get_preprocessed_path(
                'data/Cleantech Media Dataset/cleantech_media_dataset_v2_2024-02-23_subset_eval.csv')
        }
    else:
        # Paths used in production mode
        return {
            'df_path': 'data/Cleantech Media Dataset/cleantech_media_dataset_v2_2024-02-23.csv',
            'df_eval_path': 'data/Cleantech Media Dataset/cleantech_rag_evaluation_data_2024-02-23.csv',
            'df_preprocessed_path': get_preprocessed_path(
                'data/Cleantech Media Dataset/cleantech_media_dataset_v2_2024-02-23.csv'),
            'df_eval_preprocessed_path': get_preprocessed_path(
                'data/Cleantech Media Dataset/cleantech_rag_evaluation_data_2024-02-23.csv')
        }


def get_preprocessed_path(path: str) -> str:
    return path.replace(".csv", "_preprocessed.csv")


paths = get_path()
print(paths)

## Observability & Monitoring

> Phoenix is an open-source observability library designed for experimentation, evaluation, and troubleshooting. It allows AI Engineers and Data Scientists to quickly visualize their data, evaluate performance, track down issues, and export data to improve.

We use Phoenix to visualize traces so that we can quickly debug our pipelines. The library offers many more features, which we will not use. Below, we instrument LangChain, the main library of our solution, so that its traces are sent to Phoenix.

For quick access, the Phoenix dashboard is rendered directly in the notebook. The dashboard is interactive and can be used to explore the traces.


In [None]:
from phoenix.trace.langchain import LangChainInstrumentor
import phoenix as px

try:
    px.close_app()  # close a Phoenix session left over from a previous run, if any
    session = px.launch_app()
except Exception as e:
    print(
        f"Could not launch Phoenix app ({e}). Please make sure any existing Phoenix app process is closed. Restart the kernel if necessary.")

LangChainInstrumentor().instrument()

# Data Loading & Preprocessing


In [None]:
import pandas as pd

df = pd.read_csv(paths['df_path'])
df.head()

In [None]:
from src.preprocessing import Preprocessor

if os.path.exists(paths['df_preprocessed_path']) and USE_CACHE:
    df = pd.read_csv(paths['df_preprocessed_path'])
else:
    df = Preprocessor(df).preprocess()
    df.to_csv(paths['df_preprocessed_path'], index=False)

# Indexing

In [None]:
from langchain_community.embeddings.huggingface import HuggingFaceBgeEmbeddings

bge_embeddings = HuggingFaceBgeEmbeddings(
    model_name="BAAI/bge-small-en",
    model_kwargs={"device": "cpu" if not USE_GPU else "cuda:0"},
    encode_kwargs={"normalize_embeddings": True}
)

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

recursive_text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=0,
    length_function=len,
    is_separator_regex=False,
)

In [None]:
def create_documents(df: pd.DataFrame, text_splitter, verbose=True):
    metadata_cols = ['url', 'domain', 'title', 'author', 'date', 'id']
    if not all(col in df.columns for col in metadata_cols + ['content']):
        raise ValueError(
            f"DataFrame must contain all metadata columns and a 'content' column: {metadata_cols + ['content']}")

    metadata = df[metadata_cols].rename(columns={'id': 'origin_doc_id'}).to_dict('records')
    docs = text_splitter.create_documents(df['content'], metadata)

    if verbose:
        print(
            f"{text_splitter.__class__.__name__}: "
            f"Number of documents created: {len(docs)}, "
            f"Number of rows in source df: {len(df)}, "
            f"Percentage of documents created: {len(docs) / len(df) * 100:.2f}%")

    return docs


documents = create_documents(df, recursive_text_splitter)

In [None]:
documents[0]

## Vector Store

We use [ChromaDB](https://www.trychroma.com/) to store the embeddings. For easier interaction, we use our `VectorStore` class, a wrapper around the embedding model and ChromaDB that exposes only the functionality we need for the task.
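Conceptually, a vector store keeps one embedding per chunk and answers a query by ranking the stored chunks by their similarity to the embedded query. The sketch below illustrates this with a hypothetical in-memory `ToyVectorStore` and a toy bag-of-words `toy_embed` function standing in for the BGE model. It is not the API of our `VectorStore` class or of ChromaDB, which uses an approximate nearest-neighbour index instead of the brute-force scan shown here.

```python
import math

# Hypothetical toy embedding: counts of a few fixed vocabulary terms (stand-in for BGE).
VOCAB = ["gas", "flaring", "solar", "wind", "reduce"]


def toy_embed(text: str) -> list[float]:
    words = [w.strip(".,?!").lower() for w in text.split()]
    return [float(words.count(term)) for term in VOCAB]


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class ToyVectorStore:
    """Hypothetical brute-force vector store holding (text, vector) pairs."""

    def __init__(self, embed):
        self.embed = embed
        self.texts: list[str] = []
        self.vectors: list[list[float]] = []

    def add_texts(self, texts: list[str]) -> None:
        for text in texts:
            self.texts.append(text)
            self.vectors.append(self.embed(text))  # embed once, at indexing time

    def search(self, query: str, k: int = 2) -> list[tuple[str, float]]:
        query_vector = self.embed(query)  # the query must use the same embedding
        scored = [(text, cosine_similarity(query_vector, vec))
                  for text, vec in zip(self.texts, self.vectors)]
        return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


store = ToyVectorStore(toy_embed)
store.add_texts([
    "The company wants to reduce gas flaring",
    "A new solar farm opens",
    "Wind turbines in the North Sea",
])
print(store.search("Is the company aiming to reduce gas flaring?", k=1))
```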

### ChromaDB Setup

ChromaDB is accessed through a persistent client, which writes the data to disk so that the stored embeddings survive kernel restarts. The client is used for all interaction with the database.

### VectorStore Usage

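The vector store is tied to its embedding model: the stored vectors live in that model's embedding space, so a collection can only be queried with the embeddings it was created with. This is why the collection name in the next cell encodes both the embedding model and the text splitter.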
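One way to make this constraint explicit is to record the embedding model alongside the vectors and reject vectors from any other model. The sketch below uses a hypothetical `EmbeddingCollection` with tiny 3-dimensional toy vectors (real `BAAI/bge-small-en` vectors have 384 dimensions). It only illustrates the idea and is not how our `VectorStore` or ChromaDB enforce it.

```python
from dataclasses import dataclass, field


@dataclass
class EmbeddingCollection:
    """Hypothetical collection bound to the embedding model that produced its vectors."""
    embedding_model: str
    dimension: int
    vectors: list[list[float]] = field(default_factory=list)

    def _check(self, vector: list[float], model_name: str) -> None:
        # Vectors from another model live in a different space, even if the dimension matches.
        if model_name != self.embedding_model:
            raise ValueError(f"collection expects {self.embedding_model!r}, got {model_name!r}")
        if len(vector) != self.dimension:
            raise ValueError(f"expected dimension {self.dimension}, got {len(vector)}")

    def add(self, vector: list[float], model_name: str) -> None:
        self._check(vector, model_name)
        self.vectors.append(vector)

    def nearest(self, vector: list[float], model_name: str) -> int:
        """Index of the stored vector with the largest dot product (stored vectors normalized)."""
        self._check(vector, model_name)
        scores = [sum(a * b for a, b in zip(vector, stored)) for stored in self.vectors]
        return max(range(len(scores)), key=scores.__getitem__)


collection = EmbeddingCollection("BAAI/bge-small-en", dimension=3)
collection.add([1.0, 0.0, 0.0], "BAAI/bge-small-en")
collection.add([0.0, 1.0, 0.0], "BAAI/bge-small-en")
print(collection.nearest([0.1, 0.9, 0.0], "BAAI/bge-small-en"))
```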

In [None]:
from src.vector_store import VectorStore


def get_vector_store_collection_name(embeddings, text_splitter):
    return f"cleantech-{embeddings.model_name}-{text_splitter.__class__.__name__}_{'dev' if DEV_MODE else 'prod'}".lower().replace(
        " ", "_").replace("/", "_").replace(":", "_").replace("-", "_")


bge_vector_store = VectorStore(embedding_function=bge_embeddings,
                               collection=get_vector_store_collection_name(bge_embeddings, recursive_text_splitter))

get_vector_store_collection_name(bge_embeddings, recursive_text_splitter)

Next, we add the prepared documents to the vector store.

In [None]:
bge_vector_store.add_documents(documents, verbose=True)

With the documents added, we run a similarity search to verify that the vector store works as expected.

In [None]:
bge_vector_store.similarity_search_w_scores("The company is also aiming to reduce gas flaring?")

# Baseline Pipeline

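The baseline pipeline is a first, simple implementation of RAG: the retriever fetches the chunks most similar to the question, these chunks are inserted into the prompt as context, and the LLM generates the answer.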
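Stripped of LangChain, the baseline boils down to three steps: retrieve chunks for the question, join them into a context string, and fill the prompt template. The sketch below mirrors that flow in plain Python. `keyword_retrieve` is a hypothetical stand-in that ranks chunks by shared words instead of embedding similarity, and the LLM call is left out, so the function returns the prompt the model would receive.

```python
BASE_RAG_PROMPT = """
Answer the question to your best knowledge when looking at the following context:
{context}

Question: {question}
"""


def keyword_retrieve(question: str, corpus: list[str], k: int = 2) -> list[str]:
    """Hypothetical retriever: rank chunks by the number of words shared with the question."""
    question_words = set(question.lower().strip("?").split())
    return sorted(corpus,
                  key=lambda chunk: len(question_words & set(chunk.lower().split())),
                  reverse=True)[:k]


def build_rag_input(question: str, corpus: list[str], k: int = 2) -> dict:
    context_chunks = keyword_retrieve(question, corpus, k)              # 1. retrieve
    context = "\n\n".join(context_chunks)                               # 2. format the context
    prompt = BASE_RAG_PROMPT.format(context=context, question=question)  # 3. fill the prompt
    return {"context": context_chunks, "question": question, "prompt": prompt}


corpus = [
    "The company aims to reduce gas flaring by 2030.",
    "Solar capacity grew last year.",
    "Wind farms need new grid connections.",
]
result = build_rag_input("Is the company aiming to reduce gas flaring?", corpus, k=1)
print(result["prompt"])
```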


In [None]:
from src.generation import get_llm_model, LLMModel
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.runnables import RunnableParallel

retriever = bge_vector_store.get_retriever()
azure_model = get_llm_model(LLMModel.GPT_3_AZURE)

In [None]:
base_rag_prompt = """
Answer the question to your best knowledge when looking at the following context:
{context}
                
Question: {question}
"""

In [None]:
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


base_rag_chain = (
        RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
        | ChatPromptTemplate.from_template(base_rag_prompt)
        | azure_model
        | StrOutputParser()
)

base_rag = RunnableParallel(
    {
        "context": retriever,
        "question": RunnablePassthrough()
    }
).assign(answer=base_rag_chain)

In [None]:
base_rag.invoke("Is the company aiming to reduce gas flaring?")

# Evaluation

To compare the different pipelines, we need to evaluate them. We use the `ragas` library, which provides predefined metrics that are described in its [documentation](https://docs.ragas.io/en/stable/concepts/metrics/index.html). We use the following metric to evaluate our pipelines:

- **Context Relevancy**: measures how relevant the retrieved context is to the question. An LLM extracts the sentences of the context that are needed to answer the question, and the score is the ratio of these sentences to the total number of sentences in the context (between 0 and 1, higher is better).
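As a worked example of the formula: in `ragas` the relevant sentences are extracted by an LLM, whereas the sketch below takes them as a hand-written set (an assumption made for illustration) and only shows the ratio computation. The regex-based sentence splitter is likewise a simplification of proper sentence segmentation.

```python
import re


def split_sentences(text: str) -> list[str]:
    # Naive sentence segmentation: split after ., ! or ? followed by whitespace.
    return [s.strip() for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s.strip()]


def context_relevancy(contexts: list[str], relevant_sentences: set[str]) -> float:
    """|relevant sentences| / |all sentences in the retrieved context|."""
    sentences = [s for ctx in contexts for s in split_sentences(ctx)]
    if not sentences:
        return 0.0
    relevant = sum(1 for s in sentences if s in relevant_sentences)
    return relevant / len(sentences)


contexts = [
    "The company plans to cut gas flaring. It also sells lubricants.",
    "Flaring will end by 2030. The CEO likes golf.",
]
# Hand-written here (hypothetical); in ragas an LLM selects these for the question.
relevant = {"The company plans to cut gas flaring.", "Flaring will end by 2030."}
print(context_relevancy(contexts, relevant))
```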

## Evaluation Set
To compare the pipelines fairly, we use the same evaluation set for all of them. The evaluation set was created beforehand with the script `scripts/generate_testset.py`. Evaluating on this subset of the data saves time and resources.

In [None]:
df_eval = pd.read_csv(paths['df_eval_path'])
df_eval.head()

In [None]:
from src.preprocessing import EvaluationPreprocessor

if os.path.exists(paths['df_eval_preprocessed_path']) and USE_CACHE:
    df_eval = pd.read_csv(paths['df_eval_preprocessed_path'])
else:
    df_eval = EvaluationPreprocessor(df, df_eval).preprocess()
    df_eval.to_csv(paths['df_eval_preprocessed_path'], index=False)

df_eval.head()

## Evaluator
The `Evaluator` class is a wrapper around the `ragas` library that provides a simple interface for evaluating the pipelines. It offers a method that evaluates a pipeline and returns the results as a pandas DataFrame. The metrics are calculated for each example in the evaluation set, and the results can be aggregated over the whole evaluation set to obtain the overall performance of the pipeline.
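Aggregation over the evaluation set can be as simple as averaging each metric. The sketch below shows one way to do this with the standard library. `summarize_scores` is a hypothetical helper, not the `Evaluator.summarize` implementation, and skipping NaN scores (which can occur when an LLM-based judgement cannot be parsed) is an assumption about how missing values should be treated.

```python
import math
from statistics import mean


def summarize_scores(rows: list[dict[str, float]]) -> dict[str, float]:
    """Mean of every metric over all evaluation examples, ignoring NaN scores."""
    metrics = sorted({name for row in rows for name in row})
    summary = {}
    for name in metrics:
        values = [row[name] for row in rows if name in row and not math.isnan(row[name])]
        summary[name] = mean(values) if values else float("nan")
    return summary


per_example = [
    {"context_relevancy": 0.5},
    {"context_relevancy": 1.0},
    {"context_relevancy": float("nan")},  # e.g. a failed LLM judgement
]
print(summarize_scores(per_example))
```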

In [None]:
from src.evaluation import Evaluator

base_evaluator = Evaluator(name="Baseline",
                           cache_results=USE_CACHE,
                           rag_chain=base_rag,
                           llm_model=azure_model,
                           embeddings=bge_embeddings)

base_evaluator.evaluate(df_eval)
base_evaluator.summarize()

# Experiment 1: Looking at the impact of context and its chunking strategy

The data already appears to be chunked in a way that resembles a recursive character chunking strategy. In this experiment we instead introduce semantic chunking, which splits the text where the embedding distance between consecutive sentences is large. This shows whether the LLM benefits from context that is split along semantic boundaries.
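The percentile breakpoint type works roughly as follows: embed every sentence, compute the cosine distance between neighbouring sentences, and start a new chunk wherever the distance exceeds a percentile of all distances (95 by default in LangChain's `SemanticChunker`). The sketch below is a simplified re-implementation for illustration only: it skips the sentence buffering that `SemanticChunker` applies before embedding, and `topic_embed` is a hypothetical two-keyword embedding instead of BGE.

```python
import math


def topic_embed(sentence: str) -> list[float]:
    # Hypothetical 2-d embedding: counts of the words "solar" and "flaring".
    words = [w.strip(".,?!").lower() for w in sentence.split()]
    return [float(words.count("solar")), float(words.count("flaring"))]


def cosine_distance(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - (dot / norm if norm else 0.0)


def percentile(values: list[float], q: float) -> float:
    # Linear interpolation between closest ranks (numpy's default method).
    ordered = sorted(values)
    pos = (len(ordered) - 1) * q / 100
    lower, upper = math.floor(pos), math.ceil(pos)
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (pos - lower)


def semantic_chunks(sentences: list[str], embed, q: float = 95) -> list[str]:
    if len(sentences) < 2:
        return [" ".join(sentences)] if sentences else []
    vectors = [embed(s) for s in sentences]
    distances = [cosine_distance(vectors[i], vectors[i + 1]) for i in range(len(vectors) - 1)]
    threshold = percentile(distances, q)
    chunks, current = [], [sentences[0]]
    for sentence, distance in zip(sentences[1:], distances):
        if distance > threshold:  # large semantic jump: close the current chunk
            chunks.append(" ".join(current))
            current = []
        current.append(sentence)
    chunks.append(" ".join(current))
    return chunks


sentences = [
    "Solar panels convert sunlight.",
    "Solar farms need land.",
    "Gas flaring wastes methane.",
    "Flaring emits CO2.",
]
print(semantic_chunks(sentences, topic_embed))
```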

To embed the processed texts, we again turn them into LangChain `Document` objects.

In [None]:
from langchain_experimental.text_splitter import SemanticChunker

semantic_chunker = SemanticChunker(
    bge_embeddings, breakpoint_threshold_type="percentile"
)

semantic_documents = create_documents(df, semantic_chunker)
semantic_documents[0]

To keep this experiment self-contained, we create a new `VectorStore` with a separate collection.

In [None]:
bge_semantic_vector_store = VectorStore(embedding_function=bge_embeddings,
                                        collection=get_vector_store_collection_name(bge_embeddings, semantic_chunker))

semantic_retriever = bge_semantic_vector_store.get_retriever()

In [None]:
bge_semantic_vector_store.add_documents(semantic_documents, verbose=True, batch_size=128)

In [None]:
bge_semantic_vector_store.similarity_search_w_scores("The company is also aiming to reduce gas flaring?")

In [None]:
semantic_rag = RunnableParallel(
    {
        "context": semantic_retriever,
        "question": RunnablePassthrough()
    }
).assign(answer=base_rag_chain)

In [None]:
semantic_rag.invoke("Is the company aiming to reduce gas flaring?")

In [None]:
semantic_evaluator = Evaluator(name="Semantic Chunking",
                               cache_results=USE_CACHE,
                               rag_chain=semantic_rag,
                               llm_model=azure_model,
                               embeddings=bge_embeddings)

semantic_evaluator.evaluate(df_eval)
semantic_evaluator.summarize()

# Experiment 2: Using a Multi-Query Retrieval Strategy

At the heart of a RAG pipeline is the retriever, which is responsible for finding the most relevant documents for a given question. The baseline RAG uses the vector retriever to find the most relevant documents by cosine similarity.

We now experiment with a multi-query retrieval strategy. The idea is to let the LLM rephrase the question into several queries, retrieve documents for each of them, and take the unique union of the results. This increases the diversity of the retrieved documents and can potentially improve the quality of the generated answer.
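The fan-out-and-merge step can be sketched independently of LangChain. In the sketch below, strings stand in for documents, `toy_retrieve` is a hypothetical lookup table instead of the vector retriever, and the query variants are hard-coded where the pipeline lets the LLM generate them. Unlike the set-based `get_unique_union` defined later in this notebook (which serializes each `Document` so it can be deduplicated, and does not preserve order), this version keeps documents in the order they were first retrieved.

```python
def multi_query_retrieve(queries: list[str], retrieve) -> list[str]:
    """Retrieve for every query variant and return the unique union in first-seen order."""
    seen: set[str] = set()
    union: list[str] = []
    for query in queries:
        for doc in retrieve(query):
            if doc not in seen:  # skip documents already returned for an earlier query
                seen.add(doc)
                union.append(doc)
    return union


# Hypothetical index: query variant -> ids of the retrieved documents.
TOY_INDEX = {
    "Is the company aiming to reduce gas flaring?": ["doc_a", "doc_b"],
    "What are the company's flaring reduction targets?": ["doc_b", "doc_c"],
    "How does the company plan to cut methane emissions?": ["doc_c", "doc_d"],
}


def toy_retrieve(query: str) -> list[str]:
    return TOY_INDEX.get(query, [])


print(multi_query_retrieve(list(TOY_INDEX), toy_retrieve))
```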

For this we will use LangChain's `MultiQueryRetriever`.


In [None]:
from langchain.retrievers import MultiQueryRetriever

mqr_retriever = MultiQueryRetriever.from_llm(
    retriever=retriever, llm=azure_model
)

In [None]:
## using the langchain template for the prompt
template = """You are an AI language model assistant. Your task is to generate five 
different versions of the given user question to retrieve relevant documents from a vector 
database. By generating multiple perspectives on the user question, your goal is to help
the user overcome some of the limitations of the distance-based similarity search. 
Provide these alternative questions separated by newlines. Original question: {question}"""
prompt_perspectives = ChatPromptTemplate.from_template(template)

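generate_queries = (
        prompt_perspectives
        | azure_model
        | StrOutputParser()
        # one query per line; drop empty lines so that no empty query reaches the retriever
        | (lambda x: [q.strip() for q in x.split("\n") if q.strip()])
)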

In [None]:
from langchain.load import dumps, loads


def get_unique_union(union_docs: list[list]):
    """ Unique union of retrieved docs """
    flattened_docs = [dumps(doc) for sublist in union_docs for doc in sublist]
    unique_docs = list(set(flattened_docs))
    return [loads(doc) for doc in unique_docs]


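# generate_queries yields a list of queries; .map() runs the retriever on each of them.
# Note: mqr_retriever (a MultiQueryRetriever) generates its own query variants for every
# query it receives, so each generated query is expanded once more by the LLM.
mqr_retrieval_chain = (
        generate_queries
        | mqr_retriever.map()
        | get_unique_union
)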

In [None]:
mqr_rag = RunnableParallel(
    {
        "context": mqr_retrieval_chain,
        "question": RunnablePassthrough()
    }
).assign(answer=base_rag_chain)

In [None]:
mqr_rag.invoke("Is the company aiming to reduce gas flaring?")

In [None]:
mqr_evaluator = Evaluator(name="Multi-Query Retrieval",
                          cache_results=USE_CACHE,
                          rag_chain=mqr_rag,
                          llm_model=azure_model,
                          embeddings=bge_embeddings)

mqr_evaluator.evaluate(df_eval)
mqr_evaluator.summarize()

# Experiment 3: Using a Step Back Strategy

In [None]:
# Few Shot Examples
from langchain_core.prompts import ChatPromptTemplate, FewShotChatMessagePromptTemplate

examples = [
    {
        "input": "Could the members of The Police perform lawful arrests?",
        "output": "what can the members of The Police do?",
    },
    {
        "input": "Jan Sindel’s was born in what country?",
        "output": "what is Jan Sindel’s personal history?",
    },
]

example_prompt = ChatPromptTemplate.from_messages(
    [
        ("human", "{input}"),
        ("ai", "{output}"),
    ]
)
few_shot_prompt = FewShotChatMessagePromptTemplate(
    example_prompt=example_prompt,
    examples=examples,
)
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """You are an expert at world knowledge. Your task is to step back and paraphrase a question to a more generic step-back question, which is easier to answer. Here are a few examples:""",
        ),
        few_shot_prompt,
        ("user", "{question}"),
    ]
)
generate_queries_step_back = (prompt | azure_model | StrOutputParser())

In [None]:
from langchain_core.runnables import RunnableLambda

response_prompt_template = """
You are an expert of world knowledge. I am going to ask you a question. Your response should be comprehensive and not contradicted with the following context if they are relevant. Otherwise, ignore them if they are not relevant.

# {normal_context}
# {step_back_context}

# Original Question: {question}
# Answer:
"""
response_prompt = ChatPromptTemplate.from_template(response_prompt_template)

step_back_retrieval_chain = (
        {
            "normal_context": RunnableLambda(lambda x: x["question"]) | retriever,
            "step_back_context": generate_queries_step_back | retriever,
            "question": lambda x: x["question"],
        }
        | response_prompt
        | azure_model
        | StrOutputParser()
)

step_back_rag = RunnableParallel(
    {
        "question": RunnablePassthrough()
    }
).assign(answer=step_back_retrieval_chain)

In [None]:
step_back_rag.invoke("What is the company's goal in reducing gas flaring?")

In [None]:
step_back_evaluator = Evaluator(name="Step Back",
                                cache_results=USE_CACHE,
                                rag_chain=step_back_rag,
                                llm_model=azure_model,
                                embeddings=bge_embeddings)

step_back_evaluator.evaluate(df_eval)
step_back_evaluator.summarize()

# Experiment 4: HyDE approach

In [None]:
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

system = """You are an expert about the Clean Technology Sector.
            Answer the user question as best you can. Answer as though you were writing a tutorial that addressed the user question."""

hyde_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)

gen_hypothetical_doc = hyde_prompt | azure_model | StrOutputParser() | RunnablePassthrough()

In [None]:
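# HyDE: retrieve with the hypothetical document instead of the raw question
hyde_retrieval_chain = gen_hypothetical_doc | retriever

# answer the original question from the retrieved context, like the other pipelines
hyde_rag = RunnableParallel(
    {
        "context": hyde_retrieval_chain,
        "question": RunnablePassthrough()
    }
).assign(answer=base_rag_chain)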


hyde_rag.invoke("What is the company's goal in reducing gas flaring?")

In [None]:
hyde_evaluator = Evaluator(name="HyDE",
                           cache_results=USE_CACHE,
                           rag_chain=hyde_rag,
                           llm_model=azure_model,
                           embeddings=bge_embeddings)

hyde_evaluator.evaluate(df_eval)
hyde_evaluator.summarize()