In [None]:
from unstructured.partition.pdf import partition_pdf

# --- Run configuration -------------------------------------------------------
# output_path is only referenced by the commented-out image_output_dir_path below.
output_path = "data/"
file_path ='data/Kazakhstan_tarihi_7_atamura_sample.pdf'

# Metadata stamped on every chunk and, later, on every indexed summary.
# NOTE(review): the file name contains "7_atamura" while GRADE is "10" and
# PUBLISHER is "Мектеп" — confirm these values match the textbook being ingested.
DISCIPLINE = "Қазақстан тарихы"
GRADE = "10"
PUBLISHER = "Мектеп"

# Name of the Qdrant collection the summaries are written to.
collection_name = "Docker-Redis_test_sample_tests_3"

# Reference: https://docs.unstructured.io/open-source/core-functionality/chunking
chunks = partition_pdf(
    filename=file_path,
    # infer_table_structure=True,            # extract tables
    # NOTE(review): unstructured's documentation describes table inference and
    # image-block extraction as "hi_res" features — confirm that Image elements
    # are actually produced under "ocr_only" before relying on the image cells.
    strategy="ocr_only",

    languages=["kaz"],                     # OCR language code (Kazakh)

    extract_image_block_types=["Image"],   # Add 'Table' to list to extract image of tables
    # image_output_dir_path=output_path,   # if None, images and tables will be saved as base64

    extract_image_block_to_payload=True,   # if true, will extract base64 for API usage

    chunking_strategy="by_title",          # or 'basic'
    max_characters=20000,                  # defaults to 500
    combine_text_under_n_chars=5000,       # defaults to 0
    new_after_n_chars=10000,

    # extract_images_in_pdf=True,          # deprecated
)


# Attach the textbook metadata to every chunk.
for chunk in chunks:
    chunk.metadata.discipline = DISCIPLINE
    chunk.metadata.grade = GRADE
    chunk.metadata.publisher = PUBLISHER

print(chunks)

In [None]:
# Distinct element type names present among the partitioned chunks.
{str(type(el)) for el in chunks}

In [None]:
import base64
import io
from PIL import Image as PILImage  # Rename to avoid conflict with IPython.display.Image
from IPython.display import Image, display


def display_base64_image(b64_code):
    """Decode a base64-encoded image and render it inline in the notebook.

    Best-effort: any exception raised while decoding or displaying is printed
    instead of being propagated, so the function always returns None.
    """
    try:
        display(Image(data=base64.b64decode(b64_code)))
    except Exception as err:
        print(err)

def display_images(base64_list):
    """Render every base64 image in ``base64_list``, one after another.

    A failure on one image is reported together with its position in the list
    and does not stop the remaining images from being processed.
    """
    for position, encoded in enumerate(base64_list):
        try:
            display_base64_image(encoded)
        except Exception as exc:
            print(f"Error processing image {position}: {exc}")
    


In [None]:
# Split the chunks by element type name. The two checks are independent, so a
# chunk whose type name matched both substrings would land in both lists.
tables = [chunk for chunk in chunks if "Table" in str(type(chunk))]
texts = [chunk for chunk in chunks if "CompositeElement" in str(type(chunk))]

tables, texts

In [None]:
# Preview the text of the first composite chunk (raises IndexError if `texts` is empty).
print(texts[0].text)

In [None]:
def get_images_context(chunks):
    """Pair every extracted image with the text of the chunk that contains it.

    Args:
        chunks: iterable of partitioned elements. Only elements whose type name
            contains "CompositeElement" are inspected; their
            ``metadata.orig_elements`` are scanned for elements whose type name
            contains "Image".

    Returns:
        list[dict]: one dict per image with keys ``"context_text"`` (the parent
        chunk's text) and ``"image_base64"`` (the image payload). Images that
        carry no base64 payload are skipped, since they can be neither
        displayed nor sent to the model.
    """
    images_context = []
    for chunk in chunks:
        if "CompositeElement" not in str(type(chunk)):
            continue

        # orig_elements may be missing or None; treat that as "no children".
        for el in getattr(chunk.metadata, "orig_elements", None) or []:
            if "Image" not in str(type(el)):
                continue

            image_base64 = getattr(el.metadata, "image_base64", None)
            if not image_base64:
                continue

            images_context.append(
                {"context_text": chunk.text, "image_base64": image_base64}
            )

    return images_context

