### Extract page numbers (starting from the actual page 1) and bundle them with the page content

In [46]:
import fitz  # PyMuPDF
import warnings

def extract_page_data_fitz(pdf_path):
    """
    Extracts the printed page number and the full text of every page in a PDF
    using PyMuPDF.

    The printed page number is looked for in the top and bottom 15% of each
    page: the first whitespace-separated token made up only of decimal digits
    is taken as the page number (tokens from the top band are checked first).

    Args:
        pdf_path: Path of the PDF file, passed to ``fitz.open``.

    Returns:
        A list with one dictionary per page, in document order, with the keys
        ``"index"`` (0-based position of the page in the PDF), ``"number"``
        (the detected page number as an ``int``, or ``None`` when no numeric
        token was found) and ``"content"`` (the full text of the page).
    """
    pages_data = []

    # The context manager closes the document even if a page fails to parse.
    with fitz.open(pdf_path) as doc:
        for i, page in enumerate(doc):
            height = page.rect.height
            width = page.rect.width

            # Header and footer bands where page numbers are expected.
            top_rect = fitz.Rect(0, 0, width, height * 0.15)
            bottom_rect = fitz.Rect(0, height * 0.85, width, height)

            top_tokens = page.get_text("text", clip=top_rect).split()
            bottom_tokens = page.get_text("text", clip=bottom_rect).split()

            # str.isdecimal() rather than str.isdigit(): isdigit() is also
            # true for characters such as superscript digits ("²"), which
            # int() rejects with a ValueError.
            found_number = next(
                (
                    int(token)
                    for token in top_tokens + bottom_tokens
                    if token.isdecimal()
                ),
                None,
            )

            pages_data.append({
                "index": i,
                "number": found_number,
                "content": page.get_text("text"),
            })

    return pages_data


def correct_page_numbers(pages_data, sequence_length=10):
    """
    Rewrites the "number" field of every page so that numbering is continuous.

    The first run of `sequence_length` detected numbers that increase by
    exactly one is used as an anchor. Starting from the anchor page, numbers
    are assigned forwards to the end of the document and backwards to its
    beginning, one per page. Numbers that end up below 1 are replaced by None.
    `pages_data` is modified in place.

    Returns the "index" value of the first page whose number is 1. Returns
    None when no anchor run exists, when no page ends up numbered 1, or when
    any error occurs while processing.
    """
    try:
        # Keep only pages with an integer number, remembering their position.
        numbered = [
            (position, entry["number"])
            for position, entry in enumerate(pages_data)
            if isinstance(entry["number"], int)
        ]

        # Look for the first window of consecutive numbers.
        anchor = None
        for first in range(len(numbered) - sequence_length + 1):
            first_number = numbered[first][1]
            is_consecutive = all(
                numbered[first + step][1] == first_number + step
                for step in range(sequence_length)
            )
            if is_consecutive:
                anchor = numbered[first]
                break

        if anchor is None:
            return None

        base_index, base_number = anchor

        # Number the anchor page and everything after it.
        for position in range(base_index, len(pages_data)):
            pages_data[position]["number"] = base_number + position - base_index

        # Number the pages in front of the anchor, walking backwards.
        for position in range(base_index - 1, -1, -1):
            pages_data[position]["number"] = base_number - (base_index - position)

        # Pages before the real page 1 carry no page number.
        for entry in pages_data:
            if entry["number"] < 1:
                entry["number"] = None

        # Report where page 1 sits in the document.
        for entry in pages_data:
            if entry["number"] == 1:
                return entry["index"]
        return None

    except Exception:
        # Any unexpected problem is reported as "no start page found".
        return None


def extract_text(pdf_path, start_chapter=None):
    """
    Extracts the text from a PDF file using PyMuPDF.

    Args:
        pdf_path: Path of the PDF file, passed to ``fitz.open``.
        start_chapter: 0-based index of the first page to include. When it is
            ``None`` a ``UserWarning`` is issued and the whole PDF is
            extracted.

    Returns:
        The text of all pages from the start page to the end of the document.
        With an explicit ``start_chapter`` the page texts are joined with a
        newline; when ``start_chapter`` is ``None`` they are concatenated
        without a separator.
    """
    # Compare against None explicitly: index 0 is a valid start page and must
    # not be reported as "start_chapter is None".
    if start_chapter is None:
        warnings.warn(
            "start_chapter is None: extracting text from the entire PDF.",
            UserWarning
        )
        first_page, separator = 0, ""
    else:
        first_page, separator = start_chapter, "\n"

    # The context manager closes the document even if extraction fails.
    with fitz.open(pdf_path) as doc:
        page_texts = [
            doc[page_index].get_text("text")
            for page_index in range(first_page, len(doc))
        ]

    return separator.join(page_texts)

