# <span style='color:Tomato;'>Load Env Variables</span>

In [1]:
import dotenv
import utils

# Add the modules directory to the Python path if needed
# sys.path.append(os.path.abspath("./modules"))

# Locate the secrets file under the project root and export its variables to the environment
root_dir = utils.get_project_root()
env_file = root_dir / ".secrets" / ".env"
assert env_file.exists(), f"File not found: {env_file}"
dotenv.load_dotenv(env_file)


Root Directory: /DATA/Ali_Data/GraphRAG-Neo4j-VMD-NAMD


True

# <span style='color:Tomato;'>Process PDFs</span>

We'll use Langchain `PyMuPDF4LLM` to load the PDF files into LangChain documents.

We'll also use an LLM to convert each image into a summary and extract its data.


## <span style='color:Orange;'>Basic Imports</span>

In [2]:
import concurrent.futures as cf
import pickle
import pprint
import tempfile
from pathlib import Path

import fitz  # PyMuPDF
from IPython.display import Markdown, display

# from tqdm import tqdm
from tqdm.notebook import tqdm

# Placeholder: the page-deletion cell further down replaces this with a temporary
# directory `Path` the first time it actually runs (it checks `isinstance(temp_dir, Path)`).
temp_dir = False


## <span style='color:Orange;'>Loading PDF file as LangChain Document</span>

> Images will be extracted (to text) using a Multimodal LLM.

You can either use the `load()` method to do it all at once in memory, or do it incrementally using `lazy_load()`.

Since our docs are big, we'll use `lazy_load()` to also see the progress.

To save time, we will load the docs from a pickle file if previously processed, otherwise process them and save them as a pickle.

### <span style='color:Khaki;'>Custom Splitting Mode</span>

> By default, each page in the PDF is a (LangChain) Document!

When loading the PDF file you can split it in two different ways:
- By page `mode="page"`
- As a single text flow `mode="single"`. In other words, the whole PDF would be **one** LangChain Document. You can specify page delimiter to have the pages in the metadata


In [3]:
def update_filename_len(file_path: Path) -> tuple[str, int]:
    """
    Return the lowercased file stem and the page count of a PDF file.
    :param file_path: Path of the PDF file to inspect.
    :return: A tuple of (lowercased file name without extension, number of pages).
    """
    lowered_stem = file_path.stem.lower()
    # the document is only opened to count its pages and is closed again right away
    with fitz.open(file_path) as document:
        return lowered_stem, len(document)


In [4]:
# Source PDF, given relative to the current working directory and made absolute
pdf_candidate = Path("..", "data", "pdfs", "biopython.pdf").resolve()
file_path = utils.fuzzy_find(pdf_candidate)

# lowercased stem and page count of the PDF
file_name, file_len = update_filename_len(file_path)

# pickle files go into a "pkls" directory that is a sibling of the PDF directory
pdf_dir = file_path.parent
pkl_dir = pdf_dir.parent / "pkls"
pkl_dir.mkdir(parents=True, exist_ok=True)

print(f"file_path = {file_path}")


File 'biopython' not found. Fuzzy Searching ...
file_path = /DATA/Ali_Data/GraphRAG-Neo4j-VMD-NAMD/data/pdfs/BioPython.pdf


In [5]:
# If a problem occurs during loading, use this cell to split the PDF into two files:
#   * "<name>_extract.pdf": only the problematic pages
#   * "<name>_partial.pdf": the remaining pages that still have to be processed
# todo: the page numbers are reindexed to zero

problematic_pages = [391, 392, 395, 428]
range_to_keep = range(391, file_len)  # 391 to 445 (exclusive)

if 0:  # manual switch: change to 1 to actually write the two PDFs
    display(Markdown("#### <span style='color:orangered;'>Warning: Deleting Pages !!!</span>"))
    temp_dir = Path(tempfile.mkdtemp()) if not isinstance(temp_dir, Path) else temp_dir
    temp_dir.mkdir(exist_ok=True, parents=True)

    # Sort explicitly: a set difference has no guaranteed iteration order, and
    # `doc.select()` arranges the pages in exactly the order it is given.
    range_to_keep = sorted(set(range_to_keep) - set(problematic_pages))

    # stays None when there is nothing to extract, so the report below cannot fail
    extract_file_path = None

    with fitz.open(file_path) as doc:
        # PART I: extract deleted pages
        if problematic_pages:
            extract_file_path = temp_dir / f"{file_name}_extract.pdf"
            # context manager closes the scratch document even if saving fails
            with fitz.open() as temp_doc:
                for page_number in problematic_pages:
                    temp_doc.insert_pdf(doc, from_page=page_number, to_page=page_number)
                temp_doc.save(extract_file_path)

        # ========================================================
        # PART II: extract pages to keep
        doc.select(range_to_keep)
        partial_file_path = temp_dir / f"{file_name}_partial.pdf"
        doc.save(partial_file_path)

    if extract_file_path is not None:
        print(f"extract_file_path = {extract_file_path}")
    print(f"partial_file_path = {partial_file_path}")