images_context = get_images_context(chunks)

# Derive the flat list of base64 strings from images_context instead of
# scanning the chunks a second time, so both structures stay index-aligned
# and reference the same string objects.
images = [item["image_base64"] for item in images_context]

In [None]:
# Render every extracted image, before any size filtering.
display_images(images)

In [None]:

# --- VERIFICATION ---
# Check that `images` references the very same string objects stored in
# `images_context` (no copies were made). Guarded so the cell does not raise
# IndexError when no image was extracted.
if images_context and images:
    addr_1 = id(images_context[0]["image_base64"])
    addr_2 = id(images[0])

    print(f"Address in Dict: {addr_1}")
    print(f"Address in List: {addr_2}")
    print(f"Are they the same object? {addr_1 == addr_2}")
else:
    print("No images were extracted; nothing to verify.")

In [None]:

def filter_images_context(text_image_list, min_width=150, min_height=150, max_width=1500, max_height=1500):
    """Keep only the entries whose image size lies strictly inside the given bounds.

    Args:
        text_image_list: list of dicts holding a base64 image under the
            ``"image_base64"`` key.
        min_width, min_height: exclusive lower bounds, in pixels.
        max_width, max_height: exclusive upper bounds, in pixels.

    Returns:
        list: the entries (same dict objects, original order) that passed.
        Entries that cannot be decoded or opened are reported and dropped.
    """
    print(f"Filtering for images wider than {min_width}px...\n")

    kept = []
    for idx, entry in enumerate(text_image_list):
        try:
            raw = base64.b64decode(entry["image_base64"])

            # Only the header is needed to read the size; nothing is written to disk.
            with PILImage.open(io.BytesIO(raw)) as picture:
                width, height = picture.size

            inside_bounds = (
                min_width < width < max_width and min_height < height < max_height
            )
            if not inside_bounds:
                print(f"Skipped Image {idx}: {width}x{height} px ")
                continue

            print(f"✅ Image {idx}: {width}x{height} px")
            kept.append(entry)
        except Exception as err:
            print(f"Error processing image {idx}: {err}")
    return kept

# Usage — the size bounds are tuned per textbook; keep exactly one variant active.
# 6th-grade Atamura textbook:
# filtered_images_context = filter_images_context(images_context, max_height=10000, max_width=10000)
# filtered_images = [item["image_base64"] for item in filtered_images_context]
# 7th-grade Atamura textbook:
filtered_images_context = filter_images_context(images_context, max_height=1600, max_width=1200)
filtered_images = [item["image_base64"] for item in filtered_images_context]
# To bypass filtering entirely:
# filtered_images = images

In [None]:
# --- VERIFICATION ---
# Check that `filtered_images` references the same string objects stored in
# `filtered_images_context`. The comparison must use `filtered_images[0]`:
# `images[0]` is the first *unfiltered* image and may have been dropped.
if filtered_images_context:
    addr_1 = id(filtered_images_context[0]["image_base64"])
    addr_2 = id(filtered_images[0])

    print(f"Address in Dict: {addr_1}")
    print(f"Address in List: {addr_2}")
    print(f"Are they the same object? {addr_1 == addr_2}")

In [None]:
# Render only the images that passed the size filter.
display_images(filtered_images)

In [None]:
from dotenv import load_dotenv
import os

# Pull variables from a local .env file into the process environment, then read
# the Gemini key (None when the variable is not set).
load_dotenv()

GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

In [None]:
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Prompt used for both text chunks and table HTML; the single placeholder
# {element} receives the item being summarized.
prompt_text = """
You are an assistant tasked with summarizing tables and text.
Give a concise summary of the table or text in kazakh language.

Respond only with the summary, no additionnal comment.
Do not start your message by saying "Here is a summary" or anything like that.
Just give the summary as it is.

Table or text chunk: {element}

"""