In [56]:
# Candidate input PDFs, given relative to the notebook's working directory.
pdf_path1 = "../../data/mcelreath_2020_statistical-rethinking.pdf"
pdf_path2 = "../../data/Theory of Statistic.pdf"
pdf_path3 = "../../data/Deep Learning with Python.pdf"
pdf_path4 = "../../data/Natural_Image_Statistics.pdf"
pdf_path5 = "../../data/mml-book.pdf"

# The PDF processed by the rest of this cell.
pdf_path = pdf_path4

# 1) Collect the detected page number and text of every page.
pages_data = extract_page_data_fitz(pdf_path)
# 2) Repair the numbering in place; the result is the index of page 1, or None.
start_chapter = correct_page_numbers(pages_data, sequence_length=10)
# 3) Extract the text from that page onwards (whole PDF when it is None).
text = extract_text(pdf_path, start_chapter)

print(text)

Chapter 1
Introduction
1.1 What this book is all about
The purpose of this book is to present a general theory of early vision and image pro-
cessing. The theory is normative, i.e. it says what is the optimal way of doing these
things. It is based on construction of statistical models of images combined with
Bayesian inference. Bayesian inference shows how we can use prior information
on the structure of typical images to greatly improve image analysis, and statistical
models are used for learning and storing that prior information.
The theory predicts what kind of features should be computed from the incoming
visual stimuli in the visual cortex. The predictions on the primary visual cortex
have been largely conﬁrmed by experiments in visual neuroscience. The theory also
predicts something about what should happen in higher areas such as V2, which
gives new hints for people doing neuroscientiﬁc experiments.
Also, the theory can be applied on engineering problems to develop more efﬁ-
ci

### Set-up ChromaDB

In [None]:
import streamlit as st
import nltk
from nltk.tokenize import sent_tokenize
nltk.download('punkt_tab')
nltk.download("punkt")  


def paragraphs_chunking(text, max_words=750):
    """
    Splits text into chunks, keeping paragraphs whole where possible.

    - Paragraphs are the non-empty pieces between blank lines ("\\n\\n").
    - A paragraph of at most `max_words` words becomes one chunk.
    - A longer paragraph is split into sentences with `sent_tokenize`, and
      consecutive sentences are packed into chunks of at most `max_words`
      words. A single sentence longer than `max_words` is kept intact as its
      own chunk.

    Returns the list of chunk strings; no chunk is empty.
    """
    # Split text into paragraphs first
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]

    chunks = []
    for para in paragraphs:
        # If paragraph is within limit, keep as a single chunk
        if len(para.split()) <= max_words:
            chunks.append(para)
            continue

        # Sentence-based chunking for large paragraphs
        chunk, chunk_word_count = [], 0
        for sentence in sent_tokenize(para):
            sentence_word_count = len(sentence.split())

            # Close the current chunk when this sentence would overflow it.
            # The `chunk` check prevents emitting an empty chunk when the
            # very first sentence is already longer than max_words.
            if chunk and chunk_word_count + sentence_word_count > max_words:
                chunks.append(" ".join(chunk))
                chunk, chunk_word_count = [], 0

            chunk.append(sentence)
            chunk_word_count += sentence_word_count

        # Append any remaining chunk
        if chunk:
            chunks.append(" ".join(chunk))

    return chunks


