# Pdf exploration and preparation test

The main goal is to check that we can read the pdf, extract only the relevant content, post-process the extracted text and, finally, gather some information about it.

Links to `Simple Local RAG Tutorial` :
* [GitHub](https://github.com/mrdbourke/simple-local-rag) ;
* [YouTube](https://youtu.be/qN_2fnOPY-M?si=APnkpsGY0z_scJ9Z).

## Initialization

In [1]:
import re
import textwrap
from pathlib import Path
from pprint import pprint

import numpy as np
import pandas as pd
import pdfplumber
import torch
from langchain.text_splitter import SentenceTransformersTokenTextSplitter
from sentence_transformers import SentenceTransformer, util

from utils.timing_functions import timing

In [2]:
# Set files path for pdf document and embeddings
PDF_FILENAME = "source.pdf"
EMBEDDINGS_FILENAME = "embeddings.csv"

p = Path()

pdf_filepath = p.resolve() / "pdf" / PDF_FILENAME
embeddings_filepath = p.resolve() / "datasets" / EMBEDDINGS_FILENAME

## Extract the pdf pages

### Read the pdf

First we can verify that our source pdf is available.

In [3]:
# Check if pdf file is available
if pdf_filepath.is_file():
    print(f"Pdf file path : '{pdf_filepath}'.")
else:
    print("No pdf file found.")

Pdf file path : '/home/anquetos/gcp-professional-data-engineer-rag/pdf/source.pdf'.


Let's see if the number of pages found is the right one.

In [4]:
with pdfplumber.open(pdf_filepath) as pdf:
    print(
        f"* Expected number of pages : \t355\n* Number of pages found : \t{len(pdf.pages)}"
    )

* Expected number of pages : 	355
* Number of pages found : 	355


That's ok, we can try to extract text from a random test page.

In [5]:
with pdfplumber.open(pdf_filepath) as pdf:
    page = pdf.pages[101]
    text = page.extract_text()
    print(text[:90])

Data pipelines are sequences of operations that copy, trans-
form, load, and analyze data.


The extraction works, but the text does not match the page we selected. The first thing to take into account is that list indices start at 0, so `page = pdf.pages[101]` actually extracts the 102nd page of the file.
That is still not the whole story : the extracted text corresponds to printed page 62, which means that printed page 1 is actually the 41st page of the pdf (index 40). The reason is that the front matter ("About", "Introduction", etc.) is not numbered the same way as the body of the book.
We have to take this offset into account to extract the desired content.
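
The offset can be captured in two small helpers. This is a minimal sketch: the constant and function names are hypothetical, and the value 39 only holds for this document, where printed page 1 sits at index 40.

```python
# Assumption: printed page 1 is at pdf index 40 in this document,
# so the offset between a 0-based index and a printed page number is 39.
FRONT_MATTER_OFFSET = 39  # hypothetical name, document-specific value


def printed_page_to_index(printed_page: int) -> int:
    """Convert a printed page number to a 0-based pdf page index."""
    return printed_page + FRONT_MATTER_OFFSET


def index_to_printed_page(page_idx: int) -> int:
    """Convert a 0-based pdf page index to a printed page number."""
    return page_idx - FRONT_MATTER_OFFSET


print(printed_page_to_index(1))    # 40
print(index_to_printed_page(101))  # 62
```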

### Target relevant text

Documents can contain several kinds of information that are not relevant for building a RAG :
* headers and footers ;
* tables ;
* hyperlinks ;
* figures ;
* etc.

We only want to keep the body of the document and the code samples, even if part of the code is not always relevant. Since each document is different, there is no single method to determine what is relevant and what is not. The only way to handle this is to take the time to inspect the document structure, layout, etc.

In my case, it appears that the **font** will be the best way to help me target the body and the code.

> Take note that working with fonts means we will extract the text character by character to access its properties thanks to the [`chars` object](https://github.com/jsvine/pdfplumber?tab=readme-ov-file#objects) available for each instance of `pdfplumber.PDF` and `pdfplumber.Page`.

In [6]:
with pdfplumber.open(pdf_filepath) as pdf:
    page = pdf.pages[43]
    header_font = page.chars[3].get("fontname")
    body_font = page.chars[103].get("fontname")
    print(f"* Header fontname : \t{header_font}\n* Body fontname : \t{body_font}")

* Header fontname : 	GHSRZR+UniversLTStd
* Body fontname : 	GHSRZR+SabonLTStd-Roman


In [7]:
with pdfplumber.open(pdf_filepath) as pdf:
    page = pdf.pages[50]
    code_font = page.extract_text_lines(return_chars=True)[8]["chars"][0].get(
        "fontname"
    )
    print(f"* Sample code fontname : \t{code_font}")

* Sample code fontname : 	GHSRZR+SourceCodePro-Regular


Header, body and code have different fonts which is of great help. The last thing to take care of is the fact that the text we want to target can be *italic* or **bold**. So let's make a list of all available fonts in the file.

In [8]:
# Extract all fonts in the document (unique names, in order of first appearance)
fontname_list = []
with pdfplumber.open(pdf_filepath) as pdf:
    for page in pdf.pages:
        for char in page.chars:
            fontname = char.get("fontname")
            if fontname not in fontname_list:
                fontname_list.append(fontname)

In [9]:
# List only the necessary fonts
body_fontname_list = [
    fontname
    for fontname in fontname_list
    if "Sabon" in fontname or "SourceCode" in fontname
]
print(body_fontname_list)

['GHSRZR+SabonLTStd-Roman', 'GHSRZR+SourceCodePro-Regular', 'GHSRZR+SabonLTStd-Bold', 'GHSRZR+SabonLTStd-Italic', 'URTXBU+SourceCodePro-Bold']


Last step for the font part : we will create a helper function to filter the extracted text by font using the fontname of each character.

In [10]:
# Font filter helper function
def filter_text_by_font(chars: list[dict], target_fonts: list[str]) -> str:
    """Filters extracted text and, more precisely, its letters by their fonts.

    Args:
        chars (list[dict]): chars object from pdfplumber.
        target_fonts (list[str]): list of fontnames for which we want to keep the characters/text.

    Returns:
        str: filtered text.
    """
    char_text = [char["text"] for char in chars if char.get("fontname") in target_fonts]
    text = "".join(char_text)
    return text
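
To see what the helper does without opening a pdf, here is a small self-contained sketch. The character dictionaries are made up for the illustration and only contain the two keys the function reads (`text` and `fontname`); real `pdfplumber` chars carry many more properties.

```python
def filter_text_by_font(chars: list[dict], target_fonts: list[str]) -> str:
    """Keep only the characters whose fontname is in target_fonts."""
    return "".join(
        char["text"] for char in chars if char.get("fontname") in target_fonts
    )


# Hypothetical chars: a two-letter header followed by two body letters
sample_chars = [
    {"text": "H", "fontname": "GHSRZR+UniversLTStd"},
    {"text": "i", "fontname": "GHSRZR+UniversLTStd"},
    {"text": "o", "fontname": "GHSRZR+SabonLTStd-Roman"},
    {"text": "k", "fontname": "GHSRZR+SabonLTStd-Italic"},
]

kept = filter_text_by_font(
    sample_chars, ["GHSRZR+SabonLTStd-Roman", "GHSRZR+SabonLTStd-Italic"]
)
print(kept)  # ok
```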

### Text post-processing

#### Basic formatting

The goal is to have the cleanest possible text for the next steps. We will convert the text to lowercase and remove unnecessary whitespace. In addition, we will replace the string *fifi* with *fi*. This is a specific error I noticed after extracting my document, which shows how important it is to inspect each document carefully to identify the best way to process it.
Here is the formatter, followed by a sample text.

In [11]:
# Basic text formatter function
def basic_text_formatter(text: str) -> str:
    """Applies different operations to format and clean the text.

    Args:
        text (str): original text.

    Returns:
        str: formatted text.
    """
    formatted_text = " ".join(
        text.casefold().replace("\n", " ").replace("fifi", "fi").split()
    )
    return formatted_text

In [12]:
basic_text_sample = " I'm a Basic   text sample. "

print(
    f"* Before : \t{basic_text_sample}\n* After : \t{basic_text_formatter(basic_text_sample)}"
)

* Before : 	 I'm a Basic   text sample. 
* After : 	i'm a basic text sample.


#### Hyphens

Hyphens are used to break words at the end of a line so that the page looks nicer, but they interfere with word recognition.

In [13]:
with pdfplumber.open(pdf_filepath) as pdf:
    page = pdf.pages[237]
    text = page.extract_text()
    hyphen_text_sample = text[1066:1078]
    print(hyphen_text_sample)

con-
necting


In [14]:
def remove_hyphens(text: str) -> str:
    """Removes end-of-line hyphens from text and joins the lines with spaces.

    Args:
        text (str): original text.

    Returns:
        str: processed text.
    """
    lines = [line.rstrip() for line in text.split("\n")]

    # Find dashes
    line_numbers = []
    for line_no, line in enumerate(lines[:-1]):
        if line.endswith("-"):
            line_numbers.append(line_no)

    # Replace
    for line_no in line_numbers:
        lines = dehyphenate(lines, line_no)

    return " ".join(lines)


def dehyphenate(lines: list[str], line_no: int) -> list[str]:
    """Rebuilds lines (words) separated by hyphen.

    Args:
        lines (list[str]): lines to process.
        line_no (int): index of lines to process.

    Returns:
        list[str]: list of modified lines.
    """
    next_line = lines[line_no + 1]
    word_suffix = next_line.split(" ")[0]

    lines[line_no] = lines[line_no][:-1] + word_suffix
    lines[line_no + 1] = lines[line_no + 1][len(word_suffix) :]
    return lines

In [15]:
print(
    f"* Before : \t{hyphen_text_sample}\n* After : \t{remove_hyphens(hyphen_text_sample)}"
)

* Before : 	con-
necting
* After : 	connecting 
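
As an alternative sketch (not the method used in this notebook), the same joining can be done with a single regular expression. The function name is hypothetical. Like the functions above, it cannot tell a line-break hyphen from a real compound word, so a compound that happens to be split at the end of a line is merged as well.

```python
import re


def join_hyphenated_line_breaks(text: str) -> str:
    """Join words split by a hyphen at the end of a line."""
    # A word character, a hyphen, a line break, then another word character:
    # drop the hyphen and the line break, keep the two characters.
    return re.sub(r"(\w)-[ \t]*\n[ \t]*(\w)", r"\1\2", text)


print(join_hyphenated_line_breaks("con-\nnecting"))  # connecting
print(join_hyphenated_line_breaks("on-premises"))    # on-premises (no line break)
print(join_hyphenated_line_breaks("on-\npremises"))  # onpremises (known limitation)
```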


### Text extraction

We now have all our "tools" to extract the pdf pages correctly and in a relevant way. To refine our target a bit more, we will remove the pages we don't want to keep (introduction, glossary, etc.) and skip the blank pages (with no content).

To do this, we will write a final function to process the whole document. Pages will be stored in a list of dictionaries, to which we can add information such as page number, number of characters, tokens, sentences, etc., and which we can explore by converting it to a DataFrame.

In [16]:
def extract_and_process_pdf(path: Path) -> list[dict]:
    """Opens a pdf file with pdfplumber, extracts and formats the relevant pages, then
    appends their content and statistics to a list.

    Args:
        path (Path): Pathlib path of the document.

    Returns:
        list[dict]: Extracted content and information of the pages.
    """
    extracted_pages = []

    with pdfplumber.open(path) as pdf:
        for page_idx, page in enumerate(pdf.pages):
            page_number = page_idx - 39
            lines = page.extract_text_lines(return_chars=True, keep_blank_chars=True)

            kept_lines = []
            for line in lines:
                kept_lines.append(
                    filter_text_by_font(line["chars"], body_fontname_list)
                )
            text = "\n".join(kept_lines)

            text = remove_hyphens(text)
            text = basic_text_formatter(text)

            if 0 < page_number <= 305 and text:
                extracted_pages.append(
                    {
                        "page_number": page_number,
                        "page_chars_count": len(text),
                        "page_words_count": len(text.split(" ")),
                        "page_raw_sentences_count": len(re.split(r"[.?!]", text)),
                        "page_text": text,
                    }
                )

    return extracted_pages

In [17]:
# Extract and process pdf
extracted_pages = extract_and_process_pdf(pdf_filepath)
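
A short illustration of why the sentence count is called "raw": `re.split` returns a trailing empty string when the text ends with a punctuation mark, so the count is usually one higher than the number of sentences. The sample text is made up, and abbreviations or decimal numbers would inflate the count further.

```python
import re

sample_text = "data pipelines copy data. they also transform it."

# Same split as in extract_and_process_pdf
raw_parts = re.split(r"[.?!]", sample_text)
print(raw_parts)       # the last element is an empty string
print(len(raw_parts))  # 3

# Ignoring empty segments gives the intuitive count
non_empty_parts = [part for part in raw_parts if part.strip()]
print(len(non_empty_parts))  # 2
```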

## Splitting pages text

### Number of tokens

First of all, we must take into account that we will use the `sentence-transformers` model [`all-mpnet-base-v2`](https://huggingface.co/sentence-transformers/all-mpnet-base-v2), which has *a maximum capacity of **514** tokens*. Token capacity *is a very important concept for a model* since it is *the maximum number of tokens the model can process* in a single input.

Note that 514 is the size of the position embeddings of the underlying MPNet architecture; the model card adds that, by default, input text longer than 384 word pieces is truncated. Either way, a text that exceeds the limit is automatically cut when it is encoded, potentially losing some information.

So what we want to know is how many tokens we have per page. We will start with a raw token count using the rule of thumb explained [here](https://help.openai.com/en/articles/4936856-what-are-tokens-and-how-to-count-them) or [here](https://python.langchain.com/docs/concepts/tokens/) : one token is roughly 4 characters of English text.

In [18]:
# Convert the extracted pages dictionaries to a DataFrame
df = pd.DataFrame(extracted_pages)

# Generate descriptive statistics
df.describe().drop(columns=["page_number"]).loc[["mean", "min", "max"]].round(2)

| | page_chars_count | page_words_count | page_raw_sentences_count |
|---|---|---|---|
| mean | 2082.93 | 331.97 | 21.32 |
| min | 121.0 | 22.0 | 1.0 |
| max | 3705.0 | 624.0 | 55.0 |


We can now add the `page_raw_tokens_count`.

In [19]:
# Calculate raw tokens count
for page in extracted_pages:
    page["page_raw_tokens_count"] = len(page["page_text"]) // 4

# Regenerate DataFrame and statistics
df = pd.DataFrame(extracted_pages)
df.describe().drop(columns=["page_number"]).loc[
    ["mean", "50%", "75%", "min", "max"]
].round(2)

| | page_chars_count | page_words_count | page_raw_sentences_count | page_raw_tokens_count |
|---|---|---|---|---|
| mean | 2082.93 | 331.97 | 21.32 | 520.38 |
| 50% | 2161.0 | 341.0 | 20.0 | 540.0 |
| 75% | 2615.0 | 414.0 | 26.0 | 653.0 |
| min | 121.0 | 22.0 | 1.0 | 30.0 |
| max | 3705.0 | 624.0 | 55.0 | 926.0 |


Looking at the results, we can see that the average raw count per page is above the token capacity of our model. But this is only a rough estimate, so let's try to be more precise.

For the next step, we will use the *LangChain* framework and its tools. We will instantiate a `SentenceTransformersTokenTextSplitter` and use its `count_tokens` method. What is nice is that it is a text splitter specialized for `sentence-transformers` models, so it counts tokens with the tokenizer of the model we will use.

In [20]:
# Instantiate the text splitter
text_splitter = SentenceTransformersTokenTextSplitter(
    model_name="sentence-transformers/all-mpnet-base-v2"
)

# Calculate real tokens count
for page in extracted_pages:
    page["page_real_tokens_count"] = text_splitter.count_tokens(text=page["page_text"])

# Regenerate DataFrame and statistics
df = pd.DataFrame(extracted_pages)
df.describe().drop(columns=["page_number"]).loc[
    ["mean", "50%", "75%", "min", "max"]
].round(2)

| | page_chars_count | page_words_count | page_raw_sentences_count | page_raw_tokens_count | page_real_tokens_count |
|---|---|---|---|---|---|
| mean | 2082.93 | 331.97 | 21.32 | 520.38 | 431.07 |
| 50% | 2161.0 | 341.0 | 20.0 | 540.0 | 436.0 |
| 75% | 2615.0 | 414.0 | 26.0 | 653.0 | 544.0 |
| min | 121.0 | 22.0 | 1.0 | 30.0 | 29.0 |
| max | 3705.0 | 624.0 | 55.0 | 926.0 | 785.0 |


This is better : the real counts are lower than the raw estimate, but *25 % of the pages still have more than 544 tokens*, which is more than the model can take in a single pass. The text therefore has to be split.

### Creating chunks/split the text

The next step is to split our text into manageable chunks with the right number of tokens. We will do that with the `split_text` method of `SentenceTransformersTokenTextSplitter`.

When splitting the text, we will configure a chunk overlap, which defines the number of tokens shared by two consecutive chunks so that context is preserved. Keep in mind that increasing the overlap increases the number of chunks created.
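
To get an intuition of how the overlap drives the number of chunks, here is a simplified model of a sliding token window. It is a sketch, not the library's code: the function name is hypothetical, and the chunk size of 384 tokens is an assumption (the default input length of `all-mpnet-base-v2` in `sentence-transformers`), while 300 is the overlap used in this notebook.

```python
def window_bounds(
    n_tokens: int, tokens_per_chunk: int = 384, chunk_overlap: int = 300
) -> list[tuple[int, int]]:
    """Return the (start, end) token positions of each chunk."""
    stride = tokens_per_chunk - chunk_overlap  # new tokens brought by each chunk
    bounds = []
    start = 0
    while start < n_tokens:
        end = min(start + tokens_per_chunk, n_tokens)
        bounds.append((start, end))
        if end == n_tokens:  # the last window reached the end of the text
            break
        start += stride
    return bounds


print(window_bounds(785))       # 6 windows for the longest page
print(len(window_bounds(431)))  # 2 windows for an average page
print(len(window_bounds(785, chunk_overlap=0)))  # 3 windows without overlap
```

With a stride of only 84 tokens, consecutive chunks share most of their content, which is the price paid for preserving context.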

In [21]:
# Instantiate the text splitter
text_splitter = SentenceTransformersTokenTextSplitter(
    chunk_overlap=300, model_name="sentence-transformers/all-mpnet-base-v2"
)

# Split the text for each page
for page in extracted_pages:
    page["page_chunks"] = text_splitter.split_text(text=page["page_text"])
    page["page_chunks_max_tokens_count"] = max(
        text_splitter.count_tokens(text=chunk) for chunk in page["page_chunks"]
    )
    page["page_chunks_count"] = len(page["page_chunks"])

# Regenerate DataFrame and statistics
df = pd.DataFrame(extracted_pages)
df.describe().drop(columns=["page_number"]).loc[
    ["mean", "50%", "75%", "min", "max"]
].round(2)

| | page_chars_count | page_words_count | page_raw_sentences_count | page_raw_tokens_count | page_real_tokens_count | page_chunks_max_tokens_count | page_chunks_count |
|---|---|---|---|---|---|---|---|
| mean | 2082.93 | 331.97 | 21.32 | 520.38 | 431.07 | 344.08 | 2.38 |
| 50% | 2161.0 | 341.0 | 20.0 | 540.0 | 436.0 | 386.0 | 2.0 |
| 75% | 2615.0 | 414.0 | 26.0 | 653.0 | 544.0 | 386.0 | 3.0 |
| min | 121.0 | 22.0 | 1.0 | 30.0 | 29.0 | 29.0 | 1.0 |
| max | 3705.0 | 624.0 | 55.0 | 926.0 | 785.0 | 389.0 | 6.0 |


Nice : the number of tokens per chunk now fits the model capacity, so we are ready to embed.

### Embedding the chunks

In [22]:
# Check if CUDA is available and set the device
cuda_available = torch.cuda.is_available()
if cuda_available:
    device = "cuda"
    print(f"CUDA is available : device set to {device.upper()}.")
else:
    device = "cpu"
    print(f"CUDA is not available : device set to {device.upper()}.")

CUDA is available : device set to CUDA.


In [23]:
# Instantiate the SentenceTransformer model
embedding_model = SentenceTransformer(model_name_or_path="all-mpnet-base-v2")

In [24]:
def calculate_embeddings(pages_dict: list[dict]) -> list[dict]:
    """Calculate embeddings for chunks of text in each page using a SentenceTransformer model.

    Args:
        pages_dict (list[dict]): A list of dictionaries where each dictionary represents a page
        with its text chunks and metadata.

    Returns:
        list[dict]: A list of dictionaries where each dictionary contains the source page number,
        the text chunk, and its corresponding embedding.
    """

    # Calculate embeddings for each chunk of text for each page
    embeddings = []

    for page in pages_dict:
        for chunk in page["page_chunks"]:
            embedding = embedding_model.encode(
                sentences=chunk, batch_size=32, device=device, normalize_embeddings=True
            )
            embeddings.append(
                {
                    "source_id": page["page_number"],
                    "text": chunk,
                    "embedding": embedding,
                }
            )

    return embeddings

In [25]:
# Check if embeddings file is available and calculate embeddings if not
if not embeddings_filepath.is_file():
    print("Embeddings file not found, calculating embeddings...")
    # Calculate embeddings
    embeddings = calculate_embeddings(extracted_pages)
    # Save embeddings to a CSV file
    pd.DataFrame(embeddings).to_csv(embeddings_filepath, index=False)
else:
    print(f"Embeddings file found : '{embeddings_filepath}'.")

Embeddings file found : '/home/anquetos/gcp-professional-data-engineer-rag/datasets/embeddings.csv'.


In [26]:
# Load embeddings from the CSV file
df_embeddings = pd.read_csv(embeddings_filepath)
# Convert the string representation of the embeddings back to numpy 'float32' arrays
# (original output format from SentenceTransformer encode method)
df_embeddings["embedding"] = df_embeddings["embedding"].apply(
    lambda x: np.array(x.strip("[]").split(), dtype="float32")
)

df_embeddings.head()

| | source_id | text | embedding |
|---|---|---|---|
| 0 | 2 | data engineers choose how to store data for ma... | [0.035572313, 0.03913909, -0.028386826, -0.036... |
| 1 | 3 | the data lifecycle consists of four stages : i... | [0.039683174, -0.013291235, -0.04148462, -0.01... |
| 2 | 3 | with transforming data into a usable format fo... | [0.008390116, -0.016996955, -0.04778342, -0.00... |
| 3 | 4 | streaming data is a set of data that is typica... | [-0.009797745, -0.042565953, -0.023716504, -0.... |
| 4 | 4 | , and pressure data every minute a customer ad... | [-0.0065062963, -0.045118842, -0.01891298, 8.3... |
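
The string parsing used above can be checked on a synthetic vector. This is a sketch under the assumption that the CSV cell holds the plain string form of the NumPy array; the vector itself is made up. Two things are worth knowing: NumPy prints a limited number of digits by default, so the round trip is close but not necessarily bit-exact, and arrays with more than 1000 elements are abbreviated with `...` by default, which would break the parsing (768 dimensions stay below that threshold).

```python
import numpy as np

# Synthetic 768-dimensional vector standing in for a real embedding
embedding = np.linspace(-1.0, 1.0, num=768, dtype="float32")

# String form of the array (assumed to be what ends up in the CSV cell)
as_text = str(embedding)

# Same parsing as in the notebook: drop the brackets, split on whitespace
restored = np.array(as_text.strip("[]").split(), dtype="float32")

print(restored.shape)                               # (768,)
print(np.allclose(embedding, restored, atol=1e-6))  # True
print("..." in as_text)                             # False
```

A binary format such as `np.save` would avoid both caveats, at the cost of no longer having a human-readable file.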


## R.A.G.

In [27]:
# Convert the embeddings to PyTorch tensors
vectors_tensor = torch.from_numpy(np.stack(df_embeddings["embedding"].values))

# Check the shape and type of the tensor
print(f"* Tensor shape : {vectors_tensor.shape}")
print(f"* Tensor type : {vectors_tensor.dtype}")

* Tensor shape : torch.Size([677, 768])
* Tensor type : torch.float32


In [57]:
@timing
def search_top_k_vectors(query: str, k: int = 5) -> tuple[torch.Tensor, torch.Tensor]:
    """Search for the top k most similar vectors to a query in the embeddings space.

    Args:
        query (str): The query text.
        k (int, optional): The number of most similar vectors to return. Defaults to 5.

    Returns:
        tuple[torch.Tensor, torch.Tensor]: The top k scores and the indices of the corresponding vectors.
    """

    query_embedding = embedding_model.encode(
        sentences=query, batch_size=32, device=device, normalize_embeddings=True
    )

    # Calculate the dot product similarity between the query embedding and all the embeddings
    dot_scores = util.dot_score(a=query_embedding, b=vectors_tensor)

    # Get the top k most similar vectors
    top_k_vectors = torch.topk(dot_scores[0], k=k)

    return top_k_vectors
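
Because the embeddings are computed with `normalize_embeddings=True`, the dot product used above is equal to the cosine similarity. The following sketch checks this on toy 2-dimensional vectors with NumPy only; the vectors and variable names are made up for the illustration.

```python
import numpy as np

raw_corpus = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [-1.0, 0.0]])
raw_query = np.array([2.0, 1.9])

# Normalize every vector to unit length
corpus = raw_corpus / np.linalg.norm(raw_corpus, axis=1, keepdims=True)
query = raw_query / np.linalg.norm(raw_query)

# Dot product on the normalized vectors
dot_scores = corpus @ query

# Cosine similarity on the raw vectors
cosine_scores = (raw_corpus @ raw_query) / (
    np.linalg.norm(raw_corpus, axis=1) * np.linalg.norm(raw_query)
)

# Indices of the 2 best matches, highest score first
top_k_indices = np.argsort(-dot_scores)[:2]

print(np.allclose(dot_scores, cosine_scores))  # True
print(top_k_indices.tolist())                  # [2, 0]
```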

In [58]:
def retrieve_relevant_sources(query: str, embeddings: pd.DataFrame, k: int = 5) -> str:
    """
    Retrieve the top k most relevant sources for a given query based on their embeddings.
    Args:
        query (str): The search query.
        embeddings (pd.DataFrame): A DataFrame containing the embeddings and associated metadata.
        k (int, optional): The number of top results to return. Defaults to 5.
    Returns:
        str: A formatted string containing the query, scores, source IDs, and corresponding texts of the top k results.
    """
    
    # Get the top k most similar vectors
    top_k = search_top_k_vectors(query=query, k=k)

    # Set the dataframe to use
    df = embeddings

    # Initialize the results text
    results_text = ""

    # Loop through the top k results and format the output
    for score, idx in zip(top_k[0], top_k[1]):
        source_id = df.loc[idx.item()]["source_id"]
        query_text = f"Query : {query}\n"
        score_text = f"Score : {score.item():.4f}\n"
        source_id_text = f"Id : {source_id}\n"
        main_text = (
            f"""{(" ".join(df.loc[df["source_id"] == source_id, "text"].values))}"""
        )
        results_text += (
            query_text
            + score_text
            + source_id_text
            + textwrap.fill(main_text, width=100)
            + "\n\n"
        )

    return results_text

In [60]:
# Perform a semantic search using the indicated query
result = retrieve_relevant_sources(
    "CI/CD Practices", df_embeddings, 5
)

print(result)

INFO : 'search_top_k_vectors' function executed in 0.06451 seconds.
---
Query : CI/CD Practices
Score : 0.3481
Id : 161
you can find the answers in the appendix. you have been tasked with creating a pilot project in gcp
to demonstrate the feasibility of migrating workloads from an on - premises hadoop cluster to cloud
dataproc. three other engineers will work with you. none of the data that you will use contains
sensitive information. you want to minimize the amount of time that you spend on administering the
development environment. what would you use to control access to resources in the development
environment? predefined roles custom roles primitive roles access control lists the auditors for
your company have determined that several employees have more permissions than needed to carry out
their job responsibilities. all the employees have users accounts on gcp that have been assigned
predefined roles. you have concluded that the optimal way to meet the auditors ’ recommendations i