prompt = ChatPromptTemplate.from_template(prompt_text)

# Summary chain: the input item is passed through unchanged as "element",
# formatted into the prompt, sent to the model, and reduced to a plain string.
# NOTE: `prompt` and `model` are rebound by later cells; summarize_chain keeps
# the objects it was composed from here.
model = ChatGoogleGenerativeAI(model="gemini-3-flash-preview", api_key=GEMINI_API_KEY, temperature=1)
summarize_chain = {"element": lambda x: x} | prompt | model | StrOutputParser()

In [None]:
# Summarize every composite text chunk, with at most 10 requests in flight.
# The result list is index-aligned with `texts`.
text_summaries = summarize_chain.batch(texts, {"max_concurrency": 10})
text_summaries

In [None]:

# Summarize tables from their HTML rendering (metadata.text_as_html), with at
# most 10 requests in flight. The result list is index-aligned with `tables`.
tables_html = [table.metadata.text_as_html for table in tables]
table_summaries = summarize_chain.batch(tables_html, {"max_concurrency": 10})
table_summaries

In [None]:
# Multimodal prompt: a text part asking for a Kazakh description plus the image
# itself as a data URL. Both placeholders ({context_text}, {image_base64}) are
# filled from the dicts in filtered_images_context, which carry those two keys.
prompt_template = """Describe the image in detail in kazakh language.
                This is a context text where the image appears:
                {context_text}"""
messages = [
    (
        "user",
        [
            {"type": "text", "text": prompt_template},
            {
                "type": "image_url",
                # The MIME type is fixed to image/jpeg regardless of the actual image format.
                "image_url": {"url": "data:image/jpeg;base64,{image_base64}"},
            },
        ],
    )
]

# NOTE: this rebinds `prompt` (previously the text-summary prompt); `chain` is
# rebound again by later cells.
prompt = ChatPromptTemplate.from_messages(messages)

chain = prompt | model | StrOutputParser()


# One description per filtered image, index-aligned with filtered_images_context.
image_summaries = chain.batch(filtered_images_context, {"max_concurrency": 5})

In [None]:
# Inspect the generated image descriptions.
image_summaries

In [None]:
import uuid
from qdrant_client import QdrantClient, models
from langchain_qdrant import QdrantVectorStore



from langchain_core.documents import Document
from langchain_google_genai import GoogleGenerativeAIEmbeddings

from langchain_classic.retrievers import MultiVectorRetriever


# Connection settings read from the environment (each is None when unset).
QDRANT_API = os.getenv("QDRANT_API")
QDRANT_URL = os.getenv("QDRANT_URL")

# NOTE(review): REDIS_URL is read here, but the RedisStore in the next cell is
# constructed with a hard-coded localhost URL — confirm which one is intended.
REDIS_URL = os.getenv("REDIS_URL")

In [None]:
import json
from langchain_qdrant import QdrantVectorStore
from langchain_community.storage import RedisStore
from langchain_classic.storage import EncoderBackedStore
from langchain_classic.retrievers import MultiVectorRetriever
from langchain_core.documents import Document
from qdrant_client import QdrantClient, models

from unstructured.staging.base import dict_to_elements

# --- 1. Qdrant setup ---
client = QdrantClient(location=QDRANT_URL, api_key=QDRANT_API)


# Create the collection only when it does not exist yet, so re-running the cell is safe.
# The vector size (768) must match output_dimensionality of the embedding model below.
if not client.collection_exists(collection_name):
    client.create_collection(
        collection_name=collection_name,
        vectors_config=models.VectorParams(
            size=768,
            distance=models.Distance.COSINE
        ),
    )

# Payload fields to index as keywords. Note the "metadata." prefix: the search
# filter in the query cell addresses the field as "metadata.grade".
nested_fields = ["metadata.discipline", "metadata.publisher", "metadata.grade"]

for field in nested_fields:
    client.create_payload_index(
        collection_name=collection_name,
        field_name=field,
        field_schema=models.PayloadSchemaType.KEYWORD
    )
    print(f"Index created for nested field: '{field}'")