print(f"\nfile_path = {file_path}")



file_path = /DATA/Ali_Data/GraphRAG-Neo4j-VMD-NAMD/data/pdfs/BioPython.pdf


In [6]:
# # test if the correct pages are extracted
# with fitz.open(partial_file_path) as doc:
#     print(doc[0].get_textpage().extractText())


#### <span style='color:LightGreen;'>How the Asynchronous Lazy Loading Loop Works</span>

This code demonstrates an asynchronous lazy loading pattern with a progress bar. Let me explain how it works:


##### <span style='color:SkyBlue;'>Key Components</span>

1. `alazy_load()` - An asynchronous generator that yields documents one by one
2. `async for` - Asynchronous iteration through the generator
3. `tqdm.tqdm()` - Progress bar visualization
4. Batching logic to process documents in chunks of 100

##### <span style='color:SkyBlue;'>How the Async Loop Works</span>

```python
async for doc in tqdm.tqdm(await loader.alazy_load()):
    # Process each document as it becomes available
```

The `await loader.alazy_load()` returns an asynchronous iterable. The `async for` loop then:

1. Asynchronously requests the next document
2. Waits for it to be retrieved without blocking the event loop
3. Updates the progress bar via `tqdm`
4. Processes the document once available

The batching logic (collecting 100 pages before processing) allows for more efficient operations on groups of documents rather than one at a time.

This pattern is especially useful when loading documents involves network requests or other I/O operations that would otherwise block execution.


### <span style='color:Khaki;'>LLM Prompt</span>