def lines_chunking(text, max_words=200):
    """
    Splits a text into chunks without breaking sentences or paragraphs abruptly.

    - Lines are stripped; a blank line ends the current paragraph, and the
      lines of one paragraph are joined with a single space.
    - A paragraph of at most `max_words` words becomes one chunk.
    - A longer paragraph is split into sentences with `sent_tokenize`, and
      consecutive sentences are packed into chunks of at most `max_words`
      words. A single sentence longer than `max_words` is kept intact as its
      own chunk.

    Returns the list of chunk strings; no chunk is empty.
    """
    # Group lines into paragraphs
    paragraphs = []
    current_paragraph = []
    for line in text.splitlines():
        stripped = line.strip()
        if stripped:
            current_paragraph.append(stripped)
        elif current_paragraph:
            # Empty line indicates end of paragraph
            paragraphs.append(" ".join(current_paragraph))
            current_paragraph = []
    if current_paragraph:
        paragraphs.append(" ".join(current_paragraph))

    # Process paragraphs
    chunks = []
    for para in paragraphs:
        if len(para.split()) <= max_words:
            chunks.append(para)
            continue

        chunk, chunk_word_count = [], 0
        for sentence in sent_tokenize(para):
            sentence_word_count = len(sentence.split())

            # Close the current chunk when this sentence would overflow it.
            # The `chunk` check prevents emitting an empty chunk when the
            # very first sentence is already longer than max_words.
            if chunk and chunk_word_count + sentence_word_count > max_words:
                chunks.append(" ".join(chunk))
                chunk, chunk_word_count = [], 0

            chunk.append(sentence)
            chunk_word_count += sentence_word_count

        if chunk:
            chunks.append(" ".join(chunk))

    return chunks

In [None]:
import os
import chromadb
from chromadb.utils import embedding_functions

def get_database_directory():
    """
    Return the absolute path of the "database" folder located in the parent
    of the current working directory, creating the folder if it is missing.
    """
    # Resolve "<cwd>/.." to an absolute path, then descend into "database".
    one_level_up = os.path.join(os.getcwd(), os.pardir)
    database_dir = os.path.join(os.path.abspath(one_level_up), "database")

    # exist_ok makes repeated calls harmless.
    os.makedirs(database_dir, exist_ok=True)

    return database_dir


def get_chroma_client():
    """
    Get a ChromaDB client that stores its data in the database directory.

    The directory comes from `get_database_directory()`, which also creates
    it. Previously the directory was created but an in-memory
    `chromadb.Client()` was returned, so the path was never used.

    NOTE(review): assumes on-disk persistence is the intended behaviour and
    that the installed chromadb provides `PersistentClient` — confirm.
    """
    persist_dir = get_database_directory()
    return chromadb.PersistentClient(path=persist_dir)


def initialize_chromadb(EMBEDDING_MODEL):
    """
    Build the two objects needed to work with ChromaDB.

    Returns a `(client, embedding_func)` tuple: the client obtained from
    `get_chroma_client()` and a Sentence Transformer embedding function
    created for the model named by `EMBEDDING_MODEL`.
    """
    # The client is obtained first, then the embedding function.
    chroma_client = get_chroma_client()

    sentence_embedder = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name=EMBEDDING_MODEL
    )

    return chroma_client, sentence_embedder


def initialize_collection(client, embedding_func, collection_name):
    """
    Fetch the collection called `collection_name` from `client`, creating it
    if it does not exist yet.

    The collection is requested with `embedding_func` as its embedding
    function and with cosine as the HNSW distance space.
    """
    collection_options = {
        "name": collection_name,
        "embedding_function": embedding_func,
        "metadata": {"hnsw:space": "cosine"},
    }
    return client.get_or_create_collection(**collection_options)


def update_collection(collection, text, book_title="textbook", max_words=750):
    """
    Chunk `text` and add the chunks to the ChromaDB collection.

    This is the single definition of `update_collection`; an earlier draft
    with the same name was shadowed by this one and never took effect.

    Args:
        collection: Collection to add to; it must provide `add(documents=...,
            ids=..., metadatas=...)`.
        text: Text to split with `lines_chunking`.
        book_title: Label stored as the "source" of every chunk and used as
            the prefix of the chunk ids.
        max_words: Maximum number of words per chunk, passed on to
            `lines_chunking`.

    Returns:
        The same `collection` object. It is left untouched when `text` yields
        no chunks.
    """
    chunks = lines_chunking(text, max_words=max_words)

    # Nothing to store: skip the call rather than hand the collection empty
    # document/id lists.
    if not chunks:
        return collection

    chunk_indices = range(len(chunks))
    collection.add(
        documents=chunks,
        ids=[f"{book_title}_chunk_{j:04d}" for j in chunk_indices],  # Zero-padded
        metadatas=[{
            "source": book_title,
            "chunk_index": j,
            "page_estimate": j * max_words // 250,  # Rough page estimate
        } for j in chunk_indices],
    )
    return collection

