# Benchmark des solutions d'extraction de documents
Ce notebook compare trois solutions d'extraction de contenu de documents :
- **Docling v2**
- **LayoutParser**
- **Tesseract + TableBank**

Les métriques évaluées sont :
- **Temps de traitement**
- **Similarité textuelle** (par rapport à un texte de référence)

Une interface permet de visualiser les résultats.

### Installation des dépendances

In [None]:
from tqdm import tqdm
import subprocess
import getpass, subprocess, platform

In [None]:
# Installer Tesseract OCR sur Linux

def run_sudo(cmd):
    subprocess.run(f"echo {getpass.getpass('Mot de passe sudo : ')} | sudo -S {cmd}", shell=True)

run_sudo("apt update -qq && apt install -y tesseract-ocr -qq")
subprocess.run("tesseract --version", shell=True, check=True)
print(f"\nDétails de l'OS: {platform.platform()}")


In [None]:
# Installation de pytesseract, docling, layoutparser, kaggle, datasets, panel, ipywidgets via pip
libs = {
    "Solution Benchmarkées": ["pytesseract", "docling", "layoutparser", "paddleocr", "paddlepaddle"],
    "Données": ["kaggle", "datasets"],
    "Évaluation & metrics": ["evaluate", "jiwer", "rouge-score"],
    "UI": ["panel[recommended]", "ipywidgets"]
}

print("Début de l'installation des dépendances...")

# Installation avec barre de progression
for category, packages in libs.items():
    for lib in tqdm(packages, desc=f"Installation: {category}"):
        subprocess.run(f"pip install -q {lib}", shell=True)

print("Installation terminée.")



In [None]:
%pip install editdistance

In [None]:
import os
import time
import requests
from tqdm import tqdm

# Dossier contenant les fichiers texte
TEXT_FILES_DIR = '/home/pi/Téléchargements/data-gouv-pdf-txt/data_gouv_txt/'
# Dossier où les PDF seront téléchargés
DOWNLOAD_DIR = '/home/pi/Téléchargements/data-gouv-pdf-txt/data_gouv_pdf/'
# Fichier pour enregistrer les erreurs
ERROR_LOG = '/home/pi/Téléchargements/data-gouv-pdf-txt/errors.log'
# Délai entre les requêtes pour éviter de surcharger le serveur (en secondes)
REQUEST_DELAY = 2

def extract_ids(filename):
    """Extrait l'ID du dataset et l'ID de la ressource à partir du nom du fichier."""
    base_name = os.path.basename(filename)
    dataset_id, resource_id = base_name.replace('.txt', '').split('--')
    return dataset_id, resource_id

def get_resource_url(dataset_id, resource_id):
    """Récupère l'URL de la ressource à partir de l'API de data.gouv.fr."""
    api_url = f'https://www.data.gouv.fr/api/1/datasets/{dataset_id}/'
    response = requests.get(api_url)
    if response.status_code == 200:
        dataset = response.json()
        for resource in dataset.get('resources', []):
            if resource['id'] == resource_id:
                return resource.get('url')
    else:
        print(f"Erreur lors de l'accès à {api_url} : {response.status_code}")
    return None

def download_pdf(pdf_url, download_path):
    """Télécharge le PDF depuis l'URL spécifiée vers le chemin de téléchargement."""
    try:
        response = requests.get(pdf_url, stream=True)
        if response.status_code == 200:
            with open(download_path, 'wb') as pdf_file:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        pdf_file.write(chunk)
            print(f"Téléchargé : {download_path}")
            return True
        else:
            print(f"Erreur lors du téléchargement de {pdf_url} : {response.status_code}")
    except requests.RequestException as e:
        print(f"Erreur réseau lors du téléchargement de {pdf_url} : {e}")
    return False

def get_all_text_files(directory):
    """Récupère tous les fichiers texte dans le répertoire et ses sous-répertoires."""
    text_files = []
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith('.txt'):
                text_files.append(os.path.join(root, file))
    return text_files

def log_error(dataset_id, resource_id):
    """Enregistre une erreur dans le fichier d'erreurs."""
    with open(ERROR_LOG, 'a') as log_file:
        log_file.write(f"{dataset_id}--{resource_id}\n")

def load_errors():
    """Charge les erreurs précédemment enregistrées."""
    if not os.path.exists(ERROR_LOG):
        return []
    with open(ERROR_LOG, 'r') as log_file:
        return [line.strip() for line in log_file.readlines()]