> You are an assistant tasked with summarizing images for retrieval.
> 1. These summaries will be embedded and used to retrieve the raw image.
>    Give a concise summary of the image that is well optimized for retrieval
> 2. extract all the text from the image. Do not exclude any content from the page.
> Format answer in markdown without explanatory text and without markdown delimiter ``` at the beginning.


### <span style='color:Khaki;'>Which LLM to use?</span>

- `gemma3:4b`: **biggest,** but provides only a general understanding of the images.
- `granite3.2-vision`: **small,** and fine-tuned for data extraction from images in PDF docs.
- `moondream`: **smallest,** but only good for an overall description of the image.


In [7]:
from langchain_community.document_loaders.parsers import LLMImageBlobParser
from langchain_ollama import ChatOllama
from langchain_pymupdf4llm import PyMuPDF4LLMLoader

# PDF to process in this run, and whether images should be described by the vision LLM
pfile_path = file_path
extract_images = True

# lowercased stem (used for the pickle file names) and page count (used for progress)
pfile_name, pfile_len = update_filename_len(pfile_path)

# options shared by both loader variants: one LangChain Document per PDF page
loader_kwargs = {"mode": "page"}

if extract_images:
    # `ChatOllama` has no `max_tokens` option (it was silently ignored);
    # the cap on the number of generated tokens is `num_predict`.
    image_llm = ChatOllama(model="granite3.2-vision", num_predict=1024)
    loader_kwargs["extract_images"] = True
    loader_kwargs["images_parser"] = LLMImageBlobParser(model=image_llm)

loader = PyMuPDF4LLMLoader(pfile_path, **loader_kwargs)

print(f"{pfile_name=} -> {pfile_len} pages")


pfile_name='biopython' -> 445 pages


The nice thing about `lazy_load()` is that we can stop processing any page and skip it if a problem happens.

You can also resume whenever you want or process pages with different config.

In [8]:
# Cache of the processed documents for this PDF
docs_pkl_path = pkl_dir / f"docs_{pfile_name}.pkl"

if docs_pkl_path.exists():
    print("Loading docs from pickle")
    # NOTE: unpickling can execute arbitrary code; only load pickles written by this notebook
    with open(docs_pkl_path, "rb") as f:
        docs = pickle.load(f)
else:
    print(f"Loading docs from pdf. \nThis will take some time (~{int(pfile_len / 30)} min)")  # on average 30 pages per minute

    # Option 1: loading small docs
    # docs = loader.load()

    # ---------------------------

    # Option 2: Load documents asynchronously (almost 3x faster)
    # assert not extract_images, "Async loading not supported for image extraction"
    # docs = await loader.aload()

    # ---------------------------

    # Option 3: lazy load with progress bar
    # # todo: make this asynchronous
    # docs = []
    # for doc in tqdm(loader.lazy_load(), total=pfile_len):
    #     docs.append(doc)

    # ---------------------------

    # Option 4: Load with timeout
    # Limitation: a running thread cannot be cancelled and a generator cannot be
    # advanced while it is already executing. A page that exceeds the timeout is
    # therefore dropped from `docs`, but we still have to wait for it to finish
    # before the next page can be requested.

    timeout_seconds = 30
    skipped_pages = []
    docs = []

    def get_next_doc(loader):
        """Advance the lazy-load iterator by one document (runs in the worker thread)."""
        return next(loader)

    loader_iter = iter(loader.lazy_load())

    # One executor with one worker for the whole run: only a single `next()` call
    # can ever be in flight, so there is no need to build a new pool per page.
    with cf.ThreadPoolExecutor(max_workers=1) as executor:
        for i in tqdm(range(pfile_len), total=pfile_len):
            future = executor.submit(get_next_doc, loader_iter)
            try:
                doc = future.result(timeout=timeout_seconds)
                docs.append(doc)
            except cf.TimeoutError:
                skipped_pages.append(i)
                # let the slow page finish (its result is discarded) so the
                # iterator is idle again before the next request
                cf.wait([future])
            except StopIteration:
                # the loader produced fewer documents than the PDF has pages
                break

    if skipped_pages:
        print(f"Skipped pages (took longer than {timeout_seconds}s): {skipped_pages}")

    # pickle save the docs
    with open(docs_pkl_path, "wb") as f:
        pickle.dump(docs, f)

print(f"Loaded {pfile_name}: {len(docs)} documents")


Loading docs from pickle
Loaded biopython: 445 documents


In [9]:
# # merging docs if partially processed
# with open(pkl_dir / "docs_w.img.biopython_part1.pkl", "rb") as f:
#     docs0 = pickle.load(f)

# with open(pkl_dir / "docs_w.img.biopython_partial.pkl", "rb") as f:
#     docs1 = pickle.load(f)

# with open(pkl_dir / "docs_biopython_extract.pkl", "rb") as f:
#     docs2 = pickle.load(f)

# docs = docs0 + docs1 + docs2

# print(f"Loaded {len(docs)} documents")

# with open(pkl_dir / f"docs_biopython.pkl", "wb") as f:
#     pickle.dump(docs, f)

# -----------------------------

# # Correcting the page numbers
# correct_pages = list(range(0, 445))
# extract_pages = [391, 392, 395, 428]

# for i in extract_pages:
#     correct_pages.remove(i)
# correct_pages = correct_pages + extract_pages

# for d, i in zip(docs, correct_pages):
#     d.metadata["page"] = i


In [10]:
# temp = docs[-5]
# display(Markdown(temp.page_content))
# print('-'*50)
# pprint.pp(temp.metadata)


## <span style='color:Orange;'>Graph Database</span>

In [11]:
from langchain_neo4j import Neo4jGraph

# No connection arguments are passed here; presumably the URL and credentials come from the
# environment variables loaded from `.env` at the top of the notebook - TODO confirm
graph = Neo4jGraph()


## <span style='color:Orange;'>Doc Chunking (Splitting)</span>

We'll split text based on semantic similarity instead of character based. Inspired by the [5 Levels Of Text Splitting](https://github.com/FullStackRetrieval-com/RetrievalTutorials/blob/main/tutorials/LevelsOfTextSplitting/5_Levels_Of_Text_Splitting.ipynb).
To instantiate a [SemanticChunker](https://python.langchain.com/api_reference/experimental/text_splitter/langchain_experimental.text_splitter.SemanticChunker.html), we must specify an embedding model first.

```python
from langchain_experimental.text_splitter import SemanticChunker

