In [None]:
# Python 3.11.11
# Install ipykernel first

In [None]:
!pip install -r requirements.txt

In [43]:
from dotenv import load_dotenv

load_dotenv()

True

# 1. Some preparations

### Prepare embedding model

In [None]:
!pip install sentence_transformers

In [3]:
from langchain_community.embeddings import HuggingFaceBgeEmbeddings

model_name = "BAAI/bge-small-en"
model_kwargs = {"device": "cpu"}
encode_kwargs = {"normalize_embeddings": True}
embedding = HuggingFaceBgeEmbeddings(
    model_name=model_name, model_kwargs=model_kwargs, encode_kwargs=encode_kwargs
)

  from .autonotebook import tqdm as notebook_tqdm


### Prepare google Gemini model API

In [5]:
import getpass
import os

if "GOOGLE_API_KEY" not in os.environ:
    os.environ["GOOGLE_API_KEY"] = getpass.getpass("Enter your Google AI API key: ")
from langchain_google_genai import ChatGoogleGenerativeAI

model = ChatGoogleGenerativeAI( 
    model="gemini-2.0-flash-exp",
    temperature=0,
)

In [6]:
model

ChatGoogleGenerativeAI(model='models/gemini-2.0-flash-exp', google_api_key=SecretStr('**********'), temperature=0.0, client=<google.ai.generativelanguage_v1beta.services.generative_service.client.GenerativeServiceClient object at 0x32b46a410>, default_metadata=())

# 2. Data Loading

### Partition PDF tables, text, and images

#### Extract images, tables and text chunk

##### Use Unstructured to partition it

In [7]:
from langchain_text_splitters import CharacterTextSplitter
from unstructured.partition.pdf import partition_pdf

import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Extract elements from PDF
def extract_texts_and_images(path, fname):
    """
    Extract images and chunk text from a PDF file.
    path: File path, which is used to dump images (.jpg)
    fname: File name
    """
    return partition_pdf(
        filename=path +"/"+ fname,
        # strategy="hi_res",
        extract_images_in_pdf=True,
        extract_image_block_types=["Image"],
        # infer_table_structure=True,
        chunking_strategy="by_title",
        max_characters=4000,
        new_after_n_chars=3800,
        combine_text_under_n_chars=2000,
        extract_image_block_output_dir=path,
    )

# extract tables from pdf
def extract_tables(path, fname):
    return partition_pdf(
        filename=path +"/"+ fname,
        infer_table_structure=True,
        strategy='hi_res',
    )

# Categorize elements by type
def categorize_elements(element, type):
    """
    Categorize extracted elements from a PDF into tables and texts.
    element: List of unstructured.documents.elements
    """
    unstructured_element = [el for el in element if el.category == type]
    result = [el.text for el in unstructured_element]
    return result


# File path
fpath = "sample_data/attention-is-all-you-need"
fname = "attention-is-all-you-need.pdf"

# Get elements
texts_element = extract_texts_and_images(fpath, fname)
tables_element = extract_tables(fpath, fname)

# Get text, tables
texts = categorize_elements(texts_element, "CompositeElement")
tables = categorize_elements(tables_element, "Table")

# Optional: Enforce a specific token size for texts
text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=4000, chunk_overlap=0
)
joined_texts = " ".join(texts)
texts_4k_token = text_splitter.split_text(joined_texts)

#### Relocate images and tables

In [10]:
import os
import shutil

# Đường dẫn tới file PDF
pdf_path = fpath+"/"+fname
pdf_dir = os.path.dirname(pdf_path)

# Đường dẫn tới các thư mục đích
table_output_dir = os.path.join(pdf_dir, "tables")
image_output_dir = os.path.join(pdf_dir, "images")

# Tạo các thư mục nếu chưa tồn tại
os.makedirs(table_output_dir, exist_ok=True)
os.makedirs(image_output_dir, exist_ok=True)