def main():
    if not os.path.exists(DOWNLOAD_DIR):
        os.makedirs(DOWNLOAD_DIR)

    # Charger les fichiers texte et les erreurs précédentes
    text_files = get_all_text_files(TEXT_FILES_DIR)
    errors = load_errors()

    if not text_files:
        print("Aucun fichier texte trouvé dans le dossier spécifié.")
        return

    for text_file in tqdm(text_files, desc="Traitement des fichiers texte"):
        dataset_id, resource_id = extract_ids(text_file)
        pdf_filename = f"{dataset_id}--{resource_id}.pdf"
        download_path = os.path.join(DOWNLOAD_DIR, pdf_filename)

        # Ne pas retélécharger si le fichier existe déjà
        if os.path.exists(download_path):
            print(f"Déjà téléchargé : {download_path}")
            continue

        # Vérifier si cette ressource est déjà dans les erreurs
        if f"{dataset_id}--{resource_id}" in errors:
            print(f"Échec précédent pour : {dataset_id}--{resource_id}")
            continue

        pdf_url = get_resource_url(dataset_id, resource_id)
        if pdf_url:
            success = download_pdf(pdf_url, download_path)
            if not success:
                log_error(dataset_id, resource_id)
        else:
            print(f"URL PDF non trouvée pour la ressource {resource_id} dans le dataset {dataset_id}")
            log_error(dataset_id, resource_id)

        time.sleep(REQUEST_DELAY)

if __name__ == '__main__':
    main()


In [None]:
import os
from pathlib import Path
import pandas as pd
from typing import List
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import InputFormat
from docling.datamodel.document import ConversionStatus
from docling.datamodel.pipeline_options import PdfPipelineOptions
import evaluate

# Directories for input files
TEXT_FILES_DIR = '/home/pi/Téléchargements/data-gouv-pdf-txt/data_gouv_txt/'
DOWNLOAD_DIR = '/home/pi/Téléchargements/data-gouv-pdf-txt/data_gouv_pdf/'

# Initialize Hugging Face Evaluate metrics
bleu_metric = evaluate.load("bleu")
rouge_metric = evaluate.load("rouge")
cer_metric = evaluate.load("cer")
wer_metric = evaluate.load("wer")

# Configurer Docling
def create_docling_converter():
    options = PdfPipelineOptions()
    options.do_ocr = True
    options.generate_page_images = False
    converter = DocumentConverter(
        allowed_formats=[InputFormat.PDF],
        format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=options)}
    )
    return converter

# Calcul des métriques avec Evaluate
def calculate_metrics(pdf_text: str, txt_text: str) -> dict:
    # CER
    cer_score = cer_metric.compute(predictions=[pdf_text], references=[txt_text])
    
    # WER
    wer_score = wer_metric.compute(predictions=[pdf_text], references=[txt_text])
    
    # ROUGE
    rouge_scores = rouge_metric.compute(predictions=[pdf_text], references=[txt_text])
    
    # BLEU
    bleu_score = bleu_metric.compute(predictions=[pdf_text.split()], references=[[txt_text.split()]])
    
    return {
        "CER": cer_score['cer'],
        "WER": wer_score['wer'],
        "ROUGE-1": rouge_scores['rouge1'].fmeasure,
        "ROUGE-2": rouge_scores['rouge2'].fmeasure,
        "ROUGE-L": rouge_scores['rougeL'].fmeasure,
        "BLEU": bleu_score['bleu']
    }

# Processus principal
def process_files(pdf_dir: str, txt_dir: str) -> pd.DataFrame:
    pdf_dir = Path(pdf_dir)
    txt_dir = Path(txt_dir)

    converter = create_docling_converter()
    results = []

    for pdf_file in pdf_dir.glob("*.pdf"):
        txt_file = txt_dir / f"{pdf_file.stem}.txt"

        if not txt_file.exists():
            print(f"Corresponding TXT file not found for {pdf_file.name}")
            continue

        # Extraire le texte du PDF
        result = list(converter.convert_all([pdf_file]))[0]
        if result.status != ConversionStatus.SUCCESS:
            print(f"Failed to process {pdf_file.name}")
            continue

        pdf_text = result.document.text_content.strip()
        with txt_file.open("r", encoding="utf-8") as f:
            txt_text = f.read().strip()

        # Calculer les métriques
        metrics = calculate_metrics(pdf_text, txt_text)
        metrics.update({
            "PDF File": pdf_file.name,
            "TXT File": txt_file.name
        })
        results.append(metrics)

    # Créer un DataFrame
    df = pd.DataFrame(results)
    return df

# Lancer le processus
df_results = process_files(DOWNLOAD_DIR, TEXT_FILES_DIR)

# Afficher les résultats
print(df_results.head())

# Sauvegarder les résultats dans un fichier CSV
# output_csv = "comparison_results_hf.csv"
# df_results.to_csv(output_csv, index=False, encoding="utf-8")
# print(f"Results saved to {output_csv}")


