This lab was completed as a pair:

SUN Xu: 22200118

HOU Dan: 22215394

In [1]:
!pip install chromadb
!pip install -U bitsandbytes
!pip install llama-cpp-python
!pip install rank_bm25 nltk


# Objectives

Build a simple RAG for question answering based on a light quantized Llama 3.2 1B model. The goal is to accurately answer questions about the Warhammer 40K rules.

In order to simplify the workstream I already prepared the textual data in an additional notebook. As always, the parsing is not perfect and there can be some artifacts.

## Evaluation

The evaluation will be broken down by component:
- the LLM and prompting engine
- the vector DB/approximate kNN
- the overall quality of the full RAG function
- the quality and readability of the code.

## Overview

First, we will import Llama 3.2, try some templating, and chat with the model.

Secondly, we will experiment with [ChromaDB](https://docs.trychroma.com/getting-started) and build a first RAG.

Finally, we will be using the headers and BM25 to try and improve the retriever.

# Imports

In [2]:
import chromadb
import json
import uuid

from llama_cpp import Llama
from transformers import AutoModelForCausalLM, AutoTokenizer
from jinja2 import Template

from rank_bm25 import BM25Okapi
import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

# Large Language Model

## CPU implementation

Use the llama-cpp `from_pretrained` function to load an LLM from the ["bartowski/Llama-3.2-1B-Instruct-GGUF"](https://huggingface.co/bartowski/Llama-3.2-1B-Instruct-GGUF) collection on the CPU.

It can be used as a fallback if your GPU credit runs out.

**Clean the output and remove special tokens and the input prompt from the answer**

In [3]:
llm = Llama.from_pretrained(
    repo_id="bartowski/Llama-3.2-1B-Instruct-GGUF",
    filename="*Q8_0.gguf",
    verbose=False,
    n_ctx=4096,
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Llama-3.2-1B-Instruct-Q8_0.gguf:   0%|          | 0.00/1.32G [00:00<?, ?B/s]

llama_new_context_with_model: n_ctx_per_seq (4096) < n_ctx_train (131072) -- the full capacity of the model will not be utilized


In [4]:
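# Keep a dedicated reference to the llama.cpp model: the name `llm` is
# rebound to a dispatcher function later in the notebook.
llama_cpp_model = llm

def llm_cpu(prompt, max_tokens=2048):
    output = llama_cpp_model(
        prompt=prompt,
        max_tokens=max_tokens,
    )
    # return only the generated text, not the full response dictionary
    return output["choices"][0]["text"].strip()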

In [5]:
def clean_output(output):
    # remove special tokens and the input prompt from the answer
    output = output.split("<|start_header_id|>assistant<|end_header_id|>")[-1].replace("<|eot_id|>", "")
    # remove extra blanks and empty lines
    return "\n".join([line.strip() for line in output.splitlines() if line.strip()])
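
As a quick illustration of the cleaning step, the sketch below applies the same logic to a hand-written string that mimics a decoded generation. The sample text is made up for the example and is not real model output.

```python
def clean_output(output):
    # keep only what follows the last assistant header, then drop the end-of-turn token
    output = output.split("<|start_header_id|>assistant<|end_header_id|>")[-1].replace("<|eot_id|>", "")
    # remove extra blanks and empty lines
    return "\n".join(line.strip() for line in output.splitlines() if line.strip())

# Hypothetical decoded sequence: the prompt followed by the generated answer
sample = (
    "<|begin_of_text|><|start_header_id|>user<|end_header_id|>tell me a joke !<|eot_id|>\n"
    "<|start_header_id|>assistant<|end_header_id|>\n\n"
    "  Why did the clown resign?  \n\n"
    "Because he was tired of working for peanuts!<|eot_id|>"
)
cleaned = clean_output(sample)
print(cleaned)
```

Only the two answer lines should remain, without the prompt, the special tokens, or the blank lines.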

## GPU Implementation

Use the Hugging Face transformers library to load and generate text using the ["unsloth/Llama-3.2-1B-Instruct"](https://huggingface.co/unsloth/Llama-3.2-1B-Instruct) model.

In [6]:
model = AutoModelForCausalLM.from_pretrained("unsloth/Llama-3.2-1B-Instruct").to('cuda')
tokenizer = AutoTokenizer.from_pretrained("unsloth/Llama-3.2-1B-Instruct")

In [7]:
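def llm_complete(prompt, max_tokens=2048):
    # make sure pad_token_id is set before tokenizing with padding
    if tokenizer.pad_token_id is None:
        tokenizer.pad_token_id = tokenizer.eos_token_id

    # tokenize the prompt (truncated to max_tokens tokens) and move it to the GPU
    inputs = tokenizer(prompt, return_tensors="pt", max_length=max_tokens, padding=True, truncation=True).to('cuda')

    # max_new_tokens bounds the generated answer only,
    # whereas max_length would also count the prompt tokens
    outputs = model.generate(
        **inputs,
        max_new_tokens=max_tokens,
        pad_token_id=tokenizer.pad_token_id,
    ).to('cpu')

    # decode the full sequence (prompt + answer); clean_output() strips the prompt
    return tokenizer.decode(outputs[0])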

In [8]:
import torch

def llm(prompt, max_tokens=2048):
    """
    Use the GPU model when CUDA is available, otherwise fall back to the CPU model.
    """
    return llm_complete(prompt, max_tokens) if torch.cuda.is_available() else llm_cpu(prompt, max_tokens)

## Initial LLM experiments

### Prompt Template
Create a Jinja2 ``Template`` that adds the Llama 3.2 special tokens (the same as Llama 3.1) in order to optionally set a role and wrap a user prompt.

In [9]:
prompt_template = Template(
    """
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>{{ role }}<|eot_id|>
<|start_header_id|>user<|end_header_id|>{{ input }}<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>
    """
)
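
For comparison, the same prompt layout can be written without Jinja. The sketch below uses plain f-strings and follows the published Llama 3 chat format, which places two newlines after each header. `build_chat_prompt` is a hypothetical helper name and not part of any library.

```python
def build_chat_prompt(user_input, role=None):
    """Wrap a user message (and an optional system role) in Llama 3 special tokens."""
    parts = ["<|begin_of_text|>"]
    if role:  # the system turn is optional
        parts.append(f"<|start_header_id|>system<|end_header_id|>\n\n{role}<|eot_id|>")
    parts.append(f"<|start_header_id|>user<|end_header_id|>\n\n{user_input}<|eot_id|>")
    # leave the assistant turn open so that the model writes the answer
    parts.append("<|start_header_id|>assistant<|end_header_id|>\n\n")
    return "".join(parts)

prompt = build_chat_prompt("tell me a joke !", role="you are a depressed clown")
print(prompt)
```

Calling `build_chat_prompt("tell me a joke !")` without a role simply omits the system turn.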

### Play with the LLM

Try to trigger different model behaviours by changing the role for the same question. For example, use the prompt 'tell me a joke !' with different roles.

In [10]:
input_1 = {'role':'you are a depressed clown' ,'input': 'tell me a joke !'}
input_2 = {'role':'you are a clown for children' ,'input': 'tell me a joke !'}

In [11]:
prompt_1 = prompt_template.render(input_1)
result_1 = llm(prompt_1, max_tokens=2048)
print(clean_output(result_1))

*sigh*... okay... here's one... *muffled laughter*
Why couldn't the bicycle stand up by itself?... *pauses*... because it was two-tired... *more muffled laughter*...


In [12]:
prompt_2 = prompt_template.render(input_2)
result_2 = llm(prompt_2, max_tokens=2048)
print(clean_output(result_2))

*squirts water from flower on lapel*
Why did the clown resign from the circus?
Because he was tired of working for peanuts! *wipes water from lapel with a red nose*


What do you think ?

---

The model works correctly with the Jinja template. The template follows the Llama prompt format: `role` fills the system message and `input` is the actual user prompt. Changing only the role changes the tone of the joke.

# Retriever

In [13]:
import chromadb.utils.embedding_functions as embedding_functions

In [14]:
client = chromadb.Client()
stf_function = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")

In [15]:
collection = client.create_collection(name="warhammer_40k",
                                      metadata={"hnsw:space": "cosine"},
                                      embedding_function=stf_function)
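
The `hnsw:space` setting selects the distance used to compare embeddings. With `cosine`, a smaller distance means a closer match. The sketch below computes cosine distance as one minus cosine similarity on toy 3-dimensional vectors; the vectors are invented for illustration, whereas real `all-MiniLM-L6-v2` embeddings have 384 dimensions.

```python
import math

def cosine_distance(u, v):
    """Return 1 - cosine similarity of two vectors of equal length."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return 1.0 - dot / (norm_u * norm_v)

query = [1.0, 0.0, 1.0]
same_direction = [2.0, 0.0, 2.0]   # same direction as the query, different length
orthogonal = [0.0, 1.0, 0.0]       # shares no component with the query

print(cosine_distance(query, same_direction))
print(cosine_distance(query, orthogonal))
```

The distance ignores vector length: the first pair is (numerically) at distance zero, while the orthogonal pair is at distance one.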

In [17]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [20]:
with open('/content/drive/MyDrive/M2/RAG/lab2/data/processed/chunks.json', 'r') as f:
    chunks = json.load(f)

# Check the different fields of the JSON. What do you think?

The JSON contains a list of chunks, and each chunk has several fields:
- `id`: the identifier of the record;
- `metadata`: the header hierarchy of the section the chunk comes from, which is useful for categorizing and retrieving chunks;
- `page_content`: the actual text, in our case the Warhammer 40K rules;
- `type`: the category of the document.

These fields make it easier to build the knowledge base used by the semantic retrieval and question-answering steps that follow.
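
One way to check the fields is to count the top-level keys and the header levels across all chunks. The sketch below runs on two invented chunks that only mimic the structure of the data; `summarize_chunks` is a hypothetical helper, and in the notebook it would be called on the `chunks` list loaded from the JSON file.

```python
from collections import Counter

def summarize_chunks(chunks):
    """Count top-level keys and metadata header levels across chunks."""
    key_counts = Counter()
    header_counts = Counter()
    for chunk in chunks:
        key_counts.update(chunk.keys())
        header_counts.update((chunk.get("metadata") or {}).keys())
    return key_counts, header_counts

# Invented examples with the same fields as the real data
sample_chunks = [
    {"id": None, "metadata": {"Header 1": "Core Concepts", "Header 2": "Determining Visibility"},
     "page_content": "some rule text", "type": "Document"},
    {"id": None, "metadata": {"Header 1": "Introduction"},
     "page_content": "more rule text", "type": "Document"},
]
key_counts, header_counts = summarize_chunks(sample_chunks)
print(dict(key_counts))
print(dict(header_counts))
```

A header level that appears in fewer chunks than `Header 1` indicates that not every chunk carries the full hierarchy.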

In [21]:
# Add the documents to the collection
collection.add(
    ids = [str(uuid.uuid4()) for _ in range(len(chunks))],
    metadatas = [chunk['metadata'] for chunk in chunks],
    documents = [chunk['page_content'] for chunk in chunks]
)

In [22]:
question = "What is a visible unit ?"

# Perform a query

Create a retrieval function wrapping the ChromaDB query and returning an adapted format.

In [23]:
def retrieve(question, n_results=5):
    # query collection
    query_results = collection.query(query_texts=question, n_results=n_results)
    # return texts and associated headers
    return [{'metadata': header, 'text': text} for header, text in zip(query_results['metadatas'][0], query_results['documents'][0])]
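
The `[0]` indexing in `retrieve` comes from the shape of the query result: ChromaDB returns one inner list per query text. The sketch below reproduces that reshaping on a hand-written dictionary that mimics the result structure. The values are made up, and `to_chunks` is a hypothetical helper name.

```python
# Hand-written stand-in for the dictionary returned by collection.query()
# for a single query text: every field holds one inner list per query.
fake_results = {
    "ids": [["a", "b"]],
    "documents": [["first chunk text", "second chunk text"]],
    "metadatas": [[{"Header 1": "Core Concepts"}, {"Header 1": "Introduction"}]],
    "distances": [[0.21, 0.35]],
}

def to_chunks(query_results, query_index=0):
    """Pair each retrieved text with its metadata for one query."""
    metadatas = query_results["metadatas"][query_index]
    documents = query_results["documents"][query_index]
    return [{"metadata": meta, "text": text} for meta, text in zip(metadatas, documents)]

chunks_for_prompt = to_chunks(fake_results)
print(chunks_for_prompt)
```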

In [24]:
retrieve(question)

[{'metadata': {'Header 1': 'Introduction',
   'Header 2': 'Terrain Features (Pg 44-52)'},
  'text': "- **Model Visible:** If any part of a model can be seen, it is visible.  \n- **Unit Visible:** If any model in a unit is visible, that model's unit is visible.  \n- **Model Fully Visible:** If every Warhammer 40,000 battles are fought across all manner of grim and perilous landscapes, often strewn with ruins, wreckage and other obstacles your forces must navigate while they fight.  \n#### - Unit Fully Visible: If Every Model In A Unit Is Fully Visible, That Unit Is"},
 {'metadata': {'Header 1': 'Core Concepts',
   'Header 2': 'Determining Visibility'},
  'text': '- **Unit Fully Visible:** If every model in a unit is fully visible, that unit is fully visible.  \n#### Hints And Tips  \nDice Rolling'},
 {'metadata': {'Header 1': 'Introduction',
   'Header 2': 'Datasheets And Unit Abilities (Pg 37-39)'},
  'text': 'MODEL FULLY VISIBLE\nIf every part of another model that is facing the obser

## RAG template

Create a RAG template in Jinja

In [25]:
rag_template = Template(
    """
<|begin_of_text|><|start_header_id|>system<|end_header_id|>{{ role }}<|eot_id|>

<|start_header_id|>user<|end_header_id|>
Use the context and only the context to answer the following question:

Question: {{ question }}

Context:
{% for chunk in chunks %}
{% if chunk.metadata %}Section: {{ chunk.metadata['Header 1'] }}
{% if chunk.metadata['Header 2'] is defined %}Subsection: {{ chunk.metadata['Header 2'] }}
{% if chunk.metadata['Header 3'] is defined %}Subsubsection: {{ chunk.metadata['Header 3'] }}
{% endif %}{% endif %}{% endif %}Content:
{{ chunk.text }}
{% endfor %}
<|eot_id|>

<|start_header_id|>assistant<|end_header_id|>
    """
)
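
The context part of the template can also be sketched in plain Python, which makes the header handling explicit. This is only an illustration with a hypothetical helper name (`format_context`). Unlike the nested `if` blocks of the template, it checks each header level independently.

```python
def format_context(chunks):
    """Render chunks as 'Section / Subsection / Content' blocks."""
    labels = [("Header 1", "Section"), ("Header 2", "Subsection"), ("Header 3", "Subsubsection")]
    blocks = []
    for chunk in chunks:
        metadata = chunk.get("metadata") or {}
        # one line per header level that is present in the metadata
        lines = [f"{label}: {metadata[key]}" for key, label in labels if key in metadata]
        lines.append("Content:")
        lines.append(chunk["text"])
        blocks.append("\n".join(lines))
    # separate the chunks with a blank line
    return "\n\n".join(blocks)

example_chunks = [
    {"metadata": {"Header 1": "toto"}, "text": "ctx1"},
    {"metadata": {"Header 1": "tato", "Header 2": "tato"}, "text": "ctx2"},
]
context = format_context(example_chunks)
print(context)
```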

In [26]:
print(rag_template.render(**{
    'role': 'you are an experienced wargame player',
    'question': "What is a visible unit ?",
    'chunks': [
        {'metadata': {'Header 1': 'toto'}, 'text': 'ctx1'},
        {'metadata': {'Header 1': 'tato', 'Header 2': 'tato'}, 'text': 'ctx2'},
        {'metadata': {'Header 1': 'tato'}, 'text': 'ctx3'},
    ],
}))


<|begin_of_text|><|start_header_id|>system<|end_header_id|>you are an experienced wargame player<|eot_id|>

<|start_header_id|>user<|end_header_id|>
Use the context and only the context to answer the following question:

Question: What is a visible unit ?

Context:

Section: toto
Content:
ctx1

Section: tato
Subsection: tato
Content:
ctx2

Section: tato
Content:
ctx3

<|eot_id|>

<|start_header_id|>assistant<|end_header_id|>
    


Create a function that builds the prompt from the question and the chunks.

In [27]:
def prompt_generation(question, chunks):
    # return the prompt
    return rag_template.render(**{
            'role': 'you are an experienced wargame player',
            'question': question,
            'chunks':chunks})

In [28]:
print(prompt_generation(question, retrieve(question)))


<|begin_of_text|><|start_header_id|>system<|end_header_id|>you are an experienced wargame player<|eot_id|>

<|start_header_id|>user<|end_header_id|>
Use the context and only the context to answer the following question:

Question: What is a visible unit ?

Context:

Section: Introduction
Subsection: Terrain Features (Pg 44-52)
Content:
- **Model Visible:** If any part of a model can be seen, it is visible.  
- **Unit Visible:** If any model in a unit is visible, that model's unit is visible.  
- **Model Fully Visible:** If every Warhammer 40,000 battles are fought across all manner of grim and perilous landscapes, often strewn with ruins, wreckage and other obstacles your forces must navigate while they fight.  
#### - Unit Fully Visible: If Every Model In A Unit Is Fully Visible, That Unit Is

Section: Core Concepts
Subsection: Determining Visibility
Content:
- **Unit Fully Visible:** If every model in a unit is fully visible, that unit is fully visible.  
#### Hints And Tips  
Dice 

# Full RAG

Create functions to perform the full RAG pipeline. You may create one function for the CPU and another one for the GPU.

In [29]:
question_0 = "What is a visible unit ?"
question_1 = 'What are the limitations associated to the advance movement rule ?'
question_2 = 'Is there a stratagem that can be used to reroll a failed dice roll?'
question_3 = 'Explain the Command Re-roll stratagem'

In [30]:
def full_rag_cpu(question, n_results=4):
    prompt = prompt_generation(question, retrieve(question, n_results))
    return clean_output(llm_cpu(prompt, max_tokens=2048))

In [31]:
print(full_rag_cpu(question_1))




In [32]:
def full_rag_gpu(question, n_results=4):
    prompt = prompt_generation(question, retrieve(question, n_results))
    return clean_output(llm_complete(prompt, max_tokens=2048))

In [33]:
print(full_rag_gpu(question_1))

The limitations associated with the Advance move rule are:
* The total Advance roll must be less than or equal to the Move characteristic of each model in that unit.
* Each model in that unit can make an Advance move by moving a distance in inches less than or equal to the total, but no model can be moved within Engagement Range of enemy models.


# Adding a reranker
The results might not be satisfactory for some questions.

In order to better use the header extraction, we will rerank the chunks using BM25 over the headers.
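
To make the scoring concrete, here is a minimal, self-contained sketch of a BM25 scoring function. It assumes a common smoothed IDF, log(1 + (N - n + 0.5) / (n + 0.5)), with k1 = 1.5 and b = 0.75 as typical values. The `rank_bm25` library handles IDF slightly differently, so the exact scores will not match it. The toy headers are invented.

```python
import math
from collections import Counter

def bm25_scores(query_tokens, corpus, k1=1.5, b=0.75):
    """Score every tokenized document in `corpus` against `query_tokens`."""
    n_docs = len(corpus)
    avg_len = sum(len(doc) for doc in corpus) / n_docs
    # document frequency: in how many documents each term appears
    doc_freq = Counter(term for doc in corpus for term in set(doc))
    scores = []
    for doc in corpus:
        term_freq = Counter(doc)
        score = 0.0
        for term in query_tokens:
            if term not in term_freq:
                continue
            idf = math.log(1 + (n_docs - doc_freq[term] + 0.5) / (doc_freq[term] + 0.5))
            tf = term_freq[term]
            # length normalisation: longer documents are penalised through b
            denom = tf + k1 * (1 - b + b * len(doc) / avg_len)
            score += idf * tf * (k1 + 1) / denom
        scores.append(score)
    return scores

# Invented, already stemmed headers
headers = [
    ["core", "concept", "stratagem"],
    ["introduct", "terrain", "featur"],
    ["stratagem", "command", "reroll"],
]
scores = bm25_scores(["command", "reroll", "stratagem"], headers)
print(scores)
```

The third header matches all three query terms and therefore ranks first, while the second shares no term with the query and scores zero.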

In [None]:
from nltk.tokenize import word_tokenize
nltk.download('punkt')
nltk.download('punkt_tab')
stopwords_en = stopwords.words('english')
stemmer = SnowballStemmer("english")

def preprocess_text(text):
    # lowercase and tokenize the text
    tokens = word_tokenize(text.lower())
    # keep alphanumeric tokens, remove stop words and apply stemming
    return [stemmer.stem(tok) for tok in tokens if tok.isalnum() and tok not in stopwords_en]

def metadata_preprocessing(chunks):
    preprocessed_headers = []
    for chunk in chunks:
        # a chunk without metadata gets an empty token list so that
        # the output stays aligned with the input chunks
        metadata = chunk.get("metadata") or {}
        # concatenate all header levels into one string
        headers = " ".join(metadata.values())
        preprocessed_headers.append(preprocess_text(headers))
    return preprocessed_headers # preprocessed headers

def query_preprocessing(query):
    # apply the same preprocessing as for the headers
    return preprocess_text(query) # preprocessed query

In [None]:
print(metadata_preprocessing(chunks))

In [None]:
print(query_preprocessing(question_1))

In [None]:
def rerank_chunks(question, chunks, n_results=5):
    preprocessed_headers = metadata_preprocessing(chunks)
    preprocessed_queries = query_preprocessing(question)
    # create the BM25 Okapi engine
    bm25 = BM25Okapi(preprocessed_headers)
    # compute the score for each chunk
    scores = bm25.get_scores(preprocessed_queries)
    # sort chunks
    sorted_chunks = sorted(zip(scores, chunks), key=lambda x: x[0], reverse=True)
    return sorted_chunks[:n_results] # reranked chunks

In [None]:
retrieved_chunks = retrieve(question_2, n_results=100)
reranked_chunks = rerank_chunks(question_2, retrieved_chunks, n_results=10)
for rr_chk in reranked_chunks:
    print('score: {}'.format(rr_chk[0]))
    print('header: {}'.format('#'.join(rr_chk[1]['metadata'].values())))
    print('text: {}'.format(rr_chk[1]['text']))
    print('-'*100)

In [None]:
question_3 = 'Explain the Command Re-roll stratagem'
retrieved_chunks = retrieve(question_3, n_results=100)
reranked_chunks = rerank_chunks(question_3, retrieved_chunks, n_results=10)
for rr_chk in reranked_chunks:
    # display reranked chunks
    print('score: {}'.format(rr_chk[0]))
    print('header: {}'.format('#'.join(rr_chk[1]['metadata'].values())))
    print('text: {}'.format(rr_chk[1]['text']))
    print('-'*100)

## RAG with reranker

In [34]:
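def full_rag_reranker(question, n_results=5):
    # retrieve a larger pool of candidate chunks for the reranker
    retrieved_chunks = retrieve(question, n_results=n_results*10)
    # rerank the candidates; rerank_chunks returns (score, chunk) pairs
    reranked_chunks = rerank_chunks(question, retrieved_chunks, n_results=n_results)
    # keep only the chunks, since the template expects 'metadata' and 'text' fields
    top_chunks = [chunk for _, chunk in reranked_chunks]
    # build the prompt and run the full pipeline with the reranker
    prompt = prompt_generation(question, top_chunks)
    return clean_output(llm(prompt, max_tokens=2048))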

In [None]:
print(full_rag_reranker(question_1, n_results=5))

Compare the generations from both pipelines (with and without the reranker).

What do you think ?

We observe that generations with the reranker are more relevant to the question and more precise. Reranking plays an important role in a RAG pipeline: without it, many chunks are retrieved, but not all of them are relevant to the question. The reranker reorders and filters the retrieved chunks so that the most relevant ones come first, which improves the quality of the answer.
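
The reordering and filtering effect can be seen on a toy example. The sketch below takes chunks in their retrieval order together with made-up header scores, sorts them by score, and keeps the top entries. The scores and chunk texts are invented, and `top_k_by_score` is a hypothetical helper.

```python
def top_k_by_score(chunks, scores, k):
    """Return the k chunks with the highest scores, best first."""
    ranked = sorted(zip(scores, chunks), key=lambda pair: pair[0], reverse=True)
    # drop the scores so that the result contains plain chunk dictionaries
    return [chunk for _, chunk in ranked[:k]]

retrieved = [
    {"metadata": {"Header 1": "Introduction"}, "text": "terrain overview"},
    {"metadata": {"Header 1": "Stratagems"}, "text": "command re-roll rule"},
    {"metadata": {"Header 1": "Core Concepts"}, "text": "dice rolling"},
]
header_scores = [0.0, 1.8, 0.4]  # invented scores, one per retrieved chunk

best = top_k_by_score(retrieved, header_scores, k=2)
for chunk in best:
    print(chunk["metadata"]["Header 1"], "->", chunk["text"])
```

The chunk that was first in retrieval order is dropped because its header score is the lowest.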

# Further application

At this point, we have created a single-turn question/answer RAG. For some applications, it can be useful to allow conversations with documents. Modify the generation using [this example.](https://huggingface.co/docs/transformers/conversations).
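
A conversation is usually represented as a growing list of role/content messages, where each user turn and each model answer is appended before the next generation. The sketch below shows only that bookkeeping. `fake_generate` is a placeholder standing in for the real model call, and both helper names are hypothetical.

```python
def fake_generate(messages):
    """Placeholder for a real model call: echoes the last user message."""
    return f"(answer to: {messages[-1]['content']})"

def chat_turn(history, user_message, generate=fake_generate):
    """Append the user message, generate a reply from the full history, and store it."""
    history.append({"role": "user", "content": user_message})
    reply = generate(history)
    history.append({"role": "assistant", "content": reply})
    return reply

history = [{"role": "system", "content": "you are an experienced wargame player"}]
chat_turn(history, "What is a visible unit ?")
chat_turn(history, "And a fully visible unit ?")
for message in history:
    print(message["role"], ":", message["content"])
```

Because the whole history is passed to the generation function at every turn, a follow-up question can rely on what was said earlier.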

In [None]:
# load the model and tokenizer
model_id = "CohereForAI/c4ai-command-r-v01-4bit"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto")
device = model.device

In [None]:
# define conversation input
conversation = [
    {"role": "system", "content": "you are an experienced wargame player"},
    {"role": "user", "content": question_2}
]

# get retrieved chunks and reranked chunks
retrieved_chunks = retrieve(question_2, n_results=100)
reranked_chunks = rerank_chunks(question_2, retrieved_chunks, n_results=10)
# define documents for retrieval-based generation
documents = [
    {
        "title": ' '.join([ f"{key} : {head}" for key, head in item[1]['metadata'].items()]),
        "text": item[1]['text']
    } for item in reranked_chunks
]

# tokenize conversation and documents using a RAG template, returning PyTorch tensors.
input_ids = tokenizer.apply_chat_template(
    conversation=conversation,
    documents=documents,
    chat_template="rag",
    tokenize=True,
    add_generation_prompt=True,
    return_tensors="pt").to(device)

# generate a response
gen_tokens = model.generate(
    input_ids,
    max_new_tokens=100,
    do_sample=True,
    temperature=0.3,
    ).to('cpu')

# decode and print the generated text along with the generation prompt
gen_text = tokenizer.decode(gen_tokens[0])
print(gen_text)