vectorstore = QdrantVectorStore(
    client=client,
    collection_name=collection_name,
    embedding=GoogleGenerativeAIEmbeddings(
        model="models/gemini-embedding-001",
        api_key=GEMINI_API_KEY,
        output_dimensionality=768
    ),
)

# --- 2. Redis storage setup (replaces LocalFileStore) ---

# A. Create the base store (handles raw bytes in Redis)
# 'namespace' adds a prefix to keys (e.g. "parent_docs:doc_id") so they are organized


redis_byte_store = RedisStore(
    # redis_url=REDIS_URL,
    # NOTE(review): hard-coded local instance; the REDIS_URL environment value is not used.
    redis_url="redis://localhost:6379",
    namespace="parent_docs"
)

# B. Define Serializers (Object -> JSON Bytes)
# JSON is safer and cleaner than pickle for production
def json_encoder(obj: object) -> bytes:
    """Serialize a docstore value to UTF-8 encoded JSON bytes.

    Args:
        obj: either an object exposing ``to_dict()`` (serialized through that
            dict) or a value that is JSON-serializable as is, such as a base64
            image string.

    Returns:
        bytes: the JSON document, UTF-8 encoded. ``json.dumps`` yields ``str``,
        so the result is encoded explicitly to honour the declared byte-store
        contract.

    Raises:
        TypeError: if the value (or its ``to_dict()`` result) is not
            JSON-serializable.
    """
    payload = obj.to_dict() if hasattr(obj, "to_dict") else obj
    return json.dumps(payload).encode("utf-8")


def json_decoder(data):
    """Rebuild a docstore value from its JSON representation.

    Returns None for a missing value. A JSON object that carries a "type" key
    is treated as a serialized unstructured element and converted back through
    ``dict_to_elements``; any other JSON value is returned as parsed.
    """
    if data is None:
        return None

    payload = json.loads(data)

    looks_like_element = isinstance(payload, dict) and "type" in payload
    if not looks_like_element:
        return payload

    # dict_to_elements expects a list of dicts, so wrap the single dict and unwrap the result.
    return dict_to_elements([payload])[0]

# C. Create the encoder-backed store
# Wraps the Redis byte store so values are passed through json_encoder on write
# and json_decoder on read; keys are used unchanged.
store = EncoderBackedStore(
    store=redis_byte_store,
    key_encoder=lambda x: x,
    value_serializer=json_encoder,
    value_deserializer=json_decoder
)

# --- 3. The Retriever ---
# Metadata key under which every summary document stores the id of its
# original (text chunk, table, or image) in the docstore.
id_key = "doc_id"

retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    docstore=store,
    id_key=id_key,
)

In [None]:
# Index text summaries in the vector store and keep the raw chunks in the
# docstore; the two are linked through a UUID stored under `id_key`.
doc_ids = []
summary_texts = []
valid_texts = []  # raw chunks whose summary is non-empty, aligned with doc_ids

for i, summary in enumerate(text_summaries):
    # Empty or whitespace-only summaries are reported and not indexed.
    if not (summary and summary.strip()):
        print(f"⚠️ Warning: Text summary {i} is empty. Skipping.")
        continue

    current_id = str(uuid.uuid4())
    doc_ids.append(current_id)
    valid_texts.append(texts[i])
    summary_texts.append(
        Document(
            page_content=summary,
            metadata={
                id_key: current_id,
                "discipline": DISCIPLINE,
                "publisher": PUBLISHER,
                "grade": GRADE,
            },
        )
    )

# Write to both stores only when there is something to insert.
if not summary_texts:
    print("No valid text summaries found to insert.")
else:
    retriever.vectorstore.add_documents(summary_texts)
    retriever.docstore.mset(list(zip(doc_ids, valid_texts)))
    print(f"Successfully inserted {len(summary_texts)} text documents.")

In [None]:
# Index table summaries in the vector store and keep the raw table elements in
# the docstore; the two are linked through a UUID stored under `id_key`.
table_ids = []
summary_tables = []
valid_tables = []  # raw tables whose summary is non-empty, aligned with table_ids