In [7]:
# Test cell: build the ChromaDB client and embedding function, then create
# (or fetch) the two collections used by this notebook.

# Sentence Transformer model name handed to the embedding function.
EMBEDDING_MODEL = "all-MiniLM-L6-v2"  
client, embedding_func = initialize_chromadb(EMBEDDING_MODEL)

# Create two collections with different purposes
whole_text_collection = initialize_collection(
    client, embedding_func, "whole_text_chunks"
)

chapter_collection = initialize_collection(
    client, embedding_func, "chapter_chunks"
)

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
def get_relevant_text(collection, query='', nresults=2, sim_th=None):
    """
    Get relevant text from a collection for a given query.

    Args:
        collection: Object providing `query(query_texts=..., n_results=...)`
            whose result has "documents" and "distances" entries, each a list
            with one inner list per query.
        query: Query text passed to `collection.query`.
        nresults: Number of results requested.
        sim_th: Optional threshold. When given, only documents whose
            similarity (computed as `1 - distance`) is at least `sim_th` are
            kept. NOTE(review): treating `1 - distance` as a similarity
            assumes the collection uses cosine distance — confirm.

    Returns:
        The kept documents concatenated without a separator. Documents that
        are `None` are skipped in both modes (previously the threshold branch
        failed with a TypeError on them).
    """
    query_result = collection.query(query_texts=query, n_results=nresults)
    docs = query_result.get('documents')[0]

    if sim_th is not None:
        distances = query_result.get("distances")[0]
        docs = [doc for doc, dist in zip(docs, distances) if 1 - dist >= sim_th]

    return ''.join(doc for doc in docs if doc is not None)


def generate_answer(base_url, model, prompt, context=None, top_k=5, top_p=0.9, temp=0.5, timeout=None):
    """
    Send a non-streaming generation request and return the model's answer.

    A JSON payload is POSTed to `<base_url>/generate`.

    Args:
        base_url: Base URL of the API; a trailing "/" is tolerated.
        model: Model name sent in the payload.
        prompt: Prompt text sent in the payload.
        context: Context value from a previous call, sent back to the server.
            Defaults to an empty list.
        top_k, top_p, temp: Sampling options sent under "options".
        timeout: Optional timeout in seconds for the HTTP request; `None`
            (the default) waits indefinitely, as before.

    Returns:
        A `(response_text, context)` tuple taken from the "response" and
        "context" fields of the JSON reply. On a request error or an
        unparsable reply the error is shown with `st.error` and `("", [])`
        is returned.
    """
    # NOTE(review): `requests` is not imported in the visible part of this
    # file — confirm it is imported elsewhere.
    url = base_url.rstrip("/") + "/generate"
    data = {
        "prompt": prompt,
        "model": model,
        "stream": False,
        # A fresh list per call instead of a shared mutable default argument.
        "context": [] if context is None else context,
        "options": {"temperature": temp, "top_p": top_p, "top_k": top_k},
    }
    try:
        response = requests.post(url, json=data, timeout=timeout)
        response.raise_for_status()
        response_dict = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers invalid JSON bodies on `requests` versions whose
        # JSON decoding error is not a RequestException.
        st.error(f"An error occurred: {e}")
        return "", []

    return response_dict.get('response', ''), response_dict.get('context', [])


def get_contextual_prompt(question, context):
    """
    Build the prompt for a context-grounded answer.

    The prompt consists of fixed instructions followed by a "### Context:"
    section holding `context`, a "### Question:" section holding `question`,
    and a trailing "### Answer:" marker.
    """
    instructions = (
        "You are a helpful assistant. Use the information provided in the context below to answer the question. "
        "Ensure your answer is accurate, concise, and directly addresses the question. "
        "If the context does not provide enough information to answer the question, state that explicitly.\n\n"
    )
    return f"{instructions}### Context:\n{context}\n\n### Question:\n{question}\n\n### Answer:"