# Liệt kê các tệp trong thư mục của file PDF
for filename in os.listdir(pdf_dir):
    # Đường dẫn đầy đủ tới từng file
    file_path = os.path.join(pdf_dir, filename)
    
    # Kiểm tra nếu là file bảng và di chuyển vào thư mục tables
    if filename.startswith("table-") and filename.endswith(".jpg"):
        shutil.move(file_path, os.path.join(table_output_dir, filename))
    
    # Kiểm tra nếu là file hình ảnh và di chuyển vào thư mục images
    elif filename.startswith("figure-") and filename.endswith(".jpg"):
        shutil.move(file_path, os.path.join(image_output_dir, filename))

# 3. Multi-vector retriever

##### Use multi-vector-retriever to index image (and / or text, table) summaries, but retrieve raw images (along with raw texts or tables).

### Text and Table summaries using gemini
1. We will use Gemini 1.5 Flash to produce table and text summaries.
2. Text summaries are advised if using large chunk sizes (4k,...)
3. Summaries are used to retrieve raw tables and / or raw chunks of text.

In [11]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate


# Generate summaries of text elements
def generate_text_summaries(texts, tables, summarize_texts=False):
    """
    Summarize text elements
    texts: List of str
    tables: List of str
    summarize_texts: Bool to summarize texts
    """

    # Prompt
    prompt_text = """You are an intelligent assistant tasked with summarizing tables and text for retrieval. \
    You always provide well-reasoned answers that are both correct and helpful. \
    These summaries will be embedded and used to retrieve the raw text or table elements. \
    Give a concise summary of the table or text that is well optimized for retrieval. Table or text: {element} \
    Understand and describe the table or text in detail, and extract all data present in it"""
    prompt = ChatPromptTemplate.from_template(prompt_text)

    # Text summary chain
    summarize_chain = {"element": lambda x: x} | prompt | model | StrOutputParser()

    # Initialize empty summaries
    text_summaries = []
    table_summaries = []

    # Apply to text if texts are provided and summarization is requested
    if texts and summarize_texts:
        text_summaries = summarize_chain.batch(texts, {"max_concurrency": 5})
    elif texts:
        text_summaries = texts

    # Apply to tables if tables are provided
    if tables:
        table_summaries = summarize_chain.batch(tables, {"max_concurrency": 5})

    return text_summaries, table_summaries


# Get text, table summaries
text_summaries, table_summaries = generate_text_summaries(
    texts_4k_token, tables, summarize_texts=True
)

### Image summaries

#### Pass base64 encoded images

In [13]:
import base64
import os

from langchain_core.messages import HumanMessage, SystemMessage


def encode_image(image_path):
    """Getting the base64 string"""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")


def image_summarize(img_base64, prompt):
    """Make image summary"""

    msg = model.invoke(
        [
            HumanMessage(
                content=[
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"},
                    },
                ]
            )
        ]
    )
    return msg.content


def generate_img_summaries(path):
    """
    Generate summaries and base64 encoded strings for images
    path: Path to list of .jpg files extracted by Unstructured
    """

    # Store base64 encoded images
    img_base64_list = []

    # Store image summaries
    image_summaries = []
    # These summaries will be embedded and used to retrieve the raw image. \
    # Give a concise summary of the image that is well optimized for retrieval. \
    # Prompt
    prompt = """You are an assistant tasked with summarizing images for retrieval. \
    You always provide well-reasoned answers that are both correct and helpful. \
    Understand and describe the image in detail, and extract all data present in it."""

    # Apply to images
    for img_file in sorted(os.listdir(path)):
        if img_file.endswith(".jpg"):
            img_path = os.path.join(path, img_file)
            base64_image = encode_image(img_path)
            img_base64_list.append(base64_image)
            image_summaries.append(image_summarize(base64_image, prompt))

    return img_base64_list, image_summaries


# Image summaries
img_base64_list, image_summaries = generate_img_summaries(image_output_dir)

# 4. Vectorstore Qdrant config

### Create Qdrant vector store and connect

In [None]:
# create qdrant client
from qdrant_client import QdrantClient
client = QdrantClient(
    url=os.getenv("QDRANT_URL"),
    api_key=os.getenv("QDRANT_API_KEY")
)

### Create new collection to store the vetordatabase

In [None]:
# create colections 
from qdrant_client.http import models