for i, summary in enumerate(table_summaries):
    # Empty or whitespace-only summaries are reported and not indexed.
    if not (summary and summary.strip()):
        print(f"⚠️ Warning: Table summary {i} is empty. Skipping.")
        continue

    current_id = str(uuid.uuid4())
    table_ids.append(current_id)
    valid_tables.append(tables[i])
    summary_tables.append(
        Document(
            page_content=summary,
            metadata={
                id_key: current_id,
                "discipline": DISCIPLINE,
                "publisher": PUBLISHER,
                "grade": GRADE,
            },
        )
    )

# Write to both stores only when there is something to insert.
if not summary_tables:
    print("No valid table summaries found to insert.")
else:
    retriever.vectorstore.add_documents(summary_tables)
    retriever.docstore.mset(list(zip(table_ids, valid_tables)))
    print(f"Successfully inserted {len(summary_tables)} tables.")


In [None]:
# Index image descriptions in the vector store and keep the base64 images in
# the docstore; the two are linked through a UUID stored under `id_key`.
# Ids, summary documents and images are collected in one pass so the three
# lists cannot drift out of alignment.
img_ids = []
summary_img = []
valid_filtered_images = []  # base64 images whose description is non-empty, aligned with img_ids

for i, summary in enumerate(image_summaries):
    # Skip descriptions that are empty or whitespace-only.
    if summary and summary.strip():
        current_id = str(uuid.uuid4())
        img_ids.append(current_id)
        valid_filtered_images.append(filtered_images[i])
        summary_img.append(
            Document(
                page_content=summary,
                metadata={
                    id_key: current_id,
                    "discipline": DISCIPLINE,
                    "publisher": PUBLISHER,
                    "grade": GRADE
                }
            )
        )
    else:
        print(f"⚠️ Warning: Image summary {i} is empty. Skipping this image.")

# Write to both stores only when there is something to insert.
if summary_img:
    retriever.vectorstore.add_documents(summary_img)
    retriever.docstore.mset(list(zip(img_ids, valid_filtered_images)))
    print(f"Successfully inserted {len(summary_img)} images.")
else:
    print("No valid image summaries found to insert.")

In [None]:
# Retrieve documents for a query, restricted to summaries whose grade metadata is "10".
query = "жонгар"

search_filter = models.Filter(
    must=[
        models.FieldCondition(
            key="metadata.grade", # payload path includes the "metadata." prefix
            match=models.MatchValue(value="10")
        )
    ]
)

# Apply the filter and the result count.
# NOTE: this mutates the retriever's search_kwargs, so the settings persist
# for every later retriever call in the notebook.
retriever.search_kwargs.update({"filter": search_filter})
retriever.search_kwargs.update({"k":5})

# Run query
docs = retriever.invoke(query)
docs

In [None]:

# `docs` mixes base64 image strings with unstructured text/table elements;
# only the strings are images, so only those are passed to the renderer.
display_images([doc for doc in docs if isinstance(doc, str)])

# Render PDF page

In [None]:
import fitz  # PyMuPDF
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image as PILImage  # Rename to avoid conflict with IPython.display.Image
from langchain_core.documents import Document