Corresponding TXT file not found for 5b5ec5c188ee3842dbe6ba19--20f7b628-3300-420a-8e3d-90144397d176.pdf
Corresponding TXT file not found for 5b5ec5c188ee3842dbe6ba19--9b60cd53-6fa6-48c2-8e69-4a6217483b87.pdf
Corresponding TXT file not found for 5b4eeb71a3a7297183e1c36f--8efb868f-0e82-4e5d-af57-5b6a8c109887.pdf
Corresponding TXT file not found for 5369998ea3a729239d2052a1--4d9ffbbe-f55d-428a-9e53-ebcc1bc835bd.pdf
Corresponding TXT file not found for 5cfa38578b4c411846c84b99--002a5bec-805e-4334-8ddb-b06411779883.pdf
Corresponding TXT file not found for 5b4ee8a7b595087918d496ca--d7238360-0ae4-4e80-8b27-c0f7000f7f78.pdf
Corresponding TXT file not found for 5bb6133b8b4c410ea4b9113c--2f5b5b28-ff36-4ea0-82fc-e46eef042f72.pdf
Corresponding TXT file not found for 5b5ec5c188ee3842dbe6ba19--9c8e7b03-4ba6-4e77-b5a9-67e341b7b433.pdf
Corresponding TXT file not found for 5c7929508b4c4150ff5452bc--6cd6e1aa-074d-48c5-952b-1ec87c377ef5.pdf
Corresponding TXT file not found for 5b5ec5c188ee3842dbe6ba19--5

# Jeux de données pour un benchmark IDP (Intelligent Document Processing)

Pour un benchmark orienté IDP (Intelligent Document Processing) avec un objectif métier clair, tel que la transformation de documents non structurés en informations exploitables, voici une sélection de petits jeux de données adaptés :