collection_name = "YOUR_COLLECTION_NAME_HERE"
collection_config = models.VectorParams(
    size=384,
    distance=models.Distance.COSINE
)

client.recreate_collection(
    collection_name=collection_name,
    vectors_config=collection_config
)

  client.recreate_collection(


True

In [51]:
# check colections again 
client.get_collections()
# optional
# delete colection
# client.delete_collection(collection_name="Data1") 

# close client down
# client.close()

CollectionsResponse(collections=[CollectionDescription(name='Session 7'), CollectionDescription(name='Session 3'), CollectionDescription(name='Session 2'), CollectionDescription(name='Data1'), CollectionDescription(name='Session 5'), CollectionDescription(name='Session 1'), CollectionDescription(name='Session 6'), CollectionDescription(name='Session 4')])

### Qdrant is ready wwith embedding model and retrieval_mode !!!

In [None]:
from langchain_qdrant import QdrantVectorStore, RetrievalMode

qdrant = QdrantVectorStore(
    embedding=embedding,
    client=client,
    collection_name=collection_name,
    retrieval_mode=RetrievalMode.DENSE,
)

# 5. Add to vectorstore

### Add raw docs and doc summaries to Multi Vector Retriever:
1. Store the raw texts, tables, and images in the docstore.
2. Store the texts, table summaries, and image summaries in the vectorstore for efficient semantic retrieval.

In [53]:
import uuid
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryStore
from langchain_core.documents import Document



def create_multi_vector_retriever(
    vectorstore, text_summaries, texts, table_summaries, tables, image_summaries, images
):
    """
    Create retriever that indexes summaries, but returns raw images or texts
    """

    # Initialize the storage layer
    store = InMemoryStore()
    id_key = "doc_id"

    # Create the multi-vector retriever
    retriever = MultiVectorRetriever(
        vectorstore=vectorstore,
        docstore=store,
        id_key=id_key,
    )

    # Helper function to add documents to the vectorstore and docstore
    def add_documents(retriever, doc_summaries, doc_contents):
        doc_ids = [str(uuid.uuid4()) for _ in doc_contents]
        summary_docs = [
            Document(page_content=s, metadata={id_key: doc_ids[i]})
            for i, s in enumerate(doc_summaries)
        ]
        retriever.vectorstore.add_documents(summary_docs)
        retriever.docstore.mset(list(zip(doc_ids, doc_contents)))

    # Add texts, tables, and images
    # Check that text_summaries is not empty before adding
    if text_summaries:
        add_documents(retriever, text_summaries, texts)
    # Check that table_summaries is not empty before adding
    if table_summaries:
        add_documents(retriever, table_summaries, tables)
    # Check that image_summaries is not empty before adding
    if image_summaries:
        add_documents(retriever, image_summaries, images)

    return retriever, vectorstore, store, id_key


# The vectorstore to use to index the summaries

# Create retriever
retriever_multi_vector_img, vectorstore1, store1, id_key1 = create_multi_vector_retriever(
    qdrant,
    text_summaries,
    texts,
    table_summaries,
    tables,
    image_summaries,
    img_base64_list,
)

# 6. Building RAG system with Langchain LCEL

## Adding Memory

### Question maker

#### Multimodel RAG system needs conversational memory to continue the conversation wtih users !!!
One user asks a new question, there is a history of questions and answers in his/her mind. Here the idea is to reformulate user's question into a format that has its own context. We are going to use LLM to perform this reformulation of the question.

User's followup question => LLM => reformulated question (with history)

In [54]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableSequence

instruction_to_system = """
Given a chat history and the latest user question 
which might reference context in the chat history, formulate a standalone question 
which can be understood without the chat history. Do NOT answer the question, 
just reformulate it if needed and otherwise return it as is.
"""

question_maker_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", instruction_to_system),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{question}"),
    ]
)


question_chain = RunnableSequence(question_maker_prompt, model, StrOutputParser())

#### Choose the question to pass to the LLM 
Define a function that looks at the chat history: there are 2 cases