text_splitter = SemanticChunker(EmbeddingModel())
```


### <span style='color:Khaki;'>Custom `EmbeddingModel` Class</span>

This class must conform to the `Embeddings` interface because `SemanticChunker` expects an object that implements the `embed_documents` (or `embed_query`) methods.
Just a standalone function `embed_content()` won’t satisfy the interface that `SemanticChunker` relies on.

```python
from langchain.embeddings.base import Embeddings

class CustomEmbeddingModel(Embeddings):
    def __init__(self, task_type="SEMANTIC_SIMILARITY"):
        self.task_type = task_type

    @retry.Retry(predicate=is_retriable)  # if you have a Retry function (like Gemini)
    def embed_documents(self, input: Documents) -> Embeddings:
        """
        Args:
            input (Documents: list[str])

        Returns:
            Embeddings (list[list[float]])
        """
        response = client.models.embed_content(
            model="models/text-embedding-004", contents=input, config=types.EmbedContentConfig(task_type=self.task_type)
        )
        return [e.values for e in response.embeddings]
    
    def embed_query():
        print("embed_query() not implemented")
        return None

    def __call__(self, input: Documents) -> Embeddings:
        return self.embed_documents(input)
```



### <span style='color:Khaki;'>Gemini Embeddings</span>

Embedding all the documents with Gemini will quickly exhaust your free Google quota.
So we'll use an open-source embedding model for text splitting instead.


```python
from google import genai
from google.genai import types
from google.api_core import retry
from langchain_google_genai.embeddings import GoogleGenerativeAIEmbeddings

print(genai.__version__)

# the api is loaded from the env
client = genai.Client()

for m in client.models.list():
    if "embedContent" in m.supported_actions:
        print(m.name)

# Define a helper to retry when per-minute quota is reached.
is_retriable = lambda e: (isinstance(e, genai.errors.APIError) and e.code in {429, 503})

Embedding_Model = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004", task_type="SEMANTIC_SIMILARITY")
```

### <span style='color:Khaki;'>Create Text Splitter</span>

**What is the difference between `transform_documents(documents: Sequence[Document])` and `split_documents(documents: Iterable[Document])`?**

`transform_documents()` is just a wrapper around `split_documents()`. So both end up producing the same split documents.
If you already have a list of documents and just want them split, you can call `split_documents()` directly.
If your pipeline expects a `transform_documents()` method (as defined by the `BaseDocumentTransformer` interface), use `transform_documents()`.

In [None]:
from my_langchain_experimental.text_splitter import SemanticChunker
from langchain_ollama.embeddings import OllamaEmbeddings

# Cache file for the semantically split documents
pkl_name = pkl_dir / f"docs_split_{pfile_name}.pkl"

if not pkl_name.exists():
    # nothing cached yet: split the documents and cache the result
    text_splitter = SemanticChunker(
        embeddings=OllamaEmbeddings(model="bge-m3"),
        add_start_index=True,
        show_progress=True,
        save_temp=True,
    )
    docs_split = text_splitter.split_documents(docs)

    with pkl_name.open("wb") as pkl_file:
        pickle.dump(docs_split, pkl_file)
else:
    print("Loading split docs from pickle")
    with pkl_name.open("rb") as pkl_file:
        docs_split = pickle.load(pkl_file)

print(f"Loaded {pfile_name}: {len(docs_split)} documents")


Loading split docs from pickle
Loaded biopython: 1530 documents


In [None]:
from langchain_experimental.llms.ollama_functions import OllamaFunctions
from my_langchain_experimental.graph_transformers import LLMGraphTransformer


# Cache file for the graph documents built from the split documents
pkl_name = pkl_dir / f"docs_graph_{pfile_name}.pkl"

if not pkl_name.exists():
    # nothing cached yet: convert the split documents and cache the result
    llm = OllamaFunctions(model="gemma3", temperature=0, format="json")
    llm_transformer = LLMGraphTransformer(llm=llm, show_progress=True, save_temp=True)
    graph_documents = llm_transformer.convert_to_graph_documents(docs_split)

    with pkl_name.open("wb") as pkl_file:
        pickle.dump(graph_documents, pkl_file)
else:
    print("Loading graph docs from pickle")
    with pkl_name.open("rb") as pkl_file:
        graph_documents = pickle.load(pkl_file)

print(f"Loaded {pfile_name}: {len(graph_documents)} documents")