# 1. PLOTTING FUNCTION
def plot_pdf_with_boxes(pdf_page, segments):
    """Render a PDF page and overlay one outline per segment, colored by category.

    Args:
        pdf_page: page object providing ``get_pixmap()``.
        segments: iterable of dicts, each with a ``"category"`` entry and a
            ``"coordinates"`` dict holding ``"points"``, ``"layout_width"``
            and ``"layout_height"``.

    Raises:
        KeyError: if a segment lacks ``"coordinates"`` or ``"category"``.
    """
    pix = pdf_page.get_pixmap()
    pil_image = PILImage.frombytes("RGB", [pix.width, pix.height], pix.samples)

    # Create the figure
    fig, ax = plt.subplots(1, figsize=(10, 10))
    ax.imshow(pil_image)

    # Categories actually drawn; used to decide which legend entries to show.
    categories = set()
    category_to_color = {
        "Title": "orchid",
        "Image": "forestgreen",
        "Table": "tomato",
    }

    for segment in segments:
        points = segment["coordinates"]["points"]
        layout_width = segment["coordinates"]["layout_width"]
        layout_height = segment["coordinates"]["layout_height"]

        # Scale points from layout coordinates to the rendered image dimensions
        scaled_points = [
            (x * pix.width / layout_width, y * pix.height / layout_height)
            for x, y in points
        ]

        # Categories without an explicit color fall back to deepskyblue
        box_color = category_to_color.get(segment["category"], "deepskyblue")
        categories.add(segment["category"])

        rect = patches.Polygon(
            scaled_points, linewidth=1, edgecolor=box_color, facecolor="none"
        )
        ax.add_patch(rect)

    # Make legend: "Text" is always listed; the others only when present on the page
    legend_handles = [patches.Patch(color="deepskyblue", label="Text")]
    for category in ["Title", "Image", "Table"]:
        if category in categories:
            legend_handles.append(
                patches.Patch(color=category_to_color[category], label=category)
            )
    ax.axis("off")
    ax.legend(handles=legend_handles, loc="upper right")
    plt.tight_layout()

    # Display the plot
    plt.show()

    # Explicitly close the figure to prevent "<Figure size...>" logs
    plt.close(fig)

# 2. RENDER PAGE FUNCTION
def render_page(doc_list: list, page_number: int, print_text=True) -> None:
    """Plot one page of the source PDF with the boxes of the documents located on it.

    Args:
        doc_list: documents whose ``metadata`` dict carries ``"page_number"``
            plus the fields ``plot_pdf_with_boxes`` reads.
        page_number: 1-based page number to render.
        print_text: when true, print the content of each document on the page
            after the plot.

    Reads the PDF from the notebook-level ``file_path`` variable.
    """
    page_docs = [
        doc for doc in doc_list if doc.metadata.get("page_number") == page_number
    ]
    segments = [doc.metadata for doc in page_docs]

    # Open the PDF in a context manager so the document is closed once the
    # page has been rendered; the page is only valid while the document is open.
    with fitz.open(file_path) as pdf:
        plot_pdf_with_boxes(pdf.load_page(page_number - 1), segments)

    if print_text:
        for doc in page_docs:
            print(f"{doc.page_content}\n")

# 3. HELPER FUNCTION
def extract_page_numbers_from_chunk(chunk):
    """Return the set of page numbers covered by a chunk's original elements.

    Elements whose ``metadata.page_number`` is falsy (e.g. None) are ignored.
    """
    return {
        element.metadata.page_number
        for element in chunk.metadata.orig_elements
        if element.metadata.page_number
    }

# 4. MAIN DISPLAY FUNCTION
def display_chunk_pages(chunk):
    """Render every PDF page touched by a chunk, with its elements outlined.

    Each original element of the chunk becomes a Document whose metadata is
    the element's metadata dict extended with a ``"category"`` ("Table",
    "Image", "Title" or "Text", derived from the element's type name) and an
    integer ``"page_number"``. Elements without a page number are skipped,
    and pages are rendered in ascending order.
    """
    page_numbers = extract_page_numbers_from_chunk(chunk)
    docs = []
    for element in chunk.metadata.orig_elements:
        # An element with no page number cannot be placed on any page, and
        # int(None) would raise; skip it, as the page-number helper does.
        if not element.metadata.page_number:
            continue

        metadata = element.metadata.to_dict()

        # Determine category from the element's type name. "Title" is mapped
        # explicitly so the title color defined by the plotting function is used.
        type_name = str(type(element))
        if "Table" in type_name:
            metadata["category"] = "Table"
        elif "Image" in type_name:
            metadata["category"] = "Image"
        elif "Title" in type_name:
            metadata["category"] = "Title"
        else:
            metadata["category"] = "Text"

        metadata["page_number"] = int(element.metadata.page_number)

        docs.append(Document(page_content=element.text, metadata=metadata))

    # Sets are unordered; sort so pages are shown in reading order.
    for page_number in sorted(page_numbers):
        render_page(docs, page_number)

# usage
# display_chunk_pages(docs[1])