## 1. FUNSD (Form Understanding in Noisy Scanned Documents)
- **Description** : Un jeu de données contenant 199 formulaires annotés avec des entités liées à leur contenu (en-têtes, questions, réponses). Ces formulaires sont souvent utilisés pour tester la capacité des modèles à extraire et structurer des informations.
- **Lien** : [FUNSD GitHub](https://github.com/microsoft/unilm/tree/master/funsd)
- **Format** : PDF et annotations JSON.
- **Avantages** : Conçu pour les tâches d'extraction de données clés et la structuration de documents semi-structurés.

## 2. CORD (Consolidated Receipt Dataset)
- **Description** : Un jeu de données qui comprend des tickets de caisse annotés pour la reconnaissance des entités (montants, articles, etc.).
- **Lien** : [CORD GitHub](https://github.com/clovaai/cord)
- **Format** : Images et JSON.
- **Avantages** : Idéal pour tester l'extraction d'informations dans des documents commerciaux et structurés.

## 3. Kleister Charity
- **Description** : Un jeu de données conçu pour la classification de documents et l'extraction d'informations à partir de documents longs, notamment des contrats et des rapports.
- **Lien** : [Kleister GitHub](https://github.com/allenai/kleister-charity)
- **Format** : JSON et PDF.
- **Avantages** : Bien adapté pour les cas d'usage métier où les données sont enfermées dans de longs rapports.

## 4. SROIE (Scanned Receipts OCR and Information Extraction)
- **Description** : Un jeu de données qui contient des scans de reçus avec annotations sur des champs spécifiques (nom de l'entreprise, date, montant, etc.).
- **Lien** : [SROIE Resources](https://rrc.cvc.uab.es/?ch=13)
- **Format** : Images et JSON.
- **Avantages** : Permet de tester la capacité à traiter des documents bruités ou mal scannés.

## 5. Invoice Dataset
- **Description** : Un petit jeu de données contenant des factures annotées, conçu pour tester les capacités de reconnaissance des documents structurés.
- **Lien** : [Invoice Dataset Kaggle](https://www.kaggle.com/datasets/ashishraut64/invoice-dataset)
- **Format** : PDF et annotations.
- **Avantages** : Couvre les besoins liés à l'extraction de données dans les cas d'usage financier et administratif.

## 6. RVL-CDIP (Ryerson Vision Lab Complex Document Information Processing)
- **Description** : Un jeu de données comprenant 400 000 pages de documents classées dans 16 catégories (lettres, mémos, bulletins, etc.).
- **Lien** : [RVL-CDIP Details](http://www.cs.cmu.edu/~aharley/rvl-cdip/)
- **Format** : TIFF et catégories.
- **Avantages** : Utile pour tester la classification de documents non structurés.

---

## **Sélection Optimale**
Pour un benchmark léger mais pertinent, il est recommandé de commencer par **FUNSD**, **CORD**, et **SROIE**, car ils couvrent des cas variés d'extraction de données :
1. **Documents semi-structurés** (ex. formulaires).
2. **Documents commerciaux structurés** (tickets, factures).
3. **Documents bruités** (scans de qualité variable).


### Téléhargement et extraction des données

In [None]:
import os, json, zipfile, shutil, re, requests, shutil
from pathlib import Path
from datasets import load_dataset
from kaggle.api.kaggle_api_extended import KaggleApi

# Répertoire de téléchargement des datasets
DATASET_DIR = Path("datasets")
DATASET_DIR.mkdir(parents=True, exist_ok=True)

# Dossier de sortie unique pour tous les fichiers
MERGED_DATASET_DIR = Path("merged_dataset")
MERGED_DATASET_DIR.mkdir(parents=True, exist_ok=True)

# Sources des datasets
DATASET_SOURCES = {
    "FUNSD": {
        "type": "url",
        "url": "https://guillaumejaume.github.io/FUNSD/dataset.zip",
        "zip_name": "funsd.zip",
        "annotation_folder": "dataset/training_data/annotations",
        "image_folder": "dataset/training_data/images",
    },
    "CORD": {
        "type": "huggingface",
        "huggingface_dataset": "naver-clova-ix/cord-v2",
    },
    "SROIE": {
        "type": "kaggle",
        "kaggle_dataset": "urbikn/sroie-datasetv2",
        "zip_name": "sroie-datasetv2.zip",
    },
}

In [None]:
def download_dataset(name, info, download_dir):
    """
    Télécharge un dataset en fonction de son type et sauvegarde les fichiers associés.
    """
    print(f"[INFO] Traitement du dataset : {name}")

    if info["type"] == "url":
        # Gestion pour les fichiers ZIP depuis une URL (FUNSD)
        zip_path = download_dir / info["zip_name"]
        if not zip_path.exists():
            print(f"[INFO] {name} - Téléchargement depuis l'URL : {info['url']}")
            try:
                with requests.get(info["url"], stream=True) as response:
                    response.raise_for_status()
                    with open(zip_path, "wb") as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                print(f"[SUCCESS] {name} - Fichier téléchargé : {zip_path}")
            except Exception as e:
                print(f"[ERROR] {name} - Erreur lors du téléchargement : {e}")
        else:
            print(f"[INFO] {name} - Fichier déjà présent, téléchargement ignoré : {zip_path}")

    elif info["type"] == "huggingface":
        # Gestion pour CORD (Hugging Face)
        dataset_dir = download_dir / name
        annotations_file = dataset_dir / "annotations.json"
        dataset_dir.mkdir(parents=True, exist_ok=True)

        # Téléchargement des images
        print(f"[INFO] {name} - Téléchargement des images depuis Hugging Face")
        try:
            dataset = load_dataset(info["huggingface_dataset"], split="train")
            annotations = []  # Stockage des annotations

            for i, example in enumerate(dataset):
                # Vérifie si l'image existe déjà
                image_path = dataset_dir / f"{name}_image_{i}.jpg"
                if not image_path.exists():
                    if "image" in example and example["image"] is not None:
                        example["image"].save(image_path)
                        print(f"[SUCCESS] {name} - Image sauvegardée : {image_path}")

                # Collecte les annotations
                annotations.append({
                    "file_name": f"{name}_image_{i}.jpg",
                    "annotations": example.get("text", ""),
                    "bbox": example.get("bboxes", []),
                })

            # Vérifie si les annotations existent déjà avant de les sauvegarder
            if not annotations_file.exists():
                with open(annotations_file, "w", encoding="utf-8") as f:
                    json.dump(annotations, f, ensure_ascii=False, indent=4)
                print(f"[SUCCESS] {name} - Annotations sauvegardées : {annotations_file}")
            else:
                print(f"[INFO] {name} - Annotations déjà présentes, téléchargement ignoré : {annotations_file}")

        except Exception as e:
            print(f"[ERROR] {name} - Erreur lors du téléchargement Hugging Face : {e}")

    elif info["type"] == "kaggle":
        # Gestion pour les fichiers ZIP depuis Kaggle (SROIE)
        api = KaggleApi()
        api.authenticate()
        zip_path = download_dir / info["zip_name"]
        if not zip_path.exists():
            print(f"[INFO] {name} - Téléchargement depuis Kaggle : {info['kaggle_dataset']}")
            try:
                api.dataset_download_files(info["kaggle_dataset"], path=download_dir, unzip=False)
                print(f"[SUCCESS] {name} - Fichier téléchargé : {zip_path}")
            except Exception as e:
                print(f"[ERROR] {name} - Erreur lors du téléchargement Kaggle : {e}")
        else:
            print(f"[INFO] {name} - Fichier déjà présent, téléchargement ignoré : {zip_path}")
    
    else:
        print(f"[ERROR] {name} - Type de dataset inconnu")

# Téléchargement des datasets
for dataset_name, dataset_info in DATASET_SOURCES.items():
    download_dataset(dataset_name, dataset_info, DATASET_DIR)

print("[INFO] Téléchargement des datasets terminé.")


### Preprocessing et merge des données

In [None]:
# Gestion des conflits de noms
def copy_without_overwrite(src, dst_dir):
    dst_path = dst_dir / src.name
    counter = 1
    while dst_path.exists():
        dst_path = dst_dir / f"{src.stem}_{counter}{src.suffix}"
        counter += 1
    shutil.copy(src, dst_path)
    return dst_path.name

# Fonction pour tenter de corriger automatiquement les erreurs JSON
def fix_json_format(file_path):
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()
        
        # Remplacer les single quotes par des double quotes pour les clés
        fixed_content = re.sub(r"(?<!\\)'([^']*?)'(?!:)", r'"\1"', content)

        # Retirer les virgules finales après le dernier élément
        fixed_content = re.sub(r",(\s*[\}\]])", r"\1", fixed_content)

        # Écrire le contenu corrigé dans le même fichier
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(fixed_content)
        print(f"[INFO] Format JSON corrigé pour : {file_path}")
    except Exception as e:
        print(f"[ERREUR] Impossible de corriger le fichier JSON : {file_path}. Erreur : {e}")

# Fonction pour charger des fichiers JSON avec validation
def load_json_file(file_path):
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except json.JSONDecodeError as e:
        print(f"[ERREUR] Fichier JSON invalide : {file_path}. Erreur : {e}")
        print("[INFO] Tentative de correction automatique...")
        fix_json_format(file_path)
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except json.JSONDecodeError as e:
            print(f"[ERREUR] Impossible de charger le fichier JSON après correction : {file_path}. Erreur : {e}")
            return None

# Extraction et fusion pour FUNSD
def process_funsd(funsd_dir, merged_dir):
    print("[INFO] Traitement FUNSD...")
    annotations = []
    for subset in ["training_data", "testing_data"]:
        annotation_dir = funsd_dir / "dataset" / subset / "annotations"
        image_dir = funsd_dir / "dataset" / subset / "images"
        
        for annotation_file in tqdm(annotation_dir.glob("*.json"), desc=f"[FUNSD] {subset}"):
            data = load_json_file(annotation_file)
            if data:
                image_file = image_dir / f"{annotation_file.stem}.png"
                if image_file.exists():
                    new_image_name = copy_without_overwrite(image_file, merged_dir)
                    text = " ".join([block["text"] for block in data.get("form", [])])
                    annotations.append({"file_name": new_image_name, "text": text})
    return annotations

# Extraction et fusion pour SROIE
def process_sroie(sroie_dir, merged_dir):
    print("[INFO] Traitement SROIE...")
    annotations = []
    for subset in ["train", "test"]:
        annotation_dir = sroie_dir / "SROIE2019" / subset / "entities"
        image_dir = sroie_dir / "SROIE2019" / subset / "img"

        for annotation_file in tqdm(annotation_dir.glob("*.txt"), desc=f"[SROIE] {subset}"):
            with open(annotation_file, "r") as f:
                text = f.read().strip()
            image_file = image_dir / f"{annotation_file.stem}.jpg"
            if image_file.exists():
                new_image_name = copy_without_overwrite(image_file, merged_dir)
                annotations.append({"file_name": new_image_name, "text": text})
    return annotations

# Extraction et fusion pour CORD
def process_cord(cord_dir, merged_dir):
    print("[INFO] Traitement CORD...")
    annotations = []
    annotation_file = cord_dir / "annotations.json"
    data = load_json_file(annotation_file)
    if data:
        for item in tqdm(data, desc="[CORD]"):
            image_file = cord_dir / item["file_name"]
            if image_file.exists():
                new_image_name = copy_without_overwrite(image_file, merged_dir)
                annotations.append({
                    "file_name": new_image_name,
                    "text": item.get("text", ""),
                    "bboxes": item.get("bboxes", [])
                })
    return annotations

# Fusionner tous les datasets
def merge_datasets():
    all_annotations = []
    # Traiter FUNSD
    funsd_dir = DATASET_DIR / "funsd"
    all_annotations.extend(process_funsd(funsd_dir, MERGED_DATASET_DIR))

    # Traiter SROIE
    sroie_dir = DATASET_DIR / "sroie-datasetv2"
    all_annotations.extend(process_sroie(sroie_dir, MERGED_DATASET_DIR))

    # Traiter CORD
    cord_dir = DATASET_DIR / "CORD"
    all_annotations.extend(process_cord(cord_dir, MERGED_DATASET_DIR))

    # Sauvegarder les annotations fusionnées
    merged_annotations_file = MERGED_DATASET_DIR / "merged_annotations.json"
    with open(merged_annotations_file, "w", encoding="utf-8") as f:
        json.dump(all_annotations, f, ensure_ascii=False, indent=4)
    print(f"[INFO] Annotations fusionnées sauvegardées dans : {merged_annotations_file}")

# Exécution principale
if __name__ == "__main__":
    merge_datasets()


### Benchmark et evaluations

In [None]:
import time
import json
import pandas as pd
from pathlib import Path
from difflib import SequenceMatcher
from pdf2image import convert_from_path
import cv2
import pytesseract
import layoutparser as lp
from paddleocr import PaddleOCR
import numpy as np
import holoviews as hv
import panel as pn
import evaluate
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions, TableStructureOptions, TesseractCliOcrOptions, TableFormerMode
)
from docling.datamodel.document import ConversionStatus
from docling.datamodel.base_models import InputFormat

# Configuration
hv.extension("bokeh")
pytesseract.pytesseract_cmd = r'/usr/bin/tesseract'

# Initialize Hugging Face Evaluate metrics
bleu_metric = evaluate.load("bleu")
rouge_metric = evaluate.load("rouge")
cer_metric = evaluate.load("cer")
wer_metric = evaluate.load("wer")

# Generate interactive plots
def generate_interactive_plots(results_df):
    time_plot = hv.Bars(results_df, kdims=["Document", "Modèle"], vdims=["Temps (s)"]).opts(
        title="Temps de traitement par modèle",
        xlabel="Document et Modèle",
        ylabel="Temps (s)",
        width=800,
        height=400,
        tools=["hover"]
    )
    bleu_plot = hv.Bars(results_df, kdims=["Document", "Modèle"], vdims=["BLEU"]).opts(
        title="Score BLEU par modèle",
        xlabel="Document et Modèle",
        ylabel="BLEU",
        width=800,
        height=400,
        tools=["hover"]
    )
    rouge_plot = hv.Bars(results_df, kdims=["Document", "Modèle"], vdims=["ROUGE"]).opts(
        title="Score ROUGE par modèle",
        xlabel="Document et Modèle",
        ylabel="ROUGE",
        width=800,
        height=400,
        tools=["hover"]
    )
    cer_plot = hv.Bars(results_df, kdims=["Document", "Modèle"], vdims=["CER"]).opts(
        title="Score CER par modèle",
        xlabel="Document et Modèle",
        ylabel="CER",
        width=800,
        height=400,
        tools=["hover"]
    )
    wer_plot = hv.Bars(results_df, kdims=["Document", "Modèle"], vdims=["WER"]).opts(
        title="Score WER par modèle",
        xlabel="Document et Modèle",
        ylabel="WER",
        width=800,
        height=400,
        tools=["hover"]
    )
    return time_plot, bleu_plot, rouge_plot, cer_plot, wer_plot

In [None]:
# Initialize Docling
def initialize_docling():
    # Configuration du pipeline Docling
    pipeline_options = PdfPipelineOptions(
        do_table_structure=True,
        do_ocr=True,
        table_structure_options=TableStructureOptions(
            do_cell_matching=True, mode=TableFormerMode.ACCURATE
        ),
        ocr_options=TesseractCliOcrOptions(
            lang=["eng", "fra"], tesseract_cmd="tesseract"
        ),
        generate_page_images=True,
        generate_picture_images=True,
        generate_table_images=True,
    )

    # Retourne un convertisseur Docling configuré
    return DocumentConverter(
        allowed_formats=[InputFormat.PDF],
        format_options={
            InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
        },
    )

# Initialisation de Docling
docling_converter = initialize_docling()

# Initialize PaddleOCR
paddle_ocr = PaddleOCR(use_angle_cls=True, lang='en', show_log=False)

# Utility functions
def calculate_similarity(text1, text2):
    return SequenceMatcher(None, text1, text2).ratio()

def process_with_layoutparser(doc_path):
    try:
        if doc_path.suffix.lower() == ".pdf":
            images = convert_from_path(str(doc_path))
            image = np.array(images[0])
        else:
            image = cv2.imread(str(doc_path))
        
        ocr_agent = lp.TesseractAgent(languages="eng")
        layout = ocr_agent.detect(image)

        # Vérification et extraction robuste des textes
        return " ".join([block.text if hasattr(block, "text") else str(block) for block in layout])
    except Exception as e:
        print(f"Erreur avec LayoutParser : {e}")
        return ""


def process_with_tesseract(doc_path):
    try:
        image = convert_from_path(str(doc_path))[0] if doc_path.suffix.lower() == ".pdf" else cv2.imread(str(doc_path))
        return pytesseract.image_to_string(np.array(image))
    except Exception as e:
        print(f"Erreur avec Tesseract : {e}")
        return ""

def process_with_docling(doc_path):
    if not docling_converter:
        print("Docling non initialisé.")
        return ""
    try:
        result = docling_converter.convert(doc_path)
        if result.status == ConversionStatus.SUCCESS and result.document:
            return result.document.export_to_text()
        elif result.status == ConversionStatus.PARTIAL_SUCCESS and result.document:
            return result.document.export_to_text()
        print(f"Docling : Conversion échouée pour {doc_path}")
        return ""
    except Exception as e:
        print(f"Erreur avec Docling : {e}")
        return ""

def process_with_paddleocr(doc_path):
    try:
        image = convert_from_path(str(doc_path))[0] if doc_path.suffix.lower() == ".pdf" else cv2.imread(str(doc_path))
        image = cv2.cvtColor(np.array(image), cv2.COLOR_BGR2RGB)
        result = paddle_ocr.ocr(image, cls=True)
        return " ".join([line[1][0] for line in result[0]])
    except Exception as e:
        print(f"Erreur avec PaddleOCR : {e}")
        return ""

def evaluate_metrics(pred_text, ref_text):
    if not pred_text.strip() or not ref_text.strip():
        print("Attention : Texte prédit ou de référence vide.")
        return None, None, None, None
    try:
        bleu = bleu_metric.compute(predictions=[pred_text], references=[[ref_text]])["bleu"]
        rouge = rouge_metric.compute(predictions=[pred_text], references=[ref_text])["rougeL"]
        cer = cer_metric.compute(predictions=[pred_text], references=[ref_text])
        wer = wer_metric.compute(predictions=[pred_text], references=[ref_text])
    except Exception as e:
        print(f"Erreur lors de l'évaluation des métriques : {e}")
        bleu, rouge, cer, wer = None, None, None, None
    return bleu, rouge, cer, wer

# Benchmark des modèles
def benchmark_models(files, references):
    results = []
    for file_path in files:
        reference_text = references.get(file_path.name, "")
        for process_fn, model_name in [
            (process_with_layoutparser, "LayoutParser"),
            (process_with_tesseract, "Tesseract"),
            (process_with_docling, "Docling"),
            (process_with_paddleocr, "PaddleOCR"),
        ]:
            try:
                start_time = time.time()
                pred_text = process_fn(file_path)
                elapsed_time = time.time() - start_time
                metrics = evaluate_metrics(pred_text, reference_text)
            except Exception as e:
                elapsed_time, metrics = None, (None, None, None, None)
                print(f"Erreur avec {model_name} : {e}")
            results.append({
                "Document": file_path.name,
                "Modèle": model_name,
                "Temps (s)": elapsed_time,
                "BLEU": metrics[0],
                "ROUGE": metrics[1],
                "CER": metrics[2],
                "WER": metrics[3],
            })
    return pd.DataFrame(results)


# Interactive dashboard
def create_dashboard():
    dataset_slider = pn.widgets.IntSlider(name="Taille du Dataset", start=10, end=50, step=10)
    benchmark_btn = pn.widgets.Button(name="Lancer le Benchmark", button_type="primary")
    error_pane = pn.pane.Markdown("", styles={"color": "red"})
    result_table = pn.pane.DataFrame(sizing_mode="stretch_width")
    plot_panes = [pn.pane.HoloViews() for _ in range(5)]

    def run_benchmark(event):
        error_pane.object = ""
        merged_dataset_dir = Path("merged_dataset")
        annotations_file = merged_dataset_dir / "merged_annotations.json"
        if not annotations_file.exists():
            error_pane.object = f"**Erreur :** {annotations_file} est introuvable."
            return

        with open(annotations_file, "r", encoding="utf-8") as f:
            references = {item["file_name"]: item["text"] for item in json.load(f)}

        files = list(merged_dataset_dir.glob("*"))
        valid_files = [file for file in files if file.suffix.lower() in [".pdf", ".png", ".jpg"]]
        selected_files = valid_files[:dataset_slider.value]
        if not selected_files:
            error_pane.object = "**Erreur :** Aucun fichier valide trouvé."
            return

        results_df = benchmark_models(selected_files, references)
        result_table.object = results_df

        plots = generate_interactive_plots(results_df)
        for plot_pane, plot in zip(plot_panes, plots):
            plot_pane.object = plot

    benchmark_btn.on_click(run_benchmark)

    return pn.Column(
        dataset_slider,
        benchmark_btn,
        error_pane,
        result_table,
        *plot_panes
    )

pn.extension()
create_dashboard().servable()


In [None]:
import time
import json
import pandas as pd
from pathlib import Path
from difflib import SequenceMatcher
from pdf2image import convert_from_path
import cv2
import pytesseract
import layoutparser as lp
import numpy as np
import holoviews as hv
import panel as pn
import evaluate

from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions, TableStructureOptions, TesseractCliOcrOptions, TableFormerMode
)
from docling.datamodel.document import ConversionStatus

# Configuration
hv.extension("bokeh")
pytesseract.pytesseract_cmd = r'/usr/bin/tesseract'

# Initialize Metrics
bleu_metric = evaluate.load("bleu")
rouge_metric = evaluate.load("rouge")
cer_metric = evaluate.load("cer")
wer_metric = evaluate.load("wer")

# Initialize Docling
def initialize_docling():
    options = PdfPipelineOptions(
        do_table_structure=True,
        do_ocr=True,
        table_structure_options=TableStructureOptions(do_cell_matching=True, mode=TableFormerMode.ACCURATE),
        ocr_options=TesseractCliOcrOptions(lang=["eng", "fra"], tesseract_cmd="tesseract"),
        generate_page_images=True,
        generate_picture_images=True,
        generate_table_images=True,
    )
    return DocumentConverter(
        allowed_formats=["pdf"],
        format_options={"pdf": PdfFormatOption(pipeline_options=options, backend=PyPdfiumDocumentBackend)}
    )

docling_converter = initialize_docling()

# Calcul de similarité
def calculate_similarity(text1, text2):
    return SequenceMatcher(None, text1, text2).ratio()


# Initialize Docling
def initialize_docling():
    pipeline_options = PdfFormatOption(
        pipeline_options={"do_ocr": True},
        backend=PyPdfiumDocumentBackend
    )
    return DocumentConverter(allowed_formats=["pdf"], format_options={"pdf": pipeline_options})

docling_converter = initialize_docling()

# Traitement avec LayoutParser
def process_with_layoutparser(doc_path):
    try:
        if doc_path.suffix.lower() == ".pdf":
            images = convert_from_path(str(doc_path))
            image = np.array(images[0])
        else:
            image = cv2.imread(str(doc_path))
        
        ocr_agent = lp.TesseractAgent(languages="eng")
        layout = ocr_agent.detect(image)

        # Vérification et extraction robuste des textes
        return " ".join([block.text if hasattr(block, "text") else str(block) for block in layout])
    except Exception as e:
        print(f"Erreur avec LayoutParser : {e}")
        return ""


# Traitement avec Tesseract
def process_with_tesseract(doc_path):
    if doc_path.suffix.lower() == ".pdf":
        images = convert_from_path(str(doc_path))
        image = np.array(images[0])
    else:
        image = cv2.imread(str(doc_path))
    return pytesseract.image_to_string(image)

def process_with_docling(doc_path):
    try:
        result = docling_converter.convert(doc_path)
        if result.status == ConversionStatus.SUCCESS and result.document:
            return result.document.export_to_text()
        elif result.status == ConversionStatus.PARTIAL_SUCCESS:
            return result.document.export_to_text() if result.document else ""
        return ""
    except Exception as e:
        print(f"Erreur avec Docling : {e}")
        return ""

def evaluate_metrics(pred_text, ref_text):
    if not pred_text.strip() or not ref_text.strip():
        print("Attention : Texte prédit ou de référence vide.")
        return None, None, None, None
    try:
        bleu = bleu_metric.compute(predictions=[pred_text], references=[[ref_text]])["bleu"]
        rouge = rouge_metric.compute(predictions=[pred_text], references=[ref_text])["rougeL"]
        cer = cer_metric.compute(predictions=[pred_text], references=[ref_text])
        wer = wer_metric.compute(predictions=[pred_text], references=[ref_text])
    except Exception as e:
        print(f"Erreur lors de l'évaluation des métriques : {e}")
        bleu, rouge, cer, wer = None, None, None, None
    return bleu, rouge, cer, wer

# Benchmark des modèles
def benchmark_models(files, references):
    results = []
    for file_path in files:
        reference_text = references.get(file_path.name, "")
        for process_fn, model_name in [
            (process_with_layoutparser, "LayoutParser"),
            (process_with_tesseract, "Tesseract"),
            (process_with_docling, "Docling"),
        ]:
            try:
                start_time = time.time()
                pred_text = process_fn(file_path)
                elapsed_time = time.time() - start_time
                metrics = evaluate_metrics(pred_text, reference_text)
            except Exception as e:
                elapsed_time, metrics = None, (None, None, None, None)
                print(f"Erreur avec {model_name} : {e}")
            results.append({
                "Document": file_path.name,
                "Modèle": model_name,
                "Temps (s)": elapsed_time,
                "BLEU": metrics[0],
                "ROUGE": metrics[1],
                "CER": metrics[2],
                "WER": metrics[3],
            })
    return pd.DataFrame(results)


# Tableau de bord interactif
def create_dashboard():
    dataset_slider = pn.widgets.IntSlider(name="Taille du Dataset", start=10, end=50, step=10)
    benchmark_btn = pn.widgets.Button(name="Lancer le Benchmark", button_type="primary")
    error_pane = pn.pane.Markdown("", styles={"color": "red"})
    result_table = pn.pane.DataFrame(sizing_mode="stretch_width")
    plot_panes = [pn.pane.HoloViews() for _ in range(5)]

    def run_benchmark(event):
        error_pane.object = ""
        merged_dataset_dir = Path(MERGED_DATASET_DIR)
        annotations_file = MERGED_DATASET_DIR / "merged_annotations.json"
        if not annotations_file.exists():
            error_pane.object = f"**Erreur :** {annotations_file} est introuvable."
            return

        with open(annotations_file, "r", encoding="utf-8") as f:
            references = {item["file_name"]: item["text"] for item in json.load(f)}

        files = list(merged_dataset_dir.glob("*"))
        valid_files = [file for file in files if file.suffix.lower() in [".pdf", ".png", ".jpg"]]
        selected_files = valid_files[:dataset_slider.value]
        if not selected_files:
            error_pane.object = "**Erreur :** Aucun fichier valide trouvé."
            return

        results_df = benchmark_models(selected_files, references)
        result_table.object = results_df

        plots = generate_interactive_plots(results_df)
        for plot_pane, plot in zip(plot_panes, plots):
            plot_pane.object = plot

    benchmark_btn.on_click(run_benchmark)

    return pn.Column(
        dataset_slider,
        benchmark_btn,
        error_pane,
        result_table,
        *plot_panes
    )

pn.extension()
create_dashboard().servable()