1. if there is a history: it will pass the question chain (that reformulates user's question)
2. if chat history is empty, it will pass user's question directly

In [55]:
def contextualized_question(input: dict):
    if input.get("chat_history"):
        return question_chain
    else:
        return input["question"]

### Build retriever
Bin the retrieved doc(s) into the correct parts of the Google gemini prompt template.

In [56]:
import io
import re

from IPython.display import HTML, display
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from PIL import Image
from langchain_core.prompts import MessagesPlaceholder
from langchain.memory import ConversationBufferMemory


def plt_img_base64(img_base64):
    """Disply base64 encoded string as image"""
    # Create an HTML img tag with the base64 string as the source
    image_html = f'<img src="data:image/jpeg;base64,{img_base64}" />'
    # Display the image by rendering the HTML
    display(HTML(image_html))


def looks_like_base64(sb):
    """Check if the string looks like base64"""
    return re.match("^[A-Za-z0-9+/]+[=]{0,2}$", sb) is not None


def is_image_data(b64data):
    """
    Check if the base64 data is an image by looking at the start of the data
    """
    image_signatures = {
        b"\xff\xd8\xff": "jpg",
        b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a": "png",
        b"\x47\x49\x46\x38": "gif",
        b"\x52\x49\x46\x46": "webp",
    }
    try:
        header = base64.b64decode(b64data)[:8]  # Decode and get the first 8 bytes
        for sig, format in image_signatures.items():
            if header.startswith(sig):
                return True
        return False
    except Exception:
        return False


def resize_base64_image(base64_string, size=(128, 128)):
    """
    Resize an image encoded as a Base64 string
    """
    # Decode the Base64 string
    img_data = base64.b64decode(base64_string)
    img = Image.open(io.BytesIO(img_data))

    # Resize the image
    resized_img = img.resize(size, Image.LANCZOS)

    # Save the resized image to a bytes buffer
    buffered = io.BytesIO()
    resized_img.save(buffered, format=img.format)

    # Encode the resized image to Base64
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def split_image_text_types(docs):
    """
    Split base64-encoded images and texts
    """
    b64_images = []
    texts = []
    for doc in docs:
        # Check if the document is of type Document and extract page_content if so
        if isinstance(doc, Document):
            doc = doc.page_content
        if looks_like_base64(doc) and is_image_data(doc):
            doc = resize_base64_image(doc, size=(1300, 600))
            b64_images.append(doc)
        else:
            texts.append(doc)
    return {"images": b64_images, "texts": texts}

### Our main prompt with/without the memory, this generates the final output !!!
1. Context and Question: Use conversational memory as a part of the context, pass them with question provided by users to LLM.
2. Avoid redundant, meaningless answer: The LLM use all the information provided to generate helpful, complete and concise answer.
3. Avoid hallucination: Tell the LLM not to make up answer if it does not know, just say "I don't know"
4. Visualization: The answer must be written in LaTeX format for better visualized, because it may contain mathematic symbolics, formulas, functions,...

In [None]:
def img_prompt_func(data_dict):
    """
    Join the context into a single string
    """
    formatted_texts = "\n".join(data_dict["context"]["texts"])
    messages = []

    # Adding image(s) to the messages if present
    if data_dict["context"]["images"]:
        for image in data_dict["context"]["images"]:
            image_message = {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{image}"},
            }
            messages.append(image_message)

    # Adding the text for q and a
    text_message = {
        "type": "text",
        "text": (
            "You are a helpful and smart math assistant tasking with providing mathematics knowledge \n"
            "You will be given a mixed of text, tables, and image(s) usually of charts or graphs.\n"
            "Use this information to provide the answer to the user question.\n"
            "The answer must be written in LaTeX format because it may contain mathemtic sybolics, formulas.\n"
            "Only provide meaningful, complete and concise answer. If you don't know the answer, just say you don't know, dont' try to make up one.\n"
            f"User-provided question: {data_dict['question']}\n\n"
            "Text and / or tables:\n"
            f"{formatted_texts}"
        ),
    }
    messages.append(text_message)
    system_prompt = "Your name is Ngoc Anh."
    
    qa_prompt = ChatPromptTemplate.from_messages(
    [   SystemMessage(content=system_prompt),
        MessagesPlaceholder(variable_name="chat_history"),
        HumanMessage(content=messages)
    ]
)
    return qa_prompt

def multi_modal_rag_chain(retriever):
    """
    Multi-modal RAG chain
    """

    # RAG LCEL pipeline without conversational memory
    # chain = (
    #     {
    #         "context": retriever | RunnableLambda(split_image_text_types),
    #         "question": RunnablePassthrough(),
    #     }
    #     | RunnableLambda(img_prompt_func)
    #     | model
    #     | StrOutputParser()
    # )
    
    # RAG LCEL pipeline with conversational memory
    # chain = ( 
    #     RunnablePassthrough.assign(
    #         context = contextualized_question | retriever | RunnableLambda(split_image_text_types)
    #     )
    #     | RunnableLambda(img_prompt_func)
    #     | model
    #     | StrOutputParser()
    # )
    context_chain = RunnableSequence(contextualized_question, retriever, RunnableLambda(split_image_text_types))
    # RAG pipeline with conversational memory
    chain =RunnableSequence(
        RunnablePassthrough.assign(context = context_chain), RunnableLambda(img_prompt_func), model, StrOutputParser()
    )
    return chain

# Create RAG chain
chain_multimodal_rag = multi_modal_rag_chain(retriever_multi_vector_img)

# 7. Check

In [58]:
question = 'What is complexity per layer of the self-attention layer?'

In [59]:
# Check retrieval
docs = retriever_multi_vector_img.invoke(question, limit=6)

# We get 4 docs
len(docs)

4

In [61]:
from langchain_core.messages.ai import AIMessage

In [62]:
# create empty chat history
chat_history = []

In [68]:
def test(question):
    ai_msg = chain_multimodal_rag.invoke({"question": question, "chat_history": chat_history})
    chat_history.extend([
    HumanMessage(content=question),
    AIMessage(content=ai_msg)
    ]) # add conversation to chat_history
    return ai_msg
a = test(question=question)

In [69]:
eval(repr(a))

'The complexity per layer of the self-attention layer is $O(n^2 \\cdot d)$, where $n$ is the sequence length and $d$ is the dimension of the input.\n'

In [70]:
chat_history

[HumanMessage(content='What is complexity per layer of the self-attention layer?', additional_kwargs={}, response_metadata={}),
 AIMessage(content='Based on the provided table, the complexity per layer of the self-attention layer is $O(n^2 \\cdot d)$, where $n$ is the sequence length and $d$ is the dimension of the input.\n', additional_kwargs={}, response_metadata={}),
 HumanMessage(content='What is complexity per layer of the self-attention layer?', additional_kwargs={}, response_metadata={}),
 AIMessage(content='The complexity per layer of the self-attention layer is $O(n^2 \\cdot d)$, where $n$ is the sequence length and $d$ is the dimension of the input.\n', additional_kwargs={}, response_metadata={})]

In [71]:
# question2 = "What about other layers ?"
question2 = "What I just ask?"

In [33]:
chat_history2 = []

In [None]:
# Conversation with conversational memory
def test(question):
    ai_msg = chain_multimodal_rag.invoke({"question": question2, "chat_history": chat_history})
    chat_history.extend([
    HumanMessage(content=question),
    AIMessage(content=ai_msg)
    ]) # add conversation to chat_history
    return ai_msg
test(question=question2)

'You asked about the complexity per layer of the self-attention layer. Based on the provided table, the complexity per layer of the self-attention layer is $O(n^2 \\cdot d)$, where $n$ is the sequence length and $d$ is the dimension of the input.\n'

In [None]:
# Conversation without conversational memory
def test(question):
    ai_msg = chain_multimodal_rag.invoke({"question": question, "chat_history": chat_history2})
    chat_history.extend([
    HumanMessage(content=question),
    AIMessage(content=ai_msg)
    ]) # add conversation to chat_history
    return ai_msg
test(question=question2)

'I do not know.  The provided images show attention mechanisms in a neural network and some word association graphs, but there is no record of a previous question.\n'