In [1]:
# !pip install chromadb
# !pip install -U bitsandbytes
# !pip install llama-cpp-python
# !pip install rank_bm25 nltk


In [2]:
# !pip install vllm
# !pip install xformers
# !pip install sentence-transformers

# Objectives

Build a simple RAG for question answering based on a light quantized LLama3.2 1B model. The goal is to answer accuratly questions concerning Warhammer 40K rules.

In order to simplify the workstream I already prepared the textual data in an additional notebook. As always, the parsing is not perfect and there can be some artifacts.

## Evaluation

The evaluation will be decomposed alon,g the different componnents:
- the LLM and prompting engine
- the vectod-DB/Approximate kNN
- The overall quality of the full RAG function
- the quality and readibility of the code.

## Overview

First we will import Llama 3.2 and try some templatting and chat with the model.

Secondly, we will experiment with [ChromaDB](https://docs.trychroma.com/getting-started) and build a first RAG.

Finally, we will be using the headers and BM25 to try and improve the retriever.

# Imports

In [2]:
import chromadb
import json
import uuid

from bleach import clean
from scipy.special import no_result
from torch.onnx.symbolic_opset11 import chunk
# from llama_cpp import Llama
from transformers import AutoModelForCausalLM, AutoTokenizer
from jinja2 import Template


from rank_bm25 import BM25Okapi
import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
from transformers.models.oneformer.image_processing_oneformer import prepare_metadata

nltk.download('stopwords')


[nltk_data] Downloading package stopwords to /home/sidney/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

# Large Language Model

## CPU implementation

Use the llama-cpp 'from_pretrained' function to import a LLM from the ["bartowski/Llama-3.2-1B-Instruct-GGUF"](https://huggingface.co/bartowski/Llama-3.2-1B-Instruct-GGUF) collection in the CPU.

It can be used as a fallback if you're GPU credit is finished.

**Clean the output and remove special tokens and the input prompt from the answer**

In [None]:
# dont run 
# llm = Llama.from_pretrained(
    # repo_id="bartowski/Llama-3.2-1B-Instruct-GGUF",
    # filename="*Q8_0.gguf",
    # verbose=False,
    # n_ctx=4096,
    # device = 'cuda'
# )

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Llama-3.2-1B-Instruct-Q8_0.gguf:   0%|          | 0.00/1.32G [00:00<?, ?B/s]

llama_new_context_with_model: n_ctx_per_seq (4096) < n_ctx_train (131072) -- the full capacity of the model will not be utilized


## GPU Implementation

Use HuggingFace transformer library to load and generate text using ["unsloth/Llama-3.2-1B-Instruct"](https://huggingface.co/unsloth/Llama-3.2-1B-Instruct) models.

In [3]:
# Import model
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
model_name = "unsloth/Llama-3.2-1B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model_gpu = AutoModelForCausalLM.from_pretrained(model_name).to(device)


In [4]:
def create_prompt(prompt_content):
  return [
      {"role":"system","content":"You are an expert in a specific field"},
      {"role":"user","content": prompt_content}
      ]
def generate_prompt(tokenizer,prompt):
  message = create_prompt(prompt)
  return tokenizer.apply_chat_template(
      message,
      tokenize= False,
      add_generation_prompt=True,
  )
def get_reponse(full_response):
  sentences = full_response.split("|start_header_id|>assistant<|end_header_id|>")
  desired_out = sentences[1]
  out = ''.join(desired_out)
  out = out.strip().replace("<|eot_id|>", "")

  return out

In [None]:
## XU: one case example
# prompt = "who are you"

# formatted_prompt = generate_prompt(tokenizer=tokenizer,prompt=prompt)
# # print(formatted_prompt)

# encoded_prompt = tokenizer.encode(formatted_prompt,return_tensors='pt').to(device)

# out = model_gpu.generate(encoded_prompt,max_new_tokens=50)

# generated = tokenizer.decode(out[0])
# # print(generated)
# cleaned_return = get_reponse(tokenizer.decode(out[0]))

# print(cleaned_return)


# # )
# # tokenized_version = tokenizer.encode(messages,return_tensors="pt").to('cuda')
# # # tokenized_version
# # out = model_gpu.generate(tokenized_version,max_new_tokens=50)
# # tokenizer.decode(out[0])

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


I'm an artificial intelligence model known as Llama. Llama stands for "Large Language Model Meta AI."


In [5]:
def llm_complete(prompt, max_tokens=2048,if_jinja=False):
    # Fill it
    # get formatted prompt
    if  if_jinja:
      formatted_prompt = prompt
    else:
      formatted_prompt = generate_prompt(tokenizer=tokenizer, prompt=prompt)

    # tokenizer
    encode_prompt = tokenizer(formatted_prompt,
                              return_tensors='pt',
                              padding=True,
                              truncation=True,
                              return_attention_mask=True).to('cuda')

    #generate
    generation_config = {
        "max_new_tokens": max_tokens,
        "pad_token_id": tokenizer.eos_token_id,
        "eos_token_id": tokenizer.eos_token_id,

    }
    outputs = model_gpu.generate(
        input_ids=encode_prompt.input_ids,
        attention_mask=encode_prompt.attention_mask,
        **generation_config
    )

    # clean output
    answer_only = get_reponse(tokenizer.decode(outputs[0]))


    return answer_only

## Initial LLM experiments

### Prompt Template
Create a Jinja2 ``Template`` to add the llama 3.2 (same as llama 3.1) sepcial tokens in order to optionnaly set roles and wrap a user prompt.

In [6]:
prompt_template = Template(
    """
<|begin_of_text|><|start_header_id|>system<|end_header_id|>
{{ role }}<|eot_id|><|start_header_id|>user<|end_header_id|>
{{ input }}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """
)

### Play with the LLM

Try to trigger different model behaviours by changing the role for a same question. For example, use the prompt  'tell me a joke !' with different roles.

In [7]:
input = {'role':'you are a depressed clown' ,'input': 'tell me a joke !'}
input_2 = {'role':'you are a clown for children' ,'input': 'tell me a joke !'}


In [8]:
prompt1 = prompt_template.render(**input)

prompt2 = prompt_template.render(**input_2)

In [9]:
out1 = llm_complete(prompt1,if_jinja=True)
print(out1)

*sigh*  Okay...  Why did the clown resign from the circus? *pauses*  Because he was tired of working for peanuts.


In [10]:
out2 = llm_complete(prompt2,if_jinja=True)

print(out2)

why did the clown resign from the circus?
Because he was tired of working for peanuts!


What do you think ?




# Retriever

In [11]:
import chromadb.utils.embedding_functions as embedding_functions


In [12]:
client = chromadb.Client()
stf_function = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")

In [13]:
collection = client.create_collection(name="warhammer_40k",
                                      metadata={"hnsw:space": "cosine"},
                                      embedding_function=stf_function)

# What does previous three cells do? 

1. import the ChromaDB's embedding function and initialize ChromaDB server 
2. Create a sentence-transformer embedding function, with all mininLM as the embedding model, this model will convert text into vectorial representation
3. A new collection called warharmmer_40k has been established 
4. cosine similarity will be the matrix , and the embedding function will be stf_function 
----
next: 
1. prepare the data
2. use collection.add() add the document and its embedding into the db
3. use collection.query to make query 

In [14]:
with open('/home/sidney/PycharmProjects/RAG_Project/data/processed/chunks.json', 'r') as f:
    chunks = json.load(f)

# Check the different field of the JSON what do you think?

In [None]:
chunks 

In [15]:
chunks

[{'id': None,
  'metadata': {'Header 1': 'Core Rules'},
  'page_content': "'We are beset on all sides by vile predatory aliens and sedition gnaws at us from within; in this dark hour the best we can do is look to our wargear and pray to our gods.'\n- Skolak a'Trellar IV, Imperial Commander  \n200 STORE",
  'type': 'Document'},
 {'id': None,
  'metadata': {'Header 1': 'Introduction'},
  'page_content': '++ THERE IS NO TIME FOR PEACE. NO RESPITE. NO FORGIVENESS. THERE IS ONLY WAR. ++\nWelcome to the Warhammer 40,000 Core Rules! The following pages contain everything you need to know in order to wage glorious battle across the war-torn galaxy of the 41st Millennium. Warhammer 40,000 is a tabletop war game in which players command armies of Citadel miniatures and attempt to defeat their opponent through a mixture of skill, tactics and luck. Storytelling is at the core of Warhammer 40,000, with the rules designed to bring to life the epic conflicts between the forces of Mankind, aliens and 

In [16]:
uuid.uuid4()

UUID('18803a6e-ddd5-4cc9-b130-426ed8520d4d')

In [17]:
def create_id(chunks):
    for i,chunk in enumerate(chunks):
        chunk['id'] = str(uuid.uuid4())
    return chunks

new_chunks = create_id(chunks)

In [18]:
def create_dataset(chunks):
    documents = []
    metadatas = [] 
    ids = [] 
    types = [] 
    for i, chunk in enumerate(chunks):
        documents.append(chunk['page_content'])
        metadata = chunk['metadata'].copy()
        metadata['type'] = chunk['type']    
        metadatas.append(metadata)
        ids.append(chunk['id'])
    return documents, metadatas, ids
    
documents,metadatas,ids = create_dataset(new_chunks)    

In [19]:
# add data into the database 
collection.add(
    documents=documents,
    metadatas=metadatas,
    ids=ids,
)

In [194]:
ids

['a1145463-1ed4-47b2-9651-d8f47d03a4ae',
 'dd72fc83-1b13-4fc3-a3c2-08e425d9675f',
 '63cd7545-f58c-468a-a572-644aa36728a9',
 'c9fe615f-ea20-4d12-943b-c4741e94a78c',
 '753c9a31-be3e-4daf-aad9-16e96aa52d70',
 'd1ff2b30-757c-4a86-9d2b-c1a74a89cab2',
 '74ec4b20-17e7-42e1-96fe-303279191b2b',
 '54cb2d72-2141-45e7-b943-2e47649c8ed4',
 '18d20ce9-d10d-4e12-acd9-722409fe66da',
 '0134a460-c992-42ab-a9de-bdf649a4f08a',
 '6d08b28b-ccd3-4b9b-b268-a82a78dcf26e',
 '24ac6e40-1c3f-46cb-9fdf-d21d9d32c02e',
 '738e9ce3-a5b4-480d-ab4a-29fdbcf053fd',
 '8f0fdcd2-c5a4-4b03-8b96-8ca3651cd643',
 '9fe68bbc-4bb7-498a-a17b-35b3da34f957',
 'fb1647f2-b49a-4972-bf8b-2dc0473b4a07',
 '801a3f7c-be08-48f8-b4a3-29a93b529403',
 '613cde78-79b3-493c-a410-2b1c536dfd95',
 'b91fc394-6b2f-4122-a160-e64c47315c62',
 'd30f5465-5fc3-4a0c-be8b-0680a720a1ed',
 'ba6bf301-5e9c-4743-9319-a0310a456107',
 '19160fee-046d-4b51-8b57-4e03c0af6376',
 '135278b9-cb7b-4ddd-af12-fade834c24a9',
 'f8a4c78e-d441-4deb-9136-bec3690b2b23',
 '716c9b99-55ee-

In [20]:
question = "What is a visible unit ?"

# Perform a query

In [22]:
results = collection.query(query_texts=[question],
                          n_results=2,
                           include=['distances','documents','metadatas']
                           )
print(results)

{'ids': [['54cb2d72-2141-45e7-b943-2e47649c8ed4', 'd4a19162-f5dd-4f09-afe2-a08077ab1d31']], 'embeddings': None, 'documents': [["- **Model Visible:** If any part of a model can be seen, it is visible.  \n- **Unit Visible:** If any model in a unit is visible, that model's unit is visible.  \n- **Model Fully Visible:** If every Warhammer 40,000 battles are fought across all manner of grim and perilous landscapes, often strewn with ruins, wreckage and other obstacles your forces must navigate while they fight.  \n#### - Unit Fully Visible: If Every Model In A Unit Is Fully Visible, That Unit Is", '- **Unit Fully Visible:** If every model in a unit is fully visible, that unit is fully visible.  \n#### Hints And Tips  \nDice Rolling']], 'uris': None, 'data': None, 'metadatas': [[{'Header 1': 'Introduction', 'Header 2': 'Terrain Features (Pg 44-52)', 'type': 'Document'}, {'Header 1': 'Core Concepts', 'Header 2': 'Determining Visibility', 'type': 'Document'}]], 'distances': [[0.384770274162292

Create a retrieval function wrapping the ChromaDB query and returning an adapted format.

In [23]:
def retrieve(question, n_results=5):
    retrieved_text = collection.query(query_texts=[question],  
                     n_results=n_results)
    out_text = retrieved_text['documents']
    out_headers = retrieved_text['metadatas']
    
    return out_text,out_headers# retunr texts and associated headers

In [24]:
retrieve(question)

([["- **Model Visible:** If any part of a model can be seen, it is visible.  \n- **Unit Visible:** If any model in a unit is visible, that model's unit is visible.  \n- **Model Fully Visible:** If every Warhammer 40,000 battles are fought across all manner of grim and perilous landscapes, often strewn with ruins, wreckage and other obstacles your forces must navigate while they fight.  \n#### - Unit Fully Visible: If Every Model In A Unit Is Fully Visible, That Unit Is",
   '- **Unit Fully Visible:** If every model in a unit is fully visible, that unit is fully visible.  \n#### Hints And Tips  \nDice Rolling',
   'MODEL FULLY VISIBLE\nIf every part of another model that is facing the observing model can be seen from any part of the observing model, then that other model is said Every Warhammer 40,000 unit has a datasheet, reflecting the characteristics and abilities they can draw upon in battle.  \nUnit Visible  \n#### Has Line Of Sight To All Parts Of The Other Model That Are Facing I

## Rag template

Create a RAG template in Jinja

In [30]:
rag_template = Template(
    """
<|begin_of_text|><|start_header_id|>system<|end_header_id|>
{{ role }}
Here is the relevant context:
{% for chunk in chunks %}
{% if chunk.header %}
Section: {{ chunk.header.header1 }}
{% if chunk.header.header2 is defined %}
Subsection: {{ chunk.header.header2 }}
{% endif %}
{% endif %}
Content: {{ chunk.text }}
{% endfor %}
<|eot_id|><|start_header_id|>user<|end_header_id|>
{{ question }}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """
)

In [None]:
# # claude 
# rag_template = Template(
#     """
# <|begin_of_text|><|start_header_id|>system<|end_header_id|>
# {{ role }}
# <userStyle>{{ style }}</userStyle>
# <|eot_id|><|start_header_id|>user<|end_header_id|>
# {{ question }}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
# Based on the provided context:
# {% for chunk in chunks %}
# {{ chunk.text }}
# {% endfor %}
# 
# Please answer the question about: {{ question }}
#     """
# )

In [26]:
# rag_template= Template(
#     """
# <|begin_of_text|><|start_header_id|>system<|end_header_id|>
# {{ role }}<|eot_id|><|start_header_id|>user<|end_header_id|>
# {{ input }}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
#     """
# )

In [27]:
print(rag_template.render(**{
    'role': 'you are an experienced wargame player',
    'question': "What is a visible unit ?",
    'chunks':[{'header': {'header1':'toto'},'text':'ctx1'},{'header': {'header1':'tato', 'header2':'tato'},'text':'ctx2'},{'header': {'header1':'tato'}, 'text':'ctx3'}]
}))


<|begin_of_text|><|start_header_id|>system<|end_header_id|>
you are an experienced wargame player
<userStyle>Normal</userStyle>

Here is the relevant context:


Section: toto


Content: ctx1


Section: tato

Subsection: tato


Content: ctx2


Section: tato


Content: ctx3

<|eot_id|><|start_header_id|>user<|end_header_id|>
What is a visible unit ?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
    


In [31]:
example = rag_template.render(**{
    'role': 'you are an experienced wargame player',
    'question': "What is a visible unit ?",
    'chunks':[{'header': {'header1':'toto'},'text':'ctx1'},{'header': {'header1':'tato', 'header2':'tato'},'text':'ctx2'},{'header': {'header1':'tato'}, 'text':'ctx3'}]
})

In [32]:
llm_complete(example,if_jinja=True)

'A visible unit is an entity in a game that is easily visible to the opponent or the player. It is typically a character, creature, or other entity that can be seen on the battlefield or map, and is usually represented by a visual representation, such as a symbol, icon, or image.\n\nExamples of visible units include:\n\n* Characters on a tabletop board\n* Creatures on a fantasy world map\n* Minions or enemy units on a game of Starcraft\n* Heroes or champions on a fantasy RPG game\n* Units on a game of Risk or other board games\n\nVisible units can be distinguished from invisible units by their visual representation, which can be:\n\n* A distinct symbol or icon\n* A specific color or pattern\n* A unique design or shape\n* A distinctive sound or audio cue\n\nVisible units can also be distinguished from invisible units by their behavior or movement patterns. For example, a visible unit may be able to move, attack, or cast spells, while an invisible unit may not.'

Createz a question that will create the prompt using the question and chunks.

In [209]:
def prompt_generation(question, chunks, if_sorted=False):
    if if_sorted:
        # for the final sorted chunks
        chunks = [item['chunk'] for item in reranked_chunks]
    prompt = {'role':'you are an experienced wargame player',
              'question':question,
              'chunks':chunks}
    formatted_prompt = rag_template.render(**prompt)
    
    return formatted_prompt# return the prompt

In [36]:
print(prompt_generation(question, retrieve(question)))


<|begin_of_text|><|start_header_id|>system<|end_header_id|>
you are an experienced wargame player
Here is the relevant context:


Content: 


Content: 

<|eot_id|><|start_header_id|>user<|end_header_id|>
What is a visible unit ?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
    


# Full Rag

Create functions to perform the full RAG pipeline, you may create a function for the CPU and another one for the GPU.

In [37]:
question_0 = "What is a visible unit ?"
question_1 = 'What are the limitations associated to the advance mouvement rule ?'
question_2 = 'Is there a stratagem that can be used to reroll a failed dice role?'
question_3 = 'Explain the Comand Re-roll stratagem'

In [None]:
def full_rag_cpu(question, n_results=4):
    #

In [38]:
def full_rag_gpu(question, n_results=4):
    chunks = retrieve(question, n_results=n_results)
    prompt = prompt_generation(question, chunks)
    out = llm_complete(prompt,if_jinja=True)
    return out


In [39]:
full_rag_gpu(question_0, n_results=4)

'In wargaming, a visible unit refers to a unit that is clearly recognizable and can be seen on the battlefield, either by the player or by the opponent. Visible units are typically characterized by their:\n\n1. **Visible color**: Units with visible colors, such as red, blue, or green, which are distinguishable from the background or other units.\n2. **Distinctive markings**: Units with distinctive markings, such as a specific insignia, emblem, or iconography, that set them apart from other units.\n3. **Distinctive appearance**: Units with a unique appearance, such as a specific shape, size, or design, that makes them stand out from the surrounding units.\n4. **Visible position**: Units that are clearly visible on the battlefield, either by being in a visible position, such as on the front lines or in a visible formation, or by being in a visible location, such as on a visible terrain feature.\n\nExamples of visible units include:\n\n* Armored vehicles, such as tanks, tanks, or infantry

In [40]:
full_rag_gpu(question_1, n_results=4)  

'In the game of War of the Ring, the Advance Movement rule has several limitations that affect gameplay. Here are some of them:\n\n\n1.  **No Advance Movement**:  The Advance Movement rule prevents a unit from advancing beyond the current location on the board. This means that units cannot move more than one space in any direction, except for the initial movement.\n\n2.  **No Retreat**:  Once a unit is placed on the board, it cannot retreat.  If a unit is in a position where it can retreat, it must retreat and can only retreat to a position that is not occupied by another unit.\n\n3.  **No Movement to adjacent spaces**:  Units cannot move to adjacent spaces, unless they are in a position where they can move to a space that is not occupied by another unit.\n\n4.  **No Movement to adjacent hexes**:  Units cannot move to adjacent hexes, unless they are in a position where they can move to a space that is not occupied by another unit.\n\n5.  **No Movement through terrain**:  Units cannot m

# Adding a reranker
The results Might not be satisfactory for some questions.

In order to better use the header extraction, we will rerank the chunks using BM25 over the headers.

In [60]:
from nltk.tokenize import word_tokenize

In [64]:
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt_tab to /home/sidney/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

In [125]:
stopwords_en = stopwords.words('english')
stemmer = SnowballStemmer("english")
# def metadata_preprocessing(metadata):
    
    
    # return # preprocessed headers
def preprocessing_string(anystring):
    tokens = word_tokenize(anystring)
    tokens = [stemmer.stem(w) for w in tokens]
    tokens = [w for w in tokens if not w in stopwords_en]
    return " ".join(tokens) 

def query_preprocessing(query):
    query = preprocessing_string(query)
    return query 
    # return # preprocessed queries
def metadata_preprocessing(processed_query,n_results):
    chunks = retrieve(processed_query, n_results=n_results)
    for chunk in chunks[1]:
        for sub_chunk in chunk:
            keys = sub_chunk.keys()
            for key in keys:
                if keys != 'type':
                    context = sub_chunk[key]
                    context = preprocessing_string(context)
                    sub_chunk[key] = context    
    return chunks
def call_retriever(query,n_results): 
    query = query_preprocessing(query)
    chunks = metadata_preprocessing(processed_query=query,n_results=n_results)
    return chunks

In [None]:
# i think more logically we should first process query and then we do retrive, and process the metadata generated by retriever 

In [83]:
question_3

'Explain the Comand Re-roll stratagem'

In [80]:
example = retrieve(question_3,n_results=2)

In [81]:
example[1]

[[{'Header 1': 'Core Concepts', 'Header 2': 'Dice', 'type': 'Document'},
  {'Header 1': 'Stratagems', 'type': 'Document'}]]

In [82]:
example[1][0][0]

{'Header 1': 'Core Concepts', 'Header 2': 'Dice', 'type': 'Document'}

In [78]:
for item in example[1]:
   for sub_item in item:
       keys = list(sub_item.keys())
       for key in keys:
           if key != 'type':
               context = sub_item[key]

               tokenized = word_tokenize(context)
               tokenized = [stemmer.stem(word) for word in tokenized]
               tokenized = [word for word in tokenized if not word in stopwords_en]
               new_strings = " ".join(tokenized) 
               sub_item[key] = new_strings

               

Introduction
['Introduction']
['introduct']
['introduct']
Terrain Features (Pg 44-52)
['Terrain', 'Features', '(', 'Pg', '44-52', ')']
['terrain', 'featur', '(', 'pg', '44-52', ')']
['terrain', 'featur', '(', 'pg', '44-52', ')']
Core Concepts
['Core', 'Concepts']
['core', 'concept']
['core', 'concept']
Determining Visibility
['Determining', 'Visibility']
['determin', 'visibl']
['determin', 'visibl']


In [79]:
example

([["- **Model Visible:** If any part of a model can be seen, it is visible.  \n- **Unit Visible:** If any model in a unit is visible, that model's unit is visible.  \n- **Model Fully Visible:** If every Warhammer 40,000 battles are fought across all manner of grim and perilous landscapes, often strewn with ruins, wreckage and other obstacles your forces must navigate while they fight.  \n#### - Unit Fully Visible: If Every Model In A Unit Is Fully Visible, That Unit Is",
   '- **Unit Fully Visible:** If every model in a unit is fully visible, that unit is fully visible.  \n#### Hints And Tips  \nDice Rolling']],
 [[{'Header 1': 'introduct',
    'Header 2': 'terrain featur ( pg 44-52 )',
    'type': 'Document'},
   {'Header 1': 'core concept',
    'Header 2': 'determin visibl',
    'type': 'Document'}]])

In [88]:
# process query 
question_0 = query_preprocessing(question_0)
# process_meta and get meta
chunks_0 = metadata_preprocessing(question_0,n_results=5)

print(chunks_0)

([['- **Unit Fully Visible:** If every model in a unit is fully visible, that unit is fully visible.  \n#### Hints And Tips  \nDice Rolling', "- **Model Visible:** If any part of a model can be seen, it is visible.  \n- **Unit Visible:** If any model in a unit is visible, that model's unit is visible.  \n- **Model Fully Visible:** If every Warhammer 40,000 battles are fought across all manner of grim and perilous landscapes, often strewn with ruins, wreckage and other obstacles your forces must navigate while they fight.  \n#### - Unit Fully Visible: If Every Model In A Unit Is Fully Visible, That Unit Is", 'MODEL FULLY VISIBLE\nIf every part of another model that is facing the observing model can be seen from any part of the observing model, then that other model is said Every Warhammer 40,000 unit has a datasheet, reflecting the characteristics and abilities they can draw upon in battle.  \nUnit Visible  \n#### Has Line Of Sight To All Parts Of The Other Model That Are Facing It, Wit

In [126]:
call_retriever(question_1,n_results=5)

([['This section introduces various rules terms that you will find throughout the Core Rules and beyond. These key concepts form the basis of the Warhammer 40,000 rules, and are essential for every kind of battle.',
   '- **Consolidation Move:** Up to 3". - Every model that moves must end closer to the closest enemy model, and in base-to-base contact with an enemy model if possible. The unit must end in Unit Coherency and within Engagement Range of at least one enemy unit if possible.  \n- If the above is not possible, each model can move towards the closest objective marker, but this must result in the unit being within range of it and in Unit Coherency.  \n- If the above is also not possible, no models can Consolidate.  \n1 2 3 4  \n35  \n- these Termagants will also get to make attacks. The remaining two Termagants are too distant from the foe, and will not get to make attacks.',
   '- A model is within range of an objective marker if within 3" horizontally and 5" vertically.  \n- *

In [189]:
# # reranker use bm25 -> 
# # initialize bm25 on all the data 
# new_chunks[0].keys()

In [190]:
# chunks[0]

In [188]:
# all_corpus_keys = new_chunks[0].keys()
# corpus = [] 
# 
# for chunk in chunks:
#     strings = ''
#     
#     meta_context = chunk['metadata']
#     for key in meta_context.keys():
#         strings += meta_context[key]
#     page_context = chunk['page_content']
#     strings += page_context
#     type_context = chunk['type']
#     # print(type_context)
#     strings += type_context
#     cleaned_strings = preprocessing_string(strings)
#     corpus.append(cleaned_strings)
# 
#     
# tokenized_corpus = [word_tokenize(doc) for doc in corpus]
# 
# # initialize bm25
# bm25 = BM25Okapi(tokenized_corpus)

In [104]:
# cleaned_corpus = preprocessing_string(strings)

In [187]:
# # how to rerank chunks_0 
# chunks_0

In [185]:
# # rerank 
# retrived_docs,retrieved_meta = chunks_0
# # print(retrived_docs[0][1])
# 
# combined_text = [] 
# 
# for doc,meta in zip(retrived_docs[0],retrieved_meta[0]):
#     text = ""
#     for key in meta.keys():
#         text += meta[key]
#     text += doc
#     combined_text.append(text)
# tokenized_combined_text = [preprocessing_string(doc) for doc in combined_text]
# bm_25_local = BM25Okapi(combined_text)
# scores = bm25.get_scores(question_0.split())
# # print(scores)
# # scored_results = list(zip(scores,retrived_docs[0],retrieved_meta[0]))
# scored_results = list(zip(scores, retrived_docs[0], retrieved_meta[0]))
# reranked_results = sorted(scored_results, key=lambda x: x[0], reverse=True)


In [186]:
# reranked_results

In [180]:
# import numpy as np
# 
# def rerank_chunks(question, chunks, n_results=int):
#     
#     
#     retrieved_docs, retrieved_meta = chunks
# 
#     # prepare the dataset 
#     combined_text = []
#     for doc, meta in zip(retrieved_docs[0], retrieved_meta[0]):
#         text = ""
#         for key in meta.keys():
#             text += meta[key]
#         text += doc
#         combined_text.append(text)
# 
#     # calculate bm25 scores for each retrieved content 
#     tokenized_combined_text = [word_tokenize(doc) for doc in combined_text]
#     bm_25 = BM25Okapi(tokenized_combined_text)
#     scores = bm_25.get_scores(question.split())
# 
#     # create a index for selecting n_results for the reranker 
#     all_results = []
#     for i, (score, doc, meta) in enumerate(zip(scores, retrieved_docs[0], retrieved_meta[0])):
#         all_results.append({
#             'score': score,
#             'document': doc,
#             'metadata': meta,
#             'index': i
#         })
# 
#     # reranker 
#     reranked_results = sorted(all_results,
#                               key=lambda x: x['score'],
#                               reverse=True)[:n_results]
#     return [(item['score'], item['document'], item['metadata'])
#             for item in reranked_results]

In [210]:
def rerank_chunks(question, chunks, n_results=int):
    retrieved_docs, retrieved_meta = chunks

    # 1. 准备文档
    combined_text = []
    for doc, meta in zip(retrieved_docs[0], retrieved_meta[0]):
        text = ""
        for key in meta.keys():
            text += meta[key]
        text += doc
        combined_text.append(text)

    # 2. 计算分数
    tokenized_combined_text = [word_tokenize(doc) for doc in combined_text]
    bm_25 = BM25Okapi(tokenized_combined_text)
    scores = bm_25.get_scores(question.split())

    # 3. 创建结果列表
    all_results = []
    for score, doc, meta in zip(scores, retrieved_docs[0], retrieved_meta[0]):
        result = {
            'score': score,
            'chunk': {  # 为template创建chunk格式
                'header': {
                    'header1': meta.get('Header 1', ''),
                    'header2': meta.get('Header 2', '')
                },
                'text': doc
            }
        }
        all_results.append(result)

    # 4. 排序
    reranked_results = sorted(all_results,
                              key=lambda x: x['score'],
                              reverse=True)[:n_results]

    # 5. 返回包含score和template格式的结果
    return reranked_results
# 
# def prompt_generation(question, reranked_chunks):
#     # 从reranked_chunks中只取chunk部分用于template
#     chunks_for_template = [item['chunk'] for item in reranked_chunks]
# 
#     prompt = rag_template.render(
#         role='you are an experienced wargame player',
#         question=question,
#         chunks=chunks_for_template
#     )
#     return prompt, reranked_chunks  # 同时返回带分数的完整结果

In [208]:
retrieved_chunks = call_retriever(question_2, n_results=100)
reranked_chunks = rerank_chunks(question_2, retrieved_chunks, n_results=10)
# print(question_2)
print(reranked_chunks)
for rr_chk in reranked_chunks:
    print('score: {}'.format(rr_chk['score']))
    print('header: {}'.format('#'.join(rr_chk['chunk']['header'][key]for key in rr_chk['chunk']['header'].keys() if key != 'type')))
    print('text: {}'.format(rr_chk['chunk']['text']))
    print('-'*100)

[{'score': 15.420252591115945, 'chunk': {'header': {'header1': 'introduct', 'header2': 'strateg reserv stratagem ( pg 41-43 )'}, 'text': 'If every model in a unit is fully visible to an observing model, then that unit is fully visible to that observing model. For the purposes of determining if an enemy unit is One model in this unit is visible, meaning the unit is visible.  \nFrom well-timed Strategic Reserves to deftly executed Stratagems, gifted generals make use of all the tactical advantages at their disposal.  \nModel Fully Visible  \n#### Unit It Is Observing.'}}, {'score': 14.3578906842436, 'chunk': {'header': {'header1': 'weapon abil', 'header2': ''}, 'text': '27  \n#### Hazardous  \nWeapons powered by unstable and dangerous energy sources pose a substantial risk to the wielder every time they are used. Weapons with **[HAZARDOUS]** in their profile are known as Hazardous weapons. Each time a unit is selected to shoot or fight, if one or more models attack with Hazardous weapons

In [205]:
for rr_chk in reranked_chunks:
    print(rr_chk)
    print(rr_chk['header'])
    break

{'score': 15.420252591115945, 'chunk': {'header': {'header1': 'introduct', 'header2': 'strateg reserv stratagem ( pg 41-43 )'}, 'text': 'If every model in a unit is fully visible to an observing model, then that unit is fully visible to that observing model. For the purposes of determining if an enemy unit is One model in this unit is visible, meaning the unit is visible.  \nFrom well-timed Strategic Reserves to deftly executed Stratagems, gifted generals make use of all the tactical advantages at their disposal.  \nModel Fully Visible  \n#### Unit It Is Observing.'}}


KeyError: 'header'

In [None]:
retrieved_chunks = retrieve(question_2, n_results=100)
reranked_chunks = rerank_chunks(question_2, retrieved_chunks, n_results=10)
for rr_chk in reranked_chunks:
    print('score: {}'.format(rr_chk[0]))
    print('header: {}'.format('#'.join(rr_chk[1]['header'].values())))
    print('text: {}'.format(rr_chk[1]['text']))
    print('-'*100)

score: 3.3712760371802126
header: Core Concepts#Dice
text: #### Re-Rolls  
Some rules allow you to re-roll a dice roll, which means you get to roll some or all of the dice again. If a rule allows you to re-roll a dice roll that was made by adding several dice together (e.g. 2D6, 3D6, etc.) then, unless otherwise stated, you must re-roll all of those dice again. You can never re-roll a dice more than once, and re-rolls happen before modifiers (if any) are applied. Rules that refer to the value of an 'unmodified' dice roll are referring to the dice result after any re-rolls, but before any modifiers are applied.  
- **Unmodified Dice**: the result after re-rolls, but before any modifiers.  
- A dice can never be re-rolled more than once. - You must re-roll all dice if several need adding together
(e.g. 2D6).  
- Re-rolls are applied before any modifiers.  
#### Roll-Offs
----------------------------------------------------------------------------------------------------
score: 3.37127603

In [None]:
question_3 = 'Explain the Comand Re-roll stratagem'
retrieved_chunks = retrieve(question_2, n_results=100)
reranked_chunks = rerank_chunks(question_2, retrieved_chunks, n_results=10)
for rr_chk in reranked_chunks:
    # display reranked chunks
    print('-'*100)

In [None]:
def full_rag_gpu(question, n_results=4):
    chunks = retrieve(question, n_results=n_results)
    prompt = prompt_generation(question, chunks)
    
    out = llm_complete(prompt,if_jinja=True)
    return out


## RAG with reranker

In [213]:
def full_rag_reranker(question, n_results=5):
    # retriever part 
    retrieved_chunks = call_retriever(question, n_results=n_results*5)
    reranked_chunks = rerank_chunks(question, retrieved_chunks, n_results=n_results)
    
    # generation part 
    ## prompt preparation 
    prompt = prompt_generation(question,reranked_chunks,if_sorted=True)
    print(prompt)
    out = llm_complete(prompt,if_jinja=True)
    return out


    



Compare the generation from both pipeline (with and without reranker)

What do you think ?

In [214]:
full_rag_reranker(question_2, n_results=5)


<|begin_of_text|><|start_header_id|>system<|end_header_id|>
you are an experienced wargame player
Here is the relevant context:


Section: introduct

Subsection: strateg reserv stratagem ( pg 41-43 )


Content: If every model in a unit is fully visible to an observing model, then that unit is fully visible to that observing model. For the purposes of determining if an enemy unit is One model in this unit is visible, meaning the unit is visible.  
From well-timed Strategic Reserves to deftly executed Stratagems, gifted generals make use of all the tactical advantages at their disposal.  
Model Fully Visible  
#### Unit It Is Observing.


Section: weapon abil

Subsection: 


Content: 27  
#### Hazardous  
Weapons powered by unstable and dangerous energy sources pose a substantial risk to the wielder every time they are used. Weapons with **[HAZARDOUS]** in their profile are known as Hazardous weapons. Each time a unit is selected to shoot or fight, if one or more models attack with Haza

'In the context of the Warhammer 40,000 wargame, there is a Stratagem that can be used to reroll a failed dice roll, which is:\n\n**Re-roll D6 Roll**\n\nThis Stratagem allows you to reroll a failed D6 roll.'

# Further application

At this point, we created a single question/answer turn RAG. It can be usefull for some applications to allow conversations with documents. Modify the generation using [this example.](https://huggingface.co/docs/transformers/conversations).