In [None]:
# Walk through every chunk: print its page number, a separator line, then
# render the pages it covers.
for chunk in chunks:
    print(chunk.metadata.page_number)
    print("-"*100)
    display_chunk_pages(chunk)

In [None]:
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from base64 import b64decode


def parse_docs(docs):
    """Split retrieved docstore values into images and everything else.

    Images were stored as plain base64 strings, so any ``str`` goes to
    ``"images"``; every other object (text and table elements) goes to
    ``"texts"``. Relative order is preserved within each list.
    """
    buckets = {"images": [], "texts": []}
    for doc in docs:
        target = "images" if isinstance(doc, str) else "texts"
        buckets[target].append(doc)
    return buckets


def build_prompt(kwargs):
    """Assemble the chat prompt for the tutor model from retrieved context.

    Args:
        kwargs: dict with ``"context"`` (a dict holding ``"texts"`` and
            ``"images"`` lists) and ``"question"`` (the user's question).

    Returns:
        ChatPromptTemplate: a system message with the tutoring instructions
        followed by a human message containing the textual context, the
        question, and one image part per base64 image.
    """
    docs_by_type = kwargs["context"]
    user_question = kwargs["question"]

    # Each text element contributes its source header followed by its text.
    context_parts = []
    for text_element in docs_by_type["texts"]:
        source = text_element.metadata
        context_parts.append(f"The Discipline: {source.discipline} \n")
        context_parts.append(f"The Grade: {source.grade} \n")
        context_parts.append(f"The Publisher: {source.publisher} \n")
        context_parts.append(f"The Page number: {source.page_number} \n\n")
        context_parts.append(text_element.text + "\n\n")
    context_text = "".join(context_parts)

    system_instruction = """
    You are an expert UNT (Unified National Testing) tutor in Kazakhstan, specializing in preparing students for high-stakes exams.
    Your goal is not just to answer, but to help the student understand the material based strictly on the provided text.

    ### STRICT DATA BOUNDARIES
    - Answer **ONLY** based on the provided Context.
    - If the answer is not in the context, explicitly state: "Мәтінде бұл сұрақтың жауабы жоқ" (if Kazakh) or "В тексте нет ответа на этот вопрос" (if Russian). Do not make up information.

    ### RESPONSE FORMAT
    1. **Direct Answer**: Start with a clear, direct answer to the question.
    2. **Explanation**: Provide a long sentence explanation citing the context (e.g., "Because the text mentions...").
    3. **Questions**: Ask 2-3 questions from the context to ensure that students understand the material
    4. **Source**: Necessarily provide information sources that you have used (Discipline, Grade, Publisher, and Page number)

    ### TONE & STYLE
    - **Language**: Strictly mirror the user's language (Kazakh or Russian).
    - **Format**: Use bullet points for readability.
    """

    # Text part of the human message: context first, then the question.
    prompt_template = f"""
    Answer the question based only on the following context, which can include text, tables, and the below image.
    Context: {context_text}
    Question: {user_question}
    """

    # Images follow the text part, one data-URL entry per base64 string.
    prompt_content = [{"type": "text", "text": prompt_template}]
    prompt_content.extend(
        {
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{image}"},
        }
        for image in docs_by_type["images"]
    )

    return ChatPromptTemplate.from_messages(
        [
            SystemMessage(content=system_instruction),
            HumanMessage(content=prompt_content),
        ]
    )


In [None]:
# Answering model. NOTE: this rebinds `model`, which earlier cells bound to the
# flash model used for summaries.
model = ChatGoogleGenerativeAI(model="gemini-3-pro-preview", api_key=GEMINI_API_KEY, temperature=1)


# Plain RAG chain: retrieve -> split into texts/images -> build prompt -> model -> string.
# The input question is passed through unchanged under "question".
chain = (
    {
        "context": retriever | RunnableLambda(parse_docs),
        "question": RunnablePassthrough(),
    }
    | RunnableLambda(build_prompt)
    | model
    | StrOutputParser()
)

