# Scanned PDF Acquisition and Cleaning Notebook
## How to use this notebook

Set the connection string (`_sConnectString`) and the PDF path (`_sPDF_Path`) in the first code cell, then run all cells.

In [None]:
# DECLARATIONS & CONSTANTS      #
#################################

# Requirements : 
# 1. A working installation of Tesseract with the french package ('fra').
# 2. A working installation of Ollama with the selected model downloaded.
# 3. A SQL Server up and running with the right database.

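#pip install pymupdf
#pip install pymongo
#pip install pdfminer.six
#pip install pytesseract
#pip install opencv-python
#pip install pyspellchecker
#pip install ollama
#pip install pyodbc
#pip install pandas openpyxl
#pip install rapidfuzz
#pip install matplotlib
#pip install requests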

import numpy as oNumPy
import pandas as oPandas
import fitz  # PyMuPDF
import pytesseract
import pyodbc
import base64
import io
from PIL import Image
from pdfminer.high_level import extract_text
import cv2
import matplotlib.pyplot as plt
import re
import ollama
from datetime import datetime, timedelta
import time
import os
import json
import requests
from difflib import get_close_matches

from rapidfuzz import process, fuzz


############# SOURCE PDF ############################################
# Path of the PDF to process. Its base name (without extension) is used
# as the book title by Do_Process_PDF.
#_sPDF_Path = "Coléoptères Carabiques - Subset_Mistral.pdf"

_sPDF_Path = "British Spiders_Alexandra.pdf"

#####################################################################

_sSave_Mode = "SQL"                   # Options: "SQL" or "FILES" (any value other than "SQL" selects file-based saving)
_sBase_Output_Dir = "ExtractedBooks"  # Root folder for file-based saving (one sub-folder per book title)
_bUse_OCR = True                      # Toggle usage of Tesseract; when False the "OCR text" is read from the PDF text layer instead
_bUse_Ollama = True                   # Toggle usage of Ollama AI (cleaning + explanation passes)
_bUse_Species_Index = True            # Toggle usage of the Excel file with taxonomy
_sTaxonomy_Filename = "Species_Index.xlsx"  # Excel taxonomy index, resolved relative to the working directory
_sTarget_Language = "français"        # Or "english", "dutch", etc. Interpolated into the French-language LLM prompts

# ODBC connection string used when _sSave_Mode == "SQL".
# NOTE(review): UID/PWD are placeholders here; avoid committing real credentials in the notebook.
_sConnectString = "DRIVER={SQL Server};SERVER=ROCINANTE;DATABASE=Digital_Library;UID=XXX;PWD=XXX;"
_sOllamaModel = "mistral"             # Models : deepseek-r1:32b, deepseek-r1:8b, mistral #

# TAXONOMY => Table of contents family ranges
# Maps a family name to the range of page numbers that belong to it.
# Page numbers are compared against the 1-based PDF page position
# (Do_Process_PDF passes iPage_Num + 1). As with any Python range, the
# upper bound is exclusive: range(1, 32) covers pages 1 to 31.
# Pages outside every range are treated as having no expected family.
family_ranges = {
    "Agelenidae": range(1, 32),
    "Mimetidae": range(32, 37),
    "Theridiidae": range(37, 92),
    "Nesticidae": range(92, 94),
    "Tetragnathidae": range(94, 111),
    "Argiopidae": range(111, 172),
    "Linyphiidae": range(172, 418)
}

# TAXONOMY => Headers
# Maps a heading as printed in the book to a normalized section label.
# Two headings ("NOTES" and "REMARKS") share the same label.
# NOTE(review): not referenced by the code visible in this notebook - confirm whether it is still needed.
section_headers = {
    "DESCRIPTION": "DESCRIPTION",
    "OCCURRENCE": "OCCURRENCE",
    "CHARACTERS OF GENUS": "GENUS_DESCRIPTION",
    "CHARACTERS OF FAMILY": "FAMILY_DESCRIPTION",
    "KEY TO THE GENERA": "GENUS_KEY",
    "NOTES": "NOTES",
    "REMARKS": "NOTES"
}

# OCR & Text Cleaning Functions

In [None]:
# OCR of a book (or PDF Document) #
###################################

def Is_Text_PDF(sPDF_Path):
    """Tell whether the PDF exposes an extractable text layer.

    The whole document is passed to ``extract_text``; the result is True
    when at least one non-whitespace character comes back, False otherwise
    (the document is then treated as image-only).
    """
    sExtracted = extract_text(sPDF_Path)
    bHas_Text = bool(sExtracted.strip())
    return bHas_Text


def Clean_Text(sText):
    """Flatten text to a single line.

    Every run of whitespace (spaces, tabs, newlines) becomes one space and
    leading/trailing whitespace is dropped.
    """
    # str.split() without arguments already splits on any whitespace run
    # and ignores leading/trailing whitespace.
    lWords = sText.split()
    return " ".join(lWords)


def Clean_Text_Advanced(sText):
    """Return a single-line, symbol-free version of ``sText``.

    Keeps ASCII letters, characters in the À-ÿ range, digits, the
    punctuation ``, . ! ?`` and spaces; every other character is removed.
    Whitespace is normalized both before and after that filter, so the
    result never has leading/trailing spaces or runs of several spaces.

    :param sText: Raw page text (str); ``None`` or empty is accepted.
    :return: The cleaned text, or "" when ``sText`` is empty or ``None``.
    """
    if not sText:
        return ""

    # Turn every whitespace run (newlines, tabs, ...) into one space first,
    # otherwise the character filter below would glue neighbouring words.
    sText = re.sub(r'\s+', ' ', sText)

    # Drop every character that is not explicitly allowed.
    sText = re.sub(r'[^a-zA-ZÀ-ÿ0-9,.!? ]+', '', sText)

    # Removing a symbol that stood between two spaces ("a - b") or at an
    # edge ("* title") leaves extra spaces behind: collapse and trim again.
    return ' '.join(sText.split())


def Correct_Text_with_Ollama(sModel, sText, sPrompt):
    """Send ``sPrompt`` followed by ``sText`` to an Ollama model and return its reply.

    :param sModel: Name of the Ollama model to query.
    :param sText: Page text to submit; empty, blank or ``None`` input is returned untouched.
    :param sPrompt: Instruction placed before the text in the single user message.
    :return: The content of the model's reply, or ``sText`` unchanged when
             there is nothing to send or when the call fails.
    """
    # Nothing worth sending: hand the input back as-is.
    if not sText or not sText.strip():
        return sText

    try:
        lMessages = [{"role": "user", "content": f"{sPrompt} {sText}"}]
        oReply = ollama.chat(model=sModel, messages=lMessages)
        return oReply["message"]["content"]
    except Exception as oError:
        # Best effort: report the problem and keep the uncorrected text.
        print(f"Error with Ollama : {oError}")
        return sText


# === Extract text with formatting => Usage with Taxonomy ===
def extract_text_with_formatting(page):
    """Rebuild the page text, marking bold and italic spans with <b>/<i> tags.

    Reads ``page.get_text("dict")`` and walks blocks -> lines -> spans.
    Blocks without a "lines" entry are skipped. A span whose font name
    contains "bold" is wrapped in <b>...</b>, one containing "italic" in
    <i>...</i> (italic wraps bold when both apply). Each line is stripped
    and ends with a newline, each block is followed by an empty line, and
    the final result is stripped.
    """
    def _decorate(span):
        content = span["text"]
        font_name = span["font"].lower()
        if "bold" in font_name:
            content = f"<b>{content}</b>"
        if "italic" in font_name:
            content = f"<i>{content}</i>"
        return content

    pieces = []
    for blk in page.get_text("dict")["blocks"]:
        if "lines" not in blk:
            continue
        for ln in blk["lines"]:
            rendered = "".join(_decorate(sp) for sp in ln["spans"])
            pieces.append(rendered.strip() + "\n")
        # Blank line between blocks.
        pieces.append("\n")
    return "".join(pieces).strip()


def Extract_Drawings_from_Page(oImage):
    """Detect illustration-like regions on a page image and return them as base64 PNGs.

    The image is converted to grayscale, blurred with a 5x5 Gaussian
    kernel, binarized with an inverted adaptive Gaussian threshold, and
    its external contours are extracted. Every contour whose bounding box
    area exceeds 5000 pixels is cropped from ``oImage``, encoded as PNG
    and returned as a base64 (UTF-8) string.

    :param oImage: Page image; it is converted with an RGB->gray conversion and cropped with ``.crop``.
    :return: List of base64 strings, one per retained region, in contour order.
    """
    iMin_Area = 5000  # Bounding boxes at or below this area are ignored

    oGray = cv2.cvtColor(oNumPy.array(oImage), cv2.COLOR_RGB2GRAY)
    oSmoothed = cv2.GaussianBlur(oGray, (5, 5), 0)
    oMask = cv2.adaptiveThreshold(oSmoothed, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2)
    lContours, _ = cv2.findContours(oMask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    def _Encode_Crop(tBox):
        # Crop the bounding box and serialize it as a base64-encoded PNG.
        iLeft, iTop, iWidth, iHeight = tBox
        oRegion = oImage.crop((iLeft, iTop, iLeft + iWidth, iTop + iHeight))
        oBuffer = io.BytesIO()
        oRegion.save(oBuffer, format="PNG")
        return base64.b64encode(oBuffer.getvalue()).decode("utf-8")

    lBoxes = (cv2.boundingRect(oContour) for oContour in lContours)
    return [_Encode_Crop(tBox) for tBox in lBoxes if tBox[2] * tBox[3] > iMin_Area]


def Extract_Text_and_Images_from_Page(oPage, bIsTextPDF):
    """Return the text, the OCR text and the illustrations of one PDF page.

    :param oPage: Page object providing ``get_pixmap()`` and ``get_text("text")``.
    :param bIsTextPDF: True when the text should be read from the PDF text layer.
    :return: Tuple ``(sText, sOCRText, oImages)`` where
             ``sOCRText`` is the Tesseract output (``lang="fra"``) when
             ``_bUse_OCR`` is set, otherwise the page text layer;
             ``sText`` is the page text layer when ``bIsTextPDF`` is True,
             otherwise ``sOCRText``;
             ``oImages`` is the list returned by ``Extract_Drawings_from_Page``.
             Both texts go through ``Clean_Text``.
    """
    # Render the page as an RGB image for OCR and illustration detection.
    oPixmap = oPage.get_pixmap()
    oPage_Image = Image.frombytes("RGB", [oPixmap.width, oPixmap.height], oPixmap.samples)

    sOCR_Source = (
        pytesseract.image_to_string(oPage_Image, lang="fra")
        if _bUse_OCR
        else oPage.get_text("text")
    )
    sOCRText = Clean_Text(sOCR_Source)

    lDrawings = Extract_Drawings_from_Page(oPage_Image)

    # Text-based PDFs use their own text layer; scanned ones rely on the OCR result.
    sText = Clean_Text(oPage.get_text("text")) if bIsTextPDF else sOCRText

    return sText, sOCRText, lDrawings

In [None]:
# Taxonomy Functions

def fuzzy_match_one(text, choices, threshold=85):
    """Return the choice that best matches ``text``, or None.

    Delegates to ``process.extractOne`` with ``score_cutoff=threshold`` and
    keeps only the matched choice (first element of the result).
    """
    best = process.extractOne(text, choices, score_cutoff=threshold)
    if not best:
        return None
    return best[0]


def clean_tags(text):
    """Remove every ``<...>`` / ``</...>`` tag from ``text``, keeping the enclosed content."""
    tag_pattern = re.compile(r"</?[^>]+>")
    return tag_pattern.sub("", text)


def normalize_paragraph(text):
    """Flatten a paragraph: line breaks become spaces, dash variants become "-".

    Runs of line breaks are replaced by a space, runs of Unicode
    hyphens/dashes/minus signs (and ASCII "-") by a single "-", whitespace
    runs by a single space, and the result is stripped.
    """
    substitutions = (
        (r"[\n\r]+", " "),
        (r"[\u2010\u2011\u2012\u2013\u2014\u2212\-]+", "-"),
        (r"\s+", " "),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()


def expected_family_for_page(page):
    """Return the first family in ``family_ranges`` whose page range contains ``page``, else None."""
    return next(
        (family_name for family_name, page_span in family_ranges.items() if page in page_span),
        None,
    )


# === Taxon matching logic ===
def match_taxa(text, family_df, current_genus=None):
    """Find the genera and species of one family that are mentioned in ``text``.

    Matching runs in decreasing order of confidence:

    1. exact: the full latin name occurs in the text (confidence 100);
    2. epithet: the species epithet occurs after a space or at the start of
       the text, and either belongs to ``current_genus`` (confidence 95)
       or exists in a single genus of the family (confidence 90);
    3. fuzzy epithet: ``fuzz.partial_ratio`` of the epithet against the
       text is at least 85 (confidence = that score), tried only for
       epithets not matched by the previous steps.

    A species is reported once, with the first (strongest) match type.
    Empty names and epithets in ``family_df`` are ignored, because an empty
    string is a substring of every text and would match everywhere.

    A leading "Genus <Name>" heading (optionally numbered) yields a genus
    match, exact or fuzzy; the genus of every matched species is then added
    if not already present.

    :param text: Page or paragraph text (str).
    :param family_df: DataFrame with at least the columns ``latin_name``,
                      ``genus`` and ``species``.
    :param current_genus: Optional genus used to disambiguate bare epithets.
    :return: Tuple ``(matched_genera, matched_species)``, two lists of dicts.
             Genus dicts have keys ``name``, ``confidence``, ``source``;
             species dicts have keys ``name``, ``genus``, ``epithet``,
             ``confidence``, ``match_type``.
    """
    text = re.sub(r"[\n\r]+", " ", text)
    text = re.sub(r"[\u2010\u2011\u2012\u2013\u2014\u2212\-]+", "-", text)
    text = re.sub(r"\s+", " ", text).strip()

    matched_species = []
    matched_genera = []
    matched_names = set()      # latin names already reported
    matched_epithets = set()   # epithets already reported

    def add_species(row, confidence, match_type):
        # Record a species once; later (weaker) matches of the same name are ignored.
        name = row["latin_name"]
        if name in matched_names:
            return
        matched_names.add(name)
        matched_epithets.add(row["species"])
        matched_species.append({
            "name": name,
            "genus": row["genus"],
            "epithet": row["species"],
            "confidence": confidence,
            "match_type": match_type
        })

    def usable(values):
        # Unique, non-empty strings in their original order.
        return list(dict.fromkeys(v for v in values if isinstance(v, str) and v))

    full_species = usable(family_df["latin_name"].tolist())
    epithets = usable(family_df["species"].tolist())
    genus_choices = family_df["genus"].unique().tolist()

    # === Exact full species match
    for name in full_species:
        if name in text:
            row = family_df[family_df["latin_name"] == name].iloc[0]
            add_species(row, 100, "exact")

    # === Epithet + current genus
    for epithet in epithets:
        if f" {epithet}" in text or text.startswith(epithet):
            candidates = family_df[family_df["species"] == epithet]
            if current_genus:
                match = candidates[candidates["genus"] == current_genus]
                if not match.empty:
                    add_species(match.iloc[0], 95, "epithet + current genus")
            elif len(candidates["genus"].unique()) == 1:
                add_species(candidates.iloc[0], 90, "unique epithet")

    # === Fuzzy epithet match
    for epithet in epithets:
        if epithet in matched_epithets:
            continue
        score = fuzz.partial_ratio(epithet, text)
        if score >= 85:
            row = family_df[family_df["species"] == epithet].iloc[0]
            add_species(row, score, "fuzzy epithet")

    # === Genus block
    genus_block = re.match(r"^(?:\d+\.\s*)?genus\s+([A-Z][a-z]+)", text, re.IGNORECASE)
    if genus_block:
        candidate = genus_block.group(1)
        if candidate in genus_choices:
            matched_genera.append({
                "name": candidate,
                "confidence": 100,
                "source": "genus block"
            })
        else:
            fuzzy = process.extractOne(candidate, genus_choices, score_cutoff=85)
            if fuzzy:
                matched_genera.append({
                    "name": fuzzy[0],
                    "confidence": fuzzy[1],
                    "source": "genus block (fuzzy)"
                })

    # === Add genus from species match
    for s in matched_species:
        if not any(g["name"] == s["genus"] for g in matched_genera):
            matched_genera.append({
                "name": s["genus"],
                "confidence": s["confidence"],
                "source": "from species"
            })

    return matched_genera, matched_species


# === Flatten species and genera for one-row-per-page ===
def join_matches(match_list, key):
    """Join the ``key`` value of every dict in ``match_list`` with ", ".

    Entries that are not dicts are skipped; values are converted with ``str``.
    """
    values = []
    for entry in match_list:
        if isinstance(entry, dict):
            values.append(str(entry[key]))
    return ", ".join(values)


def Load_Species_Index():
    """Load the taxonomy Excel file named by ``_sTaxonomy_Filename`` as a DataFrame of strings.

    Missing cells become "", every column is cast to str, and the columns
    order, order_common, latin_name, genus, family and species are
    stripped of surrounding whitespace. A ``short_name`` column is added,
    built from the first letter of the genus, ". " and the species
    (for example "T. domestica"); it is "" when genus or species is empty.
    """
    sIndex_Path = os.path.join("", _sTaxonomy_Filename)
    oIndex = oPandas.read_excel(sIndex_Path)
    oIndex = oIndex.fillna("").astype(str)

    for sColumn in ("order", "order_common", "latin_name", "genus", "family", "species"):
        oIndex[sColumn] = oIndex[sColumn].str.strip()

    def _Short_Name(oRow):
        # Abbreviated binomial; empty when either part is missing.
        if oRow["genus"] and oRow["species"]:
            return f"{oRow['genus'][0]}. {oRow['species']}"
        return ""

    oIndex["short_name"] = oIndex.apply(_Short_Name, axis=1)
    return oIndex


def Extract_Species_From_Page(sText_Ollama, iPage_Num, df_species, family_ranges):
    """
    Identify the genera and species mentioned on one page.

    The family is deduced from the page number using ``family_ranges``;
    matching is then restricted to the rows of ``df_species`` that belong
    to that family.

    :param sText_Ollama: Text of the page (str)
    :param iPage_Num: Page number (int), compared against the ranges in ``family_ranges``
    :param df_species: Taxonomy DataFrame (columns: family, genus, species, latin_name, etc.)
    :param family_ranges: Dict mapping a family name to its range of page numbers
    :return: (dict) with the page number, order, family and the comma-separated
             genera/species with their confidences and match types, or None when
             the page belongs to no family range. When the family has no row in
             ``df_species``, the dict is still returned with empty order,
             genus and species fields.
    """
    # Use the mapping received as argument (not the module-level one) so
    # callers can supply the ranges of another book.
    family = next((fam for fam, pages in family_ranges.items() if iPage_Num in pages), None)
    if not family:
        return None

    family_df = df_species[df_species["family"] == family]
    genera, species = match_taxa(sText_Ollama, family_df)

    def first_value(column):
        # First value of a column, or "" when the column or the family rows are missing.
        if column in family_df.columns and not family_df.empty:
            return family_df[column].iloc[0]
        return ""

    return {
        "page": iPage_Num,
        "order": first_value("order"),
        "order_common": first_value("order_common"),
        "family": family,
        "genus": ", ".join(g["name"] for g in genera),
        "genus_conf": ", ".join(str(g["confidence"]) for g in genera),
        "genus_source": ", ".join(g["source"] for g in genera),
        "species": ", ".join(s["name"] for s in species),
        "species_conf": ", ".join(str(s["confidence"]) for s in species),
        "species_type": ", ".join(s["match_type"] for s in species),
    }


In [None]:
# MAIN PROCESS DEFINITION            #
######################################

def _Build_Ollama_Prompts():
    # Return the (cleaning prompt, explanation prompt) pair sent to the LLM.
    # Both depend only on module-level settings, so they are built once per book.
    if (_bUse_Species_Index):
        prompt_clean = (f"""
                                You are processing pages from a naturalist book for use in a searchable chatbot.
                                
                                - Remove visual artifacts like underscores, broken punctuation, pipes, or fake bullets
                                - Remove HTML-like formatting tags such as <b>, <i>, <u>, etc
                                - Join broken lines and sentences into paragraphs
                                - Keep scientific names and taxonomic terms exactly as written
                                - Keep original capitalization, especially for headers and scientific terms
                                - Only return the cleaned text content. Do not explain your actions
                                - Do not add comments or summaries
                                - Do not include any formatting or titles — just the cleaned text
                                - Do not paraphrase, interpret, explain or narrate
                                - Keep the original phrasing, sentence structure and terminology exactly as written
                                
                                Here is the original OCR:"""
                        )
    else:
        prompt_clean = (
            f"Ce texte provient d'un OCR d'une page d'un livre ou d'un document scientifique. "
            f"Saurais-tu nettoyer le résultat (lettres manquantes, orthographe, mots ou phrases incomplètes, etc.) "
            f"tout en modifiant le minimum et en conservant son sens, le tout toujours en {_sTarget_Language} ? "
            f"N'ajoute aucun autre commentaire et ne change pas de langue par rapport à la langue cible : "
        )

    prompt_explain = (
        f"Ce texte provient d'un OCR d'une page d'un livre ou d'un document scientifique. "
        f"Peux-tu reprendre les éléments de la page, les expliquer, annoter et compléter un maximum "
        f"tout en conservant le sens original, le tout toujours en {_sTarget_Language} ? "
        f"N'ajoute aucun autre commentaire et ne change pas de langue par rapport à la langue cible : "
    )

    return prompt_clean, prompt_explain


def _Save_Page_To_SQL(iBook_ID, iPage_Number, sText, sOCRText, sTextClean,
                      sText_Ollama, sText_OllamaExp, oImages, dSpecies_Result):
    # Store one page, its illustrations and (when available) its taxonomy row,
    # using the module-level cursor/conn opened by the launch cell.
    cursor.execute("""
        INSERT INTO Pages (page_book_id_fkey, page_number, page_raw_text, page_ocr_text, page_raw_text_cleaned, page_raw_text_llm, page_raw_text_llm_explain)
        OUTPUT INSERTED.page_id_pkey
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (iBook_ID, iPage_Number, sText, sOCRText, sTextClean, sText_Ollama, sText_OllamaExp))
    page_id = cursor.fetchone()[0]
    conn.commit()

    for iImageIndex, img_base64 in enumerate(oImages, start=1):
        cursor.execute("""
            INSERT INTO Images (image_page_id_fkey, image_data, image_index)
            VALUES (?, ?, ?)
        """, (page_id, base64.b64decode(img_base64), iImageIndex))
    conn.commit()

    if dSpecies_Result:
        cursor.execute("""
            INSERT INTO Page_Species (
                species_page_id_fkey,
                species_order,
                species_order_common,
                species_family,
                species_genus,
                species_genus_conf,
                species_genus_source,
                species_species,
                species_species_conf,
                species_species_type
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            page_id,
            dSpecies_Result["order"],
            dSpecies_Result["order_common"],
            dSpecies_Result["family"],
            dSpecies_Result["genus"],
            dSpecies_Result["genus_conf"],
            dSpecies_Result["genus_source"],
            dSpecies_Result["species"],
            dSpecies_Result["species_conf"],
            dSpecies_Result["species_type"]
        ))
        conn.commit()


def _Save_Page_To_Files(output_dir, iPage_Number, sText, sOCRText, sTextClean,
                        sText_Ollama, sText_OllamaExp, oImages):
    # Write the texts and illustrations of one page into <output_dir>/Page_NNN.
    page_folder = os.path.join(output_dir, f"Page_{iPage_Number:03d}")
    os.makedirs(page_folder, exist_ok=True)

    dText_Files = {
        "raw_text.txt": sText,
        "ocr_text.txt": sOCRText,
        "cleaned_text.txt": sTextClean,
    }
    if _bUse_Ollama:
        dText_Files["llm_text.txt"] = sText_Ollama
        dText_Files["llm_explanation.txt"] = sText_OllamaExp

    for sFile_Name, sContent in dText_Files.items():
        with open(os.path.join(page_folder, sFile_Name), "w", encoding="utf-8") as f:
            f.write(sContent)

    for iImageIndex, img_base64 in enumerate(oImages, start=1):
        # The illustrations are PNG-encoded, so they get a matching extension.
        img_path = os.path.join(page_folder, f"image_{iImageIndex:02d}.png")
        with open(img_path, "wb") as img_file:
            img_file.write(base64.b64decode(img_base64))


def Do_Process_PDF(sPDF_Path):
    """Extract, clean and store every page of a PDF.

    For each page: the text (text layer and/or OCR) and the illustrations
    are extracted, the text is cleaned, optionally corrected and explained
    by the Ollama model, optionally matched against the species index, and
    the result is stored either in SQL Server (``_sSave_Mode == "SQL"``,
    through the module-level ``cursor``/``conn``) or as files under
    ``_sBase_Output_Dir``/<book title>.

    Errors are reported with a printed message and stop the processing;
    they are not re-raised. The PDF document is always closed, and in SQL
    mode the connection is always closed as well.

    :param sPDF_Path: Path of the PDF file; its base name is the book title.
    :return: None
    """
    oDoc = None
    try:
        oDoc = fitz.open(sPDF_Path)
        file_size = os.path.getsize(sPDF_Path) / (1024 * 1024)  # Size in MB

        print(f"File Size : {file_size:.2f} MB")
        print(f"Page(s) : {len(oDoc)}")

        sBook_Title = os.path.splitext(os.path.basename(sPDF_Path))[0]  # PDF Name without extension

        df_species = Load_Species_Index() if _bUse_Species_Index else None

        # The text/image nature of the PDF is a property of the whole file:
        # evaluate it once instead of re-parsing the document for every page.
        bIsTextPDF = Is_Text_PDF(sPDF_Path)

        # The prompts do not depend on the page either.
        prompt_clean, prompt_explain = _Build_Ollama_Prompts() if _bUse_Ollama else ("", "")

        iBook_ID = None
        output_dir = None
        if _sSave_Mode == "SQL":
            cursor.execute("""
                INSERT INTO Books (book_title, book_date_added, book_pages_count)
                OUTPUT INSERTED.book_id_pkey
                VALUES (?, GETDATE(), ?)
            """, (sBook_Title, len(oDoc)))
            iBook_ID = cursor.fetchone()[0]
            conn.commit()
        else:
            output_dir = os.path.join(_sBase_Output_Dir, sBook_Title)
            os.makedirs(output_dir, exist_ok=True)

        for iPage_Num, oPage in enumerate(oDoc):
            iPage_Number = iPage_Num + 1  # 1-based number used for display and storage
            dPage_Start_Time = time.time()

            print(f"Page {iPage_Number} : Begin OCR and Image extraction.")

            sText, sOCRText, oImages = Extract_Text_and_Images_from_Page(oPage, bIsTextPDF)

            if (_bUse_Species_Index):
                # Taxonomy mode keeps bold/italic information as <b>/<i> tags.
                sText = extract_text_with_formatting(oPage)
                sPlain_Text = clean_tags(sText)
            else:
                sPlain_Text = sText

            # Clean the tag-free text: otherwise the tags would be reduced
            # to stray letters ("<b>Genus</b>" -> "bGenusb").
            sTextClean = Clean_Text_Advanced(sPlain_Text)

            print(f"Page {iPage_Number} : Begin AI enhancement step.")

            sText_Ollama = ""
            sText_OllamaExp = ""
            if _bUse_Ollama:
                sText_Ollama = Correct_Text_with_Ollama(_sOllamaModel, sText, prompt_clean)
                sText_OllamaExp = Correct_Text_with_Ollama(_sOllamaModel, sText, prompt_explain)

            dSpecies_Result = None
            if (_bUse_Species_Index):
                # Without an LLM-cleaned text, match on the tag-free page
                # text rather than on an empty string.
                sSpecies_Text = sText_Ollama if sText_Ollama else sPlain_Text
                dSpecies_Result = Extract_Species_From_Page(sSpecies_Text, iPage_Number, df_species, family_ranges)

            if _sSave_Mode == "SQL":
                _Save_Page_To_SQL(iBook_ID, iPage_Number, sText, sOCRText, sTextClean,
                                  sText_Ollama, sText_OllamaExp, oImages, dSpecies_Result)
            else:
                # File saving mode
                _Save_Page_To_Files(output_dir, iPage_Number, sText, sOCRText, sTextClean,
                                    sText_Ollama, sText_OllamaExp, oImages)

            PageElapsed_Time = round(time.time() - dPage_Start_Time, 3)
            print(f"Page {iPage_Number} Processed and Stored in {PageElapsed_Time} seconds (Illustrations Count : {len(oImages)})")
    except Exception as e:
        # Top-level boundary of the notebook: report and stop, do not re-raise.
        print(f"Error : {e}")
    finally:
        if oDoc is not None:
            oDoc.close()
        if _sSave_Mode == "SQL":
            conn.close()
            print("Connection closed.")

# Process launch...

In [None]:
# LAUNCH THE IMPORT AND CLEANING     #
######################################

# Wall-clock start, used for the total execution time printed at the end of the cell.
start_time = time.time()
print ("Current Time :", datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "\n ")

if _sSave_Mode == "SQL":
    # Connecting SQL Server
    # conn and cursor are module-level names that Do_Process_PDF reads directly.
    # Do_Process_PDF closes conn in its finally block, so this cell must be
    # re-run (not just the function called again) to process another PDF.
    conn = pyodbc.connect(_sConnectString)
    cursor = conn.cursor()

# Errors raised while processing are printed by Do_Process_PDF, not re-raised,
# so "Done !" is printed even when the import stopped early.
Do_Process_PDF(_sPDF_Path)
print("\nDone !")

print ("\nCurrent Time :", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
elapsed_time = round(time.time() - start_time, 3)
print(f"\nCell execution time : {elapsed_time} seconds")