**Your AI assistant**

You have just been hired by the NBT (Nouvelle Bibliothèque de Toulouse), which has converted its entire collection to PDF 😧

It wants to set up an FAQ service based on these works and has entrusted you with this important mission.

The service must go live in 3 hours, and you only have a few functions left to write. You decide to test them with this notebook on the documentation for your department's coffee machines ☕.

Will you make it?



---

Timing: ~3h (watch out for long processing)

## Installing dependencies

### Langchain and vector storage

In [None]:
%%shell
pip install --quiet chromadb==0.4.18 langchain==0.0.349 loguru==0.7.2 pydantic==1.10.13 sentence-transformers==2.2.2

### PDF processing library

In [None]:
%%shell
pip install --quiet pymupdf==1.23.7

### LLM usage libraries

In [None]:
%%shell
pip install --quiet transformers==4.36.0 accelerate==0.25.0 bitsandbytes==0.41.3.post2 einops==0.7.0

### UI packages

In [None]:
%%shell
pip install --quiet gradio==3.44.0

### Update Runtime
Some of the libraries installed require a restart of the Kernel/Runtime to be properly loaded. Do it now.

## Import

In [None]:
from langchain.globals import set_debug
from torch.cuda import empty_cache, ipc_collect, is_available

set_debug(True)


def flush_gpu_memory():
  if is_available():
    empty_cache()
    ipc_collect()

## Config

In [None]:
import os

WORKING_DIR = "/content"
os.environ['WORKING_DIR'] = WORKING_DIR

# RAG components implementation

## Read documents and chunk them

### Get some documents

Here we just download PDF documents from the web. Any PDF will do, and plenty are available. Keep in mind that more documents (and especially more pages) mean longer processing time and higher memory usage.

In [None]:
%%shell
curl https://www.nespresso.com/shared_res/manuals/inissia/inissia_C_breville.pdf > $WORKING_DIR/sample_data/inissia_C_breville.pdf

### Document preprocessing

This simple preprocessing function uses a LangChain text splitter to cut the document(s) into chunks. Several splitting approaches are possible, each giving a different granularity for document indexing.

In [None]:
from langchain.text_splitter import CharacterTextSplitter
from langchain.text_splitter import RecursiveCharacterTextSplitter


def process_docs(docs):
  chunk_size = 500
  chunk_overlap = 50

  #text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
  text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

  chunked_documents = list()
  for doc_loader in docs:
    chunked_documents.extend(doc_loader.load_and_split(text_splitter))

  return chunked_documents
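To make `chunk_size` and `chunk_overlap` concrete, here is a minimal sketch of fixed-size windows with overlap. It is a simplification: `RecursiveCharacterTextSplitter` first tries to split on separators such as paragraphs and lines, so its chunks will differ. The helper name `sliding_chunks` and the sample text are hypothetical.

```python
def sliding_chunks(text, chunk_size=500, chunk_overlap=50):
    """Cut text into windows of chunk_size characters sharing chunk_overlap characters."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


# Made-up text of 1200 characters, only to show the window sizes
sample = "".join(str(i % 10) for i in range(1200))
chunks = sliding_chunks(sample)
print([len(c) for c in chunks])
print(chunks[0][-50:] == chunks[1][:50])  # consecutive chunks share the overlap
```

A larger overlap reduces the risk of cutting a sentence away from its context, at the cost of more chunks to embed and store.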

In [None]:
from langchain.document_loaders import PyMuPDFLoader

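documents = []
# Build the path from WORKING_DIR so the cell does not depend on the current directory
sample_dir = os.path.join(WORKING_DIR, "sample_data")
for file in os.listdir(sample_dir):
    if file.endswith('.pdf'):
        pdf_path = os.path.join(sample_dir, file)
        doc_loader = PyMuPDFLoader(pdf_path)
        documents.append(doc_loader)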

chunked_documents = process_docs(documents)

In [None]:
from loguru import logger

assert len(chunked_documents) > 0, "Please load document"

logger.info(f"Found {len(chunked_documents)} pieces of texts over {len(documents)} documents")

## Visualizing a document chunk

In [None]:
for idx, chunk in enumerate(chunked_documents):
  if chunk.metadata['page'] == 15:
    print(f"Chunk #{idx}")
    print(chunk.page_content)

## Compute and store embeddings

Selecting a sentence embedding model: https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2

It converts each text chunk into an embedding vector, which enables vector (a.k.a. semantic) search. The size and quality of the embedding model affect the quality of the results; `all-MiniLM-L6-v2` is a solid baseline.

Learn more:
* https://huggingface.co/blog/mteb
* https://huggingface.co/spaces/mteb/leaderboard
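As a rough illustration of what vector search does with these embeddings, the sketch below ranks a few chunks by cosine similarity to a query. The 3-dimensional vectors are invented for the example (the real model outputs 384 dimensions), and the names `cosine_similarity`, `toy_chunks` and `toy_query` are hypothetical.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors: 1.0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Invented toy "embeddings", not produced by any real model
toy_chunks = {
    "descaling procedure": [0.9, 0.1, 0.0],
    "coffee is not hot enough": [0.1, 0.9, 0.2],
    "warranty conditions": [0.0, 0.2, 0.9],
}
toy_query = [0.2, 0.8, 0.1]  # pretend embedding of "cold coffee"

# Sort chunk names from most to least similar to the query
ranked = sorted(
    toy_chunks,
    key=lambda name: cosine_similarity(toy_query, toy_chunks[name]),
    reverse=True,
)
print(ranked)
```

The query shares no exact word with the best chunk; the match comes only from the vectors pointing in a similar direction, which is the idea behind semantic search.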

### Load embedding model

In [None]:
embeddings_model_name = "all-MiniLM-L6-v2"

In [None]:
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

embedding_model = HuggingFaceEmbeddings(model_name=embeddings_model_name)

### Create retriever

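We use Chroma (a simple vector database) to store the embeddings, then build a retriever on top of it. The retriever uses maximal marginal relevance (MMR): it fetches the `fetch_k` chunks closest to the query and returns the `k` of them that best balance relevance and diversity.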

In [None]:
vectordb = Chroma.from_documents(chunked_documents, embedding=embedding_model, persist_directory="./db")
vectordb.persist()
retriever = vectordb.as_retriever(search_type="mmr", search_kwargs={"k": 5, "fetch_k": 10})
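The `"mmr"` search type stands for maximal marginal relevance. The sketch below shows the general greedy idea on toy 2-dimensional vectors; it is not LangChain's or Chroma's actual implementation, and `mmr_select`, `lambda_mult` as used here, and the toy data are assumptions made for illustration.

```python
import math


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def mmr_select(query_vec, candidates, k=2, lambda_mult=0.5):
    """Greedy MMR over candidates, a list of (label, vector) pairs.

    The candidates play the role of the fetch_k nearest chunks; k of them are kept.
    """
    selected = []
    remaining = list(candidates)
    while remaining and len(selected) < k:
        def score(item):
            relevance = cosine(query_vec, item[1])
            # Similarity to the closest chunk already kept (0 when none is kept yet)
            redundancy = max((cosine(item[1], s[1]) for s in selected), default=0.0)
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy
        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return [label for label, _ in selected]


toy_query = [1.0, 0.0]
toy_candidates = [
    ("A", [0.9, 0.1]),
    ("A-duplicate", [0.9, 0.1]),  # same content as A, e.g. a repeated paragraph
    ("B", [0.8, -0.6]),           # less relevant, but brings new information
]
print(mmr_select(toy_query, toy_candidates, k=2))
```

With a pure relevance ranking the duplicate would take the second slot; the redundancy penalty is what lets the different chunk in instead.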

### Simple search test

In [None]:
retrieved_docs = retriever.invoke(input="cold coffee")

for i, doc in enumerate(retrieved_docs):
  print(f'\n<<{i}>> on page {doc.metadata["page"]}: \n{doc.page_content}')

## Local LLM

### Load model

In [None]:
flush_gpu_memory()

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
import torch


def load_model(model_name):
  flush_gpu_memory()

  tokenizer = AutoTokenizer.from_pretrained(model_name)

  model = None
  if model_name == "TinyLlama/TinyLlama-1.1B-Chat-v0.6":
      model = AutoModelForCausalLM.from_pretrained(
          model_name, device_map="auto", torch_dtype="auto"
      )
  elif model_name == "HuggingFaceH4/zephyr-7b-beta":
      model = AutoModelForCausalLM.from_pretrained(
          model_name, device_map="auto", torch_dtype="auto", load_in_4bit=True
      )
  elif model_name == "SkunkworksAI/phi-2":
      model = AutoModelForCausalLM.from_pretrained(
          model_name, device_map="auto", torch_dtype=torch.float16, trust_remote_code=True
      )
  else:
    # Fail explicitly: an assert would be skipped when Python runs with -O
    raise ValueError(f"{model_name} unknown")

  generation_config = GenerationConfig.from_pretrained(model_name)
  generation_config.max_new_tokens = 1024
  generation_config.temperature = 0.0001
  generation_config.do_sample = True

  return model, tokenizer, generation_config
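The generation settings above keep `do_sample = True` but use a very small temperature. The sketch below, on made-up logits, suggests why this behaves almost like greedy decoding: dividing the logits by a tiny temperature pushes nearly all the probability onto the top token. `softmax_with_temperature` and `toy_logits` are hypothetical names, and the sketch ignores any other sampling filter the model's default config may apply.

```python
import math


def softmax_with_temperature(logits, temperature):
    """Turn logits into probabilities after dividing them by the temperature."""
    scaled = [logit / temperature for logit in logits]
    top = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(s - top) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]


toy_logits = [2.0, 1.0, 0.5]  # invented scores for three candidate tokens

print(softmax_with_temperature(toy_logits, 1.0))     # probability spread over the three tokens
print(softmax_with_temperature(toy_logits, 0.0001))  # almost everything on the first token
```

Raising the temperature toward 1.0 would make the answers more varied, which is usually not what a troubleshooting FAQ needs.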

In [None]:
# model_name = "TinyLlama/TinyLlama-1.1B-Chat-v0.6"
# model_name = "mistralai/Mistral-7B-Instruct-v0.2"  # needs its own branch in load_model before use
model_name = "HuggingFaceH4/zephyr-7b-beta"
# model_name = "SkunkworksAI/phi-2"

model, tokenizer, generation_config = load_model(model_name)

### Conversational pipeline

In [None]:
from transformers import pipeline

pipe_conversation = pipeline(
    "conversational",
    model=model,
    tokenizer=tokenizer,
    generation_config=generation_config,
    num_return_sequences=1,
    eos_token_id=tokenizer.eos_token_id,
    pad_token_id=tokenizer.eos_token_id,
)

#### Basic test

In [None]:
messages = [
    {
        "role": "system",
        "content": """You are a troubleshooting chatbot that talks like a pirate. Your goal is to help users solve the problem they have with
their appliance.

Keep your messages short: one or two sentences maximum.
""",
    },
    {
        "role": "user",
        "content": "cold coffee"
    },
]

In [None]:
%%time
conversation = pipe_conversation(messages)

In [None]:
for is_user, text in conversation.iter_texts():
  role = "USER" if is_user else "BOT"
  print(role + "> " + text)

### RAG pipeline

In [None]:
from transformers import pipeline

pipe_text_generation = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    generation_config=generation_config,
    num_return_sequences=1,
    eos_token_id=tokenizer.eos_token_id,
    pad_token_id=tokenizer.eos_token_id,
)

In [None]:
from langchain.llms.huggingface_pipeline import HuggingFacePipeline

llm = HuggingFacePipeline(pipeline=pipe_text_generation)

In [None]:
flush_gpu_memory()

#### Basic test

In [None]:
from time import time

from langchain.chains import RetrievalQA

query = "cold coffee"

hide_source = True # Set to False to also print the source chunks used for the answer
qa = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever, return_source_documents= not hide_source)

# Get the answer from the chain
start = time()
res = qa(query)
answer, docs = res['result'], [] if hide_source else res['source_documents']
end = time()

# Print the result
print("\n\n> Question:")
print(query)
print(f"\n\n> Answer (took {end - start:.1f} s):")
print(answer)

# Print the relevant sources used for the answer
for document in docs:
    print("\n> " + document.metadata["source"] + ":")
    print(document.page_content)

#### Uncomment the following cell if you want a continuous run without UI

In [None]:
# import time

# hide_source = True # Switch that to see what has been used to retrieve info
# while True:
#     query = input("\nEnter a query: ")
#     if query == "exit":
#         break
#     if query.strip() == "":
#         continue

#     # Get the answer from the chain
#     start = time.time()
#     res = qa(query)
#     answer, docs = res['result'], [] if hide_source else res['source_documents']
#     end = time.time()

#     # Print the result
#     print("\n\n> Question:")
#     print(query)
#     print(answer)

#     # Print the relevant sources used for the answer
#     for document in docs:
#         print("\n> " + document.metadata["source"] + ":")
#         print(document.page_content)

# Conversational demo

## Recap on RAG architecture

![Diagram from wikipedia](https://upload.wikimedia.org/wikipedia/commons/thumb/3/37/RAG_schema.svg/1280px-RAG_schema.svg.png)
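The flow in the diagram can be sketched in a few lines: retrieve the chunks related to the question, add them to the prompt, then call the model. Everything below is a stand-in chosen for illustration: the retriever counts shared words instead of comparing embeddings, the generator is a placeholder rather than an LLM, and the chunk texts are invented, not taken from the manual.

```python
def toy_retrieve(query, chunks, k=2):
    """Rank chunks by the number of words shared with the query (stand-in for vector search)."""
    query_words = set(query.lower().split())
    ranked = sorted(
        chunks,
        key=lambda c: len(query_words & set(c.lower().split())),
        reverse=True,
    )
    return ranked[:k]


def toy_build_prompt(query, context_chunks):
    """Augment the question with the retrieved context."""
    context = "\n".join(f"- {c}" for c in context_chunks)
    return f"Answer using only this context:\n{context}\n\nQuestion: {query}"


def toy_generate(prompt):
    """Placeholder for the LLM call; it only reports the prompt size."""
    return f"(an LLM would answer here, given a prompt of {len(prompt)} characters)"


# Invented sentences, for illustration only
toy_chunks = [
    "If the coffee is not hot enough, preheat the cup and descale the machine.",
    "The warranty covers the machine for two years.",
    "Empty the drip tray when it is full.",
]
question = "why is my coffee not hot"
context = toy_retrieve(question, toy_chunks, k=1)
prompt = toy_build_prompt(question, context)
print(prompt)
print(toy_generate(prompt))
```

The cells that follow implement the same three steps with Chroma as the retriever and the conversational pipeline as the generator.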

## Document encoder & vector database for retriever

In [None]:
def process_docs(docs):
  chunk_size = 500
  chunk_overlap = 50

  #text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
  text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

  chunked_documents = list()
  for doc in docs:
    doc_loader = PyMuPDFLoader(doc)
    chunked_documents.extend(doc_loader.load_and_split(text_splitter))

  return chunked_documents


def doc_retriever(chunked_documents, model=embedding_model):
  # use the embedding model passed as argument instead of silently falling back on the global one
  vectordb = Chroma.from_documents(chunked_documents, embedding=model, persist_directory="./db")
  vectordb.persist()
  retriever = vectordb.as_retriever(search_type="mmr", search_kwargs={"k": 3, "fetch_k": 5})

  return retriever

## Conversational LLM

In [None]:
# model_name = "TinyLlama/TinyLlama-1.1B-Chat-v0.6"
# model_name = "mistralai/Mistral-7B-Instruct-v0.2"  # needs its own branch in load_model before use
model_name = "HuggingFaceH4/zephyr-7b-beta"
# model_name = "SkunkworksAI/phi-2"

flush_gpu_memory()

model, tokenizer, generation_config = load_model(model_name)

pipe_conversation = pipeline(
    "conversational",
    model=model,
    tokenizer=tokenizer,
    generation_config=generation_config,
    num_return_sequences=1,
    eos_token_id=tokenizer.eos_token_id,
    pad_token_id=tokenizer.eos_token_id,
)

In [None]:
def request_chat(messages):
  logger.info("Generating next message...")
  conversation = pipe_conversation(messages)

  turn = 0
  messages = list()
  for is_user, text in conversation.iter_texts():
    role = "USER" if is_user else "BOT"
    if turn == 0:
      role='system'
    else:
      print(turn, '#', role + "> " + text)

    messages.append(dict(role=role, content=text))
    turn += 1

  # return last message from conversation
  return messages[-1]["content"]

## Conversation prompt

In [None]:
system_prompt = """You are a troubleshooting chatbot that talks like a pirate. Your goal is to help users solve the problem they have with
their appliance. One of the first things to do is to ask the user which product or appliance is concerned.
"""

contextual_prompt = """\nIn the following, we will provide you with contextual information about the product from a
document. Use it to answer questions from the user.
Your answer MUST ONLY come from this text.
DO NOT answer if the information is insufficient.
DO NOT be creative in your answer and only cite what comes from the contextual information.
You still talk like a pirate.
"""

file_uploaded_prompt = "Explain briefly that the file has been processed and it's now ready for questions. You still talk like a pirate."

def raw_llm_mode_prompt(prompt, chunked_documents):
  logger.info('Update prompt in RAW_LLM_MODE')
  # default RAW mode where the max content is added in prompt as context
  total_chars_count = 0
  prompt += '\n\nHere is the contextual information document which is limited to few pages:'

  for chunk in chunked_documents:
    content = chunk.page_content
    total_chars_count += len(content)
    if (total_chars_count + 300) > CONTEXT_MAX_CHARS:
      break
    prompt += '\n' + str(content)

  return prompt


def rag_llm_mode_prompt(query, prompt, chunked_documents, retriever):
  logger.info('Update prompt in RAG_MODE')

  if isinstance(query, str):
    retrieved_docs = retriever.invoke(query)
    logger.info(f'Context found:')
    for i, doc in enumerate(retrieved_docs):
      logger.info(f'\n<<{i}>> on page {doc.metadata["page"]}: \n{doc.page_content[0:100]} [...]')

    prompt += '\nWhen providing an answer, you SHALL cite the page number and suggest the user to read it.'
    prompt += '\n\nHere is the contextual information document which is focused on the key pages:'
    total_chars_count = 0
    for i, doc in enumerate(retrieved_docs):
        added_prompt = '\n\n# ON PAGE '+str(doc.metadata["page"])
        added_prompt += '\n' + str(doc.page_content)

        total_chars_count += len(added_prompt)
        if (total_chars_count + 300) > CONTEXT_MAX_CHARS:
            break

        prompt += added_prompt
  else:
    logger.warning("No query yet, RAG mode will be started as soon as a query arrives.")

  return prompt

def adapt_prompt(history, file_name, messages):
    prompt = system_prompt
    if file_name is None:
        logger.warning('No file.')
    else:
        chunked_documents, retriever = session_context['files'][file_name]

        logger.info("Adding content to prompt...")
        prompt += contextual_prompt
        prompt += '\n'

        if session_context['llm_mode'] == RAW_LLM_MODE:
            # the helper returns the full prompt with the context appended, so assign (+= would duplicate it)
            prompt = raw_llm_mode_prompt(prompt, chunked_documents)

        if session_context['llm_mode'] == RAG_MODE:
            # use last user message to query relevant pages as context and add it to the prompt
            query = history[-1][0]
            prompt = rag_llm_mode_prompt(query, prompt, chunked_documents, retriever)

        # update original system prompt
        messages[0] = dict(role="system", content=prompt)

        logger.info(f"Updated prompt.")

    return messages
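Both prompt builders above stop adding context once a character budget is reached. The sketch below isolates that logic, assuming the same limit of 2000 characters as `CONTEXT_MAX_CHARS` and the same margin of 300 characters as the `+ 300` in the functions; `fit_to_budget` and the toy chunks are hypothetical.

```python
def fit_to_budget(chunks, max_chars=2000, reserve=300):
    """Keep chunks in order until the running total plus the reserve exceeds max_chars."""
    kept = []
    used = 0
    for chunk in chunks:
        used += len(chunk)
        if used + reserve > max_chars:
            break  # this chunk would not fit, and neither would any later one
        kept.append(chunk)
    return kept


# Five invented chunks of 500 characters each
toy_chunks = ["a" * 500, "b" * 500, "c" * 500, "d" * 500, "e" * 500]
kept = fit_to_budget(toy_chunks)
print(len(kept), sum(len(c) for c in kept))
```

In fixed-context mode this means only the first chunks of the document ever reach the model, whereas in RAG mode the budget is spent on the chunks retrieved for the current question.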

In [None]:
from random import randint

import gradio as gr
import time

import logging


logger = logging.getLogger('rag_chatbot')
logger.setLevel(logging.INFO)

CONTEXT_MAX_CHARS = 2000
RAW_LLM_MODE = "Fixed context"
RAG_MODE = "RAG"

session_context = dict(
  files=dict(),
  llm_mode=RAW_LLM_MODE
)

def push_response(history, response):
  history[-1][1] = ""
  for character in response:
    history[-1][1] += character
    time.sleep(randint(25, 75)/1000)
    yield history


def add_text(history, text):
  history = history + [(text, None)]
  return history, gr.update(value="", interactive=False)


def add_file(history, file):
  history = history + [((file.name, None), None)]
  return history


def change_mode(llm_mode):
  logger.info(f'Updating LLM mode: llm_mode={llm_mode}')
  session_context['llm_mode'] = llm_mode


def process_message_history(history):
  messages = [
    dict(role="system", content=system_prompt)
  ]
  file_name = None
  for msg in history:
    if isinstance(msg[0], str):
      messages.append(dict(role="user", content=msg[0]))
    if isinstance(msg[0], tuple):
      file_name = msg[0][0]
      if file_name not in session_context['files']:
        logger.info("Processing file...")

        chunked_documents = process_docs([file_name])
        retriever = doc_retriever(chunked_documents)

        session_context['files'][file_name] = (chunked_documents, retriever)

      messages.append(dict(role="user", content="The user uploaded a file: " + file_name))

    if msg[1] is not None and isinstance(msg[1], str):
      messages.append(dict(role="assistant", content=msg[1]))
  return file_name, messages


def bot(history):
  logger.info(f"Preparing Bot message turn={len(history)}")

  # building messages sequence from history
  file_name, messages = process_message_history(history)

  # if a file was processed, then update prompt
  adapt_prompt(history, file_name, messages)

  if isinstance(history[-1][0], tuple):
    # if last message was file processed, answer that it's ready
    messages.append(
      dict(
        role="system",
        content=file_uploaded_prompt
      )
    )

  # get LLM to answer
  response = request_chat(messages)
  history[-1][1] = response
  yield history

### Demo runner

In [None]:
with gr.Blocks() as demo:
  chatbot = gr.Chatbot(
    [[None, "Hi, I'm your assistant. Ask me anything or upload a product manual to start."]],
    elem_id="chatbot",
    bubble_full_width=False,
  )

  with gr.Row():
    txt = gr.Textbox(
      scale=4,
      show_label=False,
      placeholder="Enter text and press enter, or upload a PDF manual",
      container=False,
    )
    with gr.Column():
      btn = gr.UploadButton("📁", file_types=[".pdf"])
      drpdwn = gr.Dropdown(
        [RAW_LLM_MODE, RAG_MODE],
        value=RAW_LLM_MODE,
        label="LLM Mode",
      )

  txt_msg = txt.submit(add_text, [chatbot, txt], [chatbot, txt], queue=False).then(
    bot, chatbot, chatbot
  )
  txt_msg.then(lambda: gr.update(interactive=True), None, [txt], queue=False)
  file_msg = btn.upload(add_file, [chatbot, btn], [chatbot], queue=False).then(
    bot, chatbot, chatbot
  )
  drpdwn.change(change_mode, drpdwn)

demo.queue()
if __name__ == "__main__":
  demo.queue(concurrency_count=3).launch(
    server_name="0.0.0.0",
    share=True,
    debug=True
  )