# Same pipeline, but the output keeps "context" and "question" and adds the
# model answer under "response", so the sources can be inspected afterwards.
chain_with_sources = {
    "context": retriever | RunnableLambda(parse_docs),
    "question": RunnablePassthrough(),
} | RunnablePassthrough().assign(
    response=(
        RunnableLambda(build_prompt)
        | model
        | StrOutputParser()
    )
)

In [None]:
# Ask a question through the chain that also returns its retrieved context.
query = "кого ты видишь на картинке?"

response = chain_with_sources.invoke(
    query
)

print("Response:", response['response'])

# List the text sources: retrieved texts are element objects, so their fields
# are read with attribute access (text.text, text.metadata.page_number).
print("\n\nContext:")
for text in response['context']['texts']:
    print(text.text)
    print("Page number: ", text.metadata.page_number)
    print("\n" + "-"*50 + "\n")


In [None]:
# Inspect the full result dict (context, question, response).
response

In [None]:
def run_rag_chain(question: str):
    """Run the RAG pipeline step by step, without LCEL composition.

    Args:
        question: the user's question.

    Returns:
        dict: ``"context"`` (retrieved docs split into ``"texts"`` and
        ``"images"``), ``"question"`` (the input) and ``"response"`` (the
        content of the model's reply).
    """
    # 1. Retrieve documents for the question.
    retrieved_docs = retriever.invoke(question)

    # 2. Split them into text elements and base64 images.
    parsed_context = parse_docs(retrieved_docs)

    # 3. Build the prompt from the dict shape build_prompt expects.
    prompt = build_prompt({"context": parsed_context, "question": question})

    # 4. Run the model. build_prompt returns a ChatPromptTemplate, which is not
    #    itself a valid chat-model input; format it into its list of messages
    #    first (the template holds ready-made messages, so no variables are needed).
    ai_message = model.invoke(prompt.format_messages())

    # 5. Extract the reply content.
    response_string = ai_message.content

    # 6. Return the same keys the 'assign' chain produces.
    return {
        "context": parsed_context,
        "question": question,
        "response": response_string
    }

# --- Example call: print the answer, then the text sources it was built from ---
result = run_rag_chain("Бұмын не істеді?")

print("--- ANSWER ---")
print(result["response"])

print("\n--- SOURCES USED ---")
print(result["context"]["texts"])

In [None]:
# Inspect the documents returned by the earlier filtered retriever query.
docs

In [None]:
# Step-by-step walkthrough of the pipeline, starting with the question.
question = "Что ты видишь на картинках?"

# 1. Retrieve documents
# Retrieval is skipped here: the results of the earlier filtered query (`docs`)
# are reused. To retrieve for `question` instead, enable the lines below.
# retriever.search_kwargs.update({"k": 6})
# retrieved_docs = retriever.invoke(question)
retrieved_docs = docs

In [None]:
# Inspect the documents used for the walkthrough.
retrieved_docs

In [None]:
# Text of the first retrieved item.
# NOTE(review): assumes the first item is a text/table element; a base64 image
# (plain str) has no `.text` attribute — confirm before relying on this cell.
retrieved_docs[0].text

In [None]:

# 2. Parse documents: split the retrieved items into text elements and base64 images.
parsed_context = parse_docs(retrieved_docs)
parsed_context


In [None]:
# 3. Build the prompt arguments: the dict shape build_prompt expects
#    ("context" with texts/images, plus the "question").
prompt_arguments = {
    "context": parsed_context,
    "question": question
}
prompt_arguments

In [None]:

# Build the prompt. NOTE: this rebinds `messages`, which an earlier cell bound
# to the raw message list of the image-description prompt.
messages = build_prompt(prompt_arguments)
messages

In [None]:

# 4. Pipe the prompt built above into the model.
# NOTE: this rebinds `chain`, which earlier cells bound to the full RAG chain.
chain = messages | model

# The prompt already contains fully formatted messages (context and question
# are baked in), so it has no template variables and takes an empty input.
ai_message = chain.invoke({})

# 5. Extract the reply content.
response_string = ai_message.content

# 6. Collect the same keys the 'assign' chain produces.
result = {
    "context": parsed_context,
    "question": question,
    "response": response_string
}
response_string